网站首页 > 文章中心 > 其它

go语言实现消息总线

作者:小编 更新时间:2023-08-30 01:13:36 浏览量:164人看过

golang使用Nsq

① 介绍

最近在研究一些消息中间件,常用的MQ如RabbitMQ,ActiveMQ,Kafka等.NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,由bitly公司开源出来的一款简单易用的消息中间件.

①1 Features

①.). Distributed

NSQ提供了分布式的,去中心化,且没有单点故障的拓扑结构,稳定的消息传输发布保障,能够具有高容错和HA(高可用)特性.

go语言实现消息总线-图1

NSQ支持水平扩展,没有中心化的brokers.内置的发现服务简化了在集群中增加节点.同时支持pub-sub和load-balanced 的消息分发.

NSQ非常容易配置和部署,生来就绑定了一个管理界面.二进制包没有运行时依赖.官方有Docker image.

官方的 Go 和 Python库都有提供.而且为大多数语言提供了库.

NSQ推荐通过他们相应的nsqd实例使用协同定位发布者,这意味着即使面对网络分区,消息也会被保存在本地,直到它们被一个消费者读取.更重要的是,发布者不必去发现其他的nsqd节点,他们总是可以向本地实例发布消息.

NSQ

首先,一个发布者向它的本地nsqd发送消息,要做到这点,首先要先打开一个连接,然后发送一个包含topic和消息主体的发布命令,在这种情况下,我们将消息发布到事件topic上以分散到我们不同的worker中.

go语言实现消息总线-图2

nsqd

每个channel的消息都会进行排队,直到一个worker把他们消费,如果此队列超出了内存限制,消息将会被写入到磁盘中.Nsqd节点首先会向nsqlookup广播他们的位置信息,一旦它们注册成功,worker将会从nsqlookup服务器节点上发现所有包含事件topic的nsqd节点.

nsqlookupd

①.)客户表示已经准备好接收消息

这确保了消息丢失唯一可能的情况是不正常结束 nsqd 进程.在这种情况下,这是在内存中的任何信息(或任何缓冲未刷新到磁盘)都将丢失.

如何防止消息丢失是最重要的,即使是这个意外情况可以得到缓解.一种解决方案是构成冗余 nsqd对(在不同的主机上)接收消息的相同部分的副本.因为你实现的消费者是幂等的,以两倍时间处理这些消息不会对下游造成影响,并使得系统能够承受任何单一节点故障而不会丢失信息.

单个 nsqd 实例被设计成可以同时处理多个数据流.流被称为"话题"和话题有 1 个或多个"通道".每个通道都接收到一个话题中所有消息的拷贝.在实践中,一个通道映射到下行服务消费一个话题.

efficiency

因为NSQ没有在守护程序之间共享信息,所以它从一开始就是为了分布式操作而生.个别的机器可以随便宕机随便启动而不会影响到系统的其余部分,消息发布者可以在本地发布,即使面对网络分区.

这种"分布式优先"的设计理念意味着NSQ基本上可以永远不断地扩展,需要更高的吞吐量?那就添加更多的nsqd吧.唯一的共享状态就是保存在lookup节点上,甚至它们不需要全局视图,配置某些nsqd注册到某些lookup节点上这是很简单的配置,唯一关键的地方就是消费者可以通过lookup节点获取所有完整的节点集.清晰的故障事件——NSQ在组件内建立了一套明确关于可能导致故障的的故障权衡机制,这对消息传递和恢复都有意义.虽然它们可能不像Kafka系统那样提供严格的保证级别,但NSQ简单的操作使故障情况非常明显.

不像其他的队列组件,NSQ并没有提供任何形式的复制和集群,也正是这点让它能够如此简单地运行,但它确实对于一些高保证性高可靠性的消息发布没有足够的保证.我们可以通过降低文件同步的时间来部分避免,只需通过一个标志配置,通过EBS支持我们的队列.但是这样仍然存在一个消息被发布后马上死亡,丢失了有效的写入的情况.

