在上一节中,我们构建了一个简单日志系统,将日志信息广播给众多的接收者。
在本章节中,我们会给它添加个功能 – 只订阅部分消息。例如,我们直接将危重的错误信息写入日志文件(写入硬盘),于此同时所有信息仍会全部输出到终端上。
1、绑定
在之前的例子中我们已经创建了一个绑定,类似下面的代码:
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"logs", // exchange
false,
nil)
绑定是交换器和队列之间的关系。可以简单理解为:队列只对来自这个交换器的信息感兴趣。
绑定可以使用额外的routing_key
参数。为了避免与Channel.Publish
中的参数冲突,我们把它叫做绑定键
。创建带有key值的绑定可以使用下面的代码:
err = ch.QueueBind(
q.Name, // queue name
"black", // routing key
"logs", // exchange
false,
nil)
绑定键的意义取决于交换机类型。像我们之前使用的fanout
,会直接忽略绑定键的值。
2、直接交换
上一节日志系统将消息广播给所有的消费者。我们希望扩展它的功能,可以根据严重程度来过滤消息。例如我们可能只想将接收到的危重错误消息写入日志存入硬盘,并不希望在warning和info级别的日志上浪费空间。
之前我们使用的是fanout交换机,它并没有给我们过多的选择,它只能进行无意识的广播。
现在我们使用direct
交换机。direct
背后的路由算法很简单 – 消息会进入那些绑定键
与它的routing key
完全匹配的队列中。
下图是为了更好的说明direct
交换机:
在上面的配置中,direct
交换机X
绑定了两个队列。第一个队列的绑定键
是orange
,第二个队列有两个绑定:一个的绑定键
是black
,另一个是green
。
此时,发布到交换机的消息,如果它的路由键是orange
,那么它将会被发送的队列Q1
,如果它的路由键是black
或者green
,那么它将会被发送的队列Q2
。其它消息会被丢弃。
3、多重绑定
使用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们也可以用black
来绑定X
和Q1
。这种情况下,direct
交换机跟fanout
类似,会把消息广播到所有匹配的队列中。带有black
路由键的消息会被分发给Q1
和Q2
队列。
4、发送日志
我们将使用多重绑定的模式来实现我们的日志服务,使用direct
交换机来发送消息,使用routing key
来表示日志*级别。这样接收脚本就可以根据级别来选择它想要接收。接下来我们聚焦日志发送。
跟之前一样,我们需要先创建一个交换机:
err = ch.ExchangeDeclare(
"logs_direct", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
然后,准备发送消息:
err = ch.ExchangeDeclare(
"logs_direct", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs_direct", // exchange
severityFrom(os.Args), // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
为了简化操作,我们假定日志级别有:info
、warning
和error
。
5、订阅
接收消息时,除了我们需要为每个感兴趣的级别创建一个新的绑定以外,其它操作跟之前的都是一样的。
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
if len(os.Args) < 2 {
log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
os.Exit(0)
}
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "logs_direct", s)
err = ch.QueueBind(
q.Name, // queue name
s, // routing key
"logs_direct", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
}
6、整合
日志发送脚本emit_log_direct.go
代码如下:
package main
import (
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_direct", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs_direct", // exchange
severityFrom(os.Args), // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 3) || os.Args[2] == "" {
s = "hello"
} else {
s = strings.Join(args[2:], " ")
}
return s
}
func severityFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "info"
} else {
s = os.Args[1]
}
return s
}
日志接收receive_log_direct.go
代码如下:
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_direct", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
if len(os.Args) < 2 {
log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
os.Exit(0)
}
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s)
err = ch.QueueBind(
q.Name, // queue name
s, // routing key
"logs_direct", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
如果你只想将保存warning
和error
级别的日志到文件中,那么打开终端,执行下面的命令:
go run receive_logs_direct.go warning error > logs_from_rabbit.log
如果你想将所有的日志输出到屏幕上,那开启新的中,执行下面的命令:
go run receive_logs_direct.go info warning error
# => [*] Waiting for logs. To exit press CTRL+C
发送error
日志:
go run emit_log_direct.go error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
接下来,在下一节中,将介绍如何基于模型来监听消息。
声明:本作品采用署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)进行许可,使用时请注明出处。
Author: mengbin92
Github: mengbin92
cnblogs: 恋水无意