协程里的通信

在我们项目中经常遇到一些生产者-消费者问题,比如在我们提出的kafka组件,就是面对生产者与消费者的中间通信组件。我们在开发中常提到线程,常常通过线程来做并发处理。任何一个函数,或者任何一个组件,实际都是存在输入、输出。从其中将问题归纳,也是生产者-消费者。

在Golang中存在协程,在协程中怎么解决并发的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
errChan := make(chan error, 10)
// 生产者
go func() {
defer close(errChan)
conCu := 10
wg := new(sync.WaitGroup)
wg.Add(conCu)
for i := 0; i < conCu; i++ {
go func(flag int) {
defer wg.Done()
println(flag)
if flag % 3 == 0 {
errChan <- fmt.Errorf("3, i=%d", flag)
}
}(i)
}
wg.Wait()
}()

// 消费者
for true {
err, ok := <- errChan
if !ok {
break
}
fmt.Printf("err=%s\n", err)
}

println("done")

代码中使用的并发方案,使用sync.WaitGroup与chan来进行协程间通信。

协程是什么

协程是一种比线程更加轻量级的存在,又言用户级协程。其对内核透明,系统并不知道协程存在,由用户程序进行调度控制,协程上下文切换也是完全由用户自己控制。

「Golang-协程调度原理」

协程间通信

常用的协程间通信方式由多种:

  1. 共享内存
  2. WaitGroup
  3. channel 通道

在Golang中推荐通过通道通信,而不推荐通过共享内存的方案进行通信。

在上述三者中,WaitGroup是基于CAS来实现,为什么使用CAS以及CAS解决了什么问题,这个我们后面再讲讲,这次中就不提及了。

channel使用要注意什么(内部原理)

channel底层数据结构中,有几个重要的点:

  • buf:数据缓存,指向一个环形缓冲区
  • recvq:接收者列表
  • sendq:发送者列表
  • lock:互斥锁,每次写入与消费数据都需要加索

从channel的语法中分为两种:有缓冲的channel与没有缓冲的channel,我个人理解是认为他们是相同的,唯一的区别缓冲buf为0,不能缓冲任何结果。

从上述描述中可以看出,在使用channel中需要注意些什么?

  1. 当channel中buf不够时,再向channel写入的时候,回阻塞协程
  2. channel使用完成后,需要close掉,否则消费channel的协程最后不会被销毁、回收
  3. channel在close之后,如果其中数据存在,依然可以从其中读取数据,直到最终监听到channel close的消息后

死锁

在使用channel的时候,需要避免的事死锁,什么时候会发生死锁的情况呢?其中channel中的缓冲区写满,在被消费完成前,再向channel中写入的时候,会阻塞当前协程,如果在阻塞之后没有协程会消费channel中的数据,则会发生死锁,协程也是因此阻塞。