Dynamodb 简明教程

DynamoDB - Table Activity

Amazon DynamoDB 流可让您跟踪和响应表内项目的更改。使用此功能创建一个应用程序,该应用程序可通过更新不同来源上的信息来响应这些更改。同步大型多用户系统中成千上万用户的相关信息。使用此功能向用户发送更新通知。其应用程序具有多样且重要的功能。Amazon DynamoDB 流作为用于实现此功能的主要工具。

流记录按时间序列捕获表内项目的修改信息。它们最多保存这些数据 24 小时。应用程序使用这些数据几乎实时地查看原始项目和修改后的项目。

启用后的流可捕获所有修改。在任何 CRUD 操作中,Amazon DynamoDB 都会创建一个包含修改后的项目主键属性的流记录。您可以为流配置其它信息,例如修改前和修改后的镜像。

流具有两项保障,即:

  1. 流中每条记录只出现一次

  2. 每个项目修改都会导致流记录的顺序与修改顺序相同

所有流都进行实时处理,这让您可将它们应用于应用程序中的相关功能。

Managing Streams

您可以在创建表时启用流。对于现有表,您可以禁用或更改流设置。流提供异步操作的功能,这意味着不会影响表的性能。

利用 AWS 管理控制台简化流管理。首先,导航到该控制台,然后选择 Tables 。在“概述”选项卡中,选择 Manage Stream 。在窗口中,选择在表数据修改时添加到流中的信息。输入所有设置后,选择 Enable

如果您想禁用任何现有流,请选择 Manage Stream ,然后再选择 Disable

您还可以使用 CreateTable 和 UpdateTable API 启用或更改流。使用 StreamSpecification 参数配置流。StreamEnabled 指定状态,对于启用为 true,对于禁用为 false。

StreamViewType 指定添加到流中的信息:KEYS_ONLY、NEW_IMAGE、OLD_IMAGE 和 NEW_AND_OLD_IMAGES。

Stream Reading

通过连接到端点并发出 API 请求来读取和处理流。每个流都包含多个流记录,而每个记录都作为单个修改存在,而此修改拥有流。流记录包括一个序号,用于显示发布顺序。记录属于称为分片的组。分片用作多个记录的容器,并且还包含用于访问和遍历记录所需的信息。24 小时后,记录会自动删除。

这些分片根据需要生成和删除,并且不会长时间存在。它们还会自动分成多个新分片,通常是为了响应写入活动激增。禁用流后,打开的分片会关闭。分片之间的层次关系意味着应用程序必须优先处理父分片,以保证正确的处理顺序。您可以使用 Kinesis Adapter 来自动执行此操作。

Note - 未造成任何更改的操作不会写入流记录。

访问和处理记录需要执行以下任务:

  1. 确定目标流的 ARN。

  2. 确定包含目标记录的流分片。

  3. 访问分片来检索所需的记录。

Note −最多应同时有2个进程读取一个分片。如果超过2个进程,它可能会抑制来源。

可用的流API操作包括

  1. ListStreams

  2. DescribeStream

  3. GetShardIterator

  4. GetRecords

您可以查看流读操作的以下示例:−

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;

import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;

import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;

import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;

import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.Record;

import com.amazonaws.services.dynamodbv2.model.Shard;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.dynamodbv2.model.StreamViewType;
import com.amazonaws.services.dynamodbv2.util.Tables;

public class StreamsExample {
   private static AmazonDynamoDBClient dynamoDBClient =
      new AmazonDynamoDBClient(new ProfileCredentialsProvider());
   private static AmazonDynamoDBStreamsClient streamsClient =
      new AmazonDynamoDBStreamsClient(new ProfileCredentialsProvider());

