    nextflow.enable.dsl=2
    
    include {post_csv_to_json_api as post_species_csv_to_json_api; post_csv_to_json_api as post_abr_csv_to_json_api;
             score as score_species; score as score_abr; put_file as upload_fast5pass; put_file as upload_fast5fail; put_file_base as upload_fsummary} from './lib.nf'
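    // Helper processes imported from lib.nf, each aliased once per use. Their interfaces are
    // inferred from the call sites in the workflow below: score(csv) emits 'scored_paflike',
    // post_csv_to_json_api(csv, server, endpoint) posts a result table to the DB API, and
    // put_file(file, server, endpoint) / put_file_base(file, server) upload files to the cloud API.
    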
    /*
        Map reads against local species database
    */
    process map_species {
        
        // Only one fork so we _really_ don't run out of memory
        maxForks 1
    
        container 'quay.io/biocontainers/minimap2:2.26--he4a0461_1'
    
        input:
            path species_fasta
            tuple val(prefix), path(reads_fastq)
    
        output:
            tuple path("${reads_fastq}_species_raw.paf"), val(prefix), emit: paf
    
        script:
            """
            minimap2 --secondary=no -x map-ont "${species_fasta}" "${reads_fastq}" > "${reads_fastq}_species_raw.paf"
            """
    }
    
    /*
        Take the minimap2 PAF result from map_species and discard matches with low quality
    */
    process discard_low_quality_species_scores {
    
        input:
            tuple path(species_raw), val(prefix)
    
        output:
            tuple path ("${species_raw}_qc.csv"), val(prefix), emit: species_detected_qc
    
        shell:
            '''
            # keep only matches with a mapping quality (PAF column 12) of at least 60 (255, i.e. unavailable, is excluded)
            # and an alignment block length (column 11) of at least 200,
            # print the 12 standard PAF columns plus a match fraction (column 10 / column 11),
            # strip leading whitespace and convert the field separators to ','
            # and finally sort by species accession (column 6) before storing as CSV
            awk '$12 > 59 && $12 < 255 && $11 > 199 {print $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$10/$11}' !{species_raw}\
             | sort | sed "s/^\\s\\+//g" | tr ' ' ',' | sort --key=6 --field-separator=, > !{species_raw}_qc.csv
            '''
    }
    
    /*
        Map matched species sequence IDs to species names so they can be included in the result
    */
    process lookup_species_names {
    
        input:
            tuple path(species_detected_qc), val(prefix)
            path species_fasta
    
        output:
            path 'species_names.csv', emit: species_names
    
        script:
            """
            # TODO: this could be done more efficiently, and only once, with the result cached until the species database changes
            grep '^>' ${species_fasta} > species_meta.txt
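            # Collect each species accession (CSV column 6 of the QC'd matches) only once and
            # grep its FASTA header from species_meta.txt. The trailing pipeline then drops the
            # leading '>', rewrites commas inside species names as ' -', turns the first whitespace
            # after the accession into ',', and sorts so the result can be joined on the accession.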
            looked_up=()
            while read line; do
                id="\$(echo \${line} | awk -F, '{print \$6}')"
                if [[ ! \${looked_up[*]} =~ \${id} ]];then
                    looked_up=(\${looked_up[@]} \${id})
                    grep \${id} species_meta.txt
                fi;
            done < "${species_detected_qc}" | uniq | tr -d '>' | sed "s/,/ -/g" | sed "s/^\\([A-Z,0-9,_.]*\\)\\s/\\1,/g" | sort > species_names.csv
            """
    }
    
    /*
        Include species names in the table of detected species
    */
    process join_species_names {
    
        input:
            path species_names
            tuple path(species_detected_qc), val(prefix)
    
        output:
            tuple path("${species_detected_qc}_with_names.csv"), val(prefix), emit: species_detected
    
        publishDir "${params.publishdir}/${prefix}/"
    
        script:
            """
            output_name="${species_detected_qc}_with_names.csv"
            echo 'species_accession,query_id,query_length,query_start,query_end,strand,target_length,target_start,'\
    'target_end,residue_matches,alignment_block_length,mapping_quality,match_percentage,species_name' > "\${output_name}"
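            # join field 6 (species_accession) of the QC'd PAF CSV with field 1 of species_names.csv;
            # both inputs are comma-separated and already sorted on that key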
            join -1 6 -2 1 -t , "${species_detected_qc}" "${species_names}" >> "\${output_name}"
            """
    }
    
    /*
        Map reads against the CARD database to detect antibiotic resistances
    */
    process detect_abr {
    
        container 'quay.io/biocontainers/minimap2:2.26--he4a0461_1'
        
        input:
            path card_db
            tuple val(prefix), path(reads_fastq)
    
        output:
            tuple path("${reads_fastq}_abr_raw.paf"), val(prefix), emit: abr_raw
    
        script:
            """
            minimap2 --secondary=no -x map-ont "${card_db}" "${reads_fastq}" > "${reads_fastq}_abr_raw.paf"
            """
    }
            
    /*
        Take minimap2 PAF result from detect_abr and discard matches with low quality
    */
    process discard_low_quality_abr_scores {
    
        input:
            tuple path(abr_raw_paf), val(prefix)
    
        output:
            tuple path("${abr_raw_paf}_qc.csv"), val(prefix), emit: abr_detected_raw
    
        publishDir "${params.publishdir}/${prefix}"
    
        script:
            """
            # keep only matches with a mapping quality (PAF column 12) of at least 60 (255, i.e. unavailable, is excluded)
            # and an alignment block length (column 11) of at least 200,
            # print the 12 standard PAF columns plus a match fraction (column 10 / column 11),
            # strip leading whitespace, convert the field separators to ','
            # and append the rows below the header line written first
            echo 'query_id,query_length,query_start,query_end,strand,abr_card_id,target_length,target_start,target_end,'\
    'residue_matches,alignment_block_length,mapping_quality,match_percentage' > "${abr_raw_paf}_qc.csv"
            awk '\$12 > 59 && \$12 < 255 && \$11 > 199 {print \$1,\$2,\$3,\$4,\$5,\$6,\$7,\$8,\$9,\$10,\$11,\$12,\$10/\$11}' "${abr_raw_paf}"|\
             sort | sed "s/^\\s\\+//g" | tr ' ' ',' >> "${abr_raw_paf}_qc.csv"
            """
    }
    
    workflow {
    
        println "Starting workflow with following parameters:"
        println
        println "Reads directory to watch: ${params.minknowdir}fastq_pass/"
        println "FAST5_pass directory to watch: ${params.minknowdir}fast5_pass/"
        println "FAST5_fail directory to watch: ${params.minknowdir}fast5_fail/"
        println "Spezies database: $params.species"
        println "CARD database: $params.card"
        println "DB API server: $params.db_api_server"
        println "Cloud API server: $params.cloud_api_server"
        println()
    
    
        // Fasta file with genomes of all species relevant to us
        species = channel.value(params.species)
    
        // Create a channel that emits every file newly created in the directory params.minknowdir/fastq_pass/.
        // The .map operator turns each file into a tuple of its prefix (used as the project name) and the file itself
        reads = channel.watchPath(params.minknowdir+'fastq_pass/').map{ it -> [it.getName().split(params.namedelimiter)[0], it]}
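        // Example with a hypothetical filename: if params.namedelimiter is '_', a new file
        // 'barcode01_0.fastq' is emitted as the tuple ['barcode01', <path to the file>]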
    
        // Also watch the FAST5 output directories so the files can be forwarded to the cloud workflow, and watch for the final summary file
        fast5pass = channel.watchPath(params.minknowdir+'fast5_pass/').map{ it -> [it, it.getName().split(params.namedelimiter)[0]]}
        fast5fail = channel.watchPath(params.minknowdir+'fast5_fail/').map{ it -> [it, it.getName().split(params.namedelimiter)[0]]}
        fsummary = channel.watchPath(params.minknowdir+'final_summary*').map{ it -> [it, it.getName().split('_')[2]]}
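        // Example with a hypothetical filename: 'final_summary_ABC123_xyz.txt'.split('_')[2] == 'ABC123',
        // i.e. the third underscore-separated token becomes the prefix for the summary upload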
     
        upload_fast5pass(fast5pass,params.cloud_api_server, params.cloud_api_fast5_endpoint)
        upload_fast5fail(fast5fail,params.cloud_api_server, params.cloud_api_fast5_endpoint)
        upload_fsummary(fsummary,params.cloud_api_server)
    
        /*
            Species part
        */
        
        // Map reads to species and discard mappings with low quality
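        // The pipe operator forwards the output channel of map_species directly into discard_low_quality_species_scores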
        map_species(species, reads) | discard_low_quality_species_scores
                
        // Match the detected species ids with their common names
        lookup_species_names(discard_low_quality_species_scores.out.species_detected_qc, species)
        join_species_names(lookup_species_names.out.species_names, discard_low_quality_species_scores.out.species_detected_qc)
    
        score_species(join_species_names.out.species_detected)
    
        post_species_csv_to_json_api(score_species.out.scored_paflike, params.db_api_server, params.db_api_species_endpoint)
    
        /*
            ABR part
        */
    
        // CARD database of antibiotic resistance (ABR) genes
        card = channel.value(params.card)
    
        // Detect antibiotic resistances
        detect_abr(card, reads) | discard_low_quality_abr_scores
        score_abr(discard_low_quality_abr_scores.out.abr_detected_raw)
    
        post_abr_csv_to_json_api(score_abr.out.scored_paflike, params.db_api_server, params.db_api_abr_endpoint)
    
    }
    
    // vim: shiftwidth=4 smarttab expandtab autoindent