
Commit ff57a5a

Merge branch 'main' into feat/async_pipeline
2 parents f2bde4f + 1785ea6

File tree: 10 files changed (+300 −17)

Diff for: docs/pydoc/config/preprocessors_api.yml (+1 −1)

@@ -1,7 +1,7 @@
 loaders:
   - type: haystack_pydoc_tools.loaders.CustomPythonLoader
     search_path: [../../../haystack/components/preprocessors]
-    modules: ["document_cleaner", "document_splitter", "recursive_splitter", "text_cleaner"]
+    modules: ["csv_document_cleaner", "document_cleaner", "document_splitter", "recursive_splitter", "text_cleaner"]
     ignore_when_discovered: ["__init__"]
 processors:
   - type: filter

Diff for: haystack/components/generators/chat/hugging_face_api.py (+18 −7)

@@ -15,6 +15,7 @@
 
 with LazyImport(message="Run 'pip install \"huggingface_hub[inference]>=0.27.0\"'") as huggingface_hub_import:
     from huggingface_hub import (
+        ChatCompletionInputFunctionDefinition,
         ChatCompletionInputTool,
         ChatCompletionOutput,
         ChatCompletionStreamOutput,
@@ -255,8 +256,15 @@ def run(
 
         hf_tools = None
         if tools:
-            hf_tools = [{"type": "function", "function": {**t.tool_spec}} for t in tools]
-
+            hf_tools = [
+                ChatCompletionInputTool(
+                    function=ChatCompletionInputFunctionDefinition(
+                        name=tool.name, description=tool.description, arguments=tool.parameters
+                    ),
+                    type="function",
+                )
+                for tool in tools
+            ]
         return self._run_non_streaming(formatted_messages, generation_kwargs, hf_tools)
 
     def _run_streaming(
@@ -278,13 +286,12 @@ def _run_streaming(
            # see https://huggingface.co/docs/huggingface_hub/package_reference/inference_client#huggingface_hub.InferenceClient.chat_completion.n
            choice = chunk.choices[0]
 
-            text = choice.delta.content
-            if text:
-                generated_text += text
+            text = choice.delta.content or ""
+            generated_text += text
 
            finish_reason = choice.finish_reason
 
-        meta = {}
+        meta: Dict[str, Any] = {}
         if finish_reason:
             meta["finish_reason"] = finish_reason
@@ -336,7 +343,11 @@ def _run_non_streaming(
            )
            tool_calls.append(tool_call)
 
-        meta = {"model": self._client.model, "finish_reason": choice.finish_reason, "index": choice.index}
+        meta: Dict[str, Any] = {
+            "model": self._client.model,
+            "finish_reason": choice.finish_reason,
+            "index": choice.index,
+        }
 
         usage = {"prompt_tokens": 0, "completion_tokens": 0}
         if api_chat_output.usage:
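
Two things change above: the hand-rolled dict tool payload is replaced with huggingface_hub's typed input classes, and the streaming loop coalesces a possibly-None delta (`choice.delta.content or ""`) so concatenation cannot fail. A minimal sketch of the resulting tool conversion, using the same fields as the diff (the weather tool itself is a made-up example, not from the PR):

from huggingface_hub import ChatCompletionInputFunctionDefinition, ChatCompletionInputTool

# Hypothetical tool definition; name/description/arguments mirror the
# Tool attributes referenced in the diff above.
weather_tool = ChatCompletionInputTool(
    function=ChatCompletionInputFunctionDefinition(
        name="get_weather",
        description="Return the current weather for a city.",
        arguments={
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
    ),
    type="function",
)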

Diff for: haystack/components/generators/chat/hugging_face_local.py (+1 −1)

@@ -145,7 +145,7 @@ def __init__(  # pylint: disable=too-many-positional-arguments
         elif isinstance(huggingface_pipeline_kwargs["model"], str):
             task = model_info(
                 huggingface_pipeline_kwargs["model"], token=huggingface_pipeline_kwargs["token"]
-            ).pipeline_tag
+            ).pipeline_tag  # type: ignore[assignment]  # we'll check below if task is in supported tasks
 
         if task not in PIPELINE_SUPPORTED_TASKS:
             raise ValueError(
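
The ignore is needed presumably because `pipeline_tag` is optional on the hub's model metadata while `task` is typed as `str`; it is safe because the `PIPELINE_SUPPORTED_TASKS` membership check right below rejects `None`. A self-contained sketch of the same pattern (stand-in names, not the real `model_info` signature):

from typing import Optional

def lookup_tag() -> Optional[str]:  # stands in for model_info(...).pipeline_tag
    return "text-generation"

SUPPORTED = {"text-generation", "text2text-generation"}

task: str = lookup_tag()  # type: ignore[assignment]  # validated right below
if task not in SUPPORTED:
    raise ValueError(f"Unsupported task: {task}")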

Diff for: haystack/components/generators/chat/openai.py (+3 −2)

@@ -431,8 +431,9 @@ def _convert_chat_completion_chunk_to_streaming_chunk(self, chunk: ChatCompletio
         Converts the streaming response chunk from the OpenAI API to a StreamingChunk.
 
         :param chunk: The chunk returned by the OpenAI API.
-        :param choice: The choice returned by the OpenAI API.
-        :return: The StreamingChunk.
+
+        :returns:
+            The StreamingChunk.
         """
         # we stream the content of the chunk if it's not a tool or function call
         choice: ChunkChoice = chunk.choices[0]

Diff for: haystack/components/generators/hugging_face_api.py (+5 −4)

@@ -4,7 +4,7 @@
 
 from dataclasses import asdict
 from datetime import datetime
-from typing import Any, Callable, Dict, Iterable, List, Optional, Union
+from typing import Any, Callable, Dict, Iterable, List, Optional, Union, cast
 
 from haystack import component, default_from_dict, default_to_dict, logging
 from haystack.dataclasses import StreamingChunk
@@ -17,8 +17,8 @@
     from huggingface_hub import (
         InferenceClient,
         TextGenerationOutput,
-        TextGenerationOutputToken,
         TextGenerationStreamOutput,
+        TextGenerationStreamOutputToken,
     )
 
 
@@ -212,7 +212,8 @@ def run(
         if streaming_callback is not None:
             return self._stream_and_build_response(hf_output, streaming_callback)
 
-        return self._build_non_streaming_response(hf_output)
+        # mypy doesn't know that hf_output is a TextGenerationOutput, so we cast it
+        return self._build_non_streaming_response(cast(TextGenerationOutput, hf_output))
 
     def _stream_and_build_response(
         self, hf_output: Iterable["TextGenerationStreamOutput"], streaming_callback: Callable[[StreamingChunk], None]
@@ -221,7 +222,7 @@
         first_chunk_time = None
 
         for chunk in hf_output:
-            token: TextGenerationOutputToken = chunk.token
+            token: TextGenerationStreamOutputToken = chunk.token
             if token.special:
                 continue
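
The added `cast` only narrows the type for mypy; nothing changes at runtime. A self-contained sketch of the pattern with stand-in classes (the real Union comes from `InferenceClient.text_generation`, whose return type depends on the `stream` flag):

from typing import Iterable, Union, cast

class FullOutput:  # stands in for TextGenerationOutput
    generated_text = "hello"

class StreamChunk:  # stands in for TextGenerationStreamOutput
    pass

def generate(stream: bool) -> Union[FullOutput, Iterable[StreamChunk]]:
    return iter([StreamChunk()]) if stream else FullOutput()

hf_output = generate(stream=False)
# mypy only sees the Union; we know the non-streaming branch returns FullOutput.
print(cast(FullOutput, hf_output).generated_text)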

Diff for: haystack/components/preprocessors/__init__.py (+2 −1)

@@ -2,9 +2,10 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
+from .csv_document_cleaner import CSVDocumentCleaner
 from .document_cleaner import DocumentCleaner
 from .document_splitter import DocumentSplitter
 from .recursive_splitter import RecursiveDocumentSplitter
 from .text_cleaner import TextCleaner
 
-__all__ = ["DocumentSplitter", "DocumentCleaner", "RecursiveDocumentSplitter", "TextCleaner"]
+__all__ = ["CSVDocumentCleaner", "DocumentCleaner", "DocumentSplitter", "RecursiveDocumentSplitter", "TextCleaner"]
Diff for: haystack/components/preprocessors/csv_document_cleaner.py (new file, +116)

@@ -0,0 +1,116 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from io import StringIO
+from typing import Dict, List
+
+from haystack import Document, component, logging
+from haystack.lazy_imports import LazyImport
+
+with LazyImport("Run 'pip install pandas'") as pandas_import:
+    import pandas as pd
+
+logger = logging.getLogger(__name__)
+
+
+@component
+class CSVDocumentCleaner:
+    """
+    A component for cleaning CSV documents by removing empty rows and columns.
+
+    This component processes CSV content stored in Documents, allowing
+    for the optional ignoring of a specified number of rows and columns before performing
+    the cleaning operation.
+    """
+
+    def __init__(self, ignore_rows: int = 0, ignore_columns: int = 0) -> None:
+        """
+        Initializes the CSVDocumentCleaner component.
+
+        :param ignore_rows: Number of rows to ignore from the top of the CSV table before processing.
+        :param ignore_columns: Number of columns to ignore from the left of the CSV table before processing.
+
+        Rows and columns ignored using these parameters are preserved in the final output, meaning
+        they are not considered when removing empty rows and columns.
+        """
+        self.ignore_rows = ignore_rows
+        self.ignore_columns = ignore_columns
+        pandas_import.check()
+
+    @component.output_types(documents=List[Document])
+    def run(self, documents: List[Document]) -> Dict[str, List[Document]]:
+        """
+        Cleans CSV documents by removing empty rows and columns while preserving specified ignored rows and columns.
+
+        :param documents: List of Documents containing CSV-formatted content.
+
+        Processing steps:
+        1. Reads each document's content as a CSV table.
+        2. Retains the specified number of `ignore_rows` from the top and `ignore_columns` from the left.
+        3. Drops any rows and columns that are entirely empty (all NaN values).
+        4. Reattaches the ignored rows and columns to maintain their original positions.
+        5. Returns the cleaned CSV content as a new `Document` object.
+        """
+        ignore_rows = self.ignore_rows
+        ignore_columns = self.ignore_columns
+
+        cleaned_documents = []
+        for document in documents:
+            try:
+                df = pd.read_csv(StringIO(document.content), header=None, dtype=object)  # type: ignore
+            except Exception as e:
+                logger.error(
+                    "Error processing document {id}. Keeping it, but skipping cleaning. Error: {error}",
+                    id=document.id,
+                    error=e,
+                )
+                cleaned_documents.append(document)
+                continue
+
+            if ignore_rows > df.shape[0] or ignore_columns > df.shape[1]:
+                logger.warning(
+                    "Document {id} has fewer rows {df_rows} or columns {df_cols} "
+                    "than the number of rows {rows} or columns {cols} to ignore. "
+                    "Keeping the entire document.",
+                    id=document.id,
+                    df_rows=df.shape[0],
+                    df_cols=df.shape[1],
+                    rows=ignore_rows,
+                    cols=ignore_columns,
+                )
+                cleaned_documents.append(document)
+                continue
+
+            # Save ignored rows
+            ignored_rows = None
+            if ignore_rows > 0:
+                ignored_rows = df.iloc[:ignore_rows, :]
+
+            # Save ignored columns
+            ignored_columns = None
+            if ignore_columns > 0:
+                ignored_columns = df.iloc[:, :ignore_columns]
+
+            # Drop rows and columns that are entirely empty
+            remaining_df = df.iloc[ignore_rows:, ignore_columns:]
+            final_df = remaining_df.dropna(axis=0, how="all").dropna(axis=1, how="all")
+
+            # Reattach ignored rows
+            if ignore_rows > 0 and ignored_rows is not None:
+                # Keep only relevant columns
+                ignored_rows = ignored_rows.loc[:, final_df.columns]
+                final_df = pd.concat([ignored_rows, final_df], axis=0)
+
+            # Reattach ignored columns
+            if ignore_columns > 0 and ignored_columns is not None:
+                # Keep only relevant rows
+                ignored_columns = ignored_columns.loc[final_df.index, :]
+                final_df = pd.concat([ignored_columns, final_df], axis=1)
+
+            cleaned_documents.append(
+                Document(
+                    content=final_df.to_csv(index=False, header=False, lineterminator="\n"), meta=document.meta.copy()
+                )
+            )
+        return {"documents": cleaned_documents}
Diff for: release note (new file, +6)

@@ -0,0 +1,6 @@
+---
+features:
+  - |
+    Introduced `CSVDocumentCleaner` component for cleaning CSV documents.
+    - Removes empty rows and columns, while preserving specified ignored rows and columns.
+    - Customizable number of rows and columns to ignore during processing.

Diff for: test/components/generators/test_hugging_face_api.py (+2 −1)

@@ -7,6 +7,7 @@
 
 import pytest
 from huggingface_hub import (
+    TextGenerationOutput,
     TextGenerationOutputToken,
     TextGenerationStreamOutput,
     TextGenerationStreamOutputStreamDetails,
@@ -30,7 +31,7 @@ def mock_check_valid_model():
 @pytest.fixture
 def mock_text_generation():
     with patch("huggingface_hub.InferenceClient.text_generation", autospec=True) as mock_text_generation:
-        mock_response = Mock()
+        mock_response = Mock(spec=TextGenerationOutput)
         mock_response.generated_text = "I'm fine, thanks."
         details = Mock()
         details.finish_reason = MagicMock(field1="value")
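
`Mock(spec=TextGenerationOutput)` makes the fixture stricter: accessing an attribute that does not exist on the real class now raises instead of silently returning a child Mock. A small self-contained sketch of the effect (using a stand-in class, not the real huggingface_hub type):

from unittest.mock import Mock

class FakeOutput:  # stand-in for TextGenerationOutput
    generated_text = ""

m = Mock(spec=FakeOutput)
m.generated_text = "I'm fine, thanks."  # fine: the attribute exists on the spec
try:
    m.generated_txt  # typo: raises AttributeError thanks to spec
except AttributeError:
    print("spec caught the typo")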
