go项目脚手架
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
sorbet/internal/runtime/goroutine.go

108 lines
2.0 KiB

package runtime
import (
"context"
"sorbet/pkg/log"
"sync"
)
var (
// 并发组控制器
wg sync.WaitGroup
// 运行时上下文对象
ctx context.Context
// 信号量,控制任务并发数量
semaphore chan struct{}
// 异步任务通道
tasks chan func(context.Context)
// 上下文取消函数
cancel context.CancelFunc
// 程序退出通道
exit chan chan error
)
func tryCancel() {
cancelFunc := cancel
if cancelFunc == nil {
log.Panic("context already cancelled")
}
cancel = nil
cancelFunc()
// 等待任务执行完成
for len(semaphore) > 0 {
work(nil)
}
// 理论上在没有信号量的情况下,是没有任务队列的。
for len(tasks) > 0 {
log.Panic("nonempty task channel")
}
// 销毁注册的服务
for i := len(servlets) - 1; i >= 0; i-- {
if err := servlets[i].Stop(); err != nil {
log.Warn("servlet Stop returned with error: ", err)
}
}
ctx = nil
}
// Go 异步任务
func Go(fn func(context.Context)) {
if semaphore == nil || ctx == nil {
log.Panic("not initialized")
return
}
select {
case semaphore <- struct{}{}:
// If we are below our limit, spawn a new worker rather
// than waiting for one to become available.
async(&wg, func(context.Context) {
work(fn)
})
case tasks <- fn:
// 放到任务队列中
// A worker is available and has accepted the task.
}
}
// Async 异步执行函数
func Async(fn func(context.Context)) {
async(&wg, fn)
}
// 执行任务
func work(f func(context.Context)) {
defer func() {
if semaphore != nil {
// 释放
<-semaphore
}
}()
// todo 能不能复用 sync.WaitGroup
g := &sync.WaitGroup{}
if f != nil {
Async(f)
}
for task := range tasks {
Async(task)
}
g.Wait()
}
// 异步执行函数
func async(wg *sync.WaitGroup, fn func(context.Context)) {
wg.Add(1)
go func() {
defer wg.Done()
if ctx == nil {
return
}
select {
case <-ctx.Done():
// 上下文一旦结束,任务将被忽略
log.Error("context done")
default:
fn(ctx)
}
}()
}