Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kvalobs importer #39

Merged
merged 67 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
5b490fd
Init kvalobs importer
Lun4m Nov 8, 2024
761b9e0
Update directory structure
Lun4m Nov 25, 2024
dab35ba
Use pointer in GetTimeseriesID
Lun4m Nov 25, 2024
dea347a
Use go-arg instead of go-flags
Lun4m Nov 26, 2024
e64d1dd
Remove Skip argument and don't insert flags for text data
Lun4m Nov 26, 2024
308140d
Add helpful error for timestamp parsing
Lun4m Nov 26, 2024
8079a61
Change BaseDir to Path
Lun4m Nov 26, 2024
c47718b
Simplify help messages
Lun4m Nov 26, 2024
d0a08d7
Need to add another fmt.Println() to make tests work :clown_face:
Lun4m Nov 27, 2024
ecf65d5
Use common Config and add test
Lun4m Nov 27, 2024
a85171a
Move doc comment
Lun4m Nov 27, 2024
8112708
Update function name
Lun4m Nov 27, 2024
ba7b319
Change GetTimeseriesID signature
Lun4m Nov 27, 2024
b600490
Generic overload
Lun4m Nov 27, 2024
471d7ab
Add some type alias
Lun4m Nov 28, 2024
56c491a
Dump one directory per station and use multiple connections
Lun4m Nov 28, 2024
23ea484
Remove log call
Lun4m Nov 28, 2024
d5cff05
Reset log output when returning from dumpTable
Lun4m Nov 28, 2024
ff5650d
Change position of if check
Lun4m Nov 28, 2024
cd4b1ac
Update help description
Lun4m Nov 28, 2024
1603e77
Move kvalobs Table and use consts for envars
Lun4m Nov 28, 2024
9e7349b
Kvalobs import WIP
Lun4m Nov 28, 2024
8b38506
Kvalobs import working commit
Lun4m Nov 29, 2024
af465ce
Change cfailed type
Lun4m Nov 29, 2024
cd14110
Log outside before if condition
Lun4m Nov 29, 2024
a14ca9b
Remove unnecessary generic struct and improve logging/bar outputs
Lun4m Nov 29, 2024
eca1ec1
Update metadata caching
Lun4m Nov 29, 2024
70d6941
Check if sensor and level have default values in KDVH and Kvalobs
Lun4m Dec 2, 2024
dabd954
Move whole loop body inside goroutine
Lun4m Dec 2, 2024
ad80e2c
Remove comments
Lun4m Dec 2, 2024
0b94eaa
Get fromtime and totime from the station_metadata table
Lun4m Dec 2, 2024
b8868cf
Fix formatting
Lun4m Dec 2, 2024
8806dba
Use const from different package
Lun4m Dec 2, 2024
5d6e541
Use different function to check if value is empty or equal
Lun4m Dec 2, 2024
08204fb
Filter out fake null values
Lun4m Dec 2, 2024
58064cc
Update docs
Lun4m Dec 2, 2024
d9dca14
Prettify
Lun4m Dec 2, 2024
4055dbe
Rework comments
Lun4m Dec 2, 2024
02f0d58
Unify env variable naming
Lun4m Dec 2, 2024
e55f887
Need to create the station directory
Lun4m Dec 2, 2024
fcd1696
More visual fixes
Lun4m Dec 2, 2024
55c5b4c
Need to call bar.Add before wg.Done
Lun4m Dec 2, 2024
733aa10
Insert timeseries even if fromtime is null
Lun4m Dec 2, 2024
cd49eb5
Move constructor close to struct
Lun4m Dec 2, 2024
b6d3050
Update docs
Lun4m Dec 2, 2024
7a0d986
Drop indices when importing from Kvalobs
Lun4m Dec 3, 2024
3f5f2e5
Remove unnecessary calls to log.SetOutput
Lun4m Dec 3, 2024
9500603
Simplify getStationLabelMap
Lun4m Dec 3, 2024
5ec43da
Add timestamp to log filename
Lun4m Dec 3, 2024
ddc193b
Fix bugs with label dump
Lun4m Dec 3, 2024
c413791
Add possibility to only dump labels
Lun4m Dec 3, 2024
af535d3
Add 'check' package
Lun4m Dec 3, 2024
7f66e17
Drop flags.old_databases in favor of flags.kvdata
Lun4m Dec 3, 2024
b1cc685
Rework and simplify overall structure and add edge cases
Lun4m Dec 4, 2024
47302de
Run cleanup even when tests fail
Lun4m Dec 5, 2024
75e445a
Add more automatic way of dropping indexes
Lun4m Dec 5, 2024
bb35e86
Remove directory that should not be there
Lun4m Dec 6, 2024
6e01c28
Unclutter kdvh package
Lun4m Dec 5, 2024
3b28f96
Rework kvalobs structure
Lun4m Dec 9, 2024
a734882
Add psql command
Lun4m Dec 9, 2024
0e1fa4e
Improve label dump (wip)
Lun4m Dec 10, 2024
1445cbf
Improve label dumps (final)
Lun4m Dec 11, 2024
19a35b2
Remove unnecessary comments
Lun4m Dec 11, 2024
0ac0890
Update check command
Lun4m Dec 11, 2024
1acb368
Update comments
Lun4m Dec 11, 2024
3509faf
Insert partitions for tests
Lun4m Dec 13, 2024
046151b
Allow specifying time range for import
Lun4m Dec 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions db/drop_indices.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
-- Remove indices before bulk insertion
DROP INDEX IF EXISTS data_timestamp_index,
data_timeseries_index,
nonscalar_data_timestamp_index,
nonscalar_data_timeseries_index,
old_flags_obtime_index,
old_flags_timeseries_index;
DO $$
DECLARE
i RECORD;
BEGIN
FOR i IN (SELECT schemaname, indexname fROM pg_indexes
WHERE schemaname IN ('public', 'flags')
AND NOT indexdef LIKE '%UNIQUE%')
LOOP
EXECUTE format('DROP INDEX IF EXISTS %s.%s', i.schemaname, i.indexname);
END LOOP;
END $$;
18 changes: 3 additions & 15 deletions db/flags.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,8 @@ CREATE TABLE IF NOT EXISTS flags.kvdata (
corrected REAL NULL,
controlinfo TEXT NULL,
useinfo TEXT NULL,
cfailed INT4 NULL,
cfailed TEXT NULL,
CONSTRAINT unique_kvdata_timeseries_obstime UNIQUE (timeseries, obstime)
);
CREATE INDEX IF NOT EXISTS kvdata_obtime_index ON flags.kvdata (obstime);
CREATE INDEX IF NOT EXISTS kvdata_timeseries_index ON flags.kvdata USING HASH (timeseries);

