Skip to content

Commit 5238fda

Browse files
authored
fix: improve source tracking so no file stat is lost (#190)
Signed-off-by: Anupam Kumar <[email protected]>
1 parent 64f0f52 commit 5238fda

File tree

3 files changed

+23
-13
lines changed

3 files changed

+23
-13
lines changed

context_chat_backend/chain/ingest/injest.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ def _process_sources(
117117
'len(filtered_sources)': len(filtered_sources),
118118
'filtered_sources': filtered_sources,
119119
})
120+
loaded_source_ids = [source.filename for source in existing_sources]
120121

121122
# update userIds for existing sources
122123
# allow the userIds as additional users, not as the only users
@@ -138,23 +139,26 @@ def _process_sources(
138139
if len(filtered_sources) == 0:
139140
# no new sources to embed
140141
logger.debug('Filtered all sources, nothing to embed')
141-
return [], []
142+
return loaded_source_ids, [] # pyright: ignore[reportReturnType]
142143

143144
logger.debug('Filtered sources:', extra={
144145
'source_ids': [source.filename for source in filtered_sources]
145146
})
147+
# invalid/empty sources are filtered out here and not counted in loaded/retryable
146148
indocuments = _sources_to_indocuments(config, filtered_sources)
147149

148150
logger.debug('Converted all sources to documents')
149151

150152
if len(indocuments) == 0:
151-
# document(s) were empty, not an error
153+
# filtered document(s) were invalid/empty, not an error
152154
logger.debug('All documents were found empty after being processed')
153-
return [], []
155+
return loaded_source_ids, [] # pyright: ignore[reportReturnType]
154156

155-
added_sources, not_added_sources = vectordb.add_indocuments(indocuments)
157+
added_source_ids, retry_source_ids = vectordb.add_indocuments(indocuments)
158+
loaded_source_ids.extend(added_source_ids)
156159
logger.debug('Added documents to vectordb')
157-
return added_sources, not_added_sources
160+
161+
return loaded_source_ids, retry_source_ids # pyright: ignore[reportReturnType]
158162

159163

160164
def _decode_latin_1(s: str) -> str:

context_chat_backend/controller.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ def _(sources: list[UploadFile]):
381381
_indexing[source.filename] = source.size
382382

383383
try:
384-
added_sources, not_added_sources = exec_in_proc(
384+
loaded_sources, not_added_sources = exec_in_proc(
385385
target=embed_sources,
386386
args=(vectordb_loader, app.extra['CONFIG'], sources)
387387
)
@@ -395,13 +395,14 @@ def _(sources: list[UploadFile]):
395395
_indexing.pop(source.filename, None)
396396
doc_parse_semaphore.release()
397397

398-
if len(added_sources) != len(sources):
398+
if len(loaded_sources) != len(sources):
399399
logger.debug('Some sources were not loaded', extra={
400-
'Count of newly loaded sources': f'{len(added_sources)}/{len(sources)}',
401-
'source_ids': added_sources,
400+
'Count of loaded sources': f'{len(loaded_sources)}/{len(sources)}',
401+
'source_ids': loaded_sources,
402402
})
403403

404-
return JSONResponse({'loaded_sources': added_sources, 'sources_to_retry': not_added_sources})
404+
# loaded sources include the existing sources that may only have their access updated
405+
return JSONResponse({'loaded_sources': loaded_sources, 'sources_to_retry': not_added_sources})
405406

406407

407408
class Query(BaseModel):

context_chat_backend/vectordb/pgvector.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ def get_users(self) -> list[str]:
131131

132132
def add_indocuments(self, indocuments: list[InDocument]) -> tuple[list[str], list[str]]:
133133
added_sources = []
134-
not_added_sources = []
134+
retry_sources = []
135135
batch_size = PG_BATCH_SIZE // 5
136136

137137
with self.session_maker() as session:
@@ -155,24 +155,29 @@ def add_indocuments(self, indocuments: list[InDocument]) -> tuple[list[str], lis
155155

156156
self.decl_update_access(indoc.userIds, indoc.source_id, session)
157157
added_sources.append(indoc.source_id)
158+
session.commit()
158159
except SafeDbException as e:
160+
# for when the source_id is not found. This here can be an error in the DB
161+
# and the source should be retried later
159162
logger.exception('Error adding documents to vectordb', exc_info=e, extra={
160163
'source_id': indoc.source_id,
161164
})
165+
retry_sources.append(indoc.source_id)
162166
continue
163167
except EmbeddingException as e:
164168
logger.exception('Error adding documents to vectordb', exc_info=e, extra={
165169
'source_id': indoc.source_id,
166170
})
167-
not_added_sources.append(indoc.source_id)
171+
retry_sources.append(indoc.source_id)
168172
continue
169173
except Exception as e:
170174
logger.exception('Error adding documents to vectordb', exc_info=e, extra={
171175
'source_id': indoc.source_id,
172176
})
177+
retry_sources.append(indoc.source_id)
173178
continue
174179

175-
return added_sources, not_added_sources
180+
return added_sources, retry_sources
176181

177182
@timed
178183
def check_sources(self, sources: list[UploadFile]) -> tuple[list[str], list[str]]:

0 commit comments

Comments
 (0)