Skip to content

Commit

Permalink
fix: CachingRegistry threading cache refresh (#179)
Browse files Browse the repository at this point in the history
* add logging to CachingRegistry

* chore: formatting

* fix: add more logs to find what line is bringing down thread

* fix: delete added coverage files

* fix: switch to time.sleep in refresh thread

* fix: remove verbose logging and add exception handling
  • Loading branch information
zabarn authored Feb 28, 2025
1 parent a6881dc commit 38689ff
Showing 1 changed file with 26 additions and 6 deletions.
32 changes: 26 additions & 6 deletions sdk/python/feast/infra/registry/caching_registry.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import atexit
import logging
import threading
import time
import warnings
from abc import abstractmethod
from datetime import timedelta
Expand Down Expand Up @@ -37,6 +38,8 @@ def __init__(self, project: str, cache_ttl_seconds: int, cache_mode: str):
)
self.cached_registry_proto = self.proto()
self.cached_registry_proto_created = _utc_now()
self._stop_event = threading.Event()
logger.info(f"Registry initialized with cache mode: {cache_mode}")
if cache_mode == "thread":
self._start_thread_async_refresh(cache_ttl_seconds)
atexit.register(self._exit_handler)
Expand Down Expand Up @@ -458,12 +461,29 @@ def _refresh_cached_registry_if_necessary(self):
def _start_thread_async_refresh(self, cache_ttl_seconds):
self.refresh()
if cache_ttl_seconds <= 0:
logger.info("Registry cache refresh thread not started as TTL is 0")
return
self.registry_refresh_thread = threading.Timer(
cache_ttl_seconds, self._start_thread_async_refresh, [cache_ttl_seconds]
)
self.registry_refresh_thread.daemon = True
self.registry_refresh_thread.start()

def refresh_loop():
while not self._stop_event.is_set():
try:
time.sleep(cache_ttl_seconds)
if not self._stop_event.is_set():
self.refresh()
except Exception as e:
logger.exception("Exception in refresh_loop: %s", e)

try:
self.registry_refresh_thread = threading.Thread(
target=refresh_loop, daemon=True
)
self.registry_refresh_thread.start()
logger.info(
f"Registry cache refresh thread started with TTL {cache_ttl_seconds}"
)
except Exception as e:
logger.exception("Failed to start registry refresh thread: %s", e)

def _exit_handler(self):
self.registry_refresh_thread.cancel()
logger.info("Exiting, setting stop event for registry cache refresh thread")
self._stop_event.set()

0 comments on commit 38689ff

Please sign in to comment.