BioSkills bio-workflow-management-nextflow-pipelines
Create scalable, containerized bioinformatics pipelines with Nextflow DSL2 supporting Docker, Singularity, and cloud execution. Use when building portable pipelines with container support, running workflows on cloud platforms (AWS, Google Cloud), or leveraging nf-core community pipelines.
git clone https://github.com/GPTomics/bioSkills
T=$(mktemp -d) && git clone --depth=1 https://github.com/GPTomics/bioSkills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/workflow-management/nextflow-pipelines" ~/.claude/skills/gptomics-bioskills-bio-workflow-management-nextflow-pipelines && rm -rf "$T"
Version Compatibility
Reference examples tested with: FastQC 0.12+, MultiQC 1.21+, Nextflow 23.10+, Salmon 1.10+, Snakemake 8.0+, fastp 0.23+
Before using code patterns, verify installed versions match. If versions differ:
- CLI: run <tool> --version, then <tool> --help to confirm flags
If code throws ImportError, AttributeError, or TypeError, introspect the installed package and adapt the example to match the actual API rather than retrying.
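For Nextflow itself, the minimum version listed above can also be recorded in the pipeline configuration, so an outdated install fails early instead of partway through a run. A minimal sketch, assuming the 23.10 floor from the tested-versions list; it covers only Nextflow, not FastQC, Salmon, or the other tools:

```groovy
// nextflow.config (sketch): enforce the minimum Nextflow version
manifest {
    // The '!' prefix aborts the run on a mismatch instead of only warning
    nextflowVersion = '!>=23.10.0'
}
```

Without the leading !, Nextflow prints a warning and continues.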
Nextflow Pipelines
"Create a scalable containerized pipeline with Nextflow" → Build DSL2 workflows with process definitions, channel-based data flow, Docker/Singularity container support, and cloud execution (AWS, Google Cloud) for portable bioinformatics analysis.
- CLI: nextflow run main.nf for pipeline execution
- Groovy: DSL2 process/workflow syntax for pipeline definition
Basic Pipeline Structure
    // main.nf
    nextflow.enable.dsl=2

    params.reads = "data/*_{1,2}.fq.gz"
    params.outdir = "results"

    process FASTQC {
        input:
        tuple val(sample_id), path(reads)

        output:
        path("*.html"), emit: html
        path("*.zip"), emit: zip

        script:
        """
        fastqc ${reads}
        """
    }

    workflow {
        Channel.fromFilePairs(params.reads) | FASTQC
    }
DSL2 Modules
    // modules/fastqc.nf
    process FASTQC {
        tag "${sample_id}"
        publishDir "${params.outdir}/qc", mode: 'copy'

        input:
        tuple val(sample_id), path(reads)

        output:
        tuple val(sample_id), path("*.html"), emit: html
        tuple val(sample_id), path("*.zip"), emit: zip

        script:
        """
        fastqc -t ${task.cpus} ${reads}
        """
    }

    // main.nf
    include { FASTQC } from './modules/fastqc'
    include { ALIGN } from './modules/align'

    workflow {
        reads_ch = Channel.fromFilePairs(params.reads)
        FASTQC(reads_ch)
        ALIGN(reads_ch)
    }
Config File
    // nextflow.config
    params {
        reads = "data/*_{1,2}.fq.gz"
        outdir = "results"
        genome = "ref/genome.fa"
    }

    process {
        cpus = 4
        memory = '8 GB'
        time = '2h'

        withName: 'ALIGN' {
            cpus = 16
            memory = '32 GB'
        }
    }

    profiles {
        docker {
            docker.enabled = true
        }
        singularity {
            singularity.enabled = true
        }
        slurm {
            process.executor = 'slurm'
        }
    }
Container Support
    process SALMON_QUANT {
        container 'quay.io/biocontainers/salmon:1.10.0--h7e5ed60_0'

        input:
        tuple val(sample_id), path(reads)
        path(index)

        output:
        tuple val(sample_id), path("${sample_id}"), emit: quant

        script:
        """
        salmon quant -i ${index} -l A -1 ${reads[0]} -2 ${reads[1]} \
            -o ${sample_id} --threads ${task.cpus}
        """
    }
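Images can also be assigned from the configuration file rather than inside each process, which keeps image tags out of the module code. A sketch, assuming the same Salmon image as above; the cache directory is a hypothetical placeholder, and only one container engine should be enabled per run:

```groovy
// nextflow.config (sketch)
process {
    // Assign the image by process name instead of a container directive
    withName: 'SALMON_QUANT' {
        container = 'quay.io/biocontainers/salmon:1.10.0--h7e5ed60_0'
    }
}

profiles {
    singularity {
        singularity.enabled = true
        // Mount host paths automatically inside the container
        singularity.autoMounts = true
        // Hypothetical location for reusing pulled images across runs
        singularity.cacheDir = '/path/to/singularity_cache'
    }
}
```

If both a config-level container and a container directive in the process are present, the withName selector in the config takes precedence.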
Channel Operations
    // From file pairs: emits (sample_id, [read1, read2])
    Channel.fromFilePairs("data/*_{1,2}.fq.gz")
        .set { reads_ch }

    // From path: emits (base name, file)
    Channel.fromPath("data/*.bam")
        .map { file -> tuple(file.baseName, file) }
        .set { bam_ch }

    // From samplesheet: expects columns sample, fastq_1, fastq_2
    Channel.fromPath(params.samplesheet)
        .splitCsv(header: true)
        .map { row -> tuple(row.sample, file(row.fastq_1), file(row.fastq_2)) }
        .set { samples_ch }

    // Combine channels (reference_ch is assumed to be defined elsewhere)
    reads_ch.combine(reference_ch)
Subworkflows
    // subworkflows/qc.nf
    include { FASTQC } from '../modules/fastqc'
    include { MULTIQC } from '../modules/multiqc'

    workflow QC {
        take:
        reads

        main:
        FASTQC(reads)
        // FASTQC emits (sample_id, zip) tuples; pass only the files to MultiQC
        MULTIQC(FASTQC.out.zip.map { sample_id, zip -> zip }.collect())

        emit:
        // Assumes the MULTIQC module declares an output with emit: report
        qc_report = MULTIQC.out.report
    }

    // main.nf
    include { QC } from './subworkflows/qc'
    include { ALIGN } from './subworkflows/align'

    workflow {
        reads = Channel.fromFilePairs(params.reads)
        QC(reads)
        ALIGN(reads)
    }
Cluster Execution
    // nextflow.config for SLURM
    process {
        executor = 'slurm'
        queue = 'normal'
        clusterOptions = '--account=myproject'

        withLabel: 'high_memory' {
            memory = '128 GB'
            queue = 'highmem'
        }
    }

    executor {
        name = 'slurm'
        queueSize = 100
        submitRateLimit = '10 sec'
    }
AWS/Cloud Execution
    // nextflow.config for AWS Batch
    // Wrapped in a profile so that -profile awsbatch below resolves
    profiles {
        awsbatch {
            process {
                executor = 'awsbatch'
                queue = 'my-batch-queue'
            }
            aws {
                region = 'us-east-1'
                batch {
                    cliPath = '/usr/local/bin/aws'
                }
            }
        }
    }

    # Run on AWS
    nextflow run main.nf -profile awsbatch -bucket-dir s3://my-bucket/work
Resource Labels
    process {
        withLabel: 'process_low' {
            cpus = 2
            memory = '4 GB'
            time = '1h'
        }
        withLabel: 'process_medium' {
            cpus = 8
            memory = '16 GB'
            time = '4h'
        }
        withLabel: 'process_high' {
            cpus = 16
            memory = '64 GB'
            time = '12h'
        }
    }

    process ALIGN {
        label 'process_high'
        // ...
    }
Error Handling
    process RISKY_PROCESS {
        errorStrategy 'retry'
        maxRetries 3
        memory { 8.GB * task.attempt }

        script:
        """
        memory_intensive_command
        """
    }

    process OPTIONAL_PROCESS {
        errorStrategy 'ignore'
        // ...
    }
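Retrying every failure spends time on errors that will never succeed, such as a malformed input file. A sketch of a conditional strategy set in nextflow.config, assuming exit codes 137 to 140 indicate a task killed for exceeding memory or time limits (common on schedulers, but site-specific, so check what your cluster reports):

```groovy
// nextflow.config (sketch)
process {
    // Retry only likely resource kills; otherwise let running tasks finish, then stop
    errorStrategy = { task.exitStatus in 137..140 ? 'retry' : 'finish' }
    maxRetries = 3
    // Memory grows with each attempt: 8 GB, 16 GB, 24 GB, 32 GB
    memory = { 8.GB * task.attempt }
}
```

The 'finish' strategy is used here instead of 'terminate' so that tasks already running are not killed when an unrelated task fails.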
Caching and Resume
    # Resume from last run
    nextflow run main.nf -resume

    # Clean work directory
    nextflow clean -f

    # List previous runs (run name, status, session ID)
    nextflow log
Complete RNA-seq Pipeline
    nextflow.enable.dsl=2

    params.reads = "data/*_{1,2}.fq.gz"
    params.salmon_index = "ref/salmon_index"
    params.outdir = "results"

    process FASTP {
        tag "${sample_id}"
        publishDir "${params.outdir}/trimmed", mode: 'copy'

        input:
        tuple val(sample_id), path(reads)

        output:
        tuple val(sample_id), path("${sample_id}_{1,2}.trimmed.fq.gz"), emit: reads
        path("${sample_id}.json"), emit: json

        script:
        """
        fastp -i ${reads[0]} -I ${reads[1]} \
            -o ${sample_id}_1.trimmed.fq.gz -O ${sample_id}_2.trimmed.fq.gz \
            --json ${sample_id}.json --thread ${task.cpus}
        """
    }

    process SALMON_QUANT {
        tag "${sample_id}"
        publishDir "${params.outdir}/salmon", mode: 'copy'

        input:
        tuple val(sample_id), path(reads)
        path(index)

        output:
        tuple val(sample_id), path("${sample_id}"), emit: quant

        script:
        """
        salmon quant -i ${index} -l A -1 ${reads[0]} -2 ${reads[1]} \
            -o ${sample_id} --threads ${task.cpus}
        """
    }

    process MULTIQC {
        publishDir "${params.outdir}", mode: 'copy'

        input:
        path('*')

        output:
        path("multiqc_report.html")

        script:
        """
        multiqc .
        """
    }

    workflow {
        reads_ch = Channel.fromFilePairs(params.reads)
        index_ch = Channel.fromPath(params.salmon_index)

        FASTP(reads_ch)
        // first() turns the index into a value channel so it is reused for every sample
        SALMON_QUANT(FASTP.out.reads, index_ch.first())

        // Drop sample_id from the Salmon tuples so MULTIQC receives only paths
        qc_files = FASTP.out.json
            .mix(SALMON_QUANT.out.quant.map { sample_id, quant_dir -> quant_dir })

        MULTIQC(qc_files.collect())
    }
Related Skills
- workflow-management/snakemake-workflows - Snakemake alternative
- workflows/rnaseq-to-de - End-to-end RNA-seq
- read-qc/fastp-workflow - QC processes