This document explains the concepts involved and how they are applied within the GATK (and Queue where applicable). For specific configuration recommendations, see the companion document on parallelizing GATK tools.
Parallelism is a way to make a program finish faster by performing several operations in parallel, rather than sequentially (i.e. waiting for each operation to finish before starting the next one).
Imagine you need to cook rice for sixty-four people, but your rice cooker can only make enough rice for four people at a time. If you have to cook all the batches of rice sequentially, it's going to take all night. But if you have eight rice cookers that you can use in parallel, you can finish up to eight times faster.
This is a very simple idea but it has a key requirement: you have to be able to break down the job into smaller tasks that can be done independently. It's easy enough to divide portions of rice because rice itself is a collection of discrete units. In contrast, let's look at a case where you can't make that kind of division: it takes one pregnant woman nine months to grow a baby, but you can't do it in one month by having nine women share the work.
The good news is that most GATK runs are more like rice than like babies. Because GATK tools are built to use the Map/Reduce method (see doc for details), most GATK runs essentially consist of a series of many small independent operations that can be parallelized.
Parallelism is a great way to speed up processing on large amounts of data, but it has "overhead" costs. Without getting too technical at this point, let's just say that parallelized jobs need to be managed, you have to set aside memory for them, regulate file access, collect results and so on. So it's important to balance the costs against the benefits, and avoid dividing the overall work into too many small jobs.
Going back to the introductory example, you wouldn't want to use a million tiny rice cookers that each boil a single grain of rice. They would take way too much space on your countertop, and the time it would take to distribute each grain then collect it when it's cooked would negate any benefits from parallelizing in the first place.
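To put rough numbers on that tradeoff, here is a toy cost model in Python. It is only a sketch under a loud assumption: the work divides evenly across jobs, and each job adds a fixed amount of overhead that cannot be parallelized. The function names and the overhead value are made up for illustration and do not describe how the GATK behaves.

```python
def total_time(work, n_jobs, overhead_per_job):
    """Toy model: work is split evenly, but every job adds a fixed serial cost."""
    return work / n_jobs + overhead_per_job * n_jobs


def best_job_count(work, overhead_per_job, max_jobs):
    """Return the job count in 1..max_jobs with the smallest modelled time."""
    return min(range(1, max_jobs + 1),
               key=lambda n: total_time(work, n, overhead_per_job))


# 64 units of cooking, 1 unit of set-up and collection cost per rice cooker (assumed).
for n in (1, 8, 64, 1000):
    print(n, "cookers:", total_time(64, n, 1))
print("best number of cookers:", best_job_count(64, 1, 1000))
```

Under these assumed numbers, eight cookers is the sweet spot, and a thousand cookers is far slower than cooking everything in a single one.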
OK, parallelism sounds great (despite the tradeoffs caveat), but how do we get from cooking rice to executing programs? What actually happens in the computer?
Consider that when you run a program like the GATK, you're just telling the computer to execute a set of instructions.
Let's say we have a text file and we want to count the number of lines in it. The set of instructions to do this can be as simple as:
open the file, count the number of lines in the file, tell us the number, close the file
Here, "tell us the number" can mean writing it to the console, or storing it somewhere for use later on.
Now let's say we want to know the number of words on each line. The set of instructions would be:
open the file, read the first line, count the number of words, tell us the number, read the second line, count the number of words, tell us the number, read the third line, count the number of words, tell us the number
And so on until we've read all the lines, and finally we can close the file. It's pretty straightforward, but if our file has a lot of lines, it will take a long time, and it will probably not use all the computing power we have available.
So to parallelize this program and save time, we just cut up this set of instructions into separate subsets like this:
open the file, index the lines
read the first line, count the number of words, tell us the number
read the second line, count the number of words, tell us the number
read the third line, count the number of words, tell us the number
[repeat for all lines]
collect final results and close the file
Here, the "read the Nth line" steps can be performed in parallel, because they are all independent operations.
You'll notice that we added a step, "index the lines". That's a little bit of preliminary work that allows us to perform the "read the Nth line" steps in parallel (or in any order we want) because it tells us how many lines there are and where to find each one within the file. It makes the whole process much more efficient. As you may know, the GATK requires index files for the main data files (reference, BAMs and VCFs); the reason is essentially to have that indexing step already done.
Anyway, that's the general principle: you transform your linear set of instructions into several subsets of instructions. There's usually one subset that has to be run first and one that has to be run last, but all the subsets in the middle can be run at the same time (in parallel) or in whatever order you want.
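The role of the indexing step can be sketched in a few lines of Python. This is an illustration only: the helper names are hypothetical, and an in-memory buffer stands in for the text file. The index records where each line starts, so each line can then be read and counted on its own, in any order.

```python
import io


def index_lines(handle):
    """Preliminary step: record the byte offset at which each line starts."""
    offsets = []
    handle.seek(0)
    while True:
        pos = handle.tell()
        if not handle.readline():
            break
        offsets.append(pos)
    return offsets


def count_words_at(handle, offset):
    """Independent step: jump straight to one line and count its words."""
    handle.seek(offset)
    return len(handle.readline().split())


# An in-memory buffer stands in for the file on disk.
text = io.BytesIO(b"one two three\nfour five\nsix\n")
offsets = index_lines(text)

# Thanks to the index, the lines can be visited in any order (here: last to first).
counts = {i: count_words_at(text, off)
          for i, off in reversed(list(enumerate(offsets)))}
results = [counts[i] for i in range(len(offsets))]
print(results)
```

The lines are deliberately processed from last to first to show that the order does not matter once the index exists.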
There are three different modes of parallelism offered by the GATK, and to really understand the difference you first need to understand the different levels of computing that are involved.
By levels of computing, we mean the computing units in terms of hardware: the core, the machine (or CPU) and the cluster.
Core: the level below the machine. On your laptop or desktop, the CPU (central processing unit, or processor) contains one or more cores. If you have a recent machine, your CPU probably has at least two cores, and is therefore called dual-core. If it has four, it's a quad-core, and so on. High-end consumer machines like the latest Mac Pro have up to twelve-core CPUs (which should be called dodeca-core if we follow the Greek prefix) but the CPUs on some professional-grade machines can have tens or hundreds of cores.
Machine: the middle of the scale. For most of us, the machine is the laptop or desktop computer. Really we should refer to the CPU specifically, since that's the relevant part that does the processing, but the most common usage is to say machine. Except if the machine is part of a cluster, in which case it's called a node.
Cluster: the level above the machine. This is a high-performance computing structure made of a bunch of machines (usually called nodes) networked together. If you have access to a cluster, chances are it either belongs to your institution, or your company is renting time on it. A cluster can also be called a server farm or a load-sharing facility.
Parallelism can be applied at all three of these levels, but in different ways of course, and under different names. Parallelism takes the name of multi-threading at the core and machine levels, and scatter-gather at the cluster level.
In computing, a thread of execution is a set of instructions that the program issues to the processor to get work done. In single-threading mode, a program only sends a single thread at a time to the processor and waits for it to be finished before sending another one. In multi-threading mode, the program may send several threads to the processor at the same time.
Not making sense? Let's go back to our earlier example, in which we wanted to count the number of words in each line of our text document. Hopefully it is clear that the first version of our little program (one long set of sequential instructions) is what you would run in single-threaded mode. And the second version (several subsets of instructions) is what you would run in multi-threaded mode, with each subset forming a separate thread. You would send out the first thread, which performs the preliminary work; then once it's done you would send the "middle" threads, which can be run in parallel; then finally once they're all done you would send out the final thread to clean up and collect final results.
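The three phases just described (a preliminary step, parallel "middle" threads, and a final collection step) can be sketched with Python's standard thread pool. This is a structural illustration only, not how the GATK is implemented. Note also that in the standard Python interpreter, threads do not speed up CPU-bound work like this, so the sketch shows the shape of the program rather than an actual speedup.

```python
from concurrent.futures import ThreadPoolExecutor


def count_words(line):
    """The work done by one 'middle' thread: count the words on one line."""
    return len(line.split())


def words_per_line(lines, n_threads=4):
    # Preliminary step (sequential): here the lines are already split into a list,
    # which plays the role of the index.
    # Middle steps (parallel): one independent task per line.
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        counts = list(pool.map(count_words, lines))  # map() keeps the input order
    # Final step (sequential): hand back the collected results.
    return counts


print(words_per_line(["open the file", "count the words", "close"]))
```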
If you're still having a hard time visualizing what the different threads are like, just imagine that you're doing cross-stitching. If you're a regular human, you're working with just one hand. You're pulling a needle and thread (a single thread!) through the canvas, making one stitch after another, one row after another. Now try to imagine an octopus doing cross-stitching. He can make several rows of stitches at the same time using a different needle and thread for each. Multi-threading in computers is surprisingly similar to that.
Hey, if you have a better example, let us know in the forum and we'll use that instead.
Alright, now that you understand the idea of multithreading, let's get practical: how do we get the GATK to use multi-threading?
There are two options for multi-threading with the GATK, controlled by the arguments -nt and -nct. They can be combined, since they act at different levels of computing:
-nt / --num_threads controls the number of data threads sent to the processor (acting at the machine level)
-nct / --num_cpu_threads_per_data_thread controls the number of CPU threads allocated to each data thread (acting at the core level).
Not all GATK tools can use these options due to the nature of the analyses that they perform and how they traverse the data. Even in the case of tools that are used sequentially to perform a multi-step process, the individual tools may not support the same options. For example, at time of writing (Dec. 2012), of the tools involved in local realignment around indels, RealignerTargetCreator supports -nt but not -nct, while IndelRealigner does not support either of these options.
In addition, there are some important technical details that affect how these options can be used with optimal results. Those are explained along with specific recommendations for the main GATK tools in a companion document on parallelizing the GATK.
If you Google it, you'll find that the term scatter-gather can refer to a lot of different things, including strategies to get the best price quotes from online vendors, methods to control memory allocation and… an indie-rock band. What all of those things have in common (except possibly the band) is that they involve breaking up a task into smaller, parallelized tasks (scattering) then collecting and integrating the results (gathering). That should sound really familiar to you by now, since it's the general principle of parallel computing.
So yes, "scatter-gather" is really just another way to say we're parallelizing things. OK, but how is it different from multithreading, and why do we need yet another name?
As you know by now, multithreading specifically refers to what happens internally when the program (in our case, the GATK) sends several sets of instructions to the processor to achieve the instructions that you originally gave it in a single command-line. In contrast, the scatter-gather strategy as used by the GATK involves a separate program, called Queue, which generates separate GATK jobs (each with its own command-line) to achieve the instructions given in a so-called Qscript (i.e. a script written for Queue in a programming language called Scala).
At the simplest level, the Qscript can involve a single GATK tool*. In that case Queue will create separate GATK commands that will each run that tool on a portion of the input data (= the scatter step). The results of each run will be stored in temporary files. Then once all the runs are done, Queue will collate all the results into the final output files, as if the tool had been run as a single command (= the gather step).
* Note that Queue has additional capabilities, such as managing the use of multiple GATK tools in a dependency-aware manner to run complex pipelines, but that is outside the scope of this article. To learn more about pipelining the GATK with Queue, please see the Queue documentation.
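The scatter and gather steps described above can be mimicked in miniature with the Python sketch below. It is not Queue's API: the function names are hypothetical, a word count stands in for the GATK tool, and threads on one machine stand in for separate jobs on a cluster. What it does keep is the shape of the process: split the input, let each job write its results to a temporary file, then collate the files in order.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor


def scatter(items, n_jobs):
    """Split the input into n_jobs contiguous, roughly equal portions."""
    size, extra = divmod(len(items), n_jobs)
    chunks, start = [], 0
    for i in range(n_jobs):
        end = start + size + (1 if i < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


def run_job(chunk, workdir, job_id):
    """Stand-in for one scattered job: write this portion's results to a temp file."""
    path = os.path.join(workdir, f"part_{job_id}.txt")
    with open(path, "w") as out:
        for line in chunk:
            out.write(f"{len(line.split())}\n")
    return path


def gather(paths):
    """Collate the temporary files, in scatter order, into one result."""
    merged = []
    for path in paths:
        with open(path) as part:
            merged.extend(int(value) for value in part)
    return merged


def scatter_gather(lines, n_jobs):
    with tempfile.TemporaryDirectory() as workdir:
        chunks = scatter(lines, n_jobs)
        with ThreadPoolExecutor(max_workers=n_jobs) as pool:
            paths = list(pool.map(run_job, chunks, [workdir] * n_jobs, range(n_jobs)))
        return gather(paths)


print(scatter_gather(["a b", "c", "d e f", "g h i j"], n_jobs=2))
```

The gathered result is the same as if the whole input had been processed by a single job, which is the property the gather step is meant to preserve.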
So you see, scatter-gather is a very different process from multi-threading because the parallelization happens outside of the program itself. The big advantage is that this opens up the upper level of computing: the cluster level. Remember, the GATK program is limited to dispatching threads to the processor of the machine on which it is run – it cannot by itself send threads to a different machine. But Queue can dispatch scattered GATK jobs to different machines in a computing cluster by interfacing with your cluster's job management software.
That being said, multithreading has the great advantage that all the threads running on a machine have access to that machine's shared memory, with very high bandwidth. In contrast, the multiple machines on a network used for scatter-gather are fundamentally limited by network costs.
The good news is that you can combine scatter-gather and multithreading: use Queue to scatter GATK jobs to different nodes on your cluster, then use the GATK's internal multithreading capabilities to parallelize the jobs running on each node.
Going back to the rice-cooking example, it's as if instead of cooking the rice yourself, you hired a catering company to do it for you. The company assigns the work to several people, who each have their own cooking station with multiple rice cookers. Now you can feed a lot more people in the same amount of time! And you don't even have to clean the dishes.
This document provides technical details and recommendations on how the parallelism options offered by the GATK can be used to yield optimal performance results.
As explained in the primer on parallelism for the GATK, there are two main kinds of parallelism that can be applied to the GATK: multi-threading and scatter-gather (using Queue).
There are two options for multi-threading with the GATK, controlled by the arguments -nt and -nct, which can be combined:
-nt / --num_threads controls the number of data threads sent to the processor
-nct / --num_cpu_threads_per_data_thread controls the number of CPU threads allocated to each data thread
For more information on how these multi-threading options work, please read the primer on parallelism for the GATK.
Each data thread needs to be given the full amount of memory you’d normally give a single run. So if you’re running a tool that normally requires 2 GB of memory, and you use -nt 4, the multithreaded run will use 8 GB of memory. In contrast, CPU threads will share the memory allocated to their “mother” data thread, so you don’t need to worry about allocating memory based on the number of CPU threads you use.
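The memory rule above boils down to a single multiplication, written out here as a small Python helper. The function name and parameters are hypothetical. The only logic it encodes is what the paragraph states: memory scales with the number of data threads (-nt) and is unaffected by the number of CPU threads (-nct).

```python
def total_memory_gb(memory_per_run_gb, nt, nct=1):
    """Memory to allocate for a multithreaded run.

    Each data thread (-nt) needs the full single-run amount. CPU threads (-nct)
    share their data thread's memory, so nct does not enter the calculation.
    """
    return memory_per_run_gb * nt


print(total_memory_gb(2, nt=4))         # the example from the text: 2 GB x 4 data threads
print(total_memory_gb(2, nt=4, nct=8))  # adding CPU threads does not change the total
```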
-nct with versions 2.2 and 2.3
Because of the way the -nct option was originally implemented, in versions 2.2 and 2.3, there is one CPU thread that is reserved by the system to “manage” the rest. So if you use -nct, you’ll only really start seeing a speedup with -nct 3 (which yields two effective "working" threads) and above. This limitation has been resolved in the implementation that will be available in versions 2.4 and up.
For more details on scatter-gather, see the primer on parallelism for the GATK and the Queue documentation.
Please note that not all tools support all parallelization modes. The parallelization modes that are available for each tool depend partly on the type of traversal that the tool uses to walk through the data, and partly on the nature of the analyses it performs.
|Tool||Full name||Type of traversal||NT||NCT||SG|
The table below summarizes configurations that we typically use for our own projects (one per tool, except we give three alternate possibilities for the UnifiedGenotyper). The different values allocated for each tool reflect not only the technical capabilities of these tools (which options are supported), but also our empirical observations of what provides the best tradeoffs between performance gains and commitment of resources. Please note however that this is meant only as a guide, and that we cannot give you any guarantee that these configurations are the best for your own setup. You will probably have to experiment with the settings to find the configuration that is right for you.
|Cluster nodes||1||4||4||1||4||4 / 4 / 4|
|CPU threads (-nct)||1||1||8||4-8||1||3 / 6 / 24|
|Data threads (-nt)||24||1||1||1||1||8 / 4 / 1|
|Memory (GB)||48||4||4||4||4||32 / 16 / 4|
Where NT is data multithreading, NCT is CPU multithreading and SG is scatter-gather using Queue. For more details on scatter-gather, see the primer on parallelism for the GATK and the Queue documentation.
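As a quick sanity check on the recommended configurations, the Python snippet below works through the three alternatives given in the last column (the ones the text attributes to the UnifiedGenotyper). The values are copied from the table. The interpretation is our own reading and not something the table states: that the memory figure applies per node, and that data threads are being traded against CPU threads.

```python
# Values copied from the last column of the table (three alternatives).
configs = [
    {"nodes": 4, "nct": 3, "nt": 8, "memory_gb": 32},
    {"nodes": 4, "nct": 6, "nt": 4, "memory_gb": 16},
    {"nodes": 4, "nct": 24, "nt": 1, "memory_gb": 4},
]

summary = []
for cfg in configs:
    threads_per_node = cfg["nt"] * cfg["nct"]           # CPU threads busy on one node
    total_threads = cfg["nodes"] * threads_per_node     # across all nodes
    memory_per_data_thread = cfg["memory_gb"] / cfg["nt"]
    summary.append((threads_per_node, total_threads, memory_per_data_thread))
    print(cfg, "->", threads_per_node, "threads per node,",
          total_threads, "in total,", memory_per_data_thread, "GB per data thread")
```

All three alternatives come out at the same thread count per node and the same memory per data thread, which is consistent with the rule that memory scales with -nt but not with -nct.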
The MapReduce architecture of the GATK allows most walkers in the GATK to be run in a parallel-processing mode. The GATK supports two basic parallel processing models known as shared memory and scatter-gather.
Shared memory parallelism
Parallelism within a single multi-threading process with access to a large, shared RAM. Shared memory parallelism is stable and supported by many tools that access pileups of bases.
Scatter/gather (SG) parallelism
In SG parallelism, the target genomic regions are divided up into N independent GATK instances that are run separately on a single machine or across a computing cluster. The outputs of each independent walker are merged together once all are completed. SG works very efficiently in the GATK, provided the output of a walker is independent per site (e.g. Unified Genotyper) or per chromosome (e.g. Indel Realigner). SG parallelism is a completely stable approach in the GATK, and used routinely by the GATK team in processing large data sets; it is also natively supported by GATK-Queue, which automatically scatters and gathers GATK processes given a desired N number of processes to execute simultaneously.
Note that parallel processing will significantly speed up data processing but may introduce small, non-deterministic differences in the results. While this non-determinism is not ideal, in practice the differences have been minute and statistically insignificant, and the results have been consistent and produced in a reasonable amount of time for whole genome and whole exome data. However, if absolute determinism is more important than speed, we recommend you do not use parallelism with the GATK.
There are costs and benefits to each type of parallelism in the GATK, as outlined in the following table.
Comparison of standard parallelism approaches in the GATK
|Stability||Stable||Stable | Retired in codebase|
|Applicable walker types||By locus and by ROD only. ReadWalkers are not supported.||All walker types. ReadWalkers can only be split safely by chromosome in general|
|Example walkers||UnifiedGenotyper, BaseRecalibrator, VariantEval||All walkers, including ReadWalkers like IndelRealigner|
|Scalability||Fewer than 32 cores. Each thread operates completely independently, so N threads requires N times more memory than 1 thread alone. Best scaling at 8 or fewer threads.||Hundreds of processes. Limited by capabilities of the underlying storage system, in general. Isilon-class storage can run thousands of jobs effectively.|
|How to enable||Use the
|Pros||- Easy to enable. - Single output file merged together internally by the GATK engine - Efficiently uses multi-core processors sharing a single memory space||- Works for all walker types, including ReadWalkers - Scales to hundreds of independent jobs - Easy to enable with single
|Cons||- Limited to fewer than 32 processors without significant overhead - Limited to cores physically present on the machine, cannot take advantage of computing cluster resources - Does not work for ReadWalkers (Table Recalibrator, Indel Realigner)||- Requires manual preparation of sliced genomic intervals for processing (if you aren't using GATK-Queue). - ReadWalkers and other tools can only be split by chromosome, leading to load balancing problems (chr1 is much bigger than chr22) - Sensitive to data density variation over the genome. Dividing chr20 processing into 63 chunks of 1 Mb leads to 10x differences in completion times due to data pileups around the centromere, for example. - Must wait until all parts of the scatter have completed before gathering, making the process sensitive to farm scheduling problems. If a job will run for M minutes, but waits Z minutes to start on the farm, the entire SG process will not complete for at least M + Z minutes.|
Almost certainly, either shared memory or scatter/gather parallelism is the right choice for your NGS pipeline. Our go-to option for parallelism here at the Broad Institute is S/G, since S/G allows you to cut up your jobs into hundreds of pieces to run on our standard computing farm, using GATK-Queue. When I have a small job that needs to be run quickly, am testing out program options or need a quick VariantEval result, I use shared memory parallelism with ~10 threads on a single large computer with a lot (>64 GB) of memory.
Basically, if I have N processors, and I want to choose between shared memory or S/G, here's how I would choose:
If all N processors are on a single computer, and my walker supports it, use shared memory parallelism
If not, use S/G
The GATK currently supports a hierarchical version of parallelism. In this form of parallelism, data is divided into shards, each shard is map/reduced independently, and the results are combined with a 'tree reduce' step. While the framework handles much of the heavy lifting of data division required for parallelism, each walker must individually be reviewed to ensure that it isn't tracking state internally in a non-threadsafe way. Many tools support shared memory parallelism, including critical tools such as:
Please review the source to discover if your walker is parallelizable, or attempt to run it with -nt 2; if the engine doesn't complain, the walker is parallelized.
In shared memory parallelism, each thread operates on a 16 kbp shard of reference sequence in a completely independent memory space from all other threads. Up to 24 threads can run efficiently in this design, but greater parallelism is limited by resource starvation from the single reduce thread and/or memory inefficiency by keeping each thread’s data totally independent. See these plots (gatkParallelism performance 082112) for an analysis of the scalability of several key GATK tools as a function of -nt.
Run the GATK, using the -nt command-line argument to specify the number of threads that the GATK should attempt to use.
[[image:HierarchicalParallelism.png|thumb|Shared memory parallelism architecture]]
First, be aware that some walkers may, by design, require a rewrite for complete parallelization.
When implementing a standard (non-parallelized) walker, one must implement the reduce method, which combines an individual datum returned by the map function with the aggregate of the prior map calls. When implementing a parallelizable walker, one must also implement the org.broadinstitute.sting.gatk.walkers.TreeReducible interface and its treeReduce() function. The treeReduce() function tells the GATK how to combine two adjacent reduces, rather than one map result and one reduce.
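The difference between the two kinds of reduce can be illustrated with a small Python analogue. This is not the GATK's Java interface: the function names below are hypothetical stand-ins, and the "walker" simply counts variant sites. The point is only the shape of the two functions: reduce folds one map result into a running aggregate, while the tree reduce combines two aggregates computed on adjacent shards.

```python
from functools import reduce as fold


def map_fn(site):
    """Per-datum work: 1 if this site is a variant, 0 otherwise."""
    return 1 if site == "variant" else 0


def reduce_fn(value, running_sum):
    """Standard reduce: combine ONE map result with the aggregate so far."""
    return running_sum + value


def tree_reduce_fn(lhs, rhs):
    """Tree reduce: combine TWO aggregates from adjacent shards."""
    return lhs + rhs


def run_shard(shard, init=0):
    """Map/reduce a single shard sequentially."""
    total = init
    for site in shard:
        total = reduce_fn(map_fn(site), total)
    return total


def run_parallel(shards):
    """Each shard could be processed by its own thread; results are then tree-reduced."""
    partials = [run_shard(shard) for shard in shards]
    return fold(tree_reduce_fn, partials, 0)


shards = [["variant", "ref"], ["ref", "variant", "variant"]]
print(run_parallel(shards))
```

For a simple sum the two functions look almost identical, but for a walker whose aggregate is a richer object (a table, a list of records) the tree reduce has to merge two such objects, which is why it must be written separately.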
The GATK supports writing to output files from either the map or the reduce when running in parallel. However, only unbuffered writers are currently supported. Please use PrintStream rather than PrintWriter at this time.
The GATK's support for parallelism is currently limited. The following classes of walkers are not supported by our parallelization framework:
Note that each thread operates completely independently in the current GATK implementation of shared memory parallelism. So if you provide 1 GB to the GATK with -nt 1, then you should provide 4 GB to run with -nt 4. If you don't do this, it is possible to starve out the GATK so that it runs much, much slower.
The performance of the multi-threaded GATK is really dependent on whether you are IO or CPU limited and the relative overhead of the parallelism on your computer. Additionally, -nt can start to have very high overheads with -nt > 20 due to our implementation and memory contention issues.
The best option for -nt is a value less than or equal to the number of available cores with sufficient memory to run each thread (-nt x amount provided to 1 core), capped additionally by the available IO bandwidth so that the individual threads don't starve each other.
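The guidance above on choosing -nt can be turned into a rough upper bound with a few lines of Python. The helper and its parameter names are hypothetical. It only covers the two limits that are easy to quantify (cores and memory); the IO bandwidth cap mentioned above is not modelled, so treat the result as a ceiling to experiment below rather than a recommendation.

```python
def max_data_threads(available_cores, available_memory_gb, memory_per_thread_gb):
    """Upper bound on -nt: limited by cores and by memory (each thread needs its own share).

    IO bandwidth is NOT taken into account here.
    """
    by_memory = int(available_memory_gb // memory_per_thread_gb)
    return max(1, min(available_cores, by_memory))


print(max_data_threads(available_cores=8, available_memory_gb=16, memory_per_thread_gb=4))
print(max_data_threads(available_cores=4, available_memory_gb=64, memory_per_thread_gb=1))
```

In the first call memory is the limiting factor; in the second, the number of cores is.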
Scatter / gather is a simple approach for process-independent parallelism with the GATK. First you scatter multiple independent GATK instances out over a network of computing nodes, each operating on slightly different genomic intervals, and when they all complete, the output of each is gathered up into a merged output dataset. In the GATK S/G is extremely easy to use, as all of the GATK tools support the -L option to operate over only specific genomic intervals, and many tools emit files that can be merged together across multiple regions. Unified Genotyper, for example, can operate over the whole genome in one process, or on each chromosome independently. The output of this latter approach, after merging together, should be the same as the whole genome results, minus any slight differences due to random number effects. The simplicity and efficiency of S/G parallelism makes this a key option for getting things done quickly with the GATK.
Using S/G parallelism is extremely easy, either by hand or using the automated Scatter/Gather in GATK-Queue. Suppose I have the following command line:
java -jar GenomeAnalysisTK.jar -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1
This runs a single process of the GATK over chr1, and let's say it takes an hour when I run it. In order to run it with S/G parallelism mode, just partition chr1 into two independent parts:
This file distributed.tracker.txt will contain genomic locations and GATK process ids that are processing each location, in text format, so you can cat it. If you run at the command line:
gsa1> java -jar GenomeAnalysisTK.jar -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1:1-125,000,000 -o my.1.vcf &
gsa1> java -jar GenomeAnalysisTK.jar -R human.fasta -T UnifiedGenotyper -I my.bam -L chr1:125,000,001-249,250,621 -o my.2.vcf &
When these two jobs finish, I just merge the two VCFs together and I've got a complete data set in half the time.
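If you are preparing the intervals by hand, the partitioning can be scripted. The Python helper below is a hypothetical sketch that splits a 1-based, inclusive interval into N contiguous parts and formats each one as a contig:start-end string of the kind passed to -L above. It splits by length only, so it does not account for uneven data density along the chromosome, and equal-sized parts may still take quite different times to run.

```python
def split_interval(contig, start, end, n_parts):
    """Split contig:start-end (1-based, inclusive) into n_parts contiguous intervals."""
    length = end - start + 1
    size, extra = divmod(length, n_parts)
    intervals, pos = [], start
    for i in range(n_parts):
        part = size + (1 if i < extra else 0)
        if part == 0:
            continue  # more parts requested than positions available
        intervals.append(f"{contig}:{pos}-{pos + part - 1}")
        pos += part
    return intervals


# chr1 end coordinate taken from the example commands above.
for interval in split_interval("chr1", 1, 249250621, 2):
    print(interval)
```

Each printed string can then be used as the -L value of one scattered command, with a distinct output file per command.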
Please be aware that if you have been using BaseRecalibrator scatter-gathered with Queue (GATK versions 2.0 and 2.1), your results may be wrong. You will need to redo the base recalibration of your data WITHOUT scatter-gathering.
This issue will be fixed in the next release (version 2.2). We apologize for any inconvenience this may cause you!
I just managed to use HaplotypeCaller with the latest version of Queue to call variants on 40 human exomes. The HaplotypeCaller job was scattered into 50 sub-jobs and spread across our cluster with Sun Grid Engine.
The problem I found is that the sub-jobs take widely varying times to finish, from 5 hours to 80 hours, with the majority below 55 hours, so the whole job was actually slowed down by just a few longer sub-jobs. I know that part of the difference was definitely caused by the performance of the cluster node running the job, but I think the major cause of the difference lies in how the job was split. The qscript I used is adapted from here (without the filtering part), and from it I cannot figure out how the job was split. Hence, I am wondering if anyone could tell me on what basis (genomic regions?) the HaplotypeCaller job was actually scattered, and how I can split the job more evenly so that most of the sub-jobs finish at about the same time.
Thanks in advance,
Hello everyone, I have a question about ReduceReads when using scatter/gather. In the argument details of ReduceReads you write for the parameter -nocmp_names: "... If you scatter/gather there is no guarantee that read name uniqueness will be maintained -- in this case we recommend not compressing."
Do you mean that if I use scatter/gather, I should use ReduceReads with the -nocmp_names option so that the read names will not be compressed, OR do you mean that I should not use ReduceReads at all when scatter/gathering?
I assume the first is meant, I just wanted to make sure. Thank you for your time and effort. Eva
At the Minnesota Supercomputing Institute, our environment requires that jobs on our high performance clusters reserve an entire node. I have implemented my own Torque Manager/Runner for our environment based on the Grid Engine Manager/Runner. The way I have gotten this to work in our environment is to set the nCoresRequest for the scatter/gather method to the minimum required of eight. My understanding is that for the InDelRealigner, for example, the job reserves a node with eight cores, but only uses one. That means our users would have their compute time allocation consumed eight times faster than is necessary.
What I am wondering is are there options that I am missing where some number of the scatter/gather requests can be grouped into a single job submission? If I were writing this as a PBS script for our environment and I wanted to use 16 cores in a scatter/gather implementation, I would write two jobs, each with eight commands. They would look something like the following:
#PBS Job Configuration stuff
pbsdsh -n 0 java -jar ... &
pbsdsh -n 1 java -jar ... &
pbsdsh -n 2 java -jar ... &
pbsdsh -n 3 java -jar ... &
pbsdsh -n 4 java -jar ... &
pbsdsh -n 5 java -jar ... &
pbsdsh -n 6 java -jar ... &
pbsdsh -n 7 java -jar ... &
wait
Has anyone done something similar in Queue? Any pointers? Thanks in advance!