Spring Integration Kafka Support 2.0.0.M1 is now available

Releases | Artem Bilan | April 11, 2016 | ...

I am pleased to announce that the first milestone of spring-integration-kafka (Spring Integration Kafka Support) version 2.0 is now available.

The Spring Integration Kafka extension project provides inbound and outbound channel adapters for Apache Kafka.

Starting with version 2.0, the project is a complete rewrite based on the new spring-kafka project, which uses the pure Java Producer and Consumer clients provided by Kafka 0.9.x.x.

The artifact org.springframework.integration:spring-integration-kafka:2.0.0.M1 is available in the Milestone repository.

Key Features

The Kafka Consumer Channel Adapter

Because it builds on the MessageListenerContainer foundation from the spring-kafka project, the KafkaMessageDrivenChannelAdapter definition is now very simple:

@Bean
public MessageProducer kafkaProducer(
                   AbstractMessageListenerContainer<Integer, String> container) {
    KafkaMessageDrivenChannelAdapter<Integer, String> adapter = 
                              new KafkaMessageDrivenChannelAdapter<>(container);
    adapter.setMessageConverter(new StringJsonMessageConverter());
    adapter.setOutputChannel(fromKafkaChannel());
    adapter.setErrorChannel(myErrorChannel());
    return adapter;
}

With XML configuration, we need to declare just a single component as well:

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        message-converter="messageConverter"
        error-channel="errorChannel" />

The Kafka Producer Channel Adapter

With the KafkaTemplate foundation from the spring-kafka project, the KafkaProducerMessageHandler is simple, too:

@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler kafkaProducerHandler(
                            KafkaTemplate<Integer, String> template) {
    KafkaProducerMessageHandler<Integer, String> handler = 
                         new KafkaProducerMessageHandler<>(template);
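    // PARSER is assumed to be a shared SpelExpressionParser instance,
    // e.g. a static final field declared elsewhere in the configuration class.
    handler.setTopicExpression(PARSER.parseExpression("headers.myTopic"));
    handler.setPartitionIdExpression(
                            PARSER.parseExpression("headers.myPartition"));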
    return handler;
}
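
To make the header-driven routing above concrete, here is a minimal, dependency-free sketch of what the two expressions look up for each message. It is plain Java rather than the Spring Integration API: the HeaderRoutingSketch class, its Target holder and the resolve method are hypothetical names introduced only for illustration, and a plain Map stands in for the message headers.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for header-driven routing; NOT the Spring Integration API.
final class HeaderRoutingSketch {

    // Hypothetical holder for the destination resolved for one message.
    static final class Target {
        final String topic;
        final Integer partition; // null means no explicit partition was requested

        Target(String topic, Integer partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Mirrors the lookups behind "headers.myTopic" and "headers.myPartition".
    static Target resolve(Map<String, Object> headers) {
        Object topic = headers.get("myTopic");
        if (!(topic instanceof String)) {
            throw new IllegalArgumentException("header 'myTopic' must be a String");
        }
        Object partition = headers.get("myPartition");
        if (partition != null && !(partition instanceof Integer)) {
            throw new IllegalArgumentException("header 'myPartition' must be an Integer");
        }
        return new Target((String) topic, (Integer) partition);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("myTopic", "orders");
        headers.put("myPartition", 2);
        Target target = resolve(headers);
        System.out.println(target.topic + "/" + target.partition);
    }
}
```

In the real handler the same decision is made per message by evaluating the configured expressions, so a sender selects the destination simply by setting the myTopic and myPartition headers before the message reaches the toKafka channel.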

The XML configuration has been simplified, too:

<int-kafka:outbound-channel-adapter 
                kafka-template="template" 
                channel="inputToKafka"
                topic="foo"/>

Java DSL Changes

Starting with version 1.2, the Spring Integration Java DSL introduces the Kafka09 factory, which covers the channel adapters described above from this new 2.0 version. For example, the producing part may look like this:

.handle(Kafka09.outboundChannelAdapter(producerFactory())
             .defaultTopic("foo")
             .partitionId(m -> m.getHeaders().get("myPartition", Integer.class)))

And finally, don't miss the Spring for Apache Kafka announcement, too!

Next Steps

Together with the next Spring for Apache Kafka release, we may consider implementing some adapters for Kafka Streams as well.

Since the code base of the project has become fairly straightforward and the Apache Kafka API looks like it is going to be stable, we intend to absorb this project into Spring Integration Core 5.0 when the time comes.

Meanwhile, we look forward to your feedback and, if all goes well, plan to release 2.0.0.RELEASE in the next few weeks!

Project Page | Help
