generated from FNNDSC/python-chrisapp-template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchrisClient.py
68 lines (62 loc) · 2.34 KB
/
chrisClient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
### Python Chris Client Implementation ###
from base_client import BaseClient
from chrisclient import client
from chris_pacs_service import PACSClient
import json
import time
from loguru import logger
import sys
from pipeline import Pipeline
# Module-level alias for debug-level logging.
LOG = logger.debug

# Log line layout: timestamp │ level │ module::function @line ║ message.
# The segments are joined without a separator, so every space and delimiter
# that appears in the output is spelled out inside the segments themselves.
logger_format = "".join(
    [
        "<green>{time:YYYY-MM-DD HH:mm:ss}</green> │ ",
        "<level>{level: <5}</level> │ ",
        "<yellow>{name: >28}</yellow>::",
        "<cyan>{function: <30}</cyan> @",
        "<cyan>{line: <4}</cyan> ║ ",
        "<level>{message}</level>",
    ]
)

# Drop the previously registered handlers, then log to stderr with the
# custom layout defined above.
logger.remove()
logger.add(sys.stderr, format=logger_format)
class ChrisClient(BaseClient):
    """ChRIS (CUBE) client that also talks to the CUBE PACS series endpoint.

    Wraps a ``chrisclient`` ``Client`` (``self.cl``) and a ``PACSClient``
    (``self.req``) built from the same credentials, and schedules the
    query/retrieve/verify workflow through ``Pipeline``.
    """

    def __init__(self, url: str, username: str, password: str):
        """Create the CUBE client and the PACS series client.

        Args:
            url: Base URL of the CUBE API. A trailing slash is optional.
            username: CUBE user name.
            password: CUBE password.
        """
        self.cl = client.Client(url, username, password)
        # Add the path separator only when it is missing; otherwise a URL
        # such as ".../api/v1" would produce ".../api/v1pacs/series/".
        base_url = url if url.endswith("/") else f"{url}/"
        self.cl.pacs_series_url = f"{base_url}pacs/series/"
        self.req = PACSClient(self.cl.pacs_series_url, username, password)

    def create_con(self, params: dict):
        """Return the underlying CUBE client.

        Args:
            params: Accepted for interface compatibility; not used here.
        """
        return self.cl

    def health_check(self):
        """Return whatever ``get_chris_instance()`` reports for the client."""
        return self.cl.get_chris_instance()

    def pacs_pull(self):
        """Placeholder; currently does nothing."""
        pass

    def pacs_push(self):
        """Placeholder; currently does nothing."""
        pass

    def anonymize(self, params: dict, pv_id: int):
        """Schedule the PACS query, retrieve and verify-registration workflow.

        Args:
            params: Request description. The keys read here are
                ``params["pull"]`` (``url``, ``pacs``), ``params["search"]``,
                ``params["anon"]`` and ``params["push"]`` (``url``,
                ``username``, ``password``, ``aec``).
            pv_id: Passed as the first argument to
                ``Pipeline.workflow_schedule`` (presumably the id of the
                previous plugin instance; confirm against ``Pipeline``).

        Raises:
            KeyError: If any of the keys listed above is missing.
        """
        pipe = Pipeline(self.cl)
        pull = params["pull"]
        push = params["push"]
        workflow_name = (
            "PACS query, retrieve, and registration verification in CUBE 20241217"
        )

        # Parameters are grouped under the plugin titles used as dict keys.
        # NOTE(review): CUBE and Orthanc credentials are forwarded as plain
        # plugin parameters.
        plugin_params = {
            'PACS-query': {
                "PACSurl": pull["url"],
                "PACSname": pull["pacs"],
                "PACSdirective": json.dumps(params["search"]),
            },
            'PACS-retrieve': {
                "PACSurl": pull["url"],
                "PACSname": pull["pacs"],
                "inputJSONfile": "search_results.json",
                "copyInputFile": True,
            },
            'verify-registration': {
                "CUBEurl": self.cl.url,
                "CUBEuser": self.cl.username,
                "CUBEpassword": self.cl.password,
                "inputJSONfile": "search_results.json",
                "tagStruct": json.dumps(params["anon"]),
                "orthancUrl": push["url"],
                "orthancUsername": push["username"],
                "orthancPassword": push["password"],
                "PACSurl": pull["url"],
                "PACSname": pull["pacs"],
                "pushToRemote": push["aec"],
            },
        }
        pipe.workflow_schedule(pv_id, workflow_name, plugin_params)