路漫漫其修远兮
吾将上下而求索

第14章总结:协程(goroutine)与通道(channel)

14章

14.1 
一个应用程序是运行在机器上的一个进程;
进程是一个运行在自己内存地址空间里的独立执行体。
一个进程由一个或多个操作系统线程组成,这些线程其实是共享同一个内存地址空间的一起工作的执行体。
几乎所有'正式'的程序都是多线程的,以便让用户或计算机不必等待,或者能够同时服务多个请求(如 Web 服务器),或增加性能和吞吐量(例如,
通过对不同的数据集并行执行代码)。
一个并发程序可以在一个处理器或者内核上使用多个线程来执行任务,但是只有同一个程序在某个时间点同时运行在多核或者多处理器上才是真正的并行。

不要使用全局变量或者共享内存,它们会给你的代码在并发运算的时候带来危险。

在 Go 中,应用程序并发处理的部分被称作 goroutines(协程),它可以进行更有效的并发运算。在协程和操作系统线程之间并无一对一的关系:协程
是根据一个或多个线程的可用性,映射(多路复用,执行于)在他们之上的;协程调度器在 Go 运行时很好的完成了这个工作。

协程是轻量的,比线程更轻。它们痕迹非常不明显(使用少量的内存和资源):使用 4K 的栈内存就可以在堆中创建它们。因为创建非常廉价,必要的时
候可以轻松创建并运行大量的协程(在同一个地址空间中 100,000 个连续的协程)。并且它们对栈进行了分割,从而动态的增加(或缩减)内存的使用;
栈的管理是自动的,但不是由垃圾回收器管理的,而是在协程退出后自动释放。

协程是通过使用关键字 go 调用(执行)一个函数或者方法来实现的(也可以是匿名或者 lambda 函数)。这样会在当前的计算过程中开始一个同时进行
的函数,在相同的地址空间中并且分配了独立的栈,比如:go sum(bigArray),在后台计算总和。

在 gc 编译器下(6g 或者 8g)你必须设置 GOMAXPROCS 为一个大于默认值 1 的数值来允许运行时支持使用多于 1 个的操作系统线程,所有的协程都会
共享同一个线程除非将 GOMAXPROCS 设置为一个大于 1 的数。当 GOMAXPROCS 大于 1 时,会有一个线程池管理许多的线程。通过 gccgo 编译器 GOMAXPROCS 
有效的与运行中的协程数量相等。假设 n 是机器上处理器或者核心的数量。如果你设置环境变量 GOMAXPROCS>=n,或者执行 runtime.GOMAXPROCS(n),接
下来协程会被分割(分散)到 n 个处理器上。更多的处理器并不意味着性能的线性提升。有这样一个经验法则,对于 n 个核心的情况设置 GOMAXPROCS 为
 n-1 以获得最佳性能,也同样需要遵守这条规则:协程的数量 > 1 + GOMAXPROCS > 1。

还有一些通过实验观察到的现象:在一台 1 颗 CPU 的笔记本电脑上,增加 GOMAXPROCS 到 9 会带来性能提升。在一台 32 核的机器上,设置 GOMAXPROCS=8 
会达到最好的性能,在测试环境中,更高的数值无法提升性能。如果设置一个很大的 GOMAXPROCS 只会带来轻微的性能下降;设置 GOMAXPROCS=100,使用 
top 命令和 H 选项查看到只有 7 个活动的线程。

注意:当 main() 函数返回的时候,程序退出:它不会等待任何其他非 main 协程的结束。当执行到两个协程的时候,这两个协程开始同时在后台执行,同
时main函数会继续向下执行,main函数必须执行的时间大于协程执行的时间,不然main函数执行完成后会退出,继而协程都会中断。

func main() {
	// setting GOMAXPROCS to 2 gives +- 22% performance increase,
	// but increasing the number doesn't increase the performance
	// without GOMAXPROCS: +- 86000
	// setting GOMAXPROCS to 2: +- 105000
	// setting GOMAXPROCS to 3: +- 94000
	runtime.GOMAXPROCS(2)
	ch1 := make(chan int)
	ch2 := make(chan int)

	go pump1(ch1)
	go pump2(ch2)
	go suck(ch1, ch2)

	time.Sleep(1e9)
}

