1、介绍

RabbitMQ是一款使用Erlang开发的,实现AMQP(高级消息队列)的开源消息中间件。

以下是RabbitMQ中常用的一些术语:

  • Producing:仅发送。发送消息的程序就是一个生产者,图例如下:
    producer
  • queue:尽管流经RabbitMQ和你的应用程序,但消息只存储在队列中。队列实际上就是一个大的消息缓存区,所有仅它受主机内存和硬盘限制。多个生产者可以向同一个队列发送消息,多个消费者也可以冲同一个队列中获取数据,图例如下:
    queue
  • Consuming:等同于接收。消费者通常是等待接收消息的程序。

需要注意的是,生产者、消费者和代理并不需要在同一台主机上,实际上大多数应用程序都不在同一台机器上。一个应用程序可以是生产者,同时也可以是消费者。


2、Hello World

使用Go RabbitMQ Client

在本章节中,我们会使用两个Go写的小程序:发送消息的生产者,接收并将消息输出的消费者。期间会介绍一些Go RabbitMQ细节,以便更好地入门。

下面的示意图中,“P”代表生产者,“C”代表消费者,中间的方框代表队列 — RabbitMQ生成的消息缓存区:

P_Q_C

Go RabbitMQ client 库 RabbitMQ支持多种协议,本教程中使用的是AMQP 0-9-1,一种开发的通用消息传递协议。在多语言支持中有很多RabbitMQ客户端,我们这里使用的是Go amqp
使用go get github.com/streadway/amqp命令安装amqp。


2.1 Sending

在我们的例子中,消息生产者是send.go,消费者是receive.go。生产者会连接到RabbitMQ,发送一条信息,然后退出。

send.go文件中,首先我们需要导入RabbitMQ库:

package main

import (
  "log"

  "github.com/streadway/amqp"
)

我们还需要一个辅助函数来检查每次amqp调用之后的返回值:

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

之后,连接RabbitMQ服务:

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

上面过程将Socker链接进行了抽象,帮我们做了协议版本磋商和认证。接着,创建一个通道,这个通道大多数API完成操作的地方:

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

然后创建一个队列来接收发送的消息:

q, err := ch.QueueDeclare(
  "hello", // name
  false,   // durable
  false,   // delete when unused
  false,   // exclusive
  false,   // no-wait
  nil,     // arguments
)
failOnError(err, "Failed to declare a queue")

body := "Hello World!"
err = ch.Publish(
  "",     // exchange
  q.Name, // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing {
    ContentType: "text/plain",
    Body:        []byte(body),
  })
failOnError(err, "Failed to publish a message")

只有在队列不存在时才需要创建它。消息的内容是一个byte数组,所以消息内容你可以编码任何你想要的内容。


2.2 Receiving

上面实现了消息的生产者。消费者监听来自RabbitMQ的消息,所以跟上面只发送一条消息就退出的生产者不同,消费者需要一直监听消息并将它们输出。

reveive.go有跟send.go中一样的导入项和复制函数:

package main

import (
  "log"

  "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

与生产者一样,我们需要打开连接和一条通道,声明一个我们即将消费的队列。声明的队列要与send.go中的匹配:

failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
  "hello", // name
  false,   // durable
  false,   // delete when unused
  false,   // exclusive
  false,   // no-wait
  nil,     // arguments
)
failOnError(err, "Failed to declare a queue")

这里我们也声明了队列,因为我们可能在启动生产者之前启动了消费者。我们需要确保在我们们尝试消费消息前所用的队列已存在。

接下来,我们通知服务器通过队列给我们推送消息。因为是消息是异步推送的,所以我们在协程中通过通道(由amqp::Consume返回)来读取消息:

msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  true,   // auto-ack
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
 for d := range msgs {
   log.Printf("Received a message: %s", d.Body)
 }
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

2.3 整合到一起

现在我们可以执行上面的程序。启动生产者:

go run send.go

在另一个终端启动消费者:

go run receive.go

消费者将输出通过RabbitMQ获得的、从生产者发出的消息。因为消费者会一直运行,并等待消息(可通过Ctrl-C/control-C来关闭它),所以我们需要在另一个终端中运行生产者。

要查看队列的话,可以通过浏览器访问“localhost:15672”来查看,默认的用户名密码为:guest:guest


声明:本作品采用署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)进行许可,使用时请注明出处。
Author: mengbin92
Github: mengbin92
cnblogs: 恋水无意