| |
| |
| |
| |
|
|
| from io import BytesIO |
|
|
| import mmap |
| import os |
| import sys |
| import zlib |
|
|
| from gitdb.fun import ( |
| msb_size, |
| stream_copy, |
| apply_delta_data, |
| connect_deltas, |
| delta_types |
| ) |
|
|
| from gitdb.util import ( |
| allocate_memory, |
| LazyMixin, |
| make_sha, |
| write, |
| close, |
| ) |
|
|
| from gitdb.const import NULL_BYTE, BYTE_SPACE |
| from gitdb.utils.encoding import force_bytes |
|
|
# Try to load the optional C extension which provides a much faster
# delta-application routine. has_perf_mod records whether it is available;
# DeltaApplyReader selects its implementation based on this flag.
has_perf_mod = False
try:
    from gitdb_speedups._perf import apply_delta as c_apply_delta
    has_perf_mod = True
except ImportError:
    # Pure-python fallbacks will be used instead
    pass
|
|
# Public API of this module.
# BUGFIX: 'FDCompressedSha1Writer' was listed twice; the duplicate is removed.
__all__ = ('DecompressMemMapReader', 'FDCompressedSha1Writer', 'DeltaApplyReader',
           'Sha1Writer', 'FlexibleSha1Writer', 'ZippedStoreShaWriter',
           'FDStream', 'NullStream')