func pump1(ch chan int) {
	for i := 0; ; i++ {
		ch <- i * 2
	}
}

func pump2(ch chan int) {
	for i := 0; ; i++ {
		ch <- i + 5
	}
}

func suck(ch1, ch2 chan int) {
	for i := 0; ; i++ {
		select {
		case v := <-ch1:
			fmt.Printf("%d - Received on channel 1: %d\n", i, v)
		case v := <-ch2:
			fmt.Printf("%d - Received on channel 2: %d\n", i, v)
		}
	}
}


14.2 

而Go有一个特殊的类型,通道(channel),像是通道(管道),可以通过它们发送类型化的数据在协程之间通信,可以避开所有内存共享导致的坑;通道的
通信方式保证了同步性。数据通过通道:同一时间只有一个协程可以访问数据:所以不会出现数据竞争,设计如此。数据的归属(可以读写数据的能力)被传递。

通常使用这样的格式来声明通道:var identifier chan datatype
var ch1 chan string
ch1 = make(chan string)
或者
ch1 := make(chan string)

流向通道(发送)
ch <- int1 表示:用通道 ch 发送变量 int1(双目运算符,中缀 = 发送)

从通道流出(接收),三种方式:
int2 = <- ch 表示:变量 int2 从通道 ch(一元运算的前缀操作符,前缀 = 接收)接收数据(获取新值);假设 int2 已经声明过了,如果没有的话可
以写成:int2 := <- ch。

func main() {
	ch := make(chan string)

	go sendData(ch)
	go getData(ch)  

	time.Sleep(1e9)
}

func sendData(ch chan string) {
	ch <- "Washington"
	ch <- "Tripoli"
	ch <- "London"
	ch <- "Beijing"
	ch <- "Tokio"
}

func getData(ch chan string) {
	var input string
	// time.Sleep(2e9)
	for {
		input = <-ch
		fmt.Printf("%s ", input)
	}
}
main() 等待了 1 秒让两个协程完成,如果不这样,sendData() 就没有机会输出。
getData() 使用了无限循环:它随着 sendData() 的发送完成和 ch 变空也结束了。
如果我们移除一个或所有 go 关键字,程序无法运行,Go 运行时会抛出 panic:

下面的话重要
默认情况下,通信是同步且无缓冲的:在有接受者接收数据之前,发送不会结束。可以想象一个无缓冲的通道在没有空间来保存数据的时候:必须要一个接
收者准备好接收通道的数据然后发送者可以直接把数据发送给接收者。所以通道的发送/接收操作在对方准备好之前是阻塞的:

1)对于同一个通道,发送操作(协程或者函数中的),在接收者准备好之前是阻塞的:如果ch中的数据无人接收,就无法再给通道传入其他数据:新的输
入无法在通道非空的情况下传入。所以发送操作会等待 ch 再次变为可用状态:就是通道值被接收时(可以传入变量)。

2)对于同一个通道,接收操作是阻塞的(协程或函数中的),直到发送者可用:如果通道中没有数据,接收者就阻塞了。

尽管这看上去是非常严格的约束,实际在大部分情况下工作的很不错。


一个无缓冲通道只能包含 1 个元素,有时显得很局限。我们给通道提供了一个缓存,可以在扩展的 make 命令中设置它的容量,如下:
buf := 100
ch1 := make(chan string, buf)
buf 是通道可以同时容纳的元素(这里是 string)个数
在缓冲满载(缓冲被全部使用)之前,给一个带缓冲的通道发送数据是不会阻塞的,而从通道读取数据也不会阻塞,直到缓冲空了。
如果容量大于 0,通道就是异步的了:缓冲满载(发送)或变空(接收)之前通信不会阻塞,元素会按照发送的顺序被接收。

同步:ch :=make(chan type, value)
value == 0 -> synchronous, unbuffered (阻塞)
value > 0 -> asynchronous, buffered(非阻塞)取决于value元素
若使用通道的缓冲,你的程序会在“请求”激增的时候表现更好:更具弹性,专业术语叫:更具有伸缩性(scalable)。要在首要位置使用无缓冲通道来
设计算法,只在不确定的情况下使用缓冲。

