# Writing a workflow - Tutorial

The explicit aim of cgat-core is to allow users to quickly and easily build their own computational pipelines, speeding up their analysis workflows.

## Installation of cgat-core

In order to begin writing a pipeline, you will need to install the cgat-core code (see the installation instructions in the "Getting Started" section).

## Tutorial start

### Setting up the pipeline

**1.** First, navigate to a directory where you want to start building your code:

```bash
mkdir test && cd test && mkdir configuration && touch configuration/pipeline.yml && touch pipeline_test.py && touch ModuleTest.py
```

This command will create a directory called `test` in the current directory with the following layout:

```
|-- configuration
|   \-- pipeline.yml
|-- pipeline_test.py
|-- ModuleTest.py
```

The layout has the following components:

- **pipeline_test.py**: This is the file that will contain all of the ruffus workflows. The file needs to be named in the format `pipeline_<name>.py`.
- **configuration/**: Directory containing the `pipeline.yml` configuration file. Note that the config parser shown below also searches for `pipeline_test/pipeline.yml` next to the script, i.e. a directory named after the `pipeline_<name>.py` file; either way, the `config` command described later places a working copy of `pipeline.yml` in the run directory.
- **ModuleTest.py**: This file will contain functions that will be imported into the main ruffus workflow file (`pipeline_test.py`).

**2.** View the source code within `pipeline_test.py`.

This is where the ruffus tasks will be written. The code begins with a docstring detailing the pipeline functionality; use this section to document your pipeline:

```python
'''This pipeline is a test and this is where the documentation goes.'''
```

The pipeline then needs a few utility functions to help with executing the pipeline.

- **Import statements**: You will need to import ruffus and cgatcore utilities:

```python
from ruffus import *
import cgatcore.experiment as E
from cgatcore import pipeline as P
```

Importing `ruffus` allows ruffus decorators to be used within the pipeline.
Importing `experiment` from `cgatcore` provides utility functions for argument parsing, logging, and record-keeping within scripts.
Importing `pipeline` from `cgatcore` provides utility functions for interfacing CGAT ruffus pipelines with an HPC cluster, uploading data to a database, and parameterisation.

You'll also need some standard Python modules:

```python
import os
import sys
import sqlite3  # needed by the connect() utility defined below
```

- **Config parser**: This code helps with parsing the `pipeline.yml` file:

```python
# Load options from the config file
PARAMS = P.get_parameters([
    "%s/pipeline.yml" % os.path.splitext(__file__)[0],
    "../pipeline.yml",
    "pipeline.yml"])
```

- **Pipeline configuration**: We will add configurable variables to our `pipeline.yml` file so that we can modify the output of our pipeline. Open `pipeline.yml` and add the following:

```yaml
database:
    name: "csvdb"
```

When you run the pipeline, the configuration values can be accessed through `PARAMS`: nested YAML keys are flattened into underscore-joined keys, so `database: name:` becomes `PARAMS["database_name"]` (here, `"csvdb"`).
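
As a quick sketch (assuming the `pipeline.yml` above has been loaded by `P.get_parameters()`), `PARAMS` can be read like an ordinary dictionary:

```python
# PARAMS behaves like a dictionary; the nested key database -> name
# has been flattened to "database_name" by P.get_parameters().
db_name = PARAMS["database_name"]
print(db_name)  # csvdb
```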

- **Database connection**: This code helps with connecting to an SQLite database:

```python
def connect():
    '''Utility function to connect to the database.

    Use this method to connect to the pipeline database.
    Additional databases can be attached here as well.

    Returns an sqlite3 database handle.
    '''
    dbh = sqlite3.connect(PARAMS["database_name"])
    return dbh
```
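
As a usage sketch (assuming the database has already been populated, for example by the `P.load()` task added below), the handle behaves like any `sqlite3` connection:

```python
dbh = connect()
# list the tables currently present in the pipeline database
tables = dbh.execute(
    "SELECT name FROM sqlite_master WHERE type='table'").fetchall()
print(tables)
dbh.close()
```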

- **Commandline parser**: This code allows the pipeline to parse arguments:

```python
def main(argv=None):
    if argv is None:
        argv = sys.argv
    return P.main(argv)


if __name__ == "__main__":
    sys.exit(main(sys.argv))
```
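
With this entry point in place, the pipeline picks up cgat-core's standard command-line interface (the `config`, `show`, and `make` commands used below). For example:

```bash
# list the available commands and options
python pipeline_test.py --help
```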

### Running the test pipeline

You now have the bare-bones layout of the pipeline, and you need some code to execute. Below is example code that you can copy and paste into your `pipeline_test.py` file. The code includes two ruffus `@transform` tasks and a final `@follows` task. The first function, `countWords`, contains a statement that counts the number of words in the `pipeline.yml` file; the statement is executed using the `P.run()` function.

The second ruffus `@transform` function, `loadWordCounts`, takes as input the output of `countWords` and loads the word counts into an SQLite database using `P.load()`.

The third function, `full()`, is a dummy task that runs the entire pipeline. Its `@follows` decorator takes the `loadWordCounts` function, completing the pipeline chain.

The following code should be pasted after the **database connection** code and just before the **commandline parser**:

```python
# ---------------------------------------------------
# Specific pipeline tasks
@transform("pipeline.yml",
           regex(r"(.*)\.(.*)"),
           r"\1.counts")
def countWords(infile, outfile):
    '''Count the number of words in the pipeline configuration file.'''

    # The command line statement we want to execute.
    # Percent signs are doubled (%%) because P.run() interpolates the
    # statement with Python %-formatting, and \t/\n are escaped so that
    # they reach awk as literal escape sequences.
    statement = '''awk 'BEGIN { printf("word\\tfreq\\n"); }
    {for (i = 1; i <= NF; i++) freq[$i]++}
    END { for (word in freq) printf "%%s\\t%%d\\n", word, freq[word] }'
    < %(infile)s > %(outfile)s'''

    # Execute the command in the variable statement.
    P.run(statement)


@transform(countWords,
           suffix(".counts"),
           "_counts.load")
def loadWordCounts(infile, outfile):
    '''Load results of word counting into database.'''
    P.load(infile, outfile, "--add-index=word")


# ---------------------------------------------------
# Generic pipeline tasks
@follows(loadWordCounts)
def full():
    pass
```
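
A note on the statement syntax: `P.run()` interpolates the statement with Python %-formatting, resolving placeholders such as `%(infile)s` and `%(outfile)s` from the calling function's local variables (and from `PARAMS`). A minimal sketch of that mechanism in plain Python (the file names are hypothetical):

```python
# Stand-in illustration of the interpolation step performed by P.run():
# %(name)s placeholders are filled in from a dictionary of local
# variables before the command is handed to the shell or the cluster.
infile, outfile = "pipeline.yml", "pipeline.counts"
statement = "wc -w < %(infile)s > %(outfile)s"
print(statement % locals())  # wc -w < pipeline.yml > pipeline.counts
```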

To run the pipeline, navigate to the working directory and then run:

```bash
python /location/to/code/pipeline_test.py config
python /location/to/code/pipeline_test.py show full -v 5
```

The `config` command places a copy of `pipeline.yml` in the working directory, and `show full` displays the tasks that will be run. Then execute the pipeline:

```bash
python /location/to/code/pipeline_test.py make full -v5 --local
```

The pipeline will then execute and count the words in the `yml` file.
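
If all went well, a `pipeline.counts` file should appear in the working directory. Its exact contents depend on your configuration file, but the format produced by the awk statement is a tab-separated table along these lines (illustrative only):

```
word        freq
database:   1
name:       1
"csvdb"     1
```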

### Modifying the test pipeline to build your own workflows

The next step is to modify the basic code in the pipeline to fit your particular NGS workflow needs. For example, suppose you want to convert a SAM file into a BAM file, then perform flag stats on that output BAM file. The code and layout that we just wrote can easily be modified to do this.

The pipeline will have two steps:
1. Identify all SAM files and convert them to BAM files.
2. Take the output of step 1 and perform flag stats on that BAM file.

The first step would involve writing a function to identify all `.sam` files in a `data.dir/` directory and convert them to BAM files using `samtools view`. The second function would then take the output of the first function, perform `samtools flagstat`, and output the results as a flat `.txt` file. This would be written as follows:

```python
@transform("data.dir/*.sam",
           regex(r"data.dir/(\S+)\.sam"),
           r"\1.bam")
def bamConvert(infile, outfile):
    '''Convert a SAM file into a BAM file using samtools view.'''

    statement = '''samtools view -bT /ifs/mirror/genomes/plain/hg19.fasta \
    %(infile)s > %(outfile)s'''
    P.run(statement)


@transform(bamConvert,
           suffix(".bam"),
           "_flagstats.txt")
def bamFlagstats(infile, outfile):
    '''Perform flagstats on a BAM file.'''

    statement = '''samtools flagstat %(infile)s > %(outfile)s'''
    P.run(statement)
```
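
Under this naming scheme, a file such as `data.dir/sample1.sam` (the sample name here is just for illustration) would flow through the pipeline as:

```
data.dir/sample1.sam  ->  sample1.bam  ->  sample1_flagstats.txt
```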

To run the pipeline:

```bash
python /path/to/file/pipeline_test.py make full -v5
```

The BAM files and flagstats outputs should be generated.

### Parameterising the code using the `.yml` file

As a philosophy, we try to avoid any hardcoded parameters, so that any variables can easily be modified by the user without changing the code.

Looking at the code above, the hardcoded path to the `hg19.fasta` file can be made a customisable parameter, allowing users to specify any FASTA file depending on the genome build used. In the `pipeline.yml`, add:

```yaml
genome:
    fasta: /ifs/mirror/genomes/plain/hg19.fasta
```

In the `pipeline_test.py` code, the value can then be accessed via `PARAMS["genome_fasta"]`.
Therefore, the SAM-to-BAM conversion code can be modified as follows:

```python
@transform("data.dir/*.sam",
           regex(r"data.dir/(\S+)\.sam"),
           r"\1.bam")
def bamConvert(infile, outfile):
    '''Convert a SAM file into a BAM file using samtools view.'''

    genome_fasta = PARAMS["genome_fasta"]

    statement = '''samtools view -bT %(genome_fasta)s \
    %(infile)s > %(outfile)s'''
    P.run(statement)


@transform(bamConvert,
           suffix(".bam"),
           "_flagstats.txt")
def bamFlagstats(infile, outfile):
    '''Perform flagstats on a BAM file.'''

    statement = '''samtools flagstat %(infile)s > %(outfile)s'''
    P.run(statement)
```

Running the code again will generate the same output. However, if you had BAM files that came from a different genome build, you can simply modify the parameter in the `.yml` file, delete the old output files, and run the pipeline again with the new configuration values.
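
For example, switching to a different build is a one-line edit (the `hg38` path below is hypothetical; substitute the location of your own reference):

```yaml
genome:
    fasta: /path/to/genomes/hg38.fasta
```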
| 243 | + |