diff --git a/aws_module/batch_deployment/activation_score_batch/README.md b/aws_module/batch_deployment/activation_score_batch/README.md index 8441022..46b3c8b 100644 --- a/aws_module/batch_deployment/activation_score_batch/README.md +++ b/aws_module/batch_deployment/activation_score_batch/README.md @@ -1,3 +1,5 @@ +# Activation Score Calculation in AWS Batch + ## AWS module for running the project * This is initial version of DEG pipeline with AWS batch, it has same function with pipeline module in this project. * To change input, JSON file needs modification diff --git a/aws_module/batch_deployment/deg_pipeline_batch/README.md b/aws_module/batch_deployment/deg_pipeline_batch/README.md index a182648..dd656dc 100644 --- a/aws_module/batch_deployment/deg_pipeline_batch/README.md +++ b/aws_module/batch_deployment/deg_pipeline_batch/README.md @@ -1,6 +1,8 @@ +# DEG Pipeline in AWS Batch + ## AWS module for running the project -* This module supports to run the project codes, pipelines and analysis by launching AWS Batch. Currently, it is on development phase and this module can run with limited code (Activation Score Calculation). -* Parallel jobs execution is needed lambda function input, please use lambda_deployment section first +* This module generates sample DEG results under given conditions. It uses AWS Batch, and a separate run must be launched for each cell type result (CD4, CD8, CD14). +* There is no parallel job for this module; it runs as a sequential job. ### Requirements on local PC ``` @@ -25,5 +27,5 @@ apt-get install jq sh batch_module_singleJob.sh # For CD4 only ``` -### Multiple Jobs Flow -![flow1](../../../README_resource/batch_detail2.png) +### Sequential Jobs Flow +![flow1](../../../README_resource/batch_detail2.png) \ No newline at end of file diff --git a/aws_module/batch_deployment/feature_extraction_batch/README.md b/aws_module/batch_deployment/feature_extraction_batch/README.md new file mode 100644 index 0000000..a4cd87b --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/README.md @@ -0,0 +1,31 @@ +# Feature Extraction Pipeline in AWS Batch + +## AWS module for running the project +* This module extracts features under given conditions. It uses AWS Batch, and a separate run must be launched for each cell type result (CD4, CD8, CD14). +* There is no parallel job for this module; it runs as a sequential job. + +### Requirements on local PC +``` +apt-get install awscli +apt-get install jq +``` + +### Usage on local PC +* To change the cell type (CD4, CD8, CD14) or category, edit the JSON file and run each configuration separately +```json + { + "name": "msigDBPATH", + "value": "msigdb.v7.4.entrez.gmt" + }, + { + "name": "startFile", + "value": "counts_vst_CD4.csv" # Change here for input + }, +``` +* Then run the module +``` +sh batch_module_singleJob.sh # For CD4 only +``` + +### Sequential Jobs Flow +![flow1](../../../README_resource/batch_detail2.png) diff --git a/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/Dockerfile b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/Dockerfile new file mode 100644 index 0000000..7e9da36 --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/Dockerfile @@ -0,0 +1,12 @@ +FROM continuumio/miniconda + +RUN mkdir /data +RUN mkdir /output + +COPY . . 
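+# The layers below set up the pipeline environment: a pinned conda environment (Python 3.8.2, R 3.6) is created, the SHELL directive makes every following RUN command execute inside that environment, and the Python (requirements.txt) and R (installer_Rpackage.R) dependencies are then installed into it.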
+RUN conda create -n pipeline_controller_base python=3.8.2 R=3.6 +SHELL ["conda", "run", "-n", "pipeline_controller_base", "/bin/bash", "-c"] + +RUN pip install -r requirements.txt +RUN Rscript installer_Rpackage.R +RUN chmod +x pipeline_controller.sh \ No newline at end of file diff --git a/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/deseq2_norm.R b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/deseq2_norm.R new file mode 100644 index 0000000..1edcef8 --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/deseq2_norm.R @@ -0,0 +1,75 @@ +################################################################################## +##### normalization using DESeq2 - get normalized and vst transformed counts ##### +################################################################################## +### install DESeq2, tximport packages from Bioconductor +#if (!requireNamespace("BiocManager", quietly = TRUE)) +# install.packages("BiocManager", repos='http://cran.us.r-project.org') +# BiocManager::install("DESeq2") +# BiocManager::install("tximport") + +library(tidyverse) +library(DESeq2) +library(tximport) + +args = commandArgs(trailingOnly=TRUE) +inputpath = args[1] # with path +outputFile = args[2] # with path +metadataPath = args[3] + +data_path <- "../data/" +print(getwd()) +setwd(data_path) + +# loading metadata +metadata <- read_csv(metadataPath) +colnames(metadata)[1] <- "sampleID" + +# loading rsem filelist +files <- list.files(path=inputpath, pattern="*.genes.results", recursive = TRUE, full.names = TRUE) + +# create sample table +sampleTable <- data.frame(sampleID = str_extract(files, "\\d{5}\\w"), CellType = str_extract(files, "CD\\d{1,2}"), files = files) %>% + left_join(., metadata, by="sampleID") +row.names(sampleTable) <- paste(sampleTable$sampleID, sampleTable$CellType, sep=".") + + +# function for loading rsem counts and normalization +getNormCounts <- function(celltypes, files, sampleTable){ + rsem.gene.sub <- tximport(files[grep(celltypes, files)], type = "rsem", txIn = FALSE, txOut = FALSE) + sampleTable.sub <- sampleTable %>% filter(files %in% files[grep(celltypes, files)]) %>% + arrange(match(files, files[grep(celltypes, files)])) + + rsem.gene.sub$length[rsem.gene.sub$length == 0] <- 1 + cds.sub <- DESeqDataSetFromTximport(txi=rsem.gene.sub, colData=sampleTable.sub, design=~1) + cds.sub <- cds.sub[ rowSums(counts(cds.sub)) > 0, ] + cds.sub <- DESeq(cds.sub) + return(cds.sub) +} + +# run function +celltypes <- c("CD4","CD8","CD14") + +deseq_obj <- list() +for (i in celltypes){ + deseq_obj[[i]] <- getNormCounts(i, files, sampleTable) +} + + +### save count matrix +# save DESeq2 object +save(deseq_obj, file = "DESeq2_object.RData") + +# normalized counts +write.csv(counts(deseq_obj$CD4, normalized = TRUE), "counts_norm_CD4.csv") +write.csv(counts(deseq_obj$CD8, normalized = TRUE), "counts_norm_CD8.csv") +write.csv(counts(deseq_obj$CD14, normalized = TRUE), "counts_norm_CD14.csv") + +# vst (variance stabilizing transformation) transformed counts +write.csv(assay(vst(deseq_obj$CD4)), "counts_vst_CD4.csv") +write.csv(assay(vst(deseq_obj$CD8)), "counts_vst_CD8.csv") +write.csv(assay(vst(deseq_obj$CD14)), "counts_vst_CD14.csv") + +# rlog transformed counts +write.csv(assay(rlog(deseq_obj$CD4)), "counts_rlog_CD4.csv") +write.csv(assay(rlog(deseq_obj$CD8)), "counts_rlog_CD8.csv") +write.csv(assay(rlog(deseq_obj$CD14)), "counts_rlog_CD14.csv") \ No newline at end of file diff --git 
a/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/installer_Rpackage.R b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/installer_Rpackage.R new file mode 100644 index 0000000..3cc3631 --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/installer_Rpackage.R @@ -0,0 +1,7 @@ +### install DESeq2, tximport packages from Bioconductor +if (!requireNamespace("BiocManager", quietly = TRUE)) + install.packages("BiocManager", repos='http://cran.us.r-project.org') + BiocManager::install("DESeq2") + BiocManager::install("tximport") + BiocManager::install("AnnotationDbi") + BiocManager::install("org.Hs.eg.db") \ No newline at end of file diff --git a/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/libraries/botoClass.py b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/libraries/botoClass.py new file mode 100644 index 0000000..70ffe32 --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/libraries/botoClass.py @@ -0,0 +1,59 @@ +import boto3 +import os + + +class botoHandler(object): + + def __init__(self, bucketName): + s3 = boto3.resource('s3') # s3 resource + my_bucket = s3.Bucket(bucketName) # Bucket Set + self.s3 = s3 + self.my_bucket = my_bucket + + + def getDirFiles(self, dirname, destpath='/data/'): + """ + Get all ojects in specific folder + input: folder name, output destination in container + output: Downloading objects + """ + + for object_summary in self.my_bucket.objects.filter(Prefix=dirname): + targetFile=destpath+os.path.basename(object_summary.key) + self.my_bucket.download_file(object_summary.key, targetFile) + + # Data search + def search_obj(self, bucketObj, searchList): + """ + Search function for s3 objects + input: s3 object in boto3, list of items + output: s3 object key list + """ + result=[] + for target in searchList: + for candidate in bucketObj.objects.all(): + if str(candidate.key).find(target) > -1: # if target finds in string + result.append(candidate.key) # get key + return result + + # Get data from S3 + def getFile(self, searchList, destpath='/data/'): + """ + Download file from S3 + input: bucketname, input name, container path for samples + output: output path string + """ + s3_list = self.search_obj(self.my_bucket, searchList) # Search file object + targetFile=destpath+searchList[0] + self.my_bucket.download_file(s3_list[0], targetFile) + + return targetFile + + # Upload data to S3 + def uploadFile(self, writeFileName, data, datatype='pandas'): + if datatype=='pandas': + self.my_bucket.put_object(Key=os.path.basename(writeFileName), Body=data.to_csv()) # Streaming to S3 + elif datatype=='txt': + self.my_bucket.put_object(Key=os.path.basename(writeFileName), Body=data) # Streaming to S3 + else: + raise ValueError('Error for upload data type definition') diff --git a/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/libraries/externalHandler.py b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/libraries/externalHandler.py new file mode 100644 index 0000000..939ea65 --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/libraries/externalHandler.py @@ -0,0 +1,66 @@ +import pandas as pd +import itertools + +class handlers(object): + def get_column(filename_with_path, ext_value, annot='gene_id', header_line=0, sep="\t"): + """ + filename_with_path = filepath + basename + ext_value = column name of file + sep = separator + """ + + # Don't use pandas.read_csv because of memory usage + 
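+        # Stream the file line by line instead: locate the annotation column (gene_id) and the requested value column in the header row, then collect one row label and one float value per data line, and assemble a single-column DataFrame at the end.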
index_list = [] + value_list = [] + with open(filename_with_path, 'r') as infile: + for i, line in enumerate(infile): + line = line.strip() + if i==header_line: # found header + header_info = line.split(sep) + value_ext_location = header_info.index(ext_value) # location of value extraction point + index_ext_location = header_info.index(annot) # location of value extraction point + + elif i!=header_line: + line_list = line.split(sep) + index_list.append(str(line_list[index_ext_location])) # Value list + value_list.append(float(line_list[value_ext_location])) # Index list + + result_df = pd.DataFrame(data={ext_value: value_list}, index=index_list) + return result_df + + def get_samplename(filelist): + """ + filelist = list of basename + Lambda function could be-- + _get_samplename = lambda filelist : [x.split("-")[0] for x in filelist] + """ + sampleName = [x.split("-")[0] for x in filelist] + return sampleName + + def get_condtionMatrix_by_category(dataframe, sampleColumn, dataColname, conditions:list): + """ + Transform meta data to DESeq condition matrix + Input + dataframe: metadata input + sampleColumn: Column name for Sample ID in metadata input + dataColumn: Column name for category value in metadata input + conditions: Conditions you selected, list type, and it has 2 elements + + Output + result dataframe with 2 columns (colnames: sampleID, conditions) + """ + + assert len(conditions)==2, "Please make sure that conditions list has 2 elements" + + sampleList = [] # empty list + conditionValues = [] + for x in conditions: + data = dataframe[dataframe[dataColname]==x][sampleColumn] # get sample name + sampleList.append(data.values.tolist()) # sampleID + conditionValues.append([x]*len(data.values.tolist())) # condition value + + sampleList = list(itertools.chain(*sampleList)) # flatten + conditionValues = list(itertools.chain(*conditionValues)) + + result = pd.DataFrame(data={'sampleID':sampleList, 'conditions':conditionValues}).set_index('sampleID') + return result diff --git a/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/libraries/standard_gzscore.py b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/libraries/standard_gzscore.py new file mode 100644 index 0000000..18f4555 --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/libraries/standard_gzscore.py @@ -0,0 +1,33 @@ +__author__ = "Junhee Yoon" +__version__ = "1.0.0" +__maintainer__ = "Junhee Yoon" +__email__ = "swiri021@gmail.com" + +""" +Reference: https://genomebiology.biomedcentral.com/articles/10.1186/gb-2006-7-10-r93 +Description: calculating activation score by using z score +""" + +import pandas as pd +import numpy as np + +class calculator(object): + + def __init__(self, df): + if df.empty: + raise ValueError("Input Dataframe is empty, please try with different one.") + else: + self.df = df + + # function structure + def gs_zscore(self, names='Zscore', gene_set=[]): + zscore=[] + arr1_index = self.df.index.tolist() + inter = list(set(arr1_index).intersection(gene_set)) + + diff_mean = self.df.loc[inter].mean(axis=0).subtract(self.df.mean(axis=0)) + len_norm = self.df.std(ddof=1, axis=0).apply(lambda x: np.sqrt(len(inter))/x) + zscore = diff_mean*len_norm + zscore = zscore.to_frame() + zscore.columns = [names] + return zscore \ No newline at end of file diff --git a/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/libraries/statFunction.py b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/libraries/statFunction.py new file mode 
100644 index 0000000..7266fb8 --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/libraries/statFunction.py @@ -0,0 +1,72 @@ +__author__ = "Junhee Yoon" +__version__ = "1.0.0" +__maintainer__ = "Junhee Yoon" +__email__ = "swiri021@gmail.com" + +""" +Description: Repeative functions in notebook +""" +from sklearn.svm import SVC +from sklearn.model_selection import StratifiedKFold +from sklearn.feature_selection import RFECV + +from scipy.stats import ranksums +import pandas as pd +import numpy as np + +class StatHandler(object): + """ + Statistics handlers + + """ + + def calculate_ranksum(df, sampleList, controlList): + """ + ranksum statistics wrapper by following notebook + + Input: + df = Sample dataframe + sampleList = short disease duration + controlList = long disease duration + + Output: + significant index(statistics and pvalue) by dataframe + """ + + significant_list = [] + for x in df.index.tolist(): + long_data = df[controlList].loc[x] # Long expr list + short_data = df[sampleList].loc[x] # Short expr list + + s, p = ranksums(long_data.values.tolist(), short_data.values.tolist()) # ranksum + fc = short_data.mean(skipna=True) - long_data.mean(skipna=True) # FC + + if p<0.05: + significant_list.append([x,fc, p]) # sig list + + sig_df = pd.DataFrame(significant_list, columns=["Names", "fc", "pval"]) + return sig_df + + def calculate_RFECV(df, X, y, rankthresh=10): + ## Log function is needed here + ## Reference: + ## https://scikit-learn.org/stable/auto_examples/feature_selection/plot_rfe_with_cross_validation.html + + estimator = SVC(kernel="linear") # linear + min_features_to_select = 1 + rfecv = RFECV(estimator=estimator, step=1, cv=StratifiedKFold(2),\ + scoring='accuracy', min_features_to_select=min_features_to_select) + rfecv.fit(X, y) + + print("Optimal number of features : %d" % rfecv.n_features_) + return np.where(rfecv.ranking_ <= int(rankthresh)) + + """ + # Muted visualization part + # Plot number of features VS. 
cross-validation scores + plt.figure() + plt.xlabel("Number of features selected") + plt.ylabel("Cross validation score (nb of correct classifications)") + plt.plot(range(min_features_to_select, len(rfecv.grid_scores_) + min_features_to_select), rfecv.grid_scores_) + plt.show() + """ \ No newline at end of file diff --git a/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/pipeline_controller.sh b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/pipeline_controller.sh new file mode 100644 index 0000000..d22aa75 --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/pipeline_controller.sh @@ -0,0 +1,38 @@ +#!/bin/bash +#Example: sh pipeline_controller.sh + +if [ $AWS_BATCH_JOB_ARRAY_INDEX -eq 0 ]; +then + conda run -n pipeline_controller_base python step0_downloadFile.py + +elif [ $AWS_BATCH_JOB_ARRAY_INDEX -eq 1 ]; +then + conda run -n pipeline_controller_base python step1_cleanup_normalized_matrix.py --vst + +elif [ $AWS_BATCH_JOB_ARRAY_INDEX -eq 2 ]; +then + conda run -n pipeline_controller_base Rscript step2_IDconverter.R + +elif [ $AWS_BATCH_JOB_ARRAY_INDEX -eq 3 ]; +then + conda run -n pipeline_controller_base python step3_get_all_activation_scores.py + +elif [ $AWS_BATCH_JOB_ARRAY_INDEX -eq 4 ]; +then + conda run -n pipeline_controller_base python step4_actscoreDiff.py --type RR,CIS + +elif [ $AWS_BATCH_JOB_ARRAY_INDEX -eq 5 ]; +then + conda run -n pipeline_controller_base python step5_actscoreRFECV.py --type RR,CIS --rthresh 10 + +elif [ $AWS_BATCH_JOB_ARRAY_INDEX -eq 6 ]; +then + conda run -n pipeline_controller_base python step6_geneDiff.py --type RR,CIS + +elif [ $AWS_BATCH_JOB_ARRAY_INDEX -eq 7 ]; +then + conda run -n pipeline_controller_base python step7_geneRFECV.py --type RR,CIS --rthresh 5 + +else + conda run -n pipeline_controller_base python step8_upload_to_s3.py +fi \ No newline at end of file diff --git a/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/requirements.txt b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/requirements.txt new file mode 100644 index 0000000..4c509de --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/requirements.txt @@ -0,0 +1,23 @@ +##controller requirements +gunicorn==20.1.0 +Jinja2==3.0.1 +PyYAML==5.4.1 +flask==2.0.1 +Flask-WTF==0.15.1 +Flask-Bootstrap==3.3.7.1 +flask-nav==0.6 +celery==5.1.2 +redis==3.5.3 +boto3==1.18.54 +awscli==1.20.54 +##deg requirements +pip==21.2.2 +pandas==1.3.2 +numpy==1.21.2 +feather-format==0.4.1 +scikit-learn==0.24.2 +scipy==1.7.1 +snakemake==6.8.0 +# feature-ext requirements +matplotlib==3.4.3 +matplotlib-inline==0.1.2 \ No newline at end of file diff --git a/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step0_downloadFile.py b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step0_downloadFile.py new file mode 100644 index 0000000..d762cae --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step0_downloadFile.py @@ -0,0 +1,31 @@ +__author__ = "Junhee Yoon" +__version__ = "1.0.0" +__maintainer__ = "Junhee Yoon" +__email__ = "swiri021@gmail.com" + +""" +Description: File preparation for AWS Batch, downloading target files into EFS +""" +import os +from libraries.botoClass import botoHandler +from libraries.externalHandler import handlers as dataHandler +import argparse + + +if __name__ == "__main__": + + ### Get ENV variables + mainDataBucket = os.environ['mainbucket'] # openkbc-ms-maindata-bucket + outputPath = 
os.environ['efspoint'] # /output/ + inputFile = os.environ['startFile'] # counts_vst_CD4.csv + metaName = os.environ['metafile'] # EPIC_HCvB_metadata_baseline_updated-share.csv + msigFile = os.environ['msigDBPATH'] # msigdb.v7.4.entrez.gmt + + ### Data preparation + s3 = botoHandler(mainDataBucket) # Call boto3 + + print("Download Path: "+outputPath) + s3.getFile([metaName], destpath=outputPath) ## This is FIXED parameter + s3.getFile([msigFile], destpath=outputPath) ## This is FIXED parameter + s3.getFile([inputFile], destpath=outputPath) ## This is FIXED parameter + print("Finished downloading files") \ No newline at end of file diff --git a/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step1_cleanup_normalized_matrix.py b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step1_cleanup_normalized_matrix.py new file mode 100644 index 0000000..9e03a73 --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step1_cleanup_normalized_matrix.py @@ -0,0 +1,43 @@ +__author__ = "Junhee Yoon" +__version__ = "1.0.0" +__maintainer__ = "Junhee Yoon" +__email__ = "swiri021@gmail.com" + +""" +Description: This code cleans a normalized matrix whose index contains non-standard Ensembl IDs and whose column names carry extra strings, +and it handles duplicated Ensembl IDs. After running this code, R/IDconverter.R is recommended for ID conversion (CSV only). +""" + +import os +import argparse +import pandas as pd +import numpy as np + +parser = argparse.ArgumentParser(prog='cleanup_normalized_matrix.py') +# Input data +parser.add_argument('-v','--vst', dest='vst', action='store_true',default=False,\ + help='Input data is vst normalized or not, default = False') +args = parser.parse_args() + +if __name__ == "__main__": + # Input Name: step0_name.csv + + SharedFilePath = os.environ['efspoint'] # Main data path here, goes to EFS volume + inputFile = os.environ['startFile'] # counts_normalized/rawFiles/counts_vst_CD4.csv + + # Load data + if '.csv' in inputFile: + df = pd.read_csv(SharedFilePath+inputFile, index_col=0) + elif '.feather' in inputFile: + df = pd.read_feather(SharedFilePath+inputFile).pipe(lambda d: d.set_index(d.columns[0])) # read_feather has no index_col; assumes the first column holds the gene IDs + + df.index = [x.split(".")[0] for x in df.index.tolist()] # New index names + df.columns = [x.split(".")[0] for x in df.columns.tolist()] # New column names + df = df[~df.index.duplicated(keep='first')] # Taking first values in duplicated index + + if args.vst==False: + df=df.applymap(lambda x: np.log2(x+1)) # Apply log2 for non-vst normalized data + + ## Need to add file name handler + df.to_csv(SharedFilePath+os.path.basename(inputFile).replace('.csv', '.step1.csv')) + # Output Name: step0_name.step1.csv \ No newline at end of file diff --git a/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step2_IDconverter.R b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step2_IDconverter.R new file mode 100644 index 0000000..2385fa2 --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step2_IDconverter.R @@ -0,0 +1,26 @@ +# ID converter from Ensembl ID to Entrez ID, please clean up the data with the python code before using this script +# Only works with csv files +# Usage Rscript +# Rscript IDconverter.R path/counts_norm_CD8.csv path/outputfile.csv + +library("AnnotationDbi") +library("org.Hs.eg.db") + +inputPath = Sys.getenv("efspoint") +step1Input = Sys.getenv("startFile") +inputFile = gsub(".csv", ".step1.csv", step1Input) # File from step1 +inputFile = paste(inputPath,inputFile,sep="") # Full 
path with file name + +#str_split +data<-read.table(inputFile, row.names=1, sep=',', header=TRUE) # Read data +names(data) <- sub("^X", "", names(data)) # drop "X" string in columns name + +### Warning ### +# Entrez ID might duplicate for Ensemble ID +entrez = mapIds(org.Hs.eg.db, keys=row.names(data), column="ENTREZID", keytype="ENSEMBL", multiVals="first") +row.names(data)<-make.names(entrez, unique=TRUE) +row.names(data) <- sub("^X", "", row.names(data)) # drop "X" string in index name + +outputFile = gsub('.csv','.step2.csv',step1Input) # File name replacement for output +outputFile = paste(inputPath,outputFile,sep="") # Full path with file name +write.table(data, outputFile, sep=',', row.names = TRUE, col.names = TRUE) # Write result \ No newline at end of file diff --git a/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step3_get_all_activation_scores.py b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step3_get_all_activation_scores.py new file mode 100644 index 0000000..702ca6e --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step3_get_all_activation_scores.py @@ -0,0 +1,49 @@ +__author__ = "Junhee Yoon" +__version__ = "1.0.0" +__maintainer__ = "Junhee Yoon" +__email__ = "swiri021@gmail.com" + +""" +Description: This code generates activation scores by using MsigDB. This code needs expression matrix by EntrezID index +""" + +## Move to setting.env in the future +MSIGDB_PATH = "/Users/junheeyun/OpenKBC/multiple_sclerosis_proj/data/MsigDB_list/msigdb.v7.4.entrez.gmt" + +import os +import pandas as pd +from libraries.standard_gzscore import calculator + +if __name__ == "__main__": + + SharedFilePath = os.environ['efspoint'] # Main data path here, goes to EFS volume + metaName = os.environ['metafile'] # EPIC_HCvB_metadata_baseline_updated-share.csv + msigFile = os.environ['msigDBPATH'] # msigdb.v7.4.entrez.gmt + step1Input = os.environ['startFile'] # counts_vst_CD4.csv + inputFile = SharedFilePath+os.path.basename(step1Input).replace('.csv', '.step2.csv') # replace to step3 input + + # .gmt parsing + count = 0 + gmt_arr = [] # gmt parsing array + MSIGDB_PATH = SharedFilePath+msigFile + with open(MSIGDB_PATH, 'r') as infile: + for line in infile: + gmt_value = line.strip().split("\t") # splitting line + sig_names = gmt_value[0] # signature name + gene_list = gmt_value[2:] # gene list + gmt_arr.append([sig_names]+gene_list) + + # Sample loading, and some entrezIDs are duplicated in the matrix + gexpr = pd.read_csv(inputFile, index_col=0) + gexpr.index = [x.split(".")[0] for x in gexpr.index.tolist()] # remove effect from R make.names + gexpr = gexpr.groupby(gexpr.index).max() # keeping max for duplicated index + + zscore_arr = [] # result array + zscore_calculator = calculator(gexpr) # Set activation calculator + for sig in gmt_arr: + zscore_value = zscore_calculator.gs_zscore(names=sig[0], gene_set=sig[1:]) # using standard, threaded version has an error + zscore_arr.append(zscore_value) + zscore_df = pd.concat(zscore_arr, axis=1) # make dataframe + + outputFile = SharedFilePath+os.path.basename(step1Input).replace('.csv', '.step3.csv') # replace to step3 output + zscore_df.to_csv(outputFile) \ No newline at end of file diff --git a/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step4_actscoreDiff.py b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step4_actscoreDiff.py new file mode 100644 index 0000000..d0bdbe5 --- /dev/null +++ 
b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step4_actscoreDiff.py @@ -0,0 +1,122 @@ +__author__ = "Junhee Yoon" +__version__ = "1.0.0" +__maintainer__ = "Junhee Yoon" +__email__ = "swiri021@gmail.com" + +""" +Description: Mimic of notebook code for pipeline work, please see step1 in Jun notebook archive +""" + +import os +import pandas as pd +import numpy as np +import argparse +import itertools +from sys import argv +from libraries.statFunction import StatHandler + +parser = argparse.ArgumentParser(prog='actscoreDiff.py') +# Input data +parser.add_argument('-t','--type', dest='resultType', default='RR,CIS',\ + help='Result type, ex: long, healthy, "RR,CIS" ') +args = parser.parse_args() + + +# Copy of OpenKbcMSToolkit.py +def _get_sample_name_by_contValues(dataframe, sampleColumn, dataColname, threshold): + cont_df = dataframe.dropna(subset=[dataColname]) # continuous perspective dataframe + cont_df[dataColname] = cont_df[dataColname].astype(float) # make float + + #threshValue = np.percentile(cont_df[dataColname].values.tolist(), threshold) + #greater_samples = cont_df.loc[ cont_df[dataColname] >= threshValue, sampleColumn] + #less_samples = cont_df.loc[ cont_df[dataColname] < threshValue, sampleColumn] + + threshValue_below = np.percentile(cont_df[dataColname].values.tolist(), 100-threshold) + threshValue_above = np.percentile(cont_df[dataColname].values.tolist(), threshold) + + greater_samples = cont_df.loc[ cont_df[dataColname] > threshValue_below, sampleColumn] + less_samples = cont_df.loc[ cont_df[dataColname] <= threshValue_above, sampleColumn] + + return (less_samples, greater_samples) + +# Copy of OpenKbcMSToolkit.py +def _get_sample_name_by_category(dataframe, sampleColumn, dataColname): + sample_category = dataframe[dataColname].unique() # get unique value for category + result = [] # empty list + for x in sample_category: + data = dataframe[dataframe[dataColname]==x][sampleColumn] # get sample name + result.append(data.values.tolist()) + + return (result, sample_category) + +# This is not global function for other codes, specifically fixed path and data, don't use other purposes. 
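+# Overview of _LoadDiseaseDuration below: for returntype 'long' it returns long vs. short disease-duration sample lists, for 'healthy' it returns healthy vs. short disease-duration samples, and for a comma-separated pair such as 'RR,CIS' it returns the samples of those two DiseaseCourse categories; every returned list is intersected with the columns of the input matrix.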
+def _LoadDiseaseDuration(df, meta_data, returntype='long'): + """ + df : Expression or activation score matrix + meta_data : meta data which contains duration and sample ID + output: long DD samples and short DD samples by list, or healthy samples and short DD samples by list + """ + # checking multiple element for returntype + if returntype.count(',')>1: raise ValueError('No more than 2 elements for returntype') + + if returntype.find(',')==-1: # if returnType is single(long and healthy) + # Sample by disease category + sample_list, sample_category = _get_sample_name_by_category(dataframe=meta_data, sampleColumn='HCVB_ID', dataColname='DiseaseCourse') + + # Sort by disease category and exclude uknown samples + patient_samples = [] # patient samples + healthy_samples = [] # healthy samples + for samples, category in zip(sample_list, sample_category): + if category=='Healthy': + healthy_samples = samples + else: + if category!='Unknown':# Excluding unknown samples + patient_samples.append(samples) + + patient_samples = list(itertools.chain(*patient_samples)) # flatten + patient_samples = list(set(patient_samples).intersection(df.columns.tolist())) # intersected with act score matrix + healthy_samples = list(set(healthy_samples).intersection(df.columns.tolist())) # intersected with act score matrix + patient_meta = meta_data.loc[meta_data['HCVB_ID'].isin(patient_samples)] # Make patient metadata + + longDD_samples, shortDD_samples = _get_sample_name_by_contValues(patient_meta, 'HCVB_ID', 'DiseaseDuration', 25) + longDD_samples = list(set(longDD_samples.values.tolist()).intersection(df.columns.tolist())) # intersected with act score matrix + shortDD_samples = list(set(shortDD_samples.values.tolist()).intersection(df.columns.tolist())) # intersected with act score matrix + + else: # if returnType is multiple(List) + # Sample by disease category + sample_list, sample_category = _get_sample_name_by_category(dataframe=meta_data, sampleColumn='HCVB_ID', dataColname='DiseaseCourse') + category1 = returntype.split(',')[0] + category2 = returntype.split(',')[1] + + # Sort by disease category and exclude uknown samples + patient_samples = [] # patient samples + healthy_samples = [] # healthy samples + for samples, category in zip(sample_list, sample_category): + if category==category1: + category1_samples = list(set(samples).intersection(df.columns.tolist())) # intersected with act score matrix + elif category==category2: + category2_samples = list(set(samples).intersection(df.columns.tolist())) # intersected with act score matrix + + # return result + if returntype=='long': + return longDD_samples, shortDD_samples + elif returntype=='healthy': + return healthy_samples, shortDD_samples + else: + return category1_samples, category2_samples + +if __name__ == "__main__": + + SharedFilePath = os.environ['efspoint'] # Main data path here, goes to EFS volume + metaName = os.environ['metafile'] # EPIC_HCvB_metadata_baseline_updated-share.csv + msigFile = os.environ['msigDBPATH'] # msigdb.v7.4.entrez.gmt + step1Input = os.environ['startFile'] # counts_vst_CD4.csv + inputFile = SharedFilePath+os.path.basename(step1Input).replace('.csv', '.step3.csv') # replace to step3 input + + df = pd.read_csv(inputFile, engine='c', index_col=0).T.dropna() # Activation Score + meta_data = pd.read_csv(SharedFilePath+metaName) # Meta data + longDD_samples, shortDD_samples = _LoadDiseaseDuration(df, meta_data, args.resultType) + ranksumSig = StatHandler.calculate_ranksum(df, shortDD_samples, longDD_samples) # get ranksum 
result + + outputFile = SharedFilePath+os.path.basename(step1Input).replace('.csv', '.step4.csv') # replace to step4 output + df.loc[ranksumSig["Names"].values.tolist()].to_csv(outputFile) # Writing diff --git a/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step5_actscoreRFECV.py b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step5_actscoreRFECV.py new file mode 100644 index 0000000..fc66dc3 --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step5_actscoreRFECV.py @@ -0,0 +1,50 @@ +__author__ = "Junhee Yoon" +__version__ = "1.0.0" +__maintainer__ = "Junhee Yoon" +__email__ = "swiri021@gmail.com" + +""" +Description: Mimic of notebook code for pipeline work, please see step2 in Jun notebook archive +Step4 is the same process with different input(activation socre -> gene expression) +""" + +import pandas as pd +import argparse +import os + +## call previous step for calling internal function +import step4_actscoreDiff +from libraries.statFunction import StatHandler + + +parser = argparse.ArgumentParser(prog='actscoreDiff.py') +# Input data +parser.add_argument('-r','--rthresh', dest='rankingthresh', required=True,\ + help='Threshold for RFECV" ') +parser.add_argument('-t','--type', dest='resultType', default='RR,CIS',\ + help='Result type, ex: long, healthy, "RR,CIS" ') +args = parser.parse_args() + +# Simple control for snakemake(no argparse) +if __name__ == "__main__": + + SharedFilePath = os.environ['efspoint'] # Main data path here, goes to EFS volume + metaName = os.environ['metafile'] # EPIC_HCvB_metadata_baseline_updated-share.csv + msigFile = os.environ['msigDBPATH'] # msigdb.v7.4.entrez.gmt + step1Input = os.environ['startFile'] # counts_vst_CD4.csv + inputFile = SharedFilePath+os.path.basename(step1Input).replace('.csv', '.step4.csv') # replace to step4 input + + #Data loading + df = pd.read_csv(inputFile, engine='c', index_col=0) + meta_data = pd.read_csv(SharedFilePath+metaName) + longDD_samples, shortDD_samples = step4_actscoreDiff._LoadDiseaseDuration(df, meta_data, args.resultType) + df = df[longDD_samples+shortDD_samples].dropna() # reform df with intersected samples + + # Make training samples + X = df.T.values # Training sample + y = [0]*len(longDD_samples)+[1]*len(shortDD_samples) # Training y + + # features and ranking + outputFile = SharedFilePath+os.path.basename(step1Input).replace('.csv', '.step5.csv') # replace to step5 output + rankarr = StatHandler.calculate_RFECV(df, X, y, args.rankingthresh) # get ranksum result + df.loc[df.index[rankarr]].to_csv(outputFile) # Writing \ No newline at end of file diff --git a/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step6_geneDiff.py b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step6_geneDiff.py new file mode 100644 index 0000000..0d5b15e --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step6_geneDiff.py @@ -0,0 +1,67 @@ +__author__ = "Junhee Yoon" +__version__ = "1.0.0" +__maintainer__ = "Junhee Yoon" +__email__ = "swiri021@gmail.com" + +""" +Description: Mimic of notebook code for pipeline work, please see step3 in Jun notebook archive +""" + +import os +from scipy.stats import ranksums +import pandas as pd + +## call previous step for calling internal function +import step4_actscoreDiff +from libraries.statFunction import StatHandler +import itertools +import argparse + +def _extract_geneSignature(actInput, msigDBPath): + #Data loading + df = pd.read_csv(actInput, engine='c', 
index_col=0) # activation score data for extracting gene signature + + # MsigDB Parsing + gmt_arr = [] # gmt parsing array + with open(msigDBPath, 'r') as infile: + for line in infile: + gmt_value = line.strip().split("\t") # splitting line + sig_names = gmt_value[0] # signature name + gene_list = gmt_value[2:] # gene list + gmt_arr.append([sig_names]+gene_list) + + gmt_ext_arr = [x[1:] for x in gmt_arr if x[0] in df.index.tolist()] # Selected signature genes + gmt_ext_arr = list(itertools.chain(*gmt_ext_arr)) + gmt_ext_arr = list(set(gmt_ext_arr)) # remove duplicated + return gmt_ext_arr + +parser = argparse.ArgumentParser(prog='step6_geneDiff.py') +# Input data +parser.add_argument('-t','--type', dest='resultType', default='RR,CIS',\ + help='Result type, ex: long, healthy, "RR,CIS" ') +args = parser.parse_args() + + +if __name__ == "__main__": + + SharedFilePath = os.environ['efspoint'] # Main data path here, goes to EFS volume + metaName = os.environ['metafile'] # EPIC_HCvB_metadata_baseline_updated-share.csv + msigFile = os.environ['msigDBPATH'] # msigdb.v7.4.entrez.gmt + step1Input = os.environ['startFile'] # counts_vst_CD4.csv + + inputExprFile = SharedFilePath+os.path.basename(step1Input).replace('.csv', '.step2.csv') # replace to step2 input, expression + inputFile = SharedFilePath+os.path.basename(step1Input).replace('.csv', '.step5.csv') # replace to step5 input + + df_expr = pd.read_csv(inputExprFile, engine='c', index_col=0) # get expr (inputExprFile already contains the EFS path) + meta_data = pd.read_csv(SharedFilePath+metaName) # metadata + longDD_samples, shortDD_samples = step4_actscoreDiff._LoadDiseaseDuration(df_expr, meta_data, args.resultType) # get samples for LDD SDD + + geneList = _extract_geneSignature(inputFile, SharedFilePath+msigFile) # Extracting genes + print("Total extracted genes: "+ str(len(geneList))) + + gene_intersected = list(set(geneList).intersection(df_expr.index.tolist())) # intersected between expr and actScore sig + df_expr = df_expr[longDD_samples+shortDD_samples].loc[gene_intersected].dropna() # selected expr only + + outputFile = SharedFilePath+os.path.basename(step1Input).replace('.csv', '.step6.csv') # replace to step6 output + ranksumSig = StatHandler.calculate_ranksum(df_expr, shortDD_samples, longDD_samples) # get ranksum result + df_expr.loc[ranksumSig["Names"].values.tolist()].to_csv(outputFile) # Writing \ No newline at end of file diff --git a/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step7_geneRFECV.py b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step7_geneRFECV.py new file mode 100644 index 0000000..ea629ca --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step7_geneRFECV.py @@ -0,0 +1,50 @@ +__author__ = "Junhee Yoon" +__version__ = "1.0.0" +__maintainer__ = "Junhee Yoon" +__email__ = "swiri021@gmail.com" + +""" +Description: Mimic of notebook code for pipeline work, please see step2 in Jun notebook archive +Step4 is the same process with different input(activation score -> gene expression) +""" + +import pandas as pd +import argparse +import os + +## call previous step for calling internal function +import step4_actscoreDiff +from libraries.statFunction import StatHandler + + +parser = argparse.ArgumentParser(prog='step7_geneRFECV.py') +# Input data +parser.add_argument('-r','--rthresh', dest='rankingthresh', required=True,\ + help='Threshold for RFECV" ') +parser.add_argument('-t','--type', dest='resultType', default='RR,CIS',\ + help='Result type, ex: long, healthy, "RR,CIS" ') +args = 
parser.parse_args() + +# Simple control for snakemake(no argparse) +if __name__ == "__main__": + + SharedFilePath = os.environ['efspoint'] # Main data path here, goes to EFS volume + metaName = os.environ['metafile'] # EPIC_HCvB_metadata_baseline_updated-share.csv + msigFile = os.environ['msigDBPATH'] # msigdb.v7.4.entrez.gmt + step1Input = os.environ['startFile'] # counts_vst_CD4.csv + inputFile = SharedFilePath+os.path.basename(step1Input).replace('.csv', '.step6.csv') # replace to step4 input + + #Data loading + df = pd.read_csv(inputFile, engine='c', index_col=0) + meta_data = pd.read_csv(SharedFilePath+metaName) + longDD_samples, shortDD_samples = step4_actscoreDiff._LoadDiseaseDuration(df, meta_data, args.resultType) + df = df[longDD_samples+shortDD_samples].dropna() # reform df with intersected samples + + # Make training samples + X = df.T.values # Training sample + y = [0]*len(longDD_samples)+[1]*len(shortDD_samples) # Training y + + # features and ranking + outputFile = SharedFilePath+os.path.basename(step1Input).replace('.csv', '.step7.csv') # replace to step5 output + rankarr = StatHandler.calculate_RFECV(df, X, y, args.rankingthresh) # get ranksum result + df.loc[df.index[rankarr]].to_csv(outputFile) # Writing \ No newline at end of file diff --git a/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step8_upload_to_s3.py b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step8_upload_to_s3.py new file mode 100644 index 0000000..337f223 --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/batch_jobs/step8_upload_to_s3.py @@ -0,0 +1,22 @@ +__author__ = "Junhee Yoon" +__version__ = "1.0.0" +__maintainer__ = "Junhee Yoon" +__email__ = "swiri021@gmail.com" + +""" +Description: This is batch job for uploading result file to S3. +""" +import os +import argparse +from libraries.botoClass import botoHandler + +if __name__ == "__main__": + uploadDataBucket = os.environ['uploadbucket'] # openkbc-ms-casting-bucket + SharedFilePath = os.environ['efspoint'] # Main data path here, goes to EFS volume, /output/ + step1Input = os.environ['startFile'] # counts_vst_CD4.csv + + ### Data prepration + outputFile = SharedFilePath+os.path.basename(step1Input).replace('.csv', '.step7.csv') # replace to step6 output + s3 = botoHandler(uploadDataBucket) # Call boto3 + f = open(outputFile, 'r').read() + s3.uploadFile(outputFile, f, datatype='txt') \ No newline at end of file diff --git a/aws_module/batch_deployment/feature_extraction_batch/batch_module_singleJob.sh b/aws_module/batch_deployment/feature_extraction_batch/batch_module_singleJob.sh new file mode 100644 index 0000000..a0b0377 --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/batch_module_singleJob.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +## Need to push job docker images before running this module, this module is an example for how to run single job for AWS batch. +## This is same pipeline with snakemake pipeline and it will generate DEG result for one cell type. +# echo "Creating EFS to share files.." + + +echo "Creating compute environment.." +aws batch create-compute-environment --compute-environment-name feature-ext-pipeline-env \ +--type MANAGED --compute-resources type=FARGATE,maxvCpus=8,securityGroupIds=sg-08946d1b26a30d376,subnets=[subnet-46231822,subnet-5c5f8b53] + +sleep 3 + +echo "Creating job queue.." 
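+# The job queue below is attached to the Fargate compute environment created above; the job definition registered next points at container_configure.json, and submit-job launches the sequential array job described in submit_configure.json.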
+aws batch create-job-queue --job-queue-name feature-ext-pipeline-queue --compute-environment-order order=1,computeEnvironment=feature-ext-pipeline-env --priority 100 + +echo "Creating job.." +aws batch register-job-definition --job-definition-name feature-ext-pipeline-job --platform-capabilities FARGATE \ +--type container --container-properties file://container_configure.json + +echo "Submit.." +aws batch submit-job --cli-input-json file://submit_configure.json > job.submitted + +sleep 2 + +echo "Job has been submitted, please check the console for further status.." \ No newline at end of file diff --git a/aws_module/batch_deployment/feature_extraction_batch/container_configure.json b/aws_module/batch_deployment/feature_extraction_batch/container_configure.json new file mode 100644 index 0000000..25b6a65 --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/container_configure.json @@ -0,0 +1,67 @@ +{ + "image" : "swiri021/deg_pipeline:v1.0.0", + "jobRoleArn" : "arn:aws:iam::601333025120:role/ecsTaskExecutionRole", + "executionRoleArn" : "arn:aws:iam::601333025120:role/ecsTaskExecutionRole", + + "command":[ "sh", "pipeline_controller.sh"], + "mountPoints": [ + { + "sourceVolume": "efsVolume", + "containerPath": "/output", + "readOnly": false + } + ], + "volumes": [ + { + "name": "efsVolume", + "efsVolumeConfiguration": { + "fileSystemId": "fs-017866e38b27933d8" + } + } + ], + + "networkConfiguration": { + "assignPublicIp": "ENABLED" + }, + + "fargatePlatformConfiguration" : { + "platformVersion": "1.4.0" + }, + + "resourceRequirements" : [ + { + "value":"2", + "type":"VCPU" + }, + { + "value":"5120", + "type":"MEMORY" + } + ], + "environment": [ + { + "name": "metafile", + "value": "EPIC_HCvB_metadata_baseline_updated-share.csv" + }, + { + "name": "efspoint", + "value": "/output/" + }, + { + "name": "msigDBPATH", + "value": "msigdb.v7.4.entrez.gmt" + }, + { + "name": "startFile", + "value": "counts_vst_CD4.csv" + }, + { + "name": "mainbucket", + "value": "openkbc-ms-maindata-bucket" + }, + { + "name": "uploadbucket", + "value": "openkbc-ms-batchresult-bucket" + } + ] +} \ No newline at end of file diff --git a/aws_module/batch_deployment/feature_extraction_batch/submit_configure.json b/aws_module/batch_deployment/feature_extraction_batch/submit_configure.json new file mode 100644 index 0000000..6d32f67 --- /dev/null +++ b/aws_module/batch_deployment/feature_extraction_batch/submit_configure.json @@ -0,0 +1,13 @@ +{ + "jobName": "feature-ext-pipeline-job", + "jobQueue": "feature-ext-pipeline-queue", + "arrayProperties": { + "size": 9 + }, + "dependsOn":[ + { + "type":"SEQUENTIAL" + } + ], + "jobDefinition": "feature-ext-pipeline-job" +} \ No newline at end of file