Skip to content

Commit 6e20856

Browse files
committed
Add a data connector for Attio CRM
1 parent dbef518 commit 6e20856

25 files changed

Lines changed: 2093 additions & 42 deletions

.env.sample

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,7 @@ SOURCE_POSTGRES_DATABASE_URL=postgres://postgres:postgres@host.docker.internal:5
1010
SOURCE_AMPLITUDE_API_KEY=[REPLACE_ME]
1111
SOURCE_AMPLITUDE_SECRET_KEY=[REPLACE_ME]
1212

13+
SOURCE_ATTIO_API_ACCESS_TOKEN=[REPLACE_ME]
14+
1315
BEMIDB_LOG_LEVEL=INFO
1416
BEMIDB_DISABLE_ANONYMOUS_ANALYTICS=true

Dockerfile

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,24 +43,34 @@ RUN cd /app/src/syncer-postgres && go mod download
4343
COPY --chown=app:app src/syncer-amplitude/go.mod src/syncer-amplitude/go.sum /app/src/syncer-amplitude/
4444
RUN cd /app/src/syncer-amplitude && go mod download
4545

46+
COPY --chown=app:app src/syncer-attio/go.mod src/syncer-attio/go.sum /app/src/syncer-attio/
47+
RUN cd /app/src/syncer-attio && go mod download
48+
4649
COPY --chown=app:app src/server/go.mod src/server/go.sum /app/src/server/
4750
RUN cd /app/src/server && go mod download
4851

4952
COPY --chown=app:app src/common /app/src/common
5053
COPY --chown=app:app src/syncer-postgres /app/src/syncer-postgres
5154
COPY --chown=app:app src/syncer-amplitude /app/src/syncer-amplitude
55+
COPY --chown=app:app src/syncer-attio /app/src/syncer-attio
5256
COPY --chown=app:app src/server /app/src/server
5357

5458
RUN ARCH=$(dpkg --print-architecture) \
5559
&& cd /app/src/syncer-postgres && CGO_ENABLED=1 GOOS=linux GOARCH=$ARCH go build -o /app/bin/syncer-postgres \
5660
&& cd /app/src/syncer-amplitude && CGO_ENABLED=1 GOOS=linux GOARCH=$ARCH go build -o /app/bin/syncer-amplitude \
61+
&& cd /app/src/syncer-attio && CGO_ENABLED=1 GOOS=linux GOARCH=$ARCH go build -o /app/bin/syncer-attio \
5762
&& cd /app/src/server && CGO_ENABLED=1 GOOS=linux GOARCH=$ARCH go build -o /app/bin/server
5863

5964
# Prepare final image ##############################################################################
6065

6166
FROM base AS final
6267

63-
COPY --chown=app:app --from=compile /app/bin/syncer-postgres /app/bin/syncer-amplitude /app/bin/server /app/bin/
68+
COPY --chown=app:app --from=compile \
69+
/app/bin/syncer-postgres \
70+
/app/bin/syncer-amplitude \
71+
/app/bin/syncer-attio \
72+
/app/bin/server \
73+
/app/bin/
6474
COPY --chown=app:app docker/bin /app/bin/
6575

6676
USER app

Makefile

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ install:
55
devbox run "cd src/common && go mod tidy && \
66
cd ../syncer-postgres && go mod tidy && \
77
cd ../syncer-amplitude && go mod tidy && \
8+
cd ../syncer-attio && go mod tidy && \
89
cd ../server && go mod tidy"
910

1011
lint:
1112
devbox run "cd src/common && go fmt && staticcheck . && \
1213
cd ../syncer-postgres && go fmt && deadcode . && staticcheck . && \
1314
cd ../syncer-amplitude && go fmt && deadcode . && staticcheck . && \
15+
cd ../syncer-attio && go fmt && deadcode . && staticcheck . && \
1416
cd ../server && go fmt && deadcode . && staticcheck ."
1517

