The MapReduce architecture of the GATK allows most walkers in the GATK to be run in a parallel-processing mode. The GATK supports two basic parallel processing models known as shared memory and scatter-gather.
Shared memory parallelism
Parallelism within a single multi-threading process with access to a large, shared RAM. Shared memory parallelism is stable and supported by many tools that access pileups of bases.
Scatter/gather (SG) parallelism
In SG parallelism, the target genomic regions are divided up into N independent GATK instances that are run separately on a single machine or across a computing cluster. The outputs of each independent walker, are merged together once all are completed. SG works very efficiently in the GATK, provided the output of a walker is independent per site (e.g. Unified Genotyper) or per chromosome (e.g. Indel Realigner). SG parallelism is a completely stable approach in the GATK, and used routinely by the GATK team in processing large data sets; it is also natively supported by GATK-Queue, which automatically scatters and gathers GATK processes given a desired N number of processes to execute simultaneously.
Note that parallel-processing will significantly speed up data processing but may produce statistically insignificant differences. While this non-determinism is not ideal in practice the minute differences have been mathematically meaningless while producing consistent results in a reasonable amount of time for whole genome and whole exome data. However, if absolute determinism is more important than speed we recommend you do not use parallelism with the GATK.
There are costs and benefits to each type of parallelism in the GATK, as outlined in the following table.
Comparison of standard parallelism approaches in the GATK
| Property | Shared Memory | Scatter/Gather |
|---|---|---|
| Stability | Stable | Stable | Retired in codebase |
| Applicable walker types | By locus and by ROD only. ReadWalkers are not supported. | All walker types. ReadWalkers can only be split safely by chromosome in general |
| Example walkers | UnifiedGenotyper, BaseRecalibrator, VariantEval | All walkers, including ReadWalkers like IndelRealigner |
| Scalability | Fewer than 32 cores. Each thread operates completely independently, so N threads requires N times more memory than 1 thread alone. Best scaling at 8 or fewer threads. | Hundreds of processes. Limited by capabilities of the underlying storage system, in general. Isilon-class storage can run thousands of jobs effectively. |
| How to enable | Use the -nt argument in the Engine, on any walker that supports shared memory parallelism (the engine will tell you if it does not). |
1. Provide -L interval lists to the GATK; a different one for each of the N independent GATK tools. For example -L chr1 for first process, and -L chr2 for the second. When all processes have finished, merge the output together, as appropriate (e.g. use MergeSam.jar for BAMs, and cat or CombineVariants for VCFs). 2. Use GATK-Queue to automatically divide up your GATK jobs. For example, using this.scatterCount = 10 argument will result in 10 independent processes. |
| Pros | - Easy to enable. - Single output file merged together by internally by the GATK engine - Efficiently uses multi-core processors sharing a single memory space | - Works for all walker types, including ReadWalkers - Scales to hundreds of independent jobs - Easy to enable with single -L argument - Directly supported and managed by GATK-Queue - Totally independent processing per interval -- failed parts can be easily resumed without repeating already successfully processed regions |
| Cons | - Limited to fewer than 32 processors without significant overhead - Limited to cores physically present on the machine, cannot take advantage of computing cluster resources - Does not work for ReadWalkers (Table Recalibrator, Indel Realigner) | - Requires manual preparation of sliced genomic intervals for processing (if you aren't using GATK-Queue). - For ReadWalkers and other tools that can only be processed by chromosome, leading to load balancing problems (chr1 is much bigger than chr22) - Sensitive to data density variation over the genome. Dividing chr20 processing in 63 1MB chunks leads to 10x differences in completion times due to data pileups around the centromere, for example. - Must wait until all parts of the scatter have completed before gathering, making the process sensitive to farm scheduling problems. If a job will run for M minutes, but waits Z minutes to start on the farm, the entire SG process will not complete for at least M + Z minutes. |
Almost certainly, either shared memory or scatter/gather parallelism is the right choice for your NGS pipeline. Our go-to option for parallelism here at the Broad Institute is S/G, since S/G allows you to cut up your jobs into hundreds of pieces to run on our standard computing farm, using GATK-Queue. When I have a small job that needs to be run quickly, am testing out program options or need a quick VariantEval result, I'm using shared memory parallelism with ~10 threads on a single large computer with a lot (>64 GB) of memory.
Basically, if I have N processors, and I want to choose between shared memory or S/G, here's how I would choose:
If all N processors are on a single computer, and my walker supports it, use shared memory parallelism
If not, use S/G
The GATK currently supports a hierarchical version of parallelism. In this form of parallelism, data is divided into shards, each shard is map/reduced independently, and the results are combined with a 'tree reduce' step. While the framework handles much of the heavy lifting of data division required for parallelism, each walker must individually be reviewed to ensure that it isn't tracking state internally in a non-threadsafe way. Many tools support shared memory parallelism, including critical tools such as:
Please review the source to discover if your walker is parallelizable, or attempt to run it with -nt 2 and if it the engine doesn't complain the walker is parallelized.
In shared memory parallelism, each thread operates on a 16 kbp shard of reference sequence in a completely independent memory space from all other threads. Up to 24 threads can run efficiently in this design, but greater parallelism is limited by resource starvation from the single reduce thread and/or memory inefficiency by keeping each thread’s data totally independent. See gatkParallelism performance 082112 these plots for an analysis of the scalability of several key GATK tools as a function of nt.
Run the GATK, using the -nt command-line argument to specify the number of threads that the GATK should attempt to use.
[[image:HierarchicalParallelism.png|thumb|Shared memory parallelism architecture]]
First, be aware that some walkers may, by design, require a rewrite for complete parallelization.
When implementing a standard (non-parallelized) walker, one must implement the reduce method, which combines an individual datum returned by the map function with the aggregate of the prior map calls. When implementing a parallelizable walker, one must also implement the org.broadinstitute.sting.gatk.walkers.TreeReducible interface and the treeReduce() function. The TreeReduce function tells the GATK how to combine two adjacent reduces, rather than one map result and one reduce.
The GATK supports writing to output files from either the map or the reduce when running in parallel. However, only unbuffered writers are currently supported. Please use PrintStream rather than PrintWriter at this time.
The GATK's support for parallelism is currently limited. The following classes of walkers are not supported by our parallelization framework:
Note that each thread operates completely independently in the current GATK implementation of shared memory parallelism. So if you provide 1Gb to the GATK with nt 1, then you should provide 4Gb to run with nt 4. If you don't do this, it is possible to starve out the GATK so that it runs much much slower.
The performance of the multi-threaded GATK is really dependent on whether you are IO or CPU limited and the relative overhead of the parallelism on your computer. Additionally, nt can start to have very high overheads with nt > 20 due to our implementation and memory contention issues.
The best option for nt is a value less or equal to the number of available cores with sufficient memory to run each threads (nt x amount provided to 1 core), capped additionally by the available IO bandwidth so that the individual threads don't starve each other.
Scatter / gather is a simple approach for process-independent parallelism with the GATK. First you scatter multiple independent GATK instances out over a network of computing nodes, each operating on slightly different genomic intervals, and when they all complete, the output of each is gathered up into a merged output dataset. In the GATK S/G is extremely easy to use, as all of the GATK tools support the -L option to operate over only genomic specific intervals, and many tools emit files that can be merged together across multiple regions. Unified Genotyper, for example, can operate over the whole genome in one process, or on each chromosome independently. The output of this later approach, after merging together, should be the same as the whole genome results, minus any slight differences due to random number effects. The simplicity and efficiency of S/G parallelism makes this a key option for getting things done quickly with the GATK.
Using S/G parallelism is extremely easy, either by hand or using the automated Scatter/Gather in GATK-Queue. Suppose I have the following command line:
java -jar GenomeAnalysisTK -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1
This runs a single process of the GATK over chr1, and let's say it takes an hour when I run it. In order to run it with S/G parallelism mode, just partition chr1 into two independent parts:
This file distributed.tracker.txt will contain genomic locations and GATK process ids that are processing each location, in text format, so you can cat it. If you run at the command line:
gsa1> java -jar GenomeAnalysisTK -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1:1-125,000,000 -o my.1.vcf &
gsa1> java -jar GenomeAnalysisTK -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1:125,000,001-249,250,621 -o my.2.vcf &
When these two jobs finish, I just merge the two VCFs together and I've got a complete data set in half the time.