Apache Storm 简明教程

Apache Storm - Quick Guide

Apache Storm - Introduction

What is Apache Storm?

Apache Storm 是一个分布式实时大数据处理系统。Storm 被设计为以容错和水平可扩展的方法处理大量数据。它是一个流数据框架,具有最高的摄取率。虽然 Storm 是无状态的,但它通过 Apache ZooKeeper 管理分布式环境和集群状态。它很简单,并且您可以并行对实时数据执行各种操作。

Apache Storm 继续成为实时数据分析的领导者。Storm 易于设置和操作,并且保证每条消息都将至少处理一次。

Apache Storm vs Hadoop

基本上,Hadoop 和 Storm 框架用于分析大数据。它们在某些方面互补,但在某些方面也有所不同。Apache Storm 除了持久性之外,执行所有操作,而 Hadoop 擅长所有内容,但实时计算方面滞后。下表比较了 Storm 和 Hadoop 的属性。

Storm

Hadoop

Real-time stream processing

Batch processing

Stateless

Stateful

基于 ZooKeeper 协调的主/从架构。主节点称为 nimbus ,从节点称为 supervisors

基于或者不基于 ZooKeeper 的协调的主从架构。主节点为 job tracker ,从节点为 task tracker

Storm 流处理可以在集群上每秒访问数万条消息。

Hadoop 分布式文件系统 (HDFS) 使用 MapReduce 框架处理需要数分钟或数小时的大量数据。

Storm 拓扑运行,直到用户关闭或发生意外的不可恢复故障为止。

MapReduce 作业按顺序执行,最终完成。

Both are distributed and fault-tolerant

如果 Nimbus/监管者进程死了,重新启动会使其从停止处继续进行,因此不受影响。

Use-Cases of Apache Storm

Apache Storm 以实时大数据流处理而闻名。出于这个原因,大多数公司将 Storm 用作其系统的组成部分。一些值得注意的示例如下 −

Twitter − Twitter 使用 Apache Storm 来处理其 “Publisher Analytics 产品” 的范围。Publisher Analytics 产品在 Twitter 平台中处理每条推文和点击。Apache Storm 与 Twitter 基础设施深度集成。

NaviSite − NaviSite 使用 Storm 用于事件日志监控/审计系统。在系统中生成的每条日志都将通过 Storm。Storm 将根据配置的正则表达式集检查消息,如果有匹配项,则会将该特定消息保存到数据库中。

Wego − Wego 是位于新加坡的旅游元搜索引擎。与旅游相关的的数据来自世界各地,时间不同。Storm 帮助 Wego 搜索实时数据、解决并发问题并找到最终用户的最佳匹配。

Apache Storm Benefits

以下是 Apache Storm 提供的好处列表 −

  1. Storm 是开源的、可靠的且用户友好的。它可以在小公司和大型公司中使用。

  2. Storm 具有容错性、灵活性、可靠性,且支持任何编程语言。

  3. Allows real-time stream processing.

  4. Storm 以难以置信的速度运行,因为它有着强大的数据处理能力。

  5. Storm 可以通过线性增加资源,即使在负载不断增加的情况下也能保持性能。它具有高度可扩展性。

  6. Storm 会在几秒或几分钟内执行数据刷新和端到端传递响应,具体取决于问题。它的延迟非常低。

  7. Storm has operational intelligence.

  8. Storm 即使在群集中的任何已连接节点崩溃或消息丢失的情况下,也能提供数据处理保障。

Apache Storm - Core Concepts

Apache Storm 从一端读取实时的原始数据流,并通过一系列的小处理单元对其进行处理,然后在另一端输出已处理/有用的信息。

以下图表描述了 Apache Storm 的核心概念。

core concept

现在,让我们仔细了解 Apache Storm 的组件 −

Components

Description

Tuple

元组是 Storm 中的主要数据结构。它是有序元素的列表。默认情况下,元组支持所有数据类型。通常,它被建模为一组逗号分隔的值,并被传递到 Storm 集群。

Stream

流是元组的无序序列。

Spouts

流的源。通常,Storm 接受来自原始数据源(例如 Twitter Streaming API、Apache Kafka 队列、Kestrel 队列等)的输入数据。否则,您可以编写喷口来读取数据源中的数据。“ISpout”是用于实现喷口的核心接口。某些特定接口包括 IRichSpout、BaseRichSpout、KafkaSpout 等。

Bolts

螺栓是逻辑处理单元。喷口将数据传递给螺栓,而螺栓则对其进行处理并生成新的输出流。螺栓可以执行过滤、聚合、连接、与数据源和数据库交互的操作。螺栓接收数据并发送给一个或多个螺栓。“IBolt”是用于实现螺栓的核心接口。某些常用接口包括 IRichBolt、IBasicBolt 等。

让我们举一个“Twitter 分析”的实时示例,并了解如何在 Apache Storm 中对其进行建模。以下图表描绘了结构。

twitter analysis

“Twitter 分析”的输入来自 Twitter Streaming API。喷口将使用 Twitter Streaming API 读取用户发出的微博,并作为元组流进行输出。喷口的一个元组将具有一个 Twitter 用户名和一个作为逗号分隔值的单条微博。然后,这个元组流将被转发给螺栓,而螺栓将把微博拆分为各个单词,计算单词数量,并将信息持久保存到已配置的数据源。现在,我们可以通过查询数据源轻松地得到结果。

Topology

喷口和螺栓相互连接,它们形成了一个拓扑。实时应用程序逻辑在 Storm 拓扑内指定。简单来说,拓扑是有向图,其中顶点是计算,边是数据流。

一个简单的拓扑结构从流经器开始。流经器向一个或多个螺栓发送数据。螺栓表示拓扑结构中节点,该节点拥有最小的处理逻辑,而且螺栓的输出可以被输入至另一个螺栓。

Storm 会一直保持拓扑一直运行,直到你终止拓扑。Apache Storm 的主要工作是运行拓扑,并且可在特定时间运行任何数量的拓扑。

Tasks

