RedisTimeSeriesRedisTimeSeries

Posted on

li.L0, li.L1, li.L2, li.L3, li.L5, li.L6, li.L7, li.L8 { list style type: decimal! important }

Summary

I will cover an IoT data flow use case in this article. This data stream comes from a Storm weather station and is stored on a Redis instance as TimeSeries data. The application is implemented as a container on Google Cloud Run. Redis TimeSeries data is visualized using grafana.

Architecture

Application

ExpressJS REST API Server

Very simple server side application to start and stop the data stream.
app.post('/start', async (req, res) => {
try {
if (!tc) {
tc = new TempestClient();
await tc.start();
res.status(201).json({'message': 'success'});
}
else {
throw new Error('tempest client already instantiated');
}
}
catch (err) {
res.status(400).json({error: err.message})
};
});

app.post('/stop', async (req, res) => {
try {
if (tc) {
await tc.stop();
tc = null;
res.status(201).json({'message': 'success'});
}
else {
throw new Error('tempest client does not exist');
}
}
catch (err) {
res.status(400).json({error: err.message})
};
});

Customer Storm

Weatherflow provides a post REST APIs and Websockets. In this case, I used their Websocket interface to provide a 3 second wind data stream from the weather station.
    async start() {
if (!this.ts && !this.ws) {
this.ts = new TimeSeriesClient(redis.user, redis.password, redis.url);
await this.ts.connect();
this.ws = new WebSocket(`${tempest.url}?token=${tempest.password}`);

this.ws.on('open', () => {
console.log('Websocket opened');
this.ws.send(JSON.stringify(this.wsRequest));
});

this.ws.on('message', async (data) => {
const obj = JSON.parse(data);
if ("ob" in obj) {
const time = Date.now()
const speed = Number(obj.ob[1] * MS_TO_MPH).toFixed(1);
const direction = obj.ob[2];
console.log(`time: ${time} speed: ${speed} direction: ${direction}`);
await this.ts.update(tempest.deviceId, time, speed, direction);
}
});

this.ws.on('close', async () => {
console.log('Websocket closed')
await this.ts.quit();
this.ts = null;
this.ws = null;
});

this.ws.on('error', async (err) => {
await this.ts.quit();
this.ws.close();
this.ts = null;
this.ws = null;
console.error('ws err: ' + err);
});
}
}

async stop() {
this.ws.close();
}

Redis TimeSeries client

I used the Node-Redisclient to implement a function that executes a Adding time series.
    async update(deviceId, time, speed, direction) {
await this.client.ts.add(`wind_direction:${deviceId}`, time, direction);
await this.client.ts.add(`wind_speed:${deviceId}`, time, speed);
}

Deployment

Docker file

FROM node:18-slim
WORKDIR /usr/src/app
COPY package*.json ./
RUN npm install
COPY . .
EXPOSE 8080
CMD ["npm", "start"]

Cloud Redis + preview

Integrating Google Cloud Code with VS Code

The app container is deployed to Cloud Run using Cloud Code tools.

Connecting Grafana data to Redis

Execution

CURL POST to start the data stream

curl -X POST https://redis-demo-y6pby4qk2a-uc.a.run.app/start -u yourUser:yourPassword

Redis Insight real-time streams

CloudRun Console

Grafana Dashboard

Source

Leave a Reply

Your email address will not be published.