虽然Kafka由一个有序的日志构成,但NSQ不是.消息可以在任何时间以任何顺序进入队列.在我们使用的案例中,这通常没有关系,因为所有的数据都被加上了时间戳,但它并不适合需要严格顺序的情况.

NSQ对于超时系统,它使用了心跳检测机制去测试消费者是否存活还是死亡.很多原因会导致我们的consumer无法完成心跳检测,所以在consumer中必须有一个单独的步骤确保幂等性.

本文将nsq集群具体的安装过程略去,大家可以自行参考官网,比较简单.这部分介绍下笔者实验的拓扑,以及nsqadmin的go语言实现消息总线相关咨询.

topology

NSQ基本没有配置文件,配置通过命令行指定参数.

主要命令如下:

LOOKUPD命令

NSQD命令

工具类,消费后存储到本地文件.

发布一条消息

对Streams的详细信息进行查看,包括NSQD节点,具体的channel,队列中的消息数,连接数等信息.

nsqadmin

channel

列出所有的NSQD节点:

nodes

消息的统计:

msgs

lookup主机的列表:

hosts

NSQ基本核心就是简单性,是一个简单的队列,这意味着它很容易进行故障推理和很容易发现bug.消费者可以自行处理故障事件而不会影响系统剩下的其余部分.

事实上,简单性是我们决定使用NSQ的首要因素,这方便与我们的许多其他软件一起维护,通过引入队列使我们得到了堪称完美的表现,通过队列甚至让我们增加了几个数量级的吞吐量.越来越多的consumer需要一套严格可靠性和顺序性保障,这已经超过了NSQ提供的简单功能.

结合我们的业务系统来看,对于我们所需要传输的发票消息,相对比较敏感,无法容忍某个nsqd宕机,或者磁盘无法使用的情况,该节点堆积的消息无法找回.这是我们没有选择该消息中间件的主要原因.简单性和可靠性似乎并不能完全满足.相比Kafka,ops肩负起更多负责的运营.另一方面,它拥有一个可复制的、有序的日志可以提供给我们更好的服务.但对于其他适合NSQ的consumer,它为我们服务的相当好,我们期待着继续巩固它的坚实的基础.

go语言能做什么?

很多朋友可能知道Go语言的优势在哪,却不知道Go语言适合用于哪些地方.

①.0、 Tsuru:开源的PAAS平台,和SAE实现的功能一模一样.

以上的就是关于go语言能做什么的内容介绍了.

go语言聊天室实现(七)websocket收消息设置

上一节中,我们为每个连接都创建了一个goroutine来读取其中的消息,现在我们将这个读取消息的方法实现一下.

我们在application目录下新建controllers目录,并在其中创建一个MessageController.go文件.

首先我们新建一个MessageController的结构体,内容如下

这个结构体包括两个内容,一个是我们将连接放在数组之后,返回的索引,另一个是连接本身.

这个是具体的方法.

我们首先设置了一下读消息的大小、超时时间以及超时后需要的操作.

超时时间如果设置为0,那么就是永不超时.之前今天这一节直接写0,被告知需要传一个time.Time类型的数据.最终谷歌后才得到了这个值time.Time{}为"0001-01-01 00:00:00 +0000 UTC".

我们将用户手法消息的内容定义为一个结构体,然后将用户的订阅信息的json通过json.unmarshal转换成这个结构体.

之后的switch操作与我们在Swoole中的操作基本雷同,在查询到login之后,调用service中 的login方法来进行注册.

下一节中我们再介绍具体的注册逻辑.

基于go的websocket消息推送的集群实现

目前websocket技术已经很成熟,选型Go语言,当然是为了节省成本以及它强大的高并发性能.我使用的是第三方开源的websocket库即gorilla/websocket.

由于我们线上推送的量不小,推送后端需要部署多节点保持高可用,所以需要自己做集群,具体架构方案如图:

Auth Service:鉴权服务,根据Token验证用户权限.

Collect Service:消息采集服务,负责收集业务系统消息,存入MongoDB后,发送给消息分发服务.

Dispatch Service:消息分发服务,根据路由规则分发至对应消息推送服务节点上.