现在,你已经对流经器和螺栓有了基本概念。它们是拓扑结构的最小逻辑单元,而且使用单个流经器和一系列螺栓构建拓扑结构。为了让拓扑成功运行,应对它们按特定顺序恰当执行。Storm 对各个流经器和螺栓执行的操作称为“任务”。简而言之,任务即是流经器或螺栓的执行。在特定时间,每个流经器和螺栓可以具有多个实例,并在多个独立线程中运行。

Workers

拓扑结构在分布式的方式运行,针对多个工作节点运行。Storm 将任务均匀地分散在各个工作节点上。工作节点的作用是对任务进行监听,以及在有新任务到来时启动或停止处理。

Stream Grouping

数据流会从流经器流向螺栓,或从一个螺栓流向另一个螺栓。流分组控制如何向拓扑结构中分流元组,并帮助我们理解元组在拓扑结构中的流动方式。共有四种内置的分组,如下所示。

Shuffle Grouping

在随机分组中,相同数量的元组随机分配到执行螺栓的所有工作节点。下图生动地描述了这个结构。

shuffle grouping

Field Grouping

元组中具有相同值字段的元组被分组在一起,而剩余的元组被保留在外。然后,具有相同字段值的元组会被发送到执行螺栓的相同工作节点。例如,如果通过“单词”字段对流进行分组,那么具有相同字符串“你好”的元组将传入同一工作节点。下图说明了字段分组的工作原理。

field grouping

Global Grouping

所有流可以被分组并转发至一个螺栓。此分组将由源的所有实例生成的元组发送至单个目标实例(具体来讲,选择 ID 最低的工作节点)。

global grouping

All Grouping

所有分组将每个元组的单个副本发送至接收螺栓的所有实例。这种分组用于向螺栓发送信号。所有分组对于操作连接很有用。

all grouping

Apache Storm - Cluster Architecture

Apache Storm 的主要亮点之一在于它是一种容错的、没有“单点故障”(SPOF) 的快速分布式应用程序。我们可以根据需要在尽可能多的系统中安装 Apache Storm,以增加应用程序的容量。

让我们看看 Apache Storm 集群的设计和内部架构是如何的。下图显示了集群设计。

zookeeper framework

Apache Storm 有两种类型的节点, Nimbus (主节点)和 Supervisor (工作节点)。Nimbus 是 Apache Storm 的核心组件。Nimbus 的主要工作是运行 Storm 拓扑。Nimbus 分析拓扑并收集要执行的任务。然后,它会将任务分配给可用的 supervisior。

一个 supervisor 将拥有一个或多个工作进程。supervisior 会将任务委托给工作进程。工作进程将根据需要生成尽可能多的执行程序并运行任务。Apache Storm 使用内部分布式消息系统在 nimbus 和 supervisior 之间进行通信。

Components

Description

Nimbus

Nimbus 是 Storm 集群的主节点。群集中的所有其他节点都被称为 worker nodes 。主节点负责在所有工作节点之间分配数据、向工作节点分配任务和监控故障。

Supervisor

遵循 nimbus 给出的指令的节点称为 Supervisior。一个 supervisor 具有多个工作进程,它控制工作进程,以完成 nimbus 分配的任务。

Worker process

一个工作进程将执行与特定拓扑相关的任务。一个工作进程本身不会运行任务,而是创建 executors ,并要求它们执行特定任务。一个工作进程将具有多个执行程序。

Executor

一个执行程序只不过是由一个工作进程产生的单个线程。一个执行程序运行一个或多个任务,但只针对一个特定的 spout 或 bolt。

Task

一个任务执行实际的数据处理。因此,它要么是一个 spout,要么是一个 bolt。

ZooKeeper framework

Apache ZooKeeper 是一个由一个群集(一组节点)用来协调它们自己并使用强大的同步技术维护共享数据的一项服务。Nimbus 是无状态的,因此它依赖 ZooKeeper 来监控工作节点的状态。ZooKeeper 帮助 supervisior 与 nimbus 进行交互。它负责维护 nimbus 和 supervisior 的状态。

Storm 本质上是无状态的。即使无状态的本质有其自身的缺点,但它实际上帮助 Storm 以尽可能最佳、最快速的方式处理实时数据。

不过,Storm 并不完全是无状态的。它将自己的状态存储在 Apache ZooKeeper 中。由于状态在 Apache ZooKeeper 中可用,因此一个失败的 nimbus 可以重新启动并从它停止的位置继续工作。通常,类似于 monit 的服务监控工具将监控 Nimbus,并在出现任何故障时重新启动它。

Apache Storm 还具有一种称为 Trident Topology 的高级拓扑,具有状态维护,并且还提供类似于 Pig 的高级别 API。我们将在接下来的章节讨论所有这些功能。

Apache Storm - Workflow

一个正常工作的 Storm 群集应有一个 nimbus 和一个或多个管理员。另一个重要的节点是 Apache ZooKeeper,它将用于在 nimbus 和管理员之间进行协调。

现在让我们仔细了解 Apache Storm 的工作流程 −

  1. 最初,nimbus 将等待向其提交“Storm 拓扑”。

  2. 在提交拓扑后,它将处理该拓扑并收集要执行的所有任务以及要执行任务的顺序。

  3. 然后,nimbus 将任务均匀分配给所有可用的管理员。

  4. 在特定时间间隔内,所有管理员将向 nimbus 发送心跳以通知其它们仍然处于活动状态。

  5. 当一个管理者死亡且不会向 nimbus 发送心跳时,nimbus 会将任务分配给另一个管理者。

  6. 当 nimbus 本身死亡时,supervisor 将处理已分配的任务,并且没有任何问题。

  7. 一旦所有任务完成,supervisor 将等待一个新任务的到来。

  8. 与此同时,服务监控工具会自动重启已死的 nimbus。

  9. 重启的 nimbus 将从它停止的地方继续。类似地,已死的 supervisor 也可以自动重新启动。由于 nimbus 和 supervisor 都可以自动重新启动,并且都将像以前一样继续,因此 Storm 保证至少处理所有任务一次。

  10. 一旦所有拓扑都处理完毕,nimbus 将等待一个新拓扑到来,类似地,supervisor 将等待新任务。

