上次說了一下Go語言布道師 Dave Cheney對Go并發(fā)的建議 , 個人覺得最重要的一條 , 這次主要想說一下這個 。8.3. Never start a goroutine without knowning when it will stop(永遠(yuǎn)不要在不知道何時停止的情況下啟動 goroutine)我們的需求【完 golang開發(fā):go并發(fā)的建議】我這邊當(dāng)時有個需求是這樣的 , 我們有個考試系統(tǒng)的 , 每次學(xué)員答完試卷去檢查一下這次交卷是否是這次考試的最后一份試卷 , 如果是最后一份試卷的話 , 需要計算這次考試的總成績 , 生成考試的學(xué)習(xí)報告 , 當(dāng)然了 , 如果不是最后一份試卷的話啥也不干 。生成試卷和報告是必須要生成的 , 不能出現(xiàn)考完試了沒有總成績和總報告 。接到這個需求的時候 , 我首先想到的是使用golang的goroutine去異步算出成績生成報告 。然后寫代碼就是這樣的 。
go createReport()這不剛好是8.3 永遠(yuǎn)不要這樣寫的建議么?然后覺得應(yīng)該寫一個管理goroutine異步執(zhí)行任務(wù)的類庫 , 創(chuàng)建執(zhí)行銷毀都由這個管理工具去執(zhí)行 。準(zhǔn)備寫的時候發(fā)現(xiàn)B站的代碼里有一個這樣的類庫 , 異步執(zhí)行的類庫 。
B站的類庫B站代碼里面異步任務(wù)是這個文件openbilibili-go-common-master/library/sync/pipeline/fanout/fanout.go
var ( // ErrFull chan full. ErrFull = errors.New("fanout: chan full") stats = prom.BusinessInfoCount traceTags = []trace.Tag{trace.Tag{Key: trace.TagSpanKind, Value: "background"},trace.Tag{Key: trace.TagComponent, Value: "sync/pipeline/fanout"}, })type options struct { worker int buffer int}// Option fanout optiontype Option func(*options)// Worker specifies the worker of fanoutfunc Worker(n int) Option { if n <= 0 {panic("fanout: worker should > 0") } return func(o *options) {o.worker = n }}// Buffer specifies the buffer of fanoutfunc Buffer(n int) Option { if n <= 0 {panic("fanout: buffer should > 0") } return func(o *options) {o.buffer = n }}type item struct { f func(c context.Context) ctx context.Context}// Fanout async consume data from chan.type Fanout struct { name string ch chan item options *options waiter sync.WaitGroup ctx context.Context cancel func()}// New new a fanout struct.func New(name string, opts ...Option) *Fanout { if name == "" {name = "fanout" } o := &options{worker: 1,buffer: 1024, } for _, op := range opts {op(o) } c := &Fanout{ch: make(chan item, o.buffer),name: name,options: o, } c.ctx, c.cancel = context.WithCancel(context.Background()) c.waiter.Add(o.worker) for i := 0; i < o.worker; i++ {go c.proc() } return c}func (c *Fanout) proc() { defer c.waiter.Done() for {select {case t := <-c.ch:wrapFunc(t.f)(t.ctx)stats.State(c.name+"_channel", int64(len(c.ch)))case <-c.ctx.Done():return} }}func wrapFunc(f func(c context.Context)) (res func(context.Context)) { res = func(ctx context.Context) {defer func() {if r := recover(); r != nil {buf := make([]byte, 64*1024)buf = buf[:runtime.Stack(buf, false)]log.Error("panic in fanout proc, err: %s, stack: %s", r, buf)}}()f(ctx)if tr, ok := trace.FromContext(ctx); ok {tr.Finish(nil)} } return}// Do save a callback func.func (c *Fanout) Do(ctx context.Context, f func(ctx context.Context)) (err error) { if f == nil || c.ctx.Err() != nil {return c.ctx.Err() } nakeCtx := metadata.WithContext(ctx) if tr, ok := trace.FromContext(ctx); ok {tr = tr.Fork("", "Fanout:Do").SetTag(traceTags...)nakeCtx = trace.NewContext(nakeCtx, tr) } select { case c.ch <- item{f: f, ctx: nakeCtx}: default:err = ErrFull } stats.State(c.name+"_channel", int64(len(c.ch))) return}// Close close fanoutfunc (c *Fanout) Close() error { if err := c.ctx.Err(); err != nil {return err } c.cancel() c.waiter.Wait() return nil}使用方法 ca := New("cache", Worker(100), Buffer(1024)) var run bool ca.Do(context.Background(), func(c context.Context) {run = true })
經(jīng)驗總結(jié)擴展閱讀
- 松下和lg電視哪個好 看完你會傾向誰
- 西瓜和羊肉隔多長時間食用
- 駕校考完科目二還能退款嗎
- 三十七 Java開發(fā)學(xué)習(xí)----SpringBoot多環(huán)境配置及配置文件分類
- 🔥支持 Java 19 的輕量級應(yīng)用開發(fā)框架,Solon v1.10.4 發(fā)布
- 案例分享-https證書鏈不完整導(dǎo)致請求失敗
- 怎么判斷煙彈用完了
- 做完拔絲蘋果的鍋如何清洗
- 單擺怎么運動算一個周期
- 熱門完結(jié)小說
