Logstash 简明教程

Logstash - Quick Guide

Logstash - Introduction

Logstash 是一种基于 filter/pipes 模式的工具,用于收集、处理和生成日志或事件。它有助于集中化实时分析来自不同来源的日志和事件。

Logstash is a tool based on the filter/pipes pattern for gathering, processing and generating logs or events. It helps in centralizing logs and events from different sources and analyzing them in real time.

Logstash 用 JRuby 编程语言编写,在 JVM 上运行,因此可以在不同平台上运行 Logstash。它从几乎所有类型的来源收集不同类型的数据,例如日志、分组、事件、事务、时间戳数据等。数据源可以是社交数据、电子商务、新闻文章、CRM、游戏数据、Web 趋势、财务数据、物联网、移动设备等。

Logstash is written in the JRuby programming language, which runs on the JVM; hence, you can run Logstash on different platforms. It collects different types of data like Logs, Packets, Events, Transactions, Timestamp Data, etc., from almost every type of source. The data source can be Social data, E-commerce, News articles, CRM, Game data, Web trends, Financial data, Internet of Things, Mobile devices, etc.

Logstash General Features

Logstash 的一般功能如下 −

The general features of Logstash are as follows −

  1. Logstash can collect data from different sources and send to multiple destinations.

  2. Logstash can handle all types of logging data like Apache Logs, Windows Event Logs, Data over Network Protocols, Data from Standard Input and many more.

  3. Logstash can also handle http requests and response data.

  4. Logstash provides a variety of filters, which helps the user to find more meaning in the data by parsing and transforming it.

  5. Logstash can also be used for handling sensor data in the Internet of Things.

  6. Logstash is open source and available under the Apache license version 2.0.

Logstash Key Concepts

Logstash 的关键概念如下 −

The key concepts of Logstash are as follows −

Event Object

它是 Logstash 中的主要对象,它封装了 Logstash 管道中的数据流。Logstash 使用此对象来存储输入数据和添加在过滤阶段创建的额外字段。

It is the main object in Logstash, which encapsulates the data flow in the Logstash pipeline. Logstash uses this object to store the input data and add extra fields created during the filter stage.

Logstash 为开发人员提供一个事件 API 来操作事件。在本教程中,此事件被称作各种名称,如日志数据事件、日志事件、日志数据、输入日志数据、输出日志数据等。

Logstash offers an Event API to developers to manipulate events. In this tutorial, this event is referred to by various names such as Logging Data Event, Log Event, Log Data, Input Log Data, Output Log Data, etc.

Pipeline

它包含 Logstash 从输入到输出的数据流阶段。输入数据输入管道,并以事件的形式进行处理。然后以用户或终端系统所需格式发送到输出目标。

It comprises the data flow stages in Logstash from input to output. The input data enters the pipeline and is processed in the form of an event. The event is then sent to an output destination in the format desired by the user or the end system.

Input

这是 Logstash 管道中的第一阶段,用于获取 Logstash 中的数据以进一步处理。Logstash 提供各种插件从不同的平台获取数据。一些最常用的插件包括文件、Syslog、Redis 和 Beats。

This is the first stage in the Logstash pipeline, which is used to get the data in Logstash for further processing. Logstash offers various plugins to get data from different platforms. Some of the most commonly used plugins are – File, Syslog, Redis and Beats.

Filter

这是 Logstash 的中间阶段,在此阶段发生实际的事件处理。开发人员可以使用 Logstash 预先定义的 Regex 模式,为事件中的字段之间的区别以及接受的输入事件的标准创建序列。

This is the middle stage of Logstash, where the actual processing of events takes place. A developer can use the regex patterns pre-defined by Logstash to create sequences that differentiate the fields in the events and define the criteria for accepting input events.

Logstash 提供各种插件来帮助开发人员解析事件并将其转换成为所需的结构。一些最常用的过滤器插件包括 Grok、Mutate、Drop、Clone 和 Geoip。

Logstash offers various plugins to help the developer to parse and transform the events into a desirable structure. Some of the most commonly used filter plugins are – Grok, Mutate, Drop, Clone and Geoip.

Output

这是 Logstash 管道中的最后一个阶段,在此阶段可以将输出事件格式化为目标系统所需的结构。最后,它在完全处理后使用插件将输出事件发送到目标。一些最常用的插件包括 Elasticsearch、文件、Graphite、Statsd 等。

This is the last stage in the Logstash pipeline, where the output events can be formatted into the structure required by the destination systems. Lastly, it sends the output event after complete processing to the destination by using plugins. Some of the most commonly used plugins are – Elasticsearch, File, Graphite, Statsd, etc.
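These three stages map directly onto the structure of a Logstash configuration file. The following is a minimal sketch (not part of the original guide) that reads from standard input, adds a field in the filter stage and prints the event to standard output −

input {
   stdin { }
}
filter {
   mutate {
      add_field => { "processed_by" => "logstash" }
   }
}
output {
   stdout { codec => rubydebug }
}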

Logstash Advantages

以下几点解释了 Logstash 的各种优势。

The following points explain the various advantages of Logstash.

  1. Logstash offers regex pattern sequences to identify and parse the various fields in any input event.

  2. Logstash supports a variety of web servers and data sources for extracting logging data.

  3. Logstash provides multiple plugins to parse and transform the logging data into any user desirable format.

  4. Logstash is centralized, which makes it easy to process and collect data from different servers.

  5. Logstash supports many databases, network protocols and other services as a destination for the logging events.

  6. Logstash uses the HTTP protocol, which enables the user to upgrade Elasticsearch versions without having to upgrade Logstash in a lock step.

Logstash Disadvantages

以下几点阐述了 Logstash 的各种劣势。

The following points explain the various disadvantages of Logstash.

  1. Logstash uses http, which negatively affects the processing of the logging data.

  2. Working with Logstash can sometimes be a little complex, as it needs a good understanding and analysis of the input logging data.

  3. Filter plugins are not generic, so the user may need to find the correct sequence of patterns to avoid errors in parsing.

在下一章中,我们将了解 ELK 堆栈是什么以及它如何帮助 Logstash。

In the next chapter, we will understand what the ELK Stack is and how it helps Logstash.

Logstash - ELK Stack

ELK 代表 Elasticsearch、Logstash 和 Kibana。在 ELK 堆栈中,Logstash 从不同的输入源中提取日志数据或其他事件。它处理事件,然后将其存储在 Elasticsearch 中。Kibana 是一个 Web 界面,它从 Elasticsearch 中访问日志数据并对其进行可视化。

ELK stands for Elasticsearch, Logstash, and Kibana. In the ELK stack, Logstash extracts the logging data or other events from different input sources. It processes the events and later stores them in Elasticsearch. Kibana is a web interface, which accesses the logging data from Elasticsearch and visualizes it.

[Figure: ELK stack]

Logstash and Elasticsearch

Logstash 提供输入和输出 Elasticsearch 插件,用于读写日志事件到 Elasticsearch。Elasticsearch 公司也建议将 Elasticsearch 作为输出目标,因为它与 Kibana 兼容。Logstash 通过 HTTP 协议将数据发送到 Elasticsearch。

Logstash provides input and output Elasticsearch plugins to read and write log events to Elasticsearch. Elasticsearch as an output destination is also recommended by Elastic because of its compatibility with Kibana. Logstash sends the data to Elasticsearch over the HTTP protocol.

Elasticsearch 提供批量上传功能,它有助于将来自不同源或 Logstash 实例的数据上传到集中式 Elasticsearch 引擎。ELK 在其他 DevOps 解决方案方面具有以下优势−

Elasticsearch provides bulk upload facility, which helps to upload the data from different sources or Logstash instances to a centralized Elasticsearch engine. ELK has the following advantages over other DevOps Solutions −

  1. ELK stack is easier to manage and can be scaled for handling petabytes of events.

  2. ELK stack architecture is very flexible and it provides integration with Hadoop. Hadoop is mainly used for archive purposes. Logstash can be directly connected to Hadoop by using flume and Elasticsearch provides a connector named es-hadoop to connect with Hadoop.

  3. The total cost of ownership of the ELK stack is much lower than that of its alternatives.

Logstash and Kibana

Kibana 不直接与 Logstash 交互,而是通过数据源(在 ELK 堆栈中为 Elasticsearch)进行交互。Logstash 从每个源收集数据,Elasticsearch 以非常快的速度对其进行分析,然后 Kibana 对该数据提供可操作的见解。

Kibana does not interact with Logstash directly but through a data source, which is Elasticsearch in the ELK stack. Logstash collects the data from every source and Elasticsearch analyzes it at a very fast speed, then Kibana provides the actionable insights on that data.

Kibana 是一款基于 Web 的可视化工具,它帮助开发人员和其他人员分析 Logstash 在 Elasticsearch 引擎中收集的大量事件中的变化。此可视化功能使其能够轻松预测或查看输入源的错误或其他重要事件趋势的变化。

Kibana is a web based visualization tool, which helps developers and others to analyze the variations in large amounts of events collected by Logstash in Elasticsearch engine. This visualization makes it easy to predict or to see the changes in trends of errors or other significant events of the input source.

Logstash - Installation

要在系统上安装 Logstash,我们应遵循以下步骤−

To install Logstash on the system, we should follow the steps given below −

Step 1 − 检查计算机中安装的 Java 版本;应为 Java 8,因为 Logstash 与 Java 9 不兼容。您可以通过以下方式进行检查:

Step 1 − Check the version of Java installed on your computer; it should be Java 8, because Logstash is not compatible with Java 9. You can check this by −

在 Windows 操作系统 (OS) 中(使用命令提示符)−

In a Windows Operating System (OS) (using command prompt) −

> java -version

在 UNIX OS 中(使用终端)-

In UNIX OS (Using Terminal) −

$ echo $JAVA_HOME
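Note that echo $JAVA_HOME only prints the path of the Java installation; to see the actual version on UNIX as well, you can run the same command as on Windows −

$ java -version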

Step 2 − 从以下位置下载 Logstash −

Step 2 − Download Logstash from −

  1. For Windows OS, download the ZIP file.

  2. For UNIX OS, download the TAR file.

  3. For Debian OS download the DEB file.

  4. For Red Hat and other Linux distributions, download the RPM file.

  5. APT and Yum utilities can also be used to install Logstash in many Linux distributions.

Step 3 − 安装 Logstash 的过程非常简单。让我们看看如何在不同平台上安装 Logstash。

Step 3 − The installation process for Logstash is very easy. Let’s see how you can install Logstash on different platforms.

Note − 安装文件夹中不要有任何空格或冒号。

Note − Do not put any whitespace or colon in the installation folder.

  1. Windows OS − Unzip the zip package and the Logstash is installed.

  2. UNIX OS − Extract the tar file in any location and the Logstash is installed.

$ tar -xvf logstash-5.0.2.tar.gz

Using APT utility for Linux OS −

  1. Download and install the Public Signing Key −

$ wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -

  2. Save the repository definition −

$ echo "deb https://artifacts.elastic.co/packages/5.x/apt stable main" | sudo
   tee -a /etc/apt/sources.list.d/elastic-5.x.list

  3. Run update −

$ sudo apt-get update

  4. Now you can install by using the following command −

$ sudo apt-get install logstash

Using YUM utility for RPM-based Linux distributions (Red Hat, CentOS) −

  1. Download and install the Public Signing Key −

$ rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

  2. Add the following text in a file with the .repo suffix in your “/etc/yum.repos.d/” directory, for example, logstash.repo −

[logstash-5.x]
name = Elastic repository for 5.x packages
baseurl = https://artifacts.elastic.co/packages/5.x/yum
gpgcheck = 1
gpgkey = https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled = 1
autorefresh = 1
type = rpm-md

  3. You can now install Logstash by using the following command −

$ sudo yum install logstash

Step 4 − 前往 Logstash 主目录。在 bin 文件夹内,如果使用 Windows,则运行 logstash.bat 文件;同样也可以通过命令提示符或终端来执行。在 UNIX 中,运行 logstash 文件。

Step 4 − Go to the Logstash home directory. Inside the bin folder, run the logstash.bat file in case of Windows, or you can do the same using the command prompt or the terminal. In UNIX, run the logstash file.

我们需要指定输入源、输出源和可选项过滤器。为了验证安装,你可以使用标准输入流 (stdin) 作为输入源和标准输出流 (stdout) 作为输出源通过基本配置来运行它。你可以在命令行中使用 –e 选项指定配置。

We need to specify the input source, output source and optional filters. For verifying the installation, you can run it with the basic configuration by using a standard input stream (stdin) as the input source and a standard output stream (stdout) as the output source. You can specify the configuration in the command line also by using –e option.

In Windows −

> cd logstash-5.0.1/bin
> Logstash -e 'input { stdin { } } output { stdout {} }'

In Linux −

$ cd logstash-5.0.1/bin
$ ./logstash -e 'input { stdin { } } output { stdout {} }'

Note − 如果使用 Windows,你可能会收到一条指出 JAVA_HOME 未设置的错误消息。就此,请在环境变量中将其设置为“C:\Program Files\Java\jre1.8.0_111”或你安装 Java 的位置。

Note − In case of Windows, you might get an error stating that JAVA_HOME is not set. To fix this, set it in the environment variables to “C:\Program Files\Java\jre1.8.0_111” or to the location where you installed Java.

Step 5 - Logstash Web 界面的默认端口为 9600 至 9700,在 logstash-5.0.1\config\logstash.yml 中定义为 http.port ,它将选取给定范围内的第一个可用端口。

Step 5 − The default ports for the Logstash web interface are 9600 to 9700; they are defined in logstash-5.0.1\config\logstash.yml as http.port, and Logstash picks up the first available port in the given range.

我们可以通过浏览 http://localhost:9600 来检查 Logstash 服务器是否已启动并运行;如果端口不同,请查看命令提示符或终端。我们可以看到分配的端口,例如 “Successfully started Logstash API endpoint {:port ⇒ 9600}”。该地址会返回一个 JSON 对象,其中包含有关已安装 Logstash 的以下信息 −

We can check whether the Logstash server is up and running by browsing http://localhost:9600. If the port is different, check the command prompt or terminal; the assigned port is logged as “Successfully started Logstash API endpoint {:port ⇒ 9600}”. The endpoint returns a JSON object, which contains the information about the installed Logstash in the following way −

{
   "host":"manu-PC",
   "version":"5.0.1",
   "http_address":"127.0.0.1:9600",
   "build_date":"2016-11-11T22:28:04+00:00",
   "build_sha":"2d8d6263dd09417793f2a0c6d5ee702063b5fada",
   "build_snapshot":false
}
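The same information can also be fetched from the command line; for example, using curl against the default API port (9600) −

$ curl -XGET 'http://localhost:9600/?pretty'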

Logstash - Internal Architecture

在本章中,我们将讨论有关 Logstash 的内部架构和不同组件的内容。

In this chapter, we will discuss regarding the internal architecture and the different components of Logstash.

Logstash Service Architecture

Logstash 处理来自不同服务器和数据源的日志,并充当发送器。发送器用于收集日志,安装在每个输入源中。Redis、Kafka、RabbitMQ 等代理是用于为索引器保存数据的缓冲区,可能有多个代理作为故障转移实例。

Logstash processes logs from different servers and data sources and behaves as the shipper. The shippers are used to collect the logs and are installed in every input source. Brokers like Redis, Kafka or RabbitMQ are buffers to hold the data for indexers; there may be more than one broker as failover instances.

Lucene 等索引器用于对日志进行索引,以提高搜索性能,然后将输出存储在 Elasticsearch 或其他输出目标中。输出存储中的数据可供 Kibana 和其他可视化软件使用。

Indexers like Lucene are used to index the logs for better search performance and then the output is stored in Elasticsearch or other output destination. The data in output storage is available for Kibana and other visualization software.

[Figure: Logstash service architecture]

Logstash Internal Architecture

Logstash 管道由三个组件组成 Input, FiltersOutput 。输入部分负责指定和访问输入数据源,例如 Apache Tomcat Server 的日志文件夹。

The Logstash pipeline consists of three components Input, Filters and Output. The input part is responsible to specify and access the input data source such as the log folder of the Apache Tomcat Server.

[Figure: Logstash internal architecture]

Example to Explain the Logstash Pipeline

Logstash 配置文件包含有关 Logstash 三个组件的详细信息。在这种情况下,我们将创建一个名为 Logstash.conf 的文件。

The Logstash configuration file contains the details about the three components of Logstash. In this case, we are creating a file named Logstash.conf.

以下配置从输入日志 “inlog.log” 中捕获数据,并将其写入输出日志 “outlog.log”,没有任何筛选器。

The following configuration captures data from an input log “inlog.log” and writes it to an output log “outlog.log” without any filters.

Logstash.conf

Logstash 配置文件仅使用输入插件从 inlog.log 文件复制数据,并使用输出插件将日志数据刷新到 outlog.log 文件。

The Logstash configuration file just copies the data from the inlog.log file using the input plugin and flushes the log data to outlog.log file using the output plugin.

input {
   file {
      path => "C:/tpwork/logstash/bin/log/inlog.log"
   }
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/outlog.log"
   }
}

Run Logstash

Logstash 使用 -f 选项指定配置文件。

Logstash uses the -f option to specify the config file.

C:\logstash\bin> logstash -f logstash.conf
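Optionally, the configuration file can be checked for syntax errors before starting the pipeline; in Logstash 5.x this is done with the --config.test_and_exit flag (also available as -t) −

C:\logstash\bin> logstash -f logstash.conf --config.test_and_exit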

inlog.log

以下代码块显示了输入日志数据。

The following code block shows the input log data.

Hello tutorialspoint.com

outlog.log

Logstash 输出包含消息字段中的输入数据。Logstash 还向输出添加其他字段,例如时间戳、输入源的路径、版本、主机和标记。

The Logstash output contains the input data in message field. Logstash also adds other fields to the output like Timestamp, Path of the Input Source, Version, Host and Tags.

{
   "path":"C:/tpwork/logstash/bin/log/inlog1.log",
   "@timestamp":"2016-12-13T02:28:38.763Z",
   "@version":"1", "host":"Dell-PC",
   "message":" Hello tutorialspoint.com", "tags":[]
}

正如您所见,Logstash 的输出包含了输入日志提供的数据之外的内容。输出包含源路径、时间戳、版本、主机名和标记,这些用于表示额外的消息,例如错误。

As you can see, the output of Logstash contains more than the data supplied through the input log. The output contains the Source Path, Timestamp, Version, Hostname and Tags, which are used to represent extra information such as errors.

我们可以使用过滤器处理数据,使其满足我们的需求。在下一个示例中,我们使用过滤器获取数据,将输出限制为仅包含动词(例如 GET 或 POST)后跟统一资源标识符(URI)的数据。

We can use filters to process the data and make it useful for our needs. In the next example, we are using a filter to get the data, which restricts the output to only the data with a verb like GET or POST followed by a Uniform Resource Identifier.

Logstash.conf

在此 Logstash 配置中,我们添加了一个名为 grok 的过滤器来解析输入数据。只有与模式序列匹配的输入日志事件才能无错误地到达输出目标。对于不匹配 grok 过滤器模式序列的输出事件,Logstash 会添加一个名为 "_grokparsefailure" 的标记。

In this Logstash configuration, we add a filter named grok to filter out the input data. Only the input log events that match the pattern sequence reach the output destination without an error. Logstash adds a tag named "_grokparsefailure" to the output events that do not match the grok filter pattern sequence.

Logstash 为解析流行的服务器日志(如 Apache)提供了许多内置 regex 模式。这里使用的模式要求动词(例如 get、post 等),后面跟一个统一资源标识符。

Logstash offers many inbuilt regex patterns for parsing popular server logs like Apache. The pattern used here expects a verb like get, post, etc., followed by a uniform resource identifier.

input {
   file {
      path => "C:/tpwork/logstash/bin/log/inlog2.log"
   }
}
filter {
   grok {
      match => {"message" => "%{WORD:verb} %{URIPATHPARAM:uri}"}
   }
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/outlog2.log"
   }
}

Run Logstash

我们可以使用以下命令运行 Logstash。

We can run Logstash by using the following command.

C:\logstash\bin> logstash -f Logstash.conf

inlog2.log

我们的输入文件包含两个由默认分隔符(即换行符)分隔的事件。第一个事件与 grok 中指定的模式匹配,而第二个事件不匹配。

Our input file contains two events separated by the default delimiter, i.e., the new line delimiter. The first event matches the pattern specified in grok and the second one does not.

GET /tutorialspoint/Logstash
Input 1234

outlog2.log

我们可以看到,第二个输出事件包含 "_grokparsefailure" 标记,因为它不匹配 grok 筛选器模式。用户还可以通过在输出插件中使用 ‘if’ 条件来删除输出中的这些不匹配的事件。

We can see that the second output event contains "_grokparsefailure" tag, because it does not match the grok filter pattern. The user can also remove these unmatched events in output by using the ‘if’ condition in the output plugin.

{
   "path":"C:/tpwork/logstash/bin/log/inlog2.log",
   "@timestamp":"2016-12-13T02:47:10.352Z","@version":"1","host":"Dell-PC","verb":"GET",
   "message":"GET /tutorialspoint/logstash", "uri":"/tutorialspoint/logstash", "tags":[]
}
{
   "path":"C:/tpwork/logstash/bin/log/inlog2.log",
   "@timestamp":"2016-12-13T02:48:12.418Z", "@version":"1", "host":"Dell-PC",
   "message":"t 1234\r", "tags":["_grokparsefailure"]
}
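As mentioned above, the events tagged with "_grokparsefailure" can be kept out of the output by wrapping the output plugin in an ‘if’ condition. A small sketch of such an output block (using the same hypothetical file path as above) −

output {
   if "_grokparsefailure" not in [tags] {
      file {
         path => "C:/tpwork/logstash/bin/log/outlog2.log"
      }
   }
}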

Logstash - Collecting Logs

使用 shippers 从不同的服务器或数据源中收集日志。Shipper 是 Logstash 中安装的一个实例,它可以访问服务器日志并将其发送到特定的输出位置。

Logs from different servers or data sources are collected using shippers. A shipper is an instance of Logstash installed in the server, which accesses the server logs and sends to specific output location.

它主要向 Elasticsearch 发送输出以进行存储。Logstash 从以下来源获取输入——

It mainly sends the output to the Elasticsearch for storage. Logstash takes input from the following sources −

  1. STDIN

  2. Syslog

  3. Files

  4. TCP/UDP

  5. Microsoft windows Eventlogs

  6. Websocket

  7. Zeromq

  8. Customized extensions

Collecting Logs Using Apache Tomcat 7 Server

在这个示例中,我们使用 file 输入插件收集了安装在 Windows 中的 Apache Tomcat 7 服务器的日志并将其发送到另一个日志。

In this example, we are collecting logs of Apache Tomcat 7 Server installed in windows using the file input plugin and sending them to the other log.

logstash.conf

在此处,Logstash 配置为访问本地安装的 Apache Tomcat 7 的访问日志。file 插件的路径设置中使用了正则表达式模式来从日志文件中获取数据。它在其名称中包含“access”,它添加了一个 apache 类型,这样有助于在集中目的地源中区分 apache 事件和其他事件。最后,输出事件将显示在 output.log 中。

Here, Logstash is configured to access the access log of Apache Tomcat 7 installed locally. A regex pattern is used in the path setting of the file plugin to get the data from the log file. The log file contains “access” in its name, and the configuration adds an apache type, which helps in differentiating the apache events from the others in a centralized destination source. Finally, the output events will be shown in the output.log.

input {
   file {
      path => "C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/*access*"
      type => "apache"
   }
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"
   }
}

Run Logstash

我们可以使用以下命令运行 Logstash。

We can run Logstash by using the following command.

C:\logstash\bin> logstash -f Logstash.conf

Apache Tomcat Log

访问 Apache Tomcat 服务器及其 Web 应用 ( http://localhost:8080 ) 以生成日志。Logstash 实时读取日志中更新的数据,并按配置文件中指定的格式将其存储到 output.log 中。

Access the Apache Tomcat Server and its web apps (http://localhost:8080) to generate logs. The updated data in the logs are read by Logstash in real time and stashed in output.log as specified in configuration file.

Apache Tomcat 根据日期生成一个新的访问日志文件,并将访问事件记录到其中。在本例中,它是在 Apache Tomcat 的 logs 目录中的 localhost_access_log.2016-12-24.txt。

Apache Tomcat generates a new access log file according to date and logs the access events there. In our case, it was localhost_access_log.2016-12-24.txt in the logs directory of Apache Tomcat.

0:0:0:0:0:0:0:1 - - [
   25/Dec/2016:18:37:00 +0800] "GET / HTTP/1.1" 200 11418
0:0:0:0:0:0:0:1 - munish [
   25/Dec/2016:18:37:02 +0800] "GET /manager/html HTTP/1.1" 200 17472
0:0:0:0:0:0:0:1 - - [
   25/Dec/2016:18:37:08 +0800] "GET /docs/ HTTP/1.1" 200 19373
0:0:0:0:0:0:0:1 - - [
   25/Dec/2016:18:37:10 +0800] "GET /docs/introduction.html HTTP/1.1" 200 15399

output.log

你可以在输出事件中看到,添加了一个类型字段,事件显示在消息字段中。

You can see in the output events, a type field is added and the event is present in the message field.

{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   localhost_access_log.2016-12-25.txt",
   "@timestamp":"2016-12-25T10:37:00.363Z","@version":"1","host":"Dell-PC",
   "message":"0:0:0:0:0:0:0:1 - - [25/Dec/2016:18:37:00 +0800] \"GET /
   HTTP/1.1\" 200 11418\r","type":"apache","tags":[]
}
{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   localhost_access_log.2016-12-25.txt","@timestamp":"2016-12-25T10:37:10.407Z",
   "@version":"1","host":"Dell-PC",
   "message":"0:0:0:0:0:0:0:1 - munish [25/Dec/2016:18:37:02 +0800] \"GET /
   manager/html HTTP/1.1\" 200 17472\r","type":"apache","tags":[]
}
{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   localhost_access_log.2016-12-25.txt","@timestamp":"2016-12-25T10:37:10.407Z",
   "@version":"1","host":"Dell-PC",
   "message":"0:0:0:0:0:0:0:1 - - [25/Dec/2016:18:37:08 +0800] \"GET /docs/
   HTTP/1.1\" 200 19373\r","type":"apache","tags":[]
}
{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   localhost_access_log.2016-12-25.txt","@timestamp":"2016-12-25T10:37:20.436Z",
   "@version":"1","host":"Dell-PC",
   "message":"0:0:0:0:0:0:0:1 - - [25/Dec/2016:18:37:10 +0800] \"GET /docs/
   introduction.html HTTP/1.1\" 200 15399\r","type":"apache","tags":[]
}

Collecting Logs Using STDIN Plugin

在本部分中,我们将讨论使用 STDIN Plugin 收集日志的另一个示例。

In this section, we will discuss another example of collecting logs using the STDIN Plugin.

logstash.conf

这是一个非常简单的示例,在其中 Logstash 正在读取用户在标准输入中输入的事件。在本例中,它是命令提示符,它存储在 output.log 文件中的事件。

It is a very simple example, where Logstash reads the events entered by the user on a standard input. In our case, the standard input is the command prompt, and the events are stored in the output.log file.

input {
   stdin{}
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"
   }
}

Run Logstash

我们可以使用以下命令运行 Logstash。

We can run Logstash by using the following command.

C:\logstash\bin> logstash -f Logstash.conf

在命令提示符中写入以下文本:

Write the following text in the command prompt −

用户输入了以下两行。Logstash 以分隔符设置分隔事件,其默认值为“\n”。用户可以通过更改 file 插件中的分隔符的值来进行更改。

The user entered the following two lines. Logstash separates the events by the delimiter setting, and its value by default is ‘\n’. The user can change it by changing the value of the delimiter in the file plugin.

Tutorialspoint.com welcomes you
Simply easy learning

output.log

以下代码块显示了输出日志数据。

The following code block shows the output log data.

{
   "@timestamp":"2016-12-25T11:41:16.518Z","@version":"1","host":"Dell-PC",
   "message":"tutrialspoint.com welcomes you\r","tags":[]
}
{
   "@timestamp":"2016-12-25T11:41:53.396Z","@version":"1","host":"Dell-PC",
   "message":"simply easy learning\r","tags":[]
}

Logstash - Supported Inputs

Logstash 支持来自不同来源的大量日志。它与下面说明的著名来源协同工作。

Logstash supports a huge range of logs from different sources. It works with well-known sources, as explained below.

Collect Logs from Metrics

系统事件和其他时间活动记录在指标中。Logstash 可以从系统指标中访问日志,并使用过滤器对其进行处理。这有助于以自定义方式向用户显示事件的动态信息。指标会根据指标过滤器的 flush_interval setting 刷新,默认情况下,它设置为 5 秒。

System events and other time activities are recorded in metrics. Logstash can access the logs from system metrics and process them using filters. This helps to show the user a live feed of the events in a customized manner. Metrics are flushed according to the flush_interval setting of the metrics filter; by default, it is set to 5 seconds.

我们通过收集和分析运行 Logstash 的事件并显示命令提示符上的动态信息来追踪 Logstash 生成的测试指标。

We are tracking the test metrics generated by Logstash, by gathering and analyzing the events running through Logstash and showing the live feed on the command prompt.

logstash.conf

此配置包含 Logstash 提供的用于测试指标的 generator 插件,并将 type 设置设为 “generated” 以便解析。在过滤阶段,我们仅使用 “if” 语句处理类型为 generated 的行。然后,metrics 插件对 meter 设置中指定的字段进行计数。metrics 插件每隔 flush_interval 指定的 5 秒刷新一次计数。

This configuration contains a generator plugin, which is offered by Logstash for test metrics, and sets the type setting to “generated” for parsing. In the filtering phase, we are only processing the lines with a generated type by using the ‘if’ statement. Then, the metrics plugin counts the field specified in the meter setting. The metrics plugin flushes the count every 5 seconds, as specified in the flush_interval.

最后,使用格式化 codec plugin 将过滤器事件输出到标准输出(例如命令提示符)。编解码器插件使用 [events][rate_1m] 值在 1 分钟滑动窗口中输出每秒事件。

Lastly, the filtered events are written to a standard output like the command prompt, using the codec plugin for formatting. The codec plugin uses the [events][rate_1m] value to output the per-second event rate over a 1-minute sliding window.

input {
   generator {
     	type => "generated"
   }
}
filter {
   if [type] == "generated" {
      metrics {
         meter => "events"
         add_tag => "metric"
      }
   }
}
output {
   # only emit events with the 'metric' tag
   if "metric" in [tags] {
      stdout {
         codec => line { format => "rate: %{[events][rate_1m]}" }
      }
   }
}

Run Logstash

我们可以使用以下命令运行 Logstash。

We can run Logstash by using the following command.

>logstash -f logstash.conf

stdout (command prompt)

rate: 1308.4
rate: 1308.4
rate: 1368.654529135342
rate: 1416.4796003951449
rate: 1464.974293984808
rate: 1523.3119444107458
rate: 1564.1602979542715
rate: 1610.6496496890895
rate: 1645.2184750334154
rate: 1688.7768007612485
rate: 1714.652283095914
rate: 1752.5150680019278
rate: 1785.9432934744932
rate: 1806.912181962126
rate: 1836.0070454626025
rate: 1849.5669494173826
rate: 1871.3814756851832
rate: 1883.3443123790712
rate: 1906.4879113216743
rate: 1925.9420717997118
rate: 1934.166137658981
rate: 1954.3176526556897
rate: 1957.0107444542625

Collect Logs from the Web Server

Web 服务器生成大量有关用户访问和错误的日志。Logstash 帮助使用输入插件从不同服务器中提取日志,并将其存储到集中位置。

Web servers generate a large number of logs regarding user access and errors. Logstash helps to extract the logs from different servers using input plugins and stash them in a centralized location.

我们正在从本地 Apache Tomcat 服务器的 stderr logs 中提取数据并将其存储在 output.log 中。

We are extracting the data from the stderr logs of the local Apache Tomcat Server and stashing it in the output.log.

logstash.conf

这个 Logstash 配置文件指示 Logstash 读取 Apache 错误日志,并将 type 设置为 “apache-error”。通过使用文件输出插件,我们可以简单地将其发送到 output.log。

This Logstash configuration file directs Logstash to read the apache error logs and set the type to “apache-error”. We can simply send it to the output.log using the file output plugin.

input {
   file {
      path => "C:/Program Files/Apache Software Foundation/Tomcat 7.0 /logs/*stderr*"
      type => "apache-error"
   }
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"
   }
}

Run Logstash

我们可以使用以下命令运行 Logstash。

We can run Logstash by using the following command.

>Logstash -f Logstash.conf

Sample of Input log

这是示例 stderr log ,其在 Apache Tomcat 中发生服务器事件时生成。

This is the sample stderr log, which is generated when server events occur in Apache Tomcat.

C:\Program Files\Apache Software Foundation\Tomcat 7.0\logs\ tomcat7-stderr.2016-12-25.log

Dec 25, 2016 7:05:14 PM org.apache.coyote.AbstractProtocol start
INFO: Starting ProtocolHandler ["http-bio-9999"]
Dec 25, 2016 7:05:14 PM org.apache.coyote.AbstractProtocol start
INFO: Starting ProtocolHandler ["ajp-bio-8009"]
Dec 25, 2016 7:05:14 PM org.apache.catalina.startup.Catalina start
INFO: Server startup in 823 ms

output.log

{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   tomcat7-stderr.2016-12-25.log","@timestamp":"2016-12-25T11:05:27.045Z",
   "@version":"1","host":"Dell-PC",
   "message":"Dec 25, 2016 7:05:14 PM org.apache.coyote.AbstractProtocol start\r",
   "type":"apache-error","tags":[]
}
{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   tomcat7-stderr.2016-12-25.log","@timestamp":"2016-12-25T11:05:27.045Z",
   "@version":"1","host":"Dell-PC",
   "message":"INFO: Starting ProtocolHandler [
      \"ajp-bio-8009\"]\r","type":"apache-error","tags":[]
}
{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   tomcat7-stderr.2016-12-25.log","@timestamp":"2016-12-25T11:05:27.045Z",
   "@version":"1","host":"Dell-PC",
   "message":"Dec 25, 2016 7:05:14 PM org.apache.catalina.startup.Catalina start\r",
   "type":"apache-error","tags":[]
}
{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   tomcat7-stderr.2016-12-25.log","@timestamp":"2016-12-25T11:05:27.045Z",
   "@version":"1","host":"Dell-PC",
   "message":"INFO: Server startup in 823 ms\r","type":"apache-error","tags":[]
}

Collect Logs from Data sources

首先,让我们了解如何配置 MySQL 以进行日志记录。在 [mysqld] 下的 MySQL 数据库服务器的 my.ini file 中添加以下行。

To start with, let us understand how to Configure MySQL for logging. Add the following lines in my.ini file of the MySQL database server under [mysqld].

在 Windows 中,它保存在 MySQL 的安装目录内,位于:

In windows, it is present inside the installation directory of MySQL, which is in −

C:\wamp\bin\mysql\mysql5.7.11

在 UNIX 中,你可以在此处找到它:– /etc/mysql/my.cnf

In UNIX, you can find it in – /etc/mysql/my.cnf

general_log_file   = "C:/wamp/logs/queries.log"
general_log = 1

logstash.conf

在这个配置文件中,file 插件用于读取 MySQL 日志并将其写入 output.log。

In this config file, the file plugin is used to read the MySQL log and write it to the output.log.

input {
   file {
      path => "C:/wamp/logs/queries.log"
   }
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"
   }
}

queries.log

这是 MySQL 数据库中执行查询所生成的日志。

This is the log generated by queries executed in the MySQL database.

2016-12-25T13:05:36.854619Z   2 Query		select * from test1_users
2016-12-25T13:05:51.822475Z    2 Query	select count(*) from users
2016-12-25T13:05:59.998942Z    2 Query         select count(*) from test1_users

output.log

{
   "path":"C:/wamp/logs/queries.log","@timestamp":"2016-12-25T13:05:37.905Z",
   "@version":"1","host":"Dell-PC",
   "message":"2016-12-25T13:05:36.854619Z    2 Query\tselect * from test1_users",
   "tags":[]
}
{
   "path":"C:/wamp/logs/queries.log","@timestamp":"2016-12-25T13:05:51.938Z",
   "@version":"1","host":"Dell-PC",
   "message":"2016-12-25T13:05:51.822475Z    2 Query\tselect count(*) from users",
   "tags":[]
}
{
   "path":"C:/wamp/logs/queries.log","@timestamp":"2016-12-25T13:06:00.950Z",
   "@version":"1","host":"Dell-PC",
   "message":"2016-12-25T13:05:59.998942Z    2 Query\tselect count(*) from test1_users",
   "tags":[]
}

Logstash - Parsing the Logs

Logstash 使用输入插件接收日志,然后使用过滤器插件解析和转换数据。根据输出目标中存在的系统执行日志的解析和转换。Logstash 解析日志数据并仅转发必需的字段。之后,这些字段会转换成目标系统兼容且易于理解的格式。

Logstash receives the logs using input plugins and then uses the filter plugins to parse and transform the data. The parsing and transformation of logs are performed according to the systems present in the output destination. Logstash parses the logging data and forwards only the required fields. Later, these fields are transformed into the destination system’s compatible and understandable form.

How to Parse the Logs?

日志解析是通过 GROK 模式执行的,你可以在 GitHub 上找到这些模式 −

Parsing of the logs is performed by using GROK patterns, and you can find them on GitHub −

Logstash 将日志数据与一个指定的 GROK 模版或一个用于解析日志的模版序列(例如常用于 apache 日志的 "%{COMBINEDAPACHELOG}") 进行匹配。

Logstash matches the data of logs with a specified GROK Pattern or a pattern sequence for parsing the logs like "%{COMBINEDAPACHELOG}", which is commonly used for apache logs.

经解析的数据更加结构化,更便于搜索和执行查询。Logstash 在输入日志中搜索指定的 GROK 模版,并从日志中提取匹配的行。你可以使用 GROK 调试器来测试你的 GROK 模版。

The parsed data is more structured and easier to search and to perform queries on. Logstash searches for the specified GROK patterns in the input logs and extracts the matching lines from the logs. You can use the GROK debugger to test your GROK patterns.

GROK 模版的语法为 %{SYNTAX:SEMANTIC}。Logstash GROK 过滤器编写为以下形式:

The syntax for a GROK pattern is %{SYNTAX:SEMANTIC}. Logstash GROK filter is written in the following form −

%{PATTERN:FieldName}

其中,PATTERN 表示 GROK 模版,而字段名称是字段名称,表示输出中的经解析数据。

Here, PATTERN represents the GROK pattern and FieldName is the name of the field that holds the parsed data in the output.

例如,使用联机 GROK 调试器 https://grokdebugger.com/

For example, using online GROK debugger https://grokdebugger.com/

Input

日志中的一行示例错误:

A sample error line in a log −

[Wed Dec 07 21:54:54.048805 2016] [:error] [pid 1234:tid 3456829102]
   [client 192.168.1.1:25007] JSP Notice:  Undefined index: abc in
   /home/manu/tpworks/tutorialspoint.com/index.jsp on line 11

GROK Pattern Sequence

这个 GROK 模版序列与日志事件相匹配,该事件包括一个时间戳,之后是日志级别、进程 ID、事务 ID 和错误消息。

This GROK pattern sequence matches to the log event, which comprises of a timestamp followed by Log Level, Process Id, Transaction Id and an Error Message.

\[(%{DAY:day} %{MONTH:month} %{MONTHDAY} %{TIME} %{YEAR})\] \[.*:%{LOGLEVEL:loglevel}\]
   \[pid %{NUMBER:pid}:tid %{NUMBER:tid}\] \[client %{IP:clientip}:.*\]
   %{GREEDYDATA:errormsg}

output

输出以 JSON 格式显示。

The output is in JSON format.

{
   "day": [
      "Wed"
   ],
   "month": [
      "Dec"
   ],
   "loglevel": [
      "error"
   ],
   "pid": [
      "1234"
   ],
   "tid": [
      "3456829102"
   ],
   "clientip": [
      "192.168.1.1"
   ],
   "errormsg": [
      "JSP Notice:  Undefined index: abc in
      /home/manu/tpworks/tutorialspoint.com/index.jsp on line 11"
   ]
}
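To use this pattern sequence inside a Logstash pipeline, it can be placed in the match setting of the grok filter. The following is a minimal sketch of such a filter block (the input and output plugins are omitted here) −

filter {
   grok {
      match => {
         "message" => "\[(%{DAY:day} %{MONTH:month} %{MONTHDAY} %{TIME} %{YEAR})\] \[.*:%{LOGLEVEL:loglevel}\] \[pid %{NUMBER:pid}:tid %{NUMBER:tid}\] \[client %{IP:clientip}:.*\] %{GREEDYDATA:errormsg}"
      }
   }
}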

Logstash - Filters

Logstash 在输入和输出之间的管道中使用过滤器。Logstash 的过滤器测量、操作和创建事件,例如 Apache-Access 。许多过滤器插件用于管理 Logstash 中的事件。在这里,在 Logstash Aggregate Filter 的示例中,我们正在过滤数据库中每个 SQL 事务的持续时间并计算总时间。

Logstash uses filters in the middle of the pipeline between input and output. The filters of Logstash measure, manipulate and create events such as Apache-Access. Many filter plugins are used to manage the events in Logstash. Here, in an example of the Logstash Aggregate Filter, we are filtering the duration of every SQL transaction in a database and computing the total time.

Installing the Aggregate Filter Plugin

使用 Logstash 插件实用程序安装 Aggregate 过滤器插件。Logstash 插件是 bin folder 中 Logstash 中 windows 批处理文件。

Install the Aggregate Filter plugin using the logstash-plugin utility. The logstash-plugin is a batch file for Windows located in the bin folder of Logstash.

>logstash-plugin install logstash-filter-aggregate

logstash.conf

在此配置中,对于 Initializing, Incrementing,generating 总交易持续时间(即 sql_duration )你可以看到三个“if”语句。聚合插件用于添加输入日志的每个事件中存在的 sql_duration。

In this configuration, you can see three ‘if’ statements for Initializing, Incrementing, and generating the total duration of transaction, i.e., the sql_duration. The aggregate plugin is used to add the sql_duration, present in every event of the input log.

input {
   file {
      path => "C:/tpwork/logstash/bin/log/input.log"
   }
}
filter {
   grok {
      match => [
         "message", "%{LOGLEVEL:loglevel} -
            %{NOTSPACE:taskid} - %{NOTSPACE:logger} -
            %{WORD:label}( - %{INT:duration:int})?"
      ]
   }
   if [logger] == "TRANSACTION_START" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] = 0"
         map_action => "create"
      }
   }
   if [logger] == "SQL" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] ||= 0 ;
            map['sql_duration'] += event.get('duration')"
      }
   }
   if [logger] == "TRANSACTION_END" {
      aggregate {
         task_id => "%{taskid}"
         code => "event.set('sql_duration', map['sql_duration'])"
         end_of_task => true
         timeout => 120
      }
   }
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"
   }
}

Run Logstash

我们可以使用以下命令运行 Logstash。

We can run Logstash by using the following command.

>logstash -f logstash.conf

input.log

以下代码块显示了输入日志数据。

The following code block shows the input log data.

INFO - 48566 - TRANSACTION_START - start
INFO - 48566 - SQL - transaction1 - 320
INFO - 48566 - SQL - transaction1 - 200
INFO - 48566 - TRANSACTION_END - end

output.log

如配置文件中所述,最后一个 “if” 语句(logger 为 TRANSACTION_END)会打印总事务时间,即 sql_duration,见 output.log 的最后一个事件。

As specified in the configuration file, the last ‘if’ statement, where the logger is TRANSACTION_END, prints the total transaction time, i.e., the sql_duration; it appears in the last event of the output.log.

{
   "path":"C:/tpwork/logstash/bin/log/input.log","@timestamp": "2016-12-22T19:04:37.214Z",
   "loglevel":"INFO","logger":"TRANSACTION_START","@version": "1","host":"wcnlab-PC",
   "message":"8566 - TRANSACTION_START - start\r","tags":[]
}
{
   "duration":320,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-22T19:04:38.366Z","loglevel":"INFO","logger":"SQL",
   "@version":"1","host":"wcnlab-PC","label":"transaction1",
   "message":" INFO - 48566 - SQL - transaction1 - 320\r","taskid":"48566","tags":[]
}
{
   "duration":200,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-22T19:04:38.373Z","loglevel":"INFO","logger":"SQL",
   "@version":"1","host":"wcnlab-PC","label":"transaction1",
   "message":" INFO - 48566 - SQL - transaction1 - 200\r","taskid":"48566","tags":[]
}
{
   "sql_duration":520,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-22T19:04:38.380Z","loglevel":"INFO","logger":"TRANSACTION_END",
   "@version":"1","host":"wcnlab-PC","label":"end",
   "message":" INFO - 48566 - TRANSACTION_END - end\r","taskid":"48566","tags":[]
}

Logstash - Transforming the Logs

Logstash 提供各种插件来转换已解析的日志。这些插件可以 Add, Delete,Update 日志中的字段,以便在输出系统中更好地理解和查询。

Logstash offers various plugins to transform the parsed log. These plugins can Add, Delete, and Update fields in the logs for better understanding and querying in the output systems.

我们正在使用 Mutate Plugin 为输入日志的每一行添加用户字段名称。

We are using the Mutate Plugin to add a field named user in every line of the input log.

Install the Mutate Filter Plugin

要安装 mutate 过滤器插件,我们可以使用以下命令。

To install the mutate filter plugin, we can use the following command.

>logstash-plugin install logstash-filter-mutate

logstash.conf

在此配置文件中,Mutate 插件在 Aggregate 插件之后添加,以添加一个新字段。

In this config file, the Mutate Plugin is added after the Aggregate Plugin to add a new field.

input {
   file {
      path => "C:/tpwork/logstash/bin/log/input.log"
   }
}
filter {
   grok {
      match => [ "message", "%{LOGLEVEL:loglevel} -
         %{NOTSPACE:taskid} - %{NOTSPACE:logger} -
         %{WORD:label}( - %{INT:duration:int})?" ]
   }
   if [logger] == "TRANSACTION_START" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] = 0"
         map_action => "create"
      }
   }
   if [logger] == "SQL" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] ||= 0 ;
            map['sql_duration'] += event.get('duration')"
      }
   }
   if [logger] == "TRANSACTION_END" {
      aggregate {
         task_id => "%{taskid}"
         code => "event.set('sql_duration', map['sql_duration'])"
         end_of_task => true
         timeout => 120
      }
   }
   mutate {
      add_field => {"user" => "tutorialspoint.com"}
   }
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"
   }
}

Run Logstash

我们可以使用以下命令运行 Logstash。

We can run Logstash by using the following command.

>logstash -f logstash.conf

input.log

以下代码块显示了输入日志数据。

The following code block shows the input log data.

INFO - 48566 - TRANSACTION_START - start
INFO - 48566 - SQL - transaction1 - 320
INFO - 48566 - SQL - transaction1 - 200
INFO - 48566 - TRANSACTION_END - end

output.log

你可以看到输出事件中有一个新的字段“user”。

You can see that there is a new field named “user” in the output events.

{
   "path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-25T19:55:37.383Z",
   "@version":"1",
   "host":"wcnlab-PC",
   "message":"NFO - 48566 - TRANSACTION_START - start\r",
   "user":"tutorialspoint.com","tags":["_grokparsefailure"]
}
{
   "duration":320,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-25T19:55:37.383Z","loglevel":"INFO","logger":"SQL",
   "@version":"1","host":"wcnlab-PC","label":"transaction1",
   "message":" INFO - 48566 - SQL - transaction1 - 320\r",
   "user":"tutorialspoint.com","taskid":"48566","tags":[]
}
{
   "duration":200,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-25T19:55:37.399Z","loglevel":"INFO",
   "logger":"SQL","@version":"1","host":"wcnlab-PC","label":"transaction1",
   "message":" INFO - 48566 - SQL - transaction1 - 200\r",
   "user":"tutorialspoint.com","taskid":"48566","tags":[]
}
{
   "sql_duration":520,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-25T19:55:37.399Z","loglevel":"INFO",
   "logger":"TRANSACTION_END","@version":"1","host":"wcnlab-PC","label":"end",
   "message":" INFO - 48566 - TRANSACTION_END - end\r",
   "user":"tutorialspoint.com","taskid":"48566","tags":[]
}
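Besides add_field, the mutate filter also provides settings such as rename, convert and remove_field to update or delete fields. The following is a small sketch of these options (the field names here are only illustrative) −

filter {
   mutate {
      rename       => { "host" => "hostname" }
      convert      => { "duration" => "integer" }
      remove_field => [ "@version" ]
   }
}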

Logstash - Output Stage

输出是 Logstash 管道中的最后阶段,它将过滤器数据从输入日志发送到指定的目标。Logstash 提供多个输出插件,以便将经过过滤的日志事件存储到各种不同的存储和搜索引擎。

Output is the last stage in the Logstash pipeline, which sends the filtered data from the input logs to a specified destination. Logstash offers multiple output plugins to stash the filtered log events to various different storage and searching engines.

Storing Logs

Logstash 可以将过滤后的日志存储到 File、Elasticsearch Engine、stdout、AWS CloudWatch 等。TCP、UDP、Websocket 等网络协议也可以在 Logstash 中用于将日志事件传输到远程存储系统。

Logstash can store the filtered logs in a File, Elasticsearch Engine, stdout, AWS CloudWatch, etc. Network protocols like TCP, UDP, Websocket can also be used in Logstash for transferring the log events to remote storage systems.

在 ELK 栈中,用户使用 Elasticsearch 引擎存储日志事件。在这里,在以下示例中,我们将为本地 Elasticsearch 引擎生成日志事件。

In ELK stack, users use the Elasticsearch engine to store the log events. Here, in the following example, we will generate log events for a local Elasticsearch engine.

Installing the Elasticsearch Output Plugin

我们可以使用以下命令安装 Elasticsearch 输出插件。

We can install the Elasticsearch output plugin with the following command.

>logstash-plugin install logstash-output-elasticsearch

logstash.conf

此配置文件包含 Elasticsearch 插件,该插件将输出事件存储在本地安装的 Elasticsearch 中。

This config file contains an Elasticsearch plugin, which stores the output event in Elasticsearch installed locally.

input {
   file {
      path => "C:/tpwork/logstash/bin/log/input.log"
   }
}
filter {
   grok {
      match => [ "message", "%{LOGLEVEL:loglevel} -
      %{NOTSPACE:taskid} - %{NOTSPACE:logger} -
      %{WORD:label}( - %{INT:duration:int})?" ]
   }
   if [logger] == "TRANSACTION_START" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] = 0"
         map_action => "create"
      }
   }
   if [logger] == "SQL" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] ||= 0 ;
            map['sql_duration'] += event.get('duration')"
      }
   }
   if [logger] == "TRANSACTION_END" {
      aggregate {
         task_id => "%{taskid}"
         code => "event.set('sql_duration', map['sql_duration'])"
         end_of_task => true
         timeout => 120
      }
   }
   mutate {
      add_field => {"user" => "tutorialspoint.com"}
   }
}
output {
   elasticsearch {
      hosts => ["127.0.0.1:9200"]
   }
}

Input.log

以下代码块显示了输入日志数据。

The following code block shows the input log data.

INFO - 48566 - TRANSACTION_START - start
INFO - 48566 - SQL - transaction1 - 320
INFO - 48566 - SQL - transaction1 - 200
INFO - 48566 - TRANSACTION_END - end

Start Elasticsearch at Localhost

要在本地主机上启动 Elasticsearch,你应该使用以下命令。

To start Elasticsearch at the localhost, you should use the following command.

C:\elasticsearch\bin> elasticsearch

一旦 Elasticsearch 准备好,你可以通过在浏览器中键入以下 URL 进行检查。

Once Elasticsearch is ready, you can check it by typing the following URL (http://localhost:9200) in your browser.

Response

以下代码段显示了本地主机上 Elasticsearch 的响应。

The following code block shows the response of Elasticsearch at localhost.

{
   "name" : "Doctor Dorcas",
   "cluster_name" : "elasticsearch",
   "version" : {
      "number" : "2.1.1",
      "build_hash" : "40e2c53a6b6c2972b3d13846e450e66f4375bd71",
      "build_timestamp" : "2015-12-15T13:05:55Z",
      "build_snapshot" : false,
      "lucene_version" : "5.3.1"
   },
   "tagline" : "You Know, for Search"
}

Note − 有关 Elasticsearch 的更多信息,您可以单击以下链接。

Note − For more information about Elasticsearch, you can click on the following link.

现在,使用上述 Logstash.conf 运行 Logstash

Now, run Logstash with the above-mentioned Logstash.conf

>Logstash -f Logstash.conf

将上述文本写入输入日志后,Logstash 会将该文本存储在 Elasticsearch 中。您可以通过在浏览器中键入以下 URL 检查存储的数据。

After adding the above-mentioned text to the input log, that text will be stored in Elasticsearch by Logstash. You can check the stored data by typing the following URL in the browser.
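For example, using curl from the command line (the index name below matches the response shown next; adjust the date to your own run) −

$ curl -XGET 'http://localhost:9200/logstash-2017.01.01/_search?pretty'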

Response

它是以 JSON 格式存储在索引 Logstash-2017.01.01 中的数据。

It is the data in JSON format stored in index Logstash-2017.01.01.

{
   "took" : 20,
   "timed_out" : false,
   "_shards" : {
      "total" : 5,
      "successful" : 5,
      "failed" : 0
   },
   "hits" : {
      "total" : 10,
      "max_score" : 1.0,
      "hits" : [ {
         "_index" : "logstash-2017.01.01",
         "_type" : "logs",
         "_id" : "AVlZ9vF8hshdrGm02KOs",
         "_score" : 1.0,
         "_source":{
            "duration":200,"path":"C:/tpwork/logstash/bin/log/input.log",
            "@timestamp":"2017-01-01T12:17:49.140Z","loglevel":"INFO",
            "logger":"SQL","@version":"1","host":"wcnlab-PC",
            "label":"transaction1",
            "message":" INFO - 48566 - SQL - transaction1 - 200\r",
            "user":"tutorialspoint.com","taskid":"48566","tags":[]
         }
      },
      {
         "_index" : "logstash-2017.01.01",
         "_type" : "logs",
         "_id" : "AVlZ9vF8hshdrGm02KOt",
         "_score" : 1.0,
         "_source":{
            "sql_duration":520,"path":"C:/tpwork/logstash/bin/log/input.log",
            "@timestamp":"2017-01-01T12:17:49.145Z","loglevel":"INFO",
            "logger":"TRANSACTION_END","@version":"1","host":"wcnlab-PC",
            "label":"end",
            "message":" INFO - 48566 - TRANSACTION_END - end\r",
            "user":"tutorialspoint.com","taskid":"48566","tags":[]
         }
      } ]
   }
}

Logstash - Supported Outputs

Logstash 提供多个插件来支持各种数据存储或搜索引擎。日志的输出事件可以发送到输出文件、标准输出或 Elasticsearch 等搜索引擎。Logstash 中有三种类型的受支持输出,它们是:

Logstash provides multiple Plugins to support various data stores or search engines. The output events of logs can be sent to an output file, standard output or a search engine like Elasticsearch. There are three types of supported outputs in Logstash, which are −

  1. Standard Output

  2. File Output

  3. Null Output

让我们现在详细讨论每一个这些。

Let us now discuss each of these in detail.

Standard Output (stdout)

它用于将经过筛选的日志事件生成为命令行界面的数据流。以下是生成数据库事务的总持续时间到 stdout 的示例。

It is used for generating the filtered log events as a data stream to the command line interface. Here is an example of generating the total duration of a database transaction to stdout.

logstash.conf

此配置文件包含一个 stdout 输出插件,用于将总 sql_duration 写入标准输出。

This config file contains a stdout output plugin to write the total sql_duration to a standard output.

input {
   file {
      path => "C:/tpwork/logstash/bin/log/input.log"
   }
}
filter {
   grok {
      match => [
         "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid}
            - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?"
      ]
   }
   if [logger] == "TRANSACTION_START" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] = 0"
         map_action => "create"
      }
   }
   if [logger] == "SQL" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] ||= 0 ;
            map['sql_duration'] += event.get('duration')"
      }
   }
   if [logger] == "TRANSACTION_END" {
      aggregate {
         task_id => "%{taskid}"
         code => "event.set('sql_duration', map['sql_duration'])"
         end_of_task => true
         timeout => 120
      }
   }
}
output {
   if [logger] == "TRANSACTION_END" {
      stdout {
         codec => line{format => "%{sql_duration}"}
      }
   }
}

Note − 请安装聚合过滤器,如果尚未安装的话。

Note − Please install the aggregate filter, if not installed already.

>logstash-plugin install logstash-filter-aggregate

Run Logstash

我们可以使用以下命令运行 Logstash。

We can run Logstash by using the following command.

>logstash -f logstash.conf

Input.log

以下代码块显示了输入日志数据。

The following code block shows the input log data.

INFO - 48566 - TRANSACTION_START - start
INFO - 48566 - SQL - transaction1 - 320
INFO - 48566 - SQL - transaction1 - 200
INFO - 48566 - TRANSACTION_END - end

stdout (在 Windows 系统中是命令提示符,而在 UNIX 系统中是终端)

stdout (it will be command prompt in windows or terminal in UNIX)

这是总的 sql_duration 320 + 200 = 520。

This is the total sql_duration 320 + 200 = 520.

520

File Output

Logstash 还可以将筛选日志事件存储到输出文件。我们将使用上述示例,并将输出存储在文件中而不是 STDOUT 中。

Logstash can also store the filter log events to an output file. We will use the above-mentioned example and store the output in a file instead of STDOUT.

logstash.conf

此 Logstash 配置文件将 Logstash 指示到将 total sql_duration 存储到输出日志文件中。

This Logstash config file directs Logstash to store the total sql_duration in an output log file.

input {
   file {
      path => "C:/tpwork/logstash/bin/log/input1.log"
   }
}
filter {
   grok {
      match => [
         "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} -
            %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?"
      ]
   }
   if [logger] == "TRANSACTION_START" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] = 0"
         map_action => "create"
      }
   }
   if [logger] == "SQL" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] ||= 0 ;
            map['sql_duration'] += event.get('duration')"
      }
   }
   if [logger] == "TRANSACTION_END" {
      aggregate {
         task_id => "%{taskid}"
         code => "event.set('sql_duration', map['sql_duration'])"
         end_of_task => true
         timeout => 120
      }
   }
}
output {
   if [logger] == "TRANSACTION_END" {
      file {
         path => "C:/tpwork/logstash/bin/log/output.log"
         codec => line{format => "%{sql_duration}"}
      }
   }
}

Run logstash

我们可以使用以下命令运行 Logstash。

We can run Logstash by using the following command.

>logstash -f logstash.conf

input.log

以下代码块显示了输入日志数据。

The following code block shows the input log data.

INFO - 48566 - TRANSACTION_START - start
INFO - 48566 - SQL - transaction1 - 320
INFO - 48566 - SQL - transaction1 - 200
INFO - 48566 - TRANSACTION_END - end

output.log

以下代码块显示了输出日志数据。

The following code block shows the output log data.

520

Null Output

这是一个特殊输出插件,用于分析输入和筛选插件的性能。

This is a special output plugin, which is used for analyzing the performance of the input and filter plugins.
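A minimal sketch of using the null output, for example to benchmark the input and filter stages without writing the events anywhere −

output {
   null { }
}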

Logstash - Plugins

Logstash 为其管道(输入、筛选和输出)的所有三个阶段提供各种插件。这些插件帮助用户从各种源(如 Web 服务器、数据库、网络协议等)捕获日志。

Logstash offers various plugins for all three stages of its pipeline (Input, Filter and Output). These plugins help the user to capture logs from various sources like Web Servers, Databases, Over Network Protocols, etc.

捕获后,Logstash 可以根据用户的需要,解析和转换数据为有意义的信息。最后,Logstash 可以将有意义的信息发送或存储到各种目标源,如 Elasticsearch、AWS Cloudwatch 等。

After capturing, Logstash can parse and transform the data into meaningful information as required by the user. Lastly, Logstash can send or store that meaningful information to various destination sources like Elasticsearch, AWS Cloudwatch, etc.

Input Plugins

Logstash 中的输入插件帮助用户从各种源中提取和接收日志。输入插件的使用语法如下:

Input plugins in Logstash help the user to extract and receive logs from various sources. The syntax for using an input plugin is as follows −

input {
   <plugin name> {
      <setting 1> => <value>
      <setting 2> => <value>
   }
}
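For example, a concrete input block using the beats plugin (port 5044 is the port conventionally used by Beats shippers; it is only an illustrative value) −

input {
   beats {
      port => 5044
   }
}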

您可以使用以下命令下载输入插件:

You can download an input plugin by using the following command −

>logstash-plugin install logstash-input-<plugin name>

logstash-plugin 实用程序位于 Logstash 安装目录的 bin 文件夹中。下面列出了 Logstash 提供的输入插件。

The logstash-plugin utility is present in the bin folder of the Logstash installation directory. The following is a list of the input plugins offered by Logstash.

  1. beats − To get the logging data or events from the Elastic Beats framework.

  2. cloudwatch − To extract events from CloudWatch, an API offered by Amazon Web Services.

  3. couchdb_changes − Events from the _changes URI of CouchDB are shipped using this plugin.

  4. drupal_dblog − To extract Drupal’s watchdog logging data with DBLog enabled.

  5. elasticsearch − To retrieve the results of queries performed in an Elasticsearch cluster.

  6. eventlog − To get the events from the Windows event log.

  7. exec − To get the output of a shell command as an input in Logstash.

  8. file − To get the events from an input file. This is useful when Logstash is installed locally with the input source and has access to the input source logs.

  9. generator − It is used for testing purposes; it creates random events.

  10. github − Captures events from a GitHub webhook.

  11. graphite − To get metrics data from the Graphite monitoring tool.

  12. heartbeat − It is also used for testing and produces heartbeat-like events.

  13. http − To collect log events over two network protocols, http and https.

  14. http_poller − It is used to decode the output of an HTTP API into an event.

  15. jdbc − It converts JDBC transactions into events in Logstash.

  16. jmx − To extract metrics from remote Java applications using JMX.

  17. log4j − Captures events from the socketAppender object of Log4j over a TCP socket.

  18. rss − Reads events from an RSS feed.

  19. tcp − Captures events over a TCP socket.

  20. twitter − Collects events from the Twitter streaming API.

  21. unix − Collects events over a UNIX socket.

  22. websocket − Captures events over the websocket protocol.

  23. xmpp − Reads events over Jabber/XMPP protocols.

Plugin Settings

所有插件都有其特定的设置,这有助于在插件中指定重要的字段,例如端口、路径等等。我们将讨论其中一些输入插件的设置。

All the plugins have their specific settings, which help to specify important fields like Port, Path, etc., in a plugin. We will discuss the settings of some of the input plugins.

File

该输入插件用于直接从输入源中的日志或文本文件中提取事件。它的工作方式类似于 UNIX 中的 tail 命令,会保存上次读取的游标,并且只读取输入文件中新追加的数据;可以通过 start_position 设置更改此行为。以下是此输入插件的设置。

This input plugin is used to extract events directly from log or text files present in the input source. It works similar to the tail command in UNIX; it saves the last read cursor and reads only the newly appended data from the input file, but this can be changed by using the start_position setting. Following are the settings of this input plugin.

Setting Name

Default Value

Description

add_field

{}

Append a new field to the input event.

close_older

3600

The files having last read time (in seconds) more than the specified in this plugin is closed.

codec

“plain”

It is used to decode the data before entering into the Logstash pipeline.

delimiter

“\n”

It is used to specify a new line delimiter.

discover_interval

15

It is the time interval (in seconds) between discovering new files in the specified path.

enable_metric

true

It is used to enable or disable the reporting and collection of metrics for the specified plugin.

exclude

It is used to specify the filenames or patterns, which should be excluded from the input plugin.

Id

To specify a unique identity for that plugin instance.

max_open_files

It specifies the maximum number of input files opened by Logstash at any time.

path

Specifies the path of the files; it can contain patterns for the filenames.

start_position

“end”

You can change it to “beginning”, if you want Logstash to initially read the files from the start and not only the new log events.

stat_interval

1

It specifies the time interval in seconds, after which Logstash checks for the modified files.

tags

To add any additional information; for example, Logstash adds "_grokparsefailure" to tags when a log event fails to match the specified grok filter.

type

This is a special field, which you can add to an input event and it is useful in filters and Kibana.
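
A minimal sketch of a file input that uses some of the settings above is shown below; the path, type and tag values are only illustrative assumptions −

input {
   file {
      path => "C:/tpwork/logstash/bin/log/*.log"
      exclude => "*.gz"
      start_position => "beginning"
      type => "applog"
      tags => ["local"]
   }
}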

Elasticsearch

此特定插件用于读取 Elasticsearch 集群中的搜索查询结果。以下是该插件中使用的设置 −

This particular plugin is used to read the results of search queries from an Elasticsearch cluster. The following table lists the settings used in this plugin −

Setting Name

Default Value

Description

add_field

{}

Same as in file plugin, it is used to append a field in input event.

ca_file

It is used to specify the path of the SSL certificate authority (CA) file.

codec

“plain”

It is used to decode the input events from Elasticsearch before they enter the Logstash pipeline.

docinfo

“false”

You can change it to true, if you want to extract additional information like index, type and id from the Elasticsearch engine.

docinfo_fields

["_index", "_type", "_id"]

You can eliminate any field, which you do not want in your Logstash input.

enable_metric

true

It is used to enable or disable the reporting and collection of metric for that plugin instance.

hosts

It is used to specify the addresses of all Elasticsearch engines, which will be the input source of that Logstash instance. The syntax is host:port or IP:port.

Id

It is used to give a unique identity number to that specific input plugin instance.

index

"logstash-*"

It is used to specify the index name or a pattern, which Logstash will monitor for input.

password

For authentication purposes.

query

"{ \"sort\": [ \"_doc\" ] }"

Query for the execution.

ssl

false

Enable or disable secure socket layer.

tags

To add any additional information in input events.

type

It is used to classify the input events so that it will be easy to search all the input events at later stages.

user

For authentication purposes.
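
A sketch of an elasticsearch input using these settings is shown below; the host address, index pattern and query are assumptions made only for illustration −

input {
   elasticsearch {
      hosts => ["localhost:9200"]
      index => "logstash-*"
      query => '{ "query": { "match": { "loglevel": "ERROR" } } }'
      docinfo => true
   }
}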

eventlog

该输入插件从 Windows 服务器的 Win32 API 中读取数据。以下是该插件的设置 −

This input plugin reads data from the Win32 API of Windows servers. Following are the settings of this plugin −

Setting Name

Default Value

Description

add_field

{}

Same as in file plugin, it is used to append a field in input event

codec

“plain”

It is used to decode the input events from Windows before they enter the Logstash pipeline

logfile

["Application", "Security", "System"]

It specifies the Windows event logs to read

interval

1000

It is in milliseconds and defines the interval between two consecutive checks for new event logs

tags

To add any additional information in input events

type

It is used to classify the input from a specific plugin into a given type, so that it will be easy to search all the input events at later stages
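
A minimal sketch of an eventlog input reading the Application and System logs every 5000 milliseconds follows; the chosen logs, interval and type value are assumptions −

input {
   eventlog {
      logfile => ["Application", "System"]
      interval => 5000
      type => "windows-eventlog"
   }
}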

Twitter

此输入插件用于从其流媒体 API 收集 Twitter 订阅源。下表描述了此插件的设置。

This input plugin is used to collect the feed of twitter from its Streaming API. The following table describes the settings of this plugin.

Setting Name

Default Value

Description

add_field

{}

Same as in file plugin, it is used to append a field in input event

codec

“plain”

It is used to decode the input events from Twitter before they enter the Logstash pipeline

consumer_key

It contains the twitter app’s consumer key. For more info, visit https://dev.twitter.com/apps/new

consumer_secret

It contains the twitter app’s consumer secret key. For more info, visit https://dev.twitter.com/apps/new

enable_metric

true

It is used to enable or disable the reporting and collection of metric for that plugin instance

follows

It specifies the user IDs separated by commas, and Logstash checks these users’ status on Twitter. For more info, visit https://dev.twitter.com

full_tweet

false

You can change it to true, if you want Logstash to read the full object returned from the Twitter API

id

It is used to give a unique identity number to that specific input plugin instance

ignore_retweets

False

You can set it to true to ignore the retweets in the input Twitter feed

keywords

It is an array of keywords, which need to be tracked in the Twitter input feed

language

It defines the language of the tweets needed by Logstash from the input Twitter feed. This is an array of identifiers, each of which defines a specific language on Twitter

locations

To filter the tweets from the input feed according to the location specified. This is an array, which contains the longitude and latitude of the location

oauth_token

It is a required field, which contains the user OAuth token. For more information, please visit the following link https://dev.twitter.com/apps

oauth_token_secret

It is a required field, which contains the user OAuth secret token. For more information, please visit the following link https://dev.twitter.com/apps

tags

To add any additional information in input events

type

It is used to classify the input from a specific plugin into a given type, so that it will be easy to search all the input events at later stages
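
The following is a sketch of a twitter input; the credential placeholders must be replaced with the keys and tokens of your own Twitter app, and the keywords are only example values −

input {
   twitter {
      consumer_key => "<consumer_key>"
      consumer_secret => "<consumer_secret>"
      oauth_token => "<oauth_token>"
      oauth_token_secret => "<oauth_token_secret>"
      keywords => ["logstash", "elasticsearch"]
      full_tweet => false
   }
}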

TCP

TCP 用于通过 TCP 套接字获取事件;它可以从用户连接或服务器中读取,这是在模式设置中指定的。下表描述了此插件的设置 -

TCP is used to get the events over a TCP socket; it can read from user connections or from a server, depending on what is specified in the mode setting. The following table describes the settings of this plugin −

Setting Name

Default Value

Description

add_field

{}

Same as in file plugin, it is used to append a field in input event

codec

“plain”

It is used to decode the incoming events before they enter the Logstash pipeline

enable_metric

true

It is used to enable or disable the reporting and collection of metric for that plugin instance

host

“0.0.0.0”

The address of the server or the client, depending upon the mode

id

It is used to give a unique identity to that specific input plugin instance

mode

“server”

It is used to specify whether the input source is a server or a client.

port

It defines the port number

ssl_cert

It is used to specify the path of SSL certificate

ssl_enable

false

Enable or disable SSL

ssl_key

To specify the path of SSL key file

tags

To add any additional information in input events

type

It is used to classify the input from a specific plugin into a given type, so that it will be easy to search all the input events at later stages
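
A minimal sketch of a tcp input listening as a server on port 5000 follows; the port and type values are assumptions −

input {
   tcp {
      port => 5000
      mode => "server"
      type => "tcplog"
   }
}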

Logstash – Output Plugins

Logstash 支持各种输出源和不同技术,如数据库、文件、电子邮件、标准输出等。

Logstash supports various output destinations in different technologies like Database, File, Email, Standard Output, etc.

使用输出插件的语法如下:

The syntax for using the output plugin is as follows −

output {
   Plugin name {
      Setting 1……
      Setting 2……..
   }
}

可以使用以下命令下载输出插件:

You can download the output plugin by using the following command −

>logstash-plugin install logstash-output-<plugin name>

Logstash-plugin utility 位于 Logstash 安装目录的 bin 文件夹中。下表描述了 Logstash 提供的输出插件。

The Logstash-plugin utility is present in the bin folder of Logstash installation directory. The following table describes the output plugins offered by Logstash.

Sr.No.

Plugin Name & Description

1

cloudwatch This plugin is used to send aggregated metric data to CloudWatch of Amazon Web Services.

2

csv It is used to write the output events in a comma-separated manner.

3

elasticsearch It is used to store the output logs in an Elasticsearch index.

4

email It is used to send a notification email, when the output is generated. User can add information about the output in email.

5

exec It is used to run a command, which matches the output event.

6

ganglia It writes the metrics to gmond of Ganglia.

7

gelf It is used to produce output for Graylog2 in GELF format.

8

google_bigquery It outputs the events to Google BigQuery.

9

google_cloud_storage It stores the output events to Google Cloud Storage.

10

graphite It is used to store the output events to Graphite.

11

graphtastic It is used to write the output metrics on Windows.

12

hipchat It is used to store the output log events to HipChat.

13

http It is used to send the output log events to http or https endpoints.

14

influxdb It is used to store the output event in InfluxDB.

15

irc It is used to write the output events to irc.

16

mongodb It stores the output data in MongoDB.

17

nagios It is used to notify Nagios with the passive check results.

18

nagios_nsca It is used to notify Nagios with the passive check results over NSCA protocol.

19

opentsdb It stores the Logstash output events to OpenTSDB.

20

pipe It streams the output events to the standard input of another program.

21

rackspace It is used to send the output log events to Queue service of Rackspace Cloud.

22

redis It uses the rpush command to send the output logging data to a Redis queue.

23

riak It is used to store the output events in the Riak distributed key/value store.

24

s3 It stores the output logging data to Amazon Simple Storage Service.

25

sns It is used to send the output events to Amazon’s Simple Notification Service.

26

solr_http It indexes and stores the output logging data in Solr.

27

sqs It is used to ship the events to the Simple Queue Service of AWS.

28

statsd It is used to ship the metrics data to statsd network daemon.

29

stdout It is used to show the output events on standard output of CLI like command prompt.

30

syslog It is used to ship the output events to a syslog server.

31

tcp It is used to send the output events to a TCP socket.

32

udp It is used to push the output events over UDP.

33

websocket It is used to push the output events over WebSocket protocol.

34

xmpp It is used to push the output events over XMPP protocol.

所有插件都有其特定的设置,它有助于在插件中指定端口、路径等重要字段。我们将讨论一些输出插件的设置。

All the plugins have their specific settings, which help to specify important fields like port, path, etc., in a plugin. We will discuss the settings of some of the output plugins.

Elasticsearch

Elasticsearch 输出插件允许 Logstash 将输出存储在 Elasticsearch 引擎的特定群集中。这是用户的热门选择之一,因为它包含在 ELK 堆栈包中,因此为 Devops 提供端到端的解决方案。下表描述了此输出插件的设置。

The Elasticsearch output plugin enables Logstash to store the output in specific clusters of the Elasticsearch engine. This is one of the popular choices among users because it comes as part of the ELK Stack and therefore provides end-to-end solutions for DevOps. The following table describes the settings of this output plugin.

Setting Name

Default Value

Description

action

index

It is used to define the action performed in the Elasticsearch engine. Other values for this setting are delete, create, update, etc.

cacert

It contains the path of the file with .cer or .pem extension for server certificate validation.

codec

“plain”

It is used to encode the output logging data before sending it to the destination source.

doc_as_upsert

false

This setting is used in case of update action. It creates a document in Elasticsearch engine, if the document id is not specified in output plugin.

document_type

It is used to store the same type of events in the same document type. If it is not specified, then the event type is used for the same.

flush_size

500

This is used for improving the performance of bulk upload in Elasticsearch

hosts

[“127.0.0.1”]

It is an array of destination addresses for output logging data

idle_flush_time

1

It defines the time limit (in seconds) between two flushes; Logstash forces a flush after the time limit specified in this setting

index

"logstash-%{+YYYY.MM.dd}"

It is used to specify the index of Elasticsearch engine

manage_template

true

It is used to apply the default template in Elasticsearch

parent

nil

It is used to specify the id of parent document in Elasticsearch

password

It is used to authenticate the request to a secure cluster in Elasticsearch

path

It is used to specify the HTTP path of Elasticsearch.

pipeline

nil

It is used to set the ingest pipeline that the user wishes to execute for an event

proxy

It is used to specify HTTP proxy

retry_initial_interval

2

It is used to set the initial time interval (in seconds) between bulk retries. It doubles after each retry until it reaches retry_max_interval

retry_max_interval

64

It is used to set the maximum time interval for retry_initial_interval

retry_on_conflict

1

It is the number of retries by Elasticsearch to update a document

ssl

To enable or disable SSL/TLS secured communication to Elasticsearch

template

It contains the path of the customized template in Elasticsearch

template_name

"logstash"

This is used to name the template in Elasticsearch

timeout

60

It is the timeout for network requests to Elasticsearch

upsert

“”

It updates the document, or if the document_id does not exist, it creates a new document in Elasticsearch

user

It contains the user to authenticate the Logstash request in secure Elasticsearch cluster
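
A sketch of an elasticsearch output using some of these settings follows; the host address and the credential placeholders are assumptions to be replaced with your own values −

output {
   elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "logstash-%{+YYYY.MM.dd}"
      user => "<username>"
      password => "<password>"
   }
}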

Email

电子邮件输出插件用于在 Logstash 生成输出时通知用户。下表描述了此插件的设置。

The email output plugin is used to notify the user, when Logstash generates output. The following table describes the settings for this plugin.

Setting Name

Default Value

Description

address

“localhost”

It is the address of mail server

attachments

[]

It contains the names and locations of the attached files

body

“”

It contains the body of email and should be plain text

cc

It contains the email addresses in comma separated manner for the cc of email

codec

“plain”

It is used to encode the output logging data before sending it to the destination source.

contenttype

"text/html; charset = UTF-8"

It is used to set the content-type of the email

debug

false

It is used to execute the mail relay in debug mode

domain

"localhost"

It is used to set the domain to send the email messages

from

"logstash.alert@nowhere.com"

It is used to specify the email address of the sender

htmlbody

“”

It is used to specify the body of email in html format

password

It is used to authenticate with the mail server

port

25

It is used to define the port to communicate with the mail server

replyto

It is used to specify the email id for reply-to field of email

subject

“”

It contains the subject line of the email

use_tls

false

Enable or disable TLS for the communication with the mail server

username

It contains the username for the authentication with the server

via

“smtp”

It defines the methods of sending email by Logstash
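
A sketch of an email output follows. The to setting (the recipient address, not listed in the table above) and the other address values are assumptions used only for illustration −

output {
   email {
      to => "admin@example.com"
      from => "logstash.alert@nowhere.com"
      subject => "Logstash alert"
      body => "Event received: %{message}"
      address => "localhost"
      port => 25
   }
}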

Http

此设置用于通过 http 将输出事件发送到目的地。此插件具有以下设置 -

This plugin is used to send the output events over HTTP to the destination. This plugin has the following settings −

Setting Name

Default Value

Description

automatic_retries

1

It is used to set the number of HTTP request retries by Logstash

cacert

It contains the path of file for server’s certificate validation

codec

“plain”

It is used to encode the output logging data before sending it to the destination source.

content_type

It specifies the content type of the HTTP request to the destination server

cookies

true

It is used to enable or disable cookies

format

"json"

It is used to set the format of http request body

headers

It contains the information of http header

http_method

“”

It is used to specify the HTTP method used in the request by Logstash and the values can be "put", "post", "patch", "delete", "get", "head"

request_timeout

60

It is the timeout (in seconds) for the HTTP request

url

It is a required setting for this plugin to specify the http or https endpoint
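
A sketch of an http output posting events as JSON follows; the endpoint URL is an assumption −

output {
   http {
      url => "http://localhost:8080/logs"
      http_method => "post"
      format => "json"
   }
}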

stdout

stdout 输出插件用于在命令行界面的标准输出上写输出事件。在 Windows 中为命令提示符,在 UNIX 中为终端。此插件有以下设置:

The stdout output plugin is used to write the output events on the standard output of the command line interface. It is the command prompt in Windows and the terminal in UNIX. This plugin has the following settings −

Setting Name

Default Value

Description

codec

“plain”

It is used to encode the output logging data before sending it to the destination source.

workers

1

It is used to specify the number of workers for the output
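
For example, a stdout output that prints each event with the rubydebug codec −

output {
   stdout {
      codec => rubydebug
   }
}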

statsd

这是一个网络守护进程,用于通过 UDP 将矩阵数据发送到目标后端服务。在 Windows 中为命令提示符,在 UNIX 中为终端。此插件有以下设置:

statsd is a network daemon used to send metrics data over UDP to the destination backend services. This plugin has the following settings −

Setting Name

Default Value

Description

codec

“plain”

It is used to encode the output logging data before sending it to the destination source.

count

{}

It is used to define the count to be used in metrics

decrement

[]

It is used to specify the decrement metric names

host

“localhost”

It contains the address of statsd server

increment

[]

It is used to specify the increment metric names

port

8125

It contains the port of statsd server

sample_rate

1

It is used to specify the sample rate of the metric

sender

“%{host}”

It specifies the name of the sender

set

{}

It is used to specify a set metric

timing

{}

It is used to specify a timing metric

workers

1

It is used to specify the number of workers for the output
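
A sketch of a statsd output follows; the host, port and the metric name (which references a hypothetical response field in the event) are assumptions −

output {
   statsd {
      host => "localhost"
      port => 8125
      increment => ["apache.%{response}"]
   }
}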

Filter Plugins

Logstash 支持多种过滤器插件,用于将输入日志解析并转换为结构化程度更高、易于查询的格式。

Logstash supports various filter plugins to parse and transform input logs into a more structured and easy-to-query format.

使用过滤器插件的语法如下:

The syntax for using the filter plugin is as follows −

filter {
   Plugin name {
      Setting 1……
      Setting 2……..
   }
}

你可以使用以下命令下载过滤器插件:

You can download the filter plugin by using the following command −

>logstash-plugin install logstash-filter-<plugin name>

Logstash 插件实用程序存在于 Logstash 安装目录的 bin 文件夹中。下表描述了 Logstash 提供的过滤器插件。

The Logstash-plugin utility is present in the bin folder of the Logstash installation directory. The following table describes the filter plugins offered by Logstash.

Sr.No.

Plugin Name & Description

1

aggregate This plugin collects or aggregates the data from various events of the same type and processes them in the final event

2

alter It allows the user to alter fields of log events, which the mutate filter does not handle

3

anonymize It is used to replace the values of fields with a consistent hash

4

cipher It is used to encrypt the output events before storing them in destination source

5

clone It is used to create duplicates of the output events in Logstash

6

collate It merges the events from different logs by their time or count

7

csv This plugin parses data from input logs according to the separator

8

date It parses the dates from the fields in the event and sets that as the timestamp for the event

9

dissect This plugin helps user to extract fields from unstructured data and makes it easy for grok filter to parse them correctly

10

drop It is used to drop all the events of the same type or matching any other similarity criteria

11

elapsed It is used to compute the time between the start and end events

12

Elasticsearch It is used to copy the fields of previous log events present in Elasticsearch to the current one in Logstash

13

extractnumbers It is used to extract numbers from strings in the log events

14

geoip It adds a field in the event, which contains the latitude and longitude of the location of the IP present in the log event

15

grok It is the most commonly used filter plugin to parse the event to get the fields

16

i18n It deletes the special characters from a field in the log event

17

json It is used to create a structured JSON object in an event or in a specific field of an event

18

kv This plugin is useful in parsing key-value pairs in the logging data

19

metrics It is used to aggregate metrics like counting time duration in each event

20

multiline It is also one of the commonly used filter plugins, which helps the user convert multiline logging data into a single event.

21

mutate This plugin is used to rename, remove, replace, and modify fields in your events

22

range It is used to check the numerical values of fields in events against an expected range and a string’s length within a range.

23

ruby It is used to run arbitrary Ruby code

24

sleep This makes Logstash sleep for a specified amount of time

25

split It is used to split a field of an event and place all the split values in the clones of that event

26

xml It is used to create events by parsing the XML data present in the logs
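
For example, a filter section combining grok and mutate might look like the sketch below; the grok pattern and the field names loglevel and msgbody are illustrative assumptions and should be adapted to your own log format −

filter {
   grok {
      match => { "message" => "%{LOGLEVEL:loglevel} - %{GREEDYDATA:msgbody}" }
   }
   mutate {
      rename => { "msgbody" => "details" }
   }
}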

Codec plugins

编解码器插件可以是输入或输出插件的一部分。这些插件用于更改或设置记录数据演示。Logstash 提供多个编解码器插件和那些如下 −

Codec plugins can be a part of input or output plugins. These plugins are used to change or format the presentation of the logging data. Logstash offers multiple codec plugins, which are as follows −

Sr.No.

Plugin Name & Description

1

avro This plugin encodes/serializes Logstash events to Avro datums or decodes Avro records to Logstash events

2

cloudfront This plugin reads the encoded data from AWS CloudFront

3

cloudtrail This plugin is used to read the data from AWS CloudTrail

4

collectd This reads data from the binary protocol called collectd over UDP

5

compress_spooler It is used to compress the log events in Logstash to spooled batches

6

dots This is used for performance tracking by writing a dot to stdout for every event

7

es_bulk This is used to convert the bulk data from Elasticsearch into Logstash events including Elasticsearch metadata

8

graphite This codec reads data from Graphite into events and changes the events into Graphite-formatted records

9

gzip_lines This plugin is used to handle gzip encoded data

10

json This is used to convert a single element in a JSON array to a single Logstash event

11

json_lines It is used to handle JSON data with a newline delimiter

12

line This plugin will read and write events as single lines, which means after a newline delimiter there will be a new event

13

multiline It is used to convert multiline logging data into a single event

14

netflow This plugin is used to convert Netflow v5/v9 data to Logstash events

15

nmap It parses nmap result data (in XML format) into Logstash events

16

plain This reads text without delimiters

17

rubydebug This plugin will write the output Logstash events using the Ruby Awesome Print library
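
As a sketch of how codecs are attached to input and output plugins, the following reads newline-delimited JSON over TCP with the json_lines codec and prints events with the rubydebug codec; the port is an assumption −

input {
   tcp {
      port => 5000
      codec => json_lines
   }
}
output {
   stdout {
      codec => rubydebug
   }
}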

Build Your Own Plugin

您还可以在 Logstash 中创建符合您要求的自己的插件。可以使用 Logstash-plugin 实用程序创建自定义插件。在此处,我们将创建一个过滤器插件,该插件会在事件中添加一条自定义消息。

You can also create your own plugins in Logstash, which suit your requirements. The Logstash-plugin utility is used to create custom plugins. Here, we will create a filter plugin, which will add a custom message to the events.

Generate the Base Structure

用户可以使用 logstash-plugin 实用程序的生成选项生成必要的文件,也可以从 GitHub 中获得。

A user can generate the necessary files by using the generate option of the logstash-plugin utility; it is also available on GitHub.

>logstash-plugin generate --type filter --name myfilter --path c:/tpwork/logstash/lib

在此处, type 选项用于指定插件是输入、输出还是过滤器。在此示例中,我们创建一个过滤器插件,名为 myfilter 。路径选项用于指定要创建插件目录的位置。执行上述命令后,您将看到一个目录结构已创建。

Here, the type option is used to specify whether the plugin is an Input, Output or Filter plugin. In this example, we are creating a filter plugin named myfilter. The path option is used to specify the path, where you want your plugin directory to be created. After executing the above-mentioned command, you will see that a directory structure is created.

Develop the Plugin

您可以在插件目录的 \lib\logstash\filters 文件夹中找到插件的代码文件。文件扩展名将为 .rb

You can find the code file of the plugin in the \lib\logstash\filters folder in the plugin directory. The file extension will be .rb.

在我们的案例中,代码文件位于以下路径中:

In our case, the code file was located inside the following path −

C:\tpwork\logstash\lib\logstash-filter-myfilter\lib\logstash\filters\myfilter.rb

我们将消息更改为 − default ⇒ “Hi, You are learning this on tutorialspoint.com” 并保存文件。

We change the message to − default ⇒ "Hi, You are learning this on tutorialspoint.com" and save the file.

Install the Plugin

要安装此插件,需要修改 Logstash 的 Gemfile。您可以在 Logstash 的安装目录中找到此文件。在我们的案例中,它将位于 C:\tpwork\logstash 中。使用任意文本编辑器编辑此文件并在其中添加以下文本。

To install this plugin, the Gemfile of Logstash needs to be modified. You can find this file in the installation directory of Logstash. In our case, it will be in C:\tpwork\logstash. Edit this file using any text editor and add the following text in it.

gem "logstash-filter-myfilter",:path => "C:/tpwork/logstash/lib/logstash-filter-myfilter"

在上述命令中,我们指定了插件名称及其安装位置。然后,运行 Logstash-plugin 实用程序来安装此插件。

In the above line, we specify the name of the plugin along with where it can be found for installation. Then, run the Logstash-plugin utility to install this plugin.

>logstash-plugin install --no-verify

Testing

在此处,我们在以往的示例之一中添加 myfilter

Here, we are adding myfilter in one of the previous examples −

logstash.conf

logstash.conf

此 Logstash 配置文件在 grok 过滤器插件之后的过滤器部分包含 myfilter。

This Logstash config file contains myfilter in the filter section after the grok filter plugin.

input {
   file {
      path => "C:/tpwork/logstash/bin/log/input1.log"
   }
}
filter {
   grok {
      match => [
         "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} -
            %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ]
   }
   myfilter{}
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output1.log"
      codec => rubydebug
   }
}

Run logstash

Run logstash

我们可以使用以下命令运行 Logstash。

We can run Logstash by using the following command.

>logstash -f logstash.conf

input.log

input.log

以下代码块显示了输入日志数据。

The following code block shows the input log data.

INFO - 48566 - TRANSACTION_START - start

output.log

output.log

以下代码块显示了输出日志数据。

The following code block shows the output log data.

{
   "path" => "C:/tpwork/logstash/bin/log/input.log",
   "@timestamp" => 2017-01-07T06:25:25.484Z,
   "loglevel" => "INFO",
   "logger" => "TRANSACTION_END",
   "@version" => "1",
   "host" => "Dell-PC",
   "label" => "end",
   "message" => "Hi, You are learning this on tutorialspoint.com",
   "taskid" => "48566",
   "tags" => []
}

Publish it on Logstash

开发人员还可以通过上传到 GitHub 并遵循 Elasticsearch Company 定义的标准步骤,来将他们/她们的自定义插件发布到 Logstash。

A developer can also publish his/her custom plugin to Logstash by uploading it on GitHub and following the standardized steps defined by the Elasticsearch Company.

请参阅以下 URL 了解更多有关发布的信息:

Please refer to the following URL for more information on publishing −

Logstash - Monitoring APIs

Logstash 提供了用于监控其性能的 API。这些监控 API 会提取 Logstash 的运行时指标。

Logstash offers APIs to monitor its performance. These monitoring APIs extract runtime metrics about Logstash.

Node Info API

此 API 用于获取 Logstash 节点的信息。它将以 JSON 格式返回有关操作系统、Logstash 管道和 JVM 的信息。

This API is used to get the information about the nodes of Logstash. It returns the information of the OS, Logstash pipeline and JVM in JSON format.

你可以使用以下 URL 向 Logstash 发送 get 请求来提取信息 -

You can extract the information by sending a get request to Logstash using the following URL −

GET http://localhost:9600/_node?pretty

Response

以下是节点信息 API 的响应。

Following would be the response of the Node Info API.

{
   "host" : "Dell-PC",
   "version" : "5.0.1",
   "http_address" : "127.0.0.1:9600",

   "pipeline" : {
      "workers" : 4,
      "batch_size" : 125,
      "batch_delay" : 5,
      "config_reload_automatic" : false,
      "config_reload_interval" : 3
   },
   "os" : {
      "name" : "Windows 7",
      "arch" : "x86",
      "version" : "6.1",
      "available_processors" : 4
   },
   "jvm" : {
      "pid" : 312,
      "version" : "1.8.0_111",
      "vm_name" : "Java HotSpot(TM) Client VM",
      "vm_version" : "1.8.0_111",
      "vm_vendor" : "Oracle Corporation",
      "start_time_in_millis" : 1483770315412,

      "mem" : {
         "heap_init_in_bytes" : 16777216,
         "heap_max_in_bytes" : 1046937600,
         "non_heap_init_in_bytes" : 163840,
         "non_heap_max_in_bytes" : 0
      },
      "gc_collectors" : [ "ParNew", "ConcurrentMarkSweep" ]
   }
}

你还可以通过在 URL 中添加它们的名称来获取管道、操作系统和 JVM 的特定信息。

You can also get the specific information of Pipeline, OS and JVM, by just adding their names in the URL.

GET http://localhost:9600/_node/os?pretty
GET http://localhost:9600/_node/pipeline?pretty
GET http://localhost:9600/_node/jvm?pretty

Plugins Info API

此 API 用于获取 Logstash 中已安装插件的信息。你可以通过向下面提到的 URL 发送 get 请求来检索此信息 -

This API is used to get the information about the installed plugins in the Logstash. You can retrieve this information by sending a get request to the URL mentioned below −

GET http://localhost:9600/_node/plugins?pretty

Response

以下是插件信息 API 的响应。

Following would be the response of the Plugins Info API.

{
   "host" : "Dell-PC",
   "version" : "5.0.1",
   "http_address" : "127.0.0.1:9600",
   "total" : 95,
   "plugins" : [ {
      "name" : "logstash-codec-collectd",
      "version" : "3.0.2"
   },
   {
      "name" : "logstash-codec-dots",
      "version" : "3.0.2"
   },
   {
      "name" : "logstash-codec-edn",
      "version" : "3.0.2"
   },
   {
      "name" : "logstash-codec-edn_lines",
      "version" : "3.0.2"
   },
   ............
}

Node Stats API

此 API 用于以 JSON 对象形式提取 Logstash 的统计信息(内存、进程、JVM、管道)。你可以通过向下面提到的 URL 发送 get 请求来检索此信息 -

This API is used to extract the statistics of Logstash (Memory, Process, JVM, Pipeline) as JSON objects. You can retrieve this information by sending a get request to the URLs mentioned below −

GET http://localhost:9600/_node/stats/?pretty
GET http://localhost:9600/_node/stats/process?pretty
GET http://localhost:9600/_node/stats/jvm?pretty
GET http://localhost:9600/_node/stats/pipeline?pretty

Hot Threads API

此 API 检索有关 Logstash 中热线程的信息。热线程是 java 线程,具有很高的 CPU 使用率且运行时间超过正常的执行时间。你可以通过向下面提到的 URL 发送 get 请求来检索此信息 -

This API retrieves the information about the hot threads in Logstash. Hot threads are the Java threads, which have high CPU usage and run longer than normal execution time. You can retrieve this information by sending a get request to the URL mentioned below −

GET http://localhost:9600/_node/hot_threads?pretty

用户可以使用以下 URL 以更易读的形式获取响应。

A user can use the following URL to get the response in a form that is more readable.

GET http://localhost:9600/_node/hot_threads?human=true

Logstash - Security and Monitoring

在本章中,我们将讨论 Logstash 的安全和监控方面。

In this chapter, we will discuss the security and monitoring aspects of Logstash.

Monitoring

Logstash 是一个非常好的工具,可在生产环境中监控服务器和服务。生产环境中的应用程序会产生不同类型的日志数据,例如访问日志、错误日志等。Logstash 可以使用过滤器插件来计数或分析错误、访问或其他事件的数量。此分析和计数可用于监控不同的服务器及其服务。

Logstash is a very good tool for monitoring the servers and services in production environments. Applications in a production environment produce different kinds of log data like access logs, error logs, etc. Logstash can count or analyze the number of errors, accesses or other events using filter plugins. This analysis and counting can be used for monitoring different servers and their services.

Logstash 提供了 HTTP Poller 等插件来监控网站状态监控。在此,我们正在监控一个名为 mysite 的网站,该网站托管在本地 Apache Tomcat 服务器上。

Logstash offers plugins like HTTP Poller for website status monitoring. Here, we are monitoring a website named mysite hosted on a local Apache Tomcat Server.

logstash.conf

在此配置文件中,http_poller 插件用于在间隔设置中指定的时间间隔后访问插件中指定站点。最后,它将站点的状态写入标准输出。

In this config file, the http_poller plugin is used to hit the site specified in the plugin after the time interval specified in the interval setting. Finally, it writes the status of the site to the standard output.

input {
   http_poller {
      urls => {
         site => "http://localhost:8080/mysite"
      }
      request_timeout => 20
      interval => 30
      metadata_target => "http_poller_metadata"
   }
}
output {
   if [http_poller_metadata][code] == 200 {
      stdout {
         codec => line{format => "%{http_poller_metadata[response_message]}"}
      }
   }
   if [http_poller_metadata][code] != 200 {
      stdout {
         codec => line{format => "down"}
      }
   }
}

Run logstash

我们可以使用以下命令运行 Logstash。

We can run Logstash with the following command.

>logstash –f logstash.conf

stdout

如果站点已启动,则输出将为 -

If the site is up, then the output will be −

Ok

如果我们停止 Tomcat 的 Manager App ,则输出将更改为 -

If we stop the site by using the Manager App of Tomcat, the output will change to −

down

Security

Logstash 为与外部系统进行安全通信提供了大量功能并支持身份验证机制。所有 Logstash 插件都支持通过 HTTP 连接进行身份验证和加密。

Logstash provides plenty of features for secure communication with external systems and supports authentication mechanisms. Logstash plugins that communicate over HTTP support authentication and encryption.

Security with HTTP protocol

Logstash 提供的各种插件中都有用于身份验证目的的用户和密码等设置,比如 Elasticsearch 插件中。

There are settings like user and password for authentication purposes in various plugins offered by Logstash like in the Elasticsearch plugin.

elasticsearch {
   user => <username>
   password => <password>
}

Elasticsearch 的其他身份验证是 PKI (public key infrastructure) 。开发者需要在 Elasticsearch 输出插件中定义两个设置来启用 PKI 认证。

The other authentication is PKI (public key infrastructure) for Elasticsearch. The developer needs to define two settings in the Elasticsearch output plugin to enable the PKI authentication.

elasticsearch {
   keystore => <string_value>
   keystore_password => <password>
}

在 HTTPS 协议中,开发者可以使用证书机构的证书进行 SSL/TLS。

In the HTTPS protocol, a developer can use the certificate authority’s certificate for SSL/TLS.

elasticsearch {
   ssl => true
   cacert => <path to .pem file>
}

Security with Transport Protocol

要将传输协议用于 Elasticsearch,用户需要将协议设置设置为传输。这避免了 JSON 对象的不解组,从而提高了效率。

To use the transport protocol with Elasticsearch, users need to set the protocol setting to transport. This avoids un-marshalling of JSON objects and leads to more efficiency.

基本身份验证与在 Elasticsearch 输出协议中执行的 http 协议中执行的身份验证相同。

The basic authentication is the same as performed with the HTTP protocol in the Elasticsearch output plugin.

elasticsearch {
   protocol => “transport”
   user => <username>
   password => <password>
}

PKI 身份验证还需要在 Elasticsearch 输出协议中将 SSL 设置为 true,以及其他设置 −

The PKI authentication also needs the ssl setting to be true along with other settings in the Elasticsearch output plugin −

elasticsearch {
   protocol => “transport”
   ssl => true
   keystore => <string_value>
   keystore_password => <password>
}

最后,SSL 安全性需要比通信中的其他安全方法多一些设置。

Finally, SSL security requires a few more settings than the other security methods in communication.

elasticsearch {
   ssl => true
   keystore => <string_value>
   keystore_password => <password>
   truststore => <string_value>
   truststore_password => <password>
}

Other Security Benefits from Logstash

Logstash 可以帮助输入系统源来防止拒绝服务攻击之类的攻击。对日志进行监控以及分析其中的不同事件可以帮助系统管理员检查传入连接和错误的变化。这些分析可以帮助查看服务器上是否发生或将要发生攻击。

Logstash can help protect input system sources against attacks like denial of service attacks. Monitoring the logs and analyzing the different events in those logs can help system administrators to check the variation in the incoming connections and errors. These analyses can help to see if an attack is happening or going to happen on the servers.

Elasticsearch 公司的其他产品,比如 x-packfilebeat ,提供了一些与 Logstash 安全通信的功能。

Other products of the Elasticsearch Company, such as x-pack and filebeat, provide some functionality to communicate securely with Logstash.