Skip to content

Commit 1169cb6

Browse files
frankbria and claude authored
feat: Integrate QualityTracker into WorkerAgent workflow (#151)
feat: Integrate QualityTracker into WorkerAgent workflow (#151)

- Add QualityTracker initialization to WorkerAgent with lazy loading
- Record quality metrics after quality gates run in complete_task()
- Check for quality degradation before allowing task completion
- Create blockers when quality degrades >10% from peak
- Add should_recommend_context_reset() method for proactive monitoring
- Integrate quality stats into checkpoint metadata
- Add 14 comprehensive integration tests

This enables continuous quality tracking across AI sessions, detecting degradation patterns and recommending context resets when quality drops significantly.

Co-authored-by: Claude <noreply@anthropic.com>
1 parent 3fa0e9c commit 1169cb6

File tree

4 files changed

+803
-1
lines changed

4 files changed

+803
-1
lines changed

codeframe/agents/worker_agent.py

Lines changed: 314 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
)
2222

2323
from codeframe.core.models import Task, AgentMaturity, ContextItemType, ContextTier, CallType
24+
from codeframe.enforcement.quality_tracker import QualityTracker, QualityMetrics
2425

2526
logger = logging.getLogger(__name__)
2627

@@ -80,6 +81,10 @@ def __init__(
8081
self._rate_limit = int(os.getenv("AGENT_RATE_LIMIT", "10")) # Max calls per minute
8182
self._rate_limit_lock = asyncio.Lock()
8283

84+
# Quality tracking integration
85+
self.response_count: int = 0 # Track AI conversation length
86+
self.quality_tracker: Optional[QualityTracker] = None # Lazy-initialized
87+
8388
def _get_project_id(self) -> int:
8489
"""Get project ID from current task.
8590
@@ -103,6 +108,51 @@ def _get_project_id(self) -> int:
103108

104109
return self.current_task.project_id
105110

111+
def _ensure_quality_tracker(self) -> Optional[QualityTracker]:
    """Lazily initialize the quality tracker when project context is available.

    A tracker is created when:
    1. A database connection is available (``self.db``)
    2. A current task with a resolvable project_id is set
    3. The project has a ``workspace_path`` stored in the database

    Fix over the previous version: the cached tracker is now keyed to the
    project it was built for. If the agent is later reassigned to a task in
    a different project, the stale tracker (pointing at the old workspace
    path) is discarded and a fresh one is built instead of being reused.

    Returns:
        QualityTracker instance if initialization succeeds, None otherwise.

    Example:
        >>> agent = WorkerAgent(agent_id="backend-001", ...)
        >>> agent.current_task = task  # Task with project_id
        >>> tracker = agent._ensure_quality_tracker()
        >>> if tracker:
        ...     tracker.record(metrics)
    """
    if not self.db or not self.current_task:
        # Without a DB or a task we cannot (re)resolve a workspace.
        # Preserve prior behavior: hand back whatever is cached (may be None).
        return self.quality_tracker

    try:
        project_id = self._get_project_id()

        # Reuse the cached tracker only if it was built for this project.
        if (
            self.quality_tracker is not None
            and getattr(self, "_quality_tracker_project_id", None) == project_id
        ):
            return self.quality_tracker

        # Look up the project's workspace path from the database.
        cursor = self.db.conn.cursor()
        cursor.execute("SELECT workspace_path FROM projects WHERE id = ?", (project_id,))
        row = cursor.fetchone()

        if not row or not row[0]:
            logger.debug(f"No workspace_path found for project {project_id}")
            return None

        workspace_path = row[0]
        self.quality_tracker = QualityTracker(project_path=workspace_path)
        self._quality_tracker_project_id = project_id
        logger.debug(f"Initialized quality tracker for project {project_id}")
        return self.quality_tracker

    except Exception as e:
        # Quality tracking is best-effort; never let setup break the agent.
        logger.warning(f"Failed to initialize quality tracker: {e}")
        return None
155+
106156
def _estimate_cost(self, model_name: str, input_tokens: int, max_output_tokens: int) -> float:
107157
"""Estimate maximum cost for an LLM call.
108158
@@ -429,6 +479,13 @@ async def execute_task(
429479
output_tokens=output_tokens,
430480
)
431481

482+
# Increment response count for quality tracking
483+
self.response_count += 1
484+
logger.debug(
485+
f"Agent {self.agent_id} response count: {self.response_count}",
486+
extra={"event": "response_count_increment", "count": self.response_count}
487+
)
488+
432489
return {
433490
"status": "completed",
434491
"output": content,
@@ -882,7 +939,30 @@ async def complete_task(self, task: Task, project_root: Optional[Any] = None) ->
882939

883940
quality_result = await quality_gates.run_all_gates(task)
884941

885-
# Step 2: Check if gates passed
942+
# Step 2: Record quality metrics for tracking
943+
await self._record_quality_metrics(quality_result, project_root)
944+
945+
# Step 3: Check for quality degradation
946+
degradation_result = self._check_quality_degradation()
947+
if degradation_result and degradation_result.get("has_degradation"):
948+
# Quality has degraded significantly - create blocker
949+
logger.warning(
950+
f"Task {task.id} blocked due to quality degradation: {degradation_result.get('issues')}"
951+
)
952+
953+
# Create blocker for degradation
954+
blocker_id = self._create_degradation_blocker(task, degradation_result)
955+
956+
return {
957+
"success": False,
958+
"status": "blocked",
959+
"quality_gate_result": quality_result,
960+
"blocker_id": blocker_id,
961+
"message": f"Quality degradation detected. {degradation_result.get('recommendation', 'Consider context reset.')}",
962+
"degradation": degradation_result,
963+
}
964+
965+
# Step 4: Check if gates passed
886966
if quality_result.passed:
887967
# All gates passed - mark task as completed
888968
cursor = self.db.conn.cursor()
@@ -994,3 +1074,236 @@ def _create_quality_blocker(self, task: Task, failures: List[Any]) -> int:
9941074
)
9951075

9961076
return blocker_id
1077+
1078+
# ========================================================================
1079+
# Quality Tracking Integration Methods
1080+
# ========================================================================
1081+
1082+
async def _record_quality_metrics(
    self,
    quality_result: Any,
    project_root: Any,
) -> None:
    """Record quality metrics after quality gates run.

    Extracts test and coverage metrics from the quality gate result and
    records them through the QualityTracker for trend analysis and
    degradation detection. Recording failures are logged and swallowed so
    task completion is never blocked by metrics bookkeeping.

    Bug fix: ``import re`` was previously inside the ``"tests"`` branch of
    the failure loop, so a coverage-only failure hit ``re.search`` with
    ``re`` unbound (NameError, silently swallowed by the outer except —
    metrics were lost). The import is now hoisted to the method top.

    Args:
        quality_result: QualityGateResult from quality gates.
        project_root: Project root path used for language detection.

    Example:
        >>> await agent._record_quality_metrics(quality_result, Path("/app"))
    """
    import re

    from codeframe.enforcement.language_detector import LanguageDetector

    tracker = self._ensure_quality_tracker()
    if not tracker:
        logger.debug("Quality tracker not available, skipping metrics recording")
        return

    try:
        # Defaults assume a clean run; overridden below when failures carry
        # concrete test/coverage numbers.
        test_pass_rate = 100.0
        coverage_percentage = 0.0
        total_tests = 0
        passed_tests = 0
        failed_tests = 0

        # Parse failures to extract test and coverage metrics.
        for failure in quality_result.failures:
            gate_name = getattr(failure, "gate", None)
            if not gate_name:
                continue
            gate_value = gate_name.value if hasattr(gate_name, "value") else str(gate_name)

            if gate_value == "tests":
                # Parse patterns like "3 tests failed" or "Pytest failed: 5 failed"
                reason = getattr(failure, "reason", "")
                failed_match = re.search(r"(\d+)\s*failed", reason)
                if failed_match:
                    failed_tests = int(failed_match.group(1))

                passed_match = re.search(r"(\d+)\s*passed", reason)
                if passed_match:
                    passed_tests = int(passed_match.group(1))

                total_tests = passed_tests + failed_tests
                if total_tests > 0:
                    test_pass_rate = (passed_tests / total_tests) * 100

            elif gate_value == "coverage":
                # Extract a coverage percentage such as "72.5%" from the reason.
                reason = getattr(failure, "reason", "")
                coverage_match = re.search(r"(\d+(?:\.\d+)?)\s*%", reason)
                if coverage_match:
                    coverage_percentage = float(coverage_match.group(1))

        # If no failures, assume perfect scores.
        if not quality_result.failures:
            test_pass_rate = 100.0
            # Try to get coverage from other sources if available
            coverage_percentage = 100.0  # Assume passed coverage check

        # Detect language/framework for context (best effort).
        language = "unknown"
        framework = None
        try:
            detector = LanguageDetector(str(project_root) if project_root else ".")
            lang_info = detector.detect()
            language = lang_info.language
            framework = lang_info.framework
        except Exception as e:
            logger.debug(f"Language detection failed: {e}")

        # Create and record metrics.
        metrics = QualityMetrics(
            timestamp=datetime.now(timezone.utc).isoformat(),
            response_count=self.response_count,
            test_pass_rate=test_pass_rate,
            coverage_percentage=coverage_percentage,
            total_tests=total_tests,
            passed_tests=passed_tests,
            failed_tests=failed_tests,
            language=language,
            framework=framework,
        )

        tracker.record(metrics)
        logger.info(
            f"Quality metrics recorded: pass_rate={test_pass_rate:.1f}%, "
            f"coverage={coverage_percentage:.1f}%, response_count={self.response_count}"
        )

    except Exception as e:
        # Don't fail task completion if metrics recording fails
        logger.warning(f"Failed to record quality metrics: {e}")
1183+
1184+
def _check_quality_degradation(self, threshold_percent: float = 10.0) -> Optional[Dict[str, Any]]:
    """Check whether tracked quality has degraded beyond a threshold.

    Args:
        threshold_percent: Degradation threshold (default: 10%).

    Returns:
        Degradation info dict from the tracker when available, or None when
        no tracker could be initialized or the check itself raised.

    Example:
        >>> result = agent._check_quality_degradation()
        >>> if result and result.get("has_degradation"):
        ...     print("Quality degraded!")
    """
    tracker = self._ensure_quality_tracker()
    if tracker is None:
        return None

    try:
        result = tracker.check_degradation(threshold_percent=threshold_percent)
    except Exception as e:
        logger.warning(f"Failed to check quality degradation: {e}")
        return None
    return result
1207+
1208+
def _create_degradation_blocker(self, task: Task, degradation: Dict[str, Any]) -> int:
    """Create a SYNC blocker describing a quality-degradation event.

    Args:
        task: Task that triggered the degradation check.
        degradation: Degradation result from QualityTracker.

    Returns:
        int: ID of the created blocker.

    Example:
        >>> blocker_id = agent._create_degradation_blocker(task, degradation)
    """
    from codeframe.core.models import BlockerType

    # Header of the blocker question.
    lines = [
        f"Quality degradation detected for task #{task.task_number} ({task.title}):",
        "",
        "Issues found:",
    ]

    # At most five issues, numbered from 1.
    lines += [
        f" {idx}. {issue}"
        for idx, issue in enumerate(degradation.get("issues", [])[:5], start=1)
    ]

    lines += [
        "",
        f"Recommendation: {degradation.get('recommendation', 'Consider context reset.')}",
        "",
        "Options:",
        " 1. Reset context and continue with fresh conversation",
        " 2. Investigate and fix quality issues",
        " 3. Type 'continue' to proceed anyway (not recommended)",
    ]

    question = "\n".join(lines)

    # Prefer the task's own project id; fall back to the agent's current task.
    project_id = task.project_id if task.project_id else self._get_project_id()

    blocker_id = self.db.create_blocker(
        agent_id=self.agent_id,
        project_id=project_id,
        task_id=task.id,
        blocker_type=BlockerType.SYNC,
        question=question,
    )

    logger.info(f"Created degradation blocker {blocker_id} for task {task.id}")
    return blocker_id
1260+
1261+
async def should_recommend_context_reset(self, max_responses: int = 20) -> Dict[str, Any]:
    """Decide whether the conversation context should be reset.

    Combines the agent's response count with quality-degradation signals
    from the tracker when one is available. Call before starting new tasks
    or periodically during long-running sessions.

    Args:
        max_responses: Maximum responses before reset is recommended (default: 20).

    Returns:
        Dict with:
            - should_reset: bool
            - reasons: List[str]
            - recommendation: str

    Example:
        >>> result = await agent.should_recommend_context_reset()
        >>> if result["should_reset"]:
        ...     print(f"Reset recommended: {result['reasons']}")
    """
    tracker = self._ensure_quality_tracker()

    if tracker is None:
        # No tracker available: fall back to a pure response-count heuristic.
        if self.response_count >= max_responses:
            return {
                "should_reset": True,
                "reasons": [f"Response count ({self.response_count}) exceeds maximum ({max_responses})"],
                "recommendation": "Context reset recommended due to conversation length",
            }
        return {
            "should_reset": False,
            "reasons": [],
            "recommendation": "Context can continue",
        }

    try:
        verdict = tracker.should_reset_context(
            response_count=self.response_count,
            max_responses=max_responses,
            check_degradation=True,
        )
    except Exception as e:
        logger.warning(f"Failed to check context reset recommendation: {e}")
        return {
            "should_reset": False,
            "reasons": [],
            "recommendation": f"Check failed: {e}",
        }
    return verdict

codeframe/core/models.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -934,6 +934,9 @@ class CheckpointMetadata(BaseModel):
934934
last_task_completed: Optional[str] = None
935935
context_items_count: int
936936
total_cost_usd: float
937+
# Quality tracking fields (optional for backward compatibility)
938+
quality_stats: Optional[Dict[str, Any]] = None # Current quality metrics from QualityTracker
939+
quality_trend: Optional[str] = None # "improving", "stable", or "declining"
937940

938941

939942
class Checkpoint(BaseModel):

codeframe/lib/checkpoint_manager.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,26 @@ def _generate_metadata(self) -> CheckpointMetadata:
422422
except Exception:
423423
total_cost_usd = 0.0
424424

425+
# Get quality stats from QualityTracker
426+
quality_stats = None
427+
quality_trend = None
428+
try:
429+
from codeframe.enforcement.quality_tracker import QualityTracker
430+
431+
tracker = QualityTracker(project_path=str(self.project_root))
432+
stats = tracker.get_stats()
433+
434+
if stats.get("has_data"):
435+
quality_stats = {
436+
"current": stats.get("current"),
437+
"peak": stats.get("peak"),
438+
"average": stats.get("average"),
439+
"total_checkpoints": stats.get("total_checkpoints"),
440+
}
441+
quality_trend = stats.get("trend", "insufficient_data")
442+
except Exception as e:
443+
logger.debug(f"Failed to get quality stats for checkpoint: {e}")
444+
425445
return CheckpointMetadata(
426446
project_id=self.project_id,
427447
phase=phase,
@@ -431,6 +451,8 @@ def _generate_metadata(self) -> CheckpointMetadata:
431451
last_task_completed=last_task_completed,
432452
context_items_count=context_items_count,
433453
total_cost_usd=total_cost_usd,
454+
quality_stats=quality_stats,
455+
quality_trend=quality_trend,
434456
)
435457

436458
def _validate_path_safety(self, file_path: Path) -> bool:

0 commit comments

Comments
 (0)