Camel, Tomcat, camel-exec et camel-file

Il est tout à fait possible d’utiliser Camel pour effectuer différentes tâches de routage ou de connexion au sein d’un War déployé dans un Tomcat. Les principes de développement de Camel sont les mêmes.
Dans cet exemple, nous allons créer un War exécutant une route Camel simple : attendre la mise à jour d’un fichier, exécuter une commande shell et enfin traçer le résultat de cette commande dans un fichier de log.

Durant cet exercice, nous allons lever 3 difficultés :

  • démarrer le contexte Spring depuis Tomcat et ainsi démarrer les routes
  • réagir sur la mise à jour d’un fichier sans l’effacer ou le traiter en mitraillette
  • gérer proprement la commande exec de Camel

1) Intégration dans Tomcat

Etape n°1, modifier le web.xml pour déclencher Spring :



  TestCamelWar
  
    index.jsp
  
  

org.springframework.web.context.ContextLoaderListener
  

Puis ajouter dans /WebContent/WEB-INF/ un fichier applicationContext.xml :



  
  
    
  
  
  
  

2) La route Camel

Dans cette route nous allons attendre le fichier, puis exécuter une commande shell et enfin loguer le résultat dans un fichier. Nous allons effectuer la commande shell : « ssh toto@monserveur.domaine.com touch toto20101209.txt »

package org.giwi.file.watcher.route;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.exec.ExecBinding;
import org.giwi.file.watcher.FileWatcherProcessor;

public class FileWatcher extends RouteBuilder {
  private final SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
  /**
   * (non-Javadoc)
   * @see org.apache.camel.builder.RouteBuilder#configure()
   */
  @Override
  public void configure() throws Exception {
    /**
     * Gestion des exceptions
     */
    onException(Exception.class).handled(true).to("direct:log");

    from("direct:log").to("file:/logs/?fileExist=Append&fileName=log-${date:now:yyyyMMdd}.txt");

    from("file:/input/?noop=true&idempotentRepository=#idempotentBean&include=monFichier.*xml")
    // On place dans les entêtes les paramètres du ssh
    .setHeader(ExecBinding.EXEC_COMMAND_ARGS, constant(getExecCommand("toto")))
    .to("exec:ssh?useStderrOnEmptyStdout=true")
    // On traite les stdErr et stdOut
    .process(new FileWatcherProcessor())
    // On log le résultat dans un fichier
    .to("direct:log");
  }
  /**
   * @param remoteFile
   * @return
   */
  private List getExecCommand(final String remoteFile) {
    return new ArrayList() {
      private static final long serialVersionUID = 2722515747862036106L;
        {
        final Date now = new Date();
        add("toto@monserveur.domaine.com");
        add("touch");
        add(remoteFile + format.format(now) + ".xml");
        }
    };
  }
}

Explication :

  • from(« file:/input/?noop=true&idempotentRepository=#idempotentBean&include=monFichier.*xml ») :
    Nous allons scruter le répertoire input à la recherche d’un fichier dont le nom correspond à l’expression régulière monFichier.*xml. Une fois ce fichier trouvé, noop=true précise que l’on en fera rien (pas de déplacement ou d’éffacement). Par contre, pour éviter de boucler sur ce fichier, nous nous appuierons sur le bean idempotentBean qui précisera grâce à la commande idempotentRepository=#idempotentBean s’il est éligible ou non à chaque tour du pooleur de Camel.
  • setHeader(ExecBinding.EXEC_COMMAND_ARGS, constant(getExecCommand(« toto »))) : Nous allons préparer la liste des arguments à passer à la commande à exécuter. Cela se fait avec la création d’une ArrayList (crée inline dans notre cas).
  • to(« exec:ssh?useStderrOnEmptyStdout=true ») : exécution de la commande shell<:li>
  • process(new FileWatcherProcessor()) : on va traiter le résultat de l’exécution de cette commande avec un processor Camel classique.

2.1) idempotentBean

Il s’agit de notre propre implémentation du gestionnaire de cache du pooleur de fichiers Camel. D’origine, il en existe 2 : un système de cache en mémoire (par défaut) et un système de cache en fichier. Dans ces deux cas, même si le timestamp du fichier ciblé change, ces deux pooleurs ne réagissent pas, ils ne détectent que la présence d’un fichier.

package org.giwi.file.watcher.beans;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.apache.camel.spi.IdempotentRepository;
/**
 * Classe permettant de gérer un cache pour éviter de traiter en boucle les fichiers.
 */
