JMS INPUT OPERATOR

Introduction: About the JMS Input Operator

The JMS input operator consumes data from a messaging system using the JMS client API. Because JMS is an API specification rather than a communication protocol, the operator needs an underlying JMS client library to talk to a messaging system. So far the operator has been tested with the Amazon SQS and Apache ActiveMQ brokers through their respective JMS client libraries.

Why is it needed?

Use this operator to read data from a messaging system (e.g. Apache ActiveMQ) through the JMS client API. It supports both the publish-subscribe (topics) and point-to-point (queues) modes. It does not currently support partitioning or dynamic scaling.
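The difference between the two modes can be pictured with a toy in-memory model. This is a sketch only: ToyBroker and its methods are hypothetical and use no JMS classes; they merely mimic delivery semantics. A message published to a topic reaches every subscriber, whereas a message sent to a queue is taken by exactly one of the competing consumers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy broker: illustrates topic vs queue delivery only, not real JMS behavior.
class ToyBroker {
  // Each topic subscriber has its own inbox, so every subscriber sees every message.
  private final Map<String, List<String>> topicInboxes = new LinkedHashMap<>();
  // A queue is one shared backlog that competing consumers drain.
  private final Deque<String> queue = new ArrayDeque<>();

  void subscribe(String subscriberId) {
    topicInboxes.putIfAbsent(subscriberId, new ArrayList<String>());
  }

  // Publish-subscribe: copy the message into every subscriber's inbox.
  void publish(String message) {
    for (List<String> inbox : topicInboxes.values()) {
      inbox.add(message);
    }
  }

  List<String> inbox(String subscriberId) {
    return topicInboxes.get(subscriberId);
  }

  // Point-to-point: the message is stored once.
  void send(String message) {
    queue.addLast(message);
  }

  // The first consumer to call receive() gets the message; nobody else will see it.
  String receive() {
    return queue.pollFirst();
  }
}
```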

JMSBase

This class encapsulates various JMS properties and behaviors and maintains connections with the JMS broker. It is the base class for the JMS input and output adapter operators. Operators should not subclass JMSBase directly; they should extend one of the JMS input or output operators instead.

AbstractJMSInputOperator

This abstract implementation serves as the base class for consuming generic messages from an external messaging system. Concrete subclasses implement the conversion and emit methods to emit tuples of a concrete type. JMSStringInputOperator is one such subclass in the library; it handles String messages. JMSObjectInputOperator is another; it handles multiple message types and lets the user receive String, byte array, Map or POJO messages on the corresponding output ports.

Configuration Parameters

The common configuration parameters are described below; each parameter name is followed by its description.

windowDataManager

This is an instance of WindowDataManager that implements idempotency. Idempotency ensures that the operator processes the same set of messages in a window before and after a failure. For example, suppose the operator completed window 10 and failed before or during window 11. If the operator is restored at window 10, it replays the messages of window 10 that were saved during the previous run, before the failure. Although important, idempotency comes at a price, because the operator needs to persist some state at the end of each window. Default value: org.apache.apex.malhar.lib.wal.FSWindowDataManager

connectionFactoryBuilder

The operator uses the builder pattern: the user specifies an instance of com.datatorrent.lib.io.jms.JMSBase.ConnectionFactoryBuilder, and this builder creates the connection factory that encapsulates the underlying JMS client library (e.g. ActiveMQ or Amazon SQS). By default the operator uses com.datatorrent.lib.io.jms.JMSBase.DefaultConnectionFactoryBuilder, which is intended for ActiveMQ. The SQS example below shows how to supply a custom builder.
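The replay behaviour described for windowDataManager can be modelled in plain Java. The sketch below is an assumption-laden illustration, not the WindowDataManager API: the hypothetical ToyWindowStore keeps each completed window's messages in memory (the real default persists to a file system), and the hypothetical ToyIdempotentInput re-emits a saved window instead of reading new messages from the broker whenever the window id is not newer than the largest saved one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a WindowDataManager: keeps state in memory instead of a file system.
class ToyWindowStore {
  private final Map<Long, List<String>> saved = new HashMap<>();
  private long largestSavedWindow = -1;

  // Called at the end of each window with everything emitted during that window.
  void save(long windowId, List<String> messages) {
    saved.put(windowId, new ArrayList<>(messages));
    largestSavedWindow = Math.max(largestSavedWindow, windowId);
  }

  // Messages recorded for a window, or an empty list if that window never completed.
  List<String> retrieve(long windowId) {
    List<String> messages = saved.get(windowId);
    return messages == null ? Collections.<String>emptyList() : messages;
  }

  long largestSavedWindow() {
    return largestSavedWindow;
  }
}

// Hypothetical input operator that replays completed windows instead of reading the broker.
class ToyIdempotentInput {
  private final ToyWindowStore store;
  private final Deque<String> broker; // messages currently available from the (toy) broker
  private final List<String> output = new ArrayList<>(); // everything sent downstream
  private List<String> currentWindow = new ArrayList<>();
  private long windowId;

  ToyIdempotentInput(ToyWindowStore store, Deque<String> broker) {
    this.store = store;
    this.broker = broker;
  }

  private boolean replaying() {
    return windowId <= store.largestSavedWindow();
  }

  void beginWindow(long windowId) {
    this.windowId = windowId;
    currentWindow = new ArrayList<>();
    if (replaying()) {
      output.addAll(store.retrieve(windowId)); // re-emit the saved window verbatim
    }
  }

  // Reads up to max new messages, but never while a completed window is being replayed.
  void emitTuples(int max) {
    if (replaying()) {
      return;
    }
    for (int i = 0; i < max && !broker.isEmpty(); i++) {
      String message = broker.pollFirst();
      currentWindow.add(message);
      output.add(message);
    }
  }

  // Only windows that were processed live are persisted.
  void endWindow() {
    if (!replaying()) {
      store.save(windowId, currentWindow);
    }
  }

  List<String> output() {
    return output;
  }
}
```

In this toy, a restored operator that starts again at window 10 emits exactly the messages saved for window 10 and only then resumes reading; the sketch assumes the broker redelivers messages that were never part of a completed window.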

Abstract Methods

The following abstract methods need to be implemented by concrete subclasses.

T convert(Message message): This method converts a JMS Message object to type T.

void emit(T payload): This method emits a tuple given the payload extracted from a JMS message.
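The convert/emit split is a template-method arrangement: the base class owns the message flow and calls convert, then emit. The plain-Java sketch below mimics only that shape. ToyMessage, ToyJmsInput and ToyStringInput are hypothetical stand-ins (ToyMessage replaces javax.jms.Message so the sketch runs without a JMS library), and the real operator's buffering, windowing and acknowledgement logic are omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for javax.jms.Message carrying a text body.
class ToyMessage {
  final String text;

  ToyMessage(String text) {
    this.text = text;
  }
}

// Hypothetical base class exposing the same two hooks as AbstractJMSInputOperator.
abstract class ToyJmsInput<T> {
  // Converts a raw message into the payload type T.
  protected abstract T convert(ToyMessage message);

  // Sends a converted payload downstream.
  protected abstract void emit(T payload);

  // The base class drives the flow and calls both hooks in order.
  final void onMessage(ToyMessage message) {
    emit(convert(message));
  }
}

// A String-producing subclass; the list stands in for the output port.
class ToyStringInput extends ToyJmsInput<String> {
  final List<String> output = new ArrayList<>();

  @Override
  protected String convert(ToyMessage message) {
    return message.text;
  }

  @Override
  protected void emit(String payload) {
    output.add(payload);
  }
}
```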

Concrete Classes

  1. JMSStringInputOperator: This class extends AbstractJMSInputOperator to deliver String payloads in the tuple.

  2. JMSObjectInputOperator: This class extends AbstractJMSInputOperator to deliver String, byte array, Map or POJO payloads in the tuple.
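Routing each message type to its own output port, as JMSObjectInputOperator does, amounts to a dispatch on the payload type. In the sketch below plain Java types stand in for the JMS message kinds, under the assumption that text, bytes, map and object messages map to String, byte array, Map and POJO payloads respectively. Four lists stand in for the output ports; the class and field names are hypothetical and are not the operator's actual port names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical router: each list plays the role of one typed output port.
class ToyObjectRouter {
  final List<String> stringPort = new ArrayList<>();
  final List<byte[]> bytesPort = new ArrayList<>();
  final List<Map<?, ?>> mapPort = new ArrayList<>();
  final List<Object> objectPort = new ArrayList<>();

  // Sends the payload to the port that matches its runtime type.
  void route(Object payload) {
    if (payload instanceof String) {
      stringPort.add((String) payload);
    } else if (payload instanceof byte[]) {
      bytesPort.add((byte[]) payload);
    } else if (payload instanceof Map) {
      mapPort.add((Map<?, ?>) payload);
    } else {
      objectPort.add(payload); // any other object is treated as a POJO payload
    }
  }
}
```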

Application Examples

ActiveMQ Example

The source code for the tutorial can be found here:

https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ

The following code snippet from the example illustrates how the DAG is created:

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    JMSStringInputOperator amqInput = dag.addOperator("amqIn", 
        new JMSStringInputOperator());

    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());

    dag.addStream("data", amqInput.output, out.input);
  }

The DAG consists of only two operators: JMSStringInputOperator, the input operator, which feeds received ActiveMQ messages to the output operator LineOutputOperator, which writes these messages to one or more files.

The default connectionFactoryBuilder supports ActiveMQ, so there is no need to set it. However, the following ActiveMQ-related values need to be set, either from properties files or with the corresponding setter methods in code. Each value name is followed by its description.

connectionFactoryProperties

This is a Map of string keys to string values and can be set directly from configuration, as in the properties.xml example below. The most important keys are described further below.

topic

This boolean value is set to true for the publish-subscribe case and false for the PTP (point-to-point) case.

subject

This is the queue name for PTP (point-to-point) use-case and topic name for the publish-subscribe use case.

durable

This boolean value is set to true for durable subscriptions, false otherwise. Durable subscriptions keep messages in persistent storage until they are consumed. This value is used only when clientId (see below) is set.

clientId

The client ID of this ActiveMQ consumer, used in the durable subscription mode described above.

transacted

This boolean value is set to true for transacted JMS sessions, as described in the javax.jms.Session documentation.

ackMode

This string value sets the acknowledgement mode, as described for the acknowledgement-mode fields of javax.jms.Session.
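How these settings interact can be summarised as a small resolution step. The sketch below is illustrative plain Java, not JMSBase code: ToySessionSettings and its methods are hypothetical, its default ackMode is chosen for the toy and is not necessarily the operator's default, and the integer constants are the values the JMS specification defines for the javax.jms.Session fields. It assumes, as the JMS API specifies, that a transacted session ignores the acknowledgement mode, and that a durable subscription applies only to a topic with durable set to true and a clientId configured.

```java
// Hypothetical helper; constant values match javax.jms.Session in the JMS specification.
class ToySessionSettings {
  static final int SESSION_TRANSACTED = 0;
  static final int AUTO_ACKNOWLEDGE = 1;
  static final int CLIENT_ACKNOWLEDGE = 2;
  static final int DUPS_OK_ACKNOWLEDGE = 3;

  boolean topic;
  boolean durable;
  String clientId;
  boolean transacted;
  String ackMode = "AUTO_ACKNOWLEDGE"; // toy default, not necessarily the operator's

  // A transacted session ignores ackMode; otherwise the name selects the constant.
  int effectiveAckMode() {
    if (transacted) {
      return SESSION_TRANSACTED;
    }
    switch (ackMode) {
      case "AUTO_ACKNOWLEDGE":
        return AUTO_ACKNOWLEDGE;
      case "CLIENT_ACKNOWLEDGE":
        return CLIENT_ACKNOWLEDGE;
      case "DUPS_OK_ACKNOWLEDGE":
        return DUPS_OK_ACKNOWLEDGE;
      default:
        throw new IllegalArgumentException("Unknown ackMode: " + ackMode);
    }
  }

  // Durable subscriptions need a topic, the durable flag and a client ID.
  boolean usesDurableSubscription() {
    return topic && durable && clientId != null && !clientId.isEmpty();
  }
}
```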

The following string properties can be set in the map passed as the connectionFactoryProperties value described above; each property name is followed by its description.

brokerURL

The connection URL used to connect to the ActiveMQ broker.

userName

The JMS user name used by connections created by this factory (optional when anonymous access is used).

password

The JMS password used by connections created by this factory (optional when anonymous access is used).

These properties can be set in the properties.xml file, as shown below (taken from the example at https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ).

<configuration>
  <property>
    <name>dt.operator.amqIn.prop.connectionFactoryProperties.brokerURL</name>
    <value>vm://localhost</value>
  </property>
  <property>
    <name>dt.operator.amqIn.prop.subject</name>
    <value>jms4Amq</value>
  </property>
</configuration>
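The property names above follow one layout: dt.operator.<operator name>.prop.<property>, where an extra .<key> segment after connectionFactoryProperties addresses one entry of that map. The plain-Java sketch below splits such names to show which operator, property and map entry each targets. ToyPropertyKey is a hypothetical helper for illustration, not the platform's configuration parser, and it assumes operator names contain no dots.

```java
// Hypothetical helper that explains the key layout used in properties.xml.
final class ToyPropertyKey {
  private static final String PREFIX = "dt.operator.";
  private static final String PROP_MARKER = ".prop.";

  final String operator; // e.g. "amqIn", the name given to dag.addOperator
  final String property; // e.g. "subject" or "connectionFactoryProperties"
  final String mapKey;   // e.g. "brokerURL", or null for a plain property

  private ToyPropertyKey(String operator, String property, String mapKey) {
    this.operator = operator;
    this.property = property;
    this.mapKey = mapKey;
  }

  static ToyPropertyKey parse(String key) {
    if (!key.startsWith(PREFIX)) {
      throw new IllegalArgumentException("Not an operator property: " + key);
    }
    String rest = key.substring(PREFIX.length());
    int marker = rest.indexOf(PROP_MARKER);
    if (marker < 0) {
      throw new IllegalArgumentException("Missing .prop. segment: " + key);
    }
    String operator = rest.substring(0, marker);
    String path = rest.substring(marker + PROP_MARKER.length());
    int dot = path.indexOf('.');
    if (dot < 0) {
      return new ToyPropertyKey(operator, path, null); // plain property such as subject
    }
    // Everything after the first dot names an entry of a Map-valued property.
    return new ToyPropertyKey(operator, path.substring(0, dot), path.substring(dot + 1));
  }
}
```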

SQS Example

The source code for the tutorial can be found here:

https://github.com/DataTorrent/examples/tree/master/tutorials/jmsSqs

The following code snippet from the example illustrates how the DAG is created:

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    JMSStringInputOperator sqsInput = dag.addOperator("sqsIn",
        new JMSStringInputOperator());

    MyConnectionFactoryBuilder factoryBuilder = new MyConnectionFactoryBuilder();

    factoryBuilder.sqsCredsFilename = conf.get(SQSDEV_CREDS_FILENAME_PROPERTY);

    sqsInput.setConnectionFactoryBuilder(factoryBuilder);

    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());

    dag.addStream("data", sqsInput.output, out.input);
  }

The DAG consists of only two operators: JMSStringInputOperator, the input operator, which feeds received SQS messages to the output operator LineOutputOperator, which writes these messages to one or more files. The code also shows how the AWS/SQS credentials file is configured in the factory builder.

For SQS you have to provide a custom connectionFactoryBuilder, as shown in the example above and in SQSConnectionFactory.java. The builder is typically used to supply AWS region and credential information that cannot be passed through the standard JMS interfaces.

The following code snippet shows a typical builder implementation that can be supplied to the operator. The AWS credentials are supplied through a PropertiesFileCredentialsProvider object, where sqsCredsFilename is the fully qualified path to a properties file from which the AWS security credentials are loaded, for example /etc/somewhere/credentials.properties.

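import javax.jms.ConnectionFactory;

import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.auth.PropertiesFileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;

import com.datatorrent.lib.io.jms.JMSBase;

// Declared as a static nested class of the application class.
static class MyConnectionFactoryBuilder implements JMSBase.ConnectionFactoryBuilder
{
  // Fully qualified path of the properties file holding the AWS credentials.
  String sqsCredsFilename;

  MyConnectionFactoryBuilder()
  {
  }

  @Override
  public ConnectionFactory buildConnectionFactory()
  {
    // Create the connection factory using the properties file credential provider.
    // Connections this factory creates can talk to the queues in the us-east-1 region.
    SQSConnectionFactory connectionFactory =
        SQSConnectionFactory.builder()
            .withRegion(Region.getRegion(Regions.US_EAST_1))
            .withAWSCredentialsProvider(new PropertiesFileCredentialsProvider(sqsCredsFilename))
            .build();
    return connectionFactory;
  }
}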
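In the AWS SDK for Java 1.x, PropertiesFileCredentialsProvider loads the file through PropertiesCredentials, which expects the keys accessKey and secretKey. A pre-flight check along the following lines, using only the standard library, can catch a missing or incomplete credentials file before the application is launched. CredentialsFileCheck and verify are hypothetical names, not part of the example, and the absolute-path requirement simply mirrors the fully qualified path asked for above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical pre-flight check for the file passed to MyConnectionFactoryBuilder.
final class CredentialsFileCheck {
  private CredentialsFileCheck() {
  }

  // Returns normally only if the file defines non-empty accessKey and secretKey values.
  static void verify(String fullyQualifiedPath) throws IOException {
    Path path = Paths.get(fullyQualifiedPath);
    if (!path.isAbsolute()) {
      throw new IllegalArgumentException("Expected a fully qualified path: " + fullyQualifiedPath);
    }
    Properties props = new Properties();
    try (InputStream in = Files.newInputStream(path)) {
      props.load(in); // standard java.util.Properties key=value format
    }
    for (String key : new String[] {"accessKey", "secretKey"}) {
      String value = props.getProperty(key);
      if (value == null || value.trim().isEmpty()) {
        throw new IllegalStateException("Missing " + key + " in " + fullyQualifiedPath);
      }
    }
  }
}
```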