返回 2026-04-20
⚙️ 工程

256 行代码以内:测试用例最小化256 Lines or Less: Test Case Minimization

matklad.github.io·2026-04-20

文章介绍了一种极简的属性测试(Property-Based Testing)库实现,可在几百行代码内完成,用于高效执行测试用例最小化。尽管 PBT 和模糊测试领域技术复杂,但作者提出了一种轻量级、可快速上手的实现方案,适合集成到日常开发流程中。

基于属性的测试(Property Based Testing)和模糊测试(fuzzing)是一个深奥且科学密集的话题。其中涉及的高级技术足以支撑几篇博士论文、一个PBT守护进程,以及一套客户端-服务器架构。但我却搞出了一个奇怪的“客厅戏法”式的PBT库,只需几百行代码,一次就能实现。

这周我一直在琢磨一种有趣的共识算法变体,周末把它实现了出来。我先是花了几小时写了一个PBT库本身,然后又写了个测试,结果这个测试暴露了我算法思路中的一个深层缺陷(在此之前还发现了十几个编码上的低级错误)。所以,我暂时还写不了关于共识算法的更多内容,但至少可以聊聊这个库。它非常简单,甚至可以说简陋。借用一句苏联老笑话形容巴别尔和别尔别洛夫的话来说,它是果戈里,而不是黑格尔。但仅凭256行代码,它却是我工具箱中功率重量比最高的工具之一。

