Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

engine: refactor rotations to process through job queue #4269

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
054b269
feat(rotation): enhance rotation management with new database integra…
mastercactapus Feb 5, 2025
379a34c
feat(rotation): implement tracking for rotation updates with database…
mastercactapus Feb 5, 2025
07426f7
feat(rotation): implement entity update tracking and rotation work ma…
mastercactapus Feb 6, 2025
40d58da
feat(rotation): integrate event bus into rotation store for enhanced …
mastercactapus Feb 6, 2025
d43c639
refactor(rotation): remove unused UpdateAll function from rotation ma…
mastercactapus Feb 6, 2025
0aa39cf
refactor(rotation): remove UpdateOneRotation and related functions fr…
mastercactapus Feb 6, 2025
20c7e9b
refactor(rotation): remove unused SQL statements and preparation logi…
mastercactapus Feb 6, 2025
1740697
feat(rotation): add EventBus to App struct and update rotation handli…
mastercactapus Feb 6, 2025
50599e9
refactor(rotation): remove EventBus dependency from RotationStore ini…
mastercactapus Feb 6, 2025
1f4046a
refactor(tests): simplify user assertion logic in WaitAndAssertOnCall…
mastercactapus Feb 7, 2025
5b6a5df
chore(schema): update auto-generated data hash in schema.sql
mastercactapus Feb 7, 2025
a1a1761
refactor(migrations): remove triggers and function related to rotatio…
mastercactapus Feb 7, 2025
ca9832f
refactor(migrations): rename triggers for clarity and update schema.sql
mastercactapus Feb 7, 2025
f05525b
refactor(rotation): add queue assignment in updateRotation and improv…
mastercactapus Feb 10, 2025
cb245d7
add version
mastercactapus Feb 10, 2025
13933da
refactor(rotation): enhance calcAdvance to handle multiple rotation v…
mastercactapus Feb 10, 2025
532494b
refactor(rotation): update comments for lookForWork and updateRotatio…
mastercactapus Feb 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/initgraphql.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (app *App) initGraphQL(ctx context.Context) error {
SWO: app.cfg.SWO,
APIKeyStore: app.APIKeyStore,
DestReg: app.DestRegistry,
EventBus: app.EventBus,
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func NewEngine(ctx context.Context, db *sql.DB, c *Config) (*Engine, error) {
return nil, err
}

rotMgr, err := rotationmanager.NewDB(ctx, db)
rotMgr, err := rotationmanager.NewDB(ctx, db, c.RiverDBSQL)
if err != nil {
return nil, errors.Wrap(err, "rotation management backend")
}
Expand Down
20 changes: 13 additions & 7 deletions engine/rotationmanager/advance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,26 @@ type advance struct {
}

type rotState struct {
rotation.State
Version int
ShiftStart time.Time
Position int
Version int
}

// calcAdvance will calculate rotation advancement if it is required. If not, nil is returned
func calcAdvance(ctx context.Context, t time.Time, rot *rotation.Rotation, state rotState, partCount int) *advance {
func calcAdvance(ctx context.Context, t time.Time, rot *rotation.Rotation, state rotState, partCount int) (*advance, error) {
var mustUpdate bool
origPos := state.Position

// get next shift start time
newStart := rot.EndTime(state.ShiftStart)
if state.Version == 1 {
switch state.Version {
case 1:
newStart = calcVersion1EndTime(rot, state.ShiftStart)
mustUpdate = true
case 2:
// no-op
default:
return nil, fmt.Errorf("unknown rotation version (supported: 1,2): %d", state.Version)
}

if state.Position >= partCount {
Expand All @@ -48,10 +54,10 @@ func calcAdvance(ctx context.Context, t time.Time, rot *rotation.Rotation, state
// If migrating from version 1 to 2 without changing
// who's on-call do so silently.
silent: state.Version == 1 && state.Position == origPos,
}
}, nil
}
// in the future, so nothing to do yet
return nil
return nil, nil
}

if !newStart.After(t.Add(-15 * time.Minute)) {
Expand All @@ -78,5 +84,5 @@ func calcAdvance(ctx context.Context, t time.Time, rot *rotation.Rotation, state
return &advance{
id: rot.ID,
newPosition: state.Position,
}
}, nil
}
40 changes: 5 additions & 35 deletions engine/rotationmanager/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,63 +4,33 @@ import (
"context"
"database/sql"

"github.com/riverqueue/river"
"github.com/target/goalert/engine/processinglock"
"github.com/target/goalert/util"
)

// DB manages rotations in Postgres.
type DB struct {
lock *processinglock.Lock

currentTime *sql.Stmt

lockPart *sql.Stmt
rotate *sql.Stmt
rotateData *sql.Stmt
riverDBSQL *river.Client[*sql.Tx]
}

// Name returns the name of the module.
func (db *DB) Name() string { return "Engine.RotationManager" }

// NewDB will create a new DB, preparing all statements necessary.
func NewDB(ctx context.Context, db *sql.DB) (*DB, error) {
func NewDB(ctx context.Context, db *sql.DB, riverDBSQL *river.Client[*sql.Tx]) (*DB, error) {
lock, err := processinglock.NewLock(ctx, db, processinglock.Config{
Type: processinglock.TypeRotation,
Version: 2,
})
if err != nil {
return nil, err
}
p := &util.Prepare{Ctx: ctx, DB: db}

return &DB{
lock: lock,

currentTime: p.P(`select now()`),
lockPart: p.P(`lock rotation_participants, rotation_state in exclusive mode`),
rotate: p.P(`
update rotation_state
set
shift_start = now(),
rotation_participant_id = (select id from rotation_participants where rotation_id = $1 and position = $2),
version = 2
where rotation_id = $1
`),
rotateData: p.P(`
select
rot.id,
rot."type",
rot.start_time,
rot.shift_length,
rot.time_zone,
state.shift_start,
state."position",
rot.participant_count,
state.version
from rotations rot
join rotation_state state on state.rotation_id = rot.id
where $1 or state.rotation_id = $2
for update skip locked
`),
}, p.Err
riverDBSQL: riverDBSQL,
}, nil
}
63 changes: 63 additions & 0 deletions engine/rotationmanager/lookforwork.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package rotationmanager

import (
"context"
"database/sql"
"errors"
"fmt"
"time"

"github.com/riverqueue/river"
"github.com/target/goalert/gadb"
)

type LookForWorkArgs struct{}

func (LookForWorkArgs) Kind() string { return "rotation-manager-lfw" }

// lookForWork will schedule jobs for rotations in the entity_updates table.
func (db *DB) lookForWork(ctx context.Context, j *river.Job[LookForWorkArgs]) error {
var hadWork bool
err := db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) error {
g := gadb.New(tx)

rotations, err := g.RotMgrFindWork(ctx)
if errors.Is(err, sql.ErrNoRows) {
// done, no more work
return nil
}
if err != nil {
return fmt.Errorf("find work: %w", err)
}
if len(rotations) == 0 {
return nil
}

var params []river.InsertManyParams
for _, r := range rotations {
params = append(params, river.InsertManyParams{
Args: UpdateArgs{RotationID: r},
InsertOpts: &river.InsertOpts{
Queue: QueueName,
Priority: PriorityEvent,
},
})
}

_, err = db.riverDBSQL.InsertManyFastTx(ctx, tx, params)
if err != nil {
return fmt.Errorf("insert many: %w", err)
}
hadWork = true
return nil
})
if err != nil {
return fmt.Errorf("look for work: %w", err)
}
if !hadWork {
return nil
}

// There was work to do, so wait a bit before looking again.
return river.JobSnooze(time.Second)
}
82 changes: 82 additions & 0 deletions engine/rotationmanager/queries.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
-- name: RotMgrRotationData :one
-- Get rotation data for a given rotation ID
SELECT
now()::timestamptz AS now,
rot.id,
rot.type,
rot.start_time,
rot.shift_length,
rot.time_zone,
state.position,
state.shift_start,
state.version,
ARRAY (
SELECT
p.id
FROM
rotation_participants p
WHERE
p.rotation_id = rot.id
ORDER BY
position)::uuid[] AS participants
FROM
rotations rot
LEFT JOIN rotation_state state ON rot.id = state.rotation_id
WHERE
rot.id = @rotation_id;

