昨天在内网上看到一篇讲数据库连接的文章,列出了一些 sql 包的一些源码,我注意到其中取用、归还连接的方式非常有意思——通过临时创建的 channel 来传递连接。

在 sql.DB 结构体里,使用 freeConn 字段来表示当前所有的连接,也就是一个连接池。

type DB struct {
    freeConn     []*driverConn
}

当需要拿连接的时候,从 freeConn 中取出第一个元素:

conn := db.freeConn[0]
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true

取 slice 切片的第一个元素,然后将 slice 后面的元素往前挪,最后通过截断来“释放”最后一个元素。

当然,能进行上述操作的前提是切片 db.freeConn 长度大于 0,即有空闲连接存在。如果当前没有空闲连接,那如何处理呢?接下来就是 channel 的妙用的地方。

sql.DB 结构体里还有另一个字段 connRequests,它用来存储当前有哪些“协程”在申请连接:

type DB struct {
    freeConn     []*driverConn
    connRequests map[uint64]chan connRequest
}

connRequests 的 key 是一个 uint64类型,其实就是一个递增加 1 的 key;而 connRequest 表示申请一个新连接的请求:

type connRequest struct {
	conn *driverConn
	err  error
}

这里的 conn 正是需要的连接。

当连接池中没有空闲连接的时候:

req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req

先是构建了一个 chan connRequest,同时拿到了一个 reqKey,将它和 req 绑定到 connRequests 中。

接下来,在 select 中等待超时或者从 req 这个 channel 中拿到空闲连接:

select {
	case <-ctx.Done():
		
	case ret, ok := <-req:
		if !ok {
			return nil, errDBClosed
		}
		
		return ret.conn, ret.err
}

可以看到,select 有两个 case,第一个是通过 context 控制的 <-Done;第二个则是前面构造的 <-req,如果从 req 中读出了元素,那就相当于获得了连接:ret.conn。

那什么时候会向 req 中发送连接呢?答案是在向连接池归还连接的时候。

前面提到,空闲连接是一个切片,归还的时候直接 append 到这个切片就可以了:

func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
    db.freeConn = append(db.freeConn, dc)
}

但其实在 append 之前,还会去检查当前 connRequests 中是否有申请空闲连接的请求:

if c := len(db.connRequests); c > 0 {
	var req chan connRequest
	var reqKey uint64
	for reqKey, req = range db.connRequests {
		break
	}
	delete(db.connRequests, reqKey) // Remove from pending requests.
	if err == nil {
		dc.inUse = true
	}
	req <- connRequest{
		conn: dc,
		err:  err,
	}
	return true
} 

如果有请求的话,直接将当前连接“塞到” req channel 里去了。另一边,申请连接的 goroutine 就可以从 req channel 中读出 conn。

于是,通过 channel 就实现了一次“连接传输”的功能。

这让我想到不久之前芮神写的一篇《高并发服务遇redis瓶颈引发time-wait事故》,文中提到了将多个 redis command 组装为一个 pipeline:

调用方把 redis command 和接收结果的 chan 推送到任务队列中,然后由一个 worker 去消费,worker 组装多个 redis cmd 为 pipeline,向 redis 发起请求并拿回结果,拆解结果集后,给每个命令对应的结果 chan 推送结果。调用方在推送任务到队列后,就一直监听传输结果的 chan。

redis commnd 组装成 pipeline

这里的用法就和本文描述的 channel 用法一致。

细想一下,以上提到的 channel 用法很神奇吗?我们平时没有接触过吗?

我用过最多的是“生产者-消费者”模式,先启动 N 个 goroutine 消费者,读某个 channel,之后,生产者再在某个时候向 channel 中发送元素:

for i := 0; i < engine.workerNum; i++ {
    go func() {
        for {
            work = <-engine.workChan
        }
    }
}

另外,我还会用 channel 充当一个 “ready” 的信号,用来指示某个“过程”准备好了,可以接收结果了:

func (j *Job) Finished() <-chan bool {
	return j.finish
}

前面提到的“生产者-消费者”和 “ready” 信号这两种 channel 用法和本文的 channel 用法并没有什么本质区别。唯一不同的点是前者的 channel 是事先创建好的,并且是“公用”的;而本文中用到的 channel 实际上是“临时”创建的,并且只有这一个请求使用。

最后,用曹大最近在读者群里说的话结尾:

  1. 抄代码是很好的学习方式。

  2. 选一两个感兴趣的方向,自己尝试实现相应的 feature list,实现完和标准实现做对比。

  3. 先积累再创造,别一上来就想着造轮子,看的多了碰上很多东西就有新思路了。