// This example declares a durable Exchange, an ephemeral (auto-delete) Queue, // binds the Queue to the Exchange with a binding key, and consumes every // message published to that Exchange with that routing key. // package main import ( "flag" "fmt" "github.com/streadway/amqp" "log" "time" ) var ( uri = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI") exchange = flag.String("exchange", "test-exchange", "Durable, non-auto-deleted AMQP exchange name") exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom") queue = flag.String("queue", "test-queue", "Ephemeral AMQP queue name") bindingKey = flag.String("key", "test-key", "AMQP binding key") consumerTag = flag.String("consumer-tag", "simple-consumer", "AMQP consumer tag (should not be blank)") lifetime = flag.Duration("lifetime", 5*time.Second, "lifetime of process before shutdown (0s=infinite)") ) func init() { flag.Parse() } func main() { c, err := NewConsumer(*uri, *exchange, *exchangeType, *queue, *bindingKey, *consumerTag) if err != nil { log.Fatalf("%s", err) } if *lifetime > 0 { log.Printf("running for %s", *lifetime) time.Sleep(*lifetime) } else { log.Printf("running forever") select {} } log.Printf("shutting down") if err := c.Shutdown(); err != nil { log.Fatalf("error during shutdown: %s", err) } } type Consumer struct { conn *amqp.Connection channel *amqp.Channel tag string done chan error } func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string) (*Consumer, error) { c := &Consumer{ conn: nil, channel: nil, tag: ctag, done: make(chan error), } var err error log.Printf("dialing %q", amqpURI) c.conn, err = amqp.Dial(amqpURI) if err != nil { return nil, fmt.Errorf("Dial: %s", err) } go func() { fmt.Printf("closing: %s", <-c.conn.NotifyClose(make(chan *amqp.Error))) }() log.Printf("got Connection, getting Channel") c.channel, err = c.conn.Channel() if err != nil { return nil, fmt.Errorf("Channel: %s", err) } log.Printf("got Channel, declaring Exchange (%q)", exchange) if err = c.channel.ExchangeDeclare( exchange, // name of the exchange exchangeType, // type true, // durable false, // delete when complete false, // internal false, // noWait nil, // arguments ); err != nil { return nil, fmt.Errorf("Exchange Declare: %s", err) } log.Printf("declared Exchange, declaring Queue %q", queueName) queue, err := c.channel.QueueDeclare( queueName, // name of the queue true, // durable false, // delete when unused false, // exclusive false, // noWait nil, // arguments ) if err != nil { return nil, fmt.Errorf("Queue Declare: %s", err) } log.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange (key %q)", queue.Name, queue.Messages, queue.Consumers, key) if err = c.channel.QueueBind( queue.Name, // name of the queue key, // bindingKey exchange, // sourceExchange false, // noWait nil, // arguments ); err != nil { return nil, fmt.Errorf("Queue Bind: %s", err) } log.Printf("Queue bound to Exchange, starting Consume (consumer tag %q)", c.tag) deliveries, err := c.channel.Consume( queue.Name, // name c.tag, // consumerTag, false, // noAck false, // exclusive false, // noLocal false, // noWait nil, // arguments ) if err != nil { return nil, fmt.Errorf("Queue Consume: %s", err) } go handle(deliveries, c.done) return c, nil } func (c *Consumer) Shutdown() error { // will close() the deliveries channel if err := c.channel.Cancel(c.tag, true); err != nil { return fmt.Errorf("Consumer cancel failed: %s", err) } if err := c.conn.Close(); err != nil { return fmt.Errorf("AMQP connection close error: %s", err) } defer log.Printf("AMQP shutdown OK") // wait for handle() to exit return <-c.done } func handle(deliveries <-chan amqp.Delivery, done chan error) { for d := range deliveries { log.Printf( "got %dB delivery: [%v] %q", len(d.Body), d.DeliveryTag, d.Body, ) d.Ack(false) } log.Printf("handle: deliveries channel closed") done <- nil }