[译]使用Eventeum监听以太坊合约事件

  • Tiny熊
  • 更新于 2020-03-16 23:27
  • 阅读 10254

使用 Eventeum 可以轻松的实现在后端服务中订阅(监听)以太坊合约事件. 

但凡我们构建一个稍微上了一点规模的DApp,监听以太坊上发生的事件都是很痛苦的一件事。

今天,我们看看如何使用Eventeum来解决这个痛点. Eventeum 由 Kauri Team开发, 并且源代码开源(Apache 2.0协议许可), github地址

Eventeum是什么?

Eventeum 是一个以太坊事件监听服务(Event listener service ),它可以用来桥接智能合约与(后端服务的)中间层。Eventeum 支持动态订阅以太坊事件, 当以太坊智能合约事件被触发时,包含事件详情的消息将广播到消息总线上(当前支持 Kafka或RabbitMQ),进而被后端服务利用。下面是一个架构图:

Eventeum listen

Eventeum 主要特性包括:

  • 可动态配置 - 应用可以利用Eventeum提供的rest API动态订阅或取消订阅以太坊智能合约事件。

  • 高可用性 - Eventeum实例间会彼此通信以确保所有实例订阅相同的智能合约事件集合。

  • 弹性(容忍失效) - Eventeum会自动检测节点失效,当节点恢复工作后订阅可以从失效时的区块继续。

  • (软)分叉容错 - Eventeum可以配置事件"确认"所需的区块数。如果在此期间发生了区块链的分叉, Eventeum会广播一个消息到网络中,以便让后端服务对分叉的(或移除的)事件进行处理。

部署 Eventeum

Eventeum目前支持4种广播机制:

这篇文章,我们将使用Kafka。

部署Eventeum前,需要先安装以下依赖:

  • Java8

  • Maven

  • Mongo

  • Kafka

  • Zookeeper

  • Ethereum Node

  • Docker (可选)

可以选择使用 docker (或不用docker) 来部署Eventeum,不过我建议使用docker 进行套件安装(all-in-one installation) ,它包含了上述大部分依赖,不过仍然需要安装Java8和Maven。(我两个方式都安装过, 最简单的方式是 clone 代码,在server文件下运行docker-compose脚本. 会简单很多,它会自动安装 Kafka/Zookepper , MongoDB 以及Parity)

使用Docker安装 Eventeum

使用Docker安装 Eventeum, 系统需要先安装 Docker 和 Docker-compose, 安装之后再参考下面的安装指引.

  • Git 克隆以太坊 Eventeum 库:
$ git clone https://github.com/ConsenSys/eventeum.git
  • 克隆Eventeum代码后,进入代码跟目录:
$ cd /path/to/eventeum/
  • 编译、测试、打包Eventeum项目:
$ mvn clean package
  • docker 套件(all-in-one)安装Eventeum:
$ cd server
$ docker compose build
$ docker compose up

安装完成后,可以在Docker终端中查看日志。

注意:根据你的docker安装情况,你可能需要使用sudo

部署合约

为了测试Eventeum,我们需要先在本地Parity节点(Eventeum会自动安装) 上部署一个示例智能合约,然后订阅一个合约事件以通过Kafka广播。

在本地Parity上部署合约

让我们部署一下CrudApp.sol合约,这里使用Remix IDE 和Metamask。

CrudApp.sol合约源码如下:

pragma solidity ^0.4.23;

contract CrudApp {

   struct country{
      string name;
      string leader;
      uint256 population;
   }

   country[] public countries; 

   uint256 public totalCountries;

    constructor() public {
       totalCountries = 0;
   }

    // 定义事件
   event CountryEvent(string countryName , string leader, uint256 population);
   event LeaderUpdated(string countryName , string leader);
   event CountryDelete(string countryName);

    // 会触发CountryEvent事件
   function insert( string countryName , string leader , uint256 population) public returns (uint256 totalCountries){
        country memory newCountry = country(countryName , leader, population);
        countries.push(newCountry);
        totalCountries++;
        //emit event
        emit CountryEvent (countryName, leader, population);
        return totalCountries;
   }

   function updateLeader(string countryName, string newLeader) public returns (bool success){
       //This has a problem we need loop
       for(uint256 i =0; i< totalCountries; i++){
           if(compareStrings(countries[i].name ,countryName)){
              countries[i].leader = newLeader;
              emit LeaderUpdated(countryName, newLeader);
              return true;
           }
       }
       return false;
   }

   function deleteCountry(string countryName) public returns(bool success){
        require(totalCountries > 0);
        for(uint256 i =0; i< totalCountries; i++){
           if(compareStrings(countries[i].name , countryName)){
              countries[i] = countries[totalCountries-1]; // pushing last into current arrray index which we gonna delete
              delete countries[totalCountries-1]; // now deleteing last index
              totalCountries--; //total count decrease
              countries.length--; // array length decrease
              //emit event
              emit CountryDelete(countryName);
              return true;
           }
       }
       return false;
   }

   function getCountry(string countryName) public view returns(string name , string leader , uint256 population){
        for(uint256 i =0; i< totalCountries; i++){
           if(compareStrings(countries[i].name, countryName)){
              //emit event
              return (countries[i].name , countries[i].leader , countries[i].population);
           }
       }
       revert('country not found');
   }     

  function compareStrings (string a, string b)  internal pure returns (bool){
       return keccak256(a) == keccak256(b);
   }

   function getTotalCountries() public view returns (uint256 length){
      return countries.length;
   }
}
  • 打开Remix IDE,添加CrudApp.sol合约
  • 选择适合合约solc编译器版本(这里为0.4.23),然后编译合约(如果有警告,请忽略?)
  • 在Metamask上选择节点: Localhost:8545
  • 在Remix IDE中,切换到Deploy & Run Transaction选项页,然后环境选择 Injected web3 并进行部署

现在,合约已经部署到本地的Parity以太坊开发节点了。

注册合约事件

现在我们需要注册以太坊事件了,这样Eventeum可以监听这些事件然后进行广播。Eventeum 提供了rest API,下面就会使用这些API来注册智能合约事件。

也可以通过修改配置文件 application.yml 订阅合约事件.

  1. 注册合约事件
  • URL: /api/rest/v1/event-filter
  • Method: POST
  • Post 数据如下:
{
    "id": "event-identifier",
    "contractAddress": "0x1fbBeeE6eC2B7B095fE3c5A572551b1e260Af4d2",
    "eventSpecification": {
        "eventName": "TestEvent",
        "indexedParameterDefinitions": [
          {"position": 0, "type": "UINT256"},
          {"position": 1, "type": "ADDRESS"}],
        "nonIndexedParameterDefinitions": [
          {"position": 2, "type": "BYTES32"},
          {"position": 3, "type": "STRING"}] },
    "correlationIdStrategy": {
        "type": "NON_INDEXED_PARAMETER",
        "parameterIndex": 0 }
}

REST请求的主体定义了要注册的事件的规范。 与事件的智能合约定义相比,我们可以看到它定义了事件名称以及每个参数的类型和顺序。

为了简化,我们使用curl调用上述API注册指定的以太坊智能合约事件:

curl -X POST \
http://localhost:8060/api/rest/v1/event-filter \
-H 'Cache-Control: no-cache' \
-H 'Content-Type: application/json' \
-H 'Postman-Token: 616712a3-bf11-bbf5-b4ac-b82835779d51' \
-d '{
"id": "Country",
"contractAddress": "ENTER_YOUR_CONTRACT_ADDRESS_CHECK_IN_REMIX",
"eventSpecification": {
  "eventName": "CountryEvent",
  "nonIndexedParameterDefinitions": [
    {"position": 0, "type": "STRING"},
    {"position": 1, "type": "STRING"},
    {"position": 2, "type": "UINT256"}] }
}'

如果注册成功,在控制太看到响应如:registerContractEventFilter - Registering filter: {"id":"Country"...

响应中会返回ID(这个例子是Country)。

现在我们调用合约方法insert以便触发 CountryEvent事件。同样使用Remix IDE来进行合约调用:

Eventeum教程

现在我们为insert方法传入3参数("USA", "Elizabeth Warren", 327946410), 一切正常的话,你就可以在Docker终端中看到交易:

broadcastContractEvent - Sending message: 
{"id":" ..-0" ,"type":"CONTRACT_EVENT", "details":
{"name":"CountryEvent", ...}, "retries":0}

如果您看到此消息,则表示Eventeum已从我们的智能合约接收到事件发出的通知,并已将包含发出事件的所有详细信息的相应CONTRACT_EVENT消息推送到我们的Kafka队列中。

在NodeJS服务中订阅Eventeum服务

现在我们使用NodeJS服务来订阅Eventuem推送的Kafka事件, 这将帮助我们清楚地了解如何在应用程序中使用Eventeum。

订阅Eventuem操作步骤如下。

$ mkdir watcher
$ cd watcher
$ npm init
$ npm install
$ npm i kafka-node  (Kafka-nodejs client)
$ touch index.js

在index.js中添加如下代码:

var kafka = require('kafka-node')
const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'});
var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    // client = new kafka.Client(),
    consumer = new Consumer(
        client,
        [
            { topic: 'contract-events', partition: 0 }
        ],
        {
            autoCommit: false
        }
    );
consumer.on('message', function (message) {
    console.log(message);
});

运行index.js:

nodejs index.js

为了测试下,再次使用remix进行调用insert`函数,将能够看到该事件的控制台输出。

正如在index.js你看到的,我们使用的主题是Eventeum内置的 contract-events 。 使用下面的命令查看Eventeum的全部内建Kafka主题:

bin/kafka-topics.sh --list --zookeeper localhost:2181

使用docker安装时,你需要进入docker中用实际安装路径运行/bin/kafka-topics.sh命令。

Eventeum已经创建了以下主题:

__consumer_offsets
block-events
contract-events
filter-events
filter-events-dlteventeum
filter-events-erreventeum
filter-eventsnull

在这里,block-events用来跟踪新的块生成。

您可以在此处了解更多有关Eventeum API的详细信息,并了解更多信息。

如何在测试网或主网上使用Eventeum?

要在以太坊Testnet或Mainnet上使用Eventeum,需要修改docker-compose.yml 中的以太坊节点URL。

由于使用 Infura 会遇到了 eth_newBlockFilter api 的问题 ,因此也可以使用QuikNode来实现此目的。

QuikNode提供快速的以太坊节点,结合Eventeum,可以可靠地监视智能合约事件。

立即注册一个QuikNode免费帐户

关于QuikNode

QuikNode正在构建基础设施以支持Web3的到来。QuikNode已经与数百家公司合作,以帮助他们扩展Dapps并提供高度可靠的以太坊节点。 

原文链接:Ethereum Events monitoring using Eventeum

参考Listening to Ethereum Events with Eventeum

点赞 2
收藏 5
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

1 条评论

请先 登录 后评论
Tiny熊
Tiny熊
0xD682...E8AB
登链社区发起人 通过区块链技术让世界变得更好而尽一份力。