Improvement suggestions for AbstractRabbitMQInputOperator

vikram patil
Hello All,

While helping one of the Apex users, I found that the current
AbstractRabbitMQInputOperator can be made simpler to use. After
investigation, I would like to suggest the improvements below. If more
improvements are needed, please suggest them and I will incorporate them
when creating the Jira ticket.

In the current code for AbstractRabbitMQInputOperator, exchange and
exchangeType are marked NotNull, so an app cannot be launched without
specifying these values. For an input operator, we are actually trying to
create queues and exchanges with the specified values. This leads to
conflicts in some scenarios, for example when the default exchange is used
for the queue or when the queue is transient. To consume from RabbitMQ, the
operator needs only the queue name, host and port. Similar to
KafkaInputOperator, we can let the operator fail if the queue name is not
specified and let the developer correct the application or specify it in
the configuration.

*Suggested Improvements:*

1) Drop the requirement to specify the exchange and its type.
2) We should not attempt to create a queue in the input operator. For a
consumer, the queue name alone is sufficient to start consuming data from
the queue.
3) Currently the queue name is optional; we should make it mandatory
instead of creating a queue.
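
The three suggestions boil down to a validation rule, which can be sketched in plain Java. Everything below is hypothetical: the class name ConsumerSettings and its methods are made up for illustration and are not part of Malhar or the RabbitMQ client. It only shows queue name, host and port being mandatory while the exchange settings stay optional.

```java
// Hypothetical illustration only: not the Malhar operator's actual code.
final class ConsumerSettings {
    private final String queueName;   // mandatory under suggestion 3
    private final String host;        // mandatory
    private final int port;           // mandatory
    private String exchange;          // optional under suggestion 1
    private String exchangeType;      // optional under suggestion 1

    ConsumerSettings(String queueName, String host, int port) {
        this.queueName = queueName;
        this.host = host;
        this.port = port;
    }

    /** Optionally records an exchange; nothing requires this to be called. */
    ConsumerSettings withExchange(String exchange, String exchangeType) {
        this.exchange = exchange;
        this.exchangeType = exchangeType;
        return this;
    }

    /** Fails fast when a mandatory value is missing, instead of creating a queue. */
    void validate() {
        if (queueName == null || queueName.trim().isEmpty()) {
            throw new IllegalArgumentException("queueName is mandatory for a consumer");
        }
        if (host == null || host.trim().isEmpty()) {
            throw new IllegalArgumentException("host is mandatory");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port must be between 1 and 65535");
        }
    }

    /** True only when a non-default (non-empty) exchange name was supplied. */
    boolean hasExchange() {
        return exchange != null && !exchange.isEmpty();
    }
}
```

With this shape, new ConsumerSettings("test", "localhost", 5672).validate() passes without any exchange being set, while a null or empty queue name is rejected before the app talks to the broker.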

I tried out the following scenarios to test the existing operator.

*Scenarios with default exchange:*
*1) Queue is already created as non-durable with default exchange*
*Setup:*
rabbitMQInputOperator.setQueueName("test_2");
rabbitMQInputOperator.setExchangeType("fanout");
rabbitMQInputOperator.setExchange("");
rabbitMQInputOperator.setHost("localhost");
rabbitMQInputOperator.setPort(5672);

*Exception:*
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
protocol method: #method<channel.close>(reply-code=403,
reply-text=ACCESS_REFUSED - operation not permitted on the default
exchange, class-id=40, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at
com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at
com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:398)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:128

*Reason:*
The default exchange is specified in the code as “”. Though we are trying
to consume from a specified, already created queue, the operator crashes
because the exchangeDeclare() call fails.
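
A possible guard for this case can be sketched with a toy in-memory stand-in for the broker. This is an assumption-laden illustration, not the RabbitMQ client API and not the operator's code: the class DefaultExchangeGuard and both methods are hypothetical, and the only behaviour taken from the trace above is that declaring the exchange named “” is refused with reply-code 403.

```java
import java.util.Map;

// Toy in-memory stand-in for a broker; NOT the RabbitMQ client API.
final class DefaultExchangeGuard {

    /** Mimics the rule seen in the trace: the default exchange ("") cannot be declared. */
    static void declareExchange(Map<String, String> exchanges, String name, String type) {
        if (name.isEmpty()) {
            throw new IllegalStateException(
                "403 ACCESS_REFUSED - operation not permitted on the default exchange");
        }
        exchanges.put(name, type);
    }

    /** Hypothetical guard: declare only when a non-default exchange is configured. */
    static boolean declareIfNamed(Map<String, String> exchanges, String name, String type) {
        if (name == null || name.isEmpty()) {
            // Default exchange: nothing to declare, the consumer can go straight to the queue.
            return false;
        }
        declareExchange(exchanges, name, type);
        return true;
    }
}
```

In this sketch, declareIfNamed returns false for “” and leaves the map untouched, so the 403 path is never reached; for “new_exchange” it records the exchange and returns true.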

*2) Queue is not created before launching the app*

*Setup:*
rabbitMQInputOperator.setQueueName("test");
rabbitMQInputOperator.setExchangeType("fanout");
rabbitMQInputOperator.setExchange("");
rabbitMQInputOperator.setHost("localhost");
rabbitMQInputOperator.setPort(5672);

*Exception:*
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
protocol method: #method<channel.close>(reply-code=403,
reply-text=ACCESS_REFUSED - operation not permitted on the default
exchange, class-id=40, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at
com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at
com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:398)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:128

*Reason:*
The current operator fails while trying to declare the default exchange.

*Scenarios with Custom Exchanges:*

*1) Queue “test_2” is already created as non-durable with exchange
“new_exchange” in RabbitMQ*
*Setup:*
rabbitMQInputOperator.setQueueName("test_2");
rabbitMQInputOperator.setExchangeType("fanout");
rabbitMQInputOperator.setExchange("new_exchange");
rabbitMQInputOperator.setHost("localhost");
rabbitMQInputOperator.setPort(5672);

*Exception:*
Caused by a mismatch in the “durable” parameter during declaration:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
protocol method: #method<channel.close>(reply-code=406,
reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange
'new_exchange' in vhost '/': received 'false' but current is 'true',
class-id=40, method-id=10)
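
*Reason:*
The operator re-declares entities that already exist, and the declaration
fails when its “durable” flag differs from the existing one. The existing
queue was transient (non-durable) while the code tries to create a durable
queue with the same name; the reply text above reports the same kind of
mismatch for exchange 'new_exchange'.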
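
The failure mode can be illustrated with a toy in-memory model of declare semantics. This is a sketch under stated assumptions, not the RabbitMQ client API: the class ToyDeclareModel and its methods are hypothetical, and the only behaviour borrowed from the trace above is that re-declaring an existing entity with a different 'durable' flag is rejected with reply-code 406.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of declare semantics; NOT the RabbitMQ client API.
final class ToyDeclareModel {
    // Entity name -> the durable flag it was created with.
    private final Map<String, Boolean> durableByName = new HashMap<String, Boolean>();

    /** Active declare: creates the entity, or fails if it exists with a different flag. */
    void declare(String name, boolean durable) {
        Boolean current = durableByName.get(name);
        if (current == null) {
            durableByName.put(name, durable);
            return;
        }
        if (current.booleanValue() != durable) {
            throw new IllegalStateException(
                "406 PRECONDITION_FAILED - inequivalent arg 'durable' for '" + name
                    + "': received '" + durable + "' but current is '" + current + "'");
        }
        // Same flag: re-declaring is a no-op.
    }

    /** Existence check that never creates or changes anything. */
    boolean exists(String name) {
        return durableByName.containsKey(name);
    }
}
```

A consumer that only checks for existence, as exists() does here, cannot hit the mismatch, because it never sends its own durability setting. That is the design choice behind not creating queues or exchanges from the input operator.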

*2) Queue and exchange are not created (declared) in RabbitMQ*
*Setup:*
No queue or exchange is present in RabbitMQ.
An app with the following configuration ends up creating a durable queue
“test_2” bound to a new exchange “new_exchange” of type “fanout”, with
routing key “”.

rabbitMQInputOperator.setQueueName("test_2");
rabbitMQInputOperator.setExchangeType("fanout");
rabbitMQInputOperator.setExchange("new_exchange");
rabbitMQInputOperator.setHost("localhost");
rabbitMQInputOperator.setPort(5672);

*Result:*
External entities can now push data to the newly created exchange and
queue.

*3) Queue and exchange are already created in RabbitMQ, with the queue as
durable and the exchange and exchange type as specified*

*Setup:*

rabbitMQInputOperator.setQueueName("test");
rabbitMQInputOperator.setExchangeType("fanout");
rabbitMQInputOperator.setExchange("new_exchange");
rabbitMQInputOperator.setRoutingKey("test");
rabbitMQInputOperator.setHost("localhost");
rabbitMQInputOperator.setPort(5672);

*Result:*
Worked with no issues, but it requires that a durable queue has already
been created with the proper exchange and routing key.

*4) Queue name not specified in the app, but exchange and exchangeType
specified for the operator*

*Setup:*
rabbitMQInputOperator.setExchangeType("fanout");
rabbitMQInputOperator.setExchange("new_exchange");
rabbitMQInputOperator.setHost("localhost");
rabbitMQInputOperator.setRoutingKey("test");
rabbitMQInputOperator.setPort(5672);

*Result:*
The operator ended up creating a queue with a unique name such as
“amq.gen-dHDsywLO-8eV8qZM4Q4T_w”, which gets auto-deleted when the last
consumer stops consuming from it.


Thanks & Regards,
Vikram

Re: Improvement suggestions for AbstractRabbitMQInputOperator

Shubham Pathak
+1 for the improvements
This way it will also become easier for a new user to try out the
operator.

Thanks,
Shubham Pathak

On Fri, Jun 9, 2017 at 2:01 PM, vikram patil <[hidden email]> wrote:

> [...]

Re: Improvement suggestions for AbstractRabbitMQInputOperator

Mohit Jotwani
+1 - The queue_name, host, port should be sufficient to start reading the
messages.

Regards,
Mohit



On Fri, Jun 9, 2017 at 5:47 PM, Shubham Pathak <[hidden email]>
wrote:

> [...]




Re: Improvement suggestions for AbstractRabbitMQInputOperator

vikram patil
I have created a Jira ticket for this improvement:
https://issues.apache.org/jira/browse/APEXMALHAR-2509

-Vikram

On Mon, Jun 12, 2017 at 1:47 PM, Mohit Jotwani <[hidden email]>
wrote:

> [...]