| |
| |
| |
| |
|
|
| from __future__ import annotations |
|
|
| __all__ = ["GitMeta", "Git"] |
|
|
| import contextlib |
| import io |
| import itertools |
| import logging |
| import os |
| import re |
| import signal |
| import subprocess |
| from subprocess import DEVNULL, PIPE, Popen |
| import sys |
| from textwrap import dedent |
| import threading |
| import warnings |
|
|
| from git.compat import defenc, force_bytes, safe_decode |
| from git.exc import ( |
| CommandError, |
| GitCommandError, |
| GitCommandNotFound, |
| UnsafeOptionError, |
| UnsafeProtocolError, |
| ) |
| from git.util import ( |
| cygpath, |
| expand_path, |
| is_cygwin_git, |
| patch_env, |
| remove_password_if_present, |
| stream_copy, |
| ) |
|
|
| |
|
|
| from typing import ( |
| Any, |
| AnyStr, |
| BinaryIO, |
| Callable, |
| Dict, |
| IO, |
| Iterator, |
| List, |
| Mapping, |
| Optional, |
| Sequence, |
| TYPE_CHECKING, |
| TextIO, |
| Tuple, |
| Union, |
| cast, |
| overload, |
| ) |
|
|
| from git.types import Literal, PathLike, TBD |
|
|
| if TYPE_CHECKING: |
| from git.diff import DiffIndex |
| from git.repo.base import Repo |
|
|
| |
|
|
# Names of the keyword parameters that :meth:`Git.execute` declares explicitly (its
# signature lists exactly these, besides ``command``).
# NOTE(review): presumably used elsewhere to separate execute() options from git
# command-line options -- confirm against the callers.
execute_kwargs = {
    "istream",
    "with_extended_output",
    "with_exceptions",
    "as_process",
    "output_stream",
    "stdout_as_string",
    "kill_after_timeout",
    "with_stdout",
    "universal_newlines",
    "shell",
    "env",
    "max_chunk_size",
    "strip_newline_in_stdout",
}

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
|
|
|
|
| |
| |
| |
| |
| |
|
|
|
|
def handle_process_output(
    process: "Git.AutoInterrupt" | Popen,
    stdout_handler: Union[
        None,
        Callable[[AnyStr], None],
        Callable[[List[AnyStr]], None],
        Callable[[bytes, "Repo", "DiffIndex"], None],
    ],
    stderr_handler: Union[None, Callable[[AnyStr], None], Callable[[List[AnyStr]], None]],
    finalizer: Union[None, Callable[[Union[Popen, "Git.AutoInterrupt"]], None]] = None,
    decode_streams: bool = True,
    kill_after_timeout: Union[None, float] = None,
) -> None:
    R"""Register for notifications to learn that process output is ready to read, and
    dispatch lines to the respective line handlers.

    This function returns once the finalizer returns.

    :param process:
        :class:`subprocess.Popen` instance, or a :class:`Git.AutoInterrupt` wrapping
        one.

    :param stdout_handler:
        f(stdout_line_string), or ``None``.

    :param stderr_handler:
        f(stderr_line_string), or ``None``.

    :param finalizer:
        f(proc) - wait for proc to finish.

    :param decode_streams:
        Assume stdout/stderr streams are binary and decode them before pushing their
        contents to handlers.

        This defaults to ``True``. Set it to ``False`` if:

        - ``universal_newlines == True``, as then streams are in text mode, or
        - decoding must happen later, such as for :class:`~git.diff.Diff`\s.

    :param kill_after_timeout:
        :class:`float` or ``None``, Default = ``None``

        To specify a timeout in seconds for the git command, after which the process
        should be killed.

    :raise RuntimeError:
        If a reader thread is still running after `kill_after_timeout` seconds and
        `process` is not a :class:`Git.AutoInterrupt` (which is the only kind of
        process this function knows how to terminate).
    """

    # Each available stream is drained by its own thread running this function.
    def pump_stream(
        cmdline: List[str],
        name: str,
        stream: Union[BinaryIO, TextIO],
        is_decode: bool,
        handler: Union[None, Callable[[Union[bytes, str]], None]],
    ) -> None:
        try:
            # The stream is always read to exhaustion, even when there is no handler.
            for line in stream:
                if handler:
                    if is_decode:
                        assert isinstance(line, bytes)
                        line_str = line.decode(defenc)
                        handler(line_str)
                    else:
                        handler(line)

        except Exception as ex:
            _logger.error(f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}")
            # A stream closed underneath us (e.g. after termination on timeout) is
            # expected; anything else is reported as a failed command.
            if "I/O operation on closed file" not in str(ex):
                raise CommandError([f"<{name}-pump>"] + remove_password_if_present(cmdline), ex) from ex
        finally:
            stream.close()

    # An AutoInterrupt keeps the real process in ``proc`` (which may already be None).
    if hasattr(process, "proc"):
        process = cast("Git.AutoInterrupt", process)
        cmdline: str | Tuple[str, ...] | List[str] = getattr(process.proc, "args", "")
        p_stdout = process.proc.stdout if process.proc else None
        p_stderr = process.proc.stderr if process.proc else None
    else:
        process = cast(Popen, process)
        cmdline = getattr(process, "args", "")
        p_stdout = process.stdout
        p_stderr = process.stderr

    # The command line is only used for messages; normalize a string to a list.
    if not isinstance(cmdline, (tuple, list)):
        cmdline = cmdline.split()

    pumps: List[Tuple[str, IO, Callable[..., None] | None]] = []
    if p_stdout:
        pumps.append(("stdout", p_stdout, stdout_handler))
    if p_stderr:
        pumps.append(("stderr", p_stderr, stderr_handler))

    threads: List[threading.Thread] = []

    for name, stream, handler in pumps:
        t = threading.Thread(target=pump_stream, args=(cmdline, name, stream, decode_streams, handler))
        t.daemon = True
        t.start()
        threads.append(t)

    # Wait for the readers; the timeout applies to each join() separately.
    for t in threads:
        t.join(timeout=kill_after_timeout)
        if t.is_alive():
            if isinstance(process, Git.AutoInterrupt):
                process._terminate()
            else:
                raise RuntimeError(
                    "Thread join() timed out in cmd.handle_process_output()."
                    f" kill_after_timeout={kill_after_timeout} seconds"
                )
            if stderr_handler:
                error_str: Union[str, bytes] = (
                    "error: process killed because it timed out." f" kill_after_timeout={kill_after_timeout} seconds"
                )
                # When lines are passed through undecoded from a binary stderr, the
                # handler expects bytes. The check uses the io base classes because
                # real pipe objects are never instances of typing.BinaryIO.
                if not decode_streams and isinstance(p_stderr, (io.RawIOBase, io.BufferedIOBase)):
                    error_str = cast(str, error_str)
                    error_str = error_str.encode()
                stderr_handler(error_str)

    if finalizer:
        finalizer(process)
|
|
|
|
# Callable used throughout to start subprocesses: a hardened wrapper on Windows,
# plain :class:`subprocess.Popen` everywhere else.
safer_popen: Callable[..., Popen]

