You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ims/util/db/pgsql_schema.go

107 lines
2.6 KiB

package db
import (
"errors"
"fmt"
"ims/util/backoff"
"gorm.io/gorm"
"zestack.dev/env"
)
// pgsqlSchemaHelper groups the PostgreSQL schema operations defined in this
// file (resolving schema names, switching search_path, and creating, dropping
// and locking schemas). It has no fields and therefore carries no state.
type pgsqlSchemaHelper struct{}
// PublicSchema returns the name of the shared (non-tenant) schema, read via
// env.String from DB_PUBLIC_SCHEMA with "public" passed as the fallback value.
func (s *pgsqlSchemaHelper) PublicSchema() string {
	name := env.String("DB_PUBLIC_SCHEMA", "public")
	return name
}
// TenantSchema returns the schema name for the given tenant.
//
// A tenantId of 0 maps to the public schema. Any other id is rendered as
// <prefix><tenantId><suffix>, where prefix and suffix are read via env.String
// from DB_TENANT_PREFIX (fallback "tenant_") and DB_TENANT_SUFFIX (fallback "").
func (s *pgsqlSchemaHelper) TenantSchema(tenantId uint) string {
	if tenantId == 0 {
		return s.PublicSchema()
	}

	prefix := env.String("DB_TENANT_PREFIX", "tenant_")
	suffix := env.String("DB_TENANT_SUFFIX", "")
	return fmt.Sprintf("%s%d%s", prefix, tenantId, suffix)
}
// CurrentSchema reports the search_path currently in effect for tx by running
// "SHOW search_path" and scanning the result into a string.
//
// The value `"$user", public` is reported as "public"; any other value is
// returned exactly as scanned. The error of the query is not inspected here.
//
// NOTE: the statement runs directly on tx; no new gorm session is created.
func (s *pgsqlSchemaHelper) CurrentSchema(tx *gorm.DB) string {
	var searchPath string
	tx.Raw("SHOW search_path").Scan(&searchPath)

	switch searchPath {
	case `"$user", public`:
		return "public"
	default:
		return searchPath
	}
}
// UseSchema points the search_path of tx at schema and returns a reset
// function that restores the public schema afterwards.
//
// Behavior:
//   - An empty schema name is rejected with an error.
//   - If schema already equals the current schema, no statement is executed
//     and the returned reset function is a no-op.
//   - Switching directly from one non-public schema to another is not
//     supported and fails with an error wrapping ErrSwitchSchema.
//   - When the target is the public schema, the returned reset is a no-op;
//     otherwise reset sets search_path back to the public schema.
//
// Every error returned by UseSchema itself is also recorded on tx via
// tx.AddError. On error the returned reset function is nil.
//
// NOTE: statements run directly on tx; no new gorm session is created.
func (s *pgsqlSchemaHelper) UseSchema(tx *gorm.DB, schema string) (func() error, error) {
	if schema == "" {
		err := errors.New("schema name is empty")
		tx.AddError(err)
		return nil, err
	}

	// Shared reset for the cases where there is nothing to restore.
	noop := func() error { return nil }

	// Read the current schema exactly once so that every decision below is
	// based on the same value and no redundant query is issued.
	currentSchema := s.CurrentSchema(tx)
	publicSchema := s.PublicSchema()

	// The current schema is already the target schema; nothing to switch.
	if schema == currentSchema {
		return noop, nil
	}

	// Switching from one tenant schema straight to another is not supported.
	if currentSchema != publicSchema && schema != publicSchema {
		err := fmt.Errorf(
			"failed to switch schema %s from current schema %s: %w",
			schema,
			currentSchema,
			ErrSwitchSchema,
		)
		tx.AddError(err)
		return nil, err
	}

	sqlstr := "SET search_path TO " + tx.Statement.Quote(schema)
	if execErr := tx.Exec(sqlstr).Error; execErr != nil {
		err := fmt.Errorf("failed to set search path %q: %w", schema, execErr)
		tx.AddError(err)
		return nil, err
	}

	// Moving to the public schema leaves nothing to restore.
	if schema == publicSchema {
		return noop, nil
	}

	reset := func() error {
		return tx.Exec("SET search_path TO " + tx.Statement.Quote(publicSchema)).Error
	}
	return reset, nil
}
// LockSchema locks the given schema by delegating to AcquireXact with the
// schema name and the retry options, returning AcquireXact's results as-is.
//
// NOTE(review): per the original comment, AcquireXact acquires a PostgreSQL
// transaction-level advisory lock; the caller is responsible for ensuring
// that a transaction is active and that the lock is released after use —
// confirm against AcquireXact's definition.
func (s *pgsqlSchemaHelper) LockSchema(tx *gorm.DB, schema string, retry *backoff.Options) (func() error, error) {
return AcquireXact(tx, schema, retry)
}
// CreateSchema executes "CREATE SCHEMA IF NOT EXISTS" for the given schema on
// tx and returns the execution error, if any. The schema name is quoted with
// tx.Statement.Quote before being placed into the statement.
func (s *pgsqlSchemaHelper) CreateSchema(tx *gorm.DB, schema string) error {
	quoted := tx.Statement.Quote(schema)
	result := tx.Exec("CREATE SCHEMA IF NOT EXISTS " + quoted)
	return result.Error
}
// DropSchema executes "DROP SCHEMA IF EXISTS ... CASCADE" for the given schema
// on tx and returns the execution error, if any. The schema name is quoted
// with tx.Statement.Quote before being placed into the statement.
func (s *pgsqlSchemaHelper) DropSchema(tx *gorm.DB, schema string) error {
	quoted := tx.Statement.Quote(schema)
	result := tx.Exec("DROP SCHEMA IF EXISTS " + quoted + " CASCADE")
	return result.Error
}
// String returns the fixed identifier "pgsql" for this helper.
func (s *pgsqlSchemaHelper) String() string {
	const name = "pgsql"
	return name
}