The amqp_consume node
Consume data from an amqp-broker like RabbitMQ
.
Once a data-item is received by the node, it will be immediately stored in an on-disk queue for data-safety. Only after this will the item be acknowledged to the amqp broker.
At the moment this node can only setup and work with topic
exchanges.
Example
|amqp_consume()
.host('deves-amqp-cluster1.internal')
.bindings('my.routing.key')
.exchange('x_xchange')
.queue('faxe_test')
.dt_field('UTC-Time')
.dt_format('float_micro')
Parameters
Parameter | Description | Default |
---|---|---|
host( string ) |
Ip address or hostname of the broker | |
port( integer ) |
The broker's port | 5672 / from config file |
user( string ) |
AMQP user | from config file |
pass( string ) |
AMQP password | from config file |
vhost( string ) |
vhost to connect to on the broker | '/' |
routing_key( string ) |
routing key to use for queue binding | undefined |
bindings( string_list ) |
queue-bindings | [] |
queue( string ) |
name of the queue to bind to the exchange | |
exchange( string ) |
name of the exchange to bind to the source exchange | |
prefetch( integer ) |
prefetch count to use | 70 |
dt_field( string ) |
name of the timestamp field that is expected | 'ts' |
dt_format( string ) |
timestamp or datetime format that is expected (see table below) | 'millisecond' |
include_topic ( bool ) |
whether to include the routingkey in the resulting datapoints | true |
topic_as ( string ) |
if include_topic is true, this will be the fieldname for the routingkey value |
'topic' |
as ( string ) |
base object for the output data-point | undefined |
ssl( is_set ) | whether to use ssl | false (not set) |
confirm ( boolean ) |
whether to acknowledge consumed messages to the amqp broker, when set to false , throughput can be increased with the danger of data-loss |
true |
safe ( boolean ) |
whether to use faxe's internal queue. If true , messages consumed from the amqp broker will be stored in an internal ondisc queue before they get sent to downstream nodes, to avoid losing data. |
false |
Exactly one of these must be provided:
routing_key
,bindings
.
Available datetime formats
dt_format | description | example |
---|---|---|
'millisecond' | timestamp UTC in milliseconds | 1565343079000 |
'second' | timestamp UTC in seconds | 1565343079 |
'float_micro' | timestamp UTC float with microsecond precision | 1565343079.173588 |
'float_millisecond' | timestamp UTC float with millisecond precision | 1565343079.173 |
'ISO8601' | ISO8601 Datetime format string | '2011-10-05T14:48:00.000Z' |
'RFC3339' | RFC3339 Datetime format string | '2018-02-01 15:18:02.088Z' |
'convtrack_datetime' | special datetime format used in the conveyor tracking data stream | '19.08.01 17:33:44,867 ' |