前言
从19年3月份开始接触GO,到现在20年3月,也有一年了,虽说是赶鸭子上架,到处都是坑,磕磕碰碰的也是坚持下来了,有辛苦有劳累, 但也有快乐的时候,要不我也不会百忙之中抽出时间来更新博客了
缘由
我在项目组中相当于一个中间件的的模块,将消息发给下游或者传递给上游。在由上游通过kafka或者grpc把消息给我, 然后我把消息丢到rabbitMQ中,下游的模块再从MQ中取出。当发生网络波动造成断连时,我这边就连不上MQ了,导致下游收不到消息。
实现目标
- 有个连接池
- 断线重连
- 缓存池
后续目标
- 池的弹性伸缩机制
- 或许换个想法重构?
- 单独提取出来封装
实现思想
-
3月4号
每次消息过来就起一个新go程去把消息发到MQ中,某一刻,突然和MQ断连,就立马进入到wait,后面不断进来的go程也是去获取连接, 获取失败就wait,一旦出现获取成功的,就发个通知,前面之前的wait全部都重新去获取连接发送消息。
-
3月5号
当有一个go程没有获取到连接,就立马将这个go程做5S一次的循环请求,而后面的请求会进入到一个设置了大小的缓存池中等待, 一旦超过阈值就会丢弃消息
问题
-
3月5号
在出现 context deadline exceeded 错误的时候就再也连不上了,不知道是怎么引起的,还在模拟复现。怀疑在池里边context被取消了, 打印了个日志,需要复现才能知道
答案:经过几次复现和在对源码的研究,发现,是在生成context的时候,我应该使用一个可以手动取消的context.WithCancel, 一旦报错了,就把这个连接给释放了,就不会在获取到了, 但是我看源码中,只要这个连接的context发来了done,就会从维护的map中把这个io删除,那就不管是不是手动取消的问题了, 看来还需要在定位,这次的定位不准确
-
3月7号
尝试使用cond,一旦only loop唤醒和等待同时进行,先唤醒了only loop然后马上接着wait,就造成死锁了,由于是高并发的场景, 我不能搞成队列,搞成队列的话就有点本末倒置的味道,所以我换成了chan close,关闭chan,通知所有的select,不用担心如果后面的等待了谁去唤醒, 每次连接成功都会区判断缓存池是否为空,不为空就通过关闭chan来唤醒
-
3月12号
发现only loop 连接通了,但是没有出发wake up,那就是通道是关闭了的,查看日志,only loop联通之后发出消息,但没有发出唤醒, 缓冲池中的go程没有被唤醒,一直卡住了
-
3月18号
发现WaitProducerMapCount的数量直接从195开始,然后only loop也没有,怀疑是没有发生重置功能, 在376行开始,加一个判断条件,如果WaitProducerMapCount不为0,就进入,强制清空, 今晚在开发环境多试试几次。
再开发环境也出现了,但是情况是直接无限loop,我怀疑是池的连接过期了,我直接关闭在打开试试了。
-
3月22号
在加入了reset功能之后,一旦有的线程正在获取连接,而线程池又正在reset就会导致程序崩溃。 我加了一个全局的bool,一旦在reset的时候bool会为false,5s后恢复为true,而每次在连接时会进行一个基于bool判断的循环, 当bool为true就推出while循环
-
3月30号
我进入了一个误区,在amqp中,池的策略有两种,一种是connection池,另一种是channel池。 在spring中,默认使用的是channel池,而我现在的是connection池,所以会不会就是这种影响? 不过我还怀疑我应该是缺少一个心跳机制导致的,我要看看源码有没有心跳机制。
经查看是有一个心跳线程,默认时间是60s,
每隔timeout / 2秒发送一个Heartbeat消息帧,这个值有时被称作heartbeat interval,如果连续丢失两个heartbeats消息帧,就认为tcp连接中断了。 不同的client表现不同,但是都会关闭tcp连接。当客户端通过heatbeat机制检测到RabbitMQ节点不可到达时,应该尝试重新连接。
-
3月31号
关于go中的mq,我研究了下,借鉴了spring中的CachingConnectionFactory,这个就是一个mq的连接池, 在mq中其实是两种方案,连接池和通道池,默认是通道池+一个通道,如果时高并发场景需要多设置几个通道。 然后在go中出现的问题就是因为使用的是连接池,我每次发送完数据就把channel关闭了,然后把连接丢回池里, 下次再获取就会导致通道关闭的情况发生,所以改造一下为通道池,理论上应该就是ok了,但还是需要在多测试测试
-
4月8号
改了几次,也放到测试环境去测试了,遇到了各种坑,还有莫名其妙的问题,也都慢慢改了,今天发一版新的, 不知道能不能抗的过去。。。 ps.刚刚后端的同事说,mq里挤压了太多消息,消费不掉要我做过滤,又是要多方面考虑的活,写完再来记录一下
索引
正文
1 连接池
连接池是上个同事留下的代码,那就把这个先研究一遍
- 接口:Pool
提供了三个方法
type Pool interface {
Get(ctx context.Context) (io.Closer, error)
Put(ctx context.Context, c io.Closer, forceClose bool) error
Close() error
}
主要我们就来研究一下这三个方法的实现
- Get
get方法,之前我怀疑说如果连接取消,那这个连接不就是没用了,会一直报错,后面看了一下,发现是多虑了。 一旦收到上下文的取消,
func (p *Slice) Get(ctx context.Context) (io.Closer, error) {
//锁定
p.mu.Lock()
if p.closed {
p.mu.Unlock()
return nil, ErrPoolClosed
}
//获取空闲时间,即你关闭了之后他还能活多久的时间
idleTimeout := time.Duration(p.conf.IdleTimeout)
// Prefer a free item, if possible.
//空闲的携程
numFree := len(p.freeItem)
for numFree > 0 {
i := p.freeItem[0]
copy(p.freeItem, p.freeItem[1:])
p.freeItem = p.freeItem[:numFree-1]
p.mu.Unlock()
//判断空闲线程是否已到期,到期就释放
if i.expired(idleTimeout) {
i.close()
p.mu.Lock()
p.release()
} else {
return i.c, nil
}
numFree = len(p.freeItem)
}
// Out of free items or we were asked not to use one. If we're not
// allowed to open any more items, make a request and wait.
//判断设置里边的活跃线程数是否大于零且当前活跃线程>=设置的活跃线程
//从数组里取已经存在的
if p.conf.Active > 0 && p.active >= p.conf.Active {
// check WaitTimeout and return directly
if p.conf.WaitTimeout == 0 && !p.conf.Wait {
p.mu.Unlock()
return nil, ErrPoolExhausted
}
// Make the item channel. It's buffered so that the
// itemOpener doesn't block while waiting for the req to be read.
req := make(chan item, 1)
//获取下一个req的key
reqKey := p.nextRequestKeyLocked()
p.itemRequests[reqKey] = req
wt := p.conf.WaitTimeout
p.mu.Unlock()
// reset context timeout
//重置超时时间
if wt > 0 {
var cancel func()
_, ctx, cancel = wt.Shrink(ctx)
defer cancel()
}
// Timeout the item request with the context.
select {
case <-ctx.Done()://当收到上下文结束时就从数组中把这个map的key删掉
// Remove the item request and ensure no value has been sent
// on it after removing.
p.mu.Lock()
//把这个io从map中剔除
delete(p.itemRequests, reqKey)
p.mu.Unlock()
return nil, ctx.Err()
case ret, ok := <-req://如果这个io收到消息,就返回这个
if !ok {
return nil, ErrPoolClosed
}
//如果此时的时间>创建时间+空闲等待时间
if ret.expired(idleTimeout) {
ret.close()
p.mu.Lock()
p.release()
} else {
return ret.c, nil
}
}
}
//新建
p.active++ // optimistically
p.mu.Unlock()
c, err := p.New(ctx)
if err != nil {
p.mu.Lock()
p.release()
p.mu.Unlock()
return nil, err
}
return c, nil
}
接下来看看put方法
func (p *Slice) Put(ctx context.Context, c io.Closer, forceClose bool) error {
p.mu.Lock()
defer p.mu.Unlock()
//true,直接关闭释放
if forceClose {
p.release()
return c.Close()
}
//
added := p.putItemLocked(c)
if !added {
p.active--
return c.Close()
}
return nil
}
func (p *Slice) putItemLocked(c io.Closer) bool {
if p.closed {
return false
}
//当前数组已满,不给放回
if p.conf.Active > 0 && p.active > p.conf.Active {
return false
}
i := item{
c: c,
createdAt: nowFunc(),
}
//这一块没看太明白,道行还不够
if l := len(p.itemRequests); l > 0 {
var req chan item
var reqKey uint64
for reqKey, req = range p.itemRequests {
break
}
delete(p.itemRequests, reqKey) // Remove from pending requests.
req <- i
return true
} else if !p.closed && p.maxIdleItemsLocked() > len(p.freeItem) {//已关闭且空闲数组数目小于设置的数目
p.freeItem = append(p.freeItem, &i)
return true
}
return false
}
还有最后一个close,关闭整个池
func (p *Slice) Close() error {
p.mu.Lock()
if p.closed {
p.mu.Unlock()
return nil
}
if p.cleanerCh != nil {
close(p.cleanerCh)
}
var err error
for _, i := range p.freeItem {
i.close()
}
p.freeItem = nil
p.closed = true
for _, req := range p.itemRequests {
close(req)
}
p.mu.Unlock()
//关闭后的回调函数,可以自己设定
p.stop()
return err
}
2 重连机制
这一块在网上抄了两种代码,分别进行了实验都无法满足现场的情况,于是乎改装后再实验,经过了7,8次实验,大体的模型就敲定了。 高并发的重连必须要考虑到等待和唤醒的问题,等待的也不能太多,要不会把资源吃满,也不能丢弃得太多,要不数据都丢失了,这就需要多次实验, 选定几个参数,1.重试间隔时间,2.缓存区得大小,重连时间太短,缓存区设置大了就造成浪费,很容易丢多的信息,重连时间太长, 如果缓存区设置得小了,就会造成消息丢失过多。
结构:ProducerToPoolField
参数名 | 参数类型 | 描述 |
---|---|---|
Key | string | 路由键 |
Exchange | string | 路由 |
ExchangeType | string | 类型 |
Uid | string | uuid |
Expiration | int | ttl时间,-1代表不设置,单位毫秒 |
Durable | bool | 是否持久化 |
Reliable | bool | 是否需要mq回调发送成功信息 |
Data | string | 发送的数据 |
结构:ProducerToPool
参数名 | 参数类型 | 描述 |
---|---|---|
isConnected | bool | 是否连通mq |
connection | *amqp.Connection | 连接 |
channel | *amqp.Channel | 通道 |
done | chan struct{} | 关闭通道 |
notifyClose | chan *amqp.Error | mq错误通道 |
notifyConfirm | chan amqp.Confirmation | 发送回调通道 |
field | *ProducerToPoolField | ProducerToPoolField |
onlyloopchan | chan struct{} | 唯一循环通道 |
isSend | chan struct{} | 可以发送数据得通知通道 |
5个很重要的全局参数
参数名 | 参数类型 | 描述 |
---|---|---|
WaitProducerMap | sync.Map | 线程安全的map(缓存池) |
WaitProducerMapCount | int32 | WaitProducerMap当前大小 |
WaitChan | chan *ProducerToPool | 每当出现一个断联就送到这里边 |
ExitWait | chan struct{} | 关闭通道 |
MAX_CACHE_PRODUCER | int | 缓存池大小阈值 |
这里边是新建一个ProducerToTool,同时打开一个监听go程,一旦连接失败,就不停的重试,如果出错就退出
func (d *Dao)NewProducerToTool(field *ProducerToPoolField) (*ProducerToPool,error) {
//count := WaitProducerMapLen()
//log.Info("uid is:%s,WaitProducerMapLen:%d",field.Uid,WaitProducerMapCount)
var p ProducerToPool
//打开ExitWait,如果它关闭了的话
if !exitWaitIsOpen(field.Uid) {
openExitWait(field.Uid)
}
p = ProducerToPool{
done: make(chan struct{}),
isConnected:false,
field:field,
onlyloopchan: make(chan struct{}),
isSend: make(chan struct{}),
}
go p.handleReconnect()
return &p,nil
}
//断联监听,一旦断联就不停的尝试重连
func (p *ProducerToPool)handleReconnect(){
count := 0
for{
p.isConnected = false
log.Info("uid is:%s,Attempting to connect",p.field.Uid)
for {
i := p.connect()
if i == 2 {
//WaitProducerMapCountLock.Lock()
//WaitProducerMapCount++
//WaitProducerMapCountLock.Unlock()
atomic.AddInt32(&WaitProducerMapCount, 1)
WaitChan <- p
select {
case <-ExitWait:
continue
case <-p.onlyloopchan:
continue
case <- p.done:
return
}
} else if i != 0 {
count++
log.Info("uid is:%s,reconnect:%d",p.field.Uid,count)
//time.Sleep(reconnectDelay)
continue
}
break
}
select {
case <-p.done:
return
case <-p.notifyClose:
return
}
}
}
//0:success 1:fail->reconnect 2:con fail
func (p *ProducerToPool)connect() (int){
ctx, cancel := context.WithCancel(context.Background())
io_conn, err := cleanrbmqpool.Get(ctx)
if err != nil {
log.Info("uid is:%s,get io to pool err:%v",p.field.Uid,err)
//如果是连接不通,退出重连循环,开始等待
cancel()
return 2
}
if conn, ok := io_conn.(*amqp.Connection);ok {
//判断拿到的链接是否已被关闭
if conn.IsClosed() {
//直接释放掉
cleanrbmqpool.Put(context.Background(),conn,true)
return 1
}
ch, err := conn.Channel()
if err != nil {
log.Info("uid is:%s,get channel err:%v",p.field.Uid,err)
return 1
}
err = ch.ExchangeDeclare(
p.field.Exchange, // name
p.field.ExchangeType, // type
p.field.Durable, // durable
false, // auto-deleted
false, // internal
false, // noWait
nil, // arguments
)
if err != nil {
log.Info("uid is:%s, channel exchange declare failed",p.field.Uid)
return 1
}
ch.Confirm(false)
p.changeConnection(conn, ch)
p.isConnected = true
p.isSend <- struct{}{}
//关闭ExitWait,如果它打开的话
//做一个通知
if exitWaitIsOpen(p.field.Uid) {
//唤醒之前先做一个判断,判断map长度是否非空
if !WaitProducerMapIsEmpty(p.field.Uid) {
log.Info("uid is:%s,wake up",p.field.Uid)
//notifyCond.Broadcast()
close(ExitWait)
//清空map
ClearWaitProducerMap()
//置0
WaitProducerMapCount = 0
}
}
return 0
} else {
log.Info("uid is:%s,io to amqp.Connection fail",p.field.Uid)
return 1
}
}
3 缓存机制
程序启动之后就会进入的一个监听go程,一旦有断连的就送来这里处理,是保存到缓存中还是丢弃都在这里判断
func(d *Dao)selectWaitProducerMap(){
key := ""
for {
select {
case p := <- WaitChan:
log.Info("uid is:%s,WaitProducerMapCount:%d",p.field.Uid, WaitProducerMapCount)
if WaitProducerMapCount == 1 {
log.Info("uid is:%s,only loop",p.field.Uid)
key = p.field.Uid
time.Sleep(5 * time.Second)
p.onlyloopchan <- struct{}{}
} else if WaitProducerMapCount > 1{
//判断是否那个唯一循环
if key == p.field.Uid {
log.Info("uid is:%s,only loop",p.field.Uid)
//WaitProducerMapCountLock.Lock()
//WaitProducerMapCount--
//WaitProducerMapCountLock.Unlock()
atomic.AddInt32(&WaitProducerMapCount, -1)
time.Sleep(5 * time.Second)
p.onlyloopchan <- struct{}{}
continue
}
//判断是否已经存在于map中
log.Info("uid is:%s,WaitProducerMapCount <= MAX_WAIT_PRODUCER",p.field.Uid)
//WaitProducerMapCountLock.Lock()
if WaitProducerMapCount <= MAX_CACHE_PRODUCER {
log.Info("uid is:%s,field add cache",p.field.Uid)
//CacheProducers = append(CacheProducers, p.field)
//进map
WaitProducerMap.Store(p.field.Uid,p)
} else {
log.Info("uid is:%s,cache filled",p.field.Uid)
close(p.done)
}
//WaitProducerMapCountLock.Unlock()
} else {
log.Info("uid is:%s,wait",p.field.Uid)
}
}
}
}