云原生 Kafka:提高链上数据同步的可靠和一致性

  • Chainbase
  • 更新于 2023-09-11 17:38
  • 阅读 1865

本文将和大家一起讨论链上数据同步解决方案 - Kafka 的实际使用和优势,Kafka 能够根据区块链上数据的位移来跟踪消费状态和保证消息的顺序,同时使用分布式日志存储来持久化消息。

作者:ddl

背景


在现有区块链世界,有很多业务场景需要实时获取链上和链下的数据,进一步加工数据价值。譬如基于数据筛选的 SmartMoney 交易监控,基于数据统计的实时TVL交易决策等等。这些场景都需要丰富的流式数据集的支持,Chainbase 开放性地提供基于「数据集」的实时 Sync 模块解决此类问题

挑战


当前市面上大多数实时消息流推送,基本上都是基于 WebSocket 和 Webhook 模式,两者都无法保证 exact once 语意,即保证消息刚好被消费一次。

WebSocket 可以实现实时的通信,但由于网络的不稳定性和消息传输的复杂性,可能会导致消息的乱序、丢失或重复。

并且 HTTP 请求是无状态的,无法保证请求的可靠性和顺序性。如果网络传输中发生错误,或者服务端没有正确处理重试和幂等性,就有可能导致重复的请求或丢失的请求,从而无法保证「exact once」的语义。

而 Kafka 的优势在于它支持消息的可重放功能。Kafka 作为消息队列,它可以持久化消息至磁盘。所以在保证「at least once」语意的同时,它也支持消息的重播功能,具有「效率一次」和「待机重放」的特性。

这对于需要建立在数据不丢失基础上的流式应用尤为重要。只有具备消息重放能力的系统,才能确保数据处理的可靠性和一致性,满足「exact once」语意。比如在实时计算、日志收集和分析等应用场景,如果遇到错误,可以从头重新计算。这让Kafka可以更好地支持复杂流式数据处理需求。

思路

基本介绍


Kafka 是一种分布式流处理平台,用于高容量、低延迟的数据传输和持久化存储。它使用发布订阅的消息队列模型,基于分布式日志存储的方式来处理消息。主题是消息发布的目的地,可以被分为多个分区,每个分区在不同的存储节点上分布着。生产者负责将消息发布到指定主题中,消费者根据订阅的主题和分区从 Kafka 获取消息。Kafka 使用位移来跟踪消费状态和保证消息的顺序,同时使用分布式日志存储来持久化消息。

1-Introducing Chainbase Kafka.png

在我们的托管 Kafka 服务中,为了满足区块链场景中的数据消费端到端有序性,我们限制了 topic 为单分区。

区块链数据 ETL 到 Kafka


2-Introducing Chainbase Kafka.png

caption

当前 Chainbase 支持多链 (ethereum, bsc, polygon..) 以及多种类型数据 (log, transactions, transfer…) Topic 进行消费;

  • RawData

    我们自建了多链的RPC节点,提供稳定可靠的RawData数据,字段格式和主流ETL工具保持一致,具体schema可以参考[2]。区块链数据公开透明,具有很好的可组合性。我们提供blocks, logs , transactions, traces, contracts等原始数据类型,用户可以结合自己业务场景自由组合这些RawData。下面给出了一条transaction message示例。

    {
    "type": "transaction",
    "hash": "0xd310285f40c898ab6da873f4a4b769fb838cc618be495cf880c3a9e3f2e6dea4",
    "nonce": 24719,
    "transaction_index": 1,
    "from_address": "0x759ec1b3326de6fd4ba316f65a6f689c4e4c3092",
    "to_address": "0x759ec1b3326de6fd4ba316f65a6f689c4e4c3092",
    "value": 100000,
    "gas": 31000,
    "gas_price": 449542706643,
    "input": "0x",
    "block_timestamp": 1689253991,
    "block_number": 17684799,
    "block_hash": "0x2b2e1e5cfce445a1ff5227eaff0ec8d6c335cf6b7e00e0bc05abc468bcdc9b89",
    "max_fee_per_gas": 526493200000,
    "max_priority_fee_per_gas": 426493200000,
    "transaction_type": 2,
    "receipt_cumulative_gas_used": 141613,
    "receipt_gas_used": 21000,
    "receipt_contract_address": null,
    "receipt_root": null,
    "receipt_status": 1,
    "receipt_effective_gas_price": 449542706643,
    "item_id": "transaction_0xd310285f40c898ab6da873f4a4b769fb838cc618be495cf880c3a9e3f2e6dea4",
    "item_timestamp": "2023-07-13T13:13:11Z"
    }
  • 加工数据

    同时为了满足更多链上交易监控的需求,我们对 Transfer 等数据进行了富化(Enrich)聚合,更加方便直观的洞察链上交易行为。后续如果有反馈,我们还将提供更丰富的聚合数据类型,帮助客户更快、更准确地获取实时链上数据。

    {
        "value": "41063984478246667122325862",
        "block_number": 17862713,
        "block_timestamp": 1691407127,
        "block_hash": "0x5a134e7a3da9c51581f15f962f2beded4f87449dc708bd79df2ef148a4a219fc",
        "transaction_index": 59,
        "transaction_hash": "0x4ff499e45269101ef2444975013c78abafd612dfef7f603e62dca884d4f79158",
        "contract_address": "0x6d23e40e776c5d3505f0a8e2b434425121d9818e",
        "log_index": 159,
        "from_address": "0xb6b6768a21cc3354dbc413739432c8d8213c1628",
        "to_address": "0xe38b9f6d7a58aca5531e383d4357825a075c65ec",
        "token_metas": {
            "symbol": "DCI",
            "name": "DeltaCore Integrations",
            "decimals": 18
        }
    }
    