if sys.platform == "win32":

    def _safer_popen_windows(
        command: Union[str, Sequence[Any]],
        *,
        shell: bool = False,
        env: Optional[Mapping[str, str]] = None,
        **kwargs: Any,
    ) -> Popen:
        """Call :class:`subprocess.Popen` on Windows but don't include a CWD in the
        search.

        This avoids an untrusted search path condition where a file like ``git.exe`` in
        a malicious repository would be run when GitPython operates on the repository.
        The process using GitPython may have an untrusted repository's working tree as
        its current working directory. Some operations may temporarily change to that
        directory before running a subprocess. In addition, while by default GitPython
        does not run external commands with a shell, it can be made to do so, in which
        case the CWD of the subprocess, which GitPython usually sets to a repository
        working tree, can itself be searched automatically by the shell. This wrapper
        covers all those cases.

        :param command:
            Command passed through to :class:`subprocess.Popen`.

        :param shell:
            Whether to run `command` through a shell.

        :param env:
            Environment for the child, or ``None`` to inherit the current one. The
            mapping passed in is never mutated.

        :param kwargs:
            Any further :class:`subprocess.Popen` keyword arguments.

        :note:
            This currently works by setting the
            :envvar:`NoDefaultCurrentDirectoryInExePath` environment variable during
            subprocess creation. It also takes care of passing Windows-specific process
            creation flags, but that is unrelated to path search.

        :note:
            The current implementation contains a race condition on :attr:`os.environ`.
            GitPython isn't thread-safe, but a program using it on one thread should
            ideally be able to mutate :attr:`os.environ` on another, without
            unpredictable results. See comments in:
            https://github.com/gitpython-developers/GitPython/pull/1650
        """
        # No console window for the child, and a process group of its own.
        creationflags = subprocess.CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP

        # With a shell, the shell is the direct child and performs the search itself,
        # so the variable has to be present in the child's environment.
        if shell:
            # Work on a copy: the caller's mapping may be immutable or reused. When no
            # environment was given, start from the one the child would have inherited;
            # starting from an empty dict would strip every other variable from it.
            env = dict(os.environ if env is None else env)
            env["NoDefaultCurrentDirectoryInExePath"] = "1"

        # Without a shell, this process performs the search while creating the child,
        # so the variable is set in our own environment for the duration of the call.
        with patch_env("NoDefaultCurrentDirectoryInExePath", "1"):
            return Popen(
                command,
                shell=shell,
                env=env,
                creationflags=creationflags,
                **kwargs,
            )

    safer_popen = _safer_popen_windows
else:
    safer_popen = Popen
|
|
|
|
def dashify(string: str) -> str:
    """Return `string` with every underscore turned into a hyphen."""
    return "-".join(string.split("_"))
|
|
|
|
def slots_to_dict(self: "Git", exclude: Sequence[str] = ()) -> Dict[str, Any]:
    """Collect the values of the object's ``__slots__`` into a dict.

    :param exclude:
        Slot names to leave out of the result.

    :return:
        Mapping of slot name to current value, in ``__slots__`` order.
    """
    snapshot: Dict[str, Any] = {}
    for slot in self.__slots__:
        if slot in exclude:
            continue
        snapshot[slot] = getattr(self, slot)
    return snapshot
|
|
|
|
def dict_to_slots_and__excluded_are_none(self: object, d: Mapping[str, Any], excluded: Sequence[str] = ()) -> None:
    """Restore attributes from `d`, then set every name in `excluded` to ``None``.

    The names in `excluded` are applied last, so they end up ``None`` even if `d`
    also contains them.
    """
    resets = ((name, None) for name in excluded)
    for name, value in itertools.chain(d.items(), resets):
        setattr(self, name, value)
|
|
|
|
| |
|
|
# Deprecation text used by _warn_use_shell() when called with a falsy ``extra_danger``
# (e.g. when USE_SHELL is merely read).
_USE_SHELL_DEFAULT_MESSAGE = (
    "Git.USE_SHELL is deprecated, because only its default value of False is safe. "
    "It will be removed in a future release."
)

# Stronger deprecation text used by _warn_use_shell() when called with a truthy
# ``extra_danger`` (e.g. when USE_SHELL is assigned a truthy value on the class).
_USE_SHELL_DANGER_MESSAGE = (
    "Setting Git.USE_SHELL to True is unsafe and insecure, as the effect of special "
    "shell syntax cannot usually be accounted for. This can result in a command "
    "injection vulnerability and arbitrary code execution. Git.USE_SHELL is deprecated "
    "and will be removed in a future release."
)
|
|
|
|
def _warn_use_shell(extra_danger: bool) -> None:
    """Emit the ``USE_SHELL`` :class:`DeprecationWarning`.

    :param extra_danger:
        If truthy, the stronger "unsafe and insecure" wording is used; otherwise the
        plain deprecation notice.

    The warning is attributed two frames above this function (``stacklevel=3``), so
    this must be called directly from the attribute hook that detected the access.
    """
    if extra_danger:
        message = _USE_SHELL_DANGER_MESSAGE
    else:
        message = _USE_SHELL_DEFAULT_MESSAGE
    warnings.warn(message, DeprecationWarning, stacklevel=3)
|
|
|
|
class _GitMeta(type):
    """Metaclass for :class:`Git`.

    This helps issue :class:`DeprecationWarning` if :attr:`Git.USE_SHELL` is used.
    """

    # Defined under a name-mangled (leading "__", no trailing "__") name and bound to
    # the real special name at the bottom of the class body.
    def __getattribute(cls, name: str) -> Any:
        # Reading ``USE_SHELL`` on the class warns with the default message, then the
        # normal attribute lookup proceeds.
        if name == "USE_SHELL":
            _warn_use_shell(False)
        return super().__getattribute__(name)

    def __setattr(cls, name: str, value: Any) -> Any:
        # Assigning ``USE_SHELL`` on the class warns; the assigned value is passed as
        # ``extra_danger``, so a truthy value selects the stronger message.
        if name == "USE_SHELL":
            _warn_use_shell(value)
        super().__setattr__(name, value)

    if not TYPE_CHECKING:
        # Runtime only: bind the helpers above to the special method names. Static
        # type checkers treat TYPE_CHECKING as true and so never see these bindings.
        # NOTE(review): presumably this keeps checkers reporting undefined attributes
        # on the class while still type-checking the helper bodies -- confirm.
        __getattribute__ = __getattribute
        __setattr__ = __setattr
|
|
|
|
| GitMeta = _GitMeta |
| """Alias of :class:`Git`'s metaclass, whether it is :class:`type` or a custom metaclass. |
| |
| Whether the :class:`Git` class has the default :class:`type` as its metaclass or uses a |
| custom metaclass is not documented and may change at any time. This statically checkable |
| metaclass alias is equivalent at runtime to ``type(Git)``. This should almost never be |
| used. Code that benefits from it is likely to remain brittle even if it is used. |
| |
| In view of the :class:`Git` class's intended use and :class:`Git` objects' dynamic |
| callable attributes representing git subcommands, it rarely makes sense to inherit from |
| :class:`Git` at all. Using :class:`Git` in multiple inheritance can be especially tricky |
| to do correctly. Attempting uses of :class:`Git` where its metaclass is relevant, such |
| as when a sibling class has an unrelated metaclass and a shared lower bound metaclass |
| might have to be introduced to solve a metaclass conflict, is not recommended. |
| |
| :note: |
| The correct static type of the :class:`Git` class itself, and any subclasses, is |
| ``Type[Git]``. (This can be written as ``type[Git]`` in Python 3.9 and later.) |
| |
| :class:`GitMeta` should never be used in any annotation where ``Type[Git]`` is |
| intended or otherwise possible to use. This alias is truly only for very rare and |
| inherently precarious situations where it is necessary to deal with the metaclass |
| explicitly. |
| """ |
|
|
|
|
| class Git(metaclass=_GitMeta): |
| """The Git class manages communication with the Git binary. |
| |
| It provides a convenient interface to calling the Git binary, such as in:: |
| |
| g = Git( git_dir ) |
| g.init() # calls 'git init' program |
| rval = g.ls_files() # calls 'git ls-files' program |
| |
| Debugging: |
| |
| * Set the :envvar:`GIT_PYTHON_TRACE` environment variable to print each invocation |
| of the command to stdout. |
| * Set its value to ``full`` to see details about the returned values. |
| """ |
|
|
# Names of the per-instance attributes.
__slots__ = (
    "_working_dir",
    "cat_file_all",
    "cat_file_header",
    "_version_info",
    "_version_info_token",
    "_git_options",
    "_persistent_git_options",
    "_environment",
)

# Slots that __getstate__ leaves out of the pickled state and that __setstate__
# resets to None.
_excluded_ = (
    "cat_file_all",
    "cat_file_header",
    "_version_info",
    "_version_info_token",
)

