void process()

in clns-acuity-transform/src/main/java/com/acuity/sdtm/transform/processor/SDTM_1_1/SDTM_1_1_PrimaryTumourLocationProcessor.groovy [72:334]


    void process(MongoDatabase mongo, Writer writer) {
        try {
            writer.open(study, "primary_tumour_location", HEADER)

            MongoCollection<Document> main = null
            MongoCollection<Document> supplementary = null
            List<? extends Bson> pipeline = []
            def nullSupplier = { r, s -> null }
            def rowSupplier = { it }
            def primaryTumorLocationSupplier = nullSupplier
            def originalDiagnosisDateSupplier = { r, s -> getOriginalDiagnosisDate(r, s) }
            def specifyPrimaryTumorLocationSupplier = nullSupplier
            if (studyIn(DR7, DR8)) {
                MongoCollection<Document> dm = mongo.getCollection("${sessionId}_DM")

                dm.find().each { row ->
                    row.primaryTumorLocation = 'Lung'
                    writer.write(map(row))
                }
            } else if (studyIn(DR3)) {
                main = mongo.getCollection("${sessionId}_FA")
                supplementary = mongo.getCollection("${sessionId}_SUPPFA")
                pipeline = [match(eq('FATESTCD', 'PTUMLOC'))]
                primaryTumorLocationSupplier = { Document r, MongoCollection<Document> s -> r['FALOC'] }
                originalDiagnosisDateSupplier = { Document r, MongoCollection s -> getOriginalDiagnosisDate(r, s) }
            } else if (studyIn(DR5)) {
                MongoCollection<Document> dm = mongo.getCollection("${sessionId}_DM")

                dm.find().each { row ->
                    row.primaryTumorLocation = 'Ovary'
                    writer.write(map(row))
                }
            } else if (studyIn(DR14)) {
                MongoCollection<Document> fa = mongo.getCollection("${sessionId}_FA")

                fa.find(eq('FATESTCD', 'DIAGP')).each { row ->
                    row.specifyPrimaryTumorLocation = null
                    row.primaryTumorLocation = row.FASTRESC
                    writer.write(map(row))
                }
            } else if (studyIn(DR4)) {
                MongoCollection<Document> fa = mongo.getCollection("${sessionId}_FA")

                fa.find(eq('FATESTCD', 'PHPRDIAG')).each { row ->
                    row.specifyPrimaryTumorLocation = null
                    row.primaryTumorLocation = row.FASTRESC
                    writer.write(map(row))
                }
            } else if (studyIn(DR9)) {
                MongoCollection<Document> mh = mongo.getCollection("${sessionId}_MH")
                MongoCollection<Document> suppmh = mongo.getCollection("${sessionId}_SUPPMH")

                Bson filter = and(
                        eq('MHCAT', 'CANCER DIAGNOSIS'),
                        not(
                                Filters.in('MHTERM', 'METASTATIC DIAGNOSIS', 'CYTOLOGICAL OR HISTO-PATHOLOGICAL DIAGNOSIS')
                        )
                )

                List<Bson> aggregationPipeline = [
                        match(filter),
                        sort(Sorts.ascending('MHDTC')),
                        group(new BasicDBObject('USUBJID', '$USUBJID'),
                                first('first', '$$ROOT')
                        )
                ]

                mh.aggregate(aggregationPipeline).each { grp ->
                    Document row = grp.first

                    row.primaryTumorLocation = row.MHTERM
                    row.originalDiagnosisDate = row.MHDTC
                    row.specifyPrimaryTumorLocation = getSuppFirstQval(row, suppmh, 'CDTOTH', 'MHSEQ')

                    writer.write(map(row))
                }
            } else if (studyIn(DR16)) {
                MongoCollection<Document> mh = mongo.getCollection("${sessionId}_MH")
                MongoCollection<Document> suppmh = mongo.getCollection("${sessionId}_SUPPMH")

                mh.aggregate([
                        match(eq('MHCAT', 'CANCER DIAGNOSIS')),
                        sort(Sorts.ascending('MHDTC')),
                        group(new BasicDBObject('USUBJID', '$USUBJID'), first('first', '$$ROOT'), sum('count', 1)),
                ])
                        .each { grp ->
                            Document row = grp.first
                            row.primaryTumorLocation = grp.count > 1 ? 'MULTIPLE' : getSuppFirstQval(row, suppmh, 'SITEMETA', 'MHSEQ')
                            row.originalDiagnosisDate = row.MHDTC
                            writer.write(map(row))
                        }
            } else if (studyIn(DR1, DR2)) {
                MongoCollection<Document> mh = mongo.getCollection("${sessionId}_MH")
                MongoCollection<Document> fa = mongo.getCollection("${sessionId}_FA")

                fa.find(eq('FATESTCD', 'PTUMLOC')).each { row ->
                    row.specifyPrimaryTumorLocation = null
                    row.primaryTumorLocation = row.FASTRESC


                    Document mhRow = (Document) mh.find(and(
                            eq('USUBJID', row.USUBJID),
                            eq('MHSCAT', 'FIRST DIAGNOSIS'),
                            eq('MHTERM', row.FAOBJ))).first()

                    row.originalDiagnosisDate = mhRow?.MHSTDTC

                    writer.write(map(row))
                }
            } else if (studyIn(DR12, DR11)) {
                MongoCollection<Document> mh = mongo.getCollection("${sessionId}_MH")

                mh.find(
                        and(
                                eq('MHCAT', 'DISEASE HISTORY'),
                                Filters.nin('MHTERM', 'Local or Regional Recurrence', 'Distant Metastases')
                        )
                ).each { row ->
                    row.primaryTumorLocation = row.MHTERM
                    row.originalDiagnosisDate = row.MHSTDTC

                    writer.write(map(row))
                }
            } else if (studyIn(DR13)) {
                MongoCollection<Document> fa = mongo.getCollection("${sessionId}_FA")
                MongoCollection<Document> suppfa = mongo.getCollection("${sessionId}_SUPPFA")

                Bson filter = eq('FATESTCD', 'LOCADMET')
                List<Bson> aggregationPipeline = [
                        match(filter),
                        sort(Sorts.ascending('FATDTC')),
                        group(new BasicDBObject('USUBJID', '$USUBJID'),
                                push('recs', '$$ROOT')
                        )
                ]


                fa.aggregate(aggregationPipeline).each { grp ->
                    List<Document> recs = grp.recs

                    Optional<Document> first = recs.stream().filter { row -> StringUtils.isNotBlank(row.FADTC as String) }.findFirst()
                    def row = first.orElse(recs.first())

                    String primaryTumorLocation = row.FALOC
                    row.primaryTumorLocation = recs.size() > 1 ? 'Multiple locations' : primaryTumorLocation
                    row.originalDiagnosisDate = row.FADTC
                    row.specifyPrimaryTumorLocation = null
                    if (recs.size() == 1) {
                        if (StringUtils.startsWithIgnoreCase(primaryTumorLocation, 'OTHER METASTATIC SITES')) {
                            row.specifyPrimaryTumorLocation = getSuppFirstQval(row, suppfa, 'FAMEOTH', 'FASEQ')
                        } else if (StringUtils.startsWithIgnoreCase(primaryTumorLocation, 'OTHER LOCALLY ADVANCED SITES')) {
                            row.specifyPrimaryTumorLocation = getSuppFirstQval(row, suppfa, 'FALOCOTH', 'FASEQ')
                        }
                    }
                    writer.write(map(row))
                }
            } else if (studyIn(DR10, DR6)) {
                MongoCollection<Document> fa = mongo.getCollection("${sessionId}_FA")
                MongoCollection<Document> suppfa = mongo.getCollection("${sessionId}_SUPPFA")

                Bson filter = eq('FATESTCD', 'PHPRDIAG')

                fa.find(filter).each { row ->

                    row.primaryTumorLocation = row.FASTRESC
                    row.originalDiagnosisDate = getOriginalDiagnosisDate(row, suppfa)

                    writer.write(map(row))
                }
            } else if (studyIn(DR17)) {
                main = mongo.getCollection("${sessionId}_MH")
                //filter
                pipeline = [
                        match(eq('MHCAT', 'DISEASE HISTORY')),
                        sort(Sorts.ascending('MHSTDTC')),
                        group(new BasicDBObject(USUBJID: '$USUBJID'),
                                first('first', '$$ROOT'))]
                rowSupplier = { it.first }
                primaryTumorLocationSupplier = { Document r, MongoCollection<Document> s -> r['MHTERM'] }
                originalDiagnosisDateSupplier = { Document r, MongoCollection s -> getOriginalDiagnosisDate(r, s) }
            } else if (studyIn(DR18)) {
                main = mongo.getCollection("${sessionId}_MH")
                pipeline = [
                        match(and(eq('MHCAT', 'ONCOLOGY DISEASE'),
                                eq('MHSCAT', 'FIRST POSITIVE BIOPSY'))),
                        sort(Sorts.ascending('MHSTDTC')),
                        group(new BasicDBObject(USUBJID: '$USUBJID'),
                                first('first', '$$ROOT'))]
                rowSupplier = { it.first }
                primaryTumorLocationSupplier = { Document r, MongoCollection<Document> s -> r['MHTERM'] }
                originalDiagnosisDateSupplier = { Document r, MongoCollection s -> getOriginalDiagnosisDate(r, s) }
            } else if (studyIn(DR20)) {
                MongoCollection<Document> fa = mongo.getCollection("${sessionId}_FA")
                fa
                        .aggregate(
                                [
                                        match(or(eq('FAOBJ', 'First Biopsy'),
                                                eq('FATESTCD', 'DIAGP'))),
                                        group(new BasicDBObject(USUBJID: '$USUBJID'),
                                                push('recs', '$$ROOT'))
                                ])
                        .each {
                            grp ->
                                def groupedRows = (grp.recs as List<Document>).groupBy { it.USUBJID }
                                groupedRows.each { key, rows ->
                                    def row = rows.first()
                                    row.primaryTumorLocation = rows.find {
                                        it.FATESTCD == 'DIAGP' && 'Primary Diagnosis' == it.FAOBJ
                                    }?.FASTRESC
                                    row.originalDiagnosisDate = rows.find { it.FAOBJ == 'First Biopsy' }?.FASTRESC
                                    writer.write(map(row))
                                }
                        }
            } else if (studyIn(DR21)) {
                main = mongo.getCollection("${sessionId}_MH")

                pipeline = [
                        match(and(eq('MHCAT', 'DISEASE HISTORY'),
                                nin('MHBODSYS', null, '')))]
                primaryTumorLocationSupplier = { Document r, MongoCollection<Document> s -> r['MHTERM'] }
                originalDiagnosisDateSupplier = { Document r, MongoCollection s -> getOriginalDiagnosisDate(r, s) }
            } else if (studyIn(DR22)) {
                main = mongo.getCollection("${sessionId}_MH")
                supplementary = mongo.getCollection("${sessionId}_FA")
                pipeline = [match(or(eq('MHCAT', 'ONCOLOGY DISEASE'),
                        eq('MHSCAT', 'FIRST POSITIVE BIOPSY'))),
                            sort(Sorts.ascending('MHSTDTC')),
                            group(new BasicDBObject(USUBJID: '$USUBJID'),
                                    first('first', '$$ROOT'))]
                rowSupplier = { it.first }
                primaryTumorLocationSupplier = { Document r, MongoCollection<Document> s ->
                    def consolidatedRows = s.find(and(eq('USUBJID', r.USUBJID),
                            eq('FATESTCD', 'PTUMLOC')))
                    return consolidatedRows.size() > 1 ? 'multiple' : consolidatedRows.first()?.FASTRESC
                }
                originalDiagnosisDateSupplier = { Document r, MongoCollection s -> parseDateTrimDash(r?.MHSTDTC as String) }

            } else if (studyIn(DR24)) {
                main = mongo.getCollection("${sessionId}_FA")
                supplementary = mongo.getCollection("${sessionId}_FA")
                primaryTumorLocationSupplier = { Document r, MongoCollection<Document> s ->
                    def ptumlocFilter = s.find(and(eq('USUBJID', r.USUBJID),
                            eq('FATESTCD', 'PTUMLOC'),
                            eq('FATEST', 'Primary Tumor Location'))).first()
                    return ptumlocFilter?.FAORRES
                }
                originalDiagnosisDateSupplier = { Document r, MongoCollection s -> getOriginalDiagnosisDate(r, s) }
                specifyPrimaryTumorLocationSupplier = { Document r, MongoCollection s -> r['FAORRES'] }
            } else if (studyIn(DR25)) {
                MongoCollection<Document> fa = mongo.getCollection("${sessionId}_FA")

                fa.find(eq('FATESTCD', 'DIAGDTC')).each { row ->
                    row.specifyPrimaryTumorLocation = null
                    row.originalDiagnosisDate = row.FASTRESC
                    writer.write(map(row))
                }
            }
            performProcess(main, supplementary, pipeline, rowSupplier, primaryTumorLocationSupplier,
                    originalDiagnosisDateSupplier, specifyPrimaryTumorLocationSupplier, writer)
        } finally {
            writer.close()
        }
    }