Apache Storm 简明教程

Apache Storm - Trident

Trident 是 Storm 的扩展。像 Storm 一样,Trident 也由 Twitter 开发。开发 Trident 的主要原因是提供一个基于 Storm 的高级抽象以及有状态流处理和低延迟分布式查询。

Trident 使用水 spout 和螺栓,但这些底层组件在执行前是由 Trident 自动生成的。Trident 具有函数、过滤器、联接、分组和聚合。

Trident 处理作为一系列批次的流,这些批次称为事务。通常来说,这些小批次的大小根据输入流的规模,将达到数千或数百万个元组。通过这种方式,Trident 不同于按元组执行处理的 Storm。

批处理的概念与数据库事务非常相似。每个事务都被分配一个事务 ID。一旦事务的所有处理完成,事务就会被认为成功。但是,如果处理事务元组中的一个失败,整个事务将被重新传输。对于每个批次,Trident 将在事务开始时调用 beginCommit,在事务结束时调用 commit。

Trident Topology

Trident API 暴露出使用 “TridentTopology” 类创建 Trident 拓扑的简单选项。基本上,Trident 拓扑从喷口接收输入流,并对流执行有序的操作序列(过滤、聚合、分组等)。Storm Tuple 被 Trident Tuple 替换,Bolt 被操作替换。一个简单的 Trident 拓扑可以如下创建 −

TridentTopology topology = new TridentTopology();

Trident Tuples

Trident 元组是一个带名称的值列表。TridentTuple 接口是 Trident 拓扑的数据模型。TridentTuple 接口是可以由 Trident 拓扑处理的数据的基本单位。

Trident Spout

Trident 喷口类似于 Storm 喷口,并且具有使用 Trident 功能的附加选项。实际上,我们仍然可以使用我们在 Storm 拓扑中使用的 IRichSpout,但其本质上是非事务性的,我们无法使用 Trident 提供的优势。

具有使用 Trident 功能的所有功能的基本喷口是 "ITridentSpout"。它支持事务语义和不透明事务语义。其他喷口有 IBatchSpout、IPartitionedTridentSpout 和 IOpaquePartitionedTridentSpout。

除了这些通用喷口之外,Trident 还有许多三叉喷口的示例实现。其中之一是 FeederBatchSpout 喷口,我们可以使用它轻松发送三叉元组的已命名列表,而无需担心批处理、并行性等问题。

FeederBatchSpout 的创建和数据馈送可以按如下所示完成 −

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident Operations

Trident 依赖于 “Trident Operation” 来处理三叉元组的输入流。Trident API 具有大量内置操作,可用于处理从简单到复杂的流处理。这些操作范围从验证三叉元组的简单分组和聚合到复杂的验证。让我们了解一下最重要的和最常用的操作。

Filter

过滤器是一个用于执行输入验证任务的对象。Trident 过滤器获取三叉元组字段的子集作为输入,并根据满足或不满足某些条件返回 true 或 false。如果返回 true,则元组保留在输出流中;否则,元组从流中移除。过滤器基本上将继承 BaseFilter 类并实现 isKeep 方法。如下所示是过滤器操作的一个示例实现 −

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

可以使用 “each” 方法在拓扑中调用过滤器函数。可以使用 “Fields” 类指定输入(三叉元组子集)。示例代码如下 −

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Function

Function 是一个用于对单个三叉元组执行简单操作的对象。它采用三叉元组字段的子集,并发出零个或多个新的三叉元组字段。

Function 基本上继承 BaseFunction 类并实现 execute 方法。示例实现如下 −

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

就像过滤器操作一样,可以使用 each 方法在拓扑中调用函数操作。示例代码如下 −

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Aggregation

聚合是一个用于对输入批次或分区或流执行聚合操作的对象。Trident 有三种类型的聚合。具体如下 −

  1. aggregate − 孤立地聚合每个批次的三叉元组。在聚合过程中,最初使用全局分组对元组进行重新分区,以将同一批次的所有分区合并为单个分区。

  2. partitionAggregate − 聚合每个分区,而不是整个批次的三叉元组。分区聚合的输出完全替换输入元组。分区聚合的输出包含一个单字段元组。

  3. persistentaggregate − 在所有批次中的所有三叉元组上进行聚合,并将结果存储在内存或数据库中。

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))

// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

可以使用 CombinerAggregator、ReducerAggregator 或通用 Aggregator 接口创建聚合操作。上面示例中使用的 “count” 聚合器是内置聚合器之一。它使用 “CombinerAggregator” 实现。实现如下 −

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }

   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }

   @Override
   public Long zero() {
      return 0L;
   }
}

