管道操作的定义

在 Go 中没有管道操作的正式定义,它仅仅是众多并发程序中的一种。

非正式地定义是,管道操作可以由一系列用channel连接的stage组成,每个stage是一组运行着相同函数的 Goroutine。 在每个stage中的 Goroutine 中,执行着以下的操作:

  • 通过inbound channel 从上游接收值
  • 对输入的值中执行一组操作,通常执行完以后会生成新的值
  • 通过outbound channel 向下游发送值

第一个和最后一个stage分别通常只有一组inboundoutbound channel,其他的stage可以有任意多个inboundoutbound channel。第一个stage通常叫做sourceproducer,最后一个stage通常叫做sinkconsumer

管道操作的简单例子

下面是一个管道操作的简单例子,一共分为三个stagegensqmain

  • gen 用来生成数字到管道中
  • sq 用来处理管道中的数字
  • main 用来消费(输出)管道中的数字
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import "fmt"

func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()

	return out
}

func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()

	return out
}

func main() {
	c := gen(2, 3)
	out := sq(c)

	fmt.Println(<-out)
	fmt.Println(<-out)


	// 因为 sq 的输入和输出是相同的,所以我们可以把 sq 嵌套使用
	for n := range sq(sq(gen(2, 3))) {
		fmt.Println(n)
	}
}

扇入/扇出

配合下面这张图,我觉得更容易理解扇入/扇出的概念:

上面的图是一个门电路,看起来就像一个扇子。

  • 从左往右看,将多个管道的数据输出到一个管道中,这就叫做扇入。
  • 从右往左看,将一个管道的数据输出到多个管道中,这就叫做扇出。

下面是一个执行了扇入/扇出操作的例子,

  • gen生成的数据分别交个了两个sq去处理,这叫做扇入。
  • 两个sq处理的数据交给merge函数统一处理,这叫做扇出。

注意: 向关闭的channel发送数据将会导致panic,所以merge函数中使用了sync.WaitGroup来确保在关闭out channel 的时候,所有读取数据的 Goroutine 已经都执行完毕了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package main

import (
	"fmt"
	"sync"
)

func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()

	return out
}

func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()

	return out
}

func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	output := func(c <-chan int) {
		for n := range c {
			out <- n
		}
		wg.Done()
	}
	wg.Add(len(cs))

	for _, c := range cs {
		go output(c)
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	in := gen(2, 3)

	// 将 in 中的数据分别分发到两个 Goroutine 中
	c1 := sq(in)
	c2 := sq(in)

	// 将 c1 和 c2 中的数据消费并合并到一起
	for n := range merge(c1, c2) {
		fmt.Println(n) // 4 then 9, or 9 then 4
	}
}