实际案例解读golang的context机制
一些处理http请求的库,或者涉及到网络IO的库比如rpc的库,都会涉及到golang里面的context,这玩意究竟干啥的?在golang源码包中src/context/context.go中,一共也没几个方法,核心就是WithXXXX函数和Context接口
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
- Deadline方法返回被取消的时间,也就是调用WithDealine或者WithTimeout传入的时间
- Err方法返回Context结束的原因,这个地方有个细节,如果Done返回的Channel没有被关闭的话会返回nil空值,如果Done返回的Channel被关闭后,就会返回具体的结束原因:
- 如果当前Context被取消就会返回Canceled错误
- 如果当前Context超时就会返回DeadlineExceeded错误
- Value方法返回func WithValue(parent Context, key, val interface{})调用是传入的val数据,这个是通过Context在多个goroutine之间传递一个数据使用的,多个goroutine我们用一个专门的属于叫做“上下文”
要严重注意Context接口中的Done方法返回的是一个chan对象,如果能够从这个chan中读取到内容,就表示调用者想要取消本次操作了,究竟是什么意思呢,什么叫调用者要取消本次操作?
大白话: 当需要在多个协程之间进行通讯的时候,尤其是主协程和N个子协程之间,涉及到所谓的通讯,这个通讯不是说用来传输数据,传输视频,传输图片,也不是所谓的rpc调用,你看Context.Withxxxx这些方法的返回值都是(ctx Context, cancel CancelFunc),那就要思考,除了最后一个WithValue,前面3个为什么都会返回一个CancelFunc呢,而CancelFunc是一个啥?它是一个函数,我们看它的源码,从源码注释上可以看出端倪,写代码的时候就按照下面步骤来写
1. 写代码的时候,先用Context.Withxxxxx得到一个Context对象和一个CancelFunc
2. 把得到的Context对象传入一个另外的协程
3. 主协程可以根据自己的需要调用CancelFunc
4. 子协程在自己的代码中用select语法检查Context对象的Done方法返回的通道,看看能否从通道中读取到内容,如果能够读取到,就表示主协程已经调用了CancelFunc了,能够读取到的话,就可以return出去了
// A CancelFunc tells an operation to abandon its work.
// A CancelFunc does not wait for the work to stop.
// A CancelFunc may be called by multiple goroutines simultaneously.
// After the first call, subsequent calls to a CancelFunc do nothing.
type CancelFunc func()
func Stream(ctx context.Context, out chan<- Value) error {
for {
v, err := DoSomething(ctx)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case out <- v:
}
}
}
也就是说可以调用这个CancelFunc函数来取消Context,一旦取消了,其它的goroutine通过ctx.Done()方法就会得到通知,我再总结一下用法:
- 主协程通过context.Withxxxxx之类的方法创建Context对象
- 主协程创建其它协程,用go函数调用方法,就是创建协程了,然后把第一步创建出来的Context对象传递给这些子协程
- 子协程中通过 <- ctx.Done()方法来判断,父协程是否取消了这个上下文,也就是说父协程是否调用了CancelFunc方法,或者是Context是否自动到期了,比如创建Context对象的时候是调用WithDeadline,WithTimeout方法创建的,那么时间一到,这个Context对象就会自动超时,那么其它子协程自然就知道了
子协程中能否从 <- ctx.Done() 通道中读取到内容,取决于2点:
1. 父协程有没有调用CancelFunc函数
2. 如果是用的WithDeadline、WithTimeout创建的Context,就看时间有没有到,如果时间一到,<- ctx.Done() 通道中就能读取到内容
刚才说了这么多,其实是从父协程的角度出发来描述讲解的,那么从子协程写代码的角度,都是用select关键字来对通道进行检查的。
func TestContext1(t *testing.T) {
parentCtx := context.Background()
timeoutCtx, cancelFunc := context.WithTimeout(parentCtx, 2 *time.Second)
defer cancelFunc()
fmt.Println("开始执行测试用例,当前时间:", time.Now().Format("2006-01-02 15:04:05"))
//通过select关键字检查2个通道,哪个通道先读取到内容,就执行哪个case代码块
select {
case <-time.After(5 * time.Second): //5秒后超时
fmt.Println("5秒时间到了,当前时间:", time.Now().Format("2006-01-02 15:04:05"))
case <- timeoutCtx.Done(): //2秒就会超时,所以select判断会进入到这个分支
fmt.Println(timeoutCtx.Err(), "当前时间:", time.Now().Format("2006-01-02 15:04:05"))
}
}
实际应用场景
比如一个客户端请求过来,服务端逻辑中可能需要调用其他的rpc服务,或者读写数据库,而调用其他rpc服务、读写数据库的时间都不是固定的,有可能会时间比较长,这会导致从客户端调用方的视角来看,时间比较长。所以我们提供给客户端调用的函数,一般可以设置为传入超时时间,如果超过指定的时间方法都没有结束,那么这个方法就会自动超时,客户端可以自己酌情处理,比如友好地提示用户:系统繁忙,请稍后再试。
此时就涉及到设计支持主动取消或者自动超时的API,提供给客户端调用,我们的目标是当主动取消或者自动超时时间到了以后,能够自动通知调用链上的所有goroutine,让它们迅速退出,这样的话,golang的运行时才能够释放这些goroutine占用的资源。
专门用来简化 对于处理单个请求的多个 goroutine 之间与请求域的数据、取消信号、截止时间等相关操作,这些操作可能涉及多个 API 调用,也就是说一个函数内部比较复杂,涉及到调用多个其他的goroutine,所以要和这些goroutine之间通过Context进行控制交互。
这种用在处理http请求的地方很多,对服务器传入的请求应该创建上下文,而对服务器的传出调用应该接受上下文。它们之间的函数调用链必须传递上下文,或者可以使用WithCancel、WithDeadline、WithTimeout或WithValue创建的派生上下文。当一个上下文被取消时,它派生的所有上下文也被取消
这个地方涉及到一个细节,就是写代码的时候,往往是主goroutine会开启子goroutine,子goroutine也会开启孙子goroutine,甚至继续往下面开启goroutine,这就像一棵树了。
现在想要从根goroutine开始,取消所有儿子、孙子、重孙....辈的goroutine,该如何实现?具体的做法就是根goroutine的代码中要创建一个Context,然后把这个Context对象层层传递下去,传遍所有的儿子、孙子...辈的goroutine
这样也就意味着所有的goroutine都拿到了根代码创建的Context对象,那么只要根代码中对Context发起一次Cancel操作或者Context自动超时(WithDeadline、WithTimeout创建的Context),那么所有儿子、孙子辈的goroutine都可以得到通知。
具体的通知方式就是,儿子、孙子辈的goroutine代码中都用select对Context的Done()方法返回的通道进行监听,所有的代码都是类似这样的:
select {
case <-ctx.Done(): //等待上级通知
fmt.Println("根goroutine主动取消或者Context超时了,我们不能继续执行了,该退出了"
}
//这是一个死循环用于生成递增的整数,不停地往chan里面放(最大容量1000),然后把chan返回给调用者,调用者可以从这个chan中读取整数
func gen(ctx context.Context) <-chan int {
dst := make(chan int, 1000)
n := 1
go func() {
for {//这是一个死循环,什么时候结束,取决于父goroutine是否cancel掉传入的ctx上下文
select {
case <-ctx.Done():
return // return结束该goroutine,防止泄露
case dst <- n:
fmt.Println("现在往通道中放一个整数:", n)
n++
}
}
}()
return dst
}
func TestContext2(t *testing.T) {
//ctx, cancel := context.WithCancel(context.Background())
//defer cancel() // 当我们取完需要的整数后调用cancel
ctx, _ := context.WithTimeout(context.Background(), 5 * time.Second) //5秒后自动超时
for n := range gen(ctx) {
fmt.Println(n)
//if n == 5 {
// break
//}
if ctx.Err() != nil { //不等于nil就表示上下文已经自动超时或者被手动cancel掉了
fmt.Println(ctx.Err()) //会输出 context deadline exceeded
break
}
time.Sleep(1 * time.Second)
}
}
搞段样本代码试验一下,实现带超时的http客户端请求
这个案例是用Context做超时控制的典型案例,这个代码就是一种套路代码,其他的场景都是这样编程:
1. 有一个http服务端,请求它的时候,有的时候快,有的时候会卡好10秒才返回,比如 https://www.baidu.com很快,https://github.com很慢
2. 客户端发起http请求调用的时候,就要实现超时控制,不能无限等待,最多等待500毫秒,500毫秒一到,就返回
3. 现在来规划一下代码怎么实现
客户端发起http调用,这个就很关键,要先有思路,怎么写程序,没有思路屁用都没有
1. 因为http.DefaultClient.Do()方法是阻塞调用,直接返回*Response,如果服务端卡住的话,这个Do调用就会卡住,所以我们肯定不能直接在主协程中调用Do方法
2. 必须另外搞一个goroutine去调用http.DefaultClient.Do()方法
3. 这就带来一个问题,子协程在调用http.DefaultClient.Do()方法后,怎么把*Response返回给主协程呢
4. 这就是golang所推崇的所谓的chan了,其实就和java里面的阻塞队列差不多(BlockingQueue),预先构造出一个chan,假设叫respChan,然后主协程等待在这个chan上,子协程拿到*Response后塞进这个chan
5. 考虑到主协程如果一直在这个chan上等待,那么必然会死等,这个时候怎么办呢
6. 一开始主协程就要构造一个支持超时的Context对象, ctx, cancel := context.WithTimeout(context.Background(), 500 * time.Millisecond),
然后用select关键字,同时对这个respChan和ctx.Done()返回的chan进行监控,哪个先取到数据,就先返回,看select语法怎么写就知道了
7. 核心:我们知道ctx.Done()返回的chan是受超时控制的,时间一到,调用 <- ctx.Done()必然会返回
//封装一下,子程序读取到数据后,构造一个InvokeResponse对象,塞入chan
type InvokeResponse struct {
resp *http.Response
err error
}
func Client(ctx context.Context, weburl string) {
//1. 这个chan就是主协程和子协程交换数据的核心了,其实就是个阻塞队列罢了
respChan := make(chan *InvokeResponse, 1)
//2. 发http请求,为了不阻塞主协程,用go关键字启动子协程
request, err := http.NewRequestWithContext(ctx, http.MethodGet, weburl, nil)
if err != nil {
fmt.Printf("http.NewRequest failed, err: %v\n", err)
return
}
//注意,为了不让当前请求阻塞,我们需要在另外一个协程中发起http请求
go func() {
response, err := http.DefaultClient.Do(request)
result := &InvokeResponse{
resp: response,
err: err,
}
//写入chan通道
respChan <- result
}()
//此时主协程继续往下走,接下来写什么代码呢,既然另外一个协程在发起http请求了,我们主协程自然而然就是等待数据了
//在respChan上等待数据,有可能因为服务端卡死导致客户端会死等,所以下面这行代码不能这样写
//httpResponse := <- respChan
//4. 换成golang的select写法
select {
case <-ctx.Done(): //100毫秒自动超时
fmt.Println("call http api timeout")
case httpResponse := <- respChan:
fmt.Println("call http api success")
if httpResponse.err != nil {
fmt.Printf("call server api failed, err:%v\n", httpResponse.err)
return
}
defer httpResponse.resp.Body.Close()
content, _ := ioutil.ReadAll(httpResponse.resp.Body)
fmt.Printf("收到http响应:%s\n", string(content))
}
}
func TestInvoke1(t *testing.T) {
//对于访问百度来说,500毫秒足够了,所以百度的响应可以在500毫秒内打开
ctx, cancel := context.WithTimeout(context.Background(), 500 * time.Millisecond)
defer cancel()
Client(ctx, "https://www.baidu.com")
fmt.Printf("\n\n\n")
//测试比对,访问github因为速度很慢,所以就容易超时,我们设置超时时间为2秒(2秒往往打不开github.com),所以必然超时,但是客户端并不会卡死
ctx2, cancel2 := context.WithTimeout(context.Background(), 3000 * time.Millisecond)
defer cancel2()
Client(ctx2, "https://github.com")
}
以上,总结了golangContext的作用,码字真累!