diff --git a/adr/20250825-workflow-params.md b/adr/20250825-workflow-params.md
index 17a41ef626..b26b669616 100644
--- a/adr/20250825-workflow-params.md
+++ b/adr/20250825-workflow-params.md
@@ -39,6 +39,8 @@ This approach has several limitations:
 
 - Coerce CLI parameter values based on declared types, rather than relying on heuristics.
+- Support collection-type parameters that can be loaded from structured files (CSV, JSON, YAML).
+
 ## Non-goals
 
 - Removing the legacy `params.foo = bar` syntax -- legacy parameters must continue to work without modification.
@@ -103,6 +105,42 @@ When a parameter is supplied on the command line, Nextflow converts the string v
 
 This replaces the heuristic type detection used for legacy parameters.
 
+### Samplesheets as collection-type parameters
+
+A parameter with a collection type (`List`, `Set`, `Bag`) can be supplied as a file path. Nextflow parses the file and assigns the resulting collection to the parameter. Supported formats are CSV, JSON, and YAML:
+
+```groovy
+params {
+    samples: List<Sample> // can be supplied as a CSV, JSON, or YAML file path
+}
+
+record Sample {
+    id: String
+    fastq_1: Path
+    fastq_2: Path
+}
+```
+
+The file contents must be compatible with the declared element type; an error is thrown if they are not. CSV files must include a header row and use a comma as the column separator.
+
+The collection-type parameter can use a generic element type such as `Map` or `Record`, or a custom record type to enable further validation. In the above example, using the `Sample` type ensures that each samplesheet row is validated against the record fields and that the `fastq_1` and `fastq_2` columns are treated as file paths.
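+For illustration, the same `samples` parameter could be supplied in any of the supported formats. Assuming the `Sample` record above, the following files would be equivalent inputs (the sample IDs and paths are hypothetical):
+
+```
+id,fastq_1,fastq_2
+gut,gut_1.fq,gut_2.fq
+```
+
+```json
+[
+    { "id": "gut", "fastq_1": "gut_1.fq", "fastq_2": "gut_2.fq" }
+]
+```
+
+```yaml
+- id: gut
+  fastq_1: gut_1.fq
+  fastq_2: gut_2.fq
+```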
+
+This feature allows collection-type parameters to serve as *samplesheet inputs*, which simplifies the workflow logic and allows it to be agnostic to the input format:
+
+```groovy
+// before (CSV only)
+ch_samples = channel.fromPath(params.samples)
+    .flatMap { csv ->
+        csv.splitCsv(header: true, sep: ',')
+    }
+    .map { r ->
+        record(id: r.id, fastq_1: file(r.fastq_1), fastq_2: file(r.fastq_2))
+    }
+
+// after (CSV, JSON, or YAML)
+ch_samples = channel.fromList(params.samples)
+```
+
 ### Compile-time validation
 
 Legacy parameters can be accessed globally by all scripts in the pipeline. While this approach is flexible, it prevents compile-time validation and breaks modularity.
@@ -227,3 +265,80 @@ For example, when loading a JSON file as a collection of records, Nextflow uses
 
 - If a JSON object is missing a record field that is marked as nullable, it is considered valid
 
 While type annotations are used only at compile-time in all other contexts, they are needed at runtime for pipeline parameters in order to validate and convert external input data to the expected type.
+
+### Standard library functions for loading structured files
+
+The automatic loading of samplesheet inputs is supported only in the `params` block. It could also be useful to load structured files with functions. For example:
+
+```groovy
+samples = fromJson('samples.json')
+```
+
+Where `fromJson` is a function that loads arbitrary data from a JSON file.
+
+A function is more flexible because it can be used anywhere in pipeline code, whereas the automatic samplesheet loading can only be used for pipeline-level inputs. For example, a process might produce a JSON file that needs to be read in workflow logic and processed by downstream tasks in parallel.
+
+However, data-loading functions like `fromJson` cannot be statically typed, since the data file could contain anything (number, string, list, map, etc). Primitive values can be coerced using a Groovy-style cast (e.g. `fromJson('...') as Map`), but this approach does not support parameterized types or Nextflow record types. A function like `fromJson` also assumes a specific file format, which is overly restrictive for a pipeline-level input that could be supplied in a variety of formats.
+
+Loading structured files with a typed parameter such as `samples: List<Sample>` allows the input to have a well-defined type at compile-time and allows it to be sourced from any data format. While currently only CSV, JSON, and YAML are supported, the format can be made extensible in the future so that users can integrate their own data formats (e.g. Parquet) via plugins. Data-loading functions like `fromJson` can also be implemented for other use cases in the future, but they are not sufficient for handling pipeline-level inputs.
+
+### Typed parameters and schemas
+
+While this ADR does not specify any native integration with JSON schema, it is worth addressing how typed parameters are expected to interact with schemas in the future.
+
+A common approach for Nextflow pipelines is to define a JSON schema for the pipeline parameters. A samplesheet param is typically defined as a file input with its own *samplesheet schema*. Developers typically load and validate samplesheet params using the `samplesheetToList` function from the `nf-schema` plugin:
+
+```groovy
+include { samplesheetToList } from 'plugin/nf-schema'
+
+params.input = null
+
+workflow {
+    samples = samplesheetToList(params.input, "assets/schema_input.json")
+    channel.fromList(samples).view()
+}
+```
+
+The parameter schema provides validation and type coercion similar to a typed parameter. For example, the following record type:
+
+```groovy
+record Sample {
+    id: String
+    fastq_1: Path
+    fastq_2: Path?
+}
+```
+
+Is equivalent to the following JSON schema:
+
+```json
+{
+    "$schema": "https://json-schema.org/draft/2020-12/schema",
+    "type": "array",
+    "items": {
+        "type": "object",
+        "properties": {
+            "id": { "type": "string" },
+            "fastq_1": { "type": "string", "format": "file-path", "exists": true },
+            "fastq_2": { "type": "string", "format": "file-path", "exists": true }
+        },
+        "required": ["id", "fastq_1"]
+    }
+}
+```
+
+The `samplesheetToList` function has the same limitation as `fromJson` described above -- it is not statically typed. Even with the schema file, the type checker cannot guarantee a specific return type at compile-time because there is no type annotation. The above example must be rewritten as follows in order to be statically typed:
+
+```groovy
+params {
+    input: List<Sample>
+}
+
+workflow {
+    channel.fromList(params.input).view()
+}
+```
+
+On the other hand, the samplesheet schema can specify additional validations that cannot be expressed with Nextflow types, such as min/max constraints for numbers and pattern constraints for strings.
+
+It is not yet clear whether it is better for Nextflow to enforce these schema properties at runtime, or for users to implement the equivalent validation in their pipeline code. If needed, Nextflow should be able to augment the `params` block with the parameter schema to provide this extra validation. The above example would work without any additional code changes.
diff --git a/adr/20251020-workflow-outputs.md b/adr/20251020-workflow-outputs.md
index 65da201faf..befb80ecfc 100644
--- a/adr/20251020-workflow-outputs.md
+++ b/adr/20251020-workflow-outputs.md
@@ -191,6 +191,72 @@ While this approach is less verbose, it breaks the modularity of processes and s
 
 On the other hand, propagating all workflow outputs to the top will make pipelines more verbose, especially when using "skinny tuple" channels. This issue will be alleviated by migrating from tuples to records -- for this reason, it is recommended that large pipelines be migrated to records before being migrated to workflow outputs.
 
+### Inferring params and outputs from a named workflow
+
+Consider the following entry workflow which simply wraps a named workflow:
+
+```groovy
+params {
+    samples: List<Sample>
+    index: Path
+}
+
+workflow {
+    main:
+    ch_samples = channel.fromList(samples)
+    rnaseq = RNASEQ(ch_samples, index)
+
+    publish:
+    aligned = rnaseq.aligned
+    multiqc_report = rnaseq.multiqc_report
+}
+
+output {
+    aligned: Channel<AlignedSample> {
+        path { s -> /* ... */ }
+        index { path 'aligned.json' }
+    }
+    multiqc_report: Path {
+        path '.'
+    }
+}
+```
+
+Where the `RNASEQ` workflow is defined as follows:
+
+```groovy
+workflow RNASEQ {
+    take:
+    ch_samples: Channel<Sample>
+    index: Path
+
+    main:
+    ch_aligned = ALIGN(ch_samples, index)
+    multiqc_report = MULTIQC(ch_aligned.collect())
+
+    emit:
+    aligned: Channel<AlignedSample> = ch_aligned
+    multiqc_report: Path = multiqc_report
+}
+
+record Sample { /* ... */ }
+record AlignedSample { /* ... */ }
+```
+
+This example demonstrates that most of the `params` / `workflow` / `output` trio can be equivalently expressed by a named workflow: the `params` block mirrors the `take:` section, and the `output` block and `publish:` section together mirror the `emit:` section.
+
+Named workflows typically consume and produce channels so that they can be composed into larger pipelines. But this prevents them from being directly executable -- the purpose of an entry workflow is to translate between dataflow logic and the external world. If this translation could be inferred automatically, it would allow a named workflow to be both executable and composable, eliminating the need to define explicit entry workflows.
+
+Given a named workflow with dataflow inputs and outputs, the following capabilities would be needed to execute it directly:
+
+- Loading an input channel (e.g. channel of records) from an index file (e.g. CSV, JSON, or YAML file)
+- Saving an output channel (e.g. channel of records) as an index file
+- Publishing output files to a permanent location
+
+Channels can be automatically translated to/from index files using record types. However, the output directory structure cannot be automatically inferred. It is normally specified by the output `path` directive, and need not correspond at all to the structure of output channels.
+
+One solution is to not create an output directory at all. The workflow outputs provide a structured view of the output files, so this view can be used by an external system (e.g. Seqera Platform) to provide a user interface. The output files can simply remain where they are produced, instead of being copied to a separate location. The work directory will likely need to be a global persistent data store, which implies global caching, automatic cleanup, and global search.
+
 ## Links
 
 - Community issues: [#4042](https://github.com/nextflow-io/nextflow/issues/4042), [#4661](https://github.com/nextflow-io/nextflow/issues/4661), [#4670](https://github.com/nextflow-io/nextflow/issues/4670)
diff --git a/docs/tutorials/static-types.md b/docs/tutorials/static-types.md
index 8f96de8e21..7c524a069b 100644
--- a/docs/tutorials/static-types.md
+++ b/docs/tutorials/static-types.md
@@ -166,6 +166,47 @@ read_pairs_ch = channel.of(params.reads)
     }
 ```
+You can simplify the code further by modeling `params.reads` as a collection of records instead of a file path.
+
+Add a header row to the samplesheet:
+
+```
+id,fastq_1,fastq_2
+gut,...
+liver,...
+lung,...
+spleen,...
+```
+
+Refactor `params.reads` as a collection of records:
+
+```nextflow
+params {
+    // The input samplesheet of paired-end reads
+    reads: List<Sample> = "${projectDir}/data/allreads.csv"
+
+    // ...
+}
+
+record Sample {
+    id: String
+    fastq_1: Path
+    fastq_2: Path
+}
+```
+
+In the above, `Sample` is a *record type* based on the samplesheet structure. When a file path is supplied to a collection-type parameter (e.g. `List<Sample>`), the file is automatically loaded and parsed into a collection.
+
+Refactor `read_pairs_ch` to load the collection into a channel:
+
+```nextflow
+read_pairs_ch = channel.fromList(params.reads)
+```
+
+:::{note}
+Collection-type params can also be loaded from JSON and YAML samplesheets. See {ref}`workflow-typed-params` for more information.
+:::
+
 ### Migrating processes
 
 See {ref}`process-typed-page` for an overview of typed processes.
@@ -393,11 +434,7 @@ You can infer the type of each workflow input by examining how the workflow is c
 
 ```nextflow
 workflow {
-    read_pairs_ch = channel.of(params.reads)
-        .flatMap { csv -> csv.splitCsv() }
-        .map { row ->
-            record(id: row[0], fastq_1: file(row[1]), fastq_2: file(row[2]))
-        }
+    read_pairs_ch = channel.fromList(params.reads)
 
     RNASEQ(read_pairs_ch, params.transcriptome)
 
@@ -407,7 +444,7 @@ workflow {
 
 You can determine the type of each input as follows:
 
-- The channel `read_pairs_ch` has type `Channel<Sample>`, where each record contains the fields `id`, `fastq_1`, `fastq_2`.
+- The channel `read_pairs_ch` has type `Channel<E>`, where `E` is the type of each value in the channel. It is loaded from `params.reads` which has type `List<Sample>`. Therefore `read_pairs_ch` has type `Channel<Sample>`.
 
 - The parameter `params.transcriptome` has type `Path` as defined in the `params` block.
 
@@ -423,12 +460,6 @@ workflow RNASEQ {
 
     // ...
 }
-
-record Sample {
-    id: String
-    fastq_1: Path
-    fastq_2: Path
-}
 
 The `read_pairs_ch` channel also needs to provide all of the record fields required by downstream processes. It is used by `FASTQC` and `QUANT`, which both declare the following record input:
@@ -518,11 +549,7 @@ The entry workflow is defined as follows:
 
 ```nextflow
 workflow {
-    read_pairs_ch = channel.of(params.reads)
-        .flatMap { csv -> csv.splitCsv() }
-        .map { row ->
-            record(id: row[0], fastq_1: file(row[1]), fastq_2: file(row[2]))
-        }
+    read_pairs_ch = channel.fromFilePairs(params.reads, checkIfExists: true, flat: true)
 
     (fastqc_ch, quant_ch) = RNASEQ(read_pairs_ch, params.transcriptome)
 
@@ -538,11 +565,7 @@ Rewrite this workflow based on the updated params, processes, and subworkflows:
 
 nextflow.enable.types = true
 
 workflow {
-    read_pairs_ch = channel.of(params.reads)
-        .flatMap { csv -> csv.splitCsv() }
-        .map { row ->
-            record(id: row[0], fastq_1: file(row[1]), fastq_2: file(row[2]))
-        }
+    read_pairs_ch = channel.fromList(params.reads)
 
     samples_ch = RNASEQ(read_pairs_ch, params.transcriptome)
 
@@ -554,7 +577,7 @@ workflow {
 }
 ```
 
-The `reads` param was refactored as a `Path`, so it is loaded into a channel of records using `splitCsv`. It is compatible with the records expected by `RNASEQ`.
+The `reads` param was refactored as a collection of records, so it is loaded into a channel using `channel.fromList`. It is compatible with the records expected by `RNASEQ`.
 
 The `RNASEQ` workflow now returns a single combined channel, so the `mix` operation is no longer needed. The `flatMap` operator is used to extract the files from each record in `samples_ch`.
diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ParamsDsl.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ParamsDsl.groovy
index 8e35dbf0e5..ed22129bc9 100644
--- a/modules/nextflow/src/main/groovy/nextflow/script/ParamsDsl.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/script/ParamsDsl.groovy
@@ -19,12 +19,16 @@ package nextflow.script
 import java.lang.reflect.Type
 import java.nio.file.Path
 
+import groovy.json.JsonSlurper
 import groovy.transform.Canonical
 import groovy.transform.CompileStatic
 import groovy.util.logging.Slf4j
+import groovy.yaml.YamlSlurper
 import nextflow.Session
 import nextflow.exception.ScriptRuntimeException
+import nextflow.file.FileHelper
 import nextflow.script.dsl.Types
+import nextflow.splitter.CsvSplitter
 import nextflow.util.Duration
 import nextflow.util.MemoryUnit
 import nextflow.util.TypeHelper
@@ -135,6 +139,10 @@ class ParamsDsl {
             return MemoryUnit.of(str)
         }
 
+        if( TypeHelper.isCollectionType(decl.type) ) {
+            return resolveFromFile(decl, FileHelper.asPath(str))
+        }
+
         if( decl.type == Path ) {
             return TypeHelper.asPathType(str)
         }
@@ -154,6 +162,9 @@ class ParamsDsl {
 
         final str = value.toString()
 
+        if( TypeHelper.isCollectionType(decl.type) )
+            return resolveFromFile(decl, FileHelper.asPath(str))
+
         if( decl.type == Path )
             return TypeHelper.asPathType(str)
 
@@ -170,6 +181,25 @@ class ParamsDsl {
         }
     }
 
+    private static Object resolveFromFile(Param decl, Path file) {
+        final ext = file.getExtension()
+        final value = switch( ext ) {
+            case 'csv' -> new CsvSplitter().options(header: true, sep: ',').target(file).list()
+            case 'json' -> new JsonSlurper().parse(file)
+            case 'yaml' -> new YamlSlurper().parse(file)
+            case 'yml' -> new YamlSlurper().parse(file)
+            default -> throw new ScriptRuntimeException("Unrecognized file format '${ext}' for input file '${file}' supplied for parameter `${decl.name}` -- should be CSV, JSON, or YAML")
+        }
+
+        try {
+            return TypeHelper.asCollectionType(value as Collection, decl.type)
+        }
+        catch( GroovyCastException | UnsupportedOperationException e ) {
+            final actualType = value.getClass()
+            throw new ScriptRuntimeException("Parameter `${decl.name}` with type ${Types.getName(decl.type)} cannot be assigned to contents of '${file}' [${Types.getName(actualType)}]")
+        }
+    }
+
     private static boolean isAssignableFrom(Class target, Class source) {
         if( target == Float.class )
             return Number.class.isAssignableFrom(source)
diff --git a/modules/nextflow/src/test/groovy/nextflow/script/ParamsDslTest.groovy b/modules/nextflow/src/test/groovy/nextflow/script/ParamsDslTest.groovy
index e2ceee469c..eade10ce64 100644
--- a/modules/nextflow/src/test/groovy/nextflow/script/ParamsDslTest.groovy
+++ b/modules/nextflow/src/test/groovy/nextflow/script/ParamsDslTest.groovy
@@ -287,4 +287,199 @@ class ParamsDslTest extends Specification {
         e.message == "Input record [id:1, name:sample1] is missing field 'value' required by record type 'Sample'"
     }
 
+    def 'should load record collection param from CSV file'() {
+        given:
+        def csvFile = Files.createTempFile('test', '.csv')
+        csvFile.text = '''\
+            id,name,value
+            1,sample1,100
+            2,sample2,200
+            3,sample3,300
+            '''.stripIndent()
+        def cliParams = [samples: csvFile.toString()]
+
+        when:
+        def samples = runScript(
+            '''\
+            params {
+                samples: List
+            }
+
+            workflow { params.samples }
+            ''',
+            params: cliParams,
+            configParams: [:]
+        )
+
+        then:
+        samples instanceof List
+        samples.size() == 3
+        samples[0].id == '1'
+        samples[0].name == 'sample1'
+        samples[0].value == '100'
+        samples[1].id == '2'
+        samples[2].id == '3'
+
+        cleanup:
+        csvFile?.delete()
+    }
+
+    def 'should load collection param from JSON file'() {
+        given:
+        def jsonFile = Files.createTempFile('test', '.json')
+        jsonFile.text = '''\
+            [
+                {"id": 1, "name": "sample1", "value": 100},
+                {"id": 2, "name": "sample2", "value": 200},
+                {"id": 3, "name": "sample3", "value": 300}
+            ]
+            '''.stripIndent()
+        def cliParams = [
+            samplesList: jsonFile.toString(),
+            samplesBag: jsonFile.toString(),
+            samplesSet: jsonFile.toString()
+        ]
+
+        when:
+        def params = runScript(
+            '''\
+            params {
+                samplesList: List
+                samplesBag: Bag
+                samplesSet: Set
+            }
+
+            workflow { params }
+            ''',
+            params: cliParams,
+            configParams: [:]
+        )
+
+        then:
+        def samplesList = params.samplesList
+        samplesList instanceof List
+        samplesList.size() == 3
+        samplesList[0].id == 1
+        samplesList[0].name == 'sample1'
+        samplesList[0].value == 100
+        samplesList[1].id == 2
+        samplesList[2].id == 3
+
+        def samplesBag = params.samplesBag
+        samplesBag instanceof Bag
+        samplesBag.size() == 3
+
+        def samplesSet = params.samplesSet
+        samplesSet instanceof Set
+        samplesSet.size() == 3
+
+        cleanup:
+        jsonFile?.delete()
+    }
+
+    def 'should load collection param from YAML file'() {
+        given:
+        def yamlFile = Files.createTempFile('test', '.yml')
+        yamlFile.text = '''\
+            - id: 1
+              name: sample1
+              value: 100
+            - id: 2
+              name: sample2
+              value: 200
+            - id: 3
+              name: sample3
+              value: 300
+            '''.stripIndent()
+        def cliParams = [samples: yamlFile.toString()]
+
+        when:
+        def samples = runScript(
+            '''\
+            params {
+                samples: List
+            }
+
+            workflow { params.samples }
+            ''',
+            params: cliParams,
+            configParams: [:]
+        )
+
+        then:
+        samples instanceof List
+        samples.size() == 3
+        samples[0].id == 1
+        samples[0].name == 'sample1'
+        samples[0].value == 100
+        samples[1].id == 2
+        samples[2].id == 3
+
+        cleanup:
+        yamlFile?.delete()
+    }
+
+    def 'should report error for unrecognized file format'() {
+        given:
+        def txtFile = Files.createTempFile('test', '.txt')
+        txtFile.text = 'some text'
+        def cliParams = [items: txtFile.toString()]
+
+        when:
+        runScript(
+            '''\
+            params {
+                items: List
+            }
+
+            workflow { params.items }
+            ''',
+            params: cliParams,
+            configParams: [:]
+        )

+        then:
+        def e = thrown(ScriptRuntimeException)
+        e.message.contains("Unrecognized file format 'txt' for input file")
+    }
+
+    def 'should validate collection param with record type'() {
+        given:
+        def inputFile = Files.createTempFile('test', '.json')
+        inputFile.text = '''\
+            [
+                {"id": 1, "name": "sample1", "value": 100},
+                {"id": 2, "name": "sample2", "value": 200},
+                {"id": 3, "name": "sample3", "value": 300}
+            ]
+            '''.stripIndent()
+
+        when:
+        def samples = runScript(
+            '''\
+            params {
+                samples: List<Sample>
+            }
+
+            record Sample {
+                id: Integer
+                name: String
+                value: Integer
+            }
+
+            workflow {
+                params.samples
+            }
+            ''',
+            params: [samples: inputFile.toString()]
+        )
+
+        then:
+        samples instanceof List
+        samples.size() == 3
+        samples[0] instanceof Record
+        samples[0].id == 1
+        samples[0].name == 'sample1'
+        samples[0].value == 100
+        samples[1].id == 2
+        samples[2].id == 3
+    }
+
 }