Source code for _repobee.git

"""Wrapper functions for git commands.

.. module:: git
    :synopsis: Wrapper functions for git CLI commands, such as push and clone.

.. moduleauthor:: Simon Larsén
"""
import asyncio
import os
import subprocess
import collections
import pathlib
import daiquiri
from typing import Iterable, List, Any, Callable

from _repobee import exception
from _repobee import util

CONCURRENT_TASKS = 20

LOGGER = daiquiri.getLogger(__file__)

Push = collections.namedtuple("Push", ("local_path", "repo_url", "branch"))
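
# A usage sketch for ``Push`` (the local path, URL and branch below are
# hypothetical, chosen only to illustrate the shape of the tuple):
#
#     pt = Push(
#         local_path="student-repos/task-1-student",
#         repo_url="https://github.com/some-org/task-1-student",
#         branch="master",
#     )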


def _ensure_repo_dir_exists(repo_url: str, cwd: str) -> pathlib.Path:
    """Checks if a dir for the repo url exists, and if it does not, creates it.
    Also initializez (or reinitializes, if it alrady exists) as a git repo.
    """
    repo_name = util.repo_name(repo_url)
    dirpath = pathlib.Path(cwd) / repo_name
    if not dirpath.exists():
        dirpath.mkdir()
    _git_init(dirpath)
    return dirpath


def _git_init(dirpath):
    """Initialize (or reinitialize) dirpath as a git repository."""
    captured_run(["git", "init"], cwd=str(dirpath))


def _pull_clone(repo_url: str, branch: str = "", cwd: str = "."):
    """Simulate a clone with a pull to avoid writing remotes (that could
    include secure tokens) to disk.
    """
    dirpath = _ensure_repo_dir_exists(repo_url, cwd)

    pull_command = "git pull {} {}".format(repo_url, branch).strip().split()

    rc, _, stderr = captured_run(pull_command, cwd=str(dirpath))
    return rc, stderr
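
# The effect of ``_pull_clone`` is roughly equivalent to the following shell
# session (the URL, token and branch are hypothetical). Note that the
# token-bearing URL is passed on the command line rather than stored as a
# remote in .git/config:
#
#     mkdir task-1-student && cd task-1-student
#     git init
#     git pull https://TOKEN@github.com/some-org/task-1-student master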


async def _pull_clone_async(repo_url: str, branch: str = "", cwd: str = "."):
    """Same as _pull_clone, but asynchronously."""
    dirpath = _ensure_repo_dir_exists(repo_url, cwd)

    pull_command = "git pull {} {}".format(repo_url, branch).strip().split()

    proc = await asyncio.create_subprocess_exec(
        *pull_command,
        cwd=str(dirpath),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    _, stderr = await proc.communicate()
    return proc.returncode, stderr


def captured_run(*args, **kwargs):
    """Run a subprocess and capture the output."""
    proc = subprocess.run(
        *args, **kwargs, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    return proc.returncode, proc.stdout, proc.stderr

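# Usage sketch for ``captured_run``; it mirrors subprocess.run's calling
# convention, so the first argument is the command as a list. Output is
# captured as bytes:
#
#     rc, stdout, stderr = captured_run(["git", "status"], cwd=".")
#     if rc != 0:
#         LOGGER.error(stderr.decode(encoding="utf-8"))
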
def clone_single(repo_url: str, branch: str = "", cwd: str = "."):
    """Clone a git repository.

    Args:
        repo_url: HTTPS url to repository on the form
            https://<host>/<owner>/<repo>.
        branch: The branch to clone.
        cwd: Working directory. Defaults to the current directory.
    """
    rc, stderr = _pull_clone(repo_url, branch, cwd)
    if rc != 0:
        raise exception.CloneFailedError(
            "Failed to clone", rc, stderr, repo_url
        )

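# Usage sketch for ``clone_single`` (the URL is hypothetical); failure is
# signaled by an exception rather than a return value:
#
#     try:
#         clone_single("https://github.com/some-org/task-1-student")
#     except exception.CloneFailedError as exc:
#         LOGGER.error("could not clone {}".format(exc.url))
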
async def _clone_async(repo_url: str, branch: str = "", cwd="."):
    """Clone git repositories asynchronously.

    Args:
        repo_url: A url to clone.
        branch: Which branch to clone.
        cwd: Working directory.
    """
    rc, stderr = await _pull_clone_async(repo_url, branch, cwd)
    if rc != 0:
        raise exception.CloneFailedError(
            "Failed to clone {}".format(repo_url),
            returncode=rc,
            stderr=stderr,
            url=repo_url,
        )
    else:
        LOGGER.info("Cloned into {}".format(repo_url))

def clone(repo_urls: Iterable[str], cwd: str = ".") -> List[str]:
    """Clone all repos asynchronously.

    Args:
        repo_urls: URLs to repos to clone.
        cwd: Working directory. Defaults to the current directory.

    Returns:
        URLs from which cloning failed.
    """
    # TODO validate repo_urls
    return [
        exc.url
        for exc in _batch_execution(_clone_async, repo_urls, cwd=cwd)
        if isinstance(exc, exception.CloneFailedError)
    ]

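# Usage sketch for ``clone``; unlike ``clone_single`` it does not raise on
# failure, but returns the URLs that could not be cloned (URLs and the
# working directory are hypothetical):
#
#     failed_urls = clone(
#         [
#             "https://github.com/some-org/task-1-student-a",
#             "https://github.com/some-org/task-1-student-b",
#         ],
#         cwd="student-repos",
#     )
#     for url in failed_urls:
#         LOGGER.warning("failed to clone {}".format(url))
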
async def _push_async(pt: Push):
    """Asynchronous call to git push, pushing directly to the repo_url and
    branch.

    Args:
        pt: A Push namedtuple.
    """
    command = ["git", "push", pt.repo_url, pt.branch]
    proc = await asyncio.create_subprocess_exec(
        *command,
        cwd=os.path.abspath(pt.local_path),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    _, stderr = await proc.communicate()

    if proc.returncode != 0:
        raise exception.PushFailedError(
            "Failed to push to {}".format(pt.repo_url),
            proc.returncode,
            stderr,
            pt.repo_url,
        )
    elif b"Everything up-to-date" in stderr:
        LOGGER.info("{} is up-to-date".format(pt.repo_url))
    else:
        LOGGER.info("Pushed files to {} {}".format(pt.repo_url, pt.branch))


def _push_no_retry(push_tuples: Iterable[Push]) -> List[str]:
    """Push to all repos defined in push_tuples asynchronously. Amount of
    concurrent tasks is limited by CONCURRENT_TASKS. Pushes once and only
    once to each repo.

    Args:
        push_tuples: Push namedtuples defining local and remote repos.

    Returns:
        urls to which pushes failed with exception.PushFailedError. Other
        errors are only logged.
    """
    return [
        exc.url
        for exc in _batch_execution(_push_async, push_tuples)
        if isinstance(exc, exception.PushFailedError)
    ]

def push(push_tuples: Iterable[Push], tries: int = 3) -> List[str]:
    """Push to all repos defined in push_tuples asynchronously. Amount of
    concurrent tasks is limited by CONCURRENT_TASKS.

    Pushing to repos is tried a maximum of ``tries`` times (i.e. pushing is
    _retried_ ``tries - 1`` times).

    Args:
        push_tuples: Push namedtuples defining local and remote repos.
        tries: Amount of times to try to push (including the initial push).

    Returns:
        urls to which pushes failed with exception.PushFailedError. Other
        errors are only logged.
    """
    if tries < 1:
        raise ValueError("tries must be larger than 0")
    # every push is considered failed until it succeeds, so failed_pts
    # starts out containing all push tuples
    failed_pts = list(push_tuples)
    for i in range(tries):
        LOGGER.info("Pushing, attempt {}/{}".format(i + 1, tries))
        failed_urls = set(_push_no_retry(failed_pts))
        failed_pts = [pt for pt in push_tuples if pt.repo_url in failed_urls]
        if not failed_pts:
            break
        LOGGER.warning("{} pushes failed ...".format(len(failed_pts)))
    return [pt.repo_url for pt in failed_pts]

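# Usage sketch for ``push`` (paths, URLs and branch are hypothetical); each
# local repo is pushed directly to its URL, with up to two retries under the
# default of ``tries=3``:
#
#     push_tuples = [
#         Push(
#             local_path="student-repos/task-1-student",
#             repo_url="https://github.com/some-org/task-1-student",
#             branch="master",
#         )
#     ]
#     failed_urls = push(push_tuples, tries=3)
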
def _batch_execution(
    batch_func: Callable[[Iterable[Any], Any], List[asyncio.Task]],
    arg_list: Iterable[Any],
    *batch_func_args,
    **batch_func_kwargs
) -> List[Exception]:
    """Take a batch function (any function whose first argument is an
    iterable) and send in CONCURRENT_TASKS amount of arguments from the
    arg_list until it is exhausted. The batch_func_kwargs are provided on
    each call.

    Args:
        batch_func: A function that takes an iterable as a first argument
            and returns a list of asyncio.Task objects.
        arg_list: A list of objects that are of the same type as the
            batch_func's first argument.
        batch_func_kwargs: Additional keyword arguments to the batch_func.

    Returns:
        a list of exceptions raised in the tasks returned by the batch
        function.
    """
    completed_tasks = []
    args_iter = iter(arg_list)
    loop = asyncio.get_event_loop()

    has_more_jobs = True
    while has_more_jobs:
        args = []
        for _ in range(CONCURRENT_TASKS):
            try:
                args.append(next(args_iter))
            except StopIteration:
                has_more_jobs = False
                break

        tasks = [
            loop.create_task(
                batch_func(arg, *batch_func_args, **batch_func_kwargs)
            )
            for arg in args
        ]
        # if
        # a) arg_list was empty
        # or
        # b) len(arg_list) % CONCURRENT_TASKS == 0
        # the last iteration will have no tasks
        if tasks:
            loop.run_until_complete(asyncio.wait(tasks))
            completed_tasks += tasks

    exceptions = [
        task.exception() for task in completed_tasks if task.exception()
    ]
    for exc in exceptions:
        LOGGER.error(str(exc))
    return exceptions
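
# Batching arithmetic for ``_batch_execution``: with CONCURRENT_TASKS = 20,
# an arg_list of 45 elements is processed in batches of 20, 20 and 5 tasks,
# and each batch must finish before the next one is scheduled. A minimal
# self-contained sketch of the same chunking pattern (names are illustrative
# only; requires ``import itertools``):
#
#     def _chunks(iterable, size):
#         it = iter(iterable)
#         while True:
#             chunk = list(itertools.islice(it, size))
#             if not chunk:
#                 return
#             yield chunk
#
#     assert [len(c) for c in _chunks(range(45), 20)] == [20, 20, 5]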