clns-eTarget_ingest/clinical_json.py (552 lines of code) (raw):

# # This file is part of the eTarget ingest distribution (https://github.com/digital-ECMT/eTarget_ingest). # Copyright (C) 2017 - 2021 digital ECMT # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, version 3. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # import pymssql import json from azure.storage.file import FileService import utilities import re import datetime import traceback class ClinicalDataException(Exception): """Raised internally and is already sent to the log file/db""" pass class ClinicalDataJson: def __init__(self, filename, remotehostname, remoteusername, remotepassword, remotedbname, fileuser, filekey, datadir='data', logblob='log'): self.filename = filename self.datadir=datadir self.file_service = FileService(account_name=fileuser, account_key=filekey) self.clinicalDataString = self.file_service.get_file_to_text(self.datadir, None, filename).content self.remotehostname = remotehostname self.remoteusername = remoteusername self.remotepassword = remotepassword self.remotedbname = remotedbname print(self.remotehostname + "\n" + self.remoteusername+"\n"+self.remotepassword+"\n"+self.remotedbname) self.conn = pymssql.connect(self.remotehostname,self.remoteusername, self.remotepassword, self.remotedbname, autocommit=False) self.log = utilities.Util(remotehostname, remoteusername, remotepassword, remotedbname, fileuser, filekey, logblob) self.errorflag = False self.config = self.log.getConfig() def __del__(self): self.conn.close() def deleteFile(self): try: if self.file_service.exists(self.datadir, None, self.filename): self.file_service.delete_file(self.datadir, None, self.filename) if self.errorflag==False: self.log.systemStatusUpdate(filename, 'Clinical', self.log.timestamp(), 'Success') except: self.log.logMessage('There was a problem deleting '+self.filename) def rollback(self): self.conn.rollback() self.conn.close() def ingest(self): try: clinicalDataAll = json.loads(self.clinicalDataString) if not isinstance(clinicalDataAll, list): self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(),'Error: JSON format, first element must be an array') self.log.logMessage(self.filename + ' JSON does not start with an array') return -1 for clinicalDataIndividual in clinicalDataAll: if clinicalDataIndividual['type'] == 'clinical': patientDetails = self.getPatientDetails(clinicalDataIndividual) self.checkAndDeletePatient(patientDetails) self.ingestPatient(patientDetails) else: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: JSON contains data of wrong type.') self.log.logMessage(self.filename + ' json contains wrong type of data') raise ClinicalDataException("Type of data is not clinical") if self.errorflag==False: # no problems found self.conn.commit() self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Success') self.log.logMessage(self.filename + ' successfully ingested') return 0 else: self.rollback() self.log.logMessage(self.filename + ' problems with ingestion') return -1 except ClinicalDataException as e: self.log.logMessage("abort ingest because of previous errors") self.log.logMessage(traceback.format_exc()) return -1 except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: JSON format cannot be read; ' + str(e).replace("'", "''")) self.log.logMessage(self.filename + ' json format wrong' + str(e) ) self.log.logMessage(traceback.format_exc()) return -1 def getPatientDetails(self, patientRecord): patientDetails = {'christieNo':"", 'patientNumber':patientRecord['patientId'], 'ageAtConsent':patientRecord['ageAtConsent'], 'gender':patientRecord['gender'], 'dateOfDiagnosis':patientRecord['dateOfDiagnosis'], 'dateOfConsent':patientRecord['dateOfConsent'], 'primaryTumourType':patientRecord['diagnosis'].replace("'", "''").strip(), 'consultant':patientRecord['consultant'].replace("'", "''").strip() } if not 'dateTimeUpdate' in patientRecord or len(patientRecord['dateTimeUpdate'])==0: patientDetails['updateStamp']='NULL' else: patientDetails['updateStamp']=patientRecord['dateTimeUpdate'] if not 'diagnosis' in patientRecord or len(patientRecord['diagnosis'])==0: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (diagnosis not present)') self.log.logMessage('Error diagnosis not present') self.errorflag=True return if not 'dateOfDiagnosis' in patientRecord or len(patientDetails['dateOfDiagnosis'])==0: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (dateOfDiagnosis not present)') self.log.logMessage('Error dateOfDiagnosis not present') self.errorflag=True return if not 'consultant' in patientRecord or len(patientDetails['consultant'])==0: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (consultant data not present)') self.log.logMessage('Error consultant name not present') self.errorflag=True return if not 'ageAtConsent' in patientRecord or not isinstance(patientDetails['ageAtConsent'], int): self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (ageAtConsent data not present, or not a number)') self.log.logMessage('Error ageAtConsent name not present') self.errorflag=True return if 'additionalTumourInfo' in patientRecord: if isinstance(patientRecord['additionalTumourInfo'], str): patientDetails['addInfo']=patientRecord['additionalTumourInfo'].replace("'", "''") else: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (additionalTumourInfo data not a string)') self.log.logMessage('Error additionalTumourInfo data not a string') self.errorflag=True return else: patientDetails['addInfo']='' if 'samples' in patientRecord: patientDetails['specimen']=patientRecord['samples'] else: patientDetails['specimen']=[] if 'treatment' in patientRecord and len(patientRecord['treatment'])>0: patientDetails['treatment']=patientRecord['treatment'] else: patientDetails['treatment']=[] print(patientDetails) return patientDetails def checkAndDeletePatient(self, patientDetails): #check if patient exist self.log.logMessage('checking if patient exists') selectPatient = self.conn.cursor() selectPatient.execute("SELECT * FROM PERSON WHERE target_id = '"+patientDetails['patientNumber']+"'") patients=selectPatient.fetchall() selectPatientRowCount = 0 personID = 0 for row in patients: personID = row[0] databaseLastUpdate = row[6] selectPatientRowCount = selectPatientRowCount+1 if selectPatientRowCount == 0: return #found patient need to clear old db entries deleteProcedureOccurance = self.conn.cursor() deleteProcedureOccurance.execute("DELETE from PROCEDURE_OCCURRENCE where person_id="+str(personID)) deletePDXCDX = self.conn.cursor() deletePDXCDX.execute("DELETE from PDXCDX where person_id="+str(personID)) #find SPECIMEN selectSpecimen = self.conn.cursor() selectSpecimen.execute("SELECT specimen_id from SPECIMEN where person_id="+str(personID)) specimenIDs=selectSpecimen.fetchall(); for row in specimenIDs: specimen_id=row[0] selectGenePanel = self.conn.cursor() selectGenePanel.execute("SELECT measurement_gene_panel_id FROM MEASUREMENT_GENE_PANEL WHERE specimen_id="+str(specimen_id)) row = selectGenePanel.fetchone() selectGenePanel.execute("select IHC_REPORT.ihc_report_id from IHC_REPORT where specimen_id="+str(specimen_id)) row2 = selectGenePanel.fetchone() if row is None and row2 is None: deleteSpecimen = self.conn.cursor() deleteSpecimen.execute("DELETE FROM SPECIMEN WHERE specimen_id="+str(specimen_id)) def ingestPatient(self,patientDetails): self.log.logMessage('Updating database with new Christie details...') personID = self.findPatient(patientDetails) if personID==0: personID = self.createPatient(patientDetails) else: self.updatePatient(personID, patientDetails) self.insertCondition(personID, patientDetails) self.insertConsultant(patientDetails) self.insertTreatments(personID, patientDetails) self.insertSamples(personID, patientDetails) # self.insertBlood(personID, patientDetails) # self.insertTumour(personID, patientDetails) # self.insertPDXCDX(personID, patientDetails) def findPatient(self, patientDetails): getPatient = self.conn.cursor() getPatient.execute("SELECT * FROM PERSON WHERE target_id = '"+patientDetails['patientNumber']+"'") patients=getPatient.fetchall(); for row in patients: return(row[0]) return 0 def insertConsultant(self, patientDetails): getConsultant = self.conn.cursor() getConsultant.execute("SELECT * FROM CONCEPT_CONSULTANT WHERE name = '" + patientDetails['consultant'] + "'") getConsultantRowcount = 0 consultantConceptID = 0 consultants=getConsultant.fetchall() for row in consultants: getConsultantRowcount = getConsultantRowcount + 1 consultantConceptID = row[0] if getConsultantRowcount == 0: try: insertConsultant = self.conn.cursor() insertConsultant.execute("INSERT INTO CONCEPT_CONSULTANT (name) VALUES('" + patientDetails['consultant'] + "')") consultantConceptID = insertConsultant.lastrowid except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (insert consultant) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error creating consultant ' +patientDetails['patientNumber']+"\n"+ str(e)) self.errorflag=True print(str(e)) print(12) raise ClinicalDataException('Error creating consultant') return consultantConceptID def createPatient(self, patientDetails): # Insert the patient details getGender = self.conn.cursor() genderConceptID = 0 getGender.execute("SELECT * FROM CONCEPT_GENDER WHERE gender_name = '"+patientDetails['gender'].upper()+"'") gender=getGender.fetchall() for row in gender: genderConceptID = row[0] if genderConceptID == 0: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (gender not recognised)') #check person_id and find site targetid = patientDetails['patientNumber'] pattern = re.compile("^[A-Z]{3}\d{7}$") if not re.match(pattern, targetid): self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: One or more patient IDs are incompatible with the naming convention') self.log.logMessage('Error creating person ' +patientDetails['patientNumber']+"\n patient ID has the wrong structure") self.errorflag=True raise ClinicalDataException('Error creating person') siteId = targetid[3:6] getSite = self.conn.cursor() getSite.execute("SELECT * FROM CARE_SITE WHERE care_site_id = "+siteId) row=getSite.fetchone() if row is None: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: One or more sites are not present in eTARGET') self.log.logMessage('Error creating person ' +patientDetails['patientNumber']+"\n site is not registerd in DB") self.errorflag=True raise ClinicalDataException('Error creating person') consultantConceptID = self.insertConsultant(patientDetails) try: patientData = self.conn.cursor() print(patientDetails) consentDate = self.formatDate(patientDetails['dateOfConsent']) #hash_object = hashlib.sha256(patientDetails['christieNo'].encode('utf-8')) #hex_dig = hash_object.hexdigest() print("INSERT INTO PERSON (person_id, age_at_consent, gender_concept_id, consent_date, consultant_concept_id, target_id, care_site_id) VALUES((SELECT 1+COALESCE(MAX(person_id),0) FROM PERSON person), "+str(patientDetails['ageAtConsent'])+", "+str(genderConceptID)+", '"+consentDate+"', "+str(consultantConceptID)+", '"+patientDetails['patientNumber']+"', "+str(siteId)+")") patientData.execute("INSERT INTO PERSON (person_id, age_at_consent, gender_concept_id, consent_date, consultant_concept_id, target_id, care_site_id) VALUES((SELECT 1+COALESCE(MAX(person_id),0) FROM PERSON person), "+str(patientDetails['ageAtConsent'])+", "+str(genderConceptID)+", '"+consentDate+"', "+str(consultantConceptID)+", '"+patientDetails['patientNumber']+"', "+str(siteId)+")") personID = patientData.lastrowid self.log.logMessage('Patient ID '+str(personID)+' added.') except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (create patient) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error creating person ' +patientDetails['patientNumber']+"\n"+ str(e)) self.errorflag=True print(str(e)) print(13) raise ClinicalDataException('Error creating person') try: lastUpdateCursor = self.conn.cursor() if patientDetails['updateStamp']!='NULL': lastUpdateInsert = "'"+self.formatDateTime(patientDetails['updateStamp'])+"'" else: lastUpdateInsert = 'NULL' lastUpdateCursor.execute("UPDATE PERSON SET last_update="+lastUpdateInsert+" WHERE person_id=(SELECT MAX(person_id) FROM PERSON person)") except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (set last updated) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error updating person '+patientDetails['patientNumber']+"\n" + str(e)) self.errorflag=True print(str(e)) print(14) raise ClinicalDataException(str(e)) try: selectPersonIDCursor = self.conn.cursor() selectPersonIDCursor.execute("select top 1 person_id from PERSON WHERE target_id='"+patientDetails['patientNumber']+"'") row=selectPersonIDCursor.fetchone() personID= row[0] return personID except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (select person) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error selecting person ' +patientDetails['patientNumber']+"\n"+ str(e)) self.errorflag=True print(str(e)) print(15) raise ClinicalDataException('Error selecting person') def insertCondition(self, personID, patientDetails): # Insert the condition data getCondition = self.conn.cursor() getCondition.execute("SELECT * FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'") # conditionConceptID = 0 getConditionRowcount = 0 conditions=getCondition.fetchall() for row in conditions: getConditionRowcount = getConditionRowcount+1 # conditionConceptID = row[0] if getConditionRowcount == 0: insertCondition = self.conn.cursor() insertCondition.execute("INSERT INTO CONCEPT_CONDITION_SUBTYPE (subtype_name, condition_concept_id) VALUES ('"+patientDetails['primaryTumourType']+"', (select top 1 condition_concept_id from CONCEPT_CONDITION where condition_name='Other'))") print('new condition ' + str(insertCondition.lastrowid)) # conditionConceptID= insertCondition.lastrowid if len(patientDetails['dateOfDiagnosis']) > 0: try: conditionData = self.conn.cursor() diagnosisDate = self.formatDate(patientDetails['dateOfDiagnosis']) conditionData.execute("IF NOT EXISTS (SELECT * FROM CONDITION_OCCURRENCE WHERE person_id="+str(personID)+") "+ "INSERT INTO CONDITION_OCCURRENCE (person_id, condition_concept_id, condition_start_date, condition_subtype_concept_id, additional_details) VALUES("+str(personID)+", (SELECT TOP 1 condition_concept_id FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'), '"+diagnosisDate+"', (SELECT TOP 1 subtype_concept_id FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'), '"+patientDetails['addInfo']+"')"+ "ELSE UPDATE CONDITION_OCCURRENCE SET condition_start_date='"+diagnosisDate+"', condition_concept_id=(SELECT TOP 1 condition_concept_id FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'), condition_subtype_concept_id=(SELECT TOP 1 subtype_concept_id FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'), additional_details='"+patientDetails['addInfo']+"'WHERE person_id="+str(personID)) conditionDataID = conditionData.lastrowid self.log.logMessage('conditionData ID '+str(conditionDataID)+' added/updated.') print('conditionData ID '+str(conditionDataID)+' added/updated.') except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (update condition_occurance) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error inserting/updateing codition_occurance ' +patientDetails['patientNumber']+"\n" + str(e)) self.errorflag=True print(str(e)) print(16) raise ClinicalDataException('Error inserting/updateing codition_occurance') return personID def insertSamples(self,personID, patientDetails): for sample in patientDetails['specimen']: if 'type' in sample and isinstance(sample['type'], str): type=sample['type'] if type.lower()=='blood': self.insertBlood(personID, patientDetails['patientNumber'], sample) elif type.lower()=='tumour': self.insertTumour(personID,patientDetails['patientNumber'], sample, patientDetails['primaryTumourType']) else: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (unknown type of sample. Must be blood or tumour)') self.log.logMessage('Error unknown type of sample') self.errorflag=True else: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (type of sample must be present and a string)') self.log.logMessage('Error sample type missing or not a string') self.errorflag=True def insertTreatments(self, personID, patientDetails): for treatment in patientDetails['treatment']: if 'name' not in treatment or len(treatment['name'])==0: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(),'Error: Data import failed (treatments require a name)') self.log.logMessage('Treatment name required') self.errorflag=True return procedure={ 'name':treatment['name'].replace("'", "''").strip(), 'start':'', 'end':'' } if 'dateStart' in treatment: procedure['start']=treatment['dateStart'] if 'dateEnd' in treatment: procedure['end']=treatment['dateEnd'] print(procedure) self.insertTreatment(personID,patientDetails,procedure) def insertTreatment(self, personID, patientDetails, treatment): # Update the procedure / treatment details getProcedure = self.conn.cursor() getProcedure.execute("SELECT * FROM CONCEPT_PROCEDURE WHERE procedure_name = '"+treatment['name']+"'") procedureRows = 0 procedure=getProcedure.fetchall() for row in procedure: procedureConceptID = row[0] procedureRows = procedureRows+1 if procedureRows == 0: try: insertProcedure = self.conn.cursor() insertProcedure.execute("INSERT INTO CONCEPT_PROCEDURE (procedure_name) VALUES('"+treatment['name']+"')") procedureConceptID = insertProcedure.lastrowid except Exception as e: self.log.systemStatusUpdate(filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (procedure)') self.log.logMessage('Error inserting procedure_name ' +patientDetails['patientNumber']+"\n" + str(e)) self.errorflag=True print(str(e)) print(7) raise ClinicalDataException('Error inserting procedure_name') procedureData = self.conn.cursor() if treatment['start'] != '': treatmentStartDate = self.formatDate(treatment['start']) else: treatmentStartDate = '' if treatment['end'] != '': treatmentEndDate = self.formatDate(treatment['end']) else: treatmentEndDate = '' try: procedureData.execute("IF NOT EXISTS (SELECT * FROM PROCEDURE_OCCURRENCE WHERE person_id ='"+str(personID)+"' AND procedure_concept_id = '"+str(procedureConceptID)+"' AND procedure_start_date='"+treatmentStartDate+"') INSERT INTO PROCEDURE_OCCURRENCE (person_id, procedure_concept_id, procedure_start_date, procedure_end_date) VALUES('"+str(personID)+"', '"+str(procedureConceptID)+"', '"+treatmentStartDate+"', '"+treatmentEndDate+"')") procedureDataID = procedureData.lastrowid self.log.logMessage('procedureData ID '+str(procedureDataID)+' updated.') except Exception as e: self.log.systemStatusUpdate(filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (procedure_occurance) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error inserting procedure_occurance ' +patientDetails['patientNumber']+"\n" + str(err)) self.errorflag=True print(str(e)) print(8) raise ClinicalDataException('Error inserting procedure_occurance') def updatePatient(self, personID, patientDetails): # Update the patient details consultantID = self.insertConsultant(patientDetails) try: patientData = self.conn.cursor() consentDate = self.formatDate(patientDetails['dateOfConsent']) if patientDetails['updateStamp']!='NULL': lastUpdate = "'"+self.formatDateTime(patientDetails['updateStamp'])+"'" else: lastUpdate = 'NULL' #christieNo = self.encode(patientDetails['christieNo']) patientData.execute("UPDATE PERSON SET age_at_consent="+str(patientDetails['ageAtConsent'])+", gender_concept_id=(SELECT gender_concept_id FROM CONCEPT_GENDER WHERE gender_name ='"+patientDetails['gender'].upper()+"'), consent_date='"+consentDate+"', last_update="+lastUpdate+" WHERE target_id = '"+patientDetails['patientNumber']+"'") patientData.execute("UPDATE PERSON SET consultant_concept_id=(SELECT TOP 1 consultant_concept_id FROM CONCEPT_CONSULTANT WHERE name = '" + patientDetails['consultant'] + "') WHERE target_id = '"+patientDetails['patientNumber']+"'") self.log.logMessage('Patient ID '+str(personID)+' updated.') except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (update person) -- ' +patientDetails['patientNumber'] + ' ' + str(e)) self.log.logMessage('Error updateing person '+patientDetails['patientNumber']+"\n" + (str(e))) self.errorflag=True print(str(e)) print(1) raise ClinicalDataException(str(e)) # Update the condition data getCondition = self.conn.cursor() getCondition.execute("SELECT * FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'") row = getCondition.fetchone() if row is None: insertCondition = self.conn.cursor() insertCondition.execute("INSERT INTO CONCEPT_CONDITION_SUBTYPE (subtype_name, condition_concept_id) VALUES ('"+patientDetails['primaryTumourType']+"', (select top 1 condition_concept_id from CONCEPT_CONDITION where condition_name='Other'))") if len(patientDetails['dateOfDiagnosis']) > 0: try: conditionData = self.conn.cursor() diagnosisDate = self.formatDate(patientDetails['dateOfDiagnosis']) #check if exists conditionData.execute("IF NOT EXISTS (SELECT * FROM CONDITION_OCCURRENCE WHERE person_id="+str(personID)+") "+ "INSERT INTO CONDITION_OCCURRENCE (person_id, condition_concept_id, condition_start_date, condition_subtype_concept_id, additional_details) VALUES("+str(personID)+", (SELECT TOP 1 condition_concept_id FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'), '"+diagnosisDate+"', (SELECT TOP 1 subtype_concept_id FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'), '"+patientDetails['addInfo']+"')"+ "ELSE UPDATE CONDITION_OCCURRENCE SET condition_start_date='"+diagnosisDate+"', condition_concept_id=(SELECT TOP 1 condition_concept_id FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'), condition_subtype_concept_id=(SELECT TOP 1 subtype_concept_id FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'), additional_details='"+patientDetails['addInfo']+"'WHERE person_id="+str(personID)) conditionDataID = conditionData.lastrowid self.log.logMessage('conditionData ID '+str(conditionDataID)+' updated.') except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed for -- ' +patientDetails['patientNumber'] + ': ' +str(e).replace("'", "''")) self.log.logMessage('Error updateing person '+patientDetails['patientNumber']+ ' Condition_Occurance \n' +str(e)) self.errorflag=True print(str(e)) print(3) raise ClinicalDataException('Error Condition_Occurance') try: consultantData = self.conn.cursor() consultantData.execute("UPDATE PERSON SET consultant_concept_id='"+str(consultantID)+"' WHERE person_id = '"+str(personID)+"'") consultantDataID = consultantData.lastrowid self.log.logMessage('consultantDataID ID '+str(consultantDataID)+' updated.') except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (update consultant) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error updating consultant ' +patientDetails['patientNumber']+"\n" + str(e)) self.errorflag=True print(str(e)) print(11) raise ClinicalDataException('Error updating consultant') def insertBlood(self, personID, patientNumber, sample): # Blood specimen if 'cdx' in sample and (sample['cdx']==True or (isinstance(sample['cdx'], str) and sample['cdx'].lower()=='true')): self.insertPDXCDX(personID, patientNumber, sample) if 'cdx' in sample and not isinstance(sample['cdx'], bool) and ((isinstance(sample['cdx'], str) and sample['cdx'].lower()!='true' and sample['cdx'].lower()!='false') or (isinstance(sample['cdx'], (int, float, complex)))): raise Exception('CDX value must be either true or false') try: specimenData = self.conn.cursor() bloodDate = self.formatDate(sample['date']) pattern = re.compile("^T\d{1,2}$") if not re.match(pattern, sample['timePoint']): raise Exception('timepoint does not conform to pattern T[number]') print(sample['timePoint']) timepoint = sample['timePoint'][1:] print(timepoint) specimenData.execute("IF NOT EXISTS (SELECT * FROM SPECIMEN WHERE person_id="+str(personID)+" AND specimen_concept_id=1 AND baseline_number="+timepoint+") "+ "INSERT INTO SPECIMEN (person_id, specimen_date, specimen_concept_id, baseline_number) VALUES("+str(personID)+", '"+bloodDate+"', 1, "+timepoint+") "+ "ELSE UPDATE SPECIMEN SET specimen_date = '"+bloodDate+"' WHERE person_id="+str(personID)+" AND specimen_concept_id=1 AND baseline_number="+timepoint) specimenDataID = specimenData.lastrowid self.log.logMessage('Specimen blood ID for '+sample['timePoint']+' updated.') except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (Blood specimen) -- ' +patientNumber) self.log.logMessage('Error inserting/updating blood specimen ' +patientNumber+"\n" + str(e)) self.errorflag=True print(str(e)) print(4) raise ClinicalDataException('Error inserting/updating blood specimen') def insertTumour(self, personID, patientNumber, sample, diagnosis): if 'pdx' in sample and (sample['pdx']==True or (isinstance(sample['pdx'], str) and sample['pdx'].lower()=='true')): self.insertPDXCDX(personID, patientNumber, sample) if 'pdx' in sample and not isinstance(sample['pdx'], bool) and ((isinstance(sample['pdx'], str) and sample['pdx'].lower()!='true' and sample['pdx'].lower()!='false') or (isinstance(sample['pdx'], (int, float, complex)))): raise Exception('PDX value must be either true or false') pattern = re.compile("^Bx\d{1,2}$") if not re.match(pattern, sample['timePoint']): raise Exception('timepoint does not conform to pattern T[number]') print(sample['timePoint']) timepoint = sample['timePoint'][2:] print(timepoint) if sample['sampleType'].lower() == 'ffpe' or sample['sampleType'].lower() == 'snap' or sample['sampleType'].lower() == 'fresh': specimenConceptID = '3' elif sample['sampleType'].lower() == 'archival': specimenConceptID = '2' else: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (tumour sampleType not present, or value not supported)') self.log.logMessage('Error sampleType not present') self.errorflag=True return if 'sampleId' in sample and sample['sampleId']!='': sampleId = sample['sampleId'] else: sampleId = patientNumber+sample['timePoint'] try: specimenData = self.conn.cursor() if 'date' in sample and sample['date'] !='': tumourDate = self.formatDate(sample['date']) #specimenData.execute("UPDATE SPECIMEN SET specimen_date = '"+tumourDate+"', anatomic_site_id=(SELECT anatomy_concept_id FROM CONCEPT_ANATOMY WHERE anatomy_name = '"+patientDetails['primaryTumourType']+"') WHERE person_id="+str(personID)+" AND specimen_concept_id="+specimenConceptID) else : tumourDate ='' specimenData.execute("SELECT * FROM SPECIMEN WHERE person_id="+str(personID)+" AND (preclin_id like '"+sampleId+"')"); specimenDataRows = 0 specimen=specimenData.fetchall() for row in specimen: specimenID=row[0] specimenDataRows = specimenDataRows+1 # The record didn't exist to update, so let's add it now if specimenDataRows == 0: specimenData.execute("INSERT INTO SPECIMEN (person_id, specimen_date, specimen_concept_id, anatomic_site_id, preclin_id, baseline_number) VALUES('"+str(personID)+"', '"+tumourDate+"', "+str(specimenConceptID)+", (SELECT anatomy_concept_id FROM CONCEPT_ANATOMY WHERE anatomy_name = '"+diagnosis+"'), '"+sampleId+"',"+timepoint+")") specimenID = specimenData.lastrowid else: specimenData.execute("UPDATE SPECIMEN set specimen_concept_id="+str(specimenConceptID)+" ,specimen_date='"+tumourDate+"', preclin_id='"+sampleId+"', baseline_number="+timepoint+", anatomic_site_id= (SELECT TOP 1 anatomy_concept_id FROM CONCEPT_ANATOMY WHERE anatomy_name = '"+diagnosis+"') where specimen_id="+str(specimenID)) #print(specimenDataID) self.log.logMessage('Specimen tumour ID '+str(specimenID)+' updated.') if 'siteOfBiobsy' in sample and sample['siteOfBiobsy']!= '': sample['siteOfBiobsy']=sample['siteOfBiobsy'].replace("'", "''") gdlData = self.conn.cursor() gdlSQL = "UPDATE GDL_REQUEST SET sample_type='"+sample['siteOfBiobsy']+"' WHERE specimen_id='"+str(specimenID)+"'" gdlData.execute(gdlSQL) if gdlData.rowcount == 0: gdlSQL = "INSERT INTO GDL_REQUEST (path_lab_ref, specimen_id, sample_type) VALUES('', '"+str(specimenID)+"', '"+sample['siteOfBiobsy']+"')" gdlData.execute(gdlSQL) except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (tumour specimen) -- ' +patientNumber) self.log.logMessage('Error inserting/updating tumour specimen ' +patientNumber+"\n" + str(e)) self.errorflag=True print(str(e)) print(5) raise ClinicalDataException('Error inserting/updating tumour specimen') # Not clear yet whether we want this, if yes it needs to be specific to where it went # try: # gdlSQL = "" # gdlData = self.conn.cursor() # if len(patientDetails['gdlRequestDate']) > 0: # gdlDate = self.formatDate(patientDetails['gdlRequestDate']) # gdlSQL = "UPDATE GDL_REQUEST SET date_requested='"+gdlDate+"', sample_type='"+patientDetails['biopsyLocation']+"' WHERE specimen_id=(SELECT TOP 1 specimen_id FROM SPECIMEN WHERE person_id="+str(personID)+" AND ((specimen_date='"+tumourDate+"' and (preclin_id is NULL)) OR dbo.RemoveNonAlphaNumChars(preclin_id) like dbo.RemoveNonAlphaNumChars('"+str(patientDetails['preclinId'])+"')))" # gdlData.execute(gdlSQL) # # if gdlData.rowcount == 0: # gdlSQL = "INSERT INTO GDL_REQUEST (date_requested, path_lab_ref, specimen_id, sample_type) VALUES('"+gdlDate+"', '', '"+str(specimenID)+"', '"+patientDetails['biopsyLocation']+"')" # gdlData.execute(gdlSQL) # else: # gdlSQL = "UPDATE GDL_REQUEST SET sample_type='"+patientDetails['biopsyLocation']+"' WHERE specimen_id=(SELECT TOP 1 specimen_id FROM SPECIMEN WHERE person_id="+str(personID)+" AND specimen_concept_id="+str(specimenConceptID)+" AND ((specimen_date='"+tumourDate+"' and (preclin_id is NULL)) OR dbo.RemoveNonAlphaNumChars(preclin_id) like dbo.RemoveNonAlphaNumChars('"+str(patientDetails['preclinId'])+"')))" # gdlData.execute(gdlSQL) # # if gdlData.rowcount == 0: # gdlSQL = "INSERT INTO GDL_REQUEST (path_lab_ref, specimen_id, sample_type) VALUES('', '"+str(specimenID)+"', '"+patientDetails['biopsyLocation']+"')" # gdlData.execute(gdlSQL) # # specimenID = specimenData.lastrowid # print(specimenID) # self.log.logMessage('GDL ID '+str(specimenID)+' updated.') # except Exception as e: # self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (GDL request) -- ' +patientDetails['patientNumber']) # self.log.logMessage('Error inserting/updating GDL data ' +patientDetails['patientNumber']+"\n" + str(e)) # self.errorflag=True # print(str(e)) # print(6) # raise Exception('Error inserting/updating GDL data') def insertPDXCDX(self, personID, patientNumber, sample): # Insert the PDX/CDX data try: if sample['type']=='blood': pdxcdx='CDX' pattern = re.compile("^T\d{1,2}$") if not re.match(pattern, sample['timePoint']): raise Exception('timepoint does not conform to pattern T[number]') timepoint = sample['timePoint'][1:] else: pdxcdx='PDX' pattern = re.compile("^Bx\d{1,2}$") if not re.match(pattern, sample['timePoint']): raise Exception('timepoint does not conform to pattern T[number]') timepoint = sample['timePoint'][2:] insertPDXCDX = self.conn.cursor() if 'date' in sample and sample['date']!='': pdxDate = self.formatDate(sample['date']) insertPDXCDX.execute("IF NOT EXISTS (SELECT * FROM PDXCDX WHERE person_id ='"+str(personID)+"' AND date_created='"+str(pdxDate)+"' AND pdx_or_cdx='"+pdxcdx+"') INSERT INTO PDXCDX (date_created, pdx_or_cdx, timepoint, person_id) VALUES('"+str(pdxDate)+"', '"+pdxcdx+"', '"+str(timepoint)+"', '"+str(personID)+"')") else : insertPDXCDX.execute("IF NOT EXISTS (SELECT * FROM PDXCDX WHERE person_id ='"+str(personID)+"' AND date_created is null AND pdx_or_cdx='"+pdxcdx+"') INSERT INTO PDXCDX (pdx_or_cdx, timepoint, person_id) VALUES('"+pdxcdx+"', '"+str(timepoint)+"', '"+str(personID)+"')") insertPDXCDXID = insertPDXCDX.lastrowid except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (PDX/CDX) -- ' +patientNumber) self.log.logMessage('Error insert PDXCDX ' +patientNumber+"\n" + str(e)) self.errorflag=True print(str(e)) print(9) raise ClinicalDataException('Error insert PDXCDX') def formatDateTime(self, dateString): # print(dateString) if len(dateString)<19: return self.formatDate(dateString) try: dateString= formattedDate = dateString[0:19] datetime.datetime.strptime(dateString, '%Y-%m-%dT%H:%M:%S') except ValueError: raise Exception('Date string not correct '+ dateString) return dateString def formatDate(self, dateString): # print(dateString) formattedDate = dateString[0:10] if not self.valid_date(formattedDate): raise Exception('Date string not correct '+ dateString) # print(formattedDate) return formattedDate def valid_date(self, datestring): try: datetime.datetime.strptime(datestring, '%Y-%m-%d') return True except ValueError: return False