Refactor the Python autopkgtests.
authorPeter Pentchev <roam@debian.org>
Sat, 30 May 2020 22:33:22 +0000 (01:33 +0300)
committerPeter Pentchev <roam@debian.org>
Sat, 30 May 2020 22:33:22 +0000 (01:33 +0300)
debian/rules
debian/tests/compile [deleted file]
debian/tests/control
debian/tests/dictionary [deleted file]
debian/tests/pychunk/__init__.py [new file with mode: 0644]
debian/tests/pychunk/common.py [new file with mode: 0644]
debian/tests/pychunk/compile.py [new file with mode: 0755]
debian/tests/pychunk/defs.py [new file with mode: 0644]
debian/tests/pychunk/roundtrip.py [new file with mode: 0755]
debian/tests/tox.ini [new file with mode: 0644]

index 8cd5c52ece60eed320464897ce42deced710ffb3..69bc1c63256ad34a1fd0278980bb488a8d3e569e 100755 (executable)
@@ -13,8 +13,8 @@ override_dh_auto_configure:
        dh_auto_configure -- -Dwith-openssl=disabled
 
 execute_after_dh_auto_test:
-       debian/tests/dictionary -- 'obj-${MARCH}/src' '/usr/share/dict/american-english'
-       debian/tests/dictionary -- 'obj-${MARCH}/src' "$$(readlink -f -- "$$(command -v gcc)")"
+       env PYTHONPATH=debian/tests python3 -B -m pychunk.roundtrip -d 'obj-${MARCH}/src' -f '/usr/share/dict/american-english'
+       env PYTHONPATH=debian/tests python3 -B -m pychunk.roundtrip -d 'obj-${MARCH}/src' -f "$$(readlink -f -- "$$(command -v gcc)")"
 
 override_dh_makeshlibs:
        dh_makeshlibs -- -c4
