Marier Camel et Kafka

Dans la même lignée que le mariage Thrift-Camel, ça me trottait depuis longtemps dans la tête de marier Kafka et Camel. Kafka est un bus événementiel très performant de type publish/subscribe. Il gère la persistance, se coordonne avec Zookeeper et fonctionne en cluster. J’avais dans l’idée de créer un composant Camel pour produire et consommer des messages et c’est chose faite.

 

 

1. Installation

Tout d’abord, il faut installer Zookeeper :

  1. télécharger Zookeeper : http://www.apache.org/dyn/closer.cgi/zookeeper/
  2. installer Zookeeper : tar -zxvf zookeeper-xxx.tar.gz
  3. démarrer Zookeeper : zookeeper-xxx/bin/zkServer.sh start
  4. dans une autre console, créer les noeuds pour Kafka :
    1. zookeeper-xxx/bin/zkCli.sh
    2. > create /zk ‘ZK’
    3. > create /zk/kafka ‘Kafka Root Directory’
    4. > create /zk/kafka/local ‘Local Kafka cell’
    5. > quit

Bon je sais c’est un peu quick&dirty

Au tour de Kafka :

  1. Télécharger Kafka : http://sna-projects.com/kafka/downloads/kafka-0.6.zip
  2. Installer Kafka : unzip kafka-0.6.zip
  3. démarrer Kafka : kafka-0.6/bin/kafka-server-start.sh config/server.properties

Et voilà, c’est là aussi un peu quick&dirty.

2. La partie Camel

2.1 Le composant Kafka

package org.giwi.camel.kafka.component;

import java.net.URI;
import java.util.Map;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.URISupport;

/**
 * Camel component for Kafka. Creates {@link KafkaEndpoint} instances from
 * URIs of the form {@code kafka:TOPIC-NAME?...}.
 * <p>
 * The Zookeeper connection string ({@code zkConnect}) is expected to be
 * injected on the component (e.g. via Spring) and is read by the producer
 * and consumer when they build their Kafka configuration.
 */
public class KafkaComponent extends DefaultComponent {

	// Parameters of the most recently created endpoint, kept for introspection.
	// NOTE(review): this field is overwritten on every createEndpoint call —
	// per-endpoint state really belongs on the endpoint itself.
	private Map<String, Object> parameters;
	// Zookeeper connection string, e.g. "localhost:2181".
	private String zkConnect;

	public KafkaComponent() {
	}

	public KafkaComponent(final CamelContext context) {
		super(context);
	}

	/**
	 * Builds a Kafka endpoint for the given URI.
	 *
	 * @param addressUri the full endpoint URI
	 * @param remaining  the URI part after the scheme (unused here; the endpoint
	 *                   re-parses the URI itself)
	 * @param parameters the URI query parameters; also applied to the endpoint
	 *                   as bean properties via {@code setProperties}
	 * @return the configured endpoint
	 */
	@Override
	protected Endpoint createEndpoint(final String addressUri, final String remaining, final Map<String, Object> parameters) throws Exception {
		final URI endpointUri = URISupport.createRemainingURI(new URI(addressUri), CastUtils.cast(parameters));
		final Endpoint endpoint = new KafkaEndpoint(addressUri, this, endpointUri);
		setProperties(endpoint, parameters);
		setParameters(parameters);
		return endpoint;
	}

	public Map<String, Object> getParameters() {
		return parameters;
	}

	public String getZkConnect() {
		return zkConnect;
	}

	public void setParameters(final Map<String, Object> parameters) {
		this.parameters = parameters;
	}

	public void setZkConnect(final String zkConnect) {
		this.zkConnect = zkConnect;
	}
}

2.2 Le Endpoint

package org.giwi.camel.kafka.component;

import java.net.URI;
import java.net.URISyntaxException;

import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultPollingEndpoint;

/**
 * Camel endpoint for a single Kafka topic. The topic name is taken from the
 * host part of the endpoint URI (e.g. {@code kafka:TOPIC-TEST} → topic
 * {@code TOPIC-TEST}).
 */
public class KafkaEndpoint extends DefaultPollingEndpoint {
	// Number of Kafka streams (and consumer threads) to open; defaults to 1.
	// Mutable (with setter) so it can be configured from the endpoint URI via
	// the component's setProperties call — previously a hard-coded final field.
	private int concurrentConsumers = 1;
	// Kafka consumer group id; may be null if not supplied on the URI.
	private String groupId;
	// Topic this endpoint produces to / consumes from.
	private final String topicName;

	public KafkaEndpoint(final String endPointURI, final KafkaComponent component, final URI httpURI) throws URISyntaxException {
		super(endPointURI, component);
		// The "host" of the kafka: URI is interpreted as the topic name.
		topicName = httpURI.getHost();
	}

	@Override
	public Consumer createConsumer(final Processor processor) throws Exception {
		return new KafkaConsumer(this, processor);
	}

	@Override
	public Producer createProducer() throws Exception {
		return new KafkaProducer(this);
	}

	public String getGroupId() {
		return groupId;
	}

	public String getTopicName() {
		return topicName;
	}

	@Override
	public boolean isSingleton() {
		return true;
	}

	public void setGroupId(final String groupId) {
		this.groupId = groupId;
	}

	public int getConcurrentConsumers() {
		return concurrentConsumers;
	}

	public void setConcurrentConsumers(final int concurrentConsumers) {
		this.concurrentConsumers = concurrentConsumers;
	}
}