Grouping

分组操作是内置操作,可通过 groupBy 方法调用。groupBy 方法通过对指定字段执行 partitionBy,重新对流进行分区,然后在每个分区中,将组字段相等的元组分组在一起。通常,我们使用“groupBy”和“persistentAggregate”来获取分组聚合。示例代码如下 −

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Merging and Joining

可以通过分别使用“merge”和“join”方法来进行合并和连接。合并组合一个或多个流。连接与合并类似,但连接使用来自两侧的三元组字段来检查和连接两个流。此外,连接仅在批处理级别起作用。示例代码如下 −

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"),
   new Fields("key", "a", "b", "c"));

State Maintenance

Trident 提供了维护状态的机制。状态信息可以存储在拓扑本身中,否则你也可以将其存储在单独的数据库中。目的是维护一个状态:如果在处理过程中任何元组发生故障,那么将重试失败的元组。在更新状态时,这会导致问题,因为你无法确定先前是否更新过此元组的状态。如果在更新状态之前元组发生故障,那么重试元组将使状态保持稳定。但是,如果在更新状态后元组发生故障,那么重试同一个元组将再次增加数据库中记录的数目,并使状态不稳定。需要执行以下步骤来确保仅处理消息一次 −

  1. 以小批次处理元组。

  2. 为每个批次分配一个唯一 ID。如果重试批次,它将获得相同的唯一 ID。

  3. 状态更新在批次之间排序。例如,在第一个批次的状态更新完成之前,不可能更新第二个批次的状态。

Distributed RPC

分布式 RPC 用于从 Trident 拓扑查询和检索结果。Storm 有一个内置的分布式 RPC 服务器。分布式 RPC 服务器接收来自客户端的 RPC 请求,并将其传递给拓扑。拓扑处理请求并将结果发送到分布式 RPC 服务器,该服务器由分布式 RPC 服务器重定向到客户端。Trident 的分布式 RPC 查询与普通 RPC 查询一样执行,只是这些查询并行运行。

When to Use Trident?

在许多用例中,如果要求仅处理一次查询,我们可以通过在 Trident 中编写拓扑来实现。另一方面,在 Storm 中难以实现处理一次。因此,在需要精确处理一次的用例中,Trident 将非常有用。Trident 不适用于所有用例,特别是高性能用例,因为它增加了 Storm 的复杂性并管理状态。

Working Example of Trident

我们准备将上一部分中研究过的呼叫记录分析器应用程序转换为 Trident 框架。与普通 Storm 相比,Trident 应用程序相对容易,这要归功于其高级 API。在 Trident 中,Storm 主要需要执行函数、过滤器、聚合、分组、连接和合并运算中的任何一个。最后,我们将使用 LocalDRPC 类启动 DRPC 服务器,并使用 LocalDRPC 类的 execute 方法搜索一些关键字。

Formatting the call information

FormatCall 类的目的是格式化呼叫信息,包括“呼叫号码”和“接收号码”。完整的程序代码如下所示 −

Coding: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

CSVSplit 类的目的是基于“逗号 (,)”拆分输入字符串,并发出字符串中的每个单词。此函数用于解析分布式查询的输入参数。完整的代码如下所示 −

Coding: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Log Analyzer

这是主应用程序。最初,应用程序将初始化 TridentTopology 并使用 FeederBatchSpout 输入呼叫者信息。可以使用 TridentTopology 类的 newStream 方法创建 Trident 拓扑流。类似地,可以使用 TridentTopology 类的 newDRCPStream 方法创建 Trident 拓扑 DRPC 流。可以使用 LocalDRPC 类创建一个简单的 DRCP 服务器。 LocalDRPC 有一个用于搜索一些关键字的 execute 方法。完整的代码如下所示。

Coding: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();

      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"),
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(),
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(),
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;

      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401",
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401",
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401",
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402",
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Building and Running the Application

完整的应用程序有三个 Java 代码。它们如下:

  1. FormatCall.java

  2. CSVSplit.java

  3. LogAnalyerTrident.java

可以使用以下命令构建应用程序 −

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

可以使用以下命令运行应用程序 −

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

Output

一旦应用程序启动,应用程序将输出有关集群启动过程、处理操作、DRPC 服务器和客户端信息以及集群关闭过程的完整详细信息。该输出将显示在控制台上,如下所示。

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends