Improvement suggestions to AbstractRabbitMQInputOperator

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Improvement suggestions to AbstractRabbitMQInputOperator

vikram patil
Hello All,


While helping one of the Apex User, I found that current
AbstractRabbitMQInputOperator
can be made simpler to use . After investigation, I would like to suggest
some improvements as below. If there are more improvements needed, please
suggest and I will incorporate those suggestions to create Jira ticket.


In current code for AbstractRabbitMQInputOperator exchange, exchangeType
are made NotNull which doesn't allow app to be launched without specifying
these values. For an input Operator, we are actually trying to create
queues and exchanges with specified values. But it leads to conflict in
some scenarios when default exchange is used for the queue as well when
queue type is transient. To consume from rabbitmq, operator need to use
only QueueName, host and port of rabbitmq.  Similar to KafkaInputOperator
we can let the operator fail if QueueName is not specified and let
developer correct an application or specified it from configuration.



Suggested Improvements:



   1.

   Drop requirements to specify exchange and its type .
   2.

   We should not be attempting to create queue in Input Operator. For
   consumer only queue name is sufficient to start consuming data from queue.
   3.

   Currently queue name is optional, we should make it mandatory instead of
   creating queue.



I tried out following scenarios to test existing operator .



Scenarios with default exchanges:

   1.

   *Queue is already created as non-durable with default exchange*

Setup:

rabbitMQInputOperator.setQueueName("test_2");

rabbitMQInputOperator.setExchangeType("fanout");

rabbitMQInputOperator.setExchange("");

rabbitMQInputOperator.setHost("localhost");

rabbitMQInputOperator.setPort(5672);





Exception:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
protocol method: #method<channel.close>(reply-code=403,
reply-text=ACCESS_REFUSED - operation not permitted on the default
exchange, class-id=40, method-id=10)

at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)

at
com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)

at
com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:398)

at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244)

at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:128



Reason:

Default exchange is specified in the code as “” . Though we are trying to
consume from specified and already created queue, operator crashes as
exchangeDeclare() call fails .



   1.

   Queue is not created before launching an app





Setup:

rabbitMQInputOperator.setQueueName("test");

rabbitMQInputOperator.setExchangeType("fanout");

rabbitMQInputOperator.setExchange("");

rabbitMQInputOperator.setHost("localhost");

rabbitMQInputOperator.setPort(5672);







Exception:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
protocol method: #method<channel.close>(reply-code=403,
reply-text=ACCESS_REFUSED - operation not permitted on the default
exchange, class-id=40, method-id=10)

at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)

at
com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)

at
com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:398)

at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244)

at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:128







Reason:

 Current operator failed to create default exchange .
Scnenarios with Custom Exchanges:

   1.

   Queue “test_2” created as non-durable with exchange “new_exchange” in
   rabbitmq

Setup:

rabbitMQInputOperator.setQueueName("test_2");

rabbitMQInputOperator.setExchangeType("fanout");

rabbitMQInputOperator.setExchange("new_exchange");

rabbitMQInputOperator.setHost("localhost");

rabbitMQInputOperator.setPort(5672);





Exception:

Exception Caused: due to mismatch in param while declaring queue as
“durable”

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
protocol method: #method<channel.close>(reply-code=406,
reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange
'new_exchange' in vhost '/': received 'false' but current is 'true',
class-id=40, method-id=10)

Reason:

   Existing queue was transient ( non-durable ) while code tries to create
durable queue with the same name.



   1.

   When Queue and exchange are not created ( declared ) in rabbitmq

Setup:

   -

   No queue and exchange are present in rabbitmq
   -

   App with following configuration ends up creating durable queue “test_2”
    in new exchange “new_exchange” as “fanout” type and routing key as “”.



rabbitMQInputOperator.setQueueName("test_2");

rabbitMQInputOperator.setExchangeType("fanout");

rabbitMQInputOperator.setExchange("new_exchange");

rabbitMQInputOperator.setHost("localhost");

rabbitMQInputOperator.setPort(5672);

Result:

Now external entities can pushed data to newly created exchange and queue





3) Queue and exchanges are already created rabbitmq with queue as durable
and exchange and exchange Type as specified specified.



Setup:



rabbitMQInputOperator.setQueueName("test");

rabbitMQInputOperator.setExchangeType("fanout");

rabbitMQInputOperator.setExchange("new_exchange");

          rabbitMQInputOperator.setRoutingKey("test");

rabbitMQInputOperator.setHost("localhost");

rabbitMQInputOperator.setPort(5672);

Result:

        Worked with no issues. But it demands that it durable queue has to
be created with proper exchange and routing key







4) Queuename not specified in an app but exchange and exchangeType
specified for Operator



Setup:

rabbitMQInputOperator.setExchangeType("fanout");

rabbitMQInputOperator.setExchange("new_exchange");

rabbitMQInputOperator.setHost("localhost");

rabbitMQInputOperator.setRoutingKey("test");

rabbitMQInputOperator.setPort(5672);





Result:

Operator ended up creating queue with unique names such as
“amq.gen-dHDsywLO-8eV8qZM4Q4T_w”
 which gets auto deleted when last consumer stops consuming from it.




Thanks & Regards,

Vikram