-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy path__init__.py
107 lines (78 loc) · 2.68 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import asyncio
import os
import time
import dotenv
from upstash_vector import AsyncIndex, Index
from upstash_vector.core.index_operations import DEFAULT_NAMESPACE
dotenv.load_dotenv()
NAMESPACES = [DEFAULT_NAMESPACE, "ns"]
INDEX_URL = os.environ["URL"]
INDEX_TOKEN = os.environ["TOKEN"]
SPARSE_INDEX_URL = os.environ["SPARSE_URL"]
SPARSE_INDEX_TOKEN = os.environ["SPARSE_TOKEN"]
HYBRID_INDEX_URL = os.environ["HYBRID_URL"]
HYBRID_INDEX_TOKEN = os.environ["HYBRID_TOKEN"]
EMBEDDING_INDEX_URL = os.environ["EMBEDDING_URL"]
EMBEDDING_INDEX_TOKEN = os.environ["EMBEDDING_TOKEN"]
HYBRID_EMBEDDING_INDEX_URL = os.environ["HYBRID_EMBEDDING_URL"]
HYBRID_EMBEDDING_INDEX_TOKEN = os.environ["HYBRID_EMBEDDING_TOKEN"]
def assert_eventually(assertion, retry_delay=0.5, timeout=5.0):
deadline = time.time() + timeout
last_err = None
while time.time() < deadline:
try:
assertion()
return
except AssertionError as e:
last_err = e
time.sleep(retry_delay)
if last_err is None:
raise AssertionError("Couldn't run the assertion")
raise last_err
async def assert_eventually_async(assertion, retry_delay=0.5, timeout=5.0):
deadline = time.time() + timeout
last_err = None
while time.time() < deadline:
try:
await assertion()
return
except AssertionError as e:
last_err = e
await asyncio.sleep(retry_delay)
if last_err is None:
raise AssertionError("Couldn't run the assertion")
raise last_err
def ensure_ns_exists(index: Index, ns: str):
"""
Ensures the given namespace exists in the index by upserting some
random vector into it, and calling reset.
No need to call this method, if you are upserting some vector/data.
"""
if ns == DEFAULT_NAMESPACE:
return
index.upsert(
vectors=[("0", [0.1, 0.1])],
namespace=ns,
)
def assertion():
info = index.info()
assert info.namespaces[ns].pending_vector_count == 0
assert_eventually(assertion)
index.reset(namespace=ns)
async def ensure_ns_exists_async(index: AsyncIndex, ns: str):
"""
Ensures the given namespace exists in the index by upserting some
random vector into it, and calling reset.
No need to call this method, if you are upserting some vector/data.
"""
if ns == DEFAULT_NAMESPACE:
return
await index.upsert(
vectors=[("0", [0.1, 0.1])],
namespace=ns,
)
async def assertion():
info = await index.info()
assert info.namespaces[ns].pending_vector_count == 0
await assert_eventually_async(assertion)
await index.reset(namespace=ns)