Skip to the content.

nsq 快速入门经验分享

2016-11-24 10:36:21


nsq 是什么东西,这里就不长篇大论啦

我在 Mac 上尝试 nsq.io 中提供的 QUICK START 遇到问题,这里作简要说明

当执行到第 6 步时,nsq_to_file 报如下错误: error connecting to nsqd - dial tcp: i/o timeout 同时当你访问 http://127.0.0.1:4171/ 点击某些菜单的时候也会报错

对于第一次部署的我,这种问题犹如晴天霹雳,因为那可是官方提供的教程啊

还好这种问题经不起推敲,大意就是连不上 nsqd 呗,但是为什么会连不上呢?我在 nsqd 命令行参数里找到了答案 -broadcast-address="": address that will be registered with lookupd (defaults to the OS hostname) 在 Mac 上应该显式指定 nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1 我想正如上面说的那样,默认是操作系统主机名,这样 nsqd 虽然能绑定成功,但是客户端就是无法识别

关于 nsq 我还发现一个小小细节 nsqd 提供的 HTTP API /pub publish a message to a topic(发布消息到话题) 但真正在实际操作的时候却是这样的,两种方式都可以?

curl -d "<message>" http://127.0.0.1:4151/put?topic=name
curl -d "<message>" http://127.0.0.1:4151/pub?topic=name

关于 nsq 入门我也仅有一天多的功力,分享不了太多有价值的东西,但是通过文档的阅读和实践,这里我作一下总结: 官方文档(其实就是上面的 QUICK START) 中文翻译(拜读部分翻译,表示感谢) go-nsq Golang客户端库(官方客户端开发库)

这里给出我用客户端开发库写的测试代码

package main

import (
	"fmt"
	"time"

	"github.com/nsqio/go-nsq"
)

// ConsumerHandler 消费者处理者
type ConsumerHandler struct{}

// HandleMessage 处理消息
func (*ConsumerHandler) HandleMessage(msg *nsq.Message) error {
	fmt.Println(string(msg.Body))
	return nil
}

// Producer 生产者
func Producer() {
	producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())
	if err != nil {
		fmt.Println("NewProducer", err)
		panic(err)
	}

	i := 1
	for {
		if err := producer.Publish("test", []byte(fmt.Sprintf("Hello World %d", i))); err != nil {
			fmt.Println("Publish", err)
			panic(err)
		}

		time.Sleep(time.Second * 5)

		i++
	}
}

// ConsumerA 消费者
func ConsumerA() {
	consumer, err := nsq.NewConsumer("test", "test-channel-a", nsq.NewConfig())
	if err != nil {
		fmt.Println("NewConsumer", err)
		panic(err)
	}

	consumer.AddHandler(&ConsumerHandler{})

	if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
		fmt.Println("ConnectToNSQLookupd", err)
		panic(err)
	}
}

// ConsumerB 消费者
func ConsumerB() {
	consumer, err := nsq.NewConsumer("test", "test-channel-b", nsq.NewConfig())
	if err != nil {
		fmt.Println("NewConsumer", err)
		panic(err)
	}

	consumer.AddHandler(&ConsumerHandler{})

	if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
		fmt.Println("ConnectToNSQLookupd", err)
		panic(err)
	}
}

func main() {
	ConsumerA()
	ConsumerB()
	Producer()
}

命令执行顺序如下

nsqlookupd
nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1
nsqadmin --lookupd-http-address=127.0.0.1:4161

测试程序执行打印如下

banjakukutekiiMac:test panshiqu$ ./main 
2016/11/24 09:52:49 INF    1 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 09:52:49 ERR    1 [test/test-channel-a] error querying nsqlookupd (http://127.0.0.1:4161/lookup?topic=test) - got response 404 Not Found "{\"message\":\"TOPIC_NOT_FOUND\"}"
2016/11/24 09:52:49 INF    2 [test/test-channel-b] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 09:52:49 ERR    2 [test/test-channel-b] error querying nsqlookupd (http://127.0.0.1:4161/lookup?topic=test) - got response 404 Not Found "{\"message\":\"TOPIC_NOT_FOUND\"}"
2016/11/24 09:52:49 INF    3 (127.0.0.1:4150) connecting to nsqd
2016/11/24 09:53:57 INF    2 [test/test-channel-b] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 09:53:57 INF    2 [test/test-channel-b] (127.0.0.1:4150) connecting to nsqd
Hello World 1
Hello World 2
Hello World 3
Hello World 4
Hello World 5
Hello World 6
Hello World 7
Hello World 8
Hello World 9
Hello World 10
Hello World 11
Hello World 12
Hello World 13
Hello World 14
Hello World 15
2016/11/24 09:54:01 INF    1 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 09:54:01 INF    1 [test/test-channel-a] (127.0.0.1:4150) connecting to nsqd
Hello World 16
Hello World 16
Hello World 17
Hello World 17
Hello World 18
Hello World 18
...

对于输出我作如下理解,因为初次启动 nsq 相关程序,ConsumerA[test/test-channel-a] 查询 nsqlookupd 主题为 test,返回错误,主题不存在。ConsumerB[test/test-channel-b] 也执行上面的动作。这个时候应该不会创建两个 channel,test-channel-a 和 test-channel-b,也不会创建主题。接下来 Producer 成功连接 nsqd,这个时候会创建 test 主题。等待了一会后 ConsumerB 尝试查询主题成功,进而连接 nsqd,成功建立 test-channel-b,消费已被生产出的 15 条消息,因为 test-channel-a 还未被创建,所以目前已有的消息是不会被复制分发的。接着 ConsumerA 尝试查询主题成功,进而连接 nsqd,成功建立 test-channel-a,接下来的消息都是被复制分发的,两个消费者都能收到

两个 channel 都指定为 test-channel-a 将得到如下输出,可以确定的是多个消费者守在同一个 channel 中,同一条消息将只会被一个消费者处理

banjakukutekiiMac:test panshiqu$ go run main.go
2016/11/24 10:23:52 INF    1 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 10:23:52 INF    1 [test/test-channel-a] (127.0.0.1:4150) connecting to nsqd
2016/11/24 10:23:52 INF    2 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 10:23:52 INF    2 [test/test-channel-a] (127.0.0.1:4150) connecting to nsqd
2016/11/24 10:23:52 INF    3 (127.0.0.1:4150) connecting to nsqd
Hello World 1
Hello World 2
Hello World 3

提醒大家在执行上面流程的时候多去 http://127.0.0.1:4171/ 查看运行状态,将会在那里发现很多内部细节