Skip to content

Commit

Permalink
feat: Add component CSVDocumentCleaner for removing empty rows and columns (#8816)
Browse files Browse the repository at this point in the history

* Initial commit for csv cleaner

* Add release notes

* Update lineterminator

* Update releasenotes/notes/csv-document-cleaner-8eca67e884684c56.yaml

Co-authored-by: David S. Batista <[email protected]>

* alphabetize

* Use lazy import

* Some refactoring

* Some refactoring

---------

Co-authored-by: David S. Batista <[email protected]>
  • Loading branch information
sjrl and davidsbatista authored Feb 6, 2025
1 parent 1f25794 commit 1785ea6
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 2 deletions.
2 changes: 1 addition & 1 deletion docs/pydoc/config/preprocessors_api.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
loaders:
- type: haystack_pydoc_tools.loaders.CustomPythonLoader
search_path: [../../../haystack/components/preprocessors]
modules: ["csv_document_cleaner", "document_cleaner", "document_splitter", "recursive_splitter", "text_cleaner"]
ignore_when_discovered: ["__init__"]
processors:
- type: filter
Expand Down
3 changes: 2 additions & 1 deletion haystack/components/preprocessors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
#
# SPDX-License-Identifier: Apache-2.0

from .csv_document_cleaner import CSVDocumentCleaner
from .document_cleaner import DocumentCleaner
from .document_splitter import DocumentSplitter
from .recursive_splitter import RecursiveDocumentSplitter
from .text_cleaner import TextCleaner

# Public API of the preprocessors package, kept alphabetized.
__all__ = ["CSVDocumentCleaner", "DocumentCleaner", "DocumentSplitter", "RecursiveDocumentSplitter", "TextCleaner"]
116 changes: 116 additions & 0 deletions haystack/components/preprocessors/csv_document_cleaner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

from io import StringIO
from typing import Dict, List

from haystack import Document, component, logging
from haystack.lazy_imports import LazyImport

with LazyImport("Run 'pip install pandas'") as pandas_import:
import pandas as pd

logger = logging.getLogger(__name__)


@component
class CSVDocumentCleaner:
    """
    A component for cleaning CSV documents by removing empty rows and columns.

    This component processes CSV content stored in Documents, allowing
    for the optional ignoring of a specified number of rows and columns before performing
    the cleaning operation. Documents whose content cannot be parsed as CSV are
    kept unchanged.
    """

    def __init__(self, ignore_rows: int = 0, ignore_columns: int = 0) -> None:
        """
        Initializes the CSVDocumentCleaner component.

        :param ignore_rows: Number of rows to ignore from the top of the CSV table before processing.
        :param ignore_columns: Number of columns to ignore from the left of the CSV table before processing.
        :raises ValueError: If `ignore_rows` or `ignore_columns` is negative.

        Rows and columns ignored using these parameters are preserved in the final output, meaning
        they are not considered when removing empty rows and columns.
        """
        # Negative values would silently produce nonsense slicing below (e.g.
        # df.iloc[:-1, :]), so reject them up front.
        if ignore_rows < 0 or ignore_columns < 0:
            raise ValueError("ignore_rows and ignore_columns must be non-negative integers")
        self.ignore_rows = ignore_rows
        self.ignore_columns = ignore_columns
        pandas_import.check()

    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]) -> Dict[str, List[Document]]:
        """
        Cleans CSV documents by removing empty rows and columns while preserving specified ignored rows and columns.

        :param documents: List of Documents containing CSV-formatted content.
        :returns: A dictionary with a single key ``"documents"`` mapping to the list of cleaned Documents.

        Processing steps:
        1. Reads each document's content as a CSV table.
        2. Retains the specified number of `ignore_rows` from the top and `ignore_columns` from the left.
        3. Drops any rows and columns that are entirely empty (all NaN values).
        4. Reattaches the ignored rows and columns to maintain their original positions.
        5. Returns the cleaned CSV content as a new `Document` object.

        Documents that fail to parse, or that are fully covered by the ignored
        rows/columns, are returned unchanged.
        """
        ignore_rows = self.ignore_rows
        ignore_columns = self.ignore_columns

        cleaned_documents = []
        for document in documents:
            try:
                # header=None keeps the first row as data; dtype=object prevents numeric
                # coercion so values round-trip back to CSV text unchanged.
                df = pd.read_csv(StringIO(document.content), header=None, dtype=object)  # type: ignore
            except Exception as e:
                logger.error(
                    "Error processing document {id}. Keeping it, but skipping cleaning. Error: {error}",
                    id=document.id,
                    error=e,
                )
                cleaned_documents.append(document)
                continue

            # NOTE: ">=" (not ">") is deliberate. If the ignored rows/columns cover the
            # whole table there is nothing left to clean. With a strict ">", the equal
            # case would leave an empty remaining sub-frame whose dropna(how="all")
            # removes every row/column label, and the reattachment below (which selects
            # final_df.columns / final_df.index) would silently discard all content.
            if ignore_rows >= df.shape[0] or ignore_columns >= df.shape[1]:
                logger.warning(
                    "Document {id} has too few rows {df_rows} or columns {df_cols} "
                    "to ignore {rows} rows and {cols} columns. "
                    "Keeping the entire document.",
                    id=document.id,
                    df_rows=df.shape[0],
                    df_cols=df.shape[1],
                    rows=ignore_rows,
                    cols=ignore_columns,
                )
                cleaned_documents.append(document)
                continue

            # Save ignored rows so they can be reattached verbatim after cleaning.
            ignored_rows = None
            if ignore_rows > 0:
                ignored_rows = df.iloc[:ignore_rows, :]

            # Save ignored columns so they can be reattached verbatim after cleaning.
            ignored_columns = None
            if ignore_columns > 0:
                ignored_columns = df.iloc[:, :ignore_columns]

            # Drop rows and columns that are entirely empty in the non-ignored region.
            remaining_df = df.iloc[ignore_rows:, ignore_columns:]
            final_df = remaining_df.dropna(axis=0, how="all").dropna(axis=1, how="all")

            # Reattach ignored rows on top, restricted to the columns that survived cleaning.
            if ignore_rows > 0 and ignored_rows is not None:
                ignored_rows = ignored_rows.loc[:, final_df.columns]
                final_df = pd.concat([ignored_rows, final_df], axis=0)

            # Reattach ignored columns on the left, restricted to the surviving rows.
            # final_df.index already includes the reattached ignored rows at this point,
            # so the top-left corner cells are taken from the ignored columns.
            if ignore_columns > 0 and ignored_columns is not None:
                ignored_columns = ignored_columns.loc[final_df.index, :]
                final_df = pd.concat([ignored_columns, final_df], axis=1)

            cleaned_documents.append(
                Document(
                    content=final_df.to_csv(index=False, header=False, lineterminator="\n"), meta=document.meta.copy()
                )
            )
        return {"documents": cleaned_documents}
