Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add logging for task filter_mudata #257

Merged
merged 12 commits into from
Apr 22, 2024
51 changes: 46 additions & 5 deletions panpipes/panpipes/pipeline_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@

import yaml

import logging
# L = logging.getLogger('panpipes')
# L.setLevel(logging.INFO)
# log_handler = logging.StreamHandler(sys.stdout)
# log_formatter = logging.Formatter('%(asctime)s: %(levelname)s - %(message)s')
# log_handler.setFormatter(log_formatter)
# L.addHandler(log_handler)

def get_logger():
    """Return the shared cgatcore pipeline logger.

    Ingest tasks emit their "in case of error, refer to <logfile>" hints
    through this logger so the messages interleave with cgatcore's own
    pipeline progress output rather than a separate handler.
    """
    logger = logging.getLogger("cgatcore.pipeline")
    return logger


PARAMS = P.get_parameters(
["%s/pipeline.yml" % os.path.splitext(__file__)[0],
Expand Down Expand Up @@ -141,9 +152,12 @@ def load_mudatas(rna_path, outfile,
if bcr_path is not None and pd.notna(bcr_path):
cmd += " --bcr_filtered_contigs %(bcr_path)s"
cmd += " --bcr_filetype %(bcr_filetype)s"
cmd += " > logs/load_mudatas_%(sample_id)s.log"
logfile = f"load_mudatas_{sample_id}.log"
cmd += f" > logs/{logfile}"
# print(cmd)
job_kwargs["job_threads"] = PARAMS['resources_threads_medium']
log_msg = f"TASK: 'load_mudatas'" + f" IN CASE OF ERROR, PLEASE REFER TO : '{logfile}' FOR MORE INFORMATION."
get_logger().info(log_msg)
P.run(cmd, **job_kwargs)


Expand Down Expand Up @@ -173,8 +187,11 @@ def concat_filtered_mudatas(infiles, outfile):
cmd += " --protein_var_table %(protein_metadata_table)s"
if PARAMS['index_col_choice'] is not None:
cmd += " --protein_new_index_col %(index_col_choice)s"
cmd += " > logs/concat_filtered_mudatas.log"
logfile = "logs/concat_filtered_mudatas.log"
cmd += f" > {logfile}"
job_kwargs["job_threads"] = PARAMS['resources_threads_high']
log_msg = f"TASK: 'concat_filtered_mudatas'" + f" IN CASE OF ERROR, PLEASE REFER TO : '{logfile}' FOR MORE INFORMATION."
get_logger().info(log_msg)
P.run(cmd, **job_kwargs)
# P.run("rm tmp/*", job_threads=PARAMS['resources_threads_low'])

Expand Down Expand Up @@ -254,8 +271,11 @@ def load_bg_mudatas(rna_path, outfile,
cmd += " --protein_var_table %(protein_metadata_table)s" #check which of these 2 needs to stay!!!
if PARAMS['index_col_choice'] is not None:
cmd += " --protein_new_index_col %(index_col_choice)s"
cmd += " > logs/load_bg_mudatas_%(sample_id)s.log"
logfile = f"logs/load_bg_mudatas_%(sample_id)s.log"
cmd += " > {logfile}"
job_kwargs["job_threads"] = PARAMS['resources_threads_medium']
log_msg = f"TASK: 'load_bg_mudatas'" + f" IN CASE OF ERROR, PLEASE REFER TO : '{logfile}' FOR MORE INFORMATION."
get_logger().info(log_msg)
P.run(cmd, **job_kwargs)


Expand All @@ -273,6 +293,7 @@ def downsample_bg_mudatas(infile, outfile):
--downsample_value 20000 > %(outfile)s
"""
job_kwargs["job_threads"] = PARAMS['resources_threads_medium']
# TODO: add log file?
P.run(cmd, **job_kwargs)


Expand Down Expand Up @@ -301,8 +322,11 @@ def concat_bg_mudatas(infiles, outfile):
# if PARAMS["barcode_mtd_include"] is True:
# cmd += " --barcode_mtd_df %(barcode_mtd_path)s"
# cmd += " --barcode_mtd_metadatacols %(barcode_mtd_metadatacols)s"
cmd += " > logs/concat_bg_mudatas.log"
logfile = "logs/concat_bg_mudatas.log"
cmd += " > {logfile}"
job_kwargs["job_threads"] = PARAMS['resources_threads_high']
log_msg = f"TASK: 'concat_bg_mudatas'" + f" IN CASE OF ERROR, PLEASE REFER TO : '{logfile}' FOR MORE INFORMATION."
get_logger().info(log_msg)
P.run(cmd, **job_kwargs)
# P.run("rm tmp/*", job_threads=PARAMS['resources_threads_low'])

Expand Down Expand Up @@ -344,8 +368,11 @@ def run_scrublet(infile, outfile, sample_id):
cmd += " --use_thr %(scr_use_thr)s"
if PARAMS['scr_call_doublets_thr'] is not None:
cmd += " --call_doublets_thr %(scr_call_doublets_thr)s"
cmd += " > logs/run_scrublet_" + sample_id + ".log"
logfile = "run_scrublet_" + sample_id + ".log"
cmd += f" > logs/{logfile}"
job_kwargs["job_threads"] = PARAMS['resources_threads_medium']
log_msg = f"TASK: 'run_scrublet'" + f" IN CASE OF ERROR, PLEASE REFER TO : '{logfile}' FOR MORE INFORMATION."
get_logger().info(log_msg)
P.run(cmd,**job_kwargs)
IOTools.touch_file(outfile)

Expand Down Expand Up @@ -391,6 +418,8 @@ def run_rna_qc(log_file, outfile, unfilt_file):
# add log file
cmd += " > %(log_file)s"
job_kwargs["job_threads"] = PARAMS['resources_threads_high']
log_msg = f"TASK: 'run_rna_qc'" + f" IN CASE OF ERROR, PLEASE REFER TO : '{log_file}' FOR MORE INFORMATION."
get_logger().info(log_msg)
P.run(cmd, **job_kwargs)
if os.path.exists("cache"):
P.run("rm -r cache")
Expand Down Expand Up @@ -437,6 +466,8 @@ def run_scanpy_prot_qc(log_file, outfile, unfilt_file):
# add log file
cmd += " > %(log_file)s"
job_kwargs["job_threads"] = PARAMS['resources_threads_high']
log_msg = f"TASK: 'run_scanpy_prot_qc'" + f" IN CASE OF ERROR, PLEASE REFER TO : '{log_file}' FOR MORE INFORMATION."
get_logger().info(log_msg)
P.run(cmd, **job_kwargs)
pass

Expand Down Expand Up @@ -467,6 +498,8 @@ def run_dsb_clr(outfile, unfilt_file, bg_file):
cmd += " --bg_mudata %(bg_file)s"
cmd += " > %(outfile)s"
job_kwargs["job_threads"] = PARAMS['resources_threads_high']
log_msg = f"TASK: 'run_dsb_clr'" + f" IN CASE OF ERROR, PLEASE REFER TO : '{outfile}' FOR MORE INFORMATION."
get_logger().info(log_msg)
P.run(cmd, **job_kwargs)

@follows(run_scanpy_prot_qc, run_dsb_clr)
Expand All @@ -493,6 +526,8 @@ def run_repertoire_qc(logfile, unfilt_file):
"""
cmd += " > %(logfile)s"
job_kwargs["job_threads"] = PARAMS['resources_threads_low']
log_msg = f"TASK: 'run_repertoire_qc'" + f" IN CASE OF ERROR, PLEASE REFER TO : '{outfile}' FOR MORE INFORMATION."
get_logger().info(log_msg)
P.run(cmd, **job_kwargs)

# -----------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -538,6 +573,8 @@ def run_atac_qc(log_file, outfile, unfilt_file):

cmd += " > %(log_file)s"
job_kwargs["job_threads"] = PARAMS['resources_threads_low']
log_msg = f"TASK: 'run_atac_qc'" + f" IN CASE OF ERROR, PLEASE REFER TO : '{log_file}' FOR MORE INFORMATION."
get_logger().info(log_msg)
P.run(cmd, **job_kwargs)

@follows(run_rna_qc, run_prot_qc, run_repertoire_qc, run_atac_qc)
Expand Down Expand Up @@ -573,6 +610,8 @@ def plot_qc(log_file, cell_file):
cmd += " --rep_qc_metrics %(pqrm)s"
cmd += " > %(log_file)s"
job_kwargs["job_threads"] = PARAMS['resources_threads_low']
log_msg = f"TASK: 'plot_qc'" + f" IN CASE OF ERROR, PLEASE REFER TO : '{log_file}' FOR MORE INFORMATION."
get_logger().info(log_msg)
P.run(cmd, **job_kwargs)


Expand All @@ -595,6 +634,8 @@ def run_assess_background(log_file, unfilt_file, bg_file):
"""
cmd += " > %(log_file)s"
job_kwargs["job_threads"] = PARAMS['resources_threads_high']
log_msg = f"TASK: 'run_assess_background'" + f" IN CASE OF ERROR, PLEASE REFER TO : '{log_file}' FOR MORE INFORMATION."
get_logger().info(log_msg)
P.run(cmd, **job_kwargs)


Expand Down
Loading