# Matches URLs of the form ``<transport>::<address>``; group 1 is the transport.
# Used by check_unsafe_protocols().
re_unsafe_protocol = re.compile(r"(.+)::.+")
|
|
def __getstate__(self) -> Dict[str, Any]:
    """Return the state to pickle: every slot except those named in ``_excluded_``."""
    skipped = self._excluded_
    state = slots_to_dict(self, exclude=skipped)
    return state
|
|
def __setstate__(self, d: Dict[str, Any]) -> None:
    """Restore pickled state from `d` and set the ``_excluded_`` slots to ``None``."""
    reset_to_none = self._excluded_
    dict_to_slots_and__excluded_are_none(self, d, excluded=reset_to_none)
|
|
| |
|
|
git_exec_name = "git"
"""Default git command that should work on Linux, Windows, and other systems."""

# Read once, when the class body executes: ``False`` if the environment variable is
# unset, otherwise its raw string value.
GIT_PYTHON_TRACE = os.environ.get("GIT_PYTHON_TRACE", False)
"""Enables debugging of GitPython's git commands."""
|
|
| USE_SHELL: bool = False |
| """Deprecated. If set to ``True``, a shell will be used when executing git commands. |
| |
| Code that uses ``USE_SHELL = True`` or that passes ``shell=True`` to any GitPython |
| functions should be updated to use the default value of ``False`` instead. ``True`` |
| is unsafe unless the effect of syntax treated specially by the shell is fully |
| considered and accounted for, which is not possible under most circumstances. As |
| detailed below, it is also no longer needed, even where it had been in the past. |
| |
| It is in many if not most cases a command injection vulnerability for an application |
| to set :attr:`USE_SHELL` to ``True``. Any attacker who can cause a specially crafted |
| fragment of text to make its way into any part of any argument to any git command |
| (including paths, branch names, etc.) can cause the shell to read and write |
| arbitrary files and execute arbitrary commands. Innocent input may also accidentally |
| contain special shell syntax, leading to inadvertent malfunctions. |
| |
| In addition, how a value of ``True`` interacts with some aspects of GitPython's |
| operation is not precisely specified and may change without warning, even before |
| GitPython 4.0.0 when :attr:`USE_SHELL` may be removed. This includes: |
| |
| * Whether or how GitPython automatically customizes the shell environment. |
| |
| * Whether, outside of Windows (where :class:`subprocess.Popen` supports lists of |
| separate arguments even when ``shell=True``), this can be used with any GitPython |
| functionality other than direct calls to the :meth:`execute` method. |
| |
| * Whether any GitPython feature that runs git commands ever attempts to partially |
| sanitize data a shell may treat specially. Currently this is not done. |
| |
| Prior to GitPython 2.0.8, this had a narrow purpose in suppressing console windows |
| in graphical Windows applications. In 2.0.8 and higher, it provides no benefit, as |
| GitPython solves that problem more robustly and safely by using the |
| ``CREATE_NO_WINDOW`` process creation flag on Windows. |
| |
| Because Windows path search differs subtly based on whether a shell is used, in rare |
| cases changing this from ``True`` to ``False`` may keep an unusual git "executable", |
| such as a batch file, from being found. To fix this, set the command name or full |
| path in the :envvar:`GIT_PYTHON_GIT_EXECUTABLE` environment variable or pass the |
| full path to :func:`git.refresh` (or invoke the script using a ``.exe`` shim). |
| |
| Further reading: |
| |
| * :meth:`Git.execute` (on the ``shell`` parameter). |
| * https://github.com/gitpython-developers/GitPython/commit/0d9390866f9ce42870d3116094cd49e0019a970a |
| * https://learn.microsoft.com/en-us/windows/win32/procthread/process-creation-flags |
| * https://github.com/python/cpython/issues/91558#issuecomment-1100942950 |
| * https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-createprocessw |
| """ |
|
|
# Names of the environment variables consulted by refresh(): the first supplies the
# git command when no path is passed, the second selects how an initial failed
# refresh is reported.
_git_exec_env_var = "GIT_PYTHON_GIT_EXECUTABLE"
_refresh_env_var = "GIT_PYTHON_REFRESH"

GIT_PYTHON_GIT_EXECUTABLE = None
"""Provide the full path to the git executable. Otherwise it assumes git is in the
executable search path.

:note:
    The git executable is actually found during the refresh step in the top level
    ``__init__``. It can also be changed by explicitly calling :func:`git.refresh`.
"""

# Identity token that refresh() replaces with a fresh object (and restores if the
# refresh fails). The version_info property compares it with the instance's stored
# token to decide whether its cached value is still valid.
_refresh_token = object()
|
|
@classmethod
def refresh(cls, path: Union[None, PathLike] = None) -> bool:
    """Update information about the git executable :class:`Git` objects will use.

    Called by the :func:`git.refresh` function in the top level ``__init__``.

    :param path:
        Optional path to the git executable. If not absolute, it is resolved
        immediately, relative to the current directory. (See note below.)

    :return:
        ``True`` if the selected git command could be run, ``False`` if it could
        not and the failure was tolerated (only possible on the first refresh).

    :raise ImportError:
        On the first refresh, if git cannot be run and the
        :envvar:`GIT_PYTHON_REFRESH` mode is one of the "error" values (the
        default) or is not a recognized value.

    :raise git.exc.GitCommandNotFound:
        On any later refresh, if git cannot be run.

    :note:
        The top-level :func:`git.refresh` should be preferred because it calls this
        method and may also update other state accordingly.

    :note:
        There are three different ways to specify the command that refreshing causes
        to be used for git:

        1. Pass no `path` argument and do not set the
           :envvar:`GIT_PYTHON_GIT_EXECUTABLE` environment variable. The command
           name ``git`` is used. It is looked up in a path search by the system, in
           each command run (roughly similar to how git is found when running
           ``git`` commands manually). This is usually the desired behavior.

        2. Pass no `path` argument but set the :envvar:`GIT_PYTHON_GIT_EXECUTABLE`
           environment variable. The command given as the value of that variable is
           used. This may be a simple command or an arbitrary path. It is looked up
           in each command run. Setting :envvar:`GIT_PYTHON_GIT_EXECUTABLE` to
           ``git`` has the same effect as not setting it.

        3. Pass a `path` argument. This path, if not absolute, is immediately
           resolved, relative to the current directory. This resolution occurs at
           the time of the refresh. When git commands are run, they are run using
           that previously resolved path. If a `path` argument is passed, the
           :envvar:`GIT_PYTHON_GIT_EXECUTABLE` environment variable is not
           consulted.

    :note:
        Refreshing always sets the :attr:`Git.GIT_PYTHON_GIT_EXECUTABLE` class
        attribute, which can be read on the :class:`Git` class or any of its
        instances to check what command is used to run git. This attribute should
        not be confused with the related :envvar:`GIT_PYTHON_GIT_EXECUTABLE`
        environment variable. The class attribute is set no matter how refreshing is
        performed.
    """
    # Decide which command to try: an explicit path is expanded and made absolute
    # now; otherwise the environment variable, falling back to the default name.
    if path is not None:
        new_git = os.path.expanduser(path)
        new_git = os.path.abspath(new_git)
    else:
        new_git = os.environ.get(cls._git_exec_env_var, cls.git_exec_name)

    # Remember the previous state so it can be restored if the new command fails,
    # then install the candidate together with a fresh refresh token.
    old_git = cls.GIT_PYTHON_GIT_EXECUTABLE
    old_refresh_token = cls._refresh_token
    cls.GIT_PYTHON_GIT_EXECUTABLE = new_git
    cls._refresh_token = object()

    # Probe the candidate by running ``git version`` through a throwaway instance.
    # GitCommandNotFound and PermissionError both count as "git is not usable".
    has_git = False
    try:
        cls().version()
        has_git = True
    except (GitCommandNotFound, PermissionError):
        pass

    # The probe failed: report it according to whether this is the first refresh.
    if not has_git:
        err = (
            dedent(
                """\
                Bad git executable.
                The git executable must be specified in one of the following ways:
                - be included in your $PATH
                - be set via $%s
                - explicitly set via git.refresh(<full-path-to-git-executable>)
                """
            )
            % cls._git_exec_env_var
        )

        # Put back whatever was configured before this call.
        cls.GIT_PYTHON_GIT_EXECUTABLE = old_git
        cls._refresh_token = old_refresh_token

        if old_git is None:
            # First refresh (nothing was configured before): stay quiet, log, or
            # raise, depending on the GIT_PYTHON_REFRESH environment variable.
            # An unset variable behaves like "raise".
            mode = os.environ.get(cls._refresh_env_var, "raise").lower()

            # Accepted spellings for each mode (compared case-insensitively).
            quiet = ["quiet", "q", "silence", "s", "silent", "none", "n", "0"]
            warn = ["warn", "w", "warning", "log", "l", "1"]
            error = ["error", "e", "exception", "raise", "r", "2"]

            if mode in quiet:
                pass
            elif mode in warn or mode in error:
                # Both modes share the same extended message; they differ only in
                # whether it is logged or raised.
                err = dedent(
                    """\
                    %s
                    All git commands will error until this is rectified.

                    This initial message can be silenced or aggravated in the future by setting the
                    $%s environment variable. Use one of the following values:
                    - %s: for no message or exception
                    - %s: for a warning message (logging level CRITICAL, displayed by default)
                    - %s: for a raised exception

                    Example:
                    export %s=%s
                    """
                ) % (
                    err,
                    cls._refresh_env_var,
                    "|".join(quiet),
                    "|".join(warn),
                    "|".join(error),
                    cls._refresh_env_var,
                    quiet[0],
                )

                if mode in warn:
                    _logger.critical(err)
                else:
                    raise ImportError(err)
            else:
                # The variable is set to something that is not a recognized mode.
                err = dedent(
                    """\
                    %s environment variable has been set but it has been set with an invalid value.

                    Use only the following values:
                    - %s: for no message or exception
                    - %s: for a warning message (logging level CRITICAL, displayed by default)
                    - %s: for a raised exception
                    """
                ) % (
                    cls._refresh_env_var,
                    "|".join(quiet),
                    "|".join(warn),
                    "|".join(error),
                )
                raise ImportError(err)

            # Reached only on a tolerated first-refresh failure (quiet or warn).
            # Setting the attribute to the default command name makes it non-None,
            # so later refreshes take the ``else`` branch below instead.
            cls.GIT_PYTHON_GIT_EXECUTABLE = cls.git_exec_name
        else:
            # Not the first refresh: a failure is always an error.
            raise GitCommandNotFound(new_git, err)

    return has_git