默认情况下,Storm 集群有两种模式 −

  1. Local mode − 此模式用于开发、测试和调试,因为它是最简单的方法,可以查看所有拓扑组件协同工作。在此模式下,我们可以调整允许我们查看我们的拓扑如何在不同 Storm 配置环境中运行的参数。在本地模式下,storm 拓扑在单个 JVM 中的本地计算机上运行。

  2. Production mode − 在此模式下,我们将拓扑提交到正在工作的 storm 集群,该集群由许多进程组成,通常在不同的计算机上运行。正如在 storm 的工作流中讨论的那样,正在进行集群会无限期地运行,直至关闭。

Storm - Distributed Messaging System

Apache Storm 处理实时数据,而输入通常来自消息排队系统。外部分布式消息系统将为实时计算提供必要的输入。流经器将从消息系统读取数据,并将它转换为元组,并输入至 Apache Storm。有趣的是,Apache Storm 在其内部使用自己的分布式消息系统,来进行其 Nimbus 和 Supervisor 之间的通信。

What is Distributed Messaging System?

分布式消息基于可靠消息排队的概念。消息在客户端应用程序和消息系统之间异步排队。分布式消息系统提供了可靠性、可伸缩性和持久性的优势。

大多数消息模式都遵循 publish-subscribe 模型(即 Pub-Sub ),其中消息的发送者被称为 publishers ,而希望接收消息者被称为 subscribers

在发送者发布消息后,订阅者借助筛选选项可以接收选定的消息。通常情况下,我们有两种类型的筛选,一种是 topic-based filtering ,另一种是 content-based filtering

请注意,发布-订阅模式只能通过消息进行通信。这是一种非常松散耦合的架构;即使是发送者也不了解其订阅者是谁。许多消息模式通过消息代理实现,以供多个订阅者及时访问发布的消息。现实生活中的一个例子是 Dish TV,它发布不同的频道,如运动频道、电影频道、音乐频道等,而任何人都可以订阅他们自己的频道集,并在其订阅的频道可用时获取它们。

messaging system

下表介绍了一些常见的超高吞吐消息系统 −

Distributed messaging system

Description

Apache Kafka

Kafka 是由 LinkedIn 公司开发的,后来它成为 Apache 的一个子项目。Apache Kafka 基于支持代理的、持久化的、分布式的发布-订阅模型。Kafka 速度快、可伸缩且高效。

RabbitMQ

RabbitMQ 是一个开源的、分布式的强健消息应用程序。它易于使用,可以在所有平台上运行。

JMS(Java Message Service)

JMS 是一个开放源码 API,支持在应用程序之间创建、读取和发送消息。它提供有保证的消息传递,并遵循发布-订阅模式。

ActiveMQ

ActiveMQ 消息系统是 JMS 的一个开放源码 API。

ZeroMQ

ZeroMQ 是个无服务器点对点消息处理。它提供点对点传输、路由器-交易商消息模式。

Kestrel

Kestrel 是一个快速、可靠、简单的分布式消息队列。

Thrift Protocol

Thrift 是 Facebook 为跨语言服务开发和远程过程调用 (RPC) 而构建的。后来,它成为一个开源 Apache 项目。Apache Thrift 是一个 Interface Definition Language ,允许在已定义的数据类型之上以一种简单的方式定义新的数据类型和服务实现。

Apache Thrift 也是一个通信框架,它支持嵌入式系统、移动应用程序、Web 应用程序和许多其他编程语言。与 Apache Thrift 相关的一些关键功能是它的模块化、灵活性以及高性能。此外,它还可以在分布式应用程序中执行流式传输、消息传递和 RPC。

Storm 广泛使用 Thrift 协议进行其内部通信和数据定义。Storm 拓扑只是 Thrift Structs 。Apache Storm 中运行拓扑的 Storm Nimbus 是一个 Thrift service

Apache Storm - Installation

现在让我们来看看如何在计算机上安装 Apache Storm 框架。这里有三个主要的步骤 −

  1. 如果您的系统上还没有安装 Java,请安装 Java。

  2. Install ZooKeeper framework.

  3. Install Apache Storm framework.

Step 1 − Verifying Java Installation

使用以下命令检查您的系统是否已安装 Java。

$ java -version

如果 Java 已经存在,那么您会看到它的版本号。如果没有,请下载 JDK 的最新版本。

Step 1.1 − Download JDK

使用以下链接下载 JDK 的最新版本 − www.oracle.com

最新版本是 JDK 8u 60,文件是 “jdk-8u60-linux-x64.tar.gz” 。将文件下载到您的计算机上。

Step 1.2 − Extract files

通常,文件被下载到 downloads 文件夹中。使用以下命令提取 tar 设置。

$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz

Step 1.3 − Move to opt directory

为了让所有用户都可以使用 Java,将提取的 Java 内容移动到“/usr/local/java”文件夹中。

$ su
password: (type password of root user)
$ mkdir /opt/jdk
$ mv jdk-1.8.0_60 /opt/jdk/

Step 1.4 − Set path

若要设置 path 及 JAVA_HOME 变量,请将以下命令添加到 ~/.bashrc 文件中。

export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin

现在将所有更改应用到当前运行的系统中。

$ source ~/.bashrc

Step 1.5 − Java Alternatives

使用以下命令更改 Java 备用项。

update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100

Step 1.6

现在,使用第 1 步中说明的验证命令 (java -version) 验证 Java 安装。

Step 2 − ZooKeeper Framework Installation

Step 2.1 − Download ZooKeeper

要在计算机上安装 ZooKeeper 框架,请访问以下链接并下载 ZooKeeper http://zookeeper.apache.org/releases.html 的最新版本

到目前为止,ZooKeeper 的最新版本是 3.4.6 (ZooKeeper-3.4.6.tar.gz)。

Step 2.2 − Extract tar file

使用以下命令提取 tar 文件 −

$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data

Step 2.3 − Create configuration file

使用命令 "vi conf/zoo.cfg" 打开名为“conf/zoo.cfg”的配置文件,并使用所有以下参数作为起始点。

$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

一旦配置文件已成功保存,便可以启动 ZooKeeper 服务器。

Step 2.4 − Start ZooKeeper Server

使用以下命令启动 ZooKeeper 服务器。

$ bin/zkServer.sh start