如果你符合以下情况,请阅读本文:

  • 你想锻炼一下生成式测试的能力。
  • 你是个喜欢自己动手的人,不想从货架上拖下一个庞大的PBT库。
  • 你愿意使用现成库,但希望对现有选项有更深入的了解,尤其是关于本质复杂性和偶然复杂性的问题。
  • 你想要一些自包含的、真实的Zig示例 :P
  • Zig在这里表现优异,因为它本身在功率重量比方面同样出众。

    FRNG

    实现只有一个文件:FRNG.zig,因为这里的核心抽象是一个有限随机数生成器(Finite Random Number Generator)——一种所有随机数都预先生成、可能会耗尽的PRNG。我们从标准的样板代码开始:

    const std = @import("std");
    const assert = std.debug.assert;
    
    entropy: []const u8,
    
    pub const Error = error{OutOfEntropy};
    
    const FRNG = @This();
    
    pub fn init(entropy: []const u8) FRNG {
        return .{ .entropy = entropy };
    }

    在Zig中,文件就是结构体:显然你需要结构体,而如果语言将结构体复用为文件的载体,语言本身就会变得更简洁。上述代码中,const FRNG = @This() 为文件结构体赋予了一个惯用名称,而 entropy: []const u8 声明了实例字段(这里只有一个)。const Error 和 fn init 是“静态”(容器级别)的声明。

    我们唯一的字段只是一个原始字节的切片,即我们预先生成的随机数。我们唯一可能抛出的错误是 OutOfEntropy。

    我们能生成的最简单的东西是一个字节切片。通常,这类API会将一个可变切片作为输出参数传入:

    pub fn fill(prng: *PRNG, bytes: []u8) void { ... }

    但由于FRNG的预生成特性,只要我们有足够的熵,就可以直接返回切片。这将成为我们的(唯一)基础函数,其他所有功能都将是构建其上的便捷辅助函数:

    pub fn bytes(frng: *FRNG, size: usize) Error![]const u8 {
        if (frng.entropy.len < size) return error.OutOfEntropy;
        const result = frng.entropy[0..size];
        frng.entropy = frng.entropy[size..];
        return result;
    }

    下一个最简单的东西是数组(具有固定大小的切片):

    pub fn array(frng: *FRNG, comptime size: usize) Error![size]u8 {
        return (try frng.bytes(size))[0..size].*;
    }

    注意Zig是如何从运行时已知的切片长度,转变为编译时已知的数组类型的。因为大小是编译时常量,用 [0..size] 对 []const u8 切片会返回一个指向数组的指针,即 *const [size]u8。

    我们可以将一个4字节数组重新解释为u32。但正因为这是Zig,我们可以轻松地将该函数泛化,使其适用于任何整数类型,只需传入一个类型为 type 的编译时参数 Int:

    const builtin = @import("builtin");
    
    pub fn int(frng: *FRNG, Int: type) Error!Int {
        comptime {
            assert(@typeInfo(Int).int.signedness == .unsigned);
            assert(builtin.cpu.arch.endian() == .little);
        }
        return @bitCast(try frng.array(@sizeOf(Int)));
    }

    该函数会为每一个 Int 类型进行单态化,因此 @sizeOf(Int) 会变成一个编译时常量,我们可以将其传递给 fn array。

    生产环境的代码在这里应该是字节序无关的,但为了简化,我们将字节序假设编码为一个编译时断言。注意 Zig 如何将字节序信息传达给程序。它没有使用任何旁路通道或额外的编译输入(比如 --cfg 标志)。相反,编译器会将目标 CPU 的所有信息以 Zig 代码的形式具体化。在编译器缓存目录的某个位置有一个 builtin.zig 文件,其中包含

    pub const cpu: std.Target.Cpu = .{
        .arch = .aarch64,
        .model = &std.Target.aarch64.cpu.apple_m3,
        // ...
    }

    该文件可通过 @import("builtin") 访问,并且所有常量都可在编译时检查。

    我们可以构造一个整数,而布尔值则更简单:

    pub fn boolean(frng: *FRNG) Error!bool {
        return (try frng.int(u8)) & 1 == 1;
    }

    严格来说,我们只需要一个比特位,而不是一整个字节,但追踪单个比特位会过于繁琐。

    从一个任意的整数出发,我们可以生成一个指定范围内的整数。根据《Random Numbers Included》中的做法,我们使用闭区间,这使得 API 不会失败,并且在调用处通常也更方便:

    pub fn int_inclusive(frng: *FRNG, Int: type, max: Int) Error!Int

    作为 PRNG 的一点花絮,虽然这可以简单地实现为 frng.int(Int) % (max + 1),但结果会有偏差(非均匀分布)。考虑 Int = u8 的情况,以及类似 frng.int_inclusive(u8, 64 * 3) 的调用。

    0..64 范围内的数字出现的概率将是 64..(64*3) 范围内数字的两倍,因为 256 范围的最后四分之一会与开头部分发生别名重叠。

    生成无偏数字是棘手的,可能需要从熵池中抽取任意数量的字节。详情请参考 https://www.pcg-random.org/posts/bounded-rands.html。我没有深究,而是直接从 Zig 标准库复制粘贴了代码。使用风险自负!

    pub fn int_inclusive(frng: *FRNG, Int: type, max: Int) Error!Int {
        comptime assert(@typeInfo(Int).int.signedness == .unsigned);
        if (max == std.math.maxInt(Int)) return try frng.int(Int);
    
        const bits = @typeInfo(Int).int.bits;
        const less_than = max + 1;
    
        var x = try frng.int(Int);
        var m = std.math.mulWide(Int, x, less_than);
        var l: Int = @truncate(m);
        if (l < less_than) {
            var t = -%less_than;
    
            if (t >= less_than) {
                t -= less_than;
                if (t >= less_than) t %= less_than;
            }
            while (l < t) {
                x = try frng.int(Int);
                m = std.math.mulWide(Int, x, less_than);
                l = @truncate(m);
            }
        }
        return @intCast(m >> bits);
    }

    现在我们可以生成一个上下界受限的整数:

    pub fn range_inclusive(
        frng: *FRNG, Int: type,
        min: Int, max: Int,
    ) Error!Int {
        comptime assert(@typeInfo(Int).int.signedness == .unsigned);
        assert(min <= max);
        return min + try frng.int_inclusive(Int, max - min);
    }

    另一个常见操作是从切片中随机选取一个元素。如果你想返回指向该元素的指针,就需要 const 和 mut 版本的函数。一个更简单且更通用的解决方案是返回索引:

    pub fn index(frng: *FRNG, slice: anytype) Error!usize {
        assert(slice.len > 0);
        return try frng.range_inclusive(usize, 0, slice.len - 1);
    }

    在调用处,xs[try frng.index(xs)] 看起来还不错,具有良好的 const 多态性,也适用于多个并行数组。

    模拟

    到目前为止,我们已经花费了约 40% 的代码行预算来实现一个更差的随机数生成器,它随时可能因 OutOfEntropy 而失败。这有什么用呢?

    我们用它来为待测系统提供随机输入,观察其反应,并检查它是否不会崩溃。如果我们编写系统使其在遇到任何意外情况时崩溃,且我们的随机输入覆盖了所有可能的输入空间,那么我们就能在一定程度上确信测试可以检测出 bug。

    在我的共识模拟中,有一个 World 结构体,它持有一个 FRNG 和一组副本:

    const World = struct {
        frng: *FRNG,
        replicas: []Replica,
        // ...
    };

    World 有如下方法:

    fn simulate_request(world: *World) !void {
        const replica = try world.frng.index(world.replicas);
        const payload = try world.frng.int(u64);
    
        world.send_payload(replica, payload);
    }

    然后我随机选择要调用的方法:

    fn step(world: *World) !void {
        const action = try world.frng.weighted(.{
            .request = 10,
            .message = 20,
            .crash = 1,
        });
        switch (action) {
            .request => try world.simulate_request(),
            .message => { ... },
            .crash => { ... },
        }
    }

    这里的 fn weighted 是另一个 FRNG 辅助函数,它根据权重按比例随机选择一个动作。这个辅助函数需要比我们目前所见更多的反射机制:

    pub fn weighted(
        frng: *FRNG,
        weights: anytype,
    ) Error!std.meta.FieldEnum(@TypeOf(weights)) {
        const fields =
            comptime std.meta.fieldNames(@TypeOf(weights));
    
        var total: u32 = 0;
        inline for (fields) |field|
            total += @field(weights, field);
        assert(total > 0);
    
        var pick = try frng.int_inclusive(u64, total - 1);
        inline for (fields) |field| {
            const weight = @field(weights, field);
            if (pick < weight) {
                return @field(
                    std.meta.FieldEnum(@TypeOf(weights)),
                    field,
                );
            }
            pick -= weight;
        }
        unreachable;
    }

    weights: anytype 是一种编译时的鸭子类型(duck-typing)。这意味着我们的加权函数可以接受任何类型的参数,而每种具体类型都会生成一个该函数的单态化(monomorphised)新实例。虽然我们没有显式声明 weights 的类型,但可以通过 @TypeOf(weights) 获取它。

    FieldEnum 是一个类型级函数,它接收一个结构体类型:

    const S = struct {
        foo: bool,
        bar: u32,
        baz: []const u8
    };

    并将其转换为一个枚举类型,其中每个字段对应一个枚举变体——这正是我们所需要的返回类型:

    const E = enum { foo, bar, baz };

    提示:如果你想快速了解 Zig 的反射能力,可以研究 Zig 标准库中 std.meta 和 std.enums 的实现。

    @field 内置函数用于在编译时根据字段名访问字段。它就像 Python 的 getattr / setattr,但多了一个限制:必须在编译时求值。

    为了再增加一点复杂性,我总觉得很难判断哪些权重是合理的,因此喜欢在测试开始时随机生成权重本身:

    pub fn swarm_weights(frng: *FRNG, Weights: type) Error!Weights {
        var result: Weights = undefined;
        inline for (comptime std.meta.fieldNames(Weights)) |field| {
            @field(result, field) = try frng.range_inclusive(u32, 1, 100);
        }
        return result;
    }

    (如果你对此感到困惑,可以参考《Swarm Testing Data Structures》)

    步进与运行

    现在我们已经具备了足够的机制来描述测试的整体结构:

    fn run_test(gpa: Allocator, frng: *FRNG) !void {
        var world = World.init(gpa, &frng) catch |err|
            switch (err) {
                error.OutOfEntropy => return,
                else => return err,
            };
        defer world.deinit(gpa);
    
        while (true) {
            world.step() catch |err| switch (err) {
                error.OutOfEntropy => break,
            };
        }
    }
    
    const World = struct {
        frng: *FRNG,
        weights: ActionWeights,
    
        // ...
    
        const ActionWeights = struct {
            request: u32,
            message: u32,
            crash: u32,
            // ...
        };
    
        pub fn init(gpa: Allocator, frng: *FRNG) !void {
            const weights = try frng.swarm_weights(ActionWeights);
            // ...
        }
    
        fn step(world: *World) error{OutOfEntropy}!void {
            const action = try world.frng.weighted(world.weights);
            switch (action) {
                .request => { ... },
                // ...
            }
        }
    };

    一个测试需要一个 FRNG(最终决定测试结果)和一个通用分配器(General Purpose Allocator)来创建 World。我们首先用随机的动作权重创建一个模拟的 World。如果 FRNG 的熵非常低,即使在这个阶段也可能耗尽熵。我们假设代码在被证明有罪之前是无辜的——如果我们没有足够的熵来发现 bug,那么这次测试就返回成功。别担心,我们会在其他地方确保有足够的熵。

    我们使用 catch |err| switch(err) 来剥离 OutOfEntropy 错误。我发现,每当我在 Zig 中处理错误时,经常只想从错误集中排除某一个特定错误。我希望能够像这样在 catch 中使用括号:

    // NOT ACTUALY ZIG :(
    
    var world = try World.init(gpa, &frng)
        catch (error.OutOfEntropy) return;

    总之,在创建 World 之后,只要还有剩余的熵,我们就逐步执行它。如果某一步检测到内部不一致,整个 World 就会因断言失败而崩溃。如果我们成功退出了 while(true) 循环,就说明至少该段熵数据没有暴露出任何可疑问题。

    注意这里缺少了什么。我们并没有预先生成完整的动作列表。相反,我们是边执行边做随机决策,并且可以自由地使用 World 的当前状态来构建可能的选择菜单(例如,在发送消息时,只考虑当前未崩溃的副本)。

    对答案进行二分查找

    而这就是我们费心编写自定义有限 PRNG(Finite PRNG)而非使用现成方案的原因。FRNG 中的熵量定义了测试的复杂度。初始随机字节越少,我们就越快退出步进循环。这使我们能够几乎零成本地最小化测试用例。

    假设你知道某个特定的熵片段会导致测试失败(例如集群在第 100 万步时进入脑裂状态)。假设该片段大小为 16KiB。显然下一步是看看仅用 8KiB 是否就足以使其崩溃。如果 8KiB 不行,那或许 12KiB 可以?

    你可以通过二分查找来确定导致测试失败所需的最小熵量。这种方法适用于任何测试,不限于分布式系统。只要你能编写代码随机生成输入,就可以通过测量构造该输入所消耗的随机字节数来评估每个特定输入的复杂度。

    而最有趣的部分来了——当然,最小化熵的“正统”方法是从一个已知的失败输入片段开始,对其应用遗传算法变异。但实践中一个简单得多的方法往往也有效:直接生成一个更短的新熵片段。如果你已经随机发现了一个失败案例,那么只要存在更小的失败示例,你同样有可能随机遇到它——毕竟小规模的示例数量少得多,规模越小,找到失败案例就越容易!

    搜索器

    对失败熵进行二分查找的问题在于,断言触发会导致程序崩溃。Zig 中没有栈展开机制。因此,我们将搜索代码移至另一个进程。这样一来,单个测试就是一个带有 main 函数的二进制程序,它从 stdin 读取熵。

    Zig 新引入的 juicy main 让编写这类代码比以往任何版本都更轻松 :D

    pub fn main(init: std.process.Init) !void {
        const gpa = init.gpa;
        const io = init.io;
    
        var stdin_reader = std.Io.File.stdin().reader(io, &.{});
        const entropy = try stdin_reader.interface
            .allocRemaining(gpa, .unlimited);
        defer gpa.free(entropy);
    
        var frng = FRNG.init(entropy);
    
        var world = World.init(gpa, &frng, .{}) catch |err|
            switch (err) {
                error.OutOfEntropy => return,
                else => return err,
            };
        defer world.deinit(gpa);
    
        world.run();
    }

    main 函数接收 Init 作为参数,通过它可以访问命令行参数、默认分配器以及默认的 Io 实现。如今 Zig 摒弃了全局隐式 IO 能力,每当需要发起系统调用时,都必须显式传递一个 Io 实例。在这里,我们需要 Io 来读取 stdin。

    现在我们将实现一个 harness 来调用这个 main 函数。它将是一个 FRNG.Driver:

    pub const Driver = struct {
        io: std.Io,
        sut: []const u8,
        buffer: []u8,
    
        const log = std.log;
    };

    由于它会启动外部进程,因此需要一个 Io。我们还需要一个包含测试 main 函数的可执行文件路径,即被测系统(System Under Test)。此外还需要一个缓冲区来保存熵。该驱动会向用户报告成功或失败,因此我们还需准备一个用于文本输出的日志。

    我们如何将熵传递给被测系统(sut)?因为我们只关心熵的大小,所以不会存储实际的熵字节,而是通过一个 u64 种子来生成它。换句话说,只需两个数值——熵大小和种子——就能复现单次测试运行:

    fn run_once(driver: Driver, options: struct {
        size: u32,
        seed: u64,
        quiet: bool,
    }) !enum { pass, fail } {
        assert(options.size <= driver.buffer.len);
        const entropy = driver.buffer[0..options.size];
    
        var rng = std.Random.DefaultPrng.init(options.seed);
        rng.random().bytes(entropy);
    
        var child = try std.process.spawn(driver.io, .{
            .argv = &.{driver.sut},
            .stdin = .pipe,
            .stderr = if (options.quiet) .ignore else .inherit,
        });
    
        try child.stdin.?.writeStreamingAll(driver.io, entropy);
        child.stdin.?.close(driver.io);
        child.stdin = null;
    
        const term = try child.wait(driver.io);
        return if (success(term)) .pass else .fail;
    }
    
    fn success(term: std.process.Child.Term) bool {
        return term == .exited and term.exited == 0;
    }

    我们使用默认的确定性 PRNG 将短种子扩展为指定大小的熵切片。然后启动 sut 进程,并通过 stdin 将生成的熵喂给它。关闭子进程的 stdin 表示熵输入结束。接着根据子进程的退出码返回 .pass 或 .fail。因此,无论是显式错误还是崩溃,都会被识别为失败。

    接下来,我们实现逻辑以判断某个特定种子大小是否足以发现失败。当然,在有限时间内我们无法绝对确定,因此采用用户指定的重试次数作为标准:

    fn run_multiple(driver: Driver, options: struct {
        size: u32,
        attempts: u32,
    }) !union(enum) { pass, fail: u64 } {
        // ...
    }

    用户传入尝试次数,如果所有尝试都成功,我们返回 .pass;否则返回一个具体的失败种子(如果我们找到了的话):

    assert(options.size <= driver.buffer.len);
    
    for (0..options.attempts) |_| {
        var seed: u64 = undefined;
        driver.io.random(@ptrCast(&seed));
    
        const outcome = try driver.run_once(.{
            .seed = seed,
            .size = options.size,
            .quiet = true,
        });
        switch (outcome) {
            .fail => return .{ .fail = seed },
            .pass => {},
        }
    }
    return .pass;

    要生成一个真正的种子,我们需要“真正的”密码学意义上的非确定性随机性,这由 io.random 提供。

    最后,搜索大小:

    fn search(driver: Driver, options: struct {
        attempts: u32 = 100,
    }) !union(enum) {
        pass,
        fail: struct { size: u32, seed: u64 },
    } {
        // ...
    }

    在这里,我们要找到导致 sut 崩溃的最小熵大小。如果成功,我们将返回种子和该大小。大小的上限是预分配的熵缓冲区中的可用空间。

    搜索循环本质上是一个二分查找,但略有变化——我们不会直接在大小上进行二分,而是会加倍用于在迭代之间改变大小的步长。

    也就是说,我们从一个小的大小和步长开始,在每次迭代中,将步长加倍并加到当前大小上,直到触发失败(或熵缓冲区耗尽)。

    一旦发现失败,我们就在相反方向继续搜索——将步长减半并从当前大小中减去,如果更小的尺寸仍然导致失败,则保留该更小的尺寸。

    在每一步中,我们记录当前的大小和结果,并在最后报告最小的失败大小。

    var found_size: ?u32 = null;
    var found_seed: ?u64 = null;
    
    var pass: bool = true;
    var size: u32 = 16;
    var step: u32 = 16;
    for (0..1024) |_| {
        if (step == 0) break;
        const size_next = if (pass) size + step else size -| step;
        if (size > driver.buffer.len) break;
    
        const outcome = try driver.run_multiple(.{
            .size = size_next,
            .attempts = options.attempts,
        });
        switch (outcome) {
            .pass => log.info("pass: size={}", .{size_next}),
            .fail => |seed| {
                found_size = size_next;
                found_seed = seed;
                log.err("fail: size={} seed={}", .{ size_next, seed });
            },
        }
        const pass_next = (outcome == .pass);
    
        if (pass and pass_next) {
            step *= 2;
        } else if (!pass and !pass_next) {
            // Keep the step.
        } else {
            step /= 2;
        }
    
        if (pass or !pass_next) {
            size = size_next;
            pass = pass_next;
        }
    } else @panic("safety counter");
    
    if (found_size == null) return .pass;
    return .{ .fail = .{
        .size = found_size.?,
        .seed = found_seed.?,
    } };

    最后,我们将 Driver 的功能封装到 main 函数中,该函数支持两种模式——要么根据给定的种子和大小复现某个失败,要么搜索最小的失败。

    pub fn main(
        gpa: std.mem.Allocator,
        io: std.Io,
        sut: []const u8,
        operation: union(enum) {
            replay: struct { size: u32, seed: u64 },
            search: struct {
                attempts: u32 = 100,
                size_max: u32 = 4 * 1024 * 1024,
            },
        },
    ) !void {
        const size_max = switch (operation) {
            .replay => |options| options.size,
            .search => |options| options.size_max,
        };
    
        const buffer = try gpa.alloc(u8, size_max);
        defer gpa.free(buffer);
    
        var driver: Driver = .{
            .io = io,
            .buffer = buffer,
            .sut = sut,
        };
    
        switch (operation) {
            .replay => |options| {
                const outcome = try driver.run_once(.{
                    .size = options.size,
                    .seed = options.seed,
                    .quiet = false,
                });
                log.info("{t}", .{outcome});
            },
            .search => |options| {
                const outcome = try driver.search(.{
                    .attempts = options.attempts,
                 });
                switch (outcome) {
                    .pass => log.info("ok", .{}),
                    .fail => |fail| {
                        log.err("minimized size={} seed={}", .{
                            fail.size, fail.seed,
                         });
                    },
                }
            },
        }
    }

    在终端中运行搜索例程的效果如下:

    这些最终的种子和大小随后可用于 .replay,从而为你提供用于调试的最小可复现失败案例!

    当然,如果没有可视化我们通过这种方式发现的某个具体 bug,这看起来并不太令人兴奋;但问题在于,通常值得用这种方式测试的有趣系统实现起来往往超过 256 行代码。所以我把这个留给你想象,但你已经明白核心思想:如果你能让一个系统在“随机”输入下失败,那么你也可以系统地搜索所有输入空间,找到最小的反例,而无需向搜索器添加关于系统的任何先验知识。本文还提供了一个具体(尽管略显冗长)的示例。

    以下是完整代码:

    https://gist.github.com/matklad/343d13547c8bfe9af310e2ca2fbfe109

    需要完整排版与评论请前往来源站点阅读。