nextflow.enable.dsl=2

// Pattern is used by makeregexpattern() below; java.util.regex is not among Groovy's default imports
import java.util.regex.Pattern

include {post_csv_to_json_api as post_species_csv_to_json_api; post_csv_to_json_api as post_abr_csv_to_json_api;
         score as score_species; score as score_abr; put_file as upload_fast5; put_file_base as upload_fsummary} from './lib.nf'
/*
    Map reads against local species database
*/
process map_species {
    
    // Only one fork so we _really_ don't run out of memory
    maxForks 1

    container 'quay.io/biocontainers/minimap2:2.26--he4a0461_1'

    input:
        path species_fasta
        tuple val(prefix), path(reads_fastq)

    output:
        tuple path("${reads_fastq}_species_raw.paf"), val(prefix), emit: paf

    script:
        """
        minimap2 --secondary=no -x map-ont "${species_fasta}" "${reads_fastq}" > "${reads_fastq}_species_raw.paf"
        """
}

/*
    Take minimap PAF result from map_species and discard matches with low quality
*/
process discard_low_quality_species_scores {

    input:
        tuple path(species_raw), val(prefix)

    output:
        tuple path("${species_raw}_qc.csv"), val(prefix), emit: species_detected_qc

    shell:
        '''
        # discard matches with a mapping quality (PAF column 12) < 60 or an alignment block length (column 11) < 200;
        # quality 255 means "missing" in PAF, so those are discarded as well.
        # Output the first 12 columns (PAF format) plus an extra column with the match percentage (column 10 / column 11),
        # squeeze the whitespace into single commas and finally sort by species_accession (column 6), storing the result as CSV.
        awk '$12 > 59 && $12 < 255 && $11 > 199 {print $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$10/$11}' !{species_raw}\
         | sort | sed "s/^\\s\\+//g" | tr ' ' ',' | sort --key=6 --field-separator=, > !{species_raw}_qc.csv
        '''
}
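
/*
    Illustrative example of the QC step above (all values are made up):

    PAF in :  read_001  1200  10  1150  +  NC_000913.3  4641652  100500  101640  1100  1140  60  ...
    CSV out:  read_001,1200,10,1150,+,NC_000913.3,4641652,100500,101640,1100,1140,60,0.964912

    The last column is residue_matches / alignment_block_length (1100 / 1140 ~ 0.9649).
*/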

/*
    Map matched species sequence IDs to species names so they can be included in the result
*/
process lookup_species_names {

    input:
        tuple path(species_detected_qc), val(prefix)
        path species_fasta

    output:
        path 'species_names.csv', emit: species_names

    script:
        """
        # TODO: this lookup could probably be done more efficiently, and also only once, with the result stored until the database changes.
        grep '^>' ${species_fasta} > species_meta.txt
        looked_up=()
        while read line; do
            id="\$(echo \${line} | awk -F, '{print \$6}')"
            if [[ ! \${looked_up[*]} =~ \${id} ]];then
                looked_up=(\${looked_up[@]} \${id})
                grep \${id} species_meta.txt
            fi;
        done < "${species_detected_qc}" | uniq | tr -d '>' | sed "s/,/ -/g" | sed "s/^\\([A-Z,0-9,_.]*\\)\\s/\\1,/g" | sort > species_names.csv
        """
}
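
/*
    Illustrative example (hypothetical FASTA header): a database entry
        >NC_000913.3 Escherichia coli str. K-12, complete genome
    whose accession appears in the QC table ends up in species_names.csv as
        NC_000913.3,Escherichia coli str. K-12 - complete genome
    (the '>' is stripped and inner commas become ' -' so the name stays a single CSV field).
*/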

/*
    Include species names in the table of detected species
*/
process join_species_names {

    input:
        path species_names
        tuple path(species_detected_qc), val(prefix)

    output:
        tuple path("${species_detected_qc}_with_names.csv"), val(prefix), emit: species_detected

    publishDir "${params.publishdir}/${prefix}/"

    script:
        """
        output_name="${species_detected_qc}_with_names.csv"
        echo 'species_accession,query_id,query_length,query_start,query_end,strand,target_length,target_start,'\
'target_end,residue_matches,alignment_block_length,mapping_quality,match_percentage,species_name' > "\${output_name}"
        join -1 6 -2 1 -t , "${species_detected_qc}" "${species_names}" >> "\${output_name}"
        """
}
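
/*
    Illustrative example of the join above (same made-up values as before): with the QC row
        read_001,1200,10,1150,+,NC_000913.3,4641652,100500,101640,1100,1140,60,0.964912
    sorted on column 6, and the species_names.csv row
        NC_000913.3,Escherichia coli str. K-12 - complete genome
    `join -1 6 -2 1 -t ,` emits the join field first, matching the header written above:
        NC_000913.3,read_001,1200,10,1150,+,4641652,100500,101640,1100,1140,60,0.964912,Escherichia coli str. K-12 - complete genome
*/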

/*
    Map reads against the CARD database for antibiotic resistances
*/
process detect_abr {

    container 'quay.io/biocontainers/minimap2:2.26--he4a0461_1'
    
    input:
        path card_db
        tuple val(prefix), path(reads_fastq)

    output:
        tuple path("${reads_fastq}_abr_raw.paf"), val(prefix), emit: abr_raw

    script:
        """
        minimap2 --secondary=no -x map-ont "${card_db}" "${reads_fastq}" > "${reads_fastq}_abr_raw.paf"
        """
}
        
/*
    Take minimap2 PAF result from detect_abr and discard matches with low quality
*/
process discard_low_quality_abr_scores {

    input:
        tuple path(abr_raw_paf), val(prefix)

    output:
        tuple path("${abr_raw_paf}_qc.csv"), val(prefix), emit: abr_detected_raw

    publishDir "${params.publishdir}/${prefix}"

    script:
        """
        # discard matches with a mapping quality (PAF column 12) < 60 or an alignment block length (column 11) < 200;
        # quality 255 means "missing" in PAF, so those are discarded as well.
        # Output the first 12 columns (PAF format) plus an extra column with the match percentage (column 10 / column 11),
        # convert the whitespace into commas and prepend a CSV header.
        echo 'query_id,query_length,query_start,query_end,strand,abr_card_id,target_length,target_start,target_end,'\
'residue_matches,alignment_block_length,mapping_quality,match_percentage' > "${abr_raw_paf}_qc.csv"
        awk '\$12 > 59 && \$12 < 255 && \$11 > 199 {print \$1,\$2,\$3,\$4,\$5,\$6,\$7,\$8,\$9,\$10,\$11,\$12,\$10/\$11}' "${abr_raw_paf}"|\
         sort | sed "s/^\\s\\+//g" | tr ' ' ',' >> "${abr_raw_paf}_qc.csv"
        """
}

// Create a unique project id from the name of the project directory plus the flowcell name and run ids extracted from the filename of the given file
def get_project_id_from_fastx(somefile) {

    // the project dir is the parent of the directory containing the fastx files
    def x = somefile.collect()*.toString()
    def proj_dir = x[x.findIndexOf{ it in params.minknowsubdirs } - 2]
    
    // split the filename into parts, remove the second part (which is always 'fail' or 'pass') and the last part (which contains the file number and extension)
    // afterwards join the remaining parts together and then with the name of the project dir to obtain the full project id
    def filename_parts = somefile.getBaseName().split(params.namedelimiter)
    return [proj_dir].plus( filename_parts.minus(filename_parts[1, -1]).join(params.idjoiner) ).join(params.idjoiner)
}
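
/*
    Illustrative example, assuming params.namedelimiter = '_' and params.idjoiner = '_'
    (the real values come from the pipeline configuration):

    reads file : .../project_a/.../fastq_pass/FAL12345_pass_abc123de_0.fastq
    name parts : [FAL12345, pass, abc123de, 0]  ->  drop parts[1] ('pass') and parts[-1] ('0')
    project id : project_a_FAL12345_abc123de    ('project_a' resolved via params.minknowsubdirs)
*/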

// Get the same project id from the filename of the final_summary file which MinKNOW creates once the run is finished.
// BUT: if the sample uses barcoding, the barcode id will be part of the id extracted from the fastx files (so they can be separated in the frontend),
// but not part of this id, because there is only one final_summary for the whole run. This has to be handled by the cloud workflow.
def get_project_id_from_summary(somesummary) {
    return [somesummary.getParent()[-2]].plus(somesummary.getBaseName().split(params.namedelimiter)[2,3,4]).join(params.idjoiner)
}
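
/*
    Illustrative example, same hypothetical settings and directory layout as above:

    summary file : .../project_a/run_dir/final_summary_FAL12345_abc123de_0d2cffe1.txt
    name parts   : [final, summary, FAL12345, abc123de, 0d2cffe1]  ->  keep parts[2,3,4]
    project id   : project_a_FAL12345_abc123de_0d2cffe1
*/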

def makeregexpattern(somedir, somepattern) {
    // defined as a function because Nextflow otherwise complains about an already assigned variable
    return Pattern.compile( somedir + somepattern )
}
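
/*
    Hypothetical example of the expected parameters (normally supplied via nextflow.config
    or -params-file; the names are taken from their usage below, the values are illustrative only):

    params.minknowdir          = '/data/minknow'
    params.minknowsubdirs      = ['fastq_pass', 'fast5_pass']
    params.namedelimiter       = '_'
    params.idjoiner            = '_'
    params.species             = '/refs/species.fasta'
    params.card                = '/refs/card.fasta'
    params.db_api_server       = 'http://localhost:8000'
    params.cloud_api_server    = 'https://example.org/api'
    params.publishdir          = 'results'
*/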

workflow {

    if (params.minknowdir.endsWith(File.separator)) {
        minknowdir = params.minknowdir
    } else {
        minknowdir = params.minknowdir + File.separator
    }

    // Block until MinKNOW has created its output directory
    while (! file(minknowdir).exists()) {
        println "MinKNOW output dir '${minknowdir}' does not exist yet. Checking again in 10 seconds. (Press Ctrl+C three times to cancel)"
        sleep(10000)
    }

    println "Starting workflow with following parameters:"
    println ""
    println "Reads directory to watch: ${minknowdir + params.fastq_pattern_regex}"
    println "FAST5_pass directory to watch: ${minknowdir + params.fast5_pattern_glob}"
    println "Spezies database: $params.species"
    println "CARD database: $params.card"
    println "DB API server: $params.db_api_server"
    println "Cloud API server: $params.cloud_api_server"
    println ""


    // Fasta file with genomes of all species relevant to us
    species = channel.value(params.species)

    // Creates a channel which emits every file newly created in the directory params.minknowdir/fastq_pass/
    // The channel emits (via the .map operator) a tuple of a unique project id and the file
    reads_glob = channel.watchPath(minknowdir+params.fastq_pattern_glob).map{ it -> [ get_project_id_from_fastx(it) , it]}

    checkreadsPattern = makeregexpattern(minknowdir, params.fastq_pattern_regex)
    reads = reads_glob.filter({ prj, read -> checkreadsPattern.matcher(read.toString()).matches()})
    if( !(params.upload_fast5 in params.false_values) ) {
        // Additionally watch the created FAST5 files to forward them to the cloud workflow and wait for the final summary
        fast5files = channel.watchPath(minknowdir+params.fast5_pattern_glob).map{ it -> [it, get_project_id_from_fastx(it)]}
        fsummary = channel.watchPath(minknowdir+params.summary_pattern_glob).map{ it -> [it, get_project_id_from_summary(it)]}
        checkfsumPattern = makeregexpattern(minknowdir, params.summary_pattern_regex)
        
        upload_fsummary(fsummary.filter({ sum, prj -> checkfsumPattern.matcher(sum.toString()).matches()}), params.cloud_api_server)
        upload_fast5(fast5files,params.cloud_api_server, params.cloud_api_fast5_endpoint)
    }

    /*
        Species part
    */
    
    // Map reads to species and discard mappings with low quality
    map_species(species, reads) | discard_low_quality_species_scores
            
    // Match the detected species ids with their common names
    lookup_species_names(discard_low_quality_species_scores.out.species_detected_qc, species)
    join_species_names(lookup_species_names.out.species_names, discard_low_quality_species_scores.out.species_detected_qc)

    score_species(join_species_names.out.species_detected)

    post_species_csv_to_json_api(score_species.out.scored_paflike, params.db_api_server, params.db_api_species_endpoint)

    /*
        ABR part
    */

    // CARD database of ABR
    card = channel.value(params.card)

    // Detect antibiotic resistances
    detect_abr(card, reads) | discard_low_quality_abr_scores
    score_abr(discard_low_quality_abr_scores.out.abr_detected_raw)

    post_abr_csv_to_json_api(score_abr.out.scored_paflike, params.db_api_server, params.db_api_abr_endpoint)

}

// vim: shiftwidth=4 smarttab expandtab autoindent