MGF 手动引导模糊测试:一份新手指南

  • Ackee
  • 发布于 7小时前
  • 阅读 51

本文介绍了手动引导模糊测试(MGF)在智能合约安全中的应用,MGF通过定义特定的测试流程和不变量,有针对性地检测漏洞,文章还对比了Wake MGF 与 Foundry 的模糊测试和不变量测试,并提供了使用Wake框架进行MGF的具体步骤和代码示例,展示了如何初始化合约、定义流程、处理 revert,定义不变量以及运行测试。

介绍

手动引导模糊测试 (MGF) 是一种测试方法,它通过有指导的测试场景系统地测试智能合约的行为,从而发现关键漏洞。

与仅依赖随机性的传统模糊测试不同,MGF 允许开发者定义特定的测试流程和不变量,从而提供更有针对性和更有效的漏洞检测。

学习 MGF 以加强你的智能合约安全性。

综合文档:Wake Testing Framework – Fuzzing

与 Foundry Fuzz 测试和不变量测试的比较

在任何智能合约测试中,最关键的要素是定义清晰的 不变量 ——无论合约状态如何变化,都应始终保持为真的属性。

Wake 手动引导模糊测试方法将预期行为(在 Python 中定义)与实际合约行为进行比较,而不依赖于合约的内部逻辑。

这种方法迫使测试人员在每个步骤验证行为,确保全面的覆盖,并捕获其他测试方法可能遗漏的边界情况。

Wake MGF 生命周期

与 Foundry 的 fuzz 测试或不变量测试相比,Wake MGF 遵循不同的执行生命周期。

Wake MGF 执行生命周期:

其中:

  • flow_count 定义了在每个 pre_sequence 函数(合约初始化)之后执行的 flow 函数调用的数量
  • sequence_count 定义了要执行的完整测试序列的数量

每个序列由一个 pre_sequence 组成,后跟指定数量的 flow 函数调用。

了解更多关于 Wake 中的 execution hooks

实现指南

前提条件

  • 你的系统已经安装 Wake 框架
  • 对 Python 和 Solidity 的基本理解
  • 已准备好用于测试的 Solidity 项目或使用 附录 中的代码。

完整源代码

完整源代码可在 附录 中获得。

1. 使用 Wake 编译项目

  1. 运行 $ wake up 以编译你的 Solidity 合约并生成 Python 类型定义
  2. pytypes 会自动生成在 pytypes 目录中
  3. 创建你的测试文件:tests/test_fuzz.py

pytypes 为你的 Solidity 合约提供 Python 接口,从而在测试期间实现类型安全的交互。

操作项:使用正确的测试文件位置设置你的项目结构。

2. 导入 Wake

导入 Wake 测试。

from wake.testing import *
from wake.testing.fuzzing import *

3. 导入 pytypes

通过查看 pytypes 目录并导入你的合约 pytypes 来导入 pytypes。

from pytypes.contracts.Token import Token

4. 定义测试类并从测试中调用

Fuzzing 基类 FuzzTestwake.testing.fuzzing 中定义。

from wake.testing import *
from wake.testing.fuzzing import *

import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

from pytypes.contracts.Token import Token

def revert_handler(e: RevertError):
    if e.tx is not None:
        print(e.tx.call_trace)

class TokenFuzz(FuzzTest):

    def pre_sequence(self):
        pass

    @flow()
    def flow_example(self):
        pass

    @invariant()
    def invariant_example(self):
        pass

@chain.connect()
@on_revert(revert_handler)
def test_default():
    TokenFuzz.run(sequences_count=1, flows_count=100)

以下部分详细介绍了如何在 TokenFuzz 类中实现逻辑。

5. 合约初始化和 Python 状态定义

pre_sequence 函数用作每个测试序列的设置阶段:

  1. 部署合约:初始化你要测试的合约
  2. 定义参与者:设置将与你的合约交互的账户
  3. 初始化 Python 状态:创建数据结构以跟踪预期的合约状态

这种分离确保每个测试序列都以干净、已知的状态开始。

class TokenFuzz(FuzzTest):

    token_owner: Account
    token: Token
    token_balances: dict[Account, int]

    def pre_sequence(self):
        self.token_owner = random_account()
        self.token = Token.deploy(from_=self.token_owner)
        self.token_balances = defaultdict(int)

6. 定义 Flows

什么是 flow 函数?

Flow 函数是 MGF 测试的核心。每个 flow 函数生成测试输入、执行合约调用、验证行为并更新 Python 状态以反映合约更改。

Flow 函数通过系统地测试不同的输入组合和执行路径来模拟真实世界的使用模式和边界情况。

这是 flow 函数。

