Pipeline operators

Note

The following section is only recommended for advanced users who would like to create their own operators. If you are not familiar with custom pipelines, you can use any of the standard pipelines that are available on the SeqsLab platform.

Operators

Source

  • Can be used in a preprocess pipeline or a call pipeline

  • Reads data into memory; the data can be partitioned or used to execute the task cmd later

  • Steps for customizing Source (a minimal sketch follows the signature below):

    1. Extend the Source class

    2. The type, order, and number of constructor arguments must be IDENTICAL to those of Source

    3. Implement readImpl()

      def readImpl(drsId: DrsID, method: AccessMethod, local: Path)(implicit spark: SparkSession): ContentRddDataset
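
    A minimal sketch of a custom Source, assuming the SDK types used in the signature (Source, DrsID, AccessMethod, ContentRddDataset, and Path) are already in scope. The class name TsvSource and the comments are illustrative only; a real subclass must declare constructor arguments that mirror Source exactly and pass them through to the superclass.

      import org.apache.spark.sql.SparkSession

      class TsvSource /* same constructor arguments as Source */ extends Source {

        override def readImpl(drsId: DrsID, method: AccessMethod, local: Path)
                             (implicit spark: SparkSession): ContentRddDataset = {
          // Resolve the DRS object via the access method, load its content into
          // an RDD, and wrap the RDD in a concrete ContentRddDataset.
          ???
        }
      }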
    
    

Partition

  • Can only be used in a preprocess pipeline

  • Use a Partition operator to partition the input; the partitions are then used to execute the task cmd in parallel, which speeds up processing

  • Steps for customizing Partition (a minimal sketch follows the signature below):

    1. Extend the Partition class

    2. The type, order, and number of constructor arguments must be IDENTICAL to those of Partition

    3. Implement partitionImpl()

    def partitionImpl(ds: RddDataset)(implicit spark: SparkSession): Dataset
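
    A minimal sketch of a custom Partition, assuming the SDK types Partition, RddDataset, and Dataset are in scope. The class name ChunkPartition is illustrative only; the constructor must mirror Partition exactly.

      import org.apache.spark.sql.SparkSession

      class ChunkPartition /* same constructor arguments as Partition */ extends Partition {

        override def partitionImpl(ds: RddDataset)(implicit spark: SparkSession): Dataset = {
          // Repartition the underlying RDD so that each partition can drive one
          // parallel execution of the task cmd, then return the resulting dataset.
          ???
        }
      }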
    

Mpipe

  • In-memory operation

  • Can be used in a preprocess pipeline, a call pipeline, or a postprocess pipeline

  • Steps for customizing Mpipe (a minimal sketch follows the signature below):

    1. Extend the Mpipe class

    2. The type, order, and number of constructor arguments must be EXACTLY THE SAME as those of Mpipe

    3. Implement pipe()

    def pipe(ds: Dataset)(implicit spark: SparkSession): Dataset
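
    A minimal sketch of a custom Mpipe, assuming the SDK types Mpipe and Dataset are in scope. The class name TrimMpipe is illustrative only; the constructor must mirror Mpipe exactly.

      import org.apache.spark.sql.SparkSession

      class TrimMpipe /* same constructor arguments as Mpipe */ extends Mpipe {

        override def pipe(ds: Dataset)(implicit spark: SparkSession): Dataset = {
          // Transform the dataset entirely in memory (no task cmd is launched here)
          // and return a dataset of the type expected by the next operator.
          ???
        }
      }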
    

Cpipe

  • Takes a StringRddDataset as input to the WDL task cmd

  • A concrete class; use it as-is

Ppipe

  • Takes a PathRddDataset as input to the WDL task cmd

  • A concrete class; use it as-is

Format

  • An operator that reads/writes data from/to a URL

  • Can only be used in a call pipeline

  • The Format class performs an integrity check if the integrity argument to read() is true

  • GenericFormat does NOT perform an integrity check by default

  • Steps for customizing Format (a minimal sketch follows the signatures below):

    1. Extend the Format class

    2. The type, order, and number of constructor arguments must be EXACTLY THE SAME as those of Format

    3. Implement readImpl()

    def readImpl(drsId: DrsID, method: AccessMethod, local: Path)(implicit spark: SparkSession): Dataset

    4. Implement writeImpl()

    def writeImpl(ds: Dataset, drsId: DrsID, method: AccessMethod)(implicit spark: SparkSession): Unit
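
    A minimal sketch of a custom Format, assuming the SDK types Format, DrsID, AccessMethod, Dataset, and Path are in scope. The class name BedFormat is illustrative only; the constructor must mirror Format exactly.

      import org.apache.spark.sql.SparkSession

      class BedFormat /* same constructor arguments as Format */ extends Format {

        override def readImpl(drsId: DrsID, method: AccessMethod, local: Path)
                             (implicit spark: SparkSession): Dataset = {
          // Read the object identified by drsId from its URL via the access method
          // and return it as a dataset for the call pipeline.
          ???
        }

        override def writeImpl(ds: Dataset, drsId: DrsID, method: AccessMethod)
                              (implicit spark: SparkSession): Unit = {
          // Write the dataset back to the URL described by drsId and the access method.
          ???
        }
      }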
    

IntegrityFormat

  • A Format operator that performs a data integrity check before downloading

  • Can only be used in a call pipeline

  • Steps for customizing IntegrityFormat (a minimal sketch follows the list below):

    1. Extend the IntegrityFormat class

    2. The type, order, and number of constructor arguments must be IDENTICAL to those of IntegrityFormat

    3. The methods required to implement are the same as for Format (readImpl() and writeImpl())
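
    A minimal sketch of a custom IntegrityFormat, with the same assumptions as the Format example above; the class name Md5Format is illustrative only.

      import org.apache.spark.sql.SparkSession

      class Md5Format /* same constructor arguments as IntegrityFormat */ extends IntegrityFormat {

        // Same overrides as a custom Format; per the description above, the
        // integrity check is performed before the data is downloaded.
        override def readImpl(drsId: DrsID, method: AccessMethod, local: Path)
                             (implicit spark: SparkSession): Dataset = ???

        override def writeImpl(ds: Dataset, drsId: DrsID, method: AccessMethod)
                              (implicit spark: SparkSession): Unit = ???
      }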

Collect

  • Collects WDL task cmd output data

  • Can only be used in an output pipeline

  • Steps for customizing Collect (a minimal sketch follows the signature below):

    1. Extend the Collect class

    2. The type, order, and number of constructor arguments must be EXACTLY THE SAME as those of Collect

    3. Implement collect()

    def collect(ds: Dataset)(implicit spark: SparkSession): (RddDataset, OutputC)
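
    A minimal sketch of a custom Collect, assuming the SDK types Collect, Dataset, RddDataset, and OutputC are in scope. The class name VcfCollect is illustrative only; the constructor must mirror Collect exactly.

      import org.apache.spark.sql.SparkSession

      class VcfCollect /* same constructor arguments as Collect */ extends Collect {

        override def collect(ds: Dataset)(implicit spark: SparkSession): (RddDataset, OutputC) = {
          // Gather the task cmd output into an RddDataset and return it together
          // with the corresponding OutputC value.
          ???
        }
      }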
    

Sink

  • Uploads WDL task cmd output data to a specific URL

  • Can only be used in an output pipeline

  • Steps for customizing Sink (a minimal sketch follows the signature below):

    1. Extend the Sink class

    2. The type, order, and number of constructor arguments must be EXACTLY THE SAME as those of Sink

    3. Implement uploader()

    def uploader(appId: String, src: Path, dst: URI)(implicit hadoopConfMap: Map[String, String]): Unit
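
    A minimal sketch of a custom Sink, assuming the SDK type Sink and the Path type used in the signature are in scope; URI is assumed here to be java.net.URI. The class name BlobSink is illustrative only; the constructor must mirror Sink exactly.

      import java.net.URI

      class BlobSink /* same constructor arguments as Sink */ extends Sink {

        override def uploader(appId: String, src: Path, dst: URI)
                             (implicit hadoopConfMap: Map[String, String]): Unit = {
          // Upload the local file at src to the destination URI, using the Hadoop
          // configuration map for credentials and endpoint settings.
          ???
        }
      }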
    

Datasets

Operators can generate any of the following kinds of datasets:

  • RegularDataset

    • Only the RegularFile and RegularDir operators generate this dataset

    • Used for preparing reference or database data for WDL tasks

  • RddDataset

    • PathRddDataset

      • File paths are stored in the RDD, so these files are downloaded before the WDL task cmd is executed

      • The input to PPipe is a PathRddDataset

    • ListRddDataset

      • An input dataset that has multiple elements

    • ContentRddDataset

      • StringRddDataset

        • Abstract class

        • String-type data content is stored in the RDD

        • The input to CPipe can be a StringRddDataset

      • BinaryRddDataset

        • Abstract class

        • Binary data content is stored in the RDD

        • The input to CPipe can be a BinaryRddDataset

    • OutputDataset

      • The output result of the WDL task cmd

      • Does not contain inputId

Note

The dataset type should be consistent within a pipeline. In other words, the type of the output dataset of the current operator should be the same as the type of the input dataset of the next operator. For example, if a preprocess pipeline uses FastqSource, FastqPartition, and FastqFormat, then the input/output dataset type for each operator should be as follows (a schematic sketch follows the list):

  • FastqSource

    • output: FastqInputDataset

  • FastqPartition

    • input: FastqInputDataset

    • output: StringRddDataset

  • FastqFormat

    • input: StringRddDataset
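
The same relationship can be expressed in code. The following is a schematic sketch only, not the platform implementation: it assumes FastqInputDataset is a subtype of both ContentRddDataset and RddDataset, uses illustrative class names, and omits constructor arguments and method bodies.

    import org.apache.spark.sql.SparkSession

    class MyFastqSource /* same constructor arguments as Source */ extends Source {
      // Produces the dataset that the partition operator consumes.
      override def readImpl(drsId: DrsID, method: AccessMethod, local: Path)
                           (implicit spark: SparkSession): FastqInputDataset = ???
    }

    class MyFastqPartition /* same constructor arguments as Partition */ extends Partition {
      // Consumes the FastqInputDataset and produces the StringRddDataset
      // that the format operator expects as its input.
      override def partitionImpl(ds: RddDataset)(implicit spark: SparkSession): StringRddDataset = ???
    }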