kourier


Pure Kotlin AMQP/RabbitMQ client and protocol implementation, optimized for KMP and coroutines.

Motivation

Why we made kourier:

  • Pure Kotlin implementation, with no dependency on the Java client or any other library.
  • Coroutines-first design that integrates with Kotlin’s concurrency model and supports asynchronous consuming.
  • Multiplatform support: kourier targets the JVM as well as Native platforms.
  • Robustness through automatic recovery and reconnection logic, which makes it resilient to network issues and protocol errors.

Modules

  • amqp-core: Core AMQP 0.9.1 protocol implementation, including frames and encoding/decoding logic.
  • amqp-client: High-level AMQP client built on top of amqp-core, providing connection management, channel handling, and basic operations like publishing and consuming messages.
  • amqp-client-robust: Adds automatic recovery and reconnection logic to the amqp-client, making it more resilient to network issues and protocol errors, inspired by aio-pika’s robust client.
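To make "frames and encoding/decoding logic" more concrete, here is a minimal sketch of the general AMQP 0.9.1 frame layout: a 1-byte frame type, a 2-byte channel number, a 4-byte payload size, the payload, and a closing 0xCE octet. The names `encodeFrame`, `FRAME_HEARTBEAT` and `FRAME_END` are hypothetical and chosen for illustration only; this is not the amqp-core API.

```kotlin
// Illustrative sketch of the AMQP 0.9.1 general frame layout.
// All names here are hypothetical; they are not part of kourier.
const val FRAME_HEARTBEAT: Int = 8 // frame type used for heartbeats
const val FRAME_END: Int = 0xCE    // every frame ends with this octet

fun encodeFrame(type: Int, channel: Int, payload: ByteArray): ByteArray {
    require(type in 0..0xFF) { "frame type must fit in one byte" }
    require(channel in 0..0xFFFF) { "channel must fit in two bytes" }
    val size = payload.size
    val out = ByteArray(7 + size + 1)
    out[0] = type.toByte()
    // Channel number, big-endian (network byte order).
    out[1] = (channel shr 8).toByte()
    out[2] = channel.toByte()
    // Payload size, big-endian.
    out[3] = (size ushr 24).toByte()
    out[4] = (size ushr 16).toByte()
    out[5] = (size ushr 8).toByte()
    out[6] = size.toByte()
    payload.copyInto(out, destinationOffset = 7)
    out[7 + size] = FRAME_END.toByte()
    return out
}

fun main() {
    // A heartbeat is sent on channel 0 with an empty payload.
    val heartbeat = encodeFrame(FRAME_HEARTBEAT, channel = 0, payload = ByteArray(0))
    println(heartbeat.joinToString(" ") { (it.toInt() and 0xFF).toString(16).padStart(2, '0') })
    // prints "08 00 00 00 00 00 00 ce"
}
```

A real protocol layer also has to decode frames, validate the end octet and enforce the negotiated maximum frame size, which this sketch leaves out.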

Most of the time you only need the amqp-client module, which itself depends on amqp-core. If you want automatic recovery, use the amqp-client-robust module instead, which depends on both amqp-client and amqp-core.

Installation

To use kourier, add the following to your build.gradle.kts:

dependencies {
    implementation("dev.kourier:amqp-client:0.2.2")
}

Or if you want the robust client with automatic recovery:

dependencies {
    implementation("dev.kourier:amqp-client-robust:0.2.2")
}

Make sure you have Maven Central configured:

repositories {
    mavenCentral()
}
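For a Kotlin Multiplatform project, the dependency would typically go into the common source set instead. The fragment below is a sketch that assumes the same coordinates resolve for multiplatform builds through Gradle module metadata, and that your Kotlin Gradle plugin is recent enough to support the `commonMain.dependencies` shorthand. The target list is only an example; check which targets kourier actually publishes.

```kotlin
// build.gradle.kts (sketch for a Kotlin Multiplatform project)
kotlin {
    jvm()
    // Add the Native targets your project needs here.

    sourceSets {
        commonMain.dependencies {
            // Assumption: the artifact used for JVM-only builds also resolves here.
            implementation("dev.kourier:amqp-client:0.2.2")
        }
    }
}
```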

Usage

Here is a simple example that connects to an AMQP server, opens a channel, declares and binds an exchange and a queue, publishes a message, and then consumes from the queue:

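import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
// Plus the kourier imports for amqpConfig, createAMQPConnection and BuiltinExchangeType.

fun main() = runBlocking {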
    val config = amqpConfig {
        server {
            host = "127.0.0.1"
            port = 5672
            user = "guest"
            password = "guest"
        }
    }
    val connection = createAMQPConnection(this, config)

    val channel = connection.openChannel()
    channel.exchangeDeclare("my-exchange", BuiltinExchangeType.DIRECT)
    channel.queueDeclare("my-queue", durable = true)
    channel.queueBind("my-queue", "my-exchange", "my-routing-key")
    channel.basicPublish("Hello, AMQP!".toByteArray(), "my-exchange", "my-routing-key")

    val consumer = channel.basicConsume("my-queue")
    for (delivery in consumer) {
        println("Received message: ${delivery.message.body.decodeToString()}")
        delay(10_000) // Simulate processing time
        channel.basicAck(delivery.message)
    }

    channel.close()
    connection.close()
}

Alternative ways to connect to a broker:

// Configuration when connecting
val connection = createAMQPConnection(this) {
    server {
        // ... same as before
    }
}

// Configuration from URL
val config = amqpConfig("amqp://guest:guest@localhost:5672/")
val connection = createAMQPConnection(this, config)

// Directly using a connection string
val connection = createAMQPConnection(this, "amqp://guest:guest@localhost:5672/")
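To show how a connection string such as the one above maps onto the `server { ... }` fields (host, port, user, password), here is a small JVM-only sketch built on `java.net.URI`. `BrokerAddress` and `parseAmqpUrl` are hypothetical names used for illustration; this is not how kourier parses URLs, and the virtual host path is kept raw because its interpretation is left to the client. The default ports (5672 for amqp, 5671 for amqps) are the standard AMQP ports.

```kotlin
import java.net.URI

// Hypothetical holder type for illustration; not a kourier class.
data class BrokerAddress(
    val tls: Boolean,
    val host: String,
    val port: Int,
    val user: String?,
    val password: String?,
    val rawVhostPath: String,
)

fun parseAmqpUrl(url: String): BrokerAddress {
    val uri = URI(url)
    val tls = when (uri.scheme) {
        "amqp" -> false
        "amqps" -> true
        else -> throw IllegalArgumentException("Unsupported scheme: ${uri.scheme}")
    }
    // userInfo has the form "user:password"; either part may be missing.
    val credentials = uri.userInfo?.split(":", limit = 2)
    return BrokerAddress(
        tls = tls,
        host = requireNotNull(uri.host) { "URL has no host" },
        // URI reports -1 when no port is given, so fall back to the standard port.
        port = if (uri.port != -1) uri.port else if (tls) 5671 else 5672,
        user = credentials?.getOrNull(0),
        password = credentials?.getOrNull(1),
        rawVhostPath = uri.rawPath.orEmpty(),
    )
}

fun main() {
    println(parseAmqpUrl("amqp://guest:guest@localhost:5672/"))
}
```

In practice you would pass the string straight to createAMQPConnection or amqpConfig as shown above; the sketch only makes the URL components explicit.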

If you want to use the robust client with automatic recovery, you can use createRobustAMQPConnection instead of createAMQPConnection. This will handle reconnections and recovery of channels and consumers automatically.

val connection = createRobustAMQPConnection(this, config) // All configuration options are available as before

// Do stuff with the connection as before

More examples can be found in the tutorial section of the documentation.

Libraries using kourier

If you are using kourier in your library, please let us know by opening a pull request to add it to this list!