
Complex Event Processing Made Simple Using Esper

By Thomas Bernhardt and Alexandre Vasseur

01 Apr 2008 | TheServerSide.com

Introduction to Complex Event Processing (CEP)

Event processing has been at the heart of computing systems for more than a decade. A common challenge across industries is extracting actionable intelligence from disparate event sources as close to real time as possible.

You may have experienced this yourself: you need some information right away, not an hour later, because any delay reduces its value to you. You also need to know when certain things are not happening within a given time period, not just when some events are present. Among those events, you may want to filter out a few and compute aggregates over some period of time.

Relational databases make it really hard to deal with temporal data and with real-time or continuous queries. Other well-known event processing systems have so far focused on integrating endpoints as services and abstracting away transports and protocols: EAI, MOM and, more generally, SOA. The missing piece for actionable intelligence is an event processor capable of executing continuous queries in a highly expressive event processing language, one able to describe the most complex situations of a real-world system, in which time and causality are first-class citizens. This is what Complex Event Processing (CEP) is about.

CEP aims at analyzing the data (events) that flow between information systems to gain valuable information in real-time. The term 'Complex' means that events can exist in relationships with each other, for example through timing or causality.

Introduction to Esper

Esper is an open-source CEP engine written entirely in Java and fully embeddable into any Java process: custom, JEE, ESB, BPM, etc. It recently reached version 2.0 and is backed by EsperTech under a professional open source / dual license business model. It is attracting growing interest in the Java community, and in the .Net community as well through NEsper, its full .Net/C# implementation. Both can be downloaded from esper.codehaus.org with complete examples and documentation.

The Esper engine works somewhat like a database turned upside-down. Instead of storing the data and running queries against stored data, Esper enables applications to store queries (or statements) and run the data through them. The engine therefore responds in real time, as soon as conditions occur that match a query. The execution model is continuous, rather than running only when a query is submitted.
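To make the inversion concrete, here is a deliberately tiny sketch in plain Java of the "store the queries, stream the data through them" model. The class TinyContinuousEngine and its register/send methods are hypothetical illustrations, not part of the Esper API, and a "query" here is just a predicate over a single event, with no windows, joins or patterns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical minimal "continuous query" engine (not the Esper API):
// queries are stored first, then every incoming event is run through them.
class TinyContinuousEngine<E> {
    private static final class StoredQuery<T> {
        final Predicate<T> condition;
        final Consumer<T> listener;

        StoredQuery(Predicate<T> condition, Consumer<T> listener) {
            this.condition = condition;
            this.listener = listener;
        }
    }

    private final List<StoredQuery<E>> queries = new ArrayList<>();

    // Store a query; it stays active for all events sent afterwards.
    void register(Predicate<E> condition, Consumer<E> listener) {
        queries.add(new StoredQuery<>(condition, listener));
    }

    // Push one event through every stored query; matches are reported at once.
    void send(E event) {
        for (StoredQuery<E> query : queries) {
            if (query.condition.test(event)) {
                query.listener.accept(event);
            }
        }
    }
}

class TinyContinuousEngineDemo {
    public static void main(String[] args) {
        TinyContinuousEngine<Integer> engine = new TinyContinuousEngine<>();
        List<Integer> alerts = new ArrayList<>();
        // The query is registered before any data arrives.
        engine.register(amount -> amount > 1000, alerts::add);
        engine.send(50);    // no match
        engine.send(5000);  // matches, listener fires immediately
        System.out.println(alerts);  // prints [5000]
    }
}
```

The point of the design is that nothing is stored except the queries: each event is examined once, on arrival, and results flow out immediately instead of waiting for someone to poll.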

One of the most attractive features of Esper is that it is easy to formulate continuous queries: Esper provides an SQL-like Event Processing Language (EPL), extended to handle event streams and patterns. For developers more inclined to program against an API, a continuous query object model API is also available.

The other key features are listed below and a few of them will be further described through a concrete example in this article:

  • Continuous queries, filtering and computations (min, max, avg, stddev, vwap, ...)
  • Continuous joins
  • "Followed by" logic and detection of missing events
  • Time- and length-based sliding windows, output flow control
  • Continuous joins of streams and historical data stored in relational databases, with local caching
  • Event representation as Java objects, .Net objects, Maps or XML
  • Continuous listener processing model and pull based iterator processing model

Example use case

The use cases for CEP engines are numerous. CEP has gained traction in financial services, RFID, BAM and general-purpose monitoring frameworks, as well as, more generally, in event-driven SOA. In this article, we describe a simplified BAM use case. The scenario is a payment processing vendor, such as PayPal, that connects to a payment inquiry service typically provided by a customer's bank. We will pretend to be PayPal and use a CEP engine to monitor proper acknowledgement by a bank's payment inquiry service and to report problematic conditions.

Inquiry Request Events

The application sends requests for payment inquiries to an external payment processing endpoint, such as the customer bank backbone. A request carries the following information:

public class PayInqRequest {
  private String customerId;
  private String paymentId;
  private long timestamp;
}

Assume that we have a service agreement with the payment processing endpoint: it must acknowledge each request within 15 minutes of receiving it. The provider replies with a status indicating whether the payment was found in its own system. The acknowledgement looks like:

public class PayInqAck {
  private String customerId;
  private String paymentId;
  private long timestamp;
  private boolean isFound;
}
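The two classes above list only their fields. Esper reads the properties of plain Java event objects through JavaBean-style getter methods, so a fuller version needs constructors and getters. The sketch below is our own assumption rather than code from the original sample: the classes are package-private so the snippet compiles as a single file (in a project, each would typically be a public class in its own source file), and the boolean getter is deliberately named getIsFound() so that the exposed property name matches isFound as used in the queries. Check the naming rules against the Esper documentation for your version.

```java
// Sketch of the two event classes with constructors and JavaBean getters.
// Assumption: Esper resolves POJO event properties through getter methods,
// so getPaymentId() exposes the property "paymentId" used in the queries.
class PayInqRequest {
    private final String customerId;
    private final String paymentId;
    private final long timestamp;

    public PayInqRequest(String customerId, String paymentId, long timestamp) {
        this.customerId = customerId;
        this.paymentId = paymentId;
        this.timestamp = timestamp;
    }

    public String getCustomerId() { return customerId; }
    public String getPaymentId() { return paymentId; }
    public long getTimestamp() { return timestamp; }
}

class PayInqAck {
    private final String customerId;
    private final String paymentId;
    private final long timestamp;
    private final boolean isFound;

    public PayInqAck(String customerId, String paymentId, long timestamp, boolean isFound) {
        this.customerId = customerId;
        this.paymentId = paymentId;
        this.timestamp = timestamp;
        this.isFound = isFound;
    }

    public String getCustomerId() { return customerId; }
    public String getPaymentId() { return paymentId; }
    public long getTimestamp() { return timestamp; }

    // Named getIsFound() rather than isFound(): under JavaBeans naming rules a
    // method isFound() would expose a property called "found", not "isFound".
    public boolean getIsFound() { return isFound; }
}
```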

Detecting Missing Acknowledgements

Sometimes the payment processing partner misses its 15-minute deadline for various reasons. In such a case, our application must record each occurrence and alert an operator as soon as the situation occurs.

By analyzing the PayInqRequest and PayInqAck events as they occur, Esper can detect late or missing acknowledgments. The pattern needs to look for every PayInqRequest event. Each such request is - in the ideal case - followed by an acknowledgment for the same payment id, within 15 minutes. The absence of such acknowledgment is the situation we need to detect.

The Esper EPL provides operators to express such event relationships: There is an 'every' keyword that tells the engine to look for every event, and a '->' operator to express a followed-by relationship. An interval specification and a 'not' can be used to detect when an event is absent after a period of time.

Detecting the missing acknowledgments can be expressed by a single pattern query as shown here:

  every r=PayInqRequest ->
   (timer:interval(15 min) and not PayInqAck(paymentId = r.paymentId))
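For comparison, here is roughly the kind of bookkeeping that single pattern saves you from writing by hand. This is a plain-Java sketch under stated assumptions: time is passed in explicitly as milliseconds (a simulated clock), there is at most one outstanding request per payment id, and MissingAckDetector with its methods is a hypothetical name. It is not how Esper evaluates patterns internally.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical hand-written equivalent of
//   every r=PayInqRequest -> (timer:interval(15 min) and not PayInqAck(paymentId = r.paymentId))
// Time is supplied by the caller so the logic can be tested deterministically.
class MissingAckDetector {
    static final long TIMEOUT_MS = 15 * 60 * 1000L;

    // paymentId -> deadline by which an acknowledgement must arrive
    private final Map<String, Long> pending = new LinkedHashMap<>();
    private final List<String> missing = new ArrayList<>();

    // "every r=PayInqRequest": start a 15-minute timer for each request
    void onRequest(String paymentId, long now) {
        advanceTime(now);
        pending.put(paymentId, now + TIMEOUT_MS);
    }

    // "not PayInqAck(paymentId = r.paymentId)": an in-time ack cancels the timer
    void onAck(String paymentId, long now) {
        advanceTime(now);
        pending.remove(paymentId);
    }

    // "timer:interval(15 min)": report every request whose deadline has passed
    void advanceTime(long now) {
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= now) {
                missing.add(entry.getKey());  // a real system would alert an operator here
                it.remove();
            }
        }
    }

    List<String> missingAcks() { return missing; }
}

class MissingAckDemo {
    public static void main(String[] args) {
        MissingAckDetector detector = new MissingAckDetector();
        long min = 60 * 1000L;
        detector.onRequest("pay-1", 0);
        detector.onRequest("pay-2", 1000);
        detector.onAck("pay-1", 5 * min);    // acknowledged after 5 minutes
        detector.advanceTime(16 * min);      // 16 minutes in, pay-2 is still unanswered
        System.out.println(detector.missingAcks());  // prints [pay-2]
    }
}
```

Even this simplified version has to manage per-request timers, cancellation and clock advancement; the EPL pattern states the same intent declaratively in two lines.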

Reporting Payments Not Found

When the partner system cannot find the payment in its system, it reports back an error status. To maximize operational efficiency and accelerate troubleshooting, we want to report this condition, but only if it happens with a certain frequency for any customer: three or more occurrences within a 10-minute sliding period should result in an immediate alert. Otherwise the condition is assumed to be part of a regular exception path.

For this requirement, we need to filter on the status field of PayInqAck events and continuously compute a total per customer. The computation should consider only the last 10 minutes of events; older events are no longer counted.

This use case requires set-based processing of events over a sliding time window, which is another key capability of Esper. The Esper EPL provides views onto event streams, such as a time window, as a first-class element of the language syntax.

This query performs the filtering and totaling per customer, for the last 10 minutes of events:

  select customerId, count(*)
  from PayInqAck(isFound = false).win:time(10 minutes)
  group by customerId
  having count(*) >= 3

The 'win:time' syntax in the above query declares a time window of 10 minutes. The query reports immediately when the condition is met. Knowing that a threshold has been reached as soon as it happens increases operational efficiency and provides real-time business activity monitoring for a non-trivial scenario.
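To show what the time window and the grouped count amount to, here is a hand-rolled plain-Java sketch of the same rule. NotFoundMonitor, its methods and the explicit millisecond timestamps are hypothetical. One simplification is worth stating: events are evicted lazily, only when a new event arrives for that customer. That is enough for this alert, because the count can only reach the threshold on an arrival, but it is not how Esper manages its windows.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical hand-written equivalent of
//   select customerId, count(*) from PayInqAck(isFound = false).win:time(10 minutes)
//   group by customerId having count(*) >= 3
class NotFoundMonitor {
    static final long WINDOW_MS = 10 * 60 * 1000L;
    static final int THRESHOLD = 3;

    // Per customer: arrival times of "not found" acks still inside the window
    private final Map<String, Deque<Long>> windows = new HashMap<>();
    private final List<String> alerts = new ArrayList<>();

    void onAck(String customerId, boolean isFound, long now) {
        if (isFound) {
            return;  // filter: only isFound = false events enter the window
        }
        Deque<Long> window = windows.computeIfAbsent(customerId, k -> new ArrayDeque<>());
        window.addLast(now);
        // Evict events that have fallen out of the 10-minute sliding window
        while (!window.isEmpty() && window.peekFirst() <= now - WINDOW_MS) {
            window.removeFirst();
        }
        // having count(*) >= 3: alert as soon as the threshold is reached
        if (window.size() >= THRESHOLD) {
            alerts.add(customerId + " count=" + window.size());
        }
    }

    List<String> alerts() { return alerts; }
}

class NotFoundMonitorDemo {
    public static void main(String[] args) {
        NotFoundMonitor monitor = new NotFoundMonitor();
        long min = 60 * 1000L;
        monitor.onAck("cust-1", false, 0);
        monitor.onAck("cust-1", true, 1 * min);    // found: filtered out
        monitor.onAck("cust-1", false, 4 * min);
        monitor.onAck("cust-1", false, 12 * min);  // first event expired, count is 2
        monitor.onAck("cust-1", false, 13 * min);  // count reaches 3, alert
        System.out.println(monitor.alerts());      // prints [cust-1 count=3]
    }
}
```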

Using a CEP engine like Esper represents considerable savings compared to developing such detection logic by hand. It also provides great flexibility to change and enrich that logic as business imperatives evolve.

Putting Esper EPL Queries into Action

So far we have focused on Esper EPL through two specific use cases, which hardly scratch the surface of what one can do with EPL. We have yet to look at how to put these queries into action: registering them, subscribing to their results and, finally, sending events into the engine for consumption.

Esper can be configured through the API or through an XML file. Most configuration items are optional, and this article illustrates some of the API usage.

The following code snippet creates an Esper engine instance:

  Configuration config = new Configuration();
  config.addEventTypeAutoAlias("com.mycompany.sample.events");
  EPServiceProvider service =
     EPServiceProviderManager.getDefaultProvider(config);

The sample code above specifies a package name, so our queries don't need to use fully-qualified Java class names. We could also have declared Map- or XML-based events without ever requiring the EPL query to change.

Continuous query statements can then be created and registered at runtime:

  EPStatement statement = service.getEPAdministrator().createEPL(
     "select customerId, count(*) as cnt " +
     "from PayInqAck(isFound = false).win:time(10 minutes) " +
     "group by customerId " +
     "having count(*) >= 3");

An observer can subscribe to the continuous query results:

  MyStmtObserver observer = new MyStmtObserver(); // see below
  statement.setSubscriber(observer);

A sample observer that only prints query results as they happen is shown below. Esper invokes the observer and passes strongly-typed statement results when conditions match:

  public class MyStmtObserver {
    public void update(String customerId, long cnt)  // count(*) yields a long
    {
      System.out.println("customer: " + customerId +
                         " ,count: " + cnt);
    }
  }

As an alternative, we can subscribe one or more generic listener components to a statement and use an API to access output events.

  public class MyStmtListener implements UpdateListener {
    public void update(EventBean[] newEvents, EventBean[] oldEvents)
    {
      if (newEvents == null) return;  // no new results in this update
      for (EventBean event : newEvents) {
        System.out.println("customer: " + event.get("customerId") +
                           ", count: " + event.get("cnt"));
      }
    }
  }

Code to receive actual events from the outside will very likely be integrated into a service endpoint in a MOM, an ESB, a Web Service facade, etc., where transport and protocol issues are dealt with. The code to send those events into the event processing engine is then very simple:

  PayInqRequest event = new PayInqRequest(...);
  // could have been read from the network, etc.
  service.getEPRuntime().sendEvent(event);

Immediate Advantages of applying CEP

As we have seen, the Esper CEP engine works somewhat like a database turned upside-down: instead of storing the data and running queries against stored data, which would be neither intuitive nor scalable, the Esper engine enables applications to store queries and run the data through them. The engine responds in real time, as soon as conditions occur that match the queries, and execution is continuous rather than happening only when a query is submitted.

Once such event streams are available, it is easy to add further queries. Analyzing existing event streams with new queries, or refining existing queries, requires little or no coding effort. The Esper EPL is designed for expressiveness and ease of use, with a familiar SQL-like look, and can also be extended in numerous ways, providing great agility in expressing business imperatives.

Because query results are available with little or no delay, the technology brings significant business value by providing immediate information, whether for read-only use, as is the case for BAM and KPIs, or to trigger further automated action. The technology is also designed to handle a very high volume of events with minimal latency, and it removes the need to store events first and then poll them in order to perform computations.

Events can be domain representations of relevant steps in a business process or problem space, and remain agnostic to the technology used for integration, be it format, transport or protocol. As a result, existing systems built for those integration tasks can be fully reused.

Advanced Use Cases and Features

Continuous Joins

A web application may allow users to schedule payments for online bill pay. Each request a user submits to the web application can be seen as an event and therefore analyzed and correlated. This is termed clickstream analysis.

An advanced use case may correlate clickstream events in real time with backend system events, such as those of the payment inquiry system discussed before. The business value lies in an improved customer experience: for example, detecting when users currently online on the web site keep cancelling payments.

A join or outer join between two or more event streams also comes in handy when matching up event streams from different systems. Esper continuous joins are incremental in nature: they are evaluated when a new event arrives on any of the joined streams. Joins can also include static data out of a relational database, or data obtained via an arbitrary method invocation, which opens up a wide range of integration with reference data (such as joins with data out of a distributed cache).
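The incremental idea can be sketched in a few lines of plain Java. In this hypothetical example, IncrementalJoin pairs web clicks (the left stream) with backend events (the right stream) on a shared payment id; when an event arrives on either side, it is joined only against what the other side has already seen, so nothing is recomputed from scratch. As a simplifying assumption, both sides retain events forever; a real engine would bound each side with a window such as win:time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical incremental equi-join of two event streams on a shared key.
// Each new event is joined only against the events retained on the other side.
// Simplification: retention is unbounded; a real engine would use windows.
class IncrementalJoin {
    private final Map<String, List<String>> left = new HashMap<>();
    private final Map<String, List<String>> right = new HashMap<>();
    private final List<String> joined = new ArrayList<>();

    void onLeft(String key, String event) {
        left.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        for (String r : right.getOrDefault(key, Collections.emptyList())) {
            joined.add(event + "+" + r);
        }
    }

    void onRight(String key, String event) {
        right.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        for (String l : left.getOrDefault(key, Collections.emptyList())) {
            joined.add(l + "+" + event);
        }
    }

    List<String> results() { return joined; }
}

class IncrementalJoinDemo {
    public static void main(String[] args) {
        IncrementalJoin join = new IncrementalJoin();
        join.onLeft("pay-1", "click:cancel");
        join.onRight("pay-1", "backend:cancelled");  // joins with the earlier click
        join.onRight("pay-2", "backend:ok");         // nothing to join with yet
        join.onLeft("pay-2", "click:view");          // joins with the earlier backend event
        System.out.println(join.results());
        // prints [click:cancel+backend:cancelled, click:view+backend:ok]
    }
}
```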

Statement variables

Threshold values, such as the number of events per time interval, are typically represented as variables in Esper to parameterize statements. Many use cases compare an average over one time interval, say 1 hour, with an average over a shorter time interval, such as 5 minutes, to detect outliers in the data.
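The outlier comparison described above can be sketched in plain Java as two time windows over the same values, with the threshold held in a field that can be changed at runtime, much as a statement variable would be. OutlierDetector, its factor parameter and the "short average exceeds factor times long average" rule are hypothetical choices for illustration; they are not Esper's variable API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: flag an outlier when the 5-minute average exceeds
// the 1-hour average by a configurable factor.
class OutlierDetector {
    private static final class TimeWindow {
        private final long lengthMs;
        private final Deque<long[]> events = new ArrayDeque<>();  // {timestamp, value}
        private long sum;

        TimeWindow(long lengthMs) { this.lengthMs = lengthMs; }

        void add(long timestamp, long value) {
            events.addLast(new long[] {timestamp, value});
            sum += value;
            // Drop events older than the window; the newest event always stays
            while (events.peekFirst()[0] <= timestamp - lengthMs) {
                sum -= events.removeFirst()[1];
            }
        }

        double average() { return events.isEmpty() ? 0.0 : (double) sum / events.size(); }
    }

    private final TimeWindow longWindow = new TimeWindow(60 * 60 * 1000L);  // 1 hour
    private final TimeWindow shortWindow = new TimeWindow(5 * 60 * 1000L);  // 5 minutes
    private double factor;  // plays the role of a threshold variable

    OutlierDetector(double factor) { this.factor = factor; }

    // Changing the threshold does not require rebuilding the detector
    void setFactor(double factor) { this.factor = factor; }

    // Returns true when the recent average exceeds factor times the long-term average
    boolean onValue(long timestamp, long value) {
        longWindow.add(timestamp, value);
        shortWindow.add(timestamp, value);
        return shortWindow.average() > factor * longWindow.average();
    }
}

class OutlierDetectorDemo {
    public static void main(String[] args) {
        OutlierDetector detector = new OutlierDetector(1.5);
        long min = 60 * 1000L;
        for (int i = 0; i < 60; i++) {
            detector.onValue(i * min, 10);  // steady traffic for an hour
        }
        System.out.println(detector.onValue(60 * min, 100));  // prints true
    }
}
```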

Output flow control and Pull based processing

It may not always be convenient for your application to receive constant updates whenever arriving events or elapsed time change the query results. Esper therefore provides comprehensive control over output rates, as well as a complete, thread-safe pull iterator API for use cases in which the real-time aspect is less relevant.
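In EPL, output rate control is expressed declaratively through an output clause on the statement. The plain-Java sketch below is not that mechanism; it only illustrates the underlying idea of throttling delivery by buffering results and handing them to the listener in batches of a fixed size. BatchingOutput and its parameters are hypothetical, and a partial batch simply waits until it fills up; a time-based policy would also flush on a timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical output throttle: buffer results and deliver them to the
// listener as one batch every batchSize results, instead of one by one.
class BatchingOutput<T> {
    private final int batchSize;
    private final Consumer<List<T>> listener;
    private final List<T> buffer = new ArrayList<>();

    BatchingOutput(int batchSize, Consumer<List<T>> listener) {
        if (batchSize < 1) throw new IllegalArgumentException("batchSize must be >= 1");
        this.batchSize = batchSize;
        this.listener = listener;
    }

    void onResult(T result) {
        buffer.add(result);
        if (buffer.size() == batchSize) {
            listener.accept(new ArrayList<>(buffer));  // deliver a copy of the batch
            buffer.clear();
        }
    }
}

class BatchingOutputDemo {
    public static void main(String[] args) {
        List<List<Integer>> delivered = new ArrayList<>();
        BatchingOutput<Integer> output = new BatchingOutput<>(3, delivered::add);
        for (int i = 1; i <= 7; i++) {
            output.onResult(i);
        }
        System.out.println(delivered);  // prints [[1, 2, 3], [4, 5, 6]]; 7 is still buffered
    }
}
```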

This code snippet demonstrates the use of the thread-safe iterator API to retrieve results from a statement while it continues to receive events:

  // Obtain the safe iterator from an EPStatement as seen earlier
  SafeIterator<EventBean> eventIter = statement.safeIterator();
  try {
    while (eventIter.hasNext()) {
      EventBean event = eventIter.next();
      System.out.println("customer: " + event.get("customerId") +
                         ", count: " + event.get("cnt"));
    }
  } finally {
    eventIter.close();  // make sure to close the safe iterator
  }

Resiliency and out of heap overflow

EsperTech also provides a resiliency and high availability extension, EsperHA, under a commercial license. It ensures that internally computed intermediate results survive a crash, and that stream window processing is not bound by memory limits.

Summary

This article has introduced Complex Event Processing using Esper, a technology for real-time continuous data analysis. It solved two use cases from a BAM scenario in an extensible and decoupled fashion, with Esper significantly reducing what would otherwise be a sizable development effort. Finally, it presented some of the more advanced use cases and features.

Biographies

Thomas Bernhardt is founder and project lead of the Esper open source project and founder/CEO of EsperTech. Thomas regularly architects event-driven software systems at major financial institutions. He has more than 20 years of experience in software.

Alexandre Vasseur co-leads the Esper open source ESP/CEP project and works on and evangelizes Event-Driven Architectures and Java middleware. He is a member of the Eclipse and CodeHaus communities, and co-founded the AspectWerkz AOP framework prior to its merger with AspectJ, which is now leveraged in Spring 2.