def run()

in dusty/tools/actions/git_clone/action.py [0:0]


    def run(self):
        """ Run action """
        # Patch dulwich to work without valid UID/GID
        dulwich.repo.__original__get_default_identity = dulwich.repo._get_default_identity  # pylint: disable=W0212
        dulwich.repo._get_default_identity = _dulwich_repo_get_default_identity  # pylint: disable=W0212
        # Patch dulwich to use paramiko SSH client
        dulwich.client.get_ssh_vendor = ParamikoSSHVendor
        # Patch paramiko to skip key verification
        paramiko.transport.Transport._verify_key = _paramiko_transport_verify_key  # pylint: disable=W0212
        # Set USERNAME if needed
        try:
            getpass.getuser()
        except:  # pylint: disable=W0702
            os.environ["USERNAME"] = "git"
        # Get options
        source = self.config.get("source")
        target = self.config.get("target")
        branch = self.config.get("branch", "master")
        depth = self.config.get("depth", None)
        # Prepare auth
        auth_args = dict()
        if self.config.get("username", None) is not None:
            auth_args["username"] = self.config.get("username")
        if self.config.get("password", None) is not None:
            auth_args["password"] = self.config.get("password")
        if self.config.get("key", None) is not None:
            auth_args["key_filename"] = self.config.get("key")
        if self.config.get("key_data", None) is not None:
            key_obj = io.StringIO(self.config.get("key_data").replace("|", "\n"))
            pkey = paramiko.RSAKey.from_private_key(key_obj)
            # Patch paramiko to use our key
            paramiko.client.SSHClient._auth = _paramiko_client_SSHClient_auth(  # pylint: disable=W0212
                paramiko.client.SSHClient._auth, pkey  # pylint: disable=W0212
            )
        # Clone repository
        log.info("Cloning repository %s into %s", source, target)
        repository = porcelain.clone(
            source, target, checkout=False, depth=depth, errstream=log.DebugLogStream(), **auth_args
        )
        # Checkout branch
        log.info("Checking out branch %s", branch)
        branch = branch.encode("utf-8")
        repository[b"refs/heads/" + branch] = repository[b"refs/remotes/origin/" + branch]
        repository.refs.set_symbolic_ref(b"HEAD", b"refs/heads/" + branch)
        repository.reset_index(repository[b"HEAD"].tree)
        # Delete .git if requested
        if self.config.get("delete_git_dir", False):
            log.info("Deleting .git directory")
            shutil.rmtree(os.path.join(target, ".git"))