Apache Kafka Tutorial
Apache Kafka - Integration With Storm
In this chapter, we will learn how to integrate Kafka with Apache Storm.
About Storm
Storm was originally created by Nathan Marz and the team at BackType. In a short time, Apache Storm became a standard for distributed real-time processing systems, allowing you to process huge volumes of data. Storm is very fast; a benchmark clocked it at over a million tuples processed per second per node. Apache Storm runs continuously, consuming data from the configured sources (spouts) and passing the data down the processing pipeline (bolts). Combined, spouts and bolts make a topology.
Integration with Storm
Kafka and Storm naturally complement each other, and their powerful cooperation enables real-time streaming analytics for fast-moving big data. The Kafka-Storm integration makes it easier for developers to ingest and publish data streams from Storm topologies.
Conceptual flow
A spout is a source of streams. For example, a spout may read tuples off a Kafka topic and emit them as a stream. A bolt consumes input streams, processes them, and possibly emits new streams. Bolts can do anything from running functions and filtering tuples to performing streaming aggregations, streaming joins, talking to databases, and more. Each node in a Storm topology executes in parallel. A topology runs indefinitely until you terminate it. Storm will automatically reassign any failed tasks. Additionally, Storm guarantees that there will be no data loss, even if the machines go down and messages are dropped.
Let us go through the Kafka-Storm integration APIs in detail. There are three main classes that integrate Kafka with Storm. They are as follows −
BrokerHosts - ZkHosts & StaticHosts
BrokerHosts is an interface, and ZkHosts and StaticHosts are its two main implementations. ZkHosts is used to track the Kafka brokers dynamically by maintaining the details in ZooKeeper, while StaticHosts is used to manually / statically set the Kafka brokers and their details. ZkHosts is a simple and fast way to access a Kafka broker.
The signature of ZkHosts is as follows −
public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)
where brokerZkStr is the ZooKeeper host and brokerZkPath is the ZooKeeper path that maintains the Kafka broker details.
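StaticHosts, by contrast, requires the broker and partition layout up front. Below is a minimal sketch, assuming the storm-kafka 0.9.x API used in this chapter; the host, port, and partition number are placeholder values −
//Statically map partition 0 of the topic to a broker (placeholder values)
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, new Broker("localhost", 9092));
BrokerHosts hosts = new StaticHosts(partitionInfo);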
KafkaConfig API
This API is used to define configuration settings for the Kafka cluster. The signature of KafkaConfig is defined as follows −
public KafkaConfig(BrokerHosts hosts, string topic)
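As a minimal usage sketch (the ZooKeeper connection string and topic name are placeholders) −
BrokerHosts hosts = new ZkHosts("localhost:2181");
KafkaConfig kafkaConfig = new KafkaConfig(hosts, "my-first-topic");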
SpoutConfig API
SpoutConfig is an extension of KafkaConfig that supports additional ZooKeeper information.
public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
- hosts − Any implementation of the BrokerHosts interface.
- topic − The topic name.
- zkRoot − The ZooKeeper root path.
- id − The spout stores the state of the offsets it has consumed in ZooKeeper. The id should uniquely identify your spout.
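Putting these parameters together, a minimal construction sketch (the connection string and topic name are placeholders) −
BrokerHosts hosts = new ZkHosts("localhost:2181");
SpoutConfig spoutConfig = new SpoutConfig(
   hosts,                           // any BrokerHosts implementation
   "my-first-topic",                // topic name
   "/my-first-topic",               // zkRoot where offset state is kept
   UUID.randomUUID().toString());   // unique id for this spout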
SchemeAsMultiScheme
SchemeAsMultiScheme is an interface that dictates how the byte buffer consumed from Kafka gets transformed into a Storm tuple. It is derived from MultiScheme and accepts an implementation of the Scheme class. There are many implementations of the Scheme class; one such implementation is StringScheme, which parses the bytes as a simple string. It also controls the naming of your output field. The signature is defined as follows.
public SchemeAsMultiScheme(Scheme scheme)
- scheme − The Scheme implementation that deserializes the byte buffer consumed from Kafka.
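Beyond StringScheme, you can supply your own Scheme implementation to control both deserialization and output field naming. The following is a hypothetical sketch, assuming the Storm 0.9.x Scheme interface, in which deserialize receives a byte[] (newer Storm versions pass a ByteBuffer instead) −
import java.util.List;
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//Hypothetical Scheme that lower-cases each Kafka message
public class LowerCaseScheme implements Scheme {
   @Override
   public List<Object> deserialize(byte[] bytes) {
      return new Values(new String(bytes).toLowerCase());
   }
   @Override
   public Fields getOutputFields() {
      //Controls the name of the spout's output field
      return new Fields("sentence");
   }
}
It would be plugged in the same way as StringScheme: spoutConfig.scheme = new SchemeAsMultiScheme(new LowerCaseScheme());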
KafkaSpout API
KafkaSpout is our spout implementation, which will integrate with Storm. It fetches messages from the Kafka topic and emits them into the Storm ecosystem as tuples. KafkaSpout gets its configuration details from SpoutConfig.
Below is sample code to create a simple Kafka spout.
// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);
//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts,
   topicName, "/" + topicName, UUID.randomUUID().toString());
//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Bolt Creation
A bolt is a component that takes tuples as input, processes them, and produces new tuples as output. Bolts implement the IRichBolt interface. In this program, two bolt classes, SplitBolt and CountBolt, are used to perform the operations.
The IRichBolt interface has the following methods −
- prepare − Provides the bolt with an environment to execute. The executors will run this method to initialize the bolt.
- execute − Processes a single tuple of input.
- cleanup − Called when a bolt is going to shut down.
- declareOutputFields − Declares the output schema of the tuple.
Let us create SplitBolt.java, which implements the logic to split a sentence into words, and CountBolt.java, which implements the logic to separate unique words and count their occurrences.
SplitBolt.java
import java.util.Map;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class SplitBolt implements IRichBolt {
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
   }
   
   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");
      
      //Emit each non-empty word as its own tuple
      for(String word: words) {
         word = word.trim();
         if(!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }
      }
      collector.ack(input);
   }
   
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
   
   @Override
   public void cleanup() {}
   
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
CountBolt.java
import java.util.Map;
import java.util.HashMap;
import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class CountBolt implements IRichBolt {
   Map<String, Integer> counters;
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }
   
   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);
      
      //Increment the running count for this word
      if(!counters.containsKey(str)) {
         counters.put(str, 1);
      } else {
         Integer c = counters.get(str) + 1;
         counters.put(str, c);
      }
      collector.ack(input);
   }
   
   @Override
   public void cleanup() {
      //Print the final counts when the topology shuts down
      for(Map.Entry<String, Integer> entry: counters.entrySet()) {
         System.out.println(entry.getKey() + " : " + entry.getValue());
      }
   }
   
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
   }
   
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
Submitting the Topology
The Storm topology is basically a Thrift structure. The TopologyBuilder class provides simple and easy methods to create complex topologies. It has methods to set the spout (setSpout) and the bolt (setBolt). Finally, TopologyBuilder has createTopology to create the topology. The shuffleGrouping and fieldsGrouping methods help set the stream grouping for the spout and bolts.
Local Cluster − For development purposes, we can create a local cluster using the LocalCluster object and then submit the topology using the submitTopology method of the LocalCluster class.
KafkaStormSample.java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;
public class KafkaStormSample {
   public static void main(String[] args) throws Exception {
      Config config = new Config();
      config.setDebug(true);
      config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
      
      String zkConnString = "localhost:2181";
      String topic = "my-first-topic";
      BrokerHosts hosts = new ZkHosts(zkConnString);
      
      SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" + topic,
         UUID.randomUUID().toString());
      kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.forceFromStart = true;
      kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
      
      //Wire the spout and the two bolts into a topology
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
      builder.setBolt("word-splitter", new SplitBolt()).shuffleGrouping("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGrouping("word-splitter");
      
      //Run on a local cluster for ten seconds, then shut down
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}
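The sample above uses LocalCluster for development. On a real cluster, the LocalCluster lines would instead be replaced by a StormSubmitter call, roughly as follows (a sketch; it assumes the classes are packaged into a jar and launched with the storm jar command) −
//Sketch: production submission instead of LocalCluster
//(requires a running Nimbus; import backtype.storm.StormSubmitter)
StormSubmitter.submitTopology("KafkaStormSample", config, builder.createTopology());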
Before moving to compilation, the Kafka-Storm integration needs the Curator ZooKeeper client Java library. Curator version 2.9.1 supports Apache Storm version 0.9.5 (which we use in this tutorial). Download the jar files specified below and place them in the Java class path.
- curator-client-2.9.1.jar
- curator-framework-2.9.1.jar
After including the dependency files, compile the program using the following command −
javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java
Execution
Start the Kafka Producer CLI (explained in the previous chapter), create a new topic called my-first-topic, and provide some sample messages as shown below −
hello
kafka
storm
spark
test message
another test message
Now execute the application using the following command −
java -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*":. KafkaStormSample
The sample output of this application is specified below −
storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2