执行该命令后,你会收到如下的响应 −

$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED

Step 2.5 − Start CLI

使用以下命令启动 CLI。

$ bin/zkCli.sh

执行上述命令后,你将连接到 ZooKeeper 服务器并得到以下响应。

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

Step 2.6 − Stop ZooKeeper Server

连接到服务器并执行完所有操作后,可以使用以下命令停止 ZooKeeper 服务器。

bin/zkServer.sh stop

你已成功在你的机器上安装了 Java 和 ZooKeeper。现在让我们看看安装 Apache Storm 框架的步骤。

Step 3 − Apache Storm Framework Installation

Step 3.1 Download Storm

要在你的机器上安装 Storm 框架,请访问以下链接并下载 Storm 的最新版本。

截至目前,Storm 的最新版本为“apache-storm-0.9.5.tar.gz”。

Step 3.2 − Extract tar file

使用以下命令提取 tar 文件 −

$ cd opt/
$ tar -zxf apache-storm-0.9.5.tar.gz
$ cd apache-storm-0.9.5
$ mkdir data

Step 3.3 − Open configuration file

Storm 的当前发行版包含一个位于“conf/storm.yaml”的文件,可配置 Storm 守护程序。将以下信息添加到该文件中。

$ vi conf/storm.yaml
storm.zookeeper.servers:
 - "localhost"
storm.local.dir: “/path/to/storm/data(any path)”
nimbus.host: "localhost"
supervisor.slots.ports:
 - 6700
 - 6701
 - 6702
 - 6703

应用完所有更改后,保存并返回到终端。

Step 3.4 − Start the Nimbus

$ bin/storm nimbus

Step 3.5 − Start the Supervisor

$ bin/storm supervisor

Step 3.6 Start the UI

$ bin/storm ui

在启动 Storm 用户界面应用程序后,在浏览器中输入 URL,你便可以看到 Storm 集群信息及其正在运行的拓扑。该页面应该与以下屏幕截图类似。

storm ui

Apache Storm - Working Example

我们已经了解了 Apache Storm 的核心技术细节,现在是时候编写一些简单场景了。

Scenario – Mobile Call Log Analyzer

移动呼叫及其持续时间将作为输入提供给 Apache Storm,Storm 将处理和分组相同呼叫者和接收者之间的呼叫以及他们的呼叫总数。

Spout Creation

Spout 是一种用于数据生成的组件。基本上,Spout 将实现一个 IRichSpout 接口。“IRichSpout”接口有以下重要的方法 −

  1. open − 为 spout 提供了一个执行环境。执行器将运行此方法来初始化 spout。

  2. nextTuple - 借由收集器发射生成的数据。

  3. close - 当注水器关闭时调用此方法。

  4. declareOutputFields − 声明元组的输出模式。

  5. ack - 确认已处理特定数据元组

  6. fail - 指定某些数据元组尚未处理,也不应重新处理。

Open

open 的语法方法如下所示

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  1. conf - 提供此注水器的暴风配置。

  2. context - 提供有关拓扑中注水器放置位置、任务 ID、输入和输出信息等完整信息。

  3. collector - 能够向螺栓发出的数据元组。

nextTuple

nextTuple 的语法方法如下所示

nextTuple()

nextTuple() 将从与 ack() 和 fail() 方法相同的循环中周期性调用。当没有工作可做时,它必须释放对线程的控制,这样其他方法就有机会被调用。因此,nextTuple 的首行用于检查处理是否已完成。如果是,它应至少休眠一毫秒,以便在返回之前减少对处理器的负载。

close

close 的语法方法如下所示

close()

declareOutputFields

declareOutputFields 的语法方法如下所示

declareOutputFields(OutputFieldsDeclarer declarer)

declarer - 用于声明输出流 ID、输出字段等。

该方法用于指定数据元组的输出模式。

ack

ack 的语法方法如下所示

ack(Object msgId)

此方法确认已处理特定数据元组。

fail

nextTuple 的语法方法如下所示

ack(Object msgId)

该方法告知尚未完全处理特定数据元组。Storm 将重新处理特定数据元组。

FakeCallLogReaderSpout

在我们的情况下,我们需要收集通话记录的详细信息。通话记录的信息包含。

  1. caller number

  2. receiver number

  3. duration

由于我们没有通话记录的实时信息,因此我们将生成伪造的通话记录。伪造的信息将使用 Random 类创建。完整程序代码如下所示。

Coding − FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface
   to access functionalities

public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;

   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;

   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }

            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Bolt Creation

Bolt 是一个组件,它将元组作为输入,处理该元组,并产生新的元组作为输出。Bolts 将实现 IRichBolt 接口。在这个程序中,使用两个 bolt 类 CallLogCreatorBoltCallLogCounterBolt 来执行操作。

IRichBolt 接口具有以下方法:

  1. prepare − 为 bolt 提供一个执行环境。执行程序将运行此方法来初始化 spout。

  2. execute − 处理单个输入元组。

  3. cleanup − 在 bolt 将要关闭时调用。

  4. declareOutputFields − 声明元组的输出模式。

Prepare

prepare 方法的签名如下 −

prepare(Map conf, TopologyContext context, OutputCollector collector)
  1. conf − 为此 bolt 提供 Storm 配置。

  2. context − 提供关于 bolt 在拓扑结构中的完整信息,包括其任务 ID、输入和输出信息等。

  3. collector − 使我们能够发出处理过的元组。

execute

execute 方法的签名如下 −

execute(Tuple tuple)

此处的 tuple 是要处理的输入元组。

execute 方法一次处理单个元组。可以通过 Tuple 类的 getValue 方法来访问元组数据。不必立即处理输入元组。可以处理多个元组,并将其输出为一个输出元组。可以通过使用 OutputCollector 类来发出处理过的元组。

cleanup

cleanup 方法的签名如下 −

cleanup()

declareOutputFields

declareOutputFields 的语法方法如下所示

declareOutputFields(OutputFieldsDeclarer declarer)

此处参数 declarer 用于声明输出流 ID、输出字段等。

此方法用于指定元组的输出模式

Call log Creator Bolt

