The collect node

Experimental.

Since 0.15.2

The collect node will maintain a collection of data-points based on some criteria given as lambda-expressions. It will output a data_batch item regularily (when emit_every is given) or on every incoming item.

Example

%% input data ->

|json_emitter()
.every(500ms)
.json(
<<<{"code" : {"id": 224, "name" : "224"}, "message": "this is a test", "mode": 1}>>>,
<<<{"code" : {"id": 334, "name" : "334"}, "message": "this is another test", "mode": 1}>>>,
<<<{"code" : {"id": 114, "name" : "114"}, "message": "this is another test", "mode": 2}>>>,
<<<{"code" : {"id": 443, "name" : "443"}, "message": "this is another test", "mode": 1}>>>, 
<<<{"code" : {"id": 111, "name" : "111"}, "message": "this is another test", "mode": 1}>>>,
<<<{"code" : {"id": 443, "name" : "443"}, "message": "this is another test", "mode": 0}>>>,
<<<{"code" : {"id": 224, "name" : "224"}, "message": "this is another test", "mode": 0}>>>,
<<<{"code" : {"id": 111, "name" : "111"}, "message": "this is another test", "mode": 0}>>>,
<<<{"code" : {"id": 334, "name" : "334"}, "message": "this is another test", "mode": 0}>>>,
<<<{"code" : {"id": 551, "name" : "551"}, "message": "this is another test", "mode": 2}>>>,
<<<{"code" : {"id": 551, "name" : "551"}, "message": "this is another test", "mode": 0}>>>
)
.as('data')

%% collect 2 fields ('data.code' and 'data.message') with the key-field 'data.code'.
%% this node will output a data_batch item with a list of data-points
%% where the original timestamp and meta-data is preserved and
%% containing the fields mentioned before every 10 secondes

|collect()
%% the collect node will build an internal buffer with the value of the 'key_field' as index
.key_field('data.code')
%% criterion for adding a data-point to the internal collection buffer
.add(lambda: "data.mode" > 0)
%% criterion for removal of values
.remove(lambda: "data.mode" == 0)
%% we keep these fields in the resulting data_batch
.keep('data.code', 'data.message')
%% collection of type set, so no duplicates
.type('set')
.emit_every(10s)

%% make sense of the data-collection in counting the 'data.code' values
|aggregate()
.fields('data.code')
.functions('count')
.as('code_count')

|debug()

Parameters

Parameter Description Default
key_field(string) The value of the key-field will be used as an index for the collection, can be any data-type.
add(lambda) Criterion for adding an incoming point to the collection, must return a boolean value.
remove(lambda) Criterion for removing an incoming point from the collection, must return a boolen value.
keep(string_list) If given, these field will be kept from every data-point, if not given, the whole item will be kept. undefined
type (string) The type of the collection: 'list' or 'set' (no duplicates) 'set'
emit_every (duration) Interval at which to emit the current collection as a data_batch item. undefined