@flow()
def flow_mint_tokens(self):
    ##1. 准备随机输入
    recipient = random_account() # 或者 random.choice(list(chain.accounts) + [self.token])
    amount = random_int(0, 10**30)
    actor = random_account()

    ##2. 运行交易
    with may_revert() as e:
        tx = self.token.mintTokens(recipient, amount, from_=actor)
    if e.value is not None:
        ## 3. 检查 revert
        if actor != self.token_owner:
            assert e.value == Token.NotAuthorized(actor.address)
            return "Not authorized"
        assert False

    ##4. 检查事件
    events = [e for e in tx.events if isinstance(e, Token.TokensMinted)]
    assert len(events) == 1
    assert events[0].to == recipient.address
    assert events[0].amount == amount

    ##5. 更新 python 状态
    self.token_balances[recipient] += amount

    ##6. 用于调试的日志记录
    logger.info(f"Minted {amount} tokens to {recipient.address}")

在每个 flow 函数中遵循这种结构化的方法:

  • 准备随机输入
  • 执行带有 revert 处理的交易
  • 验证事件和断言
  • 更新 Python 状态
  • 添加日志记录以进行调试(如果需要)
步骤 1:准备随机输入

使用 Wake 的内置随机函数(如 random_account()random_int(min, max)random_bytes(length))生成测试输入。这些函数确保跨不同输入场景的全面测试覆盖率。

完整文档:https://ackee.xyz/wake/docs/latest/testing-framework/fuzzing/#random-functions

步骤 2:执行带有 revert 处理的交易

使用 may_revert() 上下文管理器来处理成功和失败的交易。这使得可以为成功/失败情况设置分支逻辑。使用 assert False 来捕获意外的 revert 条件,并返回描述性字符串以进行预期的 revert,以便跟踪测试统计信息。

with may_revert() as e:
    tx = self.token.mintTokens(recipient, amount, from_=actor)
if e.value is not None:
    if condition:
        # assert e.value == RevertError()
        return "Reason"
    elif other_condition:
        # assert e.value == RevertOtherError()
        return "OtherReason"
    assert False
步骤 3:检查事件和断言

始终检查错误。

始终检查测试目标的事件。

可以通过以下方式检查事件和 RevertErrors:

events = [e for e in tx.events if e == Token.TokensMinted(recipient.address, amount)]
assert len(events) == 1

或者按事件筛选并断言参数

events = [e for e in tx.events if isinstance(e, Token.TokensMinted)]
assert len(events) == 1
assert events[0].to == recipient.address
assert events[0].amount == amount

建议将 isinstance() 方法用于复杂的验证场景,例如发出多个事件的交易或参数值需要复杂计算时。此方法提供精确的错误报告,准确显示哪些参数未能通过断言检查。

步骤 4:更新 Python 状态

在你的 Python 变量中镜像合约的状态更改。这种并行的状态跟踪可以实现准确的 不变量 检查。永远不要从 view 函数派生状态更新——始终根据你交易的已知结果进行更新。

不变量函数

不变量函数验证关键属性在整个合约执行过程中是否成立。它们使用 view 函数将你的 Python 状态与实际合约状态进行比较。

对于复杂的协议,不变量 可能包括复杂的逻辑,以验证多合约的交互。永远不要在 不变量 函数中修改状态。如果验证需要更改状态的操作,请使用 snapshot_and_revert() 以避免影响测试序列。

MGF 中 不变量 的定义

使用 Python 状态检查 view 函数。

检查 不变量 语句和条件 不变量

所有 @invariant() 函数在每次 @flow 函数调用后被调用。

这些函数中没有状态更改。

@invariant()
def invariant_token_balances(self):
    for account in list(self.token_balances.keys()) + [self.token]:
        assert self.token.getBalance(account.address) == self.token_balances[account]

@invariant()
def invariant_token_owner(self):
    assert self.token.owner() == self.token_owner.address

运行测试

运行测试:

$ wake test tests/test_token_fuzz.py

这是一个运行示例,具有较小的 flow_number。

视频播放器

https://ackee.xyz/blog/wp-content/uploads/2025/09/mgf_run.mp4

测试失败时使用调试模式:

$ wake test tests/test_token_fuzz.py -d

执行显示随机种子十六进制值。你可以使用此十六进制值重现相同的测试,包括失败。

设置特定的随机种子以进行可重现的测试:

$ wake test tests/test_token_fuzz.py -S 235ab3

更多模糊测试技巧和专业方法:在 X 上关注 @wakeframework

结论

手动引导模糊测试提供了一种验证合约行为的系统方法,同时提供对合约逻辑和边界情况的深刻见解。

附录 – 完整代码

token.sol

// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;