呼叫记录创建器 bolt 接收呼叫记录元组。呼叫记录元组包含呼叫者号码、接收者号码和呼叫持续时间。此 bolt 只需通过组合呼叫者号码和接收者号码来创建新值。新值的格式为“呼叫者号码 – 接收者号码”,并将其命名为新字段“call”。完整代码如下。

Coding − CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Call log Counter Bolt

呼叫记录计数器 bolt 将呼叫及其持续时间作为元组接收。此 bolt 在准备方法中初始化了一个字典(Map)对象。在 execute 方法中,它将检查元组并在 tuple 中针对每个新“call”值在字典对象中创建一个新条目,并在字典对象中设置值 1。对于字典中已经存在的条目,它只会递增其值。简单来说,此 bolt 将呼叫及其计数保存在字典对象中。我们可以将呼叫及其计数保存在字典中,也可以将其保存在数据源中。完整的程序代码如下 −

Coding − CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);

      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }

      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }

}

Creating Topology

Storm 拓扑结构从本质上是一个 Thrift 结构。TopologyBuilder 类提供了简单易用的方法来创建复杂拓扑结构。TopologyBuilder 类具有设置 spout (setSpout) 和 bolt (setBolt) 的方法。最后,TopologyBuilder 具有 createTopology 来创建拓扑结构。使用以下代码段来创建拓扑结构 −

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGroupingfieldsGrouping 方法有助于为 spout 和 bolt 设置流分组。

Local Cluster

出于开发目的,我们可以使用“LocalCluster”对象创建一个本地集群,然后使用“LocalCluster”类的“submitTopology”方法提交拓扑结构。“submitTopology”的一个参数是“Config”类的实例。“Config”类用于在提交拓扑结构之前设置配置选项。此配置选项将在运行时与集群配置合并,并通过准备方法发送到所有任务(spout 和 bolt)。一旦拓扑结构提交到集群,我们将等待 10 秒,以便集群计算提交的拓扑结构,然后再使用“LocalCluster”的“shutdown”方法关闭集群。完整的程序代码如下 −

Coding − LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);

      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);

      //Stop the topology

      cluster.shutdown();
   }
}

Building and Running the Application

完整应用程序包含四个 Java 代码。它们是 −

  1. FakeCallLogReaderSpout.java

  2. CallLogCreaterBolt.java

  3. CallLogCounterBolt.java

  4. LogAnalyerStorm.java

可以通过以下命令构建应用程序 −

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

可以通过以下命令运行应用程序 −

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

Output

应用程序启动后,它将输出有关集群启动过程、喷口和螺栓处理以及最终集群关闭过程的完整详细信息。在“CallLogCounterBolt”中,我们打印了呼叫及其计数详细信息。此信息将显示在控制台上,如下所示 −

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

Non-JVM languages

Storm 拓扑由 Thrift 接口实现,这使得使用任何语言提交拓扑变得很容易。Storm 支持 Ruby、Python 和许多其他语言。让我们看看 Python 绑定。

Python Binding

Python 是一种通用的解释型、交互式、面向对象且高级的编程语言。Storm 支持 Python 来实现其拓扑。Python 支持发射、锚定、确认日志操作。

如您所知,螺栓可以用任何语言定义。用另一种语言编写的螺栓作为子进程执行,并且 Storm 通过 stdin/stdout 上的 JSON 消息与那些子进程通信。首先使用支持 Python 绑定的示例螺栓 WordCount。

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }

   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

此处,类 WordCount 实现 IRichBolt 接口并在 Python 实现指定的超类方法参数“splitword.py”下运行。现在,创建一个名为 “splitword.py” 的 Python 实现。

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

这是一个 Python 示例实现,用于计算给定句子中的单词数量。同样,您还可以与其他支持的语言绑定。

Apache Storm - Trident

Trident 是 Storm 的扩展。像 Storm 一样,Trident 也由 Twitter 开发。开发 Trident 的主要原因是提供一个基于 Storm 的高级抽象以及有状态流处理和低延迟分布式查询。

Trident 使用水 spout 和螺栓,但这些底层组件在执行前是由 Trident 自动生成的。Trident 具有函数、过滤器、联接、分组和聚合。

Trident 处理作为一系列批次的流,这些批次称为事务。通常来说,这些小批次的大小根据输入流的规模,将达到数千或数百万个元组。通过这种方式,Trident 不同于按元组执行处理的 Storm。

批处理的概念与数据库事务非常相似。每个事务都被分配一个事务 ID。一旦事务的所有处理完成,事务就会被认为成功。但是,如果处理事务元组中的一个失败,整个事务将被重新传输。对于每个批次,Trident 将在事务开始时调用 beginCommit,在事务结束时调用 commit。

Trident Topology

Trident API 暴露出使用 “TridentTopology” 类创建 Trident 拓扑的简单选项。基本上,Trident 拓扑从喷口接收输入流,并对流执行有序的操作序列(过滤、聚合、分组等)。Storm Tuple 被 Trident Tuple 替换,Bolt 被操作替换。一个简单的 Trident 拓扑可以如下创建 −

TridentTopology topology = new TridentTopology();

Trident Tuples

Trident 元组是一个带名称的值列表。TridentTuple 接口是 Trident 拓扑的数据模型。TridentTuple 接口是可以由 Trident 拓扑处理的数据的基本单位。

Trident Spout

Trident 喷口类似于 Storm 喷口,并且具有使用 Trident 功能的附加选项。实际上,我们仍然可以使用我们在 Storm 拓扑中使用的 IRichSpout,但其本质上是非事务性的,我们无法使用 Trident 提供的优势。

具有使用 Trident 功能的所有功能的基本喷口是 "ITridentSpout"。它支持事务语义和不透明事务语义。其他喷口有 IBatchSpout、IPartitionedTridentSpout 和 IOpaquePartitionedTridentSpout。

除了这些通用喷口之外,Trident 还有许多三叉喷口的示例实现。其中之一是 FeederBatchSpout 喷口,我们可以使用它轻松发送三叉元组的已命名列表,而无需担心批处理、并行性等问题。

FeederBatchSpout 的创建和数据馈送可以按如下所示完成 −

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident Operations

