def run()

in luigi/contrib/salesforce.py
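
The method below implements the task's run(): it opens a Salesforce Bulk API query job for object_name, submits the SOQL as a single batch, blocks until the batch finishes, downloads the result chunk(s) to the task's output, and falls back to a REST query_all call when the Bulk API rejects the query because it traverses foreign-key relationships. A hedged usage sketch follows the listing.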


    def run(self):
        if self.use_sandbox and not self.sandbox_name:
            raise Exception("Parameter sf_sandbox_name must be provided when uploading to a Salesforce Sandbox")

        sf = SalesforceAPI(salesforce().username,
                           salesforce().password,
                           salesforce().security_token,
                           salesforce().sb_security_token,
                           self.sandbox_name)

        job_id = sf.create_operation_job('query', self.object_name, content_type=self.content_type)
        logger.info("Started query job %s in salesforce for object %s" % (job_id, self.object_name))

        batch_id = ''
        msg = ''
        try:
            if self.is_soql_file:
                with open(self.soql, 'r') as infile:
                    self.soql = infile.read()

            batch_id = sf.create_batch(job_id, self.soql, self.content_type)
            logger.info("Creating new batch %s to query: %s for job: %s." % (batch_id, self.object_name, job_id))
            status = sf.block_on_batch(job_id, batch_id)
            if status['state'].lower() == 'failed':
                msg = "Batch failed with message: %s" % status['state_message']
                logger.error(msg)
                # don't raise exception if it's b/c of an included relationship
                # normal query will execute (with relationship) after bulk job is closed
                if 'foreign key relationships not supported' not in status['state_message'].lower():
                    raise Exception(msg)
            else:
                result_ids = sf.get_batch_result_ids(job_id, batch_id)

                # If there's only one result, just download it, otherwise we need to merge the resulting downloads
                if len(result_ids) == 1:
                    data = sf.get_batch_result(job_id, batch_id, result_ids[0])
                    with open(self.output().path, 'wb') as outfile:
                        outfile.write(data)
                else:
                    # Download each result file to disk, then merge them into one;
                    # this keeps memory consumption low for large result sets.
                    for i, result_id in enumerate(result_ids):
                        logger.info("Downloading batch result %s for batch: %s and job: %s" % (result_id, batch_id, job_id))
                        with open("%s.%d" % (self.output().path, i), 'wb') as outfile:
                            outfile.write(sf.get_batch_result(job_id, batch_id, result_id))

                    logger.info("Merging results of batch %s" % batch_id)
                    self.merge_batch_results(result_ids)
        finally:
            logger.info("Closing job %s" % job_id)
            sf.close_job(job_id)

        # The Bulk API rejects queries that traverse foreign-key relationships;
        # in that case, re-run the same SOQL through the slower REST-based query_all.
        if 'state_message' in status and 'foreign key relationships not supported' in status['state_message'].lower():
            logger.info("Retrying with REST API query")
            data_file = sf.query_all(self.soql)

            reader = csv.reader(data_file)
            # csv.writer needs a text-mode file handle; newline='' prevents extra
            # blank lines between rows on platforms that translate line endings.
            with open(self.output().path, 'w', newline='') as outfile:
                writer = csv.writer(outfile, dialect='excel')
                for row in reader:
                    writer.writerow(row)
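
For orientation, here is a minimal usage sketch. It assumes the enclosing class is luigi.contrib.salesforce.QuerySalesforce, that object_name and soql are the overridable hooks this run() reads (alongside content_type, is_soql_file, use_sandbox and sandbox_name), and that credentials come from the salesforce luigi Config whose username, password, security_token and sb_security_token fields the method passes to SalesforceAPI. Treat the class and attribute names as assumptions inferred from what run() references, not as a definitive description of the module's API.

    # Hedged sketch: a concrete task that exports Contact records.
    import luigi
    from luigi.contrib.salesforce import QuerySalesforce

    class DumpContacts(QuerySalesforce):
        @property
        def object_name(self):
            # Salesforce object passed to create_operation_job().
            return 'Contact'

        @property
        def soql(self):
            # SOQL sent as the batch body (with is_soql_file this would be a file path instead).
            return "SELECT Id, FirstName, LastName, Email FROM Contact"

        def output(self):
            # run() writes the downloaded batch result(s) to this path.
            return luigi.LocalTarget('contacts.csv')

    if __name__ == '__main__':
        # Credentials are read from luigi configuration, e.g. a [salesforce] section
        # providing username, password and security_token (plus sb_security_token
        # and a sandbox name when use_sandbox is enabled).
        luigi.build([DumpContacts()], local_scheduler=True)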