|
|
@classmethod
def is_cygwin(cls) -> bool:
    """Return the result of :func:`~git.util.is_cygwin_git` for the git command
    currently stored in :attr:`GIT_PYTHON_GIT_EXECUTABLE`."""
    executable = cls.GIT_PYTHON_GIT_EXECUTABLE
    return is_cygwin_git(executable)
|
|
| @overload |
| @classmethod |
| def polish_url(cls, url: str, is_cygwin: Literal[False] = ...) -> str: ... |
|
|
| @overload |
| @classmethod |
| def polish_url(cls, url: str, is_cygwin: Union[None, bool] = None) -> str: ... |
|
|
| @classmethod |
| def polish_url(cls, url: str, is_cygwin: Union[None, bool] = None) -> PathLike: |
| """Remove any backslashes from URLs to be written in config files. |
| |
| Windows might create config files containing paths with backslashes, but git |
| stops liking them as it will escape the backslashes. Hence we undo the escaping |
| just to be sure. |
| """ |
| if is_cygwin is None: |
| is_cygwin = cls.is_cygwin() |
|
|
| if is_cygwin: |
| url = cygpath(url) |
| else: |
| url = os.path.expandvars(url) |
| if url.startswith("~"): |
| url = os.path.expanduser(url) |
| url = url.replace("\\\\", "\\").replace("\\", "/") |
| return url |
|
|
| @classmethod |
| def check_unsafe_protocols(cls, url: str) -> None: |
| """Check for unsafe protocols. |
| |
| Apart from the usual protocols (http, git, ssh), Git allows "remote helpers" |
| that have the form ``<transport>::<address>``. One of these helpers (``ext::``) |
| can be used to invoke any arbitrary command. |
| |
| See: |
| |
| - https://git-scm.com/docs/gitremote-helpers |
| - https://git-scm.com/docs/git-remote-ext |
| """ |
| match = cls.re_unsafe_protocol.match(url) |
| if match: |
| protocol = match.group(1) |
| raise UnsafeProtocolError( |
| f"The `{protocol}::` protocol looks suspicious, use `allow_unsafe_protocols=True` to allow it." |
| ) |
|
|
| @classmethod |
| def check_unsafe_options(cls, options: List[str], unsafe_options: List[str]) -> None: |
| """Check for unsafe options. |
| |
| Some options that are passed to ``git <command>`` can be used to execute |
| arbitrary commands. These are blocked by default. |
| """ |
| |
| |
| bare_unsafe_options = [option.lstrip("-") for option in unsafe_options] |
| for option in options: |
| for unsafe_option, bare_option in zip(unsafe_options, bare_unsafe_options): |
| if option.startswith(unsafe_option) or option == bare_option: |
| raise UnsafeOptionError( |
| f"{unsafe_option} is not allowed, use `allow_unsafe_options=True` to allow it." |
| ) |
|
|
class AutoInterrupt:
    """Process wrapper that terminates the wrapped process on finalization.

    This kills/interrupts the stored process instance once this instance goes out of
    scope. It is used to prevent processes piling up in case iterators stop reading.

    All attributes are wired through to the contained process object.

    The wait method is overridden to perform automatic status code checking and
    possibly raise.
    """

    __slots__ = ("proc", "args", "status")

    # When non-zero, this replaces whatever status code _terminate() would otherwise
    # record.
    _status_code_if_terminate: int = 0

    def __init__(self, proc: Union[None, subprocess.Popen], args: Any) -> None:
        """Wrap `proc`, remembering `args` (the command) for error reporting."""
        self.proc = proc
        self.args = args
        self.status: Union[int, None] = None

    def _terminate(self) -> None:
        """Terminate the underlying process.

        Closes the process's pipes, records its exit status in ``status`` when one is
        available, and drops the reference to the process. Does nothing if the
        process was already released.
        """
        if self.proc is None:
            return

        # Detach first, so a repeated call (e.g. from __del__) is a no-op.
        proc = self.proc
        self.proc = None
        if proc.stdin:
            proc.stdin.close()
        if proc.stdout:
            proc.stdout.close()
        if proc.stderr:
            proc.stderr.close()

        # If the process has already exited, just record its status.
        try:
            if proc.poll() is not None:
                self.status = self._status_code_if_terminate or proc.poll()
                return
        except OSError as ex:
            _logger.info("Ignored error after process had died: %r", ex)

        # This can run from __del__ during interpreter shutdown, when module globals
        # may already have been cleared.
        if os is None or getattr(os, "kill", None) is None:
            return

        # Still running: terminate it and wait, so that it actually goes away.
        try:
            proc.terminate()
            status = proc.wait()

            self.status = self._status_code_if_terminate or status
        except OSError as ex:
            _logger.info("Ignored error after process had died: %r", ex)

    def __del__(self) -> None:
        self._terminate()

    def __getattr__(self, attr: str) -> Any:
        # Only reached for names that are not slots of this wrapper.
        return getattr(self.proc, attr)

    def wait(self, stderr: Union[None, str, bytes] = b"") -> int:
        """Wait for the process and return its status code.

        :param stderr:
            Previously read value of stderr, in case stderr is already closed.
            ``None`` is treated like empty input.

        :warn:
            May deadlock if output or error pipes are used and not handled
            separately.

        :raise git.exc.GitCommandError:
            If the return status is not 0.
        """
        # Normalize to bytes. ``None`` must become b"" here: it is concatenated with
        # the remaining stream contents below.
        stderr_b = b"" if stderr is None else force_bytes(data=stderr, encoding="utf-8")
        status: Union[int, None]
        if self.proc is not None:
            status = self.proc.wait()
            p_stderr = self.proc.stderr
        else:
            # The process was already released by _terminate(); use what it recorded.
            status = self.status
            p_stderr = None

        def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes:
            if stream:
                try:
                    return stderr_b + force_bytes(stream.read())
                except (OSError, ValueError):
                    # The stream is closed or unreadable; fall back to what we have.
                    return stderr_b or b""
            else:
                return stderr_b or b""

        if status != 0:
            errstr = read_all_from_possibly_closed_stream(p_stderr)
            _logger.debug("AutoInterrupt wait stderr: %r" % (errstr,))
            raise GitCommandError(remove_password_if_present(self.args), status, errstr)
        return status
