You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
108 lines
2.0 KiB
108 lines
2.0 KiB
package runtime
|
|
|
|
import (
|
|
"context"
|
|
"sorbet/pkg/log"
|
|
"sync"
|
|
)
|
|
|
|
var (
|
|
// 并发组控制器
|
|
wg sync.WaitGroup
|
|
// 运行时上下文对象
|
|
ctx context.Context
|
|
// 信号量,控制任务并发数量
|
|
semaphore chan struct{}
|
|
// 异步任务通道
|
|
tasks chan func(context.Context)
|
|
// 上下文取消函数
|
|
cancel context.CancelFunc
|
|
// 程序退出通道
|
|
exit chan chan error
|
|
)
|
|
|
|
func tryCancel() {
|
|
cancelFunc := cancel
|
|
if cancelFunc == nil {
|
|
log.Panic("context already cancelled")
|
|
}
|
|
cancel = nil
|
|
cancelFunc()
|
|
// 等待任务执行完成
|
|
for len(semaphore) > 0 {
|
|
work(nil)
|
|
}
|
|
// 理论上在没有信号量的情况下,是没有任务队列的。
|
|
for len(tasks) > 0 {
|
|
log.Panic("nonempty task channel")
|
|
}
|
|
// 销毁注册的服务
|
|
for i := len(servlets) - 1; i >= 0; i-- {
|
|
if err := servlets[i].Stop(); err != nil {
|
|
log.Warn("servlet Stop returned with error: ", err)
|
|
}
|
|
}
|
|
ctx = nil
|
|
}
|
|
|
|
// Go 异步任务
|
|
func Go(fn func(context.Context)) {
|
|
if semaphore == nil || ctx == nil {
|
|
log.Panic("not initialized")
|
|
return
|
|
}
|
|
select {
|
|
case semaphore <- struct{}{}:
|
|
// If we are below our limit, spawn a new worker rather
|
|
// than waiting for one to become available.
|
|
async(&wg, func(context.Context) {
|
|
work(fn)
|
|
})
|
|
case tasks <- fn:
|
|
// 放到任务队列中
|
|
// A worker is available and has accepted the task.
|
|
}
|
|
}
|
|
|
|
// Async 异步执行函数
|
|
func Async(fn func(context.Context)) {
|
|
async(&wg, fn)
|
|
}
|
|
|
|
// 执行任务
|
|
func work(f func(context.Context)) {
|
|
defer func() {
|
|
if semaphore != nil {
|
|
// 释放
|
|
<-semaphore
|
|
}
|
|
}()
|
|
|
|
// todo 能不能复用 sync.WaitGroup
|
|
g := &sync.WaitGroup{}
|
|
if f != nil {
|
|
Async(f)
|
|
}
|
|
for task := range tasks {
|
|
Async(task)
|
|
}
|
|
g.Wait()
|
|
}
|
|
|
|
// 异步执行函数
|
|
func async(wg *sync.WaitGroup, fn func(context.Context)) {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
if ctx == nil {
|
|
return
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
// 上下文一旦结束,任务将被忽略
|
|
log.Error("context done")
|
|
default:
|
|
fn(ctx)
|
|
}
|
|
}()
|
|
}
|
|
|