Skip to content

Commit

Permalink
Merge pull request #169 from AlexanderMann/fix/buffer-size-calculations
Browse files Browse the repository at this point in the history
Fix/buffer size calculations
  • Loading branch information
AlexanderMann authored Feb 13, 2020
2 parents 2157fba + 73117f0 commit adec7da
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 33 deletions.
29 changes: 0 additions & 29 deletions target_postgres/pysize.py

This file was deleted.

20 changes: 18 additions & 2 deletions target_postgres/singer_stream.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from copy import deepcopy
import json
import uuid

import arrow
Expand All @@ -7,7 +8,22 @@

from target_postgres import json_schema, singer
from target_postgres.exceptions import SingerStreamError
from target_postgres.pysize import get_size


# String constants for the `_sdc_`-prefixed names used by this module.
# NOTE(review): presumably these are the Singer metadata column/field names
# added to loaded records — confirm against their usages elsewhere in the package.
SINGER_RECEIVED_AT = '_sdc_received_at'
SINGER_BATCHED_AT = '_sdc_batched_at'
SINGER_SEQUENCE = '_sdc_sequence'
SINGER_TABLE_VERSION = '_sdc_table_version'
SINGER_PK = '_sdc_primary_key'
# Prefix only: the trailing underscore implies a suffix is appended by the user of this constant.
SINGER_SOURCE_PK_PREFIX = '_sdc_source_key_'
# Template: contains a `{}` placeholder that must be filled in (e.g. via str.format).
SINGER_LEVEL = '_sdc_level_{}_id'
SINGER_VALUE = '_sdc_value'

# Key under which a pre-computed size for a message may be stored on the
# message dict itself; `get_line_size` prefers it over re-serializing.
RAW_LINE_SIZE = '__raw_line_size'


def get_line_size(line_data):
    """Return the size to charge against the buffer for one message.

    If ``line_data`` carries a truthy value under ``RAW_LINE_SIZE`` that value
    is returned as-is. Otherwise the size is estimated as the length of the
    message re-serialized as JSON.

    :param line_data: dict-like message (must support ``.get``).
    :return: the stored size, or ``len(json.dumps(line_data))`` as a fallback.
    """
    # `or` (not an `is None` check) is intentional: a missing, None, or zero
    # stored size all fall through to the estimate.
    #
    # `default=str` keeps the estimate from raising TypeError when the message
    # holds values json cannot encode natively (e.g. Decimal, datetime); this
    # is only a size estimate, so a stringified stand-in is good enough.
    return line_data.get(RAW_LINE_SIZE) or len(json.dumps(line_data, default=str))


class BufferedSingerStream():
Expand Down Expand Up @@ -133,7 +149,7 @@ def add_record_message(self, record_message):

if add_record:
self.__buffer.append(record_message)
self.__size += get_size(record_message)
self.__size += get_line_size(record_message)
self.__count += 1
elif self.invalid_records_detect \
and len(self.invalid_records) >= self.invalid_records_threshold:
Expand Down
3 changes: 2 additions & 1 deletion target_postgres/target_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from target_postgres import json_schema
from target_postgres.exceptions import TargetError
from target_postgres.singer_stream import BufferedSingerStream
from target_postgres.singer_stream import BufferedSingerStream, RAW_LINE_SIZE
from target_postgres.stream_tracker import StreamTracker

LOGGER = singer.get_logger()
Expand Down Expand Up @@ -136,6 +136,7 @@ def _line_handler(state_tracker, target, invalid_records_detect, invalid_records
if 'stream' not in line_data:
raise TargetError('`stream` is a required key: {}'.format(line))

line_data[RAW_LINE_SIZE] = len(line)
state_tracker.handle_record_message(line_data['stream'], line_data)
elif line_data['type'] == 'ACTIVATE_VERSION':
if 'stream' not in line_data:
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_BufferedSingerStream.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def test_multiple_batches__by_memory():
singer_stream = BufferedSingerStream(CATS_SCHEMA['stream'],
CATS_SCHEMA['schema'],
CATS_SCHEMA['key_properties'],
max_buffer_size=1024)
max_buffer_size=10)

assert len(singer_stream.peek_buffer()) == 0

Expand Down

0 comments on commit adec7da

Please sign in to comment.