|
|
| |
|
|
class CatFileContentStream:
    """Sized, read-only view onto the contents of one object in a shared stream.

    It acts like a stream of exactly ``size`` bytes: reads are counted, and once the
    sized region is used up it behaves like an empty stream. The single byte that
    follows the region in the underlying stream is consumed as soon as the region
    has been read completely.

    If the object is finalized before everything was read, the remainder (plus that
    trailing byte) is read and discarded, so the underlying stream stays positioned
    for whoever uses it next.
    """

    __slots__ = ("_stream", "_nbr", "_size")

    def __init__(self, size: int, stream: IO[bytes]) -> None:
        self._stream = stream
        self._size = size
        self._nbr = 0  # Number of content bytes handed out so far.

        # With no content at all, the read methods will never get the chance to
        # consume the trailing byte, so do it right away.
        if size == 0:
            stream.read(1)

    def read(self, size: int = -1) -> bytes:
        remaining = self._size - self._nbr
        if remaining == 0:
            return b""

        # A negative size means "everything that is left"; never read past the region.
        wanted = min(remaining, size) if size > -1 else remaining
        chunk = self._stream.read(wanted)
        self._nbr += len(chunk)

        # Region exhausted: swallow the byte that follows it.
        if self._nbr == self._size:
            self._stream.read(1)
        return chunk

    def readline(self, size: int = -1) -> bytes:
        if self._nbr == self._size:
            return b""

        # Clamp the request so a line cannot run past the end of the region.
        remaining = self._size - self._nbr
        limit = min(remaining, size) if size > -1 else remaining

        line = self._stream.readline(limit)
        self._nbr += len(line)

        # Region exhausted: swallow the byte that follows it.
        if self._nbr == self._size:
            self._stream.read(1)
        return line

    def readlines(self, size: int = -1) -> List[bytes]:
        if self._nbr == self._size:
            return []

        # Gather lines until the region is empty or, when a size hint was given,
        # until the collected lines exceed it.
        lines = []
        collected = 0
        for line in iter(self.readline, b""):
            lines.append(line)
            if size > -1:
                collected += len(line)
                if collected > size:
                    break
        return lines

    def __iter__(self) -> "Git.CatFileContentStream":
        return self

    def __next__(self) -> bytes:
        line = self.readline()
        if line:
            return line
        raise StopIteration

    next = __next__

    def __del__(self) -> None:
        leftover = self._size - self._nbr
        if leftover:
            # Read and discard what was not consumed, including the trailing byte.
            self._stream.read(leftover + 1)
| |
|
|
def __init__(self, working_dir: Union[None, PathLike] = None) -> None:
    """Set up a new instance.

    :param working_dir:
        Git directory we should work in. If ``None``, we always work in the current
        directory as returned by :func:`os.getcwd`.
        This is meant to be the working tree directory if available, or the
        ``.git`` directory in case of bare repositories.
    """
    super().__init__()

    # Slots that __getstate__ skips and __setstate__ resets to None: the cached
    # version information with the refresh token it belongs to, and the two
    # cat-file slots.
    self.cat_file_all: Union[None, TBD] = None
    self.cat_file_header: Union[None, TBD] = None
    self._version_info_token: object = None
    self._version_info: Union[Tuple[int, ...], None] = None

    # NOTE(review): presumably extra environment variables for git commands --
    # confirm against the code that reads this.
    self._environment: Dict[str, str] = {}

    # Options for the git executable itself: per-call and persistent ones.
    self._persistent_git_options: List[str] = []
    self._git_options: Union[List[str], Tuple[str, ...]] = ()

    self._working_dir = expand_path(working_dir)
|
|
def __getattribute__(self, name: str) -> Any:
    # Reading ``USE_SHELL`` through an instance issues the deprecation warning with
    # the default message. The call is made directly from here because
    # _warn_use_shell() attributes the warning by stack depth (``stacklevel=3``).
    if name == "USE_SHELL":
        _warn_use_shell(False)
    return super().__getattribute__(name)
