
Pentaho Kafka Consumer in version 9.0 results in io.reactivex.exceptions.MissingBackpressureException

  • Pentaho
  • Kettle
  • Pentaho Data Integration PDI
Rajkumar Venkatasamy posted 05-08-2020 10:50

I developed a Pentaho Kafka Consumer transformation (with an associated sub-transformation) that worked fine in PDI Community Edition 8.0. After upgrading PDI to 9.0, the same transformations fail with the following exception when processing the same set of messages from the same topic:

 

2020/05/07 19:47:39 - Kafka Consumer.0 - ERROR (version 9.0.0.0-423, build 9.0.0.0-423 from 2020-01-31 04.53.04 by buildguy) : io.reactivex.exceptions.MissingBackpressureException: Could not emit buffer due to lack of requests
2020/05/07 19:47:39 - Kafka Consumer.0 - at io.reactivex.internal.subscribers.QueueDrainSubscriber.fastPathOrderedEmitMax(QueueDrainSubscriber.java:121)
2020/05/07 19:47:39 - Kafka Consumer.0 - at io.reactivex.internal.operators.flowable.FlowableBufferTimed$BufferExactBoundedSubscriber.run(FlowableBufferTimed.java:569)
2020/05/07 19:47:39 - Kafka Consumer.0 - at io.reactivex.Scheduler$Worker$PeriodicTask.run(Scheduler.java:479)
2020/05/07 19:47:39 - Kafka Consumer.0 - at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
2020/05/07 19:47:39 - Kafka Consumer.0 - at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
2020/05/07 19:47:39 - Kafka Consumer.0 - at java.util.concurrent.FutureTask.run(Unknown Source)
2020/05/07 19:47:39 - Kafka Consumer.0 - at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
2020/05/07 19:47:39 - Kafka Consumer.0 - at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
2020/05/07 19:47:39 - Kafka Consumer.0 - at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
2020/05/07 19:47:39 - Kafka Consumer.0 - at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
2020/05/07 19:47:39 - Kafka Consumer.0 - at java.lang.Thread.run(Unknown Source)

Please let me know whether this is a known issue or whether a fix is available. I can provide more details if needed.

 

 

Thanks and Regards

Rajkumar.V

 


Karen Connelly

Following. I just encountered the exact same exception with the AMQP Consumer using PDI v8.3.

gogo ray

I also got the same error:

 

 

2021/05/13 18:25:09 - Kafka consumer.0 - ERROR (version 9.0.0.0-423, build 9.0.0.0-423 from 2020-01-31 04.53.04 by buildguy) : Unexpected error
2021/05/13 18:25:09 - Kafka consumer.0 - ERROR (version 9.0.0.0-423, build 9.0.0.0-423 from 2020-01-31 04.53.04 by buildguy) : io.reactivex.exceptions.MissingBackpressureException: Could not emit buffer due to lack of requests
2021/05/13 18:25:09 - Kafka consumer.0 - at io.reactivex.internal.subscribers.QueueDrainSubscriber.fastPathOrderedEmitMax(QueueDrainSubscriber.java:121)
2021/05/13 18:25:09 - Kafka consumer.0 - at io.reactivex.internal.operators.flowable.FlowableBufferTimed$BufferExactBoundedSubscriber.run(FlowableBufferTimed.java:569)
2021/05/13 18:25:09 - Kafka consumer.0 - at io.reactivex.Scheduler$Worker$PeriodicTask.run(Scheduler.java:479)
2021/05/13 18:25:09 - Kafka consumer.0 - at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
2021/05/13 18:25:09 - Kafka consumer.0 - at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
2021/05/13 18:25:09 - Kafka consumer.0 - at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2021/05/13 18:25:09 - Kafka consumer.0 - at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
2021/05/13 18:25:09 - Kafka consumer.0 - at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
2021/05/13 18:25:09 - Kafka consumer.0 - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2021/05/13 18:25:09 - Kafka consumer.0 - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2021/05/13 18:25:09 - Kafka consumer.0 - at java.lang.Thread.run(Thread.java:748)

 

 

 

I also noticed another problem: CPU and memory usage is very high. With PDI v8.0 it works fine.