-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscript_fapi.py
71 lines (56 loc) · 2.18 KB
/
script_fapi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from fastapi import FastAPI
from pydantic import BaseModel
from threading import Thread
import uvicorn
import requests
import time
# Define the FastAPI app
app = FastAPI()
# Internal storage dictionary
internal_dict = {}
# Endpoint for processing complex JSON
@app.post("/process/")
async def process_item(data: dict):
global internal_dict
internal_dict = {} # Reset internal dictionary
element_count = 0
try:
# Nested loops to process the JSON
for level1_key, level1_value in data.items():
for level2_key, level2_value in level1_value.items():
for level3_key, level3_value in level2_value.items():
for level4_key, level4_value in level3_value.items():
for level5_key, level5_value in level4_value.items():
# Store data in the internal dictionary
internal_dict[f"{level1_key}.{level2_key}.{level3_key}.{level4_key}.{level5_key}"] = level5_value
element_count += 1
except Exception as e:
return {"error": f"An error occurred: {str(e)}"}
# Return the count of elements
return {"element_count": element_count}
# Function to generate a complex nested JSON
def generate_nested_json(levels=5, elements_per_level=5):
def create_level(depth):
if depth == 0:
return {f"key_{i}": f"value_{i}" for i in range(elements_per_level)}
else:
return {f"level_{depth}_key_{i}": create_level(depth - 1) for i in range(elements_per_level)}
return create_level(levels - 1)
# Function to run the FastAPI server
def run_server():
uvicorn.run(app, host="127.0.0.1", port=8000, log_level="warning")
# Start the server in a separate thread
server_thread = Thread(target=run_server, daemon=True)
server_thread.start()
# Allow some time for the server to start
time.sleep(1)
# Client sends complex JSON to the API
try:
url = "http://127.0.0.1:8000/process/"
nested_json = generate_nested_json()
response = requests.post(url, json=nested_json)
print("Response from server:", response.json())
except Exception as e:
print("Error:", e)
# Stop the server
print("Done.")