Skip to content

Commit

Permalink
Merge pull request #7 from bluesky-social/interface_refactor
Browse files Browse the repository at this point in the history
[WIP] Refactor event struct for clarity
  • Loading branch information
ericvolp12 authored Oct 16, 2024
2 parents dc17352 + 5215851 commit 011b545
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 25 deletions.
1 change: 0 additions & 1 deletion .github/workflows/docker-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ name: Docker image
on:
workflow_dispatch:
push:
branches: ["main"]

env:
REGISTRY: ghcr.io
Expand Down
28 changes: 14 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,28 +77,28 @@ $ websocat "ws://localhost:6008/subscribe?wantedCollections=app.bsky.feed.post&w

### Example events:

Jetstream events have 3 `type`s (so far):
Jetstream events have 3 `kinds`s (so far):

- `com`: a Commit to a repo which involves either a create, update, or delete of a record
- `id`: an Identity update for a DID which indicates that you may want to purge an identity cache and revalidate the DID doc and handle
- `acc`: an Account event that indicates a change in account status i.e. from `active` to `deactivated`, or to `takendown` if the PDS has taken down the repo.
- `commit`: a Commit to a repo which involves either a create, update, or delete of a record
- `identity`: an Identity update for a DID which indicates that you may want to purge an identity cache and revalidate the DID doc and handle
- `account`: an Account event that indicates a change in account status i.e. from `active` to `deactivated`, or to `takendown` if the PDS has taken down the repo.

Jetstream Commits have 3 `types`:
Jetstream Commits have 3 `operations`:

- `c`: Create a new record with the contents provided
- `u`: Update an existing record and replace it with the contents provided
- `d`: Delete an existing record with the DID, Collection, and RKey provided
- `create`: Create a new record with the contents provided
- `update`: Update an existing record and replace it with the contents provided
- `delete`: Delete an existing record with the DID, Collection, and RKey provided

#### A like committed to a repo

