Apache Storm 简明教程
Apache Storm - Working Example
我们已经了解了 Apache Storm 的核心技术细节,现在是时候编写一些简单场景了。
Scenario – Mobile Call Log Analyzer
移动呼叫及其持续时间将作为输入提供给 Apache Storm,Storm 将处理和分组相同呼叫者和接收者之间的呼叫以及他们的呼叫总数。
Spout Creation
Spout 是一种用于数据生成的组件。基本上,Spout 将实现一个 IRichSpout 接口。“IRichSpout”接口有以下重要的方法 −
-
open − 为 spout 提供了一个执行环境。执行器将运行此方法来初始化 spout。
-
nextTuple - 借由收集器发射生成的数据。
-
close - 当注水器关闭时调用此方法。
-
declareOutputFields − 声明元组的输出模式。
-
ack - 确认已处理特定数据元组
-
fail - 指定某些数据元组尚未处理,也不应重新处理。
Open
open 的语法方法如下所示
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
-
conf - 提供此注水器的暴风配置。
-
context - 提供有关拓扑中注水器放置位置、任务 ID、输入和输出信息等完整信息。
-
collector - 能够向螺栓发出的数据元组。
nextTuple
nextTuple 的语法方法如下所示
nextTuple()
nextTuple() 将从与 ack() 和 fail() 方法相同的循环中周期性调用。当没有工作可做时,它必须释放对线程的控制,这样其他方法就有机会被调用。因此,nextTuple 的首行用于检查处理是否已完成。如果是,它应至少休眠一毫秒,以便在返回之前减少对处理器的负载。
declareOutputFields
declareOutputFields 的语法方法如下所示
declareOutputFields(OutputFieldsDeclarer declarer)
declarer - 用于声明输出流 ID、输出字段等。
该方法用于指定数据元组的输出模式。
FakeCallLogReaderSpout
在我们的情况下,我们需要收集通话记录的详细信息。通话记录的信息包含。
-
caller number
-
receiver number
-
duration
由于我们没有通话记录的实时信息,因此我们将生成伪造的通话记录。伪造的信息将使用 Random 类创建。完整程序代码如下所示。
Coding − FakeCallLogReaderSpout.java
import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
//Create a class FakeLogReaderSpout which implement IRichSpout interface
to access functionalities
public class FakeCallLogReaderSpout implements IRichSpout {
//Create instance for SpoutOutputCollector which passes tuples to bolt.
private SpoutOutputCollector collector;
private boolean completed = false;
//Create instance for TopologyContext which contains topology data.
private TopologyContext context;
//Create instance for Random class.
private Random randomGenerator = new Random();
private Integer idx = 0;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.context = context;
this.collector = collector;
}
@Override
public void nextTuple() {
if(this.idx <= 1000) {
List<String> mobileNumbers = new ArrayList<String>();
mobileNumbers.add("1234123401");
mobileNumbers.add("1234123402");
mobileNumbers.add("1234123403");
mobileNumbers.add("1234123404");
Integer localIdx = 0;
while(localIdx++ < 100 && this.idx++ < 1000) {
String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
while(fromMobileNumber == toMobileNumber) {
toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
}
Integer duration = randomGenerator.nextInt(60);
this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("from", "to", "duration"));
}
//Override all the interface methods
@Override
public void close() {}
public boolean isDistributed() {
return false;
}
@Override
public void activate() {}
@Override
public void deactivate() {}
@Override
public void ack(Object msgId) {}
@Override
public void fail(Object msgId) {}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Bolt Creation
Bolt 是一个组件,它将元组作为输入,处理该元组,并产生新的元组作为输出。Bolts 将实现 IRichBolt 接口。在这个程序中,使用两个 bolt 类 CallLogCreatorBolt 和 CallLogCounterBolt 来执行操作。
IRichBolt 接口具有以下方法:
-
prepare − 为 bolt 提供一个执行环境。执行程序将运行此方法来初始化 spout。
-
execute − 处理单个输入元组。
-
cleanup − 在 bolt 将要关闭时调用。
-
declareOutputFields − 声明元组的输出模式。
Prepare
prepare 方法的签名如下 −
prepare(Map conf, TopologyContext context, OutputCollector collector)
-
conf − 为此 bolt 提供 Storm 配置。
-
context − 提供关于 bolt 在拓扑结构中的完整信息,包括其任务 ID、输入和输出信息等。
-
collector − 使我们能够发出处理过的元组。
Call log Creator Bolt
呼叫记录创建器 bolt 接收呼叫记录元组。呼叫记录元组包含呼叫者号码、接收者号码和呼叫持续时间。此 bolt 只需通过组合呼叫者号码和接收者号码来创建新值。新值的格式为“呼叫者号码 – 接收者号码”,并将其命名为新字段“call”。完整代码如下。
Coding − CallLogCreatorBolt.java
//import util packages
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
//Create instance for OutputCollector which collects and emits tuples to produce output
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String from = tuple.getString(0);
String to = tuple.getString(1);
Integer duration = tuple.getInteger(2);
collector.emit(new Values(from + " - " + to, duration));
}
@Override
public void cleanup() {}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call", "duration"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Call log Counter Bolt
呼叫记录计数器 bolt 将呼叫及其持续时间作为元组接收。此 bolt 在准备方法中初始化了一个字典(Map)对象。在 execute 方法中,它将检查元组并在 tuple 中针对每个新“call”值在字典对象中创建一个新条目,并在字典对象中设置值 1。对于字典中已经存在的条目,它只会递增其值。简单来说,此 bolt 将呼叫及其计数保存在字典对象中。我们可以将呼叫及其计数保存在字典中,也可以将其保存在数据源中。完整的程序代码如下 −
Coding − CallLogCounterBolt.java
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class CallLogCounterBolt implements IRichBolt {
Map<String, Integer> counterMap;
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.counterMap = new HashMap<String, Integer>();
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String call = tuple.getString(0);
Integer duration = tuple.getInteger(1);
if(!counterMap.containsKey(call)){
counterMap.put(call, 1);
}else{
Integer c = counterMap.get(call) + 1;
counterMap.put(call, c);
}
collector.ack(tuple);
}
@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Creating Topology
Storm 拓扑结构从本质上是一个 Thrift 结构。TopologyBuilder 类提供了简单易用的方法来创建复杂拓扑结构。TopologyBuilder 类具有设置 spout (setSpout) 和 bolt (setBolt) 的方法。最后,TopologyBuilder 具有 createTopology 来创建拓扑结构。使用以下代码段来创建拓扑结构 −
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
shuffleGrouping 和 fieldsGrouping 方法有助于为 spout 和 bolt 设置流分组。
Local Cluster
出于开发目的,我们可以使用“LocalCluster”对象创建一个本地集群,然后使用“LocalCluster”类的“submitTopology”方法提交拓扑结构。“submitTopology”的一个参数是“Config”类的实例。“Config”类用于在提交拓扑结构之前设置配置选项。此配置选项将在运行时与集群配置合并,并通过准备方法发送到所有任务(spout 和 bolt)。一旦拓扑结构提交到集群,我们将等待 10 秒,以便集群计算提交的拓扑结构,然后再使用“LocalCluster”的“shutdown”方法关闭集群。完整的程序代码如下 −
Coding − LogAnalyserStorm.java
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
public static void main(String[] args) throws Exception{
//Create Config instance for cluster configuration
Config config = new Config();
config.setDebug(true);
//
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
Thread.sleep(10000);
//Stop the topology
cluster.shutdown();
}
}
Building and Running the Application
完整应用程序包含四个 Java 代码。它们是 −
-
FakeCallLogReaderSpout.java
-
CallLogCreaterBolt.java
-
CallLogCounterBolt.java
-
LogAnalyerStorm.java
可以通过以下命令构建应用程序 −
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
可以通过以下命令运行应用程序 −
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm
Output
应用程序启动后,它将输出有关集群启动过程、喷口和螺栓处理以及最终集群关闭过程的完整详细信息。在“CallLogCounterBolt”中,我们打印了呼叫及其计数详细信息。此信息将显示在控制台上,如下所示 −
1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93
Non-JVM languages
Storm 拓扑由 Thrift 接口实现,这使得使用任何语言提交拓扑变得很容易。Storm 支持 Ruby、Python 和许多其他语言。让我们看看 Python 绑定。
Python Binding
Python 是一种通用的解释型、交互式、面向对象且高级的编程语言。Storm 支持 Python 来实现其拓扑。Python 支持发射、锚定、确认日志操作。
如您所知,螺栓可以用任何语言定义。用另一种语言编写的螺栓作为子进程执行,并且 Storm 通过 stdin/stdout 上的 JSON 消息与那些子进程通信。首先使用支持 Python 绑定的示例螺栓 WordCount。
public static class WordCount implements IRichBolt {
public WordSplit() {
super("python", "splitword.py");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
此处,类 WordCount 实现 IRichBolt 接口并在 Python 实现指定的超类方法参数“splitword.py”下运行。现在,创建一个名为 “splitword.py” 的 Python 实现。
import storm
class WordCountBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
WordCountBolt().run()
这是一个 Python 示例实现,用于计算给定句子中的单词数量。同样,您还可以与其他支持的语言绑定。