DynamoDB Concise Tutorial
DynamoDB - Table Activity
DynamoDB Streams let you track and respond to changes to the items in a table. You can use them to build applications that react to those changes by updating information in other sources. For example, you can synchronize data for thousands of users of a large multi-user system, or send users notifications when data is updated. DynamoDB Streams are the main tool for building this kind of functionality.
A stream captures a time-ordered sequence of the item-level modifications made to a table and keeps this data for up to 24 hours. Applications read it to see both the original and the modified items in near real time.
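To make the 24-hour retention window concrete, here is a small in-memory model. The TimedChange record and RetentionModel class are hypothetical illustrations, not part of the AWS SDK, and they treat retention as a hard cutoff at exactly 24 hours, which simplifies how DynamoDB actually trims old records.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one stream entry and the time it was written.
record TimedChange(long sequence, String itemKey, Instant writtenAt) {}

final class RetentionModel {
    // Stream records remain readable for up to 24 hours.
    static final Duration RETENTION = Duration.ofHours(24);

    // Returns the records still inside the retention window at time `now`,
    // keeping their original time order.
    static List<TimedChange> readable(List<TimedChange> log, Instant now) {
        Instant cutoff = now.minus(RETENTION);
        List<TimedChange> out = new ArrayList<>();
        for (TimedChange c : log) {
            if (!c.writtenAt().isBefore(cutoff)) {
                out.add(c);
            }
        }
        return out;
    }
}
```

A record written 25 hours ago is filtered out, while one written an hour ago is still returned.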
A stream enabled on a table captures every data modification. Whenever an item is created, updated, or deleted, DynamoDB writes a stream record containing the primary key attributes of the modified item; reads do not produce stream records. You can configure the stream to capture additional information, such as the item's before and after images.
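The kind of record a write produces depends on whether the item existed before and after it. The sketch below assumes items are plain string maps and uses a hypothetical ChangeClassifier; the names INSERT, MODIFY, and REMOVE match the event names DynamoDB Streams reports. Reads never reach this logic, and a write that leaves the item unchanged yields no record at all.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical helper: classify one write by comparing the item before and
// after it. A null map means "the item did not exist".
final class ChangeClassifier {
    enum EventName { INSERT, MODIFY, REMOVE }

    static Optional<EventName> classify(Map<String, String> before,
                                        Map<String, String> after) {
        if (before == null && after == null) {
            return Optional.empty();              // nothing existed, nothing changed
        }
        if (before == null) {
            return Optional.of(EventName.INSERT); // item created
        }
        if (after == null) {
            return Optional.of(EventName.REMOVE); // item deleted
        }
        if (Objects.equals(before, after)) {
            return Optional.empty();              // no-op write: no stream record
        }
        return Optional.of(EventName.MODIFY);     // item updated
    }
}
```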
Streams provide two guarantees −
-
Each stream record appears exactly once in the stream.
-
For each modified item, the stream records appear in the same order as the actual modifications to that item.
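Both guarantees can be checked offline against records collected from a stream. In this sketch, ObservedRecord and StreamGuarantees are hypothetical, and writeIndex is an assumed per-item counter supplied by the test harness (DynamoDB does not return it). Ordering is only promised per item, so records for different items may interleave without violating the check.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical record shape; writeIndex = 1st, 2nd, ... write to that item.
record ObservedRecord(String sequenceNumber, String itemKey, int writeIndex) {}

final class StreamGuarantees {
    // Guarantee 1: no stream record appears twice.
    static boolean appearsOnce(List<ObservedRecord> records) {
        Set<String> seen = new HashSet<>();
        for (ObservedRecord r : records) {
            if (!seen.add(r.sequenceNumber())) {
                return false;
            }
        }
        return true;
    }

    // Guarantee 2: per item, records follow the order of its modifications.
    static boolean perItemOrder(List<ObservedRecord> records) {
        Map<String, Integer> lastIndex = new HashMap<>();
        for (ObservedRecord r : records) {
            Integer prev = lastIndex.put(r.itemKey(), r.writeIndex());
            if (prev != null && prev >= r.writeIndex()) {
                return false;
            }
        }
        return true;
    }
}
```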
Stream records become available in near real time, so you can use them to drive related functionality in your applications.
Managing Streams
You can enable a stream when you create a table. For an existing table, you can enable or disable the stream or change its settings. Streams operate asynchronously, so enabling one has no impact on table performance.
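The enable and disable rules can be modeled as a tiny state machine. This sketch assumes the documented UpdateTable behavior: enabling a stream on a table that already has one, or disabling a stream that does not exist, fails with a ValidationException, and re-enabling creates a new stream with a new ARN. Changing the view type therefore means disabling the stream and then enabling it again. StreamSettingModel is hypothetical, and IllegalStateException stands in for the ValidationException.

```java
// Hypothetical in-memory model of a table's stream setting; not an AWS API.
final class StreamSettingModel {
    private boolean enabled;
    private String viewType;
    private int streamGeneration; // increases each time a new stream is created

    // Analogue of UpdateTable with StreamEnabled = true.
    void enable(String newViewType) {
        if (enabled) {
            throw new IllegalStateException("stream already enabled; disable it first");
        }
        enabled = true;
        viewType = newViewType;
        streamGeneration++; // a re-enabled stream is a new stream with a new ARN
    }

    // Analogue of UpdateTable with StreamEnabled = false.
    void disable() {
        if (!enabled) {
            throw new IllegalStateException("table has no stream to disable");
        }
        enabled = false;
        viewType = null;
    }

    boolean isEnabled() { return enabled; }
    String viewType() { return viewType; }
    int streamGeneration() { return streamGeneration; }
}
```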
Use the AWS Management Console for simple stream management. Navigate to the DynamoDB console and choose Tables. On the Overview tab, choose Manage Stream. In the window that opens, select the information written to the stream when table data is modified. After entering all settings, choose Enable.
To disable an existing stream, choose Manage Stream, and then choose Disable.
You can also use the CreateTable and UpdateTable APIs to enable or change a stream. Configure the stream with the StreamSpecification parameter, whose StreamEnabled field sets the status: true for enabled and false for disabled.
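StreamViewType specifies the information written to the stream when an item is modified: KEYS_ONLY (only the key attributes), NEW_IMAGE (the entire item as it appears after the modification), OLD_IMAGE (the entire item as it appeared before the modification), or NEW_AND_OLD_IMAGES (both images).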
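The effect of each StreamViewType on a single record can be sketched as follows. ViewTypeProjection is hypothetical and models items as string maps; the section names Keys, NewImage, and OldImage follow the fields of a real stream record, whose values are AttributeValue maps rather than strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of what each StreamViewType puts into a stream record.
final class ViewTypeProjection {
    enum ViewType { KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES }

    // oldImage is null for an insert, newImage is null for a delete.
    static Map<String, Map<String, String>> project(ViewType type, Set<String> keyNames,
            Map<String, String> oldImage, Map<String, String> newImage) {
        Map<String, String> source = newImage != null ? newImage : oldImage;
        if (source == null) {
            throw new IllegalArgumentException("at least one image is required");
        }
        Map<String, Map<String, String>> record = new HashMap<>();
        // The key attributes are always present, whatever the view type.
        Map<String, String> keys = new HashMap<>();
        for (String k : keyNames) {
            keys.put(k, source.get(k));
        }
        record.put("Keys", keys);
        boolean wantsNew = type == ViewType.NEW_IMAGE || type == ViewType.NEW_AND_OLD_IMAGES;
        boolean wantsOld = type == ViewType.OLD_IMAGE || type == ViewType.NEW_AND_OLD_IMAGES;
        if (wantsNew && newImage != null) {
            record.put("NewImage", newImage);
        }
        if (wantsOld && oldImage != null) {
            record.put("OldImage", oldImage);
        }
        return record;
    }
}
```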
Stream Reading
Read and process a stream by connecting to the DynamoDB Streams endpoint and making API requests. A stream consists of stream records, and each record represents a single data modification in the table that owns the stream. Every stream record carries a sequence number that reflects the order in which it was published. Records are organized into groups called shards. A shard acts as a container for multiple records and also holds the information needed to access and iterate through them. Records are automatically removed after 24 hours.
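Because sequence numbers express publishing order within a shard, a reader can resume right after the last record it processed. This hypothetical ShardModel keeps records in order and mimics the TRIM_HORIZON and AFTER_SEQUENCE_NUMBER iterator types. It assumes sequence numbers are decimal digit strings and compares them numerically with BigInteger, since a plain string comparison would put "10" before "9".

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory shard holding records in publishing order.
final class ShardModel {
    record Entry(String sequenceNumber, String change) {}

    private final List<Entry> entries = new ArrayList<>();

    // Appends a record; sequence numbers must strictly increase.
    void append(String sequenceNumber, String change) {
        if (!entries.isEmpty()) {
            BigInteger last = new BigInteger(entries.get(entries.size() - 1).sequenceNumber());
            if (new BigInteger(sequenceNumber).compareTo(last) <= 0) {
                throw new IllegalArgumentException("sequence numbers must increase");
            }
        }
        entries.add(new Entry(sequenceNumber, change));
    }

    // Analogue of ShardIteratorType.TRIM_HORIZON: start at the oldest record.
    List<Entry> fromTrimHorizon() {
        return List.copyOf(entries);
    }

    // Analogue of ShardIteratorType.AFTER_SEQUENCE_NUMBER.
    List<Entry> after(String sequenceNumber) {
        BigInteger start = new BigInteger(sequenceNumber);
        List<Entry> out = new ArrayList<>();
        for (Entry e : entries) {
            if (new BigInteger(e.sequenceNumber()).compareTo(start) > 0) {
                out.add(e);
            }
        }
        return out;
    }
}
```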
Shards are ephemeral: they are created and deleted automatically as needed. A shard can also split into multiple new shards, typically in response to spikes in write activity. When a stream is disabled, any open shards are closed. Because shards form a parent-child hierarchy, an application must process a parent shard before its child shards to preserve the correct processing order. The DynamoDB Streams Kinesis Adapter can handle this automatically.
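Parent-first processing amounts to a topological ordering over the parent shard ID that DescribeStream reports for each shard. The sketch below is a hypothetical helper, not the Kinesis Adapter's implementation; it walks depth-first and treats a parent that is no longer listed (for example, one that has already expired) as already processed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper that orders shards so every parent precedes its children.
final class ShardOrdering {
    // Minimal view of a shard: its ID and its parent's ID (null for a root shard).
    record ShardInfo(String shardId, String parentShardId) {}

    static List<String> parentsFirst(List<ShardInfo> shards) {
        Map<String, ShardInfo> byId = new HashMap<>();
        for (ShardInfo s : shards) {
            byId.put(s.shardId(), s);
        }
        Set<String> visited = new HashSet<>();
        List<String> order = new ArrayList<>();
        for (ShardInfo s : shards) {
            visit(s, byId, visited, order);
        }
        return order;
    }

    // Depth-first: emit the parent (when known) before the shard itself.
    private static void visit(ShardInfo s, Map<String, ShardInfo> byId,
                              Set<String> visited, List<String> order) {
        if (!visited.add(s.shardId())) {
            return;
        }
        ShardInfo parent = s.parentShardId() == null ? null : byId.get(s.parentShardId());
        if (parent != null) {
            visit(parent, byId, visited, order);
        }
        order.add(s.shardId());
    }
}
```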
Note − Operations that do not change any data do not write stream records.
Accessing and processing records requires the following tasks −
-
Determine the ARN of the target stream.
-
Determine the shard(s) of the stream holding the target records.
-
Access the shard(s) to retrieve the desired records.
Note − No more than two processes should read from the same shard at the same time. More than two readers per shard can result in throttling.
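Within a single JVM, the two-reader limit can be enforced with one counting semaphore per shard. ShardReaderLimiter is hypothetical; it does not coordinate readers that run in separate processes, which needs an external mechanism such as the lease table the Kinesis Client Library keeps in DynamoDB.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Hypothetical guard capping concurrent readers of each shard at two.
final class ShardReaderLimiter {
    static final int MAX_READERS_PER_SHARD = 2;
    private final Map<String, Semaphore> permits = new ConcurrentHashMap<>();

    private Semaphore forShard(String shardId) {
        return permits.computeIfAbsent(shardId, id -> new Semaphore(MAX_READERS_PER_SHARD));
    }

    // Returns true if the caller may start reading the shard right now.
    boolean tryAcquire(String shardId) {
        return forShard(shardId).tryAcquire();
    }

    // Call once per successful tryAcquire, when the reader finishes.
    void release(String shardId) {
        forShard(shardId).release();
    }
}
```

Callers should pair each successful tryAcquire with a release in a finally block, so a failed read does not permanently consume one of the two slots.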
The available stream API actions are −
-
ListStreams
-
DescribeStream
-
GetShardIterator
-
GetRecords
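The three tasks and four actions above fit together as nested loops: find the stream, list its shards, then page through each shard with iterators. StreamsApi is a hypothetical, simplified interface whose methods are named after the real actions, and InMemoryStreams is a fake so the loop can run without AWS. The loop ends when a shard returns no next iterator, which only happens for closed shards; an open shard keeps handing out iterators, so a real reader needs its own stop condition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the four stream actions; real SDK calls take
// request objects and return richer result types.
interface StreamsApi {
    List<String> listStreams(String tableName);                // ListStreams -> stream ARNs
    List<String> describeStream(String streamArn);             // DescribeStream -> shard IDs
    String getShardIterator(String streamArn, String shardId); // GetShardIterator (TRIM_HORIZON)
    Page getRecords(String shardIterator);                     // GetRecords

    record Page(List<String> records, String nextShardIterator) {}
}

final class StreamDrainer {
    // Task 1: find the stream ARN. Task 2: find its shards. Task 3: read them.
    static List<String> readAll(StreamsApi api, String tableName) {
        List<String> out = new ArrayList<>();
        for (String arn : api.listStreams(tableName)) {
            for (String shardId : api.describeStream(arn)) {
                String itr = api.getShardIterator(arn, shardId);
                // A null next iterator means the (closed) shard is fully read.
                while (itr != null) {
                    StreamsApi.Page page = api.getRecords(itr);
                    out.addAll(page.records());
                    itr = page.nextShardIterator();
                }
            }
        }
        return out;
    }
}

// Hypothetical fake: every shard is closed, so iteration eventually ends.
final class InMemoryStreams implements StreamsApi {
    private static final int PAGE_SIZE = 2;
    private final Map<String, List<String>> shards; // shardId -> records, oldest first

    InMemoryStreams(Map<String, List<String>> shards) { this.shards = shards; }

    public List<String> listStreams(String tableName) { return List.of("arn:fake:" + tableName); }
    public List<String> describeStream(String streamArn) { return new ArrayList<>(shards.keySet()); }
    public String getShardIterator(String streamArn, String shardId) { return shardId + "#0"; }

    public Page getRecords(String shardIterator) {
        int sep = shardIterator.lastIndexOf('#');
        String shardId = shardIterator.substring(0, sep);
        int from = Integer.parseInt(shardIterator.substring(sep + 1));
        List<String> all = shards.get(shardId);
        int to = Math.min(from + PAGE_SIZE, all.size());
        String next = to < all.size() ? shardId + "#" + to : null;
        return new Page(all.subList(from, to), next);
    }
}
```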
The following example reads a stream. It creates a table with a NEW_AND_OLD_IMAGES stream, puts, updates, and deletes an item, and then reads the resulting stream records −
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;
import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.Record;
import com.amazonaws.services.dynamodbv2.model.Shard;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.dynamodbv2.model.StreamViewType;
import com.amazonaws.services.dynamodbv2.util.Tables;
public class StreamsExample {
    // Both clients read credentials from the default local AWS profile.
    private static AmazonDynamoDBClient dynamoDBClient =
        new AmazonDynamoDBClient(new ProfileCredentialsProvider());
    private static AmazonDynamoDBStreamsClient streamsClient =
        new AmazonDynamoDBStreamsClient(new ProfileCredentialsProvider());

    public static void main(String args[]) {
        // Replace the placeholders with the DynamoDB and DynamoDB Streams
        // endpoints for your region.
        dynamoDBClient.setEndpoint("InsertDbEndpointHere");
        streamsClient.setEndpoint("InsertStreamEndpointHere");

        // Table creation: numeric partition key "ID", with a stream that
        // records both the old and the new item images
        String tableName = "MyTestingTable";
        ArrayList<AttributeDefinition> attributeDefinitions =
            new ArrayList<AttributeDefinition>();
        attributeDefinitions.add(new AttributeDefinition()
            .withAttributeName("ID")
            .withAttributeType("N"));

        ArrayList<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
        keySchema.add(new KeySchemaElement()
            .withAttributeName("ID")
            .withKeyType(KeyType.HASH)); // Partition key

        StreamSpecification streamSpecification = new StreamSpecification();
        streamSpecification.setStreamEnabled(true);
        streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);

        CreateTableRequest createTableRequest = new CreateTableRequest()
            .withTableName(tableName)
            .withKeySchema(keySchema)
            .withAttributeDefinitions(attributeDefinitions)
            .withProvisionedThroughput(new ProvisionedThroughput()
                .withReadCapacityUnits(1L)
                .withWriteCapacityUnits(1L))
            .withStreamSpecification(streamSpecification);

        System.out.println("Executing CreateTable for " + tableName);
        dynamoDBClient.createTable(createTableRequest);
        System.out.println("Creating " + tableName);
        try {
            Tables.awaitTableToBecomeActive(dynamoDBClient, tableName);
        } catch (InterruptedException e) {
            // Do not continue against a table that may not be active yet
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return;
        }

        // Get the table's stream settings
        DescribeTableResult describeTableResult =
            dynamoDBClient.describeTable(tableName);
        String myStreamArn = describeTableResult.getTable().getLatestStreamArn();
        StreamSpecification myStreamSpec =
            describeTableResult.getTable().getStreamSpecification();
        System.out.println("Current stream ARN for " + tableName + ": " + myStreamArn);
        System.out.println("Stream enabled: " + myStreamSpec.getStreamEnabled());
        System.out.println("Update view type: " + myStreamSpec.getStreamViewType());

        // Make three changes; each one writes exactly one stream record
        int numChanges = 0;
        System.out.println("Making some changes to table data");

        // Add an item
        Map<String, AttributeValue> item = new HashMap<String, AttributeValue>();
        item.put("ID", new AttributeValue().withN("222"));
        item.put("Alert", new AttributeValue().withS("item!"));
        dynamoDBClient.putItem(tableName, item);
        numChanges++;

        // Update the item
        Map<String, AttributeValue> key = new HashMap<String, AttributeValue>();
        key.put("ID", new AttributeValue().withN("222"));
        Map<String, AttributeValueUpdate> attributeUpdates =
            new HashMap<String, AttributeValueUpdate>();
        attributeUpdates.put("Alert", new AttributeValueUpdate()
            .withAction(AttributeAction.PUT)
            .withValue(new AttributeValue().withS("modified item")));
        dynamoDBClient.updateItem(tableName, key, attributeUpdates);
        numChanges++;

        // Delete the item
        dynamoDBClient.deleteItem(tableName, key);
        numChanges++;

        // Get stream shards
        DescribeStreamResult describeStreamResult =
            streamsClient.describeStream(new DescribeStreamRequest()
                .withStreamArn(myStreamArn));
        String streamArn =
            describeStreamResult.getStreamDescription().getStreamArn();
        List<Shard> shards =
            describeStreamResult.getStreamDescription().getShards();

        // Process shards
        for (Shard shard : shards) {
            String shardId = shard.getShardId();
            System.out.println("Processing " + shardId + " in " + streamArn);

            // Get a shard iterator positioned at the oldest available record
            GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
                .withStreamArn(myStreamArn)
                .withShardId(shardId)
                .withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
            GetShardIteratorResult getShardIteratorResult =
                streamsClient.getShardIterator(getShardIteratorRequest);
            String nextItr = getShardIteratorResult.getShardIterator();

            // An open shard keeps returning iterators, so stop once every
            // expected change has been read
            while (nextItr != null && numChanges > 0) {
                // Read data records with the iterator
                GetRecordsResult getRecordsResult =
                    streamsClient.getRecords(new GetRecordsRequest()
                        .withShardIterator(nextItr));
                List<Record> records = getRecordsResult.getRecords();
                System.out.println("Pulling records...");
                for (Record record : records) {
                    System.out.println(record);
                    numChanges--;
                }
                nextItr = getRecordsResult.getNextShardIterator();

                // Back off briefly instead of polling in a tight loop while
                // records have not arrived yet
                if (records.isEmpty() && numChanges > 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }
}