-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathqueue_manager.py
executable file
·336 lines (288 loc) · 15.5 KB
/
queue_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
import logging
from collections import OrderedDict
from datetime import datetime
from typing import Dict, Any, List, Optional

from database import update_media_item_state, get_media_item_by_id
from queues.wanted_queue import WantedQueue
from queues.scraping_queue import ScrapingQueue
from queues.adding_queue import AddingQueue
from queues.checking_queue import CheckingQueue
from queues.sleeping_queue import SleepingQueue
from queues.unreleased_queue import UnreleasedQueue
from queues.blacklisted_queue import BlacklistedQueue
from queues.pending_uncached_queue import PendingUncachedQueue
from queues.upgrading_queue import UpgradingQueue
from wake_count_manager import wake_count_manager
class QueueManager:
    """Singleton that owns every media-item processing queue.

    One queue object exists per item state ("Wanted", "Scraping", "Adding",
    "Checking", "Sleeping", "Unreleased", "Blacklisted", "Pending Uncached",
    "Upgrading").  The ``process_*`` methods drive each queue, and the
    ``move_to_*`` methods transition an item between states, keeping the
    database row and the in-memory queues in sync.
    """

    _instance = None  # singleton storage; populated on first __new__

    def __new__(cls):
        # Classic singleton: create and initialize the instance exactly once.
        if cls._instance is None:
            cls._instance = super(QueueManager, cls).__new__(cls)
            cls._instance.initialize()
        return cls._instance

    def initialize(self):
        """(Re)create every queue object and clear the paused flag."""
        self.queues = {
            "Wanted": WantedQueue(),
            "Scraping": ScrapingQueue(),
            "Adding": AddingQueue(),
            "Checking": CheckingQueue(),
            "Sleeping": SleepingQueue(),
            "Unreleased": UnreleasedQueue(),
            "Blacklisted": BlacklistedQueue(),
            "Pending Uncached": PendingUncachedQueue(),
            "Upgrading": UpgradingQueue(),
        }
        self.paused = False

    def reinitialize_queues(self):
        """Force reinitialization of all queues to pick up new settings."""
        self.initialize()

    def update_all_queues(self):
        """Ask every queue to refresh its contents, logging the size delta."""
        for queue_name, queue in self.queues.items():
            before_count = len(queue.get_contents())
            queue.update()
            after_count = len(queue.get_contents())
            logging.debug(f"Queue {queue_name} update: {before_count} -> {after_count} items")

    def get_queue_contents(self) -> "OrderedDict[str, List[Dict[str, Any]]]":
        """Return an OrderedDict mapping each state name to its queue's items."""
        contents = OrderedDict()
        for state, queue in self.queues.items():
            contents[state] = queue.get_contents()
        return contents

    @staticmethod
    def generate_identifier(item: Dict[str, Any]) -> str:
        """Build a human-readable identifier for a media item.

        Movies:   ``movie_<title>_<imdb_id>_<version>``
        Episodes: ``episode_<title>_<imdb_id>_SxxEyy_<version>``

        Missing fields fall back to ``"Unknown"`` (or ``S00``/``E00`` when a
        season/episode number is absent or None).

        Raises:
            ValueError: if ``item['type']`` is neither 'movie' nor 'episode'.
        """
        if item['type'] == 'movie':
            return f"movie_{item.get('title', 'Unknown')}_{item.get('imdb_id', 'Unknown')}_{item.get('version', 'Unknown')}"
        elif item['type'] == 'episode':
            season = f"S{item.get('season_number', 0):02d}" if item.get('season_number') is not None else "S00"
            episode = f"E{item.get('episode_number', 0):02d}" if item.get('episode_number') is not None else "E00"
            return f"episode_{item.get('title', 'Unknown')}_{item.get('imdb_id', 'Unknown')}_{season}{episode}_{item.get('version', 'Unknown')}"
        else:
            raise ValueError(f"Unknown item type: {item['type']}")

    def get_item_queue(self, item: Dict[str, Any]) -> Optional[str]:
        """Return the name of the queue currently holding *item*, or None.

        Lookup is by ``item['id']`` across every queue's contents.
        """
        for queue_name, queue in self.queues.items():
            if any(i['id'] == item['id'] for i in queue.get_contents()):
                return queue_name
        return None

    def _process_simple(self, queue_name: str):
        """Run one queue's process() unless paused; log the skip otherwise."""
        if not self.paused:
            self.queues[queue_name].process(self)
        else:
            logging.debug(f"Skipping {queue_name} queue processing: Queue is paused")

    def process_checking(self):
        """Run the Checking queue and prune its stale checking timestamps."""
        if not self.paused:
            self.queues["Checking"].process(self)
            self.queues["Checking"].clean_up_checking_times()

    def process_wanted(self):
        """Run the Wanted queue (no-op while paused)."""
        if not self.paused:
            self.queues["Wanted"].process(self)

    def process_scraping(self):
        """Update then run the Scraping queue.

        Returns:
            The Scraping queue's process() result, or False when paused.
        """
        if not self.paused:
            # Refresh contents first so process() sees the latest items.
            self.queues["Scraping"].update()
            queue_contents = self.queues["Scraping"].get_contents()
            if queue_contents:
                for item in queue_contents:
                    logging.debug(f"Scraping queue item: {self.generate_identifier(item)}")
            result = self.queues["Scraping"].process(self)
            logging.info(f"Scraping queue process result: {result}")
            return result
        else:
            logging.debug("Skipping Scraping queue processing: Queue is paused")
            return False

    def process_adding(self):
        """Run the Adding queue (no-op while paused)."""
        self._process_simple("Adding")

    def process_unreleased(self):
        """Run the Unreleased queue (no-op while paused)."""
        self._process_simple("Unreleased")

    def process_sleeping(self):
        """Run the Sleeping queue (no-op while paused)."""
        self._process_simple("Sleeping")

    def process_blacklisted(self):
        """Run the Blacklisted queue (no-op while paused)."""
        self._process_simple("Blacklisted")

    def process_pending_uncached(self):
        """Run the Pending Uncached queue (no-op while paused)."""
        self._process_simple("Pending Uncached")

    def process_upgrading(self):
        """Run the Upgrading queue and prune its stale upgrade timestamps."""
        if not self.paused:
            self.queues["Upgrading"].process(self)
            self.queues["Upgrading"].clean_up_upgrade_times()
        else:
            logging.debug("Skipping Upgrading queue processing: Queue is paused")

    def _refresh_item(self, item_id: Any) -> Optional[Dict[str, Any]]:
        """Re-read an item's database row after a state change.

        Returns the fresh row, or None (after logging an error) when the row
        can no longer be found.
        """
        updated_item = get_media_item_by_id(item_id)
        if not updated_item:
            logging.error(f"Failed to retrieve updated item for ID: {item_id}")
        return updated_item

    def blacklist_item(self, item: Dict[str, Any], from_queue: str):
        """Blacklist *item* and remove it from its current queue."""
        self.queues["Blacklisted"].blacklist_item(item, self)
        self.queues[from_queue].remove_item(item)

    def blacklist_old_season_items(self, item: Dict[str, Any], from_queue: str):
        """Blacklist *item*'s old-season siblings and remove it from its queue."""
        self.queues["Blacklisted"].blacklist_old_season_items(item, self)
        self.queues[from_queue].remove_item(item)

    def move_to_wanted(self, item: Dict[str, Any], from_queue: str):
        """Reset an item to Wanted, clearing any previous fill information."""
        item_identifier = self.generate_identifier(item)
        logging.info(f"Moving item {item_identifier} to Wanted queue")
        wake_count = wake_count_manager.get_wake_count(item['id'])
        logging.debug(f"Wake count before moving to Wanted: {wake_count}")
        # Null out the fill details so the item is rescraped from scratch.
        update_media_item_state(item['id'], 'Wanted', filled_by_title=None, filled_by_magnet=None)
        wanted_item = self._refresh_item(item['id'])
        if wanted_item:
            self.queues["Wanted"].add_item(wanted_item)
            self.queues[from_queue].remove_item(item)
            logging.debug(f"Successfully moved item {item_identifier} to Wanted queue")

    def move_to_upgrading(self, item: Dict[str, Any], from_queue: str):
        """Move an item into the Upgrading queue."""
        item_identifier = self.generate_identifier(item)
        logging.debug(f"Moving item to Upgrading: {item_identifier}")
        update_media_item_state(item['id'], 'Upgrading')
        updated_item = self._refresh_item(item['id'])
        if updated_item:
            self.queues["Upgrading"].add_item(updated_item)
            self.queues[from_queue].remove_item(item)
            logging.info(f"Moved item {item_identifier} to Upgrading queue")

    def move_to_scraping(self, item: Dict[str, Any], from_queue: str):
        """Move an item into the Scraping queue."""
        item_identifier = self.generate_identifier(item)
        logging.debug(f"Moving item to Scraping: {item_identifier}")
        update_media_item_state(item['id'], 'Scraping')
        updated_item = self._refresh_item(item['id'])
        if updated_item:
            self.queues["Scraping"].add_item(updated_item)
            self.queues[from_queue].remove_item(item)
            logging.info(f"Moved item {item_identifier} to Scraping queue")

    def move_to_adding(self, item: Dict[str, Any], from_queue: str, filled_by_title: str, scrape_results: List[Dict]):
        """Move an item into the Adding queue, recording its scrape results."""
        item_identifier = self.generate_identifier(item)
        logging.debug(f"Moving item to Adding: {item_identifier}")
        update_media_item_state(item['id'], 'Adding', filled_by_title=filled_by_title, scrape_results=scrape_results)
        updated_item = self._refresh_item(item['id'])
        if updated_item:
            self.queues["Adding"].add_item(updated_item)
            # Remove the item from the Scraping queue, but not from the Wanted queue
            if from_queue == "Scraping":
                self.queues[from_queue].remove_item(item)
            logging.info(f"Moved item {item_identifier} to Adding queue")

    def move_to_checking(self, item: Dict[str, Any], from_queue: str, title: str, link: str, filled_by_file: str, torrent_id: str = None):
        """Move an item into the Checking queue with its fill details."""
        item_identifier = self.generate_identifier(item)
        logging.debug(f"Moving item to Checking: {item_identifier}")
        update_media_item_state(item['id'], 'Checking', filled_by_title=title, filled_by_magnet=link, filled_by_file=filled_by_file, filled_by_torrent_id=torrent_id)
        updated_item = self._refresh_item(item['id'])
        if updated_item:
            # Copy downloading flag from original item
            if 'downloading' in item:
                updated_item['downloading'] = item['downloading']
            self.queues["Checking"].add_item(updated_item)
            # Remove the item from the Adding queue and the Wanted queue
            if from_queue in ["Adding", "Wanted"]:
                self.queues[from_queue].remove_item(item)
            logging.info(f"Moved item {item_identifier} to Checking queue")

    def move_to_sleeping(self, item: Dict[str, Any], from_queue: str):
        """Move an item into the Sleeping queue, carrying its wake count."""
        item_identifier = self.generate_identifier(item)
        logging.info(f"Moving item {item_identifier} to Sleeping queue")
        wake_count = wake_count_manager.get_wake_count(item['id'])
        logging.debug(f"Wake count before moving to Sleeping: {wake_count}")
        update_media_item_state(item['id'], 'Sleeping')
        updated_item = self._refresh_item(item['id'])
        if updated_item:
            updated_item['wake_count'] = wake_count
            self.queues["Sleeping"].add_item(updated_item)
            self.queues[from_queue].remove_item(item)
            logging.debug(f"Successfully moved item {item_identifier} to Sleeping queue (Wake count: {wake_count})")

    def move_to_unreleased(self, item: Dict[str, Any], from_queue: str):
        """Move an item into the Unreleased queue."""
        item_identifier = self.generate_identifier(item)
        logging.info(f"Moving item {item_identifier} to Unreleased queue")
        update_media_item_state(item['id'], 'Unreleased')
        updated_item = self._refresh_item(item['id'])
        if updated_item:
            self.queues["Unreleased"].add_item(updated_item)
            self.queues[from_queue].remove_item(item)
            logging.debug(f"Successfully moved item {item_identifier} to Unreleased queue")

    def move_to_blacklisted(self, item: Dict[str, Any], from_queue: str):
        """Move an item into the Blacklisted queue."""
        item_identifier = self.generate_identifier(item)
        logging.info(f"Moving item {item_identifier} to Blacklisted queue")
        update_media_item_state(item['id'], 'Blacklisted')
        updated_item = self._refresh_item(item['id'])
        if updated_item:
            self.queues["Blacklisted"].add_item(updated_item)
            self.queues[from_queue].remove_item(item)
            logging.debug(f"Successfully moved item {item_identifier} to Blacklisted queue")

    def move_to_pending_uncached(self, item: Dict[str, Any], from_queue: str, title: str, link: str, scrape_results: List[Dict]):
        """Move an item into the Pending Uncached queue with its fill details."""
        item_identifier = self.generate_identifier(item)
        logging.info(f"Moving item {item_identifier} to Pending Uncached Additions queue")
        update_media_item_state(item['id'], 'Pending Uncached', filled_by_title=title, filled_by_magnet=link, scrape_results=scrape_results)
        updated_item = self._refresh_item(item['id'])
        if updated_item:
            self.queues["Pending Uncached"].add_item(updated_item)
            self.queues[from_queue].remove_item(item)
            logging.debug(f"Successfully moved item {item_identifier} to Pending Uncached Additions queue")

    def get_scraping_items(self) -> List[Dict]:
        """Get all items currently in the Scraping state"""
        return self.queues["Scraping"].get_contents()

    def get_wake_count(self, item_id):
        """Return the wake count tracked for *item_id*."""
        return wake_count_manager.get_wake_count(item_id)

    def pause_queue(self):
        """Pause all queue processing; warns if already paused."""
        if not self.paused:
            self.paused = True
            logging.info("Queue processing paused")
        else:
            logging.warning("Queue is already paused")

    def resume_queue(self):
        """Resume queue processing; warns if not currently paused."""
        if self.paused:
            self.paused = False
            logging.info("Queue processing resumed")
        else:
            logging.warning("Queue is not paused")

    def is_paused(self) -> bool:
        """Return True while queue processing is paused."""
        return self.paused

    def move_to_collected(self, item: Dict[str, Any], from_queue: str, skip_notification: bool = False):
        """Move an item to the Collected state after symlink is created."""
        item_identifier = self.generate_identifier(item)
        logging.info(f"Moving item {item_identifier} to Collected state")
        collected_at = datetime.now()
        # Update the item state in the database
        update_media_item_state(item['id'], 'Collected', collected_at=collected_at)
        updated_item = self._refresh_item(item['id'])
        if updated_item:
            # Collected is terminal here: the item only leaves its source queue.
            self.queues[from_queue].remove_item(item)
            logging.info(f"Successfully moved item {item_identifier} to Collected state")
            # Add to collected notifications if not skipped
            if not skip_notification:
                # Local import: database.collected_items is not imported at
                # module level (presumably to avoid a circular import — confirm).
                from database.collected_items import add_to_collected_notifications
                updated_item_dict = dict(updated_item)
                updated_item_dict['is_upgrade'] = False  # Not an upgrade since it's a new collection
                updated_item_dict['original_collected_at'] = collected_at
                add_to_collected_notifications(updated_item_dict)

    # Add other methods as needed