dusty/commands/git_clone.py (158 lines of code) (raw):
#!/usr/bin/python3
# coding=utf-8
# Copyright 2019 getcarrier.io
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Command: git-clone
"""
import os
import io
import getpass
import dulwich # pylint: disable=E0401
from dulwich import porcelain # pylint: disable=E0401
from dulwich.contrib.paramiko_vendor import ParamikoSSHVendor # pylint: disable=E0401
import paramiko # pylint: disable=E0401
import paramiko.client # pylint: disable=E0401
import paramiko.transport # pylint: disable=E0401
from paramiko.ssh_exception import SSHException # pylint: disable=E0401
from paramiko.message import Message # pylint: disable=E0401
from dusty.tools import log
from dusty.models.module import ModuleModel
from dusty.models.command import CommandModel
class Command(ModuleModel, CommandModel):
""" Generate sample config """
def __init__(self, argparser):
""" Initialize command instance, add arguments """
super().__init__()
argparser.add_argument(
"-r", "--repository", dest="source",
help="source git repository",
type=str
)
argparser.add_argument(
"-t", "--target", dest="target",
help="target directory",
type=str
)
argparser.add_argument(
"-b", "--branch", dest="branch",
help="repository branch",
type=str, default="master"
)
argparser.add_argument(
"-l", "--lightweight", dest="depth",
help="limit clone depth",
type=int
)
argparser.add_argument(
"-u", "--username", dest="username",
help="username",
type=str
)
argparser.add_argument(
"-p", "--password", dest="password",
help="password",
type=str
)
argparser.add_argument(
"-k", "--key", dest="key",
help="SSH key file",
type=str
)
argparser.add_argument(
"-K", "--key-data", dest="key_data",
help="SSH key data",
type=str
)
argparser.add_argument(
"--username-variable", dest="username_variable",
help="environment variable with username",
type=str, default="GIT_LOGIN"
)
argparser.add_argument(
"--password-variable", dest="password_variable",
help="environment variable with password",
type=str, default="GIT_PASSWORD"
)
argparser.add_argument(
"--key-variable", dest="key_variable",
help="environment variable with path to SSH key",
type=str, default="GIT_KEY"
)
argparser.add_argument(
"--key-data-variable", dest="key_data_variable",
help="environment variable with SSH key data",
type=str, default="GIT_KEY_DATA"
)
def execute(self, args):
""" Run the command """
log.debug("Starting")
# Check args
if not args.source or not args.target:
log.error("Please specify source and target.")
return
# Patch dulwich to work without valid UID/GID
dulwich.repo.__original__get_default_identity = dulwich.repo._get_default_identity # pylint: disable=W0212
dulwich.repo._get_default_identity = _dulwich_repo_get_default_identity # pylint: disable=W0212
# Patch dulwich to use paramiko SSH client
dulwich.client.get_ssh_vendor = ParamikoSSHVendor
# Patch paramiko to skip key verification
paramiko.transport.Transport._verify_key = _paramiko_transport_verify_key # pylint: disable=W0212
# Set USERNAME if needed
try:
getpass.getuser()
except: # pylint: disable=W0702
os.environ["USERNAME"] = "git"
# Fill args
depth = None
if args.depth:
depth = args.depth
# Prepare auth
auth_args = dict()
# Take from env variables
if args.username_variable and args.username_variable in os.environ:
auth_args["username"] = os.environ[args.username_variable]
os.environ["USERNAME"] = os.environ[args.username_variable]
if args.password_variable and args.password_variable in os.environ:
auth_args["password"] = os.environ[args.password_variable]
if args.key_variable and args.key_variable in os.environ:
auth_args["key_filename"] = os.environ[args.key_variable]
if args.key_data_variable and args.key_data_variable in os.environ:
key_obj = io.StringIO(os.environ[args.key_data_variable].replace("|", "\n"))
pkey = paramiko.RSAKey.from_private_key(key_obj)
# Patch paramiko to use our key
paramiko.client.SSHClient._auth = _paramiko_client_SSHClient_auth( # pylint: disable=W0212
paramiko.client.SSHClient._auth, pkey # pylint: disable=W0212
)
# Take from commandline parameters
if args.username:
auth_args["username"] = args.username
os.environ["USERNAME"] = args.username
if args.password:
auth_args["password"] = args.password
if args.key:
auth_args["key_filename"] = args.key
if args.key_data:
key_obj = io.StringIO(args.key_data.replace("|", "\n"))
pkey = paramiko.RSAKey.from_private_key(key_obj)
# Patch paramiko to use our key
paramiko.client.SSHClient._auth = _paramiko_client_SSHClient_auth( # pylint: disable=W0212
paramiko.client.SSHClient._auth, pkey # pylint: disable=W0212
)
# Clone repository
log.info("Cloning repository %s into %s", args.source, args.target)
repository = porcelain.clone(
args.source, args.target,
checkout=False, depth=depth,
errstream=log.DebugLogStream(),
**auth_args
)
# Checkout branch
log.info("Checking out branch %s", args.branch)
branch = args.branch.encode("utf-8")
repository[b"refs/heads/" + branch] = repository[b"refs/remotes/origin/" + branch]
repository.refs.set_symbolic_ref(b"HEAD", b"refs/heads/" + branch)
repository.reset_index(repository[b"HEAD"].tree)
@staticmethod
def get_name():
""" Command name """
return "git-clone"
@staticmethod
def get_description():
""" Command help message (description) """
return "clone remote git repository"
def _dulwich_repo_get_default_identity():
try:
return dulwich.repo.__original__get_default_identity() # pylint: disable=W0212
except: # pylint: disable=W0702
return ("Carrier User", "dusty@localhost")
def _paramiko_transport_verify_key(self, host_key, sig): # pylint: disable=W0613
key = self._key_info[self.host_key_type](Message(host_key)) # pylint: disable=W0212
if key is None:
raise SSHException('Unknown host key type')
self.host_key = key
def _paramiko_client_SSHClient_auth(original_auth, forced_pkey): # pylint: disable=C0103
def __paramiko_client_SSHClient_auth( # pylint: disable=C0103,R0913
self, username, password, pkey, key_filenames, allow_agent, look_for_keys, # pylint: disable=W0613
gss_auth, gss_kex, gss_deleg_creds, gss_host, passphrase
):
return original_auth(
self, username, password, forced_pkey, key_filenames, allow_agent, look_for_keys,
gss_auth, gss_kex, gss_deleg_creds, gss_host, passphrase
)
return __paramiko_client_SSHClient_auth