contract Token {
    address public immutable owner;
    mapping(address => uint256) public tokenBalance;

    event Transfer(address indexed from, address indexed to, uint256 value);
    event TokensMinted(address indexed to, uint256 amount);

    error NotEnoughTokens(uint256 requested, uint256 balance);
    error NotAuthorized(address caller);

    constructor() {
        owner = msg.sender;
    }

    modifier onlyOwner() {
        if (msg.sender != owner) {
            revert NotAuthorized(msg.sender);
        }
        _;
    }

    function mintTokens(address recipient, uint256 amount) external onlyOwner {
        tokenBalance[recipient] += amount;
        emit TokensMinted(recipient, amount);
    }

    function transfer(address to, uint256 amount) external {
        if (tokenBalance[msg.sender] < amount) {
            revert NotEnoughTokens(amount, tokenBalance[msg.sender]);
        }

        tokenBalance[msg.sender] -= amount;
        tokenBalance[to] += amount;

        emit Transfer(msg.sender, to, amount);
    }

    function transferWithBytes(bytes calldata data) external {
        (address to, uint256 amount) = abi.decode(data, (address, uint256));
        if (tokenBalance[msg.sender] < amount) {
            revert NotEnoughTokens(amount, tokenBalance[msg.sender]);
        }
        tokenBalance[msg.sender] -= amount;
        tokenBalance[to] += amount;
        emit Transfer(msg.sender, to, amount);
    }

    function getBalance(address account) external view returns (uint256) {
        return tokenBalance[account];
    }
}

test_token_fuzz.py

from wake.testing import *
from collections import defaultdict
from wake.testing.fuzzing import *

from pytypes.contracts.Token import Token

import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

## Print failing tx call trace
def revert_handler(e: RevertError):
    if e.tx is not None:
        print(e.tx.call_trace)
class TokenFuzz(FuzzTest):

    token_owner: Account
    token: Token
    token_balances: dict[Account, int]

    def pre_sequence(self):
        self.token_owner = random_account()
        self.token = Token.deploy(from_=self.token_owner)
        self.token_balances = defaultdict(int)

    @flow()
    def flow_mint_tokens(self):
        ## 准备随机输入
        recipient = random_account() # 或者 list(chain.accounts) + [self.token]
        amount = random_int(0, 10**30)
        actor = random_account()

        ## 运行交易
        with may_revert() as e:
            tx = self.token.mintTokens(recipient.address, amount, from_=actor)
        if e.value is not None:
            if actor != self.token_owner:
                assert e.value == Token.NotAuthorized(actor.address)
                return "Not authorized"
            assert False

        ## 检查事件
        events = [e for e in tx.events if isinstance(e, Token.TokensMinted)]
        assert len(events) == 1
        assert events[0].to == recipient.address
        assert events[0].amount == amount

        ## 更新 python 状态
        self.token_balances[recipient] += amount

        logger.info(f"Minted {amount} tokens to {recipient.address}")

    @flow()
    def flow_transfer_tokens(self):
        recipient = random_account()
        amount = random_int(0, 10**30)
        actor = random_account()
        with may_revert() as e:
            tx = self.token.transfer(recipient.address, amount, from_=actor)

        if e.value is not None:
            if self.token_balances[actor] < amount:
                assert e.value == Token.NotEnoughTokens(amount, self.token_balances[actor])
                return "Not enough tokens"
            assert False

        events = [e for e in tx.events if isinstance(e, Token.Transfer)]
        assert len(events) == 1
        assert events[0].from_ == actor.address
        assert events[0].to == recipient.address
        assert events[0].value == amount

        self.token_balances[recipient] += amount
        self.token_balances[actor] -= amount

        logger.info(f"Transferred {amount} tokens from {actor.address} to {recipient.address}")

    @flow()
    def flow_transfer_tokens_with_bytes(self):
        recipient = random_account()
        amount = random_int(0, 10**30)
        actor = random_account()
        with may_revert() as e:
            tx = self.token.transferWithBytes(abi.encode(recipient.address, uint256(amount)), from_=actor)
        if e.value is not None:
            if self.token_balances[actor] < amount:
                assert e.value == Token.NotEnoughTokens(amount, self.token_balances[actor])
                return "Not enough tokens"
            assert False

        events = [e for e in tx.events if isinstance(e, Token.Transfer)]
        assert len(events) == 1
        assert events[0].from_ == actor.address
        assert events[0].to == recipient.address
        assert events[0].value == amount

        self.token_balances[recipient] += amount
        self.token_balances[actor] -= amount

        logger.info(f"Transferred {amount} tokens from {actor.address} to {recipient.address}")

    @invariant()
    def invariant_token_balances(self):
        for account in list(self.token_balances.keys()) + [self.token]:
            assert self.token.getBalance(account.address) == self.token_balances[account]

    @invariant()
    def invariant_token_owner(self):
        assert self.token.owner() == self.token_owner.address

@chain.connect()
def test_default():
    TokenFuzz.run(sequences_count=10, flows_count=10000)
  • 原文链接: ackee.xyz/blog/a-beginne...
  • 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
Ackee
Ackee
Cybersecurity experts | We audit Ethereum and Solana | Creators of @WakeFramework , Solidity (Wake) & @TridentSolana | Educational partner of Solana Foundation