package runtime import ( "context" "sorbet/pkg/log" "sync" ) var ( // 并发组控制器 wg sync.WaitGroup // 运行时上下文对象 ctx context.Context // 信号量,控制任务并发数量 semaphore chan struct{} // 异步任务通道 tasks chan func(context.Context) // 上下文取消函数 cancel context.CancelFunc // 程序退出通道 exit chan chan error ) func tryCancel() { cancelFunc := cancel if cancelFunc == nil { log.Panic("context already cancelled") } cancel = nil cancelFunc() // 等待任务执行完成 for len(semaphore) > 0 { work(nil) } // 理论上在没有信号量的情况下,是没有任务队列的。 for len(tasks) > 0 { log.Panic("nonempty task channel") } // 销毁注册的服务 for i := len(servlets) - 1; i >= 0; i-- { if err := servlets[i].Stop(); err != nil { log.Warn("servlet Stop returned with error: ", err) } } ctx = nil } // Go 异步任务 func Go(fn func(context.Context)) { if semaphore == nil || ctx == nil { log.Panic("not initialized") return } select { case semaphore <- struct{}{}: // If we are below our limit, spawn a new worker rather // than waiting for one to become available. async(&wg, func(context.Context) { work(fn) }) case tasks <- fn: // 放到任务队列中 // A worker is available and has accepted the task. } } // Async 异步执行函数 func Async(fn func(context.Context)) { async(&wg, fn) } // 执行任务 func work(f func(context.Context)) { defer func() { if semaphore != nil { // 释放 <-semaphore } }() // todo 能不能复用 sync.WaitGroup g := &sync.WaitGroup{} if f != nil { Async(f) } for task := range tasks { Async(task) } g.Wait() } // 异步执行函数 func async(wg *sync.WaitGroup, fn func(context.Context)) { wg.Add(1) go func() { defer wg.Done() if ctx == nil { return } select { case <-ctx.Done(): // 上下文一旦结束,任务将被忽略 log.Error("context done") default: fn(ctx) } }() }