6 changes: 6 additions & 0 deletions releasenotes/notes/csv-document-cleaner-8eca67e884684c56.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
features:
- |
Introduced `CSVDocumentCleaner` component for cleaning CSV documents.
- Removes empty rows and columns, while preserving specified ignored rows and columns.
- Customizable number of rows and columns to ignore during processing.
146 changes: 146 additions & 0 deletions test/components/preprocessors/test_csv_document_cleaner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

from haystack import Document

from haystack.components.preprocessors.csv_document_cleaner import CSVDocumentCleaner


def test_empty_column() -> None:
    """A fully empty leading column is removed."""
    content = """,A,B,C
,1,2,3
,4,5,6
"""
    cleaner = CSVDocumentCleaner()
    output = cleaner.run([Document(content=content)])
    assert output["documents"][0].content == "A,B,C\n1,2,3\n4,5,6\n"


def test_empty_row() -> None:
    """A fully empty interior row is removed."""
    content = """A,B,C
1,2,3
,,
4,5,6
"""
    cleaner = CSVDocumentCleaner()
    output = cleaner.run([Document(content=content)])
    assert output["documents"][0].content == "A,B,C\n1,2,3\n4,5,6\n"


def test_empty_column_and_row() -> None:
    """An empty column and an empty row are removed in one pass."""
    content = """,A,B,C
,1,2,3
,,,
,4,5,6
"""
    cleaner = CSVDocumentCleaner()
    output = cleaner.run([Document(content=content)])
    assert output["documents"][0].content == "A,B,C\n1,2,3\n4,5,6\n"