   public static void main(String args[]) {
      dynamoDBClient.setEndpoint("InsertDbEndpointHere");
      streamsClient.setEndpoint("InsertStreamEndpointHere");

      // table creation
      String tableName = "MyTestingTable";
      ArrayList<AttributeDefinition> attributeDefinitions =
         new ArrayList<AttributeDefinition>();

      attributeDefinitions.add(new AttributeDefinition()
         .withAttributeName("ID")
         .withAttributeType("N"));

      ArrayList<KeySchemaElement> keySchema = new
         ArrayList<KeySchemaElement>();

      keySchema.add(new KeySchemaElement()
         .withAttributeName("ID")
         .withKeyType(KeyType.HASH));                       //Partition key

      StreamSpecification streamSpecification = new StreamSpecification();
      streamSpecification.setStreamEnabled(true);
      streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);
      CreateTableRequest createTableRequest = new CreateTableRequest()
         .withTableName(tableName)
         .withKeySchema(keySchema)
         .withAttributeDefinitions(attributeDefinitions)
         .withProvisionedThroughput(new ProvisionedThroughput()
         .withReadCapacityUnits(1L)
         .withWriteCapacityUnits(1L))
         .withStreamSpecification(streamSpecification);

      System.out.println("Executing CreateTable for " + tableName);
      dynamoDBClient.createTable(createTableRequest);
      System.out.println("Creating " + tableName);

      try {
         Tables.awaitTableToBecomeActive(dynamoDBClient, tableName);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }

      // Get the table's stream settings
      DescribeTableResult describeTableResult =
         dynamoDBClient.describeTable(tableName);

      String myStreamArn = describeTableResult.getTable().getLatestStreamArn();
      StreamSpecification myStreamSpec =
         describeTableResult.getTable().getStreamSpecification();

      System.out.println("Current stream ARN for " + tableName + ": "+ myStreamArn);
      System.out.println("Stream enabled: "+ myStreamSpec.getStreamEnabled());
      System.out.println("Update view type: "+ myStreamSpec.getStreamViewType());

      // Add an item
      int numChanges = 0;
      System.out.println("Making some changes to table data");
      Map<String, AttributeValue> item = new HashMap<String, AttributeValue>();
      item.put("ID", new AttributeValue().withN("222"));
      item.put("Alert", new AttributeValue().withS("item!"));
      dynamoDBClient.putItem(tableName, item);
      numChanges++;

      // Update the item
      Map<String, AttributeValue> key = new HashMap<String, AttributeValue>();
      key.put("ID", new AttributeValue().withN("222"));
      Map<String, AttributeValueUpdate> attributeUpdates =
      new HashMap<String, AttributeValueUpdate>();

      attributeUpdates.put("Alert", new AttributeValueUpdate()
         .withAction(AttributeAction.PUT)
         .withValue(new AttributeValue().withS("modified item")));

      dynamoDBClient.updateItem(tableName, key, attributeUpdates);
      numChanges++;

      // Delete the item
      dynamoDBClient.deleteItem(tableName, key);
      numChanges++;

      // Get stream shards
      DescribeStreamResult describeStreamResult =
      streamsClient.describeStream(new DescribeStreamRequest()
         .withStreamArn(myStreamArn));
      String streamArn =
         describeStreamResult.getStreamDescription().getStreamArn();
      List<Shard> shards =
         describeStreamResult.getStreamDescription().getShards();

      // Process shards
      for (Shard shard : shards) {
         String shardId = shard.getShardId();
         System.out.println("Processing " + shardId + " in "+ streamArn);

         // Get shard iterator
         GetShardIteratorRequest getShardIteratorRequest = new
            GetShardIteratorRequest()
            .withStreamArn(myStreamArn)
            .withShardId(shardId)
            .withShardIteratorType(ShardIteratorType.TRIM_HORIZON);

         GetShardIteratorResult getShardIteratorResult =
            streamsClient.getShardIterator(getShardIteratorRequest);
         String nextItr = getShardIteratorResult.getShardIterator();

         while (nextItr != null && numChanges > 0) {
            // Read data records with iterator
            GetRecordsResult getRecordsResult =
               streamsClient.getRecords(new GetRecordsRequest().
               withShardIterator(nextItr));

            List<Record> records = getRecordsResult.getRecords();
            System.out.println("Pulling records...");

            for (Record record : records) {
               System.out.println(record);
               numChanges--;
            }
            nextItr = getRecordsResult.getNextShardIterator();
         }
      }
   }
}