Building Data Pipeline with Kotlin Coroutines Actors

How to build a simple data-enriching pipeline using Kotlin coroutines and the actor model.

Jan 30, 2019 13 minutes read

In this post I will show how to build a simple data-enriching pipeline using Kotlin coroutines. I will use the Channel and Actor abstractions provided by kotlinx-coroutines.

In the actor model, «actors» are the universal primitives of concurrent computation. In response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received. Actors may modify their own private state, but can only affect each other through messages (avoiding the need for any locks).

Let’s start with a high-level definition of the pipeline:

(👤Producer) -✉️→ 📬(👤Enricher) -✉️→ 📬(👤Updater)
           RawData            RichData
  • The pipeline will have a Producer actor, which reads raw data from a database (or generates mock data) and sends it down the pipeline for enrichment.
  • Then the Enricher actor takes each raw data object and adds some attributes to it.
  • Finally, the Updater actor stores the enriched data in the database.

For the sake of simplicity, let’s implement a squaring function: we will take integers as raw data and enrich them by squaring.

Let’s define our data model:

data class RawData(val value: Int)

Enriched data will be represented by data class RichData:

data class RichData(val value: Int, val square: Int)

Following the actor model, we will use Kotlin actors to represent the processing units in the pipeline and channels to communicate with them.

In Kotlin, actors are implemented as part of the kotlinx-coroutines library:

An actor is an entity made up of a combination of a coroutine, the state that is confined and encapsulated into this coroutine, and a channel to communicate with other coroutines. A simple actor can be written as a function, but an actor with a complex state is better suited for a class.

There is an actor coroutine builder that conveniently combines actor’s mailbox channel into its scope to receive messages from and combines the send channel into the resulting job object, so that a single reference to the actor can be carried around as its handle.
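To make the description above concrete, here is a minimal sketch of an actor that owns a counter. The message types (CounterMsg, Increment, GetCount) and the helper countTo are names invented for this illustration; the actor builder itself comes from kotlinx-coroutines, where it is marked as obsolete API, so treat this as an illustration of the idea rather than a recommended design.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.runBlocking

// Hypothetical messages understood by the counter actor.
sealed class CounterMsg
object Increment : CounterMsg()
class GetCount(val reply: CompletableDeferred<Int>) : CounterMsg()

@OptIn(ObsoleteCoroutinesApi::class)
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var count = 0 // private state, touched only by this coroutine
    for (msg in channel) { // `channel` is the actor's mailbox
        when (msg) {
            is Increment -> count++
            is GetCount -> msg.reply.complete(count)
        }
    }
}

// Sends n increments, then asks the actor for its current state.
fun countTo(n: Int): Int = runBlocking {
    val counter = counterActor()
    repeat(n) { counter.send(Increment) }
    val reply = CompletableDeferred<Int>()
    counter.send(GetCount(reply))
    val result = reply.await()
    counter.close() // closing the mailbox lets the actor's loop finish
    result
}
```

The state never leaves the coroutine: callers can only observe it by sending a GetCount message and waiting for the reply.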

Defining Messages

An actor always reacts to some external message or set of messages. It’s a good idea to define an Envelope to carry metadata along with the payload. It is very important that all messages are immutable, so that it is safe to pass them between threads.

data class Metadata(val timestampMillis: Long, val correlationId: UUID = UUID.randomUUID())
data class Envelope<T>(val payload: T, val metadata: Metadata)

In this case, Metadata contains a timestamp in milliseconds and a correlation ID.

The following function will be useful for transforming the payload while copying the metadata to the new envelope.

fun <T, R> transformMessage(input: Envelope<T>, block: (T) -> R): Envelope<R> {
    val result = block(input.payload)
    return Envelope(result, input.metadata)
}
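As a quick usage sketch, the snippet below repeats the definitions from this post so that it is self-contained, and applies transformMessage to square a payload. The helper name squareEnvelope is made up for this example.

```kotlin
import java.util.UUID

data class Metadata(val timestampMillis: Long, val correlationId: UUID = UUID.randomUUID())
data class Envelope<T>(val payload: T, val metadata: Metadata)
data class RawData(val value: Int)
data class RichData(val value: Int, val square: Int)

// Applies block to the payload and reuses the original metadata unchanged.
fun <T, R> transformMessage(input: Envelope<T>, block: (T) -> R): Envelope<R> =
    Envelope(block(input.payload), input.metadata)

// Hypothetical helper: turns a RawData envelope into a RichData envelope.
fun squareEnvelope(raw: Envelope<RawData>): Envelope<RichData> =
    transformMessage(raw) { RichData(it.value, it.value * it.value) }
```

Because the metadata object is passed through as-is, the timestamp and correlation ID of the original message stay attached to the enriched one, which is what makes the end-to-end latency measurement later in the post possible.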

Defining Actors

Let’s define our Producer:

private val context = Executors.newFixedThreadPool(5, NamedThreadFactory("producer")).asCoroutineDispatcher()

@InternalCoroutinesApi
@ExperimentalCoroutinesApi
fun CoroutineScope.producerActor(total: Int) = produce<Envelope<RawData>>(
    context,
    capacity = 10,
    onCompletion = {
        context.close() // close context on stopping the actor
        log("🛑 Completed. Exception: $it")
    }
) {
    for (i in 1..total) {
        val rawData = RawData(i)
        val result = Envelope(rawData, Metadata(Instant.now().toEpochMilli()))
        log("Producing $result")
        channel.send(result)
    }
    channel.close()
}

Coroutines are always executed in some CoroutineContext. If we want to control how many threads are available to an actor, we can use an ExecutorCoroutineDispatcher. As you can see, we defined a dispatcher backed by a fixed thread pool with 5 threads. The actor will run on this thread pool.
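The listings in this post call two helpers that are not shown: NamedThreadFactory and log. The sketch below is one possible implementation, inferred only from the thread names and line format visible in the log output (for example "[producer-0]"); the actual helpers may well differ, and formatLogLine is a name added here to keep the formatting separate from printing.

```kotlin
import java.time.Instant
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicInteger

// Assumed helper: names threads "<prefix>-0", "<prefix>-1", and so on.
class NamedThreadFactory(private val prefix: String) : ThreadFactory {
    private val counter = AtomicInteger(0)

    override fun newThread(r: Runnable): Thread =
        Thread(r, "$prefix-${counter.getAndIncrement()}")
}

// Assumed format: ISO-8601 timestamp, current thread name in brackets, then the message.
fun formatLogLine(message: String): String =
    "${Instant.now()} [${Thread.currentThread().name}] $message"

fun log(message: String) = println(formatLogLine(message))
```

Naming the threads per actor is what makes it possible to see in the output which pool handled each message.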

You may notice that the actor is defined with a channel buffer of capacity = 10. The actor can send messages to the channel without suspending until its buffer is full.
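The effect of the buffer can be observed in isolation. The sketch below uses the non-suspending trySend on a plain Channel with no receiver attached: a suspending send would pause the sender at the point where trySend starts to fail. The function name acceptedWithoutSuspending is invented for this example.

```kotlin
import kotlinx.coroutines.channels.Channel

// Counts how many elements a buffered channel accepts when nobody is receiving.
fun acceptedWithoutSuspending(capacity: Int, attempts: Int): Int {
    val channel = Channel<Int>(capacity)
    var accepted = 0
    repeat(attempts) { i ->
        // trySend never suspends; it reports failure once the buffer is full
        if (channel.trySend(i).isSuccess) accepted++
    }
    channel.close()
    return accepted
}
```

With capacity = 10 and 15 attempts, only the first 10 elements fit; in the pipeline, the Producer would suspend at that point until the Enricher takes something out of the buffer.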

Also, the onCompletion function is called when the actor is stopped or cancelled.

The ExecutorCoroutineDispatcher needs to be stopped (closed) when our actor completes, either successfully or exceptionally. That’s why context.close() is called in the onCompletion function.
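For short-lived work there is another way to make sure the dispatcher is closed: ExecutorCoroutineDispatcher implements Closeable, so Kotlin's use function can scope its lifetime. This is a different technique from the onCompletion hook used in this post, shown only as a sketch; runOnTemporaryPool is a hypothetical name.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Runs a small task on a temporary pool and reports the worker thread name
// together with whether the pool was shut down afterwards.
fun runOnTemporaryPool(): Pair<String, Boolean> {
    val pool: ExecutorService = Executors.newFixedThreadPool(2)
    val threadName = pool.asCoroutineDispatcher().use { dispatcher ->
        // use() calls close() on the dispatcher when this block exits
        runBlocking {
            withContext(dispatcher) { Thread.currentThread().name }
        }
    }
    return threadName to pool.isShutdown
}
```

This fits cases where the dispatcher's lifetime matches a single block of code; for a long-running actor, closing it from the completion handler as shown above is the more natural place.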

Now let’s define the Enricher actor:

private val context = Executors.newFixedThreadPool(3, NamedThreadFactory("enricher")).asCoroutineDispatcher()

@ExperimentalCoroutinesApi
@InternalCoroutinesApi
fun CoroutineScope.enricherActor(inbox: ReceiveChannel<Envelope<RawData>>): ReceiveChannel<Envelope<RichData>> =
    produce(
        context,
        capacity = 10,
        onCompletion = {
            context.close() // close context on stopping the actor
            log("🛑 Completed. Exception: $it")
        }
    ) {
        for (msg in inbox) { // iterate over incoming messages
            log("Processing $msg")
            val result = transformMessage(msg) { enrich(msg.payload) }
            log("Enriched $result")
            channel.send(result) // send to next
        }
    }

private fun enrich(rawData: RawData): RichData {
    val value = rawData.value
    val square = value * value
    return RichData(value, square)
}

The implementation is very similar to the Producer. The difference is that it receives messages from inbox, which is a ReceiveChannel<Envelope<RawData>>, and sends the enriched results to its own channel. You may notice that this actor’s thread pool has only 3 threads.

The Updater will receive RichData messages and print them. In a real-life case it would save each message to a database.
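The receive, transform, send shape of the Enricher can be written once as a generic stage. The sketch below uses only the public produce builder (no onCompletion hook and no dedicated thread pool), so it is a simplification of the listing above rather than a drop-in replacement; mapStage and squareAll are names made up for this example.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// A generic pipeline stage: reads from inbox, applies transform, emits downstream.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T, R> CoroutineScope.mapStage(
    inbox: ReceiveChannel<T>,
    capacity: Int = 10,
    transform: (T) -> R
): ReceiveChannel<R> = produce(capacity = capacity) {
    for (msg in inbox) { // ends when the upstream channel is closed
        send(transform(msg))
    }
    // the outgoing channel is closed automatically when this block returns
}

// Usage: square every value by pushing it through one stage.
@OptIn(ExperimentalCoroutinesApi::class)
fun squareAll(values: List<Int>): List<Int> = runBlocking {
    val source = produce<Int> { values.forEach { send(it) } }
    val squared = mapStage(source) { it * it }
    val out = mutableListOf<Int>()
    for (v in squared) out += v
    out
}
```

Completion propagates on its own here: when the source finishes, the stage's loop ends and its output channel closes, which in turn ends the consumer's loop.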

private val context = Executors.newFixedThreadPool(2, NamedThreadFactory("updater")).asCoroutineDispatcher()

@ExperimentalCoroutinesApi
@InternalCoroutinesApi
fun CoroutineScope.updaterActor(inbox: ReceiveChannel<Envelope<RichData>>): ReceiveChannel<Envelope<RichData>> =
    produce(
        context,
        capacity = 100,
        onCompletion = {
            context.close() // close context on stopping the actor
            log("🛑 Completed. Exception: $it")
        }
    )
    {
        for (msg in inbox) { // iterate over incoming messages
            val created = msg.metadata.timestampMillis
            log("Writing $msg, processed in ${Instant.now().toEpochMilli() - created}ms")
            Thread.sleep(100) // to simulate blocking operation
            log("✅ Done with $msg")
            channel.send(msg)
        }
    }

The actor prints received messages and simulates an IO operation by calling Thread.sleep(100). It has only 2 threads. When a message is processed, we send it on to the outgoing channel so it can be handled externally.

Building Pipeline

@ExperimentalCoroutinesApi
@InternalCoroutinesApi
fun main() {
    val total = 15
    val time = measureTimeMillis {
        runBlocking {

            val raw = producerActor(total)
            val enriched = enricherActor(raw)
            val updated = updaterActor(enriched)

            var counter = 0
            for (msg in updated) {
                counter++
                log("Processed ${counter} : ${msg}")
            }
            log("The End")
        }
    }
    log("Done in $time ms")
}

As you can see, it’s now very easy to create a pipeline. It is not as visual as in Akka Streams, but it is still very clear.

Let’s run it:

2019-01-30T22:45:43.678798Z [producer-0] Producing Envelope(payload=RawData(value=1), metadata=Metadata(timestampMillis=1548888343659, correlationId=8704b3b5-c357-4044-bc5d-995fe6a4797e))
2019-01-30T22:45:43.692628Z [producer-0] Producing Envelope(payload=RawData(value=2), metadata=Metadata(timestampMillis=1548888343692, correlationId=efaa86ff-292d-4657-bd63-925779caa133))
2019-01-30T22:45:43.692860Z [producer-0] Producing Envelope(payload=RawData(value=3), metadata=Metadata(timestampMillis=1548888343692, correlationId=b266e28d-bffd-4066-8bd9-b103dc67d5bc))
2019-01-30T22:45:43.693005Z [enricher-1] Processing Envelope(payload=RawData(value=1), metadata=Metadata(timestampMillis=1548888343659, correlationId=8704b3b5-c357-4044-bc5d-995fe6a4797e))
2019-01-30T22:45:43.693075Z [producer-0] Producing Envelope(payload=RawData(value=4), metadata=Metadata(timestampMillis=1548888343693, correlationId=f5d468be-6845-4973-ba25-ebf773152712))
2019-01-30T22:45:43.693259Z [producer-0] Producing Envelope(payload=RawData(value=5), metadata=Metadata(timestampMillis=1548888343693, correlationId=039b9f12-35f1-4b82-80af-07279eb17c8f))
2019-01-30T22:45:43.693468Z [producer-0] Producing Envelope(payload=RawData(value=6), metadata=Metadata(timestampMillis=1548888343693, correlationId=718d484d-fa21-4345-940c-6ad738914725))
2019-01-30T22:45:43.693638Z [producer-0] Producing Envelope(payload=RawData(value=7), metadata=Metadata(timestampMillis=1548888343693, correlationId=fee9b735-745c-4bf6-988d-89d2adc42df4))
2019-01-30T22:45:43.693826Z [enricher-1] Enriched Envelope(payload=RichData(value=1, square=1), metadata=Metadata(timestampMillis=1548888343659, correlationId=8704b3b5-c357-4044-bc5d-995fe6a4797e))
2019-01-30T22:45:43.693836Z [producer-0] Producing Envelope(payload=RawData(value=8), metadata=Metadata(timestampMillis=1548888343693, correlationId=9d66c785-f765-4044-930f-4b7a87668c59))
2019-01-30T22:45:43.694165Z [producer-0] Producing Envelope(payload=RawData(value=9), metadata=Metadata(timestampMillis=1548888343694, correlationId=e0449c6b-a720-4a0d-953b-3f2e25930177))
2019-01-30T22:45:43.694342Z [producer-0] Producing Envelope(payload=RawData(value=10), metadata=Metadata(timestampMillis=1548888343694, correlationId=7b8ceefa-eebf-47ca-bc3a-1322193d2d64))
2019-01-30T22:45:43.694343Z [updater-1] Writing Envelope(payload=RichData(value=1, square=1), metadata=Metadata(timestampMillis=1548888343659, correlationId=8704b3b5-c357-4044-bc5d-995fe6a4797e)), processed in 35ms
2019-01-30T22:45:43.694643Z [producer-0] Producing Envelope(payload=RawData(value=11), metadata=Metadata(timestampMillis=1548888343694, correlationId=f87da826-7a94-4373-8dae-8c5bd72bb27e))
2019-01-30T22:45:43.694730Z [enricher-1] Processing Envelope(payload=RawData(value=2), metadata=Metadata(timestampMillis=1548888343692, correlationId=efaa86ff-292d-4657-bd63-925779caa133))
2019-01-30T22:45:43.694885Z [producer-0] Producing Envelope(payload=RawData(value=12), metadata=Metadata(timestampMillis=1548888343694, correlationId=2188daea-4dc0-4004-adc8-af634413284f))
2019-01-30T22:45:43.694891Z [enricher-1] Enriched Envelope(payload=RichData(value=2, square=4), metadata=Metadata(timestampMillis=1548888343692, correlationId=efaa86ff-292d-4657-bd63-925779caa133))
2019-01-30T22:45:43.695178Z [producer-0] Producing Envelope(payload=RawData(value=13), metadata=Metadata(timestampMillis=1548888343695, correlationId=1942a54d-f914-4be0-b511-485c3fb4c138))
2019-01-30T22:45:43.695178Z [enricher-1] Processing Envelope(payload=RawData(value=3), metadata=Metadata(timestampMillis=1548888343692, correlationId=b266e28d-bffd-4066-8bd9-b103dc67d5bc))
2019-01-30T22:45:43.695391Z [producer-0] Producing Envelope(payload=RawData(value=14), metadata=Metadata(timestampMillis=1548888343695, correlationId=61f6f513-b1b3-4fb2-ab16-09c5e42408ab))
2019-01-30T22:45:43.695414Z [enricher-1] Enriched Envelope(payload=RichData(value=3, square=9), metadata=Metadata(timestampMillis=1548888343692, correlationId=b266e28d-bffd-4066-8bd9-b103dc67d5bc))
2019-01-30T22:45:43.695642Z [enricher-1] Processing Envelope(payload=RawData(value=4), metadata=Metadata(timestampMillis=1548888343693, correlationId=f5d468be-6845-4973-ba25-ebf773152712))
2019-01-30T22:45:43.695772Z [enricher-1] Enriched Envelope(payload=RichData(value=4, square=16), metadata=Metadata(timestampMillis=1548888343693, correlationId=f5d468be-6845-4973-ba25-ebf773152712))
2019-01-30T22:45:43.695936Z [enricher-1] Processing Envelope(payload=RawData(value=5), metadata=Metadata(timestampMillis=1548888343693, correlationId=039b9f12-35f1-4b82-80af-07279eb17c8f))
2019-01-30T22:45:43.696087Z [enricher-1] Enriched Envelope(payload=RichData(value=5, square=25), metadata=Metadata(timestampMillis=1548888343693, correlationId=039b9f12-35f1-4b82-80af-07279eb17c8f))
2019-01-30T22:45:43.696225Z [enricher-1] Processing Envelope(payload=RawData(value=6), metadata=Metadata(timestampMillis=1548888343693, correlationId=718d484d-fa21-4345-940c-6ad738914725))
2019-01-30T22:45:43.696359Z [enricher-1] Enriched Envelope(payload=RichData(value=6, square=36), metadata=Metadata(timestampMillis=1548888343693, correlationId=718d484d-fa21-4345-940c-6ad738914725))
2019-01-30T22:45:43.696498Z [enricher-1] Processing Envelope(payload=RawData(value=7), metadata=Metadata(timestampMillis=1548888343693, correlationId=fee9b735-745c-4bf6-988d-89d2adc42df4))
2019-01-30T22:45:43.696621Z [enricher-1] Enriched Envelope(payload=RichData(value=7, square=49), metadata=Metadata(timestampMillis=1548888343693, correlationId=fee9b735-745c-4bf6-988d-89d2adc42df4))
2019-01-30T22:45:43.696753Z [enricher-1] Processing Envelope(payload=RawData(value=8), metadata=Metadata(timestampMillis=1548888343693, correlationId=9d66c785-f765-4044-930f-4b7a87668c59))
2019-01-30T22:45:43.696875Z [enricher-1] Enriched Envelope(payload=RichData(value=8, square=64), metadata=Metadata(timestampMillis=1548888343693, correlationId=9d66c785-f765-4044-930f-4b7a87668c59))
2019-01-30T22:45:43.697006Z [enricher-1] Processing Envelope(payload=RawData(value=9), metadata=Metadata(timestampMillis=1548888343694, correlationId=e0449c6b-a720-4a0d-953b-3f2e25930177))
2019-01-30T22:45:43.697142Z [enricher-1] Enriched Envelope(payload=RichData(value=9, square=81), metadata=Metadata(timestampMillis=1548888343694, correlationId=e0449c6b-a720-4a0d-953b-3f2e25930177))
2019-01-30T22:45:43.697278Z [enricher-1] Processing Envelope(payload=RawData(value=10), metadata=Metadata(timestampMillis=1548888343694, correlationId=7b8ceefa-eebf-47ca-bc3a-1322193d2d64))
2019-01-30T22:45:43.697403Z [enricher-1] Enriched Envelope(payload=RichData(value=10, square=100), metadata=Metadata(timestampMillis=1548888343694, correlationId=7b8ceefa-eebf-47ca-bc3a-1322193d2d64))
2019-01-30T22:45:43.697537Z [enricher-1] Processing Envelope(payload=RawData(value=11), metadata=Metadata(timestampMillis=1548888343694, correlationId=f87da826-7a94-4373-8dae-8c5bd72bb27e))
2019-01-30T22:45:43.697548Z [producer-0] Producing Envelope(payload=RawData(value=15), metadata=Metadata(timestampMillis=1548888343697, correlationId=bc7bd5e0-2745-46d0-b01b-f7c86d179559))
2019-01-30T22:45:43.697676Z [enricher-1] Enriched Envelope(payload=RichData(value=11, square=121), metadata=Metadata(timestampMillis=1548888343694, correlationId=f87da826-7a94-4373-8dae-8c5bd72bb27e))
2019-01-30T22:45:43.697892Z [enricher-1] Processing Envelope(payload=RawData(value=12), metadata=Metadata(timestampMillis=1548888343694, correlationId=2188daea-4dc0-4004-adc8-af634413284f))
2019-01-30T22:45:43.698051Z [enricher-1] Enriched Envelope(payload=RichData(value=12, square=144), metadata=Metadata(timestampMillis=1548888343694, correlationId=2188daea-4dc0-4004-adc8-af634413284f))
2019-01-30T22:45:43.698650Z [producer-0] 🛑 Completed. Exception: null
2019-01-30T22:45:43.797951Z [updater-1] ✅ Done with Envelope(payload=RichData(value=1, square=1), metadata=Metadata(timestampMillis=1548888343659, correlationId=8704b3b5-c357-4044-bc5d-995fe6a4797e))
2019-01-30T22:45:43.798638Z [main] Processed 1 : Envelope(payload=RichData(value=1, square=1), metadata=Metadata(timestampMillis=1548888343659, correlationId=8704b3b5-c357-4044-bc5d-995fe6a4797e))
2019-01-30T22:45:43.798806Z [updater-1] Writing Envelope(payload=RichData(value=2, square=4), metadata=Metadata(timestampMillis=1548888343692, correlationId=efaa86ff-292d-4657-bd63-925779caa133)), processed in 106ms
2019-01-30T22:45:43.798936Z [enricher-2] Processing Envelope(payload=RawData(value=13), metadata=Metadata(timestampMillis=1548888343695, correlationId=1942a54d-f914-4be0-b511-485c3fb4c138))
2019-01-30T22:45:43.799176Z [enricher-2] Enriched Envelope(payload=RichData(value=13, square=169), metadata=Metadata(timestampMillis=1548888343695, correlationId=1942a54d-f914-4be0-b511-485c3fb4c138))
2019-01-30T22:45:43.901131Z [updater-1] ✅ Done with Envelope(payload=RichData(value=2, square=4), metadata=Metadata(timestampMillis=1548888343692, correlationId=efaa86ff-292d-4657-bd63-925779caa133))
2019-01-30T22:45:43.901515Z [main] Processed 2 : Envelope(payload=RichData(value=2, square=4), metadata=Metadata(timestampMillis=1548888343692, correlationId=efaa86ff-292d-4657-bd63-925779caa133))
2019-01-30T22:45:43.902077Z [updater-1] Writing Envelope(payload=RichData(value=3, square=9), metadata=Metadata(timestampMillis=1548888343692, correlationId=b266e28d-bffd-4066-8bd9-b103dc67d5bc)), processed in 210ms
2019-01-30T22:45:43.902125Z [enricher-0] Processing Envelope(payload=RawData(value=14), metadata=Metadata(timestampMillis=1548888343695, correlationId=61f6f513-b1b3-4fb2-ab16-09c5e42408ab))
2019-01-30T22:45:43.902342Z [enricher-0] Enriched Envelope(payload=RichData(value=14, square=196), metadata=Metadata(timestampMillis=1548888343695, correlationId=61f6f513-b1b3-4fb2-ab16-09c5e42408ab))
2019-01-30T22:45:44.002874Z [updater-1] ✅ Done with Envelope(payload=RichData(value=3, square=9), metadata=Metadata(timestampMillis=1548888343692, correlationId=b266e28d-bffd-4066-8bd9-b103dc67d5bc))
2019-01-30T22:45:44.003432Z [main] Processed 3 : Envelope(payload=RichData(value=3, square=9), metadata=Metadata(timestampMillis=1548888343692, correlationId=b266e28d-bffd-4066-8bd9-b103dc67d5bc))
2019-01-30T22:45:44.003488Z [enricher-1] Processing Envelope(payload=RawData(value=15), metadata=Metadata(timestampMillis=1548888343697, correlationId=bc7bd5e0-2745-46d0-b01b-f7c86d179559))
2019-01-30T22:45:44.003448Z [updater-1] Writing Envelope(payload=RichData(value=4, square=16), metadata=Metadata(timestampMillis=1548888343693, correlationId=f5d468be-6845-4973-ba25-ebf773152712)), processed in 310ms
2019-01-30T22:45:44.003834Z [enricher-1] Enriched Envelope(payload=RichData(value=15, square=225), metadata=Metadata(timestampMillis=1548888343697, correlationId=bc7bd5e0-2745-46d0-b01b-f7c86d179559))
2019-01-30T22:45:44.108139Z [updater-1] ✅ Done with Envelope(payload=RichData(value=4, square=16), metadata=Metadata(timestampMillis=1548888343693, correlationId=f5d468be-6845-4973-ba25-ebf773152712))
2019-01-30T22:45:44.108577Z [updater-1] Writing Envelope(payload=RichData(value=5, square=25), metadata=Metadata(timestampMillis=1548888343693, correlationId=039b9f12-35f1-4b82-80af-07279eb17c8f)), processed in 415ms
2019-01-30T22:45:44.108548Z [main] Processed 4 : Envelope(payload=RichData(value=4, square=16), metadata=Metadata(timestampMillis=1548888343693, correlationId=f5d468be-6845-4973-ba25-ebf773152712))
2019-01-30T22:45:44.109303Z [enricher-2] 🛑 Completed. Exception: null
2019-01-30T22:45:44.210843Z [updater-1] ✅ Done with Envelope(payload=RichData(value=5, square=25), metadata=Metadata(timestampMillis=1548888343693, correlationId=039b9f12-35f1-4b82-80af-07279eb17c8f))
2019-01-30T22:45:44.211289Z [updater-1] Writing Envelope(payload=RichData(value=6, square=36), metadata=Metadata(timestampMillis=1548888343693, correlationId=718d484d-fa21-4345-940c-6ad738914725)), processed in 518ms
2019-01-30T22:45:44.211250Z [main] Processed 5 : Envelope(payload=RichData(value=5, square=25), metadata=Metadata(timestampMillis=1548888343693, correlationId=039b9f12-35f1-4b82-80af-07279eb17c8f))
2019-01-30T22:45:44.316362Z [updater-1] ✅ Done with Envelope(payload=RichData(value=6, square=36), metadata=Metadata(timestampMillis=1548888343693, correlationId=718d484d-fa21-4345-940c-6ad738914725))
2019-01-30T22:45:44.316923Z [main] Processed 6 : Envelope(payload=RichData(value=6, square=36), metadata=Metadata(timestampMillis=1548888343693, correlationId=718d484d-fa21-4345-940c-6ad738914725))
2019-01-30T22:45:44.316908Z [updater-1] Writing Envelope(payload=RichData(value=7, square=49), metadata=Metadata(timestampMillis=1548888343693, correlationId=fee9b735-745c-4bf6-988d-89d2adc42df4)), processed in 623ms
2019-01-30T22:45:44.417721Z [updater-1] ✅ Done with Envelope(payload=RichData(value=7, square=49), metadata=Metadata(timestampMillis=1548888343693, correlationId=fee9b735-745c-4bf6-988d-89d2adc42df4))
2019-01-30T22:45:44.418917Z [updater-1] Writing Envelope(payload=RichData(value=8, square=64), metadata=Metadata(timestampMillis=1548888343693, correlationId=9d66c785-f765-4044-930f-4b7a87668c59)), processed in 725ms
2019-01-30T22:45:44.418966Z [main] Processed 7 : Envelope(payload=RichData(value=7, square=49), metadata=Metadata(timestampMillis=1548888343693, correlationId=fee9b735-745c-4bf6-988d-89d2adc42df4))
2019-01-30T22:45:44.522580Z [updater-1] ✅ Done with Envelope(payload=RichData(value=8, square=64), metadata=Metadata(timestampMillis=1548888343693, correlationId=9d66c785-f765-4044-930f-4b7a87668c59))
2019-01-30T22:45:44.522889Z [updater-1] Writing Envelope(payload=RichData(value=9, square=81), metadata=Metadata(timestampMillis=1548888343694, correlationId=e0449c6b-a720-4a0d-953b-3f2e25930177)), processed in 828ms
2019-01-30T22:45:44.522913Z [main] Processed 8 : Envelope(payload=RichData(value=8, square=64), metadata=Metadata(timestampMillis=1548888343693, correlationId=9d66c785-f765-4044-930f-4b7a87668c59))
2019-01-30T22:45:44.626517Z [updater-1] ✅ Done with Envelope(payload=RichData(value=9, square=81), metadata=Metadata(timestampMillis=1548888343694, correlationId=e0449c6b-a720-4a0d-953b-3f2e25930177))
2019-01-30T22:45:44.627721Z [updater-1] Writing Envelope(payload=RichData(value=10, square=100), metadata=Metadata(timestampMillis=1548888343694, correlationId=7b8ceefa-eebf-47ca-bc3a-1322193d2d64)), processed in 933ms
2019-01-30T22:45:44.627687Z [main] Processed 9 : Envelope(payload=RichData(value=9, square=81), metadata=Metadata(timestampMillis=1548888343694, correlationId=e0449c6b-a720-4a0d-953b-3f2e25930177))
2019-01-30T22:45:44.730284Z [updater-1] ✅ Done with Envelope(payload=RichData(value=10, square=100), metadata=Metadata(timestampMillis=1548888343694, correlationId=7b8ceefa-eebf-47ca-bc3a-1322193d2d64))
2019-01-30T22:45:44.730587Z [updater-1] Writing Envelope(payload=RichData(value=11, square=121), metadata=Metadata(timestampMillis=1548888343694, correlationId=f87da826-7a94-4373-8dae-8c5bd72bb27e)), processed in 1036ms
2019-01-30T22:45:44.730599Z [main] Processed 10 : Envelope(payload=RichData(value=10, square=100), metadata=Metadata(timestampMillis=1548888343694, correlationId=7b8ceefa-eebf-47ca-bc3a-1322193d2d64))
2019-01-30T22:45:44.834934Z [updater-1] ✅ Done with Envelope(payload=RichData(value=11, square=121), metadata=Metadata(timestampMillis=1548888343694, correlationId=f87da826-7a94-4373-8dae-8c5bd72bb27e))
2019-01-30T22:45:44.835236Z [updater-1] Writing Envelope(payload=RichData(value=12, square=144), metadata=Metadata(timestampMillis=1548888343694, correlationId=2188daea-4dc0-4004-adc8-af634413284f)), processed in 1141ms
2019-01-30T22:45:44.835253Z [main] Processed 11 : Envelope(payload=RichData(value=11, square=121), metadata=Metadata(timestampMillis=1548888343694, correlationId=f87da826-7a94-4373-8dae-8c5bd72bb27e))
2019-01-30T22:45:44.936474Z [updater-1] ✅ Done with Envelope(payload=RichData(value=12, square=144), metadata=Metadata(timestampMillis=1548888343694, correlationId=2188daea-4dc0-4004-adc8-af634413284f))
2019-01-30T22:45:44.936821Z [main] Processed 12 : Envelope(payload=RichData(value=12, square=144), metadata=Metadata(timestampMillis=1548888343694, correlationId=2188daea-4dc0-4004-adc8-af634413284f))
2019-01-30T22:45:44.936799Z [updater-1] Writing Envelope(payload=RichData(value=13, square=169), metadata=Metadata(timestampMillis=1548888343695, correlationId=1942a54d-f914-4be0-b511-485c3fb4c138)), processed in 1241ms
2019-01-30T22:45:45.037599Z [updater-1] ✅ Done with Envelope(payload=RichData(value=13, square=169), metadata=Metadata(timestampMillis=1548888343695, correlationId=1942a54d-f914-4be0-b511-485c3fb4c138))
2019-01-30T22:45:45.038629Z [updater-1] Writing Envelope(payload=RichData(value=14, square=196), metadata=Metadata(timestampMillis=1548888343695, correlationId=61f6f513-b1b3-4fb2-ab16-09c5e42408ab)), processed in 1343ms
2019-01-30T22:45:45.038636Z [main] Processed 13 : Envelope(payload=RichData(value=13, square=169), metadata=Metadata(timestampMillis=1548888343695, correlationId=1942a54d-f914-4be0-b511-485c3fb4c138))
2019-01-30T22:45:45.142661Z [updater-1] ✅ Done with Envelope(payload=RichData(value=14, square=196), metadata=Metadata(timestampMillis=1548888343695, correlationId=61f6f513-b1b3-4fb2-ab16-09c5e42408ab))
2019-01-30T22:45:45.143027Z [updater-1] Writing Envelope(payload=RichData(value=15, square=225), metadata=Metadata(timestampMillis=1548888343697, correlationId=bc7bd5e0-2745-46d0-b01b-f7c86d179559)), processed in 1446ms
2019-01-30T22:45:45.143067Z [main] Processed 14 : Envelope(payload=RichData(value=14, square=196), metadata=Metadata(timestampMillis=1548888343695, correlationId=61f6f513-b1b3-4fb2-ab16-09c5e42408ab))
2019-01-30T22:45:45.244791Z [updater-1] ✅ Done with Envelope(payload=RichData(value=15, square=225), metadata=Metadata(timestampMillis=1548888343697, correlationId=bc7bd5e0-2745-46d0-b01b-f7c86d179559))
2019-01-30T22:45:45.245192Z [main] Processed 15 : Envelope(payload=RichData(value=15, square=225), metadata=Metadata(timestampMillis=1548888343697, correlationId=bc7bd5e0-2745-46d0-b01b-f7c86d179559))
2019-01-30T22:45:45.245485Z [updater-1] 🛑 Completed. Exception: null
2019-01-30T22:45:45.247817Z [main] The End
2019-01-30T22:45:45.248840Z [main] Done in 1678 ms

As you can see, we have simulated a fast producer and a slow consumer. The Producer started first and kept producing messages until its buffer became full. Then the Enricher started processing messages, unblocking the Producer. The next bottleneck was the Updater (Thread.sleep(100) did the job). In steady state, the Producer and Enricher are limited by the Updater’s performance. Automatic back-pressure support is a really nice feature of Kotlin coroutines.

You may have noticed that processing time is O(N): 1678 ms ≈ 15 × 100 ms plus overhead. Even though we have created thread pools, only one actor of each type is working at a time. Let’s change our pipeline so that multiple actors work in parallel.
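Before changing anything, a back-of-the-envelope estimate shows what parallel Updaters could buy us. The function below is only a rough lower bound under the assumption that the Updater's blocking call dominates and that messages are spread evenly across workers; it ignores scheduling, logging, and start-up overhead. The name estimateMillis is made up for this sketch.

```kotlin
// Rough lower bound on pipeline time when the slowest stage dominates.
fun estimateMillis(messages: Int, perMessageMillis: Long, workers: Int): Long {
    require(workers > 0) { "workers must be positive" }
    val rounds = (messages + workers - 1) / workers // ceiling division
    return rounds * perMessageMillis
}
```

For 15 messages at 100 ms each, one Updater gives at least 1500 ms, which is in line with the 1678 ms measured above; five Updaters would bring the bound down to 300 ms.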

                                          -→ (👤Updater-0) -✉️→
(👤Producer) -✉️→ 📬(👤Enricher) -✉️→ 📬  -→ (👤Updater-1) -✉️→   📬
           RawData             RichData   -→ (👤Updater-2) -✉️→
                                                           Done

We introduce a Done message so that each Updater can signal that it has processed all of its messages. Let’s add a new message type to Messages.kotlin:

object Done

Our pipeline will change to:

const val TOTAL = 15
const val PARALLEL_ACTORS = 5

@ExperimentalCoroutinesApi
@InternalCoroutinesApi
fun main() {
    val time = measureTimeMillis {

        runBlocking {
            val raw = producerActor(TOTAL)
            val enriched = enricherActor(raw)

            val completed = Channel<Done>(5)

            repeat(PARALLEL_ACTORS) { // launch 5 Updaters in parallel
                updaterActor(enriched, completed)
            }

            var counter = 0
            for (msg in completed) {
                if (msg === Done) {
                    counter++
                    log("Updater is finished")
                    if (counter == PARALLEL_ACTORS) {
                        break // break when all Updaters have finished
                    }
                }
            }

            log("The End")
            coroutineContext.cancelChildren()
        }
    }
    log("Done in $time ms")
}

Updater will be changed to:

private val context = Executors.newFixedThreadPool(5, NamedThreadFactory("updater")).asCoroutineDispatcher()

@ExperimentalCoroutinesApi
@InternalCoroutinesApi
fun CoroutineScope.updaterActor(
    inbox: ReceiveChannel<Envelope<RichData>>,
    updated: Channel<Done>
) = launch(context = context) {
    for (msg in inbox) { // iterate over incoming messages
        val created = msg.metadata.startMillis // field is named startMillis in this run's Metadata (see the log output)
        log("Writing $msg, processed in ${Instant.now().toEpochMilli() - created}ms")
        Thread.sleep(100) // to simulate blocking operation
        log("✅ Done with $msg")
    }
    updated.send(Done)
}
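The fan-out itself relies on a property of channels: when several coroutines iterate over the same ReceiveChannel, each message is delivered to exactly one of them. The sketch below demonstrates that in isolation. Note that it swaps the Done channel for a different technique, waiting on the workers' Job objects with joinAll, and runs the workers on Dispatchers.Default instead of a dedicated pool; fanOut is a hypothetical name.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentLinkedQueue

// Distributes the numbers 1..total across `workers` coroutines and
// returns everything that was handled, sorted for easy comparison.
@OptIn(ExperimentalCoroutinesApi::class)
fun fanOut(total: Int, workers: Int): List<Int> = runBlocking {
    val inbox = produce<Int>(capacity = 10) { for (i in 1..total) send(i) }
    val handled = ConcurrentLinkedQueue<Int>() // thread-safe: workers run in parallel
    val jobs = List(workers) {
        launch(Dispatchers.Default) {
            for (msg in inbox) handled.add(msg) // each message goes to one worker only
        }
    }
    jobs.joinAll() // all workers finish once the inbox is closed and drained
    handled.sorted()
}
```

Calling fanOut(15, 5) should return each of the numbers 1 to 15 exactly once. An explicit Done message, as used in this post, is still useful when the consumers are not coroutines you hold a Job for, or when completion has to travel through the pipeline as data.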

Now let’s run:

2019-03-25T09:10:52.538Z [producer-0] Producing Envelope(payload=RawData(value=1), metadata=Metadata(startMillis=1553505052537))
2019-03-25T09:10:52.617Z [producer-0] Producing Envelope(payload=RawData(value=2), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.617Z [producer-0] Producing Envelope(payload=RawData(value=3), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.617Z [producer-0] Producing Envelope(payload=RawData(value=4), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.618Z [producer-0] Producing Envelope(payload=RawData(value=5), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.618Z [producer-0] Producing Envelope(payload=RawData(value=6), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.618Z [enricher-1] Processing Envelope(payload=RawData(value=1), metadata=Metadata(startMillis=1553505052537))
2019-03-25T09:10:52.618Z [producer-0] Producing Envelope(payload=RawData(value=7), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.618Z [producer-0] Producing Envelope(payload=RawData(value=8), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.618Z [producer-0] Producing Envelope(payload=RawData(value=9), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.618Z [producer-0] Producing Envelope(payload=RawData(value=10), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.618Z [producer-0] Producing Envelope(payload=RawData(value=11), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.618Z [producer-0] Producing Envelope(payload=RawData(value=12), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.618Z [enricher-1] Enriched Envelope(payload=RichData(value=1, square=1), metadata=Metadata(startMillis=1553505052537))
2019-03-25T09:10:52.619Z [enricher-1] Processing Envelope(payload=RawData(value=2), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.619Z [enricher-1] Enriched Envelope(payload=RichData(value=2, square=4), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.619Z [updater-0] Writing Envelope(payload=RichData(value=1, square=1), metadata=Metadata(startMillis=1553505052537)), processed in 82ms
2019-03-25T09:10:52.619Z [enricher-1] Processing Envelope(payload=RawData(value=3), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.619Z [updater-1] Writing Envelope(payload=RichData(value=2, square=4), metadata=Metadata(startMillis=1553505052617)), processed in 2ms
2019-03-25T09:10:52.619Z [enricher-1] Enriched Envelope(payload=RichData(value=3, square=9), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.619Z [enricher-1] Processing Envelope(payload=RawData(value=4), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.619Z [updater-2] Writing Envelope(payload=RichData(value=3, square=9), metadata=Metadata(startMillis=1553505052617)), processed in 2ms
2019-03-25T09:10:52.619Z [enricher-1] Enriched Envelope(payload=RichData(value=4, square=16), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.619Z [enricher-1] Processing Envelope(payload=RawData(value=5), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.619Z [updater-3] Writing Envelope(payload=RichData(value=4, square=16), metadata=Metadata(startMillis=1553505052617)), processed in 2ms
2019-03-25T09:10:52.620Z [enricher-1] Enriched Envelope(payload=RichData(value=5, square=25), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.620Z [enricher-1] Processing Envelope(payload=RawData(value=6), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.620Z [updater-4] Writing Envelope(payload=RichData(value=5, square=25), metadata=Metadata(startMillis=1553505052618)), processed in 2ms
2019-03-25T09:10:52.620Z [enricher-1] Enriched Envelope(payload=RichData(value=6, square=36), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.620Z [enricher-1] Processing Envelope(payload=RawData(value=7), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.620Z [producer-0] Producing Envelope(payload=RawData(value=13), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.620Z [enricher-1] Enriched Envelope(payload=RichData(value=7, square=49), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.620Z [producer-0] Producing Envelope(payload=RawData(value=14), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.620Z [enricher-1] Processing Envelope(payload=RawData(value=8), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.620Z [producer-0] Producing Envelope(payload=RawData(value=15), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.620Z [enricher-1] Enriched Envelope(payload=RichData(value=8, square=64), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.620Z [enricher-1] Processing Envelope(payload=RawData(value=9), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.621Z [enricher-1] Enriched Envelope(payload=RichData(value=9, square=81), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.621Z [enricher-1] ๐Ÿฅ Processing Envelope(payload=RawData(value=10), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.621Z [enricher-1] ๐Ÿฅ Enriched Envelope(payload=RichData(value=10, square=100), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.621Z [enricher-1] ๐Ÿฅ Processing Envelope(payload=RawData(value=11), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.621Z [enricher-1] ๐Ÿฅ Enriched Envelope(payload=RichData(value=11, square=121), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.621Z [enricher-1] ๐Ÿฅ Processing Envelope(payload=RawData(value=12), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.621Z [enricher-1] ๐Ÿฅ Enriched Envelope(payload=RichData(value=12, square=144), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.621Z [enricher-1] ๐Ÿฅ Processing Envelope(payload=RawData(value=13), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.621Z [enricher-1] ๐Ÿฅ Enriched Envelope(payload=RichData(value=13, square=169), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.621Z [enricher-1] ๐Ÿฅ Processing Envelope(payload=RawData(value=14), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.622Z [enricher-1] ๐Ÿฅ Enriched Envelope(payload=RichData(value=14, square=196), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.622Z [enricher-1] ๐Ÿฅ Processing Envelope(payload=RawData(value=15), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.622Z [enricher-1] ๐Ÿฅ Enriched Envelope(payload=RichData(value=15, square=225), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.622Z [producer-0] 🛑 Completed. Exception: null
2019-03-25T09:10:52.622Z [enricher-1] 🛑 Completed. Exception: null
2019-03-25T09:10:52.721Z [updater-1] ✅ Done with Envelope(payload=RichData(value=2, square=4), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.721Z [updater-3] ✅ Done with Envelope(payload=RichData(value=4, square=16), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.721Z [updater-3] ๐Ÿ“ Writing Envelope(payload=RichData(value=7, square=49), metadata=Metadata(startMillis=1553505052618)), processed in 103ms
2019-03-25T09:10:52.721Z [updater-0] ✅ Done with Envelope(payload=RichData(value=1, square=1), metadata=Metadata(startMillis=1553505052537))
2019-03-25T09:10:52.721Z [updater-0] ๐Ÿ“ Writing Envelope(payload=RichData(value=8, square=64), metadata=Metadata(startMillis=1553505052618)), processed in 103ms
2019-03-25T09:10:52.721Z [updater-2] ✅ Done with Envelope(payload=RichData(value=3, square=9), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.721Z [updater-4] ✅ Done with Envelope(payload=RichData(value=5, square=25), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.721Z [updater-2] ๐Ÿ“ Writing Envelope(payload=RichData(value=9, square=81), metadata=Metadata(startMillis=1553505052618)), processed in 103ms
2019-03-25T09:10:52.721Z [updater-1] ๐Ÿ“ Writing Envelope(payload=RichData(value=6, square=36), metadata=Metadata(startMillis=1553505052618)), processed in 103ms
2019-03-25T09:10:52.721Z [updater-4] ๐Ÿ“ Writing Envelope(payload=RichData(value=10, square=100), metadata=Metadata(startMillis=1553505052618)), processed in 103ms
2019-03-25T09:10:52.826Z [updater-2] ✅ Done with Envelope(payload=RichData(value=9, square=81), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.826Z [updater-4] ✅ Done with Envelope(payload=RichData(value=10, square=100), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.826Z [updater-0] ✅ Done with Envelope(payload=RichData(value=8, square=64), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.826Z [updater-3] ✅ Done with Envelope(payload=RichData(value=7, square=49), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.826Z [updater-1] ✅ Done with Envelope(payload=RichData(value=6, square=36), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.826Z [updater-3] ๐Ÿ“ Writing Envelope(payload=RichData(value=14, square=196), metadata=Metadata(startMillis=1553505052620)), processed in 206ms
2019-03-25T09:10:52.826Z [updater-0] ๐Ÿ“ Writing Envelope(payload=RichData(value=13, square=169), metadata=Metadata(startMillis=1553505052620)), processed in 206ms
2019-03-25T09:10:52.826Z [updater-4] ๐Ÿ“ Writing Envelope(payload=RichData(value=12, square=144), metadata=Metadata(startMillis=1553505052618)), processed in 208ms
2019-03-25T09:10:52.826Z [updater-2] ๐Ÿ“ Writing Envelope(payload=RichData(value=11, square=121), metadata=Metadata(startMillis=1553505052618)), processed in 208ms
2019-03-25T09:10:52.826Z [updater-1] ๐Ÿ“ Writing Envelope(payload=RichData(value=15, square=225), metadata=Metadata(startMillis=1553505052620)), processed in 206ms
2019-03-25T09:10:52.929Z [updater-2] ✅ Done with Envelope(payload=RichData(value=11, square=121), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.929Z [updater-4] ✅ Done with Envelope(payload=RichData(value=12, square=144), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.929Z [updater-3] ✅ Done with Envelope(payload=RichData(value=14, square=196), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.929Z [updater-1] ✅ Done with Envelope(payload=RichData(value=15, square=225), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.929Z [updater-0] ✅ Done with Envelope(payload=RichData(value=13, square=169), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.931Z [main] ๐Ÿ Updater is finished
2019-03-25T09:10:52.931Z [main] ๐Ÿ Updater is finished
2019-03-25T09:10:52.931Z [main] ๐Ÿ Updater is finished
2019-03-25T09:10:52.931Z [main] ๐Ÿ Updater is finished
2019-03-25T09:10:52.931Z [main] ๐Ÿ Updater is finished
2019-03-25T09:10:52.931Z [main] The End
2019-03-25T09:10:52.958Z [main] Done in 544 ms

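Now we see that the execution time dropped to 544 ms and that the Updaters work in parallel: five of them (updater-0 to updater-4) each spend about 100 ms per write, so the 15 writes finish in three rounds.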
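The speed-up comes from fan-out: several coroutines receive from the same channel, and each message is delivered to exactly one of them. Below is a minimal sketch of that idea, not the code that produced the log above. The names `write` and `runUpdaters` and the 100 ms delay are assumptions made for illustration, and plain `launch` workers stand in for the actors.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

data class RichData(val value: Int, val square: Int)

// Hypothetical stand-in for a slow database write (assumed to take ~100 ms).
suspend fun write(data: RichData) {
    delay(100)
}

// Starts `workers` updater coroutines that all receive from one shared channel
// and returns the elapsed wall-clock time in milliseconds.
fun runUpdaters(workers: Int, items: Int): Long = measureTimeMillis {
    runBlocking {
        val enriched = Channel<RichData>(Channel.UNLIMITED)

        // Fan-out: every element is received by exactly one of these coroutines.
        val jobs = List(workers) { id ->
            launch(Dispatchers.Default + CoroutineName("updater-$id")) {
                for (data in enriched) write(data)
            }
        }

        for (i in 1..items) enriched.send(RichData(i, i * i))
        enriched.close() // lets the for-loops in the updaters terminate
        jobs.joinAll()
    }
}

fun main() {
    println("1 updater:  ${runUpdaters(workers = 1, items = 15)} ms")
    println("5 updaters: ${runUpdaters(workers = 5, items = 15)} ms")
}
```

With one updater the writes run one after another. With five, up to five writes are in flight at once, so the total time should shrink roughly in proportion. Exact numbers depend on the machine and on startup overhead.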

You can find the source code here

I recommend the following resources to learn more about coroutines and structured concurrency in Kotlin:
