Skip to content

Commit 5a48c90

Browse files
committed
Fix grpc setup timeout
1 parent 239824b commit 5a48c90

1 file changed

Lines changed: 40 additions & 48 deletions

File tree

sdks/python/apache_beam/ml/rag/test_utils.py

Lines changed: 40 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import contextlib
1919
import logging
2020
import os
21-
import socket
2221
import tempfile
2322
import unittest
2423
from dataclasses import dataclass
@@ -47,6 +46,10 @@
4746

4847
_LOGGER = logging.getLogger(__name__)
4948

49+
# Milvus standalone defaults (match testcontainers MilvusContainer).
50+
_MILVUS_SERVICE_PORT = 19530
51+
_MILVUS_METRICS_PORT = 9091
52+
5053

5154
@dataclass
5255
class VectorDBContainerInfo:
@@ -68,58 +71,25 @@ def uri(self) -> str:
6871
return f"http://{self.host}:{self.port}"
6972

7073

71-
class TestHelpers:
72-
@staticmethod
73-
def find_free_port():
74-
"""Find a free port on the local machine."""
75-
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
76-
# Bind to port 0, which asks OS to assign a free port.
77-
s.bind(('', 0))
78-
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
79-
# Return the port number assigned by OS.
80-
return s.getsockname()[1]
81-
82-
8374
class CustomMilvusContainer(MilvusContainer):
84-
"""Custom Milvus container with configurable ports and environment setup.
85-
86-
Extends MilvusContainer to provide custom port binding and environment
87-
configuration for testing with standalone Milvus instances.
88-
"""
75+
"""Milvus container with user.yaml volume for integration test configuration."""
8976

9077
def __init__( # pylint: disable=bad-super-call
9178
self,
9279
image: str,
93-
service_container_port,
94-
healthcheck_container_port,
9580
**kwargs,
9681
) -> None:
97-
# Skip the parent class's constructor and go straight to
98-
# GenericContainer.
99-
super(
100-
MilvusContainer,
101-
self,
102-
).__init__(
103-
image=image, **kwargs)
104-
self.port = service_container_port
105-
self.healthcheck_port = healthcheck_container_port
106-
self.with_exposed_ports(service_container_port, healthcheck_container_port)
107-
108-
# Get free host ports.
109-
service_host_port = TestHelpers.find_free_port()
110-
healthcheck_host_port = TestHelpers.find_free_port()
111-
112-
# Bind container and host ports.
113-
self.with_bind_ports(service_container_port, service_host_port)
114-
self.with_bind_ports(healthcheck_container_port, healthcheck_host_port)
82+
super(MilvusContainer, self).__init__(image=image, **kwargs)
83+
self.port = _MILVUS_SERVICE_PORT
84+
self.healthcheck_port = _MILVUS_METRICS_PORT
85+
self.with_exposed_ports(self.port, self.healthcheck_port)
11586
self.cmd = "milvus run standalone"
11687

117-
# Set environment variables needed for Milvus.
11888
envs = {
11989
"ETCD_USE_EMBED": "true",
12090
"ETCD_DATA_DIR": "/var/lib/milvus/etcd",
12191
"COMMON_STORAGETYPE": "local",
122-
"METRICS_PORT": str(healthcheck_container_port)
92+
"METRICS_PORT": str(_MILVUS_METRICS_PORT),
12393
}
12494
for env, value in envs.items():
12595
self.with_env(env, value)
@@ -137,36 +107,54 @@ class MilvusTestHelpers:
137107
# https://milvus.io/docs/release_notes.md or PyPI at
138108
# https://pypi.org/project/pymilvus/ for version compatibility.
139109
# Example: Milvus v2.6.0 requires pymilvus==2.6.0 (exact match required).
110+
@staticmethod
111+
def _wait_for_milvus_grpc(uri: str) -> None:
112+
"""Wait until Milvus gRPC proxy accepts connections.
113+
114+
MilvusContainer.start() only health-checks the metrics HTTP port; the gRPC
115+
proxy can become ready later. Use the same bounded retry budget as other
116+
Milvus client setup in this module (well under the pytest 600s limit).
117+
"""
118+
def list_collections_probe():
119+
client = MilvusClient(uri=uri)
120+
try:
121+
client.list_collections()
122+
finally:
123+
client.close()
124+
125+
retry_with_backoff(
126+
list_collections_probe,
127+
max_retries=5,
128+
retry_delay=2.0,
129+
operation_name="Milvus client connection after container start",
130+
exception_types=(MilvusException, ))
131+
140132
@staticmethod
141133
def start_db_container(
142134
image="milvusdb/milvus:v2.5.10",
143135
max_vec_fields=5,
144136
vector_client_max_retries=3,
145137
tc_max_retries=None) -> Optional[VectorDBContainerInfo]:
146-
service_container_port = TestHelpers.find_free_port()
147-
healthcheck_container_port = TestHelpers.find_free_port()
148138
user_yaml_creator = MilvusTestHelpers.create_user_yaml
149-
with user_yaml_creator(service_container_port, max_vec_fields) as cfg:
139+
with user_yaml_creator(_MILVUS_SERVICE_PORT, max_vec_fields) as cfg:
150140
info = None
151141
original_tc_max_tries = testcontainers_config.max_tries
152142
if tc_max_retries is not None:
153143
testcontainers_config.max_tries = tc_max_retries
154144
for i in range(vector_client_max_retries):
155145
vector_db_container: Optional[CustomMilvusContainer] = None
156146
try:
157-
vector_db_container = CustomMilvusContainer(
158-
image=image,
159-
service_container_port=service_container_port,
160-
healthcheck_container_port=healthcheck_container_port)
147+
vector_db_container = CustomMilvusContainer(image=image)
161148
mapped_container = vector_db_container.with_volume_mapping(
162149
cfg, "/milvus/configs/user.yaml")
163150
assert mapped_container is not None
164151
running_container: CustomMilvusContainer = mapped_container
165152
vector_db_container = running_container
166153
running_container.start()
167154
host = running_container.get_container_host_ip()
168-
port = running_container.get_exposed_port(service_container_port)
155+
port = running_container.get_exposed_port(_MILVUS_SERVICE_PORT)
169156
info = VectorDBContainerInfo(running_container, host, port)
157+
MilvusTestHelpers._wait_for_milvus_grpc(info.uri)
170158
_LOGGER.info(
171159
"milvus db container started successfully on %s.", info.uri)
172160
break
@@ -176,6 +164,10 @@ def start_db_container(
176164
raw_out, raw_err = vector_db_container.get_logs()
177165
stdout_logs = raw_out.decode("utf-8")
178166
stderr_logs = raw_err.decode("utf-8")
167+
try:
168+
vector_db_container.stop()
169+
except Exception: # pylint: disable=broad-except
170+
pass
179171
_LOGGER.warning(
180172
"Retry %d/%d: Failed to start Milvus DB container. Reason: %s. "
181173
"STDOUT logs:\n%s\nSTDERR logs:\n%s",

0 commit comments

Comments
 (0)