def test_ignore_rows() -> None:
    """An empty first row survives cleaning when it falls inside ignore_rows; meta is preserved."""
    content = """,,
A,B,C
4,5,6
7,8,9
"""
    cleaner = CSVDocumentCleaner(ignore_rows=1)
    output = cleaner.run([Document(content=content, meta={"name": "test.csv"})])
    doc = output["documents"][0]
    assert doc.content == ",,\nA,B,C\n4,5,6\n7,8,9\n"
    assert doc.meta == {"name": "test.csv"}


def test_ignore_rows_2() -> None:
    """An empty row below the ignored header row is still removed."""
    content = """A,B,C
,,
4,5,6
7,8,9
"""
    cleaner = CSVDocumentCleaner(ignore_rows=1)
    output = cleaner.run([Document(content=content, meta={"name": "test.csv"})])
    doc = output["documents"][0]
    assert doc.content == "A,B,C\n4,5,6\n7,8,9\n"
    assert doc.meta == {"name": "test.csv"}


def test_ignore_rows_3() -> None:
    """A column that is empty below the ignored header row is removed, header cell included."""
    content = """A,B,C
4,,6
7,,9
"""
    cleaner = CSVDocumentCleaner(ignore_rows=1)
    output = cleaner.run([Document(content=content, meta={"name": "test.csv"})])
    doc = output["documents"][0]
    assert doc.content == "A,C\n4,6\n7,9\n"
    assert doc.meta == {"name": "test.csv"}


def test_ignore_columns() -> None:
    """An empty first column survives cleaning when it falls inside ignore_columns."""
    content = """,,A,B
,2,3,4
,7,8,9
"""
    cleaner = CSVDocumentCleaner(ignore_columns=1)
    output = cleaner.run([Document(content=content)])
    assert output["documents"][0].content == ",,A,B\n,2,3,4\n,7,8,9\n"


def test_too_many_ignore_rows() -> None:
    """When ignore_rows exceeds the table height, the document is kept unchanged."""
    content = """,,
A,B,C
4,5,6
"""
    cleaner = CSVDocumentCleaner(ignore_rows=4)
    output = cleaner.run([Document(content=content)])
    assert output["documents"][0].content == ",,\nA,B,C\n4,5,6\n"


def test_too_many_ignore_columns() -> None:
    """When ignore_columns exceeds the table width, the document is kept unchanged."""
    content = """,,
A,B,C
4,5,6
"""
    cleaner = CSVDocumentCleaner(ignore_columns=4)
    output = cleaner.run([Document(content=content)])
    assert output["documents"][0].content == ",,\nA,B,C\n4,5,6\n"


def test_ignore_rows_and_columns() -> None:
    """Combining ignore_rows and ignore_columns preserves both while cleaning the rest."""
    content = """,A,B,C
1,item,s,
2,item2,fd,
"""
    cleaner = CSVDocumentCleaner(ignore_columns=1, ignore_rows=1)
    output = cleaner.run([Document(content=content)])
    assert output["documents"][0].content == ",A,B\n1,item,s\n2,item2,fd\n"


def test_zero_ignore_rows_and_columns() -> None:
    """Explicit zero ignores behave like the defaults: nothing extra is removed here."""
    content = """,A,B,C
1,item,s,
2,item2,fd,
"""
    cleaner = CSVDocumentCleaner(ignore_columns=0, ignore_rows=0)
    output = cleaner.run([Document(content=content)])
    assert output["documents"][0].content == ",A,B,C\n1,item,s,\n2,item2,fd,\n"

0 comments on commit 1785ea6

Please sign in to comment.