|
8 | 8 |
|
9 | 9 | """KernelCI API main module"""
|
10 | 10 |
|
| 11 | +from datetime import timedelta |
11 | 12 | from typing import List
|
12 | 13 | from fastapi import Depends, FastAPI, HTTPException, status, Request, Security
|
| 14 | +from fastapi.encoders import jsonable_encoder |
13 | 15 | from fastapi.security import (
|
14 | 16 | OAuth2PasswordRequestForm,
|
15 | 17 | SecurityScopes
|
@@ -186,6 +188,54 @@ async def get_root_node(node_id: str):
|
186 | 188 | return node
|
187 | 189 |
|
188 | 190 |
|
| 191 | +@app.get('/trigger_completed_event/{node_id}') |
| 192 | +async def trigger_completed_event(node_id: str, wait_time_hours: int = 0, |
| 193 | + wait_time_minutes: int = 0, |
| 194 | + wait_time_seconds: int = 0): |
| 195 | + """Trigger an event when all child nodes are completed of |
| 196 | + a given node""" |
| 197 | + try: |
| 198 | + nodes = await db.find_by_attributes(Node, |
| 199 | + {"parent": ObjectId(node_id)}) |
| 200 | + |
| 201 | + except errors.InvalidId as error: |
| 202 | + raise HTTPException( |
| 203 | + status_code=status.HTTP_400_BAD_REQUEST, |
| 204 | + detail=str(error) |
| 205 | + ) from error |
| 206 | + |
| 207 | + if not nodes: |
| 208 | + raise HTTPException( |
| 209 | + status_code=status.HTTP_400_BAD_REQUEST, |
| 210 | + detail=f"No child node found of a given node id:{node_id}" |
| 211 | + ) |
| 212 | + |
| 213 | + for node in nodes: |
| 214 | + if node.status == 'pending': |
| 215 | + timeout = node.created + timedelta(hours=wait_time_hours, |
| 216 | + minutes=wait_time_minutes, |
| 217 | + seconds=wait_time_seconds) |
| 218 | + await Node.wait_for_node(timeout) |
| 219 | + |
| 220 | + ret = node.set_timeout_status() |
| 221 | + if ret: |
| 222 | + try: |
| 223 | + await db.update(node) |
| 224 | + except ValueError as error: |
| 225 | + raise HTTPException( |
| 226 | + status_code=status.HTTP_400_BAD_REQUEST, |
| 227 | + detail=str(error) |
| 228 | + ) from error |
| 229 | + else: |
| 230 | + return {"message": "Nodes are not completed yet"} |
| 231 | + |
| 232 | + operation = 'completed' |
| 233 | + await pubsub.publish_cloudevent('node', {'op': operation, |
| 234 | + 'id': str(node_id), |
| 235 | + 'nodes': jsonable_encoder(nodes)}) |
| 236 | + return {"message": "'Event triggered"} |
| 237 | + |
| 238 | + |
189 | 239 | @app.post('/node', response_model=Node)
|
190 | 240 | async def post_node(node: Node, token: str = Depends(get_user)):
|
191 | 241 | """Create a new node"""
|
|
0 commit comments