nsq 快速入门经验分享
2016-11-24 10:36:21
nsq 是什么东西,这里就不长篇大论啦
我在 Mac 上尝试 nsq.io 中提供的 QUICK START 遇到问题,这里作简要说明
当执行到第 6 步时,nsq_to_file 报如下错误:
error connecting to nsqd - dial tcp: i/o timeout
同时当你访问 http://127.0.0.1:4171/
点击某些菜单的时候也会报错
对于第一次部署的我,这种问题犹如晴天霹雳,因为那可是官方提供的教程啊
还好这种问题经不起推敲,大意就是连不上 nsqd 呗,但是为什么会连不上呢?我在 nsqd 命令行参数里找到了答案
-broadcast-address="": address that will be registered with lookupd (defaults to the OS hostname)
在 Mac 上应该显式指定
nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1
我想正如上面说的那样,默认是操作系统主机名,这样 nsqd 虽然能绑定成功,但是客户端就是无法识别
关于 nsq 我还发现一个小小细节
nsqd 提供的 HTTP API /pub
publish a message to a topic(发布消息到话题)
但真正在实际操作的时候却是这样的,两种方式都可以?
curl -d "<message>" http://127.0.0.1:4151/put?topic=name
curl -d "<message>" http://127.0.0.1:4151/pub?topic=name
关于 nsq 入门我也仅有一天多的功力,分享不了太多有价值的东西,但是通过文档的阅读和实践,这里我作一下总结: 官方文档(其实就是上面的 QUICK START) 中文翻译(拜读部分翻译,表示感谢) go-nsq Golang客户端库(官方客户端开发库)
这里给出我用客户端开发库写的测试代码
package main
import (
"fmt"
"time"
"github.com/nsqio/go-nsq"
)
// ConsumerHandler 消费者处理者
type ConsumerHandler struct{}
// HandleMessage 处理消息
func (*ConsumerHandler) HandleMessage(msg *nsq.Message) error {
fmt.Println(string(msg.Body))
return nil
}
// Producer 生产者
func Producer() {
producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())
if err != nil {
fmt.Println("NewProducer", err)
panic(err)
}
i := 1
for {
if err := producer.Publish("test", []byte(fmt.Sprintf("Hello World %d", i))); err != nil {
fmt.Println("Publish", err)
panic(err)
}
time.Sleep(time.Second * 5)
i++
}
}
// ConsumerA 消费者
func ConsumerA() {
consumer, err := nsq.NewConsumer("test", "test-channel-a", nsq.NewConfig())
if err != nil {
fmt.Println("NewConsumer", err)
panic(err)
}
consumer.AddHandler(&ConsumerHandler{})
if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
fmt.Println("ConnectToNSQLookupd", err)
panic(err)
}
}
// ConsumerB 消费者
func ConsumerB() {
consumer, err := nsq.NewConsumer("test", "test-channel-b", nsq.NewConfig())
if err != nil {
fmt.Println("NewConsumer", err)
panic(err)
}
consumer.AddHandler(&ConsumerHandler{})
if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
fmt.Println("ConnectToNSQLookupd", err)
panic(err)
}
}
func main() {
ConsumerA()
ConsumerB()
Producer()
}
命令执行顺序如下
nsqlookupd
nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1
nsqadmin --lookupd-http-address=127.0.0.1:4161
测试程序执行打印如下
banjakukutekiiMac:test panshiqu$ ./main
2016/11/24 09:52:49 INF 1 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 09:52:49 ERR 1 [test/test-channel-a] error querying nsqlookupd (http://127.0.0.1:4161/lookup?topic=test) - got response 404 Not Found "{\"message\":\"TOPIC_NOT_FOUND\"}"
2016/11/24 09:52:49 INF 2 [test/test-channel-b] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 09:52:49 ERR 2 [test/test-channel-b] error querying nsqlookupd (http://127.0.0.1:4161/lookup?topic=test) - got response 404 Not Found "{\"message\":\"TOPIC_NOT_FOUND\"}"
2016/11/24 09:52:49 INF 3 (127.0.0.1:4150) connecting to nsqd
2016/11/24 09:53:57 INF 2 [test/test-channel-b] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 09:53:57 INF 2 [test/test-channel-b] (127.0.0.1:4150) connecting to nsqd
Hello World 1
Hello World 2
Hello World 3
Hello World 4
Hello World 5
Hello World 6
Hello World 7
Hello World 8
Hello World 9
Hello World 10
Hello World 11
Hello World 12
Hello World 13
Hello World 14
Hello World 15
2016/11/24 09:54:01 INF 1 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 09:54:01 INF 1 [test/test-channel-a] (127.0.0.1:4150) connecting to nsqd
Hello World 16
Hello World 16
Hello World 17
Hello World 17
Hello World 18
Hello World 18
...
对于输出我作如下理解,因为初次启动 nsq 相关程序,ConsumerA[test/test-channel-a] 查询 nsqlookupd 主题为 test,返回错误,主题不存在。ConsumerB[test/test-channel-b] 也执行上面的动作。这个时候应该不会创建两个 channel,test-channel-a 和 test-channel-b,也不会创建主题。接下来 Producer 成功连接 nsqd,这个时候会创建 test 主题。等待了一会后 ConsumerB 尝试查询主题成功,进而连接 nsqd,成功建立 test-channel-b,消费已被生产出的 15 条消息,因为 test-channel-a 还未被创建,所以目前已有的消息是不会被复制分发的。接着 ConsumerA 尝试查询主题成功,进而连接 nsqd,成功建立 test-channel-a,接下来的消息都是被复制分发的,两个消费者都能收到
两个 channel 都指定为 test-channel-a 将得到如下输出,可以确定的是多个消费者守在同一个 channel 中,同一条消息将只会被一个消费者处理
banjakukutekiiMac:test panshiqu$ go run main.go
2016/11/24 10:23:52 INF 1 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 10:23:52 INF 1 [test/test-channel-a] (127.0.0.1:4150) connecting to nsqd
2016/11/24 10:23:52 INF 2 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 10:23:52 INF 2 [test/test-channel-a] (127.0.0.1:4150) connecting to nsqd
2016/11/24 10:23:52 INF 3 (127.0.0.1:4150) connecting to nsqd
Hello World 1
Hello World 2
Hello World 3
提醒大家在执行上面流程的时候多去 http://127.0.0.1:4171/
查看运行状态,将会在那里发现很多内部细节