Skip to content

Commit

Permalink
engine/compatmanager: migrate to sqlc (#4104)
Browse files Browse the repository at this point in the history
* migrate compat module to sqlc

* remove old sql code

* fix query

* fix compat

* revert dest migration
  • Loading branch information
mastercactapus authored Dec 26, 2024
1 parent ec6fc7c commit 91dd701
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 96 deletions.
51 changes: 1 addition & 50 deletions engine/compatmanager/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/target/goalert/engine/processinglock"
"github.com/target/goalert/notification/slack"
"github.com/target/goalert/util"
)

// DB handles keeping compatibility-related data in sync.
Expand All @@ -15,13 +14,6 @@ type DB struct {
lock *processinglock.Lock

cs *slack.ChannelSender

slackSubMissingCM *sql.Stmt
updateSubCMID *sql.Stmt
insertCM *sql.Stmt

cmMissingSub *sql.Stmt
insertSub *sql.Stmt
}

// Name returns the name of the module.
Expand All @@ -37,50 +29,9 @@ func NewDB(ctx context.Context, db *sql.DB, cs *slack.ChannelSender) (*DB, error
return nil, err
}

p := &util.Prepare{Ctx: ctx, DB: db}

return &DB{
db: db,
lock: lock,
cs: cs,

// get all entries missing cm_id where provider_id starts with "slack:"
slackSubMissingCM: p.P(`
select id, user_id, subject_id, provider_id from auth_subjects where
provider_id like 'slack:%' and cm_id is null
for update skip locked
limit 10
`),

// update cm_id for a given user_id and subject_id
updateSubCMID: p.P(`
update auth_subjects
set cm_id = (
select id from user_contact_methods
where type = 'SLACK_DM' and value = $2
) where id = $1
`),

insertCM: p.P(`
insert into user_contact_methods (id, name, type, value, user_id, pending)
values ($1, $2, $3, $4, $5, false)
on conflict (type, value) do nothing
`),

// find verified contact methods (disabled false) with no auth subject
cmMissingSub: p.P(`
select id, user_id, value from user_contact_methods where
type = 'SLACK_DM' and not disabled and not exists (
select 1 from auth_subjects where cm_id = user_contact_methods.id
)
for update skip locked
limit 10
`),

insertSub: p.P(`
insert into auth_subjects (user_id, subject_id, provider_id, cm_id)
values ($1, $2, $3, $4)
on conflict (subject_id, provider_id) do update set user_id = $1, cm_id = $4
`),
}, p.Err
}, nil
}
66 changes: 66 additions & 0 deletions engine/compatmanager/queries.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
-- name: CompatAuthSubSlackMissingCM :many
-- Get up to 10 auth_subjects (slack only) missing a contact method.
SELECT
*
FROM
auth_subjects
WHERE
provider_id LIKE 'slack:%'
AND cm_id IS NULL
FOR UPDATE
SKIP LOCKED
LIMIT 10;

-- name: CompatAuthSubSetCMID :exec
-- Updates the contact method id for an auth_subject with the given destination.
UPDATE
auth_subjects
SET
cm_id =(
SELECT
id
FROM
user_contact_methods
WHERE
type = 'SLACK_DM'
AND value = $2)
WHERE
auth_subjects.id = $1;

-- name: CompatInsertUserCM :exec
-- Inserts a new contact method for a user.
INSERT INTO user_contact_methods(id, name, type, value, user_id, pending)
VALUES ($1, $2, $3, $4, $5, FALSE)
ON CONFLICT (type, value)
DO NOTHING;

-- name: CompatCMMissingSub :many
-- Get up to 10 contact methods missing an auth_subjects link.
SELECT
id,
user_id,
value
FROM
user_contact_methods
WHERE
type = 'SLACK_DM'
AND NOT disabled
AND NOT EXISTS (
SELECT
1
FROM
auth_subjects
WHERE
cm_id = user_contact_methods.id)
FOR UPDATE
SKIP LOCKED
LIMIT 10;

-- name: CompatUpsertAuthSubject :exec
-- Inserts a new auth_subject for a user.
INSERT INTO auth_subjects(user_id, subject_id, provider_id, cm_id)
VALUES ($1, $2, $3, $4)
ON CONFLICT (subject_id, provider_id)
DO UPDATE SET
user_id = $1, cm_id = $4;

78 changes: 32 additions & 46 deletions engine/compatmanager/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"

