syndicate/core/build/runtime/python.py (331 lines of code) (raw):
"""
Copyright 2018 EPAM Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import concurrent
import glob
import json
import os
import platform
import shutil
import subprocess
import sys
from concurrent.futures import FIRST_EXCEPTION
from concurrent.futures import FIRST_EXCEPTION
from concurrent.futures.thread import ThreadPoolExecutor
from itertools import chain
from pathlib import Path
from typing import Union, Optional, List, Set
from syndicate.commons.log_helper import get_logger, get_user_logger
from syndicate.core.build.helper import build_py_package_name, zip_dir, \
remove_dir
from syndicate.core.conf.processor import path_resolver
from syndicate.core.constants import (LAMBDA_CONFIG_FILE_NAME, DEFAULT_SEP,
REQ_FILE_NAME, LOCAL_REQ_FILE_NAME,
LAMBDA_LAYER_CONFIG_FILE_NAME,
PYTHON_LAMBDA_LAYER_PATH,
MANY_LINUX_2014_PLATFORM)
from syndicate.core.helper import (build_path, unpack_kwargs, zip_ext,
without_zip_ext)
from syndicate.core.resources.helper import validate_params
# Module-level logger for build diagnostics.
_LOG = get_logger(__name__)
# Logger obtained from get_user_logger(); this module uses it to report
# installation failures of individual requirements.
USER_LOG = get_user_logger()
# Glob pattern that matches Python source files.
_PY_EXT = "*.py"
# Raw line values that are regarded as empty lines.
EMPTY_LINE_CHARS = ('\n', '\r\n', '\t')
def assemble_python_lambdas(project_path, bundles_dir, errors_allowed,
                            **kwargs):
    """
    Build packages for all Python lambdas and lambda layers of a project.

    The project directory is walked recursively. A directory containing a
    file whose name ends with LAMBDA_CONFIG_FILE_NAME is built with
    `_build_python_artifact`; a directory containing a file whose name ends
    with LAMBDA_LAYER_CONFIG_FILE_NAME is built with
    `build_python_lambda_layer`. Builds are executed by a pool of 5 worker
    threads.

    :param project_path: path of the python project; '.' means
        CONFIG.project_path itself, otherwise the last path component is
        joined to CONFIG.project_path
    :param bundles_dir: directory the packages are written to
    :param errors_allowed: forwarded to the builders (controls the
        per-requirement fallback of the dependency installation)
    :param kwargs: accepted for interface compatibility, not used
    :raises AssertionError: wraps the exception of a failed build; the
        original exception is attached as ``__cause__``
    """
    from syndicate.core import CONFIG

    project_base_folder = os.path.basename(os.path.normpath(project_path))
    if project_path != '.':
        project_abs_path = build_path(CONFIG.project_path, project_base_folder)
    else:
        project_abs_path = CONFIG.project_path
    _LOG.info('Going to process python project by path: {0}'.format(
        project_abs_path))

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = []
        for root, _sub_dirs, files in os.walk(project_abs_path):
            for item in files:
                if item.endswith(LAMBDA_CONFIG_FILE_NAME):
                    _LOG.info('Going to build artifact in: {0}'.format(root))
                    arg = {
                        'root': str(Path(root)),
                        'config_file': str(Path(root, item)),
                        'target_folder': bundles_dir,
                        'project_path': project_path,
                        'errors_allowed': errors_allowed
                    }
                    futures.append(
                        executor.submit(_build_python_artifact, arg))
                elif item.endswith(LAMBDA_LAYER_CONFIG_FILE_NAME):
                    _LOG.info(f'Going to build lambda layer in `{root}`')
                    arg = {
                        'layer_root': root,
                        'bundle_dir': bundles_dir,
                        'project_path': project_path,
                        'errors_allowed': errors_allowed
                    }
                    futures.append(
                        executor.submit(build_python_lambda_layer, arg))
        result = concurrent.futures.wait(futures, return_when=FIRST_EXCEPTION)

    for future in result.done:
        exception = future.exception()
        if exception:
            # Chain the cause explicitly: the new exception is raised outside
            # of an ``except`` block, so without ``from`` the traceback of
            # the worker thread would be lost.
            raise AssertionError(exception) from exception
    _LOG.info('Python project was processed successfully')
@unpack_kwargs
def build_python_lambda_layer(layer_root: str, bundle_dir: str,
                              project_path: str, errors_allowed: bool):
    """
    Assemble the zip package of a single Python lambda layer.

    The layer root is the directory that holds:
    - lambda_layer_config.json (read and validated first);
    - requirements.txt (optional third-party dependencies);
    - local_requirements.txt (optional project-local dependencies).

    Dependencies are collected in a temporary folder inside `bundle_dir`,
    zipped next to it and the folder is removed afterwards.
    """
    config_location = Path(layer_root, LAMBDA_LAYER_CONFIG_FILE_NAME)
    with open(config_location, 'r') as config_fp:
        layer_config = json.load(config_fp)
    validate_params(layer_root, layer_config, ['name', 'deployment_package'])

    build_dir = Path(bundle_dir,
                     without_zip_ext(layer_config['deployment_package']))
    dependencies_dir = build_dir / PYTHON_LAMBDA_LAYER_PATH
    _LOG.info(f'Artifacts path: {build_dir}')
    os.makedirs(build_dir, exist_ok=True)

    # third-party dependencies
    third_party_reqs = Path(layer_root, REQ_FILE_NAME)
    if os.path.exists(third_party_reqs):
        install_requirements_to(third_party_reqs, to=dependencies_dir,
                                config=layer_config,
                                errors_allowed=errors_allowed)

    # project-local dependencies
    local_reqs = Path(layer_root, LOCAL_REQ_FILE_NAME)
    if os.path.exists(local_reqs):
        _LOG.info('Going to install local dependencies')
        _install_local_req(dependencies_dir, local_reqs, project_path)
        _LOG.info('Local dependencies were installed successfully')

    # pack everything into the deployment archive
    archive_name = zip_ext(layer_config['deployment_package'])
    _LOG.info(f'Packaging artifacts by {build_dir} to {archive_name}')
    zip_dir(str(build_dir), str(Path(bundle_dir, archive_name)))
    _LOG.info(f"Package '{archive_name}' was successfully created")

    # the unpacked folder is not needed once the archive exists
    remove_dir(build_dir)
    _LOG.info(f'"{build_dir}" was removed successfully')
@unpack_kwargs
def _build_python_artifact(root, config_file, target_folder, project_path,
                           errors_allowed):
    """
    Assemble the zip package of a single Python lambda.

    Reads and validates the lambda config, installs third-party and local
    requirements into a temporary `<name>-<version>` folder inside
    `target_folder`, copies the lambda's own packages and top-level *.py
    files there, zips the folder and finally removes it.
    """
    _LOG.info(f'Building artifact in {target_folder}')
    with open(config_file, 'r') as config_fp:
        lambda_config = json.load(config_fp)
    validate_params(root, lambda_config, ['lambda_path', 'name', 'version'])

    # folder that accumulates the content of the future archive
    lambda_name = lambda_config["name"]
    lambda_version = lambda_config["version"]
    build_dir = Path(target_folder, f'{lambda_name}-{lambda_version}')
    _LOG.info(f'Artifacts path: {build_dir}')
    os.makedirs(build_dir, exist_ok=True)

    # third-party dependencies
    third_party_reqs = Path(root, REQ_FILE_NAME)
    if os.path.exists(third_party_reqs):
        install_requirements_to(third_party_reqs, to=build_dir,
                                config=lambda_config,
                                errors_allowed=errors_allowed)

    # project-local dependencies
    local_reqs = Path(root, LOCAL_REQ_FILE_NAME)
    if os.path.exists(local_reqs):
        _LOG.info('Going to install local dependencies')
        _install_local_req(build_dir, local_reqs, project_path)
        _LOG.info('Local dependencies were installed successfully')

    # packages that live next to the handler go to lambdas/<lambda folder>/
    packages_dir = build_dir / 'lambdas' / Path(root).name
    os.makedirs(packages_dir, exist_ok=True)
    for entry in os.listdir(root):
        package = Path(root, entry)
        if not is_python_package(package):
            continue
        _LOG.info(f"Copying package {package} to lambda's artifacts packages "
                  f'dir: {packages_dir}')
        shutil.copytree(str(package), str(packages_dir / package.name))
        _LOG.info('Copied successfully')

    # the handler module(s) go to the root of the archive
    _LOG.info(f"Copying lambda's handler from {root} to {build_dir}")
    _copy_py_files(root, build_dir)

    # pack everything into the deployment archive
    archive_name = build_py_package_name(lambda_name, lambda_version)
    _LOG.info(f'Packaging artifacts by {build_dir} to {archive_name}')
    zip_dir(str(build_dir), str(Path(target_folder, archive_name)))
    _LOG.info(f"Package '{archive_name}' was successfully created")

    # the unpacked folder is not needed once the archive exists
    remove_dir(build_dir)
    _LOG.info(f'"{build_dir}" was removed successfully')
def install_requirements_to(requirements_txt: Union[str, Path],
                            to: Union[str, Path],
                            config: Optional[dict] = None,
                            errors_allowed: bool = False):
    """
    Install third-party requirements into the directory `to` using pip.

    The set of target platforms is taken from the "platforms" parameter of
    the lambda/layer config and passed through `update_platforms`, which may
    add a default platform when none is configured.

    1. No platforms, errors_allowed is False: the requirements are installed
       by the default command "pip install -r requirements.txt -t <to>".
       A failure raises RuntimeError.
    2. No platforms, errors_allowed is True: the default command is tried
       first; if it fails, every requirement is installed separately
       ("pip install <package1>", ..., "pip install <packageN>").
    3. Platforms present, errors_allowed is False: the default command is
       extended with "--platform <platform> --only-binary=:all:
       --implementation cp --python-version <version>". A failure raises
       RuntimeError.
    4. Platforms present, errors_allowed is True: the extended command is
       tried first; if it fails, every requirement is installed separately
       with the same additional parameters, and the requirements that still
       fail are retried separately without any platform parameters.

    :param requirements_txt: path to the requirements file
    :param to: directory to install the requirements into
    :param config: lambda or layer config; "platforms" and "runtime" keys
        are read from it
    :param errors_allowed: enables the per-requirement fallback
    :raises RuntimeError: if the installation failed and the fallback is
        disabled, or if some requirements could not be installed by the
        fallback either
    """
    exit_code = None
    config = config or {}
    _LOG.info('Going to install 3-rd party dependencies')
    supported_platforms = update_platforms(set(config.get('platforms') or []))
    python_version = _get_python_version(lambda_config=config)

    if supported_platforms:
        command = build_pip_install_command(  # default installation
            requirement=requirements_txt,
            to=to,
            platforms=supported_platforms,
            python=python_version,
            only_binary=':all:',
            implementation='cp'
        )
        result = subprocess.run(command, capture_output=True, text=True)
        _LOG.info(f'\n{result.stdout}\n{result.stderr}')

        if result.returncode != 0 and errors_allowed:
            # tries to install packages compatible with specific platforms
            # independently
            _LOG.info(
                f'Going to install 3-rd party dependencies for platforms: '
                f'{",".join(supported_platforms)}')
            failed_requirements = install_requirements_independently(
                requirements=requirements_txt,
                to=to,
                supported_platforms=supported_platforms,
                python_version=python_version
            )
            # whatever has no build for the platforms is retried without
            # platform restrictions
            failed_requirements = install_requirements_independently(
                requirements=failed_requirements,
                to=to
            )
            _raise_on_failed_requirements(failed_requirements, to)
        elif result.returncode != 0:
            exit_code = result.returncode
    else:
        _LOG.info('Installing all the requirements with defaults')
        command = build_pip_install_command(
            requirement=requirements_txt,
            to=to
        )
        result = subprocess.run(command, capture_output=True, text=True)

        if result.returncode != 0 and errors_allowed:
            # tries to install packages independently
            _LOG.info(
                'Installing the requirements with defaults independently')
            failed_requirements = install_requirements_independently(
                requirements=requirements_txt,
                to=to
            )
            _LOG.info(f'\n{result.stdout}\n{result.stderr}')
            _raise_on_failed_requirements(failed_requirements, to)
            # The fallback succeeded, so the exit code of the bulk command
            # must not be reported as a failure below (exit_code stays None,
            # exactly like in the platform-specific branch).
        else:
            exit_code = result.returncode

    if exit_code:
        message = (f'An error: \n"{result.stdout}\n{result.stderr}"\noccurred '
                   f'while installing requirements: "{str(requirements_txt)}" '
                   f'for package "{to}"\nUse --errors_allowed flag to ignore '
                   f'failures in dependencies installation.')
        _LOG.error(message)
        raise RuntimeError(message)
    if exit_code == 0:
        _LOG.info(f'\n{result.stdout}\n{result.stderr}')
    _LOG.info('3-rd party dependencies were installed successfully')


def _raise_on_failed_requirements(failed_requirements: List[str],
                                  to: Union[str, Path]) -> None:
    """Log and raise RuntimeError if any requirement failed to install."""
    if not failed_requirements:
        return
    message = (f'An error occurred while installing '
               f'requirements: "{failed_requirements}" for '
               f'package "{to}"')
    _LOG.error(message)
    raise RuntimeError(message)
def build_pip_install_command(
        requirement: Optional[Union[str, Path]],
        to: Optional[Union[str, Path]] = None,
        implementation: Optional[str] = None,
        python: Optional[str] = None,
        only_binary: Optional[str] = None,
        platforms: Optional[Set[str]] = None,
        additional_args: Optional[List[str]] = None) -> List[str]:
    """
    Build the argument list of a `pip install` call run by the current
    interpreter (`sys.executable -m pip install ...`).

    :param requirement: path to requirements.txt or just one requirement.
        If the path is not file or does not exist it will be treated as one
        requirement.
    :param to: Optional[str] the path where to install requirements
    :param implementation: Optional[str], interpreter implementation passed
        to `--implementation`, e.g. `cp`
    :param python: Optional[str], can be `3.8`, `3.9`
    :param only_binary: Optional[str], can be `:all:` or `:none:`
    :param platforms: Optional[Set], can be {'manylinux2014_x86_64'}
    :param additional_args: Optional[List[str]] list of some additional args
        appended to the end of the command as they are
    :return: List[str]
    """
    command = [
        sys.executable, '-m', 'pip', 'install'
    ]
    r_path = Path(requirement)
    if r_path.is_file():  # False for a missing path as well
        command.extend(['-r', str(r_path)])
    else:  # not a path to requirements.txt but one requirement
        # str() keeps the command a list of strings even if a Path was given
        command.append(str(requirement))
    if to:
        command.extend(['-t', str(to)])
    if implementation:
        # pass the requested value instead of a hard-coded 'cp'
        command.extend(['--implementation', implementation])
    if python:
        command.extend(['--python-version', python])
    if only_binary:
        command.append(f'--only-binary={only_binary}')
    if platforms:
        command.extend(chain.from_iterable(
            ('--platform', p) for p in platforms
        ))
    command.extend(additional_args or [])
    return command
def update_platforms(platforms: Set[str]) -> Set[str]:
    """
    Supply a default target platform when the user configured none.

    A non-empty set is the user's explicit choice and is returned as it is.
    An empty set gets MANY_LINUX_2014_PLATFORM added (in place) when the
    current processor is reported as 'arm' (e.g. mac m1) or the OS is
    Windows, so that packages are fetched for the lambda environment rather
    than for the build machine.
    This code is experimental and can be adjusted.

    :param platforms: platforms taken from the config, may be empty
    :return: the same set object, possibly extended
    """
    if not platforms:
        on_arm = platform.processor() == 'arm'
        on_windows = platform.system() == 'Windows'
        if on_arm or on_windows:
            platforms.add(MANY_LINUX_2014_PLATFORM)
    return platforms
def install_requirements_independently(
        requirements: Union[str, Path, List[str]],
        to: Union[str, Path],
        python_version: Optional[str] = None,
        supported_platforms: Optional[Set[str]] = None) -> List[str]:
    """
    Install requirements one by one, each with its own `pip install` call.

    :param requirements: either a list of requirement specifiers or a path
        to a requirements file. In a file, blank lines (including
        whitespace-only ones) and comment lines are skipped and trailing
        ` # comments` are cut off.
    :param to: directory to install the requirements into
    :param python_version: optional value for pip's `--python-version`
    :param supported_platforms: optional platforms for pip's `--platform`
    :return: the requirements whose installation failed (empty list if
        everything was installed)
    """
    if isinstance(requirements, list):
        to_install = requirements
    else:
        # Read the file eagerly inside a context manager so the handle is
        # closed even if an installation below raises.
        to_install = []
        with open(requirements, 'r') as fp:
            for line in fp:
                stripped = line.strip()
                if not stripped or stripped.startswith('#'):
                    continue
                to_install.append(stripped.split(' #')[0].strip())

    failed_requirements = []
    # binary-only installation for CPython is requested as soon as either
    # a python version or platforms are specified
    restricted = bool(python_version or supported_platforms)
    implementation = 'cp' if restricted else None
    only_binary = ':all:' if restricted else None
    platforms_label = (",".join(supported_platforms)
                       if supported_platforms else "any")

    for requirement in to_install:
        command = build_pip_install_command(
            requirement=requirement,
            to=to,
            implementation=implementation,
            python=python_version,
            only_binary=only_binary,
            platforms=supported_platforms
        )
        result = subprocess.run(command, capture_output=True, text=True)
        if result.returncode != 0:
            message = (f'An error occurred while installing requirements from '
                       f'"{str(requirements)}" for platforms: '
                       f'{platforms_label}: '
                       f'for package "{requirement}"'
                       f'\nDetails: \n"{result.stdout}\n{result.stderr}"\n')
            USER_LOG.error(f"\033[93m{message}\033[0m")
            failed_requirements.append(requirement)
        else:
            _LOG.info(f'\n{result.stdout}\n{result.stderr}')
    return failed_requirements
def _install_local_req(artifact_path, local_req_path, project_path):
    """
    Copy project-local dependencies into the artifact folder.

    Every non-blank line of the local requirements file is a path of a
    folder relative to the python project root. The folder is copied
    recursively to the same relative location inside `artifact_path`.
    In addition, the *.py files of the project root and of every parent
    folder of the dependency are copied.

    :param artifact_path: folder the artifact is assembled in
    :param local_req_path: path to the local requirements file
    :param project_path: python project path relative to CONFIG.project_path
    """
    from syndicate.core import CONFIG

    with open(local_req_path) as f:
        # blank lines would resolve to the project root itself and make
        # copytree fail, so they are skipped
        local_req_list = [path_resolver(line.strip())
                          for line in f if line.strip()]
    _LOG.info(f'Installing local dependencies: {local_req_list}')

    src_root = Path(CONFIG.project_path, project_path)
    dst_root = Path(artifact_path)
    # copy folders
    for lrp in local_req_list:
        _LOG.info(f'Processing local dependency: {lrp}')
        shutil.copytree(str(src_root / lrp), str(dst_root / lrp))
        _LOG.debug('Dependency was copied successfully')

        # process folders starting from the root of the python project.
        # The relative path is accumulated WITHOUT a leading separator:
        # pathlib discards all previous segments when a later segment is
        # absolute, which would point both paths at the filesystem root.
        _copy_py_files(str(src_root), str(dst_root))
        relative = Path()
        for folder in lrp.split(DEFAULT_SEP):
            if not folder:
                continue
            relative = relative / folder
            _copy_py_files(str(src_root / relative),
                           str(dst_root / relative))
        _LOG.debug('Python files from packages were copied successfully')
def _get_python_version(lambda_config: dict) -> Optional[str]:
    """
    Extract the python version from a lambda config or layer config.

    "runtime": "python3.7" => "3.7".
    If "runtime" contains a list of runtimes, the lowest version is
    returned. Versions are compared numerically component by component, so
    "3.8" is lower than "3.10".

    :param lambda_config: lambda or layer config
    :return: version string, or None if no runtime is configured
    """
    runtimes: Union[None, List, str] = lambda_config.get('runtime')
    if not runtimes:
        return None
    if isinstance(runtimes, str):
        runtimes = [runtimes]

    versions = [
        ''.join(ch for ch in runtime if ch in '0123456789.')
        for runtime in runtimes
    ]

    def _as_numbers(version: str) -> tuple:
        # '3.10' -> (3, 10); a plain string comparison would put '3.10'
        # before '3.8'
        return tuple(int(part) for part in version.split('.') if part)

    return min(versions, key=_as_numbers)
def _copy_py_files(search_path, destination_path):
    """
    Copy the python files found directly in `search_path` (matched by the
    _PY_EXT glob pattern, no recursion) to `destination_path`.
    Matches that are not regular files are skipped.
    """
    pattern = build_path(search_path, _PY_EXT)
    for candidate in glob.iglob(pattern):
        if not os.path.isfile(candidate):
            continue
        shutil.copy2(candidate, destination_path)
def is_python_package(path: Union[Path, str]) -> bool:
    """Tell whether `path` is a python package, i.e. an existing directory
    that contains an __init__.py entry."""
    if not os.path.isdir(path):
        return False
    init_file = os.path.join(path, '__init__.py')
    return os.path.exists(init_file)