This document explains the concepts involved and how they are applied within the GATK (and Queue where applicable). For specific configuration recommendations, see the companion document on parallelizing GATK tools.
Parallelism is a way to make a program finish faster by performing several operations in parallel, rather than sequentially (i.e. waiting for each operation to finish before starting the next one).
Imagine you need to cook rice for sixty-four people, but your rice cooker can only make enough rice for four people at a time. If you have to cook all the batches of rice sequentially, it's going to take all night. But if you have eight rice cookers that you can use in parallel, you can finish up to eight times faster.
This is a very simple idea but it has a key requirement: you have to be able to break down the job into smaller tasks that can be done independently. It's easy enough to divide portions of rice because rice itself is a collection of discrete units. In contrast, let's look at a case where you can't make that kind of division: it takes one pregnant woman nine months to grow a baby, but you can't do it in one month by having nine women share the work.
The good news is that most GATK runs are more like rice than like babies. Because GATK tools are built to use the Map/Reduce method (see doc for details), most GATK runs essentially consist of a series of many small independent operations that can be parallelized.
Parallelism is a great way to speed up processing on large amounts of data, but it has "overhead" costs. Without getting too technical at this point, let's just say that parallelized jobs need to be managed, you have to set aside memory for them, regulate file access, collect results and so on. So it's important to balance the costs against the benefits, and avoid dividing the overall work into too many small jobs.
Going back to the introductory example, you wouldn't want to use a million tiny rice cookers that each boil a single grain of rice. They would take way too much space on your countertop, and the time it would take to distribute each grain then collect it when it's cooked would negate any benefits from parallelizing in the first place.
OK, parallelism sounds great (despite the tradeoffs caveat), but how do we get from cooking rice to executing programs? What actually happens in the computer?
Consider that when you run a program like the GATK, you're just telling the computer to execute a set of instructions.
Let's say we have a text file and we want to count the number of lines in it. The set of instructions to do this can be as simple as:
open the file, count the number of lines in the file, tell us the number, close the file
Here, "tell us the number" can mean writing it to the console, or storing it somewhere for use later on.
Now let's say we want to know the number of words on each line. The set of instructions would be:
open the file, read the first line, count the number of words, tell us the number, read the second line, count the number of words, tell us the number, read the third line, count the number of words, tell us the number
And so on until we've read all the lines, and finally we can close the file. It's pretty straightforward, but if our file has a lot of lines, it will take a long time, and it will probably not use all the computing power we have available.
So to parallelize this program and save time, we just cut up this set of instructions into separate subsets like this:
open the file, index the lines
read the first line, count the number of words, tell us the number
read the second line, count the number of words, tell us the number
read the third line, count the number of words, tell us the number
[repeat for all lines]
collect final results and close the file
Here, the "read the Nth line" steps can be performed in parallel, because they are all independent operations.
You'll notice that we added a step, "index the lines". That's a little bit of preliminary work that allows us to perform the "read the Nth line" steps in parallel (or in any order we want) because it tells us how many lines there are and where to find each one within the file. It makes the whole process much more efficient. As you may know, the GATK requires index files for the main data files (reference, BAMs and VCFs); the reason is essentially to have that indexing step already done.
Anyway, that's the general principle: you transform your linear set of instructions into several subsets of instructions. There's usually one subset that has to be run first and one that has to be run last, but all the subsets in the middle can be run at the same time (in parallel) or in whatever order you want.
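The word-counting example can be sketched in a few lines of Python. This is only an illustration of the structure described above (one preliminary step, many independent middle steps, one final step); the function names are made up for this sketch and have nothing to do with the GATK's own code. Note also that in standard Python, threads show the shape of the idea rather than a real speedup for this kind of work.

```python
from concurrent.futures import ThreadPoolExecutor


def count_words(line):
    """Middle step: count the words on one line, independently of all other lines."""
    return len(line.split())


def count_words_per_line(text, workers=4):
    # First step (runs once): "index" the lines so each can be handled on its own.
    lines = text.splitlines()
    # Middle steps: one independent task per line, handed to a pool of workers.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        counts = list(pool.map(count_words, lines))  # results come back in line order
    # Last step (runs once): hand back the collected results.
    return counts


text = "the quick brown fox\njumps over\nthe lazy dog"
print(count_words_per_line(text))  # prints [4, 2, 3]
```

Because each line is counted independently, the middle tasks could run in any order; only the final collection step needs to know which result belongs to which line.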
There are three different modes of parallelism offered by the GATK, and to really understand the difference you first need to understand the different levels of computing that are involved.
By levels of computing, we mean the computing units in terms of hardware: the core, the machine (or CPU) and the cluster.
Core: the level below the machine. On your laptop or desktop, the CPU (central processing unit, or processor) contains one or more cores. If you have a recent machine, your CPU probably has at least two cores, and is therefore called dual-core. If it has four, it's a quad-core, and so on. High-end consumer machines like the latest Mac Pro have up to twelve-core CPUs (which should be called dodeca-core if we follow the Latin terminology) but the CPUs on some professional-grade machines can have tens or hundreds of cores.
Machine: the middle of the scale. For most of us, the machine is the laptop or desktop computer. Really we should refer to the CPU specifically, since that's the relevant part that does the processing, but the most common usage is to say machine. Except if the machine is part of a cluster, in which case it's called a node.
Cluster: the level above the machine. This is a high-performance computing structure made of a bunch of machines (usually called nodes) networked together. If you have access to a cluster, chances are it either belongs to your institution, or your company is renting time on it. A cluster can also be called a server farm or a load-sharing facility.
Parallelism can be applied at all three of these levels, but in different ways of course, and under different names. Parallelism takes the name of multi-threading at the core and machine levels, and scatter-gather at the cluster level.
In computing, a thread of execution is a set of instructions that the program issues to the processor to get work done. In single-threading mode, a program only sends a single thread at a time to the processor and waits for it to be finished before sending another one. In multi-threading mode, the program may send several threads to the processor at the same time.
Not making sense? Let's go back to our earlier example, in which we wanted to count the number of words in each line of our text document. Hopefully it is clear that the first version of our little program (one long set of sequential instructions) is what you would run in single-threaded mode. And the second version (several subsets of instructions) is what you would run in multi-threaded mode, with each subset forming a separate thread. You would send out the first thread, which performs the preliminary work; then once it's done you would send the "middle" threads, which can be run in parallel; then finally once they're all done you would send out the final thread to clean up and collect final results.
If you're still having a hard time visualizing what the different threads are like, just imagine that you're doing cross-stitching. If you're a regular human, you're working with just one hand. You're pulling a needle and thread (a single thread!) through the canvas, making one stitch after another, one row after another. Now try to imagine an octopus doing cross-stitching. He can make several rows of stitches at the same time using a different needle and thread for each. Multi-threading in computers is surprisingly similar to that.
Hey, if you have a better example, let us know in the forum and we'll use that instead.
Alright, now that you understand the idea of multithreading, let's get practical: how do we get the GATK to use multi-threading?
There are two options for multi-threading with the GATK, controlled by the arguments -nt and -nct, respectively. They can be combined, since they act at different levels of computing:
-nt / --num_threads controls the number of data threads sent to the processor (acting at the machine level)
-nct / --num_cpu_threads_per_data_thread controls the number of CPU threads allocated to each data thread (acting at the core level).
Not all GATK tools can use these options due to the nature of the analyses that they perform and how they traverse the data. Even in the case of tools that are used sequentially to perform a multi-step process, the individual tools may not support the same options. For example, at time of writing (Dec. 2012), of the tools involved in local realignment around indels, RealignerTargetCreator supports -nt but not -nct, while IndelRealigner does not support either of these options.
In addition, there are some important technical details that affect how these options can be used with optimal results. Those are explained along with specific recommendations for the main GATK tools in a companion document on parallelizing the GATK.
If you Google it, you'll find that the term scatter-gather can refer to a lot of different things, including strategies to get the best price quotes from online vendors, methods to control memory allocation and… an indie-rock band. What all of those things have in common (except possibly the band) is that they involve breaking up a task into smaller, parallelized tasks (scattering) then collecting and integrating the results (gathering). That should sound really familiar to you by now, since it's the general principle of parallel computing.
So yes, "scatter-gather" is really just another way to say we're parallelizing things. OK, but how is it different from multithreading, and why do we need yet another name?
As you know by now, multithreading specifically refers to what happens internally when the program (in our case, the GATK) sends several sets of instructions to the processor to achieve the instructions that you originally gave it in a single command-line. In contrast, the scatter-gather strategy as used by the GATK involves a separate program, called Queue, which generates separate GATK jobs (each with its own command-line) to achieve the instructions given in a so-called Qscript (i.e. a script written for Queue in a programming language called Scala).
At the simplest level, the Qscript can involve a single GATK tool*. In that case Queue will create separate GATK commands that will each run that tool on a portion of the input data (= the scatter step). The results of each run will be stored in temporary files. Then once all the runs are done, Queue will collate all the results into the final output files, as if the tool had been run as a single command (= the gather step).
* Note that Queue has additional capabilities, such as managing the use of multiple GATK tools in a dependency-aware manner to run complex pipelines, but that is outside the scope of this article. To learn more about pipelining the GATK with Queue, please see the Queue documentation.
So you see, scatter-gather is a very different process from multi-threading because the parallelization happens outside of the program itself. The big advantage is that this opens up the upper level of computing: the cluster level. Remember, the GATK program is limited to dispatching threads to the processor of the machine on which it is run – it cannot by itself send threads to a different machine. But Queue can dispatch scattered GATK jobs to different machines in a computing cluster by interfacing with your cluster's job management software.
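To make the scatter and gather steps concrete, here is a minimal Python sketch of the general pattern. It is not Queue's API and not a Qscript: the function names, the "processed ..." output and the temporary file names are all hypothetical. The jobs here simply run one after another on the same machine, whereas Queue would hand each one to the cluster's job manager.

```python
import os
import tempfile


def scatter(intervals, n_jobs):
    """Scatter: split the intervals into up to n_jobs contiguous, roughly equal chunks."""
    size, extra = divmod(len(intervals), n_jobs)
    chunks, start = [], 0
    for i in range(n_jobs):
        end = start + size + (1 if i < extra else 0)
        if start < end:  # skip empty chunks when there are more jobs than intervals
            chunks.append(intervals[start:end])
        start = end
    return chunks


def run_job(chunk, tmp_dir, job_id):
    """Stand-in for one scattered job: process a chunk and write a temporary file."""
    path = os.path.join(tmp_dir, f"part_{job_id:04d}.txt")
    with open(path, "w") as out:
        for interval in chunk:
            out.write(f"processed {interval}\n")
    return path


def gather(part_paths):
    """Gather: collate the temporary outputs, in job order, into one final result."""
    lines = []
    for path in part_paths:
        with open(path) as part:
            lines.extend(part.read().splitlines())
    return lines


def scatter_gather(intervals, n_jobs):
    with tempfile.TemporaryDirectory() as tmp_dir:
        parts = [run_job(chunk, tmp_dir, i)
                 for i, chunk in enumerate(scatter(intervals, n_jobs))]
        return gather(parts)


print(scatter_gather(["chr1", "chr2", "chr3", "chr4", "chr5"], n_jobs=2))
```

The final list reads as if everything had been processed in a single run, which is the point of the gather step.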
That being said, multithreading has the great advantage that cores and machines all have access to shared machine memory with very high bandwidth capacity. In contrast, the multiple machines on a network used for scatter-gather are fundamentally limited by network costs.
The good news is that you can combine scatter-gather and multithreading: use Queue to scatter GATK jobs to different nodes on your cluster, then use the GATK's internal multithreading capabilities to parallelize the jobs running on each node.
Going back to the rice-cooking example, it's as if instead of cooking the rice yourself, you hired a catering company to do it for you. The company assigns the work to several people, who each have their own cooking station with multiple rice cookers. Now you can feed a lot more people in the same amount of time! And you don't even have to clean the dishes.
This document provides technical details and recommendations on how the parallelism options offered by the GATK can be used to yield optimal performance results.
As explained in the primer on parallelism for the GATK, there are two main kinds of parallelism that can be applied to the GATK: multi-threading and scatter-gather (using Queue).
There are two options for multi-threading with the GATK, controlled by the arguments -nt and -nct, respectively, which can be combined:
-nt / --num_threads controls the number of data threads sent to the processor
-nct / --num_cpu_threads_per_data_thread controls the number of CPU threads allocated to each data thread
For more information on how these multi-threading options work, please read the primer on parallelism for the GATK.
Each data thread needs to be given the full amount of memory you’d normally give a single run. So if you’re running a tool that normally requires 2 Gb of memory to run, if you use -nt 4, the multithreaded run will use 8 Gb of memory. In contrast, CPU threads will share the memory allocated to their “mother” data thread, so you don’t need to worry about allocating memory based on the number of CPU threads you use.
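The memory rule above amounts to a single multiplication, sketched here in Python. The helper names are hypothetical, and the command it assembles is deliberately incomplete: a real run also needs the reference, input and output arguments, and the chosen tool must actually support the options used.

```python
def total_memory_gb(single_run_gb, nt=1, nct=1):
    """Memory to allocate: every data thread (-nt) needs the full single-run amount,
    while CPU threads (-nct) share the memory of their data thread."""
    if nt < 1 or nct < 1:
        raise ValueError("nt and nct must both be at least 1")
    return single_run_gb * nt  # nct deliberately does not enter the calculation


def gatk_command(tool, single_run_gb, nt=1, nct=1):
    """Assemble the memory and threading part of a GATK command line (illustrative only)."""
    mem = total_memory_gb(single_run_gb, nt, nct)
    cmd = ["java", f"-Xmx{mem}g", "-jar", "GenomeAnalysisTK.jar", "-T", tool]
    if nt > 1:
        cmd += ["-nt", str(nt)]
    if nct > 1:
        cmd += ["-nct", str(nct)]
    return cmd


# The example from the text: a 2 Gb tool run with -nt 4 needs 8 Gb.
print(" ".join(gatk_command("UnifiedGenotyper", single_run_gb=2, nt=4)))
# prints: java -Xmx8g -jar GenomeAnalysisTK.jar -T UnifiedGenotyper -nt 4
```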
-nct with versions 2.2 and 2.3
Because of the way the -nct option was originally implemented, in versions 2.2 and 2.3, there is one CPU thread that is reserved by the system to “manage” the rest. So if you use -nct, you’ll only really start seeing a speedup with -nct 3 (which yields two effective "working" threads) and above. This limitation has been resolved in the implementation that will be available in versions 2.4 and up.
For more details on scatter-gather, see the primer on parallelism for the GATK and the Queue documentation.
Please note that not all tools support all parallelization modes. The parallelization modes that are available for each tool depend partly on the type of traversal that the tool uses to walk through the data, and partly on the nature of the analyses it performs.
|Tool||Full name||Type of traversal||NT||NCT||SG|
The table below summarizes configurations that we typically use for our own projects (one per tool, except we give three alternate possibilities for the UnifiedGenotyper). The different values allocated for each tool reflect not only the technical capabilities of these tools (which options are supported), but also our empirical observations of what provides the best tradeoffs between performance gains and commitment of resources. Please note however that this is meant only as a guide, and that we cannot give you any guarantee that these configurations are the best for your own setup. You will probably have to experiment with the settings to find the configuration that is right for you.
|Cluster nodes||1||4||4||1||4||4 / 4 / 4|
|CPU threads (-nct)||1||1||8||4-8||1||3 / 6 / 24|
|Data threads (-nt)||24||1||1||1||1||8 / 4 / 1|
|Memory (Gb)||48||4||4||4||4||32 / 16 / 4|
In the table of supported modes, NT is data multithreading, NCT is CPU multithreading and SG is scatter-gather using Queue.
Imagine a simple question like, "What's the depth of coverage at position A of the genome?"
First, you are given billions of reads that are aligned to the genome but not ordered in any particular way (except perhaps in the order they were emitted by the sequencer). This simple question is then very difficult to answer efficiently, because the algorithm is forced to examine every single read in succession, since any one of them might span position A. The algorithm must now take several hours in order to compute this value.
Instead, imagine the billions of reads are now sorted in reference order (that is to say, on each chromosome, the reads are stored on disk in the same order they appear on the chromosome). Now, answering the question above is trivial, as the algorithm can jump to the desired location, examine only the reads that span the position, and return immediately after those reads (and only those reads) are inspected. The total number of reads that need to be interrogated is only a handful, rather than several billion, and the processing time is seconds, not hours.
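The difference between the two situations can be illustrated with a small Python sketch. It assumes reads are represented as (start, end) pairs on a single chromosome with inclusive coordinates, and that a maximum read length is known. The function names are made up for this illustration, and the GATK itself relies on index files rather than on this exact search.

```python
from bisect import bisect_left, bisect_right


def depth_unsorted(reads, pos):
    """Unsorted input: every read has to be examined, since any one might span pos."""
    return sum(1 for start, end in reads if start <= pos <= end)


def depth_sorted(reads, starts, max_read_len, pos):
    """Sorted input: jump straight to the few reads that could span pos.

    reads must be sorted by start, starts is the matching list of start positions,
    and no read may be longer than max_read_len.
    """
    # Only reads starting in [pos - max_read_len + 1, pos] can possibly span pos.
    lo = bisect_left(starts, pos - max_read_len + 1)
    hi = bisect_right(starts, pos)
    return sum(1 for start, end in reads[lo:hi] if end >= pos)


reads = [(1, 5), (3, 7), (6, 10), (8, 12), (20, 24)]  # sorted by start position
starts = [start for start, _ in reads]
print(depth_unsorted(reads, 7))                        # prints 2
print(depth_sorted(reads, starts, max_read_len=5, pos=7))  # prints 2
```

Both functions give the same answer, but the sorted version inspects only the handful of reads near the position, however many reads the full list contains.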
This reference-ordered sorting enables the GATK to process terabytes of data quickly and without tremendous memory overhead. Most GATK tools run very quickly and with less than 2 gigabytes of RAM. Without this sorting, the GATK cannot operate correctly. Thus, it is a fundamental rule of working with the GATK, which is the reason for the Central Dogma of the GATK:
One of the key challenges of working with next-gen sequence data is that input files are usually very large. We can’t just make the program open the files, load all the data into memory and perform whatever analysis is needed on all of it in one go. It’s just too much work, even for supercomputers.
Instead, we make the program cut the job into smaller tasks that the computer can easily process separately. Then we have it combine the results of each step into the final result.
Map/Reduce is the technique we use to achieve this. It consists of three steps formally called filter, map and reduce. Let’s apply it to an example case where we want to find out what is the average depth of coverage in our dataset for a certain region of the genome.
filter determines what subset of the data needs to be processed in each task. In our example, the program lists all the reference positions in our region of interest.
map applies the function, i.e. performs the analysis on each subset of data. In our example, for each position in the list, the program looks into the BAM file, pulls out the pileup of bases and outputs the depth of coverage at that position.
reduce combines the elements in the list of results output by the map function. In our example, the program takes the coverage numbers that were calculated separately for all the reference positions and calculates their average, which is the final result we want.
This may seem trivial for such a simple example, but it is a very powerful method with many advantages. Among other things, it makes it relatively easy to parallelize operations, which makes the tools run much faster on large datasets.
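The three steps of the coverage example can be written out as a short Python sketch. It assumes reads are given as (start, end) pairs with inclusive coordinates instead of being pulled from a BAM file, and the function names are hypothetical; it shows the shape of the technique, not how GATK walkers are implemented.

```python
from functools import reduce


def filter_step(region_start, region_end):
    """filter: list the reference positions in the region of interest."""
    return range(region_start, region_end + 1)


def map_step(reads, pos):
    """map: compute the depth of coverage at one position."""
    return sum(1 for start, end in reads if start <= pos <= end)


def reduce_step(depths):
    """reduce: combine the per-position depths into their average."""
    total, count = reduce(lambda acc, d: (acc[0] + d, acc[1] + 1), depths, (0, 0))
    return total / count if count else 0.0


def average_depth(reads, region_start, region_end):
    positions = filter_step(region_start, region_end)
    depths = [map_step(reads, pos) for pos in positions]  # each call is independent
    return reduce_step(depths)


reads = [(1, 4), (3, 6), (5, 8)]
print(average_depth(reads, 1, 8))  # prints 1.5
```

Since every call to map_step is independent of the others, those calls are exactly the part that can be handed out to parallel workers.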
All the tools in the GATK are built from the ground up to take advantage of this method. That’s why we call them walkers: because they “walk” across the genome, getting things done.
Note that even though it’s not included in the Map/Reduce technique’s name, the filter step is very important. It determines what data get presented to the tool for analysis, selecting only the appropriate data for each task and discarding anything that’s not relevant. This is a key part of the Map/Reduce technique, because that’s what makes each task “bite-sized” enough for the computer to handle easily.
Each tool has filters that are tailored specifically for the type of analysis it performs. The filters rely on traversal engines, which are little programs that are designed to “traverse” the data (i.e. walk through the data) in specific ways.
There are three major types of traversal: Locus Traversal, Read Traversal and Active Region Traversal. In our interval coverage example, the tool’s filter uses the Locus Traversal engine, which walks through the data by locus, i.e. by position along the reference genome. Because of that, the tool is classified as a Locus Walker. Similarly, the Read Traversal engine is used, you’ve guessed it, by Read Walkers.
The GATK engine comes packed with many other ways to walk through the genome and get the job done seamlessly, but those are the ones you’ll encounter most often.
Hi All, I've been attempting to use the haplotype caller on my 50x coverage exome data. The bam being parsed is about 12G. Each time, the caller runs for many hours and then the output is only the header of the VCF - no errors seen. I'm wondering if this is due to limited space on my drives or if the expected file size is much larger than I am anticipating.
GenomeAnalysisTK.jar -T HaplotypeCaller -R Homo_sapiens_assembly19.fasta -I input.bam --dbsnp dbsnp_132.b37.nochr.vcf -stand_call_conf 30 -stand_emit_conf 10 -o output.Haplotypes.vcf
I have been running GATK2 ReduceReads on a large (100Gb) Bam file, and even though at the very beginning it runs very smoothly and predicts a week for finishing the task, after a few hours it gets totally stuck. We first thought that it could be a garbage collection (or java memory allocation) issue, but the logs show that the garbage collection works well.
The command is (similar behavior for smaller Xms and Xmx values):
java -Xmx30g -Xms30g -XX:+PrintGCTimeStamps -XX:+UseParallelOldGC -XX:+PrintGCDetails -Xloggc:gc.log -verbose:gc -jar $path $ref -T ReduceReads -I input.bam -o output.bam
The first few lines of the log file are
INFO 01:12:21,541 TraversalEngine - chr1:1094599 5.89e+05 9.9 m 16.8 m 0.0% 19.4 d 19.4 d
INFO 01:13:21,628 TraversalEngine - chr1:2112411 9.44e+05 10.9 m 11.6 m 0.1% 11.2 d 11.2 d
INFO 01:14:22,065 TraversalEngine - chr1:3051535 1.29e+06 11.9 m 9.3 m 0.1% 8.5 d 8.5 d
INFO 01:15:22,297 TraversalEngine - chr1:4084547 1.59e+06 12.9 m 8.1 m 0.1% 6.9 d 6.9 d
INFO 01:16:24,130 TraversalEngine - chr1:4719991 1.82e+06 13.9 m 7.7 m 0.2% 6.4 d 6.4 d
but after a short while it gets totally stuck, and even at location 121485073 of chromosome 1, there is almost no progress at all, and the estimated finish time goes over 11 weeks and is still increasing.
Any idea what the reason for this could be, and how we can solve the problem? The same command runs successfully on small (less than 5gig) Bam files though
Thanks in advance. --Sina
Background: I am testing GATK (ver. 2.0-39) for use in de novo SNP identification using targeted Illumina seq. against a set of ~2500 genes from 28 different indiv. genotypes, same species. These are PE 50 and PE100 libs. I do not have a defined set of indels or SNPs to use as a reference as per GATK Phase 1 best practices. The genome seq. for this organism is a first draft (2.2 GB with ~ 835,000 clusters/contigs). I decided to first test four libraries (two PE50 and two PE100) and then check the results and tweak switches as necessary before scaling up to the full complement of sample libs. So far I have:
java -Djava.io.tmpdir=/path/tmp_dir -jar /path/GenomeAnalysisTK.jar -T UnifiedGenotyper -R speciesname_idx/speciesname.fasta -I 4.libs_reduced.bam -o 4.libs.UG -nt 6
My questions are:
The program initially died because java didn't have enough write space. So I gave it a tmp dir. and it ran for 3 days and died after maxing out a hard, 2 TB directory size limit. I am now running it again with a 4 TB limit.
After 27 hr, I have only traversed 5.2% of the genome (if I'm understanding the stdout correctly).
INFO 16:33:47,746 TraversalEngine - ctg7180006247957:754 1.15e+08 26.9 h 14.0 m 5.2% 3.1 w 2.9 w
So, at this rate, that's ~21 days to process ~15% of the libs. I thought maybe there was an excessive amt of swap occurring that might be slowing things down, but of the 126 GB RAM available only ~20-30 GB are being utilized among mine and several other jobs, so not likely an issue.
I have no experience with this program, but this just seems way too slow for processing a relatively small dataset... and I wonder if it will ever be able to crunch through the full set of 28 libs.
Any suggestions/thoughts as to why this is occurring, and what I might be able do to speed things up would be greatly appreciated!
I am currently running an analysis using the HaplotypeCaller on 300 large BAM files on our cluster and decided to chunk the genome in 3MB bins in order for them to be processed in a decent time. I'm however experiencing very long runtimes as more and more jobs get scheduled to run in parallel on the same files. Looking at the GATK options, I saw these 2 that I thought could be of help and was wondering what the recommendations are for using them: --num_bam_file_handles --read_buffer_size
More precisely, does the num_bam_file_handles increase processing time by a lot? and what is the default value for --read_buffer_size ?
Thanks a lot, Laurent
Hello, I was wondering if you have a page that explains which parameters affect performance of which targets of the GATK?
For whole genome, DepthOfCoverage can take more than 30 hours and this is with the omit base counts. Would putting in a higher read_buffer_size with more ram help, etc.
Is there any rule of thumb for allocating memory through "bsub" for running DataProcessingPipeline per bam file or per number of reads ?