您当前的位置: 首页 >  kafka

Kafka自定义Consumer

梁云亮 发布时间:2020-03-18 19:07:39 ,浏览量:3

代码实现
public class CustomConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // 定义kakfa 服务的地址,不需要将所有broker指定上
        props.put("bootstrap.servers", "hcmaster:9092");
        // 制定消费者组id
        props.put("group.id", "g1");
        // 是否自动确认offset
        props.put("enable.auto.commit", "true");
        // 自动确认offset的时间间隔
        props.put("auto.commit.interval.ms", "1000");
        // key的序列化类
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value的序列化类
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 定义consumer
        KafkaConsumer consumer = new KafkaConsumer(props);

        // 消费者订阅的topic, 可同时订阅多个
        consumer.subscribe(Arrays.asList("first","second"));

        while (true) {
            // 读取数据,读取超时时间为100ms
            ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord record : records) {
                System.out.printf("topic = %s , partition = %s , offset = %d, key = %s, value = %s%n",record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}
测试

在终端分别启动fist和second两个Producer 在这里插入图片描述 在这里插入图片描述 发送数据顺序:

  1. first发送aa、bb
  2. second发送11、22
  3. first发送33
  4. second发送cc

Intellij控制台输出结果: 在这里插入图片描述

关注
打赏
1688896170
查看更多评论

梁云亮

暂无认证

  • 3浏览

    0关注

    1121博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录

0.0550s