下面几种方法都可以等待协程处理完成后退出整个程序
func compute(ch chan int){
	ch <- someComputation() // when it completes, signal on the channel.
}

func main(){
	ch := make(chan int) 	// allocate a channel.
	go compute(ch)		// stat something in a goroutines
	doSomethingElseForAWhile()
	result := <- ch
}
这个信号也可以是其他的,不返回结果,比如下面这个协程中的匿名函数(lambda)协程:

ch := make(chan int)
go func(){
	// doSomething
	ch <- 1 // Send a signal; value does not matter
}()
doSomethingElseForAWhile()
<- ch	// Wait for goroutine to finish; discard sent value.

########################################
type Empty interface {}
var empty Empty
...
data := make([]float64, N)
res := make([]float64, N)
sem := make(chan Empty, N)
...
for i, xi := range data {
	go func (i int, xi float64) {
		res[i] = doSomething(i, xi)
		sem <- empty
	} (i, xi)
}
// wait for goroutines to finish
for i := 0; i < N; i++ { <-sem }

###################################################
习惯用法:通道工厂模式
编程中常见的另外一种模式如下:不将通道作为参数传递给协程,而用函数来生成一个通道并返回(工厂角色);函数内有个匿名函数被协程调用。

import (
	"fmt"
	"time"
)

func main() {
	stream := pump()
	go suck(stream)
	time.Sleep(1e9)

}


func pump() chan int {
	ch := make(chan int)
	go func() {
		for i := 0; ; i++ {
			ch <- i
		}
	}()
	return ch
}

func suck(ch chan int) {
	for {
		fmt.Println(<-ch)
	}
}



for 循环的 range 语句可以用在通道 ch 上,便可以从通道中获取值,像这样:
for v := range ch {
	fmt.Printf("The value is %v\n", v)
}

素数算法
https://github.com/unknwon/the-way-to-go_ZH_CN/blob/master/eBook/14.2.md



通常和 if 语句一起使用:
if v, ok := <-ch; ok {
  process(v)
}

或者在 for 循环中接收的时候,当关闭或者阻塞的时候使用 break:
v, ok := <-ch
if !ok {
  break
}
process(v)

使用 for-range 语句来读取通道是更好的办法,因为这会自动检测通道是否关闭:
for input := range ch {
  	process(input)
}



通道类型可以用注解来表示它只发送或者只接收:
var send_only chan<- int 		// channel can only send data
var recv_only <-chan int		// channel can only receive data
只接收的通道(<-chan T)无法关闭,因为关闭通道是发送者用来表示不再给通道发送值了,所以对只接收通道是没有意义的。通道创建的时候都是
双向的,但也可以分配有方向的通道变量,就像以下代码:

var c = make(chan int) // bidirectional
go source(c)
go sink(c)

func source(ch chan<- int){
	for { ch <- 1 }
}

func sink(ch <-chan int) {
	for { <-ch }
}


14.3 
通道可以被显式的关闭;尽管它们和文件不同:不必每次都关闭。只有在当需要告诉接收者不会再提供新的值的时候,才需要关闭通道。只有发送者
需要关闭通道,接收者永远不会需要。
第一个可以通过函数 close(ch) 来完成:这个将通道标记为无法通过发送操作 <- 接受更多的值;给已经关闭的通道发送或者再次关闭都会导致运
行时的 panic。在创建一个通道后使用 defer 语句是个不错的办法

getData() 又如何检测到通道是否关闭或阻塞?
可以使用逗号,ok 操作符:用来检测通道是否被关闭。
如何来检测可以收到没有被阻塞(或者通道没有被关闭)?
v, ok := <-ch   // ok is true if v received value


14.4 
从不同的并发执行的协程中获取值可以通过关键字select来完成,它和switch控制语句非常相似(章节5.3)也被称作通信开关;它的行为像是“
你准备好了吗”的轮询机制;select监听进入通道的数据,也可以是用通道发送值的时候
default 语句是可选的;fallthrough 行为,和普通的 switch 相似,是不允许的。在任何一个 case 中执行 break 或者 return,select 就结束了。

