Consume Kafka message with a custom header

Question:

I'm trying to create a Kafka consumer with Spring Cloud Stream that listens for a Kafka message carrying a custom header (operationType). The message is built outside any Spring context.

I'm using Spring Boot 1.5.x / Spring Cloud Edgware.SR5 and version 1.1.1 of kafka-clients and kafka_2.11.

My listener class contains this method:

@StreamListener(value = "dataset-changed", condition = "headers['operationType']=='UPDATE'")
public void onEvent(@Payload DatasetChangedMessage payload) {
  // this code should execute only if the operationType header equals UPDATE
}

The Spring Cloud Stream configuration is:

spring.cloud.stream:
  bindings:
    dataset-changed:
      group: preparation
      content-type: application/json
      destination: dataset-changed
      consumer:
        headerMode: raw
        configuration:
          key.deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
          value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

The producer is a simple Java program that uses the kafka-clients:1.1.1 library:

Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<byte[], String> producer = new KafkaProducer<>(producerConfig);

// Headers (to condition the kafka listener)
final List<Header> headers = new ArrayList<>();
headers.add(new RecordHeader("operationType", "UPDATE".getBytes()));

ProducerRecord<byte[], String> record =
        new ProducerRecord<>("dataset-changed", 0, "111".getBytes(), getJsonPayload(), headers);
Future<RecordMetadata> future = producer.send(record);
future.get();

producer.close();

When I produce the Kafka message, I get warnings like this:

2019-03-15 14:48:32.103  WARN [tdp-preparation,1e24b9764ef9bb14,1e24b9764ef9bb14,false] 34760 --- [           -L-1] .DispatchingStreamListenerMessageHandler : Cannot find a @StreamListener matching for message with id: ea27a446-69da-7b8d-1b94-50b46a40dfde

even though the operationType header is present on the message.


Answer 1:

I believe you are losing the headers on the consumer side because you use headerMode: raw. That essentially means none: no headers are mapped.

Consider using headerMode: headers instead.
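
As a rough sketch, the binding from the question with only that one value changed might look like the fragment below. It assumes a Spring Cloud Stream release whose Kafka binder supports native headers; the Edgware release line mentioned in the question may predate that support, so check the documentation for the version actually in use.

```yaml
spring.cloud.stream:
  bindings:
    dataset-changed:
      group: preparation
      content-type: application/json
      destination: dataset-changed
      consumer:
        # Assumption: "headers" is an accepted headerMode value in the installed version
        headerMode: headers
        configuration:
          key.deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
          value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
```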

See the documentation for more information: https://cloud.spring.io/spring-cloud-stream/spring-cloud-stream.html#_consumer_properties

headerMode

When set to none, disables header parsing on input. Effective only for messaging middleware that does not support message headers natively and requires header embedding. This option is useful when consuming data from non-Spring Cloud Stream applications when native headers are not supported. When set to headers, it uses the middleware’s native header mechanism. When set to embeddedHeaders, it embeds headers into the message payload.

Default: depends on the binder implementation.
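
One related point may be worth checking once the headers are mapped; treat it as a sketch, not a confirmed diagnosis. The producer in the question writes the header value as raw bytes ("UPDATE".getBytes()), so the value may reach the listener as a byte[] and not as a String, in which case a direct comparison with 'UPDATE' would not match. The plain-Java snippet below illustrates the difference. HeaderValueCheck and headerEquals are hypothetical names, and the Map only stands in for the message headers.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HeaderValueCheck {

    /** Returns true when the named header is present and its text value equals expected. */
    public static boolean headerEquals(Map<String, Object> headers, String name, String expected) {
        Object value = headers.get(name);
        if (value instanceof byte[]) {
            // Kafka record headers carry raw bytes, so decode them before comparing.
            return expected.equals(new String((byte[]) value, StandardCharsets.UTF_8));
        }
        // Covers a header that was already converted to a String, or is missing (null).
        return expected.equals(value);
    }

    public static void main(String[] args) {
        // Stand-in for the message headers, holding the value exactly as the producer wrote it.
        Map<String, Object> headers = new HashMap<>();
        headers.put("operationType", "UPDATE".getBytes(StandardCharsets.UTF_8));

        // Comparing a String with a byte[] directly is never true.
        System.out.println("UPDATE".equals(headers.get("operationType"))); // prints "false"

        // Decoding the bytes first gives the expected match.
        System.out.println(headerEquals(headers, "operationType", "UPDATE")); // prints "true"
    }
}
```

If the header does arrive as a byte[], a condition along the lines of new String(headers['operationType'])=='UPDATE' might be needed in the @StreamListener. That expression is an assumption to verify against your setup.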

  • Posted on 2019-03-17 14:56