2.3 Le producteur

package org.giwi.camel.kafka.component;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.message.Message;
import kafka.producer.ProducerConfig;

import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;

/**
 * Camel producer that serializes the exchange body with Java serialization
 * and publishes it as a Kafka {@link Message} on the endpoint's topic.
 */
public class KafkaProducer extends DefaultProducer {

	private final KafkaEndpoint endpoint;
	private final Producer producer;

	public KafkaProducer(final KafkaEndpoint endpoint) {
		super(endpoint);
		this.endpoint = endpoint;
		final Properties props = new Properties();
		// Zookeeper connection string is configured on the component.
		props.put("zk.connect", ((KafkaComponent) endpoint.getComponent()).getZkConnect());
		final ProducerConfig config = new ProducerConfig(props);
		producer = new kafka.javaapi.producer.Producer(config);
	}

	@Override
	protected void doStop() throws Exception {
		super.doStop();
		producer.close();
	}

	/**
	 * Serializes the IN body and sends it to the endpoint's topic.
	 */
	@Override
	public void process(final Exchange exchange) throws Exception {
		final ProducerData data = new kafka.javaapi.producer.ProducerData(endpoint.getTopicName(), new Message(toBytes(exchange.getIn().getBody())));
		producer.send(data);
	}

	/**
	 * Java-serializes an object into a byte array.
	 * <p>
	 * Bug fix: the stream is now flushed and closed before reading the buffer.
	 * ObjectOutputStream buffers block data internally, so calling
	 * {@code toByteArray()} without a flush could return a truncated payload.
	 *
	 * @param object the object to serialize (must be {@link java.io.Serializable})
	 * @return the serialized bytes
	 * @throws IOException if serialization fails
	 */
	private byte[] toBytes(final Object object) throws IOException {
		final ByteArrayOutputStream baos = new ByteArrayOutputStream();
		final ObjectOutputStream oos = new ObjectOutputStream(baos);
		try {
			oos.writeObject(object);
			oos.flush();
		} finally {
			oos.close();
		}
		return baos.toByteArray();
	}
}

2.4 Le consommateur

package org.giwi.camel.kafka.component;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaMessageStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;

public class KafkaConsumer extends DefaultConsumer {
	private final ExecutorService executor;

	public KafkaConsumer(final KafkaEndpoint endpoint, final Processor processor) {
		super(endpoint, processor);
		final String topic = endpoint.getTopicName();
		final Properties props = new Properties();
		props.put("zk.connect", ((KafkaComponent) endpoint.getComponent()).getZkConnect());
		props.put("groupid", endpoint.getGroupId());
		final ConsumerConfig config = new ConsumerConfig(props);
		final ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);
		final Map topicmap = new HashMap() {
			private static final long serialVersionUID = 1L;
			{
				put(topic, endpoint.getConcurrentConsumers());
			}
		};
		final List streams = connector.createMessageStreams(topicmap).get(topic);
		executor = Executors.newFixedThreadPool(endpoint.getConcurrentConsumers());
		// consume the messages in the threads
		for (final KafkaMessageStream stream : streams) {
			executor.submit(new Runnable() {
				@Override
				public void run() {
					for (final Message message : stream) {
						final Exchange exchange = endpoint.createExchange();
						final byte[] bytes = new byte[message.payload().remaining()];
						message.payload().get(bytes);
						try {
							exchange.getIn().setBody(toObject(bytes));
							getProcessor().process(exchange);
						} catch (final Exception e) {
							exchange.setException(e);
						} finally {
							if (exchange.getException() != null) {
								getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
							}
						}
					}
				}
			});
		}
	}

	private static Object toObject(final byte[] bytes) throws IOException, ClassNotFoundException {
		Object object = null;
		object = new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject();
		return object;
	}
}

2.5 La configuration

Ce test est démarré dans un Tomcat (cf. ici). Voici le fichier applicationContext.xml.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
 http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

 <camelContext xmlns="http://camel.apache.org/schema/spring">
 <package>org.giwi.camel.kafka.routes</package>
 </camelContext>

 <!-- The bean id "kafka" registers the component for the "kafka:" URI scheme.
      The class attribute was missing: a Spring bean definition without a class
      (or factory) is invalid and the context would fail to start. -->
 <bean id="kafka" class="org.giwi.camel.kafka.component.KafkaComponent">
 <property name="zkConnect" value="localhost:2181" />
 </bean>
</beans>

2.6 La route Camel

Et enfin le testDrive :

package org.giwi.camel.kafka.routes;

import org.apache.camel.builder.RouteBuilder;

/**
 * Demo routes for the Kafka component: a timer-driven producer and a
 * logging consumer on the same topic.
 */
public class KafkaTestDrive extends RouteBuilder {

	@Override
	public void configure() throws Exception {
		final String topicUri = "kafka:TOPIC-TEST";

		// Producer side: every 5 seconds, publish a constant message to the topic.
		from("timer://foo?fixedRate=true&period=5000")
				.setBody(constant("hello from Giwi Softwares"))
				.to(topicUri);

		// Consumer side: read the topic back and log each message body.
		from("kafka:TOPIC-TEST/1?groupId=camelTest").log("${body}");
	}
}

3. Conclusion

Bon, ok, c’est simpliste et quick&dirty (je crois que je l’ai déjà dit), mais ça marche. Et comme d’habitude, je suis ouvert à vos propositions pour améliorer cet exemple.

Partager c'est la vie

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *