def create_job_resources()

in sdks/python/apache_beam/runners/portability/stager.py


  def create_job_resources(options,  # type: PipelineOptions
                           temp_dir,  # type: str
                           build_setup_args=None,  # type: Optional[List[str]]
                           pypi_requirements=None,  # type: Optional[List[str]]
                           populate_requirements_cache=None,  # type: Optional[Callable[[str, str], None]]
                           skip_prestaged_dependencies=False,  # type: bool
                           ):
    """For internal use only; no backwards-compatibility guarantees.

        Creates (if needed) a list of job resources.

        Args:
          options: Command line options. More specifically the function will
            expect requirements_file, setup_file, and save_main_session options
            to be present.
          temp_dir: Temporary folder where the resource building can happen. If
            None then a unique temp directory will be created. Used only for
            testing.
          build_setup_args: A list of command line arguments used to build a
            setup package. Used only if options.setup_file is not None. Used
            only for testing.
          pypi_requirements: A list of PyPI requirements used to cache source
            packages.
          populate_requirements_cache: Callable for populating the requirements
            cache. Used only for testing.
          skip_prestaged_dependencies: Skip staging dependencies that can be
            added into SDK containers during prebuilding.

        Returns:
          A list of ArtifactInformation to be used for staging resources.

        Raises:
          RuntimeError: If files specified are not found or error encountered
          while trying to create the resources (e.g., build a setup package).
        """

    resources = []  # type: List[beam_runner_api_pb2.ArtifactInformation]

    setup_options = options.view_as(SetupOptions)

    # We can skip the boot dependencies (the Apache Beam SDK, Python packages
    # from requirements.txt, Python packages from extra_packages, and the
    # workflow tarball) if we know we are using an SDK container image with
    # these dependencies pre-installed.
    if not skip_prestaged_dependencies:
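      # The default cache is a stable directory under the system temp dir, so
      # downloaded packages can be reused across runs.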
      requirements_cache_path = (
          os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
          if setup_options.requirements_cache is None else
          setup_options.requirements_cache)
      if not os.path.exists(requirements_cache_path):
        os.makedirs(requirements_cache_path)

      # Stage a requirements file if present.
      if setup_options.requirements_file is not None:
        if not os.path.isfile(setup_options.requirements_file):
          raise RuntimeError(
              'The file %s cannot be found. It was specified in the '
              '--requirements_file command line option.' %
              setup_options.requirements_file)
        resources.append(
            Stager._create_file_stage_to_artifact(
                setup_options.requirements_file, REQUIREMENTS_FILE))
        # Populate the cache with packages from the requirements file option;
        # the cached packages are staged below.
        (
            populate_requirements_cache if populate_requirements_cache else
            Stager._populate_requirements_cache)(
                setup_options.requirements_file, requirements_cache_path)

      if pypi_requirements:
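        # Write the requirements to a named temp file; delete=False keeps the
        # file around after close() so it can be staged below.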
        tf = tempfile.NamedTemporaryFile(mode='w', delete=False)
        tf.write('\n'.join(pypi_requirements))
        tf.close()
        resources.append(Stager._create_file_pip_requirements_artifact(tf.name))
        # Populate the cache with packages from the PyPI requirements; the
        # cached packages are staged below.
        (
            populate_requirements_cache if populate_requirements_cache else
            Stager._populate_requirements_cache)(
                tf.name, requirements_cache_path)

      if setup_options.requirements_file is not None or pypi_requirements:
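        # Stage every package that was downloaded into the requirements cache.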
        for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
          resources.append(
              Stager._create_file_stage_to_artifact(pkg, os.path.basename(pkg)))

      # Handle a setup file if present.
      # We will build the setup package locally and then copy it to the staging
      # location because the staging location is a remote path and the file
      # cannot be created directly there.
      if setup_options.setup_file is not None:
        if not os.path.isfile(setup_options.setup_file):
          raise RuntimeError(
              'The file %s cannot be found. It was specified in the '
              '--setup_file command line option.' % setup_options.setup_file)
        if os.path.basename(setup_options.setup_file) != 'setup.py':
          raise RuntimeError(
              'The --setup_file option expects the full path to a file named '
              'setup.py instead of %s' % setup_options.setup_file)
        tarball_file = Stager._build_setup_package(
            setup_options.setup_file, temp_dir, build_setup_args)
        resources.append(
            Stager._create_file_stage_to_artifact(
                tarball_file, WORKFLOW_TARBALL_FILE))

      # Handle extra local packages that should be staged.
      if setup_options.extra_packages is not None:
        resources.extend(
            Stager._create_extra_packages(
                setup_options.extra_packages, temp_dir=temp_dir))

      if hasattr(setup_options, 'sdk_location'):
        if (setup_options.sdk_location == 'default') or Stager._is_remote_path(
            setup_options.sdk_location):
            setup_options.sdk_location):
          # If --sdk_location is not specified then the appropriate package
          # will be obtained from PyPI (https://pypi.python.org) based on the
          # version of the currently running SDK. If the option is present,
          # no version matching is done and the exact URL or path is used.
          #
          # Unit tests running in the 'python setup.py test' context will
          # not have the sdk_location attribute present, and therefore we
          # will not stage the SDK.
          sdk_remote_location = (
              'pypi' if setup_options.sdk_location == 'default' else
              setup_options.sdk_location)
          resources.extend(
              Stager._create_beam_sdk(sdk_remote_location, temp_dir))
        elif setup_options.sdk_location == 'container':
          # Use the SDK that's built into the container, rather than re-staging
          # it.
          pass
        else:
          # This branch is also used by internal tests running with the SDK
          # built at head.
          if os.path.isdir(setup_options.sdk_location):
            sdk_path = os.path.join(
                setup_options.sdk_location, names.STAGED_SDK_SOURCES_FILENAME)
          else:
            sdk_path = setup_options.sdk_location

          if os.path.isfile(sdk_path):
            _LOGGER.info('Copying Beam SDK "%s" to staging location.', sdk_path)
            resources.append(
                Stager._create_file_stage_to_artifact(
                    sdk_path,
                    Stager._desired_sdk_filename_in_staging_location(
                        setup_options.sdk_location)))
          else:
            if setup_options.sdk_location == 'default':
              raise RuntimeError(
                  'Cannot find default Beam SDK tar file "%s"' % sdk_path)
            elif not setup_options.sdk_location:
              _LOGGER.info(
                  'Beam SDK will not be staged since --sdk_location '
                  'is empty.')
            else:
              raise RuntimeError(
                  'The file "%s" cannot be found. Its location was specified '
                  'by the --sdk_location command-line option.' % sdk_path)

    # The following artifacts are not processed by the Python SDK container
    # boot sequence in setup mode, and hence should not be skipped even if a
    # prebuilt SDK container image is used.

    # TODO(heejong): remove jar_packages experimental flag when cross-language
    #   dependency management is implemented for all runners.
    # Handle jar packages that should be staged for Java SDK Harness.
    jar_packages = options.view_as(DebugOptions).lookup_experiment(
        'jar_packages')
    if jar_packages is not None:
      resources.extend(
          Stager._create_jar_packages(
              jar_packages.split(','), temp_dir=temp_dir))

    # Pickle the main session if requested.
    # We will create the pickled main session locally and then copy it to the
    # staging location because the staging location is a remote path and the
    # file cannot be created directly there.
    if setup_options.save_main_session:
      pickled_session_file = os.path.join(
          temp_dir, names.PICKLED_MAIN_SESSION_FILE)
      pickler.dump_session(pickled_session_file)
      resources.append(
          Stager._create_file_stage_to_artifact(
              pickled_session_file, names.PICKLED_MAIN_SESSION_FILE))

    worker_options = options.view_as(WorkerOptions)
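    # Stage a custom Dataflow worker jar if one was supplied via the
    # dataflow_worker_jar worker option.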
    dataflow_worker_jar = getattr(worker_options, 'dataflow_worker_jar', None)
    if dataflow_worker_jar is not None:
      jar_staged_filename = 'dataflow-worker.jar'
      resources.append(
          Stager._create_file_stage_to_artifact(
              dataflow_worker_jar, jar_staged_filename))

    return resources
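
Usage: a minimal sketch, not taken from the source. It assumes
create_job_resources is exposed as a static method on Stager (as in the Beam
portability stager) and that a requirements.txt exists in the working
directory; the option values, paths, and the no-op cache populator are
illustrative only.

  import tempfile

  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.runners.portability.stager import Stager

  # Hypothetical stand-in for the test-only populate_requirements_cache hook.
  def _no_op_populate_cache(requirements_file, cache_dir):
    pass

  options = PipelineOptions(['--requirements_file=requirements.txt'])
  with tempfile.TemporaryDirectory() as staging_temp_dir:
    artifacts = Stager.create_job_resources(
        options,
        staging_temp_dir,
        populate_requirements_cache=_no_op_populate_cache)
    for artifact in artifacts:
      # Each entry is a beam_runner_api_pb2.ArtifactInformation proto.
      print(artifact.type_urn, artifact.role_urn)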