Skip to content

Commit

Permalink
save IBD obj instead of individual dataframes
Browse files Browse the repository at this point in the history
  • Loading branch information
bguo068 committed Mar 17, 2023
1 parent e399057 commit adb44ee
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 32 deletions.
16 changes: 12 additions & 4 deletions bin/proc_dist_ne.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@
genome_14_100 = ibdutils.Genome.get_genome("simu_14chr_100cm")
ibd = ibdutils.IBD(genome=genome_14_100, label=f"{idx}_orig")
ibd.read_ibd(ibd_fn_lst=args.ibd_files)
ibd.calc_ibd_cov()
ibd.find_peaks()


# output files:
ofs_ibd_pq = f"{args.genome_set_id}_ibddist_ibd.pq"
of_ibddist_obj = f"{args.genome_set_id}.ibddist.ibdobj.gz"


# store combined IBD for IBD distribution analysis
ibd._df.to_parquet(ofs_ibd_pq)
ibd.pickle_dump(of_ibddist_obj)


# remove highly relatedness samples
Expand All @@ -53,13 +55,17 @@
# calculate coverage and remove peaks
ibd.calc_ibd_cov()
ibd.find_peaks()
of_orig_ibdne_obj = f"{args.genome_set_id}_orig.ibdne.ibdobj.gz"
ibd.pickle_dump(of_orig_ibdne_obj)

ibd2 = ibd.duplicate(f"{idx}_rmpeaks")
ibd2.remove_peaks()
ibd2._df = ibd2.cut_and_split_ibd()
of_rmpeaks_ibdne_obj = f"{args.genome_set_id}_rmpeaks.ibdne.ibdobj.gz"
ibd2.pickle_dump(of_rmpeaks_ibdne_obj)

# link ibdne.jar file
if not Path(f"ibdne.jar").exists():
if not Path("ibdne.jar").exists():
assert Path(args.ibdne_jar).exists()
this = Path("ibdne.jar")
target = Path(args.ibdne_jar).absolute()
Expand Down Expand Up @@ -90,13 +96,15 @@
print(
f"""
output files:
{ofs_ibd_pq}
{of_ibddist_obj}
ibdne.jar
{idx}_orig.sh
{idx}_orig.map
{idx}_orig.ibd.gz
{idx}_rmpeaks.sh
{idx}_rmpeaks.map
{idx}_rmpeaks.ibd.gz
{of_orig_ibdne_obj}
{of_rmpeaks_ibdne_obj}
"""
)
8 changes: 4 additions & 4 deletions bin/proc_infomap.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@


# output files:
ofs_ifm_orig_ibd_pq = f"{args.genome_set_id}_ifm_orig_ibd.pq"
ofs_ifm_rmpeaks_ibd_pq = f"{args.genome_set_id}_ifm_rmpeaks_ibd.pq"
of_ifm_orig_ibd_obj = f"{args.genome_set_id}_orig.ifm.ibdobj.gz"
of_ifm_rmpeaks_ibd_obj = f"{args.genome_set_id}_rmpeaks.ifm.ibdobj.gz"


# remove highly relatedness samples
Expand All @@ -34,5 +34,5 @@
ibd2 = ibd.duplicate("rmpeak")
ibd2.remove_peaks()

ibd._df.to_parquet(ofs_ifm_orig_ibd_pq)
ibd2._df.to_parquet(ofs_ifm_rmpeaks_ibd_pq)
ibd.pickle_dump(of_ifm_orig_ibd_obj)
ibd2.pickle_dump(of_ifm_rmpeaks_ibd_obj)
18 changes: 7 additions & 11 deletions bin/run_infomap.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
#! /usr/bin/env python3

import pandas as pd
import numpy as np
from ibdutils.utils.ibdutils import IBD, Genome
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser

import numpy as np
import pandas as pd
from ibdutils.utils.ibdutils import IBD


def parse_args():
p = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
p.add_argument("--ibd_pq", type=str, required=True)
p.add_argument("--ibd_obj", type=str, required=True)
p.add_argument("--npop", type=int, required=True)
p.add_argument("--nsam", type=int, required=True)
p.add_argument("--genome_set_id", type=int, required=True)
Expand All @@ -26,9 +27,7 @@ def parse_args():

def run(args) -> pd.DataFrame:

g = Genome.get_genome("simu_14chr_100cm")
ibd = IBD(genome=g, label=f"gsid_{args.genome_set_id}")
ibd.read_ibd([args.ibd_pq], format="parquet")
ibd = IBD.pickle_load(args.ibd_obj)

