PgMqClient

class PgMqClient(val pg: PgMqDbAdapter, options: PgMqClient.Options = Options())(source)

PgMqClient is a client for interfacing with the pgmq system, providing various messaging queue operations. This client makes use of PostgreSQL as a message queue backend.

Parameters

pg

The PgMqDbAdapter instance used for executing database queries.

options

Configuration options for the client, including installation verification and auto-install behavior.

Features:

  • Installation verification of the pgmq extension in the database.

  • Creating and managing queues with optional notification on message insertion.

  • Listing all existing queues.

  • Sending messages to a queue, with support for headers and optional delay.

  • Batch sending of messages to a queue.

  • Popping messages from a queue.

  • Reading messages from a queue with optional visibility timeout.

  • Archiving and deleting messages by single or bulk identifiers.

  • Purging all messages from a queue.

Constructors

Link copied to clipboard
constructor(pg: PgMqDbAdapter, options: PgMqClient.Options = Options())

Types

Link copied to clipboard
data class Options(val autoInstall: Boolean = true, val verifyInstallation: Boolean = true, val installFromFiles: Boolean = false, val intallFilesPath: String = "./pgmq")

A data class that represents configuration options for an installation process.

Link copied to clipboard
data class Queue(val name: String, val unlogged: Boolean = false, val enableNotifyInsert: Boolean = false, val throttleNotifyInterval: Duration = 250.milliseconds)

Represents a queue configuration with specific attributes.

Properties

Link copied to clipboard

Functions

Link copied to clipboard
suspend fun ack(queue: String, id: Long): Result<Boolean>

Acknowledges a message in the specified queue with the given ID by performing a delete operation.

suspend fun ack(queue: String, ids: List<Long>): Result<List<Long>>

Acknowledges the provided list of message IDs for the specified queue.

Link copied to clipboard
suspend fun archive(queue: String, id: Long): Result<Boolean>

Archives an item identified by its ID from a specified queue.

suspend fun archive(queue: String, ids: List<Long>): Result<List<Long>>

Archives the specified items identified by their IDs from the given queue.

context(db: QueryExecutor)
suspend fun archive(queue: String, id: Long): Result<Boolean>

Archives a message in the specified queue by calling the pgmq.archive function.

context(db: QueryExecutor)
suspend fun archive(queue: String, ids: List<Long>): Result<List<Long>>

Archives messages identified by their IDs from a specified queue.

Link copied to clipboard
suspend fun bindTopic(pattern: String, queueName: String): Result<Unit>
context(db: QueryExecutor)
suspend fun bindTopic(pattern: String, queueName: String): Result<Unit>

Creates a topic binding between a pattern and a queue.

suspend fun bindTopic(patterns: List<String>, queueName: String): Result<Unit>

Creates topic bindings for multiple patterns to a single queue.

Link copied to clipboard
suspend fun create(queue: PgMqClient.Queue): Result<Unit>

Creates a queue in the system with the specified configuration.

Link copied to clipboard
suspend fun delete(queue: String, id: Long): Result<Boolean>

Deletes an item with the specified ID from the given queue.

suspend fun delete(queue: String, ids: List<Long>): Result<List<Long>>

Deletes the specified items from the given queue.

context(db: QueryExecutor)
suspend fun delete(queue: String, id: Long): Result<Boolean>

Deletes a message from the specified queue based on its unique identifier.

context(db: QueryExecutor)
suspend fun delete(queue: String, ids: List<Long>): Result<List<Long>>

Deletes messages from the specified queue based on the provided list of IDs.

Link copied to clipboard
suspend fun drop(queue: PgMqClient.Queue): Result<Boolean>

Drops the specified queue from the system.

Link copied to clipboard

Retrieves the list of all available queues in the system.

Link copied to clipboard
suspend fun metrics(): Result<List<Metrics>>

Retrieves a list of metrics from the database using the pgmq.metrics() query.

suspend fun metrics(queue: String): Result<Metrics>

Retrieves metrics for the specified queue.

Link copied to clipboard
suspend fun nack(queue: String, id: Long, vt: Duration = Duration.ZERO): Result<Long>

Sends a negative acknowledgment (nack) to the specified queue for the given message ID, optionally resetting the visibility timeout (VT) to a new duration.

Link copied to clipboard
suspend fun pop(queue: String, quantity: Int = 1): Result<List<Message>>
context(db: QueryExecutor)
suspend fun pop(queue: String, quantity: Int = 1): Result<List<Message>>

Removes and retrieves messages from the specified queue.

Link copied to clipboard
suspend fun purge(queue: PgMqClient.Queue): Result<Long>

Removes all messages from the specified queue.

Link copied to clipboard
suspend fun read(queue: String, quantity: Int = 1, vt: Duration = 30.seconds): Result<List<Message>>

Reads messages from a specified queue.

context(db: QueryExecutor)
suspend fun read(queue: String, quantity: Int = 1, vt: Duration = 30.seconds): Result<List<Message>>

Reads messages from a specified queue with the given parameters.

Link copied to clipboard
suspend fun send(queue: String, message: String, headers: Map<String, String> = emptyMap(), delay: Duration = 0.seconds): Result<Long>
context(db: QueryExecutor)
suspend fun send(queue: String, message: String, headers: Map<String, String> = emptyMap(), delay: Duration = 0.seconds): Result<Long>

Sends a message to the specified queue with optional headers and delay.

suspend fun send(queue: String, messages: List<String>, headers: Map<String, String> = emptyMap(), delay: Duration = 0.seconds): Result<List<Long>>

Sends multiple messages to the specified queue with optional headers and delay.

context(db: QueryExecutor)
suspend fun send(queue: String, messages: List<String>, headers: Map<String, String> = emptyMap(), delay: Duration = 0.seconds): Result<List<Long>>

Sends a batch of messages to the specified queue with optional headers and a delay.

Link copied to clipboard
suspend fun sendTopic(routingKey: String, message: String, headers: Map<String, String> = emptyMap(), delay: Duration = 0.seconds): Result<Long>
context(db: QueryExecutor)
suspend fun sendTopic(routingKey: String, message: String, headers: Map<String, String> = emptyMap(), delay: Duration = 0.seconds): Result<Long>

Sends a message to all queues that match the routing key pattern.

Link copied to clipboard
suspend fun setVt(queue: String, id: Long, vt: Duration): Result<Long>

Updates the visibility timeout (VT) for a specific item in the given queue.

context(db: QueryExecutor)
suspend fun setVt(queue: String, id: Long, vt: Duration): Result<Long>

Updates the visibility timeout (VT) for a message in the specified queue.

Link copied to clipboard
suspend fun unbindTopic(pattern: String, queueName: String): Result<Boolean>

Removes a topic binding between a pattern and a queue.