使用 Eventeum 可以轻松的实现在后端服务中订阅(监听)以太坊合约事件.
但凡我们构建一个稍微上了一点规模的DApp,监听以太坊上发生的事件都是很痛苦的一件事。
今天,我们看看如何使用Eventeum来解决这个痛点. Eventeum 由 Kauri Team开发, 并且源代码开源(Apache 2.0协议许可), github地址。
Eventeum 是一个以太坊事件监听服务(Event listener service ),它可以用来桥接智能合约与(后端服务的)中间层。Eventeum 支持动态订阅以太坊事件, 当以太坊智能合约事件被触发时,包含事件详情的消息将广播到消息总线上(当前支持 Kafka或RabbitMQ),进而被后端服务利用。下面是一个架构图:
Eventeum 主要特性包括:
可动态配置 - 应用可以利用Eventeum提供的rest API动态订阅或取消订阅以太坊智能合约事件。
高可用性 - Eventeum实例间会彼此通信以确保所有实例订阅相同的智能合约事件集合。
弹性(容忍失效) - Eventeum会自动检测节点失效,当节点恢复工作后订阅可以从失效时的区块继续。
(软)分叉容错 - Eventeum可以配置事件"确认"所需的区块数。如果在此期间发生了区块链的分叉, Eventeum会广播一个消息到网络中,以便让后端服务对分叉的(或移除的)事件进行处理。
Eventeum目前支持4种广播机制:
这篇文章,我们将使用Kafka。
部署Eventeum前,需要先安装以下依赖:
Java8
Maven
Mongo
Kafka
Zookeeper
Ethereum Node
Docker (可选)
可以选择使用 docker (或不用docker) 来部署Eventeum,不过我建议使用docker 进行套件安装(all-in-one installation) ,它包含了上述大部分依赖,不过仍然需要安装Java8和Maven。(我两个方式都安装过, 最简单的方式是 clone 代码,在server文件下运行docker-compose脚本. 会简单很多,它会自动安装 Kafka/Zookepper , MongoDB 以及Parity)
使用Docker安装 Eventeum, 系统需要先安装 Docker 和 Docker-compose, 安装之后再参考下面的安装指引.
$ git clone https://github.com/ConsenSys/eventeum.git
$ cd /path/to/eventeum/
$ mvn clean package
$ cd server
$ docker compose build
$ docker compose up
安装完成后,可以在Docker终端中查看日志。
注意:根据你的docker安装情况,你可能需要使用sudo
。
为了测试Eventeum,我们需要先在本地Parity节点(Eventeum会自动安装) 上部署一个示例智能合约,然后订阅一个合约事件以通过Kafka广播。
让我们部署一下CrudApp.sol合约,这里使用Remix IDE 和Metamask。
CrudApp.sol合约源码如下:
pragma solidity ^0.4.23;
contract CrudApp {
struct country{
string name;
string leader;
uint256 population;
}
country[] public countries;
uint256 public totalCountries;
constructor() public {
totalCountries = 0;
}
// 定义事件
event CountryEvent(string countryName , string leader, uint256 population);
event LeaderUpdated(string countryName , string leader);
event CountryDelete(string countryName);
// 会触发CountryEvent事件
function insert( string countryName , string leader , uint256 population) public returns (uint256 totalCountries){
country memory newCountry = country(countryName , leader, population);
countries.push(newCountry);
totalCountries++;
//emit event
emit CountryEvent (countryName, leader, population);
return totalCountries;
}
function updateLeader(string countryName, string newLeader) public returns (bool success){
//This has a problem we need loop
for(uint256 i =0; i< totalCountries; i++){
if(compareStrings(countries[i].name ,countryName)){
countries[i].leader = newLeader;
emit LeaderUpdated(countryName, newLeader);
return true;
}
}
return false;
}
function deleteCountry(string countryName) public returns(bool success){
require(totalCountries > 0);
for(uint256 i =0; i< totalCountries; i++){
if(compareStrings(countries[i].name , countryName)){
countries[i] = countries[totalCountries-1]; // pushing last into current arrray index which we gonna delete
delete countries[totalCountries-1]; // now deleteing last index
totalCountries--; //total count decrease
countries.length--; // array length decrease
//emit event
emit CountryDelete(countryName);
return true;
}
}
return false;
}
function getCountry(string countryName) public view returns(string name , string leader , uint256 population){
for(uint256 i =0; i< totalCountries; i++){
if(compareStrings(countries[i].name, countryName)){
//emit event
return (countries[i].name , countries[i].leader , countries[i].population);
}
}
revert('country not found');
}
function compareStrings (string a, string b) internal pure returns (bool){
return keccak256(a) == keccak256(b);
}
function getTotalCountries() public view returns (uint256 length){
return countries.length;
}
}
Deploy & Run Transaction
选项页,然后环境选择 Injected web3
并进行部署现在,合约已经部署到本地的Parity以太坊开发节点了。
现在我们需要注册以太坊事件了,这样Eventeum可以监听这些事件然后进行广播。Eventeum 提供了rest API,下面就会使用这些API来注册智能合约事件。
也可以通过修改配置文件 application.yml 订阅合约事件.
/api/rest/v1/event-filter
{
"id": "event-identifier",
"contractAddress": "0x1fbBeeE6eC2B7B095fE3c5A572551b1e260Af4d2",
"eventSpecification": {
"eventName": "TestEvent",
"indexedParameterDefinitions": [
{"position": 0, "type": "UINT256"},
{"position": 1, "type": "ADDRESS"}],
"nonIndexedParameterDefinitions": [
{"position": 2, "type": "BYTES32"},
{"position": 3, "type": "STRING"}] },
"correlationIdStrategy": {
"type": "NON_INDEXED_PARAMETER",
"parameterIndex": 0 }
}
REST请求的主体定义了要注册的事件的规范。 与事件的智能合约定义相比,我们可以看到它定义了事件名称以及每个参数的类型和顺序。
为了简化,我们使用curl调用上述API注册指定的以太坊智能合约事件:
curl -X POST \
http://localhost:8060/api/rest/v1/event-filter \
-H 'Cache-Control: no-cache' \
-H 'Content-Type: application/json' \
-H 'Postman-Token: 616712a3-bf11-bbf5-b4ac-b82835779d51' \
-d '{
"id": "Country",
"contractAddress": "ENTER_YOUR_CONTRACT_ADDRESS_CHECK_IN_REMIX",
"eventSpecification": {
"eventName": "CountryEvent",
"nonIndexedParameterDefinitions": [
{"position": 0, "type": "STRING"},
{"position": 1, "type": "STRING"},
{"position": 2, "type": "UINT256"}] }
}'
如果注册成功,在控制太看到响应如:registerContractEventFilter - Registering filter: {"id":"Country"...
响应中会返回ID
(这个例子是Country
)。
现在我们调用合约方法insert
以便触发 CountryEvent
事件。同样使用Remix IDE来进行合约调用:
现在我们为insert方法传入3参数("USA", "Elizabeth Warren", 327946410)
, 一切正常的话,你就可以在Docker终端中看到交易:
broadcastContractEvent - Sending message:
{"id":" ..-0" ,"type":"CONTRACT_EVENT", "details":
{"name":"CountryEvent", ...}, "retries":0}
如果您看到此消息,则表示Eventeum已从我们的智能合约接收到事件发出的通知,并已将包含发出事件的所有详细信息的相应CONTRACT_EVENT消息推送到我们的Kafka队列中。
现在我们使用NodeJS服务来订阅Eventuem推送的Kafka事件, 这将帮助我们清楚地了解如何在应用程序中使用Eventeum。
订阅Eventuem操作步骤如下。
$ mkdir watcher
$ cd watcher
$ npm init
$ npm install
$ npm i kafka-node (Kafka-nodejs client)
$ touch index.js
在index.js中添加如下代码:
var kafka = require('kafka-node')
const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'});
var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
// client = new kafka.Client(),
consumer = new Consumer(
client,
[
{ topic: 'contract-events', partition: 0 }
],
{
autoCommit: false
}
);
consumer.on('message', function (message) {
console.log(message);
});
运行index.js:
nodejs index.js
为了测试下,再次使用remix进行调用insert`函数,将能够看到该事件的控制台输出。
正如在index.js你看到的,我们使用的主题是Eventeum内置的 contract-events
。 使用下面的命令查看Eventeum的全部内建Kafka主题:
bin/kafka-topics.sh --list --zookeeper localhost:2181
使用docker安装时,你需要进入docker中用实际安装路径运行/bin/kafka-topics.sh
命令。
Eventeum已经创建了以下主题:
__consumer_offsets
block-events
contract-events
filter-events
filter-events-dlteventeum
filter-events-erreventeum
filter-eventsnull
在这里,block-events
用来跟踪新的块生成。
您可以在此处了解更多有关Eventeum API的详细信息,并了解更多信息。
要在以太坊Testnet或Mainnet上使用Eventeum,需要修改docker-compose.yml 中的以太坊节点URL。
由于使用 Infura 会遇到了 eth_newBlockFilter
api 的问题 ,因此也可以使用QuikNode来实现此目的。
QuikNode提供快速的以太坊节点,结合Eventeum,可以可靠地监视智能合约事件。
立即注册一个QuikNode免费帐户!
关于QuikNode
QuikNode正在构建基础设施以支持Web3的到来。QuikNode已经与数百家公司合作,以帮助他们扩展Dapps并提供高度可靠的以太坊节点。
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!