赞
踩
errgroup为goroutine组提供了通知、错误传递、上下文取消功能。
一般在一组goroutine出现错误需要取消全部goroutine的情况使用到
errGroup主要有以下结构
有以下方法
WithContext(ctx context.Context) (*Group, context.Context): 使传入context转换为一个可取消的context,并将cancel方法传入errGroup
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
Go(f func() error): 保存errGroup传入方法f的第一次错误,并取消errGroup中的context
func (g *Group) Go(f func() error) { if g.sem != nil { g.sem <- token{} } g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel() } }) } }() } func (g *Group) done() { if g.sem != nil { <-g.sem } g.wg.Done() }
TryGo(f func() error) bool: 能返回开启协程额度是否足够的Go(f func() error)方法
Wait() error: 等待errGroup Go中方法运行结束
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
SetLimit(n int): 设置限制活动goroutine的最大数量(负数代表没有限制)。通过固定长度n的channel是实现的,setLimit设置固定额度,每个活动的goroutine消费该额度,然后处理完之后释放额度
func (g *Group) SetLimit(n int) {
if n < 0 {
g.sem = nil
return
}
if len(g.sem) != 0 {
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
}
g.sem = make(chan token, n)
}
以下是官方利用errGroup开启1个goroutine检查文件和20个goroutine遍历目标路径文件计算md5 sum。若只要有一个文件检查出现就直接返回错误,终止后面20个goroutine的sum计算
func ExampleGroup_pipeline() { m, err := MD5All(context.Background(), ".") if err != nil { log.Fatal(err) } for k, sum := range m { fmt.Printf("%s:\t%x\n", k, sum) } } type result struct { path string sum [md5.Size]byte } // MD5All reads all the files in the file tree rooted at root and returns a map // from file path to the MD5 sum of the file's contents. If the directory walk // fails or any read operation fails, MD5All returns an error. func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) { // ctx is canceled when g.Wait() returns. When this version of MD5All returns // - even in case of error! - we know that all of the goroutines have finished // and the memory they were using can be garbage-collected. g, ctx := errgroup.WithContext(ctx) paths := make(chan string) g.Go(func() error { defer close(paths) return filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } select { case paths <- path: case <-ctx.Done(): return ctx.Err() } return nil }) }) // Start a fixed number of goroutines to read and digest files. c := make(chan result) const numDigesters = 20 for i := 0; i < numDigesters; i++ { g.Go(func() error { for path := range paths { data, err := ioutil.ReadFile(path) if err != nil { return err } select { case c <- result{path, md5.Sum(data)}: case <-ctx.Done(): return ctx.Err() } } return nil }) } go func() { g.Wait() close(c) }() m := make(map[string][md5.Size]byte) for r := range c { m[r.path] = r.sum } // Check whether any of the goroutines failed. Since g is accumulating the // errors, we don't need to send them (or check for them) in the individual // results sent on the channel. if err := g.Wait(); err != nil { return nil, err } return m, nil }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。