たそらぼ

日頃思ったこととかメモとか。

AWS IoTと Kinesis Analyticsを使ったニアリアルタイムデータ収集と加工

Kinesis Analyticsを使ってみたので、使い方をまとめる。センサーからIoTでデータを受信し、ニアリアルタイムで処理して、DynamoDBに格納するところまで。

システム概要

下図のシステムを作成した。

f:id:tasotasoso:20190801170419p:plain
今回作成したシステム

目的としては、次のような点を押さえたかった。

  • ラズパイなどのエッジサーバーでデータを収集し、AWS IoTに送信してみる。
  • AWS IoTでデータを受信し、Kinesis Data Streamに格納してみる。
  • Kinesis Analyticsでデータをウィンドウ処理してみる。Kinesis Analyticsでは前処理でLambdaを仕込めるので、これもやってみる。
  • LambdaでDynamoに格納してみる。

Kinesis Data Streamの作成

Kinesis Data Streamの設定は2つ。

  • ストリーム名の決定
  • シャード数の決定

ストリーム名の決定

今回は2つストリームが必要なので、適当に名前を付けて2つ作成する。とりあえず"IoTStream"と"DynamoStream"と名付けておく。

シャード数の決定

Kinesis Data Streamにはシャード単位でデータのスループットが決まっている。

シャードは、スループット容量の単位です。各シャードは最大 1 MB/秒、また 1000 レコード/秒を取り込み、最大 2 MB/秒を送信します。より高い、またはより低いスループットに対応するため、API を使用して Kinesis ストリームの作成後にシャード数を変更できます
Kinesis ストリームのコンソールより)

実務では必要なスループットの計算が必要だが、とりあえず1にしておく。


AWS IoTの設定

IoTの設定は主に3つ。

  • エッジサーバーに対応するモノの作成
  • ルールクエリステートメントで監視するトピックの設定
  • 受信したデータの送信先の設定

エッジサーバーに対応するモノの作成

AWS IoTのコンソールで、管理 > モノ から、エッジサーバー用のモノを作成する。エッジサーバーからデータを送信する際の証明書はここから入手する(あとで説明する)。

ルールクエリステートメントで監視するトピックの設定

ACT > "モノ" > 概要から、ルールクエリステートメントを設定できる。まず受信用に使うトピックを設定する。とりあえず、

SELECT * FROM 'IoT_topic'

のように'IoT_topic'を監視するようにしておく。必要に応じてカスタムするとよい。

受信したデータの送信先の設定

続いて、アクションを"Amazon Kinesis ストリームにメッセージを送信する"に設定する。アクションは、ルールがトリガーされたときの処理となる。この時、送信先を"IoTStream"とし、Kinesisストリームへのデータ格納が有効なIAMロールを適用しておく。

また、折角なので、エラーアクションも設定しておく。これはルールの処理中に問題が発生した場合に実行するアクションだが、設定は任意。今回は"Amazon S3バケットにメッセージを格納する"にしておく。こちらも必要なIAMロールを適用しておく。


Kinesis Data Analyticsの設定

Kinesis Data Analyticsの設定は3つ。

  • Sourceの設定
  • Real time analytics用のSQLの作成
  • Destinationの設定

Sourceの設定

Edit streaming data source connection で Choose sourceを選択する。Kinesis streamの欄で"IoTStream"を選択する。

Record pre-processing with AWS Lambdaで前処理用のLambdaを作成する。Record pre-processingをEnableにし、Create newから新しいLambada作成する。
Lambdaの実装はこんな感じ。ここでは、CSVデータをJSONに加工している(ちなみにAnalyticsはCSVも読めるので、そのままでも特に問題はないが、JSONの方がスキーマ定義のときに楽だったので加工した)。
加工するデータは、エッジサーバーから流すデータによるので、適宜変えてほしい。

from __future__ import print_function

import base64
import json
import re

print('Loading function')

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        
        #decode csv from IoT
        payload = base64.b64decode(record['data'])
        parsedPayloads = re.split('[:,]', payload)

        temperature = parsedPayloads[1]
        humidity = parsedPayloads[3]
        jrpayload = {}
        jrpayload["TEMPERATURE"] = temperature 
        jrpayload["HUMIDITY"] = humidity
        
        #encode json payload for return
        rpayload = json.dumps(jrpayload)

        # Do custom processing on the record payload here
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(rpayload)
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}

ここで、トリガーは設定しなくても動作する。

Access permissionsでIAMロールを設定する。これは"Create / update IAM role XXX"の方を選んでおけばよい。

最後に、Schemaを作成する。Kinesis Analyticsに流れてきたデータから自動でスキーマを作成してくれる。この時、ある程度データを流しながら出ないとうまくスキーマを作成してくれないので注意する。

Real time analytics用のSQLの作成

Kinesis Analyticsに流れてきたデータに対してSQLで処理を行うことができる。特に、window処理を行うことができるのが特徴。
docs.aws.amazon.com

今回はスライディングウィンドウで、15秒ごとに平均値をとる。
SQLの実装はこんな感じ。

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (AVGTIME TIMESTAMP, AVGTEMPERATURE FLOAT, AVGHUMIDITY FLOAT);

CREATE OR REPLACE  PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM STEP(s.ROWTIME BY INTERVAL '15' SECOND) AS AVGTIME,
              AVG(s.temperature) AS AVGTEMPERATURE,
              AVG(s.humidity) AS AVGHUMIDITY
FROM "SOURCE_SQL_STREAM_001" AS s
-- Uses a 10-second tumbling time window
GROUP BY STEP(s.ROWTIME BY INTERVAL '15' SECOND);

Destinationの設定

