Apache Storm Tutorial
Apache Storm - Introduction
What is Apache Storm?
Apache Storm is a distributed real-time big-data processing system. Storm is designed to process vast amounts of data in a fault-tolerant and horizontally scalable manner. It is a streaming data framework capable of very high ingestion rates. Although Storm is stateless, it manages the distributed environment and cluster state via Apache ZooKeeper. It is simple, and you can execute all kinds of manipulations on real-time data in parallel.
Apache Storm continues to be a leader in real-time data analytics. Storm is easy to set up and operate, and it guarantees that every message will be processed through the topology at least once.
Apache Storm vs Hadoop
Basically, the Hadoop and Storm frameworks are both used for analyzing big data. They complement each other and differ in some aspects. Apache Storm does all the operations except persistence, while Hadoop is good at everything but lags in real-time computation. The following table compares the attributes of Storm and Hadoop.
Storm | Hadoop
Real-time stream processing | Batch processing
Stateless | Stateful
Master/slave architecture with ZooKeeper-based coordination. The master node is called nimbus and the slaves are supervisors. | Master/slave architecture with/without ZooKeeper-based coordination. The master node is the job tracker and the slave nodes are task trackers.
A Storm streaming process can access tens of thousands of messages per second on a cluster. | The Hadoop Distributed File System (HDFS) uses the MapReduce framework to process vast amounts of data, which takes minutes or hours.
A Storm topology runs until it is shut down by the user or an unexpected unrecoverable failure occurs. | MapReduce jobs are executed in a sequential order and completed eventually.
Both are distributed and fault-tolerant.
If the nimbus / supervisor dies, restarting makes it continue from where it stopped, so nothing gets affected.
Use-Cases of Apache Storm
Apache Storm is very famous for real-time big data stream processing. For this reason, most companies use Storm as an integral part of their systems. Some notable examples are as follows −
Twitter − Twitter uses Apache Storm for its range of "Publisher Analytics" products, which process each and every tweet and click on the Twitter platform. Apache Storm is deeply integrated with the Twitter infrastructure.
NaviSite − NaviSite uses Storm for an event log monitoring/auditing system. Every log generated in the system goes through Storm. Storm checks each message against the configured set of regular expressions, and if there is a match, that particular message is saved to the database.
Wego − Wego is a travel metasearch engine based in Singapore. Travel-related data comes from many sources all over the world with different timing. Storm helps Wego search real-time data, resolve concurrency issues, and find the best match for the end user.
Apache Storm Benefits
Here is a list of the benefits that Apache Storm offers −
- Storm is open source, robust, and user friendly. It can be used in small companies as well as large corporations.
- Storm is fault tolerant, flexible, reliable, and supports any programming language.
- Storm allows real-time stream processing.
- Storm is extremely fast because of its enormous data-processing power.
- Storm can keep up its performance even under increasing load by adding resources linearly. It is highly scalable.
- Storm performs data refresh and end-to-end delivery responses in seconds or minutes, depending upon the problem. It has very low latency.
- Storm has operational intelligence.
- Storm provides guaranteed data processing even if any of the connected nodes in the cluster die or messages are lost.
Apache Storm - Core Concepts
Apache Storm reads a raw stream of real-time data at one end, passes it through a sequence of small processing units, and outputs the processed/useful information at the other end.
The following diagram depicts the core concept of Apache Storm.

Let us now have a closer look at the components of Apache Storm −
Components | Description
Tuple | Tuple is the main data structure in Storm. It is a list of ordered elements. By default, a Tuple supports all data types. Generally, it is modelled as a set of comma-separated values and passed to a Storm cluster.
Stream | A stream is an unordered sequence of tuples.
Spouts | Source of a stream. Generally, Storm accepts input data from raw data sources like the Twitter Streaming API, an Apache Kafka queue, a Kestrel queue, etc. Otherwise you can write spouts to read data from data sources. "ISpout" is the core interface for implementing spouts. Some of the specific interfaces are IRichSpout, BaseRichSpout, KafkaSpout, etc.
Bolts | Bolts are logical processing units. Spouts pass data to bolts, and bolts process it and produce a new output stream. Bolts can perform the operations of filtering, aggregation, joining, and interacting with data sources and databases. A bolt receives data and emits to one or more bolts. "IBolt" is the core interface for implementing bolts. Some of the common interfaces are IRichBolt, IBasicBolt, etc.
Let us take a real-time example of "Twitter Analysis" and see how it can be modelled in Apache Storm. The following diagram depicts the structure.

The input for the "Twitter Analysis" comes from the Twitter Streaming API. The spout will read the tweets of the users using the Twitter Streaming API and output them as a stream of tuples. A single tuple from the spout will have a Twitter username and a single tweet as comma-separated values. Then, this stream of tuples will be forwarded to the bolt, and the bolt will split the tweet into individual words, calculate the word count, and persist the information to a configured datasource. Now, we can easily get the result by querying the datasource.
Topology
Spouts and bolts are connected together and form a topology. Real-time application logic is specified inside the Storm topology. In simple words, a topology is a directed graph where the vertices are computations and the edges are streams of data.
A simple topology starts with spouts. A spout emits the data to one or more bolts. A bolt represents a node in the topology having the smallest processing logic, and the output of a bolt can be emitted into another bolt as input.
Storm keeps the topology always running until you kill it. Apache Storm's main job is to run topologies, and it can run any number of topologies at a given time.
Tasks
Now you have a basic idea of spouts and bolts. They are the smallest logical units of a topology, and a topology is built using a single spout and an array of bolts. They should be executed properly in a particular order for the topology to run successfully. The execution of each and every spout and bolt by Storm is called a "task". In simple words, a task is the execution of either a spout or a bolt. At a given time, each spout and bolt can have multiple instances running in multiple separate threads.
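As a hedged sketch (the spout and bolt classes here, LogSpout and ProcessBolt, are placeholders rather than classes from this tutorial), the number of executors and tasks for a component is declared when the topology is built:
TopologyBuilder builder = new TopologyBuilder();
//two executors (threads) will run instances of this spout
builder.setSpout("log-spout", new LogSpout(), 2);
//four executors share eight tasks of this bolt
builder.setBolt("process-bolt", new ProcessBolt(), 4)
   .setNumTasks(8)
   .shuffleGrouping("log-spout");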
Workers
A topology runs in a distributed manner, on multiple worker nodes. Storm spreads the tasks evenly across all the worker nodes. A worker node's role is to listen for jobs and start or stop processes whenever a new job arrives.
Stream Grouping
A stream of data flows from spouts to bolts or from one bolt to another bolt. Stream grouping controls how the tuples are routed in the topology and helps us understand the flow of tuples in the topology. There are four in-built groupings, as explained below.
Shuffle Grouping
In shuffle grouping, an equal number of tuples is distributed randomly across all of the workers executing the bolts. The following diagram depicts the structure.

Field Grouping
Tuples with the same values in the grouping fields are grouped together, and the remaining tuples are kept outside. Then, the tuples with the same field values are sent forward to the same worker executing the bolts. For example, if the stream is grouped by the field "word", then the tuples with the same string, "Hello", will move to the same worker. The following diagram shows how Field Grouping works.

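As a hedged sketch (the component classes SentenceSpout, SplitBolt, and CountBolt are placeholders; the same grouping calls appear in the working example later in this tutorial), both groupings are declared when the bolts are wired into the topology:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentence-spout", new SentenceSpout());
//shuffle grouping: tuples are spread randomly and evenly across the bolt's tasks
builder.setBolt("split-bolt", new SplitBolt())
   .shuffleGrouping("sentence-spout");
//fields grouping: tuples with the same "word" value always reach the same task
builder.setBolt("count-bolt", new CountBolt())
   .fieldsGrouping("split-bolt", new Fields("word"));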
Apache Storm - Cluster Architecture
One of the main highlights of Apache Storm is that it is a fault-tolerant, fast, distributed application with no "Single Point of Failure" (SPOF). We can install Apache Storm on as many systems as needed to increase the capacity of the application.
Let us have a look at how the Apache Storm cluster is designed and its internal architecture. The following diagram depicts the cluster design.

Apache Storm has two types of nodes, Nimbus (master node) and Supervisor (worker node). Nimbus is the central component of Apache Storm. The main job of Nimbus is to run the Storm topology. Nimbus analyzes the topology and gathers the tasks to be executed. Then, it distributes the tasks to the available supervisors.
A supervisor will have one or more worker processes. A supervisor delegates the tasks to its worker processes. A worker process spawns as many executors as needed and runs the tasks. Apache Storm uses an internal distributed messaging system for the communication between the nimbus and the supervisors.
Components | Description
Nimbus | Nimbus is the master node of a Storm cluster. All other nodes in the cluster are called worker nodes. The master node is responsible for distributing data among all the worker nodes, assigning tasks to worker nodes, and monitoring failures.
Supervisor | The nodes that follow the instructions given by the nimbus are called supervisors. A supervisor has multiple worker processes and it governs the worker processes to complete the tasks assigned by the nimbus.
Worker process | A worker process executes tasks related to a specific topology. A worker process will not run a task by itself; instead it creates executors and asks them to perform a particular task. A worker process will have multiple executors.
Executor | An executor is nothing but a single thread spawned by a worker process. An executor runs one or more tasks, but only for a specific spout or bolt.
Task | A task performs actual data processing. So, it is either a spout or a bolt.
ZooKeeper framework | Apache ZooKeeper is a service used by a cluster (group of nodes) to coordinate among themselves and maintain shared data with robust synchronization techniques. Nimbus is stateless, so it depends on ZooKeeper to monitor the working node status. ZooKeeper helps the supervisors interact with the nimbus. It is responsible for maintaining the state of the nimbus and supervisors.
Storm is stateless in nature. Even though the stateless nature has its own disadvantages, it actually helps Storm process real-time data in the best possible and quickest way.
Storm is not entirely stateless, though. It stores its state in Apache ZooKeeper. Since the state is available in Apache ZooKeeper, a failed nimbus can be restarted and made to work from where it left off. Usually, service monitoring tools like monit will monitor Nimbus and restart it if there is any failure.
Apache Storm also has an advanced topology called Trident Topology with state maintenance, and it also provides a high-level API like Pig. We will discuss all these features in the coming chapters.
Apache Storm - Workflow
A working Storm cluster should have one nimbus and one or more supervisors. Another important node is Apache ZooKeeper, which is used for the coordination between the nimbus and the supervisors.
Let us now take a close look at the workflow of Apache Storm −
- Initially, the nimbus will wait for the "Storm Topology" to be submitted to it.
- Once a topology is submitted, it will process the topology and gather all the tasks that are to be carried out and the order in which the tasks are to be executed.
- Then, the nimbus will evenly distribute the tasks to all the available supervisors.
- At a particular time interval, all supervisors will send heartbeats to the nimbus to inform it that they are still alive.
- When a supervisor dies and doesn't send a heartbeat to the nimbus, then the nimbus assigns its tasks to another supervisor.
- When the nimbus itself dies, the supervisors will work on the already assigned tasks without any issue.
- Once all the tasks are completed, the supervisor will wait for a new task to come in.
- In the meantime, the dead nimbus will be restarted automatically by service monitoring tools.
- The restarted nimbus will continue from where it stopped. Similarly, the dead supervisor can also be restarted automatically. Since both the nimbus and the supervisors can be restarted automatically and both will continue as before, Storm is guaranteed to process all the tasks at least once.
- Once all the topologies are processed, the nimbus waits for a new topology to arrive, and similarly the supervisors wait for new tasks.
By default, there are two modes in a Storm cluster −
- Local mode − This mode is used for development, testing, and debugging because it is the easiest way to see all the topology components working together. In this mode, we can adjust parameters that enable us to see how our topology runs in different Storm configuration environments. In local mode, Storm topologies run on the local machine in a single JVM.
- Production mode − In this mode, we submit our topology to a working Storm cluster, which is composed of many processes, usually running on different machines. As discussed in the workflow of Storm, a working cluster will run indefinitely until it is shut down. A short submission sketch for both modes follows this list.
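A hedged sketch of how the two modes differ at submission time (here config and builder stand for a prepared Config instance and TopologyBuilder, as used later in this tutorial; LocalCluster and StormSubmitter are the standard submission classes):
//Local mode − run the topology inside a single JVM
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("demo-topology", config, builder.createTopology());
//Production mode − submit the same topology to a running Storm cluster
StormSubmitter.submitTopology("demo-topology", config, builder.createTopology());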
Storm - Distributed Messaging System
Apache Storm processes real-time data, and the input normally comes from a message queuing system. An external distributed messaging system provides the input necessary for the real-time computation. A spout will read the data from the messaging system, convert it into tuples, and input them into Apache Storm. Interestingly, Apache Storm uses its own distributed messaging system internally for the communication between its nimbus and supervisors.
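As a hedged illustration of such an integration, the sketch below assumes the separate storm-kafka module, a local ZooKeeper at localhost:2181, and a Kafka topic named "calls"; none of this is part of the tutorial's own example.
//ZooKeeper connection used by the storm-kafka spout to find Kafka brokers
BrokerHosts hosts = new ZkHosts("localhost:2181");
//topic to read, ZooKeeper root path, and a consumer id used for storing offsets
SpoutConfig spoutConfig = new SpoutConfig(hosts, "calls", "/kafka-spout", "call-log-reader");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
//the spout can then be registered with TopologyBuilder.setSpout(...)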
What is Distributed Messaging System?
Distributed messaging is based on the concept of reliable message queuing. Messages are queued asynchronously between client applications and messaging systems. A distributed messaging system provides the benefits of reliability, scalability, and persistence.
Most of the messaging patterns follow the publish-subscribe model (simply Pub-Sub), where the senders of the messages are called publishers and those who want to receive the messages are called subscribers.
Once a message has been published by the sender, the subscribers can receive the selected messages with the help of a filtering option. Usually we have two types of filtering: one is topic-based filtering and the other is content-based filtering.
Note that the pub-sub model can communicate only via messages. It is a very loosely coupled architecture; even the senders don't know who their subscribers are. Many of the messaging patterns rely on a message broker to exchange published messages for timely access by many subscribers. A real-life example is Dish TV, which publishes different channels like sports, movies, music, etc., and anyone can subscribe to their own set of channels and get them whenever their subscribed channels are available.

The following table describes some of the popular high-throughput messaging systems −
Distributed messaging system | Description
Apache Kafka | Kafka was developed at LinkedIn and later became a sub-project of Apache. Apache Kafka is based on a broker-enabled, persistent, distributed publish-subscribe model. Kafka is fast, scalable, and highly efficient.
RabbitMQ | RabbitMQ is an open source, distributed, robust messaging application. It is easy to use and runs on all platforms.
JMS (Java Message Service) | JMS is a Java API that supports creating, reading, and sending messages from one application to another. It provides guaranteed message delivery and follows the publish-subscribe model.
ActiveMQ | The ActiveMQ messaging system is an open source implementation of the JMS API.
ZeroMQ | ZeroMQ provides broker-less peer-to-peer message processing. It provides push-pull and router-dealer message patterns.
Kestrel | Kestrel is a fast, reliable, and simple distributed message queue.
Thrift Protocol
Thrift was built at Facebook for cross-language services development and remote procedure calls (RPC). Later, it became an open source Apache project. Apache Thrift is an Interface Definition Language that allows new data types and service implementations to be defined on top of the defined data types in an easy manner.
Apache Thrift is also a communication framework that supports embedded systems, mobile applications, web applications, and many other programming languages. Some of the key features associated with Apache Thrift are its modularity, flexibility, and high performance. In addition, it can perform streaming, messaging, and RPC in distributed applications.
Storm extensively uses the Thrift protocol for its internal communication and data definition. A Storm topology is simply a set of Thrift structs. The Storm Nimbus, which runs the topology in Apache Storm, is a Thrift service.
Apache Storm - Installation
Let us now see how to install the Apache Storm framework on your machine. There are three major steps here −
- Install Java on your system, if you don't have it already.
- Install the ZooKeeper framework.
- Install the Apache Storm framework.
Step 1 − Verifying Java Installation
Use the following command to check whether you already have Java installed on your system.
$ java -version
If Java is already there, then you will see its version number. Otherwise, download the latest version of the JDK.
Step 1.1 − Download JDK
Download the latest version of the JDK by using the following link − www.oracle.com
The latest version is JDK 8u60 and the file is "jdk-8u60-linux-x64.tar.gz". Download the file onto your machine.
Step 1.2 − Extract files
Generally, files are downloaded to the downloads folder. Extract the tar setup using the following commands.
$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.tar.gz
Step 1.3 − Move to opt directory
To make Java available to all users, move the extracted Java content to the "/opt/jdk" folder.
$ su
password: (type password of root user)
$ mkdir /opt/jdk
$ mv jdk1.8.0_60 /opt/jdk/
Step 1.4 − Set path
To set the PATH and JAVA_HOME variables, add the following commands to the ~/.bashrc file.
export JAVA_HOME=/opt/jdk/jdk1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin
Now apply all the changes to the current running system.
$ source ~/.bashrc
Step 2 − ZooKeeper Framework Installation
Step 2.1 − Download ZooKeeper
To install the ZooKeeper framework on your machine, visit the following link and download the latest version of ZooKeeper − http://zookeeper.apache.org/releases.html
As of now, the latest version of ZooKeeper is 3.4.6 (ZooKeeper-3.4.6.tar.gz).
Step 2.2 − Extract tar file
Extract the tar file using the following commands −
$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data
Step 2.3 − Create configuration file
Open the configuration file named "conf/zoo.cfg" using the command "vi conf/zoo.cfg" and set all the following parameters as a starting point.
$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2
Once the configuration file has been saved successfully, you can start the ZooKeeper server.
Step 2.4 − Start ZooKeeper Server
Use the following command to start the ZooKeeper server.
$ bin/zkServer.sh start
After executing this command, you will get a response as follows −
$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED
Step 2.5 − Start CLI
Use the following command to start the CLI.
$ bin/zkCli.sh
After executing the above command, you will be connected to the ZooKeeper server and get the following response.
Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]
Step 2.6 − Stop ZooKeeper Server
After connecting to the server and performing all the operations, you can stop the ZooKeeper server by using the following command.
bin/zkServer.sh stop
You have successfully installed Java and ZooKeeper on your machine. Let us now see the steps to install the Apache Storm framework.
Step 3 − Apache Storm Framework Installation
Step 3.1 − Download Storm
To install the Storm framework on your machine, visit the following link and download the latest version of Storm − http://storm.apache.org/downloads.html
As of now, the latest version of Storm is "apache-storm-0.9.5.tar.gz".
Step 3.2 − Extract tar file
Extract the tar file using the following commands −
$ cd opt/
$ tar -zxf apache-storm-0.9.5.tar.gz
$ cd apache-storm-0.9.5
$ mkdir data
Step 3.3 − Open configuration file
The current release of Storm contains a file at "conf/storm.yaml" that configures the Storm daemons. Add the following information to that file.
$ vi conf/storm.yaml
storm.zookeeper.servers:
- "localhost"
storm.local.dir: "/path/to/storm/data(any path)"
nimbus.host: "localhost"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
After applying all the changes, save the file and return to the terminal.
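Before the UI can show the cluster, the nimbus and at least one supervisor daemon need to be running. A minimal sketch of those steps, assuming the standard bin/storm launcher run from the extracted apache-storm-0.9.5 directory, is as follows.
Step 3.4 − Start Nimbus
$ bin/storm nimbus
Step 3.5 − Start the Supervisor
$ bin/storm supervisor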
Step 3.6 − Start the UI
$ bin/storm ui
After starting the Storm user interface application, type the URL http://localhost:8080 in your favorite browser and you will see the Storm cluster information and its running topology. The page should look similar to the following screenshot.

Apache Storm - Working Example
We have gone through the core technical details of Apache Storm, and now it is time to code some simple scenarios.
Scenario – Mobile Call Log Analyzer
Mobile calls and their durations will be given as input to Apache Storm, and Storm will process and group the calls between the same caller and receiver and count their total number of calls.
Spout Creation
A spout is a component which is used for data generation. Basically, a spout will implement an IRichSpout interface. The "IRichSpout" interface has the following important methods −
- open − Provides the spout with an environment to execute. The executors will run this method to initialize the spout.
- nextTuple − Emits the generated data through the collector.
- close − This method is called when a spout is going to shut down.
- declareOutputFields − Declares the output schema of the tuple.
- ack − Acknowledges that a specific tuple has been processed.
- fail − Specifies that a specific tuple has not been fully processed, so that it can be reprocessed.
Open
The signature of the open method is as follows −
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
- conf − Provides the Storm configuration for this spout.
- context − Provides complete information about the spout's place within the topology, its task id, and input and output information.
- collector − Enables us to emit the tuples that will be processed by the bolts.
nextTuple
The signature of the nextTuple method is as follows −
nextTuple()
nextTuple() is called periodically from the same loop as the ack() and fail() methods. It must release control of the thread when there is no work to do, so that the other methods have a chance to be called. So the first line of nextTuple checks whether processing has finished. If so, it should sleep for at least one millisecond to reduce the load on the processor before returning.
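A minimal hedged sketch of that pattern, assuming a boolean completed flag like the one declared in the FakeCallLogReaderSpout below, could look like this:
@Override
public void nextTuple() {
   if(completed) {
      try {
         //nothing left to emit; back off so that ack() and fail() can run
         Thread.sleep(1);
      } catch(InterruptedException e) {
         Thread.currentThread().interrupt();
      }
      return;
   }
   //otherwise build the next tuple and emit it through the collector
}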
declareOutputFields
The signature of the declareOutputFields method is as follows −
declareOutputFields(OutputFieldsDeclarer declarer)
declarer − It is used to declare output stream ids, output fields, etc.
This method is used to specify the output schema of the tuple.
ack
The signature of the ack method is as follows −
ack(Object msgId)
This method acknowledges that a specific tuple has been processed.
fail
The signature of the fail method is as follows −
fail(Object msgId)
This method informs that a specific tuple has not been fully processed. Storm will reprocess the specific tuple.
FakeCallLogReaderSpout
In our scenario, we need to collect the call log details. The information in a call log contains −
- caller number
- receiver number
- duration
Since we don't have real-time information of call logs, we will generate fake call logs. The fake information will be created using the Random class. The complete program code is given below.
Coding − FakeCallLogReaderSpout.java
import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
//Create a class FakeCallLogReaderSpout which implements the IRichSpout interface to access its functionalities
public class FakeCallLogReaderSpout implements IRichSpout {
//Create instance for SpoutOutputCollector which passes tuples to bolt.
private SpoutOutputCollector collector;
private boolean completed = false;
//Create instance for TopologyContext which contains topology data.
private TopologyContext context;
//Create instance for Random class.
private Random randomGenerator = new Random();
private Integer idx = 0;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.context = context;
this.collector = collector;
}
@Override
public void nextTuple() {
if(this.idx <= 1000) {
List<String> mobileNumbers = new ArrayList<String>();
mobileNumbers.add("1234123401");
mobileNumbers.add("1234123402");
mobileNumbers.add("1234123403");
mobileNumbers.add("1234123404");
Integer localIdx = 0;
while(localIdx++ < 100 && this.idx++ < 1000) {
String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
while(fromMobileNumber.equals(toMobileNumber)) {
toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
}
Integer duration = randomGenerator.nextInt(60);
this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("from", "to", "duration"));
}
//Override all the interface methods
@Override
public void close() {}
public boolean isDistributed() {
return false;
}
@Override
public void activate() {}
@Override
public void deactivate() {}
@Override
public void ack(Object msgId) {}
@Override
public void fail(Object msgId) {}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Bolt Creation
A bolt is a component that takes tuples as input, processes them, and produces new tuples as output. Bolts implement the IRichBolt interface. In this program, two bolt classes, CallLogCreatorBolt and CallLogCounterBolt, are used to perform the operations.
The IRichBolt interface has the following methods −
- prepare − Provides the bolt with an environment to execute. The executors will run this method to initialize the bolt.
- execute − Processes a single tuple of input.
- cleanup − Called when a bolt is going to shut down.
- declareOutputFields − Declares the output schema of the tuple.
Prepare
The signature of the prepare method is as follows −
prepare(Map conf, TopologyContext context, OutputCollector collector)
- conf − Provides the Storm configuration for this bolt.
- context − Provides complete information about the bolt's place within the topology, its task id, input and output information, etc.
- collector − Enables us to emit the processed tuple.
execute
The signature of the execute method is as follows −
execute(Tuple tuple)
Here, tuple is the input tuple to be processed.
The execute method processes a single tuple at a time. The tuple data can be accessed by the getValue method of the Tuple class. It is not necessary to process the input tuple immediately. Multiple tuples can be processed and output as a single output tuple. The processed tuple can be emitted by using the OutputCollector class.
declareOutputFields
The signature of the declareOutputFields method is as follows −
declareOutputFields(OutputFieldsDeclarer declarer)
Here, the parameter declarer is used to declare output stream ids, output fields, etc.
This method is used to specify the output schema of the tuple.
Call log Creator Bolt
The call log creator bolt receives the call log tuple. The call log tuple has the caller number, receiver number, and call duration. This bolt simply creates a new value by combining the caller number and the receiver number. The format of the new value is "Caller number – Receiver number" and it is named as a new field, "call". The complete code is given below.
Coding − CallLogCreatorBolt.java
//import util packages
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
//Create instance for OutputCollector which collects and emits tuples to produce output
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String from = tuple.getString(0);
String to = tuple.getString(1);
Integer duration = tuple.getInteger(2);
collector.emit(new Values(from + " - " + to, duration));
}
@Override
public void cleanup() {}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call", "duration"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Call log Counter Bolt
The call log counter bolt receives the call and its duration as a tuple. This bolt initializes a dictionary (Map) object in the prepare method. In the execute method, it checks the tuple and creates a new entry in the dictionary object for every new "call" value in the tuple, setting the value 1 in the dictionary object. For an entry already available in the dictionary, it just increments its value. In simple terms, this bolt saves the call and its count in the dictionary object. Instead of saving the call and its count in the dictionary, we can also save it to a datasource. The complete program code is as follows −
Coding − CallLogCounterBolt.java
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class CallLogCounterBolt implements IRichBolt {
Map<String, Integer> counterMap;
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.counterMap = new HashMap<String, Integer>();
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String call = tuple.getString(0);
Integer duration = tuple.getInteger(1);
if(!counterMap.containsKey(call)){
counterMap.put(call, 1);
}else{
Integer c = counterMap.get(call) + 1;
counterMap.put(call, c);
}
collector.ack(tuple);
}
@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Creating Topology
A Storm topology is basically a Thrift structure. The TopologyBuilder class provides simple and easy methods to create complex topologies. The TopologyBuilder class has methods to set a spout (setSpout) and to set a bolt (setBolt). Finally, TopologyBuilder has createTopology to create the topology. Use the following code snippet to create a topology −
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
The shuffleGrouping and fieldsGrouping methods help to set the stream grouping for the spout and the bolts.
Local Cluster
For development purposes, we can create a local cluster using the "LocalCluster" object and then submit the topology using the "submitTopology" method of the "LocalCluster" class. One of the arguments for "submitTopology" is an instance of the "Config" class. The "Config" class is used to set configuration options before submitting the topology. This configuration option will be merged with the cluster configuration at run time and sent to all tasks (spout and bolt) with the prepare method. Once the topology is submitted to the cluster, we will wait 10 seconds for the cluster to compute the submitted topology and then shut down the cluster using the "shutdown" method of "LocalCluster". The complete program code is as follows −
Coding − LogAnalyserStorm.java
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
public static void main(String[] args) throws Exception{
//Create Config instance for cluster configuration
Config config = new Config();
config.setDebug(true);
//
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
Thread.sleep(10000);
//Stop the topology
cluster.shutdown();
}
}
Building and Running the Application
The complete application has four Java files. They are −
- FakeCallLogReaderSpout.java
- CallLogCreatorBolt.java
- CallLogCounterBolt.java
- LogAnalyserStorm.java
The application can be built using the following command −
javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*" *.java
The application can be run using the following command −
java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":. LogAnalyserStorm
Output
Once the application is started, it will output the complete details about the cluster startup process, the spout and bolt processing, and finally, the cluster shutdown process. In "CallLogCounterBolt", we have printed the call and its count details. This information will be displayed on the console as follows −
1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93
Non-JVM languages
Storm topologies are implemented by Thrift interfaces, which makes it easy to submit topologies in any language. Storm supports Ruby, Python, and many other languages. Let us take a look at the Python binding.
Python Binding
Python is a general-purpose, interpreted, interactive, object-oriented, and high-level programming language. Storm supports Python to implement its topology. Python supports emitting, anchoring, acking, and logging operations.
As you know, bolts can be defined in any language. Bolts written in another language are executed as sub-processes, and Storm communicates with those sub-processes via JSON messages over stdin/stdout. First take a sample bolt WordCount that supports the Python binding.
public static class WordCount extends ShellBolt implements IRichBolt {
   public WordCount() {
      super("python", "splitword.py");
   }
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
Here the class WordCount extends ShellBolt and implements the IRichBolt interface, and it runs the Python implementation specified by the super method argument, "splitword.py". Now create a Python implementation named "splitword.py".
import storm
class WordCountBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
WordCountBolt().run()
This is a sample Python implementation that splits the given sentence into words and emits each one. Similarly, you can bind with the other supported languages as well.
Apache Storm - Trident
Trident is an extension of Storm. Like Storm, Trident was also developed by Twitter. The main reason behind developing Trident is to provide a high-level abstraction on top of Storm, along with stateful stream processing and low-latency distributed querying.
Trident uses spouts and bolts, but these low-level components are auto-generated by Trident before execution. Trident has functions, filters, joins, grouping, and aggregation.
Trident processes streams as a series of batches, which are referred to as transactions. Generally, the size of those small batches will be on the order of thousands or millions of tuples, depending on the input stream. This way, Trident is different from Storm, which performs tuple-by-tuple processing.
The batch processing concept is very similar to database transactions. Every transaction is assigned a transaction ID. A transaction is considered successful once all of its processing completes. However, a failure in processing one of the transaction's tuples will cause the entire transaction to be retransmitted. For each batch, Trident will call beginCommit at the beginning of the transaction and commit at the end of it.
Trident Topology
The Trident API exposes an easy option to create a Trident topology using the "TridentTopology" class. Basically, a Trident topology receives an input stream from a spout and performs an ordered sequence of operations (filter, aggregation, grouping, etc.) on the stream. Storm Tuples are replaced by Trident Tuples and bolts are replaced by operations. A simple Trident topology can be created as follows −
TridentTopology topology = new TridentTopology();
Trident Tuples
A Trident tuple is a named list of values. The TridentTuple interface is the data model of a Trident topology. The TridentTuple interface is the basic unit of data that can be processed by a Trident topology.
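As a small hedged illustration (the field name is the one used later in this chapter, and the by-field accessor is assumed to be available on TridentTuple), values can be read either by position or by field name:
String from = tuple.getString(0);                     //access by position
String to = tuple.getStringByField("toMobileNumber"); //access by declared field name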
Trident Spout
A Trident spout is similar to a Storm spout, with additional options to use the features of Trident. Actually, we can still use the IRichSpout, which we have used in the Storm topology, but it will be non-transactional in nature and we won't be able to use the advantages provided by Trident.
The basic spout having all the functionality to use the features of Trident is "ITridentSpout". It supports both transactional and opaque transactional semantics. The other spouts are IBatchSpout, IPartitionedTridentSpout, and IOpaquePartitionedTridentSpout.
In addition to these generic spouts, Trident has many sample implementations of trident spouts. One of them is FeederBatchSpout, which we can use to send a named list of trident tuples easily without worrying about batch processing, parallelism, etc.
FeederBatchSpout creation and data feeding can be done as shown below −
TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
ImmutableList.of("fromMobileNumber", "toMobileNumber", "duration"));
topology.newStream("fixed-batch-spout", testSpout);
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));
Trident Operations
Trident relies on "Trident operations" to process the input stream of trident tuples. The Trident API has a number of built-in operations to handle simple-to-complex stream processing. These operations range from simple validation to complex grouping and aggregation of trident tuples. Let us go through the most important and frequently used operations.
Filter
A filter is an object used to perform the task of input validation. A Trident filter gets a subset of trident tuple fields as input and returns either true or false depending on whether certain conditions are satisfied or not. If true is returned, then the tuple is kept in the output stream; otherwise, the tuple is removed from the stream. A filter will basically inherit from the BaseFilter class and implement the isKeep method. Here is a sample implementation of the filter operation −
public class MyFilter extends BaseFilter {
public boolean isKeep(TridentTuple tuple) {
return tuple.getInteger(1) % 2 == 0;
}
}
input
[1, 2]
[1, 3]
[1, 4]
output
[1, 2]
[1, 4]
The filter function can be called in the topology using the "each" method. The "Fields" class can be used to specify the input (a subset of the trident tuple). The sample code is as follows −
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter());
Function
A function is an object used to perform a simple operation on a single trident tuple. It takes a subset of trident tuple fields and emits zero or more new trident tuple fields.
A function basically inherits from the BaseFunction class and implements the execute method. A sample implementation is given below −
public class MyFunction extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector) {
int a = tuple.getInteger(0);
int b = tuple.getInteger(1);
collector.emit(new Values(a + b));
}
}
input
[1, 2]
[1, 3]
[1, 4]
output
[1, 2, 3]
[1, 3, 4]
[1, 4, 5]
Just like the filter operation, the function operation can be called in a topology using the each method. The sample code is as follows −
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFunction(), new Fields("d"));
Aggregation
Aggregation is an object used to perform aggregation operations on an input batch, partition, or stream. Trident has three types of aggregation. They are as follows −
- aggregate − Aggregates each batch of trident tuples in isolation. During the aggregate process, the tuples are initially repartitioned using global grouping to combine all partitions of the same batch into a single partition.
- partitionAggregate − Aggregates each partition instead of the entire batch of trident tuples. The output of the partition aggregate completely replaces the input tuples. The output of the partition aggregate contains a single-field tuple.
- persistentAggregate − Aggregates over all trident tuples across all batches and stores the result in either memory or a database.
TridentTopology topology = new TridentTopology();
// aggregate operation
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
.aggregate(new Count(), new Fields("count"));
// partitionAggregate operation
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
.partitionAggregate(new Count(), new Fields("count"));
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
An aggregation operation can be created using the CombinerAggregator, the ReducerAggregator, or the generic Aggregator interface. The "Count" aggregator used in the above example is one of the built-in aggregators. It is implemented using "CombinerAggregator". The implementation is as follows −
public class Count implements CombinerAggregator<Long> {
@Override
public Long init(TridentTuple tuple) {
return 1L;
}
@Override
public Long combine(Long val1, Long val2) {
return val1 + val2;
}
@Override
public Long zero() {
return 0L;
}
}
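For comparison, here is a hedged sketch of the same counter written against the ReducerAggregator interface; this variant is not part of the tutorial's example and only illustrates the alternative interface mentioned above.
public class CountReducer implements ReducerAggregator<Long> {
   @Override
   public Long init() {
      //initial value used when a batch or partition is empty
      return 0L;
   }
   @Override
   public Long reduce(Long curr, TridentTuple tuple) {
      //add one for every trident tuple seen
      return curr + 1;
   }
}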
Grouping
The grouping operation is an inbuilt operation and can be called by the groupBy method. The groupBy method repartitions the stream by doing a partitionBy on the specified fields, and then, within each partition, it groups together the tuples whose group fields are equal. Normally, we use "groupBy" along with "persistentAggregate" to get a grouped aggregation. The sample code is as follows −
TridentTopology topology = new TridentTopology();
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
.groupBy(new Fields("d"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
Merging and Joining
Merging and joining can be done by using the "merge" and "join" methods, respectively. Merging combines one or more streams. Joining is similar to merging, except that joining uses the trident tuple fields from both sides to check and join the two streams. Moreover, joining works at the batch level only. The sample code is as follows −
TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"),
new Fields("key", "a", "b", "c"));
State Maintenance
Trident provides a mechanism for state maintenance. State information can be stored in the topology itself, or you can store it in a separate database as well. The purpose is to maintain the state so that, if any tuple fails during processing, the failed tuple is retried. This creates a problem while updating the state, because you are not sure whether the state for this tuple was already updated or not. If the tuple failed before updating the state, then retrying the tuple will make the state stable. However, if the tuple failed after updating the state, then retrying the same tuple will again increase the count in the database and make the state unstable. One needs to perform the following steps to ensure that a message is processed only once −
- Process the tuples in small batches.
- Assign a unique ID to each batch. If the batch is retried, it is given the same unique ID.
- The state updates are ordered among batches. For example, the state update of the second batch will not be possible until the state update for the first batch has completed.
Distributed RPC
Distributed RPC is used to query and retrieve results from a Trident topology. Storm has an inbuilt distributed RPC server. The distributed RPC server receives an RPC request from the client and passes it to the topology. The topology processes the request and sends the result back to the distributed RPC server, which redirects it to the client. Trident's distributed RPC queries execute like normal RPC queries, except that these queries are run in parallel.
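As a hedged sketch (the function name "call_count" and the wiring are illustrative, not the tutorial's final code), a local DRPC stream can be declared on the topology and then queried from the client side:
LocalDRPC drpc = new LocalDRPC();
TridentTopology topology = new TridentTopology();
//expose a DRPC function named "call_count" whose arguments arrive in the "args" field
topology.newDRPCStream("call_count", drpc)
   .each(new Fields("args"), new CSVSplit(), new Fields("call"));
//query the topology; the comma-separated argument is parsed by CSVSplit
String result = drpc.execute("call_count", "1234123401 - 1234123402,1234123401 - 1234123403");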
When to Use Trident?
In many use-cases, if the requirement is to process a query exactly once, we can achieve it by writing a topology in Trident. On the other hand, exactly-once processing is difficult to achieve in plain Storm. Hence Trident is useful for those use-cases where exactly-once processing is required. Trident is not suited to every use case, especially high-performance ones, because it adds complexity to Storm and manages state.
Working Example of Trident
We are going to convert the call log analyzer application worked out in the previous section to the Trident framework. The Trident application will be relatively easy compared to plain Storm, thanks to its high-level API; it mainly requires composing the Function, Filter, Aggregate, GroupBy, Join and Merge operations covered above. Finally, we will start a DRPC server using the LocalDRPC class and search for some keywords using the execute method of the LocalDRPC class.
Formatting the call information
The purpose of the FormatCall class is to format the call information comprising the “Caller number” and the “Receiver number”. The complete program code is as follows −
Coding: FormatCall.java
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
public class FormatCall extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String fromMobileNumber = tuple.getString(0);
String toMobileNumber = tuple.getString(1);
collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
}
}
CSVSplit
The purpose of the CSVSplit class is to split the input string on commas (“,”) and emit every word in the string. This function is used to parse the input argument of the distributed query. The complete code is as follows −
Coding: CSVSplit.java
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
public class CSVSplit extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
for(String word: tuple.getString(0).split(",")) {
if(word.length() > 0) {
collector.emit(new Values(word));
}
}
}
}
Log Analyzer
This is the main application. Initially, the application initializes the TridentTopology and feeds caller information using FeederBatchSpout. A Trident topology stream can be created using the newStream method of the TridentTopology class. Similarly, a Trident topology DRPC stream can be created using the newDRPCStream method of the TridentTopology class. A simple DRPC server can be created using the LocalDRPC class. LocalDRPC has an execute method to search for some keywords. The complete code is given below.
Coding: LogAnalyserTrident.java
import java.util.*;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;
import com.google.common.collect.ImmutableList;
public class LogAnalyserTrident {
public static void main(String[] args) throws Exception {
System.out.println("Log Analyser Trident");
TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
"toMobileNumber", "duration"));
TridentState callCounts = topology
.newStream("fixed-batch-spout", testSpout)
.each(new Fields("fromMobileNumber", "toMobileNumber"),
new FormatCall(), new Fields("call"))
.groupBy(new Fields("call"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(),
new Fields("count"));
LocalDRPC drpc = new LocalDRPC();
topology.newDRPCStream("call_count", drpc)
.stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));
topology.newDRPCStream("multiple_call_count", drpc)
.each(new Fields("args"), new CSVSplit(), new Fields("call"))
.groupBy(new Fields("call"))
.stateQuery(callCounts, new Fields("call"), new MapGet(),
new Fields("count"))
.each(new Fields("call", "count"), new Debug())
.each(new Fields("count"), new FilterNull())
.aggregate(new Fields("count"), new Sum(), new Fields("sum"));
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("trident", conf, topology.build());
Random randomGenerator = new Random();
int idx = 0;
while(idx < 10) {
testSpout.feed(ImmutableList.of(new Values("1234123401",
"1234123402", randomGenerator.nextInt(60))));
testSpout.feed(ImmutableList.of(new Values("1234123401",
"1234123403", randomGenerator.nextInt(60))));
testSpout.feed(ImmutableList.of(new Values("1234123401",
"1234123404", randomGenerator.nextInt(60))));
testSpout.feed(ImmutableList.of(new Values("1234123402",
"1234123403", randomGenerator.nextInt(60))));
idx = idx + 1;
}
System.out.println("DRPC : Query starts");
System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
System.out.println(drpc.execute("multiple_call_count", "1234123401 -
1234123402,1234123401 - 1234123403"));
System.out.println("DRPC : Query ends");
cluster.shutdown();
drpc.shutdown();
// DRPCClient client = new DRPCClient("drpc.server.location", 3772);
}
}
Building and Running the Application
The complete application has three Java files. They are as follows −
- FormatCall.java
- CSVSplit.java
- LogAnalyserTrident.java
The application can be built using the following command −
javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*" *.java
The application can be run using the following command −
java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":. LogAnalyserTrident
Output
Once the application is started, it will output the complete details about the cluster startup process, operations processing, DRPC server and client information, and finally the cluster shutdown process. Since the loop feeds each caller pair ten times, the call_count query returns 10 for a pair, and the multiple_call_count query sums two pairs to give 20. This output is displayed on the console as shown below.
DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends
Apache Storm in Twitter
In this chapter, we will discuss a real-time application of Apache Storm. We will see how Storm is used in Twitter.
Twitter is an online social networking service that provides a platform to send and receive user tweets. Registered users can read and post tweets, but unregistered users can only read tweets. A hashtag is used to categorize tweets by keyword by appending # before the relevant keyword. Now let us take a real-time scenario of finding the most used hashtags per topic.
Spout Creation
The purpose of the spout is to get the tweets submitted by people as soon as possible. Twitter provides the “Twitter Streaming API”, a web-service-based tool to retrieve the tweets submitted by people in real time. The Twitter Streaming API can be accessed from any programming language.
twitter4j is an open source, unofficial Java library that provides a Java-based module to easily access the Twitter Streaming API. twitter4j provides a listener-based framework to access the tweets. To access the Twitter Streaming API, we need to sign in for a Twitter developer account and obtain the following OAuth authentication details −
- Consumer key
- Consumer secret
- Access token
- Access token secret
Storm provides a Twitter spout, TwitterSampleSpout, in its starter kit. We will use it to retrieve the tweets. The spout needs the OAuth authentication details and at least one keyword. The spout emits real-time tweets based on the keywords. The complete program code is given below.
Coding: TwitterSampleSpout.java
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;
import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
LinkedBlockingQueue<Status> queue = null;
TwitterStream _twitterStream;
String consumerKey;
String consumerSecret;
String accessToken;
String accessTokenSecret;
String[] keyWords;
public TwitterSampleSpout(String consumerKey, String consumerSecret,
String accessToken, String accessTokenSecret, String[] keyWords) {
this.consumerKey = consumerKey;
this.consumerSecret = consumerSecret;
this.accessToken = accessToken;
this.accessTokenSecret = accessTokenSecret;
this.keyWords = keyWords;
}
public TwitterSampleSpout() {
// TODO Auto-generated constructor stub
}
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
queue = new LinkedBlockingQueue<Status>(1000);
_collector = collector;
StatusListener listener = new StatusListener() {
@Override
public void onStatus(Status status) {
queue.offer(status);
}
@Override
public void onDeletionNotice(StatusDeletionNotice sdn) {}
@Override
public void onTrackLimitationNotice(int i) {}
@Override
public void onScrubGeo(long l, long l1) {}
@Override
public void onException(Exception ex) {}
@Override
public void onStallWarning(StallWarning arg0) {
// TODO Auto-generated method stub
}
};
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true)
.setOAuthConsumerKey(consumerKey)
.setOAuthConsumerSecret(consumerSecret)
.setOAuthAccessToken(accessToken)
.setOAuthAccessTokenSecret(accessTokenSecret);
_twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
_twitterStream.addListener(listener);
if (keyWords.length == 0) {
_twitterStream.sample();
}else {
FilterQuery query = new FilterQuery().track(keyWords);
_twitterStream.filter(query);
}
}
@Override
public void nextTuple() {
Status ret = queue.poll();
if (ret == null) {
Utils.sleep(50);
} else {
_collector.emit(new Values(ret));
}
}
@Override
public void close() {
_twitterStream.shutdown();
}
@Override
public Map<String, Object> getComponentConfiguration() {
Config ret = new Config();
ret.setMaxTaskParallelism(1);
return ret;
}
@Override
public void ack(Object id) {}
@Override
public void fail(Object id) {}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tweet"));
}
}
Hashtag Reader Bolt
The tweets emitted by the spout are forwarded to HashtagReaderBolt, which processes each tweet and emits all the available hashtags. HashtagReaderBolt uses the getHashtagEntities method provided by twitter4j. getHashtagEntities reads the tweet and returns the list of hashtags. The complete program code is as follows −
Coding: HashtagReaderBolt.java
import java.util.HashMap;
import java.util.Map;
import twitter4j.*;
import twitter4j.conf.*;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class HashtagReaderBolt implements IRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
Status tweet = (Status) tuple.getValueByField("tweet");
for(HashtagEntity hashtage : tweet.getHashtagEntities()) {
System.out.println("Hashtag: " + hashtage.getText());
this.collector.emit(new Values(hashtage.getText()));
}
}
@Override
public void cleanup() {}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("hashtag"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Hashtag Counter Bolt
The emitted hashtags are forwarded to HashtagCounterBolt. This bolt processes all the hashtags and saves each hashtag and its count in memory using a Java Map object. The complete program code is given below.
Coding: HashtagCounterBolt.java
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class HashtagCounterBolt implements IRichBolt {
Map<String, Integer> counterMap;
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.counterMap = new HashMap<String, Integer>();
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String key = tuple.getString(0);
if(!counterMap.containsKey(key)){
counterMap.put(key, 1);
}else{
Integer c = counterMap.get(key) + 1;
counterMap.put(key, c);
}
collector.ack(tuple);
}
@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
System.out.println("Result: " + entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("hashtag"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Submitting a Topology
Submitting a topology is the main application. The Twitter topology consists of TwitterSampleSpout, HashtagReaderBolt, and HashtagCounterBolt. The following program code shows how to submit the topology.
Coding: TwitterHashtagStorm.java
import java.util.*;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
public class TwitterHashtagStorm {
public static void main(String[] args) throws Exception{
String consumerKey = args[0];
String consumerSecret = args[1];
String accessToken = args[2];
String accessTokenSecret = args[3];
String[] arguments = args.clone();
String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);
Config config = new Config();
config.setDebug(true);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
consumerSecret, accessToken, accessTokenSecret, keyWords));
builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
.shuffleGrouping("twitter-spout");
builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
.fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TwitterHashtagStorm", config,
builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
Building and Running the Application
The complete application has four Java files. They are as follows −
- TwitterSampleSpout.java
- HashtagReaderBolt.java
- HashtagCounterBolt.java
- TwitterHashtagStorm.java
You can compile the application using the following command −
javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*" *.java
Execute the application using the following command −
java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*":.
TwitterHashtagStorm <consumerkey> <consumersecret> <accesstoken> <accesstokensecret>
<keyword1> <keyword2> … <keywordN>
Output
The application will print the currently available hashtags and their counts. The output should be similar to the following −
Result: jazztastic : 1
Result: foodie : 1
Result: Redskins : 1
Result: Recipe : 1
Result: cook : 1
Result: android : 1
Result: food : 2
Result: NoToxicHorseMeat : 1
Result: Purrs4Peace : 1
Result: livemusic : 1
Result: VIPremium : 1
Result: Frome : 1
Result: SundayRoast : 1
Result: Millennials : 1
Result: HealthWithKier : 1
Result: LPs30DaysofGratitude : 1
Result: cooking : 1
Result: gameinsight : 1
Result: Countryfile : 1
Result: androidgames : 1
Apache Storm in Yahoo! Finance
Yahoo! Finance is the Internet’s leading business news and financial data website. It is a part of Yahoo! and gives information about financial news, market statistics, international market data and other financial resources that anyone can access.
If you are a registered Yahoo! user, then you can customize Yahoo! Finance to take advantage of certain of its offerings. The Yahoo! Finance API is used to query financial data from Yahoo!
This API displays data that is delayed by 15 minutes from real time and updates its database every minute to provide access to current stock-related information. Now let us take a real-time scenario of a company and see how to raise an alert when its stock value goes below 100.
Spout Creation
The purpose of the spout is to get the details of the companies and emit the prices to the bolts. You can use the following program code to create the spout.
Coding: YahooFinanceSpout.java
import java.util.*;
import java.io.*;
import java.math.BigDecimal;
//import yahoofinace packages
import yahoofinance.YahooFinance;
import yahoofinance.Stock;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
public class YahooFinanceSpout implements IRichSpout {
private SpoutOutputCollector collector;
private boolean completed = false;
private TopologyContext context;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){
this.context = context;
this.collector = collector;
}
@Override
public void nextTuple() {
try {
Stock stock = YahooFinance.get("INTC");
BigDecimal price = stock.getQuote().getPrice();
this.collector.emit(new Values("INTC", price.doubleValue()));
stock = YahooFinance.get("GOOGL");
price = stock.getQuote().getPrice();
this.collector.emit(new Values("GOOGL", price.doubleValue()));
stock = YahooFinance.get("AAPL");
price = stock.getQuote().getPrice();
this.collector.emit(new Values("AAPL", price.doubleValue()));
} catch(Exception e) {}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("company", "price"));
}
@Override
public void close() {}
public boolean isDistributed() {
return false;
}
@Override
public void activate() {}
@Override
public void deactivate() {}
@Override
public void ack(Object msgId) {}
@Override
public void fail(Object msgId) {}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Bolt Creation
Here the purpose of the bolt is to process the given company’s prices and flag them when they fall below 100. It uses a Java Map object to set the cutoff price alert to true when the stock price falls below 100, and to false otherwise. The complete program code is as follows −
Coding: PriceCutOffBolt.java
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class PriceCutOffBolt implements IRichBolt {
Map<String, Integer> cutOffMap;
Map<String, Boolean> resultMap;
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.cutOffMap = new HashMap <String, Integer>();
this.cutOffMap.put("INTC", 100);
this.cutOffMap.put("AAPL", 100);
this.cutOffMap.put("GOOGL", 100);
this.resultMap = new HashMap<String, Boolean>();
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String company = tuple.getString(0);
Double price = tuple.getDouble(1);
if(this.cutOffMap.containsKey(company)){
Integer cutOffPrice = this.cutOffMap.get(company);
if(price < cutOffPrice) {
this.resultMap.put(company, true);
} else {
this.resultMap.put(company, false);
}
}
collector.ack(tuple);
}
@Override
public void cleanup() {
for(Map.Entry<String, Boolean> entry:resultMap.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("cut_off_price"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Submitting a Topology
This is the main application where YahooFinanceSpout.java and PriceCutOffBolt.java are connected together to produce a topology. The following program code shows how to submit the topology.
Coding: YahooFinanceStorm.java
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
public class YahooFinanceStorm {
public static void main(String[] args) throws Exception{
Config config = new Config();
config.setDebug(true);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("yahoo-finance-spout", new YahooFinanceSpout());
builder.setBolt("price-cutoff-bolt", new PriceCutOffBolt())
.fieldsGrouping("yahoo-finance-spout", new Fields("company"));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("YahooFinanceStorm", config, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
Building and Running the Application
The complete application has three Java files. They are as follows −
- YahooFinanceSpout.java
- PriceCutOffBolt.java
- YahooFinanceStorm.java
The application can be built using the following command −
javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/yahoofinance/lib/*" *.java
The application can be run using the following command −
java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/yahoofinance/lib/*":.
YahooFinanceStorm
Apache Storm - Applications
The Apache Storm framework supports many of today’s best industrial applications. This chapter provides a very brief overview of some of the most notable applications of Storm.
Klout
Klout is an application that uses social media analytics to rank its users based on online social influence through the Klout Score, a numerical value between 1 and 100. Klout uses Apache Storm’s inbuilt Trident abstraction to create complex topologies that stream data.
The Weather Channel
The Weather Channel uses Storm topologies to ingest weather data. It has tied up with Twitter to enable weather-informed advertising on Twitter and mobile applications. OpenSignal is a company that specializes in wireless coverage mapping. StormTag and WeatherSignal are weather-based projects created by OpenSignal. StormTag is a Bluetooth weather station that attaches to a keychain; the weather data collected by the device is sent to the WeatherSignal app and to OpenSignal servers.
Telecom Industry
Telecommunication providers process millions of phone calls per second. They perform forensics on dropped calls and poor sound quality. Call detail records flow in at a rate of millions per second, and Apache Storm processes them in real time to identify any troubling patterns. Storm analysis can be used to continuously improve call quality.