I am pleased to announce that the spring-integration-kafka (Spring Integration Kafka Support) First Milestone for version 2.0 is now available.
The Spring Integration Kafka extension project provides inbound and outbound channel adapters for Apache Kafka.
Starting with version 2.0, the project is a complete rewrite based on the new spring-kafka project, which uses the pure Java Producer and Consumer clients provided by Kafka 0.9.x.x.
The artifact org.springframework.integration:spring-integration-kafka:2.0.0.M1 is available in the Milestone repository.
Built on the MessageListenerContainer foundation from the spring-kafka project, the KafkaMessageDrivenChannelAdapter definition is now very simple:
@Bean
public MessageProducer kafkaProducer(
        AbstractMessageListenerContainer<Integer, String> container) {
    // The adapter is driven by the listener container from spring-kafka.
    KafkaMessageDrivenChannelAdapter<Integer, String> adapter =
            new KafkaMessageDrivenChannelAdapter<>(container);
    adapter.setMessageConverter(new StringJsonMessageConverter());
    // Converted messages go to the output channel, failures to the error channel.
    adapter.setOutputChannel(fromKafkaChannel());
    adapter.setErrorChannel(myErrorChannel());
    return adapter;
}
With XML configuration, we likewise need to declare just a single component:
<int-kafka:message-driven-channel-adapter
    id="kafkaListener"
    listener-container="container1"
    auto-startup="false"
    phase="100"
    send-timeout="5000"
    channel="nullChannel"
    message-converter="messageConverter"
    error-channel="errorChannel" />
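Both variants above receive a listener container typed for Integer keys and String values, so the Kafka consumer behind it needs matching deserializers. As a rough sketch only, the properties for such a consumer could be assembled as below. The class name ConsumerConfigSketch, the broker address and the group id are placeholders invented for this illustration. The property keys and deserializer class names are the standard Kafka consumer ones, written as plain strings so that the snippet compiles without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: builds the property map for a Kafka consumer that reads
// Integer keys and String values. Class and method names are hypothetical.
class ConsumerConfigSketch {

    static Map<String, Object> consumerProps(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Deserializers must match the container's <Integer, String> type parameters.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" and "demo-group" are placeholder values.
        consumerProps("localhost:9092", "demo-group")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A map like this would typically be handed to whatever consumer factory backs the listener container; how that factory and the container are declared is outside the scope of this sketch.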
With the KafkaTemplate foundation from the spring-kafka project, the KafkaProducerMessageHandler is simple too:
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler kafkaProducerHandler(
        KafkaTemplate<Integer, String> template) {
    KafkaProducerMessageHandler<Integer, String> handler =
            new KafkaProducerMessageHandler<>(template);
    // PARSER is declared elsewhere; it is assumed to be a SpEL ExpressionParser.
    // Topic and partition are resolved per message from its headers.
    handler.setTopicExpression(PARSER.parseExpression("headers.myTopic"));
    handler.setPartitionIdExpression(
            PARSER.parseExpression("headers.myPartition"));
    return handler;
}
The XML configuration has been simplified, too:
<int-kafka:outbound-channel-adapter
    kafka-template="template"
    channel="inputToKafka"
    topic="foo"/>
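The Java configuration above picks the topic and partition per message from the myTopic and myPartition headers, whereas the XML variant uses a fixed topic. To make the per-message idea concrete, here is a dependency-free sketch of what evaluating those two header expressions amounts to. It is not the handler's actual implementation: the HeaderRoutingSketch and Destination names are invented for illustration, and what the real handler does when a header is missing is not covered here.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: plain-Java stand-in for evaluating headers.myTopic and
// headers.myPartition against one message's headers. All names are hypothetical.
class HeaderRoutingSketch {

    /** Topic and partition chosen for one outgoing message. */
    static final class Destination {
        final String topic;
        final Integer partition; // null means no explicit partition was supplied

        Destination(String topic, Integer partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /** Looks up the same two headers that the expressions above refer to. */
    static Destination resolve(Map<String, Object> headers) {
        Object topic = headers.get("myTopic");
        Object partition = headers.get("myPartition");
        return new Destination(
                topic == null ? null : topic.toString(),
                partition instanceof Integer ? (Integer) partition : null);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("myTopic", "foo");
        headers.put("myPartition", 1);
        Destination d = resolve(headers);
        System.out.println(d.topic + "/" + d.partition); // prints foo/1
    }
}
```

In a real flow, the sender would set these headers on the message before it reaches the toKafka channel, so each message can be routed to a different topic or partition without changing the handler configuration.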
Starting with version 1.2, the Spring Integration Java DSL introduces the Kafka09 factory to cover the functionality of the aforementioned channel adapters from this new 2.0 version.
For example, the producing part may look like:
.handle(Kafka09.outboundChannelAdapter(producerFactory())
        .defaultTopic("foo")
        .partitionId(m -> m.getHeaders().get("myPartition", Integer.class)))
And finally, don't miss the Spring for Apache Kafka announcement, too!
Together with the next Spring for Apache Kafka release, we may consider implementing some adapters for Kafka Streams as well.
Since the code base of the project has become pretty straightforward and it looks like the Apache Kafka API is going to be stable, we intend to absorb this project into Spring Integration Core 5.0 when the time comes.
Meanwhile, we look forward to your feedback and, if all goes well, plan to release 2.0.0.RELEASE in the next few weeks!