Parallelism and the GATK

From GSA

Overview

The MapReduce architecture of the GATK allows most walkers to run in a parallel-processing mode. The GATK supports three basic parallel processing models:

Shared memory parallelism 
Parallelism within a single multi-threaded process with access to a large, shared RAM. Shared memory parallelism is stable and supported by many tools that access pileups of bases.
Scatter/gather (SG) parallelism 
In SG parallelism, the target genomic regions are divided among N independent GATK instances that run separately on a single machine or across a computing cluster. When all have completed, the outputs of the independent walkers are merged together. SG works very efficiently in the GATK, provided the output of a walker is independent per site (such as the Unified Genotyper) or per chromosome (Table Recalibrator or Indel Realigner). SG parallelism is a completely stable approach in the GATK and is used routinely by the GATK team to process large data sets; it is also natively supported by GATK-Queue, which automatically scatters and gathers GATK processes given a desired number N of processes to execute simultaneously.
Distributed parallelism 
Coordinated processing among separate GATK-Engine instances running on independent machines. Distributed parallelism is effectively a coordinated extension of SG parallelism in which all of the GATK instances communicate to process a single data set, without regions of the genome being pre-allocated to each instance. Distributed parallelism is entirely experimental in the GATK and may not work properly in some cases because of bugs.

Comparison of GATK parallelism options

There are costs and benefits to each type of parallelism in the GATK, as outlined in the following table.

Comparison of standard parallelism approaches in the GATK

Shared memory
  • Stability: Stable
  • Applicable walker types: By locus and by ROD only; ReadWalkers are not supported
  • Example walkers: UnifiedGenotyper, CountCovariates, VariantEval
  • Scalability: Fewer than 32 cores. Each thread operates completely independently, so N threads require N times as much memory as a single thread
  • How to enable: Use the -nt engine argument with any walker that supports shared memory parallelism (the engine will tell you if the walker does not)
  • Pros:
      • Easy to enable
      • A single output file, merged internally by the GATK engine
      • Efficiently uses multi-core processors sharing a single memory space
  • Cons:
      • Limited to fewer than 32 processors without significant overhead
      • Limited to the cores physically present on the machine, so it cannot take advantage of computing cluster resources
      • Does not work for ReadWalkers (TableRecalibrator, IndelRealigner)

Scatter/gather
  • Stability: Stable
  • Applicable walker types: All walker types; in general, ReadWalkers can only be split safely by chromosome
  • Example walkers: All walkers, including ReadWalkers such as TableRecalibrator and IndelRealigner
  • Scalability: Hundreds of processes, generally limited by the capabilities of the underlying storage system; Isilon-class storage can run thousands of jobs effectively
  • How to enable, either:
      1. Provide a different -L interval list to each of the N independent GATK instances, for example -L chr1 for the first process and -L chr2 for the second. When all processes have finished, merge the outputs as appropriate (e.g., MergeSam.jar for BAMs, and cat or CombineVariants for VCFs)
      2. Use GATK-Queue to divide up your GATK jobs automatically; setting this.scatterCount = 10 results in 10 independent processes
  • Pros:
      • Works for all walker types, including ReadWalkers
      • Scales to hundreds of independent jobs
      • Easy to enable with a single -L argument
      • Directly supported and managed by GATK-Queue
      • Completely independent processing per interval, so failed parts can easily be resumed without repeating regions that were already processed successfully
  • Cons:
      • Requires manual preparation of sliced genomic intervals (if you aren't using GATK-Queue)
      • ReadWalkers and other tools that can only be split by chromosome suffer from load-balancing problems (chr1 is much bigger than chr22)
      • Sensitive to variation in data density across the genome. For example, dividing chr20 into 63 1 Mb chunks leads to 10x differences in completion times because of data pileups around the centromere
      • All parts of the scatter must complete before gathering, which makes the process sensitive to farm scheduling problems. If a job runs for M minutes but waits Z minutes to start on the farm, the entire S/G process will not complete for at least M + Z minutes

Distributed
  • Stability: Experimental
  • Applicable walker types: By locus and by ROD only; ReadWalkers are not supported
  • Example walkers: UnifiedGenotyper, CountCovariates; not VariantEval, because it accumulates data in memory
  • Scalability: Hundreds of processes currently, but likely to scale to thousands of intercommunicating processes in the coming years
  • How to enable: Add the tracker file argument to the engine and spawn N jobs with different output files (see below for details)
  • Pros:
      • More scalable than scatter/gather, up to thousands of processes
      • Better load balancing among all instances, giving faster end-to-end times
      • More tolerant of computing farm oddities, giving more reliable end-to-end processing times
  • Cons:
      • Not stable; likely to crash or exhibit other buggy behavior
      • Currently does not work for ReadWalkers
      • More than a few hundred jobs can cause file-lock starvation because of the limitations of the file-based communication approach

Which parallelism option is right for me?

Almost certainly, either shared memory or scatter/gather parallelism is the right choice for your NGS pipeline. Our go-to option for parallelism here at the Broad Institute is S/G, since it lets us cut jobs into hundreds of pieces to run on our standard computing farm using GATK-Queue. When I have a small job that needs to run quickly, am testing program options, or need a quick VariantEval result, I use shared memory parallelism with ~10 threads on a single large computer with plenty of memory (>64 GB). In general, I would not consider using distributed parallelism except for specific, experimental pipelines.

Basically, if I have N processors and want to choose between shared memory and S/G, here's how I would choose:

  • If all N processors are on a single computer, and my walker supports it, use shared memory parallelism
  • If not, use S/G

Shared Memory Parallelism (Stable)

The GATK currently supports a hierarchical version of parallelism. In this form of parallelism, data is divided into shards, each shard is map/reduced independently, and the results are combined with a 'tree reduce' step. While the framework handles much of the heavy lifting of data division required for parallelism, each walker must individually be reviewed to ensure that it isn't tracking state internally in a non-threadsafe way. Many tools support shared memory parallelism, including critical tools such as:

  • UnifiedGenotyper
  • CountCovariates
  • VariantEval

To find out whether your walker is parallelizable, review its source, or try running it with -nt 2: if the engine doesn't complain, the walker supports shared memory parallelism.

In shared memory parallelism, each thread operates on a 16 kbp shard of reference sequence, in a memory space completely independent of all other threads. Up to 24 threads can run efficiently in this design, but greater parallelism is limited by resource starvation from the single reduce thread and/or the memory inefficiency of keeping each thread's data totally independent.
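To make the 16 kbp sharding concrete, here is a minimal Java sketch that cuts a 1-based, inclusive interval into consecutive 16,384 bp shards. The class and method names are hypothetical, not GATK API, and the GATK's own shard boundaries may be computed differently; the output simply has the same form as the example tracker regions (20:1-16384, 20:16385-32768, ...).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not GATK API): cuts a 1-based, inclusive interval
// contig:start-stop into consecutive shards of at most shardSize bases.
final class ShardSplitter {
    static final int SHARD_SIZE = 16_384; // 16 kbp, matching the tracker example regions

    static List<String> shards(String contig, long start, long stop, int shardSize) {
        if (shardSize <= 0 || start < 1 || stop < start) {
            throw new IllegalArgumentException("invalid interval or shard size");
        }
        List<String> out = new ArrayList<>();
        for (long s = start; s <= stop; s += shardSize) {
            long e = Math.min(s + shardSize - 1, stop); // the last shard may be shorter
            out.add(contig + ":" + s + "-" + e);
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints 20:1-16384, 20:16385-32768, 20:32769-49152 and 20:49153-50000
        for (String shard : shards("20", 1, 50_000, SHARD_SIZE)) {
            System.out.println(shard);
        }
    }
}
```

Each shard is then an independent unit of work that a thread can map and reduce on its own.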

Enabling n-way parallelism from the command-line

  • Run the GATK, using the -nt command-line argument to specify the number of threads that the GATK should attempt to use.

Figure: Shared memory parallelism architecture

Helpful hints: Implementing a walker with parallelism in mind

First, be aware that some walkers may, by design, require a rewrite for complete parallelization.

  • When implementing a standard (non-parallelized) walker, one must implement the reduce method, which combines an individual datum returned by the map function with the aggregate of the prior map calls. When implementing a parallelizable walker, one must also implement the org.broadinstitute.sting.gatk.walkers.TreeReducible interface and its treeReduce() function. The treeReduce() function tells the GATK how to combine two adjacent reduces, rather than one map result and one reduce.
  • The GATK supports writing to output files from either the map or the reduce when running in parallel. However, only unbuffered writers are currently supported, so please use PrintStream rather than PrintWriter.
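A minimal sketch of the reduce/treeReduce distinction follows. The interface below is a local stand-in written for illustration; it is not the real org.broadinstitute.sting.gatk.walkers.TreeReducible, whose exact signature you should check in the source. CoverageCounter is a hypothetical walker-like class that counts loci with a depth of at least 10, and run() plays the engine: it reduces each shard, then combines adjacent shard results pairwise (always left before right) until one value remains. It runs single-threaded for clarity, whereas the GATK would process shards on separate threads.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for the contract described above; NOT the real
// org.broadinstitute.sting.gatk.walkers.TreeReducible (check its source for the exact signature).
interface TreeReducibleSketch<R> {
    R treeReduce(R lhs, R rhs); // combine two adjacent reduce results, lhs covering earlier loci
}

// Hypothetical walker-like class: counts loci whose depth is at least 10.
final class CoverageCounter implements TreeReducibleSketch<Long> {
    Long map(int depth) { return depth >= 10 ? 1L : 0L; }           // one result per locus
    Long reduceInit() { return 0L; }
    Long reduce(Long value, Long sum) { return value + sum; }        // map result + running total
    public Long treeReduce(Long lhs, Long rhs) { return lhs + rhs; } // two partial totals

    // Plays the engine, single-threaded for clarity: reduce each shard, then tree-reduce.
    static long run(List<int[]> shards) {
        CoverageCounter walker = new CoverageCounter();
        List<Long> partials = new ArrayList<>();
        for (int[] shard : shards) {
            Long acc = walker.reduceInit();
            for (int depth : shard) {
                acc = walker.reduce(walker.map(depth), acc);
            }
            partials.add(acc);
        }
        // Merge neighbouring results pairwise, keeping left-to-right order, until one remains.
        while (partials.size() > 1) {
            List<Long> next = new ArrayList<>();
            for (int i = 0; i < partials.size(); i += 2) {
                next.add(i + 1 < partials.size()
                        ? walker.treeReduce(partials.get(i), partials.get(i + 1))
                        : partials.get(i)); // an odd one out carries over to the next round
            }
            partials = next;
        }
        return partials.isEmpty() ? walker.reduceInit() : partials.get(0);
    }

    public static void main(String[] args) {
        // Three shards of per-locus depths
        System.out.println(run(List.of(new int[] {5, 12, 30}, new int[] {10, 9}, new int[] {40})));
    }
}
```

The point of the design is that reduce() alone only knows how to fold one more map result into a total; combining two totals produced independently on different threads needs the separate treeReduce() operation.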

Limitations

The GATK's support for parallelism is currently limited. The following classes of walkers are not supported by our parallelization framework:

  • Read walkers
  • Reduce-by-interval walkers

Note that each thread operates completely independently in the current GATK implementation of shared memory parallelism. So if you give the GATK 1 GB of memory with -nt 1, you should give it 4 GB to run with -nt 4. Otherwise, the GATK can be starved of memory and run much more slowly.

The performance of the multi-threaded GATK depends heavily on whether you are IO- or CPU-limited and on the relative overhead of parallelism on your computer. In addition, overhead can become very high with nt > 20 because of our implementation and memory contention issues.

The best value for -nt is no more than the number of available cores, provided there is enough memory to run every thread (nt × the amount given to a single thread), and further capped by the available IO bandwidth so that individual threads don't starve each other.
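The rule of thumb above can be written as a small calculation. This is a hypothetical helper, not part of the GATK: perThreadMemMb is whatever you would give a single-threaded run, and ioCap stands for an IO-bound thread limit that you have to estimate for your own storage (the sketch does not measure it).

```java
// Hypothetical helper (not part of the GATK) for picking -nt.
final class ThreadChooser {
    // nt is capped by the cores, by how many per-thread memory allotments fit
    // into the available memory, and by an IO-derived cap you estimate yourself.
    static int chooseNt(int cores, long availableMemMb, long perThreadMemMb, int ioCap) {
        if (cores < 1 || perThreadMemMb < 1 || ioCap < 1) {
            throw new IllegalArgumentException("cores, perThreadMemMb and ioCap must be positive");
        }
        long byMemory = availableMemMb / perThreadMemMb; // nt x per-thread memory <= available
        long nt = Math.min(cores, Math.min(byMemory, ioCap));
        return (int) Math.max(1, nt); // never go below a single thread
    }

    // Memory to give the GATK for a given nt: each thread needs its own full share.
    static long heapForNt(int nt, long perThreadMemMb) {
        return nt * perThreadMemMb;
    }

    public static void main(String[] args) {
        System.out.println(chooseNt(16, 8 * 1024, 1024, 12)); // memory allows only 8 threads
        System.out.println(heapForNt(4, 1024) + " MB");       // the 1 GB -> 4 GB example
    }
}
```

For example, with 16 cores, 8 GB of memory, 1 GB per thread and an estimated IO cap of 12, memory is the binding constraint and the helper returns 8.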

Scatter / gather parallelism

Scatter / gather is a simple approach to process-independent parallelism with the GATK. First you scatter multiple independent GATK instances out over a network of computing nodes, each operating on a different genomic interval, and when they all complete, the output of each is gathered up into a merged output dataset. S/G is extremely easy to use in the GATK, as all of the GATK tools support the -L option to operate only over specific genomic intervals, and many tools emit files that can be merged together across multiple regions. Unified Genotyper, for example, can operate over the whole genome in one process, or on each chromosome independently. The output of the latter approach, after merging, should be the same as the whole-genome results, apart from slight differences due to random number effects. The simplicity and efficiency of S/G parallelism make it a key option for getting things done quickly with the GATK.
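The scatter step amounts to choosing the -L intervals. The hypothetical Java helper below (not GATK or GATK-Queue API) splits a contig of known length into N contiguous, non-overlapping intervals of near-equal size and prints them as -L arguments; the contig length is an input you supply. Equal-length pieces ignore variation in data density, which, as noted in the comparison above, can make some pieces take far longer than others.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical scatter planner (not GATK API).
final class ScatterPlanner {
    // Splits contig:1-length into n contiguous, non-overlapping parts whose sizes differ by at most 1.
    static List<String> scatter(String contig, long length, int n) {
        if (n < 1 || length < n) {
            throw new IllegalArgumentException("need 1 <= n <= length");
        }
        List<String> parts = new ArrayList<>();
        long base = length / n;
        long extra = length % n; // the first `extra` parts get one extra base
        long start = 1;
        for (int i = 0; i < n; i++) {
            long stop = start + base + (i < extra ? 1 : 0) - 1;
            parts.add(contig + ":" + start + "-" + stop);
            start = stop + 1;
        }
        return parts;
    }

    public static void main(String[] args) {
        // chr1 length as used in the example command lines; prints one -L argument per process
        for (String part : scatter("chr1", 249_250_621L, 2)) {
            System.out.println("-L " + part);
        }
    }
}
```

For chr1 (249,250,621 bp) with N = 2 this yields chr1:1-124625311 and chr1:124625312-249250621, an exact halving rather than the rounded boundary used in the example that follows; either works, as long as the pieces do not overlap.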

Using S/G parallelism is extremely easy, either by hand or using the automated Scatter/Gather in GATK-Queue. Suppose I have the following command line:

java -jar GenomeAnalysisTK.jar -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1

This runs a single process of the GATK over chr1; let's say it takes an hour. To run it in S/G mode, just partition chr1 into two independent parts and start one GATK process for each part, writing to separate output files:

gsa1> java -jar GenomeAnalysisTK.jar -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1:1-125,000,000 -o my.1.vcf &
gsa1> java -jar GenomeAnalysisTK.jar -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1:125,000,001-249,250,621 -o my.2.vcf &

When these two jobs finish, I just merge the two VCFs together and I've got a complete data set in half the time.
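When the parts are contiguous and non-overlapping, as here, gathering VCF output can be as simple as concatenation. The sketch below (the class name is hypothetical) keeps the header lines, those starting with #, from the first part only and appends the records of every part in the order given, so the parts must be listed in genomic order and share the same header. It only illustrates the idea; CombineVariants remains the GATK tool for merging VCFs.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical gather step for contiguous, non-overlapping parts (not GATK API).
final class VcfGather {
    // Parts must be given in genomic order and share one header.
    static List<String> gather(List<List<String>> parts) {
        List<String> merged = new ArrayList<>();
        for (int p = 0; p < parts.size(); p++) {
            for (String line : parts.get(p)) {
                boolean isHeader = line.startsWith("#");
                if (!isHeader || p == 0) {
                    merged.add(line); // header lines only from the first part
                }
            }
        }
        return merged;
    }

    // Usage: java VcfGather my.1.vcf my.2.vcf > my.vcf
    public static void main(String[] args) throws IOException {
        List<List<String>> parts = new ArrayList<>();
        for (String path : args) {
            parts.add(Files.readAllLines(Paths.get(path)));
        }
        gather(parts).forEach(System.out::println);
    }
}
```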

Distributed Parallelism (Experimental, not for public use)

Note that distributed parallelism is an experimental and essentially untested feature of the GATK. As of July 20, this feature has been disabled.

In January 2011 the GATK was updated to include a generic capability to synchronize workloads across multiple GATK *processes* through a single file. To use distributed parallelism in the GATK, you will need:

  • A file visible to all GATK processes (on a common file system, either locally or shared across the network)
  • This file must be lockable via an atomic file creation event. The Broad's NFS system supports this type of locking. If your system doesn't, jobs will either fail with error messages when reading the shared tracking file or produce duplicate output, depending on the exact race conditions.
  • The walker currently must be a Locus or ROD walker -- Read walkers are not yet supported.
  • Shared memory parallelism must not be enabled in individual processes, but this restriction will be lifted in the very near future.
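The locking requirement relies on file creation being atomic: if two processes try to create the same file at once, exactly one succeeds. The Java sketch below illustrates that idea only; it is not the GATK's locking code, and the class name is hypothetical. Files.createFile performs the existence check and the creation as a single atomic operation and throws FileAlreadyExistsException if the file already exists; whether that guarantee holds on a network file system depends on the file system, which is exactly the caveat in the list above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Illustration of lock-by-atomic-creation; not the GATK's own locking code.
final class AtomicCreateLock {
    // Files.createFile checks for existence and creates the file in one atomic
    // step, so when several processes race, at most one of them gets true.
    static boolean tryLock(Path lockFile) {
        try {
            Files.createFile(lockFile);
            return true;
        } catch (FileAlreadyExistsException e) {
            return false; // another process holds the lock
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static void unlock(Path lockFile) {
        try {
            Files.deleteIfExists(lockFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path lock = Paths.get(args.length > 0 ? args[0] : "tracker.lock");
        if (tryLock(lock)) {
            System.out.println("acquired " + lock);
            unlock(lock); // the protected read/update of the tracker would go before this
        } else {
            System.out.println("lock held by another process");
        }
    }
}
```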

Enabling distributed parallelism is extremely easy. Suppose I have the following command line:

set CMD = "java -jar GenomeAnalysisTK.jar -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1"

This runs a single process of the GATK over chr1, and let's say it takes an hour when I run it. In order to run it in distributed parallelism mode, just add the argument:

set CMD = "java -jar GenomeAnalysisTK.jar -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1 -C distributed.tracker.txt"

The file distributed.tracker.txt will contain genomic locations and the IDs of the GATK processes handling each location, in text format, so you can cat it. The format of this file is:

20:1-16384 GATK_110748018
20:16385-32768 GATK_115851309
20:32769-49152 GATK_110124471
et cetera

The first column is the 16 kbp "shard" region, and the second is the ID of the GATK instance that processed it. The IDs are random integers assigned from the nanosecond-resolution system clock. Note that the format, and even the existence, of this file may change in the future as we experiment with better ways of handling distributed parallelism in the GATK. For now, there is one line for each region being processed, giving the single GATK ID of the instance that processed it. The file grows over time as the swarm of GATK instances, each writing to this file, processes more intervals.
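Because the tracker is plain text, it is easy to inspect. The hypothetical helper below assumes the two-column, whitespace-separated format shown above (which, as noted, may change); it maps each region to the instance that processed it and counts shards per instance, which gives a rough view of how evenly the work was spread across jobs.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical tracker reader; assumes "region instanceId" lines as shown above.
final class TrackerSummary {
    // Maps each region (e.g. "20:1-16384") to the GATK instance ID that processed it.
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> owners = new LinkedHashMap<>(); // keeps file order
        for (String line : lines) {
            String[] fields = line.trim().split("\\s+");
            if (fields.length == 2 && fields[0].contains(":")) { // skip blank or malformed lines
                owners.put(fields[0], fields[1]);
            }
        }
        return owners;
    }

    // Number of shards processed by each instance, sorted by instance ID.
    static Map<String, Integer> shardsPerInstance(Map<String, String> owners) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String id : owners.values()) {
            counts.merge(id, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        String path = args.length > 0 ? args[0] : "distributed.tracker.txt";
        Map<String, String> owners = parse(Files.readAllLines(Paths.get(path)));
        shardsPerInstance(owners).forEach((id, n) -> System.out.println(id + "\t" + n));
    }
}
```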

If you run at the command line:

gsa1> $CMD -o my.1.vcf &
gsa1> $CMD -o my.2.vcf &

Two processes of the GATK will start, and they will coordinate their activities via that file. Note that the output of each job is given a unique name.

Some important practical issues:

  • If any job fails, the system will be left in an unstable state, so resuming is currently not supported
  • If you want to rerun / restart the jobs, you *must* manually delete this file; otherwise the GATK will *resume* computation based on the information in the file
  • You can start jobs whenever you want; they don't have to start close to each other in time.
  • If you start a job and there's no work left to do, the GATK will quickly run through the file and exit without an error.
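The restart and early-exit behaviour in the list above follows from how work is claimed. The following is a hypothetical, single-threaded model, not the GATK's implementation: each process walks the same ordered list of shards, skips any region already recorded in the tracker, and records a region before processing it. A stale tracker therefore makes a new run skip every region it lists, and a process started after all regions are recorded exits immediately with nothing to do.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical single-threaded model of claiming shards from a shared tracker.
final class ShardClaimer {
    // First shard not yet recorded in the tracker, or empty when all work is done.
    static Optional<String> nextUnclaimed(List<String> allShards, Set<String> tracker) {
        for (String shard : allShards) {
            if (!tracker.contains(shard)) {
                return Optional.of(shard);
            }
        }
        return Optional.empty();
    }

    // Runs one process until no work is left; returns how many shards it handled.
    // A real implementation must make "find and record" atomic across processes.
    static int runProcess(List<String> allShards, Set<String> tracker) {
        int handled = 0;
        Optional<String> next;
        while ((next = nextUnclaimed(allShards, tracker)).isPresent()) {
            tracker.add(next.get()); // record the claim before processing
            handled++;               // ... the shard itself would be processed here ...
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> all = List.of("20:1-16384", "20:16385-32768", "20:32769-49152");
        Set<String> tracker = new HashSet<>(); // a freshly deleted tracker file
        System.out.println(runProcess(all, tracker)); // 3: this process does everything
        System.out.println(runProcess(all, tracker)); // 0: a late job finds nothing to do
    }
}
```

In the real system several processes run this loop concurrently, which is why the check-and-record step depends on the atomic file locking required earlier.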

Example of distributed parallelism

Suppose I want to process a single BAM file with the UnifiedGenotyper over the whole genome. In this case my default GATK command would look like:

java -jar GenomeAnalysisTK.jar -T UnifiedGenotyper -R my.ref -I my.bam -o my.vcf

Now, if I want to enable distributed parallelism to run 10 jobs simultaneously, I need to generate 10 jobs, each writing its output to a separate VCF file but all using a single shared tracker file, and send them off to LSF or some other cluster to work on my data at the same time:

rm -f my.tracker # make sure I don't keep an older tracker file around!
foreach i (1 2 3 4 5 6 7 8 9 10)
    bsub java -jar GenomeAnalysisTK.jar -T UnifiedGenotyper -R my.ref -I my.bam -o my.$i.vcf -C my.tracker
end

Each VCF will now contain some part of my whole-genome call set. The VCFs have interlaced outputs, so for example my.1.vcf might contain chr1:1-1mb and chr1:10mb-11mb, while my.2.vcf contains chr1:1mb-2mb. The exact assignment of regions to files differs randomly from run to run. To reconstitute the full VCF, I'll need to merge the my.$i.vcf files using something like CombineVariants or a simple perl script.
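Because the regions are interlaced, simple concatenation is not enough; the records have to be sorted back into genomic order. The sketch below (names are hypothetical, not GATK API) takes the header from the first file and the records from all files, then sorts the records by contig, in an order you supply (normally the reference's contig order), and by position. It assumes tab-separated VCF records and that all parts share one header; in practice CombineVariants is the tool to reach for.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical merge for interlaced VCF parts (not GATK API).
final class VcfRecordMerger {
    // Header lines come from the first part; records from all parts are sorted by
    // contig (in the supplied order) and then by POS. Contigs missing from
    // contigOrder get index -1 and therefore sort first.
    static List<String> merge(List<List<String>> parts, List<String> contigOrder) {
        List<String> header = new ArrayList<>();
        List<String> records = new ArrayList<>();
        for (int p = 0; p < parts.size(); p++) {
            for (String line : parts.get(p)) {
                if (line.startsWith("#")) {
                    if (p == 0) {
                        header.add(line);
                    }
                } else if (!line.isEmpty()) {
                    records.add(line);
                }
            }
        }
        Comparator<String> byContig =
                Comparator.comparingInt(r -> contigOrder.indexOf(r.split("\t", 3)[0]));
        Comparator<String> byPos =
                Comparator.comparingLong(r -> Long.parseLong(r.split("\t", 3)[1]));
        records.sort(byContig.thenComparing(byPos)); // stable: ties keep input order
        header.addAll(records);
        return header;
    }

    public static void main(String[] args) {
        List<String> part1 = List.of("#CHROM\tPOS\tID", "chr1\t500\t.", "chr2\t10\t.");
        List<String> part2 = List.of("#CHROM\tPOS\tID", "chr1\t20\t.");
        merge(List.of(part1, part2), List.of("chr1", "chr2")).forEach(System.out::println);
    }
}
```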

Analysis of distributed parallelism performance

Distributed parallelism status logs

By adding -CSF file.$PART.status to your distributed GATK jobs, where $PART is replaced with a unique name for each job (such as its count), you can get a detailed picture of what the distributed processing is doing across all processes over time. This is only useful for debugging purposes and isn't a standard, or even generally supported, option, but if you are experimenting with distributed GATK and want to see what's going on, add this argument and have a look at the output. The SVN repository contains a few R scripts that create interesting plots from these files.

Parallelism and Determinism

As with downsampling, parallel processing significantly speeds up data processing but may produce statistically insignificant differences in the results. While this non-determinism is not ideal, in practice the minute differences have been meaningless, and parallelism produces consistent results in a reasonable amount of time for whole-genome and whole-exome data. However, if absolute determinism is more important than speed, we recommend that you do not use parallelism with the GATK.
