Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring MI into python #73

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ install:
- python setup.py install
- pip install -r requirements.txt
# install R components
- sudo R -f inferelator_ng/R_code/packages.R
# - sudo R -f inferelator_ng/R_code/packages.R
script:
- git clone https://github.com/simonsfoundation/kvsstcp
- export PYTHONPATH=$PYTHONPATH:$(pwd)/kvsstcp
Expand All @@ -67,5 +67,5 @@ after_success:
after_failure:
- pwd
- find .
- cat ./inferelator_ng/tests/artifacts/run_mi.R
- R -f ./inferelator_ng/tests/artifacts/run_mi.R
#- cat ./inferelator_ng/tests/artifacts/run_mi.R
#- R -f ./inferelator_ng/tests/artifacts/run_mi.R
1 change: 0 additions & 1 deletion inferelator_ng/R_code/README

This file was deleted.

122 changes: 0 additions & 122 deletions inferelator_ng/R_code/mi_and_clr.R

This file was deleted.

4 changes: 0 additions & 4 deletions inferelator_ng/R_code/packages.R

This file was deleted.

21 changes: 9 additions & 12 deletions inferelator_ng/bbsr_tfa_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@
import inferelator_ng.design_response_translation as design_response_translation
from inferelator_ng.tfa import TFA
from inferelator_ng.results_processor import ResultsProcessor
import inferelator_ng.mi_R as mi_R
import inferelator_ng.mi as mi
import inferelator_ng.bbsr_python as bbsr_python
import datetime
from kvsstcp.kvsclient import KVSClient
from inferelator_ng import kvs_controller
import pandas as pd
from inferelator_ng import utils

# Connect to the key value store service (its location is found via an
# environment variable that is set when this is started vid kvsstcp.py
# --execcmd).
kvs = KVSClient()
kvs = kvs_controller.KVSController()
# Find out which process we are (assumes running under SLURM).
rank = int(os.environ['SLURM_PROCID'])

Expand All @@ -31,9 +31,9 @@ def run(self):
"""
np.random.seed(self.random_seed)

self.mi_clr_driver = mi_R.MIDriver()
self.regression_driver = bbsr_python.BBSR_runner()
self.design_response_driver = design_response_translation.PythonDRDriver() #this is the python switch
self.mi_clr_driver = mi.MIDriver
self.regression_driver = bbsr_python.BBSR_runner
self.design_response_driver = design_response_translation.PythonDRDriver #this is the python switch
self.get_data()
self.compute_common_data()
self.compute_activity()
Expand All @@ -45,14 +45,11 @@ def run(self):
X = self.activity.ix[:, bootstrap]
Y = self.response.ix[:, bootstrap]
print('Calculating MI, Background MI, and CLR Matrix')
if 0 == rank:
(self.clr_matrix, self.mi_matrix) = self.mi_clr_driver.run(X, Y)
kvs.put('mi %d'%idx, (self.clr_matrix, self.mi_matrix))
else:
(self.clr_matrix, self.mi_matrix) = kvs.view('mi %d'%idx)
clr_matrix, mi_matrix = self.mi_clr_driver(kvs=kvs).run(X, Y)
mi_matrix = None
print('Calculating betas using BBSR')
ownCheck = utils.ownCheck(kvs, rank, chunk=25)
current_betas,current_rescaled_betas = self.regression_driver.run(X, Y, self.clr_matrix, self.priors_data,kvs,rank, ownCheck)
current_betas,current_rescaled_betas = self.regression_driver().run(X, Y, clr_matrix, self.priors_data,kvs,rank, ownCheck)
if rank: continue
betas.append(current_betas)
rescaled_betas.append(current_rescaled_betas)
Expand Down
131 changes: 131 additions & 0 deletions inferelator_ng/kvs_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""
KVSController is a wrapper for KVSClient that adds some useful functionality related to interprocess
communication.

It also keeps track of a bunch of SLURM related stuff that was previously workflow's problem.
"""

from kvsstcp import KVSClient

import os

# SLURM environment variables
SBATCH_VARS = dict(SLURM_PROCID=('rank', int, 0),
SLURM_NTASKS_PER_NODE=('cores', int, 1),
SLURM_NTASKS=('tasks', int, 1),
SLURM_NODEID=('node', int, 0),
SLURM_JOB_NUM_NODES=('num_nodes', int, 1))

DEFAULT_WARNING = "SBATCH has not set ENV {var}. Setting {var} to {defa}."

NODE_MAP_KEY = "kvs_node_map"


class KVSController(KVSClient):
# Set from SLURM environment variables
rank = None # int
tasks = None # int
node = None # int
cores = None # int
num_nodes = None # int
is_master = False # bool

# Poll workers for location
proc_nodes = list() # list

def __init__(self, *args, **kwargs):
"""
Create a new KVS object with some object variables set to reflect the slurm environment
"""

# Get local environment variables
self._get_env(suppress_warnings=kwargs.pop("suppress_warnings", False))

# Connect to the host server by calling to KVSClient.__init__
super(KVSController, self).__init__(*args, **kwargs)


def _get_env(self, slurm_variables=SBATCH_VARS, suppress_warnings=False):
"""
Get the SLURM environment variables that are set by sbatch at runtime.
The default values mean multiprocessing won't work at all.
"""
for env_var, (class_var, func, default) in slurm_variables.items():
try:
val = func(os.environ[env_var])
except (KeyError, TypeError):
val = default
if not suppress_warnings:
print(DEFAULT_WARNING.format(var=env_var, defa=default))
setattr(self, class_var, val)
if self.rank == 0:
self.is_master = True
else:
self.is_master = False

def own_check(self, chunk=1, kvs_key='count'):
return ownCheck(self, self.rank, chunk=chunk, kvs_key=kvs_key)

def master_remove_key(self, kvs_key='count'):
if self.is_master:
self.get(kvs_key)

def sync_processes(self, pref=""):
# Block all processes until they reach this point
# Then release them
# It may be wise to use unique prefixes if this is gonna get called rapidly so there's no collision
# Or not. I'm a comment, not a cop.

wkey = pref + '_wait'
ckey = pref + '_continue'

# Every process puts a wait key up when it gets here
self.put(wkey, True)

# The master pulls down the wait keys until it has all of them
# Then it puts up a go key for each process
if self.is_master:
for _ in range(self.tasks):
self.get(wkey)
for _ in range(self.tasks):
self.put(ckey, True)

# Every process waits here until go keys are available
self.get(ckey)


def ownCheck(kvs, rank, chunk=1, kvs_key='count'):
"""
Generator

:param kvs: KVSClient
KVS object for server access
:param chunk: int
The size of the chunk given to each subprocess
:param kvs_key: str
The KVS key to increment (default is 'count')

:yield: bool
True if this process has dibs on whatever. False if some other process has claimed it first.
"""
if rank == 0:
kvs.put(kvs_key, 0)

# Start at the baseline
checks, lower, upper = 0, -1, -1

while True:

# Checks increments every loop
# If it's greater than the upper bound, get a new lower bound from the KVS count
# Set the new upper bound by adding chunk to lower
# And then put the new upper bound back into KVS key

if checks >= upper:
lower = kvs.get(kvs_key)
upper = lower + chunk
kvs.put(kvs_key, upper)

# Yield TRUE if this row belongs to this process and FALSE if it doesn't
yield lower <= checks < upper
checks += 1
Loading