Skip to content

Commit

Permalink
Merge pull request #179 from davicorreiajr/fix/validation-error-decimal
Browse files Browse the repository at this point in the history
Fix parse to properly support multipleOf (to avoid validation error)
  • Loading branch information
AlexanderMann authored Mar 30, 2020
2 parents 4f07509 + db6a87b commit dc1020d
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 3 deletions.
4 changes: 3 additions & 1 deletion target_postgres/json_schema.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from copy import deepcopy
import decimal
import json
import re

Expand All @@ -20,7 +21,8 @@
float: NUMBER,
bool: BOOLEAN,
str: STRING,
type(None): NULL
type(None): NULL,
decimal.Decimal: NUMBER
}


Expand Down
3 changes: 2 additions & 1 deletion target_postgres/target_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pkg_resources
import sys
import threading
import decimal

import singer
from singer import utils, metadata, metrics
Expand Down Expand Up @@ -90,7 +91,7 @@ def _report_invalid_records(streams):
def _line_handler(state_tracker, target, invalid_records_detect, invalid_records_threshold, max_batch_rows,
max_batch_size, line):
try:
line_data = json.loads(line)
line_data = json.loads(line, parse_float=decimal.Decimal)
except json.decoder.JSONDecodeError:
LOGGER.error("Unable to parse JSON: {}".format(line))
raise
Expand Down
68 changes: 67 additions & 1 deletion tests/unit/test_BufferedSingerStream.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from decimal import Decimal
from copy import deepcopy

import pytest

from target_postgres import singer
from target_postgres.singer_stream import BufferedSingerStream, SingerStreamError
from target_postgres.singer_stream import BufferedSingerStream, SingerStreamError, RAW_LINE_SIZE

from utils.fixtures import CatStream, InvalidCatStream, CATS_SCHEMA

Expand Down Expand Up @@ -75,6 +76,71 @@ def test_add_record_message__invalid_record():
assert [] == missing_sdc_properties(singer_stream)


SIMPLE_MULTIPLE_OF_VALID_SCHEMA = {
'properties': {
'multipleOfKey': {
'type': 'number',
'multipleOf': Decimal('1e-15')
}
}
}

SIMPLE_MULTIPLE_OF_INVALID_SCHEMA = {
'properties': {
'multipleOfKey': {
'type': 'number',
'multipleOf': 1e-15
}
}
}

def test_add_record_message__multipleOf():
stream_name = 'test'
singer_stream = BufferedSingerStream(stream_name,
deepcopy(SIMPLE_MULTIPLE_OF_VALID_SCHEMA),
[])

multiple_of_values = ['1', '2', '3', '4', '5', '1.1', '2.3', '1.23456789', '20', '100.1']

for value in multiple_of_values:
singer_stream.add_record_message(
{
'type': 'RECORD',
'stream': stream_name,
'record': {'multipleOfKey': Decimal(value)},
'sequence': 0,
RAW_LINE_SIZE: 100
}
)

assert not singer_stream.peek_invalid_records()
assert singer_stream.count == len(multiple_of_values)


def test_add_record_message__multipleOf_invalid_record():
stream_name = 'test'
singer_stream = BufferedSingerStream(stream_name,
deepcopy(SIMPLE_MULTIPLE_OF_INVALID_SCHEMA),
[])

multiple_of_values = [1, 2]

for value in multiple_of_values:
with pytest.raises(SingerStreamError):
singer_stream.add_record_message(
{
'type': 'RECORD',
'stream': stream_name,
'record': {'multipleOfKey': value},
'sequence': 0,
RAW_LINE_SIZE: 100
}
)

assert singer_stream.peek_invalid_records()
assert singer_stream.count == 0


SIMPLE_ALLOF_SCHEMA = {
'type': 'object',
'properties': {
Expand Down
3 changes: 3 additions & 0 deletions tests/unit/test_json_schema.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import re
import decimal

import pytest

Expand Down Expand Up @@ -31,6 +32,8 @@ def test_python_type():
== json_schema.STRING
assert json_schema.python_type('world') \
== json_schema.STRING
assert json_schema.python_type(decimal.Decimal(1)) \
== json_schema.NUMBER


def test_is_object():
Expand Down
39 changes: 39 additions & 0 deletions tests/unit/test_target_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,45 @@ class TestStream(ListStream):
assert rows_persisted == expected_rows


def test_record_with_multiple_of():
values = [1, 1.0, 2, 2.0, 3, 7, 10.1]
records = []
for value in values:
records.append({
"type": "RECORD",
"stream": "test",
"record": {"multipleOfKey": value},
})

class TestStream(ListStream):
stream = [
{
"type": "SCHEMA",
"stream": "test",
"schema": {
"properties": {
"multipleOfKey": {
"type": "number",
"multipleOf": 1e-15
}
}
},
"key_properties": []
}
] + records

target = Target()

target_tools.stream_to_target(TestStream(), target, config=CONFIG.copy())

expected_rows = len(records)
rows_persisted = 0
for call in target.calls['write_batch']:
rows_persisted += call['records_count']

assert rows_persisted == expected_rows


def test_state__capture(capsys):
stream = [
json.dumps({'type': 'STATE', 'value': {'test': 'state-1'}}),
Expand Down

0 comments on commit dc1020d

Please sign in to comment.