Skip to content

Commit

Permalink
♻️ refactor: added functions to support resolver #5
Browse files Browse the repository at this point in the history
  • Loading branch information
arisnguyen215 committed Dec 10, 2023
1 parent 12b9e67 commit 9df0f4e
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 22 deletions.
163 changes: 153 additions & 10 deletions db.resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,25 @@ func NewMultiTenantDBResolver() *MultiTenantDBResolver {
return &MultiTenantDBResolver{
connectors: make(map[string]DBConnector),
once: make(map[string]*sync.Once),
dbs: make(map[string]struct {
C *sql.DB
S dbx.Dbx
}),
}
}

// AddConnector adds a new database connector for a specific tenant.
func (r *MultiTenantDBResolver) AddConnector(tenantId string, connector DBConnector) *MultiTenantDBResolver {
mu.Lock()
defer mu.Unlock()
r.mu.Lock()
defer r.mu.Unlock()
r.connectors[tenantId] = connector
r.once[tenantId] = &sync.Once{}
return r
}

func (r *MultiTenantDBResolver) AddConnectors(tenantId string, connectors ...DBConnector) *MultiTenantDBResolver {
mu.Lock()
defer mu.Unlock()
r.mu.Lock()
defer r.mu.Unlock()
for _, connector := range connectors {
r.connectors[tenantId] = connector
r.once[tenantId] = &sync.Once{}
Expand All @@ -65,29 +69,61 @@ func (r *MultiTenantDBResolver) AddConnectors(tenantId string, connectors ...DBC
}

func (r *MultiTenantDBResolver) AddPsqlConnectors(connectors ...postgres.MultiTenantPostgresConfig) *MultiTenantDBResolver {
r.mu.Lock()
defer r.mu.Unlock()
for _, connector := range connectors {
r.AddConnector(connector.Key, NewPostgresConnector(connector.Config))
}
return r
}

func (r *MultiTenantDBResolver) AddMsqlConnectors(connectors ...mysql.MultiTenantMysqlConfig) *MultiTenantDBResolver {
r.mu.Lock()
defer r.mu.Unlock()
for _, connector := range connectors {
r.AddConnector(connector.Key, NewMySQLConnector(connector.Config))
}
return r
}

// AddConnectorsFromConfig adds database connectors from configuration data.
func (r *MultiTenantDBResolver) AddConnectorsFromConfig(configs ...interface{}) *MultiTenantDBResolver {
r.mu.Lock()
defer r.mu.Unlock()

for _, config := range configs {
switch c := config.(type) {
case postgres.MultiTenantPostgresConfig:
connector := NewPostgresConnector(c.Config)
r.AddConnector(c.Key, connector)
case mysql.MultiTenantMysqlConfig:
connector := NewMySQLConnector(c.Config)
r.AddConnector(c.Key, connector)
case postgres.ClusterMultiTenantPostgresConfig:
for _, clusterConfig := range c.Clusters {
connector := NewPostgresConnector(clusterConfig.Config)
r.AddConnector(clusterConfig.Key, connector)
}
case mysql.ClusterMultiTenantMysqlConfig:
for _, clusterConfig := range c.Clusters {
connector := NewMySQLConnector(clusterConfig.Config)
r.AddConnector(clusterConfig.Key, connector)
}
}
}
return r
}

// GetConnector returns a database connection for a specific tenant.
func (r *MultiTenantDBResolver) GetConnector(tenantId string) (*sql.DB, dbx.Dbx) {
mu.RLock()
r.mu.RLock()
connector, ok := r.connectors[tenantId]
once := r.once[tenantId]
mu.RUnlock()
r.mu.RUnlock()

if !ok {
mu.Lock()
defer mu.Unlock()
r.mu.Lock()
defer r.mu.Unlock()
// Check again to avoid race condition
if connector, ok = r.connectors[tenantId]; !ok {
message := fmt.Sprintf("No connector found for tenant %s", tenantId)
Expand All @@ -104,14 +140,121 @@ func (r *MultiTenantDBResolver) GetConnector(tenantId string) (*sql.DB, dbx.Dbx)
if !s.IsConnected {
logger.Errorf(fmt.Sprintf("Error initializing database connection for tenant %s (executed in %v): %s", tenantId, time.Since(start), s.Message), s.Error)
}
dbs[tenantId] = struct {
r.dbs[tenantId] = struct {
C *sql.DB
S dbx.Dbx
}{
C: db,
S: s,
}
})
conn, _ := dbs[tenantId]
conn := r.dbs[tenantId]
return conn.C, conn.S
}

// RemoveConnector removes a database connector for a specific tenant.
func (r *MultiTenantDBResolver) RemoveConnector(tenantId string) *MultiTenantDBResolver {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.connectors, tenantId)
delete(r.once, tenantId)
delete(r.dbs, tenantId)
return r
}

// UpdateConnector updates an existing database connector for a specific tenant.
func (r *MultiTenantDBResolver) UpdateConnector(tenantId string, connector DBConnector) *MultiTenantDBResolver {
r.mu.Lock()
defer r.mu.Unlock()
r.connectors[tenantId] = connector
r.once[tenantId] = &sync.Once{}
return r
}

// CloseConnection closes all database connections for a specific tenant.
func (r *MultiTenantDBResolver) CloseConnection(tenantId string) {
r.mu.Lock()
defer r.mu.Unlock()

if conn, ok := r.dbs[tenantId]; ok {
if conn.C != nil {
conn.C.Close()
}
delete(r.dbs, tenantId)
}
}

// CloseAllConnections closes all database connections for all tenants.
func (r *MultiTenantDBResolver) CloseAllConnections() {
r.mu.Lock()
defer r.mu.Unlock()
for tenantId, conn := range r.dbs {
if conn.C != nil {
conn.C.Close()
}
delete(r.dbs, tenantId)
}
}

// ClearAllConnectors removes all connectors and closes associated connections.
func (r *MultiTenantDBResolver) ClearAllConnectors() {
r.mu.Lock()
defer r.mu.Unlock()
for tenantId := range r.connectors {
r.CloseConnection(tenantId)
delete(r.connectors, tenantId)
delete(r.once, tenantId)
}
}

// SetDefaultConnector sets a default database connector for cases where no specific connector is available.
func (r *MultiTenantDBResolver) SetDefaultConnector(connector DBConnector) *MultiTenantDBResolver {
r.mu.Lock()
defer r.mu.Unlock()
r.connectors["default"] = connector
r.once["default"] = &sync.Once{}
return r
}

// GetDefaultConnector returns the default database connector.
func (r *MultiTenantDBResolver) GetDefaultConnector() (*sql.DB, dbx.Dbx) {
return r.GetConnector("default")
}

// GetConnectorInfo returns information about a specific connector for a tenant.
func (r *MultiTenantDBResolver) GetConnectorInfo(tenantId string) (DBConnector, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
connector, ok := r.connectors[tenantId]
return connector, ok
}

// HealthCheck performs a health check on all connectors and returns the status for each tenant.
func (r *MultiTenantDBResolver) HealthCheck() map[string]bool {
r.mu.RLock()
defer r.mu.RUnlock()
status := make(map[string]bool, len(r.connectors))
for tenantId, connector := range r.connectors {
_, s := connector.Connect()
status[tenantId] = s.IsConnected
}
return status
}

// SafeConnector executes a function that requires a database connection safely.
// The function receives the database connection and returns an error if any.
// The connection is automatically closed after the function execution.
func (r *MultiTenantDBResolver) SafeConnector(tenantId string, fn func(db *sql.DB) error) error {
db, _ := r.GetConnector(tenantId)
defer r.CloseConnection(tenantId)
return fn(db)
}

// RefreshConnector refreshes the database connection for a specific tenant.
func (r *MultiTenantDBResolver) RefreshConnector(tenantId string) {
r.mu.RLock()
defer r.mu.RUnlock()
if _, ok := r.once[tenantId]; ok {
r.once[tenantId] = &sync.Once{}
}
}
12 changes: 0 additions & 12 deletions db.resolver_conf.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,6 @@
package dbresolver

import (
"database/sql"
"sync"

"github.com/sivaosorg/govm/dbx"
)

var (
dbs = make(map[string]struct {
C *sql.DB
S dbx.Dbx
})
mu sync.RWMutex
defaultConfig = dbConfig{
Host: "127.0.0.1",
Port: 5432,
Expand Down
5 changes: 5 additions & 0 deletions db.resolver_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@ type MySQLConnector struct {
type MultiTenantDBResolver struct {
connectors map[string]DBConnector
once map[string]*sync.Once
mu sync.RWMutex
dbs map[string]struct {
C *sql.DB
S dbx.Dbx
}
}

0 comments on commit 9df0f4e

Please sign in to comment.