| |
| |
| |
| |
|
|
| __all__ = ["Commit"] |
|
|
| from collections import defaultdict |
| import datetime |
| from io import BytesIO |
| import logging |
| import os |
| import re |
| from subprocess import Popen, PIPE |
| import sys |
| from time import altzone, daylight, localtime, time, timezone |
| import warnings |
|
|
| from gitdb import IStream |
|
|
| from git.cmd import Git |
| from git.diff import Diffable |
| from git.util import Actor, Stats, finalize_process, hex_to_bin |
|
|
| from . import base |
| from .tree import Tree |
| from .util import ( |
| Serializable, |
| TraversableIterableObj, |
| altz_to_utctz_str, |
| from_timestamp, |
| parse_actor_and_date, |
| parse_date, |
| ) |
|
|
| |
|
|
| from typing import ( |
| Any, |
| Dict, |
| IO, |
| Iterator, |
| List, |
| Sequence, |
| Tuple, |
| TYPE_CHECKING, |
| Union, |
| cast, |
| ) |
|
|
| if sys.version_info >= (3, 8): |
| from typing import Literal |
| else: |
| from typing_extensions import Literal |
|
|
| from git.types import PathLike |
|
|
| if TYPE_CHECKING: |
| from git.refs import SymbolicReference |
| from git.repo import Repo |
|
|
| |
|
|
| _logger = logging.getLogger(__name__) |
|
|
|
|
| class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): |
| """Wraps a git commit object. |
| |
| See :manpage:`gitglossary(7)` on "commit object": |
| https://git-scm.com/docs/gitglossary#def_commit_object |
| |
| :note: |
| This class will act lazily on some of its attributes and will query the value on |
| demand only if it involves calling the git binary. |
| """ |
|
|
| |
| |
| env_author_date = "GIT_AUTHOR_DATE" |
| env_committer_date = "GIT_COMMITTER_DATE" |
|
|
| |
| conf_encoding = "i18n.commitencoding" |
|
|
| |
| default_encoding = "UTF-8" |
|
|
| type: Literal["commit"] = "commit" |
|
|
| __slots__ = ( |
| "tree", |
| "author", |
| "authored_date", |
| "author_tz_offset", |
| "committer", |
| "committed_date", |
| "committer_tz_offset", |
| "message", |
| "parents", |
| "encoding", |
| "gpgsig", |
| ) |
|
|
| _id_attribute_ = "hexsha" |
|
|
| parents: Sequence["Commit"] |
|
|
| def __init__( |
| self, |
| repo: "Repo", |
| binsha: bytes, |
| tree: Union[Tree, None] = None, |
| author: Union[Actor, None] = None, |
| authored_date: Union[int, None] = None, |
| author_tz_offset: Union[None, float] = None, |
| committer: Union[Actor, None] = None, |
| committed_date: Union[int, None] = None, |
| committer_tz_offset: Union[None, float] = None, |
| message: Union[str, bytes, None] = None, |
| parents: Union[Sequence["Commit"], None] = None, |
| encoding: Union[str, None] = None, |
| gpgsig: Union[str, None] = None, |
| ) -> None: |
| """Instantiate a new :class:`Commit`. All keyword arguments taking ``None`` as |
| default will be implicitly set on first query. |
| |
| :param binsha: |
| 20 byte sha1. |
| |
| :param tree: |
| A :class:`~git.objects.tree.Tree` object. |
| |
| :param author: |
| The author :class:`~git.util.Actor` object. |
| |
| :param authored_date: int_seconds_since_epoch |
| The authored DateTime - use :func:`time.gmtime` to convert it into a |
| different format. |
| |
| :param author_tz_offset: int_seconds_west_of_utc |
| The timezone that the `authored_date` is in. |
| |
| :param committer: |
| The committer string, as an :class:`~git.util.Actor` object. |
| |
| :param committed_date: int_seconds_since_epoch |
| The committed DateTime - use :func:`time.gmtime` to convert it into a |
| different format. |
| |
| :param committer_tz_offset: int_seconds_west_of_utc |
| The timezone that the `committed_date` is in. |
| |
| :param message: string |
| The commit message. |
| |
| :param encoding: string |
| Encoding of the message, defaults to UTF-8. |
| |
| :param parents: |
| List or tuple of :class:`Commit` objects which are our parent(s) in the |
| commit dependency graph. |
| |
| :return: |
| :class:`Commit` |
| |
| :note: |
| Timezone information is in the same format and in the same sign as what |
| :func:`time.altzone` returns. The sign is inverted compared to git's UTC |
| timezone. |
| """ |
| super().__init__(repo, binsha) |
| self.binsha = binsha |
| if tree is not None: |
| assert isinstance(tree, Tree), "Tree needs to be a Tree instance, was %s" % type(tree) |
| if tree is not None: |
| self.tree = tree |
| if author is not None: |
| self.author = author |
| if authored_date is not None: |
| self.authored_date = authored_date |
| if author_tz_offset is not None: |
| self.author_tz_offset = author_tz_offset |
| if committer is not None: |
| self.committer = committer |
| if committed_date is not None: |
| self.committed_date = committed_date |
| if committer_tz_offset is not None: |
| self.committer_tz_offset = committer_tz_offset |
| if message is not None: |
| self.message = message |
| if parents is not None: |
| self.parents = parents |
| if encoding is not None: |
| self.encoding = encoding |
| if gpgsig is not None: |
| self.gpgsig = gpgsig |
|
|
| @classmethod |
| def _get_intermediate_items(cls, commit: "Commit") -> Tuple["Commit", ...]: |
| return tuple(commit.parents) |
|
|
| @classmethod |
| def _calculate_sha_(cls, repo: "Repo", commit: "Commit") -> bytes: |
| """Calculate the sha of a commit. |
| |
| :param repo: |
| :class:`~git.repo.base.Repo` object the commit should be part of. |
| |
| :param commit: |
| :class:`Commit` object for which to generate the sha. |
| """ |
|
|
| stream = BytesIO() |
| commit._serialize(stream) |
| streamlen = stream.tell() |
| stream.seek(0) |
|
|
| istream = repo.odb.store(IStream(cls.type, streamlen, stream)) |
| return istream.binsha |
|
|
| def replace(self, **kwargs: Any) -> "Commit": |
| """Create new commit object from an existing commit object. |
| |
| Any values provided as keyword arguments will replace the corresponding |
| attribute in the new object. |
| """ |
|
|
| attrs = {k: getattr(self, k) for k in self.__slots__} |
|
|
| for attrname in kwargs: |
| if attrname not in self.__slots__: |
| raise ValueError("invalid attribute name") |
|
|
| attrs.update(kwargs) |
| new_commit = self.__class__(self.repo, self.NULL_BIN_SHA, **attrs) |
| new_commit.binsha = self._calculate_sha_(self.repo, new_commit) |
|
|
| return new_commit |
|
|
| def _set_cache_(self, attr: str) -> None: |
| if attr in Commit.__slots__: |
| |
| _binsha, _typename, self.size, stream = self.repo.odb.stream(self.binsha) |
| self._deserialize(BytesIO(stream.read())) |
| else: |
| super()._set_cache_(attr) |
| |
|
|
| @property |
| def authored_datetime(self) -> datetime.datetime: |
| return from_timestamp(self.authored_date, self.author_tz_offset) |
|
|
| @property |
| def committed_datetime(self) -> datetime.datetime: |
| return from_timestamp(self.committed_date, self.committer_tz_offset) |
|
|
| @property |
| def summary(self) -> Union[str, bytes]: |
| """:return: First line of the commit message""" |
| if isinstance(self.message, str): |
| return self.message.split("\n", 1)[0] |
| else: |
| return self.message.split(b"\n", 1)[0] |
|
|
| def count(self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any) -> int: |
| """Count the number of commits reachable from this commit. |
| |
| :param paths: |
| An optional path or a list of paths restricting the return value to commits |
| actually containing the paths. |
| |
| :param kwargs: |
| Additional options to be passed to :manpage:`git-rev-list(1)`. They must not |
| alter the output style of the command, or parsing will yield incorrect |
| results. |
| |
| :return: |
| An int defining the number of reachable commits |
| """ |
| |
| |
| if paths: |
| return len(self.repo.git.rev_list(self.hexsha, "--", paths, **kwargs).splitlines()) |
| return len(self.repo.git.rev_list(self.hexsha, **kwargs).splitlines()) |
|
|
| @property |
| def name_rev(self) -> str: |
| """ |
| :return: |
| String describing the commits hex sha based on the closest |
| `~git.refs.reference.Reference`. |
| |
| :note: |
| Mostly useful for UI purposes. |
| """ |
| return self.repo.git.name_rev(self) |
|
|
| @classmethod |
| def iter_items( |
| cls, |
| repo: "Repo", |
| rev: Union[str, "Commit", "SymbolicReference"], |
| paths: Union[PathLike, Sequence[PathLike]] = "", |
| **kwargs: Any, |
| ) -> Iterator["Commit"]: |
| R"""Find all commits matching the given criteria. |
| |
| :param repo: |
| The :class:`~git.repo.base.Repo`. |
| |
| :param rev: |
| Revision specifier. See :manpage:`git-rev-parse(1)` for viable options. |
| |
| :param paths: |
| An optional path or list of paths. If set only :class:`Commit`\s that |
| include the path or paths will be considered. |
| |
| :param kwargs: |
| Optional keyword arguments to :manpage:`git-rev-list(1)` where: |
| |
| * ``max_count`` is the maximum number of commits to fetch. |
| * ``skip`` is the number of commits to skip. |
| * ``since`` selects all commits since some date, e.g. ``"1970-01-01"``. |
| |
| :return: |
| Iterator yielding :class:`Commit` items. |
| """ |
| if "pretty" in kwargs: |
| raise ValueError("--pretty cannot be used as parsing expects single sha's only") |
| |
|
|
| |
| |
|
|
| args_list: List[PathLike] = ["--"] |
|
|
| if paths: |
| paths_tup: Tuple[PathLike, ...] |
| if isinstance(paths, (str, os.PathLike)): |
| paths_tup = (paths,) |
| else: |
| paths_tup = tuple(paths) |
|
|
| args_list.extend(paths_tup) |
| |
|
|
| proc = repo.git.rev_list(rev, args_list, as_process=True, **kwargs) |
| return cls._iter_from_process_or_stream(repo, proc) |
|
|
| def iter_parents(self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any) -> Iterator["Commit"]: |
| R"""Iterate _all_ parents of this commit. |
| |
| :param paths: |
| Optional path or list of paths limiting the :class:`Commit`\s to those that |
| contain at least one of the paths. |
| |
| :param kwargs: |
| All arguments allowed by :manpage:`git-rev-list(1)`. |
| |
| :return: |
| Iterator yielding :class:`Commit` objects which are parents of ``self`` |
| """ |
| |
| skip = kwargs.get("skip", 1) |
| if skip == 0: |
| skip = 1 |
| kwargs["skip"] = skip |
|
|
| return self.iter_items(self.repo, self, paths, **kwargs) |
|
|
| @property |
| def stats(self) -> Stats: |
| """Create a git stat from changes between this commit and its first parent |
| or from all changes done if this is the very first commit. |
| |
| :return: |
| :class:`Stats` |
| """ |
|
|
| def process_lines(lines: List[str]) -> str: |
| text = "" |
| for file_info, line in zip(lines, lines[len(lines) // 2 :]): |
| change_type = file_info.split("\t")[0][-1] |
| (insertions, deletions, filename) = line.split("\t") |
| text += "%s\t%s\t%s\t%s\n" % (change_type, insertions, deletions, filename) |
| return text |
|
|
| if not self.parents: |
| lines = self.repo.git.diff_tree( |
| self.hexsha, "--", numstat=True, no_renames=True, root=True, raw=True |
| ).splitlines()[1:] |
| text = process_lines(lines) |
| else: |
| lines = self.repo.git.diff( |
| self.parents[0].hexsha, self.hexsha, "--", numstat=True, no_renames=True, raw=True |
| ).splitlines() |
| text = process_lines(lines) |
| return Stats._list_from_string(self.repo, text) |
|
|
| @property |
| def trailers(self) -> Dict[str, str]: |
| """Deprecated. Get the trailers of the message as a dictionary. |
| |
| :note: |
| This property is deprecated, please use either :attr:`trailers_list` or |
| :attr:`trailers_dict`. |
| |
| :return: |
| Dictionary containing whitespace stripped trailer information. |
| Only contains the latest instance of each trailer key. |
| """ |
| warnings.warn( |
| "Commit.trailers is deprecated, use Commit.trailers_list or Commit.trailers_dict instead", |
| DeprecationWarning, |
| stacklevel=2, |
| ) |
| return {k: v[0] for k, v in self.trailers_dict.items()} |
|
|
| @property |
| def trailers_list(self) -> List[Tuple[str, str]]: |
| """Get the trailers of the message as a list. |
| |
| Git messages can contain trailer information that are similar to :rfc:`822` |
| e-mail headers. See :manpage:`git-interpret-trailers(1)`. |
| |
| This function calls ``git interpret-trailers --parse`` onto the message to |
| extract the trailer information, returns the raw trailer data as a list. |
| |
| Valid message with trailer:: |
| |
| Subject line |
| |
| some body information |
| |
| another information |
| |
| key1: value1.1 |
| key1: value1.2 |
| key2 : value 2 with inner spaces |
| |
| Returned list will look like this:: |
| |
| [ |
| ("key1", "value1.1"), |
| ("key1", "value1.2"), |
| ("key2", "value 2 with inner spaces"), |
| ] |
| |
| :return: |
| List containing key-value tuples of whitespace stripped trailer information. |
| """ |
| cmd = ["git", "interpret-trailers", "--parse"] |
| proc: Git.AutoInterrupt = self.repo.git.execute( |
| cmd, |
| as_process=True, |
| istream=PIPE, |
| ) |
| trailer: str = proc.communicate(str(self.message).encode())[0].decode("utf8") |
| trailer = trailer.strip() |
|
|
| if not trailer: |
| return [] |
|
|
| trailer_list = [] |
| for t in trailer.split("\n"): |
| key, val = t.split(":", 1) |
| trailer_list.append((key.strip(), val.strip())) |
|
|
| return trailer_list |
|
|
| @property |
| def trailers_dict(self) -> Dict[str, List[str]]: |
| """Get the trailers of the message as a dictionary. |
| |
| Git messages can contain trailer information that are similar to :rfc:`822` |
| e-mail headers. See :manpage:`git-interpret-trailers(1)`. |
| |
| This function calls ``git interpret-trailers --parse`` onto the message to |
| extract the trailer information. The key value pairs are stripped of leading and |
| trailing whitespaces before they get saved into a dictionary. |
| |
| Valid message with trailer:: |
| |
| Subject line |
| |
| some body information |
| |
| another information |
| |
| key1: value1.1 |
| key1: value1.2 |
| key2 : value 2 with inner spaces |
| |
| Returned dictionary will look like this:: |
| |
| { |
| "key1": ["value1.1", "value1.2"], |
| "key2": ["value 2 with inner spaces"], |
| } |
| |
| |
| :return: |
| Dictionary containing whitespace stripped trailer information, mapping |
| trailer keys to a list of their corresponding values. |
| """ |
| d = defaultdict(list) |
| for key, val in self.trailers_list: |
| d[key].append(val) |
| return dict(d) |
|
|
| @classmethod |
| def _iter_from_process_or_stream(cls, repo: "Repo", proc_or_stream: Union[Popen, IO]) -> Iterator["Commit"]: |
| """Parse out commit information into a list of :class:`Commit` objects. |
| |
| We expect one line per commit, and parse the actual commit information directly |
| from our lighting fast object database. |
| |
| :param proc: |
| :manpage:`git-rev-list(1)` process instance - one sha per line. |
| |
| :return: |
| Iterator supplying :class:`Commit` objects |
| """ |
|
|
| |
| |
|
|
| |
| |
|
|
| if hasattr(proc_or_stream, "wait"): |
| proc_or_stream = cast(Popen, proc_or_stream) |
| if proc_or_stream.stdout is not None: |
| stream = proc_or_stream.stdout |
| elif hasattr(proc_or_stream, "readline"): |
| proc_or_stream = cast(IO, proc_or_stream) |
| stream = proc_or_stream |
|
|
| readline = stream.readline |
| while True: |
| line = readline() |
| if not line: |
| break |
| hexsha = line.strip() |
| if len(hexsha) > 40: |
| |
| hexsha, _ = line.split(None, 1) |
| |
|
|
| assert len(hexsha) == 40, "Invalid line: %s" % hexsha |
| yield cls(repo, hex_to_bin(hexsha)) |
| |
|
|
| |
| |
| if hasattr(proc_or_stream, "wait"): |
| proc_or_stream = cast(Popen, proc_or_stream) |
| finalize_process(proc_or_stream) |
|
|
| @classmethod |
| def create_from_tree( |
| cls, |
| repo: "Repo", |
| tree: Union[Tree, str], |
| message: str, |
| parent_commits: Union[None, List["Commit"]] = None, |
| head: bool = False, |
| author: Union[None, Actor] = None, |
| committer: Union[None, Actor] = None, |
| author_date: Union[None, str, datetime.datetime] = None, |
| commit_date: Union[None, str, datetime.datetime] = None, |
| ) -> "Commit": |
| """Commit the given tree, creating a :class:`Commit` object. |
| |
| :param repo: |
| :class:`~git.repo.base.Repo` object the commit should be part of. |
| |
| :param tree: |
| :class:`~git.objects.tree.Tree` object or hex or bin sha. |
| The tree of the new commit. |
| |
| :param message: |
| Commit message. It may be an empty string if no message is provided. It will |
| be converted to a string, in any case. |
| |
| :param parent_commits: |
| Optional :class:`Commit` objects to use as parents for the new commit. If |
| empty list, the commit will have no parents at all and become a root commit. |
| If ``None``, the current head commit will be the parent of the new commit |
| object. |
| |
| :param head: |
| If ``True``, the HEAD will be advanced to the new commit automatically. |
| Otherwise the HEAD will remain pointing on the previous commit. This could |
| lead to undesired results when diffing files. |
| |
| :param author: |
| The name of the author, optional. |
| If unset, the repository configuration is used to obtain this value. |
| |
| :param committer: |
| The name of the committer, optional. |
| If unset, the repository configuration is used to obtain this value. |
| |
| :param author_date: |
| The timestamp for the author field. |
| |
| :param commit_date: |
| The timestamp for the committer field. |
| |
| :return: |
| :class:`Commit` object representing the new commit. |
| |
| :note: |
| Additional information about the committer and author are taken from the |
| environment or from the git configuration. See :manpage:`git-commit-tree(1)` |
| for more information. |
| """ |
| if parent_commits is None: |
| try: |
| parent_commits = [repo.head.commit] |
| except ValueError: |
| |
| parent_commits = [] |
| |
| else: |
| for p in parent_commits: |
| if not isinstance(p, cls): |
| raise ValueError(f"Parent commit '{p!r}' must be of type {cls}") |
| |
| |
|
|
| |
| |
| |
| |
|
|
| |
| cr = repo.config_reader() |
| env = os.environ |
|
|
| committer = committer or Actor.committer(cr) |
| author = author or Actor.author(cr) |
|
|
| |
| unix_time = int(time()) |
| is_dst = daylight and localtime().tm_isdst > 0 |
| offset = altzone if is_dst else timezone |
|
|
| author_date_str = env.get(cls.env_author_date, "") |
| if author_date: |
| author_time, author_offset = parse_date(author_date) |
| elif author_date_str: |
| author_time, author_offset = parse_date(author_date_str) |
| else: |
| author_time, author_offset = unix_time, offset |
| |
|
|
| committer_date_str = env.get(cls.env_committer_date, "") |
| if commit_date: |
| committer_time, committer_offset = parse_date(commit_date) |
| elif committer_date_str: |
| committer_time, committer_offset = parse_date(committer_date_str) |
| else: |
| committer_time, committer_offset = unix_time, offset |
| |
|
|
| |
| enc_section, enc_option = cls.conf_encoding.split(".") |
| conf_encoding = cr.get_value(enc_section, enc_option, cls.default_encoding) |
| if not isinstance(conf_encoding, str): |
| raise TypeError("conf_encoding could not be coerced to str") |
|
|
| |
| |
| if isinstance(tree, str): |
| tree = repo.tree(tree) |
| |
|
|
| |
| new_commit = cls( |
| repo, |
| cls.NULL_BIN_SHA, |
| tree, |
| author, |
| author_time, |
| author_offset, |
| committer, |
| committer_time, |
| committer_offset, |
| message, |
| parent_commits, |
| conf_encoding, |
| ) |
|
|
| new_commit.binsha = cls._calculate_sha_(repo, new_commit) |
|
|
| if head: |
| |
| |
| import git.refs |
|
|
| try: |
| repo.head.set_commit(new_commit, logmsg=message) |
| except ValueError: |
| |
| |
| master = git.refs.Head.create( |
| repo, |
| repo.head.ref, |
| new_commit, |
| logmsg="commit (initial): %s" % message, |
| ) |
| repo.head.set_reference(master, logmsg="commit: Switching to %s" % master) |
| |
| |
|
|
| return new_commit |
|
|
| |
|
|
| def _serialize(self, stream: BytesIO) -> "Commit": |
| write = stream.write |
| write(("tree %s\n" % self.tree).encode("ascii")) |
| for p in self.parents: |
| write(("parent %s\n" % p).encode("ascii")) |
|
|
| a = self.author |
| aname = a.name |
| c = self.committer |
| fmt = "%s %s <%s> %s %s\n" |
| write( |
| ( |
| fmt |
| % ( |
| "author", |
| aname, |
| a.email, |
| self.authored_date, |
| altz_to_utctz_str(self.author_tz_offset), |
| ) |
| ).encode(self.encoding) |
| ) |
|
|
| |
| aname = c.name |
| write( |
| ( |
| fmt |
| % ( |
| "committer", |
| aname, |
| c.email, |
| self.committed_date, |
| altz_to_utctz_str(self.committer_tz_offset), |
| ) |
| ).encode(self.encoding) |
| ) |
|
|
| if self.encoding != self.default_encoding: |
| write(("encoding %s\n" % self.encoding).encode("ascii")) |
|
|
| try: |
| if self.__getattribute__("gpgsig"): |
| write(b"gpgsig") |
| for sigline in self.gpgsig.rstrip("\n").split("\n"): |
| write((" " + sigline + "\n").encode("ascii")) |
| except AttributeError: |
| pass |
|
|
| write(b"\n") |
|
|
| |
| if isinstance(self.message, str): |
| write(self.message.encode(self.encoding)) |
| else: |
| write(self.message) |
| |
| return self |
|
|
| def _deserialize(self, stream: BytesIO) -> "Commit": |
| readline = stream.readline |
| self.tree = Tree(self.repo, hex_to_bin(readline().split()[1]), Tree.tree_id << 12, "") |
|
|
| self.parents = [] |
| next_line = None |
| while True: |
| parent_line = readline() |
| if not parent_line.startswith(b"parent"): |
| next_line = parent_line |
| break |
| |
| self.parents.append(type(self)(self.repo, hex_to_bin(parent_line.split()[-1].decode("ascii")))) |
| |
| self.parents = tuple(self.parents) |
|
|
| |
| |
| author_line = next_line |
| committer_line = readline() |
|
|
| |
| next_line = readline() |
| while next_line.startswith(b"mergetag "): |
| next_line = readline() |
| while next_line.startswith(b" "): |
| next_line = readline() |
| |
|
|
| |
| |
| self.encoding = self.default_encoding |
| self.gpgsig = "" |
|
|
| |
| enc = next_line |
| buf = enc.strip() |
| while buf: |
| if buf[0:10] == b"encoding ": |
| self.encoding = buf[buf.find(b" ") + 1 :].decode(self.encoding, "ignore") |
| elif buf[0:7] == b"gpgsig ": |
| sig = buf[buf.find(b" ") + 1 :] + b"\n" |
| is_next_header = False |
| while True: |
| sigbuf = readline() |
| if not sigbuf: |
| break |
| if sigbuf[0:1] != b" ": |
| buf = sigbuf.strip() |
| is_next_header = True |
| break |
| sig += sigbuf[1:] |
| |
| self.gpgsig = sig.rstrip(b"\n").decode(self.encoding, "ignore") |
| if is_next_header: |
| continue |
| buf = readline().strip() |
|
|
| |
| try: |
| ( |
| self.author, |
| self.authored_date, |
| self.author_tz_offset, |
| ) = parse_actor_and_date(author_line.decode(self.encoding, "replace")) |
| except UnicodeDecodeError: |
| _logger.error( |
| "Failed to decode author line '%s' using encoding %s", |
| author_line, |
| self.encoding, |
| exc_info=True, |
| ) |
|
|
| try: |
| ( |
| self.committer, |
| self.committed_date, |
| self.committer_tz_offset, |
| ) = parse_actor_and_date(committer_line.decode(self.encoding, "replace")) |
| except UnicodeDecodeError: |
| _logger.error( |
| "Failed to decode committer line '%s' using encoding %s", |
| committer_line, |
| self.encoding, |
| exc_info=True, |
| ) |
| |
|
|
| |
| |
| self.message = stream.read() |
| try: |
| self.message = self.message.decode(self.encoding, "replace") |
| except UnicodeDecodeError: |
| _logger.error( |
| "Failed to decode message '%s' using encoding %s", |
| self.message, |
| self.encoding, |
| exc_info=True, |
| ) |
| |
|
|
| return self |
|
|
| |
|
|
| @property |
| def co_authors(self) -> List[Actor]: |
| """Search the commit message for any co-authors of this commit. |
| |
| Details on co-authors: |
| https://github.blog/2018-01-29-commit-together-with-co-authors/ |
| |
| :return: |
| List of co-authors for this commit (as :class:`~git.util.Actor` objects). |
| """ |
| co_authors = [] |
|
|
| if self.message: |
| results = re.findall( |
| r"^Co-authored-by: (.*) <(.*?)>$", |
| self.message, |
| re.MULTILINE, |
| ) |
| for author in results: |
| co_authors.append(Actor(*author)) |
|
|
| return co_authors |
|
|