需求:
使用Kafka Stream清洗数据:只截取用户输入字符串>>>后面的内容。
实现: 第一步:创建一个Maven项目,添加依赖:
org.apache.kafka
kafka-clients
2.4.1
org.slf4j
slf4j-api
1.7.25
org.slf4j
slf4j-log4j12
1.7.25
org.apache.kafka
kafka-streams
2.4.1
org.apache.kafka
kafka_2.12
2.4.1
第二步:创建主类
public class AppDemo {
public static void main(String[] args) {
// 定义输入的topic
String from = "first";
// 定义输出的topic
String to = "second";
// 设置参数
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafkastream");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hcmaster:9092");
//创建拜拓扑对象
Topology topology = new Topology();
topology.addSource("SOURCE", from)
.addProcessor("PROCESSOR", new ProcessorSupplier() {
@Override
public Processor get() {
// 具体分析处理
return new LogProcessor();
}
}, "SOURCE")
.addSink("SINK", to, "PROCESSOR");
// 创建kafka stream
KafkaStreams streams = new KafkaStreams(topology, properties);
streams.start();
}
}
第三步:具体业务处理
public class LogProcessor implements Processor {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(byte[] key, byte[] value) {
String v = new String(value);
// 如果包含>>>则只保留该标记后面的内容
if (v.contains(">>>")) {
v = v.split(">>>")[1].trim();
// 输出到下一个topic
context.forward(key, v.getBytes());
} else {
context.forward(key, v.getBytes());
}
}
@Override
public void close() {
}
}
运行程序
1)运行AppDemo 2)启动生产者 3)启动消费者