package db import ( "errors" "fmt" "ims/util/backoff" "gorm.io/gorm" "zestack.dev/env" ) type pgsqlSchemaHelper struct{} func (s *pgsqlSchemaHelper) PublicSchema() string { return env.String("DB_PUBLIC_SCHEMA", "public") } func (s *pgsqlSchemaHelper) TenantSchema(tenantId uint) string { if tenantId == 0 { return s.PublicSchema() } return fmt.Sprintf( "%s%d%s", env.String("DB_TENANT_PREFIX", "tenant_"), tenantId, env.String("DB_TENANT_SUFFIX", ""), ) } func (s *pgsqlSchemaHelper) CurrentSchema(tx *gorm.DB) string { // tx = tx.Session(&gorm.Session{}) var schema string tx.Raw("SHOW search_path").Scan(&schema) if schema == `"$user", public` { return "public" } return schema } func (s *pgsqlSchemaHelper) UseSchema(tx *gorm.DB, schema string) (func() error, error) { // tx = tx.Session(&gorm.Session{}) if schema == "" { err := errors.New("schema name is empty") tx.AddError(err) return nil, err } currentSchema := s.CurrentSchema(tx) publicSchema := s.PublicSchema() // 当前 schema 与目标 schema 相同,无需切换 if schema == s.CurrentSchema(tx) { reset := func() error { return nil } return reset, nil } // 不支持租户切换 if currentSchema != publicSchema && schema != publicSchema { err := fmt.Errorf( "failed to switch schema %s from current schema %s: %w", schema, currentSchema, ErrSwitchSchema, ) tx.AddError(err) return nil, err } sqlstr := "SET search_path TO " + tx.Statement.Quote(schema) if execErr := tx.Exec(sqlstr).Error; execErr != nil { err := fmt.Errorf("failed to set search path %q: %w", schema, execErr) tx.AddError(err) return nil, err } if schema == publicSchema { reset := func() error { return nil } return reset, nil } reset := func() error { return tx.Exec("SET search_path TO " + tx.Statement.Quote(publicSchema)).Error } return reset, nil } // AcquireXact acquires a PostgreSQL transaction-level advisory lock. // The caller is responsible for ensuring that a transaction is active, // and that the lock is released after use. func (s *pgsqlSchemaHelper) LockSchema(tx *gorm.DB, schema string, retry *backoff.Options) (func() error, error) { return AcquireXact(tx, schema, retry) } func (s *pgsqlSchemaHelper) CreateSchema(tx *gorm.DB, schema string) error { sql := "CREATE SCHEMA IF NOT EXISTS " + tx.Statement.Quote(schema) return tx.Exec(sql).Error } func (s *pgsqlSchemaHelper) DropSchema(tx *gorm.DB, schema string) error { sql := "DROP SCHEMA IF EXISTS " + tx.Statement.Quote(schema) + " CASCADE" return tx.Exec(sql).Error } func (s *pgsqlSchemaHelper) String() string { return "pgsql" }