Trident 依赖于 “Trident Operation” 来处理三叉元组的输入流。Trident API 具有大量内置操作,可用于处理从简单到复杂的流处理。这些操作范围从验证三叉元组的简单分组和聚合到复杂的验证。让我们了解一下最重要的和最常用的操作。

Filter

过滤器是一个用于执行输入验证任务的对象。Trident 过滤器获取三叉元组字段的子集作为输入,并根据满足或不满足某些条件返回 true 或 false。如果返回 true,则元组保留在输出流中;否则,元组从流中移除。过滤器基本上将继承 BaseFilter 类并实现 isKeep 方法。如下所示是过滤器操作的一个示例实现 −

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

可以使用 “each” 方法在拓扑中调用过滤器函数。可以使用 “Fields” 类指定输入(三叉元组子集)。示例代码如下 −

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Function

Function 是一个用于对单个三叉元组执行简单操作的对象。它采用三叉元组字段的子集,并发出零个或多个新的三叉元组字段。

Function 基本上继承 BaseFunction 类并实现 execute 方法。示例实现如下 −

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

就像过滤器操作一样,可以使用 each 方法在拓扑中调用函数操作。示例代码如下 −

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Aggregation

聚合是一个用于对输入批次或分区或流执行聚合操作的对象。Trident 有三种类型的聚合。具体如下 −

  1. aggregate − 孤立地聚合每个批次的三叉元组。在聚合过程中,最初使用全局分组对元组进行重新分区,以将同一批次的所有分区合并为单个分区。

  2. partitionAggregate − 聚合每个分区,而不是整个批次的三叉元组。分区聚合的输出完全替换输入元组。分区聚合的输出包含一个单字段元组。

  3. persistentaggregate − 在所有批次中的所有三叉元组上进行聚合,并将结果存储在内存或数据库中。

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))

// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

可以使用 CombinerAggregator、ReducerAggregator 或通用 Aggregator 接口创建聚合操作。上面示例中使用的 “count” 聚合器是内置聚合器之一。它使用 “CombinerAggregator” 实现。实现如下 −

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }

   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }

   @Override
   public Long zero() {
      return 0L;
   }
}

Grouping

分组操作是内置操作,可通过 groupBy 方法调用。groupBy 方法通过对指定字段执行 partitionBy,重新对流进行分区,然后在每个分区中,将组字段相等的元组分组在一起。通常,我们使用“groupBy”和“persistentAggregate”来获取分组聚合。示例代码如下 −

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Merging and Joining

可以通过分别使用“merge”和“join”方法来进行合并和连接。合并组合一个或多个流。连接与合并类似,但连接使用来自两侧的三元组字段来检查和连接两个流。此外,连接仅在批处理级别起作用。示例代码如下 −

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"),
   new Fields("key", "a", "b", "c"));

State Maintenance

Trident 提供了维护状态的机制。状态信息可以存储在拓扑本身中,否则你也可以将其存储在单独的数据库中。目的是维护一个状态:如果在处理过程中任何元组发生故障,那么将重试失败的元组。在更新状态时,这会导致问题,因为你无法确定先前是否更新过此元组的状态。如果在更新状态之前元组发生故障,那么重试元组将使状态保持稳定。但是,如果在更新状态后元组发生故障,那么重试同一个元组将再次增加数据库中记录的数目,并使状态不稳定。需要执行以下步骤来确保仅处理消息一次 −

  1. 以小批次处理元组。

  2. 为每个批次分配一个唯一 ID。如果重试批次,它将获得相同的唯一 ID。

  3. 状态更新在批次之间排序。例如,在第一个批次的状态更新完成之前,不可能更新第二个批次的状态。

Distributed RPC

分布式 RPC 用于从 Trident 拓扑查询和检索结果。Storm 有一个内置的分布式 RPC 服务器。分布式 RPC 服务器接收来自客户端的 RPC 请求,并将其传递给拓扑。拓扑处理请求并将结果发送到分布式 RPC 服务器,该服务器由分布式 RPC 服务器重定向到客户端。Trident 的分布式 RPC 查询与普通 RPC 查询一样执行,只是这些查询并行运行。

When to Use Trident?

在许多用例中,如果要求仅处理一次查询,我们可以通过在 Trident 中编写拓扑来实现。另一方面,在 Storm 中难以实现处理一次。因此,在需要精确处理一次的用例中,Trident 将非常有用。Trident 不适用于所有用例,特别是高性能用例,因为它增加了 Storm 的复杂性并管理状态。

Working Example of Trident

我们准备将上一部分中研究过的呼叫记录分析器应用程序转换为 Trident 框架。与普通 Storm 相比,Trident 应用程序相对容易,这要归功于其高级 API。在 Trident 中,Storm 主要需要执行函数、过滤器、聚合、分组、连接和合并运算中的任何一个。最后,我们将使用 LocalDRPC 类启动 DRPC 服务器,并使用 LocalDRPC 类的 execute 方法搜索一些关键字。

Formatting the call information

FormatCall 类的目的是格式化呼叫信息,包括“呼叫号码”和“接收号码”。完整的程序代码如下所示 −

Coding: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

CSVSplit 类的目的是基于“逗号 (,)”拆分输入字符串,并发出字符串中的每个单词。此函数用于解析分布式查询的输入参数。完整的代码如下所示 −

Coding: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Log Analyzer

这是主应用程序。最初,应用程序将初始化 TridentTopology 并使用 FeederBatchSpout 输入呼叫者信息。可以使用 TridentTopology 类的 newStream 方法创建 Trident 拓扑流。类似地,可以使用 TridentTopology 类的 newDRCPStream 方法创建 Trident 拓扑 DRPC 流。可以使用 LocalDRPC 类创建一个简单的 DRCP 服务器。 LocalDRPC 有一个用于搜索一些关键字的 execute 方法。完整的代码如下所示。

Coding: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();

      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"),
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(),
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(),
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;

      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401",
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401",
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401",
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402",
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Building and Running the Application

完整的应用程序有三个 Java 代码。它们如下:

  1. FormatCall.java

  2. CSVSplit.java

  3. LogAnalyerTrident.java

可以使用以下命令构建应用程序 −

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