"github.com/google/uuid"
"github.com/target/goalert/gadb"
"github.com/target/goalert/permission"
"github.com/target/goalert/util/log"
"github.com/target/goalert/util/sqlutil"
Expand Down Expand Up @@ -39,39 +40,26 @@ func (db *DB) updateAuthSubjects(ctx context.Context) error {
}
defer sqlutil.Rollback(ctx, "engine: update auth subjects", tx)

type cm struct {
ID uuid.UUID
UserID uuid.UUID
SlackUserID string
SlackTeamID string
}

var cms []cm
rows, err := tx.StmtContext(ctx, db.cmMissingSub).QueryContext(ctx)
q := gadb.New(tx)
rows, err := q.CompatCMMissingSub(ctx)
if err != nil {
return fmt.Errorf("query: %w", err)
}
for rows.Next() {
var c cm
err = rows.Scan(&c.ID, &c.UserID, &c.SlackUserID)
for _, row := range rows {
u, err := db.cs.User(ctx, row.Value)
if err != nil {
return fmt.Errorf("scan: %w", err)
}

u, err := db.cs.User(ctx, c.SlackUserID)
if err != nil {
log.Log(ctx, fmt.Errorf("update auth subjects: lookup Slack user (%s): %w", c.SlackUserID, err))
log.Log(ctx, fmt.Errorf("update auth subjects: lookup Slack user (%s): %w", row.Value, err))
continue
}

c.SlackTeamID = u.TeamID
cms = append(cms, c)
}

for _, c := range cms {
_, err = tx.StmtContext(ctx, db.insertSub).ExecContext(ctx, c.UserID, c.SlackUserID, "slack:"+c.SlackTeamID, c.ID)
err = q.CompatUpsertAuthSubject(ctx, gadb.CompatUpsertAuthSubjectParams{
UserID: row.UserID,
ProviderID: "slack:" + u.TeamID,
SubjectID: u.ID,
CmID: uuid.NullUUID{UUID: row.ID, Valid: true},
})
if err != nil {
return fmt.Errorf("insert: %w", err)
return fmt.Errorf("upsert auth subject: %w", err)
}
}

Expand All @@ -83,36 +71,25 @@ func (db *DB) updateAuthSubjects(ctx context.Context) error {
return nil
}

// updateContactMethods will create contact methods for associated auth_subjects (e.g. Slack direct message).
//
// To do this, we look for auth_subjects that are missing the contact method ID
// field (`cm_id`) for slack, and create a Slack DM contact method for the user
// associated with the record.
func (db *DB) updateContactMethods(ctx context.Context) error {
tx, err := db.lock.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer sqlutil.Rollback(ctx, "engine: update contact methods", tx)

type sub struct {
ID int
UserID string
SubjectID string
ProviderID string
}

var subs []sub
rows, err := tx.StmtContext(ctx, db.slackSubMissingCM).QueryContext(ctx)
q := gadb.New(tx)
rows, err := q.CompatAuthSubSlackMissingCM(ctx)
if err != nil {
return fmt.Errorf("query: %w", err)
}

for rows.Next() {
var s sub
err = rows.Scan(&s.ID, &s.UserID, &s.SubjectID, &s.ProviderID)
if err != nil {
return fmt.Errorf("scan: %w", err)
}
subs = append(subs, s)
}

for _, s := range subs {
for _, s := range rows {
// provider id contains the team id in the format "slack:team_id"
// but we need to store the contact method id in the format "team_id:subject_id"
teamID := strings.TrimPrefix(s.ProviderID, "slack:")
Expand All @@ -123,12 +100,21 @@ func (db *DB) updateContactMethods(ctx context.Context) error {
continue
}

_, err = tx.StmtContext(ctx, db.insertCM).ExecContext(ctx, uuid.New(), team.Name, "SLACK_DM", value, s.UserID)
err = q.CompatInsertUserCM(ctx, gadb.CompatInsertUserCMParams{
ID: uuid.New(),
Name: team.Name,
Type: gadb.EnumUserContactMethodTypeSLACKDM,
Value: value,
UserID: s.UserID,
})
if err != nil {
return fmt.Errorf("insert cm: %w", err)
}

_, err = tx.StmtContext(ctx, db.updateSubCMID).ExecContext(ctx, s.ID, value)
err = q.CompatAuthSubSetCMID(ctx, gadb.CompatAuthSubSetCMIDParams{
ID: s.ID,
Value: value,
})
if err != nil {
return fmt.Errorf("update sub cm_id: %w", err)
}
Expand Down
Loading

0 comments on commit 91dd701

Please sign in to comment.