PgmqClient

class PgmqClient(val pg: PgmqDbAdapter, options: PgmqClient.Options = Options())(source)

PgMqClient is a client for interfacing with the pgmq system, providing various messaging queue operations. This client makes use of PostgreSQL as a message queue backend.

Parameters

pg

The PgMqDbAdapter instance used for executing database queries.

options

Configuration options for the client, including installation verification and auto-install behavior.

Features:

  • Installation verification of the pgmq extension in the database.

  • Creating and managing queues with optional notification on message insertion.

  • Listing all existing queues.

  • Sending messages to a queue, with support for headers and optional delay.

  • Batch sending of messages to a queue.

  • Popping messages from a queue.

  • Reading messages from a queue with optional visibility timeout.

  • Archiving and deleting messages by single or bulk identifiers.

  • Purging all messages from a queue.

Constructors

Link copied to clipboard
constructor(pg: PgmqDbAdapter, options: PgmqClient.Options = Options())

Types

Link copied to clipboard
data class Options(val autoInstall: Boolean = false, val verifyInstallation: Boolean = true, val installFromFiles: Boolean = false, val installFilesPath: String = "./pgmq")

A data class that represents configuration options for an installation process.

Link copied to clipboard
data class Queue(val name: String, val unlogged: Boolean = false, val enableNotifyInsert: Boolean = false, val throttleNotifyInterval: Duration = 250.milliseconds)

Represents a queue configuration with specific attributes.

Properties

Link copied to clipboard

Functions

Link copied to clipboard
suspend fun ack(queue: String, id: Long): Result<Boolean>

Acknowledges a message in the specified queue with the given ID by performing a delete operation.

suspend fun ack(queue: String, ids: List<Long>): Result<List<Long>>

Acknowledges the provided list of message IDs for the specified queue.

Link copied to clipboard
suspend fun archive(queue: String, id: Long): Result<Boolean>

Archives an item identified by its ID from a specified queue.

suspend fun archive(queue: String, ids: List<Long>): Result<List<Long>>

Archives the specified items identified by their IDs from the given queue.

context(db: QueryExecutor)
suspend fun archive(queue: String, id: Long): Result<Boolean>

Archives a message in the specified queue by calling the pgmq.archive function.

context(db: QueryExecutor)
suspend fun archive(queue: String, ids: List<Long>): Result<List<Long>>

Archives messages identified by their IDs from a specified queue.

Link copied to clipboard
suspend fun bindTopic(pattern: String, queueName: String): Result<Unit>
context(db: QueryExecutor)
suspend fun bindTopic(pattern: String, queueName: String): Result<Unit>

Creates a topic binding between a pattern and a queue.

suspend fun bindTopic(patterns: List<String>, queueName: String): Result<Unit>

Creates topic bindings for multiple patterns to a single queue.

Link copied to clipboard
suspend fun create(queue: PgmqClient.Queue): Result<Unit>

Creates a queue in the system with the specified configuration.

Link copied to clipboard
suspend fun delete(queue: String, id: Long): Result<Boolean>

Deletes an item with the specified ID from the given queue.

suspend fun delete(queue: String, ids: List<Long>): Result<List<Long>>

Deletes the specified items from the given queue.

context(db: QueryExecutor)
suspend fun delete(queue: String, id: Long): Result<Boolean>

Deletes a message from the specified queue based on its unique identifier.

context(db: QueryExecutor)
suspend fun delete(queue: String, ids: List<Long>): Result<List<Long>>

Deletes messages from the specified queue based on the provided list of IDs.

Link copied to clipboard
suspend fun drop(queue: PgmqClient.Queue): Result<Boolean>

Drops the specified queue from the system.

Link copied to clipboard
suspend fun install()

Installs the required components or extensions for the pgmq functionality.

Link copied to clipboard
suspend fun installExtension(file: Pair<String, String>)

Installs an extension by executing SQL statements from the provided file. The file content is split into individual SQL statements, which are executed in a single transaction.

Link copied to clipboard
suspend fun installExtentions(path: String)

Installs extensions by reading SQL files from the specified directory and processing each file.

Link copied to clipboard
suspend fun installFromExtension()

Installs the pgmq extension in the PostgreSQL database if it is not already installed.

Link copied to clipboard

Executes database migrations from a list of migration files.

Link copied to clipboard
suspend fun installFromPath(path: String = options.installFilesPath)

Installs the necessary files from a specified directory path.

Link copied to clipboard
suspend fun installFromSqlFiles(files: List<Pair<String, String>>)

Installs database migrations from a list of SQL files.

Link copied to clipboard

Retrieves the list of all available queues in the system.

Link copied to clipboard
suspend fun metrics(): Result<List<Metrics>>

Retrieves a list of metrics from the database using the pgmq.metrics() query.

suspend fun metrics(queue: String): Result<Metrics>

Retrieves metrics for the specified queue.

Link copied to clipboard
suspend fun nack(queue: String, id: Long, vt: Duration = Duration.ZERO): Result<Long>

Sends a negative acknowledgment (nack) to the specified queue for the given message ID, optionally resetting the visibility timeout (VT) to a new duration.

Link copied to clipboard
suspend fun pop(queue: String, quantity: Int = 1): Result<List<Message>>
context(db: QueryExecutor)
suspend fun pop(queue: String, quantity: Int = 1): Result<List<Message>>

Removes and retrieves messages from the specified queue.

Link copied to clipboard
suspend fun purge(queue: PgmqClient.Queue): Result<Long>

Removes all messages from the specified queue.

Link copied to clipboard
suspend fun read(queue: String, quantity: Int = 1, vt: Duration = 30.seconds): Result<List<Message>>

Reads messages from a specified queue.

context(db: QueryExecutor)
suspend fun read(queue: String, quantity: Int = 1, vt: Duration = 30.seconds): Result<List<Message>>

Reads messages from a specified queue with the given parameters.

Link copied to clipboard
suspend fun send(queue: String, message: String, headers: Map<String, String> = emptyMap(), delay: Duration = 0.seconds): Result<Long>
context(db: QueryExecutor)
suspend fun send(queue: String, message: String, headers: Map<String, String> = emptyMap(), delay: Duration = 0.seconds): Result<Long>

Sends a message to the specified queue with optional headers and delay.

suspend fun send(queue: String, messages: List<String>, headers: Map<String, String> = emptyMap(), delay: Duration = 0.seconds): Result<List<Long>>

Sends multiple messages to the specified queue with optional headers and delay.

context(db: QueryExecutor)
suspend fun send(queue: String, messages: List<String>, headers: Map<String, String> = emptyMap(), delay: Duration = 0.seconds): Result<List<Long>>

Sends a batch of messages to the specified queue with optional headers and a delay.

Link copied to clipboard
suspend fun sendTopic(routingKey: String, message: String, headers: Map<String, String> = emptyMap(), delay: Duration = 0.seconds): Result<Long>
context(db: QueryExecutor)
suspend fun sendTopic(routingKey: String, message: String, headers: Map<String, String> = emptyMap(), delay: Duration = 0.seconds): Result<Long>

Sends a message to all queues that match the routing key pattern.

Link copied to clipboard
suspend fun setVt(queue: String, id: Long, vt: Duration): Result<Long>

Updates the visibility timeout (VT) for a specific item in the given queue.

context(db: QueryExecutor)
suspend fun setVt(queue: String, id: Long, vt: Duration): Result<Long>

Updates the visibility timeout (VT) for a message in the specified queue.

Link copied to clipboard
suspend fun unbindTopic(pattern: String, queueName: String): Result<Boolean>

Removes a topic binding between a pattern and a queue.