diff --git a/.github/workflows/tests.yml b/.github/workflows/verify-pr.yml similarity index 97% rename from .github/workflows/tests.yml rename to .github/workflows/verify-pr.yml index 1759eed..a4b839f 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/verify-pr.yml @@ -1,4 +1,4 @@ -name: Tests +name: Verify PR on: pull_request: diff --git a/README.md b/README.md index 811884e..ffe49b8 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,10 @@ # Grogbot -Grogbot is a uv-based Python monorepo for multiple systems. The first system, **search**, provides local storage and rank-fused search over markdown documents, exposed through both a CLI and a FastAPI service. +Grogbot is a uv-based Python monorepo for multiple systems. The first system, **search**, provides local storage and rank-fused search over markdown documents using FTS, vector, and link authority signals, exposed through both a CLI and a FastAPI service. ## Packages -- **`grogbot-search-core`** (`packages/search-core`): Core models, SQLite persistence, ingestion, chunking, embeddings, and rank-fused search. +- **`grogbot-search-core`** (`packages/search-core`): Core models, SQLite persistence, ingestion, chunking, embeddings, document-link graph storage, and three-signal rank-fused search. - **`grogbot-cli`** (`packages/cli`): Typer-powered CLI (`grogbot`) that surfaces search functionality. - **`grogbot-api`** (`packages/api`): FastAPI app exposing the search system over HTTP. 
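The three-signal reciprocal-rank fusion this change describes (FTS, vector, and link channels each scored `1.0 / (1 + row_number)` and summed with equal weight, zero-filled for absent channels) can be sketched in plain Python. This is an illustration only — the actual implementation is the SQL CTE pipeline in `service.py`, and the function names below are hypothetical:

```python
def reciprocal_scores(ranked_ids):
    # Reciprocal row-number scoring: rank 1 gets 1/(1+1) = 0.5,
    # rank 2 gets 1/3, and so on.
    return {item: 1.0 / (1 + rank) for rank, item in enumerate(ranked_ids, start=1)}


def fuse(fts_ranked, vec_ranked, link_ranked, candidates):
    fts = reciprocal_scores(fts_ranked)
    vec = reciprocal_scores(vec_ranked)
    link = reciprocal_scores(link_ranked)
    results = []
    for cid in candidates:
        # Equal-weight additive fusion; a candidate absent from a channel
        # contributes 0.0 for that channel (mirrors the SQL COALESCE).
        score = fts.get(cid, 0.0) + vec.get(cid, 0.0) + link.get(cid, 0.0)
        results.append((cid, score))
    # Deterministic ordering: score descending, then id ascending,
    # mirroring the diff's ORDER BY final_score DESC, chunk_id ASC.
    results.sort(key=lambda pair: (-pair[1], pair[0]))
    return results
```

For example, a candidate ranked first in FTS and link but second in vector scores `0.5 + 1/3 + 0.5`, while a candidate with no inbound links gets a `link` contribution of exactly `0.0`.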
diff --git a/openspec/changes/archive/2026-03-04-add-link-rank-signal/.openspec.yaml b/openspec/changes/archive/2026-03-04-add-link-rank-signal/.openspec.yaml new file mode 100644 index 0000000..5aae5cf --- /dev/null +++ b/openspec/changes/archive/2026-03-04-add-link-rank-signal/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-03-04 diff --git a/openspec/changes/archive/2026-03-04-add-link-rank-signal/design.md b/openspec/changes/archive/2026-03-04-add-link-rank-signal/design.md new file mode 100644 index 0000000..a800522 --- /dev/null +++ b/openspec/changes/archive/2026-03-04-add-link-rank-signal/design.md @@ -0,0 +1,90 @@ +## Context + +Search ranking currently fuses two chunk-level retrieval streams (FTS and vector) using reciprocal row-number scoring. The system has no graph signal for document authority, even though ingested markdown often contains outbound links. We need a lightweight page-rank-style signal where documents linked by more distinct source documents rank higher. + +Existing behavior already separates document upsert from chunking: content changes delete chunks, and `chunk_document(document_id)` regenerates chunks. This change should align the link lifecycle with that same pattern. + +Constraints: +- Link identity is exactly `(from_document_id, to_document_id)` with no per-link count. +- Multiple links from one document to the same target collapse to one edge. +- Outbound links from a document must be cleared when its content changes or when the document is deleted. +- Link extraction should run as part of `chunk_document`. +- Search must expose `link_score` and treat it as equal-weight to FTS and vector scores. +- Documents with zero inbound links must have `link_score = 0.0`. + +## Goals / Non-Goals + +**Goals:** +- Add persistent link graph storage with uniqueness by `(from,to)`. +- Keep links in sync with document lifecycle (content change/delete/chunk regenerate). 
+- Derive `to_document_id` for unknown targets using `_canonicalize_url` + `document_id_for_url`. +- Add a third reciprocal-rank search signal (`link_score`) with equal additive weight. +- Return `link_score` in `SearchResult` payloads. + +**Non-Goals:** +- Implement iterative/global PageRank or damping-factor graph algorithms. +- Track per-link multiplicity within one source document. +- Add new API endpoints for direct link CRUD. +- Change query endpoint/CLI shape beyond additional `link_score` field in results. + +## Decisions + +1. **Store links in a dedicated `links` table keyed by `(from_document_id, to_document_id)`** + - Decision: Add table: + - `from_document_id TEXT NOT NULL` + - `to_document_id TEXT NOT NULL` + - `PRIMARY KEY (from_document_id, to_document_id)` + - `FOREIGN KEY (from_document_id) REFERENCES documents(id) ON DELETE CASCADE` + - index on `to_document_id` + - Rationale: Enforces one edge per source-target pair and supports efficient inbound counting. + - Alternative considered: `id` surrogate plus unique index; rejected as unnecessary complexity. + +2. **Extract outbound links during `chunk_document` and fully refresh per source document** + - Decision: In `chunk_document(document_id)`, delete existing outbound links for `document_id`, then extract links from `document.content_markdown`, dedupe targets, ignore self-links, and insert with `INSERT OR IGNORE`. + - Rationale: Mirrors chunk regeneration semantics and guarantees graph consistency with current content. + - Alternative considered: extract during `upsert_document`; rejected because chunking is already the indexing boundary. + +3. **Clear outbound links when document content changes** + - Decision: In `upsert_document`, when `content_changed` is true, delete both chunks and outbound links for that document before commit. + - Rationale: Prevents stale link edges between upsert and next chunk run. 
+ - Alternative considered: only clear on next `chunk_document`; rejected because stale links would affect ranking in the interim. + +4. **Treat unknown targets as first-class link destinations** + - Decision: For each extracted href, compute `target_url = _canonicalize_url(href)` and `to_document_id = document_id_for_url(target_url)` even if no matching `documents` row exists. + - Rationale: Preserves graph evidence ahead of ingestion order and matches requested behavior. + - Alternative considered: only store links to known documents; rejected by requirement. + +5. **Compute link signal as query-time reciprocal rank over candidate documents with inbound links** + - Decision: In `search`, add CTEs to: + - map candidate chunks to candidate documents, + - count inbound edges per candidate document (`COUNT(*)` on distinct `(from,to)` table rows), + - rank only documents with inbound count > 0 by `inbound_count DESC, document_id ASC`, + - compute `link_score = 1.0 / (1 + row_number)`, + - `COALESCE(link_score, 0.0)` for documents with zero inbound links. + - Rationale: Keeps scoring query-local, deterministic, and directly combinable with existing reciprocal FTS/vector channels. + - Alternative considered: global precomputed link ranks; rejected for additional complexity and staleness management. + +6. **Expose `link_score` in the public search model** + - Decision: Extend `SearchResult` with a required `link_score: float` and populate it in `SearchService.search`. + - Rationale: Required for transparency and downstream tuning/inspection. + - Alternative considered: keep link score internal to `score`; rejected by requirement. + +## Risks / Trade-offs + +- **[Risk] Relative/fragment links may map to hashed IDs that never resolve cleanly** → **Mitigation:** canonicalize uniformly and accept unknown targets; behavior remains deterministic and non-blocking. 
+- **[Risk] Additional SQL CTE/join complexity may affect query latency** → **Mitigation:** keep candidate pool bounded (`limit * 10`), index `links.to_document_id`, and validate via test coverage. +- **[Risk] Query-local link ranking means scores are relative to candidate set, not global authority** → **Mitigation:** intentional for equal-weight reciprocal fusion; revisit with global precompute only if relevance data demands it. +- **[Risk] Stale links if ingestion writes content but chunking is deferred indefinitely** → **Mitigation:** content-change path clears outbound links immediately; synchronization jobs can rebuild when needed. + +## Migration Plan + +1. Add `links` table/index in `_init_schema` (idempotent `CREATE TABLE/INDEX IF NOT EXISTS`). +2. Update link lifecycle in `upsert_document`, `chunk_document`, and delete behavior (via FK cascade on `from_document_id`). +3. Add outbound link extraction + insertion helper(s) used by `chunk_document`. +4. Extend search SQL and `SearchResult` model with `link_score`. +5. Update tests for link persistence/lifecycle and three-signal fusion semantics. +6. Rollback path: remove link-score CTE integration and `link_score` model field while leaving table unused (safe backward rollback without destructive migration). + +## Open Questions + +- None. \ No newline at end of file diff --git a/openspec/changes/archive/2026-03-04-add-link-rank-signal/proposal.md b/openspec/changes/archive/2026-03-04-add-link-rank-signal/proposal.md new file mode 100644 index 0000000..36baeee --- /dev/null +++ b/openspec/changes/archive/2026-03-04-add-link-rank-signal/proposal.md @@ -0,0 +1,28 @@ +## Why + +Search currently ranks chunks using only FTS and vector signals, so it misses an important authority cue: documents that are frequently linked by other documents should generally rank higher. Adding a link-based rank signal now improves relevance using data we already ingest in document content. 
+ +## What Changes + +- Add persistent document-to-document link storage using `from_document_id` and `to_document_id` pairs, with uniqueness per pair (no intra-document duplicate counts). +- Generate and refresh outbound links for a document during `chunk_document(document_id)` by parsing links from the document content. +- Remove outbound links from a document whenever its content changes or the document is deleted, matching chunk lifecycle semantics. +- Store links to not-yet-ingested targets by canonicalizing target URLs and deriving `to_document_id` via `document_id_for_url`. +- Extend search rank fusion with a third, equal-weight signal (`link_score`) based on inbound-link rank (in-degree), where documents with zero inbound links receive `link_score = 0.0`. +- Expose `link_score` in search results alongside `fts_score`, `vector_score`, and total `score`. + +## Capabilities + +### New Capabilities +- `document-link-graph`: Manage outbound document links derived from document content and kept in sync with document chunking/deletion lifecycle. +- `search-link-rank-fusion`: Add a link-based reciprocal-rank signal to search scoring and expose per-result link scoring metadata. + +### Modified Capabilities +- None. + +## Impact + +- Affected code: `packages/search-core/src/grogbot_search/service.py`, `packages/search-core/src/grogbot_search/models.py`, `packages/search-core/src/grogbot_search/__init__.py`. +- Affected tests: `packages/search-core/tests/test_service.py` (link persistence/lifecycle, scoring, and result payload assertions). +- API/CLI contracts: search response payload includes new `link_score` field; query endpoints/commands remain unchanged. +- Database/schema: new `links` table and related indexes/constraints/triggers as needed. 
\ No newline at end of file diff --git a/openspec/changes/archive/2026-03-04-add-link-rank-signal/specs/document-link-graph/spec.md b/openspec/changes/archive/2026-03-04-add-link-rank-signal/specs/document-link-graph/spec.md new file mode 100644 index 0000000..c244792 --- /dev/null +++ b/openspec/changes/archive/2026-03-04-add-link-rank-signal/specs/document-link-graph/spec.md @@ -0,0 +1,34 @@ +## ADDED Requirements + +### Requirement: Outbound document links SHALL be stored as unique directed pairs +The system SHALL persist document links as directed `(from_document_id, to_document_id)` pairs, and it MUST prevent duplicate pairs from the same source document to the same target document. + +#### Scenario: Multiple links to the same target in one document +- **WHEN** a document contains multiple outbound links that resolve to the same target URL +- **THEN** the system stores exactly one link pair for that `(from_document_id, to_document_id)` relationship + +#### Scenario: Links to different targets from one document +- **WHEN** a document contains outbound links that resolve to different target URLs +- **THEN** the system stores one link pair per unique target document id + +### Requirement: Link targets SHALL be derived even when target documents are not ingested +For each outbound link extracted from document content, the system SHALL canonicalize the URL and MUST derive `to_document_id` via `document_id_for_url` regardless of whether a corresponding `documents` row exists. 
+ +#### Scenario: Outbound link points to unknown target URL +- **WHEN** a chunked document links to a URL that has not been ingested as a document +- **THEN** the system stores a link pair using `to_document_id = document_id_for_url(_canonicalize_url(url))` + +### Requirement: Outbound links SHALL follow chunk lifecycle and ignore self-links +The system SHALL refresh outbound links for a document during `chunk_document(document_id)` by deleting existing links from that document and inserting links extracted from current content. The system MUST delete outbound links from a document when its content changes or the document is deleted. The system MUST ignore self-links where `from_document_id == to_document_id`. + +#### Scenario: Chunking regenerates outbound links from current content +- **WHEN** `chunk_document(document_id)` is invoked for a document with previously stored outbound links +- **THEN** existing links from that document are deleted before new outbound links are inserted + +#### Scenario: Content change clears stale outbound links before re-chunking +- **WHEN** `upsert_document` updates an existing document with changed `content_markdown` +- **THEN** all links where `from_document_id` equals that document id are deleted + +#### Scenario: Self-links are excluded +- **WHEN** an outbound link resolves to the same document id as the source document +- **THEN** the system does not store that link pair \ No newline at end of file diff --git a/openspec/changes/archive/2026-03-04-add-link-rank-signal/specs/search-link-rank-fusion/spec.md b/openspec/changes/archive/2026-03-04-add-link-rank-signal/specs/search-link-rank-fusion/spec.md new file mode 100644 index 0000000..c08865e --- /dev/null +++ b/openspec/changes/archive/2026-03-04-add-link-rank-signal/specs/search-link-rank-fusion/spec.md @@ -0,0 +1,26 @@ +## ADDED Requirements + +### Requirement: Search SHALL include a link-based reciprocal-rank signal +The search ranking pipeline SHALL compute a `link_score` 
channel from inbound-link counts and MUST add it to existing reciprocal FTS and vector scores with equal weight. + +#### Scenario: Final score includes all three signals +- **WHEN** a query returns ranked chunk candidates +- **THEN** each result score is computed as `score = fts_score + vector_score + link_score` + +### Requirement: Link score SHALL rank by inbound links and zero-fill missing link authority +For ranked candidate documents with one or more inbound links, the system SHALL assign link row numbers ordered by inbound link count descending and document id ascending for deterministic ties, and MUST compute `link_score = 1.0 / (1 + row_number)`. Documents with zero inbound links MUST receive `link_score = 0.0`. + +#### Scenario: Document with inbound links gets reciprocal link score +- **WHEN** a candidate document has at least one inbound link and ranks first among candidate documents by inbound count +- **THEN** its `link_score` is `1.0 / (1 + 1)` + +#### Scenario: Document without inbound links gets zero link score +- **WHEN** a candidate document has zero inbound links +- **THEN** its `link_score` is `0.0` + +### Requirement: Search results SHALL expose link_score +The search result model SHALL include `link_score` for every returned result, alongside `fts_score`, `vector_score`, and final `score`. + +#### Scenario: Query response contains link_score field +- **WHEN** search results are returned from the service +- **THEN** each result includes a numeric `link_score` field \ No newline at end of file diff --git a/openspec/changes/archive/2026-03-04-add-link-rank-signal/tasks.md b/openspec/changes/archive/2026-03-04-add-link-rank-signal/tasks.md new file mode 100644 index 0000000..bf260dc --- /dev/null +++ b/openspec/changes/archive/2026-03-04-add-link-rank-signal/tasks.md @@ -0,0 +1,32 @@ +## 1. 
Schema and model updates + +- [x] 1.1 Add `links` table creation to `SearchService._init_schema` with `(from_document_id, to_document_id)` primary key and FK cascade on `from_document_id` +- [x] 1.2 Add an index on `links.to_document_id` for inbound count lookups +- [x] 1.3 Extend `SearchResult` in `packages/search-core/src/grogbot_search/models.py` with required `link_score: float` + +## 2. Link extraction and lifecycle behavior + +- [x] 2.1 Implement markdown outbound-link extraction helper(s) in `service.py` and normalize each href with `_canonicalize_url` +- [x] 2.2 Derive `to_document_id` via `document_id_for_url` for every extracted target and dedupe pairs per source document +- [x] 2.3 Update `upsert_document` to delete outbound links for a document when `content_markdown` changes +- [x] 2.4 Update `chunk_document` to clear existing outbound links for the source document, ignore self-links, and insert refreshed links alongside chunk regeneration + +## 3. Search rank fusion integration + +- [x] 3.1 Extend the `search` SQL CTE pipeline to compute candidate-document inbound-link counts from `links` +- [x] 3.2 Add reciprocal `link_score` ranking (`1.0 / (1 + row_number)`) ordered by inbound count DESC then document id ASC +- [x] 3.3 Ensure documents with zero inbound links receive `link_score = 0.0` via `COALESCE` +- [x] 3.4 Update final score computation to `fts_score + vector_score + link_score` and map `link_score` into returned `SearchResult` objects + +## 4. 
Test coverage + +- [x] 4.1 Add tests verifying unique `(from,to)` storage and collapse of multiple same-target links within one source document +- [x] 4.2 Add tests verifying unknown target URLs are stored via `document_id_for_url(_canonicalize_url(url))` +- [x] 4.3 Add tests verifying self-links are ignored and outbound links are cleared on content change/document delete/chunk refresh +- [x] 4.4 Add ranking tests verifying three-signal additive scoring, deterministic tie handling, and `link_score = 0.0` for zero-inbound documents +- [x] 4.5 Add result-shape tests verifying `link_score` is present in query results (service/API/CLI JSON output paths as applicable) + +## 5. Validation and readiness + +- [x] 5.1 Run `uv run pytest packages/search-core/tests` and confirm passing coverage for updated behavior +- [x] 5.2 Update any remaining user/developer-facing wording that describes search scoring to include the new `link_score` signal diff --git a/packages/cli/src/grogbot_cli/app.py b/packages/cli/src/grogbot_cli/app.py index 4655410..4b472c6 100644 --- a/packages/cli/src/grogbot_cli/app.py +++ b/packages/cli/src/grogbot_cli/app.py @@ -236,16 +236,6 @@ def bootstrap( sources_list = list(sources) with _service() as service: - if not skip_sitemaps: - for source in sources_list: - sitemap = source.get("sitemap") - if not sitemap: - continue - typer.echo(f"Scraping sitemap {sitemap}") - try: - service.create_documents_from_sitemap(sitemap, bootstrap=True) - except Exception as exc: - print(f"Bootstrap failed for sitemap {sitemap}: {exc}", file=sys.stderr) if not skip_feeds: for source in sources_list: feed = source.get("feed") @@ -256,6 +246,16 @@ def bootstrap( service.create_documents_from_feed(feed, paginate=True) except Exception as exc: print(f"Bootstrap failed for feed {feed}: {exc}", file=sys.stderr) + if not skip_sitemaps: + for source in sources_list: + sitemap = source.get("sitemap") + if not sitemap: + continue + typer.echo(f"Scraping sitemap {sitemap}") + 
try: + service.create_documents_from_sitemap(sitemap, bootstrap=True) + except Exception as exc: + print(f"Bootstrap failed for sitemap {sitemap}: {exc}", file=sys.stderr) @search_app.command("query") diff --git a/packages/search-core/src/grogbot_search/embeddings.py b/packages/search-core/src/grogbot_search/embeddings.py index c46c93d..008751a 100644 --- a/packages/search-core/src/grogbot_search/embeddings.py +++ b/packages/search-core/src/grogbot_search/embeddings.py @@ -5,6 +5,8 @@ from sentence_transformers import SentenceTransformer +_EMBEDDING_BATCH_SIZE = 8 + @lru_cache(maxsize=1) def _load_model() -> SentenceTransformer: @@ -12,6 +14,13 @@ def _load_model() -> SentenceTransformer: def embed_texts(texts: Iterable[str], *, prompt: str) -> List[list[float]]: + text_list = list(texts) + if not text_list: + return [] + model = _load_model() - embeddings = model.encode(list(texts), normalize_embeddings=True, prompt=prompt) + embeddings = [] + for start in range(0, len(text_list), _EMBEDDING_BATCH_SIZE): + batch = text_list[start : start + _EMBEDDING_BATCH_SIZE] + embeddings.extend(model.encode(batch, normalize_embeddings=True, prompt=prompt)) return [embedding.tolist() for embedding in embeddings] diff --git a/packages/search-core/src/grogbot_search/models.py b/packages/search-core/src/grogbot_search/models.py index 44b0880..7c896cf 100644 --- a/packages/search-core/src/grogbot_search/models.py +++ b/packages/search-core/src/grogbot_search/models.py @@ -32,6 +32,7 @@ class Chunk(BaseModel): class SearchResult(BaseModel): chunk: Chunk document: Document - score: float = Field(..., description="Final rank-fusion score combining FTS and vector rankings") + score: float = Field(..., description="Final rank-fusion score combining FTS, vector, and link rankings") fts_score: float vector_score: float + link_score: float diff --git a/packages/search-core/src/grogbot_search/service.py b/packages/search-core/src/grogbot_search/service.py index ba8cba3..6e0217c 100644 --- 
a/packages/search-core/src/grogbot_search/service.py +++ b/packages/search-core/src/grogbot_search/service.py @@ -12,10 +12,11 @@ from urllib.parse import parse_qs, urlencode, urlparse, urljoin, urlunparse import httpx +import markdown +from bs4 import BeautifulSoup from dateutil import parser as date_parser from markdownify import markdownify as html_to_markdown from readability import Document as ReadabilityDocument -from bs4 import BeautifulSoup from grogbot_search.chunking import chunk_markdown from grogbot_search.embeddings import embed_texts @@ -27,6 +28,7 @@ class SearchScores: fts: float vector: float + link: float hybrid: float @@ -38,15 +40,18 @@ class SearchScores: "verify you are human", ) -_DEFAULT_USER_AGENT = ( - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 " - "(KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.3" -) -_DEFAULT_HEADERS = {"User-Agent": _DEFAULT_USER_AGENT} - - -def _http_get(url: str, timeout: float = 20.0) -> httpx.Response: - return httpx.get(url, timeout=timeout, headers=_DEFAULT_HEADERS) +_DEFAULT_HEADERS = { + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:147.0) Gecko/20100101 Firefox/147.0", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Accept-Language": "en-US,en;q=0.9", + "Accept-Encoding": "gzip, deflate, br, zstd", + "Connection": "keep-alive", + "Upgrade-Insecure-Requests": "1", + "Sec-Fetch-Dest": "document", + "Sec-Fetch-Mode": "navigate", + "Sec-Fetch-Site": "none", + "Priority": "u=0, i", +} class BackoffError(RuntimeError): @@ -64,7 +69,8 @@ def _classify_backoff_response(response: httpx.Response) -> Optional[str]: if response.headers.get("Retry-After") is not None: return "retry-after-header" - body = response.text.lower() + body_match = re.search(r"<body[^>]*>(.*?)</body>", response.text, flags=re.IGNORECASE | re.DOTALL) + body = (body_match.group(1) if body_match else "").lower() for marker in _CAPTCHA_MARKERS: if marker in body: return
f"body-marker={marker}" @@ -134,6 +140,27 @@ def _dedupe_urls(urls: Iterable[str]) -> List[str]: return unique_urls +def _extract_markdown_links(content_markdown: str) -> List[str]: + rendered_html = markdown.markdown(content_markdown) + soup = BeautifulSoup(rendered_html, "html.parser") + links: List[str] = [] + for anchor in soup.find_all("a", href=True): + href = _canonicalize_url(str(anchor.get("href") or "")) + if href: + links.append(href) + return links + + +def _to_document_ids_from_markdown(*, source_document_id: str, content_markdown: str) -> set[str]: + to_document_ids: set[str] = set() + for href in _extract_markdown_links(content_markdown): + to_document_id = document_id_for_url(_canonicalize_url(href)) + if to_document_id == source_document_id: + continue + to_document_ids.add(to_document_id) + return to_document_ids + + def _parse_datetime(value: Optional[str]) -> Optional[datetime]: if not value: return None @@ -189,9 +216,15 @@ def __init__(self, db_path: Path): self.connection.row_factory = sqlite3.Row self.connection.execute("PRAGMA foreign_keys = ON") self._sqlite_vec = _ensure_sqlite_vec(self.connection) + self._http_client = httpx.Client(headers=_DEFAULT_HEADERS) self._init_schema() + def _http_get(self, url: str, timeout: float = 20.0) -> httpx.Response: + # Keep a single in-memory cookie jar for non-feed requests during this service run. 
+ return self._http_client.get(url, timeout=timeout) + def close(self) -> None: + self._http_client.close() self.connection.close() def __enter__(self) -> "SearchService": @@ -229,6 +262,15 @@ def _init_schema(self) -> None: UNIQUE (document_id, chunk_index) ); + CREATE TABLE IF NOT EXISTS links ( + from_document_id TEXT NOT NULL, + to_document_id TEXT NOT NULL, + PRIMARY KEY (from_document_id, to_document_id), + FOREIGN KEY (from_document_id) REFERENCES documents(id) ON DELETE CASCADE + ); + + CREATE INDEX IF NOT EXISTS idx_links_to_document_id ON links (to_document_id); + CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts USING fts5(content_text, content='chunks', content_rowid='id', tokenize='porter'); @@ -358,6 +400,7 @@ def upsert_document( ) if content_changed: self.connection.execute("DELETE FROM chunks WHERE document_id = ?", (document_id,)) + self.connection.execute("DELETE FROM links WHERE from_document_id = ?", (document_id,)) self.connection.commit() return Document( id=document_id, @@ -422,7 +465,9 @@ def chunk_document(self, document_id: str) -> int: if not document: raise DocumentNotFoundError(f"Document not found: {document_id}") self.connection.execute("DELETE FROM chunks WHERE document_id = ?", (document_id,)) + self.connection.execute("DELETE FROM links WHERE from_document_id = ?", (document_id,)) created = self._create_chunks(document_id, document.content_markdown) + self._insert_document_links(document_id=document_id, content_markdown=document.content_markdown) self.connection.commit() return len(created) @@ -446,6 +491,17 @@ def synchronize_document_chunks(self, maximum: Optional[int] = None) -> int: total_created += self.chunk_document(row["id"]) return total_created + def _insert_document_links(self, *, document_id: str, content_markdown: str) -> None: + to_document_ids = _to_document_ids_from_markdown( + source_document_id=document_id, + content_markdown=content_markdown, + ) + for to_document_id in sorted(to_document_ids): + 
self.connection.execute( + "INSERT OR IGNORE INTO links (from_document_id, to_document_id) VALUES (?, ?)", + (document_id, to_document_id), + ) + def _create_chunks(self, document_id: str, content_markdown: str) -> List[Chunk]: chunks = chunk_markdown(content_markdown) embeddings = embed_texts(chunks, prompt="search_document") if chunks else [] @@ -467,7 +523,7 @@ def _create_chunks(self, document_id: str, content_markdown: str) -> List[Chunk] return created def create_document_from_url(self, url: str) -> Document: - response = _http_get(url, timeout=20.0) + response = self._http_get(url, timeout=20.0) backoff_reason = _classify_backoff_response(response) if backoff_reason: raise BackoffError(f"Backoff detected while ingesting URL {url}: {backoff_reason}") @@ -614,7 +670,7 @@ def _next_wordpress_url(base_url: str) -> Optional[str]: def create_documents_from_opml(self, opml_url: str, paginate: bool = False) -> List[Document]: """Fetch and parse OPML, then ingest documents from each feed URL with best-effort handling.""" - response = _http_get(opml_url, timeout=20.0) + response = self._http_get(opml_url, timeout=20.0) response.raise_for_status() xml_content = response.text @@ -634,7 +690,7 @@ def create_documents_from_opml(self, opml_url: str, paginate: bool = False) -> L def create_documents_from_sitemap(self, sitemap_url: str, bootstrap: bool = False) -> List[Document]: """Fetch and parse sitemap XML, then ingest each URL entry with best-effort handling.""" - response = _http_get(sitemap_url, timeout=20.0) + response = self._http_get(sitemap_url, timeout=20.0) response.raise_for_status() xml_content = response.text @@ -715,15 +771,40 @@ def search(self, query: str, limit: int = 10) -> List[SearchResult]: SELECT chunk_id FROM fts_ranked UNION SELECT chunk_id FROM vec_ranked + ), + candidate_documents AS ( + SELECT DISTINCT chunks.document_id + FROM all_chunk_ids + JOIN chunks ON chunks.id = all_chunk_ids.chunk_id + ), + candidate_inbound_links AS ( + SELECT + 
candidate_documents.document_id, + COUNT(links.from_document_id) AS inbound_count + FROM candidate_documents + LEFT JOIN links ON links.to_document_id = candidate_documents.document_id + GROUP BY candidate_documents.document_id + ), + link_ranked AS ( + SELECT + document_id, + 1.0 / (1 + row_number() OVER (ORDER BY inbound_count DESC, document_id ASC)) AS link_score + FROM candidate_inbound_links + WHERE inbound_count > 0 ) SELECT all_chunk_ids.chunk_id, COALESCE(fts_ranked.fts_score, 0.0) AS fts_score, COALESCE(vec_ranked.vector_score, 0.0) AS vector_score, - COALESCE(fts_ranked.fts_score, 0.0) + COALESCE(vec_ranked.vector_score, 0.0) AS final_score + COALESCE(link_ranked.link_score, 0.0) AS link_score, + COALESCE(fts_ranked.fts_score, 0.0) + + COALESCE(vec_ranked.vector_score, 0.0) + + COALESCE(link_ranked.link_score, 0.0) AS final_score FROM all_chunk_ids + JOIN chunks ON chunks.id = all_chunk_ids.chunk_id LEFT JOIN fts_ranked ON fts_ranked.chunk_id = all_chunk_ids.chunk_id LEFT JOIN vec_ranked ON vec_ranked.chunk_id = all_chunk_ids.chunk_id + LEFT JOIN link_ranked ON link_ranked.document_id = chunks.document_id ORDER BY final_score DESC, all_chunk_ids.chunk_id ASC LIMIT ? 
""", @@ -746,6 +827,7 @@ def search(self, query: str, limit: int = 10) -> List[SearchResult]: scores[chunk_id] = SearchScores( fts=row["fts_score"], vector=row["vector_score"], + link=row["link_score"], hybrid=row["final_score"], ) @@ -799,6 +881,7 @@ def search(self, query: str, limit: int = 10) -> List[SearchResult]: score=score.hybrid, fts_score=score.fts, vector_score=score.vector, + link_score=score.link, ) ) diff --git a/packages/search-core/tests/test_embeddings.py b/packages/search-core/tests/test_embeddings.py index cc376be..bb67b35 100644 --- a/packages/search-core/tests/test_embeddings.py +++ b/packages/search-core/tests/test_embeddings.py @@ -48,3 +48,25 @@ def encode(self, texts, *, normalize_embeddings: bool, prompt: str): assert result == [[1.0, 2.0], [3.0, 4.0]] assert fake_model.calls == [(["first", "second"], True, "search_query")] + + +def test_embed_texts_batches_requests_to_max_eight(monkeypatch): + class FakeModel: + def __init__(self): + self.calls = [] + + def encode(self, texts, *, normalize_embeddings: bool, prompt: str): + self.calls.append((list(texts), normalize_embeddings, prompt)) + return [_FakeArray([float(int(text.removeprefix("chunk-")))]) for text in texts] + + fake_model = FakeModel() + monkeypatch.setattr(embeddings, "_load_model", lambda: fake_model) + + inputs = [f"chunk-{index}" for index in range(10)] + result = embeddings.embed_texts(inputs, prompt="search_document") + + assert result == [[float(index)] for index in range(10)] + assert fake_model.calls == [ + ([f"chunk-{index}" for index in range(8)], True, "search_document"), + (["chunk-8", "chunk-9"], True, "search_document"), + ] diff --git a/packages/search-core/tests/test_service.py b/packages/search-core/tests/test_service.py index 094db5e..5761428 100644 --- a/packages/search-core/tests/test_service.py +++ b/packages/search-core/tests/test_service.py @@ -4,6 +4,7 @@ from types import SimpleNamespace from urllib.parse import urlparse +import httpx import pysqlite3 as 
sqlite3 import pytest @@ -235,6 +236,125 @@ def test_synchronize_document_chunks_non_positive_maximum_is_noop(service: Searc assert chunk_count == 0 +# Link graph behavior + +def test_chunk_document_stores_unique_outbound_links_per_target(service: SearchService): + source = service.upsert_source("example.com", name="Example") + document = service.upsert_document( + source_id=source.id, + canonical_url="https://example.com/source", + title="Source", + published_at=None, + content_markdown=( + "[one](https://example.com/target) " + "[two](https://example.com/target) " + "[three](https://example.com/other-target)" + ), + ) + + service.chunk_document(document.id) + + links = service.connection.execute( + """ + SELECT from_document_id, to_document_id + FROM links + WHERE from_document_id = ? + ORDER BY to_document_id + """, + (document.id,), + ).fetchall() + + assert len(links) == 2 + assert [row["to_document_id"] for row in links] == sorted( + [ + service_module.document_id_for_url(service_module._canonicalize_url("https://example.com/target")), + service_module.document_id_for_url(service_module._canonicalize_url("https://example.com/other-target")), + ] + ) + + +def test_chunk_document_stores_unknown_targets_by_canonicalized_url(service: SearchService): + source = service.upsert_source("example.com", name="Example") + target_url = "https://example.com/not-ingested" + document = service.upsert_document( + source_id=source.id, + canonical_url="https://example.com/source", + title="Source", + published_at=None, + content_markdown=f"[unknown]({target_url})", + ) + + service.chunk_document(document.id) + + link_row = service.connection.execute( + "SELECT to_document_id FROM links WHERE from_document_id = ?", + (document.id,), + ).fetchone() + + assert link_row is not None + assert link_row["to_document_id"] == service_module.document_id_for_url( + service_module._canonicalize_url(target_url) + ) + + +def 
test_outbound_links_ignore_self_and_follow_content_delete_and_refresh_lifecycle(service: SearchService): + source = service.upsert_source("example.com", name="Example") + canonical_url = "https://example.com/lifecycle" + document = service.upsert_document( + source_id=source.id, + canonical_url=canonical_url, + title="Lifecycle", + published_at=None, + content_markdown=f"[self]({canonical_url}) [other](https://example.com/other)", + ) + + service.chunk_document(document.id) + + links = service.connection.execute( + "SELECT to_document_id FROM links WHERE from_document_id = ? ORDER BY to_document_id", + (document.id,), + ).fetchall() + assert [row["to_document_id"] for row in links] == [ + service_module.document_id_for_url(service_module._canonicalize_url("https://example.com/other")) + ] + + updated = service.upsert_document( + source_id=source.id, + canonical_url=canonical_url, + title="Lifecycle updated", + published_at=None, + content_markdown="updated body with no links", + ) + + stale_links = service.connection.execute( + "SELECT COUNT(*) AS count FROM links WHERE from_document_id = ?", + (updated.id,), + ).fetchone() + assert stale_links["count"] == 0 + + service.connection.execute( + "UPDATE documents SET content_markdown = ? 
WHERE id = ?", + ("[refreshed](https://example.com/refreshed)", updated.id), + ) + service.connection.commit() + service.chunk_document(updated.id) + + refreshed_links = service.connection.execute( + "SELECT to_document_id FROM links WHERE from_document_id = ?", + (updated.id,), + ).fetchall() + assert [row["to_document_id"] for row in refreshed_links] == [ + service_module.document_id_for_url(service_module._canonicalize_url("https://example.com/refreshed")) + ] + + assert service.delete_document(updated.id) is True + remaining_links = service.connection.execute( + "SELECT COUNT(*) AS count FROM links WHERE from_document_id = ?", + (updated.id,), + ).fetchone() + assert remaining_links["count"] == 0 + + # Search behavior def test_rank_fusion_search_returns_results(service: SearchService): @@ -277,7 +397,8 @@ def test_rank_fusion_scores_are_reciprocal_and_additive(service: SearchService): expected_method_score = pytest.approx(1.0 / (1 + rank)) assert result.fts_score == expected_method_score assert result.vector_score == expected_method_score - assert result.score == pytest.approx(result.fts_score + result.vector_score) + assert result.link_score == 0.0 + assert result.score == pytest.approx(result.fts_score + result.vector_score + result.link_score) def test_rank_fusion_zero_fills_missing_method_score(service: SearchService): @@ -297,7 +418,8 @@ def test_rank_fusion_zero_fills_missing_method_score(service: SearchService): top = results[0] assert top.fts_score == 0.0 assert top.vector_score > 0.0 - assert top.score == pytest.approx(top.fts_score + top.vector_score) + assert top.link_score == 0.0 + assert top.score == pytest.approx(top.fts_score + top.vector_score + top.link_score) def test_search_respects_result_limit(service: SearchService): @@ -333,8 +455,121 @@ def test_search_returns_empty_for_blank_query_or_non_positive_limit(service: Sea assert service.search("hello", limit=-1) == [] +def 
test_search_includes_link_score_with_deterministic_ties_and_zero_fill(service: SearchService): + source = service.upsert_source("example.com", name="Example") + + doc_a = service.upsert_document( + source_id=source.id, + canonical_url="https://example.com/a", + title="A", + published_at=None, + content_markdown="alpha", + ) + doc_b = service.upsert_document( + source_id=source.id, + canonical_url="https://example.com/b", + title="B", + published_at=None, + content_markdown=f"alpha [a]({doc_a.canonical_url})", + ) + doc_c = service.upsert_document( + source_id=source.id, + canonical_url="https://example.com/c", + title="C", + published_at=None, + content_markdown=f"alpha [a]({doc_a.canonical_url})", + ) + doc_d = service.upsert_document( + source_id=source.id, + canonical_url="https://example.com/d", + title="D", + published_at=None, + content_markdown=f"alpha [b]({doc_b.canonical_url}) [c]({doc_c.canonical_url})", + ) + + service.chunk_document(doc_a.id) + service.chunk_document(doc_b.id) + service.chunk_document(doc_c.id) + service.chunk_document(doc_d.id) + + results = service.search("alpha", limit=4) + + assert len(results) == 4 + + by_document_id = {result.document.id: result for result in results} + for result in results: + assert result.score == pytest.approx(result.fts_score + result.vector_score + result.link_score) + + assert by_document_id[doc_a.id].link_score == pytest.approx(1.0 / (1 + 1)) + assert by_document_id[doc_d.id].link_score == 0.0 + + tied_doc_ids = sorted([doc_b.id, doc_c.id]) + assert by_document_id[tied_doc_ids[0]].link_score == pytest.approx(1.0 / (1 + 2)) + assert by_document_id[tied_doc_ids[1]].link_score == pytest.approx(1.0 / (1 + 3)) + + +def test_search_result_model_dump_contains_link_score(service: SearchService): + source = service.upsert_source("example.com", name="Example") + document = service.upsert_document( + source_id=source.id, + canonical_url="https://example.com/result-shape", + title="Result shape", + published_at=None, 
+ content_markdown="alpha", + ) + service.chunk_document(document.id) + + results = service.search("alpha", limit=1) + + assert len(results) == 1 + payload = results[0].model_dump() + assert "link_score" in payload + assert isinstance(payload["link_score"], float) + + # Ingestion behavior implemented in service.py +def test_non_feed_http_get_uses_configured_browser_headers(service: SearchService): + captured: dict[str, httpx.Request] = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["request"] = request + return httpx.Response(200, text="ok") + + service._http_client.close() + service._http_client = httpx.Client( + transport=httpx.MockTransport(handler), + headers=service_module._DEFAULT_HEADERS, + ) + + service._http_get("https://example.com/header-check") + + request = captured["request"] + for header_name, expected_value in service_module._DEFAULT_HEADERS.items(): + assert request.headers.get(header_name) == expected_value + + +def test_non_feed_http_get_maintains_cookies_for_service_run(service: SearchService): + seen_cookie_headers: list[str | None] = [] + + def handler(request: httpx.Request) -> httpx.Response: + seen_cookie_headers.append(request.headers.get("cookie")) + if request.url.path == "/set-cookie": + return httpx.Response(200, headers={"Set-Cookie": "sessionid=abc123; Path=/"}, text="set") + return httpx.Response(200, text="ok") + + service._http_client.close() + service._http_client = httpx.Client( + transport=httpx.MockTransport(handler), + headers=service_module._DEFAULT_HEADERS, + ) + + service._http_get("https://example.com/set-cookie") + service._http_get("https://example.com/follow-up") + + assert seen_cookie_headers == [None, "sessionid=abc123"] + + def test_create_document_from_url(service: SearchService, http_server): document = service.create_document_from_url(f"{http_server}/article") @@ -609,6 +844,20 @@ def test_parse_datetime_returns_none_for_invalid_values(): assert service_module._parse_datetime("not a 
date") is None +def test_classify_backoff_response_checks_captcha_markers_only_in_html_body(): + head_only = httpx.Response( + 200, + text="captcha challengeall clear", + ) + assert service_module._classify_backoff_response(head_only) is None + + body_marker = httpx.Response( + 200, + text="okPlease verify you are human", + ) + assert service_module._classify_backoff_response(body_marker) == "body-marker=verify you are human" + + def test_search_returns_empty_when_ranked_chunk_rows_are_missing(service: SearchService): service.connection.execute( "INSERT INTO chunks_vec (rowid, embedding) VALUES (?, ?)", @@ -619,6 +868,57 @@ def test_search_returns_empty_when_ranked_chunk_rows_are_missing(service: Search assert service.search("orphan chunk", limit=5) == [] +def test_search_skips_chunk_ids_missing_after_scoring(service: SearchService): + source = service.upsert_source("example.com", name="Example") + doc_a = service.upsert_document( + source_id=source.id, + canonical_url="https://example.com/missing-after-score-a", + title="A", + published_at=None, + content_markdown="alpha", + ) + doc_b = service.upsert_document( + source_id=source.id, + canonical_url="https://example.com/missing-after-score-b", + title="B", + published_at=None, + content_markdown="alpha", + ) + service.synchronize_document_chunks() + + real_connection = service.connection + + class DeletingConnectionProxy: + def __init__(self, connection): + self._connection = connection + self._deleted = False + + def execute(self, sql, params=()): + if ( + "FROM chunks" in sql + and "JOIN documents ON documents.id = chunks.document_id" in sql + and "WHERE chunks.id IN" in sql + and not self._deleted + and params + ): + self._connection.execute("DELETE FROM chunks WHERE id = ?", (params[0],)) + self._connection.commit() + self._deleted = True + return self._connection.execute(sql, params) + + def __getattr__(self, name): + return getattr(self._connection, name) + + service.connection = 
DeletingConnectionProxy(real_connection) + try: + results = service.search("alpha", limit=2) + finally: + service.connection = real_connection + + assert len(results) == 1 + assert results[0].document.id in {doc_a.id, doc_b.id} + + def test_create_document_from_url_rejects_empty_extracted_content(service: SearchService, http_server, monkeypatch): class EmptyReadable: def __init__(self, _html: str):