-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
141 lines (103 loc) · 4.36 KB
/
utils.py
File metadata and controls
141 lines (103 loc) · 4.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# -*- coding: utf-8 -*-
import json
import sys
import requests
import configuration
from models import session, SyncLog
REDMINE_HOSTNAME = getattr(configuration, 'REDMINE_HOSTNAME', None)
REDMINE_API_KEY = getattr(configuration, 'REDMINE_API_KEY', None)
REDMINE_DEFAULT_ACTIVITY = getattr(configuration, 'REDMINE_DEFAULT_ACTIVITY', 'Development')
MERGE_TASKS = getattr(configuration, 'MERGE_TASKS', True)
def _get_data(endpoint):
"""Performing GET request to Redmine API."""
headers = {'X-Redmine-API-Key': REDMINE_API_KEY}
r = requests.get(REDMINE_HOSTNAME + endpoint, headers=headers)
return r
def _task_exists(task_id):
"""
Checking if corresponding task exists in Redmine,
before performing synchronization.
"""
r = _get_data('/issues/%s.json' % task_id)
if r.status_code == 200:
return True
return False
def _fact_synced(task_id, fact_id):
"""Checking if current fact was synchronized per logs."""
if session.query(SyncLog).filter_by(fact_id=fact_id,
task_id=task_id).count() == 0:
return False
return True
def _facts_synced(task_id, fact_ids):
"""
Merged task might have several corresponding tasks,
so checking if they all were synchronized.
"""
if not all([_fact_synced(task_id, f) for f in fact_ids]):
return False
return True
def _get_time_entry_activities():
"""Fetching Redmine activities."""
return _get_data('/enumerations/time_entry_activities.json').json()
def _get_time_entry_activity():
"""Determining default activity ID."""
activities = _get_time_entry_activities()['time_entry_activities']
for a in activities:
if a['name'] == REDMINE_DEFAULT_ACTIVITY:
return a['id']
return
def _log_sync(task_id, fact_ids):
"""Adding entry to sychronization log model."""
entries = [SyncLog(fact_id=fact_id, task_id=task_id) for fact_id in fact_ids]
session.add_all(entries)
session.commit()
def _sync_task(task_id, fact_ids, activity_id, duration, date):
"""Synchronizing fact by adding new time entry though Redmine API."""
headers = {'X-Redmine-API-Key': REDMINE_API_KEY, 'content-type': 'application/json'}
data = {'time_entry': {
'issue_id': int(task_id),
'spent_on': date.isoformat(),
'hours': duration,
'activity_id': activity_id,
}
}
if not _facts_synced(task_id, fact_ids):
sys.stdout.write('Logging task %s.\n' % task_id)
r = requests.post(REDMINE_HOSTNAME + '/time_entries.json', headers=headers, data=json.dumps(data))
if r.status_code == 201:
_log_sync(task_id, fact_ids)
def _merge_tasks(tasks):
"""Aggregating tasks data."""
for k, v in tasks.items():
task_data = []
task_ids = set([t['task_id'] for t in v])
for task_id in task_ids:
duration = 0
fact_ids = []
for e in v:
if e['task_id'] == task_id:
duration += round(e['duration'], 1) # summarize all time, spent on a same task
fact_ids.append(e['fact_id'])
task_data.append({'fact_ids': fact_ids, 'duration': round(duration, 1), 'task_id': task_id})
tasks[k] = task_data
return tasks
def synchronize_tasks(tasks):
"""Performing synchronization for given bunch of facts."""
if MERGE_TASKS:
tasks = _merge_tasks(tasks)
activity_id = _get_time_entry_activity()
for k, v in sorted(tasks.items()):
for e in v:
if _task_exists(e['task_id']):
_sync_task(e['task_id'], e['fact_ids'], activity_id, e['duration'], k)
else:
sys.stderr.write('Task %s does not exist in Redmine.\n' % e['task_id'])
def check_tasks(tasks):
"""Checking if given facts are already synchronized."""
for k, v in sorted(tasks.items()):
sys.stdout.write('Tasks logged on: %s\n' % k.isoformat())
for task in v:
if not _fact_synced(task['task_id'], task['fact_id']):
sys.stdout.write('Task %s, fact id %s was not synchronized.\n' % (task['task_id'], task['fact_id']))
else:
sys.stdout.write('Task %s, fact id %s was synchronized.\n' % (task['task_id'], task['fact_id']))