Client Library
Pure Go client libraries for MQTT 3.1.1/5.0 and AMQP 0.9.1 with durable queue support. Note: FluxMQ also exposes AMQP 1.0 server support, but this repository does not currently include a dedicated AMQP 1.0 client library.
Features
- Protocol Support: MQTT 3.1.1 (v4) and MQTT 5.0 (v5)
- Auto-Reconnect: Exponential backoff with configurable limits
- QoS Levels: Full QoS 0/1/2 support with pluggable in-flight store (memory by default)
- TLS/SSL: Secure connections with custom certificates
- Session Persistence: Configurable session expiry
- Durable Queues: Consumer groups and acknowledgments (DLQ wiring pending)
- MQTT 5.0 Features: Topic aliases, user properties (publish/receive/will), flow control
MQTT Client
Quick Start
Basic Connection

    package main

    import (
        "log"

        "github.com/absmach/fluxmq/client"
    )

    func main() {
        // Create client with options
        opts := client.NewOptions().
            SetServers("localhost:1883").
            SetClientID("my-client").
            SetProtocolVersion(5).
            SetOnMessage(func(topic string, payload []byte, qos byte) {
                log.Printf("Received: %s -> %s", topic, string(payload))
            })

        c := client.New(opts)

        // Connect
        if err := c.Connect(); err != nil {
            log.Fatal(err)
        }
        defer c.Disconnect()

        // Subscribe
        if err := c.SubscribeSingle("sensors/#", 1); err != nil {
            log.Fatal(err)
        }

        // Publish
        if err := c.Publish("sensors/temp", []byte("22.5"), 1, false); err != nil {
            log.Fatal(err)
        }

        // Keep running
        select {}
    }

Configuration Options
Connection Settings

    opts := client.NewOptions().
        SetServers("broker1:1883", "broker2:1883"). // Multiple servers
        SetClientID("device-001").
        SetCredentials("user", "password").
        SetTLSConfig(&tls.Config{...}). // Enable TLS
        SetConnectTimeout(10 * time.Second).
        SetKeepAlive(60 * time.Second)

Protocol Version

    opts.SetProtocolVersion(4) // MQTT 3.1.1
    opts.SetProtocolVersion(5) // MQTT 5.0

Session Options

    opts.SetCleanSession(true)  // Start fresh each connect
    opts.SetCleanSession(false) // Resume previous session
    opts.SetSessionExpiry(3600) // Session persists 1 hour after disconnect (v5)

MQTT 5.0 Specific

    opts.SetSessionExpiry(86400).      // Session expires in 24h
        SetReceiveMaximum(100).        // Max inflight messages to receive
        SetMaximumPacketSize(1048576). // Max 1MB packets
        SetTopicAliasMaximum(10).      // Enable topic aliases
        SetRequestResponseInfo(true).  // Request response info
        SetRequestProblemInfo(true)    // Get detailed errors

Reconnection

    opts.SetAutoReconnect(true)             // Enable auto-reconnect
    opts.ReconnectBackoff = 1 * time.Second // Initial delay
    opts.MaxReconnectWait = 2 * time.Minute // Max delay

Publishing Messages
Basic Publish

    // QoS 0 - Fire and forget
    c.Publish("topic", []byte("payload"), 0, false)

    // QoS 1 - At least once
    c.Publish("topic", []byte("payload"), 1, false)

    // QoS 2 - Exactly once
    c.Publish("topic", []byte("payload"), 2, false)

    // Retained message
    c.Publish("config/device", []byte("settings"), 1, true)

MQTT 5.0 Publish Properties
Use PublishMessage to set publish properties such as content type, response
topic, correlation data, and user properties (MQTT 5.0 only).

    msg := &client.Message{
        Topic:           "sensors/temp",
        Payload:         []byte("22.5"),
        QoS:             1,
        ContentType:     "text/plain",
        ResponseTopic:   "responses/temp",
        CorrelationData: []byte("req-123"),
        UserProperties:  map[string]string{"unit": "celsius"},
    }
    if err := c.PublishMessage(msg); err != nil {
        log.Fatal(err)
    }

Subscribing to Topics
Basic Subscription

    // Single topic
    c.SubscribeSingle("sensors/temp", 1)

    // Multiple topics
    c.Subscribe(map[string]byte{
        "sensors/#":        1,
        "devices/+/status": 2,
    })

MQTT 5.0 Subscription Options

    opts := &client.SubscribeOption{
        Topic:             "sensors/temp",
        QoS:               1,
        NoLocal:           true, // Don't receive own messages
        RetainAsPublished: true, // Keep original retain flag
        RetainHandling:    1,    // Only send retained if new sub
        SubscriptionID:    42,   // Track subscription
    }
    c.SubscribeWithOptions(opts)

Unsubscribe

    c.Unsubscribe("sensors/temp")

Message Handling
Simple Handler

    opts.SetOnMessage(func(topic string, payload []byte, qos byte) {
        log.Printf("[%s] QoS %d: %s", topic, qos, payload)
    })

Full Message Context (MQTT 5.0)

    opts.SetOnMessageV2(func(msg *client.Message) {
        log.Printf("Topic: %s", msg.Topic)
        log.Printf("Payload: %s", msg.Payload)
        log.Printf("QoS: %d", msg.QoS)
        log.Printf("Retain: %v", msg.Retain)
        log.Printf("Properties: %+v", msg.UserProperties)
        log.Printf("Response Topic: %s", msg.ResponseTopic)
    })

Durable Queues
The client supports durable queues with consumer groups and message acknowledgment. Reject/DLQ wiring in the broker is pending. MQTT v3 can publish and subscribe to queue topics, but acknowledgments require MQTT v5 user properties.
When to use queues instead of regular pub/sub:
- You need at-least-once processing with explicit acknowledgments
- Multiple consumers should share the workload (consumer groups)
- Failed messages need retry logic or dead-letter handling
Key concepts:
- Queue: Persistent message buffer with ordered delivery per queue (single log)
- Consumer Group: Multiple consumers share messages from the same queue
- Acknowledgment: Confirm success (Ack), request redelivery (Nack), or reject permanently (Reject)
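To make the consumer group concept concrete, here is a minimal, broker-free sketch in plain Go. It models a queue as a channel and a consumer group as several goroutines reading from it, so each message is handled by exactly one group member. The function name runGroup and the worker count are illustrative assumptions; none of this is FluxMQ API, and the broker's actual dispatch logic is not described in this document.

```go
package main

import (
	"fmt"
	"sync"
)

// runGroup models a consumer group: `workers` goroutines share one queue,
// so each message is handled by exactly one member of the group.
// It returns how many messages each worker handled.
func runGroup(messages []string, workers int) []int {
	queue := make(chan string)
	counts := make([]int, workers)
	var wg sync.WaitGroup

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range queue {
				counts[id]++ // only this goroutine writes counts[id]
			}
		}(w)
	}

	for _, m := range messages {
		queue <- m
	}
	close(queue)
	wg.Wait()
	return counts
}

func main() {
	counts := runGroup([]string{"a", "b", "c", "d", "e", "f"}, 3)
	total := 0
	for id, n := range counts {
		fmt.Printf("worker %d handled %d message(s)\n", id, n)
		total += n
	}
	fmt.Println("total:", total)
}
```

How the messages are split between workers varies from run to run, but the total always equals the number of messages published, which is the property a consumer group is meant to provide.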
Publishing to Queues

    // Simple queue publish
    c.PublishToQueue("orders", []byte(`{"item": "widget"}`))

    // Full control
    c.PublishToQueueWithOptions(&client.QueuePublishOptions{
        QueueName:  "events",
        Payload:    []byte("event-data"),
        Properties: map[string]string{"priority": "high"},
        QoS:        1,
    })

Subscribing to Queues

    // Subscribe with consumer group
    err := c.SubscribeToQueue("orders", "order-processors", func(msg *client.QueueMessage) {
        log.Printf("Processing order: %s", msg.Payload)
        log.Printf("Message ID: %s", msg.MessageID)
        log.Printf("Group: %s", msg.GroupID)
        log.Printf("Offset: %d", msg.Offset)

        // Process message...
        if processedOK {
            msg.Ack() // Message removed from queue
        } else if shouldRetry {
            msg.Nack() // Redelivery eligible (subject to broker delivery/visibility timing)
        } else {
            msg.Reject() // Removes from pending; DLQ routing not wired yet
        }
    })

Message Acknowledgment
| Method | Effect |
|---|---|
| msg.Ack() | Message processed successfully, removed from queue |
| msg.Nack() | Processing failed, make eligible for redelivery |
| msg.Reject() | Remove from pending; DLQ routing not wired yet |
Direct Acknowledgment

    // Acknowledge by message ID (explicit group)
    c.AckWithGroup("orders", "msg-12345", "processors")
    c.NackWithGroup("orders", "msg-12345", "processors")
    c.RejectWithGroup("orders", "msg-12345", "processors")

Note: MQTT queue acknowledgments require MQTT v5, and the broker expects
message-id and group-id user properties on ack messages. QueueMessage.Ack()
sends both when they are present on incoming messages.
Unsubscribe from Queue

    c.UnsubscribeFromQueue("orders")

Queue Code Example

    package main

    import (
        "fmt"
        "log"

        "github.com/absmach/fluxmq/client"
    )

    func main() {
        opts := client.NewOptions().
            SetServers("localhost:1883").
            SetClientID("order-processor").
            SetProtocolVersion(5)

        c := client.New(opts)
        if err := c.Connect(); err != nil {
            log.Fatal(err)
        }
        defer c.Disconnect()

        // Subscribe to order queue with consumer group
        err := c.SubscribeToQueue("orders", "processors", func(msg *client.QueueMessage) {
            log.Printf("Order received: %s", msg.Payload)

            // Simulate processing
            if processOrder(msg.Payload) {
                if err := msg.Ack(); err != nil {
                    log.Printf("Ack failed: %v", err)
                }
            } else {
                msg.Nack() // Retry later
            }
        })
        if err != nil {
            log.Fatal(err)
        }

        // Publish some orders
        for i := 0; i < 10; i++ {
            // Render the loop index as a decimal string for the order ID
            c.PublishToQueue("orders", []byte(fmt.Sprintf(`{"id": "%d"}`, i)))
        }

        select {} // Keep running
    }

    func processOrder(payload []byte) bool {
        // Process order logic
        return true
    }

Connection Lifecycle
Callbacks

    opts.SetOnConnect(func() {
        log.Println("Connected!")
    }).SetOnConnectionLost(func(err error) {
        log.Printf("Connection lost: %v", err)
    }).SetOnReconnecting(func(attempt int) {
        log.Printf("Reconnecting (attempt %d)...", attempt)
    }).SetOnServerCapabilities(func(caps *client.ServerCapabilities) {
        log.Printf("Server max QoS: %d", caps.MaximumQoS)
        log.Printf("Server retain available: %v", caps.RetainAvailable)
    })

Disconnect

    // Normal disconnect
    c.Disconnect()

    // With reason (MQTT 5.0); 0x04 is the "Disconnect with Will Message" reason code
    c.DisconnectWithReason(0x04, 0, "Going offline")

Will Messages
Configure a last-will message that the broker publishes when the client disconnects unexpectedly:

    opts.SetWill("clients/device-001/status", []byte("offline"), 1, true)

With MQTT 5.0 Properties

    opts.Will = &client.WillMessage{
        Topic:             "clients/device-001/status",
        Payload:           []byte("offline"),
        QoS:               1,
        Retain:            true,
        WillDelayInterval: 30, // Wait 30s before sending
        MessageExpiry:     3600,
        UserProperties:    map[string]string{"reason": "unexpected"},
    }

Error Handling
Common Errors
| Error | Cause |
|---|---|
| ErrNotConnected | Operation attempted while disconnected |
| ErrNoServers | No broker addresses configured |
| ErrEmptyClientID | ClientID not set |
| ErrInvalidProtocol | Protocol version must be 4 or 5 |
| ErrInvalidQoS | QoS must be 0, 1, or 2 |
| ErrInvalidTopic | Empty or invalid topic string |
| ErrInvalidMessage | Message is nil or invalid |
| ErrMaxInflight | Too many pending messages |
| ErrQueueAckRequiresV5 | Queue acks require MQTT v5 user properties |
| ErrQueueAckMissingGroup | group-id missing for queue ack |
Handling Connection Errors

    // errors.Is (standard "errors" package) also matches wrapped errors
    if err := c.Connect(); err != nil {
        switch {
        case errors.Is(err, client.ErrNoServers):
            log.Fatal("No brokers configured")
        case errors.Is(err, client.ErrEmptyClientID):
            log.Fatal("ClientID required")
        default:
            log.Printf("Connection error: %v", err)
        }
    }

Message Store
For QoS 1/2 in-flight storage:

    store := client.NewMemoryStore()
    opts.SetStore(store)

Built-in stores:
- MemoryStore (default): In-memory, lost on restart
You can implement the MessageStore interface to persist QoS 1/2 in-flight data.
Defaults
| Option | Default Value |
|---|---|
| KeepAlive | 60 seconds |
| ConnectTimeout | 10 seconds |
| WriteTimeout | 5 seconds |
| AckTimeout | 10 seconds |
| PingTimeout | 5 seconds |
| MaxInflight | 100 |
| MessageChanSize | 256 |
| AutoReconnect | true |
| ReconnectBackoff | 1 second |
| MaxReconnectWait | 2 minutes |
| ProtocolVersion | 4 (MQTT 3.1.1) |
| CleanSession | true |
AMQP 0.9.1 Client
The AMQP 0.9.1 client focuses on durable queue interop with the broker. It uses the same queue naming convention as MQTT: pass the queue name without the $queue/ prefix.
Quick Start

    package main

    import (
        "log"

        "github.com/absmach/fluxmq/client/amqp"
    )

    func main() {
        opts := amqp.NewOptions().
            SetAddress("localhost:5682").
            SetCredentials("guest", "guest")

        c, err := amqp.New(opts)
        if err != nil {
            log.Fatal(err)
        }

        if err := c.Connect(); err != nil {
            log.Fatal(err)
        }
        defer c.Close()

        // Subscribe to a queue with a consumer group
        err = c.SubscribeToQueue("tasks/orders", "order-shipper", func(msg *amqp.QueueMessage) {
            log.Printf("Received: %s", string(msg.Body))
            _ = msg.Ack()
        })
        if err != nil {
            log.Fatal(err)
        }

        // Publish to the same queue
        if err := c.PublishToQueue("tasks/orders", []byte("hello")); err != nil {
            log.Fatal(err)
        }

        select {}
    }

Queue Semantics
- SubscribeToQueue passes the consumer group via x-consumer-group on basic.consume.
- Ack, Nack, and Reject map to basic.ack, basic.nack, and basic.reject.
Stream Queues (RabbitMQ-Compatible)
Stream queues provide log-style consumption with cursor offsets.
Stream queue names follow RabbitMQ conventions (no $queue/ prefix).
Offsets are passed as x-stream-offset strings; values like first, last,
next, offset=<n>, timestamp=<unix> are interpreted by the broker.

    // Declare a stream queue
    qName, err := c.DeclareStreamQueue(&amqp.StreamQueueOptions{
        Name:           "events",
        Durable:        true,
        MaxAge:         "7D",
        MaxLengthBytes: 10 * 1024 * 1024 * 1024,
    })
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("stream queue: %s", qName)

    // Consume from the beginning
    err = c.SubscribeToStream(&amqp.StreamConsumeOptions{
        QueueName: "events",
        Offset:    "first",
    }, func(msg *amqp.QueueMessage) {
        if off, ok := msg.StreamOffset(); ok {
            log.Printf("offset=%d payload=%s", off, string(msg.Body))
        }
        _ = msg.Ack()
    })
    if err != nil {
        log.Fatal(err)
    }

    // Publish to the stream queue (RabbitMQ-style)
    if err := c.PublishToStream("events", []byte("hello"), nil); err != nil {
        log.Fatal(err)
    }

Stream deliveries include:
- x-stream-offset
- x-stream-timestamp
- x-work-acked / x-work-committed-offset
The x-work-* fields report the configured primary work group’s committed offset.
x-work-acked is true when this message’s offset is below the committed offset,
which can lag slightly due to auto-commit interval batching.
Convenience accessors are available on QueueMessage:
StreamOffset(), StreamTimestamp(), WorkAcked(), WorkCommittedOffset(), WorkGroup().
Manual Commit Mode
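By default, stream consumers auto-commit offsets as messages are delivered
(similar to Kafka’s enable.auto.commit=true). To control exactly when an offset
counts as processed (for example, committing only after a handler succeeds),
disable auto-commit and commit explicitly. Manual commits on their own give
at-least-once processing: a message handled but not yet committed is delivered
again after a reconnect, so handlers should be idempotent.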
Auto-commit is rate-limited by the server setting
queue_manager.auto_commit_interval (default: 5s).
Minimal example:

    autoCommit := false
    _ = c.SubscribeToStream(&amqp.StreamConsumeOptions{
        QueueName:     "events",
        ConsumerGroup: "my-group",
        AutoCommit:    &autoCommit,
    }, handler)

    _ = c.CommitOffset("events", "my-group", lastProcessedOffset)

Use the same consumer group name in both calls.
With manual commit:
- Messages are delivered but the committed offset doesn’t advance automatically
- On reconnect, delivery resumes from the last committed offset
- Use CommitOffset() to advance the committed position
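When messages are processed concurrently they can finish out of order, and committing the highest offset seen could skip one that is still in progress. The sketch below shows one possible way to track the highest gap-free processed offset. The offsetTracker type is a hypothetical helper, not part of the client, and whether CommitOffset expects that offset or the one after it depends on broker semantics that this document does not spell out.

```go
package main

import "fmt"

// offsetTracker records processed offsets and reports the highest offset
// up to which everything has been processed without gaps.
// It is not safe for concurrent use without external locking.
type offsetTracker struct {
	start uint64          // first offset this consumer was given
	next  uint64          // lowest offset not yet processed
	done  map[uint64]bool // processed offsets above a gap
}

func newOffsetTracker(start uint64) *offsetTracker {
	return &offsetTracker{start: start, next: start, done: map[uint64]bool{}}
}

// markDone records one processed offset and advances past any contiguous run.
func (t *offsetTracker) markDone(off uint64) {
	if off < t.next {
		return // already covered (for example, a redelivery)
	}
	t.done[off] = true
	for t.done[t.next] {
		delete(t.done, t.next)
		t.next++
	}
}

// lastContiguous returns the last gap-free processed offset.
// The boolean is false while nothing has been processed yet.
func (t *offsetTracker) lastContiguous() (uint64, bool) {
	if t.next == t.start {
		return 0, false
	}
	return t.next - 1, true
}

func main() {
	tr := newOffsetTracker(10)

	tr.markDone(10)
	tr.markDone(12) // 11 is still in progress, so 12 cannot be committed yet
	fmt.Println(tr.lastContiguous())

	tr.markDone(11) // gap closed: 10..12 are all processed
	fmt.Println(tr.lastContiguous())
}
```

The first line printed is `10 true` and the second is `12 true`: offset 12 only becomes safe to commit once offset 11 has also been processed.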
Pub/Sub

    _ = c.Subscribe("sensors/#", func(msg *amqp.Message) {
        log.Printf("Topic: %s Payload: %s", msg.Topic, string(msg.Body))
    })

    _ = c.Publish("sensors/temp", []byte("22.5"))

Reconnection

    opts.SetAutoReconnect(true).
        SetReconnectBackoff(1 * time.Second).
        SetMaxReconnectWait(2 * time.Minute).
        SetOnConnectionLost(func(err error) {
            log.Printf("lost: %v", err)
        }).
        SetOnReconnecting(func(attempt int) {
            log.Printf("reconnect attempt %d", attempt)
        })