Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Pass Through using ASDK #1196

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,614 changes: 1,815 additions & 1,799 deletions lib/sycamore/poetry.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions lib/sycamore/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pydantic = "^2.8.2"

typing-extensions = "^4.12.2"

aryn-sdk = "*"

#Evaluation dependencies
apted = { version = "^1.0.3", optional = true }
datasets = { version = "^2.16.1", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion lib/sycamore/sycamore/grouped_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ def to_doc(row: dict):
return DocSet(self._docset.context, DatasetScan(serialized))

def count(self) -> DocSet:
    """Aggregate this grouped data with Ray's ``Count`` aggregation.

    Returns:
        The DocSet produced by ``self.aggregate(Count())``.
    """
    # Import only from the public ``ray.data.aggregate`` module. The
    # ``ray.data._internal.aggregate`` path is private to Ray; importing from
    # it was redundant (the name was rebound by the public import right after)
    # and can fail with ImportError when Ray reorganizes its internals.
    # The import stays function-local, as in the original.
    from ray.data.aggregate import Count

    return self.aggregate(Count())
2 changes: 1 addition & 1 deletion lib/sycamore/sycamore/materialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from sycamore.transforms.base import rename

if TYPE_CHECKING:
from ray import Dataset
from ray.data import Dataset
import pyarrow


Expand Down
2 changes: 1 addition & 1 deletion lib/sycamore/sycamore/plan_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Callable, Optional, TYPE_CHECKING

if TYPE_CHECKING:
from ray import Dataset
from ray.data import Dataset
from sycamore.context import Context


Expand Down
119 changes: 14 additions & 105 deletions lib/sycamore/sycamore/transforms/detr_partitioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,15 @@
import tempfile
import tracemalloc
from abc import ABC, abstractmethod
from collections.abc import Mapping
from typing import Any, BinaryIO, Literal, Union, Optional
from itertools import repeat

import requests
import json
from tenacity import retry, retry_if_exception, wait_exponential, stop_after_delay
import base64
from PIL import Image
from pypdf import PdfReader

from aryn_sdk.partition import partition_file
from sycamore.data import Element, BoundingBox, ImageElement, TableElement
from sycamore.data.document import DocumentPropertyTypes
from sycamore.data.element import create_element
Expand Down Expand Up @@ -156,6 +154,7 @@ def partition_pdf(
text_extraction_options: dict[str, Any] = {},
source: str = "",
output_label_options: dict[str, Any] = {},
**kwargs,
) -> list[Element]:
if use_partitioning_service:
assert aryn_api_key != ""
Expand All @@ -172,6 +171,7 @@ def partition_pdf(
output_format=output_format,
source=source,
output_label_options=output_label_options,
**kwargs,
)
else:
if isinstance(threshold, str):
Expand Down Expand Up @@ -217,108 +217,15 @@ def partition_pdf(
wait=wait_exponential(multiplier=1, min=1),
stop=stop_after_delay(_TEN_MINUTES),
)
def _call_remote_partitioner(
file: BinaryIO,
aryn_api_key: str,
aryn_partitioner_address=DEFAULT_ARYN_PARTITIONER_ADDRESS,
threshold: Union[float, Literal["auto"]] = "auto",
use_ocr: bool = False,
extract_table_structure: bool = False,
extract_images: bool = False,
selected_pages: list = [],
output_format: Optional[str] = None,
source: str = "",
output_label_options: dict[str, Any] = {},
) -> list[Element]:
file.seek(0)
options = {
"threshold": threshold,
"use_ocr": use_ocr,
"extract_table_structure": extract_table_structure,
"extract_images": extract_images,
"selected_pages": selected_pages,
"source": f"sycamore-{source}" if source else "sycamore",
"output_label_options": output_label_options,
}
if output_format:
options["output_format"] = output_format

files: Mapping = {"pdf": file, "options": json.dumps(options).encode("utf-8")}
header = {"Authorization": f"Bearer {aryn_api_key}"}

logger.debug(f"ArynPartitioner POSTing to {aryn_partitioner_address} with files={files}")
response = requests.post(aryn_partitioner_address, files=files, headers=header, stream=True)
content = []
in_status = False
in_bulk = False
partial_line = b""
for part in response.iter_content(None):
if not part:
continue

content.append(part)
if in_bulk:
continue
partial_line = partial_line + part
if b"\n" not in part:
# Make sure we don't go O(n^2) from constantly appending to our partial_line.
if len(partial_line) > 100000:
logger.warning("Too many bytes without newline. Skipping incremental status")
in_bulk = True

continue

lines = partial_line.split(b"\n")
if part.endswith(b"\n"):
partial_line = b""
else:
partial_line = lines.pop()

for line in lines:
if line.startswith(b' "status"'):
in_status = True
if not in_status:
continue
if line.startswith(b" ],"):
in_status = False
in_bulk = True
continue
if line.startswith(b' "T+'):
t = json.loads(line.decode("utf-8").removesuffix(","))
logger.info(f"ArynPartitioner: {t}")

body = b"".join(content).decode("utf-8")
logger.debug("ArynPartitioner Recieved data")

if response.status_code != 200:
if response.status_code == 500 or response.status_code == 502:
logger.debug(
"ArynPartitioner recieved a retry-able error {} x-aryn-call-id: {}".format(
response, response.headers.get("x-aryn-call-id")
)
)
raise ArynPDFPartitionerException(
"Error: status_code: {}, reason: {} (x-aryn-call-id: {})".format(
response.status_code, body, response.headers.get("x-aryn-call-id")
),
can_retry=True,
)
raise ArynPDFPartitionerException(
"Error: status_code: {}, reason: {} (x-aryn-call-id: {})".format(
response.status_code, body, response.headers.get("x-aryn-call-id")
)
)

response_json = json.loads(body)
if isinstance(response_json, dict):
status = response_json.get("status", [])
if "error" in response_json:
raise ArynPDFPartitionerException(
f"Error partway through processing: {response_json['error']}\nPartial Status:\n{status}"
)
if (output_format == "markdown") and ((md := response_json.get("markdown")) is not None):
return [text_elem(md)]
response_json = response_json.get("elements", [])
def _call_remote_partitioner(file: BinaryIO, **kwargs) -> list[Element]:
try:
file.seek(0)
response_json = partition_file(file, **kwargs)
except Exception as e:
raise ArynPDFPartitionerException(f"Error calling Aryn DocParse: {e}", can_retry=True)
if (kwargs.get("output_format") == "markdown") and ((md := response_json.get("markdown")) is not None):
return [text_elem(md)]
response_json = response_json.get("elements", [])

elements = []
for idx, element_json in enumerate(response_json):
Expand All @@ -342,6 +249,7 @@ def _partition_remote(
output_format: Optional[str] = None,
source: str = "",
output_label_options: dict[str, Any] = {},
**kwargs,
) -> list[Element]:
page_count = get_page_count(file)

Expand All @@ -364,6 +272,7 @@ def _partition_remote(
output_format=output_format,
source=source,
output_label_options=output_label_options,
**kwargs,
)
)
low = high + 1
Expand Down
4 changes: 4 additions & 0 deletions lib/sycamore/sycamore/transforms/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ class ArynPartitioner(Partitioner):
Here is an example set of output label options:
{"promote_title": True, "title_candidate_elements": ["Section-header", "Caption"]}
default: None (no element is promoted to "Title")
kwargs: Additional keyword arguments to pass to the remote partitioner.
Example:
The following shows an example of using the ArynPartitioner to partition a PDF and extract
both table structure and images
Expand Down Expand Up @@ -462,6 +463,7 @@ def __init__(
text_extraction_options: dict[str, Any] = {},
source: str = "",
output_label_options: dict[str, Any] = {},
**kwargs,
):
if use_partitioning_service:
device = "cpu"
Expand Down Expand Up @@ -502,6 +504,7 @@ def __init__(
self._text_extraction_options = text_extraction_options
self._source = source
self.output_label_options = output_label_options
self._kwargs = kwargs

@timetrace("SycamorePdf")
def partition(self, document: Document) -> Document:
Expand Down Expand Up @@ -531,6 +534,7 @@ def partition(self, document: Document) -> Document:
text_extraction_options=self._text_extraction_options,
source=self._source,
output_label_options=self.output_label_options,
**self._kwargs,
)
except Exception as e:
path = document.properties["path"]
Expand Down
Loading
Loading