| |
| |
| |
| |
|
|
| import sys |
|
|
| __all__ = [ |
| "stream_copy", |
| "join_path", |
| "to_native_path_linux", |
| "join_path_native", |
| "Stats", |
| "IndexFileSHA1Writer", |
| "IterableObj", |
| "IterableList", |
| "BlockingLockFile", |
| "LockFile", |
| "Actor", |
| "get_user_id", |
| "assure_directory_exists", |
| "RemoteProgress", |
| "CallableRemoteProgress", |
| "rmtree", |
| "unbare_repo", |
| "HIDE_WINDOWS_KNOWN_ERRORS", |
| ] |
|
|
| if sys.platform == "win32": |
| __all__.append("to_native_path_windows") |
|
|
| from abc import abstractmethod |
| import contextlib |
| from functools import wraps |
| import getpass |
| import logging |
| import os |
| import os.path as osp |
| import pathlib |
| import platform |
| import re |
| import shutil |
| import stat |
| import subprocess |
| import time |
| from urllib.parse import urlsplit, urlunsplit |
| import warnings |
|
|
| |
| |
| |
| |
| |
| from gitdb.util import ( |
| LazyMixin, |
| LockedFD, |
| bin_to_hex, |
| file_contents_ro, |
| file_contents_ro_filepath, |
| hex_to_bin, |
| make_sha, |
| to_bin_sha, |
| to_hex_sha, |
| ) |
|
|
| |
|
|
| from typing import ( |
| Any, |
| AnyStr, |
| BinaryIO, |
| Callable, |
| Dict, |
| Generator, |
| IO, |
| Iterator, |
| List, |
| Optional, |
| Pattern, |
| Sequence, |
| Tuple, |
| TYPE_CHECKING, |
| TypeVar, |
| Union, |
| cast, |
| overload, |
| ) |
|
|
| if TYPE_CHECKING: |
| from git.cmd import Git |
| from git.config import GitConfigParser, SectionConstraint |
| from git.remote import Remote |
| from git.repo.base import Repo |
|
|
| from git.types import ( |
| Files_TD, |
| Has_id_attribute, |
| HSH_TD, |
| Literal, |
| PathLike, |
| Protocol, |
| SupportsIndex, |
| Total_TD, |
| runtime_checkable, |
| ) |
|
|
| |
|
|
| T_IterableObj = TypeVar("T_IterableObj", bound=Union["IterableObj", "Has_id_attribute"], covariant=True) |
| |
|
|
| _logger = logging.getLogger(__name__) |
|
|
|
|
| def _read_env_flag(name: str, default: bool) -> bool: |
| """Read a boolean flag from an environment variable. |
| |
| :return: |
| The flag, or the `default` value if absent or ambiguous. |
| """ |
| try: |
| value = os.environ[name] |
| except KeyError: |
| return default |
|
|
| _logger.warning( |
| "The %s environment variable is deprecated. Its effect has never been documented and changes without warning.", |
| name, |
| ) |
|
|
| adjusted_value = value.strip().lower() |
|
|
| if adjusted_value in {"", "0", "false", "no"}: |
| return False |
| if adjusted_value in {"1", "true", "yes"}: |
| return True |
| _logger.warning("%s has unrecognized value %r, treating as %r.", name, value, default) |
| return default |
|
|
|
|
| def _read_win_env_flag(name: str, default: bool) -> bool: |
| """Read a boolean flag from an environment variable on Windows. |
| |
| :return: |
| On Windows, the flag, or the `default` value if absent or ambiguous. |
| On all other operating systems, ``False``. |
| |
| :note: |
| This only accesses the environment on Windows. |
| """ |
| return sys.platform == "win32" and _read_env_flag(name, default) |
|
|
|
|
| |
| |
| |
| HIDE_WINDOWS_KNOWN_ERRORS = _read_win_env_flag("HIDE_WINDOWS_KNOWN_ERRORS", True) |
| HIDE_WINDOWS_FREEZE_ERRORS = _read_win_env_flag("HIDE_WINDOWS_FREEZE_ERRORS", True) |
|
|
| |
|
|
| T = TypeVar("T") |
|
|
|
|
| def unbare_repo(func: Callable[..., T]) -> Callable[..., T]: |
| """Methods with this decorator raise :exc:`~git.exc.InvalidGitRepositoryError` if |
| they encounter a bare repository.""" |
|
|
| from .exc import InvalidGitRepositoryError |
|
|
| @wraps(func) |
| def wrapper(self: "Remote", *args: Any, **kwargs: Any) -> T: |
| if self.repo.bare: |
| raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__) |
| |
| return func(self, *args, **kwargs) |
|
|
| |
|
|
| return wrapper |
|
|
|
|
| @contextlib.contextmanager |
| def cwd(new_dir: PathLike) -> Generator[PathLike, None, None]: |
| """Context manager to temporarily change directory. |
| |
| This is similar to :func:`contextlib.chdir` introduced in Python 3.11, but the |
| context manager object returned by a single call to this function is not reentrant. |
| """ |
| old_dir = os.getcwd() |
| os.chdir(new_dir) |
| try: |
| yield new_dir |
| finally: |
| os.chdir(old_dir) |
|
|
|
|
| @contextlib.contextmanager |
| def patch_env(name: str, value: str) -> Generator[None, None, None]: |
| """Context manager to temporarily patch an environment variable.""" |
| old_value = os.getenv(name) |
| os.environ[name] = value |
| try: |
| yield |
| finally: |
| if old_value is None: |
| del os.environ[name] |
| else: |
| os.environ[name] = old_value |
|
|
|
|
| def rmtree(path: PathLike) -> None: |
| """Remove the given directory tree recursively. |
| |
| :note: |
| We use :func:`shutil.rmtree` but adjust its behaviour to see whether files that |
| couldn't be deleted are read-only. Windows will not remove them in that case. |
| """ |
|
|
| def handler(function: Callable, path: PathLike, _excinfo: Any) -> None: |
| """Callback for :func:`shutil.rmtree`. |
| |
| This works as either a ``onexc`` or ``onerror`` style callback. |
| """ |
| |
| os.chmod(path, stat.S_IWUSR) |
|
|
| try: |
| function(path) |
| except PermissionError as ex: |
| if HIDE_WINDOWS_KNOWN_ERRORS: |
| from unittest import SkipTest |
|
|
| raise SkipTest(f"FIXME: fails with: PermissionError\n {ex}") from ex |
| raise |
|
|
| if sys.platform != "win32": |
| shutil.rmtree(path) |
| elif sys.version_info >= (3, 12): |
| shutil.rmtree(path, onexc=handler) |
| else: |
| shutil.rmtree(path, onerror=handler) |
|
|
|
|
| def rmfile(path: PathLike) -> None: |
| """Ensure file deleted also on *Windows* where read-only files need special |
| treatment.""" |
| if osp.isfile(path): |
| if sys.platform == "win32": |
| os.chmod(path, 0o777) |
| os.remove(path) |
|
|
|
|
| def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int: |
| """Copy all data from the `source` stream into the `destination` stream in chunks |
| of size `chunk_size`. |
| |
| :return: |
| Number of bytes written |
| """ |
| br = 0 |
| while True: |
| chunk = source.read(chunk_size) |
| destination.write(chunk) |
| br += len(chunk) |
| if len(chunk) < chunk_size: |
| break |
| |
| return br |
|
|
|
|
| def join_path(a: PathLike, *p: PathLike) -> PathLike: |
| R"""Join path tokens together similar to osp.join, but always use ``/`` instead of |
| possibly ``\`` on Windows.""" |
| path = str(a) |
| for b in p: |
| b = str(b) |
| if not b: |
| continue |
| if b.startswith("/"): |
| path += b[1:] |
| elif path == "" or path.endswith("/"): |
| path += b |
| else: |
| path += "/" + b |
| |
| return path |
|
|
|
|
| if sys.platform == "win32": |
|
|
| def to_native_path_windows(path: PathLike) -> PathLike: |
| path = str(path) |
| return path.replace("/", "\\") |
|
|
| def to_native_path_linux(path: PathLike) -> str: |
| path = str(path) |
| return path.replace("\\", "/") |
|
|
| to_native_path = to_native_path_windows |
| else: |
| |
| def to_native_path_linux(path: PathLike) -> str: |
| return str(path) |
|
|
| to_native_path = to_native_path_linux |
|
|
|
|
| def join_path_native(a: PathLike, *p: PathLike) -> PathLike: |
| R"""Like :func:`join_path`, but makes sure an OS native path is returned. |
| |
| This is only needed to play it safe on Windows and to ensure nice paths that only |
| use ``\``. |
| """ |
| return to_native_path(join_path(a, *p)) |
|
|
|
|
| def assure_directory_exists(path: PathLike, is_file: bool = False) -> bool: |
| """Make sure that the directory pointed to by path exists. |
| |
| :param is_file: |
| If ``True``, `path` is assumed to be a file and handled correctly. |
| Otherwise it must be a directory. |
| |
| :return: |
| ``True`` if the directory was created, ``False`` if it already existed. |
| """ |
| if is_file: |
| path = osp.dirname(path) |
| |
| if not osp.isdir(path): |
| os.makedirs(path, exist_ok=True) |
| return True |
| return False |
|
|
|
|
| def _get_exe_extensions() -> Sequence[str]: |
| PATHEXT = os.environ.get("PATHEXT", None) |
| if PATHEXT: |
| return tuple(p.upper() for p in PATHEXT.split(os.pathsep)) |
| elif sys.platform == "win32": |
| return (".BAT", ".COM", ".EXE") |
| else: |
| return () |
|
|
|
|
| def py_where(program: str, path: Optional[PathLike] = None) -> List[str]: |
| """Perform a path search to assist :func:`is_cygwin_git`. |
| |
| This is not robust for general use. It is an implementation detail of |
| :func:`is_cygwin_git`. When a search following all shell rules is needed, |
| :func:`shutil.which` can be used instead. |
| |
| :note: |
| Neither this function nor :func:`shutil.which` will predict the effect of an |
| executable search on a native Windows system due to a :class:`subprocess.Popen` |
| call without ``shell=True``, because shell and non-shell executable search on |
| Windows differ considerably. |
| """ |
| |
| winprog_exts = _get_exe_extensions() |
|
|
| def is_exec(fpath: str) -> bool: |
| return ( |
| osp.isfile(fpath) |
| and os.access(fpath, os.X_OK) |
| and ( |
| sys.platform != "win32" or not winprog_exts or any(fpath.upper().endswith(ext) for ext in winprog_exts) |
| ) |
| ) |
|
|
| progs = [] |
| if not path: |
| path = os.environ["PATH"] |
| for folder in str(path).split(os.pathsep): |
| folder = folder.strip('"') |
| if folder: |
| exe_path = osp.join(folder, program) |
| for f in [exe_path] + ["%s%s" % (exe_path, e) for e in winprog_exts]: |
| if is_exec(f): |
| progs.append(f) |
| return progs |
|
|
|
|
| def _cygexpath(drive: Optional[str], path: str) -> str: |
| if osp.isabs(path) and not drive: |
| |
| |
| p = path |
| else: |
| p = path and osp.normpath(osp.expandvars(osp.expanduser(path))) |
| if osp.isabs(p): |
| if drive: |
| |
| p = path |
| else: |
| p = cygpath(p) |
| elif drive: |
| p = "/proc/cygdrive/%s/%s" % (drive.lower(), p) |
| p_str = str(p) |
| return p_str.replace("\\", "/") |
|
|
|
|
| _cygpath_parsers: Tuple[Tuple[Pattern[str], Callable, bool], ...] = ( |
| |
| |
| ( |
| re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"), |
| (lambda server, share, rest_path: "//%s/%s/%s" % (server, share, rest_path.replace("\\", "/"))), |
| False, |
| ), |
| (re.compile(r"\\\\\?\\(\w):[/\\](.*)"), (_cygexpath), False), |
| (re.compile(r"(\w):[/\\](.*)"), (_cygexpath), False), |
| (re.compile(r"file:(.*)", re.I), (lambda rest_path: rest_path), True), |
| (re.compile(r"(\w{2,}:.*)"), (lambda url: url), False), |
| ) |
|
|
|
|
| def cygpath(path: str) -> str: |
| """Use :meth:`git.cmd.Git.polish_url` instead, that works on any environment.""" |
| path = str(path) |
| |
| if not path.startswith(("/cygdrive", "//", "/proc/cygdrive")): |
| for regex, parser, recurse in _cygpath_parsers: |
| match = regex.match(path) |
| if match: |
| path = parser(*match.groups()) |
| if recurse: |
| path = cygpath(path) |
| break |
| else: |
| path = _cygexpath(None, path) |
|
|
| return path |
|
|
|
|
| _decygpath_regex = re.compile(r"(?:/proc)?/cygdrive/(\w)(/.*)?") |
|
|
|
|
| def decygpath(path: PathLike) -> str: |
| path = str(path) |
| m = _decygpath_regex.match(path) |
| if m: |
| drive, rest_path = m.groups() |
| path = "%s:%s" % (drive.upper(), rest_path or "") |
|
|
| return path.replace("/", "\\") |
|
|
|
|
| |
| |
| _is_cygwin_cache: Dict[str, Optional[bool]] = {} |
|
|
|
|
| def _is_cygwin_git(git_executable: str) -> bool: |
| is_cygwin = _is_cygwin_cache.get(git_executable) |
| if is_cygwin is None: |
| is_cygwin = False |
| try: |
| git_dir = osp.dirname(git_executable) |
| if not git_dir: |
| res = py_where(git_executable) |
| git_dir = osp.dirname(res[0]) if res else "" |
|
|
| |
| uname_cmd = osp.join(git_dir, "uname") |
| process = subprocess.Popen([uname_cmd], stdout=subprocess.PIPE, universal_newlines=True) |
| uname_out, _ = process.communicate() |
| |
| is_cygwin = "CYGWIN" in uname_out |
| except Exception as ex: |
| _logger.debug("Failed checking if running in CYGWIN due to: %r", ex) |
| _is_cygwin_cache[git_executable] = is_cygwin |
|
|
| return is_cygwin |
|
|
|
|
| @overload |
| def is_cygwin_git(git_executable: None) -> Literal[False]: ... |
|
|
|
|
| @overload |
| def is_cygwin_git(git_executable: PathLike) -> bool: ... |
|
|
|
|
| def is_cygwin_git(git_executable: Union[None, PathLike]) -> bool: |
| if sys.platform == "win32": |
| return False |
| elif git_executable is None: |
| return False |
| else: |
| return _is_cygwin_git(str(git_executable)) |
|
|
|
|
| def get_user_id() -> str: |
| """:return: String identifying the currently active system user as ``name@node``""" |
| return "%s@%s" % (getpass.getuser(), platform.node()) |
|
|
|
|
| def finalize_process(proc: Union[subprocess.Popen, "Git.AutoInterrupt"], **kwargs: Any) -> None: |
| """Wait for the process (clone, fetch, pull or push) and handle its errors |
| accordingly.""" |
| |
| proc.wait(**kwargs) |
|
|
|
|
| @overload |
| def expand_path(p: None, expand_vars: bool = ...) -> None: ... |
|
|
|
|
| @overload |
| def expand_path(p: PathLike, expand_vars: bool = ...) -> str: |
| |
| ... |
|
|
|
|
| def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[PathLike]: |
| if isinstance(p, pathlib.Path): |
| return p.resolve() |
| try: |
| p = osp.expanduser(p) |
| if expand_vars: |
| p = osp.expandvars(p) |
| return osp.normpath(osp.abspath(p)) |
| except Exception: |
| return None |
|
|
|
|
| def remove_password_if_present(cmdline: Sequence[str]) -> List[str]: |
| """Parse any command line argument and if one of the elements is an URL with a |
| username and/or password, replace them by stars (in-place). |
| |
| If nothing is found, this just returns the command line as-is. |
| |
| This should be used for every log line that print a command line, as well as |
| exception messages. |
| """ |
| new_cmdline = [] |
| for index, to_parse in enumerate(cmdline): |
| new_cmdline.append(to_parse) |
| try: |
| url = urlsplit(to_parse) |
| |
| if url.password is None and url.username is None: |
| continue |
|
|
| if url.password is not None: |
| url = url._replace(netloc=url.netloc.replace(url.password, "*****")) |
| if url.username is not None: |
| url = url._replace(netloc=url.netloc.replace(url.username, "*****")) |
| new_cmdline[index] = urlunsplit(url) |
| except ValueError: |
| |
| continue |
| return new_cmdline |
|
|
|
|
| |
|
|
| |
|
|
|
|
| class RemoteProgress: |
| """Handler providing an interface to parse progress information emitted by |
| :manpage:`git-push(1)` and :manpage:`git-fetch(1)` and to dispatch callbacks |
| allowing subclasses to react to the progress.""" |
|
|
| _num_op_codes: int = 9 |
| ( |
| BEGIN, |
| END, |
| COUNTING, |
| COMPRESSING, |
| WRITING, |
| RECEIVING, |
| RESOLVING, |
| FINDING_SOURCES, |
| CHECKING_OUT, |
| ) = [1 << x for x in range(_num_op_codes)] |
| STAGE_MASK = BEGIN | END |
| OP_MASK = ~STAGE_MASK |
|
|
| DONE_TOKEN = "done." |
| TOKEN_SEPARATOR = ", " |
|
|
| __slots__ = ( |
| "_cur_line", |
| "_seen_ops", |
| "error_lines", |
| "other_lines", |
| ) |
| re_op_absolute = re.compile(r"(remote: )?([\w\s]+):\s+()(\d+)()(.*)") |
| re_op_relative = re.compile(r"(remote: )?([\w\s]+):\s+(\d+)% \((\d+)/(\d+)\)(.*)") |
|
|
| def __init__(self) -> None: |
| self._seen_ops: List[int] = [] |
| self._cur_line: Optional[str] = None |
| self.error_lines: List[str] = [] |
| self.other_lines: List[str] = [] |
|
|
| def _parse_progress_line(self, line: AnyStr) -> None: |
| """Parse progress information from the given line as retrieved by |
| :manpage:`git-push(1)` or :manpage:`git-fetch(1)`. |
| |
| - Lines that do not contain progress info are stored in :attr:`other_lines`. |
| - Lines that seem to contain an error (i.e. start with ``error:`` or ``fatal:``) |
| are stored in :attr:`error_lines`. |
| """ |
| |
| |
| |
| |
| |
| if isinstance(line, bytes): |
| line_str = line.decode("utf-8") |
| else: |
| line_str = line |
| self._cur_line = line_str |
|
|
| if self._cur_line.startswith(("error:", "fatal:")): |
| self.error_lines.append(self._cur_line) |
| return |
|
|
| cur_count, max_count = None, None |
| match = self.re_op_relative.match(line_str) |
| if match is None: |
| match = self.re_op_absolute.match(line_str) |
|
|
| if not match: |
| self.line_dropped(line_str) |
| self.other_lines.append(line_str) |
| return |
| |
|
|
| op_code = 0 |
| _remote, op_name, _percent, cur_count, max_count, message = match.groups() |
|
|
| |
| if op_name == "Counting objects": |
| op_code |= self.COUNTING |
| elif op_name == "Compressing objects": |
| op_code |= self.COMPRESSING |
| elif op_name == "Writing objects": |
| op_code |= self.WRITING |
| elif op_name == "Receiving objects": |
| op_code |= self.RECEIVING |
| elif op_name == "Resolving deltas": |
| op_code |= self.RESOLVING |
| elif op_name == "Finding sources": |
| op_code |= self.FINDING_SOURCES |
| elif op_name == "Checking out files": |
| op_code |= self.CHECKING_OUT |
| else: |
| |
| |
| |
| |
| |
| |
| self.line_dropped(line_str) |
| |
| |
| return |
| |
|
|
| |
| if op_code not in self._seen_ops: |
| self._seen_ops.append(op_code) |
| op_code |= self.BEGIN |
| |
|
|
| if message is None: |
| message = "" |
| |
|
|
| message = message.strip() |
| if message.endswith(self.DONE_TOKEN): |
| op_code |= self.END |
| message = message[: -len(self.DONE_TOKEN)] |
| |
| message = message.strip(self.TOKEN_SEPARATOR) |
|
|
| self.update( |
| op_code, |
| cur_count and float(cur_count), |
| max_count and float(max_count), |
| message, |
| ) |
|
|
| def new_message_handler(self) -> Callable[[str], None]: |
| """ |
| :return: |
| A progress handler suitable for :func:`~git.cmd.handle_process_output`, |
| passing lines on to this progress handler in a suitable format. |
| """ |
|
|
| def handler(line: AnyStr) -> None: |
| return self._parse_progress_line(line.rstrip()) |
|
|
| |
|
|
| return handler |
|
|
| def line_dropped(self, line: str) -> None: |
| """Called whenever a line could not be understood and was therefore dropped.""" |
| pass |
|
|
| def update( |
| self, |
| op_code: int, |
| cur_count: Union[str, float], |
| max_count: Union[str, float, None] = None, |
| message: str = "", |
| ) -> None: |
| """Called whenever the progress changes. |
| |
| :param op_code: |
| Integer allowing to be compared against Operation IDs and stage IDs. |
| |
| Stage IDs are :const:`BEGIN` and :const:`END`. :const:`BEGIN` will only be |
| set once for each Operation ID as well as :const:`END`. It may be that |
| :const:`BEGIN` and :const:`END` are set at once in case only one progress |
| message was emitted due to the speed of the operation. Between |
| :const:`BEGIN` and :const:`END`, none of these flags will be set. |
| |
| Operation IDs are all held within the :const:`OP_MASK`. Only one Operation |
| ID will be active per call. |
| |
| :param cur_count: |
| Current absolute count of items. |
| |
| :param max_count: |
| The maximum count of items we expect. It may be ``None`` in case there is no |
| maximum number of items or if it is (yet) unknown. |
| |
| :param message: |
| In case of the :const:`WRITING` operation, it contains the amount of bytes |
| transferred. It may possibly be used for other purposes as well. |
| |
| :note: |
| You may read the contents of the current line in |
| :attr:`self._cur_line <_cur_line>`. |
| """ |
| pass |
|
|
|
|
| class CallableRemoteProgress(RemoteProgress): |
| """A :class:`RemoteProgress` implementation forwarding updates to any callable. |
| |
| :note: |
| Like direct instances of :class:`RemoteProgress`, instances of this |
| :class:`CallableRemoteProgress` class are not themselves directly callable. |
| Rather, instances of this class wrap a callable and forward to it. This should |
| therefore not be confused with :class:`git.types.CallableProgress`. |
| """ |
|
|
| __slots__ = ("_callable",) |
|
|
| def __init__(self, fn: Callable) -> None: |
| self._callable = fn |
| super().__init__() |
|
|
| def update(self, *args: Any, **kwargs: Any) -> None: |
| self._callable(*args, **kwargs) |
|
|
|
|
| class Actor: |
| """Actors hold information about a person acting on the repository. They can be |
| committers and authors or anything with a name and an email as mentioned in the git |
| log entries.""" |
|
|
| |
| name_only_regex = re.compile(r"<(.*)>") |
| name_email_regex = re.compile(r"(.*) <(.*?)>") |
|
|
| |
| |
| env_author_name = "GIT_AUTHOR_NAME" |
| env_author_email = "GIT_AUTHOR_EMAIL" |
| env_committer_name = "GIT_COMMITTER_NAME" |
| env_committer_email = "GIT_COMMITTER_EMAIL" |
|
|
| |
| conf_name = "name" |
| conf_email = "email" |
|
|
| __slots__ = ("name", "email") |
|
|
| def __init__(self, name: Optional[str], email: Optional[str]) -> None: |
| self.name = name |
| self.email = email |
|
|
| def __eq__(self, other: Any) -> bool: |
| return self.name == other.name and self.email == other.email |
|
|
| def __ne__(self, other: Any) -> bool: |
| return not (self == other) |
|
|
| def __hash__(self) -> int: |
| return hash((self.name, self.email)) |
|
|
| def __str__(self) -> str: |
| return self.name if self.name else "" |
|
|
| def __repr__(self) -> str: |
| return '<git.Actor "%s <%s>">' % (self.name, self.email) |
|
|
| @classmethod |
| def _from_string(cls, string: str) -> "Actor": |
| """Create an :class:`Actor` from a string. |
| |
| :param string: |
| The string, which is expected to be in regular git format:: |
| |
| John Doe <jdoe@example.com> |
| |
| :return: |
| :class:`Actor` |
| """ |
| m = cls.name_email_regex.search(string) |
| if m: |
| name, email = m.groups() |
| return Actor(name, email) |
| else: |
| m = cls.name_only_regex.search(string) |
| if m: |
| return Actor(m.group(1), None) |
| |
| return Actor(string, None) |
| |
| |
|
|
| @classmethod |
| def _main_actor( |
| cls, |
| env_name: str, |
| env_email: str, |
| config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None, |
| ) -> "Actor": |
| actor = Actor("", "") |
| user_id = None |
|
|
| def default_email() -> str: |
| nonlocal user_id |
| if not user_id: |
| user_id = get_user_id() |
| return user_id |
|
|
| def default_name() -> str: |
| return default_email().split("@")[0] |
|
|
| for attr, evar, cvar, default in ( |
| ("name", env_name, cls.conf_name, default_name), |
| ("email", env_email, cls.conf_email, default_email), |
| ): |
| try: |
| val = os.environ[evar] |
| setattr(actor, attr, val) |
| except KeyError: |
| if config_reader is not None: |
| try: |
| val = config_reader.get("user", cvar) |
| except Exception: |
| val = default() |
| setattr(actor, attr, val) |
| |
| if not getattr(actor, attr): |
| setattr(actor, attr, default()) |
| |
| |
| return actor |
|
|
| @classmethod |
| def committer(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor": |
| """ |
| :return: |
| :class:`Actor` instance corresponding to the configured committer. It |
| behaves similar to the git implementation, such that the environment will |
| override configuration values of `config_reader`. If no value is set at all, |
| it will be generated. |
| |
| :param config_reader: |
| ConfigReader to use to retrieve the values from in case they are not set in |
| the environment. |
| """ |
| return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader) |
|
|
| @classmethod |
| def author(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor": |
| """Same as :meth:`committer`, but defines the main author. It may be specified |
| in the environment, but defaults to the committer.""" |
| return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader) |
|
|
|
|
| class Stats: |
| """Represents stat information as presented by git at the end of a merge. It is |
| created from the output of a diff operation. |
| |
| Example:: |
| |
| c = Commit( sha1 ) |
| s = c.stats |
| s.total # full-stat-dict |
| s.files # dict( filepath : stat-dict ) |
| |
| ``stat-dict`` |
| |
| A dictionary with the following keys and values:: |
| |
| deletions = number of deleted lines as int |
| insertions = number of inserted lines as int |
| lines = total number of lines changed as int, or deletions + insertions |
| change_type = type of change as str, A|C|D|M|R|T|U|X|B |
| |
| ``full-stat-dict`` |
| |
| In addition to the items in the stat-dict, it features additional information:: |
| |
| files = number of changed files as int |
| """ |
|
|
| __slots__ = ("total", "files") |
|
|
| def __init__(self, total: Total_TD, files: Dict[PathLike, Files_TD]) -> None: |
| self.total = total |
| self.files = files |
|
|
| @classmethod |
| def _list_from_string(cls, repo: "Repo", text: str) -> "Stats": |
| """Create a :class:`Stats` object from output retrieved by |
| :manpage:`git-diff(1)`. |
| |
| :return: |
| :class:`git.Stats` |
| """ |
|
|
| hsh: HSH_TD = { |
| "total": {"insertions": 0, "deletions": 0, "lines": 0, "files": 0}, |
| "files": {}, |
| } |
| for line in text.splitlines(): |
| (change_type, raw_insertions, raw_deletions, filename) = line.split("\t") |
| insertions = raw_insertions != "-" and int(raw_insertions) or 0 |
| deletions = raw_deletions != "-" and int(raw_deletions) or 0 |
| hsh["total"]["insertions"] += insertions |
| hsh["total"]["deletions"] += deletions |
| hsh["total"]["lines"] += insertions + deletions |
| hsh["total"]["files"] += 1 |
| files_dict: Files_TD = { |
| "insertions": insertions, |
| "deletions": deletions, |
| "lines": insertions + deletions, |
| "change_type": change_type, |
| } |
| hsh["files"][filename.strip()] = files_dict |
| return Stats(hsh["total"], hsh["files"]) |
|
|
|
|
| class IndexFileSHA1Writer: |
| """Wrapper around a file-like object that remembers the SHA1 of the data written to |
| it. It will write a sha when the stream is closed or if asked for explicitly using |
| :meth:`write_sha`. |
| |
| Only useful to the index file. |
| |
| :note: |
| Based on the dulwich project. |
| """ |
|
|
| __slots__ = ("f", "sha1") |
|
|
| def __init__(self, f: IO) -> None: |
| self.f = f |
| self.sha1 = make_sha(b"") |
|
|
| def write(self, data: AnyStr) -> int: |
| self.sha1.update(data) |
| return self.f.write(data) |
|
|
| def write_sha(self) -> bytes: |
| sha = self.sha1.digest() |
| self.f.write(sha) |
| return sha |
|
|
| def close(self) -> bytes: |
| sha = self.write_sha() |
| self.f.close() |
| return sha |
|
|
| def tell(self) -> int: |
| return self.f.tell() |
|
|
|
|
| class LockFile: |
| """Provides methods to obtain, check for, and release a file based lock which |
| should be used to handle concurrent access to the same file. |
| |
| As we are a utility class to be derived from, we only use protected methods. |
| |
| Locks will automatically be released on destruction. |
| """ |
|
|
| __slots__ = ("_file_path", "_owns_lock") |
|
|
| def __init__(self, file_path: PathLike) -> None: |
| self._file_path = file_path |
| self._owns_lock = False |
|
|
| def __del__(self) -> None: |
| self._release_lock() |
|
|
| def _lock_file_path(self) -> str: |
| """:return: Path to lockfile""" |
| return "%s.lock" % (self._file_path) |
|
|
| def _has_lock(self) -> bool: |
| """ |
| :return: |
| True if we have a lock and if the lockfile still exists |
| |
| :raise AssertionError: |
| If our lock-file does not exist. |
| """ |
| return self._owns_lock |
|
|
| def _obtain_lock_or_raise(self) -> None: |
| """Create a lock file as flag for other instances, mark our instance as |
| lock-holder. |
| |
| :raise IOError: |
| If a lock was already present or a lock file could not be written. |
| """ |
| if self._has_lock(): |
| return |
| lock_file = self._lock_file_path() |
| if osp.isfile(lock_file): |
| raise IOError( |
| "Lock for file %r did already exist, delete %r in case the lock is illegal" |
| % (self._file_path, lock_file) |
| ) |
|
|
| try: |
| with open(lock_file, mode="w"): |
| pass |
| except OSError as e: |
| raise IOError(str(e)) from e |
|
|
| self._owns_lock = True |
|
|
| def _obtain_lock(self) -> None: |
| """The default implementation will raise if a lock cannot be obtained. |
| |
| Subclasses may override this method to provide a different implementation. |
| """ |
| return self._obtain_lock_or_raise() |
|
|
| def _release_lock(self) -> None: |
| """Release our lock if we have one.""" |
| if not self._has_lock(): |
| return |
|
|
| |
| |
| lfp = self._lock_file_path() |
| try: |
| rmfile(lfp) |
| except OSError: |
| pass |
| self._owns_lock = False |
|
|
|
|
| class BlockingLockFile(LockFile): |
| """The lock file will block until a lock could be obtained, or fail after a |
| specified timeout. |
| |
| :note: |
| If the directory containing the lock was removed, an exception will be raised |
| during the blocking period, preventing hangs as the lock can never be obtained. |
| """ |
|
|
| __slots__ = ("_check_interval", "_max_block_time") |
|
|
| def __init__( |
| self, |
| file_path: PathLike, |
| check_interval_s: float = 0.3, |
| max_block_time_s: int = sys.maxsize, |
| ) -> None: |
| """Configure the instance. |
| |
| :param check_interval_s: |
| Period of time to sleep until the lock is checked the next time. |
| By default, it waits a nearly unlimited time. |
| |
| :param max_block_time_s: |
| Maximum amount of seconds we may lock. |
| """ |
| super().__init__(file_path) |
| self._check_interval = check_interval_s |
| self._max_block_time = max_block_time_s |
|
|
| def _obtain_lock(self) -> None: |
| """This method blocks until it obtained the lock, or raises :exc:`IOError` if it |
| ran out of time or if the parent directory was not available anymore. |
| |
| If this method returns, you are guaranteed to own the lock. |
| """ |
| starttime = time.time() |
| maxtime = starttime + float(self._max_block_time) |
| while True: |
| try: |
| super()._obtain_lock() |
| except IOError as e: |
| |
| |
| curtime = time.time() |
| if not osp.isdir(osp.dirname(self._lock_file_path())): |
| msg = "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" % ( |
| self._lock_file_path(), |
| curtime - starttime, |
| ) |
| raise IOError(msg) from e |
| |
|
|
| if curtime >= maxtime: |
| msg = "Waited %g seconds for lock at %r" % ( |
| maxtime - starttime, |
| self._lock_file_path(), |
| ) |
| raise IOError(msg) from e |
| |
| time.sleep(self._check_interval) |
| else: |
| break |
| |
|
|
|
|
| class IterableList(List[T_IterableObj]): |
| """List of iterable objects allowing to query an object by id or by named index:: |
| |
| heads = repo.heads |
| heads.master |
| heads['master'] |
| heads[0] |
| |
| Iterable parent objects: |
| |
| * :class:`Commit <git.objects.Commit>` |
| * :class:`Submodule <git.objects.submodule.base.Submodule>` |
| * :class:`Reference <git.refs.reference.Reference>` |
| * :class:`FetchInfo <git.remote.FetchInfo>` |
| * :class:`PushInfo <git.remote.PushInfo>` |
| |
| Iterable via inheritance: |
| |
| * :class:`Head <git.refs.head.Head>` |
| * :class:`TagReference <git.refs.tag.TagReference>` |
| * :class:`RemoteReference <git.refs.remote.RemoteReference>` |
| |
| This requires an ``id_attribute`` name to be set which will be queried from its |
| contained items to have a means for comparison. |
| |
| A prefix can be specified which is to be used in case the id returned by the items |
| always contains a prefix that does not matter to the user, so it can be left out. |
| """ |
|
|
| __slots__ = ("_id_attr", "_prefix") |
|
|
| def __new__(cls, id_attr: str, prefix: str = "") -> "IterableList[T_IterableObj]": |
| return super().__new__(cls) |
|
|
| def __init__(self, id_attr: str, prefix: str = "") -> None: |
| self._id_attr = id_attr |
| self._prefix = prefix |
|
|
| def __contains__(self, attr: object) -> bool: |
| |
| try: |
| rval = list.__contains__(self, attr) |
| if rval: |
| return rval |
| except (AttributeError, TypeError): |
| pass |
| |
|
|
| |
| try: |
| getattr(self, cast(str, attr)) |
| return True |
| except (AttributeError, TypeError): |
| return False |
| |
|
|
| def __getattr__(self, attr: str) -> T_IterableObj: |
| attr = self._prefix + attr |
| for item in self: |
| if getattr(item, self._id_attr) == attr: |
| return item |
| |
| return list.__getattribute__(self, attr) |
|
|
| def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> T_IterableObj: |
| assert isinstance(index, (int, str, slice)), "Index of IterableList should be an int or str" |
|
|
| if isinstance(index, int): |
| return list.__getitem__(self, index) |
| elif isinstance(index, slice): |
| raise ValueError("Index should be an int or str") |
| else: |
| try: |
| return getattr(self, index) |
| except AttributeError as e: |
| raise IndexError("No item found with id %r" % (self._prefix + index)) from e |
| |
|
|
| def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> None: |
| assert isinstance(index, (int, str)), "Index of IterableList should be an int or str" |
|
|
| delindex = cast(int, index) |
| if not isinstance(index, int): |
| delindex = -1 |
| name = self._prefix + index |
| for i, item in enumerate(self): |
| if getattr(item, self._id_attr) == name: |
| delindex = i |
| break |
| |
| |
| if delindex == -1: |
| raise IndexError("Item with name %s not found" % name) |
| |
| |
| list.__delitem__(self, delindex) |
|
|
|
|
| @runtime_checkable |
| class IterableObj(Protocol): |
| """Defines an interface for iterable items, so there is a uniform way to retrieve |
| and iterate items within the git repository. |
| |
| Subclasses: |
| |
| * :class:`Submodule <git.objects.submodule.base.Submodule>` |
| * :class:`Commit <git.objects.Commit>` |
| * :class:`Reference <git.refs.reference.Reference>` |
| * :class:`PushInfo <git.remote.PushInfo>` |
| * :class:`FetchInfo <git.remote.FetchInfo>` |
| * :class:`Remote <git.remote.Remote>` |
| """ |
|
|
| __slots__ = () |
|
|
| _id_attribute_: str |
|
|
| @classmethod |
| @abstractmethod |
| def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Iterator[T_IterableObj]: |
| |
| """Find (all) items of this type. |
| |
| Subclasses can specify `args` and `kwargs` differently, and may use them for |
| filtering. However, when the method is called with no additional positional or |
| keyword arguments, subclasses are obliged to to yield all items. |
| |
| :return: |
| Iterator yielding Items |
| """ |
| raise NotImplementedError("To be implemented by Subclass") |
|
|
| @classmethod |
| def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> IterableList[T_IterableObj]: |
| """Find (all) items of this type and collect them into a list. |
| |
| For more information about the arguments, see :meth:`iter_items`. |
| |
| :note: |
| Favor the :meth:`iter_items` method as it will avoid eagerly collecting all |
| items. When there are many items, that can slow performance and increase |
| memory usage. |
| |
| :return: |
| list(Item,...) list of item instances |
| """ |
| out_list: IterableList = IterableList(cls._id_attribute_) |
| out_list.extend(cls.iter_items(repo, *args, **kwargs)) |
| return out_list |
|
|
|
|
| class IterableClassWatcher(type): |
| """Metaclass that issues :exc:`DeprecationWarning` when :class:`git.util.Iterable` |
| is subclassed.""" |
|
|
| def __init__(cls, name: str, bases: Tuple, clsdict: Dict) -> None: |
| for base in bases: |
| if type(base) is IterableClassWatcher: |
| warnings.warn( |
| f"GitPython Iterable subclassed by {name}." |
| " Iterable is deprecated due to naming clash since v3.1.18" |
| " and will be removed in 4.0.0." |
| " Use IterableObj instead.", |
| DeprecationWarning, |
| stacklevel=2, |
| ) |
|
|
|
|
| class Iterable(metaclass=IterableClassWatcher): |
| """Deprecated, use :class:`IterableObj` instead. |
| |
| Defines an interface for iterable items, so there is a uniform way to retrieve |
| and iterate items within the git repository. |
| """ |
|
|
| __slots__ = () |
|
|
| _id_attribute_ = "attribute that most suitably identifies your instance" |
|
|
| @classmethod |
| def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any: |
| """Deprecated, use :class:`IterableObj` instead. |
| |
| Find (all) items of this type. |
| |
| See :meth:`IterableObj.iter_items` for details on usage. |
| |
| :return: |
| Iterator yielding Items |
| """ |
| raise NotImplementedError("To be implemented by Subclass") |
|
|
| @classmethod |
| def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any: |
| """Deprecated, use :class:`IterableObj` instead. |
| |
| Find (all) items of this type and collect them into a list. |
| |
| See :meth:`IterableObj.list_items` for details on usage. |
| |
| :return: |
| list(Item,...) list of item instances |
| """ |
| out_list: Any = IterableList(cls._id_attribute_) |
| out_list.extend(cls.iter_items(repo, *args, **kwargs)) |
| return out_list |
|
|
|
|
| |
|
|