kafka 运维


Chainbase 作为开放的 Web3 数据基础设施,具有多年海量数据运维经验,使用我们的托管 kafka 服务,用户可以做到真正的开箱即用,完善的ACL权限管理,丰富的指标监控,节省大量时间!具体来说有下面的优势:

  • 简化管理:托管 Kafka 解放了客户的管理负担。托管提供了自动化的集群配置、部署、监控和维护,减轻了运维团队的工作量。
  • 高可靠性:托管 Kafka 提供高「可用性和冗余」机制,确保消息的持久性和可靠性。具备备份和故障恢复策略,可以有效地应对硬件故障或其他意外情况。
  • 弹性伸缩:托管 Kafka 会根据需求动态调整「容量和吞吐量」,从而提供弹性和可伸缩性。通过简单的控制界面或API,可以根据负载的变化来增加或减少Kafka集群的规模。
  • 安全加固:托管 Kafka 提供数据加密、身份验证和访问控制等安全特性。这确保了数据的保密性和完整性,帮助组织符合合规性要求。

收益


  1. 数据端到端一致

    基于 WebSocket 或 Webhook 等网络协议请求,无法保证「端到端」的数据一致性。比如,客户端发送数据包到服务器端,数据包在网络中传输,经过多个路由器和网络设备。在某个路由器或网络设备上,由于网络拥塞或故障,数据包丢失。丢失的数据包无法到达服务器端,导致数据丢失。服务器端可能会在特定时间间隔内等待数据包的到达,如果超过等待时间仍未接收到数据包,则数据丢失。而基于 Kafka 则为我们服务端提供了「端到端」数据一致性的可能性

  2. 生态丰富

    在开源和商业的ETL(Extract, Transform, Load)工具中集成 Kafka 是非常常见的做法。Kafka作为一种高性能、可扩展的分布式消息队列系统,提供了可靠的数据传输和持久化存储。将Kafka与ETL集成可以带来多个好处。

    首先,Kafka 作为一个可靠的数据管道,可以作为 ETL 过程中数据的源头或目的地,通过Kafka的消息队列模型可以确保数据的顺序性和可靠性。其次,Kafka的高吞吐量和低延迟特性使得ETL能够更快速地处理和传输大量数据。

    此外,Kafka 可以与其他组件和工具集成,例如流处理引擎(如 Apache Flink、Apache Spark Streaming),开源ETL工具 airbyte、流数据库 RisingWave 等都支持 source or sink 到 kafka。通过集成Kafka,ETL工具可以实现实时数据处理、数据流转和数据交换等功能,帮助组织更好地管理和分析海量数据,并支持实时决策和洞察力。

  3. 开放的数据集

    开放数据集与 Kafka 的实时订阅和同步结合使用,为区块链数据实时分析系统提供了多样性的数据源、丰富的分析模型和场景,以及精确的数据验证,同时促进数据共享和开放。这种结合为我们带来更全面、准确的区块链数据视角,推动技术进步并加速问题解决。

参考


[1] A Detailed Introduction to Kafka Components – Bhanu's Blog

[2] https://ethereum-etl.readthedocs.io/en/latest/schema/

原文链接:The Cloud-Native Kafka: Boosting Data Synchronization Reliability and Consistency

点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
Chainbase
Chainbase
0x7C46...d02D
Chainbase 是领先的 Web3 数据基础设施,帮助开发者轻松访问加密数据,并支持对数据的大规模索引、转换和使用。