malefo`s blog 别想那么多,去做就对了

手造一个Go的Mq连接池

2020-03-04
malefo

Go
   

前言

从19年3月份开始接触GO,到现在20年3月,也有一年了,虽说是赶鸭子上架,到处都是坑,磕磕碰碰的也是坚持下来了,有辛苦有劳累, 但也有快乐的时候,要不我也不会百忙之中抽出时间来更新博客了

缘由

我在项目组中相当于一个中间件的的模块,将消息发给下游或者传递给上游。在由上游通过kafka或者grpc把消息给我, 然后我把消息丢到rabbitMQ中,下游的模块再从MQ中取出。当发生网络波动造成断连时,我这边就连不上MQ了,导致下游收不到消息。

实现目标

  • 有个连接池
  • 断线重连
  • 缓存池

后续目标

  • 池的弹性伸缩机制
  • 或许换个想法重构?
  • 单独提取出来封装

实现思想

  • 3月4号

每次消息过来就起一个新go程去把消息发到MQ中,某一刻,突然和MQ断连,就立马进入到wait,后面不断进来的go程也是去获取连接, 获取失败就wait,一旦出现获取成功的,就发个通知,前面之前的wait全部都重新去获取连接发送消息。

  • 3月5号

当有一个go程没有获取到连接,就立马将这个go程做5S一次的循环请求,而后面的请求会进入到一个设置了大小的缓存池中等待, 一旦超过阈值就会丢弃消息

问题

  • 3月5号

在出现 context deadline exceeded 错误的时候就再也连不上了,不知道是怎么引起的,还在模拟复现。怀疑在池里边context被取消了, 打印了个日志,需要复现才能知道

答案:经过几次复现和在对源码的研究,发现,是在生成context的时候,我应该使用一个可以手动取消的context.WithCancel, 一旦报错了,就把这个连接给释放了,就不会在获取到了, 但是我看源码中,只要这个连接的context发来了done,就会从维护的map中把这个io删除,那就不管是不是手动取消的问题了, 看来还需要在定位,这次的定位不准确

  • 3月7号

尝试使用cond,一旦only loop唤醒和等待同时进行,先唤醒了only loop然后马上接着wait,就造成死锁了,由于是高并发的场景, 我不能搞成队列,搞成队列的话就有点本末倒置的味道,所以我换成了chan close,关闭chan,通知所有的select,不用担心如果后面的等待了谁去唤醒, 每次连接成功都会区判断缓存池是否为空,不为空就通过关闭chan来唤醒

  • 3月12号

发现only loop 连接通了,但是没有出发wake up,那就是通道是关闭了的,查看日志,only loop联通之后发出消息,但没有发出唤醒, 缓冲池中的go程没有被唤醒,一直卡住了

  • 3月18号

发现WaitProducerMapCount的数量直接从195开始,然后only loop也没有,怀疑是没有发生重置功能, 在376行开始,加一个判断条件,如果WaitProducerMapCount不为0,就进入,强制清空, 今晚在开发环境多试试几次。

再开发环境也出现了,但是情况是直接无限loop,我怀疑是池的连接过期了,我直接关闭在打开试试了。

  • 3月22号

在加入了reset功能之后,一旦有的线程正在获取连接,而线程池又正在reset就会导致程序崩溃。 我加了一个全局的bool,一旦在reset的时候bool会为false,5s后恢复为true,而每次在连接时会进行一个基于bool判断的循环, 当bool为true就推出while循环

  • 3月30号

我进入了一个误区,在amqp中,池的策略有两种,一种是connection池,另一种是channel池。 在spring中,默认使用的是channel池,而我现在的是connection池,所以会不会就是这种影响? 不过我还怀疑我应该是缺少一个心跳机制导致的,我要看看源码有没有心跳机制。

经查看是有一个心跳线程,默认时间是60s,

每隔timeout / 2秒发送一个Heartbeat消息帧,这个值有时被称作heartbeat interval,如果连续丢失两个heartbeats消息帧,就认为tcp连接中断了。 不同的client表现不同,但是都会关闭tcp连接。当客户端通过heatbeat机制检测到RabbitMQ节点不可到达时,应该尝试重新连接。

  • 3月31号

关于go中的mq,我研究了下,借鉴了spring中的CachingConnectionFactory,这个就是一个mq的连接池, 在mq中其实是两种方案,连接池和通道池,默认是通道池+一个通道,如果时高并发场景需要多设置几个通道。 然后在go中出现的问题就是因为使用的是连接池,我每次发送完数据就把channel关闭了,然后把连接丢回池里, 下次再获取就会导致通道关闭的情况发生,所以改造一下为通道池,理论上应该就是ok了,但还是需要在多测试测试

  • 4月8号

改了几次,也放到测试环境去测试了,遇到了各种坑,还有莫名其妙的问题,也都慢慢改了,今天发一版新的, 不知道能不能抗的过去。。。 ps.刚刚后端的同事说,mq里挤压了太多消息,消费不掉要我做过滤,又是要多方面考虑的活,写完再来记录一下

索引

正文

1 连接池

连接池是上个同事留下的代码,那就把这个先研究一遍

  • 接口:Pool

提供了三个方法

type Pool interface {
	Get(ctx context.Context) (io.Closer, error)
	Put(ctx context.Context, c io.Closer, forceClose bool) error
	Close() error
}

主要我们就来研究一下这三个方法的实现

  • Get

get方法,之前我怀疑说如果连接取消,那这个连接不就是没用了,会一直报错,后面看了一下,发现是多虑了。 一旦收到上下文的取消,

func (p *Slice) Get(ctx context.Context) (io.Closer, error) {
	//锁定
	p.mu.Lock()

	if p.closed {
		p.mu.Unlock()
		return nil, ErrPoolClosed
	}
	//获取空闲时间,即你关闭了之后他还能活多久的时间
	idleTimeout := time.Duration(p.conf.IdleTimeout)
	// Prefer a free item, if possible.
	//空闲的携程
	numFree := len(p.freeItem)
	for numFree > 0 {
		i := p.freeItem[0]
		copy(p.freeItem, p.freeItem[1:])
		p.freeItem = p.freeItem[:numFree-1]
		p.mu.Unlock()
		//判断空闲线程是否已到期,到期就释放
		if i.expired(idleTimeout) {
			i.close()
			p.mu.Lock()
			p.release()
		} else {
			return i.c, nil
		}
		numFree = len(p.freeItem)
	}

	// Out of free items or we were asked not to use one. If we're not
	// allowed to open any more items, make a request and wait.
	//判断设置里边的活跃线程数是否大于零且当前活跃线程>=设置的活跃线程
	//从数组里取已经存在的
	if p.conf.Active > 0 && p.active >= p.conf.Active {
		// check WaitTimeout and return directly
		if p.conf.WaitTimeout == 0 && !p.conf.Wait {
			p.mu.Unlock()
			return nil, ErrPoolExhausted
		}
		// Make the item channel. It's buffered so that the
		// itemOpener doesn't block while waiting for the req to be read.
		req := make(chan item, 1)
		//获取下一个req的key
		reqKey := p.nextRequestKeyLocked()
		p.itemRequests[reqKey] = req
		wt := p.conf.WaitTimeout
		p.mu.Unlock()

		// reset context timeout
        //重置超时时间
		if wt > 0 {
			var cancel func()
			_, ctx, cancel = wt.Shrink(ctx)
			defer cancel()
		}
		// Timeout the item request with the context.
		select {
		case <-ctx.Done()://当收到上下文结束时就从数组中把这个map的key删掉
			// Remove the item request and ensure no value has been sent
			// on it after removing.
			p.mu.Lock()
            //把这个io从map中剔除
			delete(p.itemRequests, reqKey)
			p.mu.Unlock()
			return nil, ctx.Err()
		case ret, ok := <-req://如果这个io收到消息,就返回这个
			if !ok {
				return nil, ErrPoolClosed
			}
			//如果此时的时间>创建时间+空闲等待时间
			if ret.expired(idleTimeout) {
				ret.close()
				p.mu.Lock()
				p.release()
			} else {
				return ret.c, nil
			}
		}
	}

	//新建
	p.active++ // optimistically
	p.mu.Unlock()
	c, err := p.New(ctx)
	if err != nil {
		p.mu.Lock()
		p.release()
		p.mu.Unlock()
		return nil, err
	}
	return c, nil
}

接下来看看put方法

func (p *Slice) Put(ctx context.Context, c io.Closer, forceClose bool) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	//true,直接关闭释放
	if forceClose {
		p.release()
		return c.Close()
	}
	//
	added := p.putItemLocked(c)
	if !added {
		p.active--
		return c.Close()
	}
	return nil
}

func (p *Slice) putItemLocked(c io.Closer) bool {
	if p.closed {
		return false
	}
	//当前数组已满,不给放回
	if p.conf.Active > 0 && p.active > p.conf.Active {
		return false
	}
	i := item{
		c:         c,
		createdAt: nowFunc(),
	}
    //这一块没看太明白,道行还不够
	if l := len(p.itemRequests); l > 0 {
		var req chan item
		var reqKey uint64
		for reqKey, req = range p.itemRequests {
			break
		}
		delete(p.itemRequests, reqKey) // Remove from pending requests.
		req <- i
		return true
	} else if !p.closed && p.maxIdleItemsLocked() > len(p.freeItem) {//已关闭且空闲数组数目小于设置的数目
		p.freeItem = append(p.freeItem, &i)
		return true
	}
	return false
}

还有最后一个close,关闭整个池

func (p *Slice) Close() error {
	p.mu.Lock()
	if p.closed {
		p.mu.Unlock()
		return nil
	}
	if p.cleanerCh != nil {
		close(p.cleanerCh)
	}
	var err error
	for _, i := range p.freeItem {
		i.close()
	}
	p.freeItem = nil
	p.closed = true
	for _, req := range p.itemRequests {
		close(req)
	}
	p.mu.Unlock()
    //关闭后的回调函数,可以自己设定
	p.stop()
	return err
}
2 重连机制

这一块在网上抄了两种代码,分别进行了实验都无法满足现场的情况,于是乎改装后再实验,经过了7,8次实验,大体的模型就敲定了。 高并发的重连必须要考虑到等待和唤醒的问题,等待的也不能太多,要不会把资源吃满,也不能丢弃得太多,要不数据都丢失了,这就需要多次实验, 选定几个参数,1.重试间隔时间,2.缓存区得大小,重连时间太短,缓存区设置大了就造成浪费,很容易丢多的信息,重连时间太长, 如果缓存区设置得小了,就会造成消息丢失过多。

结构:ProducerToPoolField

参数名 参数类型 描述
Key string 路由键
Exchange string 路由
ExchangeType string 类型
Uid string uuid
Expiration int ttl时间,-1代表不设置,单位毫秒
Durable bool 是否持久化
Reliable bool 是否需要mq回调发送成功信息
Data string 发送的数据

结构:ProducerToPool

参数名 参数类型 描述
isConnected bool 是否连通mq
connection *amqp.Connection 连接
channel *amqp.Channel 通道
done chan struct{} 关闭通道
notifyClose chan *amqp.Error mq错误通道
notifyConfirm chan amqp.Confirmation 发送回调通道
field *ProducerToPoolField ProducerToPoolField
onlyloopchan chan struct{} 唯一循环通道
isSend chan struct{} 可以发送数据得通知通道

5个很重要的全局参数

参数名 参数类型 描述
WaitProducerMap sync.Map 线程安全的map(缓存池)
WaitProducerMapCount int32 WaitProducerMap当前大小
WaitChan chan *ProducerToPool 每当出现一个断联就送到这里边
ExitWait chan struct{} 关闭通道
MAX_CACHE_PRODUCER int 缓存池大小阈值

这里边是新建一个ProducerToTool,同时打开一个监听go程,一旦连接失败,就不停的重试,如果出错就退出

func (d *Dao)NewProducerToTool(field *ProducerToPoolField) (*ProducerToPool,error) {
	//count := WaitProducerMapLen()
	//log.Info("uid is:%s,WaitProducerMapLen:%d",field.Uid,WaitProducerMapCount)

	var p ProducerToPool

	//打开ExitWait,如果它关闭了的话
	if !exitWaitIsOpen(field.Uid) {
		openExitWait(field.Uid)
	}

	p = ProducerToPool{
		done:   make(chan struct{}),
		isConnected:false,
		field:field,
		onlyloopchan: make(chan struct{}),
		isSend: make(chan struct{}),
	}
	go p.handleReconnect()
	return &p,nil
}

//断联监听,一旦断联就不停的尝试重连
func (p *ProducerToPool)handleReconnect(){
	count := 0
	for{
		p.isConnected = false
		log.Info("uid is:%s,Attempting to connect",p.field.Uid)

		for  {
			i := p.connect()
			if i == 2 {

				//WaitProducerMapCountLock.Lock()
				//WaitProducerMapCount++
				//WaitProducerMapCountLock.Unlock()

				atomic.AddInt32(&WaitProducerMapCount, 1)

				WaitChan <- p

				select {
				case <-ExitWait:
					continue
				case <-p.onlyloopchan:
					continue
				case <- p.done:
					return
				}
			} else if i != 0 {
				count++
				log.Info("uid is:%s,reconnect:%d",p.field.Uid,count)
				//time.Sleep(reconnectDelay)
				continue
			}
			break
		}
		select {
		case <-p.done:
			return
		case <-p.notifyClose:
			return
		}
	}
}

//0:success 1:fail->reconnect 2:con fail
func (p *ProducerToPool)connect() (int){
	ctx, cancel := context.WithCancel(context.Background())
	io_conn, err := cleanrbmqpool.Get(ctx)
	if err != nil {
		log.Info("uid is:%s,get io to pool err:%v",p.field.Uid,err)
		//如果是连接不通,退出重连循环,开始等待
		cancel()
		return 2
	}

	if conn, ok := io_conn.(*amqp.Connection);ok {

		//判断拿到的链接是否已被关闭
		if conn.IsClosed() {
			//直接释放掉
			cleanrbmqpool.Put(context.Background(),conn,true)
			return 1
		}

		ch, err := conn.Channel()

		if err != nil {
			log.Info("uid is:%s,get channel err:%v",p.field.Uid,err)
			return 1
		}

		err = ch.ExchangeDeclare(
			p.field.Exchange,     // name
			p.field.ExchangeType, // type
			p.field.Durable,      // durable
			false, // auto-deleted
			false, // internal
			false, // noWait
			nil,   // arguments
		)

		if err != nil {
			log.Info("uid is:%s, channel exchange declare failed",p.field.Uid)
			return 1
		}
		ch.Confirm(false)
		p.changeConnection(conn, ch)
		p.isConnected = true
		p.isSend <- struct{}{}
		//关闭ExitWait,如果它打开的话
		//做一个通知
		if exitWaitIsOpen(p.field.Uid) {
			//唤醒之前先做一个判断,判断map长度是否非空
			if !WaitProducerMapIsEmpty(p.field.Uid) {
				log.Info("uid is:%s,wake up",p.field.Uid)

				//notifyCond.Broadcast()

				close(ExitWait)

				//清空map
				ClearWaitProducerMap()
				//置0
				WaitProducerMapCount = 0

			}


		}

		return 0
	} else {
		log.Info("uid is:%s,io to amqp.Connection fail",p.field.Uid)
		return 1
	}
}

3 缓存机制

程序启动之后就会进入的一个监听go程,一旦有断连的就送来这里处理,是保存到缓存中还是丢弃都在这里判断

func(d *Dao)selectWaitProducerMap(){
	key := ""
	for {
		select {
		case p := <- WaitChan:
			log.Info("uid is:%s,WaitProducerMapCount:%d",p.field.Uid, WaitProducerMapCount)

			if WaitProducerMapCount == 1 {
				log.Info("uid is:%s,only loop",p.field.Uid)
				key = p.field.Uid
				time.Sleep(5 * time.Second)
				p.onlyloopchan <- struct{}{}
			} else if WaitProducerMapCount > 1{
				//判断是否那个唯一循环
				if key == p.field.Uid {
					log.Info("uid is:%s,only loop",p.field.Uid)

					//WaitProducerMapCountLock.Lock()
					//WaitProducerMapCount--
					//WaitProducerMapCountLock.Unlock()
					atomic.AddInt32(&WaitProducerMapCount, -1)

					time.Sleep(5 * time.Second)
					p.onlyloopchan <- struct{}{}
					continue
				}
				//判断是否已经存在于map中


				log.Info("uid is:%s,WaitProducerMapCount <= MAX_WAIT_PRODUCER",p.field.Uid)
				//WaitProducerMapCountLock.Lock()
				if WaitProducerMapCount <= MAX_CACHE_PRODUCER {
					log.Info("uid is:%s,field add cache",p.field.Uid)
					//CacheProducers = append(CacheProducers, p.field)

					//进map
					WaitProducerMap.Store(p.field.Uid,p)

				} else {
					log.Info("uid is:%s,cache filled",p.field.Uid)
					close(p.done)
				}
				//WaitProducerMapCountLock.Unlock()
			} else {
				log.Info("uid is:%s,wait",p.field.Uid)
			}
		}
	}
}


Similar Posts

下一篇 rust学习笔记