AWS IoTと Kinesis Analyticsを使ったニアリアルタイムデータ収集と加工
Kinesis Analyticsを使ってみたので、使い方をまとめる。センサーからIoTでデータを受信し、ニアリアルタイムで処理して、DynamoDBに格納するところまで。
- システム概要
- Kinesis Data Streamの作成
- AWS IoTの設定
- Kinesis Data Analyticsの設定
- DynamoDBへの格納
- (おまけ)センサー用のデバイスの準備
- 結果
- 感想
- 各種ガイド
システム概要
下図のシステムを作成した。
目的としては、次のような点を押さえたかった。
Kinesis Data Streamの作成
Kinesis Data Streamの設定は2つ。
- ストリーム名の決定
- シャード数の決定
ストリーム名の決定
今回は2つストリームが必要なので、適当に名前を付けて2つ作成する。とりあえず"IoTStream"と"DynamoStream"と名付けておく。
AWS IoTの設定
IoTの設定は主に3つ。
エッジサーバーに対応するモノの作成
AWS IoTのコンソールで、管理 > モノ から、エッジサーバー用のモノを作成する。エッジサーバーからデータを送信する際の証明書はここから入手する(あとで説明する)。
Kinesis Data Analyticsの設定
Kinesis Data Analyticsの設定は3つ。
- Sourceの設定
- Real time analytics用のSQLの作成
- Destinationの設定
Sourceの設定
Edit streaming data source connection で Choose sourceを選択する。Kinesis streamの欄で"IoTStream"を選択する。
Record pre-processing with AWS Lambdaで前処理用のLambdaを作成する。Record pre-processingをEnableにし、Create newから新しいLambada作成する。
Lambdaの実装はこんな感じ。ここでは、CSVデータをJSONに加工している(ちなみにAnalyticsはCSVも読めるので、そのままでも特に問題はないが、JSONの方がスキーマ定義のときに楽だったので加工した)。
加工するデータは、エッジサーバーから流すデータによるので、適宜変えてほしい。
from __future__ import print_function import base64 import json import re print('Loading function') def lambda_handler(event, context): output = [] for record in event['records']: #decode csv from IoT payload = base64.b64decode(record['data']) parsedPayloads = re.split('[:,]', payload) temperature = parsedPayloads[1] humidity = parsedPayloads[3] jrpayload = {} jrpayload["TEMPERATURE"] = temperature jrpayload["HUMIDITY"] = humidity #encode json payload for return rpayload = json.dumps(jrpayload) # Do custom processing on the record payload here output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': base64.b64encode(rpayload) } output.append(output_record) print('Successfully processed {} records.'.format(len(event['records']))) return {'records': output}
ここで、トリガーは設定しなくても動作する。
Access permissionsでIAMロールを設定する。これは"Create / update IAM role XXX"の方を選んでおけばよい。
最後に、Schemaを作成する。Kinesis Analyticsに流れてきたデータから自動でスキーマを作成してくれる。この時、ある程度データを流しながら出ないとうまくスキーマを作成してくれないので注意する。
Real time analytics用のSQLの作成
Kinesis Analyticsに流れてきたデータに対してSQLで処理を行うことができる。特に、window処理を行うことができるのが特徴。
docs.aws.amazon.com
今回はスライディングウィンドウで、15秒ごとに平均値をとる。
SQLの実装はこんな感じ。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (AVGTIME TIMESTAMP, AVGTEMPERATURE FLOAT, AVGHUMIDITY FLOAT); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM STEP(s.ROWTIME BY INTERVAL '15' SECOND) AS AVGTIME, AVG(s.temperature) AS AVGTEMPERATURE, AVG(s.humidity) AS AVGHUMIDITY FROM "SOURCE_SQL_STREAM_001" AS s -- Uses a 10-second tumbling time window GROUP BY STEP(s.ROWTIME BY INTERVAL '15' SECOND);
Destinationの設定
Edit destination connection > Destinationで、Kinesis streamを選択する。送信先は"DynamoStream"としておく。In-application streamは、DESTINATION_SQL_STREAMとし、Access permissionsで適切なIAMロールを付けておく。
DynamoDBへの格納
- Kinesis Data StreamからDynamoDBへのLambdaの設定
- DynamoDBのテーブル作成
Kinesis Data StreamからDynamoDBへのLambdaの設定
Kinesis Data StreamからAnalyticsで処理したデータを取り出し、DynamoDBへ格納するため、Lambdaを作成する。データを取り出しはトリガーをKinesis Data Streamにする。"DynamoStream"を指定しておく。
一方、DynamoDBへの格納はSDKを使用する。pythonだとboto3を使って実装する。
from __future__ import print_function import logging import base64 import boto3 import json import decimal logger = logging.getLogger() class DecimalEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, decimal.Decimal): if abs(o) % 1 > 0: return float(o) else: return int(o) return super(DecimalEncoder, self).default(o) dynamodb = boto3.resource('dynamodb', region_name=[DynamoDBのリージョン], endpoint_url=[DynamoDBのエンドポイント]) table = dynamodb.Table([テーブル名]) def lambda_handler(event, context): for record in event['Records']: payload = base64.b64decode(record['kinesis']['data']) jpayload = json.loads(payload) TEMPERATURE = str(jpayload['AVGTEMPERATURE']) HUMIDITY = str(jpayload['AVGHUMIDITY']) AVGTIME = jpayload['AVGTIME'] sAVGTIME = AVGTIME.split(" ") DATE = str(sAVGTIME[0]) TIME = str(sAVGTIME[1]) response = table.put_item( Item={ 'DATE': DATE, 'TIME' : TIME, 'TEMPERATURE': TEMPERATURE, 'HUMIDITY':HUMIDITY } )
なお、Boto SDK でDynamoDBに数値を挿入する際にはDecimal クラスを使用する(ステップ 3: 項目を作成、読み込み、更新、削除する - Amazon DynamoDB)。
(おまけ)センサー用のデバイスの準備
Linuxをインストールしたエッジサーバーを用意しておき、直接センターをつなげたり、Bluetoothなどでデータを受ける構成が多い。
今回はRaspberry PiとDHT11で温度・湿度を測る - Qiitaを参考に、DHT11という温度/湿度センサーをラズパイに繋ぎ、一定時間ごとにAWS IoTに送信する。
DHT11用のPythonライブラリがあるので、センサーからの数値の読み取りはPythonで行う。数値の読み取り後、IoT_post.shというcurlのスクリプトを読んでAWS IoTに数値を送る。これは自分のラズパイがPythonからポストできなかったための苦肉の策で、数値が送れれば特に方法は問わない。
import RPi.GPIO as GPIO import dht11 import time import datetime import subprocess # initialize GPIO GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) GPIO.cleanup() # read data using pin 14 instance = dht11.DHT11(pin=14) while True: result = instance.read() if result.is_valid(): payload = 'temperature:' + str(result.temperature) + ',humidity:' + str(result.humidity) print(payload) subprocess.call("./IoT_post.sh %s" % payload , shell=True) time.sleep(1)
IoT_post.shは以下のような感じ。
#!/bin/bash curl -D - --tlsv1.2 -X POST --cert ./cert.pem --key ./private.pem --cacert ./RootCA1.pem [自分のAWS-IoTのエンドポイント]:8443/IoT_topic?qos=0 -d $1
必要な各種情報の取得は以下のようにできる。
- AWS-IoTのエンドポイント:IoT Core > 設定 > カスタムエンドポイント > エンドポイント
- 認証情報
※IoT Coreでポリシーを作成する際に、ダウンロードして有効化しておく。
-private.pem:プライベートキー
-cert.pem:モノの証明書
-RootCA1.pem:ルートCA(表示されたURLをたどって、ダウンロードする。)
感想
今回はIoTデータのリアルタイム分析のためにKinesis Data Analyticsを試してみた。処理部分がSQLだったため、複雑な分析ができないものの、ほかのサービスでは非常に実現がむつかしい「ウィンドウ処理」を手軽に実装できた。
また、SQLでも簡単な機械学習のライブラリがあるほか、JavaやScalaを動かすことも可能だ。ちなみに、個人的には仕込むjarの元となるApache flink(https://flink.apache.org/)に興味しんしんだったりする...。チュートリアルのコード例でScalaのコード量がめちゃくちゃ少なくなるのがすごいのでぜひ見てほしい...。
Kinesis Data Analyticsは今年6月ごろに東京リージョンで利用できるようになったばかりで、情報もまだまだ少ないし、コンソールも英語のままになっている。
aws.amazon.com
ストリーミング処理がクラウド上かつサーバーレスでできるのはとても強力な魅力だ。いろいろ試してみたいが、結構料金がかかったり、使用感に結構クセがあるので、初めての場合は苦労するように感じた。ぜひいろんな人に試してもらって、知見が溜まっていけばよいと思う。
各種ガイド
Kinesis Data Analyticsは昨日盛りだくさんのため、ガイドへのリンクを張っておく(ガイドがいくつかあり、公式HPの階層が深いため分かりづらい)。
▼開発者ガイド
docs.aws.amazon.com
▼SQLリファレンス
docs.aws.amazon.com
▼APIリファレンス
docs.aws.amazon.com