Push Service:消息推送服务,通过websocket将消息推送给用户.

集群推送的关键点在于,web端与服务端建立长连接之后,具体跟哪个推送节点保持长连接的,如果我们能够找到对应的连接节点,那么我们就可以将消息推送出去.下面讲解一下集群的大致流程:

① web端用户登录之后,带上token与后端推送服务(Push Service)保持长连接.

其他注意事项:

Go - Micro微服务框架实践 - API(十三)

Micro的api就是api网关

API参考了 API网关模式 为服务提供了一个单一的公共入口.基于服务发现,使得micro api可以提供具备http及动态路由的服务.

Micro的API基于HTTP协议.请求的API接口通过HTTP协议访问,并且路由是基于服务发现机制向下转发的. Micro API在 go-micro 之上开发,所以它集成了服务发现、负载均衡、编码及基于RPC的通信.

go语言实现消息总线-图3

因为micro api内部使用了go-micro,所以它自身也是可插拔的. 参考 go-plugins 了解对gRPC、kubernetes、etcd、nats、及rabbitmq等支持.另外,api也使用了 go-api ,这样,接口handler也是可以配置的.

ACME( Automatic Certificate Management Environment)是由 Let's Encrypt 制定的安全协议.

可以选择是否配置白名单

API服务支持TLS证书

API使用带分隔符的命名空间来在逻辑上区分后台服务及公开的服务.命名空间及http请求路径会用于解析服务名与方法,比如 GET /foo HTTP/1.1 会被路由到 go.micro.api.foo 服务上.

API默认的命名空间是 go.micro.api ,当然,也可以修改:

完整示例可以参考: examples/greeter

先决条件:我们使用Consul作为默认的服务发现,所以请先确定它已经安装好了,并且已经运行,比如执行 consul agent -dev 这样子方式运行.

向micro api发起http请求

HTTP请求的路径 /greeter/say/hello 会被路由到服务 go.micro.api.greeter 的方法 Say.Hello 上.

绕开api服务并且直接通过rpc调用:

使用JSON的方式执行同一请求:

micro api提供下面类型的http api接口

请看下面的例子

Handler负责持有并管理HTTP请求路由.

默认的handler使用从注册中心获取的端口元数据来决定指向服务的路由,如果路由不匹配,就会回退到使用"rpc" hander.在注册时,可以通过 go-api 来配置路由.

API有如下方法可以配置请求handler:

通过 /rpc 入口可以绕开handler处理器.

API处理器接收任何的HTTP请求,并且向前转发指定格式的RPC请求.

RPC处理器接收json或protobuf格式的HTTP POST请求,然后向前转成RPC请求.

代理Handler其实是内置在服务发现中的反向代理服务.

事件处理器使用go-micro的broker代理接收http请求并把请求作为消息传到消息总线上.

Web处理器是,它是内置在服务发现中的HTTP反向代理服务,支持web socket.

/rpc 端点允许绕过主handler,然后与任何服务直接会话.

示例:

更多信息查看可运行的示例: github.com/micro/examples/api

解析器,Micro使用命名空间与HTTP请求路径来动态路由到具体的服务.

API命名的空间是 go.micro.api .可以通过指令 --namespace 或者环境变量 MICRO_NAMESPACE= 设置命名空间.

下面说一下解析器是如何使用的:

RPC解析器示例中的RPC服务有名称与方法,分别是 go.micro.api.greeter , Greeter.Hello .

URL会被解析成以下几部分:

带版本号的API URL也可以很容易定位到具体的服务:

代理解析器只处理服务名,所以处理方案和RPC解析器有点不太一样.

go语言--Goroutines

①.、goroutine:在go语言中,每一个并发的执行单元叫做goroutine,如果一个程序中包含多个goroutine,对两个函数的调用则可能发生在同一时刻

版权声明:倡导尊重与保护知识产权。未经许可,任何人不得复制、转载、或以其他方式使用本站《原创》内容,违者将追究其法律责任。本站文章内容,部分图片来源于网络,如有侵权,请联系我们修改或者删除处理。

编辑推荐

热门文章