flume1.6+elasticsearch6.3.2

Pom

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.4.</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.4.</version>
</dependency>
<!-- <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId>
<version>4.1..Final</version> </dependency> -->
<!-- https://mvnrepository.com/artifact/org.apache.flume.flume-ng-sinks/flume-ng-elasticsearch-sink -->
<dependency>
<groupId>org.apache.flume.flume-ng-sinks</groupId>
<artifactId>flume-ng-elasticsearch-sink</artifactId>
<version>1.6.</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.</version>
</dependency> </dependencies>

ElasticSearchForLogSink.java

package com.jachs.sink.elasticsearch;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
import org.apache.flume.sink.elasticsearch.client.RoundRobinList;
import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.DefaultHttpClient;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient; import com.google.gson.Gson; import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME; import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map; import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES; public class ElasticSearchForLogSink extends AbstractSink implements Configurable {
private String hostNames;
private String indexName;
private String clusterName;
static TransportClient client;
static Map<String, String> dataMap = new HashMap<String, String>();; public void configure(Context context) {
hostNames = context.getString(HOSTNAMES);
indexName = context.getString(INDEX_NAME);
clusterName = context.getString(CLUSTER_NAME);
} @Override
public void start() {
Settings settings = Settings.builder().put("cluster.name", clusterName).build();
try {
client = new PreBuiltTransportClient(settings).addTransportAddress(new TransportAddress(
InetAddress.getByName(hostNames.split(":")[]), Integer.parseInt(hostNames.split(":")[])));
} catch (UnknownHostException e) {
e.printStackTrace();
}
} @Override
public void stop() {
super.stop();
} public Status process() throws EventDeliveryException {
Status status = Status.BACKOFF;
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
Event event = ch.take();
if (event == null) {
txn.rollback();
return status;
}
String data = new String(event.getBody(), "UTF-8");
if (data.indexOf("token") != -) {
String token = data.substring(data.length() - , data.length());
System.out.println("获取标识" + token);
String sb = dataMap.get(token);
if (sb != null) {
sb = sb + data;
} else {
dataMap.put(token, data);
}
}
System.out.println("打印" + dataMap.size());
if (dataMap.size() >= ) {//十条数据一提交,条件自己改
BulkRequestBuilder bulkRequest = client.prepareBulk(); bulkRequest.add(client.prepareIndex(indexName, "text").setSource(dataMap));
bulkRequest.execute().actionGet();
dataMap.clear();
System.out.println("归零" + dataMap.size());
}
// Map<String, Object> map = new HashMap<String, Object>(); // for (String key : head.keySet()) {
// map.put("topic", key);
// map.put("timestamp", head.get(key));
// map.put("data", new String(event.getBody(), "UTF-8"));
// } // IndexRequestBuilder create = client.prepareIndex(indexName,
// "text").setSource(map);
// IndexResponse response = create.execute().actionGet(); txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();
status = Status.BACKOFF;
t.printStackTrace();
if (t instanceof Error) {
throw (Error) t;
}
} finally {
txn.close();
}
return status;
}
}

kafka生成者模仿日志写入代码

package com.test.Kafka;

import java.util.Properties;

import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord; import com.google.gson.Gson; public class App {
public static void main(String[] args) {
Properties properties = new Properties();
// properties.put("bootstrap.servers",
// "192.168.2.200:9092,192.168.2.157:9092,192.168.2.233:9092,192.168.2.194:9092,192.168.2.122:9092");
// properties.put("bootstrap.servers",
// "192.168.2.200:9092,192.168.2.233:9092,192.168.2.122:9092");
properties.put("bootstrap.servers", "127.0.0.1:9092");
properties.put("acks", "all");
properties.put("retries", );
properties.put("batch.size", );
properties.put("linger.ms", );
properties.put("buffer.memory", );
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = null;
RandomStringUtils randomStringUtils=new RandomStringUtils();
try {
producer = new KafkaProducer<String, String>(properties);
for (int i = ; i < ; i++) {// topID无所谓
producer.send(new ProducerRecord<String, String>("test1", "tokenk"+randomStringUtils.random()));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}

修改flume配置

a1.sinks.elasticsearch.type=com.jachs.sink.elasticsearch.ElasticSearchForLogSink
04-25 07:09