Java Development News:

New Features in JGroups 2.5

By Vladimir Blagojevic

01 Aug 2007 | TheServerSide.com

JGroups is a toolkit for the development of distributed Java applications based on group communication. Group communication involves message exchange between a set of processes in the presence of process failures and network interruptions. The most powerful feature of JGroups is its flexible protocol stack, which allows developers to adapt JGroups to exactly match their application requirements and underlying network characteristics. By mixing and matching protocols, multiple application requirements can be satisfied at minimal cost.

Before we begin, let's review some basic concepts from group communication and JGroups. Each group member/process in JGroups is represented by a JChannel object. The best way to think of a JChannel is as a socket-like abstraction that allows message sending and receiving. JChannel also allows registration of callback interfaces that are invoked when channel-related events occur. Messages exchanged between JChannel instances have a payload and a pair of sender and receiver addresses. A view is an ordered list of member addresses representing the current agreed-upon membership of a group/cluster. When a new member joins, or a member crashes, a new view is installed in all members *including* the new member. For example, if we have a cluster of A, B and C, the view is V12={A,B,C}. If a new member D joins, then everyone will have view V13={A,B,C,D}. If member A crashes, we will have view V14={B,C,D}. Note that members are added in the order in which they join the group.
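
To make these abstractions concrete, here is a minimal sketch of creating a channel, registering a callback, and multicasting a message. The stack configuration file name ("udp.xml") and the cluster name are placeholders, not part of any example from this article:

import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;

public class HelloGroup {
    public static void main(String[] args) throws Exception {
        // "udp.xml" stands for any valid protocol stack configuration
        JChannel channel = new JChannel("udp.xml");

        // Callback interface invoked on message reception and view installation
        channel.setReceiver(new ReceiverAdapter() {
            public void receive(Message msg) {
                System.out.println("received " + msg.getObject() + " from " + msg.getSrc());
            }
            public void viewAccepted(View newView) {
                System.out.println("new view: " + newView);
            }
        });

        channel.connect("demo-cluster");                  // join the group
        channel.send(new Message(null, null, "hello"));   // null destination = multicast to all members
        channel.close();
    }
}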

The latest JGroups release (2.5) includes some long-awaited features, such as a concurrent stack, multiplexer improvements, out-of-band (unordered) messages, new failure detection protocols, and finally full support for virtual synchrony. Today we will look at three important features of the 2.5 release: full virtual synchrony, the concurrent stack, and the improved multiplexer.

FLUSH

FLUSH is a JGroups protocol introduced in the 2.5 release that implements virtual synchrony. But before we dive into the details of virtual synchrony, let's understand the problem that virtual synchrony solves.

When sending/receiving messages, the main property of the default JGroups stack is that all messages are delivered reliably to all (non-crashed) members. However, there are no guarantees as to when these messages will be delivered with respect to views. For example, when a member A with view V1={A,B,C} multicasts message M1 to the group and D joins at about the same time, then D may or may not receive M1, and there is no guarantee that A, B and C receive M1 in V1 or in V2={A,B,C,D}. This is where the FLUSH protocol comes in: it provides a stronger guarantee about message delivery with respect to views.

Virtual synchrony is a model of group communication defined by Ken Birman [1]. First of all, virtual synchrony guarantees that group membership changes are observed in the same order by all surviving group members. Moreover, two members observing the same two consecutive membership changes are guaranteed to receive the same set of application multicast messages between those two membership changes. Note that we are not concerned with the particular order of application multicast messages between two consecutive membership changes; virtual synchrony is only concerned that the same set of application messages is received by all surviving group members and that membership changes are totally ordered with respect to application messages. A clustered application using the virtual synchrony model will appear as if all group members observe the same events in the same order – a virtually synchronous order.

The name “FLUSH” is derived from “flushing all pending messages out of the system”. This means that before a new view is installed, we make sure that every member has delivered exactly the same set of messages. For example, say we have V10={A,B,C,D}, and B sends a message M to the group but crashes right after sending it. If nobody else received M, then M is lost and will never be seen by the group. However, if D received M, then the FLUSH protocol will make sure that M is delivered to A and C before view V11={A,C,D} is installed.

This 'atomicity' of message delivery (either every non-faulty member receives a message, or no one receives it) is similar to transactions known from the database world, except that it only extends to non-faulty members. JGroups doesn't maintain a log in stable storage (as databases do), since it only deals with in-memory state. So, if a member crashes and comes back online, it can re-acquire the state from some other member, and therefore doesn't need to reconcile logs.

Therefore, if we have a group of processes observing the same view events, where each view contains the same ordered member list at every member, and if we have a guarantee that each member will receive the same set of messages between those views, then we can further simplify the design of distributed applications. Not only can we simplify applications that use the JGroups toolkit, but we can also simplify some internal protocols within the JGroups stack, as we will see in the examples below.

FLUSH, state transfer and Multiplexer

The FLUSH protocol does not only come in handy to provide a strong model for the demarcation of views and messages; it also provides a stop-the-world message model, which is essentially the ability to stop all message exchange for a certain period of time. The stop-the-world model is necessary for state transfer between applications that are multiplexed over a regular JGroups channel [see Multiplexer below for more details]. In order to understand why FLUSH is required for correct state transfer over multiplexed channels, let's take the example of three distributed hash table applications (DHT1, DHT2 and DHT3) sitting on top of three multiplexed channels, where all three multiplexed channels share the same JChannel.

When a DHT2 application starts, suppose it wants to fetch its state (by calling channel.getState()) from another DHT2 application residing on top of another channel. The state retrieved from the state-providing channel should only include the state of the DHT2 application, not the states of the DHT1 and DHT3 applications.
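
Regardless of whether a plain or a multiplexed channel is used, the application-level contract for state transfer stays the same: the state provider serializes its state in getState() and the requester installs it in setState(). A rough sketch, assuming a hypothetical DHT receiver backed by a map (the class name and map contents are illustrative only); the requester would connect its channel and then call channel.getState(null, 5000):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.jgroups.ReceiverAdapter;
import org.jgroups.util.Util;

public class DhtReceiver extends ReceiverAdapter {
    private final Map<String, String> table = new ConcurrentHashMap<String, String>();

    // Invoked on the state provider: serialize the current table
    public byte[] getState() {
        try {
            return Util.objectToByteBuffer(new ConcurrentHashMap<String, String>(table));
        } catch (Exception e) {
            throw new RuntimeException("state serialization failed", e);
        }
    }

    // Invoked on the state requester: install the received table
    @SuppressWarnings("unchecked")
    public void setState(byte[] state) {
        try {
            table.clear();
            table.putAll((Map<String, String>) Util.objectFromByteBuffer(state));
        } catch (Exception e) {
            throw new RuntimeException("state installation failed", e);
        }
    }
}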

However, the state transfer protocol does not know, and should not know, anything about multiple applications sharing one channel. State transfer only knows how to transfer the application state of one channel to an application on another channel. Since state transfer should be concurrent with message sending and receiving, state transfer ships the state with a digest containing the highest delivered sequence numbers, from each member, that are included in that state. You can think of a digest as a fingerprint that encodes which messages went into the composition of that state. After state installation, the digest is set in the state-receiving member to determine which messages are already part of the state and which ones possibly need to be retransmitted, so that the joining channel can catch up with the other members of the group.

Since the digest applies to all messages for all states, we cannot correctly determine which particular messages went into building the state of DHT2. In other words, we cannot rely on the digest for multiplexer state transfers. Therefore we have to use the stop-the-world model provided by FLUSH. When a state is requested, FLUSH forces everybody to stop sending new messages. After the group has become quiet, the partial state is transferred, and finally FLUSH signals everybody to continue sending messages.

FLUSH and ENCRYPT

The JGroups ENCRYPT protocol, as its name implies, encrypts all messages sent/received by group members. ENCRYPT uses a combination of asymmetric and symmetric encryption algorithms. Symmetric encryption is used for the actual message encryption, while the asymmetric algorithm is used to disseminate the shared key for the symmetric algorithm. The shared symmetric key is recreated by the coordinator whenever a node joins or an existing one leaves or crashes, i.e. whenever a new view is installed. When a new node joins a cluster, it uses asymmetric (public/private) encryption to obtain the shared symmetric key.

What can potentially happen if we do not use FLUSH? We can have a case where a message M, sent by sender S in V1 and encrypted with the shared key from V1, is received by a receiver R in V2, where a new shared key has already been installed. Thus R would be unable to decrypt message M. It is here that FLUSH comes to the rescue. Recall how FLUSH provides strict demarcation of messages and views; you can therefore see how combining FLUSH and ENCRYPT provides strong guarantees that every message sent/received by cluster members will be properly encrypted and decrypted.

If we use FLUSH in combination with ENCRYPT, the implementation of the ENCRYPT protocol can be further simplified. Currently ENCRYPT does not assume the presence of FLUSH, so it has to do non-trivial application message queuing after a new view has been installed: each channel has to queue its outgoing and incoming messages until ENCRYPT obtains a new secret key from the coordinator. If we use the FLUSH protocol to obtain a stop-the-world scenario when installing new views, then shared key negotiation can easily be integrated into FLUSH, similar to how we use FLUSH in state transfer.

Example

FLUSH can be invoked directly from the JChannel API using the startFlush() method. After startFlush() returns, the cluster has been quieted and no members can send any messages until stopFlush() is invoked. As an exercise, the reader is encouraged to review the SimpleChat application from the JGroups tutorial [2], add a print statement for the number of messages received in the previous view, and finally add the FLUSH protocol to the channel's stack. After FLUSH has been added to the stack, we are guaranteed that every surviving member will observe exactly the same number of messages in each view, no matter how erratically membership changes or how frequently we send messages.
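
As a rough sketch of invoking a manual flush (the exact startFlush() signature may vary slightly across 2.5.x builds; here we assume the variant taking a boolean that controls automatic resume):

// Assumes a connected JChannel with pbcast.FLUSH in its protocol stack
boolean flushed = channel.startFlush(false);   // quiet the cluster; false = do not resume automatically
if (flushed) {
    try {
        // The cluster is now quiet: no member can send messages.
        // Do whatever requires a stable, message-free period here.
    } finally {
        channel.stopFlush();                   // let members resume sending
    }
}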

Concurrent stack

Prior to the JGroups 2.5 release, messages from all senders in a group, as they arrived at the receivers, were placed into a single queue and delivered to the application in order of reception. Although we maintained so-called sender (FIFO) message order, messages were unnecessarily kept in a queue waiting to be pushed up the channel to the application. For instance, if sender A sent messages 1, 2 and 3, and B sent messages 34 and 35, and if A's messages were all received first, then B's messages 34 and 35 could only be processed after messages 1-3 from A were processed!

A better solution is to receive messages per sender concurrently at each channel and thus increase concurrency in message reception. However, concurrency should not come at the expense of message ordering, and FIFO order per sender still needs to be maintained. To return to our example, we should process messages from different senders in parallel: messages 1, 2 and 3 from A should be processed by one thread, and messages 34 and 35 from B by a different thread.

Starting with the JGroups 2.5 release, messages arriving at a channel from different senders are processed concurrently! As depicted in Figure 1, we introduced two thread pools: one for regular messages and another for out-of-band (OOB) messages. Both thread pools can be configured through XML. OOB messages are ordinary JGroups messages with the OOB flag set; they do not need to be delivered in FIFO order with respect to other messages from the same sender. As such, they are perfect for heartbeats, discovery requests/responses, and any other messages that do not require FIFO ordering.
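
Tagging a message as out-of-band is done on the message itself. A small sketch, assuming the 2.5-era Message.OOB flag constant and an already connected channel:

// Build a multicast message and mark it as out-of-band, so it bypasses
// per-sender FIFO ordering and is handled by the OOB thread pool
Message heartbeat = new Message(null, null, "heartbeat");
heartbeat.setFlag(Message.OOB);
channel.send(heartbeat);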

It is important to note that the thread pools are not used to fetch network packets from the interface; dedicated threads fetch packets from the network interface and then hand the messages off for further processing. Packets marked as OOB are dispatched to the OOB thread pool, and all other packets are dispatched to the regular thread pool. The point of using thread pools is that the receiver threads should only receive the packets and forward them to the thread pools for unmarshalling and processing. Message unmarshalling and processing requires more computing cycles than simply receiving messages from the network interface and can therefore benefit from parallelization.

The concurrent stack will improve performance dramatically when:

- there are multiple senders and/or
- the processing of a message takes some time.

In fact, our own stress tests have confirmed these performance expectations: in a cluster of N members with N senders, X messages per sender and a think time of T per message, we have seen the total processing time of all messages drop from X * N * T to (X * T) plus the overhead of the thread pools!
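
To put hypothetical numbers on that formula: with N = 4 senders, X = 1,000 messages per sender and T = 10 ms of processing per message, sequential delivery costs roughly 4 * 1,000 * 10 ms = 40 seconds, whereas concurrent delivery costs roughly 1,000 * 10 ms = 10 seconds plus the thread pool overhead.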

We have also removed the up/down threads per protocol. Prior to the JGroups 2.5 release, each protocol in a channel's stack could turn on or off two threads that passed messages up and down the stack. With the introduction of concurrent message processing, there is no longer any need for up and down threads per protocol. We also get clear and unambiguous semantics for the Channel.send() method: starting with JGroups 2.5, all messages are sent down the channel on the caller's thread, and the send() call returns once the message has been put on the network.

Figure 1. Concurrent stack with two thread pools.

Multiplexer

As we have seen, the most important abstraction in JGroups, which allows a process to join a group and exchange messages with it, is a channel. The Channel is similar to a socket: it has a message sending API and a state transfer API, and it can register listeners that are notified about all channel-related events, such as message reception, view installation, state transfer invocation hooks, and so on. Closer inspection reveals that all of these semantics are naturally tied to one particular user application residing on top of one channel. We have followed this common practice ourselves when developing clustering solutions for the JBoss application server. If you start the JBoss application server, you will notice that each clustered component (web session replication, clustered JNDI, EJB replication, etc.) uses its own channel.

In fact, if you start JBoss AS 4.2 in the standard 'all' clustering configuration, four channel instances will be created. Each of these channels forms a separate cluster with its peer channels on other JBoss AS instances. If we look at the internals of JChannel, we will notice that JChannel is a rather resource-intensive object, especially when it comes to the use of Java threads. If you start two instances of the Draw demo application, you will notice that each channel starts at least a dozen threads. If we create more than a few channels in a single JVM, we quickly begin to exhaust threads as a resource. This further translates into more context switching and more memory consumption. Having multiple channels per JVM is also configuration-intensive, since we have to carefully separate the clusters by using different multicast addresses. The introduction of the Multiplexer in the JGroups 2.4 release aims to alleviate these problems.

The multiplexer allows multiple channel interfaces to be associated with one underlying instance of JChannel. It is essentially a building block residing on top of a JChannel providing multiplexing functionality to N instances of MuxChannel. Since MuxChannel extends the JGroups JChannel class, user applications are completely unaware of this change in the underlying plumbing.

Each JGroups application sharing a channel through a multiplexer has to create a MuxChannel with a unique application id. The multiplexer keeps track of all registered applications and tags each outgoing message belonging to a specific application with that id. When receiving a message from a remote peer, the multiplexer dispatches the message to the appropriate MuxChannel depending on the id attached to the message.

However, the multiplexer has to handle view installations, state transfers and suspect messages specially. Views are installed per MuxChannel semantics rather than regular JChannel semantics, since an application view can differ from a regular view. For example, one can easily envision a situation where three JChannel instances form a group, say V1={A,B,C}, but a specific application runs only on A and B. An application view is always a subset of the “real” channel view; in the case above, the application view is AV1={A,B}.

Multiplexer improvements in JGroups 2.5

Although the multiplexer significantly improved resource management, we still encountered a few problems. Messages sent by the same sender channel Q to multiplexed applications residing on top of channel P were processed sequentially at P. At first glance this might not seem worrisome, but closer examination shows that it can lead to rather significant message delivery delays. For example, it is easy to envision the following scenario: we have three channels (P, Q, and R), each running three applications A1, A2 and A3. Say A1 at P sends a replication message M to the cluster. On channel Q, the thread carrying message M to A1 blocks at the application level, waiting to acquire a certain lock. The FIFO ordering protocol will prevent A2 and A3 at channel Q from receiving any messages from P until M returns! (Note that Q and R are still able to receive messages, but services A2 and A3 are blocked until M from P to service A1 has completed.)

We solved this problem by adding a thread pool to the Multiplexer, so that requests to different services can be processed concurrently, even if they were sent by the same sender, while still maintaining FIFO ordering of messages within each application.

Creating multiplexed applications is straightforward. A MuxChannel is created using the JChannelFactory [3] class and a factory method:

public Channel createMultiplexerChannel(String stack_name, String id) throws Exception;

where stack_name has to be a stack defined in the configuration file the JChannelFactory was initialized with, and id is the multiplexed application id. After a MuxChannel is created, we can use it as if it were a regular JChannel instance (send messages, invoke state transfer, etc.).
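
A minimal sketch of creating two multiplexed applications over one shared channel; the configuration file name ("stacks.xml"), the stack name ("udp"), the application ids and the cluster name are all placeholders:

import org.jgroups.Channel;
import org.jgroups.JChannelFactory;

public class MuxDemo {
    public static void main(String[] args) throws Exception {
        // "stacks.xml" stands for a multiplexer configuration file defining named stacks
        JChannelFactory factory = new JChannelFactory();
        factory.setMultiplexerConfig("stacks.xml");

        // Two applications share one underlying JChannel of the "udp" stack,
        // distinguished only by their application ids
        Channel chat  = factory.createMultiplexerChannel("udp", "chat");
        Channel cache = factory.createMultiplexerChannel("udp", "cache");

        chat.connect("demo-cluster");
        cache.connect("demo-cluster");

        // ... use chat and cache exactly like regular channels ...

        chat.close();
        cache.close();
    }
}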

Example

As an exercise, convert the SimpleChat [2] application to use the multiplexer instead of a regular channel. This way we can effectively create different chat topics in the SimpleChat application, where all topics are multiplexed over one JChannel instance. There is no need to create one JChannel per chat topic, as was the case prior to the introduction of the multiplexer.

Conclusion and look ahead

As we have seen, the JGroups 2.5 release offers major improvements over previous releases, most notably concurrent message delivery, channel multiplexing and full virtual synchrony support. We encourage all JGroups users to upgrade their applications to the 2.5 release [4]. The upgrade is rather straightforward: the API is backward compatible with the 2.2.7 release, so the only change should be in the channel configuration files. We are also following our own advice: JGroups 2.5 will be the base library for all clustering stacks in the upcoming JBoss AS 5.0 release.

Moving forward, we will focus on strengthening the JGroups NIO protocol implementation, along with enhancements to gossip routing and some other minor performance improvements. In the JGroups 3.0 release we are going to refactor the JGroups API to fit the best practices we have learned so far. Stay tuned.

[1] http://portal.acm.org/citation.cfm?id=37515
[2] http://www.jgroups.org/javagroupsnew/docs/tutorial/html/index.html
[3] http://www.jgroups.org/javagroupsnew/docs/javadoc/org/jgroups/JChannelFactory.html
[4] http://sourceforge.net/project/showfiles.php?group_id=6081