diff --git a/debian/tests/compile b/debian/tests/compile
deleted file mode 100755 (executable)
index 7986557..0000000
+++ /dev/null
@@ -1,391 +0,0 @@
-#!/usr/bin/python3
-"""Compile a test program."""
-
-import argparse
-import dataclasses
-import os
-import pathlib
-import re
-import subprocess
-import sys
-import tempfile
-
-from typing import Callable, Dict, List
-
-
-MAGIC = bytes([0, ord("Z"), ord("C"), ord("K"), ord("1")])
-
-RE_DATA_SIZE = re.compile(
-    r""" ^
-    Data \s+ size \s* : \s*
-    (?P<size> 0 | [1-9][0-9]* )
-    \s*
-    $ """,
-    re.X,
-)
-
-RE_CHUNK_COUNT = re.compile(
-    r""" ^
-    Chunk \s+ count \s* : \s*
-    (?P<count> 0 | [1-9][0-9]* )
-    \s*
-    $ """,
-    re.X,
-)
-
-RE_CHUNKS = re.compile(
-    r""" ^
-    \s+
-    Chunk \s+
-    Checksum \s+
-    Start \s+
-    Comp \s size \s+
-    Size \s*
-    $ """,
-    re.X,
-)
-
-RE_CHUNK = re.compile(
-    r""" ^
-    \s+
-    (?P<idx> 0 | [1-9][0-9]* ) \s+
-    (?P<cksum> \S+ ) \s+
-    (?P<start> 0 | [1-9][0-9]* ) \s+
-    (?P<comp_size> 0 | [1-9][0-9]* ) \s+
-    (?P<size> 0 | [1-9][0-9]* ) \s*
-    $ """,
-    re.X,
-)
-
-
-@dataclasses.dataclass(frozen=True)
-class Config:
-    """Runtime configuration."""
-
-    # pylint: disable=too-many-instance-attributes
-
-    tempd: pathlib.Path
-    source: pathlib.Path
-    obj: pathlib.Path
-    program: pathlib.Path
-    env: Dict[str, str]
-
-    orig: pathlib.Path
-    compressed: pathlib.Path
-    uncompressed: pathlib.Path
-
-
-@dataclasses.dataclass(frozen=True)
-class Chunk:
-    """A single chunk descriptor."""
-
-    cstart: int
-    start: int
-    csize: int
-    size: int
-    cend: int
-    end: int
-
-
-def get_runenv() -> Dict[str, str]:
-    """Set up the environment for running the zchunk programs."""
-    env = dict(os.environ)
-    env["LC_ALL"] = "C.UTF-8"
-    env["LANGUAGE"] = ""
-    return env
-
-
-def parse_args(dirname: str) -> Config:
-    """Parse the command-line arguments, deduce some things."""
-    parser = argparse.ArgumentParser(prog="dictionary")
-    parser.add_argument(
-        "source", type=str, help="path to the test program source file",
-    )
-    parser.add_argument(
-        "filename", type=str, help="path to the filename to compress"
-    )
-
-    args = parser.parse_args()
-
-    tempd = pathlib.Path(dirname).absolute()
-    return Config(
-        tempd=tempd,
-        source=pathlib.Path(args.source),
-        obj=tempd / "chunk.o",
-        program=tempd / "chunk",
-        env=get_runenv(),
-        orig=pathlib.Path(args.filename).absolute(),
-        compressed=tempd / "words.txt.zck",
-        uncompressed=tempd / "chunk.txt",
-    )
-
-
-def do_compile(cfg: Config) -> None:
-    """Compile the test program."""
-    print("Fetching the C compiler flags for zck")
-    cflags = (
-        subprocess.check_output(
-            ["pkg-config", "--cflags", "zck"], shell=False, env=cfg.env
-        )
-        .decode("UTF-8")
-        .rstrip("\r\n")
-    )
-    if "\r" in cflags or "\n" in cflags:
-        sys.exit(f"`pkg-config --cflags zck` returned {cflags!r}")
-
-    if cfg.obj.exists():
-        sys.exit(f"Did not expect {cfg.obj} to exist")
-    cmd = f"cc -c -o '{cfg.obj}' {cflags} '{cfg.source}'"
-    print(f"Running {cmd!r}")
-    subprocess.check_call(cmd, shell=True, env=cfg.env)
-    if not cfg.obj.is_file():
-        sys.exit(f"{cmd!r} did not create the {cfg.obj} file")
-
-    print("Fetching the C linker flags and libraries for zck")
-    libs = (
-        subprocess.check_output(
-            ["pkg-config", "--libs", "zck"], shell=False, env=cfg.env
-        )
-        .decode("UTF-8")
-        .rstrip("\r\n")
-    )
-    if "\r" in libs or "\n" in libs:
-        sys.exit(f"`pkg-config --libs zck` returned {libs!r}")
-
-    if cfg.program.exists():
-        sys.exit(f"Did not expect {cfg.program} to exist")
-    cmd = f"cc -o '{cfg.program}' '{cfg.obj}' {libs}"
-    print(f"Running {cmd!r}")
-    subprocess.check_call(cmd, shell=True, env=cfg.env)
-    if not cfg.program.is_file():
-        sys.exit(f"{cmd!r} did not create the {cfg.program} file")
-    if not os.access(cfg.program, os.X_OK):
-        sys.exit(f"Not an executable file: {cfg.program}")
-    print(f"Looks like we got {cfg.program}")
-
-
-def do_compress(cfg: Config, orig_size: int) -> int:
-    """Compress the original file."""
-    print(f"About to compress {cfg.orig} to {cfg.compressed}")
-    if cfg.compressed.exists():
-        sys.exit(f"Did not expect {cfg.compressed} to exist")
-    subprocess.check_call(
-        ["zck", "-o", cfg.compressed, "--", cfg.orig],
-        shell=False,
-        env=cfg.env,
-    )
-    if not cfg.compressed.is_file():
-        sys.exit(f"zck did not create the {cfg.compressed} file")
-    comp_size = cfg.compressed.stat().st_size
-    print(f"{cfg.compressed} size is {comp_size} bytes long")
-    if comp_size >= orig_size:
-        sys.exit(
-            f"sizeof({cfg.compressed}) == {comp_size} : "
-            f"sizeof({cfg.orig}) == {orig_size}"
-        )
-    start = cfg.compressed.open(mode="rb").read(5)
-    print(f"{cfg.compressed} starts with {start!r}")
-    if start != MAGIC:
-        sys.exit(f"{cfg.compressed} does not start with {MAGIC!r}: {start!r}")
-
-    return comp_size
-
-
-def read_chunks(cfg: Config, orig_size: int, comp_size: int) -> Chunk:
-    """Parse the chunks of the compressed file."""
-    # pylint: disable=too-many-statements
-    output = subprocess.check_output(
-        ["zck_read_header", "-c", "--", cfg.compressed],
-        shell=False,
-        env=cfg.env,
-    ).decode("UTF-8")
-
-    params: Dict[str, int] = {}
-    chunks: List[Chunk] = []
-
-    def ignore_till_end(line: str) -> str:
-        """Ignore anything until EOF."""
-        raise NotImplementedError(line)
-
-    def parse_chunk(line: str) -> str:
-        """Parse a single chunk line."""
-        # pylint: disable=too-many-branches
-        data = RE_CHUNK.match(line)
-        if not data:
-            sys.exit(f"Unexpected line for chunk {len(chunks)}: {line!r}")
-        idx = int(data.group("idx"))
-        start = int(data.group("start"))
-        csize = int(data.group("comp_size"))
-        size = int(data.group("size"))
-
-        if idx != len(chunks):
-            sys.exit(f"Expected index {len(chunks)}: {line!r}")
-        if chunks:
-            last_chunk = chunks[-1]
-            if start != last_chunk.cend:
-                sys.exit(f"Expected start {last_chunk.cend}: {line!r}")
-        else:
-            if start != params["size_diff"]:
-                sys.exit(f"Expected start {params['size_diff']}: {line!r}")
-            last_chunk = Chunk(
-                cstart=0,
-                start=0,
-                csize=0,
-                size=0,
-                cend=params["size_diff"],
-                end=0,
-            )
-
-        next_chunk = Chunk(
-            cstart=start,
-            start=last_chunk.end,
-            csize=csize,
-            size=size,
-            cend=last_chunk.cend + csize,
-            end=last_chunk.end + size,
-        )
-        if next_chunk.cend > comp_size:
-            sys.exit(
-                f"Compressed size overflow: {next_chunk.cend} > {comp_size}"
-            )
-
-        more = idx + 1 != params["chunk_count"]
-        if more:
-            if next_chunk.end >= orig_size:
-                sys.exit(
-                    f"Original size overflow: "
-                    f"{next_chunk.end} >= {orig_size}"
-                )
-        else:
-            if next_chunk.cend != comp_size:
-                sys.exit(
-                    f"Compressed size mismatch: "
-                    f"{next_chunk.cend} != {comp_size}"
-                )
-            if next_chunk.end != orig_size:
-                sys.exit(
-                    f"Original size mismatch: "
-                    f"{next_chunk.end} != {orig_size}"
-                )
-
-        print(f"- appending {next_chunk!r}")
-        chunks.append(next_chunk)
-
-        if more:
-            return "parse_chunk"
-        return "ignore_till_end"
-
-    def wait_for_chunks(line: str) -> str:
-        """Wait for the 'Chunks:' line."""
-        if not RE_CHUNKS.match(line):
-            return "wait_for_chunks"
-
-        return "parse_chunk"
-
-    def wait_for_chunk_count(line: str) -> str:
-        """Wait for the 'chunk count' line."""
-        data = RE_CHUNK_COUNT.match(line)
-        if not data:
-            return "wait_for_chunk_count"
-        print(f"- got a chunk count: {data.groupdict()!r}")
-
-        count = int(data.group("count"))
-        if count < 1:
-            sys.exit(f"zck_read_header said chunk count {count}")
-        params["chunk_count"] = count
-
-        return "wait_for_chunks"
-
-    def wait_for_total_size(line: str) -> str:
-        """Wait for the 'data size' line."""
-        data = RE_DATA_SIZE.match(line)
-        if not data:
-            return "wait_for_total_size"
-        print(f"- got a size line: {data.groupdict()!r}")
-
-        size = int(data.group("size"))
-        if size < 1 or size > comp_size:
-            sys.exit(
-                f"zck_read_header said data size {size} (comp {comp_size})"
-            )
-        params["size_diff"] = comp_size - size
-
-        return "wait_for_chunk_count"
-
-    handlers: Dict[str, Callable[[str], str]] = {
-        func.__name__: func
-        for func in (
-            wait_for_total_size,
-            wait_for_chunk_count,
-            wait_for_chunks,
-            parse_chunk,
-            ignore_till_end,
-        )
-    }
-
-    handler: Callable[[str], str] = wait_for_total_size
-
-    for line in output.splitlines():
-        print(f"- read a line: {line}")
-        new_handler = handler(line)
-        assert new_handler in handlers, new_handler
-        handler = handlers[new_handler]
-
-    if handler != ignore_till_end:  # pylint: disable=comparison-with-callable
-        sys.exit(f"handler is {handler!r} instead of {ignore_till_end!r}")
-
-    # Now let's find the second chunk
-    return next(chunk for chunk in chunks if chunk.start > 0)
-
-
-def run_program(cfg: Config) -> None:
-    """Run the test program, hopefully generate the chunk file."""
-    print(f"About to run {cfg.program}")
-    if cfg.uncompressed.exists():
-        sys.exit(f"Did not expect {cfg.uncompressed} to exist")
-    subprocess.check_call(
-        [cfg.program, cfg.compressed, cfg.uncompressed],
-        shell=False,
-        env=cfg.env,
-    )
-    if not cfg.uncompressed.is_file():
-        sys.exit(f"{cfg.program} did not create the {cfg.uncompressed} file")
-
-
-def compare_chunk(cfg: Config, second: Chunk, orig_size: int) -> None:
-    """Read data from the input file and the chunk."""
-    # OK, let's load it all into memory, mmkay?
-    contents = cfg.orig.read_bytes()
-    if len(contents) != orig_size:
-        sys.exit(
-            f"Could not read {orig_size} bytes from {cfg.orig}, "
-            f"read {len(contents)}"
-        )
-    chunk = cfg.uncompressed.read_bytes()
-    if len(chunk) != second.size:
-        sys.exit(
-            f"Could not read {second.size} bytes from {cfg.uncompressed}, "
-            f"read {len(chunk)}"
-        )
-
-    if contents[second.start : second.start + second.size] != chunk:
-        sys.exit("Mismatch!")
-
-
-def main() -> None:
-    """Parse arguments, compile a program, compress a file, test it."""
-    with tempfile.TemporaryDirectory() as dirname:
-        print(f"Using temporary directory {dirname}")
-        cfg = parse_args(dirname)
-        do_compile(cfg)
-        orig_size = cfg.orig.stat().st_size
-        print(f"Original file size: {orig_size}")
-        comp_size = do_compress(cfg, orig_size)
-        second_chunk = read_chunks(cfg, orig_size, comp_size)
-        run_program(cfg)
-        compare_chunk(cfg, second_chunk, orig_size)
-        print("Seems fine!")
-
-
-if __name__ == "__main__":
-    main()
index aeaf0481939b145a07b4b0e83e9317aad80dc180..4e6af83639370c2b249cf686d6773b96fc683f8d 100644 (file)
@@ -1,7 +1,7 @@
-Test-Command: debian/tests/dictionary /usr/bin /usr/share/dict/american-english
+Test-Command: env PYTHONPATH=debian/tests python3 -B -m pychunk.roundtrip -d /usr/bin -f /usr/share/dict/american-english
 Depends: @, python3, wamerican
 Features: test-name=debian-dict
 