|
|
| def __getattr__(self, name: str) -> Any: |
| """A convenience method as it allows to call the command as if it was an object. |
| |
| :return: |
| Callable object that will execute call :meth:`_call_process` with your |
| arguments. |
| """ |
| if name.startswith("_"): |
| return super().__getattribute__(name) |
| return lambda *args, **kwargs: self._call_process(name, *args, **kwargs) |
|
|
| def set_persistent_git_options(self, **kwargs: Any) -> None: |
| """Specify command line options to the git executable for subsequent |
| subcommand calls. |
| |
| :param kwargs: |
| A dict of keyword arguments. |
| These arguments are passed as in :meth:`_call_process`, but will be passed |
| to the git command rather than the subcommand. |
| """ |
|
|
| self._persistent_git_options = self.transform_kwargs(split_single_char_options=True, **kwargs) |
|
|
| @property |
| def working_dir(self) -> Union[None, PathLike]: |
| """:return: Git directory we are working on""" |
| return self._working_dir |
|
|
| @property |
| def version_info(self) -> Tuple[int, ...]: |
| """ |
| :return: Tuple with integers representing the major, minor and additional |
| version numbers as parsed from :manpage:`git-version(1)`. Up to four fields |
| are used. |
| |
| This value is generated on demand and is cached. |
| """ |
| |
| refresh_token = self._refresh_token |
|
|
| |
| if self._version_info_token is refresh_token: |
| assert self._version_info is not None, "Bug: corrupted token-check state" |
| return self._version_info |
|
|
| |
| process_version = self._call_process("version") |
| version_string = process_version.split(" ")[2] |
| version_fields = version_string.split(".")[:4] |
| leading_numeric_fields = itertools.takewhile(str.isdigit, version_fields) |
| self._version_info = tuple(map(int, leading_numeric_fields)) |
|
|
| |
| self._version_info_token = refresh_token |
| return self._version_info |
|
|
| @overload |
| def execute( |
| self, |
| command: Union[str, Sequence[Any]], |
| *, |
| as_process: Literal[True], |
| ) -> "AutoInterrupt": ... |
|
|
| @overload |
| def execute( |
| self, |
| command: Union[str, Sequence[Any]], |
| *, |
| as_process: Literal[False] = False, |
| stdout_as_string: Literal[True], |
| ) -> Union[str, Tuple[int, str, str]]: ... |
|
|
| @overload |
| def execute( |
| self, |
| command: Union[str, Sequence[Any]], |
| *, |
| as_process: Literal[False] = False, |
| stdout_as_string: Literal[False] = False, |
| ) -> Union[bytes, Tuple[int, bytes, str]]: ... |
|
|
| @overload |
| def execute( |
| self, |
| command: Union[str, Sequence[Any]], |
| *, |
| with_extended_output: Literal[False], |
| as_process: Literal[False], |
| stdout_as_string: Literal[True], |
| ) -> str: ... |
|
|
| @overload |
| def execute( |
| self, |
| command: Union[str, Sequence[Any]], |
| *, |
| with_extended_output: Literal[False], |
| as_process: Literal[False], |
| stdout_as_string: Literal[False], |
| ) -> bytes: ... |
|
|
def execute(
    self,
    command: Union[str, Sequence[Any]],
    istream: Union[None, BinaryIO] = None,
    with_extended_output: bool = False,
    with_exceptions: bool = True,
    as_process: bool = False,
    output_stream: Union[None, BinaryIO] = None,
    stdout_as_string: bool = True,
    kill_after_timeout: Union[None, float] = None,
    with_stdout: bool = True,
    universal_newlines: bool = False,
    shell: Union[None, bool] = None,
    env: Union[None, Mapping[str, str]] = None,
    max_chunk_size: int = io.DEFAULT_BUFFER_SIZE,
    strip_newline_in_stdout: bool = True,
    **subprocess_kwargs: Any,
) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], AutoInterrupt]:
    R"""Handle executing the command, and consume and return the returned
    information (stdout).

    :param command:
        The command argument list to execute.
        It should be a sequence of program arguments, or a string. The
        program to execute is the first item in the args sequence or string.

    :param istream:
        Standard input filehandle passed to :class:`subprocess.Popen`.

    :param with_extended_output:
        Whether to return a (status, stdout, stderr) tuple.

    :param with_exceptions:
        Whether to raise an exception when git returns a non-zero status.

    :param as_process:
        Whether to return the created process instance directly from which
        streams can be read on demand. This will render `with_extended_output`
        and `with_exceptions` ineffective - the caller will have to deal with
        the details. It is important to note that the process will be placed
        into an :class:`AutoInterrupt` wrapper that will interrupt the process
        once it goes out of scope. If you use the command in iterators, you
        should pass the whole process instance instead of a single stream.

    :param output_stream:
        If set to a file-like object, data produced by the git command will be
        copied to the given stream instead of being returned as a string.
        This feature only has any effect if `as_process` is ``False``.

    :param stdout_as_string:
        If ``False``, the command's standard output will be bytes. Otherwise, it
        will be decoded into a string using the default encoding (usually UTF-8).
        The latter can fail, if the output contains binary data.

    :param kill_after_timeout:
        Specifies a timeout in seconds for the git command, after which the process
        should be killed. This will have no effect if `as_process` is set to
        ``True``. It defaults to ``None``, which means no timeout is applied and
        the process runs until it finishes. Uses of this feature should be
        carefully considered, due to the following limitations:

        1. This feature is not supported at all on Windows.
        2. Effectiveness may vary by operating system. ``ps --ppid`` is used to
           enumerate child processes, which is available on most GNU/Linux systems
           but not most others. If ``ps`` cannot be run at all, only the git
           process itself is killed.
        3. Deeper descendants do not receive signals, though they may sometimes
           terminate as a consequence of their parent processes being killed.
        4. `kill_after_timeout` uses ``SIGKILL``, which can have negative side
           effects on a repository. For example, stale locks in case of
           :manpage:`git-gc(1)` could render the repository incapable of accepting
           changes until the lock is manually removed.

    :param with_stdout:
        If ``True``, default ``True``, we open stdout on the created process.
        If ``False``, stdout is sent to the null device and the returned stdout
        value is empty.

    :param universal_newlines:
        If ``True``, pipes will be opened as text, and lines are split at all known
        line endings.

    :param shell:
        Whether to invoke commands through a shell
        (see :class:`Popen(..., shell=True) <subprocess.Popen>`).
        If this is not ``None``, it overrides :attr:`USE_SHELL`.

        Passing ``shell=True`` to this or any other GitPython function should be
        avoided, as it is unsafe under most circumstances. This is because it is
        typically not feasible to fully consider and account for the effect of shell
        expansions, especially when passing ``shell=True`` to other methods that
        forward it to :meth:`Git.execute`. Passing ``shell=True`` is also no longer
        needed (nor useful) to work around any known operating system specific
        issues.

    :param env:
        A dictionary of environment variables to be passed to
        :class:`subprocess.Popen`.

    :param max_chunk_size:
        Maximum number of bytes in one chunk of data passed to the `output_stream`
        in one invocation of its ``write()`` method. If the given number is not
        positive then the default value is used.

    :param strip_newline_in_stdout:
        Whether to strip the trailing ``\n`` of the command stdout.

    :param subprocess_kwargs:
        Keyword arguments to be passed to :class:`subprocess.Popen`. Please note
        that some of the valid kwargs are already set by this method; the ones you
        specify may not be the same ones.

    :return:
        * str(output), if `with_extended_output` is ``False`` (Default)
        * tuple(int(status), str(stdout), str(stderr)),
          if `with_extended_output` is ``True``

        If `output_stream` is given, the command's stdout is copied to it and the
        returned stdout value is whatever remains to be read afterwards:

        * stdout value, if `with_extended_output` is ``False``
        * tuple(int(status), stdout value, str(stderr)),
          if `with_extended_output` is ``True``

        Note that git is executed with ``LANGUAGE="C"`` and ``LC_ALL="C"`` (unless
        overridden through the instance environment or `env`) to ensure consistent
        output regardless of system language.

    :raise git.exc.GitCommandError:

    :note:
        If you add additional keyword arguments to the signature of this method, you
        must update the ``execute_kwargs`` variable housed in this module.
    """
    # Never log or report credentials that may be embedded in the command.
    redacted_command = remove_password_if_present(command)
    if self.GIT_PYTHON_TRACE and (self.GIT_PYTHON_TRACE != "full" or as_process):
        _logger.info(" ".join(redacted_command))

    # Run in the configured working directory if it is usable, otherwise let
    # Popen inherit the current one.
    try:
        cwd = self._working_dir or os.getcwd()
        if not os.access(str(cwd), os.X_OK):
            cwd = None
    except FileNotFoundError:
        cwd = None

    # Build the child environment: process environment, then the forced locale,
    # then the instance environment, then the per-call overrides.
    inline_env = env
    env = os.environ.copy()
    env["LANGUAGE"] = "C"
    env["LC_ALL"] = "C"
    env.update(self._environment)
    if inline_env is not None:
        env.update(inline_env)

    if sys.platform == "win32":
        if kill_after_timeout is not None:
            raise GitCommandError(
                redacted_command,
                '"kill_after_timeout" feature is not supported on Windows.',
            )
        cmd_not_found_exception = OSError
    else:
        cmd_not_found_exception = FileNotFoundError

    # subprocess.DEVNULL always exists on Python 3, so no file needs opening.
    stdout_sink = PIPE if with_stdout else DEVNULL
    if shell is None:
        # Look up USE_SHELL through the base __getattribute__ rather than via
        # normal attribute access on self.
        shell = super().__getattribute__("USE_SHELL")
    _logger.debug(
        "Popen(%s, cwd=%s, stdin=%s, shell=%s, universal_newlines=%s)",
        redacted_command,
        cwd,
        "<valid stream>" if istream else "None",
        shell,
        universal_newlines,
    )
    try:
        proc = safer_popen(
            command,
            env=env,
            cwd=cwd,
            bufsize=-1,
            stdin=(istream or DEVNULL),
            stderr=PIPE,
            stdout=stdout_sink,
            shell=shell,
            universal_newlines=universal_newlines,
            encoding=defenc if universal_newlines else None,
            **subprocess_kwargs,
        )
    except cmd_not_found_exception as err:
        raise GitCommandNotFound(redacted_command, err) from err
    else:
        # Typing aid only; proc.stdout is still None when with_stdout is False.
        proc.stdout = cast(BinaryIO, proc.stdout)
        proc.stderr = cast(BinaryIO, proc.stderr)

    if as_process:
        return self.AutoInterrupt(proc, command)

    if sys.platform != "win32" and kill_after_timeout is not None:
        # Bind to a local so the nested helpers see a non-None value.
        timeout = kill_after_timeout

        def kill_process(pid: int) -> None:
            """Callback to kill a process.

            This callback implementation would be ineffective and unsafe on Windows.
            """
            child_pids = []
            try:
                # The context manager closes the pipe and reaps ``ps``.
                with Popen(["ps", "--ppid", str(pid)], stdout=PIPE) as ps_proc:
                    if ps_proc.stdout is not None:
                        for line in ps_proc.stdout:
                            fields = line.split()
                            if fields and fields[0].isdigit():
                                child_pids.append(int(fields[0]))
            except OSError:
                # ``ps`` could not be run; the main process must still be killed.
                _logger.debug("Could not list child processes of %d", pid, exc_info=True)
            try:
                os.kill(pid, signal.SIGKILL)
                for child_pid in child_pids:
                    try:
                        os.kill(child_pid, signal.SIGKILL)
                    except OSError:
                        pass
                # Tell communicate() that the timeout fired.
                kill_check.set()
            except OSError:
                # The process is already gone; it finished on its own, so this
                # is not reported as a timeout.
                pass
            return

        def communicate() -> Tuple[AnyStr, AnyStr]:
            watchdog.start()
            out, err = proc.communicate()
            watchdog.cancel()
            if kill_check.is_set():
                err = 'Timeout: the command "%s" did not complete in %d secs.' % (
                    " ".join(redacted_command),
                    timeout,
                )
                if not universal_newlines:
                    err = err.encode(defenc)
            return out, err

        kill_check = threading.Event()
        watchdog = threading.Timer(timeout, kill_process, args=(proc.pid,))
    else:
        communicate = proc.communicate

    # Wait for the process to finish and collect its output.
    status = 0
    stdout_value: Union[str, bytes] = b""
    stderr_value: Union[str, bytes] = b""
    newline = "\n" if universal_newlines else b"\n"
    try:
        if output_stream is None:
            stdout_value, stderr_value = communicate()
            if stdout_value is None:
                # stdout was not piped (with_stdout=False): communicate() returns
                # None for it, so substitute an empty value of the right type.
                stdout_value = "" if universal_newlines else b""
            # Strip a single trailing newline.
            if stdout_value.endswith(newline) and strip_newline_in_stdout:
                stdout_value = stdout_value[:-1]
            if stderr_value.endswith(newline):
                stderr_value = stderr_value[:-1]

            status = proc.returncode
        else:
            max_chunk_size = max_chunk_size if max_chunk_size and max_chunk_size > 0 else io.DEFAULT_BUFFER_SIZE
            stream_copy(proc.stdout, output_stream, max_chunk_size)
            stdout_value = proc.stdout.read()
            stderr_value = proc.stderr.read()
            # Strip a single trailing newline.
            if stderr_value.endswith(newline):
                stderr_value = stderr_value[:-1]
            status = proc.wait()
    finally:
        # proc.stdout is None when stdout was redirected to the null device.
        if proc.stdout is not None:
            proc.stdout.close()
        proc.stderr.close()

    if self.GIT_PYTHON_TRACE == "full":
        cmdstr = " ".join(redacted_command)

        def as_text(stdout_value: Union[bytes, str]) -> str:
            # Only use the placeholder when output really went to a stream;
            # empty captured output must be logged as empty.
            if output_stream is not None:
                return "<OUTPUT_STREAM>"
            return safe_decode(stdout_value)

        if stderr_value:
            _logger.info(
                "%s -> %d; stdout: '%s'; stderr: '%s'",
                cmdstr,
                status,
                as_text(stdout_value),
                safe_decode(stderr_value),
            )
        elif stdout_value:
            _logger.info("%s -> %d; stdout: '%s'", cmdstr, status, as_text(stdout_value))
        else:
            _logger.info("%s -> %d", cmdstr, status)

    if with_exceptions and status != 0:
        raise GitCommandError(redacted_command, status, stderr_value, stdout_value)

    if isinstance(stdout_value, bytes) and stdout_as_string:
        stdout_value = safe_decode(stdout_value)

    # Expose the status code and stderr only when explicitly requested.
    if with_extended_output:
        return (status, stdout_value, safe_decode(stderr_value))
    else:
        return stdout_value