|
|
|
|
| |
|
|
class DecompressMemMapReader(LazyMixin):

    """Reads data in chunks from a memory map and decompresses it. The client sees
    only the uncompressed data, respective file-like read calls are handling on-demand
    buffered decompression accordingly

    A constraint on the total size of bytes is activated, simulating
    a logical file within a possibly larger physical memory area

    To read efficiently, you clearly don't want to read individual bytes, instead,
    read a few kilobytes at least.

    **Note:** The chunk-size should be carefully selected as it will involve quite a bit
    of string copying due to the way the zlib is implemented. Its very wasteful,
    hence we try to find a good tradeoff between allocation time and number of
    times we actually allocate. An own zlib implementation would be good here
    to better support streamed reading - it would only need to keep the mmap
    and decompress it into chunks, that's all ... """

    # _m        - memory map (or buffer) holding the compressed input
    # _zip      - zlib decompressor object
    # _buf      - BytesIO with uncompressed bytes left over from header parsing
    # _buflen   - number of bytes remaining in _buf
    # _br       - number of uncompressed bytes handed out so far
    # _cws/_cwe - start/end offsets of the current input window into _m
    # _s        - total number of uncompressed bytes in the logical stream
    # _close    - if True, close _m when we are closed or deleted
    # _cbr      - number of compressed bytes consumed so far
    # _phi      - True once the object header was parsed from the stream
    __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close',
                 '_cbr', '_phi')

    # NOTE(review): not referenced in the code shown here - presumably a
    # chunking hint for callers; confirm before relying on it.
    max_read_size = 512 * 1024

    def __init__(self, m, close_on_deletion, size=None):
        """Initialize with mmap for stream reading
        :param m: must be content data - use new if you have object data and no size"""
        self._m = m
        self._zip = zlib.decompressobj()
        self._buf = None                    # buffered uncompressed data, if any
        self._buflen = 0                    # length of the buffered data
        if size is not None:
            self._s = size                  # total size of the uncompressed data
        self._br = 0                        # uncompressed bytes read so far
        self._cws = 0                       # window start offset
        self._cwe = 0                       # window end offset
        self._cbr = 0                       # compressed bytes read so far
        self._phi = False                   # header parsed yet ?
        self._close = close_on_deletion     # close the memmap on deletion ?

    def _set_cache_(self, attr):
        # Only the total size may be determined lazily - parsing the object
        # header sets _s as a side effect.
        assert attr == '_s'
        self._parse_header_info()

    def __del__(self):
        # Release the underlying memory map if we own it
        self.close()

    def _parse_header_info(self):
        """If this stream contains object data, parse the header info and skip the
        stream to a point where each read will yield object content

        :return: parsed type_string, size"""
        # Read enough to safely contain the loose-object header
        # "<type> <size>\0"; temporarily fake our size so read() will proceed.
        maxb = 8192
        self._s = maxb
        hdr = self.read(maxb)
        hdrend = hdr.find(NULL_BYTE)
        typ, size = hdr[:hdrend].split(BYTE_SPACE)
        size = int(size)
        self._s = size

        # Reset the byte counter, and keep whatever we over-read past the
        # header in a buffer to be served before decompressing more data.
        self._br = 0
        hdrend += 1
        self._buf = BytesIO(hdr[hdrend:])
        self._buflen = len(hdr) - hdrend

        self._phi = True

        return typ, size

    @classmethod
    def new(self, m, close_on_deletion=False):
        """Create a new DecompressMemMapReader instance for acting as a read-only stream
        This method parses the object header from m and returns the parsed
        type and size, as well as the created stream instance.

        :param m: memory map on which to operate. It must be object data ( header + contents )
        :param close_on_deletion: if True, the memory map will be closed once we are
            being deleted"""
        # NOTE(review): the first parameter is conventionally named 'cls' for a
        # classmethod - kept as-is in this documentation-only change.
        inst = DecompressMemMapReader(m, close_on_deletion, 0)
        typ, size = inst._parse_header_info()
        return typ, size, inst

    def data(self):
        """:return: random access compatible data we are working on"""
        return self._m

    def close(self):
        """Close our underlying stream of compressed bytes if this was allowed during initialization
        :return: True if we closed the underlying stream
        :note: can be called safely
        """
        if self._close:
            if hasattr(self._m, 'close'):
                self._m.close()
            self._close = False

    def compressed_bytes_read(self):
        """
        :return: number of compressed bytes read. This includes the bytes it
            took to decompress the header ( if there was one )"""
        # If all uncompressed data was handed out but the zlib stream was not
        # fully consumed, keep feeding page-sized chunks to the decompressor
        # until it is done, so _cbr accounts for the complete compressed size.
        if self._br == self._s and not self._zip.unused_data:
            # Temporarily reset _br so read() will produce more data
            self._br = 0
            if hasattr(self._zip, 'status'):
                # Decompressors exposing 'status' report directly when the
                # compressed stream ended.
                while self._zip.status == zlib.Z_OK:
                    self.read(mmap.PAGESIZE)
            else:
                # Otherwise stop once trailing data appears or the memory
                # map is exhausted.
                while not self._zip.unused_data and self._cbr != len(self._m):
                    self.read(mmap.PAGESIZE)

            # Restore the logical read position
            self._br = self._s

        return self._cbr

    def seek(self, offset, whence=getattr(os, 'SEEK_SET', 0)):
        """Allows to reset the stream to restart reading
        :raise ValueError: If offset and whence are not 0"""
        if offset != 0 or whence != getattr(os, 'SEEK_SET', 0):
            raise ValueError("Can only seek to position 0")

        # Rewind all state; a fresh decompressor restarts from the beginning
        self._zip = zlib.decompressobj()
        self._br = self._cws = self._cwe = self._cbr = 0
        if self._phi:
            # Force the header to be parsed again, which also recomputes _s
            self._phi = False
            del(self._s)

    def read(self, size=-1):
        # Clamp the request to the remaining logical size; non-positive sizes
        # mean 'read everything left'
        if size < 1:
            size = self._s - self._br
        else:
            size = min(size, self._s - self._br)

        if size == 0:
            return b''

        # Serve buffered header-overflow data first, if any
        dat = b''
        if self._buf:
            if self._buflen >= size:
                # The buffer alone satisfies the request
                dat = self._buf.read(size)
                self._buflen -= size
                self._br += size
                return dat
            else:
                # Drain the buffer, then fall through to decompress the rest
                dat = self._buf.read()
                size -= self._buflen
                self._br += self._buflen

                self._buflen = 0
                self._buf = None

        # Position the input window over the memory map. If zlib left input
        # unconsumed, restart exactly where it stopped; otherwise advance
        # past the previous window.
        tail = self._zip.unconsumed_tail
        if tail:
            self._cws = self._cwe - len(tail)
            self._cwe = self._cws + size
        else:
            cws = self._cws
            self._cws = self._cwe
            self._cwe = cws + size

        # Enforce a minimum window so tiny requests still make progress
        if self._cwe - self._cws < 8:
            self._cwe = self._cws + 8

        indata = self._m[self._cws:self._cwe]

        # The slice may be shorter than requested at the end of the map
        self._cwe = self._cws + len(indata)
        dcompdat = self._zip.decompress(indata, size)

        # Account for the compressed bytes actually consumed. Certain zlib
        # runtime versions report unconsumed input differently - presumably
        # this branch works around their unused_data accounting; confirm
        # against the zlib changelog before changing it.
        if getattr(zlib, 'ZLIB_RUNTIME_VERSION', zlib.ZLIB_VERSION) in ('1.2.7', '1.2.5') and not sys.platform == 'darwin':
            unused_datalen = len(self._zip.unconsumed_tail)
        else:
            unused_datalen = len(self._zip.unconsumed_tail) + len(self._zip.unused_data)

        self._cbr += len(indata) - unused_datalen
        self._br += len(dcompdat)

        # Prepend data served from the header buffer, if any
        if dat:
            dcompdat = dat + dcompdat

        # zlib may return less than requested even though more is available -
        # recurse until the request is satisfied or the stream is exhausted
        if dcompdat and (len(dcompdat) - len(dat)) < size and self._br < self._s:
            dcompdat += self.read(size - len(dcompdat))

        return dcompdat
|
|
|
|
class DeltaApplyReader(LazyMixin):

    """A reader which dynamically applies pack deltas to a base object, keeping the
    memory demands to a minimum.

    The size of the final object is only obtainable once all deltas have been
    applied, unless it is retrieved from a pack index.

    The uncompressed Delta has the following layout (MSB being a most significant
    bit encoded dynamic size):

    * MSB Source Size - the size of the base against which the delta was created
    * MSB Target Size - the size of the resulting data after the delta was applied
    * A list of one byte commands (cmd) which are followed by a specific protocol:

     * cmd & 0x80 - copy delta_data[offset:offset+size]

      * Followed by an encoded offset into the delta data
      * Followed by an encoded size of the chunk to copy

     * cmd & 0x7f - insert

      * insert cmd bytes from the delta buffer into the output stream

     * cmd == 0 - invalid operation ( or error in delta stream )
    """
    # _bstream   - base stream onto which all deltas are applied
    # _dstreams  - tuple of delta streams; the first is applied last
    # _mm_target - buffer holding the fully resolved object data
    # _size      - final size of the resolved data
    # _br        - number of bytes read so far
    __slots__ = (
        "_bstream",
        "_dstreams",
        "_mm_target",
        "_size",
        "_br"
    )

    # NOTE(review): not referenced in the code shown here - presumably an
    # upper bound on in-memory delta application used elsewhere; confirm.
    k_max_memory_move = 250 * 1000 * 1000

    def __init__(self, stream_list):
        """Initialize this instance with a list of streams, the first stream being
        the delta to apply on top of all following deltas, the last stream being the
        base object onto which to apply the deltas"""
        assert len(stream_list) > 1, "Need at least one delta and one base stream"

        self._bstream = stream_list[-1]
        self._dstreams = tuple(stream_list[:-1])
        self._br = 0

    def _set_cache_too_slow_without_c(self, attr):
        # Variant used when the C speedups module is available (see the
        # _set_cache_ selection at the bottom of the class).

        # A single delta is handled by the brute-force method directly
        if len(self._dstreams) == 1:
            return self._set_cache_brute_(attr)

        # Aggregate all deltas into one connected delta chain
        dcl = connect_deltas(self._dstreams)

        # An empty bound means the chain produces no data at all
        if dcl.rbound() == 0:
            self._size = 0
            self._mm_target = allocate_memory(0)
            return

        # Allocate the target buffer for the fully resolved data
        self._size = dcl.rbound()
        self._mm_target = allocate_memory(self._size)

        # Read the base object into its own buffer
        bbuf = allocate_memory(self._bstream.size)
        stream_copy(self._bstream.read, bbuf.write, self._bstream.size, 256 * mmap.PAGESIZE)

        # Apply the chain against the base buffer, writing into the target
        write = self._mm_target.write
        dcl.apply(bbuf, write)

        self._mm_target.seek(0)

    def _set_cache_brute_(self, attr):
        """If we are here, we apply the actual deltas"""
        # Probe each delta stream's header for its source and target sizes;
        # keep whatever of the 512-byte probe followed the header.
        buffer_info_list = list()
        max_target_size = 0
        for dstream in self._dstreams:
            buf = dstream.read(512)
            offset, src_size = msb_size(buf)
            offset, target_size = msb_size(buf, offset)
            buffer_info_list.append((buf[offset:], offset, src_size, target_size))
            max_target_size = max(max_target_size, target_size)

        # Determine buffer sizes for the base and the target
        base_size = self._bstream.size
        target_size = max_target_size

        # With multiple deltas the buffers are swapped after each round, so
        # both must hold the largest intermediate result.
        if len(self._dstreams) > 1:
            base_size = target_size = max(base_size, max_target_size)

        # Read the base object into its buffer
        bbuf = allocate_memory(base_size)
        stream_copy(self._bstream.read, bbuf.write, base_size, 256 * mmap.PAGESIZE)

        # Target buffer receives the result of each application round
        tbuf = allocate_memory(target_size)

        # Apply deltas from the bottom of the chain (closest to the base)
        # upwards, swapping base and target buffers after each round.
        final_target_size = None
        for (dbuf, offset, src_size, target_size), dstream in zip(reversed(buffer_info_list), reversed(self._dstreams)):
            # Reassemble the full delta data: the probed remainder plus
            # whatever is still left in the stream.
            ddata = allocate_memory(dstream.size - offset)
            ddata.write(dbuf)

            stream_copy(dstream.read, ddata.write, dstream.size, 256 * mmap.PAGESIZE)

            # Use the C implementation when the speedups module was imported
            if 'c_apply_delta' in globals():
                c_apply_delta(bbuf, ddata, tbuf)
            else:
                apply_delta_data(bbuf, src_size, ddata, len(ddata), tbuf.write)

            # The freshly written target becomes the base of the next round
            bbuf, tbuf = tbuf, bbuf
            bbuf.seek(0)
            tbuf.seek(0)
            final_target_size = target_size

        # After the last swap, bbuf holds the fully resolved data
        self._mm_target = bbuf
        self._size = final_target_size

    # Pick the lazy-cache implementation based on the C extension availability
    if not has_perf_mod:
        _set_cache_ = _set_cache_brute_
    else:
        _set_cache_ = _set_cache_too_slow_without_c

    def read(self, count=0):
        """Read up to ``count`` bytes of the resolved data.
        :param count: if < 1 or larger than what remains, read all remaining bytes
        :return: bytes read"""
        bl = self._size - self._br      # bytes left
        if count < 1 or count > bl:
            count = bl

        data = self._mm_target.read(count)
        self._br += len(data)
        return data

    def seek(self, offset, whence=getattr(os, 'SEEK_SET', 0)):
        """Allows to reset the stream to restart reading

        :raise ValueError: If offset and whence are not 0"""
        if offset != 0 or whence != getattr(os, 'SEEK_SET', 0):
            raise ValueError("Can only seek to position 0")

        self._br = 0
        self._mm_target.seek(0)

    @classmethod
    def new(cls, stream_list):
        """
        Convert the given list of streams into a stream which resolves deltas
        when reading from it.

        :param stream_list: two or more stream objects, first stream is a Delta
            to the object that you want to resolve, followed by N additional delta
            streams. The list's last stream must be a non-delta stream.

        :return: Non-Delta OPackStream object whose stream can be used to obtain
            the decompressed resolved data
        :raise ValueError: if the stream list cannot be handled"""
        if len(stream_list) < 2:
            raise ValueError("Need at least two streams")

        # The last stream must be the base - it may not itself be a delta
        if stream_list[-1].type_id in delta_types:
            raise ValueError(
                "Cannot resolve deltas if there is no base object stream, last one was type: %s" % stream_list[-1].type)

        return cls(stream_list)

    @property
    def type(self):
        # The resolved object has the type of its base
        return self._bstream.type

    @property
    def type_id(self):
        # The resolved object has the type id of its base
        return self._bstream.type_id

    @property
    def size(self):
        """:return: number of uncompressed bytes in the stream"""
        return self._size
|
|
| |
|
|
|
|
| |
|
|
|
|
| |
|
|
class Sha1Writer:

    """Stream-style writer that digests every byte handed to it, so the SHA1
    over all written data can be obtained at any point."""
    __slots__ = "sha1"

    def __init__(self):
        # Running SHA1 state, updated on every write
        self.sha1 = make_sha()

    def write(self, data):
        """:raise IOError: If not all bytes could be written
        :param data: byte object
        :return: length of incoming data"""
        self.sha1.update(data)
        return len(data)

    def sha(self, as_hex=False):
        """:return: sha so far
        :param as_hex: if True, sha will be hex-encoded, binary otherwise"""
        return self.sha1.hexdigest() if as_hex else self.sha1.digest()
|
|
| |
|
|
|
|
class FlexibleSha1Writer(Sha1Writer):

    """Sha1Writer variant that additionally forwards every written chunk to a
    user-supplied write callable."""
    __slots__ = 'writer'

    def __init__(self, writer):
        # writer: callable receiving each chunk of written bytes
        super().__init__()
        self.writer = writer

    def write(self, data):
        """Digest ``data`` and pass it on to the configured write callable."""
        super().write(data)
        self.writer(data)
|
|
|
|
class ZippedStoreShaWriter(Sha1Writer):

    """Remembers everything someone writes to it - as a zlib-compressed copy in
    an in-memory buffer - while producing a sha over the uncompressed bytes."""
    __slots__ = ('buf', 'zip')

    def __init__(self):
        super().__init__()
        self.buf = BytesIO()
        self.zip = zlib.compressobj(zlib.Z_BEST_SPEED)

    def __getattr__(self, attr):
        # Delegate anything we do not implement to the underlying buffer
        return getattr(self.buf, attr)

    def write(self, data):
        """Digest ``data`` and append its compressed form to our buffer.
        :return: length of the incoming, uncompressed data"""
        written = super().write(data)
        self.buf.write(self.zip.compress(data))
        return written

    def close(self):
        # Flush the compressor so the buffer holds a complete zlib stream
        self.buf.write(self.zip.flush())

    def seek(self, offset, whence=getattr(os, 'SEEK_SET', 0)):
        """Seeking currently only supports to rewind written data
        Multiple writes are not supported"""
        if offset != 0 or whence != getattr(os, 'SEEK_SET', 0):
            raise ValueError("Can only seek to position 0")
        self.buf.seek(0)

    def getvalue(self):
        """:return: string value from the current stream position to the end"""
        return self.buf.getvalue()
