PgMqConsumer

class PgMqConsumer(pgmq: PgMqClient, options: PgMqConsumer.Options, onMessage: suspend (Message) -> Unit, onFaiToRead: suspend (Throwable) -> Unit = {}, onFailToProcess: suspend (Throwable) -> Unit = {}, onFaiToAck: suspend (Throwable) -> Unit = {}, onFaiToNack: suspend (Throwable) -> Unit = {})(source)

PgMqConsumer is responsible for consuming messages from a PostgreSQL-based message queue (PgMq). It provides configurable options for queue management and handles message processing, acknowledging, and retrying logic.

Constructors

Link copied to clipboard
constructor(pgmq: PgMqClient, options: PgMqConsumer.Options, onMessage: suspend (Message) -> Unit, onFaiToRead: suspend (Throwable) -> Unit = {}, onFailToProcess: suspend (Throwable) -> Unit = {}, onFaiToAck: suspend (Throwable) -> Unit = {}, onFaiToNack: suspend (Throwable) -> Unit = {})

Creates an instance of PgMqConsumer with the provided PgMq client, configuration options, and callbacks for message processing and error handling.

Types

Link copied to clipboard
object Companion
Link copied to clipboard
data class Options(val queue: String, val prefetch: Int = 250, val vt: Duration = 10.seconds, val autoStart: Boolean = true, val enableNotifyInsert: Boolean = false, val queueMinPullDelay: Duration = 50.milliseconds, val queueMaxPullDelay: Duration = 2.seconds, val messageRetryDelayStep: Duration = 500.milliseconds, val messageMaxRetryDelay: Duration = 60.seconds)

Configuration options for a message queue consumer.

Functions

Link copied to clipboard
suspend fun metrics(): Result<Metrics>

Retrieves metrics for the current queue managed by the consumer.

Link copied to clipboard
fun start()

Starts the message queue consumer by initializing and launching processes for notification, consumption, and fetching, if they are not already running.

Link copied to clipboard
fun stop()

Stops the message queue consumer by gracefully canceling ongoing processes for consumption and fetching.

Link copied to clipboard
open override fun toString(): String