您当前的位置: 首页 >  kafka

Kafka 过滤器链

梁云亮 发布时间:2020-03-20 14:33:53 ,浏览量:2

需求

Producer产生的消息经过过滤器链后,在每条消息的后面带上时间戳,最后分别输出消息发送成功和失败的个数

实现

第一步:

public class TimeInterceptor implements ProducerInterceptor {
    @Override
    public void configure(Map configs) {
    }

    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        // 创建一个新的record,把时间戳写入消息体的最前部
        return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),record.value().toString() + "," + System.currentTimeMillis());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {
    }
}

第二步:

public class CountInterceptor implements ProducerInterceptor{
    private int error = 0;
    private int success = 0;

	@Override
	public void configure(Map configs) {
		
	}

	@Override
	public ProducerRecord onSend(ProducerRecord record) {
		 return record;
	}

	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
		// 统计成功和失败的次数
        if (exception == null) {
            success++;
        } else {
            error++;
        }
	}

	@Override
	public void close() {
        // 保存结果
        System.out.println("Successful Amount: " + success);
        System.out.println("Failed Amount: " + error);
	}
}

第三步:自定义生产者,在其中指定过滤器链

public class InterceptorProducer {

	public static void main(String[] args) throws Exception {
		// 1 设置配置信息
		Properties props = new Properties();
		props.put("bootstrap.servers", "hcmaster:9092");
		props.put("acks", "all");
		props.put("retries", 0);
		props.put("batch.size", 16384);
		props.put("linger.ms", 1);
		props.put("buffer.memory", 33554432);
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		
		// 2 构建滤器链
		List interceptors = new ArrayList();
		interceptors.add("com.hc.kafka.interceptor.TimeInterceptor");
		interceptors.add("com.hc.kafka.interceptor.CountInterceptor");
		props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
		 
		String topic = "first";
		Producer producer = new KafkaProducer(props);
		
		// 3 发送消息
		for (int i = 0; i             
关注
打赏
1688896170
查看更多评论
0.2198s