Operator pipelines

On the SeqsLab platform, an operator pipeline is a collection of operators that are chained together to perform a specific set of tasks.

SeqsLab provides pipeline operators for different input data types, partition counts, reference genomes, and other considerations. When running jobs on the SeqsLab management console, you can define the operator pipeline on the Workload Parallelization page.

To view the full list of available pipelines, see Operator pipelines.

Configurable data processing pipelines

Atgenomix empowers everyone to build auto-scaling automation with the integrated flow of operator pipelines across the entire life cycle of workflow tasks. From input localization and command execution to output delocalization, the SeqsLab operators make it straightforward to design data processing pipelines, chain them together in tasks, parallelize their execution, and boost workflow efficiency.

SeqsLab operator pipelines work silently in the background with minimal manual configuration. However, if you are a more advanced user, you can opt to combine existing operators to create your own custom pipelines.

Furthermore, when you write your own pipeline, SeqsLab automatically adds operators to your pipeline as needed. For example, a loader is automatically assigned to localize your data from cloud storage into memory. Likewise, a writer operator saves your output to cloud storage by default, unless you specify a different location. Automatically assigning operators to your pipeline not only ensures that it works as expected, but also simplifies your work and reduces your effort.

Examples

To better understand how operator pipelines work, let’s examine the following input.json example.

input.json sample file
{
"tasks": {
	"GermlineSnpsIndelsGatk4Hg19.IndexBam.inFileBam": {
		"operators": [
			"BamPartitionerPart1",
			"BamExecutor"
		],
		"description": "File-based BAM workload pipeline with all BAM records in a single partition"
	},
	"GermlineSnpsIndelsGatk4Hg19.IndexVcf.inFileVCF": {
		"operators": [
			"VcfPartitionerHg19Part1",
			"VcfExecutor"
		],
		"description": "File-based VCF workload pipeline with HG19 reference genome with all VCF records in a single partition"
	},
	"GermlineSnpsIndelsGatk4Hg19.HaplotypeCallerGvcf_GATK4.HaplotypeCaller.ref_dict": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.HaplotypeCallerGvcf_GATK4.HaplotypeCaller.input_bam": {
		"operators": [
			"CopyToLocalLoader"
		],
		"description": "Automatic workload pipeline for localizing a file per partition to each executor of cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.HaplotypeCallerGvcf_GATK4.HaplotypeCaller.ref_fasta": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BwaMem.ref_sa": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BwaMem.ref_alt": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BwaMem.ref_amb": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BwaMem.ref_ann": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BwaMem.ref_bwt": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BwaMem.ref_pac": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BwaMem.ref_dict": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BwaMem.ref_fasta": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.HaplotypeCallerGvcf_GATK4.HaplotypeCaller.ref_fasta_index": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.ApplyBQSR.ref_dict": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.ApplyBQSR.input_bam": {
		"operators": [
			"CopyToLocalLoader"
		],
		"description": "Automatic workload pipeline for localizing a file per partition to each executor of cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.ApplyBQSR.ref_fasta": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.Fastp.inFileFastqR1": {
		"operators": [
			"FastqPartitioner",
			"FastqExecutor"
		],
		"description": "File-based FASTQ workload parallelization pipeline with 1,048,576 read records for each partition"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.Fastp.inFileFastqR2": {
		"operators": [
			"FastqPartitioner",
			"FastqExecutor"
		],
		"description": "File-based FASTQ workload parallelization pipeline with 1,048,576 read records for each partition"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BwaMem.inFileFastqR1": {
		"operators": [
			"CopyToLocalLoader"
		],
		"description": "Automatic workload pipeline for localizing a file per partition to each executor of cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BwaMem.inFileFastqR2": {
		"operators": [
			"CopyToLocalLoader"
		],
		"description": "Automatic workload pipeline for localizing a file per partition to each executor of cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BwaMem.ref_fasta_index": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.SortAndFixTags.ref_dict": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.MarkDuplicates.input_bam": {
		"operators": [
			"BamPartitionerHg19Chr20Part45",
			"BamExecutor"
		],
		"description": "File-based BAM workload pipeline with reads on HG19 reference genome chr20 parallelized into 45 contiguous unmasked regions"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.SortAndFixTags.input_bam": {
		"operators": [
			"CopyToLocalLoader"
		],
		"description": "Automatic workload pipeline for localizing a file per partition to each executor of cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.SortAndFixTags.ref_fasta": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.ApplyBQSR.input_bam_index": {
		"operators": [
			"CopyToLocalLoader"
		],
		"description": "Automatic workload pipeline for localizing a file per partition to each executor of cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.ApplyBQSR.ref_fasta_index": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BaseRecalibrator.ref_dict": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BaseRecalibrator.dbSNP_vcf": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BaseRecalibrator.input_bam": {
		"operators": [
			"CopyToLocalLoader"
		],
		"description": "Automatic workload pipeline for localizing a file per partition to each executor of cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BaseRecalibrator.ref_fasta": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.ApplyBQSR.recalibration_report": {
		"operators": [
			"CopyToLocalLoader"
		],
		"description": "Automatic workload pipeline for localizing a file per partition to each executor of cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.SortAndFixTags.ref_fasta_index": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BaseRecalibrator.dbSNP_vcf_index": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BaseRecalibrator.input_bam_index": {
		"operators": [
			"CopyToLocalLoader"
		],
		"description": "Automatic workload pipeline for localizing a file per partition to each executor of cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BaseRecalibrator.ref_fasta_index": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BaseRecalibrator.known_indels_sites_VCFs": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	},
	"GermlineSnpsIndelsGatk4Hg19.PreProcessingForVariantDiscovery_GATK4.BaseRecalibrator.known_indels_sites_indices": {
		"operators": [
			{
				"name": "RefLoader",
				"scope": "cluster",
				"arguments": {}
			}
		],
		"description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
	}
},
"inputs": {
	"GermlineSnpsIndelsGatk4Hg19.refFa": "hg19/ref.fa",
	"GermlineSnpsIndelsGatk4Hg19.refSa": "hg19/ref.fa.sa",
	"GermlineSnpsIndelsGatk4Hg19.refAmb": "hg19/ref.fa.amb",
	"GermlineSnpsIndelsGatk4Hg19.refAnn": "hg19/ref.fa.ann",
	"GermlineSnpsIndelsGatk4Hg19.refBwt": "hg19/ref.fa.bwt",
	"GermlineSnpsIndelsGatk4Hg19.refFai": "hg19/ref.fa.fai",
	"GermlineSnpsIndelsGatk4Hg19.refPac": "hg19/ref.fa.pac",
	"GermlineSnpsIndelsGatk4Hg19.refDict": "hg19/ref.dict",
	"GermlineSnpsIndelsGatk4Hg19.refName": "hg19",
	"GermlineSnpsIndelsGatk4Hg19.dbsnpVCF": "hg19/DbSNP.vcf.gz",
	"GermlineSnpsIndelsGatk4Hg19.makeGVCF": false,
	"GermlineSnpsIndelsGatk4Hg19.gatk_path": "/gatk/gatk-4.2.0.0/gatk",
	"GermlineSnpsIndelsGatk4Hg19.gotc_path": "/usr/bin/",
	"GermlineSnpsIndelsGatk4Hg19.inFileFqs": [
		"R1.fq.gz",
		"R2.fq.gz"
	],
	"GermlineSnpsIndelsGatk4Hg19.makeBamout": false,
	"GermlineSnpsIndelsGatk4Hg19.sampleName": "NA12878",
	"GermlineSnpsIndelsGatk4Hg19.dbsnpVCFTbi": "hg19/DbSNP.vcf.gz.tbi",
	"GermlineSnpsIndelsGatk4Hg19.maxMemForGATK": 6,
	"GermlineSnpsIndelsGatk4Hg19.bwaCommandline": "bwa mem -K 100000000 -v 3 -t 16 -Y $bash_ref_fasta",
	"GermlineSnpsIndelsGatk4Hg19.IndexBam.dockerImage": "atgenomix.azurecr.io/atgenomix/seqslab_runtime-1.5_ubuntu-20.04_preprocessgatk4-4.2.0.0",
	"GermlineSnpsIndelsGatk4Hg19.IndexVcf.dockerImage": "atgenomix.azurecr.io/atgenomix/seqslab_runtime-1.5_ubuntu-20.04_preprocessgatk4-4.2.0.0",
	"GermlineSnpsIndelsGatk4Hg19.knownIndelsSitesIdxs": [
		"hg19/Homo_sapiens_known_indels.vcf.gz.tbi",
		"hg19/Mills_and_1000G_gold_standard.indels.vcf.gz.tbi"
	],
	"GermlineSnpsIndelsGatk4Hg19.knownIndelsSitesVCFs": [
		"hg19/Homo_sapiens_known_indels.vcf.gz",
		"hg19/Mills_and_1000G_gold_standard.indels.vcf.gz"
	]
},
"datasets": {
	"GermlineSnpsIndelsGatk4Hg19.refFa": "drs://dev-run-test-api.seqslab.net/drs_m2p4rn9AAQxzq8r",
	"GermlineSnpsIndelsGatk4Hg19.refSa": "drs://dev-run-test-api.seqslab.net/drs_J92ooLhbQT0Ejr8",
	"GermlineSnpsIndelsGatk4Hg19.refAmb": "drs://dev-run-test-api.seqslab.net/drs_lomzuO7Y8uwYVHW",
	"GermlineSnpsIndelsGatk4Hg19.refAnn": "drs://dev-run-test-api.seqslab.net/drs_0OZxbvK0Pd2h7qa",
	"GermlineSnpsIndelsGatk4Hg19.refBwt": "drs://dev-run-test-api.seqslab.net/drs_JrIY9ijlOCD8Kzo",
	"GermlineSnpsIndelsGatk4Hg19.refFai": "drs://dev-run-test-api.seqslab.net/drs_cc8wJojcK2Cr4rV",
	"GermlineSnpsIndelsGatk4Hg19.refPac": "drs://dev-run-test-api.seqslab.net/drs_4o2P76KSbI1NpYg",
	"GermlineSnpsIndelsGatk4Hg19.refDict": "drs://dev-run-test-api.seqslab.net/drs_ppHVTW5bXrc1bGM",
	"GermlineSnpsIndelsGatk4Hg19.dbsnpVCF": "drs://dev-run-test-api.seqslab.net/drs_NeR755e3xvezpIl",
	"GermlineSnpsIndelsGatk4Hg19.inFileFqs": [
		"drs://dev-run-test-api.seqslab.net/drs_DHDJR2fDcfOiTml",
		"drs://dev-run-test-api.seqslab.net/drs_oEg0ZKfJmImj2Qj"
	],
	"GermlineSnpsIndelsGatk4Hg19.dbsnpVCFTbi": "drs://dev-run-test-api.seqslab.net/drs_mIhEtbyNn3uRp3o",
	"GermlineSnpsIndelsGatk4Hg19.knownIndelsSitesIdxs": [
		"drs://dev-run-test-api.seqslab.net/drs_vQkUPP6U0T90QLr",
		"drs://dev-run-test-api.seqslab.net/drs_zKDNhTrtXQAeP0a"
	],
	"GermlineSnpsIndelsGatk4Hg19.knownIndelsSitesVCFs": [
		"drs://dev-run-test-api.seqslab.net/drs_JYeHsgeebq2tthj",
		"drs://dev-run-test-api.seqslab.net/drs_M2sJHwsqkdjhSqq"
	]
}

}

When running jobs, the input.json file defines the configuration of the entire workflow. While you can choose to write your own code, SeqsLab allows you to leverage the power of operator pipelines.
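To make the structure of the "tasks" section concrete, the following is a minimal Python sketch that lists the operator names for each entry. It relies only on what the sample above shows: each entry has an "operators" list whose items are either a bare operator name or an object with a "name" field. The `SAMPLE` string and the helper names `operator_names` and `pipelines` are illustrative and not part of SeqsLab.

```python
import json

# A trimmed-down stand-in for input.json with two entries from the sample above.
SAMPLE = """
{
  "tasks": {
    "GermlineSnpsIndelsGatk4Hg19.IndexBam.inFileBam": {
      "operators": ["BamPartitionerPart1", "BamExecutor"],
      "description": "File-based BAM workload pipeline with all BAM records in a single partition"
    },
    "GermlineSnpsIndelsGatk4Hg19.HaplotypeCallerGvcf_GATK4.HaplotypeCaller.ref_dict": {
      "operators": [{"name": "RefLoader", "scope": "cluster", "arguments": {}}],
      "description": "Automatic workload pipeline for localizing either a file or directory shared within cluster in a single node cluster"
    }
  }
}
"""


def operator_names(entry):
    """Return the operator names of one "tasks" entry, in pipeline order."""
    names = []
    for op in entry["operators"]:
        # An operator is written either as a bare name or as an object with a "name".
        names.append(op if isinstance(op, str) else op["name"])
    return names


def pipelines(config):
    """Map every key under "tasks" to its ordered list of operator names."""
    return {key: operator_names(entry) for key, entry in config["tasks"].items()}


config = json.loads(SAMPLE)
for key, names in pipelines(config).items():
    print(key, "->", ", ".join(names))
```

To inspect a real file, replace `json.loads(SAMPLE)` with `json.load(...)` on an opened input.json.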

Example 1

In the sample above, inFileBam is defined as an input for the task GermlineSnpsIndelsGatk4Hg19.IndexBam. It uses the following operator pipeline:

["BamPartitionerPart1", "BamExecutor"]

When combined, these two operators perform the following tasks:

  1. Repartition the BAM file into one partition (BamPartitionerPart1).

  2. Localize the repartitioned BAM file (BamExecutor).
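The relationship between a key under "tasks" and the task and input it refers to can be sketched as follows. This assumes that the last dot-separated component of the key is the input name and everything before it is the task name, which matches the inFileBam example; `split_task_key` is a hypothetical helper, not a SeqsLab function.

```python
def split_task_key(key):
    """Split a "tasks" key into (task, input_name) at the last dot.

    Assumption: the final dot-separated component names the input.
    """
    task, _, input_name = key.rpartition(".")
    return task, input_name


task, input_name = split_task_key("GermlineSnpsIndelsGatk4Hg19.IndexBam.inFileBam")
print(task)        # GermlineSnpsIndelsGatk4Hg19.IndexBam
print(input_name)  # inFileBam
```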

Example 2

Next, let’s take a look at the following code snippet:

"GermlineSnpsIndelsGatk4Hg19.IndexVcf.inFileVCF": {
	"operators": [
		"VcfPartitionerHg19Part1",
		"VcfExecutor"
	],
	"description": "File-based VCF workload pipeline with HG19 reference genome with all VCF records in a single partition"
}

Here, the operator pipeline is ["VcfPartitionerHg19Part1", "VcfExecutor"], which first repartitions the VCF file into one partition based on the HG19 reference, and then localizes the repartitioned VCF file to disk.
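If you generate input.json programmatically, an entry like this one can be assembled as a plain dictionary and serialized. The sketch below assumes only the entry shape shown in the sample ("operators" plus "description"); `pipeline_entry` is a hypothetical helper, and the order of the list is kept as given because the partitioner must precede the executor.

```python
import json


def pipeline_entry(operators, description):
    """Build one "tasks" entry; operators are kept in the order given."""
    if not operators:
        raise ValueError("a pipeline needs at least one operator")
    return {"operators": list(operators), "description": description}


tasks = {
    "GermlineSnpsIndelsGatk4Hg19.IndexVcf.inFileVCF": pipeline_entry(
        ["VcfPartitionerHg19Part1", "VcfExecutor"],
        "File-based VCF workload pipeline with HG19 reference genome "
        "with all VCF records in a single partition",
    )
}

# Serialize with tab indentation to match the sample file's layout.
print(json.dumps({"tasks": tasks}, indent="\t"))
```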

Example 3

Lastly, let’s consider the following code snippet:

"GermlineSnpsIndelsGatk4Hg19.HaplotypeCallerGvcf_GATK4.HaplotypeCaller.input_bam": {
	"operators": [
		"CopyToLocalLoader"
	],
	"description": "Automatic workload pipeline for localizing a file per partition to each executor of cluster"
}

The "CopyToLocalLoader" operator may seem simple, but it actually performs several tasks. First, it localizes the data to disk. Next, if the data is partitioned, each partition is downloaded to an executor. If not, the entire dataset is treated as one partition and downloaded to one executor. The downloaded data is then used when executing a command.
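The partition handling described above can be modeled with a small Python sketch. It is a conceptual illustration only, not SeqsLab code: `download_units` and the file and partition names are hypothetical, and each returned item stands for one download to one executor.

```python
def download_units(data, partitions=None):
    """Return the units to download, one per executor.

    Partitioned data yields one unit per partition; otherwise the whole
    data is treated as a single partition.
    """
    if partitions:
        return list(partitions)
    return [data]


# Partitioned input: each partition is downloaded separately.
print(download_units("sample.bam", ["part-0", "part-1", "part-2"]))
# Unpartitioned input: the whole file is a single unit.
print(download_units("sample.bam"))
```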