Block Image

In questo tutorial vedremo come creare facilmente un'applicazione event-driven con Spring Cloud Stream e Kafka.

L'applicazione che creeremo simulerà l'invio e la ricezione di eventi di un sensore di temperatura.
Vedremo quindi come scrivere Producer e Consumer, anche Reactive, in Spring Cloud Stream.

Un po' di teoria

Cos'è Spring Cloud Stream

Spring Cloud Stream è un modulo di Spring che fonde Spring Integration (che implementa i pattern d'integrazione) con Spring Boot.
L'obiettivo di questo modulo è quello di permettere allo sviluppatore di concentrarsi unicamente sulla business logic di applicazioni event-driven, senza preoccuparsi del codice per gestire i diversi tipi di sistemi di messaggi.

Infatti con Spring Cloud Stream, possiamo scrivere codice per produrre/consumare messaggi su Kafka, ma lo stesso codice funzionerebbe anche se utilizzassimo RabbitMQ, AWS Kinesis, AWS SQS, Azure EventHubs, etc!

Componenti di Spring Cloud Stream

Spring Cloud Stream ha tre componenti principali:

  • Destination Binder: il componente che fornisce l'integrazione con i sistemi di messaggi esterni come RabbitMQ o Kafka.
  • Binding: Un bridge tra il sistema di messaggi esterno e i producer/consumer forniti dall'applicazione.
  • Message: La struttura dati usata dai producer/consumer per comunicare con i Destination Binders.
Block Image
Immagine presa da https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-reference

Dall'immagine sopra si può notare che i bindings sono di input e di output. Possiamo combinare più binders; ad esempio potremmo leggere un messaggio da RabbitMQ e scriverlo su Kafka, creando una semplice funzione Java (a breve approfondiremo questa tematica).

Per saperne di più sul multi-binders, ho lasciato il link della doc ufficiale nei riferimenti, in fondo alla pagina.

Lo sviluppatore potrà concentrarsi unicamente sulla Business Logic nella forma di functions, dopodiché quando sceglierà il sistema di messaggi da utilizzare, dovrà importare la dipendenza del binder scelto.

Esistono binder gestiti direttamente da Spring Cloud Stream come Apache Kafka, RabbitMQ, Amazon Kinesis e altri gestiti da maintainers esterni come Azure EventHubs, Amazon SQS, etc. Comunque, Spring Cloud Stream permette di creare binders personalizzati.

Spring Cloud Stream integra inoltre le funzionalità di Partition e Message Groups speculari a quelli di Apache Kafka.
Questo significa che il framework mette a disposizione queste due funzionalità per qualsiasi sistema di messaggi scelto (quindi ad esempio si avranno a disposizione anche se si utilizza RabbitMQ, nonostante questo sistema non li supporta nativamente).

Spring Cloud Stream da Spring Cloud Function

Spring Cloud Stream si basa su Spring Cloud Function. La business logic può essere scritta attraverso semplici funzioni.
Vengono utilizzate le classiche tre interfacce di Java:

  • Supplier: una funziona che ha output ma non ha input, viene chiamata anche producer, publisher, source .
  • Consumer: una funzione che ha input ma non ha output, viene chiamata anche subscriber o sink.
  • Function: una funzione che ha sia input che output, viene chiamata anche processor.

Ed è proprio per questo motivo che vi parlo oggi di Spring Cloud Stream. Nelle versioni precedenti, non veniva utilizzato questo approccio, il ché rendeva il framework meno intuitivo.

Questo è un esempio di processor che prende in input una stringa e la trasforma in upperCase, nella versione precedente del modulo:

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String handle(Strign value) {
    return value.toUpperCase();
}

Lo stesso esempio, con la versione attuale:

@Bean
public Function<String, String> uppercase(Strign value) {
    return v -> v.toUpperCase();
}

Adesso un po' di pratica!

Primo passo: creiamo il progetto da Spring Initializr

Creiamo lo scheletro del progetto tramite il sito di Spring Initializr: Spring Initializr (il link contiene già le configurazioni necessarie).

Scarichiamo il progetto, un-zippiamolo e apriamolo. Analizziamo il pom.xml:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

La prima dipendenza è quella generale di Spring Cloud Stream, mentre la seconda serve a dare le configurazioni per il binder scelto. Se volessimo cambiare binder, cambieremmo questa dipendenza.

Secondo passo: creiamo un model per il messaggio

Creiamo un sub-package model con il seguente record:

public record SensorEventMessage(
        String sensorId,
        Instant timestampEvent,
        Double degrees
)
{}

Questo model rappresenta un messaggio inviato da un sensore di temperatura.

Terzo passo: creiamo un consumer

Creiamo un sub-package event, e all'interno la seguente classe:

@Configuration
@Slf4j
@RequiredArgsConstructor
public class SensorEventFunctions {

    @Bean
    public Consumer<SensorEventMessage> logEventReceived() {
        return sensorEventMessage -> log.info("Message received: {}", sensorEventMessage);
    }
}

Analizziamo il codice:

  • La classe è annotata con @Configuration in modo tale da contenere dei bean (metodi annotati con @Bean).
  • Il metodo, che è il consumer vero e proprio, è una semplice funzione Java di tipo Consumer, che logga il messaggio ricevuto. Inoltre è annotato con @Bean. In Spring Cloud Function, per attivare le funzioni, basta che queste ultime siano dei bean.

Avete trovato riferimenti a Kafka? 🤭

Quarto passo: creiamo un producer

Nella stessa classe, creiamo un metodo che fa da producer:

@Bean
public Supplier<SensorEventMessage> sensorEventProducer() {
    return () -> new SensorEventMessage("2", Instant.now(), 30.0);
}

Questo producer non fa altro che simulare l'invio di un messaggio di un sensore con id 2.

Basta così, abbiamo finito di scrivere codice! Non abbiamo necessità di scrivere serializzatori e deserializzatori.
Viene gestito tutto automaticamente da Spring Cloud Stream.

Quinto passo: creiamo le configurazioni per Spring Cloud Stream

Rinominiamo il file application.properties in application.yml e inseriamo queste config:

spring:
  application:
    name: spring-cloud-stream
  cloud:
    function:
      definition: logEventReceived;sensorEventProducer
    stream:
      bindings:
        logEventReceived-in-0: #a consumer
          destination: sensor_event_topic
          group: ${spring.application.name}
        sensorEventProducer-out-0: #a producer
          destination: sensor_event_topic

Analizziamo il file:

  • La property spring.cloud.function.definition serve a definire le funzioni dell'applicativo annotate con @Bean. Le funzioni sono separate da un punto e virgola.
  • La property spring.cloud.stream.bindings serve a dichiarare per ogni producer/consumer la sua destinazione (in questo caso è il nome di una Topic Kafka). Viene utilizzata la seguente naming convention:
    • per i bindings di input: <functionName> + -in- + <index>
    • per i bindings di output: <functionName> + -out- + <index>
    Il campo index è sempre 0 quando vogliamo sfruttare il concetto di message groups.
  • Il consumer logEventReceived, oltre ad avere il campo destination, ha anche il campo group che indica il nome del Message Group.
    Ogni consumer di uno stesso Message Group, legge i messaggi da una partition diversa, in modo tale da dividersi i messaggi con gli altri consumer, in un'ottica di horizontal scaling.
Sesto passo: avviamo in locale Kafka tramite container Docker

Utilizziamo il Docker Compose scaricato dal sito di Confluent, in modo tale da avviare Kafka in locale tramite Docker:

---
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    ports:
      # To learn about configuring Kafka for access across networks see
      # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

Non dobbiamo eseguire nessun comando per avviare il Docker Compose.
Infatti dalla versione 3.1 di Spring Boot, con la dipendenza spring-boot-docker-compose, Docker Compose verrà avviato automaticamente all'avvio dell'applicazione!

Settimo passo: avviamo l'applicazione

Avviamo l'applicazione e analizziamo i log. Avremo un log del genere ogni secondo:
Message received: SensorEventMessage[sensorId=2, timestampEvent=2023-02-27T17:40:08.367719Z, degrees=30.0].

Questo perché di default, in Spring Cloud Function, viene eseguito un polling ogni secondo per i bean Supplier (tramite la property spring.integration.poller.fixed-delay). Quindi il producer invia messaggi ogni secondo. L'applicativo produce e consuma correttamente i messaggi!

E se volessimo utilizzare il paradigma Reactive?

Ottavo passo: Spring Cloud Stream Reactive

Con la nuova versione di Spring Cloud Stream, non c'è necessità di importare una nuova di dipendenza.
La versione reactive diventa:

@Bean
public Function<Flux<SensorEventMessage>, Mono<Void>> logEventReceived() {
    return fluxEvent -> fluxEvent
            .doOnNext(sensorEventMessage -> log.info("Message received: {}", sensorEventMessage))
            .then();
}

@Bean
 public Supplier<Flux<SensorEventMessage>> sensorEventProducer() {
    return () -> Flux.fromStream(Stream.generate(() -> {
        try {
            Thread.sleep(5000);
            return new SensorEventMessage("2", Instant.now(), 30.0);
        } catch (Exception e) {
            // ignore
        }
        return null;
    })).subscribeOn(Schedulers.boundedElastic()).share();
}

Nota che il consumer ora è una Function che prende in input un Flux di messaggi e restituisce un Mono<Void>.
Tuttavia, può essere scritto anche in questo modo:

@Bean
public Consumer<Flux<SensorEventMessage>> logEventReceived() {
    return fluxEvent -> {
        fluxEvent
                .doOnNext(sensorEventMessage -> log.info("Message received: {}", sensorEventMessage))
                .subscribe();

    };
}

Siccome in WebFlux "niente succede se il flusso non è sottoscritto", qui siamo costretti a esplicitare il subscribe.
Nella modalità Reactive quindi, per creare un consumer, possiamo utilizzare o l'interfaccia Function o quella Consumer.

Per quanto riguarda il producer, Spring Cloud Stream nella modalità Reactive, non fa partire automaticamente il polling di un secondo, quindi eseguiamo Stream.generate... per questo (e invece di inviare il messaggio ogni secondo, lo inviamo ogni cinque).

Nono passo: Producer da un flusso sincrono

Spesso è necessario inviare un messaggio da un flusso sincrono, ad esempio dopo una richiesta REST. A quel punto, il Supplier non va più bene.
Spring Cloud Stream mette a disposizione un bean di tipo StreamBridge per questo caso d'uso!
Creiamo una bean Producer che utilizza la classe StreamBridge per inviare un messaggio.
Questo producer verrà invocato da una API REST :

@Component
@RequiredArgsConstructor
public class ProducerSensorEvent {

    private final StreamBridge streamBridge;
    
    
    public boolean publishMessage(SensorEventMessage message) {
        return streamBridge.send("sensorEventAnotherProducer-out-0", message);
    }

}

//REST API
@RestController
@RequestMapping("sensor-event")
@RequiredArgsConstructor
public class SensorEventApi {

  private final ProducerSensorEvent producerSensorEvent;


  @PostMapping
  public Mono<ResponseEntity<Boolean>> sendDate(@RequestBody Mono<SensorEventMessage> sensorEventMessage) {
    return sensorEventMessage
            .map(producerSensorEvent::publishMessage)
            .map(ResponseEntity::ok);

  }

}

Niente di più semplice. Il metodo send della classe StreamBridge prende in input il nome del binding e il messaggio da inviare.
Ovviamente, dobbiamo aggiungere questo nuovo binding nel file application.yml solo nella sezione bindings (non dobbiamo aggiungerlo nella sezione function perché banalmente non è un funzione):

sensorEventAnotherProducer-out-0: #another producer
  destination: sensor_event_topic

Per inviare il messaggio, possiamo effettuare una chiamare REST del genere:
http POST :8080/sensor-event sensorId=1 timestampEvent=2023-02-26T15:15:00Z degrees=26.0

Passo Bonus: Functions composition in Spring Cloud Function

In Spring Cloud Function, possiamo concatenare pìù funzioni. Ad esempio, oltre a loggare il messaggio ricevuto, vogliamo anche salvarlo a DB. Per fare cioè scriviamo un'altra funziona che prende in input il messaggio e lo salva a DB:

@Bean
public Function<Flux<SensorEventMessage>, Mono<Void>> saveInDBEventReceived() {
    return fluxEvent -> fluxEvent
            .doOnNext(sensorEventMessage -> log.info("Message saving: {}", sensorEventMessage))
            .flatMap(sensorEventMessage -> sensorEventDao.save(Mono.just(sensorEventMessage)))
            .then();
}

a quel punto, modifichiamo la funzione di log in modo tale da restituire di nuovo il messaggio, dopo averlo loggato. L'output della funzione logEventReceived sarà l'input della funzione saveInDBEventReceived:

 @Bean
public Function<Flux<SensorEventMessage>, Flux<SensorEventMessage>> logEventReceived() {
    return fluxEvent -> fluxEvent
            .doOnNext(sensorEventMessage -> log.info("Message received: {}", sensorEventMessage));
}
La classe DAO semplicemente memorizza il messaggio su un ConcurrentHashMap. Trovate il codice della classe sul mio GitHub. (link in fondo alla pagina)

Per concatenare le due funzioni, utilizziamo l'operatore | (pipe) nell'application.yml:

spring:
  application:
    name: spring-cloud-stream
  cloud:
    function:
      definition: logEventReceived|saveInDBEventReceived;sensorEventProducer
    stream:
      bindings:
        logEventReceivedSaveInDBEventReceived-in-0: #a consumer
          destination: sensor_event_topic
          group: ${spring.application.name}
        sensorEventProducer-out-0: #a producer
          destination: sensor_event_topic
        sensorEventAnotherProducer-out-0: #another producer
          destination: sensor_event_topic

Notiamo due cose:

  • Le due funzioni logEventReceived e saveInDBEventReceived sono concatenate con la pipe. L'output della prima funzione diventa l'input della seconda.
  • Il nome del binding diventa logEventReceivedSaveInDBEventReceived, cioè la concatenazione dei nomi delle due funzioni.

E se volessimo aggiungere delle properties custom di Kafka?

Personalizzare properties per Kafka

Per farlo, basta utilizzare le properties spring.cloud.stream.kafka.binder. Possiamo aggiungere configurazioni globali, configurazioni limitate a tutti i producer (spring.cloud.stream.kafka.binder.producerProperties) o specifico per un singolo producer (spring.cloud.stream.kafka.bindings.<channelName>.producer..).
Lo stesso per i consumer.

Ad esempio:

spring:
  application:
    name: spring-cloud-stream
  cloud:
    function:
      definition: logEventReceived|saveInDBEventReceived;sensorEventProducer
    stream:
      kafka:
        binder:
          brokers: localhost:9092
#          kafka-properties: global props
#            boootstrap.server: localhost:9092 Global props
          consumerProperties: #consumer props
            max.poll.records: 250
#          producerProperties: producer props
      bindings:
        logEventReceivedSaveInDBEventReceived-in-0: #a consumer
          destination: sensor_event_topic
          group: ${spring.application.name}
        sensorEventProducer-out-0: #a producer
          destination: sensor_event_topic
        sensorEventAnotherProducer-out-0: #another producer
          destination: sensor_event_topic

Error Handling: retries e DLQ

Per testare l'Error Handling in Spring Cloud Stream, modifichiamo il codice della classe DAO in questo modo:

public Mono<SensorEventMessage> save(Mono<SensorEventMessage> sensorEventMessageMono) {
    return sensorEventMessageMono
            .doOnNext(sensorEventMessage -> log.info("Message saving: {}", sensorEventMessage))
            .map(sensorEventMessage -> {
                if (sensorEventMessage.degrees() == 10.0) throw new RuntimeException("Error in saveMessage");
                return sensorEventMessage;
            })
            .doOnNext(sensorEventMessage -> memoryDB.put(sensorEventMessage.sensorId(), sensorEventMessage))
            .doOnNext(sensorEventMessage -> log.info("Message saved with id: {}", sensorEventMessage.sensorId()));
}

Simuliamo un errore se l'evento ricevuto contiene 10 gradi.

Versione imperativa

Un consumer in Spring Cloud Stream, nella versione imperativa, tenterà di elaborare in totale tre volte il messaggio, nel caso quest'ultimo sollevi una eccezione.
Il valore di default di maxAttempts può essere sovrascritto in questo modo:

bindings:
  logEventReceivedSaveInDBEventReceived-in-0: #a consumer
    destination: sensor_event_topic
    group: ${spring.application.name}
    consumer:
      max-attempts: 3 #default is 3

Puoi anche decidere su quali eccezioni effettuare le retries popolando la property retryable-exceptions in questo modo:

bindings:
  logEventReceivedSaveInDBEventReceived-in-0: #a consumer
    destination: sensor_event_topic
    group: ${spring.application.name}
    consumer:
      max-attempts: 3 #default is 3
      retryable-exceptions:
        java.lang.RuntimeException: true #retryable
        java.lang.IllegalArgumentException: false #not retryable

Se l'eccezione non è esplicitata in retryable-exceptions, sarà gestita con la retry.

Per quanto riguarda le DLQ, anche questa funzionalità è gestita out-of-box da Spring Cloud Stream, nella versione imperativa. Basta aggiungere le seguenti properties:

cloud:
  function:
    definition: logEventReceivedSaveInDBEventReceived;sensorEventProducer
  stream:
    kafka:
      binder:
        brokers: localhost:9092
        consumerProperties: #consumer props
          max.poll.records: 250
      bindings:
        logEventReceivedSaveInDBEventReceived-in-0:
          consumer:
            enable-dlq: true
            dlq-name: sensor_event_topic_dlq

Con enable-dlq indichiamo a Spring che vogliamo che il consumer abbia una DLQ.
Con dlq-name diamo il nome della topic DLQ.

Per provare l'Error Handling, possiamo inviare un messaggio dalla CLI di Kafka.

  1. Con Docker e il progetto in esecuzione, esegui il comando docker exec -it broker bash.
  2. Esegui il comando
    /bin/kafka-console-consumer --bootstrap-server broker:9092 --topic sensor_event_topic_dlq.
    In questo modo vedrai il contenuto della coda DLQ.
  3. Apri un'altra shell ed esegui lo stesso comando del passo 1.
  4. Esegui il comando
    /bin/kafka-console-producer --bootstrap-server broker:9092 --topic sensor_event_topic.
  5. Ora puoi inviare il seguente messaggio:

{ "sensorId": "prova-id", "timestampEvent": "2023-06-09T18:40:00Z", "degrees": 10.0 }

Vedremo il seguente messaggio di log:

Message received: SensorEventMessage[sensorId=prova-id, timestampEvent=2023-06-09T18:40:00Z, degrees=10.0]
Message saving: SensorEventMessage[sensorId=prova-id, timestampEvent=2023-06-09T18:40:00Z, degrees=10.0]
Message received: SensorEventMessage[sensorId=prova-id, timestampEvent=2023-06-09T18:40:00Z, degrees=10.0]
Message saving: SensorEventMessage[sensorId=prova-id, timestampEvent=2023-06-09T18:40:00Z, degrees=10.0]
Message received: SensorEventMessage[sensorId=prova-id, timestampEvent=2023-06-09T18:40:00Z, degrees=10.0]
Message saving: SensorEventMessage[sensorId=prova-id, timestampEvent=2023-06-09T18:40:00Z, degrees=10.0]
org.springframework.messaging.MessageHandlingException: error occurred in message handler 
[org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@5c81d158], 
failedMessage=GenericMessage [payload=byte[85], headers={kafka_offset=99, 
kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@306f6122, deliveryAttempt=3, 
kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=sensor_event_topic, 
kafka_receivedTimestamp=1686585227257, contentType=application/json, kafka_groupId=spring-cloud-stream}]
...
Caused by: java.lang.RuntimeException: Error in saveMessage
	at com.vincenzoracca.springcloudstream.dao.SensorEventInMemoryDao.lambda$save$1(SensorEventInMemoryDao.java:26)
	at com.vincenzoracca.springcloudstream.event.SensorEventImperativeFunctions.lambda$saveInDBEventReceived$2(SensorEventImperativeFunctions.java:45)
...
o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: ...

Come si evince dai log, il messaggio viene elaborato tre volte in totale prima di andare definitivamente in errore ed essere spedito in DLQ (puoi controllare anche la coda DLQ).

Sembra che la DLQ non funzioni correttamente con la concatenazione di funzioni, quindi consiglio di unire le due funzioni logEventReceived e saveInDBEventReceived.

Versione reactive

Con la versione reactive di Spring Cloud Stream, quelle properties settate in precedenza non funzionano!
Un modo per gestire retries è DLQ potrebbe essere:

  • Avere il consumer con un una sola funziona (no concatenazioni).
  • Wrappare la business logic del consumer in una funzione flatMap, per avere a disposizione il messaggio da mandare in DLQ.
  • Gestire le retries con la funzione retry di Reactor.
  • Creare un channel per la DLQ e mandare il messaggio in errore a quest'ultimo.

Ecco il consumer reactive modificato per gestire retries e DLQ:

private static final String DLQ_CHANNEL = "sensorEventDlqProducer-out-0";

@Bean
public Function<Flux<Message<SensorEventMessage>>, Mono<Void>> logEventReceivedSaveInDBEventReceived() {
    return fluxEvent -> fluxEvent
            .flatMap(this::consumeMessage)
            .then();
}

private Mono<SensorEventMessage> consumeMessage(Message<SensorEventMessage> message) {
    return Mono.just(message)
            .doOnNext(sensorEventMessage -> log.info("Message received: {}", sensorEventMessage))
            .flatMap(sensorEventMessage -> sensorEventDao.save(Mono.just(sensorEventMessage.getPayload())))
            .retry(2)
            .onErrorResume(throwable -> dlqEventUtil.handleDLQ(message, throwable, DLQ_CHANNEL));
}

DlqEventUtil è una classe di utility che manda qualsiasi messaggio in DLQ:

@Component
@Slf4j
@RequiredArgsConstructor
public class DlqEventUtil {

    private final StreamBridge streamBridge;

    public  <T> Mono<T> handleDLQ(Message<T> message, Throwable throwable, String channel) {
        log.error("Error for message: {}", message, throwable);
        streamBridge.send(channel, message);
        return Mono.empty();
    }
}

Il file application.yml diventa:

spring:
  application:
    name: spring-cloud-stream
  cloud:
    function:
      definition: logEventReceivedSaveInDBEventReceived;sensorEventProducer
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          consumerProperties: #consumer props
            max.poll.records: 250
      bindings:
        logEventReceivedSaveInDBEventReceived-in-0: #a consumer
          destination: sensor_event_topic
          group: ${spring.application.name}
        sensorEventProducer-out-0: #a producer
          destination: sensor_event_topic
        sensorEventAnotherProducer-out-0: #another producer
          destination: sensor_event_topic
        sensorEventDlqProducer-out-0: #DLQ producer
          destination: sensor_event_topic_dlq
Testing Spring Cloud Stream applications

Quando abbiamo importato il progetto da Spring Initializr, nel pom è stata aggiunta anche la seguente dipendenza:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-binder</artifactId>
    <scope>test</scope>
</dependency>

Grazie a essa, possiamo testare facilmente i nostri consumer e producer. Vediamo come:

@SpringBootTest
@Import(TestChannelBinderConfiguration.class)
class SensorEventTestsIT {

    @Autowired
    private InputDestination input;

    @Autowired
    private OutputDestination output;

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private ProducerSensorEvent producerSensorEvent;

    @Autowired
    private SensorEventDao sensorEventDao;


    @Test
    public void produceSyncMessageTest() throws IOException {
        SensorEventMessage sensorEventMessage = new SensorEventMessage("3", Instant.now(), 15.0);
        producerSensorEvent.publishMessage(sensorEventMessage);
        byte[] payloadBytesReceived = output.receive(100L, "sensor_event_topic").getPayload();
        assertThat(objectMapper.readValue(payloadBytesReceived, SensorEventMessage.class)).isEqualTo(sensorEventMessage);
    }

    @Test
    public void producePollableMessageTest() throws IOException {
        byte[] payloadBytesReceived = output.receive(5000L, "sensor_event_topic").getPayload();
        SensorEventMessage sensorEventMessageReceived = objectMapper.readValue(payloadBytesReceived, SensorEventMessage.class);
        assertThat(sensorEventMessageReceived.sensorId()).isEqualTo("2");
        assertThat(sensorEventMessageReceived.degrees()).isEqualTo(30.0);
    }

    @DirtiesContext(methodMode = DirtiesContext.MethodMode.BEFORE_METHOD)
    @Test
    public void consumeMessageTest() throws IOException {
        SensorEventMessage sensorEventMessage = new SensorEventMessage("3", Instant.now(), 15.0);
        byte[] payloadBytesSend = objectMapper.writeValueAsBytes(sensorEventMessage);
        input.send(new GenericMessage<>(payloadBytesSend), "sensor_event_topic");
        StepVerifier
                .create(sensorEventDao.findAll())
                .expectNext(sensorEventMessage)
                .expectComplete()
                .verify();
    }

}

Importando la classe TestChannelBinderConfiguration, Spring ci fornire dei bean di InputDestination e OutputDestination, che possiamo utilizzare per testare consumer e producer rispettivamente.
Non abbiamo bisogno del binder reale; il framework ci fornisce un binder di test.
Analizziamo uno per uno i test:

  • Nel test produceSyncMessageTest, utilizziamo il nostro producer sincrono per inviare un messaggio.
    Verifichiamo con l'OutputDestination che il messaggio sia stato effettivamente inviato sulla destination (topic Kafka).
  • Nel test producePollableMessageTest, non inviamo nessun messaggio esplicitamente, poiché il producer è di tipo pollable. Aspettiamo cinque secondi per verificare che il messaggio sia stato inviato correttamente (siccome il polling è di cinque secondi).
  • Nel test consumeMessageTest, inviamo con l'OutputDestination un messaggio sulla topic Kafka e in seguito verifichiamo che lo stesso messaggio sia stato salvato a DB.
    Inoltre il metodo è annotato con @DirtiesContext(methodMode = DirtiesContext.MethodMode.BEFORE_METHOD), per pulire il contesto (e quindi il DB), prima che venga invocato il metodo.

Conclusioni

In questo articolo abbiamo visto come utilizzare la nuova versione di Spring Cloud Stream.
Il fatto di poterci concentrare unicamente sulla business logic, scrivendo semplici funzioni Java, indipendenti dal broker utilizzato, rende questo framework sicuramente molto interessante. Potremmo switchare da Kafka a RabbitMQ, senza necessità di cambiare il codice che abbiamo appena scritto. Che potenza! Possiamo anche avere diversi binder nella stessa applicazione.

Trovate il codice completo sul mio repo di GitHub al seguente link: GitHub.

Altri articoli su Spring: Spring.
Articoli su Docker: Docker.

Libri consigliati su Spring, Docker e Kubernetes:

Riferimenti