Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quick cleanup #2

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions DrQueue/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import time
import pickle
import datetime
import logging
from IPython.parallel import Client as IPClient
from IPython.parallel.util import unpack_apply_message
from IPython.parallel import dependent
Expand All @@ -23,6 +24,9 @@
from .computer import Computer as DrQueueComputer


log = logging.getLogger(__name__)


class Client():
"""DrQueue client actions"""
def __init__(self):
Expand All @@ -46,7 +50,6 @@ def job_run(self, job):
# check job name
if job['name'] in DrQueueJob.query_jobnames():
raise ValueError("Job name %s is already used!" % job['name'])
return False

# save job in database
job_id = DrQueueJob.store_db(job)
Expand All @@ -72,21 +75,20 @@ def job_run(self, job):
# check frame numbers
if not (job['startframe'] >= 1):
raise ValueError("Invalid value for startframe. Has to be equal or greater than 1.")
return False

if not (job['endframe'] >= 1):
raise ValueError("Invalid value for endframe. Has to be equal or greater than 1.")
return False

if not (job['endframe'] >= job['startframe']):
raise ValueError("Invalid value for endframe. Has be to equal or greater than startframe.")
return False

if job['endframe'] > job['startframe']:
if not (job['endframe'] - job['startframe'] >= job['blocksize']):
raise ValueError("Invalid value for blocksize. Has to be equal or lower than endframe-startframe.")
return False

if job['endframe'] == job['startframe']:
if job['blocksize'] != 1:
raise ValueError("Invalid value for blocksize. Has to be equal 1 if endframe equals startframe.")
return False

task_frames = list(range(job['startframe'], job['endframe'] + 1, job['blocksize']))
ar = None
Expand Down Expand Up @@ -242,22 +244,22 @@ def identify_computer(self, engine_id, cache_time, timeout=15):
now = int(time.time())
# check existence and age of info
if (engine != None) and (now <= engine['created_at'] + cache_time):
print("DEBUG: Engine %i was found in DB and info is up-to-date." % engine_id)
log.DEBUG("Engine %i was found in DB and info is up-to-date." % engine_id)
return engine
# store new info
else:
if engine != None:
print("DEBUG: Engine %i was found in DB, but info needs to be updated." % engine_id)
log.DEBUG("Engine %i was found in DB, but info needs to be updated." % engine_id)
else:
print("DEBUG: Engine %i was not found in DB." % engine_id)
log.DEBUG("Engine %i was not found in DB." % engine_id)
# run command only on specific computer
try:
dview = self.ip_client[engine_id]
except IndexError:
print("DEBUG: Engine with id %i unknown." % engine_id)
log.DEBUG("Engine with id %i unknown." % engine_id)
# delete old entry from database
DrQueueComputer.delete_from_db_by_engine_id(engine_id)
print("DEBUG: Engine with id %i deleted from database." % engine_id)
log.DEBUG("Engine with id %i deleted from database." % engine_id)
new_engine = None
else:
# run command in async mode
Expand All @@ -269,10 +271,10 @@ def identify_computer(self, engine_id, cache_time, timeout=15):
ar.get(timeout)
except Exception:
if engine != None:
print("DEBUG: Update request for engine %i timed out. Using old information from DB." % engine_id)
log.DEBUG("Update request for engine %i timed out. Using old information from DB." % engine_id)
new_engine = engine
else:
print("DEBUG: Information request for engine %i timed out." % engine_id)
log.DEBUG("Information request for engine %i timed out." % engine_id)
new_engine = None
else:
# get computer dict from engine namespace
Expand All @@ -298,7 +300,7 @@ def computer_set_pools(self, computer, pool_list):
# update database entry
computer['pools'] = pool_list
DrQueueComputer.store_db(computer)
print("DEBUG: Engine " + str(computer['engine_id']) + " added to pools " + pool_str + ".")
log.DEBUG("Engine " + str(computer['engine_id']) + " added to pools " + pool_str + ".")
return computer


Expand Down
81 changes: 34 additions & 47 deletions DrQueue/computer_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,39 @@
"""

import os
import getpass
import logging


log = logging.getLogger(__name__)


try:
import pymongo
import bson
except ImportError as err:
log.debug("Can't import pymongo/bson: %s" % err)
pymongo = bson = None



def get_queue_pools():
if pymongo is None:
raise RuntimeError("pymongo is needed, please install it!")

connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB'))
db = connection['ipythondb']
pools = db['drqueue_pools']
return pools


class ComputerPool(dict):
"""Subclass of dict for collecting Pool attribute values."""
def __init__(self, name, engine_names=[]):

def __init__(self, name, engine_names=None):
dict.__init__(self)

if type(engine_names).__name__ != 'list':
if isinstance(engine_names, list):
raise ValueError("argument is not of type list")
return False

# mandatory elements
pool = {
Expand All @@ -29,101 +51,66 @@ def __init__(self, name, engine_names=[]):
}
self.update(pool)


@staticmethod
def store_db(pool):
import pymongo
"""store pool information in MongoDB"""
connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB'))
db = connection['ipythondb']
pools = db['drqueue_pools']
pools = get_queue_pools()
pool_id = pools.insert(pool)
pool['_id'] = str(pool['_id'])
return pool_id


@staticmethod
def update_db(pool):
import pymongo
"""update pool information in MongoDB"""
connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB'))
db = connection['ipythondb']
pools = db['drqueue_pools']
pools = get_queue_pools()
pool_id = pools.save(pool)
pool['_id'] = str(pool['_id'])
return pool_id


@staticmethod
def query_db(pool_id):
import pymongo
import bson
"""query pool information from MongoDB"""
connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB'))
db = connection['ipythondb']
pools = db['drqueue_pools']
pools = get_queue_pools()
pool = pools.find_one({"_id": bson.ObjectId(pool_id)})
return pool


@staticmethod
def delete_from_db(pool_id):
import pymongo
import bson
"""delete pool information from MongoDB"""
connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB'))
db = connection['ipythondb']
pools = db['drqueue_pools']
pools = get_queue_pools()
return pools.remove({"_id": bson.ObjectId(pool_id)})


@staticmethod
def query_poolnames():
import pymongo
"""query pool names from MongoDB"""
connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB'))
db = connection['ipythondb']
pools = db['drqueue_pools']
pools = get_queue_pools()
names = []
for pool in pools.find():
names.append(pool['name'])
return names


@staticmethod
def query_pool_by_name(pool_name):
import pymongo
"""query pool information from MongoDB by name"""
connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB'))
db = connection['ipythondb']
pools = db['drqueue_pools']
pools = get_queue_pools()
pool = pools.find_one({"name": pool_name})
return pool


@staticmethod
def query_pool_list():
import pymongo
"""query list of pools from MongoDB"""
connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB'))
db = connection['ipythondb']
pools = db['drqueue_pools']
pools = get_queue_pools()
pool_arr = []
for pool in pools.find():
pool_arr.append(pool)
return pool_arr


@staticmethod
def query_pool_members(pool_name):
import pymongo
"""query list of members of pool from MongoDB"""
connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB'))
db = connection['ipythondb']
pools = db['drqueue_pools']
pools = get_queue_pools()
pool = pools.find_one({"name": pool_name})
if pool == None:
return None
else:
return list(pool['engine_names'])