Tutoriel Kafka : De son installation à l'éxécution d'un premier programme en Java
par
, 13/04/2021 à 13h36 (2315 Affichages)
Information
Le tutoriel est également disponible au format developpez.com via le lien https://dimbo.developpez.com/tutorie...rogramme-java/
1. Pour commencer : qu’est-ce que Kafka ?
Vous pouvez le découvrir sur le site officiel (https://kafka.apache.org/) ou au travers de ce tutoriel https://soat.developpez.com/tutoriel...onctionnement/.
2. Information et objectif
J’ai rencontré plusieurs difficultés en voulant utiliser Kafka avec Java. Ainsi, j’ai souhaité réaliser ce tutoriel pour partager mon expérience avec d’autres personnes qui voudraient mettre en place Kafka.
L’objectif est de pouvoir envoyer un message et le réceptionner à l’aide de la messagerie Kafka en utilisant la langage JAVA.
Ce tutoriel expliquera comment installer Kafka à l'aide de Docker puis lancer un programme.
Nous verrons étape par étape comment y arriver...
Limite : Ce tutoriel n’a pas vocation à expliquer le fonctionnement de Kafka.
3. Environnement
3.1. Technique
- Système d’exploitation : Windows 10 (il est possible d’utiliser Linux).
- Docker pour déployer Kafka.
- Dans ce tutoriel, Docker Desktop de Windows a été utilisé (https://docs.docker.com/docker-for-windows/install/)
- A savoir : il est nécessaire d’activer la virtualisation dans le Bios et d’installer Linux pour Windows 10 (https://docs.microsoft.com/fr-fr/win.../install-win10)
- Environnement de développement : Eclipse(https://www.eclipse.org/)
- Langage : Java avec le Framework Spring et Maven.
3.2. Schéma
4. Démarrage de l’image docker Kafka
4.1. Installation et lancement
- Démarrer Docker
- Créer un fichier « docker-compose.yml » dans un répertoire
- Copier le contenu ci-dessous dans le fichier
Pour information : Les images utilisées de docker pour Kafka sont :
- https://hub.docker.com/r/wurstmeister/kafka
- https://hub.docker.com/_/zookeeper
- https://hub.docker.com/r/kafkamanager/kafka-manager
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 version: '3.5' services: zookeeper: image: zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka ports: - "9092:9092" environment: # HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_HOST_NAME: kafka KAFKA_ADVERTISED_PORT: 9092 KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka -Dcom.sun.management.jmxremote.rmi.port=1099" JMX_PORT: 1099 volumes: - /var/run/docker.sock:/var/run/docker.sock depends_on: - zookeeper kafka-manager: image: kafkamanager/kafka-manager ports: - "9000:9000" links: - zookeeper - kafka environment: ZK_HOSTS: zookeeper:2181- En ligne de commande (CMD sous Windows), se positionner dans le répertoire et lancer la ligne de commande : «*docker-compose up*» ou* «*docker compose up*» (suivant la version).
- Les images vont se télécharger et démarrer (la première exécution peut prendre un certain temps).
- Avec une autre invite de commande, il est également possible de vérifier que les conteneurs sont démarrés en tapant «*docker ps*».
4.2. Administration par kafka-manager (CMAK)
- Via un navigateur, lancer http://localhost:9000
- Ajouter un cluster en renseignant les informations ci-après
5. Modélisation
Diagramme de classe
Diagramme de séquence
6. Initialisation du projet
- Aller sur le site https://start.spring.io/
- Renseigner les champs et ajouter dans les « Dependencies » (Spring for Apache Kafka) et enfin « Generate »
- Télécharger le fichier « zip » et l’extraire dans un répertoire
- Lancer Eclipse
- Importer le dossier
- Ouvrir l’arborescence du projet
- Ouvrir et ajouter dans le fichier « pom.xml »
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5 <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
- Si vous lancez le programme en local, il faudra ajouter dans le fichier Host « 127.0.0.1 kafka » (C:\Windows\System32\drivers\etc\hosts)
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 # Copyright (c) 1993-2009 Microsoft Corp. # # This is a sample HOSTS file used by Microsoft TCP/IP for Windows. # # This file contains the mappings of IP addresses to host names. Each # entry should be kept on an individual line. The IP address should # be placed in the first column followed by the corresponding host name. # The IP address and the host name should be separated by at least one # space. # # Additionally, comments (such as these) may be inserted on individual # lines or following the machine name denoted by a '#' symbol. # # For example: # # 102.54.94.97 rhino.acme.com # source server # 38.25.63.10 x.acme.com # x client host # localhost name resolution is handled within DNS itself. # 127.0.0.1 localhost # ::1 localhost # Added by Docker Desktop 192.168.3.138 host.docker.internal 192.168.3.138 gateway.docker.internal # To allow the same kube context to work on the host and the container: 127.0.0.1 kubernetes.docker.internal 127.0.0.1 kafka # End of section
7. Programme
7.1. Structure du programme
Voici la structure du programme
7.2 Data To Object (dto)
Person.java
Utilisation d'un objet ex : "Person" qui sera transmis entre l'expediteur et le destinataire
7.3. receiver
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 package fr.tutoriel.kafka.dto; /** * Exemple d'un objet */ public class Person { private String firstName; private String lastName; private int age; public Person() { } public Person(String firstName, String lastName, int age) { this.firstName = firstName; this.lastName = lastName; this.age = age; } public String getFirstName() { return firstName; } public void setFirstName(String msg) { this.firstName = msg; } public String getLastName() { return lastName; } public void setLastName(String name) { this.lastName = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } @Override public String toString() { return "{firstName:\""+firstName+"\", lastName:\""+lastName+"\", age:"+String.valueOf(age)+"}"; } }
Développement de la partie Destinataire pour écouter et réceptionner les messages
IProcess.java
Interface pour l'exécution d'un traitement
(NB : facultatif : pour anticiper la séparation du programme)
IReceiver.java
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package fr.tutoriel.kafka.receiver; import fr.tutoriel.kafka.dto.Person; /** * Interface pour gerer les traitements * * */ public interface IProcess { /** * Exécuter un traitement * @param info * @param person */ public void execute(String info, Person person); }
Interface pour écouter la réception d'un message.
Il serait possible d'utiliser la même interface pour écouter un message avec une autre messagerie que Kafka (ex : RabbitMQ)
(NB : facultatif : pour anticiper la séparation du programme)
KafkaReceiverConfig.java
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14 package fr.tutoriel.kafka.receiver; import fr.tutoriel.kafka.dto.Person; /** * Interface pour écouter la réception d'un message * * Il serait possible d'utiliser la même interface pour écouter * un message avec une autre messagerie que Kafka (ex : RabbitMQ) * */ public interface IReceiver { public void listen(String topicName, Person person); }
Configuration du destinataire avec l'adresse du serveur, le groupe, type de message en JSON.
KafkaReceiver.java
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 package fr.tutoriel.kafka.receiver; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.support.serializer.JsonDeserializer; import fr.tutoriel.kafka.dto.Person; /** * Configuration du destinataire */ @EnableKafka @Configuration public class KafkaReceiverConfig { @Value(value = "${kafka.bootstrapAddress:kafka:9092}") // Adresse du serveur Kafka pour envoyer les messages (il est possible de le configurer via application.properties) private String bootstrapAddress; private static final String groupId = "Tutorial"; // Définition du groupe ex: Tutorial @Bean public ConsumerFactory<String, Person> receiverFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress); // Configuration de l'adresse du serveur props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);// Configuration du groupe return new DefaultKafkaConsumerFactory<>(props,new StringDeserializer(),new JsonDeserializer<>(Person.class)); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Person> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Person> factory =new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(receiverFactory()); return factory; } }
Permet d'écouter la réception d'un message et d'exécuter un traitement (au travers de l'interface)
ProcessExample.java
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 package fr.tutoriel.kafka.receiver; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; import fr.tutoriel.kafka.dto.Person; /** * Receveur Kafka * */ @Service public class KafkaReceiver implements IReceiver { @Autowired private IProcess process; // Affectation de l'instance pour la gestion des traitements /** * Ecoute sur le topic "Tuto1", s'il existe un message * * @param person * Objet reçu */ @KafkaListener(topics = "Tuto1",containerFactory="kafkaListenerContainerFactory") public void listenTuto1(Person person) { listen("Tuto1",person); } /** * Ecoute sur le topic "Tuto2", s'il existe un message * @param person * Objet reçu */ @KafkaListener(topics = "Tuto2",containerFactory="kafkaListenerContainerFactory") public void listenTuto2(Person person) { listen("Tuto2",person); } /** * * Exécution d'un traitement * @param topicName * Nom du topic * @param topicName * Objet reçu * */ @Override public void listen(String topicName, Person person) { process.execute(topicName+"Info", person); } }
Permet d'exécuter un traitement.
Dans l'exemple, affichage d'un texte dans la console
7.4. Sender
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 package fr.tutoriel.kafka.receiver; import org.springframework.stereotype.Service; import fr.tutoriel.kafka.dto.Person; /** * Traitement exemple */ @Service public class ProcessExample implements IProcess { /** * Exécuter le traitement : Dans notre exemple affichage du message dans la console * @param topicName * Nom du Topic * @param person * objet person */ @Override public void execute(String info, Person person) { System.out.println("Message reçu: info="+info +", person=" + person); } }
Développement de la partie Expéditeur pour envoyer les messages
ISender.java
Interface pour envoyer un message.
Il serait possible d'utiliser la même interface pour envoyer un message avec une autre messagerie que Kafka (ex : RabbitMQ)
(NB : facultatif : pour anticiper la séparation du programme)
KafkaSenderConfig.java
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package fr.tutoriel.kafka.sender; import fr.tutoriel.kafka.dto.Person; /** * Interface pour envoyer un message * * Il serait possible d'utiliser la même interface pour envoyer * un message avec une autre messagerie que Kafka (ex : RabbitMQ) * */ public interface ISender { void send(String topicName, Person person); }
Configuration de l'expéditeur avec l'adresse du serveur, type de message en JSON
KafkaTopicConfig.java
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41 package fr.tutoriel.kafka.sender; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.serializer.JsonSerializer; import fr.tutoriel.kafka.dto.Person; /** * Configuration de l'expéditeur */ @Configuration public class KafkaSenderConfig { @Value(value = "${kafka.bootstrapAddress:kafka:9092}") // Adresse du serveur Kafka (il est possible de le configurer via application.properties) private String bootstrapAddress; @Bean public ProducerFactory<String, Person> senderFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress); // Adresse du serveur configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, Person> kafkaTemplate() { return new KafkaTemplate<>(senderFactory()); } }
Configuration Topic avec l'adresse du serveur
KafkaSender.java
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 package fr.tutoriel.kafka.sender; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.admin.AdminClientConfig; //import org.apache.kafka.clients.admin.NewTopic; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.KafkaAdmin; /** * Configuration Topic */ @Configuration public class KafkaTopicConfig { @Value(value = "${kafka.bootstrapAddress:kafka:9092}") // Adresse du serveur Kafka (il est possible de le configurer via application.properties) private String bootstrapAddress; @Bean public KafkaAdmin kafkaAdmin() { Map<String, Object> configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); return new KafkaAdmin(configs); } }
Permet d'envoyer un message (Person) dans un topic.
7.5. Main
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 package fr.tutoriel.kafka.sender; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import fr.tutoriel.kafka.dto.Person; /** * * Expediteur Kafka */ @Service public class KafkaSender implements ISender { @Autowired private KafkaTemplate<String, Person> kafkaTemplate; /** * Permet d'envoyer un message (Person) dans un topic * * @param topicName * Nom du topic * @param person * Objet à envoyer */ @Override public void send(String topicName, Person person) { System.out.println("Message à envoyer : topicName="+topicName +", person=" + person); new NewTopic(topicName, 1, (short) 1); kafkaTemplate.send(topicName, person); } }
Programme principal
KafkaApplication.java
Spring boot permet de démarrer et configurer les parties "réception" et "destinataire".
La méthode "run" permet d'envoyer en boucle des messages vers une messagerie (Kafka ou autre).
Au travers de la méthode "send", un texte est affiché dans la console.
En asynchrone, la réception d'un message est affiché dans la console.
8. Exécution
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 package fr.tutoriel.kafka; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import fr.tutoriel.kafka.dto.Person; import fr.tutoriel.kafka.sender.ISender; /** * KafkaApplication * Programme principal */ @SpringBootApplication public class KafkaApplication implements CommandLineRunner { @Autowired private ISender sender; // Affectation de l'instance sender public static void main(String[] args) { SpringApplication.run(KafkaApplication.class, args); } /** * Exécuter le programme * Permet d'envoyer des messages vers une messagerie (Kafka ou autre) * * La réception du message est asynchrone (en attente) */ @Override public void run(String... args) throws Exception { int ageTuto1 = 10; // int ageTuto2 = 20; // while (true) { // Boucle infinie Person personTuto1 = new Person("Jean", "DUPOND", ageTuto1); // Créer un objet pour Tuto1 sender.send("Tuto1", personTuto1); // Envoyer l'objet sur le topic "Tuto1" Thread.sleep(3000); // Attendre 3s Person personTuto2 = new Person("Pierre","DURAND",ageTuto2); // Créer un objet pour Tuto2 sender.send("Tuto2", personTuto2); // Envoyer l'objet sur le topic "Tuto2" Thread.sleep(3000); // Attendre 3s ageTuto1++; // Incrémentation du texte ageTuto2++; // Incrémentation du texte } } }
8.1 Exécuter
8.2. Résultat
Après exécution, il est possible de visualiser dans la console les messages (envoi et réception).
9. Pour aller plus loin
Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7 Message à envoyer : topicName=Tuto2, person={firstName:"Pierre", lastName:"DURAND", age:20} Message reçu: info=Tuto2Info, person={firstName:"Pierre", lastName:"DURAND", age:20} Message à envoyer : topicName=Tuto1, person={firstName:"Jean", lastName:"DUPOND", age:11} Message reçu: info=Tuto1Info, person={firstName:"Jean", lastName:"DUPOND", age:11} Message à envoyer : topicName=Tuto2, person={firstName:"Pierre", lastName:"DURAND", age:21} Message reçu: info=Tuto2Info, person={firstName:"Pierre", lastName:"DURAND", age:21}
Il serait judicieux de séparer le programme au moins en 3 parties :
- Une bibliothèque partagée (pour mettre la classe "Person" et les interfaces)
- Une partie "Sender"
- Une partie "Receiver"
Il est possible en gardant cette strucuture de faire fonctionner le programme avec une autre messagerie comme RabbitMQ (https://www.rabbitmq.com/)
10. Liens
- Kafka : https://kafka.apache.org/
- Docker*: https://www.docker.com/
- Code source inspiré du site : https://www.baeldung.com/spring-kafka