```json
{
"did": "did:plc:eygmaihciaxprqvxpfvl6flk",
"time_us": 1725911162329308,
"type": "com",
"kind": "commit",
"commit": {
"rev": "3l3qo2vutsw2b",
"type": "c",
"operation": "create",
"collection": "app.bsky.feed.like",
"rkey": "3l3qo2vuowo2b",
"record": {
Expand All @@ -120,10 +120,10 @@ Jetstream Commits have 3 `types`:
{
"did": "did:plc:rfov6bpyztcnedeyyzgfq42k",
"time_us": 1725516666833633,
"type": "com",
"type": "commit",
"commit": {
"rev": "3l3f6nzl3cv2s",
"type": "d",
"operation": "delete",
"collection": "app.bsky.graph.follow",
"rkey": "3l3dn7tku762u"
}
Expand All @@ -136,7 +136,7 @@ Jetstream Commits have 3 `types`:
{
"did": "did:plc:ufbl4k27gp6kzas5glhz7fim",
"time_us": 1725516665234703,
"type": "id",
"kind": "identity",
"identity": {
"did": "did:plc:ufbl4k27gp6kzas5glhz7fim",
"handle": "yohenrique.bsky.social",
Expand All @@ -152,7 +152,7 @@ Jetstream Commits have 3 `types`:
{
"did": "did:plc:ufbl4k27gp6kzas5glhz7fim",
"time_us": 1725516665333808,
"type": "acc",
"kind": "account",
"account": {
"active": true,
"did": "did:plc:ufbl4k27gp6kzas5glhz7fim",
Expand Down
14 changes: 4 additions & 10 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

const (
serverAddr = "ws://localhost:6008/subscribe"
serverAddr = "wss://jetstream.atproto.tools/subscribe"
)

func main() {
Expand All @@ -31,18 +31,14 @@ func main() {
config.WebsocketURL = serverAddr
config.Compress = true

h := &handler{
seenSeqs: make(map[int64]struct{}),
}

scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent)

c, err := client.NewClient(config, logger, scheduler)
if err != nil {
log.Fatalf("failed to create client: %v", err)
}

cursor := time.Now().Add(5 * -time.Hour).UnixMicro()
cursor := time.Now().Add(90 * -time.Minute).UnixMicro()

// Every 5 seconds print the events read and bytes read and average event size
go func() {
Expand Down Expand Up @@ -71,17 +67,15 @@ type handler struct {
}

func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error {
// fmt.Println("evt")

// Unmarshal the record if there is one
if event.Commit != nil && (event.Commit.OpType == models.CommitCreateRecord || event.Commit.OpType == models.CommitUpdateRecord) {
if event.Commit != nil && (event.Commit.Operation == models.CommitOperationCreate || event.Commit.Operation == models.CommitOperationUpdate) {
switch event.Commit.Collection {
case "app.bsky.feed.post":
var post apibsky.FeedPost
if err := json.Unmarshal(event.Commit.Record, &post); err != nil {
return fmt.Errorf("failed to unmarshal post: %w", err)
}
// fmt.Printf("%v |(%s)| %s\n", time.UnixMicro(event.TimeUS).Local().Format("15:04:05"), event.Did, post.Text)
fmt.Printf("%v |(%s)| %s\n", time.UnixMicro(event.TimeUS).Local().Format("15:04:05"), event.Did, post.Text)
}
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (c *Consumer) HandleStreamEvent(ctx context.Context, xe *events.XRPCStreamE
// Emit identity update
e := models.Event{
Did: xe.RepoIdentity.Did,
Kind: models.EventKindIdentity,
EventType: models.EventIdentity,
Identity: xe.RepoIdentity,
}
Expand All @@ -168,6 +169,7 @@ func (c *Consumer) HandleStreamEvent(ctx context.Context, xe *events.XRPCStreamE
// Emit account update
e := models.Event{
Did: xe.RepoAccount.Did,
Kind: models.EventKindAccount,
EventType: models.EventAccount,
Account: xe.RepoAccount,
}
Expand Down Expand Up @@ -234,6 +236,7 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub
e := models.Event{
Did: evt.Repo,
EventType: models.EventCommit,
Kind: models.EventKindCommit,
}

switch ek {
Expand Down Expand Up @@ -269,6 +272,7 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub

e.Commit = &models.Commit{
Rev: evt.Rev,
Operation: models.CommitOperationCreate,
OpType: models.CommitCreateRecord,
Collection: collection,
RKey: rkey,
Expand Down Expand Up @@ -307,6 +311,7 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub

e.Commit = &models.Commit{
Rev: evt.Rev,
Operation: models.CommitOperationUpdate,
OpType: models.CommitUpdateRecord,
Collection: collection,
RKey: rkey,
Expand All @@ -317,6 +322,7 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub
// Emit the delete
e.Commit = &models.Commit{
Rev: evt.Rev,
Operation: models.CommitOperationDelete,
OpType: models.CommitDeleteRecord,
Collection: collection,
RKey: rkey,
Expand Down
10 changes: 10 additions & 0 deletions pkg/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Event struct {
Did string `json:"did"`
TimeUS int64 `json:"time_us"`
EventType string `json:"type"`
Kind string `json:"kind,omitempty"`
Commit *Commit `json:"commit,omitempty"`
Account *comatproto.SyncSubscribeRepos_Account `json:"account,omitempty"`
Identity *comatproto.SyncSubscribeRepos_Identity `json:"identity,omitempty"`
Expand All @@ -23,6 +24,7 @@ type Event struct {
type Commit struct {
Rev string `json:"rev,omitempty"`
OpType string `json:"type"`
Operation string `json:"operation,omitempty"`
Collection string `json:"collection,omitempty"`
RKey string `json:"rkey,omitempty"`
Record json.RawMessage `json:"record,omitempty"`
Expand All @@ -37,4 +39,12 @@ var (
CommitCreateRecord = "c"
CommitUpdateRecord = "u"
CommitDeleteRecord = "d"

EventKindCommit = "commit"
EventKindAccount = "account"
EventKindIdentity = "identity"

CommitOperationCreate = "create"
CommitOperationUpdate = "update"
CommitOperationDelete = "delete"
)

0 comments on commit 011b545

Please sign in to comment.