Tagged with #pipeline
1 documentation article | 2 announcements | 9 forum discussions

Created 2012-08-10 23:47:33 | Updated 2014-02-03 22:35:00 | Tags: queue developer pipeline

Comments (13)

1. Introduction

As mentioned in the introductory materials, the core concept behind the GATK tools is the walker. The Queue scripting framework contains several mechanisms which make it easy to chain together GATK walkers.

2. Authoring walkers

When authoring your walker, there are several Queue behaviors that you can specify for QScript authors who use it.

Specifying how to partition

Queue can significantly speed up generating walker outputs by passing different instances of the GATK the same BAM or VCF data but specifying different regions of the data to analyze. After the different instances output their individual results Queue will gather the results back to the original output path requested by QScript.

Queue limits how finely it will split genomic data by examining the @PartitionBy() annotation on your walker, which specifies a PartitionType. This table lists the partition types, along with the walker types for which each one is the default.

PartitionType.CONTIG
  Default for walker type: Read walkers
  Description: Data is grouped so that genomic data from the same contig is never presented to two different instances of the GATK.
  Example intervals: chr1:10-11, chr2:10-20, chr2:30-40, chr2:50-60, chr3:10-11
  Example splits: split 1: chr1:10-11, chr2:10-20, chr2:30-40, chr2:50-60; split 2: chr3:10-11

PartitionType.INTERVAL
  Default for walker type: (none)
  Description: Data is split down to the interval level but never divides up an explicitly specified interval. If no explicit intervals are specified in the QScript for the GATK then this is effectively the same as splitting by contig.
  Example intervals: chr1:10-11, chr2:10-20, chr2:30-40, chr2:50-60, chr3:10-11
  Example splits: split 1: chr1:10-11, chr2:10-20, chr2:30-40; split 2: chr2:50-60, chr3:10-11

PartitionType.LOCUS
  Default for walker type: Locus walkers, ROD walkers
  Description: Data is split down to the locus level, possibly dividing up intervals.
  Example intervals: chr1:10-11, chr2:10-20, chr2:30-40, chr2:50-60, chr3:10-11
  Example splits: split 1: chr1:10-11, chr2:10-20, chr2:30-35; split 2: chr2:36-40, chr2:50-60, chr3:10-11

PartitionType.NONE
  Default for walker type: Read pair walkers, Duplicate walkers
  Description: The data cannot be split and Queue must run the single instance of the GATK as specified in the QScript.
  Example intervals: chr1:10-11, chr2:10-20, chr2:30-40, chr2:50-60, chr3:10-11
  Example splits: no split: chr1:10-11, chr2:10-20, chr2:30-40, chr2:50-60, chr3:10-11
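To make the locus-level example concrete, here is a minimal sketch of one way to divide a list of intervals into parts of roughly equal size, cutting through an interval when needed. This is not Queue's actual implementation; the class LocusSplitSketch, its Interval type and the splitByLocus method are hypothetical names used only for illustration. Run on the example intervals, it happens to reproduce the PartitionType.LOCUS splits listed above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: a locus-level split, not Queue's real scatter code. */
public class LocusSplitSketch {
    /** A closed genomic interval such as chr2:30-40. */
    public static final class Interval {
        public final String contig;
        public final int start;
        public final int stop;

        public Interval(String contig, int start, int stop) {
            this.contig = contig;
            this.start = start;
            this.stop = stop;
        }

        public int size() { return stop - start + 1; }

        @Override
        public String toString() { return contig + ":" + start + "-" + stop; }
    }

    private static long ceilDiv(long a, long b) { return (a + b - 1) / b; }

    /** Splits the intervals into at most `parts` groups of near-equal locus count. */
    public static List<List<Interval>> splitByLocus(List<Interval> intervals, int parts) {
        if (parts < 1) throw new IllegalArgumentException("parts must be >= 1");
        long remainingLoci = 0;
        for (Interval interval : intervals) remainingLoci += interval.size();
        int remainingParts = parts;
        // Number of loci the current part may still take.
        long budget = ceilDiv(remainingLoci, remainingParts);

        List<List<Interval>> result = new ArrayList<>();
        List<Interval> current = new ArrayList<>();
        for (Interval interval : intervals) {
            int start = interval.start;
            while (start <= interval.stop) {
                // Take as much of this interval as the budget allows, cutting it if needed.
                int take = (int) Math.min((long) interval.stop - start + 1, budget);
                current.add(new Interval(interval.contig, start, start + take - 1));
                start += take;
                budget -= take;
                remainingLoci -= take;
                if (budget == 0 && remainingParts > 1) {
                    // This part is full: start the next one with a fresh budget.
                    result.add(current);
                    current = new ArrayList<>();
                    remainingParts--;
                    budget = ceilDiv(remainingLoci, remainingParts);
                }
            }
        }
        if (!current.isEmpty()) result.add(current);
        return result;
    }

    public static void main(String[] args) {
        List<Interval> original = Arrays.asList(
                new Interval("chr1", 10, 11), new Interval("chr2", 10, 20),
                new Interval("chr2", 30, 40), new Interval("chr2", 50, 60),
                new Interval("chr3", 10, 11));
        // Prints [[chr1:10-11, chr2:10-20, chr2:30-35], [chr2:36-40, chr2:50-60, chr3:10-11]]
        System.out.println(splitByLocus(original, 2));
    }
}
```

The example intervals cover 37 loci in total, so a two-way split gives the first part 19 loci and the second 18, which is why chr2:30-40 is cut between positions 35 and 36.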

If your walker is implemented in a way that means Queue should not divide up your data, explicitly set @PartitionBy(PartitionType.NONE). If your walker can theoretically be run per genome location, specify @PartitionBy(PartitionType.LOCUS).

@PartitionBy(PartitionType.LOCUS)
public class ExampleWalker extends LocusWalker<Integer, Integer> { /* walker implementation elided */ }

Specifying how to join outputs

Queue will join the standard walker outputs.

SAMFileWriter
  Default gatherer implementation: The BAM files are joined together using Picard's MergeSamFiles.

VCFWriter
  Default gatherer implementation: The VCF files are joined together using the GATK CombineVariants.

PrintStream
  Default gatherer implementation: The first two files are scanned for a common header. The header is written once into the output, and then each file is appended to the output, skipping past the header lines.
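The PrintStream behaviour described above can be sketched as follows. This is an illustration of the described logic rather than Queue's own gatherer: HeaderGatherSketch and its gather method are hypothetical names, and the sketch works on in-memory lists of lines instead of files to stay self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: header-aware concatenation of partial text outputs. */
public class HeaderGatherSketch {
    /**
     * Treats the leading lines shared by the first two parts as the header,
     * writes that header once, then appends the body of every part.
     */
    public static List<String> gather(List<List<String>> parts) {
        int headerLines = 0;
        if (parts.size() >= 2) {
            List<String> first = parts.get(0);
            List<String> second = parts.get(1);
            while (headerLines < first.size() && headerLines < second.size()
                    && first.get(headerLines).equals(second.get(headerLines))) {
                headerLines++;
            }
        }
        List<String> output = new ArrayList<>();
        if (!parts.isEmpty()) {
            // Header written once, taken from the first part.
            output.addAll(parts.get(0).subList(0, headerLines));
        }
        for (List<String> part : parts) {
            // Skip past the header lines in every part.
            int from = Math.min(headerLines, part.size());
            output.addAll(part.subList(from, part.size()));
        }
        return output;
    }

    public static void main(String[] args) {
        List<List<String>> parts = Arrays.asList(
                Arrays.asList("#contig\tcount", "chr1\t5"),
                Arrays.asList("#contig\tcount", "chr2\t7"),
                Arrays.asList("#contig\tcount", "chr3\t2"));
        for (String line : gather(parts)) {
            System.out.println(line);
        }
    }
}
```

One limitation of this approach is visible in the sketch: if the first data lines of the first two parts happen to be identical, they are mistaken for header lines, which is one reason non-trivial outputs need a custom gatherer.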

If your PrintStream is not a simple text file that can be concatenated together, you must implement a Gatherer. Extend your custom Gatherer from the abstract base class and implement the gather() method.

package org.broadinstitute.sting.commandline;

import java.io.File;
import java.util.List;

/**
 * Combines a list of files into a single output.
 */
public abstract class Gatherer {
    /**
     * Gathers a list of files into a single output.
     * @param inputs Files to combine.
     * @param output Path to output file.
     */
    public abstract void gather(List<File> inputs, File output);

    /**
     * Returns true if the caller should wait for the input files to propagate over NFS before running gather().
     */
    public boolean waitForInputs() { return true; }
}

Specify your gatherer using the @Gather() annotation by your @Output.

@Output @Gather(MyGatherer.class) // MyGatherer is a placeholder name for your Gatherer subclass
public PrintStream out;

Queue will run your custom gatherer to join the intermediate outputs together.
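As an illustration of when a custom gatherer is needed, consider a walker whose output is a single count: concatenating the partial files would be wrong, because the counts must be summed. The sketch below assumes exactly that case. SumCountsGatherer and sumCounts are hypothetical names, and the nested Gatherer class is a local stand-in that mirrors the abstract base class shown earlier so that the example compiles without the GATK on the classpath.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Collections;
import java.util.List;

/** Illustrative only: a gatherer for outputs that hold one integer count each. */
public class SumCountsGathererSketch {
    /** Local stand-in for org.broadinstitute.sting.commandline.Gatherer. */
    public abstract static class Gatherer {
        public abstract void gather(List<File> inputs, File output);
        public boolean waitForInputs() { return true; }
    }

    /** Adds up every non-blank line, interpreting each as an integer count. */
    public static long sumCounts(List<String> lines) {
        long total = 0;
        for (String line : lines) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty()) {
                total += Long.parseLong(trimmed);
            }
        }
        return total;
    }

    /** Hypothetical gatherer: the joined output is the sum of the partial counts. */
    public static class SumCountsGatherer extends Gatherer {
        @Override
        public void gather(List<File> inputs, File output) {
            try {
                long total = 0;
                for (File input : inputs) {
                    total += sumCounts(Files.readAllLines(input.toPath(), StandardCharsets.UTF_8));
                }
                Files.write(output.toPath(),
                        Collections.singletonList(Long.toString(total)),
                        StandardCharsets.UTF_8);
            } catch (IOException e) {
                // gather() declares no checked exceptions, so rethrow unchecked.
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

In a real walker the class would extend the GATK's own Gatherer instead of the stand-in, and would be referenced from the @Gather() annotation on the walker's output field.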

3. Using GATK walkers in Queue

Queue GATK Extensions

Running 'ant queue' builds a set of Queue extensions for the GATK-Engine. For every GATK walker and command line program in the compiled GenomeAnalysisTK.jar, a Queue-compatible wrapper is generated.

The extensions can be imported via import org.broadinstitute.sting.queue.extensions.gatk._

import org.broadinstitute.sting.queue.QScript
import org.broadinstitute.sting.queue.extensions.gatk._

class MyQscript extends QScript {
  def script() {
    // add() instances of the GATK extensions here
  }
}

Note that the generated GATK extensions automatically handle shell-escaping of all values assigned to the various Walker parameters, so every value will be taken literally by the shell. Do not attempt to escape values yourself.

Do this:

filterSNPs.filterExpression = List("QD<2.0", "MQ<40.0", "HaplotypeScore>13.0")

NOT this:

filterSNPs.filterExpression = List("\"QD<2.0\"", "\"MQ<40.0\"", "\"HaplotypeScore>13.0\"")
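To see why manual escaping backfires, here is a small sketch of a common POSIX quoting scheme: wrap the value in single quotes and rewrite any embedded single quote. Queue's actual escaping code is not shown in this article, so treat ShellEscapeSketch and its escape method as hypothetical stand-ins for whatever the generated extensions do internally.

```java
/** Illustrative only: POSIX-style single-quote escaping of a command line value. */
public class ShellEscapeSketch {
    /** Wraps a value in single quotes so that a POSIX shell passes it through literally. */
    public static String escape(String value) {
        // Inside single quotes only ' is special: close the quote, emit \', reopen the quote.
        return "'" + value.replace("'", "'\\''") + "'";
    }

    public static void main(String[] args) {
        String expression = "QD<2.0";
        // Value assigned as-is: prints 'QD<2.0'
        System.out.println(escape(expression));
        // Value that was already quoted by hand: prints '"QD<2.0"'
        System.out.println(escape("\"" + expression + "\""));
    }
}
```

In the second case the hand-written double quotes are themselves protected by the automatic escaping, so they would reach the walker as part of the expression instead of being consumed by the shell.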

Listing variables

In addition to the GATK documentation on this wiki you can also find the full list of arguments for each walker extension in a variety of ways.

The source code for the extensions is generated during ant queue and placed in this directory:


When properly configured an IDE can provide command completion of the walker extensions. See Queue with IntelliJ IDEA for our recommended settings.

If you do not have access to an IDE you can still find the names of the generated variables using the command line. The generated variable names on each extension are based off of the fullName of the Walker argument. To see the built in documentation for each Walker, run the GATK with:

java -jar GenomeAnalysisTK.jar -T <walker name> -help

Once the import statement is specified you can add() instances of gatk extensions in your QScript's script() method.

Setting variables

If a GATK walker input accepts more than one value, specify the values as a List().

  def script() {
    val snps = new UnifiedGenotyper
    snps.reference_file = new File("testdata/exampleFASTA.fasta")
    snps.input_file = List(new File("testdata/exampleBAM.bam"))
    snps.out = new File("snps.vcf")
    add(snps)
  }

Although it may be harder for others trying to read your QScript, for each of the long name arguments the extensions contain aliases to their short names as well.

  def script() {
    val snps = new UnifiedGenotyper
    snps.R = new File("testdata/exampleFASTA.fasta")
    snps.I = List(new File("testdata/exampleBAM.bam"))
    snps.out = new File("snps.vcf")
    add(snps)
  }

Here are a few more examples using various list assignment operators.

  def script() {
    val countCovariates = new CountCovariates

    // Append to list using item appender :+
    countCovariates.rodBind :+= RodBind("dbsnp", "VCF", dbSNP)

    // Append to list using collection appender ++
    countCovariates.covariate ++= List("ReadGroupCovariate", "QualityScoreCovariate", "CycleCovariate", "DinucCovariate")

    // Assign list using plain old object assignment
    countCovariates.input_file = List(inBam)

    // The following is not a list, so just assigning one file to another
    countCovariates.recal_file = outRecalFile

    add(countCovariates)
  }


Specifying an alternate GATK jar

By default Queue runs the GATK from the current classpath. This works best since the extensions are generated and compiled at the same time the GATK is compiled via ant queue.

If you need to swap in a different version of the GATK you may not be able to use the generated extensions. The alternate GATK jar must have the same command line arguments as the GATK compiled with Queue. Otherwise the arguments will not match and you will get an error when Queue attempts to run the alternate GATK jar. In this case you will have to create your own custom CommandLineFunction for your analysis.

  def script() {
    val snps = new UnifiedGenotyper
    snps.jarFile = new File("myPatchedGATK.jar")
    snps.reference_file = new File("testdata/exampleFASTA.fasta")
    snps.input_file = List(new File("testdata/exampleBAM.bam"))
    snps.out = new File("snps.vcf")
    add(snps)
  }

GATK scatter/gather

Queue currently allows QScript authors to explicitly invoke scatter/gather on GATK walkers by setting the scatter count on a function.

  def script() {
    val snps = new UnifiedGenotyper
    snps.reference_file = new File("testdata/exampleFASTA.fasta")
    snps.input_file = List(new File("testdata/exampleBAM.bam"))
    snps.out = new File("snps.vcf")
    snps.scatterCount = 20
    add(snps)
  }

This will run the UnifiedGenotyper up to 20 ways in parallel and then merge the partial VCFs back into the single snps.vcf.

Additional caveat

Some walkers are still being updated to support Queue fully. For example they may not have defined the @Input and @Output and thus Queue is unable to correctly track their dependencies, or a custom Gatherer may not be implemented yet.

Created 2016-03-31 17:40:04 | Updated 2016-04-11 18:30:23 | Tags: workflow pipeline topstory cromwell wdl

Comments (1)

Today I'm delighted to introduce WDL, pronounced widdle, a new workflow description language that is designed from the ground up as a human-readable and -writable way to express tasks and workflows.

As a lab-grown biologist (so to speak), I think analysis pipelines are amazing. The thought that you can write a script that describes a complex chain of processing and analytical operations, then all you need to do every time you have new data is feed it through the pipe, and out come results -- that's awesome. Back in the day, I learned to run BLAST jobs by copy-pasting gene sequences into the NCBI BLAST web browser window. When I found out you could write a script that takes a collection of sequences, submits them directly to the BLAST server, then extracts the results and does some more computation on them… mind blown. And this was in Perl, so there was pain involved, but it was worth it -- for a few weeks afterward I almost had a social life! In grad school! Then it was back to babysitting cell cultures day and night until I could find another excuse to do some "in silico" work. Where "in silico" was the hand-wavy way of saying we were doing some stuff with computers. Those were simpler times.

So it's been kind of a disappointment for the last decade or so that writing pipelines was still so flipping hard. I mean smart, robust pipelines that can understand things like parallelism, dependencies of inputs and outputs between tasks, and resume intelligently if they get interrupted. Sure, in the GATK world we have Queue, which is very useful in many respects, but it's tailored to specific computing cluster environments like LSF, and writing scala scripts is really not trivial if you're new to it. I wouldn't call Queue user-friendly by a long shot. Plus, we only really use it internally for development work -- to run our production pipelines, we've actually been using a more robust and powerful system called Zamboni. It's a real workhorse, and has gracefully handled the exponential growth of sequencer output up to this point, but it's more complicated to use -- I have to admit I haven't managed to wrap my head around how it works.

Fortunately I don't have to try anymore: our engineers have developed a new pipelining solution that involves WDL, the Workflow Description Language I mentioned earlier, and an execution engine called Cromwell that can run WDL scripts anywhere, whether locally or on the cloud.

It's portable, super user-friendly by design, and open-source (under BSD) -- we’re eager to share it with the world!

In a nutshell, WDL was designed to make it easy to write scripts that describe analysis tasks and chain those tasks into workflows, with built-in support for advanced features like scatter-gather parallelism. This came at the cost of some additional headaches for the engineering team when they built Cromwell, the execution engine that runs WDL scripts -- but that was a conscious design decision, to push off the complexity onto the engineers rather than leave it in the lap of pipeline authors.

You can learn more about WDL at https://software.broadinstitute.org/wdl/, a website we put together to provide detailed documentation on how to write WDL scripts and run them on Cromwell. It's further enriched by a number of tutorials and real GATK-focused workflows provided as examples, and in the near future we'll also share our GATK Best Practices production pipeline scripts.

Yes, I just said that, we're going to be sharing full-on production scripts! In the past we refused to share scripts because they weren't portable across systems and they were difficult to read, so we were concerned that sharing them would generate more support burden than we could handle. But now that we have a solution that is both portable and user-friendly, we're not so worried -- and we're very excited about this as an opportunity to improve reproducibility among those who adopt our Best Practices in their work.

Of course it won't be the first time someone comes up with a revolutionary way to do things that ends up not working all that well in the real world. To that I say -- have you ever heard of the phrase "eating your own dog food", or "dogfooding" for short? It refers to a practice in software engineering where developers use their own product for running critical systems. I'm happy to say that we are now dogfooding Cromwell and WDL in our own production pipelines; a substantial part of the Broad's own analysis work, which involves processing terabytes worth of whole genome sequencing data, is now done on the cloud using Cromwell to execute pipelines written entirely in WDL.

That's how we know it works at scale, it's solid, and it does what we need it to do. That being said, there are a few additional features that are defined in WDL but not yet fully implemented in Cromwell, as well as a wishlist of convenience functions requested by our own methods development team, which the engineering team will pursue over the coming months. So its capabilities will keep growing as we add convenience functions to address our own and the wider community's pipelining needs.

Finally, I want to reiterate that WDL itself is meant to be very accessible to people who are not engineers or programmers; our own methods development team and various scientific collaborators have all commented that it's very easy to write and understand. So please try it out, and let us know what you think in the WDL forum!

Created 2013-09-17 18:18:29 | Updated | Tags: best-practices workshop queue pipeline

Comments (2)

Register now for a spot at the upcoming GATK workshop, which will be held in Cambridge, MA on October 21-22.


This workshop will cover the following topics:

  • GATK Best Practices for Variant Detection
  • Building Analysis Pipelines with Queue

The workshop is scheduled right before ASHG Boston, so if you're going to be in town for the conference, make sure you come a couple of days early and attend the GATK workshop!

Created 2015-07-21 22:33:32 | Updated 2015-07-21 22:33:53 | Tags: workflow pipeline exome cancer

Comments (1)

I am new to Bioinformatics, and would like some advice on changes to the GATK workflow for cancer. I was told that the cancer workflow is different, and see that several different tools are available.

I have Exome data from tumor and normal. I have aligned them, and have BAM files for each sample. I am interested in identifying somatic variants.

The current workflow as I understand it is:

  • (Non-GATK) Picard MarkDuplicates or Samtools rmdup
  • Indel Realignment (RealignerTargetCreator + IndelRealigner)
  • Base Quality Score Recalibration (BaseRecalibrator + PrintReads)
  • UnifiedGenotyper
  • Annotation using Oncotator (?)
  • MuTect (identify somatic mutations)

My questions are:

  1. Is the above workflow reasonable/correct for what I'm trying to do?
  2. Is there any difference between running samples one pair at a time and running them all together? (I have 57 pairs. Should I do 57 runs of normal-tumor pairs, or 1 run of all 57 pairs?)

Thank you, Gaius

Created 2014-08-27 14:38:55 | Updated | Tags: queue developer pipeline

Comments (4)

Let's say I have a case class in a qscript like so:

case class getFileFromRemote(remoteFile: File, localFile: File) extends ExternalCommonArgs with SixteenCoreJob with FourDayJob {
    @Input(doc = "remote") val _remoteFile = remoteFile
    @Output(doc = "local") val _localFile = localFile
    def commandLine = "fetchFile.sh " + remoteFile + " " + localFile
    this.isIntermediate = true
    this.jobName = "fetchFile"
    this.analysisName = this.jobName
}

Then I can add it to my script() like so:

add( getFileFromRemote("user@server:/path/to/remote", "path/to/local") )

All is well so far.

Let's say that I have lots of files to fetch, so I add jobs in a for loop over a Seq of file names. I then add downstream jobs as usual. The problem that I run into is that all 1000+ fetchFile.sh sessions (which use irods/irsync behind the scenes) start at the same time, choking the system so that nothing gets downloaded.

One solution to my problem would be to be able to set a limit in my fetcher case class, to tell Queue to never submit more than 5 (or so) of these jobs to the cluster. Is that possible, or can anyone see another way around this?

Created 2014-03-18 14:12:40 | Updated 2014-03-18 14:13:47 | Tags: pipeline markduplicates lanes

Comments (2)

Referring to broadinstitute.org/gatk/guide/article?id=3060, does duplicate removal really need to be done twice, once per lane and then again per sample?

Is it not enough to just mark the duplicates in the final BAM file with all the lanes merged, which should remove both optical and PCR duplicates (I am using Picard MarkDuplicates.jar)? So specifically, in the link above what is wrong with generating -

  • sample1_lane1.realn.recal.bam
  • sample1_lane2.realn.recal.bam
  • sample2_lane1.realn.recal.bam
  • sample2_lane2.realn.recal.bam

Then, merging them to get

  • sample1.merged.bam
  • sample2.merged.bam

and finally, including "de-dupping" only for the merged BAM files:

  • sample1.merged.dedup.realn.bam
  • sample2.merged.dedup.realn.bam

Created 2013-09-27 13:45:04 | Updated | Tags: test queue pipeline

Comments (1)

I'm wondering if there's any way to skip the GATKCommandLine line in the vcf-header (in a vcf file generated by UnifiedGenotyper). I thought that the --remove_program_records would do this but it doesn't seem to do the trick. I'm still seeing the header line.

##GATKCommandLine=<ID=UnifiedGenotyper,Version=2.7-2-g6bda569,Date="Fri Sep 27 15:17:59 CEST 2013", [...] >

The reason this is important to me is that I'm using the pipeline test code provided in Queue, which, as you know, is based on md5 sums. Because the header includes the time when the tests were run, the md5 hash changes on every run. So, if there is no way to skip this header line, is there any other, better way to do this?

Cheers, Johan

Created 2013-07-02 06:38:26 | Updated | Tags: gatk2 vcf pipeline bug variant-calling

Comments (9)

Hello all,

We've just started using GATK in order to perform variant calling in a non-model teleost fish. The fish genome is highly repetitive (>65%), and also suffers from the whole genome duplication event common in teleosts (e.g. zebrafish). Additionally, the fish strain we use is highly inbred, which should result in a highly homogenous genome. We have generated a genome assembly and a de novo repeat library based on NGS data (manuscript submitted) before mapping the reads from four individuals (male and female) to the genome via bowtie2. Variants were called using UnifiedGenotyper.

We generally get a very good list of variants, but it seems that we're getting a number of false positives and negatives when calling variants. Some of these appear to be due to paralogues, but some seem to be errors in the actual genotype call. For example:

scaffold00001 1199020 . T G 44.35 . AC=1;AF=0.167;AN=6;BaseQRankSum=-7.420;DP=110;Dels=0.00;FS=152.859;HaplotypeScore=3.6965;MLEAC=1;MLEAF=0.167;MQ=42.00;MQ0=0;MQRankSum=-1.972;QD=1.53;ReadPosRankSum=-2.777;SB=-4.096e+00 GT:AD:DP:GQ:PL 0/1:20,9:29:79:79,0,588 0/0:16,7:23:12:0,12,447 0/0:39,18:57:65:0,65,1426 ./.

In this case, individual 3 has a homozygous reference genotype, despite having a 31% minor allele frequency. Individual 1 also has a 31% minor allele frequency, but is called heterozygous. Some of the bases used to call the G allele are of low quality (when looking more closely using IGV), but I would still expect the genotype to be heterozygous.

A reverse example:

scaffold00458 298207 . A G 64.81 . AC=2;AF=0.333;AN=6;BaseQRankSum=3.027;DP=64;Dels=0.00;FS=5.080;HaplotypeScore=0.0000;MLEAC=2;MLEAF=0.333;MQ=16.26;MQ0=0;MQRankSum=3.177;QD=1.16;ReadPosRankSum=-3.252;SB=0.439 GT:AD:DP:GQ:PL 0/0:8,0:8:21:0,21,207 0/1:20,1:21:13:13,0,152 0/1:31,4:35:90:90,0,102 ./.

Here, individual 2 is called heterozygous, but there is only a single read which supports the minor allele. Additionally, when looking at IGV, you can see that the read in question has a number of mismatches, suggesting it originates from another area of the genome.

I've also uploaded screenshots of IGV that I hope will help clarify the problems we're having. We have used the default GATK parameters in almost all cases, and we did not use VQSR, as we did not have a list of high-confidence SNPs at the time.

Created 2013-05-15 08:23:58 | Updated | Tags: pipeline hadoop hadoop-bam

Comments (5)

Hello, I'm a developer in Korea. Recently I have been developing a bioinformatics pipeline using BWA, Samtools, Picard and GATK, and I would now like to run it on Hadoop, because I expect MapReduce (MR) to be more efficient in terms of speed and memory. As I understand it, GATK is built on MR. If so, have you tested GATK on MR? In theory, that should be more efficient than GATK alone.

Also, if GATK needs a sorted and indexed SAM file, can I just do the sorting and indexing with the Hadoop-BAM library?

Because I am a novice in bioinformatics, this issue is too complicated for me.


e-mail : leoniz127@gmail.com phone : +821027266808

Created 2013-02-26 17:31:08 | Updated | Tags: pedigree vcf pipeline merge

Comments (3)

Hi to all

I have just started using GATK and I have a few questions about some tools and about the general workflow.

I have exome-seq data for the 3 members of a trio, and I have to detect rare or private variants that segregate with the disease.

From the 3 aligned BAM files I proceeded with the GATK pipeline (ADDgroupInfo, MarkDup, Realign, BQSR, Unified Genotyper and variant filtration) and generated 3 VCF files.

Now that I have to use the PhaseByTransmission tool, should I merge the 3 VCF files?

Or would it have been better to merge the BAM files after adding the group info and then proceed with the rest of the analysis?

And should I create my .ped file based on the read groups that I have assigned? (I visited http://pngu.mgh.harvard.edu/~purcell/plink/data.shtml#ped, but I couldn't understand how a ped file is generated.)


Created 2012-11-14 17:19:17 | Updated 2013-01-07 20:03:38 | Tags: pipeline

Comments (1)

I have a pipeline someone gave me; in it, it runs the following obsolete GATK command:

java -Xms5g -Xmx5g -jar src/GenomeAnalysisTK-1.0.5506/GenomeAnalysisTK.jar -T CountCovariates -cov ReadGroupCovariate -cov QualityScoreCovariate -cov CycleCovariate -cov DinucCovariate -R data/ucsc.hg19.fasta --DBSNP data/hg19/snp131.rod -I runs/nwp/run-000/chrUn_gl000228.realn.bam -recalFile runs/nwp/run-000/chrUn_gl000228.recal.csv

How does the following differ from above?

java -Xms5g -Xmx5g -jar src/GenomeAnalysisTK-2.1-8-g5efb575/GenomeAnalysisTK.jar -T BaseRecalibrator -I runs/nwp/run-000/chrUn_gl000228.realn.bam -R data/ucsc.hg19.fasta -knownSites data/dbsnp_135.hg19.vcf -o runs/nwp/run-000/chrUn_gl000228.recal.grp

Then there is another step at this stage of the pipeline:

java -Xms5g -Xmx5g -jar src/GenomeAnalysisTK-1.0.5506/GenomeAnalysisTK.jar -R data/ucsc.hg19.fasta -I runs/nwp/run-000/chrUn_gl000228.realn.bam -o runs/nwp/run-000/chrUn_gl000228.recal.bam -T TableRecalibration -baq RECALCULATE --doNotWriteOriginalQuals -recalFile runs/nwp/run-000/chrUn_gl000228.recal.csv

How does this last step map to version 2.1-8 of GATK?

Created 2012-11-08 12:58:35 | Updated 2013-01-07 20:07:00 | Tags: queue pipeline scala

Comments (13)

Hi there, I wanted to reproduce in my variant calling Queue script the same conditional you have in MethodsDevelopmentCallingPipeline, i.e. including InbreedingCoeff depending on the number of samples. However, in that script the number of samples is passed to the Target object as an integer, and I would like to count it from the BAM file list passed as an input to the script.

Therefore I followed the method in DataProcessingPipeline, i.e.

import org.broadinstitute.sting.queue.util.QScriptUtils
@Input(doc="input BAM file - or list of BAM files", fullName="input", shortName="I", required=true)
var bamFile: File = _
val bamFilesList = QScriptUtils.createSeqFromFile(bamFile)
val sampleNo = bamFilesList.size

Unfortunately, although DataProcessingPipeline works just fine, when I put these lines in my other script I get the following error:

INFO  12:48:08,616 HelpFormatter - Date/Time: 2012/11/08 12:48:08 
INFO  12:48:08,616 HelpFormatter - ---------------------------------------------------------------------- 
INFO  12:48:08,616 HelpFormatter - ---------------------------------------------------------------------- 
##### ERROR ------------------------------------------------------------------------------------------
##### ERROR stack trace 
org.broadinstitute.sting.utils.exceptions.DynamicClassResolutionException: Could not create module HaplotypeCallerStep because      Cannot instantiate class (Invocation failure) caused by exception null
at org.broadinstitute.sting.utils.classloader.PluginManager.createByType(PluginManager.java:306)
at org.broadinstitute.sting.utils.classloader.PluginManager.createAllTypes(PluginManager.java:317)
at org.broadinstitute.sting.queue.QCommandLine.execute(QCommandLine.scala:126)
at org.broadinstitute.sting.commandline.CommandLineProgram.start(CommandLineProgram.java:236)
at org.broadinstitute.sting.commandline.CommandLineProgram.start(CommandLineProgram.java:146)
at org.broadinstitute.sting.queue.QCommandLine$.main(QCommandLine.scala:62)
at org.broadinstitute.sting.queue.QCommandLine.main(QCommandLine.scala)
##### ERROR ------------------------------------------------------------------------------------------
##### ERROR A GATK RUNTIME ERROR has occurred (version 2.2-2-gf44cc4e):
##### ERROR
##### ERROR Please visit the wiki to see if this is a known problem
##### ERROR If not, please post the error, with stack trace, to the GATK forum
##### ERROR Visit our website and forum for extensive documentation and answers to 
##### ERROR commonly asked questions http://www.broadinstitute.org/gatk
##### ERROR
##### ERROR MESSAGE: Could not create module HaplotypeCallerStep because Cannot instantiate class (Invocation failure) caused by exception null
##### ERROR ------------------------------------------------------------------------------------------

I tried several alternatives looking at the imports in DataProcessingPipeline but maybe I am missing something. Could you please advise?

thanks very much Francesco