1、数据清洗业务类LogProcessor
package com.css.kafka.kafka_stream; import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext; /**
* 数据清洗*/
public class LogProcessor implements Processor<byte[], byte[]>{ private ProcessorContext context; //初始化
public void init(ProcessorContext context) {
//传输
this.context = context;
} //具体业务逻辑
public void process(byte[] key, byte[] value) {
//1.拿到消息数据,转成字符串
String message = new String(value); //2.如果包含- 去除
if (message.contains("-")) {
//3.把- 去掉 之后去掉左侧数据
message = message.split("-")[1];
}
//4.发送数据
context.forward(key, message.getBytes());
} //释放资源
public void close() {
}
}
2、Application类
package com.css.kafka.kafka_stream; import java.util.Properties; import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier; /**
* 需求:对数据进行清洗操作
*
* 思路:wo-henshuai 把-和wo清洗掉*/
public class Application { public static void main(String[] args) {
//1.定义主题 发送到 另外一个主题中 数据清洗
String oneTopic = "t1";
String twoTopic = "t2"; //2.设置属性
Properties prop = new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "logProcessor");
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092,192.168.146.133:9092,192.168.146.134:9092"); //3.实例对象
StreamsConfig config = new StreamsConfig(prop); //4.流计算 拓扑
Topology builder = new Topology(); //5.定义kafka组件数据源
builder.addSource("Source", oneTopic).addProcessor("Processor", new ProcessorSupplier<byte[], byte[]>() { public Processor<byte[], byte[]> get() {
return new LogProcessor();
}
//从哪里来
}, "Source")
//到哪里去
.addSink("Sink", twoTopic, "Processor"); //6.实例化kafkaStream
KafkaStreams kafkaStreams = new KafkaStreams(builder, prop);
kafkaStreams.start();
}
}
3、运行Application类的main方法
4、在hd09-1机器上创建主题t1
bin/kafka-topics.sh --zookeeper hd09-1:2181 --create --replication-factor 3 --partition 1 --topic t1
5、在hd09-2机器上启动消费者
bin/kafka-console-consumer.sh --bootstrap-server hd09-2:9092 --topic t2 --from-beginning --consumer.config config/consumer.properties
6、在hd09-1机器上启动生产者
bin/kafka-console-producer.sh --broker-list hd09-1:9092 --topic t1
7、此时在hd09-1机器kafka生产者上输入 wo-henshuai,在hd09-2消费者机器上会显示henshuai,即完成了数据清洗,如下图。