Esper
Esper 是用于复杂事件处理(CEP)和流分析的软件,可用 .net as NEsper。
Esper和NEsper支持快速开发处理大量传入消息或事件的应用程序,而不管传入消息是历史消息还是实时消息。Esper和NEsper以不同的方式过滤和分析事件,并响应感兴趣的条件。
Esper和事件处理语言(EPL)提供了一个高度可伸缩、内存高效、内存计算、sql标准、最小延迟、支持实时流的大数据处理引擎,可用于任何速度的在线、实时到达的数据和各种各样的数据,以及历史事件分析。
Esper不局限于在一台机器上运行,并且在分布式流处理框架内运行良好。Esper很有意义,可以在任何架构和任何容器中运行,因为它不依赖于外部服务,不需要任何特定的线程模型或时间如何推进的模型,也不需要任何外部存储。Esper可以很好地处理事件时间和基于水印的时间管理。
Esper有一个水平扩展架构,用于线性水平可伸缩性、弹性缩放、负载分布、平衡和重新平衡、容错、通过种子节点动态发现节点、复制和多数据中心支持。Esper的水平扩展架构构建于Apache Kafka和Apache Zookeeper之上,见Esper企业版。
设计核心
低延迟和高吞吐量。
EPL语言的表达性、简洁性、可扩展性。
遵守标准和最佳实践。
在内存、CPU和IO使用方面的轻量级。
核心概念
从Esper的架构图中,可以看出,Esper主要包括了三个部分:Input adapter,Esper engine,Output adapter。
2.1 Input adapter & Output adapter
输入适配器和输出适配器的主要目的是接收来自不同事件源的事件,并向不同的目的地输出事件。 目前,Esper提供的适配器包括File Input and Output adpter, Spring JMS Input and Output Adapter, AMQP Input and Output Adapter, Kafka Adapter等等。这些适配器提供了一系列接口,可以让用户从不同的数据源读取数据,并将数据发送给不同的目的数据源,用户可以不用自己单独编写客户端代码来连接这些数据源,感觉相当于对这些数据源提供了一层封装。
2.2 Esper engine
Esper引擎是处理事件的核心,它允许用户定义需要接收的事件以及对这些事件的处理方式。
2.2.1 Esper支持的事件表现形式
Esper支持多种事件表现形式,包括遵循JavaBean方式的含有getter方法的Java POJO(普通Java对象),实现了Map接口的对象,对象数组,XML文档对象,以及Apache Avro(一个支持JSON和Schema的数据序列化系统,可以将数据结构或对象转化成便于存储和传输的格式)。 这些事件表现形式的共同之处在于,它们都提供了事件类型的元数据,也就是说能够表示事件的一系列属性,例如,一个Java对象可以通过其成员变量来表示其事件属性,一个Map对象能够通过键值对来表示属性。由此可见,本质上事件是一系列属性值的集合,对事件的操作即对事件中的部分或全部属性的操作。
2.2.2 Esper事件处理模型
Esper的事件处理模型主要包括两个部分:Statement和Listener。 (1)Statement 利用Esper的事件处理语言EPL声明对事件进行的操作,Esper中提供了多种类型的事件操作,包括过滤、加窗、事件聚合等等。EPL是一种类似于SQL的语言,从这一点上来看,Esper恰好与数据库相反,数据库时保存数据,并在数据上运行查询语句,而Esper是保存查询语句,在这些查询上运行数据,只要事件与查询条件匹配,Esper就会实时进行处理,而不是只有在查询提交的时候才处理。
快速开始
maven
测试 jdk=1.7 所以选择了个较老的版本
<dependency>
<groupId>com.espertech</groupId>
<artifactId>esper</artifactId>
<version>5.2.0</version>
</dependency>
代码
- Apple.java
public class Apple {
private int id;
private int price;
//getter & setter
}
- AppleListener.java
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;
public class AppleListener implements UpdateListener {
public void update(EventBean[] eventBeans, EventBean[] eventBeans1) {
if (eventBeans != null)
{
Double avg = (Double) eventBeans[0].get("avg(price)");
System.out.println("Apple's average price is " + avg);
}
}
}
- main()
public static void main(String[] args) {
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPAdministrator admin = epService.getEPAdministrator();
String product = Apple.class.getName();
String epl = "select avg(price) from " + product + ".win:length_batch(3)";
EPStatement state = admin.createEPL(epl);
state.addListener(new AppleListener());
EPRuntime runtime = epService.getEPRuntime();
Apple apple1 = new Apple();
apple1.setId(1);
apple1.setPrice(5);
runtime.sendEvent(apple1);
Apple apple2 = new Apple();
apple2.setId(2);
apple2.setPrice(2);
runtime.sendEvent(apple2);
Apple apple3 = new Apple();
apple3.setId(3);
apple3.setPrice(5);
runtime.sendEvent(apple3);
}
- 测试结果
Apple's average price is 4.0