You cannot select more than 25 topics.
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
107 lines
2.6 KiB
107 lines
2.6 KiB
package db
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"ims/util/backoff"
|
|
|
|
"gorm.io/gorm"
|
|
"zestack.dev/env"
|
|
)
|
|
|
|
// pgsqlSchemaHelper groups the schema operations in this file that issue
// PostgreSQL statements: resolving schema names, reading and switching
// search_path, locking, and creating or dropping schemas. It has no fields,
// so it carries no state of its own.
type pgsqlSchemaHelper struct{}
|
|
|
|
func (s *pgsqlSchemaHelper) PublicSchema() string {
|
|
return env.String("DB_PUBLIC_SCHEMA", "public")
|
|
}
|
|
|
|
func (s *pgsqlSchemaHelper) TenantSchema(tenantId uint) string {
|
|
if tenantId == 0 {
|
|
return s.PublicSchema()
|
|
}
|
|
return fmt.Sprintf(
|
|
"%s%d%s",
|
|
env.String("DB_TENANT_PREFIX", "tenant_"),
|
|
tenantId,
|
|
env.String("DB_TENANT_SUFFIX", ""),
|
|
)
|
|
}
|
|
|
|
func (s *pgsqlSchemaHelper) CurrentSchema(tx *gorm.DB) string {
|
|
// tx = tx.Session(&gorm.Session{})
|
|
var schema string
|
|
tx.Raw("SHOW search_path").Scan(&schema)
|
|
if schema == `"$user", public` {
|
|
return "public"
|
|
}
|
|
return schema
|
|
}
|
|
|
|
func (s *pgsqlSchemaHelper) UseSchema(tx *gorm.DB, schema string) (func() error, error) {
|
|
// tx = tx.Session(&gorm.Session{})
|
|
if schema == "" {
|
|
err := errors.New("schema name is empty")
|
|
tx.AddError(err)
|
|
return nil, err
|
|
}
|
|
|
|
currentSchema := s.CurrentSchema(tx)
|
|
publicSchema := s.PublicSchema()
|
|
|
|
// 当前 schema 与目标 schema 相同,无需切换
|
|
if schema == s.CurrentSchema(tx) {
|
|
reset := func() error { return nil }
|
|
return reset, nil
|
|
}
|
|
|
|
// 不支持租户切换
|
|
if currentSchema != publicSchema && schema != publicSchema {
|
|
err := fmt.Errorf(
|
|
"failed to switch schema %s from current schema %s: %w",
|
|
schema,
|
|
currentSchema,
|
|
ErrSwitchSchema,
|
|
)
|
|
tx.AddError(err)
|
|
return nil, err
|
|
}
|
|
|
|
sqlstr := "SET search_path TO " + tx.Statement.Quote(schema)
|
|
if execErr := tx.Exec(sqlstr).Error; execErr != nil {
|
|
err := fmt.Errorf("failed to set search path %q: %w", schema, execErr)
|
|
tx.AddError(err)
|
|
return nil, err
|
|
}
|
|
|
|
if schema == publicSchema {
|
|
reset := func() error { return nil }
|
|
return reset, nil
|
|
}
|
|
|
|
reset := func() error {
|
|
return tx.Exec("SET search_path TO " + tx.Statement.Quote(publicSchema)).Error
|
|
}
|
|
|
|
return reset, nil
|
|
}
|
|
|
|
// AcquireXact acquires a PostgreSQL transaction-level advisory lock.
|
|
// The caller is responsible for ensuring that a transaction is active,
|
|
// and that the lock is released after use.
|
|
func (s *pgsqlSchemaHelper) LockSchema(tx *gorm.DB, schema string, retry *backoff.Options) (func() error, error) {
|
|
return AcquireXact(tx, schema, retry)
|
|
}
|
|
|
|
func (s *pgsqlSchemaHelper) CreateSchema(tx *gorm.DB, schema string) error {
|
|
sql := "CREATE SCHEMA IF NOT EXISTS " + tx.Statement.Quote(schema)
|
|
return tx.Exec(sql).Error
|
|
}
|
|
|
|
func (s *pgsqlSchemaHelper) DropSchema(tx *gorm.DB, schema string) error {
|
|
sql := "DROP SCHEMA IF EXISTS " + tx.Statement.Quote(schema) + " CASCADE"
|
|
return tx.Exec(sql).Error
|
|
}
|
|
|
|
func (s *pgsqlSchemaHelper) String() string {
|
|
return "pgsql"
|
|
}
|
|
|