Skip to content

Commit

Permalink
(feat/extract) Add sources to the extraction (#1101)
Browse files Browse the repository at this point in the history
* Nick: good state

* Nick: source tracker class

* Nick: show sources under flag
  • Loading branch information
nickscamara authored Jan 28, 2025
1 parent 2a0b408 commit 04c6f51
Show file tree
Hide file tree
Showing 10 changed files with 425 additions and 26 deletions.
2 changes: 1 addition & 1 deletion apps/api/src/controllers/v1/extract-status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ export async function extractStatusController(
expiresAt: (await getExtractExpiry(req.params.jobId)).toISOString(),
steps: extract.showSteps ? extract.steps : undefined,
llmUsage: extract.showLLMUsage ? extract.llmUsage : undefined,
// sources: extract.sources,
sources: extract.showSources ? extract.sources : undefined,
});
}
1 change: 1 addition & 0 deletions apps/api/src/controllers/v1/extract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export async function extractController(
status: "processing",
showSteps: req.body.__experimental_streamSteps,
showLLMUsage: req.body.__experimental_llmUsage,
showSources: req.body.__experimental_showSources,
});

if (Sentry.isInitialized()) {
Expand Down
1 change: 1 addition & 0 deletions apps/api/src/controllers/v1/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ export const extractV1Options = z
urlTrace: z.boolean().default(false),
__experimental_streamSteps: z.boolean().default(false),
__experimental_llmUsage: z.boolean().default(false),
__experimental_showSources: z.boolean().default(false),
timeout: z.number().int().positive().finite().safe().default(60000),
})
.strict(strictMessage)
Expand Down
5 changes: 3 additions & 2 deletions apps/api/src/lib/extract/document-scraper.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Document, URLTrace, scrapeOptions } from "../../controllers/v1/types";
import { Document, ScrapeOptions, URLTrace, scrapeOptions } from "../../controllers/v1/types";
import { PlanType } from "../../types";
import { logger } from "../logger";
import { getScrapeQueue } from "../../services/queue-service";
Expand All @@ -20,6 +20,7 @@ export async function scrapeDocument(
options: ScrapeDocumentOptions,
urlTraces: URLTrace[],
logger: Logger,
internalScrapeOptions: Partial<ScrapeOptions> = { onlyMainContent: false },
): Promise<Document | null> {
const trace = urlTraces.find((t) => t.url === options.url);
if (trace) {
Expand All @@ -40,7 +41,7 @@ export async function scrapeDocument(
url: options.url,
mode: "single_urls",
team_id: options.teamId,
scrapeOptions: scrapeOptions.parse({ onlyMainContent: false }),
scrapeOptions: scrapeOptions.parse({ ...internalScrapeOptions }),
internalOptions: {
useCache: true,
},
Expand Down
1 change: 1 addition & 0 deletions apps/api/src/lib/extract/extract-redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export type StoredExtract = {
showSteps?: boolean;
steps?: ExtractedStep[];
showLLMUsage?: boolean;
showSources?: boolean;
llmUsage?: number;
sources?: {
[key: string]: string[];
Expand Down
55 changes: 33 additions & 22 deletions apps/api/src/lib/extract/extraction-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const ajv = new Ajv();
import { ExtractStep, updateExtract } from "./extract-redis";
import { deduplicateObjectsArray } from "./helpers/deduplicate-objs-array";
import { mergeNullValObjs } from "./helpers/merge-null-val-objs";
import { areMergeable } from "./helpers/merge-null-val-objs";
import { CUSTOM_U_TEAMS } from "./config";
import {
calculateFinalResultCost,
Expand All @@ -34,6 +35,7 @@ import { analyzeSchemaAndPrompt } from "./completions/analyzeSchemaAndPrompt";
import { checkShouldExtract } from "./completions/checkShouldExtract";
import { batchExtractPromise } from "./completions/batchExtract";
import { singleAnswerCompletion } from "./completions/singleAnswer";
import { SourceTracker } from "./helpers/source-tracker";

interface ExtractServiceOptions {
request: ExtractRequest;
Expand Down Expand Up @@ -272,6 +274,10 @@ export async function performExtraction(
url,
isMultiEntity: true,
}),
{
// Needs to be true for multi-entity to work properly
onlyMainContent: true,
}
);
}
return docsMap.get(url);
Expand Down Expand Up @@ -313,6 +319,7 @@ export async function performExtraction(
const chunkSize = 50;
const timeoutCompletion = 45000; // 45 second timeout
const chunks: Document[][] = [];
const extractionResults: {extract: any, url: string}[] = [];

// Split into chunks
for (let i = 0; i < multyEntityDocs.length; i += chunkSize) {
Expand Down Expand Up @@ -361,7 +368,6 @@ export async function performExtraction(
"is_content_relevant",
],
};
// console.log("schemaWithConfidence", schemaWithConfidence);

await updateExtract(extractId, {
status: "processing",
Expand All @@ -377,7 +383,7 @@ export async function performExtraction(
],
});

const completionPromise = batchExtractPromise(multiEntitySchema, links, request.prompt ?? "", request.systemPrompt ?? "", doc);
const completionPromise = batchExtractPromise(multiEntitySchema, links, request.prompt ?? "", request.systemPrompt ?? "", doc);

// Race between timeout and completion
const multiEntityCompletion = (await Promise.race([
Expand All @@ -389,21 +395,11 @@ export async function performExtraction(
if (multiEntityCompletion) {
tokenUsage.push(multiEntityCompletion.totalUsage);

// Track sources for multi-entity items
if (multiEntityCompletion.extract) {
// For each multi-entity key, track the source URL
multiEntityKeys.forEach(key => {
const items = multiEntityCompletion.extract[key];
if (Array.isArray(items)) {
items.forEach((item, index) => {
const sourcePath = `${key}[${index}]`;
if (!sources[sourcePath]) {
sources[sourcePath] = [];
}
sources[sourcePath].push(doc.metadata.url || doc.metadata.sourceURL || "");
});
}
});
return {
extract: multiEntityCompletion.extract,
url: doc.metadata.url || doc.metadata.sourceURL || ""
};
}
}

Expand Down Expand Up @@ -439,7 +435,7 @@ export async function performExtraction(
// return null;
// }

return multiEntityCompletion.extract;
return null;
} catch (error) {
logger.error(`Failed to process document.`, {
error,
Expand All @@ -451,22 +447,37 @@ export async function performExtraction(

// Wait for current chunk to complete before processing next chunk
const chunkResults = await Promise.all(chunkPromises);
multiEntityCompletions.push(
...chunkResults.filter((result) => result !== null),
);
const validResults = chunkResults.filter((result): result is {extract: any, url: string} => result !== null);
extractionResults.push(...validResults);
multiEntityCompletions.push(...validResults.map(r => r.extract));
logger.debug("All multi-entity completion chunks finished.", {
completionCount: multiEntityCompletions.length,
});
}

try {
// Use SourceTracker to handle source tracking
const sourceTracker = new SourceTracker();

// Transform and merge results while preserving sources
sourceTracker.transformResults(extractionResults, multiEntitySchema, false);

multiEntityResult = transformArrayToObject(
multiEntitySchema,
multiEntityCompletions,
);

// Track sources before deduplication
sourceTracker.trackPreDeduplicationSources(multiEntityResult);

// Apply deduplication and merge
multiEntityResult = deduplicateObjectsArray(multiEntityResult);
multiEntityResult = mergeNullValObjs(multiEntityResult);
// @nick: maybe we could add an LLM step here that checks whether the array likely has a primary key?

// Map sources to final deduplicated/merged items
const multiEntitySources = sourceTracker.mapSourcesToFinalItems(multiEntityResult, multiEntityKeys);
Object.assign(sources, multiEntitySources);

} catch (error) {
logger.error(`Failed to transform array to object`, { error });
return {
Expand Down Expand Up @@ -741,6 +752,6 @@ export async function performExtraction(
urlTrace: request.urlTrace ? urlTraces : undefined,
llmUsage,
totalUrlsScraped,
// sources,
sources,
};
}
Loading

0 comments on commit 04c6f51

Please sign in to comment.