diff --git a/integrations/nvidia/pyproject.toml b/integrations/nvidia/pyproject.toml
index 82fb32b95..037ed1b62 100644
--- a/integrations/nvidia/pyproject.toml
+++ b/integrations/nvidia/pyproject.toml
@@ -48,7 +48,9 @@ dependencies = [
   "pytest-rerunfailures",
   "haystack-pydoc-tools",
   "requests_mock",
+  "tritonclient[http,grpc]",
 ]
+
 [tool.hatch.envs.default.scripts]
 test = "pytest {args:tests}"
 test-cov = "coverage run -m pytest {args:tests}"
@@ -70,7 +72,7 @@ style = [
   "ruff check {args:. --exclude tests/}",
   "black --check --diff {args:.}",
 ]
-fmt = ["black {args:.}", "ruff --fix {args:. --exclude tests/}", "style"]
+fmt = ["black {args:.}", "ruff check --fix {args:. --exclude tests/}", "style"]
 all = ["style", "typing"]

 [tool.black]
@@ -160,6 +162,7 @@ module = [
   "numpy.*",
   "requests_mock.*",
   "pydantic.*",
+  "tritonclient.*",
 ]
 ignore_missing_imports = true
diff --git a/integrations/nvidia/src/haystack_integrations/components/embedders/nvidia/document_embedder.py b/integrations/nvidia/src/haystack_integrations/components/embedders/nvidia/document_embedder.py
index 3e911e4f4..5427975e2 100644
--- a/integrations/nvidia/src/haystack_integrations/components/embedders/nvidia/document_embedder.py
+++ b/integrations/nvidia/src/haystack_integrations/components/embedders/nvidia/document_embedder.py
@@ -1,11 +1,11 @@
 import warnings
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Literal, Optional, Tuple, Union

 from haystack import Document, component, default_from_dict, default_to_dict
 from haystack.utils import Secret, deserialize_secrets_inplace
 from tqdm import tqdm

-from haystack_integrations.utils.nvidia import NimBackend, is_hosted, url_validation
+from haystack_integrations.utils.nvidia import NimBackend, TritonBackend, is_hosted, url_validation

 from .truncate import EmbeddingTruncateMode

@@ -16,7 +16,7 @@ class NvidiaDocumentEmbedder:
     """
     A component for embedding documents using embedding models provided by
-    [NVIDIA NIMs](https://ai.nvidia.com).
+    [NVIDIA NIMs](https://ai.nvidia.com) or [NVIDIA Triton](https://developer.nvidia.com/triton-inference-server).

     Usage example:
     ```python
@@ -44,6 +44,8 @@ def __init__(
         meta_fields_to_embed: Optional[List[str]] = None,
         embedding_separator: str = "\n",
         truncate: Optional[Union[EmbeddingTruncateMode, str]] = None,
+        backend: Literal["nim", "triton-http", "triton-grpc"] = "nim",
+        timeout: Optional[float] = None,
     ):
         """
         Create a NvidiaTextEmbedder component.
@@ -73,19 +75,30 @@ def __init__(
         :param truncate:
             Specifies how inputs longer that the maximum token length should be truncated.
             If None the behavior is model-dependent, see the official documentation for more information.
+        :param backend:
+            The backend to use for the component. Currently supported backends are "nim", "triton-http",
+            and "triton-grpc". Default is "nim".
+        :param timeout:
+            Timeout for the request in seconds. If not set, defaults to the `NVIDIA_TIMEOUT` environment
+            variable or 60 seconds.
""" self.api_key = api_key self.model = model - self.api_url = url_validation(api_url, _DEFAULT_API_URL, ["v1/embeddings"]) + self.api_url = url_validation(api_url, _DEFAULT_API_URL, ["v1/embeddings"]) if backend == "nim" else api_url self.prefix = prefix self.suffix = suffix self.batch_size = batch_size self.progress_bar = progress_bar self.meta_fields_to_embed = meta_fields_to_embed or [] self.embedding_separator = embedding_separator + self._backend = backend + self.timeout = timeout if isinstance(truncate, str): + if self._backend != "nim": + error_message = "Truncation is only supported with the nim backend." + raise ValueError(error_message) truncate = EmbeddingTruncateMode.from_str(truncate) self.truncate = truncate @@ -121,15 +134,25 @@ def warm_up(self): if self._initialized: return - model_kwargs = {"input_type": "passage"} - if self.truncate is not None: - model_kwargs["truncate"] = str(self.truncate) - self.backend = NimBackend( - self.model, - api_url=self.api_url, - api_key=self.api_key, - model_kwargs=model_kwargs, - ) + if self._backend == "nim": + model_kwargs = {"input_type": "passage"} + if self.truncate is not None: + model_kwargs["truncate"] = str(self.truncate) + self.backend = NimBackend( + self.model, + api_url=self.api_url, + api_key=self.api_key, + model_kwargs=model_kwargs, + timeout=self.timeout, + ) + else: + self.backend = TritonBackend( + model=self.model, + api_url=self.api_url, + api_key=self.api_key, + protocol="http" if self._backend == "triton-http" else "grpc", + timeout=self.timeout, + ) self._initialized = True @@ -155,6 +178,7 @@ def to_dict(self) -> Dict[str, Any]: meta_fields_to_embed=self.meta_fields_to_embed, embedding_separator=self.embedding_separator, truncate=str(self.truncate) if self.truncate is not None else None, + backend=self._backend, ) @classmethod diff --git a/integrations/nvidia/src/haystack_integrations/components/embedders/nvidia/text_embedder.py b/integrations/nvidia/src/haystack_integrations/components/embedders/nvidia/text_embedder.py index 0387c32b7..b356dd778 100644 --- a/integrations/nvidia/src/haystack_integrations/components/embedders/nvidia/text_embedder.py +++ b/integrations/nvidia/src/haystack_integrations/components/embedders/nvidia/text_embedder.py @@ -1,10 +1,10 @@ import warnings -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Literal, Optional, Union from haystack import component, default_from_dict, default_to_dict from haystack.utils import Secret, deserialize_secrets_inplace -from haystack_integrations.utils.nvidia import NimBackend, is_hosted, url_validation +from haystack_integrations.utils.nvidia import NimBackend, TritonBackend, is_hosted, url_validation from .truncate import EmbeddingTruncateMode @@ -15,7 +15,7 @@ class NvidiaTextEmbedder: """ A component for embedding strings using embedding models provided by - [NVIDIA NIMs](https://ai.nvidia.com). + [NVIDIA NIMs](https://ai.nvidia.com) or [NIVIDIA Triton](https://developer.nvidia.com/triton-inference-server). For models that differentiate between query and document inputs, this component embeds the input string as a query. @@ -41,6 +41,8 @@ def __init__( prefix: str = "", suffix: str = "", truncate: Optional[Union[EmbeddingTruncateMode, str]] = None, + backend: Literal["nim", "triton-http", "triton-grpc"] = "nim", + timeout: Optional[float] = None, ): """ Create a NvidiaTextEmbedder component. 
@@ -61,15 +63,26 @@ def __init__(
         :param truncate:
             Specifies how inputs longer that the maximum token length should be truncated.
             If None the behavior is model-dependent, see the official documentation for more information.
+        :param backend:
+            The backend to use for the component. Currently supported backends are "nim", "triton-http",
+            and "triton-grpc". Default is "nim".
+        :param timeout:
+            Timeout for the request in seconds. If not set, defaults to the `NVIDIA_TIMEOUT` environment
+            variable or 60 seconds.
         """
         self.api_key = api_key
         self.model = model
-        self.api_url = url_validation(api_url, _DEFAULT_API_URL, ["v1/embeddings"])
+        self.api_url = url_validation(api_url, _DEFAULT_API_URL, ["v1/embeddings"]) if backend == "nim" else api_url
         self.prefix = prefix
         self.suffix = suffix
+        self._backend = backend
+        self.timeout = timeout

         if isinstance(truncate, str):
             truncate = EmbeddingTruncateMode.from_str(truncate)
+        if truncate is not None and self._backend != "nim":
+            error_message = "Truncation is only supported with the nim backend."
+            raise ValueError(error_message)
         self.truncate = truncate
@@ -105,15 +118,25 @@ def warm_up(self):
         if self._initialized:
             return

-        model_kwargs = {"input_type": "query"}
-        if self.truncate is not None:
-            model_kwargs["truncate"] = str(self.truncate)
-        self.backend = NimBackend(
-            self.model,
-            api_url=self.api_url,
-            api_key=self.api_key,
-            model_kwargs=model_kwargs,
-        )
+        if self._backend == "nim":
+            model_kwargs = {"input_type": "query"}
+            if self.truncate is not None:
+                model_kwargs["truncate"] = str(self.truncate)
+            self.backend = NimBackend(
+                self.model,
+                api_url=self.api_url,
+                api_key=self.api_key,
+                model_kwargs=model_kwargs,
+                timeout=self.timeout,
+            )
+        else:
+            self.backend = TritonBackend(
+                model=self.model,
+                api_url=self.api_url,
+                api_key=self.api_key,
+                protocol="http" if self._backend == "triton-http" else "grpc",
+                timeout=self.timeout,
+            )

         self._initialized = True

@@ -135,6 +158,7 @@ def to_dict(self) -> Dict[str, Any]:
             prefix=self.prefix,
             suffix=self.suffix,
             truncate=str(self.truncate) if self.truncate is not None else None,
+            backend=self._backend,
         )

     @classmethod
diff --git a/integrations/nvidia/src/haystack_integrations/utils/nvidia/__init__.py b/integrations/nvidia/src/haystack_integrations/utils/nvidia/__init__.py
index da301d29d..4378203a0 100644
--- a/integrations/nvidia/src/haystack_integrations/utils/nvidia/__init__.py
+++ b/integrations/nvidia/src/haystack_integrations/utils/nvidia/__init__.py
@@ -1,4 +1,5 @@
-from .nim_backend import Model, NimBackend
-from .utils import is_hosted, url_validation
+from .nim_backend import NimBackend
+from .triton_backend import TritonBackend
+from .utils import Model, is_hosted, url_validation

-__all__ = ["NimBackend", "Model", "is_hosted", "url_validation"]
+__all__ = ["NimBackend", "TritonBackend", "Model", "is_hosted", "url_validation"]
diff --git a/integrations/nvidia/src/haystack_integrations/utils/nvidia/nim_backend.py b/integrations/nvidia/src/haystack_integrations/utils/nvidia/nim_backend.py
index 0d1f57e5c..cdea4182f 100644
--- a/integrations/nvidia/src/haystack_integrations/utils/nvidia/nim_backend.py
+++ b/integrations/nvidia/src/haystack_integrations/utils/nvidia/nim_backend.py
@@ -1,27 +1,10 @@
-from dataclasses import dataclass, field
 from typing import Any, Dict, List, Optional, Tuple

 import requests
 from haystack import Document
 from haystack.utils import Secret

-REQUEST_TIMEOUT = 60
-
-
-@dataclass
-class Model:
-    """
-    Model information.
-
-    id: unique identifier for the model, passed as model parameter for requests
-    aliases: list of aliases for the model
-    base_model: root model for the model
-    All aliases are deprecated and will trigger a warning when used.
-    """
-
-    id: str
-    aliases: Optional[List[str]] = field(default_factory=list)
-    base_model: Optional[str] = None
+from haystack_integrations.utils.nvidia.utils import REQUEST_TIMEOUT, Model


 class NimBackend:
@@ -31,6 +14,7 @@ def __init__(
         api_url: str,
         api_key: Optional[Secret] = Secret.from_env_var("NVIDIA_API_KEY"),
         model_kwargs: Optional[Dict[str, Any]] = None,
+        timeout: Optional[float] = None,
     ):
         headers = {
             "Content-Type": "application/json",
@@ -47,6 +31,10 @@ def __init__(
         self.api_url = api_url
         self.model_kwargs = model_kwargs or {}

+        if timeout is None:
+            timeout = REQUEST_TIMEOUT
+        self.timeout = timeout
+
     def embed(self, texts: List[str]) -> Tuple[List[List[float]], Dict[str, Any]]:
         url = f"{self.api_url}/embeddings"

@@ -57,7 +45,7 @@ def embed(self, texts: List[str]) -> Tuple[List[List[float]], Dict[str, Any]]:
                 "input": texts,
                 **self.model_kwargs,
             },
-            timeout=REQUEST_TIMEOUT,
+            timeout=self.timeout,
         )
         res.raise_for_status()
@@ -85,7 +73,7 @@ def generate(self, prompt: str) -> Tuple[List[str], List[Dict[str, Any]]]:
                 ],
                 **self.model_kwargs,
             },
-            timeout=REQUEST_TIMEOUT,
+            timeout=self.timeout,
         )
         res.raise_for_status()
@@ -120,7 +108,7 @@ def models(self) -> List[Model]:

         res = self.session.get(
             url,
-            timeout=REQUEST_TIMEOUT,
+            timeout=self.timeout,
         )
         res.raise_for_status()
@@ -147,7 +135,7 @@ def rank(
                 "passages": [{"text": doc.content} for doc in documents],
                 **self.model_kwargs,
             },
-            timeout=REQUEST_TIMEOUT,
+            timeout=self.timeout,
         )
         res.raise_for_status()
diff --git a/integrations/nvidia/src/haystack_integrations/utils/nvidia/triton_backend.py b/integrations/nvidia/src/haystack_integrations/utils/nvidia/triton_backend.py
new file mode 100644
index 000000000..235099392
--- /dev/null
+++ b/integrations/nvidia/src/haystack_integrations/utils/nvidia/triton_backend.py
@@ -0,0 +1,86 @@
+from typing import Any, Dict, List, Literal, Optional, Tuple
+
+import numpy as np
+from haystack import Document
+from haystack.lazy_imports import LazyImport
+from haystack.utils import Secret
+
+from haystack_integrations.utils.nvidia.utils import REQUEST_TIMEOUT, Model
+
+with LazyImport("Run 'pip install tritonclient[http]'") as tritonclient_http:
+    import tritonclient.http
+
+with LazyImport("Run 'pip install tritonclient[grpc]'") as tritonclient_grpc:
+    import tritonclient.grpc
+
+
+class TritonBackend:
+    def __init__(
+        self,
+        model: str,
+        api_url: str,
+        api_key: Optional[Secret] = Secret.from_env_var("NVIDIA_API_KEY"),
+        model_kwargs: Optional[Dict[str, Any]] = None,
+        protocol: Literal["http", "grpc"] = "http",
+        timeout: Optional[float] = None,
+    ):
+        self.headers = {}
+
+        if api_key:
+            self.headers["authorization"] = f"Bearer {api_key.resolve_value()}"
+
+        if protocol == "grpc":
+            tritonclient_grpc.check()
+            self.triton = tritonclient.grpc
+        else:
+            tritonclient_http.check()
+            self.triton = tritonclient.http
+
+        self.client = self.triton.InferenceServerClient(url=api_url)
+
+        self.model = model
+        self.api_url = api_url
+        self.model_kwargs = model_kwargs or {}
+
+        if timeout is None:
+            timeout = REQUEST_TIMEOUT
+        self.timeout = int(timeout)
+
+    def embed(self, texts: List[str]) -> Tuple[List[List[float]], Dict[str, Any]]:
+        inputs = []
+        text_input = self.triton.InferInput("text", [len(texts)], "BYTES")
+        text_input.set_data_from_numpy(np.array(texts, dtype=object))
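+        # Triton string tensors use the BYTES datatype, so the texts are shipped
+        # as an object-dtype numpy array; the served model is assumed to expose a
+        # "text" input and an "embeddings" output.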
+        inputs.append(text_input)
+
+        results = self.client.infer(
+            model_name=self.model,
+            inputs=inputs,
+            headers=self.headers,
+            timeout=self.timeout,
+        )
+
+        embeddings = results.as_numpy("embeddings").tolist()
+
+        return embeddings, {}
+
+    def generate(self, prompt: str) -> Tuple[List[str], List[Dict[str, Any]]]:
+        raise NotImplementedError()
+
+    def models(self) -> List[Model]:
+        data = self.client.get_model_repository_index(
+            headers=self.headers,
+        )
+
+        models = [Model(result["name"]) for result in data]
+        if not models:
+            msg = f"No hosted models were found at URL '{self.api_url}'."
+            raise ValueError(msg)
+        return models
+
+    def rank(
+        self,
+        query: str,
+        documents: List[Document],
+        endpoint: Optional[str] = None,
+    ) -> List[Dict[str, Any]]:
+        raise NotImplementedError()
diff --git a/integrations/nvidia/src/haystack_integrations/utils/nvidia/utils.py b/integrations/nvidia/src/haystack_integrations/utils/nvidia/utils.py
index 7d4dfc3b4..9a13e6111 100644
--- a/integrations/nvidia/src/haystack_integrations/utils/nvidia/utils.py
+++ b/integrations/nvidia/src/haystack_integrations/utils/nvidia/utils.py
@@ -1,7 +1,11 @@
+import os
 import warnings
-from typing import List
+from dataclasses import dataclass, field
+from typing import List, Optional
 from urllib.parse import urlparse, urlunparse

+REQUEST_TIMEOUT = float(os.environ.get("NVIDIA_TIMEOUT", 60.0))
+

 def url_validation(api_url: str, default_api_url: str, allowed_paths: List[str]) -> str:
     """
@@ -45,3 +49,19 @@ def is_hosted(api_url: str):
         "integrate.api.nvidia.com",
         "ai.api.nvidia.com",
     ]
+
+
+@dataclass
+class Model:
+    """
+    Model information.
+
+    id: unique identifier for the model, passed as model parameter for requests
+    aliases: list of aliases for the model
+    base_model: root model for the model
+    All aliases are deprecated and will trigger a warning when used.
+ """ + + id: str + aliases: Optional[List[str]] = field(default_factory=list) + base_model: Optional[str] = None diff --git a/integrations/nvidia/tests/test_document_embedder.py b/integrations/nvidia/tests/test_document_embedder.py index bef0f996e..3d55d3289 100644 --- a/integrations/nvidia/tests/test_document_embedder.py +++ b/integrations/nvidia/tests/test_document_embedder.py @@ -71,9 +71,22 @@ def test_to_dict(self, monkeypatch): "meta_fields_to_embed": [], "embedding_separator": "\n", "truncate": None, + "backend": "nim", }, } + @pytest.mark.parametrize("backend", ["triton-http", "triton-grpc"]) + def test_init_with_triton_backend(self, monkeypatch, backend: str): + monkeypatch.setenv("NVIDIA_API_KEY", "fake-api-key") + embedder = NvidiaDocumentEmbedder(backend=backend, model="my-triton-model", api_url="localhost:8000") + + assert embedder.api_key == Secret.from_env_var("NVIDIA_API_KEY") + assert embedder.api_url == "localhost:8000" + assert embedder.model == "my-triton-model" + assert embedder.prefix == "" + assert embedder.suffix == "" + assert embedder._backend == backend + def test_to_dict_with_custom_init_parameters(self, monkeypatch): monkeypatch.setenv("NVIDIA_API_KEY", "fake-api-key") component = NvidiaDocumentEmbedder( @@ -101,6 +114,7 @@ def test_to_dict_with_custom_init_parameters(self, monkeypatch): "meta_fields_to_embed": ["test_field"], "embedding_separator": " | ", "truncate": "END", + "backend": "nim", }, } @@ -418,3 +432,40 @@ def test_run_integration_with_api_catalog(self, model, api_url): for doc in docs_with_embeddings: assert isinstance(doc.embedding, list) assert isinstance(doc.embedding[0], float) + + @pytest.mark.skipif( + not os.environ.get("NVIDIA_TRITON_EMBEDDER_MODEL", None) + or not os.environ.get( + "NVIDIA_TRITON_HTTP_ENDPOINT_URL", os.environ.get("NVIDIA_TRITON_GRPC_ENDPOINT_URL", None) + ), + reason="Export an env var called NVIDIA_TRITON_EMBEDDER_MODEL containing the hosted model name and " + "NVIDIA_TRITON_HTTP_ENDPOINT_URL or NVIDIA_TRITON_GRPC_ENDPOINT_URL containing the local URL to call.", + ) + @pytest.mark.parametrize("backend", ["triton-http", "triton-grpc"]) + @pytest.mark.integration + def test_run_integration_with_triton_backend(self, backend: str): + model = os.environ["NVIDIA_TRITON_EMBEDDER_MODEL"] + url = os.environ[ + "NVIDIA_TRITON_HTTP_ENDPOINT_URL" if backend == "triton-http" else "NVIDIA_TRITON_GRPC_ENDPOINT_URL" + ] + embedder = NvidiaDocumentEmbedder( + model=model, + api_url=url, + api_key=None, + backend=backend, + ) + embedder.warm_up() + + docs = [ + Document(content="I love cheese", meta={"topic": "Cuisine"}), + Document(content="A transformer is a deep learning architecture", meta={"topic": "ML"}), + ] + + result = embedder.run(docs) + docs_with_embeddings = result["documents"] + + assert isinstance(docs_with_embeddings, list) + assert len(docs_with_embeddings) == len(docs) + for doc in docs_with_embeddings: + assert isinstance(doc.embedding, list) + assert isinstance(doc.embedding[0], float) diff --git a/integrations/nvidia/tests/test_text_embedder.py b/integrations/nvidia/tests/test_text_embedder.py index 7c8428cc2..b0478a924 100644 --- a/integrations/nvidia/tests/test_text_embedder.py +++ b/integrations/nvidia/tests/test_text_embedder.py @@ -39,6 +39,18 @@ def test_init_fail_wo_api_key(self, monkeypatch): with pytest.raises(ValueError): embedder.warm_up() + @pytest.mark.parametrize("backend", ["triton-http", "triton-grpc"]) + def test_init_with_triton_backend(self, monkeypatch, backend: str): + 
+        monkeypatch.setenv("NVIDIA_API_KEY", "fake-api-key")
+        embedder = NvidiaTextEmbedder(backend=backend, model="my-triton-model", api_url="localhost:8000")
+
+        assert embedder.api_key == Secret.from_env_var("NVIDIA_API_KEY")
+        assert embedder.api_url == "localhost:8000"
+        assert embedder.model == "my-triton-model"
+        assert embedder.prefix == ""
+        assert embedder.suffix == ""
+        assert embedder._backend == backend
+
     def test_to_dict(self, monkeypatch):
         monkeypatch.setenv("NVIDIA_API_KEY", "fake-api-key")
         component = NvidiaTextEmbedder("nvolveqa_40k")
@@ -52,6 +64,7 @@ def test_to_dict(self, monkeypatch):
             "prefix": "",
             "suffix": "",
             "truncate": None,
+            "backend": "nim",
         },
     }

@@ -74,6 +87,7 @@ def test_to_dict_with_custom_init_parameters(self, monkeypatch):
             "prefix": "prefix",
             "suffix": "suffix",
             "truncate": "START",
+            "backend": "nim",
         },
     }

@@ -208,3 +222,31 @@ def test_run_integration_with_api_catalog(self, model, api_url):
         assert all(isinstance(x, float) for x in embedding)

         assert "usage" in meta
+
+    @pytest.mark.skipif(
+        not os.environ.get("NVIDIA_TRITON_EMBEDDER_MODEL", None)
+        or not os.environ.get(
+            "NVIDIA_TRITON_HTTP_ENDPOINT_URL", os.environ.get("NVIDIA_TRITON_GRPC_ENDPOINT_URL", None)
+        ),
+        reason="Export an env var called NVIDIA_TRITON_EMBEDDER_MODEL containing the hosted model name and "
+        "NVIDIA_TRITON_HTTP_ENDPOINT_URL or NVIDIA_TRITON_GRPC_ENDPOINT_URL containing the local URL to call.",
+    )
+    @pytest.mark.parametrize("backend", ["triton-http", "triton-grpc"])
+    @pytest.mark.integration
+    def test_run_integration_with_triton_backend(self, backend: str):
+        model = os.environ["NVIDIA_TRITON_EMBEDDER_MODEL"]
+        url = os.environ[
+            "NVIDIA_TRITON_HTTP_ENDPOINT_URL" if backend == "triton-http" else "NVIDIA_TRITON_GRPC_ENDPOINT_URL"
+        ]
+        embedder = NvidiaTextEmbedder(
+            model=model,
+            api_url=url,
+            api_key=None,
+            backend=backend,
+        )
+        embedder.warm_up()
+
+        result = embedder.run("A transformer is a deep learning architecture")
+        embedding = result["embedding"]
+
+        assert all(isinstance(x, float) for x in embedding)
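
Usage sketch for the new option (model name, URL, and port are hypothetical; it assumes a local Triton server hosting an embedding model with a BYTES "text" input and an "embeddings" output, which is what TritonBackend.embed expects):

```python
from haystack_integrations.components.embedders.nvidia import NvidiaTextEmbedder

# Hypothetical model name and endpoint; Triton backends take a bare host:port
# address (no scheme), and a local server typically needs no API key.
embedder = NvidiaTextEmbedder(
    model="my-triton-model",
    api_url="localhost:8000",  # use the gRPC port with backend="triton-grpc"
    api_key=None,
    backend="triton-http",
    timeout=30.0,  # if unset, falls back to NVIDIA_TIMEOUT or 60 seconds
)
embedder.warm_up()

result = embedder.run("A transformer is a deep learning architecture")
print(len(result["embedding"]))
```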