Skip to content

Commit

Permalink
#501 Fix Jdbc Native treating of nullable fields.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Oct 21, 2024
1 parent 0c2e6a5 commit cb68ec6
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,27 @@ class ResultSetToRowIterator(rs: ResultSet, sanitizeDateTime: Boolean, incorrect

// WARNING. Do not forget that `null` is a valid value returned by RecordSet methods that return a reference type objects.
dataType match {
case BIT | BOOLEAN => rs.getBoolean(columnIndex)
case TINYINT => rs.getByte(columnIndex)
case SMALLINT => rs.getShort(columnIndex)
case INTEGER => rs.getInt(columnIndex)
case BIGINT => rs.getLong(columnIndex)
case FLOAT => rs.getFloat(columnIndex)
case DOUBLE => rs.getDouble(columnIndex)
case BIT | BOOLEAN =>
val v = rs.getBoolean(columnIndex)
if (rs.wasNull()) null else v
case TINYINT =>
val v = rs.getByte(columnIndex)
if (rs.wasNull()) null else v
case SMALLINT =>
val v = rs.getShort(columnIndex)
if (rs.wasNull()) null else v
case INTEGER =>
val v = rs.getInt(columnIndex)
if (rs.wasNull()) null else v
case BIGINT =>
val v = rs.getLong(columnIndex)
if (rs.wasNull()) null else v
case FLOAT =>
val v = rs.getFloat(columnIndex)
if (rs.wasNull()) null else v
case DOUBLE =>
val v = rs.getDouble(columnIndex)
if (rs.wasNull()) null else v
case REAL => rs.getBigDecimal(columnIndex)
case NUMERIC => rs.getBigDecimal(columnIndex)
case DATE => sanitizeDate(rs.getDate(columnIndex))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ class IngestionJobSuite extends AnyWordSpec with SparkTestBase with TextComparis
assert(df.schema.fields(2).name == "DESCRIPTION")
assert(df.schema.fields(3).name == "EMAIL")
assert(df.schema.fields(4).name == "FOUNDED")
assert(df.schema.fields(5).name == "LAST_UPDATED")
assert(df.schema.fields(5).name == "IS_TAX_FREE")
}

"get the source data frame for source with disabled count query" in {
Expand All @@ -301,7 +301,7 @@ class IngestionJobSuite extends AnyWordSpec with SparkTestBase with TextComparis
assert(df.schema.fields(2).name == "DESCRIPTION")
assert(df.schema.fields(3).name == "EMAIL")
assert(df.schema.fields(4).name == "FOUNDED")
assert(df.schema.fields(5).name == "LAST_UPDATED")
assert(df.schema.fields(5).name == "IS_TAX_FREE")

TransientTableManager.reset()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ object RdbExampleTable {
| description VARCHAR NOT NULL,
| email VARCHAR(50) NOT NULL,
| founded DATE NOT NULL,
| is_tax_free BOOLEAN,
| tax_id BIGINT,
| last_updated TIMESTAMP NOT NULL,
| info_date VARCHAR(10) NOT NULL,
| PRIMARY KEY (id))
Expand All @@ -75,10 +77,10 @@ object RdbExampleTable {
)

val inserts: Seq[String] = Seq(
s"INSERT INTO $tableName VALUES (1,'Company1', 'description1', '[email protected]', DATE '2000-10-11', TIMESTAMP '2020-11-04 10:11:00+02:00', '2022-02-18')",
s"INSERT INTO $tableName VALUES (2,'Company2', 'description2', '[email protected]', DATE '2005-03-29', TIMESTAMP '2020-11-04 10:22:33+02:00', '2022-02-18')",
s"INSERT INTO $tableName VALUES (3,'Company3', 'description3', '[email protected]', DATE '2016-12-30', TIMESTAMP '2020-11-04 10:33:59+02:00', '2022-02-18')",
s"INSERT INTO $tableName VALUES (4,'Company4', 'description4', '[email protected]', DATE '2016-12-31', TIMESTAMP '2020-11-04 10:34:22+02:00', '2022-02-19')"
s"INSERT INTO $tableName VALUES (1,'Company1', 'description1', '[email protected]', DATE '2000-10-11', FALSE, 123, TIMESTAMP '2020-11-04 10:11:00+02:00', '2022-02-18')",
s"INSERT INTO $tableName VALUES (2,'Company2', 'description2', '[email protected]', DATE '2005-03-29', TRUE, 456, TIMESTAMP '2020-11-04 10:22:33+02:00', '2022-02-18')",
s"INSERT INTO $tableName VALUES (3,'Company3', 'description3', '[email protected]', DATE '2016-12-30', FALSE, NULL, TIMESTAMP '2020-11-04 10:33:59+02:00', '2022-02-18')",
s"INSERT INTO $tableName VALUES (4,'Company4', 'description4', '[email protected]', DATE '2016-12-31', NULL, NULL, TIMESTAMP '2020-11-04 10:34:22+02:00', '2022-02-19')"
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark
val df = reader.getData(Query.Table("company"), null, null, Seq.empty[String])

assert(df.count() == 4)
assert(df.schema.fields.length == 7)
assert(df.schema.fields.length == 9)
}

"return selected column for a table snapshot-like query" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,15 +711,15 @@ class SqlGeneratorLoaderSuite extends AnyWordSpec with RelationalDbFixture {
}

"generate data queries without date ranges" in {
assert(gen.getDataQuery("company", Nil, None) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company")
assert(gen.getDataQuery("company", Nil, None) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company")
}

"generate data queries when list of columns is specified" in {
assert(genEscaped.getDataQuery("company", columns, None) == "SELECT 'A'n, 'D'n, 'Column with spaces'n FROM 'company'n")
}

"generate data queries with limit clause date ranges" in {
assert(gen.getDataQuery("company", Nil, Some(100)) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company LIMIT 100")
assert(gen.getDataQuery("company", Nil, Some(100)) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company LIMIT 100")
}

"generate ranged count queries" when {
Expand Down Expand Up @@ -761,30 +761,30 @@ class SqlGeneratorLoaderSuite extends AnyWordSpec with RelationalDbFixture {
"generate ranged data queries" when {
"date is in DATE format" in {
assert(gen.getDataQuery("company", date1, date1, Nil, None) ==
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17'")
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17'")
assert(gen.getDataQuery("company", date1, date2, Nil, None) ==
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30'")
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30'")
}

"date is in STRING format" in {
assert(genStr.getDataQuery("company", date1, date1, Nil, None) ==
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = '2020-08-17'")
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = '2020-08-17'")
assert(genStr.getDataQuery("company", date1, date2, Nil, None) ==
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= '2020-08-17' AND D <= '2020-08-30'")
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= '2020-08-17' AND D <= '2020-08-30'")
}

"date is in NUMBER format" in {
assert(genNum.getDataQuery("company", date1, date1, Nil, None) ==
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = 20200817")
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = 20200817")
assert(genNum.getDataQuery("company", date1, date2, Nil, None) ==
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= 20200817 AND D <= 20200830")
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= 20200817 AND D <= 20200830")
}

"with limit records" in {
assert(gen.getDataQuery("company", date1, date1, Nil, Some(100)) ==
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17' LIMIT 100")
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17' LIMIT 100")
assert(gen.getDataQuery("company", date1, date2, Nil, Some(100)) ==
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30' LIMIT 100")
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30' LIMIT 100")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "IS_TAX_FREE",
| "type" : "boolean",
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "TAX_ID",
| "type" : "long",
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "LAST_UPDATED",
| "type" : "timestamp",
| "nullable" : true,
Expand All @@ -163,25 +173,30 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa
| "ID" : 1,
| "NAME" : "Company1",
| "EMAIL" : "[email protected]",
| "FOUNDED" : "2000-10-11"
| "FOUNDED" : "2000-10-11",
| "IS_TAX_FREE" : false,
| "TAX_ID" : 123
|}, {
| "ID" : 2,
| "NAME" : "Company2",
| "EMAIL" : "[email protected]",
| "FOUNDED" : "2005-03-29"
| "FOUNDED" : "2005-03-29",
| "IS_TAX_FREE" : true,
| "TAX_ID" : 456
|}, {
| "ID" : 3,
| "NAME" : "Company3",
| "EMAIL" : "[email protected]",
| "FOUNDED" : "2016-12-30"
| "FOUNDED" : "2016-12-30",
| "IS_TAX_FREE" : false
|}, {
| "ID" : 4,
| "NAME" : "Company4",
| "EMAIL" : "[email protected]",
| "FOUNDED" : "2016-12-31"
|} ]""".stripMargin

val df = JdbcNativeUtils.getJdbcNativeDataFrame(jdbcConfig, jdbcConfig.primaryUrl.get, s"SELECT id, name, email, founded FROM $tableName")
val df = JdbcNativeUtils.getJdbcNativeDataFrame(jdbcConfig, jdbcConfig.primaryUrl.get, s"SELECT id, name, email, founded, is_tax_free, tax_id FROM $tableName")
val actual = SparkUtils.convertDataFrameToPrettyJSON(df)

compareText(actual, expected)
Expand Down Expand Up @@ -236,35 +251,35 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa
}

"getDecimalDataType" should {
val resultSet = mock(classOf[ResultSet])
val resultSetMetaData = mock(classOf[ResultSetMetaData])
val resultSet = mock(classOf[ResultSet])
val resultSetMetaData = mock(classOf[ResultSetMetaData])

when(resultSetMetaData.getColumnCount).thenReturn(1)
when(resultSet.getMetaData).thenReturn(resultSetMetaData)
when(resultSetMetaData.getColumnCount).thenReturn(1)
when(resultSet.getMetaData).thenReturn(resultSetMetaData)

"return normal decimal for correct precision and scale" in {
val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false)
when(resultSetMetaData.getPrecision(0)).thenReturn(10)
when(resultSetMetaData.getScale(0)).thenReturn(2)
"return normal decimal for correct precision and scale" in {
val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false)
when(resultSetMetaData.getPrecision(0)).thenReturn(10)
when(resultSetMetaData.getScale(0)).thenReturn(2)

assert(iterator.getDecimalDataType(0) == NUMERIC)
}
assert(iterator.getDecimalDataType(0) == NUMERIC)
}

"return fixed decimal for incorrect precision and scale" in {
val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false)
when(resultSetMetaData.getPrecision(0)).thenReturn(0)
when(resultSetMetaData.getScale(0)).thenReturn(2)
"return fixed decimal for incorrect precision and scale" in {
val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false)
when(resultSetMetaData.getPrecision(0)).thenReturn(0)
when(resultSetMetaData.getScale(0)).thenReturn(2)

assert(iterator.getDecimalDataType(0) == NUMERIC)
}
assert(iterator.getDecimalDataType(0) == NUMERIC)
}

"return string type for incorrect precision and scale" in {
val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = true)
when(resultSetMetaData.getPrecision(0)).thenReturn(0)
when(resultSetMetaData.getScale(0)).thenReturn(2)
"return string type for incorrect precision and scale" in {
val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = true)
when(resultSetMetaData.getPrecision(0)).thenReturn(0)
when(resultSetMetaData.getScale(0)).thenReturn(2)

assert(iterator.getDecimalDataType(0) == VARCHAR)
}
assert(iterator.getDecimalDataType(0) == VARCHAR)
}
}

"sanitizeDateTime" when {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,6 @@ class SparkUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture
val newField2 = schema1Orig.fields.head.copy(metadata = metadata2)
val schema2 = schema1Orig.copy(fields = newField2 +: schema1Orig.fields.tail)

println(schema1.prettyJson)
println(schema2.prettyJson)

val diff = compareSchemas(schema1, schema2)

assert(diff.length == 1)
Expand Down

0 comments on commit cb68ec6

Please sign in to comment.