clns-eTarget_ingest/clinicaldata.py (587 lines of code) (raw):

# # This file is part of the eTarget ingest distribution (https://github.com/digital-ECMT/eTarget_ingest). # Copyright (C) 2017 - 2021 digital ECMT # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, version 3. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # import pymssql import utilities import hashlib from fileinput import filename from azure.storage.file import FileService import re import csv import traceback import sys from datetime import datetime class ClinicalDataException(Exception): """Raised internally and is already sent to the log file/db""" pass class ClinicalData: def __init__(self, filename, remotehostname, remoteusername, remotepassword, remotedbname, fileuser, filekey, datadir='data', logblob='log'): self.filename = filename self.datadir=datadir self.file_service = FileService(account_name=fileuser, account_key=filekey) try: self.clinicalDataString = self.file_service.get_file_to_text(self.datadir, None, filename).content except Exception as e: self.clinicalDataString = self.file_service.get_file_to_text(self.datadir, None, filename, encoding="cp1252").content self.remotehostname = remotehostname self.remoteusername = remoteusername self.remotepassword = remotepassword self.remotedbname = remotedbname print(self.remotehostname + "\n" + self.remoteusername+"\n"+self.remotepassword+"\n"+self.remotedbname) self.conn = pymssql.connect(self.remotehostname,self.remoteusername, self.remotepassword, self.remotedbname, autocommit=False) self.log = utilities.Util(remotehostname, remoteusername, remotepassword, remotedbname, fileuser, filekey, logblob) self.errorflag = False self.config = self.log.getConfig() def __del__(self): self.conn.close() def deleteFile(self): try: if self.file_service.exists(self.datadir, None, self.filename): self.file_service.delete_file(self.datadir, None, self.filename) if self.errorflag==False: self.log.systemStatusUpdate(filename, 'Clinical', self.log.timestamp(), 'Success') except: self.log.logMessage('There was a problem deleting '+self.filename) def rollback(self): self.conn.rollback() self.conn.close() def getPatientDetails(self, row): # Patient data from the file patientDetails = {'hospital_numberNo':row[0], 'patientNumber':row[1], 'ageOfConsent':row[2], 'gender':row[3], 'dateOfDiagnosis':row[4], 'primaryTumourType':row[5].replace("'", "''"), 'stage':row[6], 'dateOfConsent':row[7], 'dateOfBloodCollection':row[9], 'sampleType':row[10], 'sampleDate':row[11], 'biopsyLocation':row[12].replace("'", "''"), 'gdlRequestDate':row[13], 'treatmentDetails':row[14].replace("'", "''"), 'treatmentStartDate':row[15], 'treatmentEndDate':row[16], 'consultant':row[17].replace("'", "''"), 'updateStamp':row[18], 'preclinId':row[19].replace("'", "''"), 'tumourId':row[20].replace("'", "''"), 'addInfo':row[21].replace("'", "''")} if len(patientDetails['ageOfConsent'].strip())==0 : raise Exception('Age at consent cannot be empty') if not patientDetails['ageOfConsent'].isdigit() : raise Exception('Age at consent is not a number') if len(patientDetails['dateOfConsent'].strip())==0: raise Exception('Date of Consent cannot be empty') if len(patientDetails['gender'].strip())==0: raise Exception('Gender cannot be empty') if len(patientDetails['patientNumber'].strip())==0: raise Exception('Patient ID cannot be empty') if len(patientDetails['dateOfDiagnosis'].strip())==0: raise Exception('Date of diagnosis cannot be empty') if len(patientDetails['consultant'].strip())==0: raise Exception('Consultant cannot be empty') if len(patientDetails['primaryTumourType'].strip())==0: raise Exception('Diagnosis cannot be empty') if len(patientDetails['sampleType'].strip())==0 and (len(patientDetails['sampleDate'].strip())>0 or len(row[22].strip())>0 or len(patientDetails['biopsyLocation'].strip())>0): raise Exception('Sample type cannot be empty if other tumour data are present') if len(patientDetails['treatmentDetails'].strip())==0 and (len(patientDetails['treatmentStartDate'].strip())>0 or len(patientDetails['treatmentEndDate'].strip())>0): raise Exception('treatments require a name') if len(row[8].strip())>1: self.checkTimePoint(row[8], 'blood', patientDetails['patientNumber']) patientDetails['sampleTimePoint'] = row[8][1:] if len(patientDetails['dateOfBloodCollection'].strip())==0: raise Exception('Date of blood collection cannot be empty if there is a time point') elif len(patientDetails['dateOfBloodCollection'].strip())>0: raise Exception('Blood timepoint cannot be empty if other blood data are available') else: patientDetails['sampleTimePoint'] = '' if len(row[22].strip())>1: self.checkTimePoint(row[22].strip(), 'tumour', patientDetails['patientNumber']) patientDetails['tumourTimePoint'] = row[22][2:] if len(patientDetails['preclinId'].strip()) ==0: patientDetails['preclinId']=patientDetails['patientNumber']+row[22] else: if patientDetails['sampleType'].upper()!='CDX' and patientDetails['sampleType'].upper()!='PDX' and len(patientDetails['sampleDate'].strip()) > 0: raise Exception('Tumour timepoint cannot be empty if other tumour data are available') patientDetails['tumourTimePoint']='' f = "%d/%m/%Y" f2 = "%d/%m/%Y %H:%M" try: datetime.strptime(patientDetails['dateOfDiagnosis'], f) datetime.strptime(patientDetails['dateOfConsent'], f) if patientDetails['dateOfBloodCollection'].strip()!='': datetime.strptime(patientDetails['dateOfBloodCollection'], f) if patientDetails['sampleDate'].strip()!='': datetime.strptime(patientDetails['sampleDate'], f) if patientDetails['treatmentStartDate'].strip()!='': datetime.strptime(patientDetails['treatmentStartDate'], f) if patientDetails['treatmentEndDate'].strip()!='': datetime.strptime(patientDetails['treatmentEndDate'], f) except ValueError as e: raise Exception('Date format is incorrect and does not match dd/mm/yyyy or is not a valid date') try: if patientDetails['updateStamp'].strip()!='': datetime.strptime(patientDetails['updateStamp'], f2) except ValueError as e: try: datetime.strptime(patientDetails['updateStamp'], f) except ValueError as e: raise Exception('Date format is incorrect and does not match dd/mm/yyyy HH:MM or is not a valid date') return patientDetails def checkTimePoint(self, timepoint, type, patientID): if type=='blood': pattern = re.compile("^T\d{1,2}$") if not re.match(pattern, timepoint): self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: The blood time point ('+timepoint+') is incompatible with the naming convention ('+patientID+')') self.log.logMessage('Error creating person ' +patientID+"\n blood time point has the wrong structure") self.errorflag=True raise ClinicalDataException('Blood timepoint incorrect format') else: pattern = re.compile("^Bx\d{1,2}$") if not re.match(pattern, timepoint): self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: The tumour time point ('+timepoint+') is incompatible with the naming convention ('+patientID+')') self.log.logMessage('Error creating person ' +patientID+"\n tumour time point has the wrong structure") self.errorflag=True raise ClinicalDataException('Error checking timepoint') return def readLine(self, number): csvString = self.clinicalDataString.splitlines() readCSV = csv.reader(csvString, delimiter=',', dialect=csv.excel_tab) i=0 for row in readCSV: i=i+1 if i== number: return row return '' def ingest(self): csvString = self.clinicalDataString.splitlines() try: readCSV = csv.reader(csvString, delimiter=',', dialect=csv.excel_tab) except Exception as e: self.log.logMessage(self.filename + ' problems with ingestion ' + str(e)) print(e) self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Data import failed; check encoding of file') patientList = [] rowCount = 0 try: for row in readCSV: if rowCount==0: rowCount = rowCount+1 continue if len(row) == 0: continue if row[8] == 'Finished': continue patientDetails = self.getPatientDetails(row) # check if record exist and delete if present on appearance in file if patientDetails['patientNumber'] not in patientList: patientList.append(patientDetails['patientNumber']) self.checkAndDeletePatient(patientDetails) #process this patient self.ingestPatient(patientDetails) rowCount = rowCount+1 #self.conn.close() if self.errorflag==False: # no problems foundc self.conn.commit() self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Success') self.log.logMessage(self.filename + ' successfully ingested') return 0 else: self.rollback() self.log.logMessage(self.filename + ' problems with ingestion') return -1 except ClinicalDataException as e: self.rollback() self.log.logMessage(self.filename + ' problems with ingestion ' + str(e)) traceback.print_exc(file=sys.stdout) return -1 except Exception as e: self.rollback() self.log.logMessage(self.filename + ' problems with ingestion ' + str(e)) print(e) traceback.print_exc(file=sys.stdout) self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Data import failed (' + str(e) +')') return -1 def checkAndDeletePatient(self, patientDetails): #check if patient exist self.log.logMessage('checking if patient exists') selectPatient = self.conn.cursor() selectPatient.execute("SELECT * FROM PERSON WHERE target_id = '"+patientDetails['patientNumber']+"'") patients=selectPatient.fetchall() selectPatientRowCount = 0 personID = 0 for row in patients: personID = row[0] databaseLastUpdate = row[6] selectPatientRowCount = selectPatientRowCount+1 if selectPatientRowCount == 0: return #found patient need to clear old db entries deleteProcedureOccurance = self.conn.cursor() deleteProcedureOccurance.execute("DELETE from PROCEDURE_OCCURRENCE where person_id="+str(personID)) deletePDXCDX = self.conn.cursor() deletePDXCDX.execute("DELETE from PDXCDX where person_id="+str(personID)) #find SPECIMEN selectSpecimen = self.conn.cursor() selectSpecimen.execute("SELECT specimen_id from SPECIMEN where person_id="+str(personID)) specimenIDs=selectSpecimen.fetchall(); for row in specimenIDs: specimen_id=row[0] selectGenePanel = self.conn.cursor() selectGenePanel.execute("SELECT measurement_gene_panel_id FROM MEASUREMENT_GENE_PANEL WHERE specimen_id="+str(specimen_id)) row = selectGenePanel.fetchone() selectGenePanel.execute("select IHC_REPORT.ihc_report_id from IHC_REPORT where specimen_id="+str(specimen_id)) row2 = selectGenePanel.fetchone() if row is None and row2 is None: deleteSpecimen = self.conn.cursor() deleteSpecimen.execute("DELETE FROM SPECIMEN WHERE specimen_id="+str(specimen_id)) def ingestPatient(self,patientDetails): self.log.logMessage('Updating database with new Christie details...') personID = self.findPatient(patientDetails) if personID==0: personID = self.createPatient(patientDetails) else: self.updatePatient(personID, patientDetails) self.insertCondition(personID, patientDetails) self.insertConsultant(patientDetails) self.insertTreatment(personID, patientDetails) self.insertBlood(personID, patientDetails) self.insertTumour(personID, patientDetails) self.insertPDXCDX(personID, patientDetails) def findPatient(self, patientDetails): getPatient = self.conn.cursor() getPatient.execute("SELECT * FROM PERSON WHERE target_id = '"+patientDetails['patientNumber']+"'") patients=getPatient.fetchall(); for row in patients: return(row[0]) return 0 def insertConsultant(self, patientDetails): getConsultant = self.conn.cursor() getConsultant.execute("SELECT * FROM CONCEPT_CONSULTANT WHERE name = '" + patientDetails['consultant'] + "'") getConsultantRowcount = 0 consultantConceptID = 0 consultants=getConsultant.fetchall() for row in consultants: getConsultantRowcount = getConsultantRowcount + 1 consultantConceptID = row[0] if getConsultantRowcount == 0: try: insertConsultant = self.conn.cursor() insertConsultant.execute("INSERT INTO CONCEPT_CONSULTANT (name) VALUES('" + patientDetails['consultant'] + "')") consultantConceptID = insertConsultant.lastrowid except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (insert consultant) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error creating consultant ' +patientDetails['patientNumber']+"\n"+ str(e)) self.errorflag=True print(str(e)) print(12) raise ClinicalDataException('Error creating consultant') return consultantConceptID def createPatient(self, patientDetails): # Insert the patient details getGender = self.conn.cursor() genderConceptID = 0 getGender.execute("SELECT * FROM CONCEPT_GENDER WHERE gender_name = '"+patientDetails['gender'].upper()+"'") gender=getGender.fetchall() for row in gender: genderConceptID = row[0] if genderConceptID == 0: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (gender not recognised)') self.log.logMessage('Error creating person ' +patientDetails['patientNumber']+"\n gender is not registered in DB") self.errorflag=True raise ClinicalDataException('Gender is not recognised') #check person_id and find site targetid = patientDetails['patientNumber'] pattern = re.compile("^[A-Z]{3}\d{7}$") if not re.match(pattern, targetid): self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: One or more patient IDs are incompatible with the naming convention') self.log.logMessage('Error creating person ' +patientDetails['patientNumber']+"\n patient ID has the wrong structure") self.errorflag=True raise ClinicalDataException('Error creating person') siteId = targetid[3:6] getSite = self.conn.cursor() getSite.execute("SELECT * FROM CARE_SITE WHERE care_site_id = "+siteId) row=getSite.fetchone() if row is None: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: One or more sites are not present in eTARGET') self.log.logMessage('Error creating person ' +patientDetails['patientNumber']+"\n site is not registered in DB") self.errorflag=True raise ClinicalDataException('Error creating person') consultantConceptID = self.insertConsultant(patientDetails) try: patientData = self.conn.cursor() consentDate = self.formatDate(patientDetails['dateOfConsent']) hash_object = hashlib.sha256(patientDetails['hospital_numberNo'].encode('utf-8')) hex_dig = hash_object.hexdigest() print("INSERT INTO PERSON (person_id, hospital_number, hospital_number_hash, age_at_consent, gender_concept_id, consent_date, consultant_concept_id, target_id, care_site_id) VALUES((SELECT 1+COALESCE(MAX(person_id),0) FROM PERSON person), ENCRYPTBYPASSPHRASE('"+self.config['patientkey']+"', '"+patientDetails['hospital_numberNo']+"'), '"+hex_dig+"', "+str(patientDetails['ageOfConsent'])+", "+str(genderConceptID)+", '"+consentDate+"', "+str(consultantConceptID)+", '"+patientDetails['patientNumber']+"', "+str(siteId)+")") patientData.execute("INSERT INTO PERSON (person_id, hospital_number, hospital_number_hash, age_at_consent, gender_concept_id, consent_date, consultant_concept_id, target_id, care_site_id) VALUES((SELECT 1+COALESCE(MAX(person_id),0) FROM PERSON person), ENCRYPTBYPASSPHRASE('"+self.config['patientkey']+"', '"+patientDetails['hospital_numberNo']+"'), '"+hex_dig+"', "+str(patientDetails['ageOfConsent'])+", "+str(genderConceptID)+", '"+consentDate+"', "+str(consultantConceptID)+", '"+patientDetails['patientNumber']+"', "+str(siteId)+")") personID = patientData.lastrowid self.log.logMessage('Patient ID '+str(personID)+' added.') except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: A value is missing or of the wrong format -- ' +patientDetails['patientNumber']) self.log.logMessage('Error creating person ' +patientDetails['patientNumber']+"\n"+ str(e)) self.errorflag=True print(str(e)) print(13) raise ClinicalDataException('Error creating person') try: lastUpdateCursor = self.conn.cursor() if len(patientDetails['updateStamp'].strip())==0: lastUpdateCursor.execute("UPDATE PERSON SET last_update=NULL WHERE person_id=(SELECT MAX(person_id) FROM PERSON person)") else: # luP1 = patientDetails['updateStamp'].split(' ') lastUpdateInsert = self.formatDateTime(patientDetails['updateStamp']) lastUpdateCursor.execute("UPDATE PERSON SET last_update='"+lastUpdateInsert+"' WHERE person_id=(SELECT MAX(person_id) FROM PERSON person)") except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (set last updated) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error updating person '+patientDetails['patientNumber']+"\n" + str(e)) self.errorflag=True print(str(e)) print(14) raise ClinicalDataException('Error updating person') try: selectPersonIDCursor = self.conn.cursor() selectPersonIDCursor.execute("select top 1 person_id from PERSON WHERE target_id='"+patientDetails['patientNumber']+"'") row=selectPersonIDCursor.fetchone() personID= row[0] return personID except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (select person) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error selecting person ' +patientDetails['patientNumber']+"\n"+ str(e)) self.errorflag=True print(str(e)) print(15) raise ClinicalDataException('Error selecting person') def insertCondition(self, personID, patientDetails): # Insert the condition data getCondition = self.conn.cursor() getCondition.execute("SELECT * FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'") # conditionConceptID = 0 getConditionRowcount = 0 conditions=getCondition.fetchall() for row in conditions: getConditionRowcount = getConditionRowcount+1 # conditionConceptID = row[0] if getConditionRowcount == 0: insertCondition = self.conn.cursor() insertCondition.execute("INSERT INTO CONCEPT_CONDITION_SUBTYPE (subtype_name, condition_concept_id) VALUES ('"+patientDetails['primaryTumourType']+"', (select top 1 condition_concept_id from CONCEPT_CONDITION where condition_name='Other'))") print('new condition ' + str(insertCondition.lastrowid)) # conditionConceptID= insertCondition.lastrowid if len(patientDetails['dateOfDiagnosis']) > 0: try: conditionData = self.conn.cursor() diagnosisDate = self.formatDate(patientDetails['dateOfDiagnosis']) conditionData.execute("IF NOT EXISTS (SELECT * FROM CONDITION_OCCURRENCE WHERE person_id="+str(personID)+") "+ "INSERT INTO CONDITION_OCCURRENCE (person_id, condition_concept_id, condition_start_date, condition_subtype_concept_id, additional_details) VALUES("+str(personID)+", (SELECT TOP 1 condition_concept_id FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'), '"+diagnosisDate+"', (SELECT TOP 1 subtype_concept_id FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'), '"+patientDetails['addInfo']+"')"+ "ELSE UPDATE CONDITION_OCCURRENCE SET condition_start_date='"+diagnosisDate+"', condition_concept_id=(SELECT TOP 1 condition_concept_id FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'), condition_subtype_concept_id=(SELECT TOP 1 subtype_concept_id FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'), additional_details='"+patientDetails['addInfo']+"'WHERE person_id="+str(personID)) conditionDataID = conditionData.lastrowid self.log.logMessage('conditionData ID '+str(conditionDataID)+' added/updated.') print('conditionData ID '+str(conditionDataID)+' added/updated.') except Exception as e: self.log.systemStatusUpdate(filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (update condition_occurance) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error inserting/updateing codition_occurance ' +patientDetails['patientNumber']+"\n" + str(e)) self.errorflag=True print(str(e)) print(16) raise ClinicalDataException('Error inserting/updateing codition_occurance') return personID def insertTreatment(self, personID, patientDetails): # Update the procedure / treatment details getProcedure = self.conn.cursor() getProcedure.execute("SELECT * FROM CONCEPT_PROCEDURE WHERE procedure_name = '"+patientDetails['treatmentDetails']+"'") procedureRows = 0 procedure=getProcedure.fetchall() for row in procedure: procedureConceptID = row[0] procedureRows = procedureRows+1 if procedureRows == 0: try: insertProcedure = self.conn.cursor() insertProcedure.execute("INSERT INTO CONCEPT_PROCEDURE (procedure_name) VALUES('"+patientDetails['treatmentDetails']+"')") procedureConceptID = insertProcedure.lastrowid except Exception as e: self.log.systemStatusUpdate(filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (procedure)') self.log.logMessage('Error inserting procedure_name ' +patientDetails['patientNumber']+"\n" + str(e)) self.errorflag=True print(str(e)) print(7) raise ClinicalDataException('Error inserting procedure_name') procedureData = self.conn.cursor() if patientDetails['treatmentStartDate'] != '': treatmentStartDate = self.formatDate(patientDetails['treatmentStartDate']) else: treatmentStartDate = '' if patientDetails['treatmentEndDate'] != '': treatmentEndDate = self.formatDate(patientDetails['treatmentEndDate']) else: treatmentEndDate = '' try: procedureData.execute("IF NOT EXISTS (SELECT * FROM PROCEDURE_OCCURRENCE WHERE person_id ='"+str(personID)+"' AND procedure_concept_id = '"+str(procedureConceptID)+"' AND procedure_start_date='"+treatmentStartDate+"') INSERT INTO PROCEDURE_OCCURRENCE (person_id, procedure_concept_id, procedure_start_date, procedure_end_date) VALUES('"+str(personID)+"', '"+str(procedureConceptID)+"', '"+treatmentStartDate+"', '"+treatmentEndDate+"')") procedureDataID = procedureData.lastrowid self.log.logMessage('procedureData ID '+str(procedureDataID)+' updated.') except Exception as e: self.log.systemStatusUpdate(filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (procedure_occurance) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error inserting procedure_occurance ' +patientDetails['patientNumber']+"\n" + str(err)) self.errorflag=True print(str(e)) print(8) raise ClinicalDataException('Error inserting procedure_occurance') def updatePatient(self, personID, patientDetails): if len(patientDetails['updateStamp'].strip())==0: lastUpdate = 'NULL' else: # lU1 = patientDetails['updateStamp'].split(' ') lastUpdate = "'"+self.formatDateTime(patientDetails['updateStamp'])+"'" # Update the patient details consultantID = self.insertConsultant(patientDetails) getGender = self.conn.cursor() genderConceptID = 0 getGender.execute("SELECT * FROM CONCEPT_GENDER WHERE gender_name = '"+patientDetails['gender'].upper()+"'") gender=getGender.fetchall() for row in gender: genderConceptID = row[0] if genderConceptID == 0: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (gender not recognised)') self.log.logMessage('Error creating person ' +patientDetails['patientNumber']+"\n gender is not registered in DB") self.errorflag=True raise ClinicalDataException('Gender is not recognised') try: patientData = self.conn.cursor() consentDate = self.formatDate(patientDetails['dateOfConsent']) # uD1 = patientDetails['updateStamp'].split(' ') hash_object = hashlib.sha256(patientDetails['hospital_numberNo'].encode('utf-8')) hex_dig = hash_object.hexdigest() #hospital_numberNo = self.encode(patientDetails['hospital_numberNo']) patientData.execute("UPDATE PERSON SET age_at_consent="+str(patientDetails['ageOfConsent'])+", hospital_number=ENCRYPTBYPASSPHRASE('"+self.config['patientkey']+"', '"+patientDetails['hospital_numberNo']+"'), hospital_number_hash='"+hex_dig+"', gender_concept_id="+str(genderConceptID)+", consent_date='"+consentDate+"', last_update="+lastUpdate+" WHERE target_id = '"+patientDetails['patientNumber']+"'") patientData.execute("UPDATE PERSON SET consultant_concept_id=(SELECT TOP 1 consultant_concept_id FROM CONCEPT_CONSULTANT WHERE name = '" + patientDetails['consultant'] + "') WHERE target_id = '"+patientDetails['patientNumber']+"'") self.log.logMessage('Patient ID '+str(personID)+' updated.') except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (update person) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error updateing person '+patientDetails['patientNumber']+"\n" + (str(e))) self.errorflag=True print(str(e)) print(1) raise ClinicalDataException('Error updating person') # Update the condition data getCondition = self.conn.cursor() getCondition.execute("SELECT * FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'") row = getCondition.fetchone() if row is None: insertCondition = self.conn.cursor() insertCondition.execute("INSERT INTO CONCEPT_CONDITION_SUBTYPE (subtype_name, condition_concept_id) VALUES ('"+patientDetails['primaryTumourType']+"', (select top 1 condition_concept_id from CONCEPT_CONDITION where condition_name='Other'))") if len(patientDetails['dateOfDiagnosis']) > 0: try: conditionData = self.conn.cursor() diagnosisDate = self.formatDate(patientDetails['dateOfDiagnosis']) #check if exists conditionData.execute("IF NOT EXISTS (SELECT * FROM CONDITION_OCCURRENCE WHERE person_id="+str(personID)+") "+ "INSERT INTO CONDITION_OCCURRENCE (person_id, condition_concept_id, condition_start_date, condition_subtype_concept_id, additional_details) VALUES("+str(personID)+", (SELECT TOP 1 condition_concept_id FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'), '"+diagnosisDate+"', (SELECT TOP 1 subtype_concept_id FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'), '"+patientDetails['addInfo']+"')"+ "ELSE UPDATE CONDITION_OCCURRENCE SET condition_start_date='"+diagnosisDate+"', condition_concept_id=(SELECT TOP 1 condition_concept_id FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'), condition_subtype_concept_id=(SELECT TOP 1 subtype_concept_id FROM CONCEPT_CONDITION_SUBTYPE WHERE subtype_name = '"+patientDetails['primaryTumourType']+"'), additional_details='"+patientDetails['addInfo']+"'WHERE person_id="+str(personID)) conditionDataID = conditionData.lastrowid self.log.logMessage('conditionData ID '+str(conditionDataID)+' updated.') except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (condition data) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error updateing person '+patientDetails['patientNumber']+ ' Condition_Occurance \n' +str(e)) self.errorflag=True print(str(e)) print(3) raise ClinicalDataException('Error Condition_Occurance') try: consultantData = self.conn.cursor() consultantData.execute("UPDATE PERSON SET consultant_concept_id='"+str(consultantID)+"' WHERE person_id = '"+str(personID)+"'") consultantDataID = consultantData.lastrowid self.log.logMessage('consultantDataID ID '+str(consultantDataID)+' updated.') except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (update consultant) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error updating consultant ' +patientDetails['patientNumber']+"\n" + str(e)) self.errorflag=True print(str(e)) print(11) raise ClinicalDataException('Error updating consultant') def insertBlood(self, personID, patientDetails): # Blood specimen if len(patientDetails['dateOfBloodCollection']) > 0: try: specimenData = self.conn.cursor() bloodDate = self.formatDate(patientDetails['dateOfBloodCollection']) specimenData.execute("IF NOT EXISTS (SELECT * FROM SPECIMEN WHERE person_id="+str(personID)+" AND specimen_concept_id=1 AND baseline_number="+str(patientDetails['sampleTimePoint'])+") "+ "INSERT INTO SPECIMEN (person_id, specimen_date, specimen_concept_id, baseline_number) VALUES("+str(personID)+", '"+bloodDate+"', 1, "+str(patientDetails['sampleTimePoint'])+") "+ "ELSE UPDATE SPECIMEN SET specimen_date = '"+bloodDate+"' WHERE person_id="+str(personID)+" AND specimen_concept_id=1 AND baseline_number="+str(patientDetails['sampleTimePoint'])) specimenDataID = specimenData.lastrowid self.log.logMessage('Specimen blood ID '+str(specimenDataID)+' updated.') except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (Blood specimen) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error inserting/updating blood specimen ' +patientDetails['patientNumber']+"\n" + str(e)) self.errorflag=True print(str(e)) print(4) raise ClinicalDataException('Error inserting/updating blood specimen') def insertTumour(self, personID, patientDetails): if patientDetails['sampleType'].lower() == 'ffpe' or patientDetails['sampleType'].lower() == 'snap' or patientDetails['sampleType'].lower() == 'fresh': specimenConceptID = '3' elif patientDetails['sampleType'].lower() == 'archival': specimenConceptID = '2' else: specimenConceptID = '0' if patientDetails['sampleDate']!='' and patientDetails['sampleType'].lower() != 'cdx' and patientDetails['sampleType'].lower() != 'pdx': raise Exception('unknown sample type') if specimenConceptID == '2' or specimenConceptID == '3': try: specimenData = self.conn.cursor() if patientDetails['sampleDate'] !='': tumourDate = self.formatDate(patientDetails['sampleDate']) #specimenData.execute("UPDATE SPECIMEN SET specimen_date = '"+tumourDate+"', anatomic_site_id=(SELECT anatomy_concept_id FROM CONCEPT_ANATOMY WHERE anatomy_name = '"+patientDetails['primaryTumourType']+"') WHERE person_id="+str(personID)+" AND specimen_concept_id="+specimenConceptID) else : tumourDate ='' specimenData.execute("SELECT * FROM SPECIMEN WHERE person_id="+str(personID)+" AND baseline_number="+str(patientDetails['tumourTimePoint'])+ " and specimen_concept_id="+str(specimenConceptID)) specimenDataRows = 0 specimen=specimenData.fetchall() for row in specimen: specimenID=row[0] specimenDataRows = specimenDataRows+1 # The record didn't exist to update, so let's add it now if specimenDataRows == 0: specimenData.execute("INSERT INTO SPECIMEN (person_id, specimen_date, specimen_concept_id, anatomic_site_id, tumour_id, preclin_id, baseline_number) VALUES('"+str(personID)+"', '"+tumourDate+"', "+str(specimenConceptID)+", (SELECT anatomy_concept_id FROM CONCEPT_ANATOMY WHERE anatomy_name = '"+patientDetails['primaryTumourType']+"'), '"+str(patientDetails['tumourId']) + "', '"+str(patientDetails['preclinId'])+"', "+str(patientDetails['tumourTimePoint'])+")") specimenID = specimenData.lastrowid else: specimenData.execute("UPDATE SPECIMEN set specimen_concept_id="+str(specimenConceptID)+" ,specimen_date='"+tumourDate+"', tumour_id='"+str(patientDetails['tumourId']) +"', baseline_number="+str(patientDetails['tumourTimePoint'])+", preclin_id='"+str(patientDetails['preclinId'])+"', anatomic_site_id= (SELECT TOP 1 anatomy_concept_id FROM CONCEPT_ANATOMY WHERE anatomy_name = '"+patientDetails['primaryTumourType']+"') where specimen_id="+str(specimenID)) #print(specimenDataID) self.log.logMessage('Specimen tumour ID '+str(specimenID)+' updated.') except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (tumour specimen) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error inserting/updating tumour specimen ' +patientDetails['patientNumber']+"\n" + str(e)) self.errorflag=True print(str(e)) print(5) raise ClinicalDataException('Error inserting/updating tumour specimen') try: gdlSQL = "" gdlData = self.conn.cursor() if len(patientDetails['gdlRequestDate']) > 0: gdlDate = self.formatDate(patientDetails['gdlRequestDate']) gdlSQL = "UPDATE GDL_REQUEST SET date_requested='"+gdlDate+"', sample_type='"+patientDetails['biopsyLocation']+"' WHERE specimen_id=(SELECT TOP 1 specimen_id FROM SPECIMEN WHERE person_id="+str(personID)+" AND ((specimen_concept_id="+str(specimenConceptID)+" and specimen_date='"+tumourDate+"' and (preclin_id is NULL)) OR dbo.RemoveNonAlphaNumChars(preclin_id) like dbo.RemoveNonAlphaNumChars('"+str(patientDetails['preclinId'])+"')))" gdlData.execute(gdlSQL) if gdlData.rowcount == 0: gdlSQL = "INSERT INTO GDL_REQUEST (date_requested, path_lab_ref, specimen_id, sample_type) VALUES('"+gdlDate+"', '', '"+str(specimenID)+"', '"+patientDetails['biopsyLocation']+"')" gdlData.execute(gdlSQL) else: gdlSQL = "UPDATE GDL_REQUEST SET sample_type='"+patientDetails['biopsyLocation']+"' WHERE specimen_id=(SELECT TOP 1 specimen_id FROM SPECIMEN WHERE person_id="+str(personID)+" AND specimen_concept_id="+str(specimenConceptID)+" AND ((specimen_date='"+tumourDate+"' and (preclin_id is NULL)) OR dbo.RemoveNonAlphaNumChars(preclin_id) like dbo.RemoveNonAlphaNumChars('"+str(patientDetails['preclinId'])+"')))" gdlData.execute(gdlSQL) if gdlData.rowcount == 0: gdlSQL = "INSERT INTO GDL_REQUEST (path_lab_ref, specimen_id, sample_type) VALUES('', '"+str(specimenID)+"', '"+patientDetails['biopsyLocation']+"')" gdlData.execute(gdlSQL) specimenID = specimenData.lastrowid print(specimenID) self.log.logMessage('GDL ID '+str(specimenID)+' updated.') except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (GDL request) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error inserting/updating GDL data ' +patientDetails['patientNumber']+"\n" + str(e)) self.errorflag=True print(str(e)) print(6) raise ClinicalDataException('Error inserting/updating GDL data') def insertPDXCDX(self, personID, patientDetails): if patientDetails['sampleType'] == 'PDX' or patientDetails['sampleType'] == 'CDX': # Insert the PDX/CDX data try: insertPDXCDX = self.conn.cursor() if patientDetails['sampleDate']!='': pdxDate = self.formatDate(patientDetails['sampleDate']) insertPDXCDX.execute("IF NOT EXISTS (SELECT * FROM PDXCDX WHERE person_id ='"+str(personID)+"' AND date_created='"+str(pdxDate)+"' AND pdx_or_cdx='"+str(patientDetails['sampleType'])+"') INSERT INTO PDXCDX (date_created, pdx_or_cdx, timepoint, person_id) VALUES('"+str(pdxDate)+"', '"+str(patientDetails['sampleType'])+"', '"+str(patientDetails['sampleTimePoint'])+"', '"+str(personID)+"')") else : insertPDXCDX.execute("IF NOT EXISTS (SELECT * FROM PDXCDX WHERE person_id ='"+str(personID)+"' AND date_created is null AND pdx_or_cdx='"+str(patientDetails['sampleType'])+"') INSERT INTO PDXCDX (pdx_or_cdx, timepoint, person_id) VALUES('"+str(patientDetails['sampleType'])+"', '"+str(patientDetails['sampleTimePoint'])+"', '"+str(personID)+"')") insertPDXCDXID = insertPDXCDX.lastrowid except Exception as e: self.log.systemStatusUpdate(self.filename, 'Clinical', self.log.timestamp(), 'Error: Data import failed (PDX/CDX) -- ' +patientDetails['patientNumber']) self.log.logMessage('Error insert PDXCDX ' +patientDetails['patientNumber']+"\n" + str(e)) self.errorflag=True print(str(e)) print(9) raise ClinicalDataException('Error insert PDXCDX') def formatDateTime(self, dateString): # print(dateString) if len(dateString)<16: return self.formatDate(dateString) try: dateString= formattedDate = dateString[0:16] datetimeO=datetime.strptime(dateString, '%d/%m/%Y %H:%M') return datetimeO.strftime('%Y-%m-%d %H:%M:%S') except ValueError: raise Exception('Date string not correct '+ dateString) def formatDate(self, dateString): #print(dateString) dateList = dateString.split('/') formattedDate = dateList[2]+'-'+dateList[1]+'-'+dateList[0] if len(dateList[2]) < 3: formattedDate = '20'+formattedDate return formattedDate