    nextflow.enable.dsl=2
    
    include {post_csv_to_json_api as post_species_csv_to_json_api; post_csv_to_json_api as post_abr_csv_to_json_api;
    
             score as score_species; score as score_abr; put_file as upload_fast5; put_file_base as upload_fsummary} from './lib.nf'
    
    /*
        Map reads against local species database
    */
    process map_species {
        
        // Only one fork so we _really_ don't run out of memory
        maxForks 1
    
        container 'quay.io/biocontainers/minimap2:2.26--he4a0461_1'
    
        input:
            path species_fasta                    // FASTA with genomes of all species of interest
            tuple val(prefix), path(reads_fastq)  // unique project id + newly written FASTQ to map
    
        output:
            // Raw PAF alignments, named after the input FASTQ; prefix is passed through
            tuple path("${reads_fastq}_species_raw.paf"), val(prefix), emit: paf
    
        script:
            // -x map-ont: minimap2 preset for noisy long reads; --secondary=no keeps primary hits only
            """
            minimap2 --secondary=no -x map-ont "${species_fasta}" "${reads_fastq}" > "${reads_fastq}_species_raw.paf"
            """
    }
    
    /*
        Take minimap PAF result from map_species and discard matches with low quality
    */
    process discard_low_quality_species_scores {
    
        input:
            tuple path(species_raw), val(prefix)  // raw PAF from map_species + project id
    
        output:
            tuple path ("${species_raw}_qc.csv"), val(prefix), emit: species_detected_qc
    
        shell:
            '''
            # Keep matches with mapping quality (PAF col 12) >= 60 (and below 255,
            # which the PAF format uses for "quality missing") and with an
            # alignment block length (col 11) >= 200.
            # Output the 12 standard PAF columns plus a 13th column with the
            # match percentage (residue_matches col 10 / alignment_block_length col 11),
            # convert the single-space field separators to ',', and sort by
            # species_accession (col 6) so `join` can consume the file later.
            # NOTE(fix): an extra whole-line `sort` before the field-6 sort was
            # removed — sort's last-resort whole-line comparison already makes the
            # final ordering independent of input order, so it had no effect.
            awk '$12 > 59 && $12 < 255 && $11 > 199 {print $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$10/$11}' !{species_raw}\
             | sed "s/^\\s\\+//g" | tr ' ' ',' | sort --key=6 --field-separator=, > !{species_raw}_qc.csv
            '''
    }
    
    /*
        Map matched species sequence IDs to species names so they can be included in the result
    */
    process lookup_species_names {
    
        input:
            tuple path(species_detected_qc), val(prefix)  // QC-filtered hits, sorted on col 6 (species_accession)
            path species_fasta                            // reference FASTA whose '>' headers carry accession + name
    
        output:
            path 'species_names.csv', emit: species_names
    
        script:
            """
            # FASTA header lines map accession -> species name.
            grep '^>' "${species_fasta}" > species_meta.txt

            # Unique accessions present in the QC table (column 6).
            cut -d, -f6 "${species_detected_qc}" | sort -u > wanted_accessions.txt

            # Look all accessions up in one pass instead of one grep per id
            # (replaces the old O(ids * headers) while-read loop the TODO flagged).
            # -F matches the ids literally: accessions contain '.', which the old
            # unanchored regex grep treated as a wildcard. This also emits each
            # header line at most once, so duplicates can no longer slip past.
            # grep exits 1 when the id list is empty; tolerate that so an empty
            # (but valid) species_names.csv is still produced.
            { grep -F -f wanted_accessions.txt species_meta.txt || true; } \
             | tr -d '>' | sed "s/,/ -/g" | sed "s/^\\([A-Z,0-9,_.]*\\)\\s/\\1,/g" | sort > species_names.csv
            """
    }
    
    /*
        Include species names in the table of detected species
    */
    process join_species_names {
    
        // Directives belong before the input/output blocks in DSL2 (was declared
        // after `output:`). ${prefix} is resolved lazily from the input tuple.
        publishDir "${params.publishdir}/${prefix}/"
    
        input:
            path species_names                            // accession -> name map, sorted on col 1
            tuple path(species_detected_qc), val(prefix)  // QC-filtered hits, sorted on col 6
    
        output:
            tuple path("${species_detected_qc}_with_names.csv"), val(prefix), emit: species_detected
    
        script:
            // Write the header row, then `join` on species_accession (field 6 of
            // the QC table, field 1 of the name map); both inputs arrive
            // pre-sorted on the join field, which `join` requires.
            """
            output_name="${species_detected_qc}_with_names.csv"
            echo 'species_accession,query_id,query_length,query_start,query_end,strand,target_length,target_start,'\
    'target_end,residue_matches,alignment_block_length,mapping_quality,match_percentage,species_name' > "\${output_name}"
            join -1 6 -2 1 -t , "${species_detected_qc}" "${species_names}" >> "\${output_name}"
            """
    }
    
    /*
        Map reads against the CARD database for antibiotic resistances
    */
    process detect_abr {
    
        container 'quay.io/biocontainers/minimap2:2.26--he4a0461_1'
        
        input:
            path card_db                          // CARD reference of antibiotic-resistance genes
            tuple val(prefix), path(reads_fastq)  // unique project id + newly written FASTQ to map
    
        output:
            // Raw PAF alignments against CARD; prefix is passed through
            tuple path("${reads_fastq}_abr_raw.paf"), val(prefix), emit: abr_raw
    
        script:
            // Same minimap2 invocation as map_species (map-ont preset, primary hits only).
            // NOTE(review): unlike map_species this process sets no `maxForks` limit —
            // presumably the CARD index is small enough to run in parallel; confirm.
            """
            minimap2 --secondary=no -x map-ont  "${card_db}" "${reads_fastq}" > "${reads_fastq}_abr_raw.paf"
            """
    }
            
    /*
        Take minimap2 PAF result from detect_abr and discard matches with low quality
    */
    process discard_low_quality_abr_scores {
    
        // Directive moved before the input/output blocks (DSL2 convention; it was
        // declared after `output:`).
        publishDir "${params.publishdir}/${prefix}"
    
        input:
            tuple path(abr_raw_paf), val(prefix)  // raw PAF from detect_abr + project id
    
        output:
            tuple path("${abr_raw_paf}_qc.csv"), val(prefix), emit: abr_detected_raw
    
        script:
            """
            # Keep matches with mapping quality (PAF col 12) >= 60 (and below 255,
            # the PAF "quality missing" sentinel) and with an alignment block
            # length (col 11) >= 200. Output the 12 standard PAF columns plus
            # match_percentage (col 10 / col 11), comma-separated, under a header row.
            # (The previous comment, copied from the species variant, claimed a
            # sort by species_accession: column 6 here is abr_card_id, and no
            # field sort is performed — only a plain line sort.)
            echo 'query_id,query_length,query_start,query_end,strand,abr_card_id,target_length,target_start,target_end,'\
    'residue_matches,alignment_block_length,mapping_quality,match_percentage' > "${abr_raw_paf}_qc.csv"
            awk '\$12 > 59 && \$12 < 255 && \$11 > 199 {print \$1,\$2,\$3,\$4,\$5,\$6,\$7,\$8,\$9,\$10,\$11,\$12,\$10/\$11}' "${abr_raw_paf}"|\
             sort | sed "s/^\\s\\+//g" | tr ' ' ',' >> "${abr_raw_paf}_qc.csv"
            """
    }
    
    
    // Create a unique project id from the name of the project directory, and the flowcell name and runIDs extracted from the filename of the given file
    def get_project_id_from_fastx(project_dir_str, somefile) {
        // Last path component of the project directory (iterating a Path yields its name elements)
        def projectName = file(project_dir_str).collect()*.toString()[-1]
        // Fields 0, 2 and 3 of the filename, split on the configured delimiter
        def nameFields = somefile.getName().split(params.namedelimiter)[0, 2, 3]
        ([projectName] + nameFields).join(params.idjoiner)
    }
    
    def get_project_id_from_summary(project_dir_str, somesummary) {
        // Last path component of the project directory (iterating a Path yields its name elements)
        def projectName = file(project_dir_str).collect()*.toString()[-1]
        // Summary filenames carry the id parts one position later: fields 2, 3 and 4
        def nameFields = somesummary.getName().split(params.namedelimiter)[2, 3, 4]
        ([projectName] + nameFields).join(params.idjoiner)
    }
    
    
    workflow {
    
        // Normalise the MinKNOW directory so it always ends with a path separator
        minknowdir = params.minknowdir.endsWith(File.separator) ? params.minknowdir : params.minknowdir + File.separator
    
        println "Starting workflow with following parameters:"
        println ""
        println "Reads directory to watch: ${minknowdir + params.fastq_pattern}"
        println "FAST5_pass directory to watch: ${minknowdir + params.fast5_pattern}"
        // NOTE(fix): label read "Spezies database" (German spelling)
        println "Species database: $params.species"
        println "CARD database: $params.card"
        println "DB API server: $params.db_api_server"
        println "Cloud API server: $params.cloud_api_server"
        println ""
    
        // Fasta file with genomes of all species relevant to us
        species = channel.value(params.species)
    
        // Watch for files newly created under the FASTQ pattern and pair each
        // with a unique project id. Mind the tuple order: [id, file] here,
        // [file, id] for the FAST5/summary channels below (their consumers differ).
        reads = channel.watchPath(minknowdir + params.fastq_pattern).map { it -> [get_project_id_from_fastx(minknowdir, it), it] }
    
        // Additionally watch the created FAST5 files and the final summary to
        // forward them to the cloud workflow.
        fast5files = channel.watchPath(minknowdir + params.fast5_pattern).map { it -> [it, get_project_id_from_fastx(minknowdir, it)] }
        fsummary = channel.watchPath(minknowdir + params.summary_pattern).map { it -> [it, get_project_id_from_summary(minknowdir, it)] }
    
        upload_fast5(fast5files, params.cloud_api_server, params.cloud_api_fast5_endpoint)
        upload_fsummary(fsummary, params.cloud_api_server)
    
        /*
            Species part
        */
    
        // Map reads to species and discard mappings with low quality
        map_species(species, reads) | discard_low_quality_species_scores
    
        // Match the detected species ids with their common names
        lookup_species_names(discard_low_quality_species_scores.out.species_detected_qc, species)
        join_species_names(lookup_species_names.out.species_names, discard_low_quality_species_scores.out.species_detected_qc)
    
        score_species(join_species_names.out.species_detected)
        post_species_csv_to_json_api(score_species.out.scored_paflike, params.db_api_server, params.db_api_species_endpoint)
    
        /*
            ABR part
        */
    
        // CARD database of ABR
        card = channel.value(params.card)
    
        // Detect antibiotic resistances
        detect_abr(card, reads) | discard_low_quality_abr_scores
        score_abr(discard_low_quality_abr_scores.out.abr_detected_raw)
        post_abr_csv_to_json_api(score_abr.out.scored_paflike, params.db_api_server, params.db_api_abr_endpoint)
    }
    
    // vim: shiftwidth=4 smarttab expandtab autoindent