CsvFormatter
Operator Objective
This operator receives a POJO (Plain Old Java Object) as an incoming tuple, converts the data in the POJO to a custom-delimited string, and emits that string.
CsvFormatter supports schema definition as a JSON string.
CsvFormatter does not hold any state and is idempotent, fault-tolerant and statically/dynamically partitionable.
Operator Information
- Operator location: malhar-contrib
- Available since: 3.2.0
- Operator state: Evolving
- Java Packages:
Properties, Attributes and Ports
Properties of CsvFormatter
Property | Description | Type | Mandatory | Default Value |
---|---|---|---|---|
schema | Contents of the schema. The schema is specified in JSON format. | String | Yes | N/A |
Platform attributes that influence operator behavior
Attribute | Description | Type | Mandatory |
---|---|---|---|
in.TUPLE_CLASS | TUPLE_CLASS attribute on the input port, which tells the operator the class of the incoming POJO | Class or FQCN | Yes |
Ports
Port | Description | Type | Mandatory |
---|---|---|---|
in | Tuples which need to be formatted are received on this port | Object (POJO) | Yes |
out | Tuples that are formatted are emitted from this port | String | No |
err | Tuples that could not be converted are emitted on this port | Object | No |
Limitations
The current CsvFormatter has the following limitations:
- The field names in the schema and the POJO field names must match. For example, if the name of the schema field is "customerName", the POJO must contain a field with the same name.
- Field-wise validation/formatting is not yet supported.
- The fields are written to the output in the same order as they are specified in the schema.
Example
An example for CsvFormatter can be found at: https://github.com/DataTorrent/examples/tree/master/tutorials/csvformatter
Advanced
Schema format for CsvFormatter
CsvFormatter expects the schema to be a String in JSON format.
Example schema:
{
"separator": ",",
"quoteChar": "\"",
"lineDelimiter": "\n",
"fields": [
{
"name": "campaignId",
"type": "Integer"
},
{
"name": "startDate",
"type": "Date",
"constraints": {
"format": "yyyy-MM-dd"
}
}
]
}
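To make the name-matching requirement concrete, here is a minimal sketch of a POJO that would line up with the example schema above. The class name `Campaign` and the helper `CsvLineSketch.toLine` are hypothetical and do not come from Malhar. The helper only illustrates what a delimited line built from the schema's fields, separator, and date format could look like; it is not the operator's implementation, and the operator's handling of quoting may differ.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

// Hypothetical POJO: field names match the "name" entries in the example schema.
// In a real application this would typically be a public class in its own file.
class Campaign {
    private int campaignId;  // schema field "campaignId", type Integer
    private Date startDate;  // schema field "startDate", type Date

    public int getCampaignId() { return campaignId; }
    public void setCampaignId(int campaignId) { this.campaignId = campaignId; }
    public Date getStartDate() { return startDate; }
    public void setStartDate(Date startDate) { this.startDate = startDate; }
}

// Illustration only, NOT the operator's code: joins the fields in schema order
// using the schema's separator, line delimiter, and date format.
class CsvLineSketch {
    static String toLine(Campaign c, String separator, String lineDelimiter) {
        // Same pattern as the "format" constraint on startDate in the schema
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd", Locale.ROOT);
        return c.getCampaignId() + separator + fmt.format(c.getStartDate()) + lineDelimiter;
    }
}
```

The field order in `toLine` follows the order of the `fields` array in the schema, which mirrors the ordering limitation listed earlier.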
Partitioning of CsvFormatter
Because CsvFormatter is a stateless operator, the built-in partitioners in the Malhar library can be used directly by setting properties as follows:
Stateless partitioning of CsvFormatter
Stateless partitioning ensures that CsvFormatter is partitioned right at the start of the application and remains partitioned throughout the lifetime of the DAG. CsvFormatter can be statelessly partitioned by adding the following lines to properties.xml:
<property>
<name>dt.operator.{OperatorName}.attr.PARTITIONER</name>
<value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>
</property>
where {OperatorName} is the name of the CsvFormatter operator. The lines above create 2 static partitions of CsvFormatter. Change the number after the colon in the value to change the number of static partitions.
Dynamic Partitioning of CsvFormatter
Dynamic partitioning is a feature of the Apex platform that changes the partitioning of an operator based on certain conditions. CsvFormatter can be dynamically partitioned using the following out-of-the-box partitioner:
Throughput based
The following code can be added to the populateDAG method of the application to dynamically partition CsvFormatter:
// Throughput-based partitioner for the CsvFormatter operator
StatelessThroughputBasedPartitioner<CsvFormatter> partitioner = new StatelessThroughputBasedPartitioner<>();
// Thresholds are read from the configuration; the second argument is the fallback value
partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));
partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));
partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));
// Register the partitioner as both the stats listener and the partitioner of the operator
dag.setAttribute(csvFormatter, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
dag.setAttribute(csvFormatter, OperatorContext.PARTITIONER, partitioner);
The code above dynamically partitions CsvFormatter when throughput changes. If the overall throughput of CsvFormatter rises above 30000 or falls below 10000, the platform repartitions CsvFormatter so that the throughput of a single partition stays between 10000 and 30000. The CooldownMillis value of 10000 is the threshold time over which the throughput change is observed.
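The threshold rule described above can be pictured with a small plain-Java sketch. Everything in it is hypothetical: the class `ThroughputRuleSketch`, the `Action` enum, and the `decide` method are illustrative names, and the sketch assumes that repartitioning is considered only once throughput has stayed outside the configured range for at least the cooldown period. The actual logic of StatelessThroughputBasedPartitioner lives in Malhar and may differ in its details.

```java
// Hypothetical sketch of the min/max/cooldown rule; not the Malhar implementation.
class ThroughputRuleSketch {
    enum Action { ADD_PARTITIONS, REMOVE_PARTITIONS, NONE }

    static Action decide(long throughput, long minimumEvents, long maximumEvents,
                         long millisOutsideRange, long cooldownMillis) {
        // Assumption: ignore changes that have not persisted for the cooldown period
        if (millisOutsideRange < cooldownMillis) {
            return Action.NONE;
        }
        if (throughput > maximumEvents) {
            return Action.ADD_PARTITIONS;     // too much load per partition
        }
        if (throughput < minimumEvents) {
            return Action.REMOVE_PARTITIONS;  // partitions are under-used
        }
        return Action.NONE;                   // within the configured range
    }
}
```

With the values from the code above (minimum 10000, maximum 30000, cooldown 10000 ms), a throughput of 40000 sustained for the cooldown period would fall in the "add partitions" branch of this sketch, and a throughput of 20000 would leave the partitioning unchanged.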