
DataSQRL Quickstart in 10 Minutes


We are going to build a data pipeline that analyzes sensor metrics, all in 10 minutes. Tick tock, let's go!

Create the Script

First, we are going to define the metrics processing for our data product using SQL.

info

If you are unfamiliar with SQL, we recommend you read our SQL Primer first.

In the terminal or command line, create an empty folder for the SQL script:

mkdir metrics; cd metrics

Then create a new file called metrics.sqrl and copy-paste the following SQL code:

metrics.sqrl
IMPORT datasqrl.example.sensors.SensorReading; -- Import metrics
IMPORT time.endOfSecond; -- Import time function
/* Aggregate sensor readings to second */
SecReading := SELECT sensorid, endOfSecond(time) as timeSec,
       avg(temperature) as temp
    FROM SensorReading GROUP BY sensorid, timeSec;
/* Get max temperature in last minute per sensor */
SensorMaxTemp := SELECT sensorid, max(temp) as maxTemp
    FROM SecReading
    WHERE timeSec >= now() - INTERVAL 1 MINUTE
    GROUP BY sensorid;

DataSQRL's flavor of SQL is called "SQRL". SQRL defines tables with the := assignment operator and supports explicit imports of data and functions.

In the script, we import the sensor data we are processing and a time function we use for aggregation.

We define the SecReading table, which aggregates the sensor readings within each second to smooth out our temperature measurements. We then define the SensorMaxTemp table, which computes the maximum temperature over the last minute for each sensor.

Compile the Script

DataSQRL compiles our SQRL script into an integrated data pipeline with the following command:

sqrl compile metrics.sqrl

Run the Script

Launch the compiled pipeline with Docker Compose:

(cd build/deploy; docker compose up --build -d)

Once the pipeline is running, it will ingest, process, store, and serve the data through an API.

note

This starts up Postgres, Flink, Kafka, and a GraphQL server. If you have other services running locally, they may cause port conflicts.
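To verify that everything came up, you can list the running containers:

(cd build/deploy; docker compose ps)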

Query API

Open your favorite browser and navigate to http://localhost:8888/graphiql/ to access GraphiQL, a popular GraphQL IDE. Write GraphQL queries in the left-hand panel. For example, copy the following query:

{
  SensorMaxTemp(sensorid: 1) {
    maxTemp
  }
}

When you hit the "run" button, you get the maximum temperature over the last minute for the sensor with id 1.

And there you have it: a running data pipeline that ingests metrics, aggregates them, and exposes the results through a GraphQL API that you can call from your applications.
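For instance, here is a sketch of how an application could call the API with curl. This assumes the GraphQL endpoint that backs GraphiQL is served at http://localhost:8888/graphql:

curl -X POST http://localhost:8888/graphql \
  -H "Content-Type: application/json" \
  -d '{"query": "{ SensorMaxTemp(sensorid: 1) { maxTemp } }"}'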

Since the pipeline runs detached in the background, stop it with (cd build/deploy; docker compose down -v).

Customize API

Got a little more time? Let's customize the GraphQL API and add a metrics ingestion endpoint.

By default, DataSQRL generates a GraphQL schema for us based on the tables we define in the SQRL script. That's great for rapid prototyping, but eventually we want to customize the API and limit data access.

To save us time, we are going to start with the GraphQL API that DataSQRL generates for us by running this command:

sqrl compile metrics.sqrl --api graphql

There is now a file called schema.graphqls in the same folder as our script. Open it and take a look.

Notice how each table defined in our SQRL script maps to a query endpoint in the API with an associated result type. Each query endpoint accepts an argument for each column of the table to filter the results by column values.
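For example, the generated entry for the SecReading table looks roughly like this, with one optional filter argument per column (an illustration only; the exact contents of your generated file may differ):

type Query {
  SecReading(sensorid: Int, timeSec: String, temp: Float): [SecReading!]
}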

We are going to remove most of those arguments to only support querying by sensorid. We will also remove the SensorReading query endpoint and result type to only expose the smoothed-out sensor readings from the SecReading table.

In the schema.graphqls file, remove the SensorReading type and replace the query definition with the following:

schema.graphqls
type Query {
  SecReading(sensorid: Int!): [SecReading!]
  SensorMaxTemp(sensorid: Int): [SensorMaxTemp!]
}

Note that we made sensorid a required argument for the SecReading query endpoint.

Now, invoke the compiler with the GraphQL schema we just created and launch the updated pipeline:

sqrl compile metrics.sqrl schema.graphqls

followed by:

(cd build/deploy; docker compose up --build -d)

When you refresh GraphiQL in the browser, you will see that the API is simpler and exposes only the data needed for our use case.

Ingest Metrics with Mutations

So far, we have ingested metrics data from an external source imported from the DataSQRL repository. That static data source is convenient for whipping up an example data product, but we want our data pipeline to provide its own metrics ingestion endpoint.

No problem: let's add it to our GraphQL schema by appending the following mutation to the schema.graphqls file we created above:

schema.graphqls
type Mutation {
  AddReading(metric: SensorReadingInput!): CreatedReading
}

input SensorReadingInput {
  sensorid: Int!
  temperature: Float!
  humidity: Float!
}

type CreatedReading {
  event_time: String!
  sensorid: Int!
}

To use the data created by this mutation in our SQRL script, we have to import it. Replace the first three lines of the metrics.sqrl script with:

metrics.sqrl
IMPORT schema.AddReading AS SensorReading;
IMPORT time.endOfSecond;
SecReading := SELECT sensorid, endOfSecond(event_time) as timeSec,
       avg(temperature) as temp
    FROM SensorReading GROUP BY sensorid, timeSec;

We are now using data ingested through the API mutation endpoint instead of the static example data. For the timestamp on the metrics, we use the special column event_time, which captures the time the data was ingested through the API.

Terminate the running service, run the compiler again, and re-launch the pipeline.

(cd build/deploy; docker compose down -v)
sqrl compile metrics.sqrl schema.graphqls
(cd build/deploy; docker compose up --build -d)

In GraphiQL, run the following mutation to add a temperature reading:

mutation {
  AddReading(metric: {
    sensorid: 1,
    temperature: 37.2,
    humidity: 88
  }) {
    sensorid
    event_time
  }
}

Hit the run button a few times, changing the temperature and/or sensor id, to insert multiple readings. You can also script this from the command line, as sketched below.
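Here is a sketch of such a script, again assuming the GraphQL endpoint at http://localhost:8888/graphql:

for temp in 38.1 39.4 41.0; do
  curl -X POST http://localhost:8888/graphql \
    -H "Content-Type: application/json" \
    -d "{\"query\": \"mutation { AddReading(metric: {sensorid: 1, temperature: $temp, humidity: 88}) { event_time } }\"}"
done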

To query the maximum temperatures, run the following query:

{
  SensorMaxTemp {
    sensorid
    maxTemp
  }
}

Realtime Updates with Subscriptions

DataSQRL supports GraphQL subscriptions, so we can push processed data to users in realtime instead of requiring them to query for it. This is useful when we want dashboards to update with new metrics automatically.

Let's add an alert when the temperature of a sensor exceeds 50°. First, we add the HighTempAlert table to our script:

metrics.sqrl
HighTempAlert := SELECT * FROM SecReading WHERE temp > 50;

Open the schema.graphqls file and add the following subscription and type:

schema.graphqls
type Subscription {
  HighTempAlert(sensorid: Int): HighTempAlert
}

type HighTempAlert {
  sensorid: Int!
  timeSec: String!
  temp: Float!
}

Terminate and rerun the pipeline:

(cd build/deploy; docker compose down -v)
sqrl compile metrics.sqrl schema.graphqls
(cd build/deploy; docker compose up --build -d)

This allows users of our API to subscribe to the HighTempAlert table with an optional sensorid argument to only receive alerts for a particular sensor. Whenever a sensor reading exceeds 50°, the user will be immediately notified.

Open two browser windows and navigate to http://localhost:8888/graphiql/ so you can see them both.

In one, start the GraphQL subscription:

subscription {
  HighTempAlert(sensorid: 2) {
    sensorid
    temp
    timeSec
  }
}

In the other, fire off a mutation:

mutation {
  AddReading(metric: {
    sensorid: 2,
    temperature: 90.5,
    humidity: 88
  }) {
    sensorid
    event_time
  }
}

Wait a second and fire off a second one:

mutation {
  AddReading(metric: {
    sensorid: 2,
    temperature: 95.5,
    humidity: 88
  }) {
    sensorid
    event_time
  }
}

Voilà, we just built a fully functioning monitoring service that ingests, aggregates, and serves metrics data in realtime with push-based alerts. And the best part? The DataSQRL compiler ensures that the pipeline is efficient, fast, robust, and scalable.