The combine node

Combine the values from 2 nodes, used to enrich a stream of data with data from another stream, that usually has lower frequency.

Port 1 is the trigger port and its the port where data to be enriched comes into the node. Port 2 is the one where enrichment data come in. Every time a value is received on the trigger port, the node will emit a value, combined with whatever current value on port 2. The node will never emit on port 2 values.

No output is given, as long as there has not arrived a value on port 2 to combine with.


fields

The fields parameter defines the fields to inject into the combination for the stream on port 2. To rename these fields, parameter prefix or aliases can be used. With prefix_delimiter a delimiter can be given, defaults to: '_'


merge

When merge_field is given, the node will merge the values from the input port 2 with the values from port 1. Objects and lists and lists of objects will be merged.

If a path exists in both streams, the value in the first stream is superseded by the value in the second stream (in2). Except for lists, which will be combined.


Either fields(optionally with prefix or aliases) or merge_field must be given.

If you want to join 2 or more streams consider using the join node.

Examples

 def in1 =
  |value_emitter()
  .every(500ms)
  .type(point)
  .fields('val')

 def in2 =
  |value_emitter()
  .every(4s)
  .type(point)
  .fields('val2', 'val3') 

 in1
  |combine(in2)
  .fields('val2', 'val3')
  .prefix('comb')
  .prefix_delimiter('_')

In this example values from the stream called in1 will be enriched with values from in2. Outputfields will be called: val, comb_val2 and comb_val3 . The flow will emit every 500 milliseconds after 4 seconds have past initially.


 def in1 =
  |value_emitter()
  .every(500ms)
  .type(point)
  .fields('data.val')

 def in2 =
  |value_emitter()
  .every(4s)
  .type(point)
  .fields('data.val2', 'data.val3') 

 in1
  |combine(in2)
  .merge_field('data') 

This example will merge data from in2 into in1, such that the resulting data-point will have the fields: data.val, data.val2, data.val3


def v1 =
|json_emitter()
.every(1s)
.json(' {"condition": {"id": 0, "name": "OK", "sub_cond":
        [{"value": 33}]}, "condition_reason": "",
        "predicted_maintenance_time": 1584246411783,
        "vac_on_without_contact": [1.2, 2.5, 4.33]} ')

def v2 =
|json_emitter()
.every(5s)
.json(' {"condition": {"id1": 0, "name1": "OK", "sub_cond":
        [{"number": 44}]}, "condition_reason": "",
        "predicted_maintenance_time": 1584246411785,
        "vac_on_without_contact": [2.2, 2.5, 4.33],
        "vac_on_with_contact": [5.6, 45.98, 7.012]} ')

v1
    |combine(v2)
    .merge_field('data') 

The output from the above example will be:

{"data":
    {"condition": {"id1":0,"id":0,"name1":"OK","name":"OK",
    "sub_cond":[{"number":44},{"value":33}]},
    "condition_reason":"",
    "predicted_maintenance_time":1584246411785,
    "vac_on_without_contact":[1.2,2.2,2.5,2.5,4.33,4.33],
    "vac_on_with_contact":[5.6,45.98,7.012]
    }

Parameters

Parameter Description Default
[node] (port) input node for port 2
merge_field( string ) Base field for the merge operation []
fields( string_list ) List of fields to include []
tags( string_list ) List of tags to include []
aliases( string_list ) List of field aliases to use instead of the original field names []
prefix( string ) Prefix for the injected fields from stream 2 undefined
prefix_delimiter( string ) Used to separate prefix and the original field name from stream 2 '_'
nofill ( isset ) if set, dataoutput will happen regardless of a initial input on port 2 false (not set)

When merge_field is given the params fields, prefix and prefix_delimiter have no effect. Otherwise either prefix or aliases must be given these are mutually exclusive parameters. If both are given, then prefix will win.