|
29 | 29 | import uuid
|
30 | 30 | from collections import OrderedDict, abc
|
31 | 31 | from io import BytesIO
|
32 |
| -from typing import Optional |
33 | 32 |
|
34 | 33 | sys.path[0:0] = [""]
|
35 | 34 |
|
@@ -1302,63 +1301,65 @@ def test_tz_clamping_utc(self):
|
1302 | 1301 | )
|
1303 | 1302 |
|
1304 | 1303 | def test_tz_clamping_non_utc(self):
|
1305 |
| - class DivergentTimezone(FixedOffset): |
1306 |
| - """A timezone that reverses the offset for dates before 1970.""" |
1307 |
| - |
1308 |
| - def utcoffset(self, dt: Optional[datetime]) -> datetime.timedelta: |
1309 |
| - if dt is None: |
1310 |
| - raise TypeError("DivergentTimezone.utcoffset requires a datetime") |
1311 |
| - offset = super().utcoffset(dt) |
1312 |
| - if dt.year < 1970: |
1313 |
| - return -offset |
1314 |
| - return offset |
1315 |
| - |
1316 |
| - tz = DivergentTimezone(60, "Custom") |
1317 |
| - opts = CodecOptions( |
1318 |
| - datetime_conversion=DatetimeConversion.DATETIME_CLAMP, tz_aware=True, tzinfo=tz |
1319 |
| - ) |
1320 |
| - # Min/max values in this timezone which can be represented in both BSON and datetime UTC. |
1321 |
| - min_tz = ( |
1322 |
| - datetime.datetime.min.replace(tzinfo=utc) + datetime.timedelta(minutes=60) |
1323 |
| - ).astimezone(tz) |
1324 |
| - max_tz = ( |
1325 |
| - datetime.datetime.max.replace(tzinfo=utc, microsecond=999000) |
1326 |
| - - datetime.timedelta(minutes=60) |
1327 |
| - ).astimezone(tz) |
1328 |
| - # Sanity check: |
1329 |
| - self.assertEqual(min_tz, datetime.datetime.min.replace(tzinfo=tz)) |
1330 |
| - self.assertEqual(max_tz, datetime.datetime.max.replace(tzinfo=tz, microsecond=999000)) |
1331 |
| - self.assertEqual(tz.utcoffset(datetime.datetime.min), datetime.timedelta(minutes=-60)) |
1332 |
| - self.assertEqual(tz.utcoffset(datetime.datetime.max), datetime.timedelta(minutes=60)) |
1333 |
| - for in_range in [ |
1334 |
| - min_tz, |
1335 |
| - min_tz + datetime.timedelta(milliseconds=1), |
1336 |
| - max_tz - datetime.timedelta(milliseconds=1), |
1337 |
| - max_tz, |
1338 |
| - ]: |
1339 |
| - doc = decode(encode({"x": in_range}), opts) |
1340 |
| - self.assertEqual(doc["x"], in_range) |
1341 |
| - |
1342 |
| - for too_low in [ |
1343 |
| - DatetimeMS(_datetime_to_millis(min_tz) - 1), |
1344 |
| - DatetimeMS(_datetime_to_millis(min_tz) - 60 * 60 * 1000), |
1345 |
| - DatetimeMS(_datetime_to_millis(datetime.datetime.min) - 1), |
1346 |
| - DatetimeMS(_datetime_to_millis(datetime.datetime.min) - 60 * 60 * 1000), |
1347 |
| - DatetimeMS(_datetime_to_millis(datetime.datetime.min) - 1 - 60 * 60 * 1000), |
1348 |
| - ]: |
1349 |
| - doc = decode(encode({"x": too_low}), opts) |
1350 |
| - self.assertEqual(doc["x"], min_tz) |
1351 |
| - |
1352 |
| - for too_high in [ |
1353 |
| - max_tz + datetime.timedelta(microseconds=1), |
1354 |
| - max_tz + datetime.timedelta(microseconds=999), |
1355 |
| - datetime.datetime.max.replace(tzinfo=tz), |
1356 |
| - DatetimeMS(_datetime_to_millis(max_tz) + 1), |
1357 |
| - DatetimeMS(_datetime_to_millis(max_tz) + 60 * 60 * 1000), |
1358 |
| - DatetimeMS(_datetime_to_millis(max_tz) + 1 + 60 * 60 * 1000), |
| 1304 | + for tz in [FixedOffset(60, "+1H"), FixedOffset(-60, "-1H")]: |
| 1305 | + opts = CodecOptions( |
| 1306 | + datetime_conversion=DatetimeConversion.DATETIME_CLAMP, tz_aware=True, tzinfo=tz |
| 1307 | + ) |
| 1308 | + # Min/max values in this timezone which can be represented in both BSON and datetime UTC. |
| 1309 | + try: |
| 1310 | + min_tz = datetime.datetime.min.replace(tzinfo=utc).astimezone(tz) |
| 1311 | + except OverflowError: |
| 1312 | + min_tz = datetime.datetime.min.replace(tzinfo=tz) |
| 1313 | + try: |
| 1314 | + max_tz = datetime.datetime.max.replace(tzinfo=utc, microsecond=999000).astimezone( |
| 1315 | + tz |
| 1316 | + ) |
| 1317 | + except OverflowError: |
| 1318 | + max_tz = datetime.datetime.max.replace(tzinfo=tz, microsecond=999000) |
| 1319 | + |
| 1320 | + for in_range in [ |
| 1321 | + min_tz, |
| 1322 | + min_tz + datetime.timedelta(milliseconds=1), |
| 1323 | + max_tz - datetime.timedelta(milliseconds=1), |
| 1324 | + max_tz, |
| 1325 | + ]: |
| 1326 | + doc = decode(encode({"x": in_range}), opts) |
| 1327 | + self.assertEqual(doc["x"], in_range) |
| 1328 | + |
| 1329 | + for too_low in [ |
| 1330 | + DatetimeMS(_datetime_to_millis(min_tz) - 1), |
| 1331 | + DatetimeMS(_datetime_to_millis(min_tz) - 60 * 60 * 1000), |
| 1332 | + DatetimeMS(_datetime_to_millis(min_tz) - 1 - 60 * 60 * 1000), |
| 1333 | + DatetimeMS(_datetime_to_millis(datetime.datetime.min) - 1), |
| 1334 | + DatetimeMS(_datetime_to_millis(datetime.datetime.min) - 60 * 60 * 1000), |
| 1335 | + DatetimeMS(_datetime_to_millis(datetime.datetime.min) - 1 - 60 * 60 * 1000), |
| 1336 | + ]: |
| 1337 | + doc = decode(encode({"x": too_low}), opts) |
| 1338 | + self.assertEqual(doc["x"], min_tz) |
| 1339 | + |
| 1340 | + for too_high in [ |
| 1341 | + DatetimeMS(_datetime_to_millis(max_tz) + 1), |
| 1342 | + DatetimeMS(_datetime_to_millis(max_tz) + 60 * 60 * 1000), |
| 1343 | + DatetimeMS(_datetime_to_millis(max_tz) + 1 + 60 * 60 * 1000), |
| 1344 | + DatetimeMS(_datetime_to_millis(datetime.datetime.max) + 1), |
| 1345 | + DatetimeMS(_datetime_to_millis(datetime.datetime.max) + 60 * 60 * 1000), |
| 1346 | + DatetimeMS(_datetime_to_millis(datetime.datetime.max) + 1 + 60 * 60 * 1000), |
| 1347 | + ]: |
| 1348 | + doc = decode(encode({"x": too_high}), opts) |
| 1349 | + self.assertEqual(doc["x"], max_tz) |
| 1350 | + |
| 1351 | + def test_tz_clamping_non_utc_simple(self): |
| 1352 | + dtm = datetime.datetime(2024, 8, 23) |
| 1353 | + encoded = encode({"d": dtm}) |
| 1354 | + self.assertEqual(decode(encoded)["d"], dtm) |
| 1355 | + for conversion in [ |
| 1356 | + DatetimeConversion.DATETIME, |
| 1357 | + DatetimeConversion.DATETIME_CLAMP, |
| 1358 | + DatetimeConversion.DATETIME_AUTO, |
1359 | 1359 | ]:
|
1360 |
| - doc = decode(encode({"x": too_high}), opts) |
1361 |
| - self.assertEqual(doc["x"], max_tz) |
| 1360 | + for tz in [FixedOffset(60, "+1H"), FixedOffset(-60, "-1H")]: |
| 1361 | + opts = CodecOptions(datetime_conversion=conversion, tz_aware=True, tzinfo=tz) |
| 1362 | + self.assertEqual(decode(encoded, opts)["d"], dtm.replace(tzinfo=utc).astimezone(tz)) |
1362 | 1363 |
|
1363 | 1364 | def test_datetime_auto(self):
|
1364 | 1365 | # Naive auto, in range.
|
|
0 commit comments