1618
build:
@@ -37,6 +39,9 @@ local-syncer-postgres: local-build
3739
local-syncer-amplitude: local-build
3840
docker run -it --rm --env-file .env -e DESTINATION_SCHEMA_NAME=amplitude bemidb:local syncer-amplitude
3941

42+
local-syncer-attio: local-build
43+
docker run -it --rm --env-file .env -e DESTINATION_SCHEMA_NAME=attio bemidb:local syncer-attio
44+
4045
local-sh:
4146
docker run -it --rm --env-file .env bemidb:local bash
4247

README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,16 @@ docker run \
165165
ghcr.io/bemihq/bemidb:latest syncer-amplitude
166166
```
167167

168+
#### Syncing from Attio
169+
170+
```sh
171+
docker run \
172+
-e SOURCE_ATTIO_API_ACCESS_TOKEN=[...] \
173+
-e DESTINATION_SCHEMA_NAME=attio \
174+
-e AWS_REGION -e AWS_S3_BUCKET -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY -e CATALOG_DATABASE_URL \
175+
ghcr.io/bemihq/bemidb:latest syncer-attio
176+
```
177+
168178
#### Customizing S3 endpoint
169179

170180
BemiDB can work with various S3-compatible object storage solutions, such as MinIO.
@@ -215,6 +225,13 @@ export AWS_S3_ENDPOINT=http://localhost:9000
215225
| `SOURCE_AMPLITUDE_SECRET_KEY` | Required | Amplitude secret key for authentication. |
216226
| `SOURCE_AMPLITUDE_START_DATE` | `2025-01-01` | Start date for syncing data from Amplitude in `YYYY-MM-DD` format. |
217227

228+
#### `syncer-attio` command options
229+
230+
| Environment variable | Default value | Description |
231+
|---------------------------------|---------------|--------------------------------------------|
232+
| `DESTINATION_SCHEMA_NAME` | Required | Schema name in BemiDB to sync data to. |
233+
| `SOURCE_ATTIO_API_ACCESS_TOKEN` | Required | Attio API access token for authentication. |
234+
218235
#### `server` command options
219236

220237
| Environment variable | Default value | Description |
@@ -314,6 +331,7 @@ SELECT * FROM [TABLE] WHERE [JSON_COLUMN]->>'[JSON_KEY]' = '[JSON_VALUE]';
314331
- [ ] Jupyter notebooks ([#27](https://github.com/BemiHQ/BemiDB/issues/27))
315332
- [x] Data syncing from other sources
316333
- [x] Amplitude (incremental)
334+
- [x] Attio CRM (full-refresh)
317335
- [x] Postgres (full-refresh)
318336
- [ ] HubSpot
319337
- [ ] Stripe

docker/bin/run.sh

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,16 @@ case "${1:-}" in
2727
./bin/syncer-amplitude 2>&1 | sed 's/^/[Syncer] /'
2828
echo "Syncer for Amplitude finished."
2929
;;
30+
syncer-attio)
31+
: "${SOURCE_ATTIO_API_ACCESS_TOKEN:?Environment variable SOURCE_ATTIO_API_ACCESS_TOKEN must be set}"
32+
: "${DESTINATION_SCHEMA_NAME:?Environment variable DESTINATION_SCHEMA_NAME must be set}"
33+
34+
psql $CATALOG_DATABASE_URL -f /app/scripts/catalog.sql
35+
36+
echo "Starting Syncer for Attio..."
37+
./bin/syncer-attio 2>&1 | sed 's/^/[Syncer] /'
38+
echo "Syncer for Attio finished."
39+
;;
3040
server)
3141
: "${AWS_REGION:?Environment variable AWS_REGION must be set}"
3242
: "${AWS_S3_BUCKET:?Environment variable AWS_S3_BUCKET must be set}"

src/common/capped_buffer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"sync"
77
)
88

9+
const DEFAULT_CAPPED_BUFFER_SIZE = 32 * 1024 * 1024 // 32 MB in memory
10+
911
type CappedBuffer struct {
1012
Config *CommonConfig
1113
MaxSizeBytes int

src/common/common_config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package common
22

33
const (
4-
VERSION = "1.2.0"
4+
VERSION = "1.3.0"
55

66
ENV_LOG_LEVEL = "BEMIDB_LOG_LEVEL"
77
ENV_DISABLE_ANONYMOUS_ANALYTICS = "BEMIDB_DISABLE_ANONYMOUS_ANALYTICS"

src/common/iceberg_schema_column.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,8 +368,17 @@ func (col *IcebergSchemaColumn) duckdbPrimitiveValueFromJson(value any) interfac
368368
return value
369369
}
370370
case IcebergColumnTypeDate:
371-
days := value.(float64)
372-
return time.Unix(0, 0).UTC().AddDate(0, 0, int(days))
371+
switch kind {
372+
case reflect.String:
373+
valueString := value.(string)
374+
if valueString == "" {
375+
return nil
376+
}
377+
return StringDateToTime(valueString)
378+
case reflect.Float64:
379+
days := value.(float64)
380+
return time.Unix(0, 0).UTC().AddDate(0, 0, int(days))
381+
}
373382
case IcebergColumnTypeTime:
374383
var nanoseconds int64
375384
if col.DatetimePrecision == 6 {

src/common/iceberg_table.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,32 @@ func (table *IcebergTable) Create(tableS3Path string) {
4343
table.IcebergCatalog.CreateTable(table.IcebergSchemaTable, tableS3Path+"/metadata/"+ICEBERG_METADATA_INITIAL_FILE_NAME)
4444
}
4545

46+
func (table *IcebergTable) ReplaceWith(callbackFunc func(syncingIcebergTable *IcebergTable)) {
47+
originalTableName := table.IcebergSchemaTable.Table
48+
49+
// Delete -syncing table
50+
syncingIcebergSchemaTable := IcebergSchemaTable{Schema: table.IcebergSchemaTable.Schema, Table: originalTableName + TEMP_TABLE_SUFFIX_SYNCING}
51+
syncingIcebergTable := NewIcebergTable(table.Config, table.StorageS3, table.DuckdbClient, syncingIcebergSchemaTable)
52+
syncingIcebergTable.DropIfExists()
53+
54+
// Insert into -syncing table
55+
callbackFunc(syncingIcebergTable)
56+
57+
// Delete -deleting table
58+
deletingIcebergSchemaTable := IcebergSchemaTable{Schema: table.IcebergSchemaTable.Schema, Table: originalTableName + TEMP_TABLE_SUFFIX_DELETING}
59+
deletingIcebergTable := NewIcebergTable(table.Config, table.StorageS3, table.DuckdbClient, deletingIcebergSchemaTable)
60+
deletingIcebergTable.DropIfExists()
61+
62+
// Rename table to -deleting
63+
table.Rename(deletingIcebergSchemaTable.Table)
64+
65+
// Rename -syncing to table
66+
syncingIcebergTable.Rename(originalTableName)
67+
68+
// Delete -deleting table
69+
deletingIcebergTable.DropIfExists()
70+
}
71+
4672
func (table *IcebergTable) DropIfExists() {
4773
tableS3Path := table.IcebergCatalog.TableS3Path(table.IcebergSchemaTable)
4874
if tableS3Path == "" {

src/syncer-amplitude/event.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (event *Event) ToMap() map[string]interface{} {
128128
return result
129129
}
130130

131-
func EventIcebergSchemaColumns(config *common.CommonConfig) []*common.IcebergSchemaColumn {
131+
func EventsIcebergSchemaColumns(config *common.CommonConfig) []*common.IcebergSchemaColumn {
132132
return []*common.IcebergSchemaColumn{
133133
{Config: config, ColumnName: "adid", ColumnType: common.IcebergColumnTypeString, Position: 1},
134134
{Config: config, ColumnName: "amplitude_attribution_ids", ColumnType: common.IcebergColumnTypeString, Position: 2},

0 commit comments

Comments
 (0)