-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Open
Description
Using DEFAULT_DATABASE="example", running the following code fails:
import json
import os
from datetime import datetime, timezone
from dotenv import load_dotenv
from graphiti_core import Graphiti
from graphiti_core.cross_encoder.openai_reranker_client import OpenAIRerankerClient
from graphiti_core.embedder.openai import OpenAIEmbedder, OpenAIEmbedderConfig
from graphiti_core.llm_client import OpenAIClient
from graphiti_core.nodes import EpisodeType
from openai import AsyncAzureOpenAI
# Azure OpenAI configuration
api_key = os.getenv("AZURE_KEY")
api_version = os.getenv("AZURE_VERSION")
azure_endpoint = os.getenv("AZURE_URL")
# Create Azure OpenAI client for LLM
azure_openai_client = AsyncAzureOpenAI(api_key=api_key, api_version=api_version, azure_endpoint=azure_endpoint)
# Initialize Graphiti with Azure OpenAI clients
graphiti = Graphiti(
os.getenv("NEO4J_URL"),
os.getenv("NEO4J_USER"),
os.getenv("NEO4J_PASSWORD"),
llm_client=OpenAIClient(client=azure_openai_client),
embedder=OpenAIEmbedder(
config=OpenAIEmbedderConfig(
embedding_model="text-embedding-3-small" # Use your Azure deployed embedding model name
),
client=azure_openai_client,
),
# Optional: Configure the OpenAI cross encoder with Azure OpenAI
cross_encoder=OpenAIRerankerClient(client=azure_openai_client),
)
# Initialize Graphiti with Neo4j connection
try:
# Initialize the graph database with graphiti's indices. This only needs to be done once.
await graphiti.build_indices_and_constraints()
# Additional code will go here
# Episodes list containing both text and JSON episodes
episodes = [
{
"content": "Kamala Harris is the Attorney General of California. She was previously "
"the district attorney for San Francisco.",
"type": EpisodeType.text,
"description": "podcast transcript",
},
{
"content": "As AG, Harris was in office from January 3, 2011 – January 3, 2017",
"type": EpisodeType.text,
"description": "podcast transcript",
},
{
"content": {
"name": "Gavin Newsom",
"position": "Governor",
"state": "California",
"previous_role": "Lieutenant Governor",
"previous_location": "San Francisco",
},
"type": EpisodeType.json,
"description": "podcast metadata",
},
{
"content": {
"name": "Gavin Newsom",
"position": "Governor",
"term_start": "January 7, 2019",
"term_end": "Present",
},
"type": EpisodeType.json,
"description": "podcast metadata",
},
]
# Add episodes to the graph
for i, episode in enumerate(episodes):
await graphiti.add_episode(
name=f"Freakonomics Radio {i}",
episode_body=episode["content"] if isinstance(episode["content"], str) else json.dumps(episode["content"]),
source=episode["type"],
source_description=episode["description"],
reference_time=datetime.now(timezone.utc),
)
print(f"Added episode: Freakonomics Radio {i} ({episode['type'].value})")
finally:
# Close the connection
await graphiti.close()
print("\nConnection closed")
Error output:
---------------------------------------------------------------------------
ClientError Traceback (most recent call last)
Cell In[4], line 45
43 # Add episodes to the graph
44 for i, episode in enumerate(episodes):
---> 45 await graphiti.add_episode(
46 name=f"Freakonomics Radio {i}",
47 episode_body=episode["content"] if isinstance(episode["content"], str) else json.dumps(episode["content"]),
48 source=episode["type"],
49 source_description=episode["description"],
50 reference_time=datetime.now(timezone.utc),
51 )
52 print(f"Added episode: Freakonomics Radio {i} ({episode['type'].value})")
54 finally:
55 # Close the connection
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/graphiti_core/graphiti.py:499, in Graphiti.add_episode(self, name, episode_body, source_description, reference_time, source, group_id, uuid, update_communities, entity_types, previous_episode_uuids)
496 return AddEpisodeResults(episode=episode, nodes=nodes, edges=entity_edges)
498 except Exception as e:
--> 499 raise e
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/graphiti_core/graphiti.py:361, in Graphiti.add_episode(self, name, episode_body, source_description, reference_time, source, group_id, uuid, update_communities, entity_types, previous_episode_uuids)
355 await semaphore_gather(
356 *[node.generate_name_embedding(self.embedder) for node in extracted_nodes]
357 )
359 # Find relevant nodes already in the graph
360 existing_nodes_lists: list[list[EntityNode]] = list(
--> 361 await semaphore_gather(
362 *[
363 get_relevant_nodes(self.driver, SearchFilters(), [node])
364 for node in extracted_nodes
365 ]
366 )
367 )
369 # Resolve extracted nodes with nodes already in the graph and extract facts
370 logger.debug(f'Extracted nodes: {[(n.name, n.uuid) for n in extracted_nodes]}')
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/graphiti_core/helpers.py:96, in semaphore_gather(max_coroutines, *coroutines)
93 async with semaphore:
94 return await coroutine
---> 96 return await asyncio.gather(*(_wrap_coroutine(coroutine) for coroutine in coroutines))
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/graphiti_core/helpers.py:94, in semaphore_gather.<locals>._wrap_coroutine(coroutine)
92 async def _wrap_coroutine(coroutine):
93 async with semaphore:
---> 94 return await coroutine
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/graphiti_core/search/search_utils.py:668, in get_relevant_nodes(driver, search_filter, nodes)
638 async def get_relevant_nodes(
639 driver: AsyncDriver,
640 search_filter: SearchFilters,
641 nodes: list[EntityNode],
642 ) -> list[EntityNode]:
643 """
644 Retrieve relevant nodes based on the provided list of EntityNodes.
645
(...) 666 to use as search criteria.
667 """
--> 668 relevant_nodes = await hybrid_node_search(
669 [node.name for node in nodes],
670 [node.name_embedding for node in nodes if node.name_embedding is not None],
671 driver,
672 search_filter,
673 [node.group_id for node in nodes],
674 )
676 return relevant_nodes
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/graphiti_core/search/search_utils.py:612, in hybrid_node_search(queries, embeddings, driver, search_filter, group_ids, limit)
572 """
573 Perform a hybrid search for nodes using both text queries and embeddings.
574
(...) 607 limit (defined in the individual search functions) will be used.
608 """
610 start = time()
611 results: list[list[EntityNode]] = list(
--> 612 await semaphore_gather(
613 *[
614 node_fulltext_search(driver, q, search_filter, group_ids, 2 * limit)
615 for q in queries
616 ],
617 *[
618 node_similarity_search(driver, e, search_filter, group_ids, 2 * limit)
619 for e in embeddings
620 ],
621 )
622 )
624 node_uuid_map: dict[str, EntityNode] = {
625 node.uuid: node for result in results for node in result
626 }
627 result_uuids = [[node.uuid for node in result] for result in results]
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/graphiti_core/helpers.py:96, in semaphore_gather(max_coroutines, *coroutines)
93 async with semaphore:
94 return await coroutine
---> 96 return await asyncio.gather(*(_wrap_coroutine(coroutine) for coroutine in coroutines))
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/graphiti_core/helpers.py:94, in semaphore_gather.<locals>._wrap_coroutine(coroutine)
92 async def _wrap_coroutine(coroutine):
93 async with semaphore:
---> 94 return await coroutine
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/graphiti_core/search/search_utils.py:343, in node_fulltext_search(driver, query, search_filter, group_ids, limit)
339 return []
341 filter_query, filter_params = node_search_filter_query_constructor(search_filter)
--> 343 records, _, _ = await driver.execute_query(
344 """
345 CALL db.index.fulltext.queryNodes("node_name_and_summary", $query, {limit: $limit})
346 YIELD node AS node, score
347 MATCH (n:Entity)
348 WHERE n.uuid = node.uuid
349 """
350 + filter_query
351 + """
352 RETURN
353 n.uuid AS uuid,
354 n.group_id AS group_id,
355 n.name AS name,
356 n.name_embedding AS name_embedding,
357 n.created_at AS created_at,
358 n.summary AS summary,
359 labels(n) AS labels,
360 properties(n) AS attributes
361 ORDER BY score DESC
362 LIMIT $limit
363 """,
364 filter_params,
365 query=fuzzy_query,
366 group_ids=group_ids,
367 limit=limit,
368 database_=DEFAULT_DATABASE,
369 routing_='r',
370 )
371 nodes = [get_entity_node_from_record(record) for record in records]
373 return nodes
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/driver.py:971, in AsyncDriver.execute_query(self, query_, parameters_, routing_, database_, impersonated_user_, bookmark_manager_, auth_, result_transformer_, **kwargs)
967 raise ValueError(
968 f"Invalid routing control value: {routing_!r}"
969 )
970 with session._pipelined_begin:
--> 971 return await session._run_transaction(
972 access_mode,
973 TelemetryAPI.DRIVER,
974 work,
975 (query_str, parameters, result_transformer_),
976 {},
977 )
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/work/session.py:583, in AsyncSession._run_transaction(self, access_mode, api, transaction_function, args, kwargs)
581 tx = self._transaction
582 try:
--> 583 result = await transaction_function(tx, *args, **kwargs)
584 except asyncio.CancelledError:
585 # if cancellation callback has not been called yet:
586 if self._transaction is not None:
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/driver.py:1308, in _work(tx, query, parameters, transformer)
1301 async def _work(
1302 tx: AsyncManagedTransaction,
1303 query: te.LiteralString,
1304 parameters: dict[str, t.Any],
1305 transformer: t.Callable[[AsyncResult], t.Awaitable[_T]],
1306 ) -> _T:
1307 res = await tx.run(query, parameters)
-> 1308 return await transformer(res)
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/work/result.py:802, in AsyncResult.to_eager_result(self)
784 @AsyncNonConcurrentMethodChecker._non_concurrent_method
785 async def to_eager_result(self) -> EagerResult:
786 """
787 Convert this result to an :class:`.EagerResult`.
788
(...) 800 .. versionchanged:: 5.8 Stabilized from experimental.
801 """
--> 802 await self._buffer_all()
803 return EagerResult(
804 keys=list(self.keys()),
805 records=await AsyncUtil.list(self),
806 summary=await self.consume(),
807 )
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/work/result.py:459, in AsyncResult._buffer_all(self)
458 async def _buffer_all(self):
--> 459 await self._buffer()
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/work/result.py:448, in AsyncResult._buffer(self, n)
446 return
447 record_buffer = deque()
--> 448 async for record in self:
449 record_buffer.append(record)
450 if n is not None and len(record_buffer) >= n:
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/work/result.py:398, in AsyncResult.__aiter__(self)
396 yield self._record_buffer.popleft()
397 elif self._streaming:
--> 398 await self._connection.fetch_message()
399 elif self._discarding:
400 self._discard()
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/io/_common.py:195, in ConnectionErrorHandler.__getattr__.<locals>.outer_async.<locals>.inner(*args, **kwargs)
193 async def inner(*args, **kwargs):
194 try:
--> 195 await coroutine_func(*args, **kwargs)
196 except (
197 Neo4jError,
198 ServiceUnavailable,
199 SessionExpired,
200 asyncio.CancelledError,
201 ) as exc:
202 await AsyncUtil.callback(self.__on_error, exc)
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/io/_bolt.py:864, in AsyncBolt.fetch_message(self)
860 # Receive exactly one message
861 tag, fields = await self.inbox.pop(
862 hydration_hooks=self.responses[0].hydration_hooks
863 )
--> 864 res = await self._process_message(tag, fields)
865 self.idle_since = monotonic()
866 return res
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/io/_bolt5.py:500, in AsyncBolt5x0._process_message(self, tag, fields)
498 self._server_state_manager.state = self.bolt_states.FAILED
499 try:
--> 500 await response.on_failure(summary_metadata or {})
501 except (ServiceUnavailable, DatabaseUnavailable):
502 if self.pool:
File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/io/_common.py:254, in Response.on_failure(self, metadata)
252 handler = self.handlers.get("on_summary")
253 await AsyncUtil.callback(handler)
--> 254 raise self._hydrate_error(metadata)
ClientError: {code: Neo.ClientError.Procedure.ProcedureCallFailed} {message: Failed to invoke procedure `db.index.fulltext.queryNodes`: Caused by: java.lang.IllegalArgumentException: There is no such fulltext schema index: node_name_and_summary}
Library versions:
graphiti-core 0.9.6
Python 3.11.11
Metadata
Metadata
Assignees
Labels
No labels