Skip to content

Commit bc1d7fb

Browse files
Track times for each model independently (#25)
Co-authored-by: Eric <[email protected]>
1 parent 05bc87c commit bc1d7fb

File tree

8 files changed

+328
-210
lines changed

8 files changed

+328
-210
lines changed

neurons/_validator/core/validator_loop.py

Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
from _validator.models.request_type import RequestType
3131
from _validator.utils.proof_of_weights import save_proof_of_weights
3232
from _validator.utils.uid import get_queryable_uids
33+
from utils import AutoUpdate, clean_temp_files, with_rate_limit
34+
from utils.gc_logging import log_responses as gc_log_responses
35+
from _validator.utils.logging import log_responses as console_log_responses
3336
from constants import (
3437
LOOP_DELAY_SECONDS,
3538
EXCEPTION_DELAY_SECONDS,
@@ -38,7 +41,6 @@
3841
FIVE_MINUTES,
3942
ONE_HOUR,
4043
)
41-
from utils import AutoUpdate, clean_temp_files, with_rate_limit
4244

4345

4446
class ValidatorLoop:
@@ -79,10 +81,10 @@ def __init__(self, config: ValidatorConfig):
7981
)
8082

8183
self.request_queue = asyncio.Queue()
82-
self.response_queue = asyncio.Queue()
8384
self.active_tasks: dict[int, asyncio.Task] = {}
8485
self.processed_uids: set[int] = set()
8586
self.queryable_uids: list[int] = []
87+
self.last_response_time = time.time()
8688

8789
self._should_run = True
8890

@@ -91,6 +93,8 @@ def __init__(self, config: ValidatorConfig):
9193
max_workers=16
9294
)
9395

96+
self.recent_responses: list[MinerResponse] = []
97+
9498
if self.config.bt_config.prometheus_monitoring:
9599
start_prometheus_logging(self.config.bt_config.prometheus_port)
96100

@@ -127,14 +131,11 @@ def log_health(self):
127131
bt.logging.info(
128132
f"In-flight requests: {len(self.active_tasks)} / {MAX_CONCURRENT_REQUESTS}"
129133
)
130-
bt.logging.info(
131-
f"Response queue size: {self.response_queue.qsize()} tasks pending processing"
132-
)
133134
bt.logging.debug(f"Processed UIDs: {len(self.processed_uids)}")
134135
bt.logging.debug(f"Queryable UIDs: {len(self.queryable_uids)}")
135136

136137
log_system_metrics()
137-
queue_size = self.response_queue.qsize()
138+
queue_size = self.request_queue.qsize()
138139
est_latency = (
139140
queue_size * (LOOP_DELAY_SECONDS / MAX_CONCURRENT_REQUESTS)
140141
if queue_size > 0
@@ -146,6 +147,39 @@ def update_processed_uids(self):
146147
if len(self.processed_uids) >= len(self.queryable_uids):
147148
self.processed_uids.clear()
148149

150+
@with_rate_limit(period=ONE_MINUTE)
151+
async def log_responses(self):
152+
if self.recent_responses:
153+
console_log_responses(self.recent_responses)
154+
155+
try:
156+
block = (
157+
self.config.metagraph.block.item()
158+
if self.config.metagraph.block is not None
159+
else 0
160+
)
161+
_ = await asyncio.get_event_loop().run_in_executor(
162+
self.thread_pool,
163+
lambda: gc_log_responses(
164+
self.config.metagraph,
165+
self.config.wallet.hotkey,
166+
self.config.user_uid,
167+
self.recent_responses,
168+
(
169+
time.time() - self.last_response_time
170+
if hasattr(self, "last_response_time")
171+
else 0
172+
),
173+
block,
174+
self.score_manager.scores,
175+
),
176+
)
177+
except Exception as e:
178+
bt.logging.error(f"Error in GC logging: {e}")
179+
180+
self.last_response_time = time.time()
181+
self.recent_responses = []
182+
149183
async def maintain_request_pool(self):
150184
while True:
151185
try:
@@ -195,9 +229,11 @@ async def run_periodic_tasks(self):
195229
self.update_queryable_uids()
196230
self.update_processed_uids()
197231
self.log_health()
232+
await self.log_responses()
198233
await asyncio.sleep(LOOP_DELAY_SECONDS)
199234
except Exception as e:
200235
bt.logging.error(f"Error in periodic tasks: {e}")
236+
traceback.print_exc()
201237
await asyncio.sleep(EXCEPTION_DELAY_SECONDS)
202238

203239
async def run(self) -> NoReturn:
@@ -212,7 +248,6 @@ async def run(self) -> NoReturn:
212248
await asyncio.gather(
213249
self.maintain_request_pool(),
214250
self.run_periodic_tasks(),
215-
# self.process_responses_worker(),
216251
)
217252
except KeyboardInterrupt:
218253
self._should_run = False
@@ -244,25 +279,6 @@ async def _process_single_request(self, request: Request) -> Request:
244279
log_error("request_processing", "axon_query", str(e))
245280
return request
246281

247-
async def process_responses_worker(self):
248-
while self._should_run:
249-
try:
250-
response = await self.response_queue.get()
251-
if asyncio.iscoroutine(response):
252-
response = await response
253-
processed_response = await asyncio.get_event_loop().run_in_executor(
254-
self.response_thread_pool,
255-
self.response_processor.process_single_response,
256-
response,
257-
)
258-
if processed_response:
259-
await self._handle_response(processed_response)
260-
self.response_queue.task_done()
261-
except Exception as e:
262-
bt.logging.error(f"Error in response worker: {e}")
263-
traceback.print_exc()
264-
await asyncio.sleep(EXCEPTION_DELAY_SECONDS)
265-
266282
async def _handle_response(self, response: MinerResponse) -> None:
267283
"""
268284
Handle a processed response, updating scores and weights as needed.
@@ -272,6 +288,7 @@ async def _handle_response(self, response: MinerResponse) -> None:
272288
"""
273289
try:
274290
request_hash = response.input_hash
291+
self.recent_responses.append(response)
275292
if response.request_type == RequestType.RWR:
276293
if response.verification_result:
277294
self.api.set_request_result(

neurons/_validator/models/miner_response.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ def to_log_dict(self, metagraph: bt.metagraph) -> dict: # type: ignore
175175
"response_duration": self.response_time,
176176
"is_verified": self.verification_result,
177177
"input_hash": self.input_hash,
178-
"request_type": self.request_type,
178+
"request_type": self.request_type.value,
179179
"error": self.error,
180180
"save": self.save,
181181
}

neurons/_validator/utils/logging.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from rich.console import Console, JustifyMethod
55
from rich.table import Table
66

7-
from deployment_layer import circuit_store
7+
from deployment_layer.circuit_store import circuit_store
88
import utils.wandb_logger as wandb_logger
99
from _validator.models.miner_response import MinerResponse
1010

neurons/execution_layer/circuit.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,15 +208,13 @@ def __init__(self, model_id: str, evaluation_store_path: str):
208208

209209
def update(self, item: CircuitEvaluationItem):
210210
"""Update evaluation data, maintaining size limit."""
211-
# Replace existing item with same UID if found
212211
for i, existing_item in enumerate(self.data):
213212
if existing_item.uid == item.uid:
214213
self.data[i] = item
215214
break
216215
else:
217216
self.data.append(item)
218217

219-
# Keep only most recent items
220218
if len(self.data) > MAX_EVALUATION_ITEMS:
221219
self.data = self.data[-MAX_EVALUATION_ITEMS:]
222220

0 commit comments

Comments
 (0)