Apache Flume - Quick Guide

Apache Flume - Introduction

What is Flume?

Apache Flume is a tool/service/data ingestion mechanism for collecting, aggregating, and transporting large amounts of streaming data (such as log files and events) from various sources to a centralized data store.

Flume is a highly reliable, distributed, and configurable tool. It is principally designed to copy streaming data (log data) from various web servers to HDFS.

[Image: Apache Flume]

Applications of Flume

Assume an e-commerce web application wants to analyze customer behavior from a particular region. To do so, it would need to move the available log data into Hadoop for analysis. This is where Apache Flume comes to our rescue.

Flume is used to move the log data generated by application servers into HDFS at a higher speed.

Advantages of Flume

Here are the advantages of using Flume −

  1. Using Apache Flume, we can store the data into any of the centralized stores (HBase, HDFS).

  2. When the rate of incoming data exceeds the rate at which data can be written to the destination, Flume acts as a mediator between data producers and the centralized stores and provides a steady flow of data between them.

  3. Flume provides the feature of contextual routing.

  4. The transactions in Flume are channel-based where two transactions (one sender and one receiver) are maintained for each message. It guarantees reliable message delivery.

  5. Flume is reliable, fault tolerant, scalable, manageable, and customizable.

Features of Flume

Some of the notable features of Flume are as follows −

  1. Flume ingests log data from multiple web servers into a centralized store (HDFS, HBase) efficiently.

  2. Using Flume, we can get the data from multiple servers immediately into Hadoop.

  3. Along with the log files, Flume is also used to import huge volumes of event data produced by social networking sites like Facebook and Twitter, and e-commerce websites like Amazon and Flipkart.

  4. Flume supports a large set of source and destination types.

  5. Flume supports multi-hop flows, fan-in fan-out flows, contextual routing, etc.

  6. Flume can be scaled horizontally.

Apache Flume - Data Transfer In Hadoop

Big Data, as we know, is a collection of large datasets that cannot be processed using traditional computing techniques. Big Data, when analyzed, gives valuable results. Hadoop is an open-source framework that allows storing and processing Big Data in a distributed environment across clusters of computers using simple programming models.

Streaming / Log Data

Generally, most of the data that is to be analyzed will be produced by various data sources like application servers, social networking sites, cloud servers, and enterprise servers. This data will be in the form of log files and events.

Log file − In general, a log file is a file that lists events/actions that occur in an operating system. For example, web servers list every request made to the server in the log files.

On harvesting such log data, we can get information about −

  1. the application performance and locate various software and hardware failures.

  2. the user behavior and derive better business insights.

The traditional method of transferring data into the HDFS system is to use the put command. Let us see how to use the put command.

HDFS put Command

The main challenge in handling the log data is in moving these logs produced by multiple servers to the Hadoop environment.

Hadoop File System Shell provides commands to insert data into Hadoop and read from it. You can insert data into Hadoop using the put command as shown below.

$ hadoop fs -put <path of the required file> <path in HDFS where to save the file>
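For instance, copying a single local log file into HDFS might look like the following; the paths are purely illustrative:

$ hadoop fs -put /var/log/httpd/access.log /user/Hadoop/logs/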

Problem with put Command

We can use the put command of Hadoop to transfer data from these sources to HDFS. But, it suffers from the following drawbacks −

  1. Using the put command, we can transfer only one file at a time, while the data generators generate data at a much higher rate. Since the analysis made on older data is less accurate, we need a solution to transfer data in real time.

  2. If we use the put command, the data needs to be packaged and ready for the upload. Since the web servers generate data continuously, this is a very difficult task.

What we need here is a solution that can overcome the drawbacks of the put command and transfer the "streaming data" from data generators to centralized stores (especially HDFS) with less delay.

Problem with HDFS

In HDFS, the file exists as a directory entry and the length of the file will be considered as zero till it is closed. For example, if a source is writing data into HDFS and the network was interrupted in the middle of the operation (without closing the file), then the data written in the file will be lost.

Therefore, we need a reliable, configurable, and maintainable system to transfer the log data into HDFS.

Note − In a POSIX file system, whenever we are accessing a file (say performing a write operation), other programs can still read this file (at least the saved portion of the file). This is because the file exists on the disk before it is closed.

Available Solutions

To send streaming data (log files, events, etc.) from various sources to HDFS, we have the following tools available at our disposal −

Facebook’s Scribe

Scribe is an immensely popular tool that is used to aggregate and stream log data. It is designed to scale to a very large number of nodes and be robust to network and node failures.

Apache Kafka

Kafka has been developed by the Apache Software Foundation. It is an open-source message broker. Using Kafka, we can handle feeds with high throughput and low latency.

Apache Flume

Apache Flume is a tool/service/data ingestion mechanism for collecting, aggregating, and transporting large amounts of streaming data (such as log data and events) from various web servers to a centralized data store.

It is a highly reliable, distributed, and configurable tool that is principally designed to transfer streaming data from various sources to HDFS.

In this tutorial, we will discuss in detail how to use Flume with some examples.

Apache Flume - Architecture

The following illustration depicts the basic architecture of Flume. As shown in the illustration, data generators (such as Facebook, Twitter) generate data which gets collected by individual Flume agents running on them. Thereafter, a data collector (which is also an agent) collects the data from the agents which is aggregated and pushed into a centralized store such as HDFS or HBase.

[Image: Flume architecture]

Flume Event

An event is the basic unit of the data transported inside Flume. It contains a byte array payload that is to be transported from the source to the destination, accompanied by optional headers. A typical Flume event has the following structure −

[Image: Flume event structure]

Flume Agent

An agent is an independent daemon process (JVM) in Flume. It receives the data (events) from clients or other agents and forwards it to its next destination (sink or agent). Flume may have more than one agent. The following diagram represents a Flume agent.

[Image: Flume agent]

As shown in the diagram, a Flume agent contains three main components, namely source, channel, and sink.

Source

A source is the component of an Agent which receives data from the data generators and transfers it to one or more channels in the form of Flume events.

Apache Flume supports several types of sources and each source receives events from a specified data generator.

Example − Avro source, Thrift source, Twitter 1% source, etc.

Channel

A channel is a transient store which receives the events from the source and buffers them till they are consumed by sinks. It acts as a bridge between the sources and the sinks.

These channels are fully transactional and they can work with any number of sources and sinks.

Example − JDBC channel, File system channel, Memory channel, etc.

Sink

A sink stores the data into centralized stores like HBase and HDFS. It consumes the data (events) from the channels and delivers it to the destination. The destination of the sink might be another agent or the central stores.

Example − HDFS sink

Note − A Flume agent can have multiple sources, sinks, and channels. We have listed all the supported sources, sinks, and channels in the Flume configuration chapter of this tutorial.
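To make these roles concrete, a minimal single-agent configuration that wires one source, one channel, and one sink together might look like the sketch below. The agent and component names (a1, r1, c1, k1) are arbitrary; complete, working examples follow in the later chapters.

# Illustrative skeleton of an agent named a1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# A netcat source listening on a local port
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# An in-memory channel and a logger sink
a1.channels.c1.type = memory
a1.sinks.k1.type = logger

# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1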

Additional Components of Flume Agent

What we have discussed above are the primitive components of the agent. In addition to this, we have a few more components that play a vital role in transferring the events from the data generator to the centralized stores.

Interceptors

Interceptors are used to alter/inspect Flume events which are transferred between the source and the channel.
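For example, attaching the built-in timestamp interceptor to a source could be sketched as follows (the agent and source names are illustrative):

# Add a timestamp header to every event passing from the source r1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp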

Channel Selectors

These are used to determine which channel is to be opted to transfer the data in case of multiple channels. There are two types of channel selectors −

  1. Default channel selectors − These are also known as replicating channel selectors; they replicate all the events into each channel.

  2. Multiplexing channel selectors − These decide the channel to which an event is to be sent, based on the address in the header of that event (a configuration sketch follows this list).
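A multiplexing selector might be configured roughly as shown below; the header name and the mapping values are assumptions made for illustration:

# Route events to a channel based on the value of the "region" header
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = region
a1.sources.r1.selector.mapping.US = c1
a1.sources.r1.selector.mapping.IN = c2
a1.sources.r1.selector.default = c1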

Sink Processors

These are used to invoke a particular sink from a selected group of sinks. Sink processors are used to create failover paths for your sinks or to load balance events across multiple sinks from a channel, as sketched below.
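For instance, a failover sink group could be declared roughly as follows (the group and sink names are illustrative):

# Two sinks in a failover group; the sink with the higher priority is tried first
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5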

Apache Flume - Data Flow

Flume is a framework which is used to move log data into HDFS. Generally events and log data are generated by the log servers and these servers have Flume agents running on them. These agents receive the data from the data generators.

The data in these agents will be collected by an intermediate node known as Collector. Just like agents, there can be multiple collectors in Flume.

Finally, the data from all these collectors will be aggregated and pushed to a centralized store such as HBase or HDFS. The following diagram explains the data flow in Flume.

[Image: Flume data flow]

Multi-hop Flow

Within Flume, there can be multiple agents and before reaching the final destination, an event may travel through more than one agent. This is known as multi-hop flow.

Fan-out Flow

The dataflow from one source to multiple channels is known as fan-out flow. It is of two types −

  1. Replicating − The data flow where the data will be replicated in all the configured channels (a replicating configuration is sketched after this list).

  2. Multiplexing − The data flow where the data will be sent to a selected channel which is mentioned in the header of the event.
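As a sketch, a replicating fan-out is simply a source bound to more than one channel (names are illustrative; replicating is also the default selector):

# Every event from r1 is copied into both channels c1 and c2
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating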

Fan-in Flow

The data flow in which the data will be transferred from many sources to one channel is known as fan-in flow.

Failure Handling

In Flume, for each event, two transactions take place: one at the sender and one at the receiver. The sender sends events to the receiver. Soon after receiving the data, the receiver commits its own transaction and sends a “received” signal to the sender. After receiving the signal, the sender commits its transaction. (Sender will not commit its transaction till it receives a signal from the receiver.)

Apache Flume - Environment

We already discussed the architecture of Flume in the previous chapter. In this chapter, let us see how to download and setup Apache Flume.

Before proceeding further, you need to have a Java environment in your system. So first of all, make sure you have Java installed in your system. For some examples in this tutorial, we have used Hadoop HDFS (as the sink). Therefore, we would recommend that you install Hadoop along with Java. To collect more information, follow the link − https://www.tutorialspoint.com/hadoop/hadoop_enviornment_setup.htm

Installing Flume

First of all, download the latest version of Apache Flume software from the website https://flume.apache.org/.

Step 1

Open the website. Click on the download link on the left-hand side of the home page. It will take you to the download page of Apache Flume.

[Image: Installing Flume]

Step 2

In the Download page, you can see the links for the binary and source files of Apache Flume. Click on the link apache-flume-1.6.0-bin.tar.gz.

You will be redirected to a list of mirrors where you can start your download by clicking any of these mirrors. In the same way, you can download the source code of Apache Flume by clicking on apache-flume-1.6.0-src.tar.gz.

Step 3

Create a directory with the name Flume in the same directory where the installation directories of Hadoop, HBase, and other software were installed (if you have already installed any) as shown below.

$ mkdir Flume

Step 4

Extract the downloaded tar files as shown below.

$ cd Downloads/
$ tar zxvf apache-flume-1.6.0-bin.tar.gz
$ tar zxvf apache-flume-1.6.0-src.tar.gz

Step 5

Move the contents of the extracted apache-flume-1.6.0-bin directory into the Flume directory created earlier, as shown below. (Assume we have created the Flume directory under the local user named Hadoop.)

$ mv apache-flume-1.6.0-bin/* /home/Hadoop/Flume/

Configuring Flume

To configure Flume, we have to modify three files, namely flume-env.sh, flume-conf.properties, and .bashrc.

Setting the Path / Classpath

.bashrc 文件中,为 Flume 设置主页文件夹、路径和类路径,如下所示。

In the .bashrc file, set the home folder, the path, and the classpath for Flume as shown below.

[Image: Setting the path in .bashrc]
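The entries typically look something like the following; the installation path shown is an assumption and should be adjusted to wherever you placed Flume:

# Illustrative .bashrc entries; adjust FLUME_HOME to your installation path
export FLUME_HOME=/home/Hadoop/Flume
export PATH=$PATH:$FLUME_HOME/bin
export CLASSPATH=$CLASSPATH:$FLUME_HOME/lib/*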

conf Folder

If you open the conf folder of Apache Flume, you will find the following four files −

  1. flume-conf.properties.template,

  2. flume-env.sh.template,

  3. flume-env.ps1.template, and

  4. log4j.properties.

[Image: conf folder]

Now rename −

  1. flume-conf.properties.template as flume-conf.properties, and

  2. flume-env.sh.template as flume-env.sh

flume-env.sh

Open the flume-env.sh file and set JAVA_HOME to the folder where Java is installed in your system.

[Image: flume-env.sh]
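The relevant line in flume-env.sh might look like the following; the JDK path is only an assumption for a typical Linux installation:

# In conf/flume-env.sh; replace the path with your actual JDK location
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64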

Verifying the Installation

Verify the installation of Apache Flume by browsing through the bin folder and typing the following command.

$ ./flume-ng

If you have successfully installed Flume, you will get a help prompt of Flume as shown below.

[Image: Verifying the installation]

Apache Flume - Configuration

After installing Flume, we need to configure it using the configuration file which is a Java property file having key-value pairs. We need to pass values to the keys in the file.

In the Flume configuration file, we need to −

  1. Name the components of the current agent.

  2. Describe/Configure the source.

  3. Describe/Configure the sink.

  4. Describe/Configure the channel.

  5. Bind the source and the sink to the channel.

Usually, we can have multiple agents in Flume. We can differentiate each agent by using a unique name, and using this name, we have to configure each agent.

Naming the Components

First of all, you need to name/list the components such as sources, sinks, and the channels of the agent, as shown below.

agent_name.sources = source_name
agent_name.sinks = sink_name
agent_name.channels = channel_name

Flume supports various sources, sinks, and channels. They are listed in the table given below.

Sources − Avro Source, Thrift Source, Exec Source, JMS Source, Spooling Directory Source, Twitter 1% firehose Source, Kafka Source, NetCat Source, Sequence Generator Source, Syslog Sources, Syslog TCP Source, Multiport Syslog TCP Source, Syslog UDP Source, HTTP Source, Stress Source, Legacy Sources, Thrift Legacy Source, Custom Source, Scribe Source

Channels − Memory Channel, JDBC Channel, Kafka Channel, File Channel, Spillable Memory Channel, Pseudo Transaction Channel

Sinks − HDFS Sink, Hive Sink, Logger Sink, Avro Sink, Thrift Sink, IRC Sink, File Roll Sink, Null Sink, HBaseSink, AsyncHBaseSink, MorphlineSolrSink, ElasticSearchSink, Kite Dataset Sink, Kafka Sink

You can use any of them. For example, if you are transferring Twitter data using a Twitter source through a memory channel to an HDFS sink, and the agent name is TwitterAgent, then −

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

After listing the components of the agent, you have to describe the source(s), sink(s), and channel(s) by providing values to their properties.

Describing the Source

Each source will have a separate list of properties. The property named “type” is common to every source, and it is used to specify the type of the source we are using.

Along with the property “type”, you also need to provide the values of all the required properties of a particular source to configure it, as shown below.

agent_name.sources.source_name.type = value
agent_name.sources.source_name.property2 = value
agent_name.sources.source_name.property3 = value

For example, if we consider the Twitter source, the following are the properties to which we must provide values in order to configure it.

TwitterAgent.sources.Twitter.type = Twitter (type name)
TwitterAgent.sources.Twitter.consumerKey =
TwitterAgent.sources.Twitter.consumerSecret =
TwitterAgent.sources.Twitter.accessToken =
TwitterAgent.sources.Twitter.accessTokenSecret =

Describing the Sink

Just like the source, each sink will have a separate list of properties. The property named “type” is common to every sink, and it is used to specify the type of the sink we are using. Along with the property “type”, you also need to provide values to all the required properties of a particular sink to configure it, as shown below.

agent_name.sinks.sink_name.type = value
agent_name.sinks.sink_name.property2 = value
agent_name.sinks.sink_name.property3 = value

For example, if we consider the HDFS sink, the following are the properties to which we must provide values in order to configure it.

TwitterAgent.sinks.HDFS.type = hdfs (type name)
TwitterAgent.sinks.HDFS.hdfs.path = HDFS directory’s Path to store the data

Describing the Channel

Flume provides various channels to transfer data between sources and sinks. Therefore, along with the sources and the sinks, you also need to describe the channel used in the agent.

To describe each channel, you need to set the required properties, as shown below.

agent_name.channels.channel_name.type = value
agent_name.channels.channel_name.property2 = value
agent_name.channels.channel_name.property3 = value

For example, if we consider the memory channel, the following are the properties to which we must provide values in order to configure it.

TwitterAgent.channels.MemChannel.type = memory (type name)

Binding the Source and the Sink to the Channel

Since the channels connect the sources and sinks, it is required to bind both of them to the channel, as shown below.

agent_name.sources.source_name.channels = channel_name
agent_name.sinks.sink_name.channels = channel_name

The following example shows how to bind the sources and the sinks to a channel. Here, we consider twitter source, memory channel, and HDFS sink.

TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channels = MemChannel

Starting a Flume Agent

After configuration, we have to start the Flume agent. It is done as follows −

$ bin/flume-ng agent --conf ./conf/ -f conf/twitter.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent

where −

  1. agent − Command to start the Flume agent

  2. --conf, -c <conf> − Use the configuration files in the conf directory

  3. --conf-file, -f <file> − Specifies the configuration file path

  4. --name, -n <name> − Name of the agent to start (here, TwitterAgent)

  5. -Dproperty=value − Sets a Java system property value.

Apache Flume - Fetching Twitter Data

Using Flume, we can fetch data from various services and transport it to centralized stores (HDFS and HBase). This chapter explains how to fetch data from Twitter service and store it in HDFS using Apache Flume.

As discussed in Flume Architecture, a webserver generates log data and this data is collected by an agent in Flume. The channel buffers this data to a sink, which finally pushes it to centralized stores.

In the example provided in this chapter, we will create an application and get the tweets from it using the experimental Twitter source provided by Apache Flume. We will use the memory channel to buffer these tweets and an HDFS sink to push these tweets into HDFS.

[Image: Fetching Twitter data]

To fetch Twitter data, we will have to follow the steps given below −

  1. Create a twitter Application

  2. Install / Start HDFS

  3. Configure Flume

Creating a Twitter Application

In order to get the tweets from Twitter, it is needed to create a Twitter application. Follow the steps given below to create a Twitter application.

Step 1

To create a Twitter application, click on the following link https://apps.twitter.com/. Sign in to your Twitter account. You will have a Twitter Application Management window where you can create, delete, and manage Twitter Apps.

[Image: Application Management window]

Step 2

Click on the Create New App button. You will be redirected to a window where you will get an application form in which you have to fill in your details in order to create the App. While filling the website address, give the complete URL pattern, for example, http://example.com.

[Image: Create an application]

Step 3

Fill in the details, accept the Developer Agreement when finished, and click on the Create your Twitter application button at the bottom of the page. If everything goes fine, an App will be created with the given details as shown below.

[Image: Application created]

Step 4

Under the Keys and Access Tokens tab at the bottom of the page, you can observe a button named Create my access token. Click on it to generate the access token.

[Image: Keys and Access Tokens]

Step 5

Finally, click on the Test OAuth button which is on the right side top of the page. This will lead to a page which displays your Consumer key, Consumer secret, Access token, and Access token secret. Copy these details. These are useful to configure the agent in Flume.

[Image: OAuth tool]

Starting HDFS

Since we are storing the data in HDFS, we need to install / verify Hadoop. Start Hadoop and create a folder in it to store Flume data. Follow the steps given below before configuring Flume.

Step 1: Install / Verify Hadoop

Install Hadoop. If Hadoop is already installed in your system, verify the installation using the Hadoop version command, as shown below.

$ hadoop version

If your system contains Hadoop, and if you have set the path variable, then you will get the following output −

Hadoop 2.6.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r
e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1
Compiled by jenkins on 2014-11-13T21:10Z
Compiled with protoc 2.5.0
From source with checksum 18e43357c8f927c0695f1e9522859d6a
This command was run using /home/Hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar

Step 2: Starting Hadoop

Browse through the sbin directory of Hadoop and start yarn and Hadoop dfs (distributed file system) as shown below.

cd /$Hadoop_Home/sbin/
$ start-dfs.sh
localhost: starting namenode, logging to
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-namenode-localhost.localdomain.out
localhost: starting datanode, logging to
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-datanode-localhost.localdomain.out
Starting secondary namenodes [0.0.0.0]
starting secondarynamenode, logging to
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-secondarynamenode-localhost.localdomain.out

$ start-yarn.sh
starting yarn daemons
starting resourcemanager, logging to
   /home/Hadoop/hadoop/logs/yarn-Hadoop-resourcemanager-localhost.localdomain.out
localhost: starting nodemanager, logging to
   /home/Hadoop/hadoop/logs/yarn-Hadoop-nodemanager-localhost.localdomain.out

Step 3: Create a Directory in HDFS

In Hadoop DFS, you can create directories using the command mkdir. Browse through it and create a directory with the name twitter_data in the required path as shown below.

$ cd /$Hadoop_Home/bin/
$ hdfs dfs -mkdir hdfs://localhost:9000/user/Hadoop/twitter_data

Configuring Flume

We have to configure the source, the channel, and the sink using the configuration file in the conf folder. The example given in this chapter uses an experimental source provided by Apache Flume named Twitter 1% Firehose, a memory channel, and an HDFS sink.

Twitter 1% Firehose Source

This source is highly experimental. It connects to the 1% sample Twitter Firehose using streaming API and continuously downloads tweets, converts them to Avro format, and sends Avro events to a downstream Flume sink.

We will get this source by default along with the installation of Flume. The jar files corresponding to this source can be located in the lib folder as shown below.

[Image: Twitter source jar files in the lib folder]

Setting the classpath

classpath 变量设置为 Flume-env.sh 文件中 Flume 的 lib 文件夹,如下所示。

Set the classpath variable to the lib folder of Flume in Flume-env.sh file as shown below.

export CLASSPATH=$CLASSPATH:/FLUME_HOME/lib/*

This source needs the details such as Consumer key, Consumer secret, Access token, and Access token secret of a Twitter application. While configuring this source, you have to provide values to the following properties −

  1. Channels

  2. Source type : org.apache.flume.source.twitter.TwitterSource

  3. consumerKey − The OAuth consumer key

  4. consumerSecret − OAuth consumer secret

  5. accessToken − OAuth access token

  6. accessTokenSecret − OAuth token secret

  7. maxBatchSize − Maximum number of twitter messages that should be in a twitter batch. The default value is 1000 (optional).

  8. maxBatchDurationMillis − Maximum number of milliseconds to wait before closing a batch. The default value is 1000 (optional).

Channel

We are using the memory channel. To configure the memory channel, you must provide value to the type of the channel.

  1. type − It holds the type of the channel. In our example, the type is memory (the channel instance is named MemChannel).

  2. Capacity − It is the maximum number of events stored in the channel. Its default value is 100 (optional).

  3. TransactionCapacity − It is the maximum number of events the channel accepts or sends. Its default value is 100 (optional).

HDFS Sink

This sink writes data into the HDFS. To configure this sink, you must provide the following details.

  1. Channel

  2. type − hdfs

  3. hdfs.path − the path of the directory in HDFS where data is to be stored.

And we can provide some optional values based on the scenario. Given below are the optional properties of the HDFS sink that we are configuring in our application.

  1. fileType − This is the required file format of our HDFS file. SequenceFile, DataStream and CompressedStream are the three types available with this stream. In our example, we are using the DataStream.

  2. writeFormat − Could be either Text or Writable.

  3. batchSize − It is the number of events written to a file before it is flushed into the HDFS. Its default value is 100.

  4. rollSize − It is the file size (in bytes) that triggers a roll. Its default value is 1024.

  5. rollCount − It is the number of events written into the file before it is rolled. Its default value is 10.

Example – Configuration File

Given below is an example of the configuration file. Copy this content and save as twitter.conf in the conf folder of Flume.

# Naming the components on the current agent.
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

# Describing/Configuring the source
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = Your OAuth consumer key
TwitterAgent.sources.Twitter.consumerSecret = Your OAuth consumer secret
TwitterAgent.sources.Twitter.accessToken = Your OAuth access token
TwitterAgent.sources.Twitter.accessTokenSecret = Your OAuth access token secret
TwitterAgent.sources.Twitter.keywords = tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql

# Describing/Configuring the sink

TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/Hadoop/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000

# Describing/Configuring the channel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

# Binding the source and sink to the channel
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel

Execution

Browse through the Flume home directory and execute the application as shown below.

$ cd $FLUME_HOME
$ bin/flume-ng agent --conf ./conf/ -f conf/twitter.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent

If everything goes fine, the streaming of tweets into HDFS will start. Given below is the snapshot of the command prompt window while fetching tweets.

[Image: Fetching tweets]

Verifying HDFS

You can access the Hadoop Administration Web UI using the URL given below.

http://localhost:50070/

Click on the dropdown named Utilities on the right-hand side of the page. You can see two options as shown in the snapshot given below.

[Image: Verifying the HDFS]

Click on Browse the file system and enter the path of the HDFS directory where you have stored the tweets. In our example, the path will be /user/Hadoop/twitter_data/. Then, you can see the list of Twitter log files stored in HDFS as given below.

[Image: Browse the file system]
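Alternatively, you can list the same directory from the command line; this sketch assumes the HDFS path used in this example:

$ hdfs dfs -ls hdfs://localhost:9000/user/Hadoop/twitter_data/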

Apache Flume - Sequence Generator Source

In the previous chapter, we have seen how to fetch data from the Twitter source into HDFS. This chapter explains how to fetch data from the Sequence generator source.

Prerequisites

To run the example provided in this chapter, you need to install HDFS along with Flume. Therefore, verify the Hadoop installation and start HDFS before proceeding further. (Refer to the previous chapter to learn how to start HDFS.)

Configuring Flume

We have to configure the source, the channel, and the sink using the configuration file in the conf folder. The example given in this chapter uses a sequence generator source, a memory channel, and an HDFS sink.

Sequence Generator Source

It is the source that generates events continuously. It maintains a counter that starts from 0 and increments by 1. It is used for testing purposes. While configuring this source, you must provide values to the following properties −

  1. Channels

  2. Source type − seq

Channel

We are using the memory channel. To configure the memory channel, you must provide a value for the type of the channel. Given below is the list of properties that you need to supply while configuring the memory channel −

  1. type − It holds the type of the channel. In our example, the type is memory (the channel instance is named MemChannel).

  2. Capacity − It is the maximum number of events stored in the channel. Its default value is 100. (optional)

  3. TransactionCapacity − It is the maximum number of events the channel accepts or sends. Its default is 100. (optional).

HDFS Sink

This sink writes data into the HDFS. To configure this sink, you must provide the following details.

  1. Channel

  2. type − hdfs

  3. hdfs.path − the path of the directory in HDFS where data is to be stored.

And we can provide some optional values based on the scenario. Given below are the optional properties of the HDFS sink that we are configuring in our application.

  1. fileType − This is the required file format of our HDFS file. SequenceFile, DataStream and CompressedStream are the three types available with this stream. In our example, we are using the DataStream.

  2. writeFormat − Could be either Text or Writable.

  3. batchSize − It is the number of events written to a file before it is flushed into the HDFS. Its default value is 100.

  4. rollSize − It is the file size (in bytes) that triggers a roll. Its default value is 1024.

  5. rollCount − It is the number of events written into the file before it is rolled. Its default value is 10.

Example – Configuration File

Given below is an example of the configuration file. Copy this content and save it as seq_gen.conf in the conf folder of Flume.

# Naming the components on the current agent

SeqGenAgent.sources = SeqSource
SeqGenAgent.channels = MemChannel
SeqGenAgent.sinks = HDFS

# Describing/Configuring the source
SeqGenAgent.sources.SeqSource.type = seq

# Describing/Configuring the sink
SeqGenAgent.sinks.HDFS.type = hdfs
SeqGenAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/Hadoop/seqgen_data/
SeqGenAgent.sinks.HDFS.hdfs.filePrefix = log
SeqGenAgent.sinks.HDFS.hdfs.rollInterval = 0
SeqGenAgent.sinks.HDFS.hdfs.rollCount = 10000
SeqGenAgent.sinks.HDFS.hdfs.fileType = DataStream

# Describing/Configuring the channel
SeqGenAgent.channels.MemChannel.type = memory
SeqGenAgent.channels.MemChannel.capacity = 1000
SeqGenAgent.channels.MemChannel.transactionCapacity = 100

# Binding the source and sink to the channel
SeqGenAgent.sources.SeqSource.channels = MemChannel
SeqGenAgent.sinks.HDFS.channel = MemChannel

Execution

Browse through the Flume home directory and execute the application as shown below.

$ cd $FLUME_HOME
$ ./bin/flume-ng agent --conf $FLUME_CONF --conf-file $FLUME_CONF/seq_gen.conf
   --name SeqGenAgent

If everything goes fine, the source starts generating sequence numbers which will be pushed into the HDFS in the form of log files.

Given below is a snapshot of the command prompt window fetching the data generated by the sequence generator into the HDFS.

[Image: Data generated by the sequence generator]

Verifying the HDFS

You can access the Hadoop Administration Web UI using the following URL −

http://localhost:50070/

Click on the dropdown named Utilities on the right-hand side of the page. You can see two options as shown in the diagram given below.

[Image: Verifying the HDFS]

Click on Browse the file system and enter the path of the HDFS directory where you have stored the data generated by the sequence generator.

In our example, the path will be /user/Hadoop/seqgen_data/. Then, you can see the list of log files generated by the sequence generator, stored in HDFS as given below.

[Image: Browse the file system]

Verifying the Contents of the File

All these log files contain numbers in a sequential format. You can verify the contents of these files in the file system using the cat command as shown below.

[Image: Verifying the contents of the file]
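As a sketch, the check might look like the following; the log file name is illustrative, since Flume appends a generated suffix to the configured log prefix:

$ hdfs dfs -cat hdfs://localhost:9000/user/Hadoop/seqgen_data/log.1430756716610
0
1
2
3
...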

Apache Flume - NetCat Source

This chapter takes an example to explain how you can generate events and subsequently log them into the console. For this, we are using the NetCat source and the logger sink.

Prerequisites

To run the example provided in this chapter, you need to install Flume.

Configuring Flume

We have to configure the source, the channel, and the sink using the configuration file in the conf folder. The example given in this chapter uses a NetCat Source, Memory channel, and a logger sink.

NetCat Source

While configuring the NetCat source, we have to specify a port. The source (NetCat source) then listens to the given port, receives each line we enter on that port as an individual event, and transfers it to the sink through the specified channel.

While configuring this source, you have to provide values to the following properties −

  1. channels

  2. Source type − netcat

  3. bind − Host name or IP address to bind.

  4. port − Port number to which we want the source to listen.

Channel

We are using the memory channel. To configure the memory channel, you must provide a value for the type of the channel. Given below is the list of properties that you need to supply while configuring the memory channel −

  1. type − It holds the type of the channel. In our example, the type is memory (the channel instance is named MemChannel).

  2. Capacity − It is the maximum number of events stored in the channel. Its default value is 100. (optional)

  3. TransactionCapacity − It is the maximum number of events the channel accepts or sends. Its default value is 100. (optional).

Logger Sink

This sink logs all the events passed to it. Generally, it is used for testing or debugging purposes. To configure this sink, you must provide the following details.

  1. Channel

  2. type − logger

Example Configuration File

Given below is an example of the configuration file. Copy this content and save as netcat.conf in the conf folder of Flume.

# Naming the components on the current agent
NetcatAgent.sources = Netcat
NetcatAgent.channels = MemChannel
NetcatAgent.sinks = LoggerSink

# Describing/Configuring the source
NetcatAgent.sources.Netcat.type = netcat
NetcatAgent.sources.Netcat.bind = localhost
NetcatAgent.sources.Netcat.port = 56565

# Describing/Configuring the sink
NetcatAgent.sinks.LoggerSink.type = logger

# Describing/Configuring the channel
NetcatAgent.channels.MemChannel.type = memory
NetcatAgent.channels.MemChannel.capacity = 1000
NetcatAgent.channels.MemChannel.transactionCapacity = 100

# Bind the source and sink to the channel
NetcatAgent.sources.Netcat.channels = MemChannel
NetcatAgent.sinks.LoggerSink.channel = MemChannel

Execution

Browse through the Flume home directory and execute the application as shown below.

$ cd $FLUME_HOME
$ ./bin/flume-ng agent --conf $FLUME_CONF --conf-file $FLUME_CONF/netcat.conf
   --name NetcatAgent -Dflume.root.logger=INFO,console

If everything goes fine, the source starts listening to the given port. In this case, it is 56565. Given below is the snapshot of the command prompt window of a NetCat source which has started and listening to the port 56565.

[Image: NetCat source listening on port 56565]

Passing Data to the Source

To pass data to NetCat source, you have to open the port given in the configuration file. Open a separate terminal and connect to the source (56565) using the curl command. When the connection is successful, you will get a message “connected” as shown below.

$ curl telnet://localhost:56565
connected

Now you can enter your data line by line (after each line, you have to press Enter). The NetCat source receives each line as an individual event and you will get a received message “OK”.

Whenever you are done with passing data, you can exit the console by pressing (Ctrl+C). Given below is the snapshot of the console where we have connected to the source using the curl command.

[Image: Passing data to the source]
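The exchange looks roughly like the following; the lines typed after connecting are, of course, just illustrative examples:

$ curl telnet://localhost:56565
connected
Hello Flume
OK
This is the second event
OK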

Each line that is entered in the above console will be received as an individual event by the source. Since we have used the Logger sink, these events will be logged on to the console (source console) through the specified channel (memory channel in this case).

The following snapshot shows the NetCat console where the events are logged.

[Image: NetCat console with logged events]