Skip to content

SQL说明

系统基础消息数据格式为JSON,可将其映射为虚拟数据表,数据表中的key对应表的列,value对应列值,规则引擎通过SQL语句对数据进行处理,如图所示:

消息示例

json
{
  "deviceId":"1017610",
  "messageType":"PROPERTY_REPORT",
  "data":{
    "CurrentHumidity":{
      "time":1689045521810,
      "value":87
    }
  }
}

SQL示例1

sql
SELECT * FROM mqtt_stream WHERE deviceId = "1017610" AND messageType = "PROPERTY_REPORT"

该SQL语句表示: 在消息总线中筛选deviceId等于"1017610"并且messageType等于"PROPERTY_REPORT"的数据,并将字段全部输出。

经过该SQL处理之后的消息输出如下:

json
{
  "deviceId":"1017610",
  "messageType":"PROPERTY_REPORT",
  "data":{
    "CurrentHumidity":{
      "time":1689045521810,
      "value":87
    }
  }
}

SQL示例2

sql
SELECT
    deviceId AS device_id,
    json_path_query (data,
                     "$.CurrentHumidity.value") AS value,
	json_path_query (data,
		"$.CurrentHumidity.time") AS ts
FROM
    mqtt_stream
WHERE
    deviceId = "1017610"
  AND messageType = "PROPERTY_REPORT"
  AND json_path_exists (data, "$.CurrentHumidity") = true

该SQL语句表示: 在消息总线中筛选deviceId等于"1017610"并且messageType等于"PROPERTY_REPORT"并且data对象中存在CurrentHumidity字段的数据,并将deviceId字段改为device_id,CurrentHumidity.value字段改为value,CurrentHumidity.time字段改为ts。

经过该SQL处理之后的消息输出如下:

json
{
    "device_id":"1017610",
    "ts":1689064850367,
    "value":18
}

SQL示例3

sql
SELECT
    deviceId AS device_id,
    json_path_query (data,
                     "$.CurrentHumidity.value") AS value,
	json_path_query (data,
		"$.CurrentHumidity.time") AS ts
FROM
    mqtt_stream
WHERE
    deviceId = "1017610"
  AND messageType = "PROPERTY_REPORT"
  AND json_path_exists (data, "$.CurrentHumidity") = true
  AND json_path_query(data,"$.CurrentHumidity.value") > 50

该SQL语句表示: 在消息总线中筛选deviceId等于"1017610"并且messageType等于"PROPERTY_REPORT",并且data对象中存在CurrentHumidity字段的数据取并且CurrentHumidity.value的值大与50, 并将deviceId字段改为device_id,CurrentHumidity.value字段改为value,CurrentHumidity.time字段改为ts。

经过该SQL处理之后的消息输出如下:

json
{
    "device_id":"1017610",
    "ts":1689064850367,
    "value":78
}

SQL示例4

sql
SELECT
    deviceId AS device_id,
    avg(json_path_query(data, 
        "$.CurrentHumidity.value")) AS avg_CurrentHumidity,
	json_path_query (data,
		"$.CurrentHumidity.time") AS ts
FROM
    mqtt_stream
WHERE
    deviceId = "1017610"
  AND messageType = "PROPERTY_REPORT"
  AND json_path_exists (data, "$.CurrentHumidity") = true
  GROUP BY TUMBLINGWINDOW(ss, 300) HAVING avg_CurrentHumidity >= 30

该SQL语句表示: 在消息总线中筛选deviceId等于"1017610"并且messageType等于"PROPERTY_REPORT"并且在300s内,CurrentHumidity.value的平均值大于30的数据, 并将deviceId字段改为device_id,并将平均值的计算结果赋值给avg_CurrentHumidity字段,CurrentHumidity.time字段改为ts。 经过该SQL处理之后的消息输出如下:

json
{
    "device_id":"1017610",
    "ts":1689064850367,
    "avg_CurrentHumidity":38.2
}

SELECT说明

  • 默认为 *,即不做任何提取与重组操作。
  • 支持 as,可对提取出的属性进行重命名,示例如下:SELECT deviceId as device_id
  • 支持使用json_path_exists函数判断json中的属性值是否存在。
  • 支持使用json_path_query函数获取嵌套json中的属性值,示例如下:
    SELECT json_path_query (data,"$.CurrentHumidity.value") AS value
  • 支持众多函数操作

WHERE说明

  • 支持使用json_path_exists函数判断json中的属性值是否存在,同SELECT。
  • 支持使用json_path_query函数获取嵌套json中的属性值,同SELECT。
  • 支持对数值类型进行大于、小于、等于判断,示例如下。示例如下:
    json_path_query(data,"$.CurrentHumidity.value") > 50
  • 支持众多函数操作
  • 支持时间窗口