// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. // Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package amqp091_test import ( "context" "errors" "fmt" "log" "os" "time" amqp "github.com/rabbitmq/amqp091-go" ) // This exports a Client object that wraps this library. It // automatically reconnects when the connection fails, and // blocks all pushes until the connection succeeds. It also // confirms every outgoing message, so none are lost. // It doesn't automatically ack each message, but leaves that // to the parent process, since it is usage-dependent. // // Try running this in one terminal, and `rabbitmq-server` in another. // Stop & restart RabbitMQ to see how the queue reacts. func Example() { queueName := "job_queue" addr := "amqp://guest:guest@localhost:5672/" queue := New(queueName, addr) message := []byte("message") ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*20)) defer cancel() loop: for { select { // Attempt to push a message every 2 seconds case <-time.After(time.Second * 2): if err := queue.Push(message); err != nil { fmt.Printf("Push failed: %s\n", err) } else { fmt.Println("Push succeeded!") } case <-ctx.Done(): queue.Close() break loop } } } func Example_consume() { queueName := "job_queue" addr := "amqp://guest:guest@localhost:5672/" queue := New(queueName, addr) // Give the connection sometime to setup <-time.After(time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() deliveries, err := queue.Consume() if err != nil { fmt.Printf("Could not start consuming: %s\n", err) return } // This channel will receive a notification when a channel closed event // happens. This must be different than Client.notifyChanClose because the // library sends only one notification and Client.notifyChanClose already has // a receiver in handleReconnect(). // Recommended to make it buffered to avoid deadlocks chClosedCh := make(chan *amqp.Error, 1) queue.channel.NotifyClose(chClosedCh) for { select { case <-ctx.Done(): queue.Close() return case amqErr := <-chClosedCh: // This case handles the event of closed channel e.g. abnormal shutdown fmt.Printf("AMQP Channel closed due to: %s\n", amqErr) deliveries, err = queue.Consume() if err != nil { // If the AMQP channel is not ready, it will continue the loop. Next // iteration will enter this case because chClosedCh is closed by the // library fmt.Println("Error trying to consume, will try again") continue } // Re-set channel to receive notifications // The library closes this channel after abnormal shutdown chClosedCh = make(chan *amqp.Error, 1) queue.channel.NotifyClose(chClosedCh) case delivery := <-deliveries: // Ack a message every 2 seconds fmt.Printf("Received message: %s\n", delivery.Body) if err := delivery.Ack(false); err != nil { fmt.Printf("Error acknowledging message: %s\n", err) } <-time.After(time.Second * 2) } } } type Client struct { queueName string logger *log.Logger connection *amqp.Connection channel *amqp.Channel done chan bool notifyConnClose chan *amqp.Error notifyChanClose chan *amqp.Error notifyConfirm chan amqp.Confirmation isReady bool } const ( // When reconnecting to the server after connection failure reconnectDelay = 5 * time.Second // When setting up the channel after a channel exception reInitDelay = 2 * time.Second // When resending messages the server didn't confirm resendDelay = 5 * time.Second ) var ( errNotConnected = errors.New("not connected to a server") errAlreadyClosed = errors.New("already closed: not connected to the server") errShutdown = errors.New("client is shutting down") ) // New creates a new consumer state instance, and automatically // attempts to connect to the server. func New(queueName, addr string) *Client { client := Client{ logger: log.New(os.Stdout, "", log.LstdFlags), queueName: queueName, done: make(chan bool), } go client.handleReconnect(addr) return &client } // handleReconnect will wait for a connection error on // notifyConnClose, and then continuously attempt to reconnect. func (client *Client) handleReconnect(addr string) { for { client.isReady = false client.logger.Println("Attempting to connect") conn, err := client.connect(addr) if err != nil { client.logger.Println("Failed to connect. Retrying...") select { case <-client.done: return case <-time.After(reconnectDelay): } continue } if done := client.handleReInit(conn); done { break } } } // connect will create a new AMQP connection func (client *Client) connect(addr string) (*amqp.Connection, error) { conn, err := amqp.Dial(addr) if err != nil { return nil, err } client.changeConnection(conn) client.logger.Println("Connected!") return conn, nil } // handleReconnect will wait for a channel error // and then continuously attempt to re-initialize both channels func (client *Client) handleReInit(conn *amqp.Connection) bool { for { client.isReady = false err := client.init(conn) if err != nil { client.logger.Println("Failed to initialize channel. Retrying...") select { case <-client.done: return true case <-time.After(reInitDelay): } continue } select { case <-client.done: return true case <-client.notifyConnClose: client.logger.Println("Connection closed. Reconnecting...") return false case <-client.notifyChanClose: client.logger.Println("Channel closed. Re-running init...") } } } // init will initialize channel & declare queue func (client *Client) init(conn *amqp.Connection) error { ch, err := conn.Channel() if err != nil { return err } err = ch.Confirm(false) if err != nil { return err } _, err = ch.QueueDeclare( client.queueName, false, // Durable false, // Delete when unused false, // Exclusive false, // No-wait nil, // Arguments ) if err != nil { return err } client.changeChannel(ch) client.isReady = true client.logger.Println("Setup!") return nil } // changeConnection takes a new connection to the queue, // and updates the close listener to reflect this. func (client *Client) changeConnection(connection *amqp.Connection) { client.connection = connection client.notifyConnClose = make(chan *amqp.Error, 1) client.connection.NotifyClose(client.notifyConnClose) } // changeChannel takes a new channel to the queue, // and updates the channel listeners to reflect this. func (client *Client) changeChannel(channel *amqp.Channel) { client.channel = channel client.notifyChanClose = make(chan *amqp.Error, 1) client.notifyConfirm = make(chan amqp.Confirmation, 1) client.channel.NotifyClose(client.notifyChanClose) client.channel.NotifyPublish(client.notifyConfirm) } // Push will push data onto the queue, and wait for a confirm. // If no confirms are received until within the resendTimeout, // it continuously re-sends messages until a confirm is received. // This will block until the server sends a confirm. Errors are // only returned if the push action itself fails, see UnsafePush. func (client *Client) Push(data []byte) error { if !client.isReady { return errors.New("failed to push: not connected") } for { err := client.UnsafePush(data) if err != nil { client.logger.Println("Push failed. Retrying...") select { case <-client.done: return errShutdown case <-time.After(resendDelay): } continue } select { case confirm := <-client.notifyConfirm: if confirm.Ack { client.logger.Println("Push confirmed!") return nil } case <-time.After(resendDelay): } client.logger.Println("Push didn't confirm. Retrying...") } } // UnsafePush will push to the queue without checking for // confirmation. It returns an error if it fails to connect. // No guarantees are provided for whether the server will // receive the message. func (client *Client) UnsafePush(data []byte) error { if !client.isReady { return errNotConnected } ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() return client.channel.PublishWithContext( ctx, "", // Exchange client.queueName, // Routing key false, // Mandatory false, // Immediate amqp.Publishing{ ContentType: "text/plain", Body: data, }, ) } // Consume will continuously put queue items on the channel. // It is required to call delivery.Ack when it has been // successfully processed, or delivery.Nack when it fails. // Ignoring this will cause data to build up on the server. func (client *Client) Consume() (<-chan amqp.Delivery, error) { if !client.isReady { return nil, errNotConnected } if err := client.channel.Qos( 1, // prefetchCount 0, // prefrechSize false, // global ); err != nil { return nil, err } return client.channel.Consume( client.queueName, "", // Consumer false, // Auto-Ack false, // Exclusive false, // No-local false, // No-Wait nil, // Args ) } // Close will cleanly shutdown the channel and connection. func (client *Client) Close() error { if !client.isReady { return errAlreadyClosed } close(client.done) err := client.channel.Close() if err != nil { return err } err = client.connection.Close() if err != nil { return err } client.isReady = false return nil }