Reactive Backend Applications with Spring Boot, Kotlin and Coroutines (Part 2)

By Léo Schneider and Mehmet Akif Tütüncü

9 min read

An exploration of Kotlin and coroutines to have a Spring reactive application written in direct style

Introduction

In this second part of the article, we will show that we can use Kotlin coroutines to have a reactive application while writing code in direct style.

We start from a reactive Spring Boot project written in Java with Reactor, as built in part 1.

The first half of this article focuses on Kotlin and on adapting the project to the language. The second half explores coroutines and how we can use them.

Reactive with Kotlin

As of now, we have a reactive application, throughout the whole chain. We even have a safeguard to detect blocking calls, to verify the non-blocking nature of the code at runtime.

We could stop everything right here, but there is one possible upgrade that we can do at this point. That upgrade is too interesting to skip, even if it means that we have to change the programming language!

You read it in the title: it is, of course, Kotlin.

Kotlin Overview

Kotlin is a multiplatform language. It became popular through Android development, but it is now increasingly used as a backend language. On the JVM, it compiles to regular Java bytecode, so it is even possible to have both Kotlin and Java files in the same project. It also means that we can keep our Java frameworks and libraries, so no other change is required there.

Kotlin Features

Kotlin has many interesting features, some of which Java has even adopted in its latest versions.

Null Safety

One major feature is that every type comes in a nullable and a non-nullable variation. The compiler knows at compile time where a NullPointerException could happen, and refuses to compile if such a case isn't handled.
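As a minimal sketch of what this looks like in practice, the hypothetical City type below (not part of the project) has a nullable region that has to be handled explicitly before it can be used:

```kotlin
// Hypothetical type for illustration; it is not part of the weather project.
data class City(val name: String, val region: String?)

fun describe(city: City): String {
    // city.region.length would not compile here, because region may be null.
    // The Elvis operator (?:) supplies a fallback for the null case.
    val region = city.region ?: "unknown region"
    return "${city.name}, $region"
}
```

Calling describe(City("Paris", null)) falls back to "unknown region" instead of failing at runtime.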

Immutability

Kotlin favors immutability by default: the standard collection interfaces are read-only unless you explicitly ask for a mutable variant, and you can use val to declare references that cannot be reassigned. Knowing what can and what cannot change helps to reduce the amount of side effects produced.
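A small sketch of the difference, using only the standard library (the function name is ours, chosen for illustration):

```kotlin
fun immutabilityDemo(): List<Int> {
    val readOnly: List<Int> = listOf(1, 2, 3)
    // readOnly.add(4)       // does not compile: List exposes no add()
    // readOnly = listOf(4)  // does not compile: a val cannot be reassigned

    // Mutability is an explicit opt-in; toMutableList() creates a mutable copy.
    val mutable: MutableList<Int> = readOnly.toMutableList()
    mutable.add(4)
    return mutable
}
```

The original readOnly list is left untouched; only the copy is modified.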

Extension Functions

Kotlin lets you declare new functions on an existing type (the receiver) without inheriting from it. You can therefore extend the functionality of classes that you don't own.
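For instance, we could add a helper to java.time.LocalDate, a class from the JDK that we cannot modify. The isWeekend name is our own invention for this sketch:

```kotlin
import java.time.DayOfWeek
import java.time.LocalDate

// Extension function on a JDK class; inside the body, `this` is the LocalDate receiver.
fun LocalDate.isWeekend(): Boolean =
    dayOfWeek == DayOfWeek.SATURDAY || dayOfWeek == DayOfWeek.SUNDAY
```

It is then called like a regular method: LocalDate.of(2024, 1, 6).isWeekend().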

Standard Library

The standard library of the language has many more convenience functions than Java's. For example, you don't need to call stream() whenever you want to transform your collections. And thanks to extension functions, you even get additional methods on your Java types.
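Here is a short sketch of collection operations applied directly to a List, without any stream() or collect() calls. DayTemperature and warmDays are hypothetical names used only for this example:

```kotlin
// Hypothetical type for illustration.
data class DayTemperature(val day: String, val maxC: Double)

// Keeps the days reaching at least 20 °C, warmest first, and returns their names.
fun warmDays(days: List<DayTemperature>): List<String> =
    days.filter { it.maxC >= 20.0 }
        .sortedByDescending { it.maxC }
        .map { it.day }
```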

Migrate the project to Kotlin

The next step is to migrate our project to Kotlin. There are two possible approaches:

1. Use IDE Automatic Tool

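The IDE can convert a Java file to Kotlin automatically (screenshot: migrate_file_kotlin.png). For our controller, the conversion produced this:
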
@get:GetMapping
val allWeather: Flux<WeatherInfo>
    get() = weatherService.allWeather

Because the method name started with get, the IDE concluded that it was a property called allWeather and replaced it with a property with a getter. But we humans can see that it was meant to be a method, so you have to adjust it manually to:

@GetMapping
fun getAllWeather(): Flux<WeatherInfo> {
    return weatherService.getAllWeather()
}

2. Manually Change the Files

As the code generated by the IDE isn't always very readable, it is better to rewrite it manually, or at least to review all the generated code.

It sounds like a very time-consuming task, but regexes, find/replace, and multiple carets help to change files relatively quickly.

Almost every function will change from this pattern:

public Foo foo(Bar bar) {}

to this one:

fun foo(bar: Bar): Foo {}

Migrate to Data Classes

Java allows only one public top-level class per file. Very simple classes can be migrated to data classes, whose Java equivalent is the record. The added value of Kotlin is that we can put them all in one single file. The resulting file can contain the entire domain of a small module like this one.

package com.io.reactivecoroutines.weather.api

import com.fasterxml.jackson.annotation.JsonProperty
import java.time.LocalDate

data class Forecast(
    @JsonProperty("forecastday") val days: List<ForecastDay>
)

data class ForecastDay(
    val date: LocalDate,
    @JsonProperty("day") val temperature: Temperature,
)

data class Location(
    @JsonProperty("name") val city: String,
    val region: String,
    val country: String,
)

data class Temperature(
    @JsonProperty("maxtemp_c") val maxC: Double,
    @JsonProperty("maxtemp_f") val maxF: Double,
    @JsonProperty("mintemp_c") val minC: Double,
    @JsonProperty("mintemp_f") val minF: Double,
    @JsonProperty("avgtemp_c") val avgC: Double,
    @JsonProperty("avgtemp_f") val avgF: Double,
)

Testing

If you are using Mockito, you will notice that the function when() can no longer be called directly, because when is a reserved keyword in Kotlin (it would have to be escaped with backticks). Mockito has a Kotlin companion library that offers better support than the plain Java one: org.mockito.kotlin:mockito-kotlin. There, when is called whenever; aside from that, almost everything should look the same.

Now that the whole project is in Kotlin, we can move on to what we came for: coroutines.

Coroutines

Coroutines were first described in Design of a Separable Transition-Diagram Compiler, published in 1963. A coroutine is a separable part of a program that can pause and resume its execution.

It means that if we have functions that can be suspended, we can have non-blocking behavior and asynchronous calls. With more tweaking, we can have concurrency and enable parallelism.

Launching a new flow of execution with a coroutine is very lightweight compared to creating a new thread, so we can have many coroutines running at the same time. This is what makes coroutines so interesting for us, especially in non-blocking scenarios.
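To get a feel for how lightweight they are, here is a minimal sketch that launches a large number of coroutines at once. It assumes the kotlinx-coroutines-core dependency is on the classpath; runManyCoroutines is a name made up for this example:

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Launches `count` coroutines that each suspend for 100 ms, then reports how many finished.
fun runManyCoroutines(count: Int): Int = runBlocking {
    val finished = AtomicInteger(0)
    coroutineScope {
        repeat(count) {
            launch {
                delay(100) // suspends the coroutine without blocking a thread
                finished.incrementAndGet()
            }
        }
    } // coroutineScope returns only once all of its children have completed
    finished.get()
}
```

Calling runManyCoroutines(10_000) should be unremarkable on an ordinary machine, whereas creating the same number of threads would be far more expensive.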

Kotlin supports coroutines in the language itself and implements them in a library, kotlinx.coroutines (the implementation remains platform-dependent). A sandbox environment is also available to try them out.

Coroutines as Part of the Language

Coroutines in Kotlin allow us to write code in direct style that nevertheless runs asynchronously, with callbacks under the hood. When you mark a function with the suspend modifier, you indicate that this function can suspend its execution. It can therefore only be called from a coroutine or from another suspending function. The compiler effectively adds a Continuation parameter to the function, which plays the role of the callback.

In direct style we would typically have code written like this:

fun postItem(item: Item) {
    val token = createToken()
    val response = client.post(item, token)
    handleResponse(response)
}

If we instead chained callbacks to express the continuation of the flow, it would look something like this, which quickly gets difficult to read:

fun postItem(item: Item) {
    createToken { token ->
        client.post(item, token) { response ->
            handleResponse(response)
        }
    }
}

Simply by adding the suspend keyword to the direct style code, we get it to run with continuation callbacks! We can now have the best of both worlds.

            suspend fun postItem(item: Item) {
/*Suspend*/     val token = createToken()
/*Suspend*/     val response = client.post(item, token)
                handleResponse(response)
            }
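To make the link between suspend and callbacks more concrete, here is a sketch that uses only the Kotlin standard library (kotlin.coroutines, without kotlinx.coroutines). The callback-based createTokenAsync is a hypothetical stand-in for a real client, and it invokes its callback immediately to keep the example simple:

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-based API (assumption for this sketch).
fun createTokenAsync(callback: (String) -> Unit) = callback("token-123")

// Bridges the callback API to a suspending function:
// the continuation is resumed with the value when the callback fires.
suspend fun createToken(): String = suspendCoroutine { continuation ->
    createTokenAsync { token -> continuation.resume(token) }
}

// Direct-style code built on top of the suspending function.
suspend fun describeToken(): String {
    val token = createToken() // suspension point
    return "Got $token"
}

// Starts the coroutine by hand; the completion Continuation is the final callback.
fun runDescribeToken(): String {
    var value: String? = null
    val block: suspend () -> String = { describeToken() }
    block.startCoroutine(Continuation<String>(EmptyCoroutineContext) { result ->
        value = result.getOrThrow()
    })
    // Safe only because the hypothetical callback above fires synchronously.
    return checkNotNull(value) { "coroutine did not complete synchronously" }
}
```

In application code we would not start coroutines this way; the point is only to show that a Continuation is the callback that the compiler threads through suspending functions.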

Integration With Project Reactor

If you want to keep your code just the way it is with Project Reactor and benefit from its specific APIs, you can simply write the same code in Kotlin, just the way you had it in Java.

Because the libraries are interoperable, you can use asFlow() and asFlux() to convert between the two worlds. This is especially valuable if you intend to migrate, because you can then do it step by step.
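A minimal sketch of such a step-by-step bridge, assuming the kotlinx-coroutines-reactor dependency and Reactor are on the classpath. The three functions are hypothetical and only illustrate the direction of each conversion:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.asFlux
import reactor.core.publisher.Flux

// Existing Reactor code that we have not migrated yet (hypothetical).
fun legacyTemperatures(): Flux<Double> = Flux.just(18.0, 21.5, 25.0)

// New coroutine-based code consumes the Flux as a Flow.
fun temperaturesAsFlow(): Flow<Double> = legacyTemperatures().asFlow()

// A Flow can be handed back to callers that still expect a Flux.
fun truncatedAsFlux(): Flux<Int> = temperaturesAsFlow().map { it.toInt() }.asFlux()
```

Each layer can then be migrated on its own, with a conversion at the boundary until the neighboring layer follows.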

You can benefit from the quality of life offered by Kotlin and keep your Reactor-specific code just how it is. But that alone wouldn't be a good reason to migrate the whole project.

It is interesting to know that this is possible, but what we really want is to use coroutines, as they significantly reduce the complexity of the code that you read.

Coroutines Migrations

In our project, we need to suspend all the functions in the chain, from controllers to repositories. The best way is to start from the external layers, such as repositories, and then move up the chain layer by layer.

1. Repository Layer

In the repository layer, we previously had a reactive repository:

@Repository
interface WeatherRepository : ReactiveCrudRepository<WeatherInfo, String>

We can change it to CoroutineCrudRepository. There is also a variant that supports sorting, CoroutineSortingRepository. Both interfaces are available through the spring-boot-starter-data-r2dbc dependency.

@Repository
interface WeatherRepository : CoroutineCrudRepository<WeatherInfo, String>

If we check the function findById, we can see that the repository provides this:

suspend fun findById(id: ID): T?

This is the amazing part! This function is suspending and the repository is non-blocking, but we don't have to deal with the reactive types. No more type wrapping is necessary!

If we have a look at the findAll() method, we can see that it is not suspending and returns a Flow, the coroutine counterpart of Flux:

fun findAll(): Flow<T>

To get rid of the Flow type, we need to collect it. Otherwise, we can pass it on to the higher layers.

  • If we work with pages, we can simply use toList() to await the entire list
  • If we want to stream the data progressively, we can pass the flow all the way to the body of the ServerResponse.
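The two options can be sketched as follows, assuming kotlinx-coroutines-core. findAllCities is a hypothetical stand-in for a repository's findAll():

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList

// Hypothetical stand-in for repository.findAll().
fun findAllCities(): Flow<String> = flowOf("Paris", "Berlin", "Istanbul")

// Page style: suspend until the whole flow has been collected into a list.
suspend fun citiesAsList(): List<String> = findAllCities().toList()

// Streaming style: keep the Flow and transform it lazily.
// Nothing runs until a caller collects the flow.
fun citiesUppercased(): Flow<String> = findAllCities().map { it.uppercase() }
```

Note that the streaming variant does not need to be a suspending function: building a Flow is cheap, and the suspension happens when it is collected.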

2. WebClients from Webflux

Most real applications don't only have a database, but also have CRUD operations over the network. That is why we have some reactive webclients in our application. They are written in functional style using the reactor API.

They look like this:

val forecast: Mono<WeatherAPIResponse?> = webClient
    .get()
    .uri("$host/v1/forecast.json?key=$apiKey&q=$query&days=7")
    .exchangeToMono<WeatherAPIResponse?> { it.bodyToMono() }
    .doFirst {
        log.info("Getting weather forecast for {}", query)
    }
    .doOnError {
        log.error("Cannot get weather forecast for $query", it)
        Exceptions.propagate(it)
    }
    .doOnSuccess { response ->
        log.info(
            "Weather forecast for query {}: {}",
            query,
            response
        )
    }

We can use the coroutine extensions instead. The suspending calls are marked with /*S*/:

        return webClient
            .get()
            .uri("$host/v1/forecast.json?key=$apiKey&q=$query&days=7")
/*S*/       .awaitExchange { response ->
                // Read the body inside the handler: an unconsumed body is released once the handler completes
                val statusCode = response.statusCode()
                when {
/*S*/               statusCode.is2xxSuccessful -> response.awaitBody<WeatherAPIResponse>()
/*S*/               statusCode.is4xxClientError -> throw IllegalArgumentException(response.createExceptionAndAwait())
/*S*/               statusCode.is5xxServerError -> throw response.createExceptionAndAwait()
/*S*/               else -> throw response.createExceptionAndAwait()
                }
            }

Now we are back to a direct style, and we have much more control over the kind of code that we can write.

We end up using different functions. The conversion table looks like this:

  1. exchangeToMono() -> awaitExchange() and its nullable variation awaitExchangeOrNull()
  2. exchangeToFlux() -> exchangeToFlow()
  3. bodyToMono() -> awaitBody() and its nullable variation awaitBodyOrNull()
  4. bodyToFlux() -> bodyToFlow() ...

As you can see, bodies of zero or one object can be awaited and retrieved directly, while flows can be collected later on or handled element by element.

3. Service Layer

In the service layer, we can mark the functions as suspend and change the types from their reactive types to their "unwrapped" types:

fun findById(id: Long): Mono<WeatherInfo> {}
fun findAll(): Flux<WeatherInfo> {}
fun streamAll(): Flux<WeatherInfo> {}

should all become like this:

suspend fun findById(id: Long): WeatherInfo {}
suspend fun findAll(): List<WeatherInfo> {}
suspend fun streamAll(): Flow<WeatherInfo> {}

4. Controllers and Request Handlers

If you are using the @RestController annotation for your controllers, Spring handles whichever return type you have. But as you are calling suspending functions down the line, the controller functions need to be suspending too.

@GetMapping("/{id}")
fun getById(@PathVariable id: Long): Mono<WeatherInfo> {}

will become:

@GetMapping("/{id}")
suspend fun getById(@PathVariable id: Long): WeatherInfo {}

Launching Coroutines

One important question that you might have is the following: But where do we start coroutines? I can't see any launch {} anywhere in the code.


Just as you most likely never call subscribe() yourself with Reactor, you most likely don't need to launch coroutines yourself. If your controller functions are suspending, Spring launches them under the hood for you. If you use the router-style approach, you can use a coRouter instead of a normal router.

Your router will change from this:

@Bean
fun route(weatherSearchHandler: WeatherSearchHandler): RouterFunction<ServerResponse> {
    return RouterFunctions
        .route(
            POST("/weather-infos/search/")
                .and(accept(MediaType.APPLICATION_JSON)), weatherSearchHandler::searchByExample
        )
}

to this:

@Bean
fun route(weatherSearchHandler: WeatherSearchHandler) = coRouter {
    // nest combines the accept predicate with the routes declared inside it
    accept(MediaType.APPLICATION_JSON).nest {
        POST("/weather-infos/search/", weatherSearchHandler::searchByExample)
    }
}

At this point, we have migrated everything: we have a reactive application while keeping our usual writing style.

Takeaways

  1. Reactive libraries are interoperable because they work with the same abstractions
  2. Kotlin is a great language to work with, and not only on reactive projects
  3. Such projects can be improved incrementally
  4. Coroutines allow us to keep our application reactive without being limited to the functional code style
  5. With suspending functions, we can work with normal types instead of the reactive ones
