Parallelism and the GATK
From GSA
Contents |
Overview
The MapReduce architecture of the GATK allows most walkers in the GATK to be in a parallel-processing mode. The GATK supports three basic parallel processing models:
- Shared memory parallelism
- Parallelism within a single multi-threading process with access to a large, shared RAM. Shared memory parallelism is stable and supported by many tools that access pileups of bases.
- Scatter/gather (SG) parallelism
- In SG parallelism, the target genomic regions are divided up into N independent GATK instances that are run separately on a single machine or across a computing cluster. The output of each independent walker, when all have completed, are merged together. SG works very efficiently in the GATK, provided the output of a walker is independent per site (such as the Unified Genotyper) or per chromosome (Table Recalibrator or Indel Realigner). SG parallelism is a completely stable approach in the GATK, and used routinely by the GATK team in processing large data sets; it is also natively supported by GATK-Queue, which automatically scatter and gather GATK processes given a desired N number of processes to execute simulataneously.
- Distributed parallelism
- Coordinated processing among separate GATK-Engine instances running on independent machines. Distributed parallelism is effectively a coordinated extension of SG parallelism where all of the GATK instances communicate to process a single data set, without pre-allocated regions of the genome to each instance. Distributed parallelism is a completely experimental system in the GATK, and may or may not work properly in some cases, due to bugs.
Comparison of GATK parallelism options
There are costs and benefits to each type of parallelism in the GATK, as outlined in the following table.
| Property | Shared Memory | Scatter/Gather | Distributed |
|---|---|---|---|
| Stability | Stable | Stable | Experimental |
| Applicable walker types | By locus and by rod only. ReadWalkers are not supported | All walker types. ReadWalkers can only be split safely by chromosome in general | By locus and by rod only. ReadWalkers are not supported |
| Example walkers | Unified Genotyper, CountCovariates, VariantEval | All walkers, including ReadWalkers like TableRecalibrator and Indel Realigner | Unified Genotyper, CountCovariates. Not VariantEval, because it accumulates data in memory |
| Scalability | Less than 32 cores. Each thread operates completely independently, so N threads requires N times more memory than 1 thread alone | Hundreds of processes. Limited by capabilities of the underlying storage system, in general. Isilon-class storage can run thousands of jobs effectively. | Hundreds of processes, currently, but likely will scale to thousands of inter-communicating processes in the coming years |
| How to enable | Use the -nt argument in the Engine, on any walker that supports shared memory parallelism (the engine will tell you if not) |
| Add tracker file argument to engine, and spawn off N jobs with different output files. See below for more details. |
| Pros |
|
|
|
| Cons |
|
|
|
Which parallelism option is right for me?
Almost certainly, either shared memory or scatter/gather parallelism is the right choice for your NGS pipeline. Our go-to option for parallelism here at the Broad Institute is S/G, since S/G allows you to cut up your jobs into hundreds of pieces to run on our standard computing farm, using GATK-Queue. When I have a small job that needs to be run quickly, am testing out program options or need a quick VariantEval result, I'm using shared memory parallelism with ~10 threads on a single large computer with a lot (>64 GB) of memory. In general I would not consider using distributed parallelism except for specific, experimental pipelines.
Basically, if I have N processors, and I want to choose between shared memory or S/G, here's how I would choose:
- If all N processors are on a single computer, and my walker supports it, use shared memory parallelism
- If not, use S/G
Shared Memory Parallelism (Stable)
The GATK currently supports a hierarchical version of parallelism. In this form of parallelism, data is divided into shards, each shard is map/reduced independently, and the results are combined with a 'tree reduce' step. While the framework handles much of the heavy lifting of data division required for parallelism, each walker must individually be reviewed to ensure that it isn't tracking state internally in a non-threadsafe way. Many tools support shared memory parallelism, including critical tools such as:
- UnifiedGenotyper
- CountCovariates
- VariantEval
Please review the source to discover if your walker is parallelizable, or attempt to run it with -nt 2 and if it the engine doesn't complain the walker is parallelized.
In shared memory parallelism, each thread operates on a 16 kbp shard of reference sequence in a completely independent memory space from all other threads. Up to 24 threads can run efficiently in this design, but greater parallelism is limited by resource starvation from the single reduce thread and/or memory inefficiency by keeping each thread’s data totally independent.
Enabling n-way parallelism from the command-line
- Run the GATK, using the -nt command-line argument to specify the number of threads that the GATK should attempt to use.
Helpful hints: Implementing a walker with parallelism in mind
First, be aware that some walkers may, by design, require a rewrite for complete parallelization.
- When implementing a standard (non-parallelized) walker, one must implement the reduce method, which combines an individual datum returned by the map function with the aggregate of the prior map calls. When implementing a parallelizable walker, one must also implement the org.broadinstitute.sting.gatk.walkers.TreeReducible interface and the treeReduce() function. The TreeReduce function tells the GATK how to combine two adjacent reduces, rather than one map result and one reduce.
- The GATK supports writing to output files from either the map or the reduce when running in parallel. However, only unbuffered writers are supported at this moment. Please use PrintStream rather than PrintWriter at this time.
Limitations
The GATK's support for parallelism is currently limited. The following classes of walkers are not supported by our parallelization framework:
- Read walkers
- Reduce-by-interval walkers
Note that each thread operates completely independently in the current GATK implementation of shared memory parallelism. So if you provide 1Gb to the GATK with nt 1, then you should provide 4Gb to run with nt 4. If you don't do this, it is possible to starve out the GATK so that it runs much much slower.
The performance of the multi-threaded GATK is really dependent on whether you are IO or CPU limited and the relative overhead of the parallelism on your computer. Additionally, nt can start to have very high overheads with nt > 20 due to our implement and memory contention issues.
The best option for NT is a value less or equal to the number of available cores with sufficient memory to run each threads (nt x amount provided to 1 core), capped additionally by the available IO bandwidth so that the individual threads don't starve each other.
Scatter / gather parallelism
Scatter / gather is a simple approach for process-independently parallelism with the GATK. First you scatter multiple independent GATK instances out over a network of computing nodes, each operating on slightly different genomic intervals, and when they all complete, the output of each is gathered up into a merged output dataset. In the GATK S/G is extremely easy to use, as all of the GATK tools support the -L option to operate over only genomic specific intervals, and many tools emit files that can be merged together across multiple regions. Unified Genotyper, for example, can operate over the whole genome in one process, or on each chromosome independently. The output of this later approach, after merging together, should be the same as the whole genome results, minus any slight differences due to random number effects. The simplicity and efficiency of S/G parallelism makes this a key option for getting things done quickly with the GATK.
Using S/G parallelism is extremely easy, either by hand or using the automated Scatter/Gather in GATK-Queue. Suppose I have the following command line:
java -jar GenomeAnalysisTK -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1
This runs a single process of the GATK over chr1, and let's say it takes an hour when I run it. In order to run it with S/G parallelism mode, just partition chr1 into two independent parts:
This file distributed.tracker.txt will contain genomic locations and GATK process ids that are processing each location, in text format, so you can cat it. If you run at the command line:
gsa1> java -jar GenomeAnalysisTK -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1:1-125,000,000 -o my.1.vcf & gsa1> java -jar GenomeAnalysisTK -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1:125,000,001-249,250,621 -o my.2.vcf &
When these two jobs finish, I just merge the two VCFs together and I've got a complete data set in half the time.
Distributed Parallelism (Experimental, not for public use)
Note that distributed parallelism is an experimental, and essentially untested feature of the GATK. As of July 20, this feature has been disabled.
The GATK just (Jan. 2011) was updated to include a generic capabilities to synchronize work loads across multiple GATK *processes* through a single file. In order to use distributed parallelism in the GATK, you will need:
- A file visible to all GATK processes (on a common file system, either locally or shared across the network)
- This file must be lockable via a atomic file creation event. The Broad's NFS system supports this type of locking. Jobs will fail with error messages reading the shared tracking file or you will see duplicate output, depending on the exact race conditions, if your system doesn't support this type of locking.
- The walker currently must be a Locus or ROD walker -- Read walkers are not yet supported.
- Shared memory parallelism must not be enabled in individual processes, but this restrict will be lifted in the very near future.
Enabling distributed parallelism is extremely easy. Suppose I have the following command line:
CMD = java -jar GenomeAnalysisTK -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1
This runs a single process of the GATK over chr1, and let's say it takes an hour when I run it. In order to run it in distributed parallelism mode, just add the argument:
CMD = java -jar GenomeAnalysisTK -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1 -C distributed.tracker.txt
This file distributed.tracker.txt will contain genomic locations and GATK process ids that are processing each location, in text format, so you can cat it. The format of this file is:
20:1-16384 GATK_110748018 20:16385-32768 GATK_115851309 20:32769-49152 GATK_110124471 et cetera
The first line is the 16 kbp "shard" region, and the second is the ID of the GATK instance that processed it. The numbers are random integers assigned from the nano-second resolution system clock. Note that the format, and even existence, of this file may change in the future, as we experiment with better ways of handling distributed parallelism in the GATK. That said, right now there's one line for each region being processed, and for each region a single GATK ID of the instance that processed it. The file grows over time as more intervals are processed by the swarm of GATK instances, each of which is writing to this file.
If you run at the command line:
gsa1> $CMD -o my.1.vcf & gsa1> $CMD -o my.2.vcf &
Two processes of the GATK will start, and they will coordinate their activities via that file. Note that the output of each job is given a unique name.
Some important practical issues:
- If any job fails, the system will be left in an unstable state, so resuming is currently not supported
- If you want to rerun / restart the jobs, you *must* manually delete this file, otherwise the GATK will think *resume* computation based on the information in the file
- You can start up jobs whenever you want, they don't have to occur close to each other in time.
- If you start a job and there's no work left to do, the GATK will quickly run through the file and exit without an error.
Example of distributed parallelism
Suppose I want to process a single BAM file with the unified genotyper over the whole genome. In this case my default GATK command would look like:
java -jar GenomeAnalysisTK.jar -R my.ref -I my.bam -o my.vcf
Now, if I want to enable distributed parallelism to run 10 jobs simultaneously, I'd need to generate 20 jobs, each with output going to separate VCF files but using a single shared tracker file, and send these off to LSF or some other cluster to work at the same time on my data:
rm -f my.tracker # make sure I don't keep an older tracker file around!
foreach i (1 2 3 4 5 6 7 8 9 10):
bsub java -jar GenomeAnalysisTK.jar -R my.ref -I my.bam -o my.$i.vcf -C my.tracker
end
Now I'll see in each VCF some part of my whole genome call set. The VCFs have interlaced outputs, so for example you might see my.1.vcf have chr1:1-1mb and chr1:10mb-11mb, while my.2.vcf would have chr1:1mb-2mb. Each run is randomly different, though. In order to reconstitute the full VCF I'll need to merge the my.$i.vcf files, using something like CombineVariants or a simple perl script.
Analysis of distributed parallelism performance
Distributed parallelism status logs
By adding the -CSF file.$PART.status to your distributed GATK jobs, where each $PART is replaced with a unique name for each job (like it's count) you can get a detailed picture of what the distributed processing is doing across all processes over time. This is only useful for debugging purpose and isn't a standard, or even generally supported, option but if you are playing with distributed GATK and want to see what's going on, add this argument and have a look at the output. In the SVN there's a few R scripts that create interesting plots based on these files.
Parallelism and Determinism
Similar to downsampling parallel-processing will significantly speed up data processing but may produce statistically insignificant differences. While this non-determinism is not ideal in practice the minute differences have been mathematically meaningless while producing consistent results in a reasonable amount of time for whole genome and whole exome data. However, if absolute determinism is more important than speed we recommend you do not use parallelism with the GATK.
