多个kafka输入主题设置elasticsearch多个索引输出

多个kafka输入主题设置elasticsearch多个索引输出

本文介绍了logstash 5.0.1:为多个kafka输入主题设置elasticsearch多个索引输出的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个logstash输入设置

I have a logstash input setup as

input {
  kafka {
  bootstrap_servers => "zookeper_address"
  topics => ["topic1","topic2"]
  }
}

我需要在Elasticsearch中将主题输入两个不同的索引中.谁能帮助我解决应如何设置此任务的输出.目前,我只能设置

I need to feed the topics into two different indexes in elasticsearch. Can anyone help me with how the ouput should be setup for such a task. At this time I am only able to setup

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "my_index"
    codec => "json"
    document_id => "%{id}"
  }
}

在同一个Elasticsearch实例上我需要两个索引,分别为index1index2,这些索引将由来自topic1topic2

I need two indexes on the same elasticsearch instance say index1 and index2 which will be fed by messages coming in on topic1 and topic2

推荐答案

首先,您需要在kafka输入中添加decorate_events以便知道消息来自哪个主题

First, you need to add decorate_events to your kafka input in order to know from which topic the message is coming

input {
  kafka {
    bootstrap_servers => "zookeper_address"
    topics => ["topic1","topic2"]
    decorate_events => true
  }
}

然后,您有两个选择,都涉及条件逻辑.第一种是通过引入一个过滤器来根据主题名称添加正确的索引名称.为此,您需要添加

Then, you have two options, both involving conditional logic. The first is by introducing a filter for adding the correct index name depending on the topic name. For this you need to add

filter {
   if [kafka][topic] == "topic1" {
      mutate {
         add_field => {"[@metadata][index]" => "index1"}
      }
   } else {
      mutate {
         add_field => {"[@metadata][index]" => "index2"}
      }
   }
   # remove the field containing the decorations, unless you want them to land into ES
   mutate {
      remove_field => ["kafka"]
   }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][index]}"
    codec => "json"
    document_id => "%{id}"
  }
}

第二个选择是像这样直接在输出部分执行if/else(但附加的kafka字段将落入ES中):

Then second option is to do the if/else directly in the output section, like this (but the additional kafka field will land into ES):

output {
   if [@metadata][kafka][topic] == "topic1" {
     elasticsearch {
       hosts => ["localhost:9200"]
       index => "index1"
       codec => "json"
       document_id => "%{id}"
     }
   } else {
     elasticsearch {
       hosts => ["localhost:9200"]
       index => "index2"
       codec => "json"
       document_id => "%{id}"
     }
   }
}

这篇关于logstash 5.0.1:为多个kafka输入主题设置elasticsearch多个索引输出的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-20 13:08