Asynchronous Programming
Nonblocking I/O¶
The AWS SDK 2.x features truly nonblocking asynchronous clients that implement high concurrency across a few threads.
SDK 1.x uses blocking I/O
The AWS SDK for Java 1.11.x has asynchronous clients that are wrappers around a thread pool and blocking synchronous clients that don’t provide the full benefit of nonblocking I/O.
Tempest Async APIs¶
Tempest for SDK 2.x comes with async APIs that utilize Kotlin coroutine and Java CompletableFuture.
Declare you DB and tables as AsyncLogicalDb
and AsyncLogicalTable
.
interface AsyncMusicDb : AsyncLogicalDb {
@TableName("music_items")
val music: AsyncMusicTable
}
interface AsyncMusicTable : AsyncLogicalTable<MusicItem> {
val albumInfo: AsyncInlineView<AlbumInfo.Key, AlbumInfo>
val albumTracks: AsyncInlineView<AlbumTrack.Key, AlbumTrack>
val playlistInfo: AsyncInlineView<PlaylistInfo.Key, PlaylistInfo>
// Global Secondary Indexes.
val albumInfoByGenre: AsyncSecondaryIndex<AlbumInfo.GenreIndexOffset, AlbumInfo>
val albumInfoByArtist: AsyncSecondaryIndex<AlbumInfo.ArtistIndexOffset, AlbumInfo>
// Local Secondary Indexes.
val albumTracksByTitle: AsyncSecondaryIndex<AlbumTrack.TitleIndexOffset, AlbumTrack>
}
public interface AsyncMusicDb extends AsyncLogicalDb {
@TableName("music_items")
AsyncMusicTable music();
}
public interface AsyncMusicTable extends AsyncLogicalTable<MusicItem> {
AsyncInlineView<AlbumInfo.Key, AlbumInfo> albumInfo();
AsyncInlineView<AlbumTrack.Key, AlbumTrack> albumTracks();
AsyncInlineView<PlaylistInfo.Key, PlaylistInfo> playlistInfo();
// Global Secondary Indexes.
AsyncSecondaryIndex<AlbumInfo.GenreIndexOffset, AlbumInfo> albumInfoByGenre();
AsyncSecondaryIndex<AlbumInfo.ArtistIndexOffset, AlbumInfo> albumInfoByArtist();
// Local Secondary Indexes.
AsyncSecondaryIndex<AlbumTrack.TitleIndexOffset, AlbumTrack> albumTracksByTitle();
}
Write familiar code that is asynchronous under the hood.
private val table: AsyncMusicTable
suspend fun changePlaylistName(playlistToken: String, newName: String) {
// Read.
val existing = checkNotNull(
table.playlistInfo.load(PlaylistInfo.Key(playlistToken)) // This is a suspend function.
) { "Playlist does not exist: $playlistToken" }
// Modify.
val newPlaylist = existing.copy(
playlist_name = newName,
playlist_version = existing.playlist_version + 1
)
// Write.
table.playlistInfo.save( // This is a suspend function.
newPlaylist,
ifPlaylistVersionIs(existing.playlist_version)
)
}
private fun ifPlaylistVersionIs(playlist_version: Long): Expression {
return Expression.builder()
.expression("playlist_version = :playlist_version")
.expressionValues(mapOf(":playlist_version" to AttributeValue.builder().n("$playlist_version").build()))
.build()
}
private final AsyncMusicTable table;
public CompletableFuture<Void> changePlaylistName(String playlistToken, String newName) {
// Read.
return table.playlistInfo()
.loadAsync(new PlaylistInfo.Key(playlistToken))
.thenCompose(existing -> {
if (existing == null) {
throw new IllegalStateException("Playlist does not exist: " + playlistToken);
}
// Modify.
PlaylistInfo newPlaylist = new PlaylistInfo(
existing.playlist_token,
newName,
existing.playlist_tracks,
// playlist_version.
existing.playlist_version + 1
);
// Write.
return table.playlistInfo()
.saveAsync(
newPlaylist,
ifPlaylistVersionIs(existing.playlist_version)
);
});
}
private Expression ifPlaylistVersionIs(Long playlist_version) {
return Expression.builder()
.expression("playlist_version = :playlist_version")
.expressionValues(
Map.of(":playlist_version", AttributeValue.builder().n("" + playlist_version).build()))
.build();
}
Check out the code samples on Github: