什么是mpt树MerklePatriciaTree(简称MPT树,实际上是一种trie前缀树)是以太坊中的一种加密认证的数据结构,可以用来存储所有的(key,value)对https://learnblockchain.cn/article/319(七哥的文章,详细讲解了mpt树)
<!--StartFragment-->
Merkle Patricia Tree(简称MPT树,实际上是一种trie前缀树)是以太坊中的一种加密认证的数据结构,可以用来存储所有的(key,value)对 https://learnblockchain.cn/article/319 (七哥的文章,详细讲解了mpt树) <!--EndFragment-->
实现我们的简易mpt树需要定义4个结构体,1个结构体切片,还需要定义一个全局变量
var EmptyHash = hash.BigToHash(big.NewInt(0))
它被用来作为初始根节点的哈希值(当树为空时)
ITrie interface {
Store(key []byte, account types.Account) error
Root() hash.Hash
Load(key []byte) (types.Account, error)
}
store:用于在 Trie 中存储一个键值对。它接收一个字节切片 key
作为查找的键,以及一个 account
(类型为 types.Account
)作为与该键关联的值 。 如果存储操作成功,则返回 nil
错误;如果有任何错误发生(比如存储失败),则返回相应的错误信息。
Root:返回当前树的根哈希
Load:根据提供的key关键字,查询账户
Child struct {
Path string
Hash hash.Hash
}
Path:根节点到该节点的路径 Hash :存储自己的哈希值
Children []Child
TrieNode struct {
Path string
Leaf bool
ValueHash hash.Hash
Children Children
}
Path:根节点到该节点的路径 Leaf:是否是叶子节点 ValueHash :当前节点的哈希值 Children :孩子
State struct {
db kvstore.KVDatabase
root *TrieNode
}
db:键值对数据库的接口或抽象,用于持久化存储Trie节点数据 root :当前树的根节点,mpt树的状态管理
func NewChild(path string, hash hash.Hash) Child {
return Child{
Path: path,
Hash: hash,
}
}
直接return一个新建的孩子节点
func (children Children) Len() int {
return len(children)
}
func (children Children) Less(i, j int) bool {
return strings.Compare(children[i].Path, children[j].Path) < 0
}
func (children Children) Swap(i, j int) {
children[i], children[j] = children[j], children[i]
}
func (node *TrieNode) Sort() {
sort.Sort(node.Children)
}
这四个函数用于排序 sort函数:是对Children的切片按照一定的顺序排序,通过实现标准库中的sort.Interface接口中的三个方法(len,less,swap),实现了Children切片的排序 len函数:返回切片的长度,这是排序的基础信息 less函数:通过比较children.path实现升序排序 swap函数:交换
func NewTrienode() *TrieNode {
return &TrieNode{
Path: "",
}
}
path设置为""表示从根节点开始
func TrieNodeFromBytes(data []byte) (*TrieNode, error) {
var node TrieNode
err := rlp.DecodeBytes(data, &node)
return &node, err
}
从给定的字节切片 data解码出一个 TrieNode 结构体的实例
func NewState(db kvstore.KVDatabase, root hash.Hash) *State {
if bytes.Equal(EmptyHash[:], root[:]) {
return &State{
db: db,
root: NewTrienode(),
}
} else {
value, err := db.Get(root[:])
if err != nil {
panic(err)
}
node, err := TrieNodeFromBytes(value)
if err != nil {
panic(err)
}
return &State{
db: db,
root: node,
}
}
}
这个函数具体的执行逻辑如下
如果传入的root与表示为空的EmptyHash相等,表明这是一个新的,无数据的状态树,新建一个根节点并返回(从空节点开始新建一个状态树)
如果不等,表明不需要新建,已经存在,需要更新
1.根据根哈希在数据库中查找,err表示是否发生错误
2 发生错误,直接终止程序
3 更新State
func (node *TrieNode) Bytes() []byte {
data, _ := rlp.EncodeToBytes(node)
return data
}
func (node *TrieNode) Hash() hash.Hash {
data := node.Bytes()
return sha3.Keccak256(data)
}
func (state *State) Root() hash.Hash {
return state.root.Hash()
}
func prefixLength(s1, s2 string) int {
length := len(s1)
if length > len(s2) {
return len(s2)
}
for i := 0; i < length; i++ {
if s1[i] != s2[i] {
return i
}
}
return length
}
将 Trienode结构体实例序列化为 RLP 编码的字节切片
计算并返回 TrieNode 实例的 Keccak-256 哈希值
获取当前状态树的根哈希
这个辅助函数计算两个字符串 s1
和 s2
的公共前缀长度。它首先比较两个字符串的长度,取较短的那个长度作为可能的最大前缀长度。然后,通过循环逐字符对比,直到找到第一个不匹配的字符或者遍历完较短字符串的长度。这个函数在处理路径或键值比较时非常有用,特别是在Trie结构中,可以用来快速识别共享路径部分,加速查找或比较过程。(找到共同前缀的长度)
func (state *State) FindAncestors(path string) ([]string, []hash.Hash) { //查找
current := state.root
paths, hashs := make([]string, 0), make([]hash.Hash, 0)
paths = append(paths, "")
hashs = append(hashs, state.Root())
prefix := ""
for {
flag := false
for _, child := range current.Children {
tmp := prefix + child.Path
length := prefixLength(path, tmp)
if length == len(tmp) {
prefix = prefix + child.Path
paths = append(paths, child.Path)
hashs = append(hashs, child.Hash)
flag = true
data, _ := state.db.Get(child.Hash[:])
current, _ = TrieNodeFromBytes(data)
break
} else if length > len(prefix) { //分叉
l := length - len(prefix)
str := child.Path[:l]
paths = append(paths, str)
hashs = append(hashs, child.Hash)
return paths, hashs
}
}
if !flag {
break
}
}
return paths, hashs
}
这个函数是用于查找的,查找在存储,查找中很重要
当前节点执行状态树根
初始化2个变量 paths,hahs
让paths,hashs从根节点开始
定义一个从根节点开始开始的前缀
for循环查找
flag用于决定是否继续查找
构建一个变量tmp,表示从根节点到当前节点的路径
构建一个变量length,表示需要查找的路径与当前节点共同的长度
如果当前路径与tmp匹配{
更新当前路径前缀 prefix,使其包含刚刚匹配的 chiid.path ,以便不断查找
更新paths,以便paths收集经过的路径
更新hashs,以便记录经历过的节点哈希
更改flag,以便决定是否继续查找
根据hashs去数据库里面查找
更新当前节点,以便于继续查找
}如果不匹配且公共前缀大于当前的路径前缀--分叉{
计算分叉点的长度差
从当前子节点的路径 child.Path
中截取与已知前缀匹配的部分
更新分叉点路径
更新分叉点哈希
返回分叉点的信息
}
如果以上都没有,结束循环
返回最后查找到的paths,hashs(没分叉就插入)
func (state *State) LoadTeieNodeFromHash(hash hash.Hash) (*TrieNode, error) {
data, err := state.db.Get(hash[:])
if err != nil {
return nil, err
}
return TrieNodeFromBytes(data)
}
从数据库中根据给定的哈希值 hash 加载一个 Trie节点
返回一个解码之后的节点
func (state *State) Save(node TrieNode) {
h := node.Hash()
state.db.Put(h[:], node.Bytes())
}
将指定的node存储到数据库中
func (state *State) Load(key []byte) (types.Account, error) {
path := hexutil.Encode(key)
path = path[2:]
paths, hashs := state.FindAncestors(path)
matched := strings.Join(paths, "")
var account types.Account
if strings.EqualFold(path, matched) {
lastHash := hashs[len(hashs)-1]
leafNode, err := state.LoadTeieNodeFromHash(lastHash)
if err != nil {
return account, err
}
if !leafNode.Leaf {
return account, errors.New("not found")
}
data, err := state.db.Get(leafNode.ValueHash[:])
_ = rlp.DecodeBytes(data, &account)
return account, err
} else {
return account, errors.New("哦,没找到,玩球,代码出错了")
}
}
给定一个key,查找对应的账户信息
将对应的Key转换成十六进制编码
去掉前缀
调用查找函数,获取指定key的路径和哈希
将找到的路径片段 (paths
) 连接起来,得到 matched
,并与原始的 path
比较。如果两者完全匹配,表示路径正确无误,可以继续查找账户;如果不匹配,说明路径有问题或账户不存在,返回错误信息。
查找到的一定要是叶子节点,如果不是说明有问题
如果路径匹配,取出最后一个哈希值 lasthah,使用 LoadTeieNodeFromHash 加载对应的叶子节点。如果加载失败或该节点不是叶子节点返回错误。
如果叶子节点有效,使用叶子节点的 valuehash 从数据库中获取账户的具体数据,然后通过 rlp的哈桑解码这些数据到 accuount 结构体中。最后,返回账户和可能发生的错误。
如果不匹配,gg
func (state *State) Update(node *TrieNode, hash []hash.Hash) {
childHash := node.Hash()
childPath := node.Path
depth := len(hash)
if depth == 1 {
state.root = node
}
for i := depth - 2; i >= 0; i-- {
current, _ := state.LoadTeieNodeFromHash(hash[i]) //
for key := range current.Children {
if strings.Contains(current.Children[key].Path, childPath) {
current.Children[key].Hash = childHash
current.Children[key].Path = childPath
state.Save(*current)
childHash = current.Hash()
childPath = current.Path
break
}
}
if i == 0 {
state.root = current
}
}
}
更新trie,自下而上更新
首先计算当前节点的hash,path
计算深度,确定遍历深度,如果深度为1,更新根节点
从 倒数第二层 开始倒序遍历(不包括叶子节点,因为那是要更新的目标),遍历到根节点(索引为 0){
根据hash[i]找到每层的节点
遍历当前层的子节点{
如果当前字节点的路径包含目标路径(
更新该子节点的哈希值和路径为新值,并保存更新后的子节点到数据库然后更新 childhash 和 childPath 为当前已更新节点的哈希值和路径,为下次的迭代做准备
结束当前循环
}
{
遍历到根节点
直接更新为当前节点
}
}
}
func (state *State) Store(key []byte, account types.Account) error {
value := account.Bytes()
valueHash := sha3.Keccak256(value)
state.db.Put(valueHash[:], value)
path := hexutil.Encode(key)
path = path[2:]
paths, hashs := state.FindAncestors(path)
prefix := strings.Join(paths, "")
depth := len(hashs)
node, _ := state.LoadTeieNodeFromHash(hashs[depth-1])
if strings.EqualFold(prefix, path) {
node.ValueHash = valueHash
state.Save(*node)
state.Update(node, hashs)
} else {
if strings.EqualFold(node.Path, paths[depth-1]) {
prefix := strings.Join(paths, "")
leafPath := path[len(prefix):]
leafNode := NewTrienode()
leafNode.Leaf = true
leafNode.Path = leafPath
leafNode.ValueHash = valueHash
state.Save(*leafNode)
leafHash := leafNode.Hash()
node.Children = append(node.Children, NewChild(leafPath, leafHash))
node.Sort()
state.Save(*node)
state.Update(node, hashs)
} else {
//第一个孩子
lastMatched := paths[len(paths)-1]
node.Path = node.Path[len(lastMatched):]
state.Save(*node)
prefix := strings.Join(paths, "")
leafPath := path[len(prefix):]
//第二个孩子
leafNode := NewTrienode()
leafNode.Leaf = true
leafNode.Path = leafPath
leafNode.ValueHash = valueHash
state.Save(*leafNode)
//孩子的父亲
newNode := NewTrienode()
newNode.Path = lastMatched
newNode.Children = make(Children, 0)
newNode.Children = append(newNode.Children, NewChild(node.Path, node.Hash()), NewChild(leafNode.Path, leafNode.Hash()))
newNode.Sort()
state.Save(*newNode)
state.Update(newNode, hashs)
}
}
return nil
}
将给定的 key 和对应的账户信息 account 存储到 Trie 结构中
将 account 序列化为字节切片 value,然后计算 value 的 Keccak-256 哈希值 valuehash。
将 valuehash作为键,value 作为值存入数据库
key 进行十六进制编码,去除前缀 0x,得到 path。
使用 查找 函数查找 path 上的所有祖先路径片段及对应哈希值。
prefix表示从根节点到当前节点的路劲
加载最后一个祖先节点(根据 hashs[depth-1]),准备进行更新操作
如果根节点与目标节点没有中间节点直接更新
其他情况
1 :当前遍历到的节点(node)的路径等于 paths[depth-1] 时的情形。这意味着我们已经到达了需要插入新数据的节点的父节点,但目标路径比当前路径稍长一点
计算完整的前缀路径 prefix 为所有祖先路径的组合。
从目标路径 path 中提取出超出 prefix部分的剩余部分作为 leafPath,这将是新叶子节点的路径。
创建一个新的 leafNode,设置其为叶子节点(Leaf = true),并赋予其 Path 为 leafPath 和 ValueHash 为之前计算的账户值的哈希值。
保存 leafNode到数据库。
计算 leafNode 的哈希值 leafHash。
向当前节点 弄得的子节点列表 Children 中添加一个新的子节点,路径为 leafPath,哈希值为 leafHash。
对子节点列表进行排序,保持有序
保存更新后的 node 到数据库。
调用 Update(node, hashs)函数,递归更新祖先节点直到根节点,以反映当前的结构变化。
2:分叉
识别最后匹配的路径
修改当前节点的path(去掉了匹配的部分)
存储当前节点
获取当前节点到根节点的路径
获取叶子的path
创建新的叶子节点,设置为叶子节点,并赋予新的路径和hash后报错到数据库
新建父亲节点,更新父亲节点路径,初始化孩子并添加孩子后给字节点排序。
保存,更新
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!