|
|
|
|
class FDCompressedSha1Writer(Sha1Writer):

    """Digests data written to it, making the sha available, then compress the
    data and write it to the file descriptor

    **Note:** operates on raw file descriptors
    **Note:** for this to work, you have to use the close-method of this instance"""
    __slots__ = ("fd", "sha1", "zip")

    # Shared exception instance raised whenever a short write is detected
    exc = IOError("Failed to write all bytes to filedescriptor")

    def __init__(self, fd):
        super().__init__()
        self.fd = fd
        self.zip = zlib.compressobj(zlib.Z_BEST_SPEED)

    def write(self, data):
        """Digest ``data``, compress it, and write the result to our descriptor.

        :raise IOError: If not all bytes could be written
        :return: length of incoming data"""
        self.sha1.update(data)
        compressed = self.zip.compress(data)
        if write(self.fd, compressed) != len(compressed):
            raise self.exc
        return len(data)

    def close(self):
        """Flush the compressor, write the remainder, and close the descriptor.
        :raise IOError: If the remainder could not be written completely"""
        tail = self.zip.flush()
        if write(self.fd, tail) != len(tail):
            raise self.exc
        return close(self.fd)
|
|
| |
|
|
|
|
class FDStream:

    """A simple wrapper providing the most basic functions on a file descriptor
    with the fileobject interface. Cannot use os.fdopen as the resulting stream
    takes ownership"""
    __slots__ = ("_fd", '_pos')

    def __init__(self, fd):
        """:param fd: raw file descriptor to wrap; ownership stays with the caller"""
        self._fd = fd
        self._pos = 0

    def write(self, data):
        """Write ``data`` to the descriptor, tracking the logical position."""
        self._pos += len(data)
        os.write(self._fd, data)

    def read(self, count=0):
        """Read up to ``count`` bytes from the descriptor.

        :param count: number of bytes to read. If 0, the size of the underlying
            file (per fstat) is used, i.e. we attempt to read the file whole.
        :return: bytes read, possibly fewer than requested"""
        if count == 0:
            # BUGFIX: the original referenced self._filepath, which is never
            # assigned and is not even permitted by __slots__, so read(0)
            # always raised AttributeError. Query the descriptor instead.
            count = os.fstat(self._fd).st_size

        # renamed from 'bytes' to avoid shadowing the builtin
        data = os.read(self._fd, count)
        self._pos += len(data)
        return data

    def fileno(self):
        """:return: the wrapped file descriptor"""
        return self._fd

    def tell(self):
        """:return: number of bytes read or written through this wrapper"""
        return self._pos

    def close(self):
        """Close the wrapped file descriptor."""
        close(self._fd)
|
|
|
|
class NullStream:

    """A stream that does nothing but providing a stream interface.
    Use it like /dev/null"""
    __slots__ = tuple()

    def write(self, data):
        """Discard ``data``, reporting it as fully written.
        :return: length of the discarded data"""
        return len(data)

    def read(self, size=0):
        """Pretend to read, always yielding an empty string."""
        return ''

    def close(self):
        """Nothing to release - a no-op."""
        pass
|
|
|
|
| |
|
|