From faded4e634563291c0cb419df16ddbc681627925 Mon Sep 17 00:00:00 2001 From: hupeh Date: Thu, 19 Oct 2023 20:03:35 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=87=8D=E6=9E=84=E8=BF=90=E8=A1=8C?= =?UTF-8?q?=E6=97=B6=EF=BC=8C=E6=94=AF=E6=8C=81=E5=B0=86=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E7=BB=84=E4=BB=B6=E5=8C=96=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 + internal/init.go | 47 ++--- internal/middleware/cors.go | 8 +- internal/middleware/key_auth.go | 74 ------- internal/middleware/middleware.go | 24 --- internal/middleware/rate_limiter.go | 268 -------------------------- internal/middleware/recover.go | 24 +-- internal/runtime/goroutine.go | 108 +++++++++++ internal/runtime/runtime.go | 90 +++++++++ internal/runtime/server.go | 121 ++++++++++++ internal/runtime/servlet.go | 69 +++++++ internal/services/company/service.go | 25 ++- internal/services/config/service.go | 23 ++- internal/services/feature/service.go | 29 ++- internal/services/resource/service.go | 23 ++- internal/services/service.go | 72 ------- internal/services/system/service.go | 31 ++- internal/util/echo_logger.go | 2 +- main.go | 23 ++- pkg/app/app.go | 128 ------------ pkg/app/appkit/bus.go | 5 - pkg/app/appkit/codec.go | 81 -------- pkg/app/bus.go | 215 --------------------- pkg/app/lifecycle.go | 83 -------- pkg/app/stats.go | 44 ----- pkg/app/udp.go | 63 ------ pkg/app/util.go | 101 ---------- pkg/crud/context.go | 53 ----- pkg/crud/controller.go | 4 +- pkg/crud/service.go | 20 -- pkg/log/log.go | 4 + pkg/log/logger.go | 7 + 32 files changed, 539 insertions(+), 1332 deletions(-) delete mode 100644 internal/middleware/key_auth.go delete mode 100644 internal/middleware/middleware.go delete mode 100644 internal/middleware/rate_limiter.go create mode 100644 internal/runtime/goroutine.go create mode 100644 internal/runtime/runtime.go create mode 100644 internal/runtime/server.go create mode 100644 internal/runtime/servlet.go delete mode 100644 internal/services/service.go delete mode 100644 pkg/app/app.go delete mode 100644 pkg/app/appkit/bus.go delete mode 100644 pkg/app/appkit/codec.go delete mode 100644 pkg/app/bus.go delete mode 100644 pkg/app/lifecycle.go delete mode 100644 pkg/app/stats.go delete mode 100644 pkg/app/udp.go delete mode 100644 pkg/app/util.go delete mode 100644 pkg/crud/context.go delete mode 100644 pkg/crud/service.go diff --git a/.gitignore b/.gitignore index 44c3a55..5ba633f 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,8 @@ .env.* !.env.example /docs +/tmp +/.air.toml .DS_Store *.log TODOs.md diff --git a/internal/init.go b/internal/init.go index 0cd1d11..b50072c 100644 --- a/internal/init.go +++ b/internal/init.go @@ -2,32 +2,32 @@ package internal import ( "errors" - "fmt" - "github.com/labstack/echo/v4" "sorbet/internal/entities" - "sorbet/internal/middleware" - "sorbet/internal/util" - "sorbet/pkg/app" + "sorbet/internal/runtime" + "sorbet/internal/services/company" + "sorbet/internal/services/config" + "sorbet/internal/services/feature" + "sorbet/internal/services/resource" + "sorbet/internal/services/system" "sorbet/pkg/db" "sorbet/pkg/env" "sorbet/pkg/log" - "sorbet/pkg/rsp" ) func Init() error { - // 同步数据库结构 - if err := syncEntities(); err != nil { + err := syncEntities() + if err != nil { if !errors.Is(err, db.ErrNoCodeFirst) { return err } if !env.IsEnv("prod") { - log.Error("同步数据表结构需要开启 [DB_CODE_FIRST],在生产模式下请务必关闭。") + log.Warn("同步数据表结构需要开启 [DB_CODE_FIRST],在生产模式下请务必关闭。") } } - app.OnStart(startServer) - return nil + return useServlets() } +// 同步数据库结构 func syncEntities() error { return db.Sync( &entities.Company{}, @@ -52,21 +52,12 @@ func syncEntities() error { ) } -func startServer() { - e := echo.New() - e.HideBanner = true - e.HidePort = true - e.HTTPErrorHandler = rsp.HTTPErrorHandler - e.Logger = util.NewLogger() - e.Use(middleware.Recover()) - e.Use(middleware.CORS()) - e.Use(middleware.Logger) - - port := env.String("SERVER_PORT", "1324") - addr := fmt.Sprintf(":%s", port) - // TODO(hupeh): 验证 addr 是否合法 - e.Logger.Error(e.Start(addr)) - - // 通知应用退出 - app.Stop() +func useServlets() error { + return runtime.Use( + &config.Service{}, + &company.Service{}, + &resource.Service{}, + &feature.Service{}, + &system.Service{}, + ) } diff --git a/internal/middleware/cors.go b/internal/middleware/cors.go index 975aa7e..351320d 100644 --- a/internal/middleware/cors.go +++ b/internal/middleware/cors.go @@ -8,7 +8,7 @@ import ( type CORSConfig struct { // Skipper defines a function to skip middleware. - Skipper Skipper + Skipper func(c echo.Context) bool // AllowOrigins determines the value of the Access-Control-Allow-Origin // response header. This header defines a list of origins that may access the @@ -59,7 +59,7 @@ type CORSConfig struct { // AllowCredentials determines the value of the // Access-Control-Allow-Credentials response header. This header indicates - // whether or not the response to the request can be exposed when the + // whether the response to the request can be exposed when the // credentials mode (Request.credentials) is true. When used as part of a // response to a preflight request, this indicates whether or not the actual // request can be made using credentials. See also @@ -103,7 +103,9 @@ type CORSConfig struct { // DefaultCORSConfig is the default CORS middleware config. var DefaultCORSConfig = CORSConfig{ - Skipper: middleware.DefaultSkipper, + Skipper: func(c echo.Context) bool { + return false + }, AllowOrigins: []string{"*"}, AllowMethods: []string{ http.MethodGet, diff --git a/internal/middleware/key_auth.go b/internal/middleware/key_auth.go deleted file mode 100644 index b5c8351..0000000 --- a/internal/middleware/key_auth.go +++ /dev/null @@ -1,74 +0,0 @@ -package middleware - -import ( - "github.com/labstack/echo/v4" - "github.com/labstack/echo/v4/middleware" -) - -// KeyAuthValidator defines a function to validate KeyAuth credentials. -type KeyAuthValidator = middleware.KeyAuthValidator - -// KeyAuthErrorHandler defines a function which is executed for an invalid key. -type KeyAuthErrorHandler = middleware.KeyAuthErrorHandler - -// KeyAuthConfig defines the config for KeyAuth middleware. -type KeyAuthConfig struct { - Skipper Skipper - - // KeyLookup is a string in the form of ":" or ":,:" that is used - // to extract key from the request. - // Optional. Default value "header:Authorization". - // Possible values: - // - "header:" or "header::" - // `` is argument value to cut/trim prefix of the extracted value. This is useful if header - // value has static prefix like `Authorization: ` where part that we - // want to cut is ` ` note the space at the end. - // In case of basic authentication `Authorization: Basic ` prefix we want to remove is `Basic `. - // - "query:" - // - "form:" - // - "cookie:" - // Multiple sources example: - // - "header:Authorization,header:X-Api-Key" - KeyLookup string - - // AuthScheme to be used in the Authorization header. - // Optional. Default value "Bearer". - AuthScheme string - - // Validator is a function to validate key. - // Required. - Validator KeyAuthValidator - - // ErrorHandler defines a function which is executed for an invalid key. - // It may be used to define a custom error. - ErrorHandler KeyAuthErrorHandler - - // ContinueOnIgnoredError allows the next middleware/handler to be called when ErrorHandler decides to - // ignore the error (by returning `nil`). - // This is useful when parts of your site/api allow public access and some authorized routes provide extra functionality. - // In that case you can use ErrorHandler to set a default public key auth value in the request context - // and continue. Some logic down the remaining execution chain needs to check that (public) key auth value then. - ContinueOnIgnoredError bool -} - -// DefaultKeyAuthConfig is the default KeyAuth middleware config. -var DefaultKeyAuthConfig = KeyAuthConfig{ - Skipper: DefaultSkipper, - KeyLookup: "header:" + echo.HeaderAuthorization, - AuthScheme: "Bearer", -} - -func (a *KeyAuthConfig) ToMiddleware() echo.MiddlewareFunc { - return middleware.KeyAuthWithConfig(middleware.KeyAuthConfig{ - Skipper: a.Skipper, - KeyLookup: a.KeyLookup, - AuthScheme: a.AuthScheme, - Validator: a.Validator, - ErrorHandler: a.ErrorHandler, - ContinueOnIgnoredError: a.ContinueOnIgnoredError, - }) -} - -func KeyAuth() echo.MiddlewareFunc { - return DefaultKeyAuthConfig.ToMiddleware() -} diff --git a/internal/middleware/middleware.go b/internal/middleware/middleware.go deleted file mode 100644 index 27a5914..0000000 --- a/internal/middleware/middleware.go +++ /dev/null @@ -1,24 +0,0 @@ -package middleware - -import ( - "github.com/labstack/echo/v4" - "github.com/labstack/echo/v4/middleware" -) - -// Skipper defines a function to skip middleware. Returning true skips processing -// the middleware. -type Skipper = middleware.Skipper - -// BeforeFunc defines a function which is executed just before the middleware. -type BeforeFunc = middleware.BeforeFunc - -type ValuesExtractor = middleware.ValuesExtractor - -type ToMiddleware interface { - ToMiddleware() echo.MiddlewareFunc -} - -// DefaultSkipper returns false which processes the middleware. -func DefaultSkipper(echo.Context) bool { - return false -} diff --git a/internal/middleware/rate_limiter.go b/internal/middleware/rate_limiter.go deleted file mode 100644 index 4d87a9a..0000000 --- a/internal/middleware/rate_limiter.go +++ /dev/null @@ -1,268 +0,0 @@ -package middleware - -import ( - "github.com/labstack/echo/v4/middleware" - "net/http" - "sync" - "time" - - "github.com/labstack/echo/v4" - "golang.org/x/time/rate" -) - -// RateLimiterStore is the interface to be implemented by custom stores. -type RateLimiterStore interface { - // Allow Stores for the rate limiter have to implement the Allow method - Allow(identifier string) (bool, error) -} - -type ( - // RateLimiterConfig defines the configuration for the rate limiter - RateLimiterConfig struct { - Skipper Skipper - BeforeFunc middleware.BeforeFunc - // IdentifierExtractor uses echo.Context to extract the identifier for a visitor - IdentifierExtractor Extractor - // Store defines a store for the rate limiter - Store RateLimiterStore - // ErrorHandler provides a handler to be called when IdentifierExtractor returns an error - ErrorHandler func(context echo.Context, err error) error - // DenyHandler provides a handler to be called when RateLimiter denies access - DenyHandler func(context echo.Context, identifier string, err error) error - } - // Extractor is used to extract data from echo.Context - Extractor func(context echo.Context) (string, error) -) - -// errors -var ( - // ErrRateLimitExceeded denotes an error raised when rate limit is exceeded - ErrRateLimitExceeded = echo.NewHTTPError(http.StatusTooManyRequests, "rate limit exceeded") - // ErrExtractorError denotes an error raised when extractor function is unsuccessful - ErrExtractorError = echo.NewHTTPError(http.StatusForbidden, "error while extracting identifier") -) - -// DefaultRateLimiterConfig defines default values for RateLimiterConfig -var DefaultRateLimiterConfig = RateLimiterConfig{ - Skipper: middleware.DefaultSkipper, - IdentifierExtractor: func(ctx echo.Context) (string, error) { - id := ctx.RealIP() - return id, nil - }, - ErrorHandler: func(context echo.Context, err error) error { - return &echo.HTTPError{ - Code: ErrExtractorError.Code, - Message: ErrExtractorError.Message, - Internal: err, - } - }, - DenyHandler: func(context echo.Context, identifier string, err error) error { - return &echo.HTTPError{ - Code: ErrRateLimitExceeded.Code, - Message: ErrRateLimitExceeded.Message, - Internal: err, - } - }, -} - -/* -RateLimiter returns a rate limiting middleware - - e := echo.New() - - limiterStore := middleware.NewRateLimiterMemoryStore(20) - - e.GET("/rate-limited", func(c echo.Context) error { - return c.String(http.StatusOK, "test") - }, RateLimiter(limiterStore)) -*/ -func RateLimiter(store RateLimiterStore) echo.MiddlewareFunc { - config := DefaultRateLimiterConfig - config.Store = store - return config.ToMiddleware() -} - -/* -ToMiddleware returns a rate limiting middleware - - e := echo.New() - - config := middleware.RateLimiterConfig{ - Skipper: DefaultSkipper, - Store: middleware.NewRateLimiterMemoryStore( - middleware.RateLimiterMemoryStoreConfig{Rate: 10, Burst: 30, ExpiresIn: 3 * time.Minute} - ) - IdentifierExtractor: func(ctx echo.Context) (string, error) { - id := ctx.RealIP() - return id, nil - }, - ErrorHandler: func(context echo.Context, err error) error { - return context.JSON(http.StatusTooManyRequests, nil) - }, - DenyHandler: func(context echo.Context, identifier string) error { - return context.JSON(http.StatusForbidden, nil) - }, - } - - e.GET("/rate-limited", func(c echo.Context) error { - return c.String(http.StatusOK, "test") - }, middleware.RateLimiterWithConfig(config)) -*/ -func (config *RateLimiterConfig) ToMiddleware() echo.MiddlewareFunc { - if config.Skipper == nil { - config.Skipper = DefaultSkipper - } - if config.IdentifierExtractor == nil { - config.IdentifierExtractor = DefaultRateLimiterConfig.IdentifierExtractor - } - if config.ErrorHandler == nil { - config.ErrorHandler = DefaultRateLimiterConfig.ErrorHandler - } - if config.DenyHandler == nil { - config.DenyHandler = DefaultRateLimiterConfig.DenyHandler - } - if config.Store == nil { - panic("Store configuration must be provided") - } - return func(next echo.HandlerFunc) echo.HandlerFunc { - return func(c echo.Context) error { - if config.Skipper(c) { - return next(c) - } - if config.BeforeFunc != nil { - config.BeforeFunc(c) - } - - identifier, err := config.IdentifierExtractor(c) - if err != nil { - c.Error(config.ErrorHandler(c, err)) - return nil - } - - if allow, err := config.Store.Allow(identifier); !allow { - c.Error(config.DenyHandler(c, identifier, err)) - return nil - } - return next(c) - } - } -} - -type ( - // RateLimiterMemoryStore is the built-in store implementation for RateLimiter - RateLimiterMemoryStore struct { - visitors map[string]*Visitor - mutex sync.Mutex - rate rate.Limit // for more info check out Limiter docs - https://pkg.go.dev/golang.org/x/time/rate#Limit. - - burst int - expiresIn time.Duration - lastCleanup time.Time - - timeNow func() time.Time - } - // Visitor signifies a unique user's limiter details - Visitor struct { - *rate.Limiter - lastSeen time.Time - } -) - -/* -NewRateLimiterMemoryStore returns an instance of RateLimiterMemoryStore with -the provided rate (as req/s). -for more info check out Limiter docs - https://pkg.go.dev/golang.org/x/time/rate#Limit. - -Burst and ExpiresIn will be set to default values. - -Note that if the provided rate is a float number and Burst is zero, Burst will be treated as the rounded down value of the rate. - -Example (with 20 requests/sec): - - limiterStore := middleware.NewRateLimiterMemoryStore(20) -*/ -func NewRateLimiterMemoryStore(rate rate.Limit) (store *RateLimiterMemoryStore) { - return NewRateLimiterMemoryStoreWithConfig(RateLimiterMemoryStoreConfig{ - Rate: rate, - }) -} - -/* -NewRateLimiterMemoryStoreWithConfig returns an instance of RateLimiterMemoryStore -with the provided configuration. Rate must be provided. Burst will be set to the rounded down value of -the configured rate if not provided or set to 0. - -The build-in memory store is usually capable for modest loads. For higher loads other -store implementations should be considered. - -Characteristics: -* Concurrency above 100 parallel requests may causes measurable lock contention -* A high number of different IP addresses (above 16000) may be impacted by the internally used Go map -* A high number of requests from a single IP address may cause lock contention - -Example: - - limiterStore := middleware.NewRateLimiterMemoryStoreWithConfig( - middleware.RateLimiterMemoryStoreConfig{Rate: 50, Burst: 200, ExpiresIn: 5 * time.Minute}, - ) -*/ -func NewRateLimiterMemoryStoreWithConfig(config RateLimiterMemoryStoreConfig) (store *RateLimiterMemoryStore) { - store = &RateLimiterMemoryStore{} - - store.rate = config.Rate - store.burst = config.Burst - store.expiresIn = config.ExpiresIn - if config.ExpiresIn == 0 { - store.expiresIn = DefaultRateLimiterMemoryStoreConfig.ExpiresIn - } - if config.Burst == 0 { - store.burst = int(config.Rate) - } - store.visitors = make(map[string]*Visitor) - store.timeNow = time.Now - store.lastCleanup = store.timeNow() - return -} - -// RateLimiterMemoryStoreConfig represents configuration for RateLimiterMemoryStore -type RateLimiterMemoryStoreConfig struct { - Rate rate.Limit // Rate of requests allowed to pass as req/s. For more info check out Limiter docs - https://pkg.go.dev/golang.org/x/time/rate#Limit. - Burst int // Burst is maximum number of requests to pass at the same moment. It additionally allows a number of requests to pass when rate limit is reached. - ExpiresIn time.Duration // ExpiresIn is the duration after that a rate limiter is cleaned up -} - -// DefaultRateLimiterMemoryStoreConfig provides default configuration values for RateLimiterMemoryStore -var DefaultRateLimiterMemoryStoreConfig = RateLimiterMemoryStoreConfig{ - ExpiresIn: 3 * time.Minute, -} - -// Allow implements RateLimiterStore.Allow -func (store *RateLimiterMemoryStore) Allow(identifier string) (bool, error) { - store.mutex.Lock() - limiter, exists := store.visitors[identifier] - if !exists { - limiter = new(Visitor) - limiter.Limiter = rate.NewLimiter(store.rate, store.burst) - store.visitors[identifier] = limiter - } - now := store.timeNow() - limiter.lastSeen = now - if now.Sub(store.lastCleanup) > store.expiresIn { - store.cleanupStaleVisitors() - } - store.mutex.Unlock() - return limiter.AllowN(store.timeNow(), 1), nil -} - -/* -cleanupStaleVisitors helps manage the size of the visitors map by removing stale records -of users who haven't visited again after the configured expiry time has elapsed -*/ -func (store *RateLimiterMemoryStore) cleanupStaleVisitors() { - for id, visitor := range store.visitors { - if store.timeNow().Sub(visitor.lastSeen) > store.expiresIn { - delete(store.visitors, id) - } - } - store.lastCleanup = store.timeNow() -} diff --git a/internal/middleware/recover.go b/internal/middleware/recover.go index 629368e..929ed71 100644 --- a/internal/middleware/recover.go +++ b/internal/middleware/recover.go @@ -15,9 +15,6 @@ type LogErrorFunc func(c echo.Context, err error, stack []byte) error // RecoverConfig defines the config for Recover middleware. type RecoverConfig struct { - // Skipper defines a function to skip middleware. - Skipper Skipper - // Size of the stack to be printed. // Optional. Default value 4KB. StackSize int @@ -48,7 +45,6 @@ type RecoverConfig struct { // DefaultRecoverConfig is the default Recover middleware config. var DefaultRecoverConfig = RecoverConfig{ - Skipper: DefaultSkipper, StackSize: 4 << 10, // 4 KB DisableStackAll: false, DisablePrintStack: false, @@ -57,10 +53,13 @@ var DefaultRecoverConfig = RecoverConfig{ DisableErrorHandler: false, } -func (config *RecoverConfig) ToMiddleware() echo.MiddlewareFunc { - if config.Skipper == nil { - config.Skipper = DefaultRecoverConfig.Skipper - } +// Recover returns a middleware which recovers from panics anywhere in the chain +// and handles the control to the centralized HTTPErrorHandler. +func Recover() echo.MiddlewareFunc { + return RecoverWithConfig(DefaultRecoverConfig) +} + +func RecoverWithConfig(config RecoverConfig) echo.MiddlewareFunc { if config.StackSize == 0 { config.StackSize = DefaultRecoverConfig.StackSize } @@ -70,9 +69,6 @@ func (config *RecoverConfig) ToMiddleware() echo.MiddlewareFunc { } return func(next echo.HandlerFunc) echo.HandlerFunc { return func(c echo.Context) (returnErr error) { - if config.Skipper(c) { - return next(c) - } defer func() { if r := recover(); r != nil { if r == http.ErrAbortHandler { @@ -127,9 +123,3 @@ func (config *RecoverConfig) ToMiddleware() echo.MiddlewareFunc { } } } - -// Recover returns a middleware which recovers from panics anywhere in the chain -// and handles the control to the centralized HTTPErrorHandler. -func Recover() echo.MiddlewareFunc { - return DefaultRecoverConfig.ToMiddleware() -} diff --git a/internal/runtime/goroutine.go b/internal/runtime/goroutine.go new file mode 100644 index 0000000..cab85cc --- /dev/null +++ b/internal/runtime/goroutine.go @@ -0,0 +1,108 @@ +package runtime + +import ( + "context" + "sorbet/pkg/log" + "sync" +) + +var ( + // 并发组控制器 + wg sync.WaitGroup + // 运行时上下文对象 + ctx context.Context + // 信号量,控制任务并发数量 + semaphore chan struct{} + // 异步任务通道 + tasks chan func(context.Context) + // 上下文取消函数 + cancel context.CancelFunc + // 程序退出通道 + exit chan chan error +) + +func tryCancel() { + cancelFunc := cancel + if cancelFunc == nil { + log.Panic("context already cancelled") + } + cancel = nil + cancelFunc() + // 等待任务执行完成 + for len(semaphore) > 0 { + work(nil) + } + // 理论上在没有信号量的情况下,是没有任务队列的。 + for len(tasks) > 0 { + log.Panic("nonempty task channel") + } + // 销毁注册的服务 + for i := len(servlets) - 1; i >= 0; i-- { + if err := servlets[i].Stop(); err != nil { + log.Warn("servlet Stop returned with error: ", err) + } + } + ctx = nil +} + +// Go 异步任务 +func Go(fn func(context.Context)) { + if semaphore == nil || ctx == nil { + log.Panic("not initialized") + return + } + select { + case semaphore <- struct{}{}: + // If we are below our limit, spawn a new worker rather + // than waiting for one to become available. + async(&wg, func(context.Context) { + work(fn) + }) + case tasks <- fn: + // 放到任务队列中 + // A worker is available and has accepted the task. + } +} + +// Async 异步执行函数 +func Async(fn func(context.Context)) { + async(&wg, fn) +} + +// 执行任务 +func work(f func(context.Context)) { + defer func() { + if semaphore != nil { + // 释放 + <-semaphore + } + }() + + // todo 能不能复用 sync.WaitGroup + g := &sync.WaitGroup{} + if f != nil { + Async(f) + } + for task := range tasks { + Async(task) + } + g.Wait() +} + +// 异步执行函数 +func async(wg *sync.WaitGroup, fn func(context.Context)) { + wg.Add(1) + go func() { + defer wg.Done() + if ctx == nil { + return + } + select { + case <-ctx.Done(): + // 上下文一旦结束,任务将被忽略 + log.Error("context done") + default: + fn(ctx) + } + }() +} diff --git a/internal/runtime/runtime.go b/internal/runtime/runtime.go new file mode 100644 index 0000000..f8fdba0 --- /dev/null +++ b/internal/runtime/runtime.go @@ -0,0 +1,90 @@ +package runtime + +import ( + "context" + "errors" + "os" + "os/signal" + "sorbet/pkg/log" + "syscall" +) + +// Init 初始化 +func Init() error { + return InitWith(context.Background()) +} + +func InitWith(root context.Context) error { + if ctx != nil || cancel != nil { + return errors.New("not stopped") + } + // 初始化上下文相关 + ctx, cancel = context.WithCancel(root) + // 初始化退出通道 + if exit == nil { + exit = make(chan chan error) + } + // 初始化信号量 + if semaphore == nil { + semaphore = make(chan struct{}, 256) + } + // 初始化任务通道 + if tasks != nil { + tasks = make(chan func(context.Context), 256) + } + // 初始化网络服务组件 + for _, servlet := range servlets { + if err := servlet.Init(ctx); err != nil { + return err + } + } + return nil +} + +func Start() error { + // 创建服务器 + srv, ln, err := createServer() + if err != nil { + return err + } + + // 启动网络服务器 + go func() { + err := srv.Serve(ln) + if err != nil { + log.Error("encountered an error while serving listener: ", err) + } + }() + log.Info("Listening on %s", ln.Addr().String()) + + // 监听停止命令,停止网络服务 + go func() { + errChan := <-exit + tryCancel() + errChan <- ln.Close() // stop the listener + }() + + return nil +} + +func Run() error { + if err := Start(); err != nil { + return err + } + // parse address for host, port + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT) + log.Info("Received signal %s", <-ch) + return Stop() +} + +// Stop 停止运行 +func Stop() error { + if ctx == nil { + return errors.New("already stopped") + } + ch := make(chan error) + exit <- ch + wg.Wait() + return <-ch +} diff --git a/internal/runtime/server.go b/internal/runtime/server.go new file mode 100644 index 0000000..7b4be69 --- /dev/null +++ b/internal/runtime/server.go @@ -0,0 +1,121 @@ +package runtime + +import ( + "fmt" + "github.com/labstack/echo/v4" + "github.com/labstack/echo/v4/middleware" + "net" + "net/http" + "sorbet/internal/util" + "sorbet/pkg/env" + "sorbet/pkg/rsp" + "time" +) + +var ( + // maxHeaderBytes is used by the http server to limit the size of request headers. + // This may need to be increased if accepting cookies from the public. + maxHeaderBytes = 1 << 20 + // readTimeout is used by the http server to set a maximum duration before + // timing out read of the request. The default timeout is 10 seconds. + readTimeout = 10 * time.Second + // writeTimeout is used by the http server to set a maximum duration before + // timing out write of the response. The default timeout is 10 seconds. + writeTimeout = 10 * time.Second + // idleTimeout is used by the http server to set a maximum duration for + // keep-alive connections. + idleTimeout = 120 * time.Second +) + +func newEchoFramework() (*echo.Echo, error) { + e := echo.New() + e.HideBanner = true + e.HidePort = true + e.HTTPErrorHandler = rsp.HTTPErrorHandler + e.Debug = !env.IsEnv("prod") + e.Logger = util.NewEchoLogger() + e.Use(middleware.Recover()) + e.Use(middleware.CORS()) + e.Use(middleware.Logger()) + for _, servlet := range servlets { + group := e.Group("") + for _, routable := range servlet.Routes() { + routable.InitRoutes(group) + } + } + routes := e.Routes() + e.GET("/_routes", func(c echo.Context) error { + return c.JSON(http.StatusOK, routes) + }) + return e, nil +} + +func createServer() (*http.Server, net.Listener, error) { + e, err := newEchoFramework() + if err != nil { + return nil, nil, err + } + + l, err := createTCPListener() + if err != nil { + return nil, nil, err + } + + s := &http.Server{ + Handler: e, + MaxHeaderBytes: env.Int("SERVER_MAX_HEADER_BYTES", maxHeaderBytes), + ReadTimeout: env.Duration("SERVER_READ_TIMEOUT", readTimeout), + WriteTimeout: env.Duration("SERVER_WRITE_TIMEOUT", writeTimeout), + IdleTimeout: env.Duration("SERVER_IDLE_TIMEOUT", idleTimeout), + } + + return s, l, nil +} + +func createTCPListener() (net.Listener, error) { + l, err := net.Listen( + env.String("SERVER_NETWORK", "tcp"), + fmt.Sprintf("%s:%d", + env.String("SERVER_ADDRESS", "0.0.0.0"), + env.Int("SERVER_PORT", 1324), + ), + ) + if err == nil { + l = net.Listener(TCPKeepAliveListener{ + TCPListener: l.(*net.TCPListener), + }) + } + return l, err +} + +// TCPKeepAliveListener sets TCP keep-alive timeouts on accepted +// connections. It's used by ListenAndServe and ListenAndServeTLS so +// dead TCP connections (e.g. closing laptop mid-download) eventually +// go away. +// +// This is here because it is not exposed in the stdlib and +// we'd prefer to have a hold of the http.Server's net.Listener so we can close it +// on shutdown. +// +// Taken from here: https://golang.org/src/net/http/server.go?s=63121:63175#L2120 +type TCPKeepAliveListener struct { + *net.TCPListener +} + +// Accept accepts the next incoming call and returns the new +// connection. KeepAlivePeriod is set properly. +func (ln TCPKeepAliveListener) Accept() (c net.Conn, err error) { + tc, err := ln.AcceptTCP() + if err != nil { + return + } + err = tc.SetKeepAlive(true) + if err != nil { + return + } + err = tc.SetKeepAlivePeriod(3 * time.Minute) + if err != nil { + return + } + return tc, nil +} diff --git a/internal/runtime/servlet.go b/internal/runtime/servlet.go new file mode 100644 index 0000000..187091d --- /dev/null +++ b/internal/runtime/servlet.go @@ -0,0 +1,69 @@ +package runtime + +import ( + "context" + "errors" + "github.com/labstack/echo/v4" + "slices" +) + +// 注册的网络服务组件 +var servlets []Servlet + +// Servlet 网络服务组件接口 +type Servlet interface { + // Name 服务名称 + Name() string + // Priority 优先级,用于启动和销毁的执行顺序 + Priority() int + // Init 初始化服务 + Init(ctx context.Context) error + // Routes 注册路由 + Routes() []Routable + // Start 非阻塞方式启动服务 + Start() error + // Stop 停止服务 + Stop() error +} + +func Reset() error { + if len(servlets) == 0 { + return nil + } + if ctx == nil { + return errors.New("servlets is running") + } + servlets = nil + return nil +} + +// Use 注册服务 +func Use(servlets ...Servlet) error { + if len(servlets) == 0 { + return nil + } + for i := 0; i < len(servlets); i++ { + if !use(servlets[i]) { + return errors.New("service already registered") + } + } + slices.SortFunc(servlets, func(a, b Servlet) int { + return b.Priority() - a.Priority() // 按优先级排序 + }) + return nil +} + +func use(servlet Servlet) bool { + exists := slices.ContainsFunc(servlets, func(s Servlet) bool { + return s.Name() == servlet.Name() + }) + if !exists { + servlets = append(servlets, servlet) + return true + } + return false +} + +type Routable interface { + InitRoutes(*echo.Group) +} diff --git a/internal/services/company/service.go b/internal/services/company/service.go index e13c0e7..c3fa350 100644 --- a/internal/services/company/service.go +++ b/internal/services/company/service.go @@ -1,8 +1,9 @@ package company import ( + "context" + "sorbet/internal/runtime" "sorbet/internal/services/company/controller" - "sorbet/pkg/crud" ) type Service struct{} @@ -15,12 +16,22 @@ func (*Service) Priority() int { return 1 } -func (*Service) Init(ctx *crud.Context) error { - ctx.Routes(&controller.CompanyController{}) - ctx.Routes(&controller.CompanyDepartmentController{}) - ctx.Routes(&controller.CompanyStaffController{}) +func (s *Service) Init(context.Context) error { return nil } -func (s *Service) Bootstrap() error { return nil } -func (s *Service) Destroy() error { return nil } +func (s *Service) Routes() []runtime.Routable { + return []runtime.Routable{ + &controller.CompanyController{}, + &controller.CompanyDepartmentController{}, + &controller.CompanyStaffController{}, + } +} + +func (s *Service) Start() error { + return nil +} + +func (s *Service) Stop() error { + return nil +} diff --git a/internal/services/config/service.go b/internal/services/config/service.go index 16c26f2..b6dba5f 100644 --- a/internal/services/config/service.go +++ b/internal/services/config/service.go @@ -1,8 +1,9 @@ package config import ( + "context" + "sorbet/internal/runtime" "sorbet/internal/services/config/controller" - "sorbet/pkg/crud" ) type Service struct{} @@ -15,11 +16,21 @@ func (*Service) Priority() int { return 0 } -func (*Service) Init(ctx *crud.Context) error { - ctx.Routes(&controller.ConfigGroupController{}) - ctx.Routes(&controller.ConfigController{}) +func (*Service) Init(context.Context) error { return nil } -func (s *Service) Bootstrap() error { return nil } -func (s *Service) Destroy() error { return nil } +func (*Service) Routes() []runtime.Routable { + return []runtime.Routable{ + &controller.ConfigGroupController{}, + &controller.ConfigController{}, + } +} + +func (s *Service) Start() error { + return nil +} + +func (s *Service) Stop() error { + return nil +} diff --git a/internal/services/feature/service.go b/internal/services/feature/service.go index 76dc27c..bcf7b78 100644 --- a/internal/services/feature/service.go +++ b/internal/services/feature/service.go @@ -1,8 +1,9 @@ package feature import ( + "context" + "sorbet/internal/runtime" "sorbet/internal/services/feature/controller" - "sorbet/pkg/crud" ) type Service struct{} @@ -15,14 +16,24 @@ func (*Service) Priority() int { return 3 } -func (s *Service) Init(ctx *crud.Context) error { - ctx.Routes(&controller.FeatureCategoryController{}) - ctx.Routes(&controller.FeatureConfigController{}) - ctx.Routes(&controller.FeatureContentChapterController{}) - ctx.Routes(&controller.FeatureContentDetailController{}) - ctx.Routes(&controller.FeatureController{}) +func (s *Service) Init(context.Context) error { return nil } -func (s *Service) Bootstrap() error { return nil } -func (s *Service) Destroy() error { return nil } +func (s *Service) Routes() []runtime.Routable { + return []runtime.Routable{ + &controller.FeatureCategoryController{}, + &controller.FeatureConfigController{}, + &controller.FeatureContentChapterController{}, + &controller.FeatureContentDetailController{}, + &controller.FeatureController{}, + } +} + +func (s *Service) Start() error { + return nil +} + +func (s *Service) Stop() error { + return nil +} diff --git a/internal/services/resource/service.go b/internal/services/resource/service.go index 2ca7355..1a8aeb5 100644 --- a/internal/services/resource/service.go +++ b/internal/services/resource/service.go @@ -1,8 +1,9 @@ package resource import ( + "context" + "sorbet/internal/runtime" "sorbet/internal/services/resource/controller" - "sorbet/pkg/crud" ) type Service struct{} @@ -15,11 +16,21 @@ func (*Service) Priority() int { return 2 } -func (*Service) Init(ctx *crud.Context) error { - ctx.Routes(&controller.ResourceController{}) - ctx.Routes(&controller.ResourceCategoryController{}) +func (*Service) Init(context.Context) error { return nil } -func (s *Service) Bootstrap() error { return nil } -func (s *Service) Destroy() error { return nil } +func (*Service) Routes() []runtime.Routable { + return []runtime.Routable{ + &controller.ResourceController{}, + &controller.ResourceCategoryController{}, + } +} + +func (s *Service) Start() error { + return nil +} + +func (s *Service) Stop() error { + return nil +} diff --git a/internal/services/service.go b/internal/services/service.go deleted file mode 100644 index 6b677df..0000000 --- a/internal/services/service.go +++ /dev/null @@ -1,72 +0,0 @@ -package services - -import ( - "context" - "errors" - "github.com/labstack/echo/v4" - "slices" - "sorbet/internal/services/company" - "sorbet/internal/services/config" - "sorbet/internal/services/feature" - "sorbet/internal/services/resource" - "sorbet/internal/services/system" - "sorbet/pkg/crud" -) - -var services []crud.Service - -const ContextEchoKey = "echo_framework" - -func Register(service crud.Service) error { - for _, applet := range services { - if applet.Name() == service.Name() { - return errors.New("service already registered") - } - } - services = append(services, service) - return nil -} - -func Init(ctx context.Context) error { - services = []crud.Service{ - &config.Service{}, - &company.Service{}, - &resource.Service{}, - &feature.Service{}, - &system.Service{}, - } - - // 按优先级排序 - slices.SortFunc(services, func(a, b crud.Service) int { - return b.Priority() - a.Priority() - }) - - return nil -} - -func Bootstrap(ctx context.Context) error { - e := ctx.Value(ContextEchoKey).(*echo.Echo) - for _, service := range services { - err := service.Init(crud.NewContext(ctx, e.Group(""))) - if err != nil { - return err - } - err = service.Bootstrap() - if err != nil { - return err - } - } - return nil -} - -func Destroy() error { - for i := len(services) - 1; i >= 0; i++ { - service := services[i] - err := service.Destroy() - if err != nil { - // TODO(hupeh): 是否需要销毁策略,比如可以继续或者中断等行为 - return err - } - } - return nil -} diff --git a/internal/services/system/service.go b/internal/services/system/service.go index b55f975..9d99ffc 100644 --- a/internal/services/system/service.go +++ b/internal/services/system/service.go @@ -1,8 +1,9 @@ package system import ( + "context" + "sorbet/internal/runtime" "sorbet/internal/services/system/controller" - "sorbet/pkg/crud" ) type Service struct{} @@ -15,15 +16,25 @@ func (*Service) Priority() int { return 3 } -func (s *Service) Init(ctx *crud.Context) error { - ctx.Routes(&controller.SystemLogController{}) - ctx.Routes(&controller.SystemMenuController{}) - ctx.Routes(&controller.SystemPermissionController{}) - ctx.Routes(&controller.SystemRoleController{}) - ctx.Routes(&controller.SystemRolePowerController{}) - ctx.Routes(&controller.SystemUserController{}) +func (s *Service) Init(context.Context) error { return nil } -func (s *Service) Bootstrap() error { return nil } -func (s *Service) Destroy() error { return nil } +func (s *Service) Routes() []runtime.Routable { + return []runtime.Routable{ + &controller.SystemLogController{}, + &controller.SystemMenuController{}, + &controller.SystemPermissionController{}, + &controller.SystemRoleController{}, + &controller.SystemRolePowerController{}, + &controller.SystemUserController{}, + } +} + +func (s *Service) Start() error { + return nil +} + +func (s *Service) Stop() error { + return nil +} diff --git a/internal/util/echo_logger.go b/internal/util/echo_logger.go index 6681a14..b66b44e 100644 --- a/internal/util/echo_logger.go +++ b/internal/util/echo_logger.go @@ -13,7 +13,7 @@ type EchoLogger struct { l sorbetLog.Logger } -func NewLogger() *EchoLogger { +func NewEchoLogger() *EchoLogger { return NewCustomLogger(sorbetLog.Default()) } diff --git a/main.go b/main.go index b8ac967..880e142 100644 --- a/main.go +++ b/main.go @@ -3,21 +3,20 @@ package main import ( "log" "sorbet/internal" - "sorbet/pkg/app" + "sorbet/internal/runtime" "sorbet/pkg/env" ) func main() { - if err := env.Init(); err != nil { - panic(err) - } - if err := internal.Init(); err != nil { - panic(err) - } - if err := app.Init("--udp", "false"); err != nil { - panic(err) - } - if err := app.Loop(); err != nil { - log.Panicln(err) + invoke(runtime.Reset) + invoke(env.Init) + invoke(internal.Init) + invoke(runtime.Init) + invoke(runtime.Run) +} + +func invoke(fn func() error) { + if err := fn(); err != nil { + log.Fatalln(err) } } diff --git a/pkg/app/app.go b/pkg/app/app.go deleted file mode 100644 index 1f9e17b..0000000 --- a/pkg/app/app.go +++ /dev/null @@ -1,128 +0,0 @@ -package app - -import ( - "context" - "errors" - "os" - "sorbet/pkg/app/appkit" -) - -func Init(args ...string) (err error) { - if len(args) == 0 { - args = os.Args[1:] - } - - switch Status() { - case Initialized: - return nil - case Starting: - return errors.New("starting") - case Running: - return errors.New("running") - case Stopping: - return errors.New("stopping") - case Idle, Stopped: - // continue - } - - initLifecycle() - initStats(args) - initUdp(args) - initBus() - initBuiltins() - - setStatus(Initialized) - - return -} - -func initBuiltins() { - // 订阅应用启动 - var sub appkit.Subscriber - sub = Sub("start", func([]byte) []byte { - sub.Cancel() - return nil - }) - - // 订阅应用停止 - Sub("stop", func(bytes []byte) []byte { - quit := exit - if quit != nil { - exit = nil - quit() - } - return nil - }) -} - -func Loop() error { - switch Status() { - case Starting, Running: - return nil - case Stopping: - return errors.New("stopping") - case Idle: - return errors.New("idle, you maybe forgot call app.Init()") - case Stopped: - return errors.New("stopped, you maybe forgot call app.Init()") - case Initialized: - // nothing - } - - // 释放内存 - defer free() - - wg.Add(1) - setStatus(Starting) - - // In a goroutine, we wait on for all goroutines to complete (for example - // timers). We use this to signal to the main thread to exit. - // wg.Add(1) basically translates to uv_ref, if this was Node. - // wg.Done() basically translates to uv_unref - go func() { - wg.Wait() - setStatus(Stopping) - stop() - }() - - // 启动应用指令 - Pub("start", nil) - - for { - select { - case msg := <-pubs: - err := dispatch(msg) - exitOnError(err) - wg.Done() // Corresponds to the wg.Add(1) in Pub(). - - case <-ctx.Done(): - // 只有等待所有协程全部完成后, - // 然后才可以退出主循环 - checkPubsEmpty() - setStatus(Stopped) - err := ctx.Err() - if errors.Is(err, context.Canceled) { - return nil - } - return err - } - - // We don't want to exit until we've received at least one message. - // This is so the program doesn't exit after sending the "start" - // message. - if Status() == Starting { - wg.Done() - start() - setStatus(Running) - } - } -} - -func Stop() { - Pub("stop", nil) -} - -func Go(fn func(context.Context)) { - assert(fn != nil, "invalid fn") - async(fn) -} diff --git a/pkg/app/appkit/bus.go b/pkg/app/appkit/bus.go deleted file mode 100644 index 03dd01a..0000000 --- a/pkg/app/appkit/bus.go +++ /dev/null @@ -1,5 +0,0 @@ -package appkit - -type Subscriber interface { - Cancel() -} diff --git a/pkg/app/appkit/codec.go b/pkg/app/appkit/codec.go deleted file mode 100644 index 261ba23..0000000 --- a/pkg/app/appkit/codec.go +++ /dev/null @@ -1,81 +0,0 @@ -package appkit - -import ( - "bytes" - "encoding/binary" - "errors" - "math" -) - -// 数据协议如下: -// -// +-----------+--------+--------------+-------+------+ -// | 0x01,0x02 | length | topic-length | topic | data | -// +-----------+--------+--------------+-------+------+ -// 2 2 2 n n -// -// * 头部有两个模数 0x01、0x02; -// * length 表示消息的总长度; -// * 然后分别是 topic、data,包含其长度和数据; -// * 长度使用 uint16 类型,所以占用 2 个字节; -// -// todo 实现数据校验码,防止篡改 - -// Encode 数据编码 -func Encode(topic, data []byte) ([]byte, error) { - topicLen := len(topic) - dataLen := len(data) - size := 2 + topicLen + dataLen // 不包括模数和数据长度 - if topicLen == 0 { - return nil, errors.New("topic too small") - } - if size+4 > math.MaxUint16 { - return nil, errors.New("data too big") - } - buf := &bytes.Buffer{} - // 由于 模数、topic 和 data 都是基本类型,所以当 - // 执行 binary.Write() 时是不会出错的 - _ = binary.Write(buf, binary.BigEndian, []byte{0x01, 0x02}) - _ = binary.Write(buf, binary.BigEndian, uint16(size)) - _ = binary.Write(buf, binary.BigEndian, uint16(topicLen)) - _ = binary.Write(buf, binary.BigEndian, topic) - _ = binary.Write(buf, binary.BigEndian, data) // 如果没有数据会不会报错 - return buf.Bytes(), nil -} - -// Decode 数据解码 -func Decode(buf []byte) (topic, data []byte, err error) { - r := bytes.NewReader(buf) - // 读取模数 - var v [2]byte - err = binary.Read(r, binary.BigEndian, &v) - if err != nil { - return - } - if v[0] != 0x01 || v[1] != 0x02 { - err = errors.New("protocol error") - return - } - // 读取数据长度 - var size uint16 - err = binary.Read(r, binary.BigEndian, &size) - if err != nil { - return - } - // 读取主题长度 - var topicLen uint16 - err = binary.Read(r, binary.BigEndian, &topicLen) - if err != nil { - return - } - // 读取主题 - topic = make([]byte, topicLen) - err = binary.Read(r, binary.BigEndian, &topic) - if err != nil { - return - } - // 读取数据 - data = make([]byte, size-2-topicLen) - err = binary.Read(r, binary.BigEndian, &data) - return -} diff --git a/pkg/app/bus.go b/pkg/app/bus.go deleted file mode 100644 index fa0225f..0000000 --- a/pkg/app/bus.go +++ /dev/null @@ -1,215 +0,0 @@ -package app - -import ( - "errors" - "net" - "slices" - "sorbet/pkg/app/appkit" - "sync" - "sync/atomic" -) - -var ( - // 操作锁 - // - // * 添加订阅器 - lock sync.RWMutex - - // 用于控制主体程序并发,涉及以下几个方面: - // - // * 任务调度 - // * 定时器 - // * 发布订阅 - // * 主程序轮询 - wg sync.WaitGroup - - // 消息发布通道 - pubs chan Msg - - // 消息订阅器 - subs map[string][]*subscriber - - nextSubId int32 -) - -func initBus() { - if pubs == nil { - pubs = make(chan Msg, 256) - } - if subs == nil { - subs = make(map[string][]*subscriber) - } - OnFree(freeBus) -} - -func freeBus() { - clear(subs) - - select { - case _, ok := <-pubs: - if !ok { - // 通道已经关闭了,需要重新构建 - pubs = make(chan Msg, 256) - return - } - // 清空 pubs 缓存的数据,复用它 - l := len(pubs) - for i := 0; i < l; i++ { - <-pubs - } - default: - // 通道里面没有值,可以复用 - } -} - -func Sub(topic string, handle func([]byte) []byte) appkit.Subscriber { - lock.Lock() - defer lock.Unlock() - sub := &subscriber{ - id: atomic.AddInt32(&nextSubId, 1), - topic: topic, - handle: handle, - } - if _, ok := subs[topic]; !ok { - subs[topic] = make([]*subscriber, 0) - } - subs[topic] = append(subs[topic], sub) - return sub -} - -func Pub(topic string, data []byte) { - pubMsg(Msg{Data: data, topic: topic}) -} - -func pubMsg(msg Msg) { - wg.Add(1) - pubs <- msg -} - -type Msg struct { - Data []byte - - topic string - addr net.Addr - pc net.PacketConn -} - -// Size 消息长度 -func (m Msg) Size() int { - return 6 + len([]byte(m.topic)) + len(m.Data) -} - -type subscriber struct { - id int32 - topic string - handle func([]byte) []byte -} - -func (s *subscriber) Active() bool { - return atomic.LoadInt32(&s.id) > 0 -} - -func (s *subscriber) Cancel() { - id := atomic.SwapInt32(&s.id, 0) - if id == 0 { - return - } - lock.Lock() - defer lock.Unlock() - subscribers, ok := subs[s.topic] - if !ok { - return - } - if len(subscribers) > 0 { - subscribers = slices.DeleteFunc(subscribers, func(sub *subscriber) bool { - return sub.id == id - }) - } - if len(subscribers) > 0 { - subs[s.topic] = subscribers - } else { - delete(subs, s.topic) - } -} - -func (s *subscriber) invoke(data []byte) []byte { - if s.Active() { - return s.handle(data) - } - return nil -} - -func dispatch(msg Msg) error { - lock.RLock() - var cbs []*subscriber - if subscribers, ok := subs[msg.topic]; ok { - for _, s := range subscribers { - if s.Active() { - cbs = append(cbs, s) - } - } - } - lock.RUnlock() - - if len(cbs) == 0 { - return errors.New("no subscribers for topic " + msg.topic) - } - - var response []byte - for _, cb := range cbs { - res := cb.invoke(msg.Data) - if res != nil { - response = res - } - } - - if len(response) > 0 && msg.pc != nil { - stats.v8workerRespond++ - stats.v8workerBytesSent += len(response) - - n, ex := msg.pc.WriteTo(response, msg.addr) - if n > 0 { - stats.v8workerSend++ - stats.v8workerBytesSent += n - } - if ex != nil { - // todo handle the error - } - } - - return nil -} - -func recv(pc net.PacketConn, addr net.Addr, buf []byte) { - stats.v8workerRecv++ - stats.v8workerBytesRecv += len(buf) - - topic, data, err := appkit.Decode(buf) - if err != nil { - // todo handle the error - // return errors.New("invalid payload") - return - } - - pubMsg(Msg{ - Data: data, - topic: string(topic), - addr: addr, - pc: pc, - }) -} - -func checkPubsEmpty() { - // We've received a done event. As a sanity check, make sure that resChan is - // empty. - select { - case _, ok := <-pubs: - if ok { - panic("Read a message from pubs after context deadlined.") - } else { - panic("pubs closed. Unexpected.") - } - default: - // No value ready, moving on. - } -} diff --git a/pkg/app/lifecycle.go b/pkg/app/lifecycle.go deleted file mode 100644 index ae1f8e8..0000000 --- a/pkg/app/lifecycle.go +++ /dev/null @@ -1,83 +0,0 @@ -package app - -import ( - "context" - "sync/atomic" -) - -const ( - Idle int32 = iota - Initialized - Starting - Running - Stopping - Stopped -) - -var ( - // 当前状态 - status int32 - - // 应用上下文 - ctx context.Context - exit context.CancelFunc - - start func() // 应用启动钩子 - stop func() // 应用停止钩子 - free func() // 内存释放钩子 -) - -func initLifecycle() { - ctx, exit = context.WithCancel(context.Background()) - - start = func() { - // nothing - } - - stop = func() { - if exit != nil { - exit() - } - } - - free = func() { - ctx = nil - start = nil - stop = nil - exit = nil - } -} - -// setStatus 设置应用状态 -func setStatus(newStatus int32) { - atomic.StoreInt32(&status, newStatus) -} - -// Status 返回当前状态 -func Status() int32 { - return atomic.LoadInt32(&status) -} - -func OnStart(fn func()) { - oldStart := start - start = func() { - oldStart() - fn() - } -} - -func OnStop(fn func()) { - oldStop := stop - stop = func() { - fn() - oldStop() - } -} - -func OnFree(fn func()) { - oldFree := free - free = func() { - fn() - oldFree() - } -} diff --git a/pkg/app/stats.go b/pkg/app/stats.go deleted file mode 100644 index 6775ae4..0000000 --- a/pkg/app/stats.go +++ /dev/null @@ -1,44 +0,0 @@ -package app - -import ( - "os" - "reflect" - "runtime/pprof" -) - -var stats struct { - v8workerSend int - v8workerRespond int - v8workerRecv int - v8workerBytesSent int - v8workerBytesRecv int -} - -func initStats(args []string) { - initCpuProfile(strArg(args, "--cpuprof")) - initMemoryProfile(strArg(args, "--memprof")) - OnFree(freeStats) -} - -func initCpuProfile(cpuprof string) { - if cpuprof != "" { - cpuProfile, err := os.Create(cpuprof) - check(err) - check(pprof.StartCPUProfile(cpuProfile)) - OnStop(pprof.StopCPUProfile) - } -} - -func initMemoryProfile(memprof string) { - if memprof != "" { - memProfile, err := os.Create(memprof) - check(err) - check(pprof.WriteHeapProfile(memProfile)) - OnStop(func() { check(memProfile.Close()) }) - } -} - -func freeStats() { - newStats := reflect.New(reflect.TypeOf(stats)) - reflect.ValueOf(&stats).Elem().Set(newStats.Elem()) -} diff --git a/pkg/app/udp.go b/pkg/app/udp.go deleted file mode 100644 index d09acd3..0000000 --- a/pkg/app/udp.go +++ /dev/null @@ -1,63 +0,0 @@ -package app - -import ( - "context" - "errors" - "io" - "net" -) - -// 通过 UDP 协议实现外部通信 -var pc net.PacketConn - -func initUdp(args []string) { - // 未开启 udp - if !boolArg(args, "--udp") { - return - } - - // 这一步属于初始化,切不可放到 OnStart 里面 - sizeLimit := intArg(args, "--payload", 1024<<2) - addr := strArg(args, "--addr", ":1324") - - // 当程序启动时开启 udp 服务器 - OnStart(func() { - listenUDP(addr, sizeLimit) - }) - - OnStop(freeUdp) -} - -func freeUdp() { - if pc != nil { - err := pc.Close() - pc = nil - check(err) - } -} - -func listenUDP(address string, maxSize int) { - var err error - pc, err = net.ListenPacket("udp", address) - check(err) - - async(func(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - default: - } - - buf := make([]byte, maxSize) - n, addr, err := pc.ReadFrom(buf) - if err != nil { - if errors.Is(err, io.EOF) { - return - } - continue - } - recv(pc, addr, buf[:n]) - } - }) -} diff --git a/pkg/app/util.go b/pkg/app/util.go deleted file mode 100644 index 39cf83e..0000000 --- a/pkg/app/util.go +++ /dev/null @@ -1,101 +0,0 @@ -package app - -import ( - "context" - "errors" - "os" - "slices" - "strconv" - "strings" -) - -func assert(cond bool, msg string) { - if !cond { - panic(msg) - } -} - -func check(e error) { - if e != nil { - panic(e) - } -} - -func exitOnError(err error) { - if err != nil { - os.Stderr.WriteString(err.Error()) - os.Exit(1) - } -} - -func async(cb func(ctx context.Context)) { - wg.Add(1) - go func() { - defer wg.Done() - select { - case <-ctx.Done(): - return - default: - cb(ctx) - } - }() -} - -func getArg[T any](args []string, name string, f func(string, bool) T) T { - i := slices.Index(args, name) - if i == -1 { - return f("", false) - } - if i == len(args)-1 { - panic(errors.New("invalid " + name + " option")) - } - s := args[i+1] - if strings.HasPrefix(s, "--") { - return f("", true) - } - return f(s, true) -} - -func strArg(args []string, name string, def ...string) string { - return getArg(args, name, func(s string, b bool) string { - if b { - return s - } - for _, v := range def { - return v - } - return "" - }) -} - -func intArg(args []string, name string, def ...int) int { - return getArg(args, name, func(s string, b bool) int { - if b { - size, err := strconv.Atoi(s) - check(err) - assert(size > 0, "invalid "+name+" option") - return size - } - for _, v := range def { - return v - } - return 0 - }) -} - -func boolArg(args []string, name string, def ...bool) bool { - return getArg(args, name, func(s string, b bool) bool { - if b { - if s == "" { - return true - } - v, err := strconv.ParseBool(s) - check(err) - return v - } - for _, v := range def { - return v - } - return false - }) -} diff --git a/pkg/crud/context.go b/pkg/crud/context.go deleted file mode 100644 index 512932a..0000000 --- a/pkg/crud/context.go +++ /dev/null @@ -1,53 +0,0 @@ -package crud - -import ( - "context" - "github.com/labstack/echo/v4" - "sync" -) - -type Context struct { - context.Context - store map[any]any - router *echo.Group - mu sync.RWMutex -} - -func NewContext(ctx context.Context, router *echo.Group) *Context { - return &Context{ - Context: ctx, - store: make(map[any]any), - router: router, - mu: sync.RWMutex{}, - } -} - -// Routes 注册路由 -func (c *Context) Routes(routes Routable) { - c.mu.Lock() - defer c.mu.Unlock() - routes.InitRoutes(c.router) -} - -// Set 设置值 -func (c *Context) Set(key, val any) { - c.mu.Lock() - defer c.mu.Unlock() - c.store[key] = val -} - -// Get 获取值,只会获取通过 Set 方法设置的值 -func (c *Context) Get(key any) (any, bool) { - c.mu.RLock() - defer c.mu.RUnlock() - val, ok := c.store[key] - return val, ok -} - -// Value 获取值 -func (c *Context) Value(key any) any { - if val, ok := c.Get(key); ok { - return val - } - return c.Value(key) -} diff --git a/pkg/crud/controller.go b/pkg/crud/controller.go index 849e0e8..19122eb 100644 --- a/pkg/crud/controller.go +++ b/pkg/crud/controller.go @@ -62,11 +62,11 @@ type Controller[Entity any, Upsert any] struct{} // RegisterRoutes 注册路由 func (ctr *Controller[Entity, Upsert]) RegisterRoutes(path string, r *echo.Group) { + r.GET(path, ctr.List) + r.GET(path+"/:id", ctr.Get) r.PUT(path, ctr.Create) r.DELETE(path+"/:id", ctr.Delete) r.POST(path+"/:id", ctr.Update) - r.GET(path+"/:id", ctr.Get) - r.GET(path, ctr.List) } // ORM 获取 gorm.DB 实例 diff --git a/pkg/crud/service.go b/pkg/crud/service.go deleted file mode 100644 index 6602f65..0000000 --- a/pkg/crud/service.go +++ /dev/null @@ -1,20 +0,0 @@ -package crud - -import "github.com/labstack/echo/v4" - -type Service interface { - // Name 服务名称 - Name() string - // Priority 优先级,用于启动和销毁的执行顺序 - Priority() int - // Init 初始化服务 - Init(ctx *Context) error - // Bootstrap 启动服务 - Bootstrap() error - // Destroy 销毁服务 - Destroy() error -} - -type Routable interface { - InitRoutes(r *echo.Group) -} diff --git a/pkg/log/log.go b/pkg/log/log.go index e5fe588..dd561de 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -91,6 +91,10 @@ func Error(msg string, args ...any) { std.Error(msg, args...) } +func Panic(msg string, args ...any) { + std.Panic(msg, args...) +} + // Fatal logs at LevelFatal. func Fatal(msg string, args ...any) { std.Fatal(msg, args...) diff --git a/pkg/log/logger.go b/pkg/log/logger.go index 5a40646..09374f7 100644 --- a/pkg/log/logger.go +++ b/pkg/log/logger.go @@ -56,6 +56,7 @@ type Logger interface { Info(msg string, args ...any) Warn(msg string, args ...any) Error(msg string, args ...any) + Panic(msg string, args ...any) Fatal(msg string, args ...any) } @@ -152,6 +153,7 @@ func (l *logger) SetIgnorePC(ignore bool) { } func (l *logger) Enabled(level Level) bool { + // todo panic 和 fatal 用于开启,取消 off 等级 return l.handler.Enabled(nil, level.slog().Level()) } @@ -237,6 +239,11 @@ func (l *logger) Error(msg string, args ...any) { l.log(LevelError, msg, args...) } +// Panic logs at PanicError. +func (l *logger) Panic(msg string, args ...any) { + l.log(LevelPanic, msg, args...) +} + // Fatal logs at LevelFatal. func (l *logger) Fatal(msg string, args ...any) { l.log(LevelFatal, msg, args...)