您当前的位置: 首页 >  kafka

Kafka Stream

梁云亮 发布时间:2020-03-18 21:36:36 ,浏览量:2

需求:

使用Kafka Stream清洗数据:只截取用户输入字符串>>>后面的内容。

实现: 第一步:创建一个Maven项目,添加依赖:

    org.apache.kafka
    kafka-clients
    2.4.1



    org.slf4j
    slf4j-api
    1.7.25


    org.slf4j
    slf4j-log4j12
    1.7.25



    org.apache.kafka
    kafka-streams
    2.4.1




    org.apache.kafka
    kafka_2.12
    2.4.1

第二步:创建主类
public class AppDemo {
    public static void main(String[] args) {
        // 定义输入的topic
        String from = "first";
        // 定义输出的topic
        String to = "second";

        // 设置参数
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafkastream");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hcmaster:9092");

        //创建拜拓扑对象
        Topology topology = new Topology();

        topology.addSource("SOURCE", from)
                .addProcessor("PROCESSOR", new ProcessorSupplier() {
                    @Override
                    public Processor get() {
                        // 具体分析处理
                        return new LogProcessor();
                    }
                }, "SOURCE")
                .addSink("SINK", to, "PROCESSOR");

        // 创建kafka stream
        KafkaStreams streams = new KafkaStreams(topology, properties);
        streams.start();
    }

}
第三步:具体业务处理
public class LogProcessor implements Processor {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(byte[] key, byte[] value) {
        String v = new String(value);
        // 如果包含>>>则只保留该标记后面的内容
        if (v.contains(">>>")) {
            v = v.split(">>>")[1].trim();
            // 输出到下一个topic
            context.forward(key, v.getBytes());
        } else {
            context.forward(key, v.getBytes());
        }
    }

    @Override
    public void close() {

    }
}
运行程序

1)运行AppDemo 2)启动生产者 在这里插入图片描述 3)启动消费者 在这里插入图片描述

关注
打赏
1688896170
查看更多评论

梁云亮

暂无认证

  • 2浏览

    0关注

    1121博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录

0.4286s