Edit destination connection > Destinationで、Kinesis streamを選択する。送信先は"DynamoStream"としておく。In-application streamは、DESTINATION_SQL_STREAMとし、Access permissionsで適切なIAMロールを付けておく。


DynamoDBへの格納

  • Kinesis Data StreamからDynamoDBへのLambdaの設定
  • DynamoDBのテーブル作成

Kinesis Data StreamからDynamoDBへのLambdaの設定

Kinesis Data StreamからAnalyticsで処理したデータを取り出し、DynamoDBへ格納するため、Lambdaを作成する。データを取り出しはトリガーをKinesis Data Streamにする。"DynamoStream"を指定しておく。

一方、DynamoDBへの格納はSDKを使用する。pythonだとboto3を使って実装する。

from __future__ import print_function
import logging
import base64
import boto3
import json
import decimal

logger = logging.getLogger()
class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            if abs(o) % 1 > 0:
                return float(o)
            else:
                return int(o)
        return super(DecimalEncoder, self).default(o)

dynamodb = boto3.resource('dynamodb', region_name=[DynamoDBのリージョン], endpoint_url=[DynamoDBのエンドポイント])
table = dynamodb.Table([テーブル名])

def lambda_handler(event, context):
    for record in event['Records']:
        payload = base64.b64decode(record['kinesis']['data'])
        jpayload = json.loads(payload)
        
        
        TEMPERATURE = str(jpayload['AVGTEMPERATURE'])
        HUMIDITY = str(jpayload['AVGHUMIDITY'])
        AVGTIME = jpayload['AVGTIME']
        sAVGTIME = AVGTIME.split(" ")
        DATE = str(sAVGTIME[0])
        TIME = str(sAVGTIME[1])
        
        response = table.put_item(
        Item={
            'DATE': DATE,
            'TIME' : TIME,
            'TEMPERATURE': TEMPERATURE,
            'HUMIDITY':HUMIDITY
        }
        )

なお、Boto SDK でDynamoDBに数値を挿入する際にはDecimal クラスを使用する(ステップ 3: 項目を作成、読み込み、更新、削除する - Amazon DynamoDB)。


(おまけ)センサー用のデバイスの準備

Linuxをインストールしたエッジサーバーを用意しておき、直接センターをつなげたり、Bluetoothなどでデータを受ける構成が多い。
今回はRaspberry PiとDHT11で温度・湿度を測る - Qiitaを参考に、DHT11という温度/湿度センサーをラズパイに繋ぎ、一定時間ごとにAWS IoTに送信する。

f:id:tasotasoso:20190801170442j:plain
作成したデバイス


DHT11用のPythonライブラリがあるので、センサーからの数値の読み取りはPythonで行う。数値の読み取り後、IoT_post.shというcurlスクリプトを読んでAWS IoTに数値を送る。これは自分のラズパイがPythonからポストできなかったための苦肉の策で、数値が送れれば特に方法は問わない。

import RPi.GPIO as GPIO
import dht11
import time
import datetime
import subprocess

# initialize GPIO
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.cleanup()

# read data using pin 14
instance = dht11.DHT11(pin=14)

while True:
    result = instance.read()
    if result.is_valid():
        payload = 'temperature:' + str(result.temperature) + ',humidity:' + str(result.humidity)
	print(payload)
        subprocess.call("./IoT_post.sh %s" % payload , shell=True)
    time.sleep(1)

IoT_post.shは以下のような感じ。

#!/bin/bash
curl -D - --tlsv1.2 -X POST --cert ./cert.pem --key ./private.pem --cacert ./RootCA1.pem [自分のAWS-IoTのエンドポイント]:8443/IoT_topic?qos=0 -d $1

必要な各種情報の取得は以下のようにできる。

  • AWS-IoTのエンドポイント:IoT Core > 設定 > カスタムエンドポイント > エンドポイント
  • 認証情報

 ※IoT Coreでポリシーを作成する際に、ダウンロードして有効化しておく。
 -private.pem:プライベートキー
 -cert.pem:モノの証明書
 -RootCA1.pem:ルートCA(表示されたURLをたどって、ダウンロードする。)



結果

こんな感じでDynamoに格納できた。

f:id:tasotasoso:20190801170455p:plain
DynamoDBのテーブル


感想

今回はIoTデータのリアルタイム分析のためにKinesis Data Analyticsを試してみた。処理部分がSQLだったため、複雑な分析ができないものの、ほかのサービスでは非常に実現がむつかしい「ウィンドウ処理」を手軽に実装できた。

また、SQLでも簡単な機械学習のライブラリがあるほか、JavaScalaを動かすことも可能だ。ちなみに、個人的には仕込むjarの元となるApache flink(https://flink.apache.org/)に興味しんしんだったりする...。チュートリアルのコード例でScalaのコード量がめちゃくちゃ少なくなるのがすごいのでぜひ見てほしい...。

Kinesis Data Analyticsは今年6月ごろに東京リージョンで利用できるようになったばかりで、情報もまだまだ少ないし、コンソールも英語のままになっている。
aws.amazon.com
ストリーミング処理がクラウド上かつサーバーレスでできるのはとても強力な魅力だ。いろいろ試してみたいが、結構料金がかかったり、使用感に結構クセがあるので、初めての場合は苦労するように感じた。ぜひいろんな人に試してもらって、知見が溜まっていけばよいと思う。


各種ガイド

Kinesis Data Analyticsは昨日盛りだくさんのため、ガイドへのリンクを張っておく(ガイドがいくつかあり、公式HPの階層が深いため分かりづらい)。

▼開発者ガイド
docs.aws.amazon.com

SQLリファレンス
docs.aws.amazon.com

APIリファレンス
docs.aws.amazon.com