I'm a bit late to the "party", but this may help people passing by in the future.
I was also struggling to get "cluster mode" to kick in when executing a partitioned transformation or its wrapper job. A lot of conditions need to be met, which are not at all obvious and are not explicit in the documentation.
The following may not apply directly or exactly to the stated problem, yet I hope that (at least part of) the solution is the same.
The version of Pentaho I tested this with was 9.3.
Attached is a zip with the sample job and transformation.
Wrapper Job
The wrapper job, `PDISamplePartitionJob.kjb`, calls the "clustered transformation", `PDISamplePartition.ktr`.
The transformation job entry has the following settings:
Notice the Run configuration setting, whose value is `Run Clustered`.
Run Configuration
The settings of the `Run Clustered` run configuration are:
Note the special/virtual slave server called `Clustered`, which is not defined in the View panel > Slave server node. It is selecting this special slave server that activates "cluster mode" (at least, it did in my case).
This run configuration enables clustered execution of a transformation, without specifying the cluster-schema/slave-server(s) in which the transformation will run.
As you'll see, these are determined by the cluster schema assigned to each step of the transformation.
Clustered Transformation
The following shows the partitioned and clustered transformation, `PDISamplePartition.ktr`:
In a nutshell, this is how this transformation works:
- The `Generate rows` step generates 10000 rows, having a single `data` field with a constant value
- The `Generate random value` step adds a `random` field with random string values
- Finally, the `Remove "data" field` step removes the dummy `data` field
More importantly, the `Remove "data" field` step is configured to group rows into partitions and, when running in clustered mode, to assign each of these row partitions to one of the available slave servers.
I haven't tested the following, but I suspect this is how it works: when running in clustered mode, each slave server's partitions (or, when not clustered, the single server's partitions) are further distributed across the configured copies of the step, each copy running in a different thread. In this case, the step was configured with a single copy.
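If that is indeed the scheme, a minimal Java sketch of the mapping could look like this (an illustration of the speculation above, not PDI's actual logic; the counts and the round-robin assignments are made up):

```java
// Illustration of the distribution speculated above; not PDI code.
// Assumes partitions are spread round-robin over the slave servers, and
// that a slave's partitions are spread round-robin over its step copies.
public class PartitionDistributionSketch {
    public static void main(String[] args) {
        int nrSlaves = 3;            // slave servers in the cluster (assumed)
        int partitionsPerSlave = 2;  // "Number of partitions per slave server?" (assumed)
        int nrCopies = 2;            // configured step copies (the sample uses 1)

        int nrPartitions = nrSlaves * partitionsPerSlave;
        for (int p = 0; p < nrPartitions; p++) {
            int slave = p % nrSlaves;             // assumed round-robin over slaves
            int copy = (p / nrSlaves) % nrCopies; // assumed round-robin over copies
            System.out.printf("partition %d -> slave %d, step copy %d%n", p, slave, copy);
        }
    }
}
```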
Partition Schema
To configure row partitioning for the `Remove "data" field` step, first create a new partition schema, `One Partition Per Slave Dynamic`:
The partition schema determines the number of partitions to be used.
A static partition schema has Dynamically create the schema definition? unchecked and its partitions are static and explicitly listed. AFAIK, the partition identifiers are arbitrary and only used for debugging/logging.
A dynamic partition schema has Dynamically create the schema definition? checked. When not running in clustered mode, it is used just like a static partition schema. When running in clustered mode, however, the listed partitions are ignored and the partitions are instead determined dynamically, according to the number of slave servers of the cluster (the identifier of dynamic partitions is `PDyn<partition-index>`). The number of partitions will be the cluster's number of slave servers times the value of Number of partitions per slave server?. For example, a cluster with three slave servers and two partitions per slave server would use six partitions.
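As a minimal Java sketch of that arithmetic and of the resulting identifiers (illustration only, not PDI code; the counts are the ones from the example above, and the zero-based index is an assumption):

```java
// Dynamic partition schema: partition count and identifiers, as described above.
public class DynamicPartitionsSketch {
    public static void main(String[] args) {
        int nrSlaveServers = 3;      // slave servers in the cluster
        int partitionsPerSlave = 2;  // "Number of partitions per slave server?"

        int nrPartitions = nrSlaveServers * partitionsPerSlave; // 3 * 2 = 6
        for (int i = 0; i < nrPartitions; i++) {
            System.out.println("PDyn" + i); // PDyn0 .. PDyn5 (assuming a zero-based index)
        }
    }
}
```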
Lastly, on the step's context menu, select "Partitions...", and, in the successive dialogs displayed, configure the following:
- Partition method: `Remainder of division`. Each distinct value of the Partition field is assigned to the partition whose index is the remainder of dividing that value by the number of partitions (string values use their hash code; see the sketch after this list)
- Partition schema: `One Partition Per Slave Dynamic`. When clustered, use one partition per slave server
- Partition field: `random`. The field whose value determines the partition of a row
Note that, generally, a partition will have several values associated with it. Crucially, each partition, and all values assigned to it, will be handled by a single slave server and thread.
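To make the "Remainder of division" method concrete, here is a minimal Java sketch of the assignment as described above (an illustration, not PDI's actual implementation; the field values and partition count are made up):

```java
// Sketch of the "Remainder of division" method: string values are reduced to
// their hash code, then taken modulo the number of partitions.
public class ModPartitionSketch {
    static int partitionOf(String fieldValue, int nrPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes;
        // PDI's exact handling of negative values may differ.
        return Math.floorMod(fieldValue.hashCode(), nrPartitions);
    }

    public static void main(String[] args) {
        int nrPartitions = 3; // e.g. a 3-slave cluster with one partition per slave
        for (String value : new String[] {"a1b2", "c3d4", "a1b2"}) {
            System.out.printf("random=%s -> partition %d%n", value, partitionOf(value, nrPartitions));
        }
    }
}
```

Note how the two equal values map to the same partition: the assignment depends only on the field value, which is what guarantees that a partition, and all values assigned to it, is handled by a single slave server and thread.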
Cluster Schema
When running a transformation in clustered mode, a single cluster schema is supported — all "clustered steps" must specify the same Cluster schema. Unclustered steps run in the cluster's master server, while clustered steps run in the cluster's slave servers.
To configure the Cluster schema to use for the `Remove "data" field` step, let's first create the cluster's master server, `Local Pentaho Master`, a Pentaho Server located at the default location and having default credentials:
Next, let's create the cluster schema, `Cluster Schema Dynamic`:
This is a dynamic cluster, as determined by Dynamic cluster being checked. Its slave servers are spawned dynamically, at runtime. As such, the "Slave servers" list need only contain the master server.
Lastly, on the step's context menu, select "Clusters...", and, for Cluster schema, select the just-created `Cluster Schema Dynamic`.
Hope this helps, cheers!