User Defined Control Tuples

Introduction

Custom control tuple support in Apache Apex lets users insert their own control tuples into the data flow. By analogy, the engine already supports a few pre-defined control tuples such as BEGIN_WINDOW and END_WINDOW. Until this feature, applications had no way to insert control tuples of their own.

Terminology

All discussion in this document is related to Control Tuples generated by user defined logic. The document may refer to these tuples as Control Tuples, User Defined Control Tuples or Custom Control Tuples interchangeably.

Definition

A user defined control tuple can be any user defined object that implements the ControlTuple interface.

See Delivery Semantics for details on DeliveryType.

public interface ControlTuple
{
  DeliveryType getDeliveryType();

  enum DeliveryType
  {
    IMMEDIATE,
    END_WINDOW
  }
}

Example user defined control tuple:

public class TestControlTuple implements ControlTuple
{
  public long data;
  public boolean immediate;

  // For Kryo
  public TestControlTuple()
  {
    data = 0;
  }

  // Constructor
  public TestControlTuple(long data, boolean immediate)
  {
    this.data = data;
    this.immediate = immediate;
  }

  @Override
  public DeliveryType getDeliveryType()
  {
    if (immediate) {
      return DeliveryType.IMMEDIATE;
    } else {
      return DeliveryType.END_WINDOW;
    }
  }
}

Use cases

A control tuple may be used in an application to trigger some sort of action in a downstream operator. For example, the source operator might want to notify the last operator that it has emitted all the data in a file and that the file has now ended. Let's call this an End-Of-File control tuple. Once the last operator gets the End-Of-File tuple, it would, say, close the destination file it was writing and create a new file.
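
The End-Of-File idea above can be sketched in plain Java without any Apex dependency. This is only an illustration under stated assumptions: the nested ControlTuple interface is a stand-in mirroring the one in the Definition section, and EndOfFileTuple, Writer and closedFiles are hypothetical names that do not exist in Apex.

```java
import java.util.ArrayList;
import java.util.List;

final class EndOfFileSketch
{
  // Stand-in for the ControlTuple interface from the Definition section.
  interface ControlTuple
  {
    DeliveryType getDeliveryType();

    enum DeliveryType
    {
      IMMEDIATE,
      END_WINDOW
    }
  }

  // Hypothetical End-Of-File control tuple naming the file that just ended.
  static final class EndOfFileTuple implements ControlTuple
  {
    final String fileName;

    EndOfFileTuple(String fileName)
    {
      this.fileName = fileName;
    }

    @Override
    public DeliveryType getDeliveryType()
    {
      // END_WINDOW: handed to the operator after the window's data tuples.
      return DeliveryType.END_WINDOW;
    }
  }

  // Hypothetical last operator: buffers lines and rolls to a new "file"
  // whenever an End-Of-File control tuple arrives.
  static final class Writer
  {
    final List<List<String>> closedFiles = new ArrayList<>();
    private List<String> current = new ArrayList<>();

    void process(String line)
    {
      current.add(line);
    }

    void processControl(ControlTuple tuple)
    {
      if (tuple instanceof EndOfFileTuple) {
        closedFiles.add(current);      // "close" the destination file
        current = new ArrayList<>();   // and start a new one
      }
    }
  }
}
```

In a real application the Writer logic would live in an operator with a control aware input port, as described under Usage.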

More use cases which were discussed during the requirements of this feature are as follows:

  1. Batch support - We need to tell all operators of the physical DAG when a batch starts and ends, so the operators can do whatever is needed upon the start or the end of a batch.
  2. Watermark - To support the concepts of event time windowing, the watermark control tuple is needed to identify late windows.
  3. Changing operator properties - We do have the support of changing operator properties on the fly, but with a custom control tuple, the command to change operator properties can be window aligned for all partitions and also across the DAG. In other words, the properties of all physical partitions can be aligned to a particular window. In case the behavior of the application needs to change, we may also be able to change properties of multiple logical operators aligned to a particular window.
  4. Recording tuples - Like changing operator properties, we do have this support now but only at the individual physical operator level, and without control of which window to record tuples for. With a custom control tuple, because a control tuple must belong to a window, all operators in the DAG can start (and stop) recording for the same windows.

Usage

Generating a Control Tuple

There is no restriction on which operator in the DAG can generate a control tuple. An operator that needs to generate one should declare an output port of type ControlAwareDefaultOutputPort and call the emitControl(ControlTuple t) method on that port.

Example: In the code snippet below, the Generator operator declares a ControlAwareDefaultOutputPort called output which can emit a data tuple as well as a control tuple.

public class Generator extends BaseOperator implements InputOperator
{
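  private double data;        // double, to match the port's Double tuple type
  private long count;
  private boolean immediate;  // delivery type to request for emitted control tuples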

  public final transient ControlAwareDefaultOutputPort<Double> output =
      new ControlAwareDefaultOutputPort<>();

  @Override
  public void emitTuples()
  {
    // Can emit a data tuple using output.emit()
    output.emit(data++);
    count++;
  }

  @Override
  public void endWindow()
  {
    // Can also emit a control tuple using output.emitControl()
    output.emitControl(new TestControlTuple(count, immediate));
  }
}

Note - User defined control tuples and control aware ports can only be used in operators built against an apex-core dependency that has control tuple support, viz. 3.6.0 or above. Earlier versions of apex-core cannot support an application which uses user defined control tuples or control aware ports; such an application would crash at launch time.

Receiving a Control Tuple

Any downstream operator that wants to receive a user defined control tuple should declare an input port which is Control Aware. A ControlAwareDefaultInputPort can process a control tuple in addition to a regular data tuple.

Example: The code snippet below illustrates the use of the processControl method of ControlAwareDefaultInputPort to receive and handle user defined control tuples.

public final transient ControlAwareDefaultInputPort<Double> input =
    new ControlAwareDefaultInputPort<Double>()
{
  // Process a data tuple
  @Override
  public void process(Double tuple)
  {
    output.emit(tuple);
  }

  // Process a control tuple
  @Override
  public boolean processControl(ControlTuple userControlTuple)
  {
    // Process the control tuple here.
    // The return value tells the engine whether the operator handles
    // propagation itself (true) or the engine should propagate the tuple
    // automatically to downstream operators (false).
    // Discussed in later sections.
    return false;
  }
};

Note that pre-defined control tuples like BEGIN_WINDOW and END_WINDOW are not handled by the processControl() method, since these are used only by the engine and are not meant to be delivered to user logic in operators. Custom control tuples, on the other hand, are generated by operators and need to be delivered to downstream operators.

Return value of processControl

The return value has the following semantics:

  1. true - The operator handles propagation explicitly; the engine does not forward the tuple.
  2. false - The operator does not handle propagation; the engine forwards the tuple automatically.

See Propagation of Control Tuples for more details.

Serialization requirements

A control tuple generated by some operator of the application needs to traverse the same path as that traversed by other data tuples transmitted by the application. For this reason, similar to the other data tuples, the control tuple needs to be Kryo serializable since the default serializer used by the platform is Kryo.

Propagation of Control Tuples

A control tuple emitted by an operator can be propagated downstream automatically. This is in line with the automatic propagation of the pre-defined control tuples in the engine. However, some use cases require that the control tuple not be propagated further in the DAG. We support this behavior for user defined control tuples.

Once the control tuple is processed in the processControl method, a return value is expected by the engine. This return value indicates whether or not the operator wishes to handle the propagation of the control tuple or let the engine proceed with the default auto-propagation of the control tuple.

The processControl method of the ControlAwareDefaultInputPort returns a boolean.

@Override
public boolean processControl(ControlTuple userControlTuple)
{
  // process userControlTuple here
  // return true if operator wants to propagate explicitly or block propagation
  // return false if operator wants engine to propagate automatically
  return false;
}
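
The effect of the return value can be illustrated with a small model of the engine's decision. This is a sketch in plain Java, not Apex code: PropagationSketch and autoForwarded are hypothetical names, and the Predicate merely stands in for an operator's processControl method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class PropagationSketch
{
  // Returns the control tuples that the (modelled) engine forwards on its own.
  // processControl stands in for the port's processControl method.
  static <T> List<T> autoForwarded(List<T> controlTuples, Predicate<T> processControl)
  {
    List<T> forwarded = new ArrayList<>();
    for (T tuple : controlTuples) {
      boolean operatorHandlesPropagation = processControl.test(tuple);
      if (!operatorHandlesPropagation) {
        // false: the engine forwards the tuple downstream automatically
        forwarded.add(tuple);
      }
      // true: the engine does nothing more; the operator may re-emit the
      // tuple itself or block it
    }
    return forwarded;
  }
}
```

For example, an operator that returns true only for a tuple it wants to block lets every other control tuple continue downstream untouched.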

Non-Control Aware ports

For operators without Control Aware ports, the platform forwards control tuples to the downstream operators automatically. The application writer does not have to worry about handling a control tuple generated upstream. The control tuple is delivered, via the processControl method, only to operators with Control Aware ports. This also keeps existing operators backward compatible.
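
To make the interplay between control aware and ordinary ports concrete, here is a small model of a linear chain of operators in plain Java. ChainSketch, Op and deliveredTo are hypothetical names, and the sketch assumes that an operator returning true from processControl does not re-emit the tuple.

```java
import java.util.ArrayList;
import java.util.List;

final class ChainSketch
{
  // Hypothetical description of one operator in a linear chain.
  static final class Op
  {
    final String name;
    final boolean controlAware;        // has a control aware input port
    final boolean handlesPropagation;  // what its processControl returns

    Op(String name, boolean controlAware, boolean handlesPropagation)
    {
      this.name = name;
      this.controlAware = controlAware;
      this.handlesPropagation = handlesPropagation;
    }
  }

  // Names of the operators whose processControl sees a control tuple
  // that enters at the head of the chain.
  static List<String> deliveredTo(List<Op> chain)
  {
    List<String> delivered = new ArrayList<>();
    for (Op op : chain) {
      if (!op.controlAware) {
        continue; // platform forwards the tuple; user code never sees it
      }
      delivered.add(op.name);
      if (op.handlesPropagation) {
        break; // returned true and, in this sketch, does not re-emit
      }
    }
    return delivered;
  }
}
```

With a chain of a non-aware parser, an aware filter returning false, an aware writer returning true and an aware audit operator, only the filter and the writer would see the tuple in this model.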

Delivery Semantics

Delivery semantics refer to the point in time, relative to the processing window, at which a control tuple is delivered to the operator. An operator has various callbacks like setup, beginWindow, endWindow, etc.

DeliveryType IMMEDIATE

As the name implies, the control tuple is delivered immediately to the next downstream operator if that operator is control aware; otherwise, it is forwarded on to the operator after it.

  • Case: Downstream is partitioned
    When the downstream is partitioned, a control tuple with IMMEDIATE delivery type goes to all the downstream partitions. This holds regardless of whether the control tuple was generated by the immediately upstream operator or by one further upstream.

  • Case: Upstream is partitioned
    When the upstream is partitioned and the control tuple is generated in any subset of the partitions, the downstream operator receives the control tuple immediately and does not wait till the end of the current window. If the control tuple came from a single source further upstream and the intermediate partitions each produced a copy, the duplicate copies are filtered out at the downstream operator. Thus only unique control tuples are delivered to the downstream operator. With IMMEDIATE delivery, the first instance of the control tuple is delivered to the operator and the later duplicates are filtered out.
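
The duplicate filtering described for IMMEDIATE delivery can be modelled in a few lines of plain Java. This is a sketch under assumptions, not the engine's implementation: ImmediateDeliverySketch is a hypothetical name, duplicates are detected here with equals/hashCode, and the set of seen tuples is assumed to be reset at the start of each window.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class ImmediateDeliverySketch
{
  // Control tuples already seen in the current window (assumed scope).
  private final Set<Object> seenInWindow = new HashSet<>();

  // Control tuples handed to the operator, in delivery order.
  final List<Object> delivered = new ArrayList<>();

  void beginWindow()
  {
    seenInWindow.clear();
  }

  // Called whenever a copy arrives from any upstream partition.
  void onControlTuple(Object tuple)
  {
    if (seenInWindow.add(tuple)) {
      delivered.add(tuple); // first copy: delivered right away
    }
    // later copies of the same tuple are dropped
  }
}
```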

DeliveryType END_WINDOW

This delivery type delivers the control tuple to the operator only after all data tuples have been delivered to the operator. In the operator lifecycle, this means that the control tuples are delivered just before the endWindow call.

  • Case: Downstream is partitioned
    When the downstream is partitioned, the control tuple emitted by the upstream is broadcast to all downstream partitions, buffered there until the end of the window, and delivered to the operator just before the endWindow call.

  • Case: Upstream is partitioned
    If the control tuples are generated in any subset of the partitions, then each control tuple is unique and is delivered to the downstream operator before the endWindow call. However, if the control tuple originates from a source further upstream, the downstream operator filters out duplicates as and when each control tuple arrives, and finally all unique control tuples are delivered to the operator just before the endWindow call.
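
The ordering guarantee of END_WINDOW delivery can be illustrated with a small event-log model in plain Java. EndWindowDeliverySketch and its methods are hypothetical names, and duplicates are identified here by string equality purely for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class EndWindowDeliverySketch
{
  // Unique control tuples buffered for the current window, in arrival order.
  private final Set<String> buffered = new LinkedHashSet<>();

  // Log of the callbacks the operator would observe.
  final List<String> events = new ArrayList<>();

  void onData(String tuple)
  {
    events.add("process:" + tuple); // data tuples are processed as they arrive
  }

  void onControlTuple(String tuple)
  {
    buffered.add(tuple); // held back; duplicates collapse in the set
  }

  void onEndWindow()
  {
    for (String tuple : buffered) {
      events.add("processControl:" + tuple); // delivered just before endWindow
    }
    buffered.clear();
    events.add("endWindow");
  }
}
```

Whatever the arrival order within the window, the log in this model always shows data tuples first, then the unique control tuples, then endWindow.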

Assumptions

All the user defined control tuples used in the application are cached in the memory of the operator for the duration of a window. For this reason, it is imperative that both the size and the number of control tuples emitted within a window are small compared to the data tuples.

JIRA

  • APEXCORE-579 is the top level JIRA issue for control tuple support.