Esper

Esper 是用于复杂事件处理(CEP)和流分析的软件,可用 .net as NEsper。

Esper和NEsper支持快速开发处理大量传入消息或事件的应用程序,而不管传入消息是历史消息还是实时消息。Esper和NEsper以不同的方式过滤和分析事件,并响应感兴趣的条件。

Esper和事件处理语言(EPL)提供了一个高度可伸缩、内存高效、内存计算、sql标准、最小延迟、支持实时流的大数据处理引擎,可用于任何速度的在线、实时到达的数据和各种各样的数据,以及历史事件分析。

Esper不局限于在一台机器上运行,并且在分布式流处理框架内运行良好。Esper很有意义,可以在任何架构和任何容器中运行,因为它不依赖于外部服务,不需要任何特定的线程模型或时间如何推进的模型,也不需要任何外部存储。Esper可以很好地处理事件时间和基于水印的时间管理。

Esper有一个水平扩展架构,用于线性水平可伸缩性、弹性缩放、负载分布、平衡和重新平衡、容错、通过种子节点动态发现节点、复制和多数据中心支持。Esper的水平扩展架构构建于Apache Kafka和Apache Zookeeper之上,见Esper企业版。

设计核心

低延迟和高吞吐量。

EPL语言的表达性、简洁性、可扩展性。

遵守标准和最佳实践。

在内存、CPU和IO使用方面的轻量级。

核心概念

架构图

从Esper的架构图中,可以看出,Esper主要包括了三个部分:Input adapter,Esper engine,Output adapter。

2.1 Input adapter & Output adapter

输入适配器和输出适配器的主要目的是接收来自不同事件源的事件,并向不同的目的地输出事件。 目前,Esper提供的适配器包括File Input and Output adpter, Spring JMS Input and Output Adapter, AMQP Input and Output Adapter, Kafka Adapter等等。这些适配器提供了一系列接口,可以让用户从不同的数据源读取数据,并将数据发送给不同的目的数据源,用户可以不用自己单独编写客户端代码来连接这些数据源,感觉相当于对这些数据源提供了一层封装。

2.2 Esper engine

Esper引擎是处理事件的核心,它允许用户定义需要接收的事件以及对这些事件的处理方式。

2.2.1 Esper支持的事件表现形式

Esper支持多种事件表现形式,包括遵循JavaBean方式的含有getter方法的Java POJO(普通Java对象),实现了Map接口的对象,对象数组,XML文档对象,以及Apache Avro(一个支持JSON和Schema的数据序列化系统,可以将数据结构或对象转化成便于存储和传输的格式)。 这些事件表现形式的共同之处在于,它们都提供了事件类型的元数据,也就是说能够表示事件的一系列属性,例如,一个Java对象可以通过其成员变量来表示其事件属性,一个Map对象能够通过键值对来表示属性。由此可见,本质上事件是一系列属性值的集合,对事件的操作即对事件中的部分或全部属性的操作。

2.2.2 Esper事件处理模型

Esper的事件处理模型主要包括两个部分:Statement和Listener。 (1)Statement 利用Esper的事件处理语言EPL声明对事件进行的操作,Esper中提供了多种类型的事件操作,包括过滤、加窗、事件聚合等等。EPL是一种类似于SQL的语言,从这一点上来看,Esper恰好与数据库相反,数据库时保存数据,并在数据上运行查询语句,而Esper是保存查询语句,在这些查询上运行数据,只要事件与查询条件匹配,Esper就会实时进行处理,而不是只有在查询提交的时候才处理。

快速开始

maven

测试 jdk=1.7 所以选择了个较老的版本

<dependency>
    <groupId>com.espertech</groupId>
    <artifactId>esper</artifactId>
    <version>5.2.0</version>
</dependency>

代码

  • Apple.java
public class Apple {

    private int id;
    private int price;
	
	//getter & setter
}
  • AppleListener.java
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

public class AppleListener implements UpdateListener {
    public void update(EventBean[] eventBeans, EventBean[] eventBeans1) {
        if (eventBeans != null)
        {
            Double avg = (Double) eventBeans[0].get("avg(price)");
            System.out.println("Apple's average price is " + avg);
        }
    }
}
  • main()
public static void main(String[] args) {
    EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();

    EPAdministrator admin = epService.getEPAdministrator();

    String product = Apple.class.getName();
    String epl = "select avg(price) from " + product + ".win:length_batch(3)";

    EPStatement state = admin.createEPL(epl);
    state.addListener(new AppleListener());

    EPRuntime runtime = epService.getEPRuntime();

    Apple apple1 = new Apple();
    apple1.setId(1);
    apple1.setPrice(5);
    runtime.sendEvent(apple1);

    Apple apple2 = new Apple();
    apple2.setId(2);
    apple2.setPrice(2);
    runtime.sendEvent(apple2);

    Apple apple3 = new Apple();
    apple3.setId(3);
    apple3.setPrice(5);
    runtime.sendEvent(apple3);
}
  • 测试结果
Apple's average price is 4.0

参考资料

Esper学习之一:Esper介绍