|
|
def environment(self) -> Dict[str, str]:
    """Return the environment variable mapping stored on this instance.

    :return:
        The ``_environment`` dictionary itself (not a copy).
    """
    stored_env = self._environment
    return stored_env
|
|
def update_environment(self, **kwargs: Any) -> Dict[str, Union[str, None]]:
    """Set environment variables for future git invocations. Return all changed
    values in a format that can be passed back into this function to revert the
    changes.

    Examples::

        old_env = self.update_environment(PWD='/tmp')
        self.update_environment(**old_env)

    :param kwargs:
        Environment variables to use for git processes. A value of ``None``
        removes the variable if it is currently set.

    :return:
        Dict that maps environment variables to their old values
    """
    env = self._environment
    previous: Dict[str, Union[str, None]] = {}
    for name, new_value in kwargs.items():
        if new_value is None:
            # Removal request: only record and delete variables that exist.
            if name in env:
                previous[name] = env.pop(name)
            continue
        # Assignment: remember the prior value (None if it was unset).
        previous[name] = env.get(name)
        env[name] = new_value
    return previous
|
|
@contextlib.contextmanager
def custom_environment(self, **kwargs: Any) -> Iterator[None]:
    """A context manager around the above :meth:`update_environment` method to
    restore the environment back to its previous state after operation.

    Examples::

        with self.custom_environment(GIT_SSH='/bin/ssh_wrapper'):
            repo.remotes.origin.fetch()

    :param kwargs:
        See :meth:`update_environment`.
    """
    # update_environment() hands back exactly what is needed to undo itself.
    saved_values = self.update_environment(**kwargs)
    try:
        yield
    finally:
        # Restore even when the managed block raises.
        self.update_environment(**saved_values)
|
|
def transform_kwarg(self, name: str, value: Any, split_single_char_options: bool) -> List[str]:
    """Convert one keyword argument into git command line option(s).

    :param name:
        Option name. A one-character name yields a short option (``-x``); any
        other name yields a long option (``--name``) after :func:`dashify`.

    :param value:
        ``True`` emits the bare flag. ``False`` and ``None`` emit nothing. Any
        other value, including ``0``, is formatted with ``%s`` and attached to the
        option.

    :param split_single_char_options:
        For short options with a value, whether to emit the value as a separate
        argument (``["-n", "5"]``) instead of joined (``["-n5"]``).

    :return:
        A list of zero, one or two command line arguments.
    """
    # Identity checks on purpose: ``0 == False`` in Python, so an equality-based
    # test would silently drop legitimate numeric values such as ``U=0``.
    if value is None or value is False:
        return []

    if len(name) == 1:
        if value is True:
            return ["-%s" % name]
        if split_single_char_options:
            return ["-%s" % name, "%s" % value]
        return ["-%s%s" % (name, value)]

    if value is True:
        return ["--%s" % dashify(name)]
    return ["--%s=%s" % (dashify(name), value)]
|
|
def transform_kwargs(self, split_single_char_options: bool = True, **kwargs: Any) -> List[str]:
    """Transform Python-style kwargs into git command line options.

    A list or tuple value repeats the option once per element; every other
    value is passed to :meth:`transform_kwarg` as is.
    """
    options: List[str] = []
    for name, raw_value in kwargs.items():
        # Treat a scalar as a one-element sequence so both cases share a loop.
        values = raw_value if isinstance(raw_value, (list, tuple)) else (raw_value,)
        for single_value in values:
            options.extend(self.transform_kwarg(name, single_value, split_single_char_options))
    return options
|
|
| @classmethod |
| def _unpack_args(cls, arg_list: Sequence[str]) -> List[str]: |
| outlist = [] |
| if isinstance(arg_list, (list, tuple)): |
| for arg in arg_list: |
| outlist.extend(cls._unpack_args(arg)) |
| else: |
| outlist.append(str(arg_list)) |
|
|
| return outlist |
|
|
| def __call__(self, **kwargs: Any) -> "Git": |
| """Specify command line options to the git executable for a subcommand call. |
| |
| :param kwargs: |
| A dict of keyword arguments. |
| These arguments are passed as in :meth:`_call_process`, but will be passed |
| to the git command rather than the subcommand. |
| |
| Examples:: |
| |
| git(work_tree='/tmp').difftool() |
| """ |
| self._git_options = self.transform_kwargs(split_single_char_options=True, **kwargs) |
| return self |
|
|
# Typing-only overloads of _call_process(); they have no runtime body.

# Declared for calls whose extra arguments are all None: the result is a str.
@overload
def _call_process(
    self, method: str, *args: None, **kwargs: None
) -> str: ...


