in tfx/components/util/udf_utils.py [0:0]
def package_user_module_file(instance_name: str, module_path: str,
pipeline_root: str) -> Tuple[str, str]:
"""Package the given user module file into a Python Wheel package.
Args:
instance_name: Name of the component instance, for creating a unique wheel
package name.
module_path: Path to the module file to be packaged.
pipeline_root: Text
Returns:
dist_file_path: Path to the generated wheel file.
user_module_path: Path for referencing the user module when stored
as the _MODULE_PATH_KEY execution property. Format should be treated
as opaque by the user.
Raises:
RuntimeError: When wheel building fails.
"""
module_path = os.path.abspath(io_utils.ensure_local(module_path))
if not module_path.endswith('.py'):
raise ValueError(f'Module path {module_path!r} is not a ".py" file.')
if not os.path.exists(module_path):
raise ValueError(f'Module path {module_path!r} does not exist.')
user_module_dir, module_file_name = os.path.split(module_path)
user_module_name = re.sub(r'\.py$', '', module_file_name)
source_files = []
# Discover all Python source files in this directory for inclusion.
for file_name in os.listdir(user_module_dir):
if file_name.endswith('.py'):
source_files.append(file_name)
module_names = []
for file_name in source_files:
if file_name in (_EPHEMERAL_SETUP_PY_FILE_NAME, '__init__.py'):
continue
module_name = re.sub(r'\.py$', '', file_name)
module_names.append(module_name)
# Set up build directory.
build_dir = tempfile.mkdtemp()
for source_file in source_files:
shutil.copyfile(
os.path.join(user_module_dir, source_file),
os.path.join(build_dir, source_file))
# Generate an ephemeral wheel for this module.
logging.info(
'Generating ephemeral wheel package for %r (including modules: %s).',
module_path, module_names)
version_hash = _get_version_hash(user_module_dir, source_files)
logging.info('User module package has hash fingerprint version %s.',
version_hash)
setup_py_path = os.path.join(build_dir, _EPHEMERAL_SETUP_PY_FILE_NAME)
with open(setup_py_path, 'w', encoding='utf-8') as f:
f.write(
_get_ephemeral_setup_py_contents(f'tfx-user-code-{instance_name}',
f'0.0+{version_hash}', module_names))
temp_dir = tempfile.mkdtemp()
dist_dir = tempfile.mkdtemp()
bdist_command = [
sys.executable, setup_py_path, 'bdist_wheel', '--bdist-dir', temp_dir,
'--dist-dir', dist_dir
]
logging.info('Executing: %s', bdist_command)
try:
subprocess.check_call(bdist_command, cwd=build_dir)
except subprocess.CalledProcessError as e:
raise RuntimeError('Failed to build wheel.') from e
dist_files = os.listdir(dist_dir)
if len(dist_files) != 1:
raise RuntimeError(f'Unexpectedly found {len(dist_files)} output files '
f'in wheel output directory {dist_dir}.')
build_dist_file_path = os.path.join(dist_dir, dist_files[0])
# Copy wheel file atomically to wheel staging directory.
dist_wheel_directory = os.path.join(pipeline_root, '_wheels')
dist_file_path = os.path.join(dist_wheel_directory, dist_files[0])
temp_dist_file_path = dist_file_path + '.tmp'
fileio.makedirs(dist_wheel_directory)
fileio.copy(build_dist_file_path, temp_dist_file_path, overwrite=True)
fileio.rename(temp_dist_file_path, dist_file_path, overwrite=True)
logging.info(
('Successfully built user code wheel distribution at %r; target user '
'module is %r.'), dist_file_path, user_module_name)
# Encode the user module key as a specification of a user module name within
# a packaged wheel path.
assert '@' not in user_module_name, ('Unexpected invalid module name: ' +
user_module_name)
user_module_path = f'{user_module_name}@{dist_file_path}'
logging.info('Full user module path is %r', user_module_path)
return dist_file_path, user_module_path