可以使用以下命令运行应用程序 −

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

Output

一旦应用程序启动,应用程序将输出有关集群启动过程、处理操作、DRPC 服务器和客户端信息以及集群关闭过程的完整详细信息。该输出将显示在控制台上,如下所示。

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends

Apache Storm in Twitter

在本节中,我们将讨论 Apache Storm 的实时应用程序。我们将看到 Twitter 中如何使用 Storm。

Twitter

Twitter 是一款在线社交网络服务,它提供了一个用于发送和接收用户推文(tweet)的平台。注册用户可以阅读并发布推文,但未注册用户只能阅读推文。主题标签用于通过在相关关键词之前添加 # 来按关键词对推文进行分类。现在,让我们来了解一个按主题查找最常用的主题标签的实时场景。

Spout Creation

喷发的目的是尽快获取人们提交的推文。Twitter 提供“Twitter Streaming API”,这是一个基于 Web 服务的工具,用于实时检索人们提交的推文。可以以任何编程语言访问 Twitter Streaming API。

twitter4j 是一个开源的非官方 Java 库,它提供了基于 Java 的模块以轻松访问 Twitter Streaming API。 twitter4j 提供了一个基于侦听器的框架来访问推文。要访问 Twitter Streaming API,我们需要登录 Twitter 开发人员帐户,并应该获取以下 OAuth 身份验证详细信息。

  1. Customerkey

  2. CustomerSecret

  3. AccessToken

  4. AccessTookenSecret

Storm 在其入门套件中提供了 twitter 喷发 TwitterSampleSpout, 。我们将使用它来检索推文。该喷发需要 OAuth 身份验证详细信息和至少一个关键词。该喷发将基于关键词发出实时推文。完整的程序代码如下所示。

Coding: TwitterSampleSpout.java

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;

import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.utils.Utils;

@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
   SpoutOutputCollector _collector;
   LinkedBlockingQueue<Status> queue = null;
   TwitterStream _twitterStream;

   String consumerKey;
   String consumerSecret;
   String accessToken;
   String accessTokenSecret;
   String[] keyWords;

   public TwitterSampleSpout(String consumerKey, String consumerSecret,
      String accessToken, String accessTokenSecret, String[] keyWords) {
         this.consumerKey = consumerKey;
         this.consumerSecret = consumerSecret;
         this.accessToken = accessToken;
         this.accessTokenSecret = accessTokenSecret;
         this.keyWords = keyWords;
   }

   public TwitterSampleSpout() {
      // TODO Auto-generated constructor stub
   }

   @Override
   public void open(Map conf, TopologyContext context,
      SpoutOutputCollector collector) {
         queue = new LinkedBlockingQueue<Status>(1000);
         _collector = collector;
         StatusListener listener = new StatusListener() {
            @Override
            public void onStatus(Status status) {
               queue.offer(status);
            }

            @Override
            public void onDeletionNotice(StatusDeletionNotice sdn) {}

            @Override
            public void onTrackLimitationNotice(int i) {}

            @Override
            public void onScrubGeo(long l, long l1) {}

            @Override
            public void onException(Exception ex) {}

            @Override
            public void onStallWarning(StallWarning arg0) {
               // TODO Auto-generated method stub
            }
         };

         ConfigurationBuilder cb = new ConfigurationBuilder();

         cb.setDebugEnabled(true)
            .setOAuthConsumerKey(consumerKey)
            .setOAuthConsumerSecret(consumerSecret)
            .setOAuthAccessToken(accessToken)
            .setOAuthAccessTokenSecret(accessTokenSecret);

         _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
         _twitterStream.addListener(listener);

         if (keyWords.length == 0) {
            _twitterStream.sample();
         }else {
            FilterQuery query = new FilterQuery().track(keyWords);
            _twitterStream.filter(query);
         }
   }

   @Override
   public void nextTuple() {
      Status ret = queue.poll();

      if (ret == null) {
         Utils.sleep(50);
      } else {
         _collector.emit(new Values(ret));
      }
   }

   @Override
   public void close() {
      _twitterStream.shutdown();
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      Config ret = new Config();
      ret.setMaxTaskParallelism(1);
      return ret;
   }

   @Override
   public void ack(Object id) {}

   @Override
   public void fail(Object id) {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("tweet"));
   }
}

Hashtag Reader Bolt

喷发发出的推文将转发到 HashtagReaderBolt ,它将处理该推文并发出所有可用的主题标签。HashtagReaderBolt 使用了 twitter4j 提供的 getHashTagEntities 方法。getHashTagEntities 读取推文并返回主题标签列表。完整的程序代码如下 −

Coding: HashtagReaderBolt.java

import java.util.HashMap;
import java.util.Map;

import twitter4j.*;
import twitter4j.conf.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagReaderBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      Status tweet = (Status) tuple.getValueByField("tweet");
      for(HashtagEntity hashtage : tweet.getHashtagEntities()) {
         System.out.println("Hashtag: " + hashtage.getText());
         this.collector.emit(new Values(hashtage.getText()));
      }
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }

}

Hashtag Counter Bolt

发出的主题标签将转发到 HashtagCounterBolt 。此螺栓将处理所有主题标签,并将每个主题标签及其计数保存在内存中,使用 Java Map 对象。完整的程序代码如下所示。

Coding: HashtagCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String key = tuple.getString(0);

      if(!counterMap.containsKey(key)){
         counterMap.put(key, 1);
      }else{
         Integer c = counterMap.get(key) + 1;
         counterMap.put(key, c);
      }

      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println("Result: " + entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }

}

Submitting a Topology

提交拓扑是主要应用程序。Twitter 拓扑包括 TwitterSampleSpoutHashtagReaderBoltHashtagCounterBolt 。以下程序代码显示了如何提交拓扑。

Coding: TwitterHashtagStorm.java

