The join node
Join data from two or more nodes.
If the merge_field
parameter is given, the node will merge the fields given from every in-node, instead of
joining with prefixes. (See merge example below).
If you want to enrich a stream of data with a second stream consider using the combine node.
Example
def v1 =
|value_emitter()
.every(3s)
.align()
def v2 =
|value_emitter()
.every(5s)
.align()
v1
|join(v2)
.prefix('v1.joined', 'v2.joined')
.tolerance(3s)
.missing_timeout(3s)
Joins the fields of v1
and v2
and produces a stream, that has the fields v1.joined.val
and v2.joined.val
Node Parameters
Parameter | Description | Default |
---|---|---|
[node] nodes( node_list ) |
list of node (chains) to merge | [] |
Parameters
Parameter | Description | Default |
---|---|---|
prefix( string_list ) |
list of prefixes (used in join mode) | ['', ''] (no prefixes) |
merge_field( string ) |
when given, the join node will do a field merge operation | undefined |
missing_timeout( duration ) |
values that do not arrive within this timeout will be treated as missing | 20s |
tolerance( duration ) |
timestamp tolerance. Determines the maximum difference a data-item's timestamp can have to the current timeslot, for the item to be included in the join operation. | 2s |
~fill( 'none' 'null' any ) |
deprecated, use full instead, the nodes default behaviour stayed the same |
'none' |
full( boolean ) |
whether to output full joins (no value missing) only, this would drop joins that are not complete | true |
fill value - join behaviour (deprecated)
'none'
- (default) skip rows, where a point is missing, inner join.'null'
- fill missing points with null, full outer join.Any value
- fill fields with given value, full outer join.
Merge example
Let's look at an example where the streams coming out of two nodes are not joined with prefixes, but a merge operation is performed.
def v1 =
|json_emitter()
.every(3s)
.json(' {"condition": {"id": 0, "name": "OK", "sub_cond":
[{"value": 33}]}, "condition_reason": "",
"predicted_maintenance_time": 1584246411783,
"vac_on_without_contact": [1.2, 2.5, 4.33]} ')
.as('data')
def v2 =
|json_emitter()
.every(3s)
.json(' {"condition": {"id1": 0, "name1": "OK", "sub_cond":
[{"number": 44}]}, "condition_reason": "",
"predicted_maintenance_time": 1584246411783,
"vac_on_without_contact": [2.2, 2.5, 4.33],
"vac_on_with_contact": [5.6, 45.98, 7.012]} ')
.as('data')
v1
|join(v2)
.merge_field('data')
.tolerance(20ms)
.missing_timeout(30ms)
.full(false)
|debug()
v1 node data-field (in json format for readability):
{"condition": {"id": 0, "name": "OK", "sub_cond":
[{"value": 33}]}, "condition_reason": "Reason",
"predicted_maintenance_time": 1584246411783,
"vac_on_without_contact": [1.2, 2.5, 4.33]
}
v2 node data-field:
{"condition": {"id1": 0, "name1": "OK", "sub_cond":
[{"number": 44}]}, "condition_reason": "",
"predicted_maintenance_time": 1584246411785,
"vac_on_without_contact": [2.2, 2.5, 4.33],
"vac_on_with_contact": [5.6, 45.98, 7.012]
}
The result data-field after merge (json format here):
{"condition":
{"name1":"OK","name":"OK","id1":0,"id":0,
"sub_cond":[{"number":44}, {"value":33}]
},
"predicted_maintenance_time":1584246411785,
"vac_on_without_contact":[1.2,2.2,2.5,2.5,4.33,4.33],
"vac_on_with_contact":[5.6,45.98,7.012],
"condition_reason":""
}
Objects and lists and lists of objects will be merged.
If a path exists in several streams, the value in the first stream is superseded by the value in a following stream ("condition_reason" and "predicted_maintenance_time" in this example). Except for lists, which will be merged ("vac_on_without_contact").