在 Golang 中消费来自 NSQ(消息队列)的数据
发布于 2021-11-18 17:16 ,所属分类:软件编程学习资料
关注微信公众号《云原生CTO》更多云原生干货等你来探索
专注于云原生技术分享
提供优质云原生开发视频技术培训
面试技巧,及技术疑难问题解答
云原生CTO在线课堂,资深Go语言工程师专题好课即将上线,本次特邀Golang社区活跃人员,资深Go语言专家,以及一线大厂软件架构师。
手把手带领Go小白从零基础开始玩转Golang编程与实战,其中涉及Go基础进阶、Go-web开发、Go中高级实战、Go微服务、Go区块链等互联网企业热门技术要点、打造一场动手实战Go技能线上专题课,现在预约即送200元优惠券,优惠券限量发售,送完即止,活动截止2021年12月25日

云原生技术分享不仅仅局限于Go、Rust、Python、Istio、containerd、CoreDNS、Envoy、etcd、Fluentd、Harbor、Helm、Jaeger、Kubernetes、OpenPolicyAgent、Prometheus、Rook、TiKV、TUF、Vitess、Argo、Buildpacks、CloudEvents、CNI、Contour、Cortex、CRI-O、Falco、Flux、gRPC、KubeEdge、Linkerd、NATS、Notary、OpenTracing、OperatorFramework、SPIFFE、SPIRE和Thanos等


在 Golang 中消费来自 NSQ(消息队列)的数据

先决条件
了解NSQ(消息队列)和Go Channels
NSQ(message queue) :https://nsq.io/
问题陈述
大规模系统中一个非常常见的问题是:您必须从一个服务中消费大量数据,进行一些处理并将处理后的数据推送到另一个服务
假设你正在处理一个后端微服务,它需要消耗来自NSQ(消息队列)的大量数据。大小为500万条信息。你必须做一些预处理,并将处理后的数据推入MongoDB/Redis等。
方法# 1(效率低)
互联网上几乎所有与使用NSQ消息相关的文章都使用这种方法
调用消费者的方法。 处理接收到的消息。 推动 MongoDB。再次调用 consumer方法。
packagemain
import(
nsq"github.com/nsqio/go-nsq"
)
funcAddMongoDB(data){
//yourcodeforpreprocessingandaddingnewvalueinmongoDB
}
varhandleMessage=func(msg*nsq.Message)error{
vardatainterface{}
_:=json.Unmarshal(msg.Body,&data)
AddMongoDB(data)
returnnil
}
funcInitNsqConsumer(){
nsqTopic:="dummy_topic"
nsqChannel:="dummy_channel"
consumer,err:=nsq.NewConsumer(nsqTopic,nsqChannel,nsq.NewConfig())
consumer.AddHandler(nsq.HandleFunc(handleMessage))
err=consumer.ConnectToNSQLookupd("127.0.0.1:4161")
iferr!=nil{
log.Print("errorconnectingtonsqlookupd")
}
}
有什么问题。
对于每一个 msg,我们都在推进MongoDB。即约500万网络通话此外,你不会消费下一个消息,直到“确认”收到来自 Redis/MongoDB,即它将阻塞我们的goroutine
此解决方案无法扩展:(
方法2(有效的方法)
使用channels和go-routines&批处理 步骤#1:当接收到消息时,将其推入channels,而不是立即处理它
varhandleMessage=func(msg*nsq.Message)error{
vardatainterface{}
_:=json.Unmarshal(msg.Body,&data)
consumerChannel<-data
returnnil
}
步骤2:一个单独的go例程,用于处理从channels接收到的消息。
在这里,消息从hannels接收,并批量推送到MongoDB。还添加了一个定时运行的代码块。这样做是为了在bulkArray的长度仍然小于阈值时推入数据。否则,小于1000(阈值)的批将永远不会被处理,因此需要一个计数器定期检查/处理这个
funcconsumeMessage(){
interval:=time.NewTicker(time.Second*10)//tickerofevery10second
threshold:=1000
bulkArray=make([]interface,0)
for{
select{
case<-interval.C:
AddMongoBulk(bulkArray)
bulkArray=nil
casemsg:=<-consumerChannel:
bulkArray=append(bulkArray,msg)
iflen(bulkArray)>=threshold{//pushing1000messagestoMongoDBatatime
AddMongoBulk(bulkArray)
bulkArray=nil
}
}
}
}
完整的代码(有效的方法)
packagemain
import(
nsq"github.com/nsqio/go-nsq"
)
varconsumerChannelchaninterface
funcAddMongoBulk(data){
//yourcodeforpreprocessingandaddingnewvalueinmongoDBinbulk
}
varhandleMessage=func(msg*nsq.Message)error{
vardatainterface{}
_:=json.Unmarshal(msg.Body,&data)
consumerChannel<-data
returnnil
}
funcconsumeMessage(){
interval:=time.NewTicker(time.Second*10)//tickerofevery10second
threshold:=1000
bulkArray=make([]interface,0)
for{
select{
case<-interval.C:
AddMongoBulk(bulkArray)
bulkArray=nil
casemsg:=<-consumerChannel:
bulkArray=append(bulkArray,msg)
iflen(bulkArray)>=threshold{//pushing1000messagestoMongoDBatatime
AddMongoBulk(bulkArray)
bulkArray=nil
}
}
}
}
funcInitNsqConsumer(){
consumerChannel=make(chaninterface,1)//channelofbuffer1,ifbuffer=0,itwillbeablockingchannel
goconsumeMessage()
//BelowcodeissameasMethod#1(inefficient)solution
nsqTopic:="dummy_topic"
nsqChannel:="dummy_channel"
consumer,err:=nsq.NewConsumer(nsqTopic,nsqChannel,nsq.NewConfig())
consumer.AddHandler(nsq.HandleFunc(handleMessage))
err=consumer.ConnectToNSQLookupd("127.0.0.1:4161")
iferr!=nil{
log.Print("errorconnectingtonsqlookupd")
}
}
参考资料
参考[1]
参考资料
参考:https://medium.com/@ajs219/consume-data-from-nsq-message-queue-in-golang-98278a63c192


















![[视频教程] golang 入门到精通](https://static.kouhao8.com/sucaidashi/xkbb/e842684397e882f374ce618ef1e850dd.png?x-oss-process=image/format,webp/resize,w_88/crop,w_88,h_88,g_nw)





![[进阶教程] C4D在圆环表面跳跃的动画教程 Animated Spikes Cinema 4](https://static.kouhao8.com/sucaidashi/xkbb/70ccd2b65008c741722491f2a791dae9.jpg?x-oss-process=image/format,webp/resize,w_88/crop,w_88,h_88,g_nw)




相关资源