diff --git a/src/parsers/archivers/rpm/UnpackParser.py b/src/parsers/archivers/rpm/UnpackParser.py index 12a406446..82326d6a2 100644 --- a/src/parsers/archivers/rpm/UnpackParser.py +++ b/src/parsers/archivers/rpm/UnpackParser.py @@ -20,17 +20,14 @@ # version 3 # SPDX-License-Identifier: AGPL-3.0-only - +import bz2 +import gzip +import lzma import os import pathlib import shutil import tempfile - -from bangunpack import unpack_gzip -from bangunpack import unpack_bzip2 -from bangunpack import unpack_xz -from bangunpack import unpack_lzma -from bangunpack import unpack_zstd +import zstandard from parsers.archivers.cpio import UnpackParser as cpio_unpack @@ -59,77 +56,88 @@ def parse(self): self.data.lead.type == rpm.Rpm.RpmTypes.source, "invalid RPM type") + # The default compressor is either gzip or XZ (on Fedora). Other + # supported compressors are bzip2, LZMA and zstd (recent addition). + # The default compressor is gzip. + self.compressor = 'gzip' + + # at most one compressor and payload format can be defined self.compressor_seen = False self.payload_format = '' - # at most one compressor can be defined for i in self.data.header.index_records: if i.header_tag == rpm.Rpm.HeaderTags.payload_compressor: check_condition(not self.compressor_seen, "duplicate compressor defined") self.compressor_seen = True + self.compressor = i.body.values[0] if i.header_tag == rpm.Rpm.HeaderTags.payload_format: check_condition(self.payload_format == '', "duplicate compressor defined") self.payload_format = i.body.values[0] + # test decompress the payload + if self.compressor == 'bzip2': + decompressor = bz2.BZ2Decompressor() + try: + payload = decompressor.decompress(self.data.payload) + except Exception as e: + raise UnpackParserException(e.args) + elif self.compressor == 'xz' or self.compressor == 'lzma': + try: + payload = lzma.decompress(self.data.payload) + except Exception as e: + raise UnpackParserException(e.args) + elif self.compressor == 'zstd': + try: + reader = zstandard.ZstdDecompressor().stream_reader(self.data.payload) + payload = reader.read() + except Exception as e: + raise UnpackParserException(e.args) + else: + try: + payload = gzip.decompress(self.data.payload) + except Exception as e: + raise UnpackParserException(e.args) def unpack(self): unpacked_files = [] if self.payload_format not in ['cpio', 'drpm']: return unpacked_files - # then unpack the file. This depends on the compressor and the - # payload format. The default compressor is either gzip or XZ - # (on Fedora). Other supported compressors are bzip2, LZMA and - # zstd (recent addition). - if not self.compressor_seen: - # if not defined fall back to gzip - compressor = 'gzip' + if self.compressor == 'bzip2': + decompressor = bz2.BZ2Decompressor() + payload = decompressor.decompress(self.data.payload) + elif self.compressor == 'xz' or self.compressor == 'lzma': + payload = lzma.decompress(self.data.payload) + elif self.compressor == 'zstd': + reader = zstandard.ZstdDecompressor().stream_reader(self.data.payload) + payload = reader.read() else: - for i in self.data.header.index_records: - if i.header_tag == rpm.Rpm.HeaderTags.payload_compressor: - compressor = i.body.values[0] - break - - # write the payload to a temporary file first - temporary_file = tempfile.mkstemp(dir=self.scan_environment.temporarydirectory) - os.write(temporary_file[0], self.data.payload) - os.fdopen(temporary_file[0]).close() - - fr = FileResult(None, temporary_file[1], set([])) - fr.set_filesize(len(self.data.payload)) - - if compressor == 'gzip': - unpackresult = unpack_gzip(fr, self.scan_environment, 0, self.rel_unpack_dir) - elif compressor == 'bzip2': - unpackresult = unpack_bzip2(fr, self.scan_environment, 0, self.rel_unpack_dir) - elif compressor == 'xz': - unpackresult = unpack_xz(fr, self.scan_environment, 0, self.rel_unpack_dir) - elif compressor == 'lzma': - unpackresult = unpack_lzma(fr, self.scan_environment, 0, self.rel_unpack_dir) - elif compressor == 'zstd': - unpackresult = unpack_zstd(fr, self.scan_environment, 0, self.rel_unpack_dir) - else: - # gzip is default - unpackresult = unpack_gzip(fr, self.scan_environment, 0, self.rel_unpack_dir) - os.unlink(temporary_file[1]) - - payloadfile = unpackresult['filesandlabels'][0][0] - payloadfile_full = self.scan_environment.unpack_path(payloadfile) + payload = gzip.decompress(self.data.payload) if self.payload_format == 'drpm': - fr = FileResult(self.fileresult, self.rel_unpack_dir / os.path.basename(payloadfile), set()) + out_labels = [] + file_path = pathlib.Path('drpm') + outfile_rel = self.rel_unpack_dir / file_path + outfile_full = self.scan_environment.unpack_path(outfile_rel) + os.makedirs(outfile_full.parent, exist_ok=True) + outfile = open(outfile_full, 'wb') + outfile.write(payload) + outfile.close() + fr = FileResult(self.fileresult, self.rel_unpack_dir / file_path, set(out_labels)) unpacked_files.append(fr) else: - # first move the payload file to a different location - # to avoid any potential name clashes - payloadsize = payloadfile_full.stat().st_size - payloaddir = pathlib.Path(tempfile.mkdtemp(dir=self.scan_environment.temporarydirectory)) - shutil.move(str(payloadfile_full), payloaddir) + # write the payload to a temporary file first + temporary_file = tempfile.mkstemp(dir=self.scan_environment.temporarydirectory) + os.write(temporary_file[0], payload) + os.fdopen(temporary_file[0]).close() + + payloadfile = temporary_file[1] + payloadfile_full = self.scan_environment.unpack_path(payloadfile) # create a file result object and pass it to the CPIO unpacker fr = FileResult(self.fileresult, - payloaddir / os.path.basename(payloadfile), + payloadfile, set([])) - fr.set_filesize(payloadsize) + fr.set_filesize(len(payload)) # assuming that the CPIO data is always in "new ascii" format cpio_parser = cpio_unpack.CpioNewAsciiUnpackParser(fr, self.scan_environment, self.rel_unpack_dir, 0) @@ -146,8 +154,6 @@ def unpack(self): i.parent_path = self.fileresult.filename unpacked_files.append(i) - shutil.rmtree(payloaddir) - return(unpacked_files)