From c29c4a6da26b6bc61403eb94b8865ac0a6742777 Mon Sep 17 00:00:00 2001 From: Jaz Date: Wed, 16 Oct 2024 12:00:47 -0700 Subject: [PATCH 1/3] WIP update event interface --- README.md | 28 ++++++++++++++-------------- pkg/consumer/consumer.go | 6 ++++++ pkg/models/models.go | 10 ++++++++++ 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 21a860f..4272cbd 100644 --- a/README.md +++ b/README.md @@ -77,17 +77,17 @@ $ websocat "ws://localhost:6008/subscribe?wantedCollections=app.bsky.feed.post&w ### Example events: -Jetstream events have 3 `type`s (so far): +Jetstream events have 3 `kinds`s (so far): -- `com`: a Commit to a repo which involves either a create, update, or delete of a record -- `id`: an Identity update for a DID which indicates that you may want to purge an identity cache and revalidate the DID doc and handle -- `acc`: an Account event that indicates a change in account status i.e. from `active` to `deactivated`, or to `takendown` if the PDS has taken down the repo. +- `commit`: a Commit to a repo which involves either a create, update, or delete of a record +- `identity`: an Identity update for a DID which indicates that you may want to purge an identity cache and revalidate the DID doc and handle +- `account`: an Account event that indicates a change in account status i.e. from `active` to `deactivated`, or to `takendown` if the PDS has taken down the repo. -Jetstream Commits have 3 `types`: +Jetstream Commits have 3 `operations`: -- `c`: Create a new record with the contents provided -- `u`: Update an existing record and replace it with the contents provided -- `d`: Delete an existing record with the DID, Collection, and RKey provided +- `create`: Create a new record with the contents provided +- `update`: Update an existing record and replace it with the contents provided +- `delete`: Delete an existing record with the DID, Collection, and RKey provided #### A like committed to a repo @@ -95,10 +95,10 @@ Jetstream Commits have 3 `types`: { "did": "did:plc:eygmaihciaxprqvxpfvl6flk", "time_us": 1725911162329308, - "type": "com", + "kind": "commit", "commit": { "rev": "3l3qo2vutsw2b", - "type": "c", + "operation": "create", "collection": "app.bsky.feed.like", "rkey": "3l3qo2vuowo2b", "record": { @@ -120,10 +120,10 @@ Jetstream Commits have 3 `types`: { "did": "did:plc:rfov6bpyztcnedeyyzgfq42k", "time_us": 1725516666833633, - "type": "com", + "type": "commit", "commit": { "rev": "3l3f6nzl3cv2s", - "type": "d", + "operation": "delete", "collection": "app.bsky.graph.follow", "rkey": "3l3dn7tku762u" } @@ -136,7 +136,7 @@ Jetstream Commits have 3 `types`: { "did": "did:plc:ufbl4k27gp6kzas5glhz7fim", "time_us": 1725516665234703, - "type": "id", + "kind": "identity", "identity": { "did": "did:plc:ufbl4k27gp6kzas5glhz7fim", "handle": "yohenrique.bsky.social", @@ -152,7 +152,7 @@ Jetstream Commits have 3 `types`: { "did": "did:plc:ufbl4k27gp6kzas5glhz7fim", "time_us": 1725516665333808, - "type": "acc", + "kind": "account", "account": { "active": true, "did": "did:plc:ufbl4k27gp6kzas5glhz7fim", diff --git a/pkg/consumer/consumer.go b/pkg/consumer/consumer.go index aebe845..27c8ebe 100644 --- a/pkg/consumer/consumer.go +++ b/pkg/consumer/consumer.go @@ -145,6 +145,7 @@ func (c *Consumer) HandleStreamEvent(ctx context.Context, xe *events.XRPCStreamE // Emit identity update e := models.Event{ Did: xe.RepoIdentity.Did, + Kind: models.EventKindIdentity, EventType: models.EventIdentity, Identity: xe.RepoIdentity, } @@ -168,6 +169,7 @@ func (c *Consumer) HandleStreamEvent(ctx context.Context, xe *events.XRPCStreamE // Emit account update e := models.Event{ Did: xe.RepoAccount.Did, + Kind: models.EventKindAccount, EventType: models.EventAccount, Account: xe.RepoAccount, } @@ -234,6 +236,7 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub e := models.Event{ Did: evt.Repo, EventType: models.EventCommit, + Kind: models.EventKindCommit, } switch ek { @@ -269,6 +272,7 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub e.Commit = &models.Commit{ Rev: evt.Rev, + Operation: models.CommitOperationCreate, OpType: models.CommitCreateRecord, Collection: collection, RKey: rkey, @@ -307,6 +311,7 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub e.Commit = &models.Commit{ Rev: evt.Rev, + Operation: models.CommitOperationUpdate, OpType: models.CommitUpdateRecord, Collection: collection, RKey: rkey, @@ -317,6 +322,7 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub // Emit the delete e.Commit = &models.Commit{ Rev: evt.Rev, + Operation: models.CommitOperationDelete, OpType: models.CommitDeleteRecord, Collection: collection, RKey: rkey, diff --git a/pkg/models/models.go b/pkg/models/models.go index 9c7e115..536b1af 100644 --- a/pkg/models/models.go +++ b/pkg/models/models.go @@ -15,6 +15,7 @@ type Event struct { Did string `json:"did"` TimeUS int64 `json:"time_us"` EventType string `json:"type"` + Kind string `json:"kind,omitempty"` Commit *Commit `json:"commit,omitempty"` Account *comatproto.SyncSubscribeRepos_Account `json:"account,omitempty"` Identity *comatproto.SyncSubscribeRepos_Identity `json:"identity,omitempty"` @@ -23,6 +24,7 @@ type Event struct { type Commit struct { Rev string `json:"rev,omitempty"` OpType string `json:"type"` + Operation string `json:"operation,omitempty"` Collection string `json:"collection,omitempty"` RKey string `json:"rkey,omitempty"` Record json.RawMessage `json:"record,omitempty"` @@ -37,4 +39,12 @@ var ( CommitCreateRecord = "c" CommitUpdateRecord = "u" CommitDeleteRecord = "d" + + EventKindCommit = "commit" + EventKindAccount = "account" + EventKindIdentity = "identity" + + CommitOperationCreate = "create" + CommitOperationUpdate = "update" + CommitOperationDelete = "delete" ) From 0c55a1b783ec3b835e31b6726b546bb22b1f3769 Mon Sep 17 00:00:00 2001 From: Jaz Date: Wed, 16 Oct 2024 12:16:38 -0700 Subject: [PATCH 2/3] Workflow and client --- .github/workflows/docker-image.yml | 2 ++ cmd/client/main.go | 14 ++++---------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml index c808ef2..d286fc3 100644 --- a/.github/workflows/docker-image.yml +++ b/.github/workflows/docker-image.yml @@ -4,6 +4,8 @@ on: workflow_dispatch: push: branches: ["main"] + pull_request: + types: [opened, reopened] env: REGISTRY: ghcr.io diff --git a/cmd/client/main.go b/cmd/client/main.go index b8aa480..7e9cc58 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -16,7 +16,7 @@ import ( ) const ( - serverAddr = "ws://localhost:6008/subscribe" + serverAddr = "wss://jetstream.atproto.tools/subscribe" ) func main() { @@ -31,10 +31,6 @@ func main() { config.WebsocketURL = serverAddr config.Compress = true - h := &handler{ - seenSeqs: make(map[int64]struct{}), - } - scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent) c, err := client.NewClient(config, logger, scheduler) @@ -42,7 +38,7 @@ func main() { log.Fatalf("failed to create client: %v", err) } - cursor := time.Now().Add(5 * -time.Hour).UnixMicro() + cursor := time.Now().Add(90 * -time.Minute).UnixMicro() // Every 5 seconds print the events read and bytes read and average event size go func() { @@ -71,17 +67,15 @@ type handler struct { } func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error { - // fmt.Println("evt") - // Unmarshal the record if there is one - if event.Commit != nil && (event.Commit.OpType == models.CommitCreateRecord || event.Commit.OpType == models.CommitUpdateRecord) { + if event.Commit != nil && (event.Commit.Operation == models.CommitOperationCreate || event.Commit.Operation == models.CommitOperationUpdate) { switch event.Commit.Collection { case "app.bsky.feed.post": var post apibsky.FeedPost if err := json.Unmarshal(event.Commit.Record, &post); err != nil { return fmt.Errorf("failed to unmarshal post: %w", err) } - // fmt.Printf("%v |(%s)| %s\n", time.UnixMicro(event.TimeUS).Local().Format("15:04:05"), event.Did, post.Text) + fmt.Printf("%v |(%s)| %s\n", time.UnixMicro(event.TimeUS).Local().Format("15:04:05"), event.Did, post.Text) } } From 5215851d60a5f389809a17eb46ea37cb6b60d24a Mon Sep 17 00:00:00 2001 From: Jaz Date: Wed, 16 Oct 2024 12:18:14 -0700 Subject: [PATCH 3/3] Fix workflow --- .github/workflows/docker-image.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml index d286fc3..2971112 100644 --- a/.github/workflows/docker-image.yml +++ b/.github/workflows/docker-image.yml @@ -3,9 +3,6 @@ name: Docker image on: workflow_dispatch: push: - branches: ["main"] - pull_request: - types: [opened, reopened] env: REGISTRY: ghcr.io