1
1
from __future__ import annotations

import logging
import re
from datetime import date
from pathlib import Path
from typing import Dict, List, Literal, Optional, Tuple, Union

from markitdown import MarkItDown
@@ -13,10 +12,109 @@ class DocumentParser:
13
12
14
13
SUPPORTED_FORMATS = {'.pdf' , '.docx' , '.pptx' , '.xlsx' , '.html' , '.txt' , '.json' , '.xml' , '.zip' }
15
14
16
- def __init__ (self , use_ocr : bool = False , llm_client : Optional [object ] = None ):
15
+ def __init__ (self ,
16
+ use_ocr : bool = False ,
17
+ llm_client : Optional [object ] = None ,
18
+ chunk_size : int = 1000 ,
19
+ chunk_overlap : int = 200 ,
20
+ use_tokens : bool = True ):
21
+ """
22
+ Initialize the MarkItDown document parser.
23
+
24
+ Args:
25
+ use_ocr: Whether to use OCR for document parsing
26
+ llm_client: Optional LLM client for enhanced parsing
27
+ chunk_size: Maximum size of text chunks (in characters or tokens)
28
+ chunk_overlap: Number of characters or tokens to overlap between chunks
29
+ use_tokens: If True, measures length in tokens; if False, uses characters
30
+ """
17
31
self .parser = MarkItDown (llm_client = llm_client ) if llm_client else MarkItDown ()
18
32
self .use_ocr = use_ocr
19
33
self .logger = logging .getLogger (__name__ )
34
+ self .chunk_size = chunk_size
35
+ self .chunk_overlap = chunk_overlap
36
+ self .use_tokens = use_tokens
37
+
38
+ def split_text_with_overlap (self , text : str ) -> List [str ]:
39
+ """
40
+ Splits text into chunks based on paragraphs, respecting max length and overlap.
41
+
42
+ Args:
43
+ text: The input text to split
44
+
45
+ Returns:
46
+ List of text chunks
47
+ """
48
+ # Normalize newlines for consistent splitting
49
+ normalized_text = text .replace ('\r \n ' , '\n ' ).replace ('\n +' , '\n \n ' ).strip ()
50
+
51
+ # Split into paragraphs based on double newlines
52
+ paragraphs = normalized_text .split ('\n \n ' )
53
+
54
+ chunks = []
55
+ current_chunk = ''
56
+
57
+ # Helper function to measure length (characters or tokens)
58
+ def get_length (s : str ) -> int :
59
+ if self .use_tokens :
60
+ # Rough token count: split by whitespace and filter out empty strings
61
+ return len ([word for word in s .split () if word ])
62
+ return len (s ) # Character count
63
+
64
+ # Helper function to get the last N characters or tokens for overlap
65
+ def get_overlap_segment (s : str , size : int ) -> str :
66
+ if self .use_tokens :
67
+ words = [word for word in s .split () if word ]
68
+ overlap_words = words [- min (size , len (words )):]
69
+ return ' ' .join (overlap_words )
70
+ return s [- min (size , len (s )):]
71
+
72
+ for paragraph in paragraphs :
73
+ paragraph_length = get_length (paragraph )
74
+
75
+ # If paragraph fits in current chunk
76
+ if get_length (current_chunk ) + paragraph_length <= self .chunk_size :
77
+ current_chunk += ('\n \n ' if current_chunk else '' ) + paragraph
78
+ else :
79
+ # If current chunk isn't empty, append it and start a new one with overlap
80
+ if current_chunk :
81
+ chunks .append (current_chunk )
82
+ overlap_text = get_overlap_segment (current_chunk , self .chunk_overlap )
83
+ current_chunk = overlap_text + '\n \n ' + paragraph
84
+ else :
85
+ # If paragraph alone exceeds max_length, split it further
86
+ remaining = paragraph
87
+ while get_length (remaining ) > self .chunk_size :
88
+ if self .use_tokens :
89
+ # Find approximate token boundary
90
+ words = [word for word in remaining .split () if word ]
91
+ token_count = 0
92
+ char_count = 0
93
+ for i , word in enumerate (words ):
94
+ token_count += 1
95
+ char_count += len (word ) + (1 if i > 0 else 0 ) # Add space
96
+ if token_count >= self .chunk_size :
97
+ split_point = char_count
98
+ break
99
+ else :
100
+ split_point = len (remaining )
101
+ else :
102
+ # Find last space before max_length for clean character split
103
+ split_point = remaining .rfind (' ' , 0 , self .chunk_size )
104
+ if split_point == - 1 :
105
+ split_point = self .chunk_size
106
+
107
+ chunk = remaining [:split_point ].strip ()
108
+ chunks .append (chunk )
109
+ overlap_text = get_overlap_segment (chunk , self .chunk_overlap )
110
+ remaining = overlap_text + ' ' + remaining [split_point :].strip ()
111
+ current_chunk = remaining
112
+
113
+ # Append the final chunk if it exists
114
+ if current_chunk :
115
+ chunks .append (current_chunk )
116
+
117
+ return chunks
20
118
21
119
def parse_batch (self , files : List [Path ], batch_size : int = 1 ) -> List [Tuple [List [Node ], FileMetadata ]]:
22
120
"""Process multiple files in batches."""
@@ -46,10 +144,13 @@ def _get_metadata(self, result, file_path: Path) -> Dict:
46
144
}
47
145
48
146
def _text_to_nodes (self , text : str , start_page : int = 1 ) -> List [Node ]:
49
- """Convert text content to nodes."""
147
+ """Convert text content to nodes with RAG-based chunking ."""
50
148
nodes = []
51
149
if text and len (text .strip ()) > 0 :
52
- chunks = [text [i :i + 1000 ] for i in range (0 , len (text ), 1000 )]
150
+ # Apply RAG-based chunking
151
+ chunks = self .split_text_with_overlap (text )
152
+
153
+ self .logger .debug (f"Split text into { len (chunks )} chunks using RAG-based chunking" )
53
154
54
155
for i , chunk in enumerate (chunks , start_page ):
55
156
if chunk .strip ():
0 commit comments