The amqp_consume node
Consume data from an amqp-broker like RabbitMQ.
In safe mode
Once a data-item is received by the node, it will be immediately stored in an on-disk queue for data-safety. Only after this will the item be acknowledged to the amqp broker.
If the amqp
correlation-id property is set (to a unique value per message), this node can perform efficient message deduplication.
See amqp_publish for details on this.
Prefetch count, ack_every and dedup_size
For a description of these settings, see table below.
As they relate to one another in some kind, here is a rule of thumb for how to set
prefetch is changed:
ack_everyto one third of
dedup_sizeto 3 times the
Example: prefetch = 100, ack_every = 35, dedup_size = 300
At the moment this node can only set up and work with
|amqp_consume() .host('deves-amqp-cluster1.internal') .bindings('my.routing.key') .exchange('x_xchange') .queue('faxe_test') .dt_field('UTC-Time') .dt_format('float_micro')
||Ip address or hostname of the broker||from config|
||The broker's port||5672 / from config|
||AMQP user||from config|
||AMQP password||from config|
||vhost to connect to on the broker||'/'|
||routing key to use for queue binding||undefined|
||list of queue bindings keys|||
||name of the queue to bind to the exchange|
||prefix for the queue-name that will be ensured to exist for
||name of the exchange to bind to the source exchange|
||prefix for the exchange-name that will be ensured to exist for
||prefetch count to use||70|
||Identifier for the queue consumer, defaults to a combination of flow-id and node-id||undefined|
||number of messages to consume before acknowledging them to the broker||20|
||timeout after which all currently not acknowledged messages will be acknowledged, regardless of the
||number of correlation-ids to hold in memory for message deduplication||200|
||name of the timestamp field that is expected||'ts'|
||timestamp or datetime format that is expected (see datetime-parsing)||'millisecond'|
||whether to include the routingkey in the resulting datapoints||true|
||base object for the output data-point||undefined|
|ssl( is_set )||whether to use ssl, if true, ssl options from faxe's config for amqp connections will be used||false (not set)|
||whether to acknowledge consumed messages to the amqp broker, when set to
||whether to use faxe's internal queue. If
Exactly one of these must be provided: