MapDで流れてくるデータを取り込んで可視化してみた

Stream Dataを取り込みたい

Stream Dataとは絶えず流れてくるデータのことです。
MapDにStream Dataを取り込んで、リアルタイムに可視化できるか?
今回はこれを試してみます。

StreamInsertとは

https://www.mapd.com/docs/latest/mapd-core-guide/loading-data/

StreamInsertとはMapDが用意している、Stream DataをMapDデータベースに入れるためのプログラムです。使い方は上記の公式URLに載っています。

以下はその構文です。

<data stream> | StreamInsert <table> <mapd database> --host <host name> --port <port number> -u <mapd_user> -p <mapd_pwd> --delim <field delimeter> --batch <batch number>

馴染みのなかったものだけ説明を加えます。
<data stream>標準出力に書き込まれるStream Dataです。
--delimには区切り文字を指定できます。csvファイルを指定するときは’,’を指定します。
--batchにはMapDに取り込む単位行数を指定できます。

少々わかりづらいかもしれませんので、実際に動かした様子をお見せしながら説明します。

実験方法

テーブルの用意

今回は簡単な実験にします。

MapDには以下のようなテーブルを用意してあります。

CREATE TABLE DEM (
dem_type TEXT ENCODING DICT(8),
height FLOAT,
lon FLOAT,
lat FLOAT)

DEMは木やビルを含まない標高データを意味していますが、Stream Dataの取り込みが可能かどうかを確認することが目的なので、今回はあまり意識する必要はありません。

Stream Dataの用意

Stream Dataを用意します。pythonで標準出力に書き込むようにしました。
経度、緯度と高さをカンマ区切りで標準出力に書き込みます。
経度と緯度は札幌市を中心に円を描くように計算して出力してみました。
出力されるデータの並びは次のようになります。

'value name', height,  longitude, latitude

作成したPythonコードは次のとおりです。

import math
import time
import sys

step = 40
for i in range(0, step):
    x = 141.345959 + math.sin(2*math.pi*(float(i)/step))
    y = 43.066894 + math.cos(2*math.pi*(float(i)/step))
    time.sleep(5)
    print "elevation," + str(10) + "," + str(x) + "," + str(y)

上記のコードで試したところ、意図したとおりにデータべースにデータが入っていませんでした。挿入されたことをその都度確認するためには、標準出力に書かれたデータをフラッシュする必要がありました。

つぎのコードをprint文の後に加えました。

sys.stdout.flush()

確認手段の用意(MapD Immerse)

データが正しく挿入されていることを確認するために、MapD Immerseを利用します。

MapD ImmerseはMapDに入っているデータを可視化分析するプラットフォームです。

https://www.mapd.com/platform/immerse/

MapD Immerseを使ってDEMテーブルを可視化します。

DEMテーブルのデータの緯度、経度を読み取って地図上に表示します。

結果

以下のコマンドを実行し、その様子をアニメーションgifにしました。

$python circle.py |/SampleCode/StreamInsert DEM mapd --host localhost --port 9091 -u mapd -p [pass] --delim ',' --batch 1

備考

MapD Immerseでは新しく挿入されたデータを反映させるには、マウスで画面をスクロールするなどして、リロードする必要があります。(2017/10/31現在)
リロードなしに、データが反映される機能は近々実装されるようです。

https://community.mapd.com/t/predictive-analytics-and-realtime-refresh/656