diff --git a/target_postgres/json_schema.py b/target_postgres/json_schema.py index 9829d1d2..8e971877 100644 --- a/target_postgres/json_schema.py +++ b/target_postgres/json_schema.py @@ -1,4 +1,5 @@ from copy import deepcopy +import decimal import json import re @@ -20,7 +21,8 @@ float: NUMBER, bool: BOOLEAN, str: STRING, - type(None): NULL + type(None): NULL, + decimal.Decimal: NUMBER } diff --git a/target_postgres/target_tools.py b/target_postgres/target_tools.py index e1b527f4..c01cc40b 100644 --- a/target_postgres/target_tools.py +++ b/target_postgres/target_tools.py @@ -4,6 +4,7 @@ import pkg_resources import sys import threading +import decimal import singer from singer import utils, metadata, metrics @@ -90,7 +91,7 @@ def _report_invalid_records(streams): def _line_handler(state_tracker, target, invalid_records_detect, invalid_records_threshold, max_batch_rows, max_batch_size, line): try: - line_data = json.loads(line) + line_data = json.loads(line, parse_float=decimal.Decimal) except json.decoder.JSONDecodeError: LOGGER.error("Unable to parse JSON: {}".format(line)) raise diff --git a/tests/unit/test_BufferedSingerStream.py b/tests/unit/test_BufferedSingerStream.py index c178f986..077d4e16 100644 --- a/tests/unit/test_BufferedSingerStream.py +++ b/tests/unit/test_BufferedSingerStream.py @@ -1,9 +1,10 @@ +from decimal import Decimal from copy import deepcopy import pytest from target_postgres import singer -from target_postgres.singer_stream import BufferedSingerStream, SingerStreamError +from target_postgres.singer_stream import BufferedSingerStream, SingerStreamError, RAW_LINE_SIZE from utils.fixtures import CatStream, InvalidCatStream, CATS_SCHEMA @@ -75,6 +76,71 @@ def test_add_record_message__invalid_record(): assert [] == missing_sdc_properties(singer_stream) +SIMPLE_MULTIPLE_OF_VALID_SCHEMA = { + 'properties': { + 'multipleOfKey': { + 'type': 'number', + 'multipleOf': Decimal('1e-15') + } + } +} + +SIMPLE_MULTIPLE_OF_INVALID_SCHEMA = { + 'properties': { + 'multipleOfKey': { + 'type': 'number', + 'multipleOf': 1e-15 + } + } +} + +def test_add_record_message__multipleOf(): + stream_name = 'test' + singer_stream = BufferedSingerStream(stream_name, + deepcopy(SIMPLE_MULTIPLE_OF_VALID_SCHEMA), + []) + + multiple_of_values = ['1', '2', '3', '4', '5', '1.1', '2.3', '1.23456789', '20', '100.1'] + + for value in multiple_of_values: + singer_stream.add_record_message( + { + 'type': 'RECORD', + 'stream': stream_name, + 'record': {'multipleOfKey': Decimal(value)}, + 'sequence': 0, + RAW_LINE_SIZE: 100 + } + ) + + assert not singer_stream.peek_invalid_records() + assert singer_stream.count == len(multiple_of_values) + + +def test_add_record_message__multipleOf_invalid_record(): + stream_name = 'test' + singer_stream = BufferedSingerStream(stream_name, + deepcopy(SIMPLE_MULTIPLE_OF_INVALID_SCHEMA), + []) + + multiple_of_values = [1, 2] + + for value in multiple_of_values: + with pytest.raises(SingerStreamError): + singer_stream.add_record_message( + { + 'type': 'RECORD', + 'stream': stream_name, + 'record': {'multipleOfKey': value}, + 'sequence': 0, + RAW_LINE_SIZE: 100 + } + ) + + assert singer_stream.peek_invalid_records() + assert singer_stream.count == 0 + + SIMPLE_ALLOF_SCHEMA = { 'type': 'object', 'properties': { diff --git a/tests/unit/test_json_schema.py b/tests/unit/test_json_schema.py index 9e998a32..dbb8edcc 100644 --- a/tests/unit/test_json_schema.py +++ b/tests/unit/test_json_schema.py @@ -1,4 +1,5 @@ import re +import decimal import pytest @@ -31,6 +32,8 @@ def test_python_type(): == json_schema.STRING assert json_schema.python_type('world') \ == json_schema.STRING + assert json_schema.python_type(decimal.Decimal(1)) \ + == json_schema.NUMBER def test_is_object(): diff --git a/tests/unit/test_target_tools.py b/tests/unit/test_target_tools.py index 2e6c57d3..d12450a0 100644 --- a/tests/unit/test_target_tools.py +++ b/tests/unit/test_target_tools.py @@ -115,6 +115,45 @@ class TestStream(ListStream): assert rows_persisted == expected_rows +def test_record_with_multiple_of(): + values = [1, 1.0, 2, 2.0, 3, 7, 10.1] + records = [] + for value in values: + records.append({ + "type": "RECORD", + "stream": "test", + "record": {"multipleOfKey": value}, + }) + + class TestStream(ListStream): + stream = [ + { + "type": "SCHEMA", + "stream": "test", + "schema": { + "properties": { + "multipleOfKey": { + "type": "number", + "multipleOf": 1e-15 + } + } + }, + "key_properties": [] + } + ] + records + + target = Target() + + target_tools.stream_to_target(TestStream(), target, config=CONFIG.copy()) + + expected_rows = len(records) + rows_persisted = 0 + for call in target.calls['write_batch']: + rows_persisted += call['records_count'] + + assert rows_persisted == expected_rows + + def test_state__capture(capsys): stream = [ json.dumps({'type': 'STATE', 'value': {'test': 'state-1'}}),