import java.util.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class TwitterHashtagStorm {
   public static void main(String[] args) throws Exception{
      String consumerKey = args[0];
      String consumerSecret = args[1];

      String accessToken = args[2];
      String accessTokenSecret = args[3];

      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);

      Config config = new Config();
      config.setDebug(true);

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
         consumerSecret, accessToken, accessTokenSecret, keyWords));

      builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
         .shuffleGrouping("twitter-spout");

      builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
         .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("TwitterHashtagStorm", config,
         builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

Building and Running the Application

完整应用程序有四个 Java 代码。它们如下所示 −

  1. TwitterSampleSpout.java

  2. HashtagReaderBolt.java

  3. HashtagCounterBolt.java

  4. TwitterHashtagStorm.java

你可以使用以下命令编译应用程序 −

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.java

使用以下命令执行应用程序 −

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*”:.
TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret>
<keyword1> <keyword2> … <keywordN>

Output

应用程序将打印当前可用的标签和其计数。输出应类似于以下内容 −

Result: jazztastic : 1
Result: foodie : 1
Result: Redskins : 1
Result: Recipe : 1
Result: cook : 1
Result: android : 1
Result: food : 2
Result: NoToxicHorseMeat : 1
Result: Purrs4Peace : 1
Result: livemusic : 1
Result: VIPremium : 1
Result: Frome : 1
Result: SundayRoast : 1
Result: Millennials : 1
Result: HealthWithKier : 1
Result: LPs30DaysofGratitude : 1
Result: cooking : 1
Result: gameinsight : 1
Result: Countryfile : 1
Result: androidgames : 1

Apache Storm in Yahoo! Finance

Yahoo! Finance 是互联网领先的商业新闻和财务数据网站。它是 Yahoo! 的一部分,可以提供任何人均可访问的财务新闻、市场统计数据、国际市场数据和关于财务资源的其他信息。

若您是一位注册的 Yahoo! 用户,那么您可以定制 Yahoo! Finance 来利用它的某些服务内容。Yahoo! Finance API 用于查询 Yahoo! 上的财务数据。

此 API 显示的数据比实时数据延迟 15 分钟,每 1 分钟更新一次其数据库,用于访问当前的股票相关信息。现在我们来了解一个公司的实时场景,并了解如何在股票价值低于 100 时发出警报。

Spout Creation

喷口的目的是获取公司详细信息并将价格发送给螺栓。您可以使用以下程序代码来创建一个喷口。

Coding: YahooFinanceSpout.java

import java.util.*;
import java.io.*;
import java.math.BigDecimal;

//import yahoofinace packages
import yahoofinance.YahooFinance;
import yahoofinance.Stock;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

public class YahooFinanceSpout implements IRichSpout {
   private SpoutOutputCollector collector;
   private boolean completed = false;
   private TopologyContext context;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      try {
         Stock stock = YahooFinance.get("INTC");
         BigDecimal price = stock.getQuote().getPrice();

         this.collector.emit(new Values("INTC", price.doubleValue()));
         stock = YahooFinance.get("GOOGL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("GOOGL", price.doubleValue()));
         stock = YahooFinance.get("AAPL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("AAPL", price.doubleValue()));
      } catch(Exception e) {}
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("company", "price"));
   }

   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }

}

Bolt Creation

此处,螺栓的目的是处理给定公司的价格(当价格低于 100 时)。它使用 Java Map 对象将截止价格限制警报设置为 true (当股票价格低于 100 时);否则为 false。完整的程序代码如下所示 −

Coding: PriceCutOffBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Tuple;

public class PriceCutOffBolt implements IRichBolt {
   Map<String, Integer> cutOffMap;
   Map<String, Boolean> resultMap;

   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.cutOffMap = new HashMap <String, Integer>();
      this.cutOffMap.put("INTC", 100);
      this.cutOffMap.put("AAPL", 100);
      this.cutOffMap.put("GOOGL", 100);

      this.resultMap = new HashMap<String, Boolean>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String company = tuple.getString(0);
      Double price = tuple.getDouble(1);

      if(this.cutOffMap.containsKey(company)){
         Integer cutOffPrice = this.cutOffMap.get(company);

         if(price < cutOffPrice) {
            this.resultMap.put(company, true);
         } else {
            this.resultMap.put(company, false);
         }
      }

      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Boolean> entry:resultMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("cut_off_price"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }

}

Submitting a Topology

这是 YahooFinanceSpout.java 和 PriceCutOffBolt.java 相互连接并产生拓扑的主应用程序。以下程序代码演示了如何提交拓扑。

Coding: YahooFinanceStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class YahooFinanceStorm {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("yahoo-finance-spout", new YahooFinanceSpout());

      builder.setBolt("price-cutoff-bolt", new PriceCutOffBolt())
         .fieldsGrouping("yahoo-finance-spout", new Fields("company"));

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("YahooFinanceStorm", config, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

Building and Running the Application

完整的应用程序有三个 Java 代码。它们如下:

  1. YahooFinanceSpout.java

  2. PriceCutOffBolt.java

  3. YahooFinanceStorm.java

可以通过以下命令构建应用程序 −

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*” *.java

可以通过以下命令运行应用程序 −

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*”:.
YahooFinanceStorm

Output

输出将类似于以下内容 −

GOOGL : false
AAPL : false
INTC : true

Apache Storm - Applications

Apache Storm 框架支持当今许多最佳工业应用程序。本章将对 Storm 的一些最显著应用作一个非常简略的概述。

Klout

Klout 是一款应用程序,使用社交媒体分析来根据在线社交影响力对用户进行排名,通 Klout Score ,一个介于 1 和 100 之间的一个数值。Klout 使用 Apache Storm 内建的 Trident 抽象创建复杂的拓扑来传输数据流。

The Weather Channel

Weather Channel 使用 Storm 拓扑来采集天气数据。它已与 Twitter 合作来在 Twitter 和移动应用程序上实现气象信息广告。 OpenSignal 是一家专门进行无线覆盖映射的公司。 StormTagWeatherSignal 是 OpenSignal 创建的天气项目。StormTag 是一个附加在钥匙扣上的蓝牙天气站。该设备收集的气象数据将发送给 WeatherSignal 应用程序和 OpenSignal 服务器。

Telecom Industry

电信供应商每秒处理数百万通电话。他们对掉线和音质差的电话进行取证。通话详情记录以每秒百万条的速度流入,而 Apache Storm 实时处理这些记录并识别任何令人担忧的模式。Storm 分析可用于持续提高通话质量。