Apache Storm - Trident
Trident is an extension of Storm. Like Storm, Trident was developed by Twitter. The main reason behind developing Trident was to provide a high-level abstraction on top of Storm, along with stateful stream processing and low-latency distributed querying.
Trident uses spouts and bolts, but these low-level components are auto-generated by Trident before execution. Trident provides functions, filters, joins, grouping, and aggregation.
Trident processes streams as a series of batches, which are referred to as transactions. Generally, the size of those small batches will be on the order of thousands or millions of tuples, depending on the input stream. In this way, Trident differs from Storm, which performs tuple-by-tuple processing.
The batch-processing concept is very similar to database transactions. Every transaction is assigned a transaction ID. A transaction is considered successful once all of its processing completes. However, a failure in processing one of the transaction's tuples will cause the entire transaction to be retransmitted. For each batch, Trident calls beginCommit at the beginning of the transaction and commit at the end of it.
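These two hooks come from Trident's storm.trident.state.State interface. As a minimal sketch (assuming nothing beyond the interface itself), a custom state that only logs the transaction lifecycle could look like this −
import storm.trident.state.State;

// A minimal sketch of a custom Trident state that only logs the batch
// (transaction) lifecycle. A real implementation would stage its updates
// in beginCommit and make them durable/visible in commit.
public class LoggingState implements State {
   @Override
   public void beginCommit(Long txid) {
      System.out.println("beginCommit for txid " + txid);
   }

   @Override
   public void commit(Long txid) {
      System.out.println("commit for txid " + txid);
   }
}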
Trident Topology
The Trident API exposes an easy option to create a Trident topology using the "TridentTopology" class. Basically, a Trident topology receives an input stream from a spout and performs an ordered sequence of operations (filter, aggregation, grouping, etc.) on the stream. Storm tuples are replaced by Trident tuples, and bolts are replaced by operations. A simple Trident topology can be created as follows −
TridentTopology topology = new TridentTopology();
Trident Tuples
A Trident tuple is a named list of values. The TridentTuple interface is the data model of a Trident topology and represents the basic unit of data that a Trident topology can process.
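For illustration, values can be read from a TridentTuple either by position or by field name. The helper below is hypothetical, and the field name "toMobileNumber" is just an assumed example −
import storm.trident.tuple.TridentTuple;

// Hypothetical helper showing the two access styles on a TridentTuple:
// positional (getString(0)) and by field name (getStringByField).
public class TupleAccess {
   static String describeCall(TridentTuple tuple) {
      String caller = tuple.getString(0);                          // by position
      String receiver = tuple.getStringByField("toMobileNumber");  // by field name
      return caller + " - " + receiver;
   }
}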
Trident Spout
A Trident spout is similar to a Storm spout, with additional options to use the features of Trident. Actually, we can still use IRichSpout, which we have used in the Storm topology, but it will be non-transactional in nature and we won't be able to use the advantages provided by Trident.
The basic spout having all the functionality to use the features of Trident is "ITridentSpout". It supports both transactional and opaque transactional semantics. The other spouts are IBatchSpout, IPartitionedTridentSpout, and IOpaquePartitionedTridentSpout.
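As a hedged sketch of what such a spout looks like, here is a minimal, non-transactional IBatchSpout that emits one fixed tuple per batch; a transactional spout would additionally have to replay exactly the same tuples whenever a batchId is retried −
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

// A minimal IBatchSpout sketch emitting one fixed call record per batch.
public class FixedCallSpout implements IBatchSpout {
   @Override
   public void open(Map conf, TopologyContext context) {}

   @Override
   public void emitBatch(long batchId, TridentCollector collector) {
      // Emit the tuples belonging to this batch.
      collector.emit(new Values("1234123401", "1234123402", 30));
   }

   @Override
   public void ack(long batchId) {}   // the batch has been fully processed

   @Override
   public void close() {}

   @Override
   public Map getComponentConfiguration() { return null; }

   @Override
   public Fields getOutputFields() {
      return new Fields("fromMobileNumber", "toMobileNumber", "duration");
   }
}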
In addition to these generic spouts, Trident has many sample implementations of Trident spouts. One of them is FeederBatchSpout, which we can use to send a named list of Trident tuples easily, without worrying about batch processing, parallelism, etc.
FeederBatchSpout creation and data feeding can be done as shown below −
TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", "duration"));
topology.newStream("fixed-batch-spout", testSpout);
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));
Trident Operations
Trident relies on "Trident operations" to process the input stream of Trident tuples. The Trident API has a number of built-in operations to handle simple-to-complex stream processing. These operations range from simple validation to complex grouping and aggregation of Trident tuples. Let us go through the most important and most frequently used operations.
Filter
A filter is an object used to perform the task of input validation. A Trident filter gets a subset of Trident tuple fields as input and returns either true or false, depending on whether certain conditions are satisfied. If true is returned, the tuple is kept in the output stream; otherwise, it is removed from the stream. A filter basically inherits from the BaseFilter class and implements the isKeep method. Here is a sample implementation of a filter operation −
public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      // Keep only the tuples whose second field is an even number.
      return tuple.getInteger(1) % 2 == 0;
   }
}
input
[1, 2]
[1, 3]
[1, 4]
output
[1, 2]
[1, 4]
The filter function can be called in the topology using the "each" method. The "Fields" class can be used to specify the input (a subset of the Trident tuple). The sample code is as follows −
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter());
Function
A function is an object used to perform a simple operation on a single Trident tuple. It takes a subset of Trident tuple fields and emits zero or more new values, which are appended to the input tuple as new fields.
A function basically inherits from the BaseFunction class and implements the execute method. A sample implementation is given below −
public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      // The emitted sum is appended to the input tuple as a new field.
      collector.emit(new Values(a + b));
   }
}
input
[1, 2]
[1, 3]
[1, 4]
output
[1, 2, 3]
[1, 3, 4]
[1, 4, 5]
Just like the filter operation, the function operation can be called in a topology using the each method. The sample code is as follows −
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));
Aggregation
An aggregation is an object used to perform aggregation operations on an input batch, partition, or stream. Trident has three types of aggregation, as follows −
- aggregate − Aggregates each batch of Trident tuples in isolation. During the aggregate process, the tuples are initially repartitioned using global grouping, to combine all partitions of the same batch into a single partition.

- partitionAggregate − Aggregates each partition instead of the entire batch of Trident tuples. The output of the partition aggregate completely replaces the input tuples.

- persistentAggregate − Aggregates over all Trident tuples across all batches and stores the result in either memory or a database.
TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .aggregate(new Count(), new Fields("count"));

// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .partitionAggregate(new Count(), new Fields("count"));

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
An aggregation operation can be created using the CombinerAggregator, the ReducerAggregator, or the generic Aggregator interface. The "Count" aggregator used in the above example is one of the built-in aggregators. It is implemented using "CombinerAggregator". The implementation is as follows −
public class Count implements CombinerAggregator<Long> {
@Override
public Long init(TridentTuple tuple) {
return 1L;
}
@Override
public Long combine(Long val1, Long val2) {
return val1 + val2;
}
@Override
public Long zero() {
return 0L;
}
}
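For comparison, the same count could also be written against the ReducerAggregator interface, which folds tuples one at a time into a running value instead of combining partial results. A sketch −
import storm.trident.operation.ReducerAggregator;
import storm.trident.tuple.TridentTuple;

// A sketch of Count expressed as a ReducerAggregator: init() seeds the
// running value and reduce() folds in one tuple at a time.
public class ReducerCount implements ReducerAggregator<Long> {
   @Override
   public Long init() {
      return 0L;   // starting value
   }

   @Override
   public Long reduce(Long curr, TridentTuple tuple) {
      return curr + 1;   // one more tuple seen
   }
}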
Grouping
The grouping operation is an inbuilt operation and can be called by the groupBy method. The groupBy method repartitions the stream by doing a partitionBy on the specified fields, and then, within each partition, it groups together those tuples whose group fields are equal. Normally, we use "groupBy" along with "persistentAggregate" to get a grouped aggregation. The sample code is as follows −
TridentTopology topology = new TridentTopology();
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
.each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
.groupBy(new Fields(“d”)
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
Merging and Joining
Merging and joining can be done by using the "merge" and "join" methods, respectively. Merging combines one or more streams. Joining is similar to merging, except that joining uses a Trident tuple field from both sides to check and join the two streams. Moreover, joining works at the batch level only. The sample code is as follows −
TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"),
new Fields("key", "a", "b", "c"));
State Maintenance
Trident provides a mechanism for state maintenance. State information can be stored in the topology itself, or you can store it in a separate database as well. The purpose is to maintain the state in such a way that, if any tuple fails during processing, the failed tuple is retried. This creates a problem while updating the state, because you cannot be sure whether the state was already updated for this tuple or not. If the tuple failed before updating the state, then retrying the tuple keeps the state consistent. However, if the tuple failed after updating the state, then retrying the same tuple will increase the count in the database again and leave the state inconsistent. The following steps are needed to ensure that a message is processed only once −
- Process the tuples in small batches.

- Assign a unique ID to each batch. If the batch is retried, it is given the same unique ID (see the sketch after this list).

- The state updates are ordered among batches. For example, the state update of the second batch will not be possible until the state update for the first batch has completed.
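The following in-memory sketch (a stand-in for a real database table, with hypothetical names) illustrates the pattern these steps enable: the last applied transaction ID is stored next to each value, so a replayed batch carrying the same ID is detected and skipped −
import java.util.HashMap;
import java.util.Map;

// A sketch of txid-based idempotent state updates: a replayed batch
// carries the same txid, so applying it a second time is a no-op.
public class IdempotentCounter {
   private final Map<String, Long> counts = new HashMap<String, Long>();
   private final Map<String, Long> lastTxid = new HashMap<String, Long>();

   public void add(String key, long delta, long txid) {
      Long seen = lastTxid.get(key);
      if (seen != null && seen == txid) {
         return;   // this batch was already applied; ignore the replay
      }
      Long current = counts.get(key);
      counts.put(key, (current == null ? 0L : current) + delta);
      lastTxid.put(key, txid);
   }
}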
Distributed RPC
Distributed RPC is used to query and retrieve results from a Trident topology. Storm has an inbuilt distributed RPC server. The distributed RPC server receives an RPC request from the client and passes it to the topology. The topology processes the request and sends the result back to the distributed RPC server, which then forwards it to the client. Trident's distributed RPC queries execute like normal RPC queries, except that they are run in parallel.
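For a remote setup, a client can issue such a query through backtype.storm.utils.DRPCClient. In this sketch, the host name is a placeholder and "call_count" refers to the DRPC function registered by the topology shown later in this chapter −
import backtype.storm.utils.DRPCClient;

// A sketch of querying a remote DRPC server (default port 3772).
public class DrpcQuery {
   public static void main(String[] args) throws Exception {
      DRPCClient client = new DRPCClient("drpc.server.location", 3772);
      // Blocks until the topology returns the result for this request.
      System.out.println(client.execute("call_count", "1234123401 - 1234123402"));
   }
}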
When to Use Trident?
In many use cases, if the requirement is to process a query exactly once, we can achieve it by writing a topology in Trident. On the other hand, exactly-once processing is difficult to achieve in plain Storm. Hence, Trident is useful for those use cases where exactly-once processing is required. Trident is not for all use cases, especially high-performance ones, because it adds complexity to Storm and manages state.
Working Example of Trident
We are going to convert our call log analyzer application, worked out in the previous section, to the Trident framework. A Trident application is relatively easy to write compared to one in plain Storm, thanks to its high-level API; the work basically consists of composing the Function, Filter, Aggregate, GroupBy, Join, and Merge operations of Trident. Finally, we will start the DRPC server using the LocalDRPC class and search for some keywords using the execute method of the LocalDRPC class.
Formatting the call information
The purpose of the FormatCall class is to format the call information, comprising the "caller number" and the "receiver number". The complete program code is as follows −
Coding: FormatCall.java
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
public class FormatCall extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String fromMobileNumber = tuple.getString(0);
String toMobileNumber = tuple.getString(1);
collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
}
}
CSVSplit
The purpose of the CSVSplit class is to split the input string on commas and emit every word in the string. This function is used to parse the input argument of the distributed query. The complete code is as follows −
Coding: CSVSplit.java
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
public class CSVSplit extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
for(String word: tuple.getString(0).split(",")) {
if(word.length() > 0) {
collector.emit(new Values(word));
}
}
}
}
Log Analyzer
This is the main application. Initially, the application will initialize the TridentTopology and feed caller information using a FeederBatchSpout. A Trident topology stream can be created using the newStream method of the TridentTopology class. Similarly, a Trident topology DRPC stream can be created using the newDRPCStream method of the TridentTopology class. A simple DRPC server can be created using the LocalDRPC class. LocalDRPC has an execute method to search for some keywords. The complete code is given below.
Coding: LogAnalyserTrident.java
import java.util.*;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;
import com.google.common.collect.ImmutableList;
public class LogAnalyserTrident {
public static void main(String[] args) throws Exception {
System.out.println("Log Analyser Trident");
TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
"toMobileNumber", "duration"));
TridentState callCounts = topology
.newStream("fixed-batch-spout", testSpout)
.each(new Fields("fromMobileNumber", "toMobileNumber"),
new FormatCall(), new Fields("call"))
.groupBy(new Fields("call"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(),
new Fields("count"));
LocalDRPC drpc = new LocalDRPC();
topology.newDRPCStream("call_count", drpc)
.stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));
topology.newDRPCStream("multiple_call_count", drpc)
.each(new Fields("args"), new CSVSplit(), new Fields("call"))
.groupBy(new Fields("call"))
.stateQuery(callCounts, new Fields("call"), new MapGet(),
new Fields("count"))
.each(new Fields("call", "count"), new Debug())
.each(new Fields("count"), new FilterNull())
.aggregate(new Fields("count"), new Sum(), new Fields("sum"));
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("trident", conf, topology.build());
Random randomGenerator = new Random();
int idx = 0;
while(idx < 10) {
testSpout.feed(ImmutableList.of(new Values("1234123401",
"1234123402", randomGenerator.nextInt(60))));
testSpout.feed(ImmutableList.of(new Values("1234123401",
"1234123403", randomGenerator.nextInt(60))));
testSpout.feed(ImmutableList.of(new Values("1234123401",
"1234123404", randomGenerator.nextInt(60))));
testSpout.feed(ImmutableList.of(new Values("1234123402",
"1234123403", randomGenerator.nextInt(60))));
idx = idx + 1;
}
System.out.println("DRPC : Query starts");
System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
System.out.println(drpc.execute("multiple_call_count", "1234123401 -
1234123402,1234123401 - 1234123403"));
System.out.println("DRPC : Query ends");
cluster.shutdown();
drpc.shutdown();
// DRPCClient client = new DRPCClient("drpc.server.location", 3772);
}
}
Building and Running the Application
The complete application has three Java source files. They are as follows −
- FormatCall.java

- CSVSplit.java

- LogAnalyserTrident.java
The application can be built using the following command −
javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*" *.java
The application can be run using the following command −
java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":. LogAnalyserTrident
Output
Once the application is started, it will output the complete details about the cluster startup process, operations processing, DRPC server and client information, and finally, the cluster shutdown process. This output will be displayed on the console, as shown below.
DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends