投稿日
FIWAREのデータをElastic Stackで可視化する
もくじ
こんにちは。西日本テクノロジー&イノベーション室の髙谷です。
今回は私が担当したとある実証実験向けの案件で、IoTプラットフォームのFIWAREに蓄積したセンサーのデータを ElasticsearchとKibanaを使って可視化する作業をした時に発生した課題と、それを解決した方法を紹介します。
記事内ではFIWAREのコンポーネントであるOrion Context Broker(以下Orion)とSTH-Comet(以下Comet)の大まかな機能を紹介します。
OrionやCometの詳細な機能や利用方法については以下のリンクを参照してください。
全体構成
以下の順でデータを受け渡していきます。
センサー -> ZETA Server -> 「POST | PUT アプリ」 -> Orion -> Comet -> 「マージ用アプリ」 -> Elasticsearch -> Kibana
下図の破線で囲った部分が独自に作成したアプリケーションです。
- 「POST|PUT アプリ」でZETA ServerからFIWAREへデータ登録・更新
- 「マージ用アプリ」でFIWAREからElasticsearchへデータ登録・更新
【構成】
※ZETA Serverはセンサーのデータを収集して保持できるサービスです。
センサーのデータはZETA Serverに限らず他のサーバから収集することもあり、それらを集約する基盤としてFIWAREがいます。
※別の構成でも可能ですが、今回は作成済のFIWARE環境を利用する制約があるためCometを利用しました。
※FIWAREのカタログにあるWireCloudというコンポーネントを利用してデータを可視化することも可能ですが、利用方法を調査した際に見つかるノウハウの多さからElastisearchとKibanaを利用することにしました。
Cometの履歴が持つ課題と対応
CometはOrionに登録されているEntityの更新通知を受けて更新履歴を作成、保持します。
Cometから取得できる履歴には課題が2つあります。
- 複数属性の履歴をまとめて取得することができない
- 履歴に含まれる時刻(
recvTime
)はセンサーが温度を検知した時間ではない
この章ではそれぞれの課題と対応について説明します。
複数属性の履歴をまとめて取得することができない
履歴はEntityを1つ指定したうえで、そのEntityが持つ属性毎にしか取得できません。
【Orionに登録されているEntity例】
属性名 | 属性の説明 |
---|---|
id |
Entityを特定するID (必須) |
temperature |
温度 |
uploadTime |
センサーが温度を検知した日時 |
検知した温度や日時などセンサーから上がってくるデータをEntityに持って、登録・更新されます。
センサー毎に1つのEntityを作成して管理するためid
にはセンサーを特定するidをつけます。
// Orionに登録されているEntity(抜粋)
{
"type": "Temperature",
"id": "id.thermometer01",
"temperature": 23.1,
"uploadTime": "2020-01-21T23:43:53.00Z",
}
【Cometから取得できる履歴例】
属性名 | 属性の説明 |
---|---|
values |
履歴 |
attrValue |
履歴取得対象の属性の値(ここではtemperature 属性の値) |
recvTime |
OrionのEntityに更新がかかった日時 |
id |
履歴取得対象のEntityのID |
履歴は1つのEntityの1つの属性毎(ここではid.thermometer01
のtemperature
属性)にしか取れません。
// Cometから取得できる履歴例(抜粋)
~略~
{
"attributes": [
{
"name": "temperature",
"values": [
{
"attrType": "Number",
"attrValue": "20.3",
"recvTime": "2020-01-22T08:30:28.574Z"
},
{
"attrType": "Number",
"attrValue": "20.4",
"recvTime": "2020-01-22T08:40:28.653Z"
},
{
"attrType": "Number",
"attrValue": "23.1",
"recvTime": "2020-01-22T08:50:28.594Z"
}
]
}
],
"id": "id.thermometer01",
~略~
}
別々に取得した履歴を1つにまとめる
前述の通りCometからは複数属性の履歴を同時に取ることができないため、取得した各属性の履歴をマージする処理をいれることでセンサーが温度を検知した時間と計測された値を一つにまとめます。
マージする際のキーにはrecvTime
が利用できます。
Orionは1つのEntity単位で更新をおこなうためrecvTime
は複数属性の履歴で横断して同じ値になります。
センサー毎に一意なEntityのid
とrecvTime
と合わせてキーにすることでマージが可能になります。
履歴に含まれる時刻(recvTime)はセンサーが温度を検知した時間ではない
Cometから取得した履歴に付与されるrecvTime
はOrionに登録・更新が行われた日時なので、上の構成図であげた「POST|PUT アプリ」が処理を行った時間次第で決まります。
そのため、ZETA Serverに温度をアップロードされるタイミングと「POST|PUT アプリ」がOrionに更新をかけるタイミングが異なるため、recvTime
がセンサーが温度を検知した時間と乖離してしまいます。
センサーが検知した日時をEntityの一属性として持たせる
recvTime
属性とは別にセンサーが検知した日時をOrionのEntityの一属性uploadTime
として持たせて、温度等の他の属性と同じようにマージすることで、センサーが検知した温度と検知した日時が揃い、可視化に利用できるデータができます。
【Orionに登録されているEntity例】
温度属性と同じようにセンサーが温度を検知した日時もEntityの一属性として持たせる。
属性名 | 属性の説明 |
---|---|
id |
Entityを特定するID |
temperature |
温度 |
uploadTime |
センサーが温度を検知した日時 |
Comet内の履歴に不正な値が登録された場合の対処
「POST|PUT アプリ」に問題が発生し、温度(temperature
)が不正な値になってOrionが更新されることがありました。
Cometを経由して不正な値がElasticsearchに登録されたため、ZETA Serverから取り直した正常値で上書きする必要があります。
Cometの履歴情報は削除・更新ができないため、過去の不正な値はそのままにして、新たに履歴を登録してElasticsearch内のデータを上書きします。
上書きの仕組みはElasticsearchに登録するデータの_id属性を用います。
センサーが温度を検知した日時(uploadTime
)とセンサーを特定するID(id
)でZETA Serverに登録されたデータを一意に特定できるため、その2つを_id属性に付与することでElasticsearchにある過去のデータを最新のデータで上書できるようになります。
また、Elasticsearchはすでに存在するデータと同じ_id属性値を持つデータは更新、新規の_id属性値を持つデータは登録されるため、ZETA Serverからデータを再度登録する際に登録済か否かを意識する必要はありません。
また、タイムゾーンの設定等で「POST|PUT アプリ」から登録されたuploadTime
属性が不正な値になる場合もありました。
uploadTime
を_id属性にしているため、後からuploadTime
を正常値に直してもElasticsearchの不正データを上書きできません。
今回はZETA Serverに過去分のデータが残っていたため、Orionに新たなEntity定義を作成し、ZETA Serverに残っているデータで新たなEntityを更新し、正常な新規の履歴としてCometに記録、それを元に新たにElasticsearchにデータを登録して対処しました。
また、不正なElasticsearchのデータは参照しなくなるため削除しました。
おわりに
今回、課題はありましたがFIWAREのデータをElasticsearchとKibanaを使って可視化することができました。
ElasticsearchとKibanaの利用ノウハウはWeb上で調べると沢山みつかるため、FIWARE、Elasticsearch間の課題さえ越えればとても扱いやすい組み合わせになります。
FIWAREを利用したデータの利活用の際には是非ここで紹介した構成をためしてみてください。
本コンテンツはクリエイティブコモンズ(Creative Commons) 4.0 の「表示—継承」に準拠しています。