in sdks/python/apache_beam/runners/portability/stager.py [0:0]
def create_job_resources(options,  # type: PipelineOptions
                         temp_dir,  # type: str
                         build_setup_args=None,  # type: Optional[List[str]]
                         pypi_requirements=None,  # type: Optional[List[str]]
                         populate_requirements_cache=None,  # type: Optional[Callable[[str, str], None]]
                         skip_prestaged_dependencies=False,  # type: bool
                         ):
"""For internal use only; no backwards-compatibility guarantees.
Creates (if needed) a list of job resources.
Args:
options: Command line options. More specifically the function will
expect requirements_file, setup_file, and save_main_session options
to be present.
temp_dir: Temporary folder where the resource building can happen. If
None then a unique temp directory will be created. Used only for
testing.
build_setup_args: A list of command line arguments used to build a
setup package. Used only if options.setup_file is not None. Used
only for testing.
pypi_requirements: A list of PyPI requirements used to cache source
packages.
populate_requirements_cache: Callable for populating the requirements
cache. Used only for testing.
skip_prestaged_dependencies: Skip staging dependencies that can be
added into SDK containers during prebuilding.
Returns:
A list of ArtifactInformation to be used for staging resources.
Raises:
RuntimeError: If files specified are not found or error encountered
while trying to create the resources (e.g., build a setup package).
"""
resources = [] # type: List[beam_runner_api_pb2.ArtifactInformation]
setup_options = options.view_as(SetupOptions)
  # We can skip the boot dependencies (the Apache Beam SDK, Python packages
  # from requirements.txt, Python packages from extra_packages, and the
  # workflow tarball) if we know we are using an SDK container image with
  # the dependencies pre-installed.
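  # (Callers signal this by passing skip_prestaged_dependencies=True, e.g.,
  # when SDK container prebuilding is enabled.)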
if not skip_prestaged_dependencies:
requirements_cache_path = (
os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
if setup_options.requirements_cache is None else
setup_options.requirements_cache)
if not os.path.exists(requirements_cache_path):
os.makedirs(requirements_cache_path)
# Stage a requirements file if present.
if setup_options.requirements_file is not None:
if not os.path.isfile(setup_options.requirements_file):
raise RuntimeError(
'The file %s cannot be found. It was specified in the '
'--requirements_file command line option.' %
setup_options.requirements_file)
resources.append(
Stager._create_file_stage_to_artifact(
setup_options.requirements_file, REQUIREMENTS_FILE))
# Populate cache with packages from the requirement file option and
# stage the files in the cache.
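      # (The default Stager._populate_requirements_cache downloads the
      # packages with pip into requirements_cache_path; tests can inject a
      # replacement via the populate_requirements_cache argument.)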
      (populate_requirements_cache or Stager._populate_requirements_cache)(
          setup_options.requirements_file, requirements_cache_path)
if pypi_requirements:
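      # delete=False keeps the temporary file on disk after close() so that
      # it can be staged and read back below.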
      tf = tempfile.NamedTemporaryFile(mode='w', delete=False)
      # writelines() does not add newlines, so join the requirements
      # explicitly to produce one requirement per line.
      tf.write('\n'.join(pypi_requirements))
      tf.close()
resources.append(Stager._create_file_pip_requirements_artifact(tf.name))
# Populate cache with packages from PyPI requirements and stage
# the files in the cache.
      (populate_requirements_cache or Stager._populate_requirements_cache)(
          tf.name, requirements_cache_path)
if setup_options.requirements_file is not None or pypi_requirements:
for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
resources.append(
Stager._create_file_stage_to_artifact(pkg, os.path.basename(pkg)))
# Handle a setup file if present.
# We will build the setup package locally and then copy it to the staging
# location because the staging location is a remote path and the file
# cannot be created directly there.
if setup_options.setup_file is not None:
if not os.path.isfile(setup_options.setup_file):
raise RuntimeError(
'The file %s cannot be found. It was specified in the '
'--setup_file command line option.' % setup_options.setup_file)
if os.path.basename(setup_options.setup_file) != 'setup.py':
raise RuntimeError(
'The --setup_file option expects the full path to a file named '
'setup.py instead of %s' % setup_options.setup_file)
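    # _build_setup_package runs a source-distribution build of the user's
    # package in temp_dir (optionally with build_setup_args) and returns
    # the path to the resulting tarball.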
tarball_file = Stager._build_setup_package(
setup_options.setup_file, temp_dir, build_setup_args)
resources.append(
Stager._create_file_stage_to_artifact(
tarball_file, WORKFLOW_TARBALL_FILE))
# Handle extra local packages that should be staged.
if setup_options.extra_packages is not None:
resources.extend(
Stager._create_extra_packages(
setup_options.extra_packages, temp_dir=temp_dir))
if hasattr(setup_options, 'sdk_location'):
if (setup_options.sdk_location == 'default') or Stager._is_remote_path(
setup_options.sdk_location):
      # If --sdk_location is not specified then the appropriate package
      # will be obtained from PyPI (https://pypi.python.org) based on the
      # version of the currently running SDK. If the option is present,
      # no version matching is done and the exact URL or path is expected.
      #
      # Unit tests running in the 'python setup.py test' context will
      # not have the sdk_location attribute present and therefore we
      # will not stage the SDK.
sdk_remote_location = 'pypi' if (
setup_options.sdk_location == 'default'
) else setup_options.sdk_location
resources.extend(
Stager._create_beam_sdk(sdk_remote_location, temp_dir))
elif setup_options.sdk_location == 'container':
# Use the SDK that's built into the container, rather than re-staging
# it.
pass
else:
# This branch is also used by internal tests running with the SDK
# built at head.
if os.path.isdir(setup_options.sdk_location):
sdk_path = os.path.join(
setup_options.sdk_location, names.STAGED_SDK_SOURCES_FILENAME)
else:
sdk_path = setup_options.sdk_location
if os.path.isfile(sdk_path):
_LOGGER.info('Copying Beam SDK "%s" to staging location.', sdk_path)
resources.append(
Stager._create_file_stage_to_artifact(
sdk_path,
Stager._desired_sdk_filename_in_staging_location(
setup_options.sdk_location)))
else:
if setup_options.sdk_location == 'default':
raise RuntimeError(
'Cannot find default Beam SDK tar file "%s"' % sdk_path)
elif not setup_options.sdk_location:
_LOGGER.info(
'Beam SDK will not be staged since --sdk_location '
'is empty.')
else:
raise RuntimeError(
'The file "%s" cannot be found. Its location was specified '
'by the --sdk_location command-line option.' % sdk_path)
  # The following artifacts are not processed by the Python SDK container
  # boot sequence in setup mode and hence should not be skipped even if a
  # prebuilt SDK container image is used.
# TODO(heejong): remove jar_packages experimental flag when cross-language
# dependency management is implemented for all runners.
# Handle jar packages that should be staged for Java SDK Harness.
jar_packages = options.view_as(DebugOptions).lookup_experiment(
'jar_packages')
if jar_packages is not None:
resources.extend(
Stager._create_jar_packages(
jar_packages.split(','), temp_dir=temp_dir))
# Pickle the main session if requested.
# We will create the pickled main session locally and then copy it to the
# staging location because the staging location is a remote path and the
# file cannot be created directly there.
if setup_options.save_main_session:
pickled_session_file = os.path.join(
temp_dir, names.PICKLED_MAIN_SESSION_FILE)
pickler.dump_session(pickled_session_file)
resources.append(
Stager._create_file_stage_to_artifact(
pickled_session_file, names.PICKLED_MAIN_SESSION_FILE))
worker_options = options.view_as(WorkerOptions)
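  # dataflow_worker_jar may not be defined on the options in every
  # environment, so read it defensively with getattr.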
dataflow_worker_jar = getattr(worker_options, 'dataflow_worker_jar', None)
if dataflow_worker_jar is not None:
jar_staged_filename = 'dataflow-worker.jar'
resources.append(
Stager._create_file_stage_to_artifact(
dataflow_worker_jar, jar_staged_filename))
return resources
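
A minimal usage sketch follows (illustrative only, not part of stager.py). It assumes create_job_resources is called directly as defined above and that a requirements.txt file exists locally; the option flags themselves are real Beam pipeline options that this function reads via SetupOptions.

import tempfile

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--requirements_file=requirements.txt',  # assumed to exist locally
    '--save_main_session',
    '--sdk_location=container',  # use the SDK built into the container
])
with tempfile.TemporaryDirectory() as staging_temp_dir:
  # Each entry is a beam_runner_api_pb2.ArtifactInformation proto describing
  # one file to stage: the requirements file, cached packages, the pickled
  # main session, and so on.
  artifacts = create_job_resources(options, staging_temp_dir)
  for artifact in artifacts:
    print(artifact.type_urn)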