in luigi/contrib/salesforce.py [0:0]
def run(self):
    """Run ``self.soql`` as a Salesforce Bulk API query job and write the results to ``self.output()``.

    If ``self.is_soql_file`` is set, ``self.soql`` is a path and the query is read from that
    file. The query is submitted as a single batch of a bulk 'query' job on
    ``self.object_name``. The job is always closed once the batch finishes or an error occurs.

    Batch results are written to ``self.output().path``:

    - A single result is written directly.
    - Multiple results are downloaded to ``<path>.<i>`` part files and then combined by
      ``self.merge_batch_results``.

    If Salesforce reports that foreign key relationships are not supported, the query is
    re-run through ``sf.query_all`` and its rows are written out as CSV instead.

    Raises:
        Exception: if ``self.use_sandbox`` is set without ``self.sandbox_name``, or if the
            batch failed for any reason other than unsupported foreign key relationships.
    """
    if self.use_sandbox and not self.sandbox_name:
        raise Exception("Parameter sf_sandbox_name must be provided when uploading to a Salesforce Sandbox")

    # Resolve the query into a local instead of overwriting the self.soql parameter.
    # Mutating it meant a re-run of the same task instance would try to open() the
    # query text itself as a file path. Reading it before creating the job also
    # avoids creating a job at all when the file is missing.
    soql = self.soql
    if self.is_soql_file:
        with open(soql, 'r') as infile:
            soql = infile.read()

    sf = SalesforceAPI(salesforce().username,
                       salesforce().password,
                       salesforce().security_token,
                       salesforce().sb_security_token,
                       self.sandbox_name)

    job_id = sf.create_operation_job('query', self.object_name, content_type=self.content_type)
    logger.info("Started query job %s in salesforce for object %s", job_id, self.object_name)

    try:
        batch_id = sf.create_batch(job_id, soql, self.content_type)
        logger.info("Creating new batch %s to query: %s for job: %s.", batch_id, self.object_name, job_id)
        status = sf.block_on_batch(job_id, batch_id)

        # 'state_message' may be missing (or None). Treat that as an empty message so a
        # failed batch is reported with its own error rather than a KeyError/AttributeError.
        state_message = status.get('state_message') or ''
        fk_unsupported = 'foreign key relationships not supported' in state_message.lower()

        if status['state'].lower() == 'failed':
            msg = "Batch failed with message: %s" % state_message
            logger.error(msg)
            # Don't raise if the failure is caused by an included relationship. That query
            # is re-run through the REST API below, after the bulk job has been closed.
            if not fk_unsupported:
                raise Exception(msg)
        else:
            result_ids = sf.get_batch_result_ids(job_id, batch_id)
            if len(result_ids) == 1:
                # Only one result: write it straight to the output path.
                data = sf.get_batch_result(job_id, batch_id, result_ids[0])
                with open(self.output().path, 'wb') as outfile:
                    outfile.write(data)
            else:
                # Download each result to its own part file, then merge into one.
                # This avoids holding every result in memory at once.
                for i, result_id in enumerate(result_ids):
                    logger.info("Downloading batch result %s for batch: %s and job: %s", result_id, batch_id, job_id)
                    with open("%s.%d" % (self.output().path, i), 'wb') as outfile:
                        outfile.write(sf.get_batch_result(job_id, batch_id, result_id))
                logger.info("Merging results of batch %s", batch_id)
                self.merge_batch_results(result_ids)
    finally:
        logger.info("Closing job %s", job_id)
        sf.close_job(job_id)

    # Only reached when the try block completed, so fk_unsupported is always bound here.
    if fk_unsupported:
        logger.info("Retrying with REST API query")
        data_file = sf.query_all(soql)
        reader = csv.reader(data_file)
        # NOTE(review): opening in 'wb' and passing the file to csv.writer only works on
        # Python 2. Python 3's csv.writer needs a text-mode file opened with newline=''.
        # Confirm what sf.query_all returns before changing the mode.
        with open(self.output().path, 'wb') as outfile:
            writer = csv.writer(outfile, dialect='excel')
            writer.writerows(reader)