# make meta data
meta = pd.DataFrame(
Expand All @@ -46,13 +45,10 @@ def run(args) -> pd.DataFrame:
return member_df


def main():
if __name__ == "__main__":
args = parse_args()
member_df = run(args)

ofs = f"{args.genome_set_id}_{args.cut_mode}_member.pq"
member_df.to_parquet(ofs)
print(member_df)


main()
28 changes: 16 additions & 12 deletions main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ process PROC_DIST_NE {
publishDir "${resdir}/${genome_set_id}_${label}/ne_input/", pattern: "*.sh", mode: 'symlink'
publishDir "${resdir}/${genome_set_id}_${label}/ne_input/", pattern: "*.map", mode: 'symlink'
publishDir "${resdir}/${genome_set_id}_${label}/ne_input/", pattern: "*.ibd.gz", mode: 'symlink'
publishDir "${resdir}/${genome_set_id}_${label}/ibddist_ibd/", pattern: "*_ibddist_ibd.pq", mode: 'symlink'
publishDir "${resdir}/${genome_set_id}_${label}/ibddist_ibd/", pattern: "*.ibddist.ibdobj.gz", mode: 'symlink'
publishDir "${resdir}/${genome_set_id}_${label}/ibdne_ibd/", pattern: "*.ibdne.ibdobj.gz", mode: 'symlink'

input:
tuple val(label), path(ibd_lst), val(genome_set_id)
Expand All @@ -171,7 +172,8 @@ process PROC_DIST_NE {
path("*_orig.map"), path("*_orig.ibd.gz"), emit: ne_input_orig
tuple val(label), path("ibdne.jar"), path("*_rmpeaks.sh"), \
path("*_rmpeaks.map"), path("*_rmpeaks.ibd.gz"), emit: ne_input_rmpeaks
tuple val(label), path("*_ibddist_ibd.pq"), emit: ibddist_ibd_pq
tuple val(label), path("*.ibddist.ibdobj.gz"), emit: ibddist_ibd_obj
tuple val(label), path("*.ibdne.ibdobj.gz"), emit: ibdne_ibd_obj
script:
def args_local = [
ibd_files: "${ibd_lst}", // path is a blank separate list
Expand All @@ -185,7 +187,9 @@ process PROC_DIST_NE {
touch ibdne.jar
touch ${genome_set_id}{_orig.sh,_orig.map,_orig.ibd.gz}
touch ${genome_set_id}{_rmpeaks.sh,_rmpeaks.map,_rmpeaks.ibd.gz}
touch ${genome_set_id}_ibddist_ibd.pq
touch ${genome_set_id}.ibddist.ibdobj.gz
touch ${genome_set_id}_orig.ibdne.ibdobj.gz
touch ${genome_set_id}_rmpeaks.ibdne.ibdobj.gz
"""
}

Expand All @@ -197,8 +201,8 @@ process PROC_INFOMAP {
input:
tuple val(label), path(ibd_lst), val(genome_set_id)
output:
tuple val(label), path("*_ifm_orig_ibd.pq"), emit: ifm_orig_ibd_pq
tuple val(label), path("*_ifm_rmpeaks_ibd.pq"), emit: ifm_rmpeaks_ibd_pq
tuple val(label), path("*_orig.ifm.ibdobj.gz"), emit: ifm_orig_ibd_obj
tuple val(label), path("*_rmpeaks.ifm.ibdobj.gz"), emit: ifm_rmpeaks_ibd_obj
script:
def args_local = [
ibd_files: "${ibd_lst}", // path is a blank separate list
Expand All @@ -209,7 +213,7 @@ process PROC_INFOMAP {
"""
stub:
"""
touch ${genome_set_id}{_ifm_orig_ibd.pq,_ifm_rmpeaks_ibd.pq}
touch ${genome_set_id}{_orig.ifm.ibdobj.gz,_rmpeaks.ifm.ibdobj.gz}
"""
}

Expand Down Expand Up @@ -238,13 +242,13 @@ process RUN_INFOMAP {
tag "${args.genome_set_id}_${are_peaks_removed}"
publishDir "${resdir}/${args.genome_set_id}_${label}/ifm_output/", mode: 'symlink'
input:
tuple val(label), path(ibd_pq), val(are_peaks_removed), val(args)
tuple val(label), path(ibd_obj), val(are_peaks_removed), val(args)
output:
tuple val(label), val(are_peaks_removed), path("*_member.pq")
script:
def cut_mode = are_peaks_removed? 'rmpeaks': 'orig'
def args_local = [
ibd_pq: ibd_pq,
ibd_obj: ibd_obj,
npop: args.npop,
nsam: args.nsam,
genome_set_id: args.genome_set_id,
Expand Down Expand Up @@ -308,7 +312,7 @@ workflow WF_SP {

// Process IBD for ibd distribution and ne analyses
PROC_DIST_NE(ch_ibd_per_genome)
// PROC_DIST_NE.out.ibddist_ibd_pq.view{label, ibdpq -> [label, ibdpq.getName()]}
// PROC_DIST_NE.out.ibddist_ibd_obj.view{label, ibdpq -> [label, ibdpq.getName()]}


// RUN IBDNe actually
Expand Down Expand Up @@ -370,13 +374,13 @@ workflow WF_MP {
// Process IBD for ibd distribution and ne analyses
PROC_INFOMAP(ch_ibd_per_genome)

// PROC_INFOMAP.out.ifm_orig_ibd_pq.view{label, ibdpq -> [label, ibdpq.getName()]}
// PROC_INFOMAP.out.ifm_orig_ibd_ob.view{label, ibdpq -> [label, ibdpq.getName()]}


// RUN INFOMAP
RUN_INFOMAP(
PROC_INFOMAP.out.ifm_orig_ibd_pq.map{it -> it + false}.combine(ch_mp_params, by:0).concat(
PROC_INFOMAP.out.ifm_rmpeaks_ibd_pq.map{it -> it + true}.combine(ch_mp_params, by:0)
PROC_INFOMAP.out.ifm_orig_ibd_obj.map{it -> it + false}.combine(ch_mp_params, by:0).concat(
PROC_INFOMAP.out.ifm_rmpeaks_ibd_obj.map{it -> it + true}.combine(ch_mp_params, by:0)
)
)

Expand Down
2 changes: 1 addition & 1 deletion nextflow.config
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ profiles{
}
process {
// For faster testing
conda = "/home/gbinux/mambaforge/envs/simulation"
// conda = "/home/gbinux/mambaforge/envs/simulation"

errorStrategy = {task.attempt < 5 ? 'retry': 'finish'}
maxRetries = 5
Expand Down

0 comments on commit adb44ee

Please sign in to comment.