-- name: RotMgrStart :exec
-- Start a rotation.
INSERT INTO rotation_state(rotation_id, position, shift_start, rotation_participant_id)
SELECT
p.rotation_id,
0,
now(),
id
FROM
rotation_participants p
WHERE
p.rotation_id = @rotation_id
AND position = 0;

-- name: RotMgrEnd :exec
-- End a rotation.
DELETE FROM rotation_state
WHERE rotation_id = @rotation_id;

-- name: RotMgrUpdate :exec
-- Update the rotation state.
UPDATE
rotation_state
SET
position = @position,
shift_start = now(),
rotation_participant_id = @rotation_participant_id
WHERE
rotation_id = @rotation_id;

-- name: RotMgrFindWork :many
WITH items AS (
SELECT
id,
entity_id
FROM
entity_updates
WHERE
entity_type = 'rotation'
FOR UPDATE
SKIP LOCKED
LIMIT 1000
),
_delete AS (
DELETE FROM entity_updates
WHERE id IN (
SELECT
id
FROM
items))
SELECT DISTINCT
entity_id
FROM
items;

54 changes: 54 additions & 0 deletions engine/rotationmanager/setup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package rotationmanager

import (
"context"
"fmt"
"time"

"github.com/riverqueue/river"
"github.com/target/goalert/engine/processinglock"
"github.com/target/goalert/event"
"github.com/target/goalert/schedule/rotation"
)

const (
QueueName = "rotation-manager"
PriorityScheduled = 1
PriorityEvent = 2
PriorityLFW = 4
)

var _ processinglock.Setupable = (*DB)(nil) // assert that DB implements processinglock.Setupable

// Setup implements processinglock.Setupable.
func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error {
river.AddWorker(args.Workers, river.WorkFunc(db.updateRotation))
river.AddWorker(args.Workers, river.WorkFunc(db.lookForWork))

event.RegisterJobSource(args.EventBus, func(data rotation.Update) (river.JobArgs, *river.InsertOpts) {
return UpdateArgs{RotationID: data.ID}, &river.InsertOpts{
Queue: QueueName,
Priority: PriorityEvent,
}
})

err := args.River.Queues().Add(QueueName, river.QueueConfig{MaxWorkers: 5})
if err != nil {
return fmt.Errorf("add queue: %w", err)
}

args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{
river.NewPeriodicJob(
river.PeriodicInterval(time.Minute),
func() (river.JobArgs, *river.InsertOpts) {
return LookForWorkArgs{}, &river.InsertOpts{
Queue: QueueName,
Priority: PriorityLFW,
}
},
&river.PeriodicJobOpts{RunOnStart: true},
),
})

return nil
}
Loading
Loading