Re-publishing of messages that couldn't get delivered to the exchange
Re-publishing of messages that would've been lost during a reconnection or instability
Smart decoder based on the content type of the message
Configurable parallel consumption of messages using goroutines
Simple auto-creation of queues, exchanges, binds, and dead letter queues
// main.gopackagemainimport ( thunderEventRabbitmq "github.com/gothunder/thunder/pkg/events/rabbitmq" thunderLogs "github.com/gothunder/thunder/pkg/log""github.com/rs/zerolog/diode""go.uber.org/fx")funcmain() {var w diode.Writer app := fx.New(// The order of these options isn't important. thunderLogs.Module, fx.Populate(&w), thunderEventRabbitmq.PublisherModule, thunderEventRabbitmq.InvokeConsumer, ) app.Run()// This is required to flush the logs to stdout.// We only want to do this after the app has exited. thunderLogs.DiodeShutdown(w)}
Publisher
If you're using the fx module, the publisher will automatically be started and closed.
When publishing an event, you'll be sending the struct that will be serialized and sent to the exchange. If there's any error with the serialization, it'll be returned back to you.
The message will be published asynchronously, and any errors will be treated and retried by the module.
typeEventPublisherinterface {// StartPublisher starts the background go routine that will publish messages// Returns an error if the publisher fails to start or reconnectStartPublisher(context.Context) error// Publish publishes a message to the given topic// The message is published asynchronously// The message will be republished if the connection is lostPublish( ctx context.Context,// The name of the event. topic string,// The payload of the event. payload interface{}, ) error// Close gracefully closes the publisher, making sure all messages are publishedClose(context.Context) error}
Consumer
Make sure that you define a single handler that matches the interface below.
typeHandlerResponseintconst (// Default, we remove the message from the queue. Success HandlerResponse=iota// The message will be delivered to a server configured dead-letter queue. DeadLetter// Deliver this message to a different worker. Retry)typeEventDecoderinterface {// Decode decodes the payload into the given interface.// Returns an error if the payload cannot be decoded.Decode(v interface{}) error}typeHandlerinterface {// The function that will be called when a message is received.Handle(ctx context.Context, topic string, decoder EventDecoder) HandlerResponse// The topics that will be subscribed to.Topics() []string}