Farcaster 是 web3 版的 twitter, 本文探究 Farcaster Hub 实现技术探究
【注:由于 farcaster 的代码库非常活跃,若本文内容和你看到的代码之间有出入,请以代码为准。】
近期对 web3 当红项目 farcaster 代码进行了粗略的研究,重点关注了其 decentralized node 实现 hub。并且基于学到的知识以 quick and dirty 的方式实现了一个简单的原型,本文则是这段时间内学习成果的汇总。
之所以研究它,原因无它:正在构思一个 decentralized node 实现,而 farcaster 的技术栈与我们团队高度吻合。
如果作为 web3 从业者还不清楚 farcaster 为何物,那么建议充分利用搜索引擎或直接问近期崛起的 AI 服务。要是用一句话介绍的话: farcaster 就是 web3 版的 twitter 。
它的整体架构如下图:
[来源:https://docs.farcaster.xyz/assets/architecture.T7tCPEnC.png]
由上图可知,hub 组成了其去中心化的网络,它也正是本文的重点。至于其他内容,在此略过不谈。
Hub 的架构如下:
[来源:https://docs.farcaster.xyz/assets/hub.PU02ORHT.png]
作为一个去中心化节点,hub 的职责:
保存 offchain 数据
在这里就是用户发的那些 cast、comment、mention 等等,简单来说就是消息以及相关的数据。
处理节点间通信请求
作为一个建立在 p2p 协议之上的应用,这一点不难理解。
同步数据
如 blockchain 节点一样,每个节点都保存一份完整的数据副本。但由于(来自于同个用户的)请求可能会被发送到整个网络的不同节点,那么数据同步和解决冲突自然必不可少。
验证其他节点的有效性
即网络中的节点是否有篡改数据的行为。
api server
支持 https 和 grpc
最后请注意:hub node 并非一个 blockchain 节点,它只是一个 offchain 的去中心化节点。
hub 是一个多线程 node 应用,充分利用了 worker 技术来划分节点的不同职责:p2p node、db、merkel 同步 ……
同时还利用 ReadWriteLock 来保证线程安全。
不过,farcaster 使用的 node 自带的 worker 库,代码不免有些繁琐。在此,我推荐使用 threads.js ,它可极大简化代码编写,其 github readme 中的例子充分体现了这一点。其典型特性:
主线程触发子线程的执行如普通函数调用一般简单。
子线程使用 parentPort
与父线程通信。
封装了父子线程之间的事件:termination
、 message
……
支持线程池
……
熟悉 node worker 编程的小伙伴可能会说还有:piscina 。阔是经过人肉测试之后,个人更喜欢 threads.js。
farcaster 采用 rocksdb 来作为其节点数据库,这也是很多 blockchain node 的选择,因为其读写速度飞快。
阔是,rocksdb 在 node 生态实在更新太慢,各位可自行去搜搜它的最近一次发布是什么时候。更要命的是,其版本混乱,甚至于 rocksdb npm 的 github repo 都是在其老祖宗 leveldb github repo 之下 😓。
farcaster 后来 fork 了这个库,可是并未添加什么实质性更新,仅仅做了一些关于平台编译的微调,列位看官可点击此处自行查看。
甚至于 eth 的 node 实现 lodestar 也用的是 leveldb,而不是 rocksdb。
估计 farcaster 团队也被此弄烦了,再加上内存和性能的问题,在近期代码中,rocksdb 相关代码已经采用组合技术实现:node rust bridge + rust rocksdb cargo。在完成迁移之后,还在 twitter 上欣喜的说:内存少了,性能高了!
关于 node 和 rust addon 的集成,由于时间和精力关系,我没有深入研究。目前初步看来至少有两种流行方案:
neon,farcaster 的选择
napi-rs,更简单
hub 采用 grpc 进行两方面的通信:
client 和 hub
p2p peers
并通过 grpc-web
来提供 http 接口。
hub 使用 ts-proto 来完成 protobuf 的代码生成,直接使用 grpc
和 grpc-web
来用于 server 端实现,至于 http server 则是用 fastify 来完成。
跟直接使用 worker 库一样,这种方式导致代码繁琐,建议采用 connectrpc 快速实现。
libp2p 是去中心化节点实现的基础,不少 blockchain 实现也是基于它来完成,比如 eth 的 node 实现 lodestar。
但它本身是一组大的协议栈,若对于其组成部分缺乏基本了解,则无法用好,建议先阅读协议再看配置。
hub 使用 gossipsub 实现节点间的消息传播,并利用 connectionGater
来实现对于节点连接的控制。
但可能是代码实现时间较早的缘故,根据现有协议规范,若自行实现时,可以考虑若干增强:
将 stream multiplexer 由 mplex 切换到目前推荐的 yamux
使用 Pubsubpeerdiscovery 来自动发现 peer
并且可以实现一个 cron job 来移除无效的 peer
基本代码架子如下:
createLibp2p({
peerId: await createFromPrivKey(
await unmarshalPrivateKey(Buffer.from(peerPrivateKey, 'hex'))
),
addresses: {
listen: ['/ip4/127.0.0.1/tcp/0'],
},
connectionGater: {
denyDialPeer: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyDialMultiaddr: async (multiaddr: Multiaddr) => {
return await notAllowed(multiaddr.getPeerId()!.toString());
},
denyInboundConnection: () => {
return false;
},
denyOutboundConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyInboundEncryptedConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyOutboundEncryptedConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyInboundUpgradedConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyOutboundUpgradedConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyInboundRelayReservation: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyOutboundRelayedConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyInboundRelayedConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
},
connectionManager: {
autoDialInterval: 1000,
},
transports: [tcp()],
connectionEncryption: [noise()],
streamMuxers: [yamux()],
peerDiscovery: [
mdns({
interval: 1000,
}),
pubsubPeerDiscovery(),
],
services: {
pubsub: gossipsub(),
identify: identify(),
},
})
.then(async node => {
libp2p = node;
node.addEventListener('peer:discovery', async evt => {
console.log('Discovered %s', evt.detail.id.toString());
});
node.addEventListener('peer:connect', async evt => {
console.log('Connected to %s', evt.detail.toString());
checkPeersPermissionPeriodically();
});
node.services.pubsub.addEventListener('message', message => {
if (message.detail.topic === messageTopic) {
parentPort?.postMessage({
topic: message.detail.topic,
message: message.detail.data.toString(),
});
}
});
node.services.pubsub.addEventListener('gossipsub:message', message => {});
node.services.pubsub.subscribe(messageTopic);
await node.start();
console.log('listening on addresses:');
node.getMultiaddrs().forEach(addr => {
console.log(addr.toString());
});
const stop = async () => {
await node.stop();
console.log('libp2p has stopped');
process.exit(0);
};
process.on('SIGTERM', stop);
process.on('SIGINT', stop);
})
.catch((e: any) => console.error);
}
CRDT 全称:Conflict Free Replicated Data Types,广泛用于主主复制、文档协同编辑等场景。其特点是:最终一致性,此术语各位应该不会太陌生。
hub 采用 CRDT 来定义其 storage,采用的是 2p-set,看起来吓人其实就是:
add 和 remove 各自记录
merge 负责处理冲突
在研究代码的过程中,发现一个非常不错的 CRDT 系列文章,第一篇的链接:https://www.bartoszsypytkowski.com/the-state-of-a-state-based-crdts/ ,有兴趣的可以去仔细看看。同时,也可以研究一下大名鼎鼎的yjs。
hub 使用该项技术来实现节点之间的数据同步,即在保存本地消息数据时同步生成 merkel trie,在同步周期到了之后,节点交换各自的 merkel tries,以看是否有交换消息的必要。
此处让我不解的是为何 farcaster 要自行去实现一个 merkel trie,或许在当初 node 生态中尚未如现在一样有众多的 merkel tree 的实现。
如果不考虑精细的控制和性能,一种快糙猛的 merkel 同步实现方式如下:
确定数据分块策略,注意:一定要维持一个确定的顺序。一个 block 包含多个待同步消息,在 rocksdb 这类 kv 数据库中,使用指向实际数据的 key 即可。
每个 block 对应的数据表示:(merkel proof, leaf set)
定义数据同步消息格式和协议
在同步周期到时,节点随机选择一个节点获取其 merkel trie 数据,即 block 集合。
对于每个数据,比较 merkel proof,若相同,则略过。
若不同,则 diff 出缺失的 leaf,由于 leaf 本身就是指向消息的 key,因此直接由 key 即可获得对应缺失的消息。
同步更新完消息时,也同步更新 block 数据。
至于 merkel proof 的生成,采用成熟的 npm 即可,如 merkeltree.js。此外,openzepplin 也提供了一个实现,而且提供了安全性增强,各位可根据自己的喜好自行选择。
如众多开源项目一样,farcaster 采用的是 monorepo 实践,基于 turbo。并且广泛的采用了基于 rust 的 js/ts 工具链:
tsup,打包
swc,编译
biome,格式化和 lint
关于 tooling,请参见我们近期的研究。
如果想了解 decentralized app 的编写套路,farcaster 无疑是一个非常好的参考,本文列出了其中涉及的关键技术,相信此文将会激发各位的思路以及自行探索的兴趣。
注:文中有大量链接,请点击“阅读原文”来查看链接内容。
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!