Getting Thrift and Camel to talk to each other

While wandering through architecture topics, I stumbled across some slides presenting the "ultimate" architecture in terms of performance. It used Thrift and Camel, with Thrift serving as both the protocol and the transport for talking to a Camel bus. That intrigued me, because there was no Thrift component for Camel. So I tore out what little hair I have left, and here is the solution.

 

 

In this example, a Thrift client calls a Camel servlet hosted on Tomcat.

1) The Thrift IDL

namespace java org.giwi.camel.thrift.data

const string helloServiceEntryPoint = "HelloService";

enum HelloExceptionCode {

 GENERIC_ERROR = 1;
 THRIFT_ERROR = 2;
 NOT_IMPLEMENTED = 3;
 ENCODING_ERROR = 4;
 HTTP_ERROR = 12;
} 

exception HelloException {
 1: HelloExceptionCode code,
 2: string message,
}

struct HelloServiceRequest {
 1: string name,
}

struct HelloServiceResponse {
 1: HelloServiceRequest request,
 2: string resp,
}

and the service:

namespace java org.giwi.camel.thrift.services

include "org.giwi.camel.thrift.data.thrift"

service HelloService {

 org.giwi.camel.thrift.data.HelloServiceResponse
 sayHello(1: org.giwi.camel.thrift.data.HelloServiceRequest request)
 throws (1: org.giwi.camel.thrift.data.HelloException e)
}

I'll leave it to you to generate the Java classes for this IDL, for example with the Thrift compiler's --gen java option.

2) The Thrift client

package org.giwi.camel.thrift.test;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;

import org.apache.http.HttpVersion;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.params.ConnManagerParams;
import org.apache.http.conn.params.ConnPerRouteBean;
import org.apache.http.conn.scheme.PlainSocketFactory;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.scheme.SchemeRegistry;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.HttpConnectionParams;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TTransportException;

// THttpTransport is an in-house HTTP transport, not part of Thrift itself;
// Thrift's own org.apache.thrift.transport.THttpClient also accepts a URL and an HttpClient.
import com.arkea.commons.thrift.http.THttpTransport;
// Classes generated from the IDL above
import org.giwi.camel.thrift.data.HelloException;
import org.giwi.camel.thrift.data.HelloServiceRequest;
import org.giwi.camel.thrift.data.HelloServiceResponse;
import org.giwi.camel.thrift.services.HelloService;

public class Test {
	private static final String URL = "http://localhost:8282/TomcatCamel/camel/hello";
	private static final ClientConnectionManager cm;
	private static final BasicHttpParams params;

	static {
		final URL netUrl;
		try {
			netUrl = new URL(URL);
		} catch (final MalformedURLException e) {
			// Fail fast: without a valid URL the scheme below cannot be registered
			throw new ExceptionInInitializerError(e);
		}
		// Basic HTTP settings
		params = new BasicHttpParams();
		params.setParameter("http.protocol.version", HttpVersion.HTTP_1_1);
		params.setParameter("http.protocol.content-charset", "UTF-8");
		params.setParameter("http.protocol.expect-continue", false);
		params.setParameter("http.connection.stalecheck", true);

		// Socket and connection timeouts, in milliseconds
		HttpConnectionParams.setSoTimeout(params, 10000);
		HttpConnectionParams.setConnectionTimeout(params, 10000);

		// Connection pool limits
		ConnManagerParams.setMaxTotalConnections(params, 20);
		final ConnPerRouteBean connPerRoute = new ConnPerRouteBean(20);
		ConnManagerParams.setMaxConnectionsPerRoute(params, connPerRoute);

		final SchemeRegistry schemeRegistry = new SchemeRegistry();
		schemeRegistry.register(new Scheme(netUrl.getProtocol(), PlainSocketFactory.getSocketFactory(), netUrl.getPort()));

		cm = new ThreadSafeClientConnManager(params, schemeRegistry);
	}

	public static void main(final String[] args) {
		try {
			// Thrift over HTTP, using the compact protocol (the server must use the same one)
			final THttpTransport tht = new THttpTransport(URL, new DefaultHttpClient(cm, params));
			final TProtocol protocol = new TCompactProtocol(tht);
			final HelloService.Client client = new HelloService.Client(protocol);

			final HelloServiceResponse res = client.sayHello(new HelloServiceRequest("toto"));
			System.out.println(res.getResp());
		} catch (final TTransportException e) {
			e.printStackTrace();
		} catch (final HelloException e) {
			e.printStackTrace();
		} catch (final TException e) {
			e.printStackTrace();
		}
	}
}

3) The server

3.1) The web.xml for Tomcat

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee"
 xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
 id="WebApp_ID" version="2.5">
 <display-name>ThriftCamel</display-name>
 <context-param>
 <param-name>contextConfigLocation</param-name>
 <param-value>WEB-INF/applicationContext.xml</param-value>
 </context-param>
 <listener>
 <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
 </listener>

 <!-- Camel servlet -->
 <servlet>
 <servlet-name>CamelServlet</servlet-name>
 <servlet-class>org.apache.camel.component.servlet.CamelHttpTransportServlet</servlet-class>
 <load-on-startup>1</load-on-startup>
 </servlet>

 <!-- Camel servlet mapping -->
 <servlet-mapping>
 <servlet-name>CamelServlet</servlet-name>
 <url-pattern>/camel/*</url-pattern>
 </servlet-mapping>

 <welcome-file-list>
 <welcome-file>index.jsp</welcome-file>
 </welcome-file-list>
</web-app>

3.2) The applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context" xmlns:cxf="http://camel.apache.org/schema/cxf"
 xmlns:http="http://cxf.apache.org/transports/http/configuration" xmlns:sec="http://cxf.apache.org/configuration/security"
 xsi:schemaLocation="
 http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
 http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

 <!-- Declaration of the Camel routes to run -->
 <camelContext xmlns="http://camel.apache.org/schema/spring">
 <package>org.giwi.camel.thrift.routes</package>
 </camelContext>
</beans>

3.3) The Camel route itself

package org.giwi.camel.thrift.routes;

import java.io.InputStream;
import java.io.OutputStream;

import javax.servlet.http.HttpServletResponse;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.http.HttpMessage;
import org.apache.http.protocol.HTTP;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TMessage;
import org.apache.thrift.protocol.TMessageType;
import org.apache.thrift.transport.TIOStreamTransport;

import org.giwi.camel.thrift.services.HelloService.sayHello_args;
import org.giwi.camel.thrift.services.HelloService.sayHello_result;
import org.giwi.camel.thrift.data.HelloServiceRequest;
import org.giwi.camel.thrift.data.HelloServiceResponse;

public class MainRoute extends RouteBuilder {
	@Override
	public void configure() throws Exception {

		from("servlet:///hello").process(new Processor() {
			// Decode the incoming Thrift call
			@Override
			public void process(final Exchange exch) throws Exception {
				final HttpMessage mess = exch.getIn(HttpMessage.class);
				// Conveniently, the message body is already the servlet request's getInputStream()
				final TCompactProtocol iprot = new TCompactProtocol(new TIOStreamTransport(exch.getIn().getBody(InputStream.class)));
				final TMessage msg = iprot.readMessageBegin();
				final sayHello_args args = new sayHello_args();
				args.read(iprot);
				iprot.readMessageEnd();
				// At last, we have our deserialized object
				final HelloServiceRequest req = args.getRequest();
				// The invoked method
				exch.getIn().setHeader("operationName", msg.name);
				// The servlet response, kept so the reply can be streamed to it
				exch.getIn().setHeader("outputStream", mess.getResponse());
				// The message id
				exch.getIn().setHeader("seqId", msg.seqid);
				exch.getIn().setBody(req);
			}
		}).recipientList(simple("direct:testThrift.${header.operationName}"));

		from("direct:testThrift.sayHello").process(new Processor() {
			// This is the invoked method
			@Override
			public void process(final Exchange exch) throws Exception {
				// Work with the request
				final HelloServiceRequest req = exch.getIn().getBody(HelloServiceRequest.class);
				// Build the response
				final HelloServiceResponse resp = new HelloServiceResponse();
				resp.setRequest(req);
				resp.setResp("Hello " + req.getName());
				exch.getIn().setBody(resp);
			}

		}).process(new Processor() {
			// Send the response
			@Override
			public void process(final Exchange exch) throws Exception {
				final HelloServiceResponse resp = exch.getIn().getBody(HelloServiceResponse.class);
				final sayHello_result result = new sayHello_result();
				result.setSuccess(resp);
				final HttpServletResponse response = (HttpServletResponse) exch.getIn().getHeader("outputStream");
				// Serialize the response object onto the output stream
				final OutputStream out = response.getOutputStream();
				final TCompactProtocol oprot = new TCompactProtocol(new TIOStreamTransport(out));
				oprot.writeMessageBegin(new TMessage("sayHello", TMessageType.REPLY, (Integer) exch.getIn().getHeader("seqId")));
				result.write(oprot);
				oprot.writeMessageEnd();
				// and off it goes
				exch.getOut().setHeader(HTTP.CONTENT_TYPE, "application/x-thrift");
				exch.getIn().setBody(out);
			}
		});
	}
}
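
To make the flow of the route easier to follow, here is a minimal plain-Java sketch of the same idea, without Camel or Thrift: read the operation name and sequence id from the incoming message, pick a handler by name (the role played by the direct:testThrift.<name> endpoints), and send back a reply carrying the same sequence id. The Envelope class, the string payload and the OperationDispatchSketch name are assumptions made up for this illustration; they are not part of the Thrift or Camel APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class OperationDispatchSketch {

    /** Hypothetical stand-in for what the route keeps from the Thrift message header. */
    static final class Envelope {
        final String name;    // operation name, e.g. "sayHello"
        final int seqId;      // sequence id chosen by the client
        final String payload; // simplified body (a plain string instead of a Thrift struct)

        Envelope(String name, int seqId, String payload) {
            this.name = name;
            this.seqId = seqId;
            this.payload = payload;
        }
    }

    // One handler per operation name, playing the role of the "direct:testThrift.<name>" endpoints
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    void register(String operationName, Function<String, String> handler) {
        handlers.put(operationName, handler);
    }

    /** Picks the handler by operation name and echoes the request's seqId in the reply. */
    Envelope dispatch(Envelope request) {
        Function<String, String> handler = handlers.get(request.name);
        if (handler == null) {
            throw new IllegalArgumentException("Unknown operation: " + request.name);
        }
        return new Envelope(request.name, request.seqId, handler.apply(request.payload));
    }

    public static void main(String[] args) {
        OperationDispatchSketch dispatcher = new OperationDispatchSketch();
        dispatcher.register("sayHello", name -> "Hello " + name);
        Envelope reply = dispatcher.dispatch(new Envelope("sayHello", 7, "toto"));
        System.out.println(reply.name + " #" + reply.seqId + ": " + reply.payload);
    }
}
```

Adding a second operation then only means registering one more handler, just as the route would only need one more direct:testThrift.<name> endpoint.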

4) Conclusion

OK, I'm sure this can be done better, for instance by building a more "native" Camel component that takes the implementation class as a parameter.
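
As a rough idea of what "passing the implementation class as a parameter" could look like, here is a small plain-Java sketch that uses reflection to call the method named by the operation on whatever implementation object it is given. ReflectiveServiceInvoker and HelloHandler are hypothetical names invented for this example, and the string argument stands in for the Thrift request struct; a real component would still have to do the Thrift decoding and encoding around this call. Thrift's generated HelloService.Processor class does a comparable name-based dispatch on a HelloService.Iface implementation, so it would probably be the better building block.

```java
import java.lang.reflect.Method;

/** Hypothetical helper: calls, on the given implementation, the method named by the operation. */
final class ReflectiveServiceInvoker {

    private final Object implementation;

    ReflectiveServiceInvoker(Object implementation) {
        this.implementation = implementation;
    }

    /** Invokes the public one-argument method whose name equals operationName. */
    Object invoke(String operationName, Object request) {
        for (Method method : implementation.getClass().getMethods()) {
            if (method.getName().equals(operationName)
                    && method.getParameterCount() == 1
                    && method.getParameterTypes()[0].isInstance(request)) {
                try {
                    return method.invoke(implementation, request);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Call to " + operationName + " failed", e);
                }
            }
        }
        throw new IllegalArgumentException("Unknown operation: " + operationName);
    }
}

/** Hypothetical service implementation, standing in for a HelloService implementation. */
final class HelloHandler {
    public String sayHello(String name) {
        return "Hello " + name;
    }
}

final class ReflectiveInvokerDemo {
    public static void main(String[] args) {
        ReflectiveServiceInvoker invoker = new ReflectiveServiceInvoker(new HelloHandler());
        System.out.println(invoker.invoke("sayHello", "toto"));
    }
}
```

With something like this, the route would no longer need one hand-written processor per operation; only the implementation object would change from one service to the next.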

Open to your suggestions, I am.