select 做的就是:选择处理列出的多个通信情况中的一个。
如果都阻塞了,会等待直到其中一个可以处理
如果多个可以处理,随机选择一个
如果没有通道操作可以处理并且写了 default 语句,它就会执行:default 永远是可运行的(这就是准备好了,可以执行)。
在 select 中使用发送操作并且有 default可以确保发送不被阻塞!如果没有 case,select 就会一直阻塞。
select 语句实现了一种监听模式,通常用在(无限)循环中;在某种情况下,通过 break 语句使循环退出。

在程序 goroutine_select.go 中有 2 个通道 ch1 和 ch2,三个协程 pump1()、pump2() 和 suck()。这是一个典型的生产者消费者模式。在无
限循环中,ch1 和 ch2 通过 pump1() 和 pump2() 填充整数;suck() 也是在无限循环中轮询输入的,通过 select 语句获取 ch1 和 ch2 的整
数并输出。选择哪一个 case 取决于哪一个通道收到了信息。
func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)

	go pump1(ch1)
	go pump2(ch2)
	go suck(ch1, ch2)

	time.Sleep(1e9)
}

func pump1(ch chan int) {
	for i := 0; ; i++ {
		ch <- i * 2
	}
}

func pump2(ch chan int) {
	for i := 0; ; i++ {
		ch <- i + 5
	}
}

func suck(ch1, ch2 chan int) {
	for {
		select {
		case v := <-ch1:
			fmt.Printf("Received on channel 1: %d\n", v)
		case v := <-ch2:
			fmt.Printf("Received on channel 2: %d\n", v)
		}
	}
}


习惯用法:后台服务模式
服务通常是是用后台协程中的无限循环实现的,在循环中使用 select 获取并处理通道中的数据:
// Backend goroutine.
func backend() {
	for {
		select {
		case cmd := <-ch1:
			// Handle ...
		case cmd := <-ch2:
			...
		case cmd := <-chStop:
			// stop server
		}
	}
}


14.5 

在协程周期性的执行一些事情(打印状态日志,输出,计算等等)的时候,使用time.Ticker比较有用
time.Ticker 结构体,这个对象以指定的时间间隔重复的向通道 C 发送时间值:
type Ticker struct {
    C <-chan Time // the channel on which the ticks are delivered.
    // contains filtered or unexported fields
    ...
}
时间间隔的单位是 ns(纳秒,int64),在工厂函数 time.NewTicker 中以 Duration 类型的参数传入:func Newticker(dur) *Ticker。
调用 Stop() 使计时器停止,在 defer 语句中使用。这些都很好的适应 select 语句:
ticker := time.NewTicker(updateInterval)
defer ticker.Stop()
...
select {
case u:= <-ch1:
    ...
case v:= <-ch2:
    ...
case <-ticker.C:
    logState(status) // call some logging function logState
default: // no value ready to be received
    ...
}

