在上一节中,我们构建了一个简单日志系统,将日志信息广播给众多的接收者。

在本章节中,我们会给它添加个功能 – 只订阅部分消息。例如,我们直接将危重的错误信息写入日志文件(写入硬盘),于此同时所有信息仍会全部输出到终端上。


1、绑定

在之前的例子中我们已经创建了一个绑定,类似下面的代码:

err = ch.QueueBind(
		q.Name, // queue name
		"",     // routing key
		"logs", // exchange
		false,
		nil)

绑定是交换器和队列之间的关系。可以简单理解为:队列只对来自这个交换器的信息感兴趣。

绑定可以使用额外的routing_key参数。为了避免与Channel.Publish中的参数冲突,我们把它叫做绑定键。创建带有key值的绑定可以使用下面的代码:

err = ch.QueueBind(
		q.Name,  // queue name
		"black", // routing key
		"logs",  // exchange
		false,
		nil)

绑定键的意义取决于交换机类型。像我们之前使用的fanout,会直接忽略绑定键的值。


2、直接交换

上一节日志系统将消息广播给所有的消费者。我们希望扩展它的功能,可以根据严重程度来过滤消息。例如我们可能只想将接收到的危重错误消息写入日志存入硬盘,并不希望在warning和info级别的日志上浪费空间。

之前我们使用的是fanout交换机,它并没有给我们过多的选择,它只能进行无意识的广播。

现在我们使用direct交换机。direct背后的路由算法很简单 – 消息会进入那些绑定键与它的routing key完全匹配的队列中。

下图是为了更好的说明direct交换机:

在上面的配置中,direct交换机X绑定了两个队列。第一个队列的绑定键orange,第二个队列有两个绑定:一个的绑定键black,另一个是green

此时,发布到交换机的消息,如果它的路由键是orange,那么它将会被发送的队列Q1,如果它的路由键是black或者green,那么它将会被发送的队列Q2。其它消息会被丢弃。


3、多重绑定

使用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们也可以用black来绑定XQ1。这种情况下,direct交换机跟fanout类似,会把消息广播到所有匹配的队列中。带有black路由键的消息会被分发给Q1Q2队列。


4、发送日志

我们将使用多重绑定的模式来实现我们的日志服务,使用direct交换机来发送消息,使用routing key来表示日志*级别。这样接收脚本就可以根据级别来选择它想要接收。接下来我们聚焦日志发送。

跟之前一样,我们需要先创建一个交换机:

err = ch.ExchangeDeclare(
	"logs_direct", // name
	"direct",      // type
	true,          // durable
	false,         // auto-deleted
	false,         // internal
	false,         // no-wait
	nil,           // arguments
)

然后,准备发送消息:

err = ch.ExchangeDeclare(
	"logs_direct", // name
	"direct",      // type
	true,          // durable
	false,         // auto-deleted
	false,         // internal
	false,         // no-wait
	nil,           // arguments
)
failOnError(err, "Failed to declare an exchange")

body := bodyFrom(os.Args)
err = ch.Publish(
	"logs_direct",         // exchange
	severityFrom(os.Args), // routing key
	false,                 // mandatory
	false,                 // immediate
	amqp.Publishing{
		ContentType: "text/plain",
    	Body:        []byte(body),
	})

为了简化操作,我们假定日志级别有:infowarningerror


5、订阅

接收消息时,除了我们需要为每个感兴趣的级别创建一个新的绑定以外,其它操作跟之前的都是一样的。

q, err := ch.QueueDeclare(
	"",    // name
	false, // durable
	false, // delete when unused
	true,  // exclusive
	false, // no-wait
	nil,   // arguments
)
failOnError(err, "Failed to declare a queue")

if len(os.Args) < 2 {
	log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
	os.Exit(0)
}
for _, s := range os.Args[1:] {
		log.Printf("Binding queue %s to exchange %s with routing key %s",
		q.Name, "logs_direct", s)
	err = ch.QueueBind(
		q.Name,        // queue name
		s,             // routing key
		"logs_direct", // exchange
		false,
		nil)
	failOnError(err, "Failed to bind a queue")
}

6、整合

日志发送脚本emit_log_direct.go代码如下:

package main

import (
	"log"
	"os"
	"strings"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	err = ch.ExchangeDeclare(
		"logs_direct", // name
		"direct",      // type
		true,          // durable
		false,         // auto-deleted
		false,         // internal
		false,         // no-wait
		nil,           // arguments
	)
	failOnError(err, "Failed to declare an exchange")

	body := bodyFrom(os.Args)
	err = ch.Publish(
		"logs_direct",         // exchange
		severityFrom(os.Args), // routing key
		false, // mandatory
		false, // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Failed to publish a message")

	log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
	var s string
	if (len(args) < 3) || os.Args[2] == "" {
		s = "hello"
	} else {
		s = strings.Join(args[2:], " ")
	}
	return s
}

func severityFrom(args []string) string {
	var s string
	if (len(args) < 2) || os.Args[1] == "" {
		s = "info"
	} else {
		s = os.Args[1]
	}
	return s
}

日志接收receive_log_direct.go代码如下:

package main

import (
	"log"
	"os"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	err = ch.ExchangeDeclare(
		"logs_direct", // name
		"direct",      // type
		true,          // durable
		false,         // auto-deleted
		false,         // internal
		false,         // no-wait
		nil,           // arguments
	)
	failOnError(err, "Failed to declare an exchange")

	q, err := ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a queue")

	if len(os.Args) < 2 {
		log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
		os.Exit(0)
	}
	for _, s := range os.Args[1:] {
		log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s)
		err = ch.QueueBind(
			q.Name,        // queue name
			s,             // routing key
			"logs_direct", // exchange
			false,
			nil)
		failOnError(err, "Failed to bind a queue")
	}

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto ack
		false,  // exclusive
		false,  // no local
		false,  // no wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf(" [x] %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}

如果你只想将保存warningerror级别的日志到文件中,那么打开终端,执行下面的命令:

go run receive_logs_direct.go warning error > logs_from_rabbit.log

如果你想将所有的日志输出到屏幕上,那开启新的中,执行下面的命令:

go run receive_logs_direct.go info warning error
# => [*] Waiting for logs. To exit press CTRL+C

发送error日志:

go run emit_log_direct.go error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

接下来,在下一节中,将介绍如何基于模型来监听消息。


声明:本作品采用署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)进行许可,使用时请注明出处。
Author: mengbin92
Github: mengbin92
cnblogs: 恋水无意