The amqp_consume node
Consume data from an amqp-broker like RabbitMQ.
This node accepts regular amqp routing keys as well as MQTT style topic strings for bindings/routing_key.
In safe mode
Once a data-item is received by the node, it will be immediately stored in an on-disk queue for data-safety. Only after this will the item be acknowledged to the amqp broker.
Message deduplication
If the amqp correlation-id property is set (to a unique value per message), this node can perform efficient message deduplication.
See amqp_publish for details on this.
Prefetch count, ack_every and dedup_size
For a description of these settings, see table below.
As they relate to one another in some kind, here is a rule of thumb for how to set ack_every and dedup_size
when prefetch is changed:
- set
ack_everyto one third ofprefetch - set
dedup_sizeto 3 times theprefetchvalue
Example: prefetch = 100, ack_every = 35, dedup_size = 300
At the moment this node can only set up and work with topic exchanges.
Example
|amqp_consume()
.host('deves-amqp-cluster1.internal')
.bindings('my.routing.key')
.exchange('x_xchange')
.queue('faxe_test')
.dt_field('UTC-Time')
.dt_format('float_micro')
Parameters
| Parameter | Description | Default |
|---|---|---|
host( string ) |
Ip address or hostname of the broker | from config |
port( integer ) |
The broker's port | 5672 / from config |
user( string ) |
AMQP user | from config |
pass( string ) |
AMQP password | from config |
vhost( string ) |
vhost to connect to on the broker | '/' |
routing_key( string ) deprecated |
routing key to use for queue binding | undefined |
bindings( string_list ) |
list of queue bindings keys | [] |
queue( string ) |
name of the queue to bind to the exchange | FlowName + '_' + NodeName |
queue_prefix( string ) |
prefix for the queue-name that will be ensured to exist for queue |
from config |
exchange( string ) |
name of the exchange to bind to the source exchange | FlowName + '_' + NodeName |
exchange_prefix( string ) |
prefix for the exchange-name that will be ensured to exist for exchange |
from config |
prefetch( integer ) |
prefetch count to use | 70 |
consumer_tag( string ) |
Identifier for the queue consumer | 'c_' + FlowName + '_' + NodeName |
ack_every( integer ) |
number of messages to consume before acknowledging them to the broker | 20 |
ack_after( duration ) |
timeout after which all currently not acknowledged messages will be acknowledged, regardless of the ack_every setting |
5s |
use_flow_ack( bool ) |
special ack mode, where message acknowledgement is dependend on other nodes in the flow (see crate_out), ack_every and ack_after have no effect with this mode |
false |
dedup_size( integer ) |
number of correlation-ids to hold in memory for message deduplication | 200 |
dt_field( string ) |
name of the timestamp field that is expected | 'ts' |
dt_format( string ) |
timestamp or datetime format that is expected (see datetime-parsing) | 'millisecond' |
include_topic ( bool ) |
whether to include the routingkey in the resulting datapoints | true |
topic_as ( string ) |
if include_topic is true, this will be the fieldname for the routingkey value |
'topic' |
as ( string ) |
base object for the output data-point | undefined |
| ssl( is_set ) | whether to use ssl, if true, ssl options from faxe's config for amqp connections will be used | false (not set) |
confirm ( boolean ) |
whether to acknowledge consumed messages to the amqp broker, when set to false, throughput can be increased with the danger of data-loss |
true |
safe ( boolean ) |
whether to use faxe's internal queue. If true, messages consumed from the amqp broker will be stored in an internal ondisc queue before they get sent to downstream nodes, to avoid losing data. |
false |
Exactly one of these must be provided:
routing_key,bindings.