Commit
Fix for a bug affecting gzip json being written in uncompressed format (#384)

* Fix for a bug affecting gzip json being written in uncompressed format

* Changelog
matteofigus authored Oct 13, 2023
1 parent 7aa03bf commit e9d3144
Showing 13 changed files with 80 additions and 27 deletions.
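For context: before this fix, a deletion job that rewrote a gzip-compressed JSON object (a .json.gz key) wrote the new object back as plain, uncompressed JSON Lines, so anything reading it as gzip would fail. A minimal, standard-library-only illustration of that failure mode (the record is made up):

    import gzip

    # bytes as the old code path wrote them: plain JSON Lines, no compression
    plain = b'{"customer_id": "34567"}\n'
    try:
        gzip.decompress(plain)  # what a consumer of a .json.gz object would attempt
    except gzip.BadGzipFile as err:
        print("not a gzip stream:", err)
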
7 changes: 7 additions & 0 deletions .pre-commit-config.yaml
@@ -1,34 +1,40 @@
fail_fast: true
repos:
  - repo: local
    hooks:
      - id: format-cfn
        name: Format CloudFormation
        entry: make format-cfn
        language: system
        files: ^templates/
  - repo: local
    hooks:
      - id: format-js
        name: Format Javascript
        entry: make format-js
        language: system
        files: ^frontend/
  - repo: local
    hooks:
      - id: format-python
        name: Format Python Code
        entry: make format-python
        language: system
        files: ^backend/
  - repo: local
    hooks:
      - id: format-docs
        name: Format Markdown docs
        entry: make format-docs
        language: system
        files: .*\.md
  - repo: local
    hooks:
      - id: generate-api-docs
        name: Generate API Docs
        entry: make generate-api-docs
        language: system
        files: ^templates/api.yaml
  - repo: https://github.com/awslabs/git-secrets
    rev: 5e28df337746db4f070c84f7069d365bfd0d72a8
    hooks:
@@ -39,3 +45,4 @@ repos:
        name: Lint CloudFormation templates
        entry: make lint-cfn
        language: system
        files: ^templates/
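These hooks all run through the standard pre-commit CLI; the files: patterns scope each hook to the paths it actually covers, and a typical local invocation is pre-commit run --all-files.
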
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -1,13 +1,17 @@
 # Change Log

-## v0.63 (unreleased)
+## v0.63

 - [#376](https://github.com/awslabs/amazon-s3-find-and-forget/issues/376):
   Upgrade backend dependencies
 - [#377](https://github.com/awslabs/amazon-s3-find-and-forget/issues/377):
   Upgrade backend dependencies
 - [#383](https://github.com/awslabs/amazon-s3-find-and-forget/issues/383):
   Upgrade backend dependencies
+- [#384](https://github.com/awslabs/amazon-s3-find-and-forget/issues/384):
+  - Fix an issue impacting JSON Gzipped S3 objects being written uncompressed
+    after a deletion Job
+  - Upgrade backend dependencies

 ## v0.62
9 changes: 7 additions & 2 deletions backend/ecs_tasks/delete_files/json_handler.py
@@ -38,9 +38,12 @@ def get_value(key, obj):
     return obj


-def delete_matches_from_json_file(input_file, to_delete):
+def delete_matches_from_json_file(input_file, to_delete, compressed=False):
     deleted_rows = 0
     with BufferOutputStream() as out_stream:
+        writer = (
+            CompressedOutputStream(out_stream, "gzip") if compressed else out_stream
+        )
         content = input_file.read().decode("utf-8")
         total_rows = 0
         for parsed, line in json_lines_iterator(content, include_unparsed=True):
@@ -64,6 +67,8 @@ def delete_matches_from_json_file(input_file, to_delete):
             if should_delete:
                 deleted_rows += 1
             else:
-                out_stream.write(bytes(line + "\n", "utf-8"))
+                writer.write(bytes(line + "\n", "utf-8"))
+        if compressed:
+            writer.close()
     stats = Counter({"ProcessedRows": total_rows, "DeletedRows": deleted_rows})
     return out_stream, stats
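The essence of the fix: when the source object was gzipped, writes go through pyarrow's CompressedOutputStream wrapper instead of the raw buffer, and the wrapper is closed so the gzip trailer is flushed. A self-contained sketch of that mechanism (illustrative record only, not code from this commit):

    import gzip
    import pyarrow as pa

    with pa.BufferOutputStream() as out_stream:
        # anything written through the wrapper lands gzip-compressed in the buffer
        writer = pa.CompressedOutputStream(out_stream, "gzip")
        writer.write(b'{"customer_id": "34567", "x": 9}\n')
        writer.close()  # flushes the gzip trailer into the buffer

    # getvalue() finalizes the in-memory buffer and works after the stream closes
    buf = out_stream.getvalue()
    assert gzip.decompress(buf.to_pybytes()) == b'{"customer_id": "34567", "x": 9}\n'
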
7 changes: 4 additions & 3 deletions backend/ecs_tasks/delete_files/main.py
@@ -87,10 +87,10 @@ def validate_message(message):
             raise ValueError("Malformed message. Missing key: %s", k)


-def delete_matches_from_file(input_file, to_delete, file_format):
+def delete_matches_from_file(input_file, to_delete, file_format, compressed=False):
     logger.info("Generating new file without matches")
     if file_format == "json":
-        return delete_matches_from_json_file(input_file, to_delete)
+        return delete_matches_from_json_file(input_file, to_delete, compressed)
     return delete_matches_from_parquet_file(input_file, to_delete)


@@ -161,14 +161,15 @@ def execute(queue_url, message_body, receipt_handle):
         source_version = f.metadata()["VersionId"].decode("utf-8")
         logger.info("Using object version %s as source", source_version)
         # Write new file in-memory
+        compressed = object_path.endswith(".gz")
         object_info, _ = get_object_info(
             client, input_bucket, input_key, source_version
         )
         metadata = object_info["Metadata"]
         is_encrypted = is_kms_cse_encrypted(metadata)
         input_file = decrypt(f, metadata, kms_client) if is_encrypted else f
         out_sink, stats = delete_matches_from_file(
-            input_file, match_ids, file_format
+            input_file, match_ids, file_format, compressed
         )
         if stats["DeletedRows"] == 0:
             raise ValueError(
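Compression is inferred purely from the object key, so only keys ending in .gz take the compressed path. A trivial restatement of that rule (the helper name here is ours, not the repo's):

    def is_gzip_object(object_path: str) -> bool:
        # mirrors `compressed = object_path.endswith(".gz")` in execute()
        return object_path.endswith(".gz")

    assert is_gzip_object("bucket/path/basic.json.gz")
    assert not is_gzip_object("bucket/path/basic.json")
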
2 changes: 1 addition & 1 deletion backend/ecs_tasks/delete_files/requirements.in
@@ -1,4 +1,4 @@
-pyarrow==8.0.0
+pyarrow==13.0.0
 python-snappy==0.6.1
 pandas==1.4.3
 boto3==1.24.38
2 changes: 1 addition & 1 deletion backend/ecs_tasks/delete_files/requirements.txt
@@ -29,7 +29,7 @@ numpy==1.22.0
     # pyarrow
 pandas==1.4.3
     # via -r backend/ecs_tasks/delete_files/requirements.in
-pyarrow==8.0.0
+pyarrow==13.0.0
     # via -r backend/ecs_tasks/delete_files/requirements.in
 pycparser==2.21
     # via cffi
2 changes: 1 addition & 1 deletion requirements.txt
@@ -132,7 +132,7 @@ pluggy==1.0.0
     # via pytest
 pre-commit==2.12.1
     # via -r requirements.in
-pyarrow==8.0.0
+pyarrow==13.0.0
     # via -r ./backend/ecs_tasks/delete_files/requirements.txt
 pycparser==2.21
     # via
4 changes: 2 additions & 2 deletions templates/template.yaml
@@ -1,6 +1,6 @@
 AWSTemplateFormatVersion: "2010-09-09"
 Transform: AWS::Serverless-2016-10-31
-Description: Amazon S3 Find and Forget (uksb-1q2j8beb0) (version:v0.62) (tag:main)
+Description: Amazon S3 Find and Forget (uksb-1q2j8beb0) (version:v0.63) (tag:main)

 Parameters:
   AccessControlAllowOriginOverride:
@@ -206,7 +206,7 @@ Conditions:
 Mappings:
   Solution:
     Constants:
-      Version: 'v0.62'
+      Version: 'v0.63'

 Resources:
   TempBucket:
6 changes: 6 additions & 0 deletions tests/acceptance/__init__.py
@@ -1,3 +1,4 @@
+import gzip
 import logging
 import tempfile
 from pathlib import Path
@@ -50,6 +51,11 @@ def query_parquet_file(f, column, val):
     return [i for i in table.column(column) if i.as_py() == val]


+def query_compressed_json_file(f, column, val):
+    f_gz = gzip.open(f, "rb")
+    return query_json_file(f_gz, column, val)
+
+
 def query_json_file(f, column, val):
     table = pj.read_json(f)
     return [i for i in table.column(column) if i.as_py() == val]
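The new helper works because gzip.open returns a file-like object that pyarrow.json can read directly. A self-contained sketch of the same round trip (file name and record invented for illustration):

    import gzip
    import tempfile
    from pyarrow import json as pj

    with tempfile.NamedTemporaryFile(suffix=".json.gz", delete=False) as tmp:
        tmp.write(gzip.compress(b'{"customer_id": "34567", "x": 9}\n'))

    table = pj.read_json(gzip.open(tmp.name, "rb"))
    assert [i.as_py() for i in table.column("customer_id")] == ["34567"]
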
14 changes: 10 additions & 4 deletions tests/acceptance/test_job_cognito.py
@@ -6,7 +6,12 @@
 import pytest
 from decimal import Decimal

-from tests.acceptance import query_json_file, query_parquet_file, download_and_decrypt
+from tests.acceptance import (
+    query_json_file,
+    query_compressed_json_file,
+    query_parquet_file,
+    download_and_decrypt,
+)

 pytestmark = [
     pytest.mark.acceptance_cognito,
@@ -539,9 +544,10 @@ def test_it_runs_for_gzip_json_happy_path(
         "COMPLETED"
         == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
     )
-    assert 0 == len(query_json_file(tmp.name, "customer_id", "12345"))
-    assert 1 == len(query_json_file(tmp.name, "customer_id", "23456"))
-    assert 1 == len(query_json_file(tmp.name, "customer_id", "34567"))
+
+    assert 0 == len(query_compressed_json_file(tmp.name, "customer_id", "12345"))
+    assert 1 == len(query_compressed_json_file(tmp.name, "customer_id", "23456"))
+    assert 1 == len(query_compressed_json_file(tmp.name, "customer_id", "34567"))
     assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
     assert {"foo": "bar"} == bucket.Object(object_key).metadata
     assert "cache" == bucket.Object(object_key).cache_control
13 changes: 9 additions & 4 deletions tests/acceptance/test_job_iam.py
@@ -6,7 +6,12 @@
 import pytest
 from decimal import Decimal

-from tests.acceptance import query_json_file, query_parquet_file, download_and_decrypt
+from tests.acceptance import (
+    query_json_file,
+    query_compressed_json_file,
+    query_parquet_file,
+    download_and_decrypt,
+)

 pytestmark = [
     pytest.mark.acceptance_iam,
@@ -529,9 +534,9 @@ def test_it_runs_for_gzip_json_happy_path(
         "COMPLETED"
         == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
     )
-    assert 0 == len(query_json_file(tmp.name, "customer_id", "12345"))
-    assert 1 == len(query_json_file(tmp.name, "customer_id", "23456"))
-    assert 1 == len(query_json_file(tmp.name, "customer_id", "34567"))
+    assert 0 == len(query_compressed_json_file(tmp.name, "customer_id", "12345"))
+    assert 1 == len(query_compressed_json_file(tmp.name, "customer_id", "23456"))
+    assert 1 == len(query_compressed_json_file(tmp.name, "customer_id", "34567"))
     assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
     assert {"foo": "bar"} == bucket.Object(object_key).metadata
     assert "cache" == bucket.Object(object_key).cache_control
25 changes: 22 additions & 3 deletions tests/unit/ecs_tasks/test_json.py
@@ -30,6 +30,25 @@ def test_it_generates_new_json_file_without_matches():
     )


+def test_it_handles_json_with_gzip_compression():
+    # Arrange
+    to_delete = [{"Column": "customer_id", "MatchIds": ["23456"], "Type": "Simple"}]
+    data = (
+        '{"customer_id": "12345", "x": 7, "d":"2001-01-01"}\n'
+        '{"customer_id": "23456", "x": 8, "d":"2001-01-03"}\n'
+        '{"customer_id": "34567", "x": 9, "d":"2001-01-05"}\n'
+    )
+    out_stream = to_json_file(data)
+    # Act
+    out, stats = delete_matches_from_json_file(out_stream, to_delete, True)
+    assert isinstance(out, pa.BufferOutputStream)
+    assert {"ProcessedRows": 3, "DeletedRows": 1} == stats
+    assert to_decompressed_json_string(out) == (
+        '{"customer_id": "12345", "x": 7, "d":"2001-01-01"}\n'
+        '{"customer_id": "34567", "x": 9, "d":"2001-01-05"}\n'
+    )
+
+
 def test_delete_correct_rows_when_missing_newline_at_the_end():
     # Arrange
     to_delete = [{"Column": "customer_id", "MatchIds": ["23456"], "Type": "Simple"}]
@@ -40,7 +59,7 @@ def test_delete_correct_rows_when_missing_newline_at_the_end():
     )
     out_stream = to_json_file(data)
     # Act
-    out, stats = delete_matches_from_json_file(out_stream, to_delete)
+    out, stats = delete_matches_from_json_file(out_stream, to_delete, False)
     assert isinstance(out, pa.BufferOutputStream)
     assert {"ProcessedRows": 3, "DeletedRows": 1} == stats
     assert to_json_string(out) == (
@@ -60,7 +79,7 @@ def test_delete_correct_rows_containing_newlines_as_content():
     )
     out_stream = to_json_file(data)
     # Act
-    out, stats = delete_matches_from_json_file(out_stream, to_delete)
+    out, stats = delete_matches_from_json_file(out_stream, to_delete, False)
     assert isinstance(out, pa.BufferOutputStream)
     assert {"ProcessedRows": 3, "DeletedRows": 1} == stats
     assert to_json_string(out) == (
@@ -79,7 +98,7 @@ def test_delete_correct_rows_from_json_file_with_complex_types():
     )
     out_stream = to_json_file(data)
     # Act
-    out, stats = delete_matches_from_json_file(out_stream, to_delete)
+    out, stats = delete_matches_from_json_file(out_stream, to_delete, False)
     assert isinstance(out, pa.BufferOutputStream)
     assert {"ProcessedRows": 3, "DeletedRows": 1} == stats
     assert to_json_string(out) == (
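The to_decompressed_json_string helper used above is defined elsewhere in this test module, outside the hunks shown. A plausible implementation, stated as an assumption rather than a quote from the repo:

    import gzip

    def to_decompressed_json_string(buf):
        # buf is the pa.BufferOutputStream returned by the handler; getvalue()
        # finalizes it into a pyarrow Buffer holding the gzip-compressed bytes
        return gzip.decompress(buf.getvalue().to_pybytes()).decode("utf-8")
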
10 changes: 5 additions & 5 deletions tests/unit/ecs_tasks/test_main.py
@@ -91,7 +91,7 @@ def test_happy_path_when_queue_not_empty(
     mock_fs.open_input_stream.assert_called_with(
         "bucket/path/basic.parquet", buffer_size=5 * 2**20
     )
-    mock_delete.assert_called_with(ANY, [column], "parquet")
+    mock_delete.assert_called_with(ANY, [column], "parquet", False)
     mock_save.assert_called_with(ANY, ANY, "bucket", "path/basic.parquet", {}, "abc123")
     mock_emit.assert_called()
     mock_session.assert_called_with(None, "s3f2")
@@ -146,7 +146,7 @@ def test_happy_path_when_queue_not_empty_for_compressed_json(
     mock_fs.open_input_stream.assert_called_with(
         "bucket/path/basic.json.gz", buffer_size=5 * 2**20
     )
-    mock_delete.assert_called_with(ANY, [column], "json")
+    mock_delete.assert_called_with(ANY, [column], "json", True)
     mock_save.assert_called_with(ANY, ANY, "bucket", "path/basic.json.gz", {}, "abc123")
     mock_emit.assert_called()
     mock_session.assert_called_with(None, "s3f2")
@@ -216,7 +216,7 @@ def test_cse_kms_encrypted(
     mock_fs.open_input_stream.assert_called_with(
         "bucket/path/basic.parquet", buffer_size=5 * 2**20
     )
-    mock_delete.assert_called_with(mock_file_decrypted, [column], "parquet")
+    mock_delete.assert_called_with(mock_file_decrypted, [column], "parquet", False)
     mock_encrypt.assert_called_with(ANY, metadata, ANY)
     mock_save.assert_called_with(
         ANY,
@@ -1105,8 +1105,8 @@ def test_it_sets_kill_handlers(mock_queue, mock_signal):
 def test_it_deletes_from_json_file(mock_parquet, mock_json):
     f = MagicMock()
     cols = MagicMock()
-    delete_matches_from_file(f, cols, "json")
-    mock_json.assert_called_with(f, cols)
+    delete_matches_from_file(f, cols, "json", False)
+    mock_json.assert_called_with(f, cols, False)
     mock_parquet.assert_not_called()
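A note on the assertion style above: unittest.mock.ANY compares equal to anything, which lets these tests pin only the arguments the commit changed (the file format and the new compressed flag). A minimal standalone illustration:

    from unittest.mock import ANY, MagicMock

    mock_delete = MagicMock()
    mock_delete("some-file-object", ["customer_id"], "json", True)
    # only the last two arguments are asserted exactly; ANY matches the rest
    mock_delete.assert_called_with(ANY, ANY, "json", True)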
