package pgsql import ( "errors" "ims/util/backoff" "gorm.io/gorm" ) type Migrator struct { DB *gorm.DB Retry *backoff.Options Log func(format string, args ...any) } func (m Migrator) retry(fn func() error) error { if m.Retry == nil { return fn() } return backoff.Retry(fn, func(o *backoff.Options) { *o = *m.Retry }) } func (m Migrator) acquireXact(tx *gorm.DB, lockKey string) error { return AcquireXact(tx, lockKey, m.Retry) } func (m Migrator) MigrateTenantModels(tenantId uint, models []any) error { m.Log("⏳ migrating tables for tenant %d", tenantId) if len(models) == 0 { m.Log("😭 no tenant tables to migrate") return errors.New("no tenant tables to migrate") } tx := m.DB.Session(&gorm.Session{}) schema := TenantSchema(tenantId) sql := "CREATE SCHEMA IF NOT EXISTS " + tx.Statement.Quote(schema) if err := tx.Exec(sql).Error; err != nil { m.Log("❌ failed to create schema for tenant %d: %s", tenantId, err) return err } err := tx.Transaction(func(tx *gorm.DB) error { err := m.acquireXact(tx, schema) if err != nil { m.Log("❌ failed to acquire advisory lock for tenant %d: %w", tenantId, err) return err } reset, searchPathErr := SetSearchPath(tx, schema) if searchPathErr != nil { m.Log("❌ failed to set search path '%s' to tenant %d: %w", schema, tenantId, searchPathErr) return searchPathErr } defer reset() if err := tx.AutoMigrate(models...); err != nil { m.Log("❌ failed to migrate tenant tables for tenant %d: %w", tenantId, err) return err } m.Log("✅ private tables migrated for tenant %d", tenantId) return nil }) if err != nil { return err } return nil } // MigrateSharedModels migrates the public tables in the database. func (m Migrator) MigrateSharedModels(models []any) error { m.Log("⏳ migrating public tables") if len(models) == 0 { m.Log("😭 no public tables to migrate") return errors.New("no public tables to migrate") } tx := m.DB.Begin() defer func() { if tx.Error == nil { tx.Commit() m.Log("✅ public tables migrated for all tenants") } else { tx.Rollback() } }() if err := m.acquireXact(tx, PublicSchema()); err != nil { m.Log("❌ failed to acquire advisory lock: %w", err) return err } if err := tx.AutoMigrate(models...); err != nil { m.Log("❌ failed to migrate public tables: %w", err) return err } return nil } func (m Migrator) DropSchemaForTenant(tenantId uint) error { m.Log("⏳ dropping schema for tenant %d", tenantId) tx := m.DB.Session(&gorm.Session{}) schema := TenantSchema(tenantId) return m.retry(func() error { sql := "DROP SCHEMA IF EXISTS " + tx.Statement.Quote(schema) + " CASCADE" if err := tx.Exec(sql).Error; err != nil { m.Log("❌ failed to drop schema for tenant %d: %s", tenantId, err) return err } m.Log("✅ schema dropped for tenant %d", tenantId) return nil }) }