-Test-Command: debian/tests/compile debian/tests/chunk.c /usr/share/dict/american-english
+Test-Command: env PYTHONPATH=debian/tests python3 -B -m pychunk.compile -d /usr/bin -f /usr/share/dict/american-english debian/tests/chunk.c
 Depends: @, build-essential, pkg-config, python3, wamerican
 Features: test-name=debian-compile
diff --git a/debian/tests/dictionary b/debian/tests/dictionary
deleted file mode 100755 (executable)
index 957c130..0000000
+++ /dev/null
@@ -1,352 +0,0 @@
-#!/usr/bin/python3
-"""A very simple test for the command-line zchunk tools."""
-
-import argparse
-import dataclasses
-import os
-import pathlib
-import re
-import subprocess
-import sys
-import tempfile
-
-from typing import Callable, Dict, List, Tuple
-
-
-MAGIC = bytes([0, ord("Z"), ord("C"), ord("K"), ord("1")])
-
-RE_DATA_SIZE = re.compile(
-    r""" ^
-    Data \s+ size \s* : \s*
-    (?P<size> 0 | [1-9][0-9]* )
-    \s*
-    $ """,
-    re.X,
-)
-
-RE_CHUNK_COUNT = re.compile(
-    r""" ^
-    Chunk \s+ count \s* : \s*
-    (?P<count> 0 | [1-9][0-9]* )
-    \s*
-    $ """,
-    re.X,
-)
-
-RE_CHUNKS = re.compile(
-    r""" ^
-    \s+
-    Chunk \s+
-    Checksum \s+
-    Start \s+
-    Comp \s size \s+
-    Size \s*
-    $ """,
-    re.X,
-)
-
-RE_CHUNK = re.compile(
-    r""" ^
-    \s+
-    (?P<idx> 0 | [1-9][0-9]* ) \s+
-    (?P<cksum> \S+ ) \s+
-    (?P<start> 0 | [1-9][0-9]* ) \s+
-    (?P<comp_size> 0 | [1-9][0-9]* ) \s+
-    (?P<size> 0 | [1-9][0-9]* ) \s*
-    $ """,
-    re.X,
-)
-
-
-@dataclasses.dataclass(frozen=True)
-class Config:
-    """Runtime configuration."""
-
-    tempd: pathlib.Path
-    bindir: pathlib.Path
-    env: Dict[str, str]
-
-    orig: pathlib.Path
-    compressed: pathlib.Path
-    uncompressed: pathlib.Path
-    recompressed: pathlib.Path
-
-
-def get_runenv() -> Dict[str, str]:
-    """Set up the environment for running the zchunk programs."""
-    env = dict(os.environ)
-    env["LC_ALL"] = "C.UTF-8"
-    env["LANGUAGE"] = ""
-    return env
-
-
-def parse_args(dirname: str) -> Config:
-    """Parse the command-line arguments, deduce some things."""
-    parser = argparse.ArgumentParser(prog="dictionary")
-    parser.add_argument(
-        "bindir",
-        type=str,
-        help="path to the directory containing the zchunk binaries",
-    )
-    parser.add_argument(
-        "filename", type=str, help="path to the filename to compress"
-    )
-
-    args = parser.parse_args()
-    bindir = pathlib.Path(args.bindir).absolute()
-    if not bindir.is_dir():
-        sys.exit(f"Not a directory: {bindir}")
-    zck = bindir / "zck"
-    if not zck.is_file() or not os.access(zck, os.X_OK):
-        sys.exit(f"Not an executable file: {zck}")
-
-    tempd = pathlib.Path(dirname).absolute()
-    return Config(
-        tempd=tempd,
-        bindir=bindir,
-        env=get_runenv(),
-        orig=pathlib.Path(args.filename).absolute(),
-        compressed=tempd / "words.txt.zck",
-        uncompressed=tempd / "un/words.txt",
-        recompressed=tempd / "re/words.txt.zck",
-    )
-
-
-def do_compress(cfg: Config, orig_size: int) -> int:
-    """Compress the original file."""
-    print(f"About to compress {cfg.orig} to {cfg.compressed}")
-    if cfg.compressed.exists():
-        sys.exit(f"Did not expect {cfg.compressed} to exist")
-    subprocess.check_call(
-        [cfg.bindir / "zck", "-o", cfg.compressed, "--", cfg.orig],
-        shell=False,
-        env=cfg.env,
-    )
-    if not cfg.compressed.is_file():
-        sys.exit(f"zck did not create the {cfg.compressed} file")
-    comp_size = cfg.compressed.stat().st_size
-    print(f"{cfg.compressed} size is {comp_size} bytes long")
-    if comp_size >= orig_size:
-        sys.exit(
-            f"sizeof({cfg.compressed}) == {comp_size} : "
-            f"sizeof({cfg.orig}) == {orig_size}"
-        )
-    start = cfg.compressed.open(mode="rb").read(5)
-    print(f"{cfg.compressed} starts with {start!r}")
-    if start != MAGIC:
-        sys.exit(f"{cfg.compressed} does not start with {MAGIC!r}: {start!r}")
-
-    return comp_size
-
-
-def read_chunks(cfg: Config, orig_size: int, comp_size: int) -> None:
-    """Parse the chunks of the compressed file."""
-    # pylint: disable=too-many-statements
-    output = subprocess.check_output(
-        [cfg.bindir / "zck_read_header", "-c", "--", cfg.compressed],
-        shell=False,
-        env=cfg.env,
-    ).decode("UTF-8")
-
-    params: Dict[str, int] = {}
-    chunks: List[Tuple[int, int, int, int, int]] = []
-
-    def ignore_till_end(line: str) -> str:
-        """Ignore anything until EOF."""
-        raise NotImplementedError(line)
-
-    def parse_chunk(line: str) -> str:
-        """Parse a single chunk line."""
-        # pylint: disable=too-many-branches
-        data = RE_CHUNK.match(line)
-        if not data:
-            sys.exit(f"Unexpected line for chunk {len(chunks)}: {line!r}")
-        idx = int(data.group("idx"))
-        start = int(data.group("start"))
-        csize = int(data.group("comp_size"))
-        size = int(data.group("size"))
-
-        if idx != len(chunks):
-            sys.exit(f"Expected index {len(chunks)}: {line!r}")
-        if chunks:
-            last_chunk = chunks[-1]
-            if start != last_chunk[3]:
-                sys.exit(f"Expected start {last_chunk[3]}: {line!r}")
-        else:
-            if start != params["size_diff"]:
-                sys.exit(f"Expected start {params['size_diff']}: {line!r}")
-            last_chunk = (0, 0, 0, params["size_diff"], 0)
-
-        next_chunk = (
-            start,
-            csize,
-            size,
-            last_chunk[3] + csize,
-            last_chunk[4] + size,
-        )
-        if next_chunk[3] > comp_size:
-            sys.exit(
-                f"Compressed size overflow: {next_chunk[3]} > {comp_size}"
-            )
-
-        more = idx + 1 != params["chunk_count"]
-        if more:
-            if next_chunk[4] >= orig_size:
-                sys.exit(
-                    f"Original size overflow: {next_chunk[4]} >= {orig_size}"
-                )
-        else:
-            if next_chunk[3] != comp_size:
-                sys.exit(
-                    f"Compressed size mismatch: {next_chunk[3]} != {comp_size}"
-                )
-            if next_chunk[4] != orig_size:
-                sys.exit(
-                    f"Original size mismatch: {next_chunk[4]} != {orig_size}"
-                )
-
-        print(f"- appending {next_chunk!r}")
-        chunks.append(next_chunk)
-
-        if more:
-            return "parse_chunk"
-        return "ignore_till_end"
-
-    def wait_for_chunks(line: str) -> str:
-        """Wait for the 'Chunks:' line."""
-        if not RE_CHUNKS.match(line):
-            return "wait_for_chunks"
-
-        return "parse_chunk"
-
-    def wait_for_chunk_count(line: str) -> str:
-        """Wait for the 'chunk count' line."""
-        data = RE_CHUNK_COUNT.match(line)
-        if not data:
-            return "wait_for_chunk_count"
-        print(f"- got a chunk count: {data.groupdict()!r}")
-
-        count = int(data.group("count"))
-        if count < 1:
-            sys.exit(f"zck_read_header said chunk count {count}")
-        params["chunk_count"] = count
-
-        return "wait_for_chunks"
-
-    def wait_for_total_size(line: str) -> str:
-        """Wait for the 'data size' line."""
-        data = RE_DATA_SIZE.match(line)
-        if not data:
-            return "wait_for_total_size"
-        print(f"- got a size line: {data.groupdict()!r}")
-
-        size = int(data.group("size"))
-        if size < 1 or size > comp_size:
-            sys.exit(
-                f"zck_read_header said data size {size} (comp {comp_size})"
-            )
-        params["size_diff"] = comp_size - size
-
-        return "wait_for_chunk_count"
-
-    handlers: Dict[str, Callable[[str], str]] = {
-        func.__name__: func
-        for func in (
-            wait_for_total_size,
-            wait_for_chunk_count,
-            wait_for_chunks,
-            parse_chunk,
-            ignore_till_end,
-        )
-    }
-
-    handler: Callable[[str], str] = wait_for_total_size
-
-    for line in output.splitlines():
-        print(f"- read a line: {line}")
-        new_handler = handler(line)
-        assert new_handler in handlers, new_handler
-        handler = handlers[new_handler]
-
-    if handler != ignore_till_end:  # pylint: disable=comparison-with-callable
-        sys.exit(f"handler is {handler!r} instead of {ignore_till_end!r}")
-
-
-def do_uncompress(cfg: Config, orig_size: int) -> None:
-    """Uncompress and compare."""
-    # OK, so unzck's behavior is... weird.
-    cfg.uncompressed.parent.mkdir(mode=0o755)
-
-    print(f"Extracting {cfg.compressed} to {cfg.uncompressed}")
-    if cfg.uncompressed.exists():
-        sys.exit(f"Did not expect {cfg.uncompressed} to exist")
-    subprocess.check_call(
-        [cfg.bindir / "unzck", "--", cfg.compressed],
-        shell=False,
-        env=cfg.env,
-        cwd=cfg.uncompressed.parent,
-    )
-    if not cfg.uncompressed.is_file():
-        subprocess.check_call(["ls", "-lt", "--", cfg.tempd], shell=False)
-        sys.exit(f"unzck did not create the {cfg.uncompressed} file")
-
-    new_size = cfg.uncompressed.stat().st_size
-    print(f"Uncompressed size {new_size}")
-    if new_size != orig_size:
-        sys.exit(f"Uncompressed size {new_size} != original size {orig_size}")
-
-    print(f"Comparing {cfg.orig} to {cfg.uncompressed}")
-    subprocess.check_call(
-        ["cmp", "--", cfg.orig, cfg.uncompressed], shell=False, env=cfg.env
-    )
-
-
-def do_recompress(cfg: Config, comp_size: int) -> None:
-    """Recompress the file and compare."""
-    # OK, so zck's behavior is also weird...
-    cfg.recompressed.parent.mkdir(mode=0o755)
-
-    print(f"Recompressing {cfg.uncompressed} to {cfg.recompressed}")
-    if cfg.recompressed.exists():
-        sys.exit(f"Did not expect {cfg.recompressed} to exist")
-    subprocess.check_call(
-        [cfg.bindir / "zck", "--", cfg.uncompressed],
-        shell=False,
-        env=cfg.env,
-        cwd=cfg.recompressed.parent,
-    )
-    if not cfg.recompressed.is_file():
-        sys.exit(f"zck did not create the {cfg.recompressed} file")
-
-    new_size = cfg.recompressed.stat().st_size
-    print(f"Recompressed size {new_size}")
-    if new_size != comp_size:
-        sys.exit(
-            f"Recompressed size {new_size} != compressed size {comp_size}"
-        )
-
-    print(f"Comparing {cfg.compressed} to {cfg.recompressed}")
-    subprocess.check_call(
-        ["cmp", "--", cfg.compressed, cfg.recompressed],
-        shell=False,
-        env=cfg.env,
-    )
-
-
-def main() -> None:
-    """Create a temporary directory, compress a file, analyze it."""
-    with tempfile.TemporaryDirectory() as dirname:
-        print(f"Using temporary directory {dirname}")
-        cfg = parse_args(dirname)
-        orig_size = cfg.orig.stat().st_size
-        print(f"{cfg.orig} is {orig_size} bytes long")
-
-        comp_size = do_compress(cfg, orig_size)
-        read_chunks(cfg, orig_size, comp_size)
-        do_uncompress(cfg, orig_size)
-        do_recompress(cfg, comp_size)
-        print("Seems fine!")
-
-
-if __name__ == "__main__":
-    main()
diff --git a/debian/tests/pychunk/__init__.py b/debian/tests/pychunk/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/debian/tests/pychunk/common.py b/debian/tests/pychunk/common.py
new file mode 100644 (file)
index 0000000..acf00b2
--- /dev/null
@@ -0,0 +1,240 @@
+"""Common routines for the Python zchunk tests."""
+
+import argparse
+import dataclasses
+import os
+import pathlib
+import subprocess
+import sys
+
+from typing import Callable, Dict, List
+
+from pychunk import defs
+
+
+@dataclasses.dataclass(frozen=True)
+class Config:
+    """Common runtime configuration settings."""
+
+    bindir: pathlib.Path
+    env: Dict[str, str]
+
+    orig: pathlib.Path
+    compressed: pathlib.Path
+
+
+@dataclasses.dataclass(frozen=True)
+class Chunk:
+    """A single chunk descriptor."""
+
+    cstart: int
+    start: int
+    csize: int
+    size: int
+    cend: int
+    end: int
+
+
+def get_runenv() -> Dict[str, str]:
+    """Set up the environment for running the zchunk programs."""
+    env = dict(os.environ)
+    env["LC_ALL"] = "C.UTF-8"
+    env["LANGUAGE"] = ""
+    return env
+
+
+def base_parser(prog: str) -> argparse.ArgumentParser:
+    """Create a parser with the common options."""
+    parser = argparse.ArgumentParser(prog=prog)
+    parser.add_argument(
+        "-d",
+        "--bindir",
+        type=str,
+        required=True,
+        help="path to the directory containing the zchunk tools",
+    )
+    parser.add_argument(
+        "-f",
+        "--filename",
+        type=str,
+        required=True,
+        help="path to the filename to compress",
+    )
+
+    return parser
+
+
+def do_compress(cfg: Config, orig_size: int) -> int:
+    """Compress the original file."""
+    print(f"About to compress {cfg.orig} to {cfg.compressed}")
+    if cfg.compressed.exists():
+        sys.exit(f"Did not expect {cfg.compressed} to exist")
+    subprocess.check_call(
+        [cfg.bindir / "zck", "-o", cfg.compressed, "--", cfg.orig],
+        shell=False,
+        env=cfg.env,
+    )
+    if not cfg.compressed.is_file():
+        sys.exit(f"zck did not create the {cfg.compressed} file")
+    comp_size = cfg.compressed.stat().st_size
+    print(f"{cfg.compressed} size is {comp_size} bytes long")
+    if comp_size >= orig_size:
+        sys.exit(
+            f"sizeof({cfg.compressed}) == {comp_size} : "
+            f"sizeof({cfg.orig}) == {orig_size}"
+        )
+    start = cfg.compressed.open(mode="rb").read(5)
+    print(f"{cfg.compressed} starts with {start!r}")
+    if start != defs.MAGIC:
+        sys.exit(
+            f"{cfg.compressed} does not start with {defs.MAGIC!r}: {start!r}"
+        )
+
+    return comp_size
+
+
+def read_chunks(cfg: Config, orig_size: int, comp_size: int) -> Chunk:
+    """Parse the chunks of the compressed file."""
+    # pylint: disable=too-many-statements
+    output = subprocess.check_output(
+        [cfg.bindir / "zck_read_header", "-c", "--", cfg.compressed],
+        shell=False,
+        env=cfg.env,
+    ).decode("UTF-8")
+
+    params: Dict[str, int] = {}
+    chunks: List[Chunk] = []
+
+    def ignore_till_end(line: str) -> str:
+        """Ignore anything until EOF."""
+        raise NotImplementedError(line)
+
+    def parse_chunk(line: str) -> str:
+        """Parse a single chunk line."""
+        # pylint: disable=too-many-branches
+        data = defs.RE_CHUNK.match(line)
+        if not data:
+            sys.exit(f"Unexpected line for chunk {len(chunks)}: {line!r}")
+        idx = int(data.group("idx"))
+        start = int(data.group("start"))
+        csize = int(data.group("comp_size"))
+        size = int(data.group("size"))
+
+        if idx != len(chunks):
+            sys.exit(f"Expected index {len(chunks)}: {line!r}")
+        if chunks:
+            last_chunk = chunks[-1]
+            if start != last_chunk.cend:
+                sys.exit(f"Expected start {last_chunk.cend}: {line!r}")
+        else:
+            if start != params["size_diff"]:
+                sys.exit(f"Expected start {params['size_diff']}: {line!r}")
+            last_chunk = Chunk(
+                cstart=0,
+                start=0,
+                csize=0,
+                size=0,
+                cend=params["size_diff"],
+                end=0,
+            )
+
+        next_chunk = Chunk(
+            cstart=start,
+            start=last_chunk.end,
+            csize=csize,
+            size=size,
+            cend=last_chunk.cend + csize,
+            end=last_chunk.end + size,
+        )
+        if next_chunk.cend > comp_size:
+            sys.exit(
+                f"Compressed size overflow: {next_chunk.cend} > {comp_size}"
+            )
+
+        more = idx + 1 != params["chunk_count"]
+        if more:
+            if next_chunk.end >= orig_size:
+                sys.exit(
+                    f"Original size overflow: "
+                    f"{next_chunk.end} >= {orig_size}"
+                )
+        else:
+            if next_chunk.cend != comp_size:
+                sys.exit(
+                    f"Compressed size mismatch: "
+                    f"{next_chunk.cend} != {comp_size}"
+                )
+            if next_chunk.end != orig_size:
+                sys.exit(
+                    f"Original size mismatch: "
+                    f"{next_chunk.end} != {orig_size}"
+                )
+
+        print(f"- appending {next_chunk!r}")
+        chunks.append(next_chunk)
+
+        if more:
+            return "parse_chunk"
+        return "ignore_till_end"
+
+    def wait_for_chunks(line: str) -> str:
+        """Wait for the 'Chunks:' line."""
+        if not defs.RE_CHUNKS.match(line):
+            return "wait_for_chunks"
+
+        return "parse_chunk"
+
+    def wait_for_chunk_count(line: str) -> str:
+        """Wait for the 'chunk count' line."""
+        data = defs.RE_CHUNK_COUNT.match(line)
+        if not data:
+            return "wait_for_chunk_count"
+        print(f"- got a chunk count: {data.groupdict()!r}")
+
+        count = int(data.group("count"))
+        if count < 1:
+            sys.exit(f"zck_read_header said chunk count {count}")
+        params["chunk_count"] = count
+
+        return "wait_for_chunks"
+
+    def wait_for_total_size(line: str) -> str:
+        """Wait for the 'data size' line."""
+        data = defs.RE_DATA_SIZE.match(line)
+        if not data:
+            return "wait_for_total_size"
+        print(f"- got a size line: {data.groupdict()!r}")
+
+        size = int(data.group("size"))
+        if size < 1 or size > comp_size:
+            sys.exit(
+                f"zck_read_header said data size {size} (comp {comp_size})"
+            )
+        params["size_diff"] = comp_size - size
+
+        return "wait_for_chunk_count"
+
+    handlers: Dict[str, Callable[[str], str]] = {
+        func.__name__: func
+        for func in (
+            wait_for_total_size,
+            wait_for_chunk_count,
+            wait_for_chunks,
+            parse_chunk,
+            ignore_till_end,
+        )
+    }
+
+    handler: Callable[[str], str] = wait_for_total_size
+
+    for line in output.splitlines():
+        print(f"- read a line: {line}")
+        new_handler = handler(line)
+        assert new_handler in handlers, new_handler
+        handler = handlers[new_handler]
+
+    if handler != ignore_till_end:  # pylint: disable=comparison-with-callable
+        sys.exit(f"handler is {handler!r} instead of {ignore_till_end!r}")
+
+    # Now let's find the second chunk
+    return next(chunk for chunk in chunks if chunk.start > 0)
diff --git a/debian/tests/pychunk/compile.py b/debian/tests/pychunk/compile.py
new file mode 100755 (executable)
index 0000000..33d4a1c
--- /dev/null
@@ -0,0 +1,144 @@
+"""Compile a test program."""
+
+import dataclasses
+import os
+import pathlib
+import subprocess
+import sys
+import tempfile
+
+from pychunk import common
+
+
+@dataclasses.dataclass(frozen=True)
+class Config(common.Config):
+    """Runtime configuration."""
+
+    # pylint: disable=too-many-instance-attributes
+
+    tempd: pathlib.Path
+    source: pathlib.Path
+    obj: pathlib.Path
+    program: pathlib.Path
+
+    uncompressed: pathlib.Path
+
+
+def parse_args(dirname: str) -> Config:
+    """Parse the command-line arguments, deduce some things."""
+    parser = common.base_parser("compile")
+    parser.add_argument(
+        "source", type=str, help="path to the test program source file",
+    )
+
+    args = parser.parse_args()
+
+    tempd = pathlib.Path(dirname).absolute()
+    return Config(
+        tempd=tempd,
+        bindir=pathlib.Path(args.bindir),
+        source=pathlib.Path(args.source),
+        obj=tempd / "chunk.o",
+        program=tempd / "chunk",
+        env=common.get_runenv(),
+        orig=pathlib.Path(args.filename).absolute(),
+        compressed=tempd / "words.txt.zck",
+        uncompressed=tempd / "chunk.txt",
+    )
+
+
+def do_compile(cfg: Config) -> None:
+    """Compile the test program."""
+    print("Fetching the C compiler flags for zck")
+    cflags = (
+        subprocess.check_output(
+            ["pkg-config", "--cflags", "zck"], shell=False, env=cfg.env
+        )
+        .decode("UTF-8")
+        .rstrip("\r\n")
+    )
+    if "\r" in cflags or "\n" in cflags:
+        sys.exit(f"`pkg-config --cflags zck` returned {cflags!r}")
+
+    if cfg.obj.exists():
+        sys.exit(f"Did not expect {cfg.obj} to exist")
+    cmd = f"cc -c -o '{cfg.obj}' {cflags} '{cfg.source}'"
+    print(f"Running {cmd!r}")
+    subprocess.check_call(cmd, shell=True, env=cfg.env)
+    if not cfg.obj.is_file():
+        sys.exit(f"{cmd!r} did not create the {cfg.obj} file")
+
+    print("Fetching the C linker flags and libraries for zck")
+    libs = (
+        subprocess.check_output(
+            ["pkg-config", "--libs", "zck"], shell=False, env=cfg.env
+        )
+        .decode("UTF-8")
+        .rstrip("\r\n")
+    )
+    if "\r" in libs or "\n" in libs:
+        sys.exit(f"`pkg-config --libs zck` returned {libs!r}")
+
+    if cfg.program.exists():
+        sys.exit(f"Did not expect {cfg.program} to exist")
+    cmd = f"cc -o '{cfg.program}' '{cfg.obj}' {libs}"
+    print(f"Running {cmd!r}")
+    subprocess.check_call(cmd, shell=True, env=cfg.env)
+    if not cfg.program.is_file():
+        sys.exit(f"{cmd!r} did not create the {cfg.program} file")
+    if not os.access(cfg.program, os.X_OK):
+        sys.exit(f"Not an executable file: {cfg.program}")
+    print(f"Looks like we got {cfg.program}")
+
+
+def run_program(cfg: Config) -> None:
+    """Run the test program, hopefully generate the chunk file."""
+    print(f"About to run {cfg.program}")
+    if cfg.uncompressed.exists():
+        sys.exit(f"Did not expect {cfg.uncompressed} to exist")
+    subprocess.check_call(
+        [cfg.program, cfg.compressed, cfg.uncompressed],
+        shell=False,
+        env=cfg.env,
+    )
+    if not cfg.uncompressed.is_file():
+        sys.exit(f"{cfg.program} did not create the {cfg.uncompressed} file")
+
+
+def compare_chunk(cfg: Config, second: common.Chunk, orig_size: int) -> None:
+    """Read data from the input file and the chunk."""
+    # OK, let's load it all into memory, mmkay?
+    contents = cfg.orig.read_bytes()
+    if len(contents) != orig_size:
+        sys.exit(
+            f"Could not read {orig_size} bytes from {cfg.orig}, "
+            f"read {len(contents)}"
+        )
+    chunk = cfg.uncompressed.read_bytes()
+    if len(chunk) != second.size:
+        sys.exit(
+            f"Could not read {second.size} bytes from {cfg.uncompressed}, "
+            f"read {len(chunk)}"
+        )
+
+    if contents[second.start : second.start + second.size] != chunk:
+        sys.exit("Mismatch!")
+
+
+def main() -> None:
+    """Parse arguments, compile a program, compress a file, test it."""
+    with tempfile.TemporaryDirectory() as dirname:
+        print(f"Using temporary directory {dirname}")
+        cfg = parse_args(dirname)
+        do_compile(cfg)
+        orig_size = cfg.orig.stat().st_size
+        print(f"Original file size: {orig_size}")
+        comp_size = common.do_compress(cfg, orig_size)
+        second_chunk = common.read_chunks(cfg, orig_size, comp_size)
+        run_program(cfg)
+        compare_chunk(cfg, second_chunk, orig_size)
+        print("Seems fine!")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/debian/tests/pychunk/defs.py b/debian/tests/pychunk/defs.py
new file mode 100644 (file)
index 0000000..a050c5f
--- /dev/null
@@ -0,0 +1,48 @@
+"""Definitions for the Python zchunk tests."""
+
+import re
+
+
+MAGIC = bytes([0, ord("Z"), ord("C"), ord("K"), ord("1")])
+
+RE_DATA_SIZE = re.compile(
+    r""" ^
+    Data \s+ size \s* : \s*
+    (?P<size> 0 | [1-9][0-9]* )
+    \s*
+    $ """,
+    re.X,
+)
+
+RE_CHUNK_COUNT = re.compile(
+    r""" ^
+    Chunk \s+ count \s* : \s*
+    (?P<count> 0 | [1-9][0-9]* )
+    \s*
+    $ """,
+    re.X,
+)
+
+RE_CHUNKS = re.compile(
+    r""" ^
+    \s+
+    Chunk \s+
+    Checksum \s+
+    Start \s+
+    Comp \s size \s+
+    Size \s*
+    $ """,
+    re.X,
+)
+
+RE_CHUNK = re.compile(
+    r""" ^
+    \s+
+    (?P<idx> 0 | [1-9][0-9]* ) \s+
+    (?P<cksum> \S+ ) \s+
+    (?P<start> 0 | [1-9][0-9]* ) \s+
+    (?P<comp_size> 0 | [1-9][0-9]* ) \s+
+    (?P<size> 0 | [1-9][0-9]* ) \s*
+    $ """,
+    re.X,
+)
diff --git a/debian/tests/pychunk/roundtrip.py b/debian/tests/pychunk/roundtrip.py
new file mode 100755 (executable)
index 0000000..b7b003f
--- /dev/null
@@ -0,0 +1,124 @@
+"""A very simple test for the command-line zchunk tools."""
+
+import dataclasses
+import os
+import pathlib
+import subprocess
+import sys
+import tempfile
+
+from pychunk import common
+
+
+@dataclasses.dataclass(frozen=True)
+class Config(common.Config):
+    """Runtime configuration."""
+
+    tempd: pathlib.Path
+
+    uncompressed: pathlib.Path
+    recompressed: pathlib.Path
+
+
+def parse_args(dirname: str) -> Config:
+    """Parse the command-line arguments, deduce some things."""
+    parser = common.base_parser("roundtrip")
+
+    args = parser.parse_args()
+    bindir = pathlib.Path(args.bindir).absolute()
+    if not bindir.is_dir():
+        sys.exit(f"Not a directory: {bindir}")
+    zck = bindir / "zck"
+    if not zck.is_file() or not os.access(zck, os.X_OK):
+        sys.exit(f"Not an executable file: {zck}")
+
+    tempd = pathlib.Path(dirname).absolute()
+    return Config(
+        tempd=tempd,
+        bindir=bindir,
+        env=common.get_runenv(),
+        orig=pathlib.Path(args.filename).absolute(),
+        compressed=tempd / "words.txt.zck",
+        uncompressed=tempd / "un/words.txt",
+        recompressed=tempd / "re/words.txt.zck",
+    )
+
+
+def do_uncompress(cfg: Config, orig_size: int) -> None:
+    """Uncompress and compare."""
+    # OK, so unzck's behavior is... weird.
+    cfg.uncompressed.parent.mkdir(mode=0o755)
+
+    print(f"Extracting {cfg.compressed} to {cfg.uncompressed}")
+    if cfg.uncompressed.exists():
+        sys.exit(f"Did not expect {cfg.uncompressed} to exist")
+    subprocess.check_call(
+        [cfg.bindir / "unzck", "--", cfg.compressed],
+        shell=False,
+        env=cfg.env,
+        cwd=cfg.uncompressed.parent,
+    )
+    if not cfg.uncompressed.is_file():
+        subprocess.check_call(["ls", "-lt", "--", cfg.tempd], shell=False)
+        sys.exit(f"unzck did not create the {cfg.uncompressed} file")
+
+    new_size = cfg.uncompressed.stat().st_size
+    print(f"Uncompressed size {new_size}")
+    if new_size != orig_size:
+        sys.exit(f"Uncompressed size {new_size} != original size {orig_size}")
+
+    print(f"Comparing {cfg.orig} to {cfg.uncompressed}")
+    subprocess.check_call(
+        ["cmp", "--", cfg.orig, cfg.uncompressed], shell=False, env=cfg.env
+    )
+
+
+def do_recompress(cfg: Config, comp_size: int) -> None:
+    """Recompress the file and compare."""
+    # OK, so zck's behavior is also weird...
+    cfg.recompressed.parent.mkdir(mode=0o755)
+
+    print(f"Recompressing {cfg.uncompressed} to {cfg.recompressed}")
+    if cfg.recompressed.exists():
+        sys.exit(f"Did not expect {cfg.recompressed} to exist")
+    subprocess.check_call(
+        [cfg.bindir / "zck", "--", cfg.uncompressed],
+        shell=False,
+        env=cfg.env,
+        cwd=cfg.recompressed.parent,
+    )
+    if not cfg.recompressed.is_file():
+        sys.exit(f"zck did not create the {cfg.recompressed} file")
+
+    new_size = cfg.recompressed.stat().st_size
+    print(f"Recompressed size {new_size}")
+    if new_size != comp_size:
+        sys.exit(
+            f"Recompressed size {new_size} != compressed size {comp_size}"
+        )
+
+    print(f"Comparing {cfg.compressed} to {cfg.recompressed}")
+    subprocess.check_call(
+        ["cmp", "--", cfg.compressed, cfg.recompressed],
+        shell=False,
+        env=cfg.env,
+    )
+
+
+def main() -> None:
+    """Create a temporary directory, compress a file, analyze it."""
+    with tempfile.TemporaryDirectory() as dirname:
+        print(f"Using temporary directory {dirname}")
+        cfg = parse_args(dirname)
+        orig_size = cfg.orig.stat().st_size
+        print(f"{cfg.orig} is {orig_size} bytes long")
+
+        comp_size = common.do_compress(cfg, orig_size)
+        common.read_chunks(cfg, orig_size, comp_size)
+        do_uncompress(cfg, orig_size)
+        do_recompress(cfg, comp_size)
+        print("Seems fine!")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/debian/tests/tox.ini b/debian/tests/tox.ini
new file mode 100644 (file)
index 0000000..f8787ff
--- /dev/null
@@ -0,0 +1,46 @@
+[tox]
+envlist =
+  black
+  flake8
+  mypy
+  pylint
+skipsdist = True
+
+[defs]
+files =
+  pychunk
+
+[testenv:black]
+basepython = python3
+deps =
+  black
+commands =
+  python3 -m black --check --line-length 79 {[defs]files}
+
+[testenv:black-reformat]
+basepython = python3
+deps =
+  black
+commands =
+  python3 -m black --line-length 79 {[defs]files}
+
+[testenv:flake8]
+basepython = python3
+deps =
+  flake8
+commands =
+  python3 -m flake8 --ignore=E203 {[defs]files}
+
+[testenv:mypy]
+basepython = python3
+deps =
+  mypy
+commands =
+  python3 -m mypy --strict --python-version=3.6 {[defs]files}
+
+[testenv:pylint]
+basepython = python3
+deps =
+  pylint
+commands =
+  python3 -m pylint --ignore-imports=yes {[defs]files}