Queue custom job schedulers
Posted in Pipelining with Queue on 2012-08-15 17:07:32 | Last updated on 2014-04-02 16:12:09

Implementing a Queue JobRunner

The following Scala methods need to be implemented for a new JobRunner. See the implementations of GridEngine and LSF for complete, concrete examples.

1. class JobRunner.start()

start() should copy the settings from the CommandLineFunction into your job scheduler and invoke the command via sh <jobScript>. As an example of what needs to be implemented, here are the current contents of the start() method in MyCustomJobRunner, which contains the pseudo code.

  def start(): Unit = {
    // TODO: Copy settings from function to your job scheduler syntax.

    val mySchedulerJob = new ...

    // Set the display name to 4000 characters of the description (or whatever your max is)
    mySchedulerJob.displayName = function.description.take(4000)

    // Set the output file for stdout
    mySchedulerJob.outputFile = function.jobOutputFile.getPath

    // Set the current working directory
    mySchedulerJob.workingDirectory = function.commandDirectory.getPath

    // If the error file is set specify the separate output for stderr
    if (function.jobErrorFile != null) {
      mySchedulerJob.errFile = function.jobErrorFile.getPath
    }

    // If a project name is set specify the project name
    if (function.jobProject != null) {
      mySchedulerJob.projectName = function.jobProject
    }

    // If the job queue is set specify the job queue
    if (function.jobQueue != null) {
      mySchedulerJob.queue = function.jobQueue
    }

    // If the resident set size is requested pass on the memory request
    if (residentRequestMB.isDefined) {
      mySchedulerJob.jobMemoryRequest = "%dM".format(residentRequestMB.get.ceil.toInt)
    }

    // If the resident set size limit is defined specify the memory limit
    if (residentLimitMB.isDefined) {
      mySchedulerJob.jobMemoryLimit = "%dM".format(residentLimitMB.get.ceil.toInt)
    }

    // If the priority is set (user specified Int) specify the priority
    if (function.jobPriority.isDefined) {
      mySchedulerJob.jobPriority = function.jobPriority.get
    }

    // Instead of running the function.commandLine, run "sh <jobScript>"
    mySchedulerJob.command = "sh " + jobScript

    // Store the status so it can be returned in the status method.
    myStatus = RunnerStatus.RUNNING

    // Start the job and store the id so it can be killed in tryStop
    myJobId = mySchedulerJob.start()
  }
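
The display name and memory settings in the pseudo code can be pulled into small helpers that are easy to check in isolation. The following is a minimal sketch, not part of Queue: the object name SchedulerSettings and both helper names are hypothetical, the 4000 character limit is just the example value from the pseudo code, and the memory values are assumed to be Option[Double], as the .get.ceil.toInt calls suggest.

```scala
object SchedulerSettings {
  // Example limit taken from the pseudo code; use whatever your scheduler allows.
  val MaxDisplayNameLength = 4000

  /** Truncates a job description so it fits the scheduler's display name limit. */
  def displayName(description: String): String =
    description.take(MaxDisplayNameLength)

  /** Rounds a memory value in MB up to a whole number and formats it, e.g. "1537M". */
  def memoryArg(megabytes: Option[Double]): Option[String] =
    megabytes.map(mb => "%dM".format(mb.ceil.toInt))
}
```

Using Option.map instead of isDefined followed by get keeps the "only set it if present" logic in one expression. For example, SchedulerSettings.memoryArg(Some(1536.2)) yields Some("1537M"), and a None input stays None.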

2. class JobRunner.status

The status method should return one of the enum values from org.broadinstitute.sting.queue.engine.RunnerStatus:

  • RunnerStatus.RUNNING
  • RunnerStatus.DONE
  • RunnerStatus.FAILED
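
One way to implement status is to translate whatever your scheduler reports into these three values. The sketch below is an illustration under stated assumptions: the RunnerStatus object is a local stand-in so the example compiles on its own, and SchedulerState with its Pending, Active and Exited cases is a hypothetical model of a scheduler's job states. Treating a queued job as RUNNING is also a choice made for this sketch.

```scala
// Stand-in for org.broadinstitute.sting.queue.engine.RunnerStatus,
// limited to the three values listed above.
object RunnerStatus extends Enumeration {
  val RUNNING, DONE, FAILED = Value
}

// Hypothetical job states reported by a scheduler; yours will differ.
sealed trait SchedulerState
case object Pending extends SchedulerState
case object Active extends SchedulerState
final case class Exited(exitCode: Int) extends SchedulerState

object StatusMapping {
  /** Maps a scheduler state onto the status values Queue expects. */
  def toRunnerStatus(state: SchedulerState): RunnerStatus.Value = state match {
    // Assumption: a job still waiting in the scheduler counts as running.
    case Pending | Active => RunnerStatus.RUNNING
    // A zero exit code is treated as success.
    case Exited(0) => RunnerStatus.DONE
    // Any other exit code is treated as failure.
    case Exited(_) => RunnerStatus.FAILED
  }
}
```

Because SchedulerState is sealed, the compiler can warn if a new state is added without a corresponding case in the match.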

3. object JobRunner.init()

Add any initialization code to the companion object static initializer. See the LSF or GridEngine implementations for how this is done.
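
As a rough illustration of the mechanism, statements in the body of a Scala object run once, the first time the object is referenced, which is what makes the companion object a natural place for one-time setup. In the sketch below the class is a stand-in that does not extend any Queue base class, and the initCount counter exists only to make the once-only behaviour visible; the real LSF and GridEngine runners remain the reference for what belongs in init().

```scala
object MyCustomJobRunner {
  // Counts initializer runs, purely to make the once-only behaviour visible.
  private var initCalls = 0

  // Statements in an object body run once, on first reference to the object.
  init()

  /** Hypothetical one-time setup, e.g. loading a native library or opening a session. */
  private def init(): Unit = {
    initCalls += 1
  }

  def initCount: Int = initCalls
}

// Stand-in class; a real runner would extend Queue's job runner base class.
class MyCustomJobRunner(val jobScript: String) {
  // Reading from the companion guarantees init() has run before this runner is used.
  val initCountAtCreation: Int = MyCustomJobRunner.initCount
}
```

Creating several runners does not repeat the setup: every instance observes an initCount of 1.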

4. object JobRunner.tryStop()

The jobs that are still in RunnerStatus.RUNNING will be passed into this function. tryStop() should send these jobs the equivalent of a Ctrl-C or SIGTERM(15), or worst case a SIGKILL(9) if SIGTERM is not available.
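
The "polite stop first, forced stop as a fallback" behaviour can be sketched as follows. Everything here is hypothetical and independent of Queue: SchedulerJobHandle models a submitted job with a terminate action (the SIGTERM equivalent) and a kill action (the SIGKILL equivalent), and this tryStop takes a plain Seq rather than whatever collection the real companion object method receives.

```scala
import scala.util.Try

// Hypothetical handle to a submitted job; not part of Queue.
final case class SchedulerJobHandle(
  id: String,
  terminate: () => Unit, // polite stop, the equivalent of SIGTERM
  kill: () => Unit       // forced stop, the equivalent of SIGKILL
)

object StopSketch {
  /** Attempts to stop every job and returns the ids of jobs that could not be stopped. */
  def tryStop(jobs: Seq[SchedulerJobHandle]): Seq[String] =
    jobs.filterNot { job =>
      // orElse takes its argument by name, so kill() only runs if terminate() threw.
      Try(job.terminate()).orElse(Try(job.kill())).isSuccess
    }.map(_.id)
}
```

Wrapping each call in Try means that a failure to stop one job does not prevent the remaining jobs from being stopped.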

Running Queue with a new JobRunner

Once there is a basic implementation, you can try out the Hello World example with -jobRunner MyJobRunner.

java -Djava.io.tmpdir=tmp -jar dist/Queue.jar -S scala/qscript/examples/HelloWorld.scala -jobRunner MyJobRunner -run

If all goes well, Queue should dispatch the job to your job scheduler and wait until the status returns RunnerStatus.DONE. "hello world" should then be echoed into the output file, possibly along with other log messages.

See "QFunction and Command Line Options" for more info on Queue options.
