Esper

Esper 是用于复杂事件处理(CEP)和流分析的软件,可用 .net as NEsper。

Esper和NEsper支持快速开发处理大量传入消息或事件的应用程序,而不管传入消息是历史消息还是实时消息。Esper和NEsper以不同的方式过滤和分析事件,并响应感兴趣的条件。

Esper和事件处理语言(EPL)提供了一个高度可伸缩、内存高效、内存计算、sql标准、最小延迟、支持实时流的大数据处理引擎,可用于任何速度的在线、实时到达的数据和各种各样的数据,以及历史事件分析。

Esper不局限于在一台机器上运行,并且在分布式流处理框架内运行良好。Esper很有意义,可以在任何架构和任何容器中运行,因为它不依赖于外部服务,不需要任何特定的线程模型或时间如何推进的模型,也不需要任何外部存储。Esper可以很好地处理事件时间和基于水印的时间管理。

Esper有一个水平扩展架构,用于线性水平可伸缩性、弹性缩放、负载分布、平衡和重新平衡、容错、通过种子节点动态发现节点、复制和多数据中心支持。Esper的水平扩展架构构建于Apache Kafka和Apache Zookeeper之上,见Esper企业版。

设计核心

低延迟和高吞吐量。

EPL语言的表达性、简洁性、可扩展性。

遵守标准和最佳实践。

在内存、CPU和IO使用方面的轻量级。

核心概念

架构图

从Esper的架构图中,可以看出,Esper主要包括了三个部分:Input adapter,Esper engine,Output adapter。

2.1 Input adapter & Output adapter

输入适配器和输出适配器的主要目的是接收来自不同事件源的事件,并向不同的目的地输出事件。 目前,Esper提供的适配器包括File Input and Output adpter, Spring JMS Input and Output Adapter, AMQP Input and Output Adapter, Kafka Adapter等等。这些适配器提供了一系列接口,可以让用户从不同的数据源读取数据,并将数据发送给不同的目的数据源,用户可以不用自己单独编写客户端代码来连接这些数据源,感觉相当于对这些数据源提供了一层封装。

2.2 Esper engine

Esper引擎是处理事件的核心,它允许用户定义需要接收的事件以及对这些事件的处理方式。

2.2.1 Esper支持的事件表现形式

Esper支持多种事件表现形式,包括遵循JavaBean方式的含有getter方法的Java POJO(普通Java对象),实现了Map接口的对象,对象数组,XML文档对象,以及Apache Avro(一个支持JSON和Schema的数据序列化系统,可以将数据结构或对象转化成便于存储和传输的格式)。 这些事件表现形式的共同之处在于,它们都提供了事件类型的元数据,也就是说能够表示事件的一系列属性,例如,一个Java对象可以通过其成员变量来表示其事件属性,一个Map对象能够通过键值对来表示属性。由此可见,本质上事件是一系列属性值的集合,对事件的操作即对事件中的部分或全部属性的操作。

2.2.2 Esper事件处理模型

Esper的事件处理模型主要包括两个部分:Statement和Listener。 (1)Statement 利用Esper的事件处理语言EPL声明对事件进行的操作,Esper中提供了多种类型的事件操作,包括过滤、加窗、事件聚合等等。EPL是一种类似于SQL的语言,从这一点上来看,Esper恰好与数据库相反,数据库时保存数据,并在数据上运行查询语句,而Esper是保存查询语句,在这些查询上运行数据,只要事件与查询条件匹配,Esper就会实时进行处理,而不是只有在查询提交的时候才处理。

快速开始

maven

测试 jdk=1.7 所以选择了个较老的版本

  [xml]
1
2
3
4
5
<dependency> <groupId>com.espertech</groupId> <artifactId>esper</artifactId> <version>5.2.0</version> </dependency>

代码

  • Apple.java
  [java]
1
2
3
4
5
6
7
public class Apple { private int id; private int price; //getter & setter }
  • AppleListener.java
  [java]
1
2
3
4
5
6
7
8
9
10
11
12
import com.espertech.esper.client.EventBean; import com.espertech.esper.client.UpdateListener; public class AppleListener implements UpdateListener { public void update(EventBean[] eventBeans, EventBean[] eventBeans1) { if (eventBeans != null) { Double avg = (Double) eventBeans[0].get("avg(price)"); System.out.println("Apple's average price is " + avg); } } }
  • main()
  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static void main(String[] args) { EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); String product = Apple.class.getName(); String epl = "select avg(price) from " + product + ".win:length_batch(3)"; EPStatement state = admin.createEPL(epl); state.addListener(new AppleListener()); EPRuntime runtime = epService.getEPRuntime(); Apple apple1 = new Apple(); apple1.setId(1); apple1.setPrice(5); runtime.sendEvent(apple1); Apple apple2 = new Apple(); apple2.setId(2); apple2.setPrice(2); runtime.sendEvent(apple2); Apple apple3 = new Apple(); apple3.setId(3); apple3.setPrice(5); runtime.sendEvent(apple3); }
  • 测试结果
  [plaintext]
1
Apple's average price is 4.0

参考资料

Esper学习之一:Esper介绍