Added array job in AWS batch and partial data analysis #84

Merged
merged 2 commits on Oct 20, 2021
27 changes: 6 additions & 21 deletions aws_module/batch_deployment/README.md
@@ -7,28 +7,13 @@
apt-get install awscli
```

-### Usage on local PC
-* To change sample, please replace JSON file to calculate the score
-```json
-"environment": [
-{
-"name": "msigdb",
-"value": "msigdb.v7.4.entrez.gmt(don't change this)"
-},
-{
-"name": "inputfile",
-"value": "Sample name here"
-}
-]
-```
-* And run module
-```
-# Single job
-sh batch_module_singleJob.sh
-
-# Parallelized job
-sh batch_module_parallel.sh
-```
+## Batch Jobs List
+| Name | Description | Main exec file |
+|---------|---------|---------|
+| activation_score_batch | Activation score calculation (parallelized or single) | batch_module_singleJob.sh or batch_module_parallel.sh |
+| deg_pipeline_batch | DEG calculation pipeline (array job) | batch_module_singleJob.sh |
+| feature_extraction_batch | Feature extraction pipeline with activation scores | In development |

### Multiple Jobs Flow
![flow1](../../README_resource/batch_detail.png)
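The deg_pipeline_batch entry above runs as an AWS Batch array job, where each child container receives its index in the AWS_BATCH_JOB_ARRAY_INDEX environment variable. A minimal submission sketch, assuming hypothetical job, queue, and definition names:

```bash
# Submit a 3-child array job; children get AWS_BATCH_JOB_ARRAY_INDEX=0,1,2
aws batch submit-job \
    --job-name deg-pipeline-job \
    --job-queue deg-pipeline-queue \
    --job-definition deg-pipeline-job \
    --array-properties size=3
```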
34 changes: 34 additions & 0 deletions aws_module/batch_deployment/activation_score_batch/README.md
@@ -0,0 +1,34 @@
## AWS module for running the project
* This module supports running the project code, pipelines, and analyses by launching AWS Batch. It is currently in the development phase and can run only a limited set of code (activation score calculation).
* Parallel job execution requires input from the lambda function; please use the lambda_deployment section first.

### Requirements on local PC
```
apt-get install awscli
```

### Usage on local PC
* To change the sample, replace the values in the JSON file used to calculate the score
```json
"environment": [
{
"name": "msigdb",
"value": "msigdb.v7.4.entrez.gmt(don't change this)"
},
{
"name": "inputfile",
"value": "Sample name here"
}
]
```
* Then run the module
```
# Single job
sh batch_module_singleJob.sh

# Parallelized job
sh batch_module_parallel.sh
```

### Multiple Jobs Flow
![flow1](../../README_resource/batch_detail.png)
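For scripted sample swaps, the inputfile value can be rewritten with jq instead of editing by hand. A sketch, assuming the job definition lives in a local job_definition.json (the file name is an assumption):

```bash
# Overwrite the inputfile environment value in the Batch job definition (hypothetical file name)
jq '(.containerProperties.environment[] | select(.name == "inputfile") | .value) = "MySample"' \
    job_definition.json > job_definition.updated.json
```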
batch_module_singleJob.sh
@@ -18,8 +18,8 @@ echo "Submit.."
aws batch submit-job --job-name activation-score-job --job-queue activation-score-queue --job-definition activation-score-job > job.submitted

jobID=$(jq '.jobId' job.submitted)
-jobID="${objectState%\"}" # Remove double quotes from string
-jobID="${objectState#\"}" # Remove double quotes from string
+jobID="${jobID%\"}" # Remove double quotes from string
+jobID="${jobID#\"}" # Remove double quotes from string

## Purpose of this bash file is running while EC2 is ready. When it is ready, automatically it will be out
while [ "$objectState" != "SUCCEEDED" ];do # EC2 running checking
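The replaced lines point the parameter expansions at the right variable; they strip the surrounding double quotes that jq prints around strings. An equivalent one-liner, since jq's -r flag emits raw output without quotes:

```bash
# -r makes jq print the raw string, so no quote stripping is needed
jobID=$(jq -r '.jobId' job.submitted)
```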
Dockerfile
@@ -0,0 +1,12 @@
FROM continuumio/miniconda

# Directories for input data and pipeline output
RUN mkdir /data
RUN mkdir /output

COPY . .
# Single conda environment with both Python and R for all pipeline steps
RUN conda create -n pipeline_controller_base python=3.8.2 R=3.6
SHELL ["conda", "run", "-n", "pipeline_controller_base", "/bin/bash", "-c"]

# Python and R dependencies, then make the controller script executable
RUN pip install -r requirements.txt
RUN Rscript installer_Rpackage.R
RUN chmod +x pipeline_controller.sh
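A local build-and-run sketch for this image (the image name, credentials wiring, and argument values are assumptions, not part of the PR):

```bash
docker build -t deg-pipeline .
# Simulate array index 0 locally; the script also expects AWS credentials
# and the mainbucket/metafile/efspoint variables used by step 1
docker run --rm -e AWS_BATCH_JOB_ARRAY_INDEX=0 deg-pipeline sh pipeline_controller.sh CD4 Sex M F
```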
installer_Rpackage.R
@@ -0,0 +1,7 @@
### install DESeq2, tximport, and annotation packages from Bioconductor
if (!requireNamespace("BiocManager", quietly = TRUE))
install.packages("BiocManager", repos='http://cran.us.r-project.org')
BiocManager::install("DESeq2")
BiocManager::install("tximport")
BiocManager::install("AnnotationDbi")
BiocManager::install("org.Hs.eg.db")
libraries/botoClass.py
@@ -0,0 +1,59 @@
import boto3
import os


class botoHandler(object):

def __init__(self, bucketName):
s3 = boto3.resource('s3') # s3 resource
my_bucket = s3.Bucket(bucketName) # Bucket Set
self.s3 = s3
self.my_bucket = my_bucket


    def getDirFiles(self, dirname, destpath='/data/'):
        """
        Get all objects under a specific prefix
        input: folder name, output destination in container
        output: downloaded objects
        """
        for object_summary in self.my_bucket.objects.filter(Prefix=dirname):
            if object_summary.key.endswith('/'): # skip the folder placeholder object itself
                continue
            targetFile = destpath + os.path.basename(object_summary.key)
            self.my_bucket.download_file(object_summary.key, targetFile)

# Data search
def search_obj(self, bucketObj, searchList):
"""
Search function for s3 objects
input: s3 object in boto3, list of items
output: s3 object key list
"""
result=[]
for target in searchList:
for candidate in bucketObj.objects.all():
                if str(candidate.key).find(target) > -1: # target substring found in the key
result.append(candidate.key) # get key
return result

# Get data from S3
    def getFile(self, searchList, destpath='/data/'):
        """
        Download the first matching file from S3
        input: list of search terms, container path for downloads
        output: local file path string
        """
s3_list = self.search_obj(self.my_bucket, searchList) # Search file object
targetFile=destpath+searchList[0]
self.my_bucket.download_file(s3_list[0], targetFile)

return targetFile

# Upload data to S3
    def uploadFile(self, writeFileName, data, datatype='pandas'):
        """
        Upload data to S3
        input: output file name, data (DataFrame or text), datatype ('pandas' or 'txt')
        """
        if datatype=='pandas':
            self.my_bucket.put_object(Key=os.path.basename(writeFileName), Body=data.to_csv()) # stream CSV to S3
        elif datatype=='txt':
            self.my_bucket.put_object(Key=os.path.basename(writeFileName), Body=data) # stream text to S3
        else:
            raise ValueError('Unsupported upload datatype: expected pandas or txt')
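A short usage sketch for this class (the bucket and file names are hypothetical):

```python
from libraries.botoClass import botoHandler

s3 = botoHandler('example-bucket')                 # hypothetical bucket name
meta_path = s3.getFile(['metadata.csv'])           # downloads first match to /data/metadata.csv
s3.getDirFiles('rsem_counts/', destpath='/data/')  # downloads every object under the prefix
```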
libraries/externalHandler.py
@@ -0,0 +1,66 @@
import pandas as pd
import itertools

class handlers(object):
    @staticmethod
    def get_column(filename_with_path, ext_value, annot='gene_id', header_line=0, sep="\t"):
"""
filename_with_path = filepath + basename
ext_value = column name of file
sep = separator
"""

# Don't use pandas.read_csv because of memory usage
index_list = []
value_list = []
with open(filename_with_path, 'r') as infile:
for i, line in enumerate(infile):
line = line.strip()
if i==header_line: # found header
header_info = line.split(sep)
value_ext_location = header_info.index(ext_value) # location of value extraction point
                    index_ext_location = header_info.index(annot) # location of index extraction point

                else: # data line
                    line_list = line.split(sep)
                    index_list.append(str(line_list[index_ext_location])) # index list
                    value_list.append(float(line_list[value_ext_location])) # value list

result_df = pd.DataFrame(data={ext_value: value_list}, index=index_list)
return result_df

    @staticmethod
    def get_samplename(filelist):
"""
        filelist = list of basenames
        An equivalent lambda would be:
        _get_samplename = lambda filelist: [x.split("-")[0] for x in filelist]
"""
sampleName = [x.split("-")[0] for x in filelist]
return sampleName

    @staticmethod
    def get_condtionMatrix_by_category(dataframe, sampleColumn, dataColname, conditions:list):
"""
Transform meta data to DESeq condition matrix
Input
dataframe: metadata input
sampleColumn: Column name for Sample ID in metadata input
            dataColname: Column name for category value in metadata input
conditions: Conditions you selected, list type, and it has 2 elements

Output
result dataframe with 2 columns (colnames: sampleID, conditions)
"""

assert len(conditions)==2, "Please make sure that conditions list has 2 elements"

sampleList = [] # empty list
conditionValues = []
for x in conditions:
data = dataframe[dataframe[dataColname]==x][sampleColumn] # get sample name
sampleList.append(data.values.tolist()) # sampleID
conditionValues.append([x]*len(data.values.tolist())) # condition value

sampleList = list(itertools.chain(*sampleList)) # flatten
conditionValues = list(itertools.chain(*conditionValues))

result = pd.DataFrame(data={'sampleID':sampleList, 'conditions':conditionValues}).set_index('sampleID')
return result
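A toy usage sketch for the condition-matrix helper (the column names follow step1_get_DESeq2_input.py; the values are made up):

```python
import pandas as pd
from libraries.externalHandler import handlers

meta = pd.DataFrame({'HCVB_ID': ['s1', 's2', 's3'], 'Sex': ['M', 'F', 'M']})
cond = handlers.get_condtionMatrix_by_category(meta, 'HCVB_ID', 'Sex', ['M', 'F'])
print(cond)  # indexed by sampleID, one 'conditions' column with values M/F
```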
pipeline_controller.sh
@@ -0,0 +1,18 @@
#!/bin/bash
#Example: sh pipeline_controller.sh CD4 Sex M F
celltype=$1
condition=$2
cond1=$3
cond2=$4

if [ "$AWS_BATCH_JOB_ARRAY_INDEX" -eq 0 ];
then
    # Step 1: build the DESeq2 input matrices from the RSEM counts
    conda run -n pipeline_controller_base python step1_get_DESeq2_input.py -c "$celltype" -v "$condition" -x "$cond1" -y "$cond2"

elif [ "$AWS_BATCH_JOB_ARRAY_INDEX" -eq 1 ];
then
    # Step 2: run DESeq2 on the matrices produced by step 1
    conda run -n pipeline_controller_base Rscript step2_DESeq2_calculator.R /output/${celltype}_output.csv /output/${celltype}_meta_output.csv /output/DEG_${celltype}.result

else
    # Step 3: upload the DEG results to S3
    conda run -n pipeline_controller_base python step3_upload_to_s3.py -c "$celltype"
fi
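Each array index runs one stage, so the script can be smoke-tested locally by setting the index by hand (a sketch; assumes the conda environment exists and AWS credentials are configured):

```bash
# Run only step 1, as AWS Batch would for array index 0
AWS_BATCH_JOB_ARRAY_INDEX=0 sh pipeline_controller.sh CD4 Sex M F
```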
requirements.txt
@@ -0,0 +1,23 @@
##controller requirements
gunicorn==20.1.0
Jinja2==3.0.1
PyYAML==5.4.1
flask==2.0.1
Flask-WTF==0.15.1
Flask-Bootstrap==3.3.7.1
flask-nav==0.6
celery==5.1.2
redis==3.5.3
boto3==1.18.54
awscli==1.20.54
##deg requirements
pip==21.2.2
pandas==1.3.2
numpy==1.21.2
feather-format==0.4.1
scikit-learn==0.24.2
scipy==1.7.1
snakemake==6.8.0
# feature-ext requirements
matplotlib==3.4.3
matplotlib-inline==0.1.2
step1_get_DESeq2_input.py
@@ -0,0 +1,75 @@
__author__ = "Junhee Yoon"
__version__ = "1.0.0"
__maintainer__ = "Junhee Yoon"
__email__ = "[email protected]"

"""
Description: This is batch job for transforming data to DESeq input
"""

import pandas as pd
import numpy as np
import os
import glob
import argparse

from libraries.botoClass import botoHandler
from libraries.externalHandler import handlers as dataHandler

## argparse setting
parser = argparse.ArgumentParser(prog='step1_get_DESeq2_input.py')

parser.add_argument('-c','--ctype', type=str, dest='celltype', required=True,\
choices=['CD4','CD8','CD14'], help='Cell type for extraction (CD4, CD8, or CD14)')

parser.add_argument('-v','--condcolumn', type=str, dest='condcolumn', required=True,\
help='Column name which is using for condition value')

parser.add_argument('-x','--cond1', type=str, dest='cond1', required=True,\
help='condition1 for metadata')

parser.add_argument('-y','--cond2', type=str, dest='cond2', required=True,\
help='condition2 for metadata')

args = parser.parse_args()

# Main function
if __name__ == "__main__":

### Get ENV variables
mainDataBucket = os.environ['mainbucket'] # openkbc-ms-maindata-bucket
metaName = os.environ['metafile'] # EPIC_HCvB_metadata_baseline_updated-share.csv
outputPath = os.environ['efspoint'] # /output/

### Error handling here

### Data preparation
s3 = botoHandler(mainDataBucket) # Call boto3
COUNT_PATH = "/data/" # Main data path

META_PATH = s3.getFile([metaName]) ## This is FIXED parameter
s3.getDirFiles('rsem_counts/', destpath=COUNT_PATH) # Download all count files

filelist = glob.glob(COUNT_PATH+"*-"+args.celltype+".genes.results") # File path
filelist = [os.path.basename(cursor) for cursor in filelist] # Extracting base file name
sampleName = dataHandler.get_samplename(filelist)

result_arr = [] # result array
# sampleName and filelist have same order, and appending to result array
for filename in filelist:
sampleValues = dataHandler.get_column(COUNT_PATH+filename, 'expected_count')
result_arr.append(sampleValues)
result_df = pd.concat(result_arr, axis=1)
result_df.columns = sampleName # Change column name by using sample names

metadata = pd.read_csv(META_PATH) # read meta data

# get meta result
meta_result_df = dataHandler.get_condtionMatrix_by_category(metadata, 'HCVB_ID', args.condcolumn, [args.cond1, args.cond2])
overlapped_samples = list(set(meta_result_df.index.tolist()).intersection(set(result_df.columns.tolist()))) # Overlapped samples

# Extract overlapped samples
meta_result_df = meta_result_df.loc[overlapped_samples]
result_df = result_df[overlapped_samples]
result_df.astype(int).to_csv('/output/'+args.celltype+"_output.csv") # Output
meta_result_df.to_csv('/output/'+args.celltype+"_meta_output.csv")
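A direct invocation sketch; the environment variable values come from the inline comments above, and the argument values are examples (assumes AWS credentials and the /data and /output paths exist):

```bash
export mainbucket=openkbc-ms-maindata-bucket
export metafile=EPIC_HCvB_metadata_baseline_updated-share.csv
export efspoint=/output/
python step1_get_DESeq2_input.py -c CD4 -v Sex -x M -y F
```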
step2_DESeq2_calculator.R
@@ -0,0 +1,31 @@
# Reference: http://bioconductor.org/packages/devel/bioc/vignettes/DESeq2/inst/doc/DESeq2.html
#
# This code calculates DESeq2 DEG from the matrix with only specific condition
# Example: Rscript step2_DESeq2_calculator.R inputfile metafile outputfile
# inputfile = "./sample_CD4_ext.csv"
# metafile = "./sample_CD4_meta.csv"
# outputfile = "./CD4_DEG.csv"

#library(tidyverse)
library(DESeq2)
library(tximport)

args = commandArgs(trailingOnly=TRUE)
inputFile = args[1]
metaFile = args[2]
outputFile = args[3]

exprData <- read.table(inputFile,sep=",", header=TRUE, row.names=1)
metaData <- read.table(metaFile, sep=",", header=TRUE, row.names=1)

names(exprData) <- sub("^X", "", names(exprData)) # drop "X" string in columns name
exprData <- as.matrix(exprData) # To matrix
metaData$conditions <- factor(metaData$conditions)

exprData <- round(exprData) # DESeq2 expects integer counts, so round the expression values

degSet <- DESeqDataSetFromMatrix(countData = exprData, colData = metaData, design = ~ conditions) # Build the DESeq2 dataset
degSet <- DESeq(degSet)
res <- results(degSet) # result

write.table(as.data.frame(res), file = outputFile, row.names = TRUE, col.names = TRUE) # coerce the S4 results object to a data frame for writing
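Invocation as wired in pipeline_controller.sh, here for the CD4 cell type:

```bash
Rscript step2_DESeq2_calculator.R /output/CD4_output.csv /output/CD4_meta_output.csv /output/DEG_CD4.result
```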