nextflow.enable.dsl=2

import java.util.regex.Pattern

include {
    post_csv_to_json_api as post_species_csv_to_json_api;
    post_csv_to_json_api as post_abr_csv_to_json_api;
    score as score_species;
    score as score_abr;
    put_file as upload_fast5;
    put_file_base as upload_fsummary
} from './lib.nf'

/* Map reads against the local species database */
process map_species {
    // Only one fork so we _really_ don't run out of memory
    maxForks 1

    container 'quay.io/biocontainers/minimap2:2.26--he4a0461_1'

    input:
    path species_fasta
    tuple val(prefix), path(reads_fastq)

    output:
    tuple path("${reads_fastq}_species_raw.paf"), val(prefix), emit: paf

    script:
    """
    minimap2 --secondary=no -x map-ont "${species_fasta}" "${reads_fastq}" > "${reads_fastq}_species_raw.paf"
    """
}

/* Take the minimap2 PAF result from map_species and discard matches with low quality */
process discard_low_quality_species_scores {
    input:
    tuple path(species_raw), val(prefix)

    output:
    tuple path("${species_raw}_qc.csv"), val(prefix), emit: species_detected_qc

    shell:
    '''
    # discard matches with a mapping quality (PAF column 12) below 60 or equal to 255 (= quality not available),
    # or with an alignment block length (column 11) below 200;
    # output the first 12 columns (PAF format) and append a column with the mapping percentage (column 10 / column 11),
    # strip leading whitespace and turn the field separators into ',',
    # finally sort by species_accession (column 6) and store as CSV
    awk '$12 > 59 && $12 < 255 && $11 > 199 {print $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$10/$11}' !{species_raw}\
        | sort | sed "s/^\\s\\+//g" | tr ' ' ',' | sort --key=6 --field-separator=, > !{species_raw}_qc.csv
    '''
}
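/*
 * Illustrative example (hypothetical values): a tab-separated PAF line such as
 *   read42  1500  10  1400  +  ACC_0001  500000  100  1500  1350  1400  60
 * passes the filter in discard_low_quality_species_scores (mapping quality 60, block length 1400)
 * and is emitted as the CSV row
 *   read42,1500,10,1400,+,ACC_0001,500000,100,1500,1350,1400,60,0.964286
 * where the last column is the appended mapping percentage 1350/1400.
 */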
/* Map matched species sequence IDs to species names so they can be included in the result */
process lookup_species_names {
    input:
    tuple path(species_detected_qc), val(prefix)
    path species_fasta

    output:
    path 'species_names.csv', emit: species_names

    script:
    """
    # TODO: this could probably be done more efficiently, and only once, with the result stored until the database changes
    grep '^>' ${species_fasta} > species_meta.txt
    looked_up=''
    while read line; do
        id="\$(echo \${line} | awk -F, '{print \$6}')"
        if [[ ! \${looked_up[*]} =~ \${id} ]]; then
            looked_up=(\${looked_up[@]} \${id})
            grep \${id} species_meta.txt
        fi
    done < "${species_detected_qc}" | uniq | tr -d '>' | sed "s/,/ -/g" | sed "s/^\\([A-Z,0-9,_.]*\\)\\s/\\1,/g" | sort > species_names.csv
    """
}

/* Include species names in the table of detected species */
process join_species_names {
    publishDir "${params.publishdir}/${prefix}/"

    input:
    path species_names
    tuple path(species_detected_qc), val(prefix)

    output:
    tuple path("${species_detected_qc}_with_names.csv"), val(prefix), emit: species_detected

    script:
    """
    output_name="${species_detected_qc}_with_names.csv"
    echo 'species_accession,query_id,query_length,query_start,query_end,strand,target_length,target_start,target_end,residue_matches,alignment_block_length,mapping_quality,match_percentage,species_name' > "\${output_name}"
    join -1 6 -2 1 -t , "${species_detected_qc}" "${species_names}" >> "\${output_name}"
    """
}

/* Map reads against the CARD database to detect antibiotic resistances */
process detect_abr {
    container 'quay.io/biocontainers/minimap2:2.26--he4a0461_1'

    input:
    path card_db
    tuple val(prefix), path(reads_fastq)

    output:
    tuple path("${reads_fastq}_abr_raw.paf"), val(prefix), emit: abr_raw

    script:
    """
    minimap2 --secondary=no -x map-ont "${card_db}" "${reads_fastq}" > "${reads_fastq}_abr_raw.paf"
    """
}

/* Take the minimap2 PAF result from detect_abr and discard matches with low quality */
process discard_low_quality_abr_scores {
    publishDir "${params.publishdir}/${prefix}"

    input:
    tuple path(abr_raw_paf), val(prefix)

    output:
    tuple path("${abr_raw_paf}_qc.csv"), val(prefix), emit: abr_detected_raw

    script:
    """
    # discard matches with a mapping quality (PAF column 12) below 60 or equal to 255 (= quality not available),
    # or with an alignment block length (column 11) below 200;
    # output the first 12 columns (PAF format) and append a column with the mapping percentage (column 10 / column 11),
    # strip leading whitespace, turn the field separators into ',' and store as CSV with a header row
    echo 'query_id,query_length,query_start,query_end,strand,abr_card_id,target_length,target_start,target_end,residue_matches,alignment_block_length,mapping_quality,match_percentage' > "${abr_raw_paf}_qc.csv"
    awk '\$12 > 59 && \$12 < 255 && \$11 > 199 {print \$1,\$2,\$3,\$4,\$5,\$6,\$7,\$8,\$9,\$10,\$11,\$12,\$10/\$11}' "${abr_raw_paf}" |\
        sort | sed "s/^\\s\\+//g" | tr ' ' ',' >> "${abr_raw_paf}_qc.csv"
    """
}

// Create a unique project id from the name of the project directory and the flowcell name and run ID
// extracted from the filename of the given file
def get_project_id_from_fastx(somefile) {
    // the project dir is the parent of the directory containing the fastx files
    def x = somefile.collect()*.toString()
    def proj_dir = x[x.findIndexOf{ it in params.minknowsubdirs } - 2]
    // split the filename into parts, remove the second part (which is always 'fail' or 'pass')
    // and the last part (which contains the file number and extension),
    // then join the remaining parts together and prepend the name of the project dir to obtain the full project id
    def filename_parts = somefile.getBaseName().split(params.namedelimiter)
    return [proj_dir].plus( filename_parts.minus(filename_parts[1, -1]).join(params.idjoiner) ).join(params.idjoiner)
}
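/*
 * Illustrative example (hypothetical file name; assumes params.namedelimiter and params.idjoiner are both '_'):
 * for a read file named FAL12345_pass_abcdef01_0.fastq the basename splits into [FAL12345, pass, abcdef01, 0];
 * dropping 'pass' and the trailing file number and prepending a project directory called 'run01'
 * yields the project id 'run01_FAL12345_abcdef01'.
 */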
// Get the same project id from the filename of the final_summary file which MinKNOW creates when the run is finished.
// BUT: if the sample uses barcoding, the barcode id will be part of the id extracted from the fastx files
// (so they can be separated in the frontend), but not part of this id, because there is only one final_summary
// for the whole run. This has to be handled by the cloud workflow.
def get_project_id_from_summary(somesummary) {
    [somesummary.getParent()[-2]].plus(somesummary.getBaseName().split(params.namedelimiter)[2,3,4]).join(params.idjoiner)
}

def makeregexpattern(somedir, somepattern) {
    // as a function because nextflow otherwise complains about an already used variable
    return Pattern.compile( somedir + somepattern )
}

workflow {
    if (params.minknowdir.endsWith(File.separator)) {
        minknowdir = params.minknowdir
    } else {
        minknowdir = params.minknowdir + File.separator
    }

    while (! file(minknowdir).exists()) {
        println "MinKNOW output dir '${minknowdir}' does not exist yet. Checking again in 10 seconds. (Press Ctrl+C three times to cancel)"
        sleep(10000)
    }

    println "Starting workflow with the following parameters:"
    println ""
    println "Reads directory to watch: ${minknowdir + params.fastq_pattern_regex}"
    println "FAST5_pass directory to watch: ${minknowdir + params.fast5_pattern_glob}"
    println "Species database: $params.species"
    println "CARD database: $params.card"
    println "DB API server: $params.db_api_server"
    println "Cloud API server: $params.cloud_api_server"
    println ""

    // Fasta file with genomes of all species relevant to us
    species = channel.value(params.species)

    // Create a channel which emits every file newly created in the directory params.minknowdir/fastq_pass/.
    // The channel emits a tuple (via the .map operator) of a unique project id and the file.
    reads_glob = channel.watchPath(minknowdir + params.fastq_pattern_glob).map{ it -> [get_project_id_from_fastx(it), it] }
    checkreadsPattern = makeregexpattern(minknowdir, params.fastq_pattern_regex)
    reads = reads_glob.filter({ prj, read -> checkreadsPattern.matcher(read.toString()).matches() })

    if( !(params.upload_fast5 in params.false_values) ) {
        // Additionally watch the created FAST5 files to forward them to the cloud workflow, and wait for the final summary
        fast5files = channel.watchPath(minknowdir + params.fast5_pattern_glob).map{ it -> [it, get_project_id_from_fastx(it)] }
        fsummary = channel.watchPath(minknowdir + params.summary_pattern_glob).map{ it -> [it, get_project_id_from_summary(it)] }
        checkfsumPattern = makeregexpattern(minknowdir, params.summary_pattern_regex)
        upload_fsummary(fsummary.filter({ sum, prj -> checkfsumPattern.matcher(sum.toString()).matches() }), params.cloud_api_server)
        upload_fast5(fast5files, params.cloud_api_server, params.cloud_api_fast5_endpoint)
    }

    /* Species part */

    // Map reads to species and discard mappings with low quality
    map_species(species, reads) | discard_low_quality_species_scores
    // Match the detected species ids with their common names
    lookup_species_names(discard_low_quality_species_scores.out.species_detected_qc, species)
    join_species_names(lookup_species_names.out.species_names, discard_low_quality_species_scores.out.species_detected_qc)
    score_species(join_species_names.out.species_detected)
    post_species_csv_to_json_api(score_species.out.scored_paflike, params.db_api_server, params.db_api_species_endpoint)

    /* ABR part */

    // CARD database of antibiotic resistances (ABR)
    card = channel.value(params.card)
    // Detect antibiotic resistances
    detect_abr(card, reads) | discard_low_quality_abr_scores
    score_abr(discard_low_quality_abr_scores.out.abr_detected_raw)
    post_abr_csv_to_json_api(score_abr.out.scored_paflike, params.db_api_server, params.db_api_abr_endpoint)
}

// vim: shiftwidth=4 smarttab expandtab autoindent
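// Example launch (illustrative only; all paths, URLs and values below are placeholders):
//   nextflow run <this_script>.nf --minknowdir /data/minknow/run01 --species species_db.fasta --card card.fasta \
//       --db_api_server http://localhost:8000 --cloud_api_server https://cloud.example.org \
//       --publishdir results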