time.Tick() 函数声明为 Tick(d Duration) <-chan Time,当你想返回一个通道而不必关闭它的时候这个函数非常有用:它以 d 为周期给返
回的通道发送时间,d是纳秒数。如果需要像下边的代码一样,限制处理频率(函数 client.Call() 是一个 RPC 
import "time"

rate_per_sec := 10
var dur Duration = 1e9 / rate_per_sec
chRate := time.Tick(dur) // a tick every 1/10th of a second
for req := range requests {
    <- chRate // rate limit our Service.Method RPC calls
    go client.Call("Service.Method", req, ...)
}
这样只会按照指定频率处理请求:chRate 阻塞了更高的频率。每秒处理的频率可以根据机器负载(和/或)资源的情况而增加或减少。


14.6

一个用到 recover 的程序(参见第 13.3 节)停掉了服务器内部一个失败的协程而不影响其他协程的工作。
func server(workChan <-chan *Work) {
    for work := range workChan {
        go safelyDo(work)   // start the goroutine for that work
    }
}

func safelyDo(work *Work) {
    defer func {
        if err := recover(); err != nil {
            log.Printf("Work failed with %s in %v", err, work)
        }
    }()
    do(work)
}
上边的代码,如果 do(work) 发生 panic,错误会被记录且协程会退出并释放,而其他协程不受影响。


14.7
这节主要讲思路,和传统的共享内存的区别

旧模式:使用共享内存进行同步
传统的多个程序访问同一个变量,要加锁和解锁操作

新模式:使用通道
这里的N表示启动多少个work进行并行处理


任务     未处理队列  已处理队列
              | ->P-> |
              | ->P-> |
||||||| ->P-> | ->P-> |
              | ->P-> |
              | ->P-> |


对于任何可以建模为Master-Worker范例的问题,一个类似于worker使用通道进行通信和交互、Master进行整体协调的方案都能完美解决。
如果系统部署在多台机器上,各个机器上执行Worker协程,Master和Worker之间使用netchan或者RPC进行通信

共享资源由原来的加锁变为通道
使用通道进行同步:使用一个通道接受需要处理的任务,一个通道接受处理完成的任务(及其结果)。worker在协程中启动,其数量N应该根据任务数量进行调整。

主线程扮演着Master节点角色,可能写成如下形式:
func main() {
    pending, done := make(chan *Task), make(chan *Task)
    go sendWork(pending)       // put tasks with work on the channel
    for i := 0; i < N; i++ {   // start N goroutines to do work
        go Worker(pending, done)
    }
    consumeWork(done)          // continue with the processed tasks
}

worker的逻辑比较简单:从pending通道拿任务,处理后将其放到done通道中:
func Worker(in, out chan *Task) {
    for {
        t := <-in
        process(t)
        out <- t
    }
}



从这个简单的例子中可能很难看出第二种模式的优势,但含有复杂锁运用的程序不仅在编写上显得困难,也不容易编写正确,使用第二种模式的话,
就无需考虑这么复杂的东西了。
因此,第二种模式对比第一种模式而言,不仅性能是一个主要优势,而且还有个更大的优势:代码显得更清晰、更优雅

使用锁的情景:
    访问共享数据结构中的缓存信息
    保存应用程序上下文和状态信息数据
使用通道的情景:
    与异步操作的结果进行交互
    分发任务
    传递数据所有权
当你发现你的锁使用规则变得很复杂时,可以反省使用通道会不会使问题变得简单些。


14.8 
就是python中的yield


14.9 
当程序需要两个计算,并且这两个计算没有前后依赖关系,可以使用协程并行计算
当开发一个计算密集型库时,使用Futures模式设计API接口是很有意义的。在你的包使用Futures模式,且能保持友好的API接口
下面的代码可以优化,当前两个函数没有关系的时候,可以并行处理
func InverseProduct(a Matrix, b Matrix) {
    a_inv := Inverse(a)
    b_inv := Inverse(b)
    return Product(a_inv, b_inv)
}

下面为优化的
func InverseProduct(a Matrix, b Matrix) {
    a_inv_future := InverseFuture(a)   // start as a goroutine
    b_inv_future := InverseFuture(b)   // start as a goroutine
    a_inv := <-a_inv_future
    b_inv := <-b_inv_future
    return Product(a_inv, b_inv)
}

InverseFuture函数起了一个goroutine协程,在其执行闭包运算,该闭包会将矩阵求逆结果放入到future通道中:
func InverseFuture(a Matrix) {
    future := make(chan Matrix)
    go func() {
        future <- Inverse(a)
    }()
    return future
}

上面是一种通用格式,其他的也可以套用Futures模式


14.10

下面的综合示例需要理解



14.11
限制同时处理的请求数
超过MAXREQS的请求将不会被同时处理,因为当信号通道表示缓冲区已满时handle函数会阻塞且不再处理其他请求,直到某个请求从sem中被移除。
sem就像一个信号量,这一专业术语用于在程序中表示特定条件的标志变量。
通过这种方式,应用程序可以通过使用缓冲通道(通道被用作信号量)使协程同步其对该资源的使用,从而充分利用有限的资源(如内存)
package main

const MAXREQS = 50

var sem = make(chan int, MAXREQS)

type Request struct {
	a, b   int
	replyc chan int
}

func process(r *Request) {
	// do something
}

func handle(r *Request) {
	sem <- 1 // doesn't matter what we put in it
	process(r)
	<-sem // one empty place in the buffer: the next request can start
}

func server(service chan *Request) {
	for {
		request := <-service
		go handle(request)
	}
}

func main() {
	service := make(chan *Request)
	go server(service)
}

未经允许不得转载:江哥架构师笔记 » 第14章总结:协程(goroutine)与通道(channel)

分享到:更多 ()

评论 抢沙发

评论前必须登录!