Pervasive DataRush

By Emilio Bernabei

01 Apr 2007 | TheServerSide.com

Unleash the Power of Multicore: A Java Framework for Building Highly Parallel Auto-Scaling Applications

In this article we explore how Java developers, tasked with crunching gigabytes of data, can quickly harness the full power of new multicore platforms using the Pervasive DataRush™ framework. We will start with a very simple business problem, show the thought process the developer goes through during the design phase, and then provide code snippets showing, step by step, how the DataRush framework is used to rapidly build one of the hyper-parallel, auto-scaling components of the application.

NOTE: There are problems that can be solved using extremely parallel computational logic (in fact, the scientific community calls these constructs “embarrassingly parallel”) and those that are inherently hard to execute in parallel. I chose a problem that exercises a few types of parallelism in DataRush. In the real world, your results will vary.

Five Reference Terms to Know

First, let’s define some terms here that are commonly used when building applications with DataRush. We’ll start from the smallest building block (where you might be coding in Java) and slowly move up the composition ladder.

  • Process – A custom Java class you implement. This is the lowest-level data processing component in the application. It has input “ports” (scalar or composite types that define the incoming data) and output ports (you write the data to the output once some sort of calculation and/or transformation has been completed on the input data).
  • Operator – A multithreaded process or an assembly (see below). Operators manipulate data and can have input and output ports through which data flows to and from other operators.
  • Assembly – A multithreaded composite operator. In its simplest form, the assembly is made up of one or more processes arranged into a dataflow graph (a directed, acyclic graph, to be precise). But the assembly can have input and output ports and be treated as a “black-box” operator; therefore, in more complex forms, the assembly contains other assemblies masked as simple operators. That’s why I used “composite operator” to describe an assembly rather than “composite process”. Assemblies are expressed in an XML declarative language called DFXML, which stands for “dataflow XML language”.
  • Customizer – This is a critical aspect of DataRush. Strictly speaking, a customizer is a code generator inside an assembly, but you can think of it as a helper class that instructs DataRush how an operator can be made parallel. Customizers dynamically extend the enclosing assembly. For example, an assembly that sorts records having a composite primary key may have a customizer that generates N scalar sort operators (one for each field in the key) and then merges the results.
  • Application – A DataRush application is the highest-level entity in a DataRush project. It is basically an assembly with no defined input and output ports (it can’t be used as a “black-box” operator in any other assembly). It can only be invoked by the DataRush compiler and executor.

Why do we want this level of rigor in defining how Java classes interact with each other? In essence, you are using an XML scripting language to declaratively define a dataflow machine that can be analyzed and expanded dynamically into a highly parallel series of Java processes. In fact, DataRush assemblies provide parallelism on three axes:

  • horizontal parallelism: operating on large segments of a record set by segmenting records -- e.g., using record types or round-robin partitioning to spread the workload
  • vertical parallelism: each field/column in a set of records is processed concurrently, then the results are stitched back together
  • pipeline parallelism: if an assembly is structured sequentially, such as A->B->C, all three operators actually run in parallel, so that as data is processed by A it flows to B, and as B completes portions of the dataset its output flows to C (see the plain-Java sketch after this list)
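
To make pipeline parallelism concrete, here is a minimal plain-Java sketch (this is not DataRush code; the class name, queue size and end-of-data sentinel are illustrative assumptions) of two stages connected by a bounded queue, so the downstream stage works while the upstream stage is still producing:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Plain-Java illustration (not DataRush code) of pipeline parallelism:
// stage A produces rows while stage B consumes them concurrently,
// connected by a bounded queue that provides buffering and backpressure.
public class PipelineSketch {
    private static final String END_OF_DATA = "<EOD>"; // sentinel marking end of stream

    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<String> aToB = new ArrayBlockingQueue<String>(1024);

        Thread stageA = new Thread(new Runnable() {   // "A": the source operator
            public void run() {
                try {
                    for (int i = 0; i < 10; i++) {
                        aToB.put("row-" + i);         // blocks if B falls behind
                    }
                    aToB.put(END_OF_DATA);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        Thread stageB = new Thread(new Runnable() {   // "B": the downstream operator
            public void run() {
                try {
                    String row;
                    while (!(row = aToB.take()).equals(END_OF_DATA)) {
                        System.out.println("B processed " + row); // runs while A still produces
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        stageA.start();
        stageB.start();
        stageA.join();
        stageB.join();
    }
}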

So the declarative form of assembly specification is basically “design by contract” reuse: operators expose input/output ports but make no claims about their implementation.

The Business Problem: Surveillance, Search and Compliance

Our sample scenario assumes a medium-sized financial institution has tasked its compliance and risk division with building a high-performance information surveillance framework that can be repurposed for many surveillance applications. The first application of this framework will be to the immediate task of detecting individuals on FBI watch-lists and/or individuals known to be associated with money laundering activities (let’s call this the “hit-list”). Bank officials need to be notified within 15 minutes of any hit-list individual conducting electronic transactions with the bank, and they want to know whether the activity is clustered in any one geographic area.

Data Avalanche

In our scenario, fifty thousand audit records are generated every minute by back-end legacy systems that aggregate credit card, ATM and bank teller transactions. Every ten minutes, the data is to be fed to the surveillance application in a delimited text format. The volume of transactions is expected to almost double every year.

The hit-list of suspected felons is constantly changing, but averages 1,000 names and aliases. The hit-list is in a database which is updated in near real-time by either the FBI or the bank’s internal fraud department, so you have to pull from the hit-list every time you scan the transaction data (or apply changes to in-memory lookup tables).
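
Pulling the hit-list is ordinary JDBC. Here is a minimal sketch, assuming a hypothetical HITLIST table with a NAME column (the table and column names are illustrative, not part of the scenario):

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

// Minimal sketch: load the current hit-list names before each scan.
// The HITLIST table and NAME column are hypothetical.
public class HitListLoader {
    public static List<String> load(Connection conn) throws SQLException {
        List<String> names = new ArrayList<String>();
        Statement stmt = conn.createStatement();
        try {
            ResultSet rs = stmt.executeQuery("SELECT name FROM hitlist");
            while (rs.next()) {
                names.add(rs.getString("name"));
            }
            rs.close();
        } finally {
            stmt.close();
        }
        return names;
    }
}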

Further address information about each individual on the hit-list is stored in yet another delimited text file. Access to this file is logged and audited by the FBI on a monthly basis.

A Tale of Two Teams

Since the project is so important, management tasked two teams to attack the problem independently. Both teams were told to use no more than a 4-way dual-core SMP server with 64GB RAM and to reuse two existing code modules from a previous project:

  • Edit Distance: Computes an integer indicating how similar two strings are using a well-known algorithm called Levenshtein distance* (http://en.wikipedia.org/wiki/Levenshtein_distance).
  • K Means: Given a plot of many X,Y coordinates, finds the related clusters. Applying this to zip code or GPS coordinates might be useful for finding clusters of illicit activity.

* There are numerous ways to skin a cat when it comes to search algorithms. One could use http://en.wikipedia.org/wiki/Soundex or even neural network algorithms rather than enumerating all known variants of a name in a list. For the purpose of the article, this will do.
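
The reused K Means module itself is never shown in this article; for readers unfamiliar with the algorithm, here is a minimal illustrative sketch of K means over X,Y points. The random centroid initialization and fixed iteration count are simplifying assumptions; production implementations typically iterate until the centroids stabilize.

import java.util.Random;

// Illustrative K means sketch (not the reused module from the article).
// points: each row is {x, y}. Returns the cluster assignment for each point.
public class KMeansSketch {
    public static int[] cluster(double[][] points, int k, int iterations, long seed) {
        Random rnd = new Random(seed);
        double[][] centroids = new double[k][];
        for (int c = 0; c < k; c++) {
            centroids[c] = points[rnd.nextInt(points.length)].clone(); // random init
        }
        int[] assignment = new int[points.length];
        for (int iter = 0; iter < iterations; iter++) {
            // Assignment step: attach each point to its nearest centroid
            for (int p = 0; p < points.length; p++) {
                int best = 0;
                double bestDist = Double.MAX_VALUE;
                for (int c = 0; c < k; c++) {
                    double dx = points[p][0] - centroids[c][0];
                    double dy = points[p][1] - centroids[c][1];
                    double dist = dx * dx + dy * dy;   // squared Euclidean distance
                    if (dist < bestDist) {
                        bestDist = dist;
                        best = c;
                    }
                }
                assignment[p] = best;
            }
            // Update step: move each centroid to the mean of its assigned points
            double[][] sums = new double[k][2];
            int[] counts = new int[k];
            for (int p = 0; p < points.length; p++) {
                sums[assignment[p]][0] += points[p][0];
                sums[assignment[p]][1] += points[p][1];
                counts[assignment[p]]++;
            }
            for (int c = 0; c < k; c++) {
                if (counts[c] > 0) {
                    centroids[c][0] = sums[c][0] / counts[c];
                    centroids[c][1] = sums[c][1] / counts[c];
                }
            }
        }
        return assignment;
    }
}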

The Other Guys

Let’s peer into the Java development team that started down the path of writing the surveillance application from the baseline Java 5 JDK. First, they looked at the edit distance code and calculated that it would have to run against 50,000 transactions/min x 10 min. x 1,000 names = half a billion iterations of the algorithm per batch, or roughly 833,000 per second to stay inside the ten-minute window. Gasp.
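
In outline, that brute-force pass looks something like the following (a hypothetical sketch, not the team’s actual code; it assumes the computeLevenshteinDistance() method reproduced later in this article is pasted into the class, and the match threshold of 3 comes from the dataflow design below):

import java.util.List;

// Hypothetical sketch of the single-threaded, brute-force matching pass.
// Assumes computeLevenshteinDistance() from the later listing is added to this class.
public class BruteForceMatcher {
    public static void scan(List<String> surnames, List<String> hitList) {
        for (String surname : surnames) {      // ~500,000 transactions per 10-minute batch
            for (String hitName : hitList) {   // ~1,000 names and aliases
                int d = computeLevenshteinDistance(surname, hitName);
                if (d < 3) {                   // small edit distance = likely match
                    System.out.println("possible match: " + surname + " ~ " + hitName);
                }
            }
        }
    }
}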

Well, running the edit distance POJO through half a billion in-memory comparisons yielded a pretty good result on the 8-core server – 13 minutes, only 3 minutes off the mark, right? “Wrong,” said the compliance officer, as he reminded the team about next year’s doubling of volumes, about the 15-minute notification requirement, and about the fact that they still hadn’t factored in the location clustering using K means nor the I/O overhead. Half the team went to work tuning the JVM, while the other half cracked open Java concurrency books and started studying…

The DataRush Team

The DataRush development team studied the problem and immediately began discussing the solution in the context of a classic dataflow graph. They knew parallel computing was the answer and they knew the cost of 8, 16 and 32 multicore servers would almost certainly decline every year, so the surveillance framework had to automatically scale as CPU and memory resources were added by IT operations personnel.

A quick prototype of the dataflow pipeline on a whiteboard showed that the team could break the problem into two main sections that could be built independently:

  • Reading the hit-list table and transaction file to find possible matches
  • Enriching matches with address information to find implicit associations among the perpetrators’ home addresses (using a well-known algorithm called K means)

Specifically, the team decided that the following modules made up the dataflow graph:

  • Concurrently:
    • Reads transaction data from a delimited text file
    • Reads a "hit word" dataset from a database
  • Pairs each hit word with each transaction record (the hit words are replicated across the transaction stream)
  • Concurrently:
    • Runs edit distance (horizontally partitioned) on the transaction lastName field and the hit word
    • Filters the results, keeping only pairs with an edit distance < 3
  • Concurrently:
    • Reads an identity dataset containing zip code from a delimited text file
    • Joins identity data with filtered edit distance data
  • Runs K means on zip code to cluster filtered edit distance results

Now, even though some portions of the application seem to be constrained to serial execution, the DataRush framework actually allows pipeline parallelism and vertical partitioning parallelism as well. This means that as the first component completes work it receives from its input port(s), it can stream the results to the next component that is running concurrently.

While half the DataRush team worked on the K means clustering module, the other tackled matching – let’s follow the matching team as they build a highly parallel implementation of the edit distance algorithm.

Re-Using Existing Non-Parallel Code

The DataRush team looked at the runtime statistics for the Levenshtein edit distance algorithm (see below) and realized the current 13-minute runtime could be reduced through parallelism.

private int minimum(int a, int b, int c) {
    if (a <= b && a <= c)
        return a;
    if (b <= a && b <= c)
        return b;
    return c;
}

private int computeLevenshteinDistance(String str1, String str2) {
    return computeLevenshteinDistance(str1.toCharArray(), str2.toCharArray());
}

private int computeLevenshteinDistance(char[] str1, char[] str2) {
    int[][] distance = new int[str1.length + 1][];

    for (int i = 0; i <= str1.length; i++) {
        distance[i] = new int[str2.length + 1];
        distance[i][0] = i;
    }
    for (int j = 0; j < str2.length + 1; j++)
        distance[0][j] = j;

    for (int i = 1; i <= str1.length; i++)
        for (int j = 1; j <= str2.length; j++)
            distance[i][j] = minimum(distance[i - 1][j] + 1,
                                     distance[i][j - 1] + 1,
                                     distance[i - 1][j - 1] + ((str1[i - 1] == str2[j - 1]) ? 0 : 1));

    return distance[str1.length][str2.length];
}

Given two strings, the Levenshtein algorithm returns an integer. The higher the value, the less likely the two strings match, because the integer represents the number of “edits” that must be done to String 1 in order for it to match String 2 – hence the term “edit distance”.
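
For example, transforming “kitten” into “sitting” takes three edits, so the algorithm returns 3:

// kitten -> sitten  (substitute 's' for 'k')
// sitten -> sittin  (substitute 'i' for 'e')
// sittin -> sitting (insert 'g')
int distance = computeLevenshteinDistance("kitten", "sitting");  // returns 3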

Now, remembering the three axes of parallelism mentioned in the Pervasive DataRush Developer’s Guide, the team set out to “wrapper” the work already done. Again, these three axes are:

  1. Horizontal partitioning of work (i.e., by row)
  2. Vertical partitioning of work (i.e., by column)
  3. Pipeline parallelism (i.e., serial processes run concurrently and begin streaming completed work immediately, rather than waiting for the full dataset to be processed)

The team knows they can’t always use all three, but for the most part at least one or two of the axes will fit a given component. In general, if the component is factored down to the simplest data processing operation, it can be made parallel more efficiently.

First, a simple DataRush “operator” wrapper is set up to process a single execution of Levenshtein.

package example.editdistance;

import com.pervasive.dataflow.dev.IntOutput;
import com.pervasive.dataflow.dev.ProcessBase;
import com.pervasive.dataflow.dev.ProcessRunContext;
import com.pervasive.dataflow.dev.StringInput;

public class EditDistanceProcess extends ProcessBase {

    public StringInput leftInput;
    public StringInput rightInput;
    public IntOutput output;

    public void run(ProcessRunContext context) {
        // Read pairs of strings until no more data in the input ports.
        // The non-short-circuiting & ensures both ports are advanced each iteration.
        while (leftInput.stepNext() & rightInput.stepNext()) {
            int distance = computeLevenshteinDistance(leftInput.asString(), rightInput.asString());

            // Push to output
            output.push(distance);
        }

        output.pushEndOfData();
    }

    /* ... cut/paste the non-parallel computeLevenshteinDistance() code here ... */
}

The above snippet is a DataRush process that has two input ports, each of type string. A “row” from each port is read and the edit distance between the two “columns” is computed. The edit distance computed is pushed to the output port. When end of data is encountered on the input ports, the output port is closed and the process exits.

As most team members had already spent a week playing with DataRush, this phase of the project took them only 20 minutes. Results may vary depending on familiarity with the framework and the software engineer’s ability to think of computational problems as dataflow graphs.

Building the DFXML

The next step is to define the new DataRush process as an assembly; even though it’s an assembly of one, this allows the addition of documentation to the operator and is good practice. Here’s the dataflow XML language (DFXML):

<?xml version="1.0" encoding="UTF-8"?>
<!--
  (c) Copyright 2007 Surveillance XYZ Corp. All rights reserved.
-->
<AssemblySpecification
    xmlns="http://www.pervasive.com/xmlns/dataflow/sdk"
    name="EditDistance"
    package="example.editdistance"
    schemaVersion="1.0">
  <Doc>
    <Author>DataRush Team</Author>
    <DateCreated>2006-10-17</DateCreated>
    <Description>Compute Levenshtein edit distance of two strings</Description>
  </Doc>
  <Operator>
    <Contract>
      <Inputs>
        <Port name="leftInput" type="string">
          <Description>Left string input</Description>
        </Port>
        <Port name="rightInput" type="string">
          <Description>Right string input</Description>
        </Port>
      </Inputs>
      <Outputs>
        <Port name="output" type="int">
          <Description>Computed edit distance</Description>
        </Port>
      </Outputs>
    </Contract>
    <Composition>
      <Process instance="editDistance" type="example.editdistance.EditDistanceProcess">
        <Link st="leftInput"/>
        <Link st="rightInput"/>
      </Process>
      <Link instance="editDistance" st="output"/>
    </Composition>
  </Operator>
</AssemblySpecification>

The assembly wrapper for the edit distance process allows the DataRush team to add metadata information about the operator, set up input and output ports and define a simple one-process composition. This took the team about 10 minutes.

In today’s world of model-driven development, filling out deployment descriptor-like XML scripts to define graphs is not the most intuitive experience; however, the DataRush SDK does include a graph visualizer for Eclipse, enabling model-driven application design.

The team now creates the key component that enables auto-scaling of applications. The job of this customizer is to replicate assemblies based on any number of factors (e.g., number of cores, available memory). Here’s the code of a simple customizer that replicates the edit distance assembly:

package example.editdistance;

import com.pervasive.dataflow.dev.AssemblyRef;
import com.pervasive.dataflow.dev.CustomizerContext;
import com.pervasive.dataflow.dev.DataflowCustomizer;

public class EditDistanceCustomizer implements DataflowCustomizer {

    private int partitionCount;

    /*
     * Use whatever property value was manually set by the user. If not set, use the
     * number of processor cores to drive the parallel execution strategy.
     */
    public void setPartitionCount(int partitionCount) {
        if (partitionCount <= 0) {
            partitionCount = Runtime.getRuntime().availableProcessors();
        }
        this.partitionCount = partitionCount;
    }

    /**
     * This method is invoked by the DataRush framework at compile time. It creates
     * partitionCount assemblies that will run in parallel.
     */
    public void customize(CustomizerContext context) {
        // For the number of instances wanted... again, this could be a more complex
        // algorithm, but in this case we do not know the execution power of each
        // core, nor do we want to infer any...
        for (int i = 0; i < partitionCount; i++) {

            // Create a new assembly
            AssemblyRef editDistRef = context.addAssembly(
                    "edit_distance_" + i,
                    "example.editdistance.EditDistanceComputeInstance");

            // Tell the assembly where to get its properties.
            editDistRef.setSource("seed");
            editDistRef.setSource("rowCount");
            editDistRef.setSource("logFrequency");
        }
    }
}

The DataRush team wanted to first isolate the testing of scalability to the server alone, ruling out I/O issues with disk, so the much simpler customizer shown above was created. The DataRush framework tries to make parallelism as simple and dynamic as possible for the Java developer. This customizer creates a given number of instances of an assembly. Each test assembly here is stand-alone – it doesn’t have linked input and output ports. This will change after benchmarking is complete.

Time to implement/test this customizer: approximately 30 minutes.

Creating the In-Memory Benchmark

The team now creates the benchmark of half a billion executions of edit distance.

<?xml version="1.0" encoding="UTF-8"?>
<!--
  (c) Copyright 2007. Surveillance XYZ Corp. All rights reserved.
-->
<AssemblySpecification
    xmlns="http://www.pervasive.com/xmlns/dataflow/sdk"
    name="EditDistanceComputeInstance"
    package="example.editdistance"
    schemaVersion="1.0">
  <Doc>
    <Author>Pervasive Software</Author>
    <DateCreated>2006-10-18</DateCreated>
    <Description>Generate two string fields and compute edit distance.</Description>
  </Doc>
  <Operator>
    <Contract>
      <Properties>
        <Property name="seed" type="long" default="12345"/>
        <Property name="rowCount" type="long"/>
        <Property name="logFrequency" type="int" default="0"/>
      </Properties>
    </Contract>
    <Composition>
      <!-- Generate left hand side string field -->
      <Assembly instance="genLeft" type="com.pervasive.dataflow.operators.source.GenerateRandomRows">
        <Set target="outputType" value="string"/>
        <Set st="rowCount"/>
        <Set st="seed"/>
        <Set target="seedOffset" value="0"/>
      </Assembly>

      <!-- Generate right hand side string field -->
      <Assembly instance="genRight" type="com.pervasive.dataflow.operators.source.GenerateRandomRows">
        <Set target="outputType" value="string"/>
        <Set st="rowCount"/>
        <Set st="seed"/>
        <Set target="seedOffset" value="7"/>
      </Assembly>

      <!-- Compute edit distance of generated strings. -->
      <Assembly instance="editDistance" type="example.editdistance.EditDistance">
        <Link instance="genLeft" source="output" target="leftInput"/>
        <Link instance="genRight" source="output" target="rightInput"/>
      </Assembly>

      <!-- Send computed edit distance to the bit bucket.                       -->
      <!-- A sink is used to demonstrate linking to the edit distance component. -->
      <Assembly instance="sink" type="com.pervasive.dataflow.operators.sink.LogRows">
        <Set st="logFrequency"/>
        <Link instance="editDistance" source="output" target="input"/>
      </Assembly>
    </Composition>
  </Operator>
</AssemblySpecification>

This assembly creates a data generator for each of the pair of string phrases (called the “left and right hand side” here). The output of edit distance links to a data sink component. Why data generators and a data sink? Again, we are trying to make sure the matching assembly can scale to use all 8 cores at peak data throughput, so the in-memory test doesn’t need to link to the data readers yet. Once we prove computational scalability, we will introduce reader operators to pull from the hit-list.

Time to implement/test this DFXML and get it right: approximately 30 minutes.

Bringing it All Together

<?xml version="1.0" encoding="UTF-8"?>
<!--
  (c) Copyright 2006 Pervasive Software Inc. All rights reserved.
-->
<AssemblySpecification
    xmlns="http://www.pervasive.com/xmlns/dataflow/sdk"
    name="EditDistanceSample"
    package="example.editdistance"
    schemaVersion="1.0">
  <Doc>
    <Author>Pervasive Software</Author>
    <DateCreated>2006-10-17</DateCreated>
    <Description>Sample application to run the edit distance algorithm.</Description>
  </Doc>
  <Operator>
    <Contract>
      <Properties>
        <Property name="partitionCount" type="int" default="50">
          <Description>Number of parallel instances of edit distance to create</Description>
        </Property>
        <Property name="logFrequency" type="int" default="0">
          <Description>Frequency with which to log output rows (zero implies no logging)</Description>
        </Property>
        <Property name="rowCount" type="long" default="10000000">
          <Description>Generate 10 million records.</Description>
        </Property>
        <Property name="seed" type="long" default="12345">
          <Description>Seed value for random number generation.</Description>
        </Property>
      </Properties>
    </Contract>
    <Composition>
      <Customizer stage="composition" type="example.editdistance.EditDistanceCustomizer">
        <Set st="partitionCount"/>
      </Customizer>
    </Composition>
  </Operator>
</AssemblySpecification>

This assembly invokes the edit distance customizer with the number of parallel instantiations of edit distance, which is defined by an input property. Each instantiation creates two string data generators, an edit distance compute component and a data sink. All of these components will run in parallel within the DataRush engine.

Time to implement/test this DFXML: approximately 30 minutes.

Total time to implement matching: about 2 hours.

It now takes less than 2 minutes to compute edit distance against 500 million word/phrase pairs – better than a 6x improvement over the 13-minute baseline. Moreover, the solution scales almost linearly as processors continue to double in core density – with no additional code optimization required.

The DataRush team has reused existing code and created a key component of the overall solution in record time. They can now begin the development of the file I/O architecture knowing the surveillance application has a full eight minutes to get the file into the server’s address space and compute K means clusters against the hit-list matches.

Closing Thoughts

I admit that some of the Java concurrency experts out there won’t see the power in a framework like DataRush when J2SE 6 already supports many of the APIs required to build high-performance applications (e.g., NIO and JSR-166). I suggest we all step back a second and walk a mile in the shoes of the vast majority of Java developers.

Here’s my logic:

  • Java developers have been learning about and building transactional, message-oriented systems for some time (J2EE and ESB frameworks). In fact, the majority of enterprise Java developers have skewed to this area of computer science.
  • When confronted with a project to build data-intensive (bulk data) applications, Java developers have defaulted to scripting languages or simple, non-parallel Java constructs. Why? To reduce project risk.
  • Building parallelism, even seemingly simple parallelism, has unforeseen consequences. Data pipelining requires intelligent queueing and buffering of data flowing between operators – queues expand and contract depending on operator performance. Deadlock detection of shared memory resources is more art than science. Custom coding all this while leaving future successors with reusable, configurable composite applications is not likely to happen either.

Therefore, I contend the vast majority of enterprise applications that run during scheduled intervals (nightly, hourly, every 10 minutes) do not utilize parallel data processing techniques that can harness the full power of multicore hardware platforms. Nor will these applications scale automatically as processor core densities continue to double.

Pervasive DataRush provides simple constructs to:

  • Create units of work (processes) that can each individually be made parallel.
  • Tie processes together in a dataflow graph (assemblies), but then enable the reuse of complex assemblies as simple operators in other applications.
  • Then further tie operators into new, broader dataflow applications and so on.
  • Run a compiler that can traverse all sub-assemblies while executing customizers to automatically define parallel execution strategies based on then-current resources and/or more complex heuristics (this will only improve over time).

Thus in the real world, where IT budgets dwindle and almost every project schedule experiences slippage within a week of the kick-off meeting, I think a framework like Pervasive DataRush will give Java developers a fighting chance to achieve significant returns on technology investment – both a return on the time invested to learn DataRush as well as a significant return on the cost of new multicore servers your company is undoubtedly provisioning as you read this article.

Pervasive DataRush is currently in beta and available for free download at www.pervasivedatarush.com.

Emilio Bernabei is director of product management at Pervasive Software. He can be reached at ebernabei@pervasive.com.