在 Golang 中消费来自 NSQ(消息队列)的数据

发布于 2021-11-18 17:16 ,所属分类:软件编程学习资料

CTO


CTO在线课堂,资Go师专题好课即将上线,本次Golang社区活跃人员,Go,以及一线大厂软件架构师。


手把手带领Go小白从零基础开始玩转Golang编程与实战,其中涉及Go基础进阶、Go-web开发、Go中高级实战、Go微服务、Go区块链等互联网企业热门技术要点、造一场动手实战Go线上专题课,现在预约即送200元优惠券,优惠券限量发售,送完即止,活动截止2021年12月25日


GoRustPythonIstiocontainerdCoreDNSEnvoyetcdFluentdHarborHelmJaegerKubernetesOpenPolicyAgentPrometheusRookTiKVTUFVitessArgoBuildpacksCloudEventsCNIContourCortexCRI-OFalcoFluxgRPCKubeEdgeLinkerdNATSNotaryOpenTracingOperatorFrameworkSPIFFESPIREThanos





在 Golang 中消费来自 NSQ(消息队列)的数据

先决条件

了解NSQ(消息队列)和Go Channels

NSQ(message queue) :https://nsq.io/

问题陈述

大规模系统中一个非常常见的问题是:您必须从一个服务中消费大量数据,进行一些处理并将处理后的数据推送到另一个服务

假设你正在处理一个后端微服务,它需要消耗来自NSQ(消息队列)的大量数据。大小为500万条信息。你必须做一些预处理,并将处理后的数据推入MongoDB/Redis等。

方法# 1(效率低)

互联网上几乎所有与使用NSQ消息相关的文章都使用这种方法

  • 调用消费者的方法。
  • 处理接收到的消息。
  • 推动MongoDB
  • 再次调用consumer方法。
packagemain
import(
nsq"github.com/nsqio/go-nsq"
)

funcAddMongoDB(data){
//yourcodeforpreprocessingandaddingnewvalueinmongoDB
}

varhandleMessage=func(msg*nsq.Message)error{
vardatainterface{}
_:=json.Unmarshal(msg.Body,&data)

AddMongoDB(data)
returnnil
}

funcInitNsqConsumer(){
nsqTopic:="dummy_topic"
nsqChannel:="dummy_channel"

consumer,err:=nsq.NewConsumer(nsqTopic,nsqChannel,nsq.NewConfig())
consumer.AddHandler(nsq.HandleFunc(handleMessage))

err=consumer.ConnectToNSQLookupd("127.0.0.1:4161")
iferr!=nil{
log.Print("errorconnectingtonsqlookupd")
}
}
有什么问题。

    1. 对于每一个msg,我们都在推进MongoDB。即约500万网络通话

    1. 此外,你不会消费下一个消息,直到“确认”收到来自Redis/MongoDB,即它将阻塞我们的goroutine

此解决方案无法扩展:(

方法2(有效的方法)

使用channelsgo-routines&批处理 步骤#1:当接收到消息时,将其推入channels,而不是立即处理它

varhandleMessage=func(msg*nsq.Message)error{
vardatainterface{}
_:=json.Unmarshal(msg.Body,&data)

consumerChannel<-data
returnnil
}

步骤2:一个单独的go例程,用于处理从channels接收到的消息。

在这里,消息从hannels接收,并批量推送到MongoDB。还添加了一个定时运行的代码块。这样做是为了在bulkArray的长度仍然小于阈值时推入数据。否则,小于1000(阈值)的批将永远不会被处理,因此需要一个计数器定期检查/处理这个

funcconsumeMessage(){
interval:=time.NewTicker(time.Second*10)//tickerofevery10second
threshold:=1000
bulkArray=make([]interface,0)

for{
select{
case<-interval.C:
AddMongoBulk(bulkArray)
bulkArray=nil

casemsg:=<-consumerChannel:
bulkArray=append(bulkArray,msg)
iflen(bulkArray)>=threshold{//pushing1000messagestoMongoDBatatime
AddMongoBulk(bulkArray)
bulkArray=nil
}
}
}
}

完整的代码(有效的方法)

packagemain
import(
nsq"github.com/nsqio/go-nsq"
)

varconsumerChannelchaninterface

funcAddMongoBulk(data){
//yourcodeforpreprocessingandaddingnewvalueinmongoDBinbulk
}

varhandleMessage=func(msg*nsq.Message)error{
vardatainterface{}
_:=json.Unmarshal(msg.Body,&data)

consumerChannel<-data
returnnil
}

funcconsumeMessage(){
interval:=time.NewTicker(time.Second*10)//tickerofevery10second
threshold:=1000
bulkArray=make([]interface,0)

for{
select{
case<-interval.C:
AddMongoBulk(bulkArray)
bulkArray=nil

casemsg:=<-consumerChannel:
bulkArray=append(bulkArray,msg)
iflen(bulkArray)>=threshold{//pushing1000messagestoMongoDBatatime
AddMongoBulk(bulkArray)
bulkArray=nil
}
}
}
}

funcInitNsqConsumer(){
consumerChannel=make(chaninterface,1)//channelofbuffer1,ifbuffer=0,itwillbeablockingchannel
goconsumeMessage()

//BelowcodeissameasMethod#1(inefficient)solution
nsqTopic:="dummy_topic"
nsqChannel:="dummy_channel"
consumer,err:=nsq.NewConsumer(nsqTopic,nsqChannel,nsq.NewConfig())
consumer.AddHandler(nsq.HandleFunc(handleMessage))
err=consumer.ConnectToNSQLookupd("127.0.0.1:4161")
iferr!=nil{
log.Print("errorconnectingtonsqlookupd")
}
}


[1]

[1]

:https://medium.com/@ajs219/consume-data-from-nsq-message-queue-in-golang-98278a63c192



相关资源