Logstash 简明教程

Logstash - Filters

Logstash 在输入和输出之间的管道中使用过滤器。Logstash 的过滤器测量、操作和创建事件,例如 Apache-Access 。许多过滤器插件用于管理 Logstash 中的事件。在这里,在 Logstash Aggregate Filter 的示例中,我们正在过滤数据库中每个 SQL 事务的持续时间并计算总时间。

Logstash uses filters in the middle of the pipeline between input and output. The filters of Logstash measures manipulate and create events like Apache-Access. Many filter plugins used to manage the events in Logstash. Here, in an example of the Logstash Aggregate Filter, we are filtering the duration every SQL transaction in a database and computing the total time.

Installing the Aggregate Filter Plugin

使用 Logstash 插件实用程序安装 Aggregate 过滤器插件。Logstash 插件是 bin folder 中 Logstash 中 windows 批处理文件。

Installing the Aggregate Filter Plugin using the Logstash-plugin utility. The Logstash-plugin is a batch file for windows in bin folder in Logstash.

>logstash-plugin install logstash-filter-aggregate

logstash.conf

在此配置中,对于 Initializing, Incrementing,generating 总交易持续时间(即 sql_duration )你可以看到三个“if”语句。聚合插件用于添加输入日志的每个事件中存在的 sql_duration。

In this configuration, you can see three ‘if’ statements for Initializing, Incrementing, and generating the total duration of transaction, i.e., the sql_duration. The aggregate plugin is used to add the sql_duration, present in every event of the input log.

input {
   file {
      path => "C:/tpwork/logstash/bin/log/input.log"
   }
}
filter {
   grok {
      match => [
         "message", "%{LOGLEVEL:loglevel} -
            %{NOTSPACE:taskid} - %{NOTSPACE:logger} -
            %{WORD:label}( - %{INT:duration:int})?"
      ]
   }
   if [logger] == "TRANSACTION_START" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] = 0"
         map_action => "create"
      }
   }
   if [logger] == "SQL" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] ||= 0 ;
            map['sql_duration'] += event.get('duration')"
      }
   }
   if [logger] == "TRANSACTION_END" {
      aggregate {
         task_id => "%{taskid}"
         code => "event.set('sql_duration', map['sql_duration'])"
         end_of_task => true
         timeout => 120
      }
   }
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"
   }
}

Run Logstash

我们可以使用以下命令运行 Logstash。

We can run Logstash by using the following command.

>logstash –f logstash.conf

input.log

以下代码块显示了输入日志数据。

The following code block shows the input log data.

INFO - 48566 - TRANSACTION_START - start
INFO - 48566 - SQL - transaction1 - 320
INFO - 48566 - SQL - transaction1 - 200
INFO - 48566 - TRANSACTION_END - end

output.log

如配置文件中所述,最后的“if”语句位于 logger – TRANSACTION_END,它会打印总交易时间或 sql_duration。这在 output.log 中以黄色突出显示。

As specified in the configuration file, the last ‘if’ statement where the logger is – TRANSACTION_END, which prints the total transaction time or sql_duration. This has been highlighted in yellow color in the output.log.

{
   "path":"C:/tpwork/logstash/bin/log/input.log","@timestamp": "2016-12-22T19:04:37.214Z",
   "loglevel":"INFO","logger":"TRANSACTION_START","@version": "1","host":"wcnlab-PC",
   "message":"8566 - TRANSACTION_START - start\r","tags":[]
}
{
   "duration":320,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-22T19:04:38.366Z","loglevel":"INFO","logger":"SQL",
   "@version":"1","host":"wcnlab-PC","label":"transaction1",
   "message":" INFO - 48566 - SQL - transaction1 - 320\r","taskid":"48566","tags":[]
}
{
   "duration":200,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-22T19:04:38.373Z","loglevel":"INFO","logger":"SQL",
   "@version":"1","host":"wcnlab-PC","label":"transaction1",
   "message":" INFO - 48566 - SQL - transaction1 - 200\r","taskid":"48566","tags":[]
}
{
   "sql_duration":520,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-22T19:04:38.380Z","loglevel":"INFO","logger":"TRANSACTION_END",
   "@version":"1","host":"wcnlab-PC","label":"end",
   "message":" INFO - 48566 - TRANSACTION_END - end\r","taskid":"48566","tags":[]
}