From f948d7ac52ea02cc0835a7b20cb498ffdf5a9a77 Mon Sep 17 00:00:00 2001 From: davicorreiajr Date: Sat, 28 Mar 2020 14:05:25 -0300 Subject: [PATCH 1/3] Fix parse of multipleOf (to avoid validation error) --- target_postgres/json_schema.py | 4 +- target_postgres/target_tools.py | 3 +- tests/unit/test_BufferedSingerStream.py | 70 ++++++++++++++++++++++++- tests/unit/test_json_schema.py | 3 ++ tests/unit/test_target_tools.py | 39 ++++++++++++++ 5 files changed, 116 insertions(+), 3 deletions(-) diff --git a/target_postgres/json_schema.py b/target_postgres/json_schema.py index 9829d1d2..a3add20c 100644 --- a/target_postgres/json_schema.py +++ b/target_postgres/json_schema.py @@ -1,6 +1,7 @@ from copy import deepcopy import json import re +import decimal from jsonschema import Draft4Validator from jsonschema.exceptions import SchemaError @@ -20,7 +21,8 @@ float: NUMBER, bool: BOOLEAN, str: STRING, - type(None): NULL + type(None): NULL, + decimal.Decimal: NUMBER } diff --git a/target_postgres/target_tools.py b/target_postgres/target_tools.py index e1b527f4..c01cc40b 100644 --- a/target_postgres/target_tools.py +++ b/target_postgres/target_tools.py @@ -4,6 +4,7 @@ import pkg_resources import sys import threading +import decimal import singer from singer import utils, metadata, metrics @@ -90,7 +91,7 @@ def _report_invalid_records(streams): def _line_handler(state_tracker, target, invalid_records_detect, invalid_records_threshold, max_batch_rows, max_batch_size, line): try: - line_data = json.loads(line) + line_data = json.loads(line, parse_float=decimal.Decimal) except json.decoder.JSONDecodeError: LOGGER.error("Unable to parse JSON: {}".format(line)) raise diff --git a/tests/unit/test_BufferedSingerStream.py b/tests/unit/test_BufferedSingerStream.py index c178f986..141d49fc 100644 --- a/tests/unit/test_BufferedSingerStream.py +++ b/tests/unit/test_BufferedSingerStream.py @@ -1,9 +1,10 @@ +from decimal import Decimal from copy import deepcopy import pytest from 
target_postgres import singer -from target_postgres.singer_stream import BufferedSingerStream, SingerStreamError +from target_postgres.singer_stream import BufferedSingerStream, SingerStreamError, RAW_LINE_SIZE from utils.fixtures import CatStream, InvalidCatStream, CATS_SCHEMA @@ -75,6 +76,73 @@ def test_add_record_message__invalid_record(): assert [] == missing_sdc_properties(singer_stream) +SIMPLE_MULTIPLE_OF_VALID_SCHEMA = { + 'properties': { + 'multipleOfKey': { + 'type': 'number', + 'multipleOf': Decimal('1e-15') + } + } +} + +SIMPLE_MULTIPLE_OF_INVALID_SCHEMA = { + 'properties': { + 'multipleOfKey': { + 'type': 'number', + 'multipleOf': 1e-15 + } + } +} + +def test_add_record_message__multipleOf(): + stream_name = 'test' + singer_stream = BufferedSingerStream(stream_name, + deepcopy(SIMPLE_MULTIPLE_OF_VALID_SCHEMA), + []) + + multiple_of_values = ['1', '2', '3', '4', '5', '1.1', '2.3', '1.23456789', '20', '100.1'] + + for value in multiple_of_values: + singer_stream.add_record_message( + { + 'type': 'RECORD', + 'stream': stream_name, + 'record': {'multipleOfKey': Decimal(value)}, + 'sequence': 0, + RAW_LINE_SIZE: 100 + } + ) + + assert not singer_stream.peek_invalid_records() + assert singer_stream.count == len(multiple_of_values) + assert [] == missing_sdc_properties(singer_stream) + + +def test_add_record_message__multipleOf_invalid_record(): + stream_name = 'test' + singer_stream = BufferedSingerStream(stream_name, + deepcopy(SIMPLE_MULTIPLE_OF_INVALID_SCHEMA), + []) + + multiple_of_values = [1, 2] + + for value in multiple_of_values: + with pytest.raises(SingerStreamError): + singer_stream.add_record_message( + { + 'type': 'RECORD', + 'stream': stream_name, + 'record': {'multipleOfKey': value}, + 'sequence': 0, + RAW_LINE_SIZE: 100 + } + ) + + assert singer_stream.peek_invalid_records() + assert singer_stream.count == 0 + assert [] == missing_sdc_properties(singer_stream) + + SIMPLE_ALLOF_SCHEMA = { 'type': 'object', 'properties': { diff --git 
a/tests/unit/test_json_schema.py b/tests/unit/test_json_schema.py index 9e998a32..dbb8edcc 100644 --- a/tests/unit/test_json_schema.py +++ b/tests/unit/test_json_schema.py @@ -1,4 +1,5 @@ import re +import decimal import pytest @@ -31,6 +32,8 @@ def test_python_type(): == json_schema.STRING assert json_schema.python_type('world') \ == json_schema.STRING + assert json_schema.python_type(decimal.Decimal(1)) \ + == json_schema.NUMBER def test_is_object(): diff --git a/tests/unit/test_target_tools.py b/tests/unit/test_target_tools.py index 2e6c57d3..d12450a0 100644 --- a/tests/unit/test_target_tools.py +++ b/tests/unit/test_target_tools.py @@ -115,6 +115,45 @@ class TestStream(ListStream): assert rows_persisted == expected_rows +def test_record_with_multiple_of(): + values = [1, 1.0, 2, 2.0, 3, 7, 10.1] + records = [] + for value in values: + records.append({ + "type": "RECORD", + "stream": "test", + "record": {"multipleOfKey": value}, + }) + + class TestStream(ListStream): + stream = [ + { + "type": "SCHEMA", + "stream": "test", + "schema": { + "properties": { + "multipleOfKey": { + "type": "number", + "multipleOf": 1e-15 + } + } + }, + "key_properties": [] + } + ] + records + + target = Target() + + target_tools.stream_to_target(TestStream(), target, config=CONFIG.copy()) + + expected_rows = len(records) + rows_persisted = 0 + for call in target.calls['write_batch']: + rows_persisted += call['records_count'] + + assert rows_persisted == expected_rows + + def test_state__capture(capsys): stream = [ json.dumps({'type': 'STATE', 'value': {'test': 'state-1'}}), From 9631603b4d7fd03f24347aa6e394bf2ffafbd36a Mon Sep 17 00:00:00 2001 From: davicorreiajr Date: Sat, 28 Mar 2020 15:46:21 -0300 Subject: [PATCH 2/3] Fix code review comments (fix import order and remove unnecessary assert from test_BufferedSingerStream) --- target_postgres/json_schema.py | 2 +- tests/unit/test_BufferedSingerStream.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git 
a/target_postgres/json_schema.py b/target_postgres/json_schema.py index a3add20c..8e971877 100644 --- a/target_postgres/json_schema.py +++ b/target_postgres/json_schema.py @@ -1,7 +1,7 @@ from copy import deepcopy +import decimal import json import re -import decimal from jsonschema import Draft4Validator from jsonschema.exceptions import SchemaError diff --git a/tests/unit/test_BufferedSingerStream.py b/tests/unit/test_BufferedSingerStream.py index 141d49fc..1a6e4e5c 100644 --- a/tests/unit/test_BufferedSingerStream.py +++ b/tests/unit/test_BufferedSingerStream.py @@ -140,7 +140,6 @@ def test_add_record_message__multipleOf_invalid_record(): assert singer_stream.peek_invalid_records() assert singer_stream.count == 0 - assert [] == missing_sdc_properties(singer_stream) SIMPLE_ALLOF_SCHEMA = { From db6a87bf3706e85825fda4041ca0312bbbc0dee2 Mon Sep 17 00:00:00 2001 From: davicorreiajr Date: Mon, 30 Mar 2020 11:32:04 -0300 Subject: [PATCH 3/3] Fix after code review (unnecessary assert) --- tests/unit/test_BufferedSingerStream.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/test_BufferedSingerStream.py b/tests/unit/test_BufferedSingerStream.py index 1a6e4e5c..077d4e16 100644 --- a/tests/unit/test_BufferedSingerStream.py +++ b/tests/unit/test_BufferedSingerStream.py @@ -115,7 +115,6 @@ def test_add_record_message__multipleOf(): assert not singer_stream.peek_invalid_records() assert singer_stream.count == len(multiple_of_values) - assert [] == missing_sdc_properties(singer_stream) def test_add_record_message__multipleOf_invalid_record():