# Declared for calls that pass as_process=True: the result is the process wrapper.
@overload
def _call_process(
    self,
    method: str,
    istream: int,
    as_process: Literal[True],
    *args: Any,
    **kwargs: Any,
) -> "Git.AutoInterrupt": ...


# Catch-all: any of the return types of execute().
@overload
def _call_process(
    self, method: str, *args: Any, **kwargs: Any
) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]: ...
|
|
def _call_process(
    self, method: str, *args: Any, **kwargs: Any
) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]:
    """Run the given git command with the specified arguments and return the result
    as a string.

    :param method:
        The command. Contained ``_`` characters will be converted to hyphens, such
        as in ``ls_files`` to call ``ls-files``.

    :param args:
        The list of arguments. If ``None`` is included, it will be pruned.
        This allows your commands to call git more conveniently, as ``None`` is
        realized as non-existent.

    :param kwargs:
        Contains key-values for the following:

        - The :meth:`execute()` kwds, as listed in ``execute_kwargs``.
        - "Command options" to be converted by :meth:`transform_kwargs`.
        - The ``insert_kwargs_after`` key, whose value must match one of
          ``*args`` (compared after conversion to :class:`str`). The command
          options are then inserted right after the matched arg instead of
          before all args.

    Examples::

        git.rev_list('master', max_count=10, header=True)

    turns into::

        git rev-list --max-count=10 --header master

    :return:
        Same as :meth:`execute`. If no args are given, used :meth:`execute`'s
        default (especially ``as_process = False``, ``stdout_as_string = True``) and
        return :class:`str`.

    :raise ValueError:
        If ``insert_kwargs_after`` is given but matches none of the arguments.
    """
    # Split kwargs into those consumed by execute() and those that become
    # command line options.
    exec_kwargs = {k: v for k, v in kwargs.items() if k in execute_kwargs}
    opts_kwargs = {k: v for k, v in kwargs.items() if k not in execute_kwargs}

    insert_after_this_arg = opts_kwargs.pop("insert_kwargs_after", None)

    # Prepare the argument list.
    opt_args = self.transform_kwargs(**opts_kwargs)
    ext_args = self._unpack_args([a for a in args if a is not None])

    if insert_after_this_arg is None:
        args_list = opt_args + ext_args
    else:
        try:
            # ext_args holds strings only, so compare against the string form;
            # otherwise a non-str argument (e.g. a Path) could never match.
            index = ext_args.index(str(insert_after_this_arg))
        except ValueError as err:
            raise ValueError(
                "Couldn't find argument '%s' in args %s to insert cmd options after"
                % (insert_after_this_arg, str(ext_args))
            ) from err
        args_list = ext_args[: index + 1] + opt_args + ext_args[index + 1 :]

    call = [self.GIT_PYTHON_GIT_EXECUTABLE]

    # Options that apply to every call made through this instance.
    call.extend(self._persistent_git_options)

    # One-shot options set through __call__; consumed by this call only.
    call.extend(self._git_options)
    self._git_options = ()

    call.append(dashify(method))
    call.extend(args_list)

    return self.execute(call, **exec_kwargs)
|
|
| def _parse_object_header(self, header_line: str) -> Tuple[str, str, int]: |
| """ |
| :param header_line: |
| A line of the form:: |
| |
| <hex_sha> type_string size_as_int |
| |
| :return: |
| (hex_sha, type_string, size_as_int) |
| |
| :raise ValueError: |
| If the header contains indication for an error due to incorrect input sha. |
| """ |
| tokens = header_line.split() |
| if len(tokens) != 3: |
| if not tokens: |
| err_msg = ( |
| f"SHA is empty, possible dubious ownership in the repository " |
| f"""at {self._working_dir}.\n If this is unintended run:\n\n """ |
| f""" "git config --global --add safe.directory {self._working_dir}" """ |
| ) |
| raise ValueError(err_msg) |
| else: |
| raise ValueError("SHA %s could not be resolved, git returned: %r" % (tokens[0], header_line.strip())) |
| |
| |
|
|
| if len(tokens[0]) != 40: |
| raise ValueError("Failed to parse header: %r" % header_line) |
| return (tokens[0], tokens[1], int(tokens[2])) |
|
|
def _prepare_ref(self, ref: AnyStr) -> bytes:
    """Return `ref` as a newline-terminated byte string.

    Bytes input is decoded as ASCII, non-string input is converted with
    ``str()``, and the result is encoded with ``defenc``.
    """
    if isinstance(ref, bytes):
        text: str = ref.decode("ascii")
    elif isinstance(ref, str):
        text = ref
    else:
        text = str(ref)

    # Guarantee exactly-terminated input: add a newline only if one is missing.
    terminated = text if text.endswith("\n") else text + "\n"
    return terminated.encode(defenc)
|
|
| def _get_persistent_cmd(self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any) -> "Git.AutoInterrupt": |
| cur_val = getattr(self, attr_name) |
| if cur_val is not None: |
| return cur_val |
|
|
| options = {"istream": PIPE, "as_process": True} |
| options.update(kwargs) |
|
|
| cmd = self._call_process(cmd_name, *args, **options) |
| setattr(self, attr_name, cmd) |
| cmd = cast("Git.AutoInterrupt", cmd) |
| return cmd |
|
|
| def __get_object_header(self, cmd: "Git.AutoInterrupt", ref: AnyStr) -> Tuple[str, str, int]: |
| if cmd.stdin and cmd.stdout: |
| cmd.stdin.write(self._prepare_ref(ref)) |
| cmd.stdin.flush() |
| return self._parse_object_header(cmd.stdout.readline()) |
| else: |
| raise ValueError("cmd stdin was empty") |
|
|
def get_object_header(self, ref: str) -> Tuple[str, str, int]:
    """Use this method to quickly examine the type and size of the object behind the
    given ref.

    :note:
        The method will only suffer from the costs of command invocation once and
        reuses the command in subsequent calls.

    :return:
        (hexsha, type_string, size_as_int)
    """
    # Reuse (or lazily start) the ``cat-file --batch-check`` process.
    header_cmd = self._get_persistent_cmd("cat_file_header", "cat_file", batch_check=True)
    header = self.__get_object_header(header_cmd, ref)
    return header
|
|
def get_object_data(self, ref: str) -> Tuple[str, str, int, bytes]:
    """Similar to :meth:`get_object_header`, but returns object data as well.

    :return:
        (hexsha, type_string, size_as_int, data_string)

    :note:
        Not threadsafe.
    """
    sha, kind, length, content_stream = self.stream_object_data(ref)
    payload = content_stream.read(length)
    # Drop the only local reference to the stream right after reading.
    del content_stream
    return (sha, kind, length, payload)
|
|
def stream_object_data(self, ref: str) -> Tuple[str, str, int, "Git.CatFileContentStream"]:
    """Similar to :meth:`get_object_data`, but returns the data as a stream.

    :return:
        (hexsha, type_string, size_as_int, stream)

    :note:
        This method is not threadsafe. You need one independent :class:`Git`
        instance per thread to be safe!
    """
    # Reuse (or lazily start) the ``cat-file --batch`` process.
    batch_cmd = self._get_persistent_cmd("cat_file_all", "cat_file", batch=True)
    sha, kind, length = self.__get_object_header(batch_cmd, ref)

    # Fall back to an empty in-memory stream when the process has no stdout.
    source = batch_cmd.stdout
    if source is None:
        source = io.BytesIO()

    content = self.CatFileContentStream(length, source)
    return (sha, kind, length, content)
|
|
def clear_cache(self) -> "Git":
    """Clear all kinds of internal caches to release resources.

    Currently persistent commands will be interrupted.

    :return:
        self
    """
    # Snapshot both slots before touching either of them.
    persistent_cmds = (self.cat_file_all, self.cat_file_header)
    for process in persistent_cmds:
        if not process:
            continue
        # Finalize explicitly instead of waiting for garbage collection.
        process.__del__()

    self.cat_file_all = None
    self.cat_file_header = None
    return self
|
|