这部分看的是 golang1.2 的源码(其实是 3 年前看的,最近又拿出来复习整理了下),C语言实现的。
如果只是为了学习,而不是为了实际参与项目的开发,还是建议阅读低版本源代码,因为更纯粹些。
越是新的版本,里面掺杂的周边逻辑越多,比如 race
, debug
, profile
等等,这些周边功能就像打日志一样,到处都是,对阅读源码有较强的干扰。
基本原理
Channel 的内部主要结构挺简单的,本质上就是一个循环队列。如下图:
Channel 内部结构
使用一个循环队列来实现channel buffer,如果是非buffer channel,那么这个队列长度为0.
dataqsiz
表示队列总长度,即cap(ch)
, dataq
表示 buffer 中元素的个数,即len(ch)
sendx
和recvx
分别表示队列的游标,表示队列中元素的头和尾。
channel 还拥有两个 goroutine 链表,sendq
和 recvq
;链表中存放的是因该 channel 阻塞住的 goroutine。
一个 goroutine 操作这个channel(发送或者读取),如果发生阻塞,都会放到相应的链表中;如果没发生阻塞,会尝试从相应的链表中唤醒一个goroutine。
以ch <- 1
为例子,流程大致如下:
-
检查channel是否能正常写入(即队列没有满),如果能则运行第2步, 否则执行第3步.
-
将数据写入buffer, 唤醒 recvq 中的一个 goroutine,并把 recvx 处的数据 copy 给刚唤醒的 goroutine.
-
把自己加到 sendq 中,然后 block. 该 goroutine 会等到后续的
<-ch
操作被叫醒.
var ch chan int
注意看下面代码的注释。
// 对goroutine的一个封装,专门给channel使用 struct SudoG { G* g; SudoG* link; byte* elem; ... }; struct WaitQ // goroutine队列, 链表结构 { SudoG* first; SudoG* last; }; struct Hchan // 我们的channel,各个成员都在注释中说明 { uintgo qcount; // len(ch) uintgo dataqsiz; // cap(ch) uint16 elemsize; // 单个元素大小,因为我们声明的是 chan int 类型,这里就是 sizeof(int) bool closed; // 是否关闭, 执行 close(ch) 后就关闭了 uintgo sendx; // send index for buffer uintgo recvx; // receive index for buffer WaitQ recvq; // 接收的goroutine队列,单链表 WaitQ sendq; // 发送的goroutine队列,单链表 Lock; };
make(chan int, 10)
runtime·makechan_c(ChanType *t, int64 hint) { // .... c = (Hchan*)runtime·mallocgc(sizeof(*c) + hint*elem->size, (uintptr)t | TypeInfo_Chan, 0); c->elemsize = elem->size; c->dataqsiz = hint; // ... return c }
我们可以看到,make一个channel,就是开辟了一块连续的内存空间,内存的大小就是Hchan结构本身 加上 channel buffer大小: sizeof(c) + hint*elem->size
ch <- 1
写入操作,这个函数很长,下面分成了几个分支来说明
当 ch 是 nil 时
channel是一个nil值, 会使 goroutine 阻塞住
if(c == nil) { //... runtime·park(nil, nil, "chan send (nil chan)"); return; // not reached }
如果 ch 已经被 close 了
channel 已经关闭,panic
runtime·lock(c); if(c->closed) goto closed; // unlock an panic // .... closed: runtime·unlock(c); runtime·panicstring("send on closed channel");
如果 ch 是 buffer 的
处理带 buffer 的channel,仔细看注释
runtime·chansend(ChanType *t, Hchan *c, byte *ep, ...){ if(c->dataqsiz > 0) // 带buffer 的 channel goto asynch; // 异步 // ... asynch: if(c->closed) goto closed; if(c->qcount >= c->dataqsiz) { // buffer 满了 enqueue(&c->sendq, &mysg); // 放到sendq中 runtime·park(runtime·unlock, c, "chan send");// 阻塞 goto asynch; // 回去重新检查 } // 把数据copy到buffer中 c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep); if(++c->sendx == c->dataqsiz) // 修改队列游标 c->sendx = 0; c->qcount++; sg = dequeue(&c->recvq); // 把 recvq 里的找到一个goroutine if(sg != nil) { gp = sg->g; runtime·ready(gp); // 唤醒 } }
ch 不是 buffer 的
非 buffer channel 的处理方式
runtime·chansend(ChanType *t, Hchan *c, byte *ep, ...){ sg = dequeue(&c->recvq); // 从 recvq 获取一个等待该channel的goroutine if(sg != nil) { c->elemalg->copy(c->elemsize, sg->elem, ep); // 把值 copy 给这个 goroutine runtime·ready(gp); // 唤醒 return; } enqueue(&c->sendq, &mysg); // 把自己放到发送队列 sendq 中 runtime·park(runtime·unlock, c, "chan send"); // 阻塞自己 }
v, ok := <-ch
和上面的过程几乎是一模一样,不重复了。唯独有点不同的就是对关闭 channel 的处理。从一个已关闭的 channel 中读取数据,是不会 panic 的,而是直接返回。
closed: if(ep != nil) c->elemalg->copy(c->elemsize, ep, nil); if(selected != nil) // 如果在select语句里 *selected = true; if(received != nil) // ok 值 *received = false; runtime·unlock(c);
select
结构
truct Scase { SudoG sg; // must be first member (cast to Scase) Hchan* chan; // chan byte* pc; // return pc uint16 kind; // 类型, recv或send或default bool* receivedp; // pointer to received bool (recv2) }; struct Select { uint16 tcase; // total count of scase[] uint16 ncase; // currently filled scase[] uint16* pollorder; // case poll order Hchan** lockorder; // channel lock order Scase scase[1]; // one per case (in order of appearance) }
pollorder
是用来遍历lockorder
的,为了实现乱序,把pollorder
设置成乱序的数组,然后用其值作为索引遍历lockeorder
。支持乱序的原因是,避免如果第一个 channel 总是有数据,那其他的 channel case 就永远没机会执行了。
乱序
select 中所有 case 如果都满足达到非阻塞条件,哪个会被执行是随机的。这个随机是在程序里故意实现的。下面是一个乱序算法。
for(i=0; i<sel->ncase; i++) sel->pollorder[i] = i; for(i=1; i<sel->ncase; i++) { o = sel->pollorder[i]; j = runtime·fastrand1()%(i+1); sel->pollorder[i] = sel->pollorder[j]; sel->pollorder[j] = o; }
三次循环
Select的源代码函数里,有三个主要的循环。太长了这里不贴代码了。
-
循环检查(乱序遍历)所有 case 看是否有满足的channel,有就直接执行,然后return,否则执行第2步。
-
把 goroutine(也就是自己)加到所有 case 的 channel 的发送或接收队列中,然后阻塞,等待被叫醒。
-
被其中一个 case 的 channel 唤醒,把自己从其他所有 case 的 channel 的队列中删除,设置 PC 值,即被唤醒后进入哪个 case。
看着效率很差,但我们一般在代码中也写不了多少个 case,一般都是三五个,也不会导致性能下降。
close(ch)
关闭channel, 很简单。如果是 nil 或者已经被 close 了,直接 panic。
关闭后,会唤醒 recvq
和 sendq
两个链表中的所有 goroutine。
if(c == nil)// nil channel runtime·panicstring("close of nil channel"); if(c->closed) { // closed channel runtime·panicstring("close of closed channel"); } c->closed = true; // release all readers for(;;) { sg = dequeue(&c->recvq); if(sg == nil) break; runtime·ready(gp); } // release all writers for(;;) { sg = dequeue(&c->sendq); if(sg == nil) break; runtime·ready(gp); }
sendq
里的 goroutine 一旦被唤醒,就会 panic,因为它在尝试向一个关闭 channel 发数据。所以在参数传递时,会把 channel 做类型转换,声明下它是消费者还是生产者,即 <-chan Type
和 chan<- Type
2 种类型。避免消费者随意关闭 channel 导致生产者 panic。
nil channel 的用途
通常用来暂时屏蔽一个 channel,比如:
var done <-chan struct{} // 初始值是 nil for { select { case <-done: return case <-input: // 代码逻辑 if canReturn { done = ctx.Done() } else { done = nil } } }
代码中,虽然要被 context 来控制退出,但如果存在某些特殊状态,不允许被终止。就要用到 nil channel 了。
总结
非 buffer 的 channel 比 buffer channel 少了一次内存 copy。但非 buffer channel 工作起来基本就是相当于个互斥锁,会让 goroutine 无法并行,在多核机器上会导致程序的处理效率很差(即最大并发量很低,机器的 CPU 利用率低)。所以如果 channel 是一个常驻型的,直接make 一个大一点的 buffer channel 没关系。
通过 close channel 做简单的广播通知,这个很常用,官方 context 库也是这么做的。并不一定要用 context 库去通知,一个非 buffer 的 chan struct
变量就够。
channel 也是一种数据类型,即使没有 close 也是可以被 GC 的,没必要去做特殊的管理。
–
–
–
评论前必须登录!
注册