Skip to content

Commit 73117f0

Browse files
committed
Refactor: Raw line length
- Only calculate raw line length on `RECORD` messages - Rename `DATAMILL_RAW_LINE_SIZE` -> `RAW_LINE_SIZE`
1 parent de85576 commit 73117f0

File tree

2 files changed

+4
-4
lines changed

2 files changed

+4
-4
lines changed

target_postgres/singer_stream.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
SINGER_LEVEL = '_sdc_level_{}_id'
2020
SINGER_VALUE = '_sdc_value'
2121

22-
DATAMILL_RAW_LINE_SIZE = '_dm_raw_line_size'
22+
RAW_LINE_SIZE = '__raw_line_size'
2323

2424

2525
def get_line_size(line_data):
26-
return line_data.get(DATAMILL_RAW_LINE_SIZE) or len(json.dumps(line_data))
26+
return line_data.get(RAW_LINE_SIZE) or len(json.dumps(line_data))
2727

2828

2929
class BufferedSingerStream():

target_postgres/target_tools.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
from target_postgres import json_schema
1212
from target_postgres.exceptions import TargetError
13-
from target_postgres.singer_stream import BufferedSingerStream, DATAMILL_RAW_LINE_SIZE
13+
from target_postgres.singer_stream import BufferedSingerStream, RAW_LINE_SIZE
1414
from target_postgres.stream_tracker import StreamTracker
1515

1616
LOGGER = singer.get_logger()
@@ -91,7 +91,6 @@ def _line_handler(state_tracker, target, invalid_records_detect, invalid_records
9191
max_batch_size, line):
9292
try:
9393
line_data = json.loads(line)
94-
line_data[DATAMILL_RAW_LINE_SIZE] = len(line)
9594
except json.decoder.JSONDecodeError:
9695
LOGGER.error("Unable to parse JSON: {}".format(line))
9796
raise
@@ -137,6 +136,7 @@ def _line_handler(state_tracker, target, invalid_records_detect, invalid_records
137136
if 'stream' not in line_data:
138137
raise TargetError('`stream` is a required key: {}'.format(line))
139138

139+
line_data[RAW_LINE_SIZE] = len(line)
140140
state_tracker.handle_record_message(line_data['stream'], line_data)
141141
elif line_data['type'] == 'ACTIVATE_VERSION':
142142
if 'stream' not in line_data:

0 commit comments

Comments
 (0)