public class IdempotentBean implements IdempotentRepository {
  /**
   * Le cache en lui même
   */
  private final Map idempotentCache = new HashMap();
    /**
     * (non-Javadoc)
     * @see org.apache.camel.spi.IdempotentRepository#add(java.lang.Object)
     */
    @Override
    public boolean add(final String arg0) {
      return false;
    }
    /**
     * (non-Javadoc)
     * @see org.apache.camel.spi.IdempotentRepository#confirm(java.lang.Object)
     */
    @Override
    public boolean confirm(final String arg0) {
      return true;
    }
    /**
     * (non-Javadoc)
     * @see org.apache.camel.spi.IdempotentRepository#contains(java.lang.Object)
     */
    @Override
    public boolean contains(final String filePath) {
      synchronized (this) {
        // On récupère le fichier scruté
        final File file = new File(filePath);
        // si il n'est pas en cache on l'ajoute avec un lastModified à 0
        if (!idempotentCache.containsKey(filePath)) {
          idempotentCache.put(filePath, Long.valueOf(0));
        }
        // On compare les lastModifed
        if (file.lastModified() > idempotentCache.get(filePath)) {
          // si le cache est plus vieux que le timestamp du fichier, on le met à jour.
          idempotentCache.put(filePath, Long.valueOf(file.lastModified()));
          // Et oui, on doit le traiter
          return false;
        }
        // Sinon on ne le traite pas.
        return true;
      }
    }
    /**
     * (non-Javadoc)
     * @see org.apache.camel.spi.IdempotentRepository#remove(java.lang.Object)
     */
    @Override
    public boolean remove(final String arg0) {
      return false;
    }
  }
}

2.2) FileWatcherProcessor

Ce processor va analyser le résultat de la commande exec et générer une ligne de log.

package org.giwi.file.watcher;

import java.io.ByteArrayInputStream;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.exec.ExecResult;
import org.apache.log4j.Logger;
/**
 * Simple processor afin de traiter les évènements liés à l'appel de la ligne de commande
 */
public class FileWatcherProcessor implements Processor {
  private static final Logger LOG = Logger.getLogger(FileWatcherProcessor.class);
  private final SimpleDateFormat format = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss");
  /**
   * (non-Javadoc)
   * @see org.apache.camel.Processor#process(org.apache.camel.Exchange)
   */
  @Override
  public void process(final Exchange exchange) throws Exception {
    // On récupère l'objet issu de l'éxécution de la ligne de commande
    final ExecResult res = exchange.getIn().getBody(ExecResult.class);
    // Le header contient toujours les informations du fichier d'origine
    final String fileName = (String) exchange.getIn().getHeader("CamelFileNameOnly");
    // On va concaténer les Strings proprement quand même
    final StringBuilder sb = new StringBuilder();
    sb.append('[').append(format.format(new Date())).append(']');
    // Si le returnCode est différent, c'est qu'il y a eu une erreur
    if (res.getExitValue() != 0) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Un message a été produit sur stdErr");
        }
        sb.append(" ERROR (").append(fileName).append(") ssh ");
        // En plus on a le message d'erreur de la commande
        sb.append("returnCode= ").append(res.getExitValue())
           .append('\n').append(baToString((ByteArrayInputStream)  res.getStderr()))
           .append('\n');
        // On passe l'erreur au bloc suivant
        exchange.getOut().setBody(sb.toString());
      } else {
        // returncode à 0 : tout s'est bien passé
        if (LOG.isDebugEnabled()) {
          LOG.debug("Un message a été produit sur stdOut"); // ou pas
        }
        sb.append(" INFO   (").append(fileName).append(") ssh ");
        sb.append("returnCode= ").append(res.getExitValue()).append('\n');
        // Sinon tout s'est bien passé, on passe la sortie de la console au bloc suivant
        exchange.getOut().setBody(sb.toString());
      }
    }
  /**
   * Méthode permettant de transformer un ByteArrayInputStream en String
   * @param is
   * @return
   */
  private String baToString(final ByteArrayInputStream is) {
    final int size = is.available();
    final char[] theChars = new char[size];
    final byte[] bytes = new byte[size];
    is.read(bytes, 0, size);
    for (int i = 0; i < size;) {
        theChars[i] = (char) (bytes[i++] & 0xff);
    }
    return new String(theChars);
  }
}

Les petits malins me dirons que tant qu'à écrire des logs, autant utiliser log4j... c'est pas faux. A vous de faire mieux 🙂

3) Pour aller plus loin

Partager c'est la vie

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *