Define data formats
Use one of the pre-defined data formats, or define your own as a mapping from the source data format to target data types in CDF.
A format can produce multiple types of resources. The output must be a single JSON object or a list of JSON objects, where each object matches one of the output resource schemas described below.
Datapoints
Each output is a single CDF datapoint mapped to a time series in CDF. If the time series does not exist, it will be created.
type
- Set to datapoint.
externalId
- The external ID of the time series to insert the datapoint into.
timestamp
- The timestamp of the datapoint, given as a millisecond Unix timestamp.
value
- The value of the datapoint, as a number or a string.
Events
Each output is a single CDF event. If the provided event has an external ID, and it already exists, then the old event will be updated. Null fields will be ignored when updating events. This is almost identical to the "Create event" payload to the CDF API. See the API docs on events for details.
type
- Set to event.
startTime
- The start time of the event, given as a millisecond Unix timestamp.
endTime
- The end time of the event, given as a millisecond Unix timestamp.
eventType
- The type field of the event in CDF.
subtype
- The subtype field of the event in CDF.
description
- The description field of the event in CDF.
metadata
- An object containing key/value pairs for event metadata.
assetIds
- A list of asset external IDs to link the event to. If the assets do not exist, they will not be created.
source
- The source field of the event in CDF.
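Putting these fields together, an event output could look like the following. All values here are hypothetical and purely illustrative:

```json
{
  "type": "event",
  "startTime": 1686667954000,
  "endTime": 1686669822000,
  "eventType": "maintenance",
  "subtype": "planned",
  "description": "Pump inspection",
  "metadata": {
    "operator": "jane.doe"
  },
  "assetIds": ["pump-42"],
  "source": "mqtt-broker"
}
```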
Quickstart guide
This section explains how you can set up your own data formats for hosted extractors in CDF using the MQTT extractor as an example.
The language you must use to transform the data is inspired by JavaScript. If you want a complete overview and descriptions of all the available functions, see All built-in functions.
For more details about the mapping language, see Mapping concepts.
Single datapoint in a message
The section below describes how to set up data formats using a broker where the data comes in as plain values, and the topic indicates which sensor generated the value. For example, a sensor publishes the message
23.5
on the topic
/myhouse/groundfloor/livingroom/temperature
and you want to map this to a target in CDF. In this example, the data is ingested as a datapoint in a time series, with the message content as the value, the time the message was received as the timestamp, and the topic path as the external ID of the time series.
These are the input objects for defining the format:
input
- The content of the message received.
context
- Information about the message, such as which topic it arrived on.
To make a datapoint from this message, create a JSON object with four fields:
value
- The value of the datapoint.
timestamp
- The timestamp for the datapoint, given as a millisecond Unix timestamp. Use the built-in now() function to get the current time in this format.
externalId
- The external ID of the time series to insert this datapoint into.
type
- Set to datapoint to tell the extractor that this JSON object describes a datapoint.
The final transform looks like:
{
"value": input,
"timestamp": now(),
"externalId": context.topic,
"type": "datapoint"
}
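As a rough Python analogue of what this transform does (not the actual extractor runtime, just an illustration of the mapping logic):

```python
import time

def transform(message: str, topic: str) -> dict:
    # Mirror the mapping above: the message body becomes the value,
    # now() becomes the current millisecond Unix timestamp, and the
    # topic becomes the time series external ID.
    return {
        "value": float(message),
        "timestamp": int(time.time() * 1000),
        "externalId": topic,
        "type": "datapoint",
    }

dp = transform("23.5", "/myhouse/groundfloor/livingroom/temperature")
```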
Handling more data in a single message
This section describes how to set up data formats when subscribing to a topic that still contains the external ID of the time series receiving the data, but the message payloads are now lists of datapoints instead of a single value:
{
"sensorData": [
{
"datetime": "2023-06-13T14:52:34",
"temperature": 21.4
},
{
"datetime": "2023-06-13T14:59:53",
"temperature": 22.1
},
{
"datetime": "2023-06-13T15:23:42",
"temperature": 24.0
}
]
}
The content of messages is automatically parsed as JSON and made available through the input object. In the previous example, you used the input object directly, as the messages had no structure. This time, you can access attributes the same way you might be familiar with from object-oriented languages. For example, input.sensorData[0].temperature resolves to 21.4.
To ingest this data into CDF, you must make a datapoint for each element in the
sensorData
list. Use a map
function on input.sensorData
.
map
takes in a function and applies that function to each element in the list.
input.sensorData.map(row =>
...
)
row
is a name for the input of the map
function. In this
case, for the first iteration of the map
the row
object will look like
{
"datetime": "2023-06-13T14:52:34",
"temperature": 21.4
}
The output of the map
function should be
{
"value": 21.4,
"timestamp": 1686667954000,
"externalId": "/myhouse/groundfloor/livingroom/temperature",
"type": "datapoint"
}
To do this, you must define a JSON structure where you specify input data as the values:
{
"value": row.temperature,
"timestamp": to_unix_timestamp(row.datetime, "%Y-%m-%dT%H:%M:%S"),
"externalId": context.topic,
"type": "datapoint"
}
For value
, we map it to the temperature
attribute of the row
object.
Similarly for timestamp
, except that we need to parse the time format from a
string to a CDF timestamp. To do this, we use the to_unix_timestamp
function
which takes in the timestamp to convert, and a description of the format.
For the external ID of the time series, we do the same as in the previous example and use the topic the message arrived on. And type can be hard-coded to datapoint, since we only ingest datapoints in this example.
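A Python sketch of what to_unix_timestamp does for this format string, assuming the source timestamps are in UTC (the mapping language's exact timezone handling may differ):

```python
import calendar
import time

def to_unix_timestamp_ms(value: str, fmt: str) -> int:
    # Parse the string with the given format and convert it to
    # milliseconds since the Unix epoch, treating the time as UTC.
    return calendar.timegm(time.strptime(value, fmt)) * 1000

ms = to_unix_timestamp_ms("2023-06-13T14:52:34", "%Y-%m-%dT%H:%M:%S")
```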
Putting that all together, we end up with the following format description:
input.sensorData.map(row => {
"value": row.temperature,
"timestamp": to_unix_timestamp(row.datetime, "%Y-%m-%dT%H:%M:%S"),
"externalId": context.topic,
"type": "datapoint"
})
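The same map can be sketched in Python with a list comprehension, assuming UTC timestamps and a fixed topic standing in for context.topic:

```python
import calendar
import time

payload = {
    "sensorData": [
        {"datetime": "2023-06-13T14:52:34", "temperature": 21.4},
        {"datetime": "2023-06-13T14:59:53", "temperature": 22.1},
        {"datetime": "2023-06-13T15:23:42", "temperature": 24.0},
    ]
}
topic = "/myhouse/groundfloor/livingroom/temperature"

def to_ms(value: str) -> int:
    # Parse the source timestamp (assumed UTC) to a millisecond Unix timestamp.
    return calendar.timegm(time.strptime(value, "%Y-%m-%dT%H:%M:%S")) * 1000

# One datapoint object per element of sensorData, like the map above.
datapoints = [
    {
        "value": row["temperature"],
        "timestamp": to_ms(row["datetime"]),
        "externalId": topic,
        "type": "datapoint",
    }
    for row in payload["sensorData"]
]
```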
Nested structures
Finally, let's look at a case where the data is nested with several lists. As an example, let's consider the case where a message contains a list of time series, each with a list of datapoints:
{
"sensorData": [
{
"sensor": "temperature",
"location": "myhouse/groundfloor/livingroom",
"values": [
{
"datetime": "2023-06-13T14:52:34",
"value": 21.4
},
{
"datetime": "2023-06-13T14:59:53",
"value": 22.1
},
{
"datetime": "2023-06-13T15:23:42",
"value": 24.0
}
]
},
{
"sensor": "pressure",
"location": "myhouse/groundfloor/livingroom",
"values": [
{
"datetime": "2023-06-13T14:52:34",
"value": 997.3
},
{
"datetime": "2023-06-13T14:59:53",
"value": 995.1
},
{
"datetime": "2023-06-13T15:23:42",
"value": 1012.8
}
]
}
]
}
First, let's start by iterating over the sensorData
list in the same way as
before:
input.sensorData.map(timeseries =>
...
)
For the first iteration in this map
, the timeseries
object will then be
{
"sensor": "temperature",
"location": "myhouse/groundfloor/livingroom",
"values": [
{
"datetime": "2023-06-13T14:52:34",
"value": 21.4
},
{
"datetime": "2023-06-13T14:59:53",
"value": 22.1
},
{
"datetime": "2023-06-13T15:23:42",
"value": 24.0
}
]
}
To extract the datapoints from this object, we need to iterate over the values
list. Let's attempt to use map
again to do that:
input.sensorData.map(timeseries =>
timeseries.values.map(datapoint =>
...
)
)
For the first iteration of this inner map
, the datapoint
object will be
{
"datetime": "2023-06-13T14:52:34",
"value": 21.4
}
We can convert this to a datapoint JSON in a similar way to before:
{
"value": datapoint.value,
"timestamp": to_unix_timestamp(datapoint.datetime, "%Y-%m-%dT%H:%M:%S"),
"externalId": concat(timeseries.location, "/", timeseries.sensor),
"type": "datapoint"
}
This time we also needed to construct an external ID for the time series ourselves. To do this, we use the location and sensor attributes on the timeseries object from the outer loop and join them together with the concat function.
Notice that in this inner loop, both the timeseries object from the outer map and the datapoint object from the inner map are available.
Putting this all together, we get
input.sensorData.map(timeseries =>
timeseries.values.map(datapoint => {
"value": datapoint.value,
"timestamp": to_unix_timestamp(datapoint.datetime, "%Y-%m-%dT%H:%M:%S"),
"externalId": concat(timeseries.location, "/", timeseries.sensor),
"type": "datapoint"
})
)
However, if we use this format to convert the example message, we would not get a list of datapoints, but a list of lists of datapoints:
[
[
{
"externalId": "myhouse/groundfloor/livingroom/temperature",
"timestamp": 1686667954000,
"type": "datapoint",
"value": 21.4
},
{
"externalId": "myhouse/groundfloor/livingroom/temperature",
"timestamp": 1686668393000,
"type": "datapoint",
"value": 22.1
},
{
"externalId": "myhouse/groundfloor/livingroom/temperature",
"timestamp": 1686669822000,
"type": "datapoint",
"value": 24
}
],
[
{
"externalId": "myhouse/groundfloor/livingroom/pressure",
"timestamp": 1686667954000,
"type": "datapoint",
"value": 997.3
},
{
"externalId": "myhouse/groundfloor/livingroom/pressure",
"timestamp": 1686668393000,
"type": "datapoint",
"value": 995.1
},
{
"externalId": "myhouse/groundfloor/livingroom/pressure",
"timestamp": 1686669822000,
"type": "datapoint",
"value": 1012.8
}
]
]
This is because map always works on a list and returns a new list. Since we want our output to be just a list of datapoints, we need to change the outer map to a flatmap. flatmap is similar to map, except that it flattens the output, rolling the list of lists out into a single flat list:
[
{
"externalId": "myhouse/groundfloor/livingroom/temperature",
"timestamp": 1686667954000,
"type": "datapoint",
"value": 21.4
},
{
"externalId": "myhouse/groundfloor/livingroom/temperature",
"timestamp": 1686668393000,
"type": "datapoint",
"value": 22.1
},
{
"externalId": "myhouse/groundfloor/livingroom/temperature",
"timestamp": 1686669822000,
"type": "datapoint",
"value": 24
},
{
"externalId": "myhouse/groundfloor/livingroom/pressure",
"timestamp": 1686667954000,
"type": "datapoint",
"value": 997.3
},
{
"externalId": "myhouse/groundfloor/livingroom/pressure",
"timestamp": 1686668393000,
"type": "datapoint",
"value": 995.1
},
{
"externalId": "myhouse/groundfloor/livingroom/pressure",
"timestamp": 1686669822000,
"type": "datapoint",
"value": 1012.8
}
]
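The distinction can be illustrated in Python, where a nested list comprehension plays the role of flatmap:

```python
nested = [[1, 2], [3, 4, 5]]

# Like map: one output element per input element, nesting preserved.
mapped = [inner for inner in nested]

# Like flatmap: the inner lists are flattened one level into a single list.
flattened = [x for inner in nested for x in inner]
```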
In total, our final format looks like
input.sensorData.flatmap(timeseries =>
timeseries.values.map(datapoint => {
"value": datapoint.value,
"timestamp": to_unix_timestamp(datapoint.datetime, "%Y-%m-%dT%H:%M:%S"),
"externalId": concat(timeseries.location, "/", timeseries.sensor),
"type": "datapoint"
})
)
Further reading
For more details about the mapping language, see Mapping concepts.