Go语言异步高并发编程的秘密:无锁,无条件变量,无回调( 二 )

那么上面的代码是如何处理三个关键问题的呢?

  • 首先关于关闭并退出loop
上述代码通过监听sub结构的closing属性,实现退出 。
//Close asks loop to exit and waits for a response.func (s *sub) Close() error {errc := make(chan error)s.closing <- errcreturn <-errc}当调用sub的Close方法时,s.closing会接收一个errc的通道,loop主体向errc中写入error信息并退出,调用sub的Close方法的客户端从errc中也同步收到error信息 。这是一个同步关闭的过程 。loop主体可以在给客户端发送error信息之前,可以完成一系列的关闭清理工作 。
  • 关于事件处理与调度
程序中设置的下一次获取元素的延迟调度的最小单位是10秒,从下面第22行可以看到,如果获取元素很快,没有耗费10秒,那么fetchDelay便有个时间gap,startFetch(第7行)这个时间通道便会通过time.After这个方法,在fetchDelay时间后,收到信号,完成18到25行的获取元素工作 。
var pending []Item // appended by fetch; consumed by sendvar next time.Time // initially January 1, year 0var err errorfor {var fetchDelay time.Duration // initially 0 (no delay)if now := time.Now(); next.After(now) {fetchDelay = next.Sub(now)}startFetch := time.After(fetchDelay)select {case <-startFetch:var fetched []Itemfetched, next, err = s.fetcher.Fetch()if err != nil {next = time.Now().Add(10 * time.Second)break}pending = append(pending, fetched...)}}问题:为了防止等待队列过大,所以只有当长度不超过maxPending,并且获取的数据已经入队了的时候,才会设置startFetch,否则就不触发fetch 。这块可以结合上面整个代码看看
var fetchDelay time.Durationif now := time.Now(); next.After(now) {fetchDelay = next.Sub(now)}var startFetch <-chan time.Timeif fetchDone == nil && len(pending) < maxPending {startFetch = time.After(fetchDelay) // enable fetch case}问题: Loop blocks on Fetch.
golang有个特性,就是Sends and receives on nil channels block.利用这个特性,当fetchDone是nil或者他里面没有准备好结果的时候,相关的case都会阻塞,那么select也不会选择它 。同时为了防止fetch函数阻塞loop主函数,通过启动协程(下面9-12行),再次提升主loop的性能 。
type fetchResult struct{ fetched []Item; next time.Time; err error }var fetchDone chan fetchResult // if non-nil, Fetch is runningvar startFetch <-chan time.Timeif fetchDone == nil && len(pending) < maxPending {startFetch = time.After(fetchDelay) // enable fetch case}select {case <-startFetch:fetchDone = make(chan fetchResult, 1)go func() {fetched, next, err := s.fetcher.Fetch()fetchDone <- fetchResult{fetched, next, err}}()case result := <-fetchDone:fetchDone = nil// Use result.fetched, result.next, result.err总结上面用到了3个技巧,如下所示:
  • for-select loop
  • service channel, reply channels (chan chan error)
  • nil channels in select cases
通过err,next,pending三个变量,就实现了在没有锁,条件变量,回调情况下,编写高效并发go程序的需求
参考文献:
https://go.dev/talks/2013/advconc.slide#43




推荐阅读