Transform - Operator Documentation
About Transform operator
Transform maps field expressions from input to output, or converts fields from one type to another. This operator is stateless. It receives objects on its input port; for each input object, it creates a new output object whose fields are computed as expressions involving fields of the input object. The types of the input and output objects are configurable, as are the expressions used to compute the output fields.
The operator class is TransformOperator, located in the package com.datatorrent.lib.transform.
Please refer to the GitHub URL for TransformOperator.
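As an informal mental model of the stateless, one-object-in, one-object-out behavior described above, the operator can be sketched as a pure function applied to each input object. This is not the TransformOperator API. The class StatelessTransform and its methods are hypothetical: each input yields exactly one new output, and nothing is remembered between tuples.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of a stateless transform; not the Malhar TransformOperator API.
final class StatelessTransform<I, O> {
  private final Function<I, O> mapping; // computes all output fields from one input object

  StatelessTransform(Function<I, O> mapping) {
    this.mapping = mapping;
  }

  // One input produces one new output; no fields are kept between calls.
  O process(I input) {
    return mapping.apply(input);
  }

  // Processing a batch is just processing each element independently.
  List<O> processAll(List<I> inputs) {
    List<O> outputs = new ArrayList<>();
    for (I in : inputs) {
      outputs.add(process(in));
    }
    return outputs;
  }
}
```

Because `process` depends only on its argument, the order or grouping of inputs cannot change any individual output. This property is what the Partitioning section relies on.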
Use Case
Consider data that needs to be transformed according to an output schema.
Consider input objects with these fields:
Name | Type |
---|---|
FirstName | String |
LastName | String |
Phone | String |
DateOfBirth | java.util.Date |
Address | String |
and output objects with fields:
Name | Type |
---|---|
Name | String |
Phone | String |
Age | Integer |
Address | String |
Suppose Name is a concatenation of FirstName and LastName, and Age is computed by subtracting the year of DateOfBirth from the current year.
These simple computations can be expressed as Java expressions, in which the input object is represented by $, and provided as configuration parameters as follows:
Name => {$.FirstName}.concat(" ").concat({$.LastName})
Age => (new java.util.Date()).getYear() - {$.DateOfBirth}.getYear()
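For illustration only, the two expressions above compute the same values as the plain Java below. The sketch is written against hypothetical InputRecord/OutputRecord classes that mirror the two tables. It uses java.time.LocalDate in place of java.util.Date and passes the current year in explicitly so the result is deterministic. Like the original Age expression, it ignores whether the birthday has already passed this year.

```java
import java.time.LocalDate;

// Hypothetical input type mirroring the input schema table.
class InputRecord {
  String firstName;
  String lastName;
  String phone;
  LocalDate dateOfBirth; // stands in for java.util.Date in this sketch
  String address;
}

// Hypothetical output type mirroring the output schema table.
class OutputRecord {
  String name;
  String phone;
  Integer age;
  String address;
}

final class UseCaseMapping {
  // Equivalent of: Name => {$.FirstName}.concat(" ").concat({$.LastName})
  static String name(InputRecord in) {
    return in.firstName.concat(" ").concat(in.lastName);
  }

  // Equivalent of: Age => current year - year of {$.DateOfBirth}
  static Integer age(InputRecord in, int currentYear) {
    return currentYear - in.dateOfBirth.getYear();
  }

  // Phone and Address have the same name and type on both sides, so they are copied unchanged.
  static OutputRecord transform(InputRecord in, int currentYear) {
    OutputRecord out = new OutputRecord();
    out.name = name(in);
    out.age = age(in, currentYear);
    out.phone = in.phone;
    out.address = in.address;
    return out;
  }
}
```

In the operator itself, you configure only the expression strings. You do not write any mapping code like this.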
Configuration Parameters
-
expressionMap - Map
- Mandatory Parameter
- Specifies the map between the output field (key) and the expression used to compute it (value) using fields of the input Java object.
-
expressionFunctions - List
- List of classes or methods to import so that expressions can use them. Setting this overrides the default list.
- Default Value = {java.lang.Math.*, org.apache.commons.lang3.StringUtils.*, org.apache.commons.lang3.StringEscapeUtils.*, org.apache.commons.lang3.time.DurationFormatUtils.*, org.apache.commons.lang3.time.DateFormatUtils.*}
-
copyMatchingFields - boolean
- Specifies whether matching fields should be copied; here matching means that the name and type of an input field are the same as the name and type of an output field. If a matching field also appears in expressionMap, it is not copied to the output object; its value is computed from the expression instead.
- Default Value = true.
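The interplay between expressionMap and copyMatchingFields can be sketched as follows. This is a hypothetical model under stated assumptions, not the operator's implementation. Records are modelled as name-to-value maps, schemas as name-to-type maps, and expressions as Java functions instead of expression strings. FieldMappingSketch is an illustrative name. An expression always takes precedence, and a field is copied only when its name and type match on both sides.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of how expressionMap and copyMatchingFields combine.
final class FieldMappingSketch {
  static Map<String, Object> transform(
      Map<String, Object> input,
      Map<String, Class<?>> inputSchema,
      Map<String, Class<?>> outputSchema,
      Map<String, Function<Map<String, Object>, Object>> expressions,
      boolean copyMatchingFields) {
    Map<String, Object> output = new HashMap<>();
    for (Map.Entry<String, Class<?>> field : outputSchema.entrySet()) {
      String name = field.getKey();
      if (expressions.containsKey(name)) {
        // An expression wins, even if a matching input field exists.
        output.put(name, expressions.get(name).apply(input));
      } else if (copyMatchingFields && field.getValue().equals(inputSchema.get(name))) {
        // Same name and same type on both sides: copy the value unchanged.
        output.put(name, input.get(name));
      }
      // Otherwise the output field is left unset in this sketch.
    }
    return output;
  }
}
```

With copyMatchingFields set to false, only fields that have an expression are populated in this model.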
Configuration Example
Consider input objects with fields:
Name | Type |
---|---|
FirstName | String |
LastName | String |
StartDate | org.joda.time.DateTime |
and output objects with fields:
Name | Type |
---|---|
Name | String |
isLeapYear | Boolean |
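Note: the org.joda.time.DateTime class is not present in the default list, so it must be added to expressionFunctions in the populateDAG method as below. Since setting expressionFunctions replaces the default list, any default entry that is still wanted, such as org.apache.commons.lang3.StringUtils here, has to be listed again: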
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import com.datatorrent.lib.transform.TransformOperator;

// Inside populateDAG(DAG dag, Configuration conf):
TransformOperator operator = dag.addOperator("transform", new TransformOperator());
operator.setExpressionFunctions(Arrays.asList("org.joda.time.DateTime", "org.apache.commons.lang3.StringUtils"));
Map<String, String> expressionMap = new HashMap<>();
expressionMap.put("isLeapYear", "{$.StartDate}.year().isLeap()");
expressionMap.put("Name", "org.apache.commons.lang3.StringUtils.joinWith(\" \", {$.FirstName}, {$.LastName})");
operator.setExpressionMap(expressionMap);
The above properties can also be set in the properties file as follows:
<property>
<name>dt.operator.transform.expressionFunctions[0]</name>
<value>org.joda.time.DateTime</value>
</property>
<property>
<name>dt.operator.transform.expressionFunctions[1]</name>
<value>org.apache.commons.lang3.StringUtils</value>
</property>
<property>
<name>dt.operator.transform.expressionMap(isLeapYear)</name>
<value>{$.StartDate}.year().isLeap()</value>
</property>
<property>
<name>dt.operator.transform.expressionMap(Name)</name>
<value>org.apache.commons.lang3.StringUtils.joinWith(" ", {$.FirstName}, {$.LastName})</value>
</property>
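The configured expressions depend on Joda-Time and Commons Lang. The following sketch shows what they evaluate to without those dependencies, using JDK stand-ins: java.time.Year.isLeap in place of DateTime.year().isLeap(), and String.join in place of StringUtils.joinWith. The substitution is an assumption made for illustration. The class ConfigExampleSketch is hypothetical, and String.join matches joinWith only for non-null names.

```java
import java.time.LocalDate;
import java.time.Year;

// Hypothetical JDK-only equivalent of the two expressions configured above.
final class ConfigExampleSketch {
  // Mirrors {$.StartDate}.year().isLeap(); LocalDate stands in for org.joda.time.DateTime.
  static boolean isLeapYear(LocalDate startDate) {
    return Year.isLeap(startDate.getYear());
  }

  // Mirrors StringUtils.joinWith(" ", {$.FirstName}, {$.LastName}) for non-null names.
  static String name(String firstName, String lastName) {
    return String.join(" ", firstName, lastName);
  }
}
```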
Ports
-
input - Port for input tuples.
- Mandatory input port
-
output - Port for transformed output tuples.
- Mandatory output port
Attributes
-
Input port Attribute - input.TUPLE_CLASS - Fully qualified class name; the class should be Kryo serializable.
- Mandatory attribute
- Type of input tuple.
-
Output port Attribute - output.TUPLE_CLASS - Fully qualified class name; the class should be Kryo serializable.
- Mandatory attribute
- Type of output tuple.
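A class named in TUPLE_CLASS could look like the hypothetical output POJO below, written for the Use Case schema. The name com.example.OutputPojo is illustrative only. The sketch assumes a plain POJO with a no-argument constructor, which Kryo's default instantiation relies on, and JavaBean-style getters and setters as a common convention for the fields.

```java
// Hypothetical output tuple class for the Use Case schema (Name, Phone, Age, Address).
// In an application it would be a public top-level class whose fully qualified
// name (for example com.example.OutputPojo) is set as output.TUPLE_CLASS.
class OutputPojo {
  private String name;
  private String phone;
  private Integer age;
  private String address;

  // No-argument constructor, used by Kryo's default instantiation.
  public OutputPojo() {
  }

  public String getName() { return name; }
  public void setName(String name) { this.name = name; }
  public String getPhone() { return phone; }
  public void setPhone(String phone) { this.phone = phone; }
  public Integer getAge() { return age; }
  public void setAge(Integer age) { this.age = age; }
  public String getAddress() { return address; }
  public void setAddress(String address) { this.address = address; }
}
```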
Application Example
Please refer to the Example for the transform sample application.
Partitioning
Being stateless, this operator can be partitioned using any of the built-in partitioners present in the Malhar library by setting a few properties as follows:
Stateless partitioning
Stateless partitioning ensures that TransformOperator is partitioned right at the start of the application and remains partitioned throughout the lifetime of the DAG. TransformOperator can be stateless partitioned by adding the following lines to properties.xml:
<property>
<name>dt.operator.{OperatorName}.attr.PARTITIONER</name>
<value>com.datatorrent.common.partitioner.StatelessPartitioner:{N}</value>
</property>
where {OperatorName} is the name of the TransformOperator operator and {N} is the number of static partitions. The above lines statically partition TransformOperator into {N} partitions.
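Statelessness is what makes this kind of static partitioning safe. If tuples are split across {N} partitions and each partition applies the same mapping, the combined outputs equal those of a single instance, possibly in a different order. The sketch below demonstrates this with a hypothetical hash-based split. PartitionSketch is not an Apex class, and it does not model how Apex actually routes tuples to partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration; not how Apex routes tuples between partitions.
final class PartitionSketch {
  // Assign each tuple to one of n partitions by its hash code.
  static <T> List<List<T>> split(List<T> tuples, int n) {
    List<List<T>> parts = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      parts.add(new ArrayList<>());
    }
    for (T t : tuples) {
      parts.get(Math.floorMod(t.hashCode(), n)).add(t);
    }
    return parts;
  }

  // Every partition applies the same stateless mapping to its own share of the tuples.
  static <I, O> List<O> runPartitioned(List<I> tuples, int n, Function<I, O> mapping) {
    List<O> results = new ArrayList<>();
    for (List<I> part : split(tuples, n)) {
      for (I in : part) {
        results.add(mapping.apply(in));
      }
    }
    return results;
  }
}
```

Comparing runPartitioned(tuples, 1, f) with runPartitioned(tuples, N, f) after sorting gives identical lists. A stateful operator would generally not have this property.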
Dynamic Partitioning
Dynamic partitioning is a feature of the Apex platform that changes the partitioning of an operator at runtime based on certain conditions. TransformOperator can be dynamically partitioned using the following partitioner:
Throughput based
The following code can be added to the populateDAG(DAG dag, Configuration conf) method of the application to dynamically partition TransformOperator:
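import java.util.Arrays;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.StatsListener;
import com.datatorrent.lib.transform.TransformOperator;
// StatelessThroughputBasedPartitioner must also be imported from the Malhar library.
TransformOperator transform = dag.addOperator("transform", new TransformOperator());
StatelessThroughputBasedPartitioner<TransformOperator> partitioner = new StatelessThroughputBasedPartitioner<>();
partitioner.setCooldownMillis(10000);
partitioner.setMaximumEvents(30000);
partitioner.setMinimumEvents(10000);
dag.setAttribute(transform, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
dag.setAttribute(transform, OperatorContext.PARTITIONER, partitioner);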
The above code dynamically partitions TransformOperator when its throughput changes. If the overall throughput of TransformOperator rises above 30000 or falls below 10000, the platform repartitions TransformOperator so that the throughput of a single partition lies between 10000 and 30000. The cooldown of 10000 milliseconds is used as the threshold time over which the throughput change is observed.
Source code for this dynamic application can be found here.
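The throughput rule described above can be read as a small state machine. The platform repartitions only after the throughput has stayed outside [minimumEvents, maximumEvents] for at least cooldownMillis. The sketch below is an interpretation of that prose under simplifying assumptions, not the code of StatelessThroughputBasedPartitioner. ThroughputRule and Decision are hypothetical names, and throughput is measured in whatever unit the minimum and maximum use.

```java
// Hypothetical sketch of the described rule; not the Malhar partitioner's code.
final class ThroughputRule {
  enum Decision { NONE, SCALE_UP, SCALE_DOWN }

  private final long minimumEvents;
  private final long maximumEvents;
  private final long cooldownMillis;
  private long outOfRangeSince = -1; // when throughput first left the range, or -1 if in range

  ThroughputRule(long minimumEvents, long maximumEvents, long cooldownMillis) {
    this.minimumEvents = minimumEvents;
    this.maximumEvents = maximumEvents;
    this.cooldownMillis = cooldownMillis;
  }

  // Called with each new throughput observation and the current time.
  Decision observe(long throughput, long nowMillis) {
    boolean tooHigh = throughput > maximumEvents;
    boolean tooLow = throughput < minimumEvents;
    if (!tooHigh && !tooLow) {
      outOfRangeSince = -1; // back in range: reset the observation timer
      return Decision.NONE;
    }
    if (outOfRangeSince < 0) {
      outOfRangeSince = nowMillis; // start observing the change
      return Decision.NONE;
    }
    if (nowMillis - outOfRangeSince < cooldownMillis) {
      return Decision.NONE; // change not observed for long enough yet
    }
    outOfRangeSince = -1;
    return tooHigh ? Decision.SCALE_UP : Decision.SCALE_DOWN;
  }
}
```

With the values used above (10000, 30000, 10000 ms), a throughput of 36000 reported at 1 s and again at 11 s yields SCALE_UP on the second report. A brief spike that returns to the range before the cooldown elapses yields no decision.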