256 行以内实现测试用例最小化256 Lines or Less: Test Case Minimization
本文介绍一个可在短短几百行代码中实现的轻量级属性测试(PBT)库,用于自动最小化失败测试用例。尽管 PBT 和模糊测试技术复杂深奥,但作者展示了其核心思想可通过简洁代码表达,适用于调试和回归测试场景。
基于属性的测试(Property Based Testing)和模糊测试(fuzzing)是一个深奥且高度技术性的领域。它包含足够多的进阶技巧,足以支撑几位博士、一个PBT守护进程以及一套客户端-服务器架构。但我这里有一个奇特的“戏法”式PBT库,只需几百行代码就能在一小时内完成实现。
这周我一直在思考一种共识算法的巧妙变体。我在周末实现了它。结果发现,光是编写这个PBT库本身就花了几个小时,而随后的测试却揭示了我思维中一个深刻的算法缺陷(尽管我的编码中已有十来个低级错误)。所以,我暂时还无法深入讨论共识算法,但至少可以聊聊这个库。它非常简单,甚至可以说是简陋。借用一句老苏联笑话里关于巴别尔和贝贝尔的说法,这更像是果戈理而非黑格尔。不过,仅用256行代码,它就成为我工具箱中功率重量比最高的工具之一。
如果你符合以下情况,请阅读本文:
Zig在这里表现出色,因为它本身就以极高的功率重量比著称。
FRNG
实现是一个单独的文件 FRNG.zig,因为这里的抽象核心是有限随机数生成器(Finite Random Number Generator),即所有数字都是预先生成的PRNG,并且会耗尽。我们从标准样板开始:
const std = @import("std");
const assert = std.debug.assert;
entropy: []const u8,
pub const Error = error{OutOfEntropy};
const FRNG = @This();
pub fn init(entropy: []const u8) FRNG {
return .{ .entropy = entropy };
}在Zig中,文件就是结构体:显然你需要结构体,而且如果文件也用可复用的结构体来表示,语言会更简洁。上面的 `const FRNG = @This()` 将文件结构体赋予一个约定俗成的名称,`entropy: []const u8` 声明实例字段(此处只有一个)。`const Error` 和 `fn init` 是“静态”(容器级别)声明。
我们唯一的字段只是一个原始字节的切片,也就是我们预生成的随机数。我们能抛出的唯一错误情况是 OutOfEntropy。
我们能生成的最简单的东西是一个字节切片。通常,API 会以可变切片作为输出参数:
pub fn fill(prng: *PRNG, bytes: []u8) void { ... }但由于 FRNG 的预生成特性,只要我们有足够的熵,就可以直接返回该切片。这将成为我们的(唯一)基础函数,其余的都是便利辅助函数:
pub fn bytes(frng: *FRNG, size: usize) Error![]const u8 {
if (frng.entropy.len < size) return error.OutOfEntropy;
const result = frng.entropy[0..size];
frng.entropy = frng.entropy[size..];
return result;
}接下来最简单的是一组数组(固定大小的切片):
pub fn array(frng: *FRNG, comptime size: usize) Error![size]u8 {
return (try frng.bytes(size))[0..size].*;
}注意Zig如何从运行时已知的切片长度过渡到编译时已知的数组类型。由于大小是编译时常量,对 `[]const u8` 进行 `[0..size]` 切片会返回指向数组的指针,即 `*const [size]u8`。
我们可以将一个4字节数组重新解释为 u32。但因为这是Zig,我们可以轻松地将函数泛化为适用于任何整数类型,方法是传入一个类型为 `type` 的编译时参数 `Int`:
const builtin = @import("builtin");
pub fn int(frng: *FRNG, Int: type) Error!Int {
comptime {
assert(@typeInfo(Int).int.signedness == .unsigned);
assert(builtin.cpu.arch.endian() == .little);
}
return @bitCast(try frng.array(@sizeOf(Int)));
}这个函数会为每个 `Int` 类型进行单态化(monomorphised),因此 `@sizeOf(Int)` 成为一个编译时常量,我们可以将其传递给 `fn array`。
生产代码本应是字节序无关的,但为简化起见,我们将字节序假设编码为编译时断言。注意 Zig 如何将字节序信息传递给程序。这里没有任何侧信道或额外的编译输入(如 --cfg 标志),相反,编译器将所有关于目标 CPU 的信息都物化为 Zig 代码。在编译器缓存目录中的某个地方存在一个 builtin.zig 文件,其中包含
pub const cpu: std.Target.Cpu = .{
.arch = .aarch64,
.model = &std.Target.aarch64.cpu.apple_m3,
// ...
}该文件可通过 @import("builtin") 访问,所有常量可在编译时被检查。
我们可以创建一个整数,布尔值甚至更简单:
pub fn boolean(frng: *FRNG) Error!bool {
return (try frng.int(u8)) & 1 == 1;
}严格来说,我们只需要一个比特而非一个字节,但跟踪单个比特过于繁琐。
从任意整数出发,我们可以生成指定范围内的整数。根据《随机数说明》,我们使用闭区间,这使得 API 无缺陷且在调用处通常更便捷:
pub fn int_inclusive(frng: *FRNG, Int: type, max: Int) Error!Int作为 PRNG 的冷知识,虽然这可以按 frng.int(Int) % (max + 1) 实现,但结果会存在偏差(非均匀分布)。考虑 Int = u8 且调用类似 frng.int_inclusive(u8, 64 * 3) 的情况。
数字 0..64 出现的概率将是 64..(64*3) 中数字的两倍,因为 256 范围的最后四分之一会与第一四分之一的值重叠。
生成无偏随机数是棘手的问题,可能需要从熵源中抽取任意数量的字节。详情参见 https://www.pcg-random.org/posts/bounded-rands.html。我没有深入研究,而是直接复制粘贴了 Zig 标准库中的代码。请自行承担风险!
pub fn int_inclusive(frng: *FRNG, Int: type, max: Int) Error!Int {
comptime assert(@typeInfo(Int).int.signedness == .unsigned);
if (max == std.math.maxInt(Int)) return try frng.int(Int);
const bits = @typeInfo(Int).int.bits;
const less_than = max + 1;
var x = try frng.int(Int);
var m = std.math.mulWide(Int, x, less_than);
var l: Int = @truncate(m);
if (l < less_than) {
var t = -%less_than;
if (t >= less_than) {
t -= less_than;
if (t >= less_than) t %= less_than;
}
while (l < t) {
x = try frng.int(Int);
m = std.math.mulWide(Int, x, less_than);
l = @truncate(m);
}
}
return @intCast(m >> bits);
}现在我们可以生成上下界均受限的整数:
pub fn range_inclusive(
frng: *FRNG, Int: type,
min: Int, max: Int,
) Error!Int {
comptime assert(@typeInfo(Int).int.signedness == .unsigned);
assert(min <= max);
return min + try frng.int_inclusive(Int, max - min);
}另一种常见操作是从切片(slice)中随机选取一个元素。若你希望返回指向元素的指针,则需要提供 const 和 mut 两个版本的函数。更简单且通用的解决方案是返回索引:
pub fn index(frng: *FRNG, slice: anytype) Error!usize {
assert(slice.len > 0);
return try frng.range_inclusive(usize, 0, slice.len - 1);
}在调用处,xs[try frng.index(xs)] 看起来并不糟糕,它适当实现了 const 多态,也可用于多个并行数组。
仿真
截至目前,我们已经花费了约 40% 的行数预算来实现一个较差的随机数生成器,它在任何时刻都可能因 OutOfEntropy 而失败。这有什么用呢?
我们用它为被测系统提供随机输入,观察其反应,并确保它不会崩溃。如果我们设计的系统在遇到任何意外情况时都会崩溃,并且我们的随机输入覆盖了所有可能的输入空间,那么我们就能获得一种信心——测试将能发现 bug。
在我的共识仿真中,我有一个 World 结构体,它包含一个 FRNG 和一个副本集合:
const World = struct {
frng: *FRNG,
replicas: []Replica,
// ...
};World 有如下方法:
fn simulate_request(world: *World) !void {
const replica = try world.frng.index(world.replicas);
const payload = try world.frng.int(u64);
world.send_payload(replica, payload);
}然后我随机选择要调用的方法:
fn step(world: *World) !void {
const action = try world.frng.weighted(.{
.request = 10,
.message = 20,
.crash = 1,
});
switch (action) {
.request => try world.simulate_request(),
.message => { ... },
.crash => { ... },
}
}这里的 fn weighted 是另一个 FRNG 辅助函数,它根据权重比例随机选择一个动作。这个辅助函数需要比我们目前所见更多的反射机制:
pub fn weighted(
frng: *FRNG,
weights: anytype,
) Error!std.meta.FieldEnum(@TypeOf(weights)) {
const fields =
comptime std.meta.fieldNames(@TypeOf(weights));
var total: u32 = 0;
inline for (fields) |field|
total += @field(weights, field);
assert(total > 0);
var pick = try frng.int_inclusive(u64, total - 1);
inline for (fields) |field| {
const weight = @field(weights, field);
if (pick < weight) {
return @field(
std.meta.FieldEnum(@TypeOf(weights)),
field,
);
}
pick -= weight;
}
unreachable;
}weights: anytype 是编译时的鸭子类型。这意味着我们的加权函数可以接受任何类型,每种特定类型都会创建一个新的函数单态化实例。虽然我们没有显式命名权重的类型,但可以通过 @TypeOf(weights) 获取它。
FieldEnum 是一个类型级函数,它接收一个结构体类型:
const S = struct {
foo: bool,
bar: u32,
baz: []const u8
};并将其转换为一个枚举类型,每个字段对应一个变体,这正是我们想要的返回类型:
const E = enum { foo, bar, baz };提示:如果你想快速了解 Zig 的反射能力,请研究 Zig 标准库中 std.meta 和 std.enums 的实现。
@field 内置函数通过编译时常量字段名访问字段。它类似于 Python 的 getattr/setattr,但多了一个限制——必须在编译时求值。
为了增加一点变化,我发现自己很难确定哪些权重是合理的,因此喜欢在测试开始时随机生成权重:
pub fn swarm_weights(frng: *FRNG, Weights: type) Error!Weights {
var result: Weights = undefined;
inline for (comptime std.meta.fieldNames(Weights)) |field| {
@field(result, field) = try frng.range_inclusive(u32, 1, 100);
}
return result;
}(如果你在这里感到困惑,请参考《Swarm Testing Data Structures》)
步进与运行
现在我们有足够的机制来描述测试的整体结构:
fn run_test(gpa: Allocator, frng: *FRNG) !void {
var world = World.init(gpa, &frng) catch |err|
switch (err) {
error.OutOfEntropy => return,
else => return err,
};
defer world.deinit(gpa);
while (true) {
world.step() catch |err| switch (err) {
error.OutOfEntropy => break,
};
}
}
const World = struct {
frng: *FRNG,
weights: ActionWeights,
// ...
const ActionWeights = struct {
request: u32,
message: u32,
crash: u32,
// ...
};
pub fn init(gpa: Allocator, frng: *FRNG) !void {
const weights = try frng.swarm_weights(ActionWeights);
// ...
}
fn step(world: *World) error{OutOfEntropy}!void {
const action = try world.frng.weighted(world.weights);
switch (action) {
.request => { ... },
// ...
}
}
};测试需要一个 FRNG(最终决定结果)和一个通用用途分配器(General Purpose Allocator)用于 World。我们从创建一个具有随机动作权重的模拟 World 开始。如果 FRNG 的熵非常低,即使在这个阶段也可能耗尽熵。我们假设代码是无辜的,直到被证明有罪——如果熵不足以发现 bug,该测试就返回成功。别担心,我们会确保在其他地方有足够的熵。
我们使用 catch |err| switch(err) 来处理 OutOfEntropy 错误。我发现,每当我在 Zig 中处理错误时,常常只想从错误集中排除单个错误。我希望 catch 能像括号一样使用:
// NOT ACTUALY ZIG :(
var world = try World.init(gpa, &frng)
catch (error.OutOfEntropy) return;无论如何,创建了 World 后,只要还有熵,我们就逐步执行它。如果任何步骤检测到内部不一致,整个 World 就会因断言失败而崩溃。如果我们能完成 while(true) 循环,就说明至少这个熵片段没有暴露出任何可疑问题。
注意这里缺少什么:我们并没有预先生成完整的操作列表。相反,我们在过程中随机做决定,并可以自由利用 World 的当前状态构造可能的选择菜单(例如,发送消息时只考虑尚未崩溃的副本)。
二分查找答案
现在我们终于明白为什么要费心编写自定义有限 PRNG,而不是使用现成的了。FRNG 中的熵量决定了测试的复杂度。初始随机字节越少,退出步进循环的速度就越快。这让我们几乎免费就能最小化测试用例。
假设你知道某个特定的熵片段会导致测试失败(集群在第百万步进入分裂脑状态)。比如这个片段是 16KiB。显而易见的下一步是看看只用 8KiB 是否也能让它崩溃。如果 8KiB 不行,那 12KiB 呢?
你可以对测试失败所需的最小熵值进行二分搜索。这种方法适用于任何测试,不限于分布式系统。只要你能编写代码来随机生成输入,就可以通过测量构造该输入时消耗了多少随机字节,来衡量每个特定输入的复杂度。
现在到了最有趣的部分——当然,看起来最小化熵值的方法是从某个特定的失败用例开始,并对其应用遗传算法变异。但在实践中,一个更简单的方法似乎更有效:直接生成一个全新的、更短的熵切片。如果你碰巧发现了一个失败的用例,那么如果存在更小的失败用例,你就有可能在随机尝试中找到它——因为较小的用例数量少得多,随着规模减小,找到失败用例的难度也会大大降低!
搜索器(Searcher)
在二分搜索失败熵值时遇到的问题是:触发断言会导致程序崩溃。Zig 语言没有栈展开机制。因此,我们将把搜索逻辑移到另一个进程中执行。这样,每个测试就成为一个带有 main 函数的可执行文件,该函数从标准输入读取熵值。
Zig 新引入的 main 函数让这变得比以往任何版本都更容易实现 :D
pub fn main(init: std.process.Init) !void {
const gpa = init.gpa;
const io = init.io;
var stdin_reader = std.Io.File.stdin().reader(io, &.{});
const entropy = try stdin_reader.interface
.allocRemaining(gpa, .unlimited);
defer gpa.free(entropy);
var frng = FRNG.init(entropy);
var world = World.init(gpa, &frng, .{}) catch |err|
switch (err) {
error.OutOfEntropy => return,
else => return err,
};
defer world.deinit(gpa);
world.run();
}main 接收 Init 作为参数,提供了对命令行参数、默认分配器和默认 I/O 实现的访问权限。如今 Zig 摒弃了全局环境 I/O 能力,每当需要进行系统调用时,都必须传递一个 I/O 实例。在这里,我们需要 I/O 来读取标准输入。
现在我们来实现一个调用这个 main 函数的框架。这将是一个 FRNG.Driver:
pub const Driver = struct {
io: std.Io,
sut: []const u8,
buffer: []u8,
const log = std.log;
};它将启动外部进程,因此需要一个 I/O 实例。我们还需要一个包含测试 main 函数的可执行文件路径(即被测系统 SUT)。此外,我们还需要一个缓冲区来保存熵值。这个驱动将把成功或失败的结果反馈给用户,所以我们还准备了一个日志用于文本输出。
如何将熵值传递给 SUT?因为我们只关心熵值的大小,所以不会存储实际的熵字节,而是从一个 u64 种子生成。换句话说,只需要两个数字——熵值大小和种子——就能重现一次测试运行:
fn run_once(driver: Driver, options: struct {
size: u32,
seed: u64,
quiet: bool,
}) !enum { pass, fail } {
assert(options.size <= driver.buffer.len);
const entropy = driver.buffer[0..options.size];
var rng = std.Random.DefaultPrng.init(options.seed);
rng.random().bytes(entropy);
var child = try std.process.spawn(driver.io, .{
.argv = &.{driver.sut},
.stdin = .pipe,
.stderr = if (options.quiet) .ignore else .inherit,
});
try child.stdin.?.writeStreamingAll(driver.io, entropy);
child.stdin.?.close(driver.io);
child.stdin = null;
const term = try child.wait(driver.io);
return if (success(term)) .pass else .fail;
}
fn success(term: std.process.Child.Term) bool {
return term == .exited and term.exited == 0;
}我们使用默认的确定性伪随机数生成器(PRNG)将短种子扩展为所需大小的熵切片。然后启动 SUT 进程,通过标准输入传递生成的熵值。关闭子进程的标准输入表示熵值传输结束。接着根据子进程的退出码返回 .pass 或 .fail。这样,显式错误和崩溃都会被识别为失败。
接下来,我们实现检查特定种子大小是否足以发现失败情况的逻辑。当然,在有限时间内我们无法完全确定这一点,因此我们采用用户指定的重试次数:
fn run_multiple(driver: Driver, options: struct {
size: u32,
attempts: u32,
}) !union(enum) { pass, fail: u64 } {
// ...
}用户传入尝试次数,如果所有尝试都成功则返回 .pass,否则返回一个具体的失败种子:
assert(options.size <= driver.buffer.len);
for (0..options.attempts) |_| {
var seed: u64 = undefined;
driver.io.random(@ptrCast(&seed));
const outcome = try driver.run_once(.{
.seed = seed,
.size = options.size,
.quiet = true,
});
switch (outcome) {
.fail => return .{ .fail = seed },
.pass => {},
}
}
return .pass;要生成一个真正的种子,我们需要“真正”的加密非确定性随机性,这由 io.random 提供。
最后,开始搜索大小:
fn search(driver: Driver, options: struct {
attempts: u32 = 100,
}) !union(enum) {
pass,
fail: struct { size: u32, seed: u64 },
} {
// ...
}在这里,我们要找的是导致 sut 崩溃的最小熵大小。如果成功,就返回该种子和大小。大小的上限是预分配的熵缓冲区中可用的空间。
搜索循环本质上是一个二分查找,但有一个变化——我们不直接在大小上进行二分,而是通过倍增步长来调整每次迭代之间的大小。
也就是说,我们从一个小的大小和步长开始,在每次迭代中,将步长翻倍并加到当前大小上,直到遇到失败(或耗尽熵缓冲区)。
一旦发现失败,我们继续向另一个方向搜索——将步长减半并从当前大小中减去它,如果较小的那个大小仍然失败,则保留它。
每一步都记录当前大小和结果,最后报告最小的失败大小。
var found_size: ?u32 = null;
var found_seed: ?u64 = null;
var pass: bool = true;
var size: u32 = 16;
var step: u32 = 16;
for (0..1024) |_| {
if (step == 0) break;
const size_next = if (pass) size + step else size -| step;
if (size > driver.buffer.len) break;
const outcome = try driver.run_multiple(.{
.size = size_next,
.attempts = options.attempts,
});
switch (outcome) {
.pass => log.info("pass: size={}", .{size_next}),
.fail => |seed| {
found_size = size_next;
found_seed = seed;
log.err("fail: size={} seed={}", .{ size_next, seed });
},
}
const pass_next = (outcome == .pass);
if (pass and pass_next) {
step *= 2;
} else if (!pass and !pass_next) {
// Keep the step.
} else {
step /= 2;
}
if (pass or !pass_next) {
size = size_next;
pass = pass_next;
}
} else @panic("safety counter");
if (found_size == null) return .pass;
return .{ .fail = .{
.size = found_size.?,
.seed = found_seed.?,
} };最后,我们将 Driver 的功能封装进 main 函数,使其支持两种模式——要么从给定的种子和大小重现某个失败,要么搜索最小失败:
pub fn main(
gpa: std.mem.Allocator,
io: std.Io,
sut: []const u8,
operation: union(enum) {
replay: struct { size: u32, seed: u64 },
search: struct {
attempts: u32 = 100,
size_max: u32 = 4 * 1024 * 1024,
},
},
) !void {
const size_max = switch (operation) {
.replay => |options| options.size,
.search => |options| options.size_max,
};
const buffer = try gpa.alloc(u8, size_max);
defer gpa.free(buffer);
var driver: Driver = .{
.io = io,
.buffer = buffer,
.sut = sut,
};
switch (operation) {
.replay => |options| {
const outcome = try driver.run_once(.{
.size = options.size,
.seed = options.seed,
.quiet = false,
});
log.info("{t}", .{outcome});
},
.search => |options| {
const outcome = try driver.search(.{
.attempts = options.attempts,
});
switch (outcome) {
.pass => log.info("ok", .{}),
.fail => |fail| {
log.err("minimized size={} seed={}", .{
fail.size, fail.seed,
});
},
}
},
}
}在终端中运行搜索例程如下所示:
这些最终的 seed&size 可以用于 .replay,从而为你提供可用于调试的最小可复现失败!
当然,如果没有可视化我们这样找到的具体 bug,这看起来并不太激动人心,但问题在于,用这种方法测试的系统有趣示例通常需要实现超过 256 行代码。所以我就不具体举例了,但你应该明白:如果你能让系统在“随机”输入下失败,你也可以系统地搜索所有输入空间,找出最小的反例,而无需向搜索器添加关于系统的知识。本文还提供了一个具体的(但略显冗长的)示例。
这是完整代码:
https://gist.github.com/matklad/343d13547c8bfe9af310e2ca2fbfe109
需要完整排版与评论请前往来源站点阅读。