The amqp_consume node

Consumes data from an AMQP broker such as RabbitMQ.

This node accepts regular AMQP routing keys as well as MQTT-style topic strings for bindings and routing_key.
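To illustrate how the two key styles relate, here is a minimal Python sketch of the usual mapping between MQTT topic filters and AMQP topic-exchange binding keys (`/` becomes `.`, the single-level wildcard `+` becomes `*`, and `#` stays `#`). The function name is hypothetical, and how faxe performs this translation internally is not described here, so treat it as an assumption rather than the node's actual implementation.

```python
def mqtt_topic_to_amqp_key(topic: str) -> str:
    """Translate an MQTT-style topic filter into an AMQP binding key.

    Assumed mapping (illustrative only):
      level separator  '/' -> '.'
      single level     '+' -> '*'
      multi level      '#' -> '#'
    A string without '/' or '+' is returned unchanged, so regular
    AMQP routing keys pass through as they are.
    """
    levels = topic.split("/")
    return ".".join("*" if level == "+" else level for level in levels)


if __name__ == "__main__":
    print(mqtt_topic_to_amqp_key("plant/+/temperature/#"))
    print(mqtt_topic_to_amqp_key("my.routing.key"))
```

Note that this simple sketch does not handle MQTT topic levels that themselves contain a dot.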

In safe mode

Once a data-item is received by the node, it will be immediately stored in an on-disk queue for data-safety. Only after this will the item be acknowledged to the amqp broker.
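The ordering described above (persist first, acknowledge second) can be sketched in Python as follows. `DiskQueue`, `handle_delivery` and the JSON-lines file format are hypothetical names and choices made for this illustration; they are not faxe's internal queue implementation.

```python
import json
import os


class DiskQueue:
    """Tiny append-only on-disk queue (illustrative stand-in only)."""

    def __init__(self, path: str):
        self.path = path

    def append(self, item: dict) -> None:
        # Write the item as one JSON line and force it to disk
        # before returning to the caller.
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(item) + "\n")
            f.flush()
            os.fsync(f.fileno())


def handle_delivery(item: dict, disk_queue: DiskQueue, ack) -> None:
    """Store the item durably, then acknowledge it to the broker."""
    disk_queue.append(item)  # 1. persist on disk
    ack()                    # 2. only now acknowledge
```

If the process stops between the two steps, the broker still holds the unacknowledged message and can redeliver it, which is why this ordering avoids data loss at the cost of possible duplicates.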

Message deduplication

If the amqp correlation-id property is set (to a unique value per message), this node can perform efficient message deduplication.

See amqp_publish for details on this.
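As an illustration of the idea, the following Python sketch keeps a bounded window of recently seen correlation-ids, comparable in spirit to what dedup_size controls. The class name `DedupWindow` and the first-in-first-out eviction are assumptions for this example; the node's actual strategy may differ.

```python
from collections import OrderedDict
from typing import Optional


class DedupWindow:
    """Remember the last `size` correlation-ids (hypothetical helper)."""

    def __init__(self, size: int = 200):
        self.size = size
        self._seen = OrderedDict()  # insertion order = age

    def is_duplicate(self, correlation_id: Optional[str]) -> bool:
        # Messages without a correlation-id cannot be deduplicated.
        if correlation_id is None:
            return False
        if correlation_id in self._seen:
            return True
        self._seen[correlation_id] = None
        # Assumed policy: drop the oldest id once the window is full.
        if len(self._seen) > self.size:
            self._seen.popitem(last=False)
        return False
```

A duplicate is only detected while its correlation-id is still inside the window, which is why the window should be larger than the number of messages that can be in flight at once.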


Prefetch count, ack_every and dedup_size

For a description of these settings, see the Parameters table below.

Because these settings depend on one another, here is a rule of thumb for setting ack_every and dedup_size whenever prefetch is changed:

  • set ack_every to one third of prefetch
  • set dedup_size to 3 times the prefetch value

Example: prefetch = 100, ack_every = 35, dedup_size = 300
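The rule of thumb above can be written as a small Python helper. The function name is hypothetical, and rounding to the nearest integer is an assumption; the example above uses 35 rather than 33 for a prefetch of 100, so the values are clearly meant as approximations.

```python
def suggest_ack_settings(prefetch: int) -> dict:
    """Derive ack_every and dedup_size from prefetch (rule of thumb)."""
    if prefetch < 1:
        raise ValueError("prefetch must be a positive integer")
    return {
        "prefetch": prefetch,
        # roughly one third of prefetch, but at least 1
        "ack_every": max(1, round(prefetch / 3)),
        # three times the prefetch value
        "dedup_size": 3 * prefetch,
    }


if __name__ == "__main__":
    print(suggest_ack_settings(100))
```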


At the moment this node can only set up and work with topic exchanges.

Example

|amqp_consume()
.host('deves-amqp-cluster1.internal') 
.bindings('my.routing.key')
.exchange('x_xchange')
.queue('faxe_test')
.dt_field('UTC-Time')
.dt_format('float_micro')

Parameters

| Parameter | Description | Default |
|---|---|---|
| host( string ) | IP address or hostname of the broker | from config |
| port( integer ) | The broker's port | 5672 / from config |
| user( string ) | AMQP user | from config |
| pass( string ) | AMQP password | from config |
| vhost( string ) | vhost to connect to on the broker | '/' |
| routing_key( string ) | deprecated; routing key to use for the queue binding | undefined |
| bindings( string_list ) | list of queue binding keys | [] |
| queue( string ) | name of the queue to bind to the exchange | FlowName + '_' + NodeName |
| queue_prefix( string ) | prefix that is ensured to be present on the queue name (see queue) | from config |
| exchange( string ) | name of the exchange to bind to the source exchange | FlowName + '_' + NodeName |
| exchange_prefix( string ) | prefix that is ensured to be present on the exchange name (see exchange) | from config |
| prefetch( integer ) | prefetch count to use | 70 |
| consumer_tag( string ) | identifier for the queue consumer | 'c_' + FlowName + '_' + NodeName |
| ack_every( integer ) | number of messages to consume before acknowledging them to the broker | 20 |
| ack_after( duration ) | timeout after which all currently unacknowledged messages are acknowledged, regardless of the ack_every setting | 5s |
| use_flow_ack( bool ) | special ack mode where message acknowledgement depends on other nodes in the flow (see crate_out); ack_every and ack_after have no effect in this mode | false |
| dedup_size( integer ) | number of correlation-ids to hold in memory for message deduplication | 200 |
| dt_field( string ) | name of the timestamp field that is expected | 'ts' |
| dt_format( string ) | timestamp or datetime format that is expected (see datetime-parsing) | 'millisecond' |
| include_topic( bool ) | whether to include the routing key in the resulting data-points | true |
| topic_as( string ) | if include_topic is true, the field name for the routing key value | 'topic' |
| as( string ) | base object for the output data-point | undefined |
| ssl( is_set ) | whether to use ssl; if set, the ssl options from faxe's config for amqp connections are used | false (not set) |
| confirm( boolean ) | whether to acknowledge consumed messages to the amqp broker; setting this to false can increase throughput at the risk of data loss | true |
| safe( boolean ) | whether to use faxe's internal queue; if true, messages consumed from the amqp broker are stored in an internal on-disk queue before they are sent to downstream nodes, to avoid losing data | false |

Exactly one of these must be provided: routing_key, bindings.
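
The interplay of the ack_every and ack_after parameters from the table above can be sketched in Python like this. `AckBatcher` and its method names are hypothetical. Starting the timeout at the first unacknowledged message and acknowledging a whole batch through its highest delivery tag are assumptions made for this illustration, not a description of the node's internals.

```python
class AckBatcher:
    """Acknowledge in batches: after N messages or after a timeout."""

    def __init__(self, ack, ack_every: int = 20, ack_after: float = 5.0):
        self.ack = ack              # callback taking the last delivery tag
        self.ack_every = ack_every  # batch size
        self.ack_after = ack_after  # timeout in seconds
        self._pending = 0
        self._last_tag = None
        self._first_pending_at = None

    def on_message(self, delivery_tag: int, now: float) -> None:
        if self._pending == 0:
            self._first_pending_at = now  # assumed start of the timeout
        self._pending += 1
        self._last_tag = delivery_tag
        if self._pending >= self.ack_every:
            self._flush()

    def on_tick(self, now: float) -> None:
        # Called periodically; flushes a partial batch once it is too old.
        if self._pending and now - self._first_pending_at >= self.ack_after:
            self._flush()

    def _flush(self) -> None:
        # One ack for the highest tag covers the whole pending batch.
        self.ack(self._last_tag)
        self._pending = 0
        self._first_pending_at = None
```

With ack_every = 3, three consecutive messages trigger one acknowledgement, while a single leftover message is acknowledged by the next tick that occurs at least ack_after seconds after it arrived.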