Skip to the content.

RabbitMQ SDK 支持发布、消费,连接恢复,死信队列,多种使用场景

2024-06-24 10:00:00


基于Example封装便于使用的SDK,支持发布、消费,连接恢复,死信队列,以及官方入门中的多种使用场景

参数解释(测试代码在下面)

启动RabbitMQ

docker run -d -p 15672:15672 -p 5672:5672 --hostname rabbitmq --name rabbitmq -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest rabbitmq:3-management

消费者

package main

import (
	"context"
	"flag"
	"log/slog"
	"os"
	"os/signal"
	"strings"
	"sync"
	"time"

	"github.com/panshiqu/golang/rabbitmq"
	amqp "github.com/rabbitmq/amqp091-go"
)

var addr = flag.String("addr", "amqp://guest:guest@localhost:5672/", "address")
var queueName = flag.String("queueName", "", "queue name")
var keys = flag.String("keys", "", "routing keys")

func onDelivery(delivery *amqp.Delivery) error {
	slog.Info("onDelivery", slog.String("body", string(delivery.Body)))
	// if string(delivery.Body) == "hi2" {
	// 	return errors.New("dead letter")
	// }
	time.Sleep(time.Second)
	slog.Info("onDelivery done")
	return nil
}

func main() {
	flag.Parse()

	client := rabbitmq.New(*queueName, *addr, strings.Split(*keys, ","))

	ctx, cancel := context.WithCancel(context.Background())

	wg := &sync.WaitGroup{}

	go client.ConsumeFunc(ctx, wg, onDelivery)

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	sig := <-c
	slog.Info("notify", slog.Any("signal", sig))

	cancel()

	wg.Wait()
}

生产者

package main

import (
	"flag"
	"fmt"
	"log/slog"
	"os"
	"os/signal"
	"strings"
	"sync"
	"time"

	"github.com/panshiqu/golang/rabbitmq"
)

var addr = flag.String("addr", "amqp://guest:guest@localhost:5672/", "address")
var queueName = flag.String("queueName", "", "queue name")
var keys = flag.String("keys", "", "routing keys")
var key = flag.String("key", "", "routing key")

func publish(client *rabbitmq.Client, wg *sync.WaitGroup) {
	wg.Add(1)
	defer wg.Done()

	for i := 1; ; i++ {
		time.Sleep(time.Second)
		data := fmt.Sprintf("hi%d", i)

		if err := client.Push(*key, []byte(data)); err != nil {
			slog.Error("push", slog.Any("err", err))
			return
		}

		slog.Info("push", slog.String("data", data))
	}
}

func main() {
	flag.Parse()

	client := rabbitmq.New(*queueName, *addr, strings.Split(*keys, ","))

	wg := &sync.WaitGroup{}

	go publish(client, wg)

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	sig := <-c
	slog.Info("notify", slog.Any("signal", sig))

	if err := client.Close(); err != nil {
		slog.Error("close", slog.Any("err", err))
	}

	wg.Wait()
}

多种使用场景

Hello World
go run consume/main.go -queueName one_queue -keys one_key
go run publish/main.go -queueName one_queue -keys one_key -key one_key

go run consume/main.go -queueName one_queue -keys one_key_new

注:后续启动修改keys,并不会解绑原路由键,而是纯粹绑定新路由键,如此以来发布key不管是one_key还是one_key_new都将路由到one_queue

# rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name     destination_kind     routing_key     arguments
amq.topic       exchange        one_queue            queue                one_key         []
amq.topic       exchange        one_queue            queue                one_key_new     []
Work Queues
go run consume/main.go -queueName two_queue -keys two_key
go run consume/main.go -queueName two_queue -keys two_key
go run publish/main.go -queueName two_queue -keys two_key -key two_key
Publish/Subscribe
go run consume/main.go -queueName three_queue_new -keys three_key
go run publish/main.go -queueName three_queue -keys three_key -key three_key
go run consume/main.go -queueName three_queue -keys three_key

注:上面的生产者仅会声明queueNamethree_queue的队列,并不会声明three_queue_new,所以声明three_queue_new的消费者必须在生产者发布消息前启动,但three_queue没有这个限制

Routing
go run consume/main.go -queueName four_queue -keys error
go run consume/main.go -queueName four_queue_new -keys error,info
go run publish/main.go -queueName four_queue -keys error -key error
go run publish/main.go -queueName four_queue -keys error -key info
Topics
go run consume/main.go -queueName five_queue -keys "*.error"
go run consume/main.go -queueName five_queue_new -keys "kern.#"
go run publish/main.go -queueName five_queue -keys "*.error" -key cron.error
go run publish/main.go -queueName five_queue -keys "*.error" -key kern.info
go run publish/main.go -queueName five_queue -keys "*.error" -key kern.error
死信队列

业务逻辑如下返回错误,重新投递仍失败后将进入死信队列,保证消息不丢失,还可反复消费来排查问题

	if string(delivery.Body) == "hi2" {
		return errors.New("dead letter")
	}
go run consume/main.go -queueName six_queue -keys six_key
go run publish/main.go -queueName six_queue -keys six_key -key six_key
go run consume/main.go -queueName six_queue_dead_letter

特殊情况说明

建议业务逻辑对消息的消费支持幂等

关于发布确认

目前并未支持一对一的确认,意思就是发布三条消息,某次的发布因未收到确认而阻塞,但是无从知晓是三条中的哪条消息?当前是可以换用PublishWithDeferredConfirm发布并记录本端递增生成的DeliveryTag,虽然相同通道的消息是加锁顺序发布的,但是想要和对端RabbitMQ递增生成的DeliveryTag对应起来,在依赖可靠网络传输的基础上,收到消息时应立即递增生成,如此以来对应关系才能得到保证,目前还没有看RabbitMQ实现源码来明确这点

heartbeat

两端协调心跳时间会取较小值,但想禁用心跳双端须同时置0,如此便于调试避免触发断线重连

amqp://guest:guest@localhost:5672/?heartbeat=1800

rabbitmqctl eval 'application:get_env(rabbit, heartbeat).'
rabbitmqctl eval 'application:set_env(rabbit, heartbeat, 1800).'

有用的命令

rabbitmqctl list_queues
rabbitmqctl list_queues name messages_ready messages_unacknowledged
rabbitmqctl list_exchanges
rabbitmqctl list_bindings