CREATE TABLE IF NOT EXISTS flags.old_databases (
timeseries INT4 REFERENCES public.timeseries,
obstime TIMESTAMPTZ NOT NULL,
corrected REAL NULL,
controlinfo TEXT NULL,
useinfo TEXT NULL,
cfailed INT4 NULL ,
CONSTRAINT unique_old_flags_timeseries_obstime UNIQUE (timeseries, obstime)
);
CREATE INDEX IF NOT EXISTS old_flags_obtime_index ON flags.old_databases (obstime);
CREATE INDEX IF NOT EXISTS old_flags_timeseries_index ON flags.old_databases USING HASH (timeseries);
CREATE INDEX IF NOT EXISTS kvdata_obstime_index ON flags.kvdata (obstime);
CREATE INDEX IF NOT EXISTS kvdata_timeseries_index ON flags.kvdata USING HASH (timeseries);
4 changes: 4 additions & 0 deletions db/partitions_generated.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
-- Generated by simple script for testing
CREATE TABLE IF NOT EXISTS data_y1850_to_y1950 PARTITION OF public.data
FOR VALUES FROM ('1850-01-01 00:00:00+00') TO ('1950-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y1950_to_y2000 PARTITION OF public.data
FOR VALUES FROM ('1950-01-01 00:00:00+00') TO ('2000-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2000_to_y2010 PARTITION OF public.data
Expand Down Expand Up @@ -35,6 +37,8 @@ CREATE TABLE IF NOT EXISTS data_y2028_to_y2029 PARTITION OF public.data
FOR VALUES FROM ('2028-01-01 00:00:00+00') TO ('2029-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2029_to_y2030 PARTITION OF public.data
FOR VALUES FROM ('2029-01-01 00:00:00+00') TO ('2030-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y1850_to_y1950 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('1850-01-01 00:00:00+00') TO ('1950-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y1950_to_y2000 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('1950-01-01 00:00:00+00') TO ('2000-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y2000_to_y2010 PARTITION OF public.nonscalar_data
Expand Down
2 changes: 1 addition & 1 deletion ingestion/src/kvkafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ pub struct Kvdata {
#[serde(default, deserialize_with = "optional")]
useinfo: Option<String>,
#[serde(default, deserialize_with = "optional")]
cfailed: Option<i32>,
cfailed: Option<String>,
}

// If the field is either empty or missing it should deserialize to None.
Expand Down
25 changes: 6 additions & 19 deletions integration_tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,6 @@ async fn insert_schema(client: &tokio_postgres::Client, filename: &str) -> Resul
client.batch_execute(schema.as_str()).await
}

fn format_partition(start: &str, end: &str, table: &str) -> String {
// TODO: add multiple partitions?
format!(
"CREATE TABLE {table}_y{start}_to_y{end} PARTITION OF {table} \
FOR VALUES FROM ('{start}-01-01 00:00:00+00') TO ('{end}-01-01 00:00:00+00')",
)
}

async fn create_data_partitions(client: &tokio_postgres::Client) -> Result<(), Error> {
let scalar_string = format_partition("1950", "2100", "public.data");
let nonscalar_string = format_partition("1950", "2100", "public.nonscalar_data");

client.batch_execute(scalar_string.as_str()).await?;
client.batch_execute(nonscalar_string.as_str()).await
}

#[tokio::main]
async fn main() {
let (client, connection) = tokio_postgres::connect(CONNECT_STRING, NoTls)
Expand All @@ -38,10 +22,13 @@ async fn main() {
});

// NOTE: order matters
let schemas = ["db/public.sql", "db/labels.sql", "db/flags.sql"];
let schemas = [
"db/public.sql",
"db/partitions_generated.sql",
"db/labels.sql",
"db/flags.sql",
];
for schema in schemas {
insert_schema(&client, schema).await.unwrap();
}

create_data_partitions(&client).await.unwrap();
}
13 changes: 9 additions & 4 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,25 @@ test_all: setup && clean
cargo test --workspace --no-fail-fast -- --nocapture --test-threads=1

test_end_to_end: setup && clean
cargo test --test end_to_end --no-fail-fast -- --nocapture --test-threads=1
-cargo test --test end_to_end --no-fail-fast -- --nocapture --test-threads=1

test_migrations: debug_migrations && clean

# Debug commands don't perfom the clean up action after running.
# This allows to manually check the state of the database.

debug_kafka: setup
cargo test --test end_to_end test_kafka --features debug --no-fail-fast -- --nocapture --test-threads=1
-cargo test --test end_to_end test_kafka --features debug --no-fail-fast -- --nocapture --test-threads=1

debug_test TEST: setup
cargo test {{TEST}} --features debug --no-fail-fast -- --nocapture --test-threads=1
-cargo test {{TEST}} --features debug --no-fail-fast -- --nocapture --test-threads=1

debug_migrations: setup
@ cd migrations && go test -v ./...
-@ cd migrations && go test -v ./...

# psql into the container database
psql:
@docker exec -it lard_tests psql -U postgres

setup:
@ echo "Starting Postgres docker container..."
Expand Down
6 changes: 4 additions & 2 deletions migrations/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Migrations

Go package used to dump tables from old databases (KDVH, Kvalobs) and import them into LARD.
Go package that dumps tables from old databases (KDVH, Kvalobs) and imports them into LARD.

## Usage

Expand All @@ -10,16 +10,18 @@ Go package used to dump tables from old databases (KDVH, Kvalobs) and import the
go build
```

1. Dump tables from KDVH
1. Dump tables

```terminal
./migrate kdvh dump
./migrate kvalobs dump
```

1. Import dumps into LARD

```terminal
./migrate kdvh import
./migrate kvalobs import
```

For each command, you can use the `--help` flag to see all available options.
Expand Down
5 changes: 2 additions & 3 deletions migrations/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ module migrate
go 1.22.3

require (
github.com/alexflint/go-arg v1.5.1
github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1
github.com/jackc/pgx/v5 v5.6.0
github.com/jessevdk/go-flags v1.6.1
github.com/joho/godotenv v1.5.1
github.com/rickb777/period v1.0.5
github.com/schollz/progressbar/v3 v3.16.1
)

require (
github.com/alexflint/go-scalar v1.2.0 // indirect
github.com/govalues/decimal v0.1.29 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
Expand All @@ -25,5 +26,3 @@ require (
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.16.0 // indirect
)

replace github.com/jessevdk/go-flags => github.com/Lun4m/go-flags v0.0.0-20241118100134-6375192b7985
11 changes: 5 additions & 6 deletions migrations/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
github.com/Lun4m/go-flags v0.0.0-20241113125827-68757125e949 h1:7xyEGIr1X5alOjBjlNTDF+aRBcRIo60YX5sdlziLE5w=
github.com/Lun4m/go-flags v0.0.0-20241113125827-68757125e949/go.mod h1:42/L0FDbP0qe91I+81tBqjU3uoz1tn1GDMZAhcCE2PE=
github.com/Lun4m/go-flags v0.0.0-20241118100134-6375192b7985 h1:eUA/sFZ1CtY9+9y/fPpUivYW8fJBlXqB4/8CjC+yXqk=
github.com/Lun4m/go-flags v0.0.0-20241118100134-6375192b7985/go.mod h1:42/L0FDbP0qe91I+81tBqjU3uoz1tn1GDMZAhcCE2PE=
github.com/alexflint/go-arg v1.5.1 h1:nBuWUCpuRy0snAG+uIJ6N0UvYxpxA0/ghA/AaHxlT8Y=
github.com/alexflint/go-arg v1.5.1/go.mod h1:A7vTJzvjoaSTypg4biM5uYNTkJ27SkNTArtYXnlqVO8=
github.com/alexflint/go-scalar v1.2.0 h1:WR7JPKkeNpnYIOfHRa7ivM21aWAdHD0gEWHCx+WQBRw=
github.com/alexflint/go-scalar v1.2.0/go.mod h1:LoFvNMqS1CPrMVltza4LvnGKhaSpc3oyLEBUZVhhS2o=
github.com/chengxilo/virtualterm v1.0.4 h1:Z6IpERbRVlfB8WkOmtbHiDbBANU7cimRIof7mk9/PwM=
github.com/chengxilo/virtualterm v1.0.4/go.mod h1:DyxxBZz/x1iqJjFxTFcr6/x+jSpqN0iwWCOK1q10rlY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -21,8 +21,6 @@ github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jessevdk/go-flags v1.6.1 h1:Cvu5U8UGrLay1rZfv/zP7iLpSHGUZ/Ou68T0iX1bBK4=
github.com/jessevdk/go-flags v1.6.1/go.mod h1:Mk8T1hIAWpOiJiHa9rJASDK2UGWji0EuPGBnNLMooyc=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
Expand All @@ -42,6 +40,7 @@ github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUc
github.com/schollz/progressbar/v3 v3.16.1 h1:RnF1neWZFzLCoGx8yp1yF7SDl4AzNDI5y4I0aUJRrZQ=
github.com/schollz/progressbar/v3 v3.16.1/go.mod h1:I2ILR76gz5VXqYMIY/LdLecvMHDPVcQm3W/MSKi1TME=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
Expand Down
Loading
Loading