Apache Storm: A Concise Tutorial
Apache Storm - Working Example
We have gone through the core technical details of Apache Storm, and now it is time to code a simple scenario.
Scenario – Mobile Call Log Analyzer
Mobile calls and their durations are given as input to Apache Storm. Storm processes the calls, groups them by caller and receiver, and counts the total number of calls for each caller-receiver pair.
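Before involving Storm, the expected result of this scenario can be sketched in plain Java. This is only an illustration of the grouping and counting, not Storm code; the class name CallCountSketch and the sample records are made up for this example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch: counts calls per "caller - receiver" pair.
class CallCountSketch {
    // Each record is {caller, receiver, durationInSeconds}.
    static Map<String, Integer> countCalls(List<String[]> records) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String[] record : records) {
            String call = record[0] + " - " + record[1];
            counts.merge(call, 1, Integer::sum); // start at 1, otherwise add 1
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
            new String[] {"1234123401", "1234123402", "12"},
            new String[] {"1234123401", "1234123402", "40"},
            new String[] {"1234123403", "1234123401", "5"});
        countCalls(records).forEach((call, n) -> System.out.println(call + " : " + n));
    }
}
```

The rest of this chapter splits the same work across a spout and two bolts so that Storm can distribute it.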
Spout Creation
A spout is a component that generates data. A spout implements the IRichSpout interface, which has the following important methods −
- open − Provides the spout with an environment to execute. The executors run this method to initialize the spout.
- nextTuple − Emits the generated data through the collector.
- close − Called when the spout is about to shut down.
- declareOutputFields − Declares the output schema of the tuple.
- ack − Acknowledges that a specific tuple has been fully processed.
- fail − Signals that a specific tuple was not fully processed, so that it can be emitted again.
open
The signature of the open method is as follows −
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
- conf − Provides the Storm configuration for this spout.
- context − Provides complete information about the spout's place within the topology, its task id, and its input and output information.
- collector − Enables us to emit the tuples that will be processed by the bolts.
nextTuple
The signature of the nextTuple method is as follows −
nextTuple()
nextTuple() is called periodically from the same loop as the ack() and fail() methods. It must release control of the thread when there is no work to do, so that the other methods have a chance to be called. The first line of nextTuple therefore checks whether processing has finished. If so, it should sleep for at least one millisecond before returning, to reduce the load on the processor.
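The "emit or back off" shape described above can be sketched without any Storm dependency. In this minimal sketch the class NextTupleSketch, its queue, and the emitted list (which stands in for the collector) are all hypothetical; only the control flow is the point.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of the control flow inside nextTuple(); not Storm code.
class NextTupleSketch {
    private final Queue<String> pending = new ArrayDeque<>();
    private final List<String> emitted = new ArrayList<>(); // stands in for the collector

    NextTupleSketch(List<String> input) {
        pending.addAll(input);
    }

    // Returns quickly in both branches so the calling loop can also run ack() and fail().
    void nextTuple() {
        if (pending.isEmpty()) {
            try {
                Thread.sleep(1); // no work: sleep briefly instead of spinning
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
            return;
        }
        emitted.add(pending.poll()); // emit exactly one item per call
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        NextTupleSketch spout = new NextTupleSketch(Arrays.asList("a", "b"));
        for (int i = 0; i < 5; i++) {
            spout.nextTuple();
        }
        System.out.println(spout.emitted()); // prints [a, b]
    }
}
```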
declareOutputFields
The signature of the declareOutputFields method is as follows −
declareOutputFields(OutputFieldsDeclarer declarer)
declarer − Used to declare output stream ids, output fields, etc.
This method specifies the output schema of the tuple.
ack
The signature of the ack method is as follows −
ack(Object msgId)
This method acknowledges that a specific tuple has been processed.
fail
The signature of the fail method is as follows −
fail(Object msgId)
This method reports that a specific tuple has not been fully processed. The spout can then emit the tuple again so that it is reprocessed.
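One common way to use ack and fail together is to remember every emitted tuple under its message id until it is acknowledged. The following is a minimal plain-Java sketch of that bookkeeping, assuming the spout attaches a message id to each tuple it emits; the class ReplaySketch and its method names other than ack and fail are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch: remembers emitted values by message id until they are acked.
class ReplaySketch {
    private final Map<Object, String> inFlight = new HashMap<>();
    private final Queue<String> toEmit = new ArrayDeque<>();
    private long nextId = 0;

    void offer(String value) {
        toEmit.add(value);
    }

    // Emits the next queued value and returns its message id, or null if nothing is queued.
    Object emitNext() {
        String value = toEmit.poll();
        if (value == null) {
            return null;
        }
        Object msgId = nextId++;
        inFlight.put(msgId, value);
        return msgId;
    }

    // ack: the tuple was fully processed, so it can be forgotten.
    void ack(Object msgId) {
        inFlight.remove(msgId);
    }

    // fail: put the value back in the queue so that it is emitted again.
    void fail(Object msgId) {
        String value = inFlight.remove(msgId);
        if (value != null) {
            toEmit.add(value);
        }
    }

    int inFlightCount() {
        return inFlight.size();
    }

    int queuedCount() {
        return toEmit.size();
    }

    public static void main(String[] args) {
        ReplaySketch spout = new ReplaySketch();
        spout.offer("call-1");
        Object first = spout.emitNext();
        spout.fail(first);               // processing failed: queued again
        Object second = spout.emitNext();
        spout.ack(second);               // processed: nothing left to track
        System.out.println(spout.inFlightCount() + " in flight, " + spout.queuedCount() + " queued");
    }
}
```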
FakeCallLogReaderSpout
In our scenario, we need to collect the call log details. Each call log entry contains −
- caller number
- receiver number
- duration
Since we don't have real-time call log information, we will generate fake call logs using the Random class. The complete program code is given below.
Coding − FakeCallLogReaderSpout.java
import java.util.*;

//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeCallLogReaderSpout which implements the IRichSpout interface
public class FakeCallLogReaderSpout implements IRichSpout {
    //Create instance for SpoutOutputCollector which passes tuples to bolt.
    private SpoutOutputCollector collector;
    private boolean completed = false;

    //Create instance for TopologyContext which contains topology data.
    private TopologyContext context;

    //Create instance for Random class.
    private Random randomGenerator = new Random();
    private Integer idx = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.context = context;
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        if(this.idx <= 1000) {
            List<String> mobileNumbers = new ArrayList<String>();
            mobileNumbers.add("1234123401");
            mobileNumbers.add("1234123402");
            mobileNumbers.add("1234123403");
            mobileNumbers.add("1234123404");

            Integer localIdx = 0;
            //Emit at most 100 tuples per call and about 1000 tuples in total
            while(localIdx++ < 100 && this.idx++ < 1000) {
                String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
                String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

                //Compare string contents with equals(), not references with ==
                while(fromMobileNumber.equals(toMobileNumber)) {
                    toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
                }

                Integer duration = randomGenerator.nextInt(60);
                this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("from", "to", "duration"));
    }

    //Override all the interface methods
    @Override
    public void close() {}

    public boolean isDistributed() {
        return false;
    }

    @Override
    public void activate() {}

    @Override
    public void deactivate() {}

    @Override
    public void ack(Object msgId) {}

    @Override
    public void fail(Object msgId) {}

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
Bolt Creation
A bolt is a component that takes tuples as input, processes them, and produces new tuples as output. Bolts implement the IRichBolt interface. In this program, two bolt classes, CallLogCreatorBolt and CallLogCounterBolt, perform the operations.
The IRichBolt interface has the following methods −
- prepare − Provides the bolt with an environment to execute. The executors run this method to initialize the bolt.
- execute − Processes a single input tuple.
- cleanup − Called when the bolt is about to shut down.
- declareOutputFields − Declares the output schema of the tuple.
prepare
The signature of the prepare method is as follows −
prepare(Map conf, TopologyContext context, OutputCollector collector)
- conf − Provides the Storm configuration for this bolt.
- context − Provides complete information about the bolt's place within the topology, its task id, its input and output information, etc.
- collector − Enables us to emit the processed tuple.
execute
The signature of the execute method is as follows −
execute(Tuple tuple)
Here tuple is the input tuple to be processed.
The execute method processes a single tuple at a time. The tuple data can be accessed with the getValue method of the Tuple class. The input tuple does not have to be processed immediately: multiple tuples can be processed and output as a single output tuple. The processed tuple is emitted using the OutputCollector class.
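The idea that several input tuples can produce one output tuple can be sketched in plain Java. In this hypothetical example, BatchSumSketch buffers call durations and produces one total per full batch; its execute method only plays the role of a bolt's execute(Tuple), and the emitted list stands in for the OutputCollector.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: buffers inputs and produces one output per full batch.
class BatchSumSketch {
    private final int batchSize;
    private final List<Integer> buffer = new ArrayList<>();
    private final List<Integer> emitted = new ArrayList<>(); // stands in for the collector

    BatchSumSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Plays the role of execute(Tuple): nothing is emitted until the batch is full.
    void execute(int duration) {
        buffer.add(duration);
        if (buffer.size() == batchSize) {
            int total = 0;
            for (int d : buffer) {
                total += d;
            }
            emitted.add(total); // one output for batchSize inputs
            buffer.clear();
        }
    }

    List<Integer> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        BatchSumSketch bolt = new BatchSumSketch(3);
        for (int d : new int[] {10, 20, 30, 5, 5, 5, 7}) {
            bolt.execute(d);
        }
        System.out.println(bolt.emitted()); // prints [60, 15]; the last input is still buffered
    }
}
```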
declareOutputFields
The signature of the declareOutputFields method is as follows −
declareOutputFields(OutputFieldsDeclarer declarer)
Here the parameter declarer is used to declare output stream ids, output fields, etc.
This method specifies the output schema of the tuple.
Call log Creator Bolt
The call log creator bolt receives the call log tuple, which contains the caller number, the receiver number, and the call duration. This bolt simply creates a new value by combining the caller number and the receiver number. The new value has the format "caller number - receiver number" and is emitted in a new field named "call". The complete code is given below.
Coding − CallLogCreatorBolt.java
//import util packages
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implements the IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
    //Create instance for OutputCollector which collects and emits tuples to produce output
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String from = tuple.getString(0);
        String to = tuple.getString(1);
        Integer duration = tuple.getInteger(2);

        //Anchor the new tuple to the input tuple, then acknowledge the input
        collector.emit(tuple, new Values(from + " - " + to, duration));
        collector.ack(tuple);
    }

    @Override
    public void cleanup() {}

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("call", "duration"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
Call log Counter Bolt
The call log counter bolt receives the call and its duration as a tuple. This bolt initializes a dictionary (Map) object in the prepare method. In the execute method, it checks the tuple, creates a new entry with the value 1 in the dictionary for every new "call" value, and increments the value of an entry that already exists. In simple terms, this bolt keeps each call and its count in the dictionary object. Instead of keeping the calls and their counts in the dictionary, we could also save them to a data source. The complete program code is as follows −
Coding − CallLogCounterBolt.java
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
    Map<String, Integer> counterMap;
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.counterMap = new HashMap<String, Integer>();
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String call = tuple.getString(0);
        Integer duration = tuple.getInteger(1);

        if(!counterMap.containsKey(call)){
            counterMap.put(call, 1);
        }else{
            Integer c = counterMap.get(call) + 1;
            counterMap.put(call, c);
        }

        collector.ack(tuple);
    }

    @Override
    public void cleanup() {
        for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
            System.out.println(entry.getKey()+" : " + entry.getValue());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("call"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
Creating Topology
A Storm topology is basically a Thrift structure. The TopologyBuilder class provides simple methods for creating complex topologies: setSpout sets a spout, setBolt sets a bolt, and createTopology builds the topology. Use the following code snippet to create a topology −
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
    .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
    .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
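The shuffleGrouping and fieldsGrouping methods set the stream grouping for a bolt's input. shuffleGrouping distributes tuples randomly across the bolt's tasks, while fieldsGrouping sends all tuples with the same value of the named field (here "call") to the same task, which is what allows the counter bolt to keep a correct count per call.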
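The difference between the two groupings can be modeled in a few lines of plain Java. This is a deliberately simplified model and not Storm's actual implementation; the class GroupingSketch and the hash-modulo routing rule are assumptions chosen only to show that equal keys always reach the same task.

```java
import java.util.Random;

// Hypothetical, simplified model of two stream groupings; not Storm's implementation.
class GroupingSketch {
    // Fields grouping: equal keys always map to the same task index.
    static int fieldsGrouping(String key, int numTasks) {
        return Math.floorMod(key.hashCode(), numTasks);
    }

    // Shuffle grouping: any task may be chosen, independent of the tuple's content.
    static int shuffleGrouping(Random random, int numTasks) {
        return random.nextInt(numTasks);
    }

    public static void main(String[] args) {
        String call = "1234123401 - 1234123402";
        // The same call is routed to the same task both times.
        System.out.println(fieldsGrouping(call, 4) == fieldsGrouping(call, 4)); // prints true
        // A shuffled tuple can land on any of the four tasks.
        System.out.println(shuffleGrouping(new Random(), 4));
    }
}
```

With more than one counter task, a shuffle grouping in front of the counter bolt would spread tuples for the same call over several tasks, and each task would hold only a partial count.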
Local Cluster
For development purposes, we can create a local cluster using a LocalCluster object and then submit the topology with its submitTopology method. One of the arguments to submitTopology is an instance of the Config class, which is used to set configuration options before submitting the topology. These options are merged with the cluster configuration at run time and passed to every task through the open method of spouts and the prepare method of bolts. Once the topology is submitted, we wait 10 seconds for the cluster to process it and then shut the cluster down using the shutdown method of LocalCluster. The complete program code is as follows −
Coding − LogAnalyserStorm.java
import backtype.storm.tuple.Fields;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm to submit the topology.
public class LogAnalyserStorm {
    public static void main(String[] args) throws Exception{
        //Create Config instance for cluster configuration
        Config config = new Config();
        config.setDebug(true);

        //Build the topology: spout -> creator bolt -> counter bolt
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

        builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
            .shuffleGrouping("call-log-reader-spout");

        builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
            .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
        Thread.sleep(10000);

        //Stop the topology
        cluster.shutdown();
    }
}
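The merging of the submitted configuration with the cluster configuration can be pictured as a map merge. The sketch below is plain Java rather than Storm's own code, and it assumes that a value set on the topology takes precedence over the cluster-wide value for the same key; the class ConfigMergeSketch and the sample values are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of merging topology settings over cluster defaults.
class ConfigMergeSketch {
    static Map<String, Object> merge(Map<String, Object> clusterConf,
                                     Map<String, Object> topologyConf) {
        Map<String, Object> merged = new HashMap<>(clusterConf); // copy, inputs stay untouched
        merged.putAll(topologyConf); // assumption: topology-level values win on conflict
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> clusterConf = new HashMap<>();
        clusterConf.put("topology.debug", false);
        clusterConf.put("topology.workers", 1);

        Map<String, Object> topologyConf = new HashMap<>();
        topologyConf.put("topology.debug", true); // the effect of config.setDebug(true)

        Map<String, Object> merged = merge(clusterConf, topologyConf);
        System.out.println(merged.get("topology.debug"));   // prints true
        System.out.println(merged.get("topology.workers")); // prints 1
    }
}
```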
Building and Running the Application
The complete application consists of four Java source files −
- FakeCallLogReaderSpout.java
- CallLogCreatorBolt.java
- CallLogCounterBolt.java
- LogAnalyserStorm.java
The application can be built using the following command −
javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*" *.java
The application can be run using the following command −
java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":. LogAnalyserStorm
Output
Once the application is started, it outputs complete details about the cluster startup process, the spout and bolt processing, and finally the cluster shutdown process. In CallLogCounterBolt, we print each call and its count. Because the call logs are generated randomly, the counts differ from run to run. The information is displayed on the console in the following form −
1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93
Non-JVM languages
Storm topologies are defined through Thrift interfaces, which makes it easy to submit topologies from any language. Storm supports Ruby, Python, and many other languages. Let's take a look at the Python binding.
Python Binding
Python is a general-purpose, interpreted, interactive, object-oriented, high-level programming language. Storm supports Python for implementing topology components, and the Python binding supports emitting, anchoring, acking, and logging operations.
Bolts can be defined in any language. Bolts written in another language are executed as subprocesses, and Storm communicates with those subprocesses using JSON messages over stdin/stdout. First, take a sample bolt, WordCount, that uses the Python binding.
public static class WordCount extends ShellBolt implements IRichBolt {
    //Run splitword.py with the python interpreter as a subprocess
    public WordCount() {
        super("python", "splitword.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
Here the class WordCount extends ShellBolt and implements the IRichBolt interface. The arguments passed to the super constructor specify the command to run, in this case the Python implementation "splitword.py". Now create a Python implementation named "splitword.py".
import storm

class WordCountBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
            storm.emit([word])

WordCountBolt().run()
This sample Python implementation splits a given sentence into words and emits each word as a separate tuple; a downstream bolt can then count the words. Similarly, you can bind with other supported languages as well.
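To give an idea of the JSON messages exchanged over stdin/stdout, the following plain-Java sketch builds the lines a subprocess would write for each emitted word. It assumes Storm's multi-language protocol, in which an emit is a JSON object with "command" and "tuple" keys followed by a line containing only end. The class ShellEmitSketch is hypothetical, and the string escaping is reduced to the minimum needed for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the messages a shell bolt writes to stdout; not Storm code.
class ShellEmitSketch {
    // Minimal JSON string escaping for this sketch (backslashes and quotes only).
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // One emit message per word, each followed by the "end" delimiter line.
    static List<String> emitMessages(String sentence) {
        List<String> lines = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            lines.add("{\"command\": \"emit\", \"tuple\": [" + quote(word) + "]}");
            lines.add("end");
        }
        return lines;
    }

    public static void main(String[] args) {
        for (String line : emitMessages("hello storm")) {
            System.out.println(line);
        }
    }
}
```

In practice the storm module imported by splitword.py takes care of writing these messages, so the bolt code only calls storm.emit.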