本文将和大家一起讨论链上数据同步解决方案 - Kafka 的实际使用和优势,Kafka 能够根据区块链上数据的位移来跟踪消费状态和保证消息的顺序,同时使用分布式日志存储来持久化消息。
作者:ddl
在现有区块链世界,有很多业务场景需要实时获取链上和链下的数据,进一步加工数据价值。譬如基于数据筛选的 SmartMoney 交易监控,基于数据统计的实时TVL交易决策等等。这些场景都需要丰富的流式数据集的支持,Chainbase 开放性地提供基于「数据集」的实时 Sync 模块解决此类问题
当前市面上大多数实时消息流推送,基本上都是基于 WebSocket 和 Webhook 模式,两者都无法保证 exact once 语意,即保证消息刚好被消费一次。
WebSocket 可以实现实时的通信,但由于网络的不稳定性和消息传输的复杂性,可能会导致消息的乱序、丢失或重复。
并且 HTTP 请求是无状态的,无法保证请求的可靠性和顺序性。如果网络传输中发生错误,或者服务端没有正确处理重试和幂等性,就有可能导致重复的请求或丢失的请求,从而无法保证「exact once」的语义。
而 Kafka 的优势在于它支持消息的可重放功能。Kafka 作为消息队列,它可以持久化消息至磁盘。所以在保证「at least once」语意的同时,它也支持消息的重播功能,具有「效率一次」和「待机重放」的特性。
这对于需要建立在数据不丢失基础上的流式应用尤为重要。只有具备消息重放能力的系统,才能确保数据处理的可靠性和一致性,满足「exact once」语意。比如在实时计算、日志收集和分析等应用场景,如果遇到错误,可以从头重新计算。这让Kafka可以更好地支持复杂流式数据处理需求。
Kafka 是一种分布式流处理平台,用于高容量、低延迟的数据传输和持久化存储。它使用发布订阅的消息队列模型,基于分布式日志存储的方式来处理消息。主题是消息发布的目的地,可以被分为多个分区,每个分区在不同的存储节点上分布着。生产者负责将消息发布到指定主题中,消费者根据订阅的主题和分区从 Kafka 获取消息。Kafka 使用位移来跟踪消费状态和保证消息的顺序,同时使用分布式日志存储来持久化消息。
在我们的托管 Kafka 服务中,为了满足区块链场景中的数据消费端到端有序性,我们限制了 topic 为单分区。
当前 Chainbase 支持多链 (ethereum, bsc, polygon..) 以及多种类型数据 (log, transactions, transfer…) Topic 进行消费;
RawData
我们自建了多链的RPC节点,提供稳定可靠的RawData数据,字段格式和主流ETL工具保持一致,具体schema可以参考[2]。区块链数据公开透明,具有很好的可组合性。我们提供blocks, logs , transactions, traces, contracts等原始数据类型,用户可以结合自己业务场景自由组合这些RawData。下面给出了一条transaction message示例。
{
"type": "transaction",
"hash": "0xd310285f40c898ab6da873f4a4b769fb838cc618be495cf880c3a9e3f2e6dea4",
"nonce": 24719,
"transaction_index": 1,
"from_address": "0x759ec1b3326de6fd4ba316f65a6f689c4e4c3092",
"to_address": "0x759ec1b3326de6fd4ba316f65a6f689c4e4c3092",
"value": 100000,
"gas": 31000,
"gas_price": 449542706643,
"input": "0x",
"block_timestamp": 1689253991,
"block_number": 17684799,
"block_hash": "0x2b2e1e5cfce445a1ff5227eaff0ec8d6c335cf6b7e00e0bc05abc468bcdc9b89",
"max_fee_per_gas": 526493200000,
"max_priority_fee_per_gas": 426493200000,
"transaction_type": 2,
"receipt_cumulative_gas_used": 141613,
"receipt_gas_used": 21000,
"receipt_contract_address": null,
"receipt_root": null,
"receipt_status": 1,
"receipt_effective_gas_price": 449542706643,
"item_id": "transaction_0xd310285f40c898ab6da873f4a4b769fb838cc618be495cf880c3a9e3f2e6dea4",
"item_timestamp": "2023-07-13T13:13:11Z"
}
加工数据
同时为了满足更多链上交易监控的需求,我们对 Transfer 等数据进行了富化(Enrich)聚合,更加方便直观的洞察链上交易行为。后续如果有反馈,我们还将提供更丰富的聚合数据类型,帮助客户更快、更准确地获取实时链上数据。
{
"value": "41063984478246667122325862",
"block_number": 17862713,
"block_timestamp": 1691407127,
"block_hash": "0x5a134e7a3da9c51581f15f962f2beded4f87449dc708bd79df2ef148a4a219fc",
"transaction_index": 59,
"transaction_hash": "0x4ff499e45269101ef2444975013c78abafd612dfef7f603e62dca884d4f79158",
"contract_address": "0x6d23e40e776c5d3505f0a8e2b434425121d9818e",
"log_index": 159,
"from_address": "0xb6b6768a21cc3354dbc413739432c8d8213c1628",
"to_address": "0xe38b9f6d7a58aca5531e383d4357825a075c65ec",
"token_metas": {
"symbol": "DCI",
"name": "DeltaCore Integrations",
"decimals": 18
}
}
Chainbase 作为开放的 Web3 数据基础设施,具有多年海量数据运维经验,使用我们的托管 kafka 服务,用户可以做到真正的开箱即用,完善的ACL权限管理,丰富的指标监控,节省大量时间!具体来说有下面的优势:
数据端到端一致
基于 WebSocket 或 Webhook 等网络协议请求,无法保证「端到端」的数据一致性。比如,客户端发送数据包到服务器端,数据包在网络中传输,经过多个路由器和网络设备。在某个路由器或网络设备上,由于网络拥塞或故障,数据包丢失。丢失的数据包无法到达服务器端,导致数据丢失。服务器端可能会在特定时间间隔内等待数据包的到达,如果超过等待时间仍未接收到数据包,则数据丢失。而基于 Kafka 则为我们服务端提供了「端到端」数据一致性的可能性
生态丰富
在开源和商业的ETL(Extract, Transform, Load)工具中集成 Kafka 是非常常见的做法。Kafka作为一种高性能、可扩展的分布式消息队列系统,提供了可靠的数据传输和持久化存储。将Kafka与ETL集成可以带来多个好处。
首先,Kafka 作为一个可靠的数据管道,可以作为 ETL 过程中数据的源头或目的地,通过Kafka的消息队列模型可以确保数据的顺序性和可靠性。其次,Kafka的高吞吐量和低延迟特性使得ETL能够更快速地处理和传输大量数据。
此外,Kafka 可以与其他组件和工具集成,例如流处理引擎(如 Apache Flink、Apache Spark Streaming),开源ETL工具 airbyte、流数据库 RisingWave 等都支持 source or sink 到 kafka。通过集成Kafka,ETL工具可以实现实时数据处理、数据流转和数据交换等功能,帮助组织更好地管理和分析海量数据,并支持实时决策和洞察力。
开放的数据集
开放数据集与 Kafka 的实时订阅和同步结合使用,为区块链数据实时分析系统提供了多样性的数据源、丰富的分析模型和场景,以及精确的数据验证,同时促进数据共享和开放。这种结合为我们带来更全面、准确的区块链数据视角,推动技术进步并加速问题解决。
[1] A Detailed Introduction to Kafka Components – Bhanu's Blog
[2] https://ethereum-etl.readthedocs.io/en/latest/schema/
原文链接:The Cloud-Native Kafka: Boosting Data Synchronization Reliability and Consistency
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!