在 Golang 中消费来自 NSQ(消息队列)的数据
发布于 2021-11-18 17:16 ,所属分类:软件编程学习资料
关注微信公众号《云原生CTO》更多云原生干货等你来探索
专注于云原生技术
分享
提供优质云原生开发
视频技术培训
面试技巧
,及技术疑难问题解答
云原生CTO
在线课堂,资深Go
语言工程师专题好课即将上线,本次特邀Golang
社区活跃人员,资深Go
语言专家,以及一线大厂软件架构师。
手把手带领Go
小白从零基础开始玩转Golang
编程与实战,其中涉及Go
基础进阶、Go-web
开发、Go
中高级实战、Go
微服务、Go
区块链等互联网企业热门技术要点、打造一场动手实战Go
技能线上专题课,现在预约即送200
元优惠券,优惠券限量发售,送完即止,活动截止2021年12月25日
云原生技术分享不仅仅局限于Go
、Rust
、Python
、Istio
、containerd
、CoreDNS
、Envoy
、etcd
、Fluentd
、Harbor
、Helm
、Jaeger
、Kubernetes
、OpenPolicyAgent
、Prometheus
、Rook
、TiKV
、TUF
、Vitess
、Argo
、Buildpacks
、CloudEvents
、CNI
、Contour
、Cortex
、CRI-O
、Falco
、Flux
、gRPC
、KubeEdge
、Linkerd
、NATS
、Notary
、OpenTracing
、OperatorFramework
、SPIFFE
、SPIRE
和Thanos
等
在 Golang 中消费来自 NSQ(消息队列)的数据
先决条件
了解NSQ(消息队列)和Go Channels
NSQ(message queue) :https://nsq.io/
问题陈述
大规模系统中一个非常常见的问题是:您必须从一个服务中消费大量数据,进行一些处理并将处理后的数据推送到另一个服务
假设你正在处理一个后端微服务,它需要消耗来自NSQ
(消息队列)的大量数据。大小为500
万条信息。你必须做一些预处理,并将处理后的数据推入MongoDB/Redis
等。
方法# 1(效率低)
互联网上几乎所有与使用NSQ
消息相关的文章都使用这种方法
调用消费者的方法。 处理接收到的消息。 推动 MongoDB
。再次调用 consumer
方法。
packagemain
import(
nsq"github.com/nsqio/go-nsq"
)
funcAddMongoDB(data){
//yourcodeforpreprocessingandaddingnewvalueinmongoDB
}
varhandleMessage=func(msg*nsq.Message)error{
vardatainterface{}
_:=json.Unmarshal(msg.Body,&data)
AddMongoDB(data)
returnnil
}
funcInitNsqConsumer(){
nsqTopic:="dummy_topic"
nsqChannel:="dummy_channel"
consumer,err:=nsq.NewConsumer(nsqTopic,nsqChannel,nsq.NewConfig())
consumer.AddHandler(nsq.HandleFunc(handleMessage))
err=consumer.ConnectToNSQLookupd("127.0.0.1:4161")
iferr!=nil{
log.Print("errorconnectingtonsqlookupd")
}
}
有什么问题。
对于每一个 msg
,我们都在推进MongoDB
。即约500
万网络通话此外,你不会消费下一个消息,直到“确认”收到来自 Redis/MongoDB
,即它将阻塞我们的goroutine
此解决方案无法扩展:(
方法2(有效的方法)
使用channels
和go-routines
&批处理 步骤#1:当接收到消息时,将其推入channels
,而不是立即处理它
varhandleMessage=func(msg*nsq.Message)error{
vardatainterface{}
_:=json.Unmarshal(msg.Body,&data)
consumerChannel<-data
returnnil
}
步骤2:一个单独的go
例程,用于处理从channels
接收到的消息。
在这里,消息从hannels
接收,并批量推送到MongoDB
。还添加了一个定时运行的代码块。这样做是为了在bulkArray
的长度仍然小于阈值时推入数据。否则,小于1000
(阈值)的批将永远不会被处理,因此需要一个计数器定期检查/处理这个
funcconsumeMessage(){
interval:=time.NewTicker(time.Second*10)//tickerofevery10second
threshold:=1000
bulkArray=make([]interface,0)
for{
select{
case<-interval.C:
AddMongoBulk(bulkArray)
bulkArray=nil
casemsg:=<-consumerChannel:
bulkArray=append(bulkArray,msg)
iflen(bulkArray)>=threshold{//pushing1000messagestoMongoDBatatime
AddMongoBulk(bulkArray)
bulkArray=nil
}
}
}
}
完整的代码(有效的方法)
packagemain
import(
nsq"github.com/nsqio/go-nsq"
)
varconsumerChannelchaninterface
funcAddMongoBulk(data){
//yourcodeforpreprocessingandaddingnewvalueinmongoDBinbulk
}
varhandleMessage=func(msg*nsq.Message)error{
vardatainterface{}
_:=json.Unmarshal(msg.Body,&data)
consumerChannel<-data
returnnil
}
funcconsumeMessage(){
interval:=time.NewTicker(time.Second*10)//tickerofevery10second
threshold:=1000
bulkArray=make([]interface,0)
for{
select{
case<-interval.C:
AddMongoBulk(bulkArray)
bulkArray=nil
casemsg:=<-consumerChannel:
bulkArray=append(bulkArray,msg)
iflen(bulkArray)>=threshold{//pushing1000messagestoMongoDBatatime
AddMongoBulk(bulkArray)
bulkArray=nil
}
}
}
}
funcInitNsqConsumer(){
consumerChannel=make(chaninterface,1)//channelofbuffer1,ifbuffer=0,itwillbeablockingchannel
goconsumeMessage()
//BelowcodeissameasMethod#1(inefficient)solution
nsqTopic:="dummy_topic"
nsqChannel:="dummy_channel"
consumer,err:=nsq.NewConsumer(nsqTopic,nsqChannel,nsq.NewConfig())
consumer.AddHandler(nsq.HandleFunc(handleMessage))
err=consumer.ConnectToNSQLookupd("127.0.0.1:4161")
iferr!=nil{
log.Print("errorconnectingtonsqlookupd")
}
}
参考资料
参考[1]
参考资料
参考:https://medium.com/@ajs219/consume-data-from-nsq-message-queue-in-golang-98278a63c192
相关资源