1、介绍
RabbitMQ是一款使用Erlang开发的,实现AMQP(高级消息队列)的开源消息中间件。
以下是RabbitMQ中常用的一些术语:
- Producing:仅发送。发送消息的程序就是一个生产者,图例如下:
- queue:尽管流经RabbitMQ和你的应用程序,但消息只存储在队列中。队列实际上就是一个大的消息缓存区,所有仅它受主机内存和硬盘限制。多个生产者可以向同一个队列发送消息,多个消费者也可以冲同一个队列中获取数据,图例如下:
- Consuming:等同于接收。消费者通常是等待接收消息的程序。
需要注意的是,生产者、消费者和代理并不需要在同一台主机上,实际上大多数应用程序都不在同一台机器上。一个应用程序可以是生产者,同时也可以是消费者。
2、Hello World
使用Go RabbitMQ Client
在本章节中,我们会使用两个Go写的小程序:发送消息的生产者,接收并将消息输出的消费者。期间会介绍一些Go RabbitMQ细节,以便更好地入门。
下面的示意图中,“P”代表生产者,“C”代表消费者,中间的方框代表队列 — RabbitMQ生成的消息缓存区:
Go RabbitMQ client 库 RabbitMQ支持多种协议,本教程中使用的是AMQP 0-9-1,一种开发的通用消息传递协议。在多语言支持中有很多RabbitMQ客户端,我们这里使用的是Go amqp。
使用go get github.com/streadway/amqp
命令安装amqp。
2.1 Sending
在我们的例子中,消息生产者是send.go
,消费者是receive.go
。生产者会连接到RabbitMQ,发送一条信息,然后退出。
在send.go
文件中,首先我们需要导入RabbitMQ库:
package main
import (
"log"
"github.com/streadway/amqp"
)
我们还需要一个辅助函数来检查每次amqp调用之后的返回值:
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
之后,连接RabbitMQ服务:
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
上面过程将Socker链接进行了抽象,帮我们做了协议版本磋商和认证。接着,创建一个通道,这个通道大多数API完成操作的地方:
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
然后创建一个队列来接收发送的消息:
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
body := "Hello World!"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
只有在队列不存在时才需要创建它。消息的内容是一个byte数组,所以消息内容你可以编码任何你想要的内容。
2.2 Receiving
上面实现了消息的生产者。消费者监听来自RabbitMQ的消息,所以跟上面只发送一条消息就退出的生产者不同,消费者需要一直监听消息并将它们输出。
reveive.go
有跟send.go
中一样的导入项和复制函数:
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
与生产者一样,我们需要打开连接和一条通道,声明一个我们即将消费的队列。声明的队列要与send.go
中的匹配:
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
这里我们也声明了队列,因为我们可能在启动生产者之前启动了消费者。我们需要确保在我们们尝试消费消息前所用的队列已存在。
接下来,我们通知服务器通过队列给我们推送消息。因为是消息是异步推送的,所以我们在协程中通过通道(由amqp::Consume
返回)来读取消息:
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
2.3 整合到一起
现在我们可以执行上面的程序。启动生产者:
go run send.go
在另一个终端启动消费者:
go run receive.go
消费者将输出通过RabbitMQ获得的、从生产者发出的消息。因为消费者会一直运行,并等待消息(可通过Ctrl-C
/control-C
来关闭它),所以我们需要在另一个终端中运行生产者。
要查看队列的话,可以通过浏览器访问“localhost:15672”来查看,默认的用户名密码为:guest:guest。
声明:本作品采用署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)进行许可,使用时请注明出处。
Author: mengbin92
Github: mengbin92
cnblogs: 恋水无意