Kafka Consumer semantics in PDI 8.1

Virgilio Pierini posted 04-17-2019 08:25

Hi all

I'm interested in setting up different semantics when retrieving data from Kafka with PDI: at most once, at least once, exactly once. I read [PDI-17272] Implement explicit commit in Kafka Consumer on Pentaho Platform Tracking and, since there are no comments, I'm a bit puzzled.

What am I doing?

I have one transformation with the Kafka Consumer step (Kafka Consumer - Pentaho Documentation) and a sub-transformation that just logs the data to Elasticsearch. I can make the sub-transformation fail, so I'd like to test different ways of reprocessing the data.

I'm on PDI CE 8.1.0.0-365 and Kafka 2.11.

So, is this semantic configuration possible? And where should I start?

An idea I came up with is to use `enable.auto.commit` and `isolation.level` (Apache Kafka documentation), but I don't know whether those are supported or get overwritten...
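
To make it concrete, here is what I have in mind on a plain Java client (kafka-clients 2.x API, outside PDI; the broker address, group id, and topic are just placeholders). In the step I suppose these would go in as key/value pairs in the Options tab:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SemanticsTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "pdi-semantics-test");      // placeholder group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // the two properties in question
        props.put("enable.auto.commit", "false");       // no automatic offset commits
        props.put("isolation.level", "read_committed"); // only relevant with transactional producers

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic")); // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> r : records) {
                System.out.printf("offset=%d value=%s%n", r.offset(), r.value());
            }
            // No commitSync() here: a new run with the same group.id re-reads
            // these records, which is the knob behind at-least-once vs at-most-once.
        }
    }
}
```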

Does anybody have some experience to share?

Thank you

Virgilio


#Pentaho
#Kettle
#PentahoDataIntegrationPDI
Virgilio Pierini

mmm, let's try to engage people with a simpler question: is there anybody out there using the Kafka Consumer?

I promise I'm not going to ask you anything if you raise your hand :-) It's just a sort of poll, because I can't believe nobody has thought about semantics.

Maybe it's EE only; OK, fine, just let me know...

Rodrigo Haces

Virgilio, I'm currently using the Kafka Consumer and have to say I have not noticed PDI-17272 happening (though honestly I haven't tested it deeply; I will have to now that I've seen the bug report).

I have set the `enable.auto.commit` property to `false` plenty of times, and I can clearly go back and re-read messages I have already read. One thing you have to make sure of is to set `auto.offset.reset` to `earliest`, so that you read the oldest messages and not just the newest ones.
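
To be concrete, those are just these two standard consumer properties, entered as key/value pairs (shown here as a plain listing, not the exact dialog layout):

```
enable.auto.commit = false
auto.offset.reset  = earliest
```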

Virgilio Pierini

I performed some tests and, as you said, I did not experience any strange behaviour.

So, basically, when not committing offsets (or committing them only after a long time), every time a transformation starts it reads the whole log from Kafka.

This has some drawbacks, because it means reprocessing things. I was wondering if there is a way to commit offsets once the sub-transformation has succeeded...
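
On a plain Java client the pattern would be a manual commit after the downstream work succeeds, something like the sketch below (at-least-once: an uncommitted batch is re-read on the next run; `indexToElasticsearch` is just a hypothetical stand-in for what my sub-transformation does). What I don't know is whether the PDI step exposes an equivalent:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitAfterSuccess {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "pdi-semantics-test");      // placeholder group id
        props.put("enable.auto.commit", "false");         // we commit manually below
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> r : records) {
                    indexToElasticsearch(r.value()); // stand-in for the sub-transformation
                }
                // Commit only after the whole batch was processed successfully.
                // If indexToElasticsearch() throws, we exit without committing, and
                // the next run resumes from the last committed offset, so the failed
                // batch is re-read (at-least-once: no loss, duplicates possible).
                consumer.commitSync();
            }
        }
    }

    private static void indexToElasticsearch(String doc) {
        // placeholder for the real Elasticsearch write
    }
}
```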

Do you have the same need?

Regards

Virgilio