Pentaho

Carte cluster not load balancing transformations between the slaves while using the API

Juan Sierra Pons posted 03-29-2022 03:46
Hi,

I have configured a Carte cluster with one master and two slaves. Basically, I have launched the instances using the sample configuration files located in the pwd folder:
./carte.sh pwd/carte-config-master-8080.xml
./carte.sh pwd/carte-config-8081.xml
./carte.sh pwd/carte-config-8082.xml
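
For reference, the carte-config files involved look roughly like this (a sketch based on the samples shipped in the pwd folder; names and passwords here are just illustrative). The master declares only itself, while each slave lists the master and sets report_to_masters so it registers dynamically:

carte-config-master-8080.xml (sketch):
<slave_config>
  <slaveserver>
    <name>master</name>
    <hostname>localhost</hostname>
    <port>8080</port>
    <username>cluster</username>
    <password>cluster</password>
    <master>Y</master>
  </slaveserver>
</slave_config>

carte-config-8081.xml (sketch; 8082 is analogous, with its own port and name):
<slave_config>
  <masters>
    <slaveserver>
      <name>master</name>
      <hostname>localhost</hostname>
      <port>8080</port>
      <username>cluster</username>
      <password>cluster</password>
      <master>Y</master>
    </slaveserver>
  </masters>
  <report_to_masters>Y</report_to_masters>
  <slaveserver>
    <name>slave-8081</name>
    <hostname>localhost</hostname>
    <port>8081</port>
    <username>cluster</username>
    <password>cluster</password>
    <master>N</master>
  </slaveserver>
</slave_config>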

As far as I know, there is no command-line tool (like pan or kitchen) to launch transformations/jobs directly on the Carte cluster. I have read something about wrapping them inside a .kjb https://forums.pentaho.com/threads/74921-Run-Transformation-or-Job-on-Carte-Server-from-pan-Kitchen/ but I would like to use the API instead of the wrapper.

Slaves are registered successfully as per:

curl -s -L "http://cluster:cluster@localhost:8080/kettle/getSlaves/"
<?xml version="1.0" encoding="UTF-8"?>
<SlaveServerDetections>
  <SlaveServerDetection>
    <slaveserver>
      <name>Dynamic slave [localhost:8081]</name>
      <hostname>localhost</hostname>
      <port>8081</port>
      <webAppName/>
      <username>cluster</username>
      <password>Encrypted 2be98afc86aa7f2e4cb1aa265cd86aac8</password>
      <proxy_hostname/>
      <proxy_port/>
      <non_proxy_hosts/>
      <master>N</master>
      <sslMode>N</sslMode>
    </slaveserver>
    <active>Y</active>
    <last_active_date>2022/03/24 12:00:17.411</last_active_date>
    <last_inactive_date/>
  </SlaveServerDetection>

  <SlaveServerDetection>
    <slaveserver>
      <name>Dynamic slave [localhost:8082]</name>
      <hostname>localhost</hostname>
      <port>8082</port>
      <webAppName/>
      <username>cluster</username>
      <password>Encrypted 2be98afc86aa7f2e4cb1aa265cd86aac8</password>
      <proxy_hostname/>
      <proxy_port/>
      <non_proxy_hosts/>
      <master>N</master>
      <sslMode>N</sslMode>
    </slaveserver>
    <active>Y</active>
    <last_active_date>2022/03/24 12:00:17.416</last_active_date>
    <last_inactive_date/>
  </SlaveServerDetection>
</SlaveServerDetections>

When I launch a bunch of dummy transformations in parallel (200, in groups of 20) against the master, they only run on the master. None has run on any of the slaves:

seq 200 |parallel -j20 -n0 'curl -s -L "http://cluster:cluster@localhost:8080/kettle/executeTrans/?rep=myRepository&trans=Juan%2FDummy1&level=Debug"'
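
To double-check where they actually end up, a quick sketch (reusing the same credentials, and assuming the status XML lists transformations as <transstatus> entries) is to query each Carte instance's status page and count them:

for port in 8080 8081 8082; do
  # count transformations reported by each Carte instance (tag name is an assumption)
  count=$(curl -s "http://cluster:cluster@localhost:${port}/kettle/status/?xml=Y" | grep -o "<transstatus>" | wc -l)
  echo "port ${port}: ${count} transformations"
done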

Am I missing something?
Andrew Cave
You'll need to set up a wrapper job with a Job or Transformation Executor entry that points to the job you want to run. In the options for the entry, you set the run configuration to point to your Carte server.
Juan Sierra Pons

Thanks @Andrew Cave, I have done as suggested:


With the cluster declared this way:


But it is still running only on the master.

What am I missing?

Andrew Cave
Hi Juan

You have only defined a run configuration for the master server, so it's doing what you're telling it to do. Add a run configuration for the slave servers and update the execution step.
Juan Sierra Pons
Hi @Andrew Cave,

I am not fully understanding this. As I see it, the good thing about dynamic clusters is that they are transparent to Spoon, Pan and Kitchen. I mean, it shouldn't be necessary to know the cluster's members in advance, so no configuration should be needed up front.
With your approach I have to configure the slaves in advance.

By the way, I have added the two slaves:

And updated the execution step:

With no luck.

Everything is still running on the master :(

Thanks for your time


Roberto Velasco Martin
I have a similar problem, but in my case the transformation runs on the two slaves in parallel. This is also not the expected behavior; it should run on only one of the two slaves.

My execution configuration is:


Andrew Cave
Hi Juan et al

Have you set up the files for dynamic clustering as per this page: https://help.hitachivantara.com/Documentation/Pentaho/8.2/Products/Data_Integration/Carte_Clusters/Setup ?
Juan Sierra Pons
Yes @Andrew Cave, I was using the configuration from that link, but I am using 9.2.

Thanks for your time

Best regards
Andrew Cave
It's really unclear documentation, isn't it?

From this old post by Diethard Steiner http://diethardsteiner.blogspot.com/2013/03/creating-clustered-transformation-in.html it looks like a cluster schema needs to be set up as well - which you can only do from a transformation (weird?).

Another reference is here: https://pentaho-community.atlassian.net/wiki/spaces/EAI/pages/374571420/Dynamic+clusters . It is also quite old, but it does look like the cluster schema is very important.

Since you haven't mentioned that step, and the 8.2 documentation says it is for executing in parallel, you might have missed it? But it also says:
Dynamic cluster

If checked, a master Carte server will perform failover operations, and you must define the master as a slave server in the field below. If unchecked, the PDI client will act as the master server, and you must define the available Carte slaves in the field below.


Man, what a cluster-****   : D

Duarte Cunha Leao

I'm a bit late to the "party", but it may help people passing by in the future.

I was also struggling to get "cluster mode" to kick in when executing a partitioned transformation or its wrapper job. A lot of conditions need to be met which are not at all obvious and are not explicit in the documentation.

The following may not apply directly or exactly to the stated problem, yet I hope that (at least part of) the solution is the same.

The version of Pentaho I tested this with was 9.3.

Attached is a zip with the sample job and transformation.

Wrapper Job

The wrapper job, PDISamplePartitionJob.kjb, calls the "clustered transformation", PDISamplePartition.ktr.
The transformation job entry has the following settings:



Notice the Run configuration setting, whose value is Run Clustered.

Run Configuration

The settings of the Run Clustered run configuration are:

Note the special/virtual slave server called Clustered, which is not defined in the View panel's Slave server node. Selecting this special slave server is what activates "cluster mode" (at least in my case, it did).
This run configuration enables clustered execution of a transformation, without specifying the cluster-schema/slave-server(s) in which the transformation will run.
As you'll see, these are determined by the cluster schema assigned to each step of the transformation.

Clustered Transformation

The following shows the partitioned and clustered transformation, PDISamplePartition.ktr:


In a nutshell, this is how this transformation works:

  1. The Generate rows step generates 10000 rows with a single data field holding a constant value.
  2. The Generate random value step adds a random field with random string values.
  3. Finally, the Remove "data" field step removes the dummy data field.
More importantly, the Remove "data" field step is configured to group rows into partitions and, when running in clustered mode, to assign each of these row partitions to one of the available slave servers.


I haven't tested the following, but I suspect this is how it works: when running in clustered mode, each slave server's partitions (or, when not clustered, the single server's partitions) are further distributed across the configured copies of the step, each running in a different thread. In this case, the step was configured with a single copy.

Partition Schema

To configure row partitioning for the Remove data field step, first create a new partition schema, One Partition Per Slave Dynamic:


The partition schema determines the number of partitions to be used.

A static partition schema has Dynamically create the schema definition?  unchecked and its partitions are static and explicitly listed. AFAIK, the partition identifiers are arbitrary and only used for debugging/logging.

A dynamic partition schema has Dynamically create the schema definition?  checked. When not running in clustered mode, it is used just like a static partition schema. Otherwise, when running in clustered mode, the listed partitions are ignored and are instead determined dynamically according to the number of slave servers of the cluster (the identifier of dynamic partitions is PDyn<partition-index>). The number of partitions will be the cluster's number of slave servers times the value of Number of partitions per slave server?. For example, a cluster with three slave servers and two partitions per slave server would use six partitions.

Lastly, on the step's context menu, select "Partitions...", and, in the successive dialogs displayed, configure the following:

  1. Partition method: Remainder of division — each distinct value of the Partition field is assigned to the partition whose index is the remainder of the division of that value by the number of partitions (string values use the hash code)
  2. Partition schema: One Partition Per Slave Dynamic — when clustered, use one partition per slave server
  3. Partition field: random — the field whose value determines the partition of a row

Note that, generally, a partition will have several values associated with it. Crucially, each partition, and all values assigned to it, will be handled by a single slave server and thread. A small numeric sketch of the remainder rule follows.
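
The following is a minimal sketch of that remainder rule (the values are hypothetical, just to illustrate the assignment):

# e.g. 2 slave servers x 2 partitions per slave server = 4 partitions
N_PARTITIONS=4
for VALUE in 10 11 12 13; do
  echo "partition field value ${VALUE} -> partition index $(( VALUE % N_PARTITIONS ))"
done
# a string partition field would first be reduced to its hash code before taking the remainder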

Cluster Schema

When running a transformation in clustered mode, a single cluster schema is supported: all "clustered steps" must specify the same Cluster schema. Unclustered steps run in the cluster's master server, while clustered steps run in the cluster's slave servers.

To configure the Cluster schema to use for the Remove data field step, let's first create the cluster's master server, Local Pentaho Master, a Pentaho Server located at the default location and having default credentials:

Next, let's create the cluster schema, Cluster Schema Dynamic:

This is a dynamic cluster, as determined by Dynamic cluster being checked. Its slave servers are spawned dynamically, at runtime. As such, the "Slave servers" list only needs to indicate the master server.

Lastly, on the step's context menu, select "Clusters...", and, for Cluster schema, select the just-created Cluster Schema Dynamic.

Hope this helps, cheers!

Juan Sierra Pons
Clap, clap, clap! @Duarte Cunha Leao
Thanks for your very detailed explanation :)

I was researching Pentaho's clusters because I thought I was going to face a very high load on my systems, but I have found out that with a few slaves I can manage pretty well.

What I have done is build a home-made wrapper that uses Carte's API and load balances at the wrapper level. Basically (a sketch follows below):
1. I check each slave's status (online|offline): http://slave1:8080, http://slave2:8081, etc.
2. I check the number of jobs running on each slave: http://slave1:8080/kettle/status/?xml=Y
3. I select the least busy one and send it the job
4. I periodically check Carte to get the logs until the job is Finished

https://help.hitachivantara.com/Documentation/Pentaho/9.3/Developer_center/REST_API_Reference/Carte/030
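
For completeness, here is a minimal sketch of that wrapper logic (the slave addresses, credentials and the <transstatus> tag name are assumptions on my side; adapt to your environment):

#!/bin/bash
# pick the Carte slave with the fewest registered transformations and send it the work
SLAVES=("localhost:8081" "localhost:8082")
AUTH="cluster:cluster"
TRANS_URL="kettle/executeTrans/?rep=myRepository&trans=Juan%2FDummy1&level=Debug"

best=""; best_count=999999
for s in "${SLAVES[@]}"; do
  # 1. skip slaves that are offline (-f makes curl fail on HTTP errors)
  status=$(curl -sf "http://${AUTH}@${s}/kettle/status/?xml=Y") || continue
  # 2. count the transformations the slave currently reports
  count=$(printf '%s' "$status" | grep -o "<transstatus>" | wc -l)
  # 3. keep the least busy slave seen so far
  if (( count < best_count )); then best="$s"; best_count=$count; fi
done

# 4. send the transformation to the least busy slave; afterwards, poll
#    http://<slave>/kettle/transStatus/?name=...&xml=Y until it reports Finished
[ -n "$best" ] && curl -s -L "http://${AUTH}@${best}/${TRANS_URL}"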

Hope it helps future readers.