Aller au contenu principal
Logo GiwiSoft
DevOps pour Apache NiFi : Automatisez vos tests avec nifi-tester

DevOps pour Apache NiFi : Automatisez vos tests avec nifi-tester

Xavier MARIN Xavier MARIN Non classé

Dans de nombreuses architectures data modernes, Apache NiFi est utilisé pour orchestrer l’ingestion, la transformation et la distribution de données entre systèmes hétérogènes. Sa capacité à modéliser des pipelines via un graphe de processors en fait une plateforme extrêmement puissante pour la construction de dataflows complexes.

Cependant, cette approche pose un problème majeur lorsqu’on veut appliquer des pratiques d’ingénierie modernes :

  • tests automatisés
  • CI/CD
  • reproductibilité
  • validation avant déploiement

Contrairement aux microservices classiques, un flow NiFi est souvent défini visuellement, et non comme un artefact testable.

C’est précisément ce problème que j’ai voulu résoudre avec nifi-tester : un outil permettant de tester des flows NiFi de manière automatisée et reproductible.

Dans cet article, je détaille :

  • le problème de test dans NiFi
  • l’architecture de l’outil
  • les choix techniques
  • comment intégrer ces tests dans une pipeline CI/CD.

Le problème : les flows NiFi ne sont pas naturellement testables

Dans un projet classique, les tests sont une évidence :

  • unit tests
  • integration tests
  • contract tests
  • end-to-end tests

Mais avec NiFi, plusieurs difficultés apparaissent.

1. Les pipelines sont définis dans une interface graphique

Les flows NiFi sont généralement :

  • créés dans le canvas web
  • versionnés via un registry
  • exportés sous forme de flow definitions

Cela signifie que le pipeline est une configuration runtime, pas du code directement exécutable.

Conséquences :

  • difficile de tester localement
  • difficile d’automatiser
  • difficile de reproduire un bug.

2. Les pipelines sont fortement intégrés

Un flow NiFi interagit souvent avec :

  • Kafka
  • S3
  • bases de données
  • APIs REST
  • systèmes legacy.

Le test devient alors un problème de simulation d’environnement.

Sans outil dédié, les équipes font souvent :

  • des tests manuels
  • des validations dans un environnement de staging
  • des vérifications via la provenance NiFi.

Cela ralentit énormément les cycles de développement.

3. Les pipelines data doivent suivre les mêmes standards que les services

Aujourd’hui, dans une plateforme data mature, les pipelines doivent être :

  • versionnés dans Git
  • validés automatiquement
  • déployés via CI/CD.

Autrement dit :

les pipelines doivent être traités comme du code.

C’est la philosophie derrière nifi-tester.

Objectif : rendre les flows NiFi testables

L’idée centrale du projet est simple :

un flow NiFi doit pouvoir être testé comme une fonction.

Conceptuellement :

Input Dataset


NiFi Flow


Output Dataset

Un test doit donc définir :

  • une entrée
  • un flow à exécuter
  • un résultat attendu.

Architecture de nifi-tester

L’outil repose sur trois composants principaux.

+-------------------+
| Test Definitions |
+---------+---------+
|
v
+---------+---------+
| Test Runner |
+---------+---------+
|
v
+---------+---------+
| NiFi Execution |
| Environment |
+---------+---------+
|
v
+-------------------+
| Result Validator |
+-------------------+

Chaque étape correspond à une responsabilité bien définie.

Définition des tests

Les tests sont définis avec Junit :

@Test
@DisplayName("Deploy Groovy script pipeline")
void testGroovyScriptPipeline() throws Exception {
var result = tester.deployPipeline(
new java.io.File("src/test/resources/pipelines/groovy-script-pipeline.yaml"),
"root"
);

assertNotNull(result);
assertTrue(result.isSuccess(), "Deployment failed: " + result.getMessage());

if (result.isSuccess()) {
String pgId = result.getProcessGroupId();
var group = tester.getProcessGroup(pgId);
assertNotNull(group);

// Start the process group
tester.startProcessGroup(pgId);

// Stop LogResult so flow file stays in the queue before it
String logResultId = tester.findProcessorIdByName(pgId, "LogResult");
if (logResultId != null) {
tester.stopProcessor(logResultId);
}

// Find the input port
String inputPortId = tester.findInputPortIdByName(pgId, "Input");
assertNotNull(inputPortId, "Input port not found");

// Wait for port to start up before pushing data via Site-to-Site
Thread.sleep(5000);

// Inject "hello" into the input port
tester.injectFlowFile(inputPortId, "hello");

// Find the connection between AppendWorld and LogResult
String connId = tester.findConnectionId(pgId, "AppendWorld", "LogResult");
assertNotNull(connId, "Connection not found");

// Wait for processing
long start = System.currentTimeMillis();
boolean found = false;
while (System.currentTimeMillis() - start < 15000) {
var contents = tester.getFlowFileContentsAsString(connId);
for (String content : contents) {
if (content.contains("hello world")) {
found = true;
break;
}
}
if (found) break;
Thread.sleep(500);
}
assertTrue(found, "Expected to find 'hello world' in flow file content");

// Stop the process group
tester.stopProcessGroup(pgId);

// Cleanup
tester.deleteProcessGroup(pgId);
}
}

Et les flows, en Yaml

name: Groovy Script Pipeline
description: A pipeline that uses Groovy script to append 'world' to 'hello'
parentGroupId: root

inputPorts:
- name: Input
x: 100
y: 100

processors:
- name: AppendWorld
type: org.apache.nifi.processors.groovyx.ExecuteGroovyScript
x: 400
y: 100
schedulingStrategy: TIMER_DRIVEN
schedulingPeriod: "1 sec"
properties:
Script Body: |
import org.apache.nifi.flowfile.FlowFile
import java.nio.charset.StandardCharsets

FlowFile flowFile = session.get()
if (flowFile != null) {
String content = ""
session.read(flowFile, { inputStream ->
content = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8)
})
content = content + " world"
flowFile = session.write(flowFile, { outputStream ->
outputStream.write(content.getBytes(StandardCharsets.UTF_8))
})
session.transfer(flowFile, REL_SUCCESS)
}

- name: LogResult
type: org.apache.nifi.processors.standard.LogAttribute
x: 700
y: 100
schedulingStrategy: TIMER_DRIVEN
schedulingPeriod: "1 sec"
properties:
Log Level: info
Log Prefix: "Result: "
autoTerminatedRelationships:
- success

connections:
- name: Input to Groovy
sourceId: ${Input}
destinationId: ${AppendWorld}
x: 250
y: 100

- name: Groovy to Log
sourceId: ${AppendWorld}
destinationId: ${LogResult}
relationships:
- success
x: 550
y: 100

Conclusion

Les pipelines de données sont aujourd’hui des composants critiques de l’infrastructure logicielle.

Pourtant, ils restent souvent moins testés que les applications classiques.

Avec nifi-tester, l’objectif est simple :

apporter aux pipelines NiFi les mêmes pratiques d’ingénierie que pour le code applicatif.

En introduisant :

  • des tests automatisés
  • des validations reproductibles
  • une intégration CI/CD

il devient possible de traiter les dataflows comme des artefacts logiciels à part entière.