読者です 読者をやめる 読者になる 読者になる

烏滑稽

ブログはじめました(迫真)

東京メトロのオープンデータをElasticsearch + Kibana (5.0.2)で可視化してみる(Java Client)

JJUG CCC 2016 fallブーストークで大谷さんがElastic Advent Calendar枠まだありますって言ってて、勢いで登録してしまった。

qiita.com

実は時間もネタもなかったことに後から気付きつつ速攻でなんかやっていきます!
3時間くらいでなんとかなったらいいな!

何かいいサンプルデータないかなー

電車の遅延傾向とかパッとグラフで見れたらいいなーと思いつき、
以前どこかで聞いた気がして調べて見つけたのが東京メトロのオープンデータです。

東京メトロオープンデータ開発者サイト

東京メトロの路線情報や駅情報をAPIで参照できます。
APIを利用するためには開発者登録をすると最大2営業日ほどでアクセストークンが取得できるとのことだったんですが自分がやったら半日で登録してくれました、ありがたい。
APIは路線IDで路線を検索したりLat Lonで駅検索したりいろいろできるんですが、
とりあえず全路線の現在状況が知りたければ
https://api.tokyometroapp.jp/api/v2/datapoints?rdf:type=odpt:Train&acl:consumerKey=<アクセストークン>
を叩いておけば大丈夫です。公式ドキュメントはなんだかちょっと結構難しく書いてありますがHTTPリクエストでJSON arrayが返ってくるだけです。
レスポンス項目は開発者登録後に読めるドキュメントにありますが、よく読まなくてもとりあえず可視化すればなにかいい感じでわかるでしょう!わかるといいな!

Elasticsearchのセットアップ

今回の目標は可視化です。サーバもインスタンスも準備する暇はないので自分のmacで構築します。
自分は以前2.3系のelasticsearchを使っていたことがあったのですが、今は5系(何故か3系4系がとんだ?)が出ているのらしいのでせっかくだから新しいやつを使ってみます。
https://www.elastic.co/jp/downloads/elasticsearch
ダウンロードページを見ると3ステップ書いてあります。

  1. 最新のElasticsearchをダウンロードして展開する
  2. ./bin/elasticsearchを実行する
  3. curl -X GET http://localhost:9200/ を実行

超カンタンですね!
では早速Macの適当なディレクトリで作業していきます。

$ wget "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.0.2.tar.gz"
$ tar xvzf elasticsearch-5.0.2.tar.gz
$ cd elasticsearch-5.0.2
$ ./bin/elasticsearch

いろいろログが出て起動できた感じです。
ブラウザで http://localhost:9200/ にアクセスするとクラスタ情報がJSONで表示されました。本当に1分でできちゃいますね。

// ひとこと
headという、elasticsearchのインデックス状況などを詳細にWeb UIで表示してくれるお決まりのプラグインがありまして。
site pluginという機能で5系の前まではelasitcsearchに組み込んで起動するということをよくやっていたのですが
5系でsite pluginがなくなったようです。ただheadが使えないわけではなく、今はelasticsearchの外から起動できるようになった感じだと思います。

// ふたこと
今回は単独Mac起動なので触れませんがスケールアウトも簡単にできます。
基本同じネットワーク内であれば同じクラスタ名でelasticsearchを起動してシャード数を増やせば勝手に分散します。

Kibanaのセットアップ

さて、次はKibanaです。 https://www.elastic.co.jp/downloads/kibana MAC用のtar ballがありますね。 ステップを見るとelasticsearchほどはシンプルでないでしょうか、とはいえ5分で終わるか。

  1. Kibana 4をダウンロードして展開する
  2. アーカイブの抽出、エディタでconfig/kibana.ymlを開く、Elasticsearchインスタンスを参照するようにelasticsearch.urlを設定する、./bin/kibanaを実行する
  3. ブラウザで http://yourhost.com:5601 を開く

おや、stableのバージョンは5.0.2と書いてありますがステップではKibana 4になってますね。ま、気にせず進めましょう。

$ wget "https://artifacts.elastic.co/downloads/kibana/kibana-5.0.2-darwin-x86_64.tar.gz"
$ tar xvzf kibana-5.0.2-darwin-x86_64.tar.gz
$ cd kibana-5.0.2-darwin-X86_64
$ ./bin/kibana

起動できたようです。
ブラウザで http://localhost:5601 にアクセスするとkibanaのリッチなUIが表示されました。 f:id:rice_kokumutyoukan:20161207013438j:plain,w400
ステップ②のkibana.ymlの編集を飛ばしましたが問題ありませんでした。デフォルトでelasticsearchはlocalhost:9200に接続しにいくようになっているんですね、親切。

// ひとこと
新規構築時にはあまり問題にならないのですがElasticsearchとKibanaの連携可能バージョンは限られています。
ElasticsearchをバージョンアップしたらKibanaが古くて見れなくなったとか、
逆にKibanaをバージョンアップしたらElasticsearchが古くてうまくいかないとかありがちだと思います。
バージョンアップ時は基本的には両方最新にしていきましょう。

Java Client開発

メトロAPIを引いて結果JSONをelasticsearchに投入するJavaコードを書いていきます。
以前使っていた時はHTTP POSTでindexしていたのでelasticsearchのJava Clientを使うのは初となります。
↓のページを見ながら進めていきます。
https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html
maven pomで最小限必要なdependencyはelasticsearch transportとlog4jです。
自分では実際には他にもいろいろユーティリティライブラリをつっこみました(Apache CommonとかHTTP ClientとかJSON simpleとか)。

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>5.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.6.2</version>
</dependency>

で開発したコードスニペットがこれです。

JSONParser parser = new JSONParser();

// メトロAPIをHTTP GETして返ってきたJSON arrayをJSONArrayオブジェクトとして返す感じの関数
public JSONArray getMetroAPI() throws IOException, ParseException {
    CloseableHttpClient httpclient = HttpClients.createDefault();
    HttpGet httpGet = new HttpGet("https://api.tokyometroapp.jp/api/v2/datapoints?rdf:type=odpt:Train&acl:consumerKey=<アクセストークン>");
    CloseableHttpResponse apiResponse = httpclient.execute(httpGet);
    String jsonStr = IOUtils.toString(apiResponse.getEntity().getContent(), Charset.forName("UTF-8"));
    JSONArray arr = (JSONArray) parser.parse(jsonStr);
    apiResponse.close()
    return arr;
}

@Test
public void test() throws IOException, ParseException {
    // やりとりしたいelasticsearchのホスト情報を書く.
    // TransportAddress add = new InetSocketTransportAddress(InetAddress.getLocalHost(), 9300); // これはなんか接続できなかった. バインドされているのがデフォルトだとループバックアドレスだけなのかも
    TransportAddress add = new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300); // デフォルトだとJava Clientに対するリッスンポートは9200ではなく9300や
    TransportClient cli = new PreBuiltTransportClient(Settings.EMPTY) // クラスタ名がデフォルトでなければ指定しなければダメ
        .addTransportAddress(add);
    for (Object json : getMetroAPI()) {
        IndexResponse response = cli.prepareIndex("train", "info") // 第一引数がインデックス名, 第二引数がタイプ名. それぞれデータベース名, テーブル名みたいなもの. 小文字じゃないとダメ.
            .setSource(json.toString()).get();
    }
}

これでインデックスできました。
ここでgetMetroAPIメソッドは重要ではないです。ただURLに対してHTTP GETしてJSON Arrayを取得しているだけです。
testメソッド内でcli.prepareIndex(...).setSource(なにか).get();
しているところが実際にelasticsearchにインデックスしている箇所です。
setSourceの引数でインデックスする内容を入れますが、JSON文字列だけでなくMapやJacksonシリアライズしたバイナリも選択できるようです。
自分でコンテンツをいじってインデックスするような場合はMapやJacksonシリアライズの方が良い選択肢でしたが、今回は外から取得したJSONをそのままインデックスするのでJSON文字列を選択しました。 インデックス名は適当にtrainとしました。事前にインデックスを作っておかなくても存在しなければelasticsearchが勝手に作ってくれます。

これをcronなどで定期的に実行するようにして放置しておきます。

可視化

1日だけしか回さなかったので少ない時間とデータですが、elasticsearchにはいくらか路線情報がインデックスされたはずです。
これを可視化していきます。KibanaのWeb UIを触るだけです。
※すみませんここからだんだん適当に。。

Kibanaの最初の画面はConfigure an index patternです。
Index name or patternはデフォルトではlogstash-*と書いてありますが、これはログ投入機にlogstashというelastic stackを使うことを想定しているためです。
作ったインデックスは"train"なのでtrainと書きましょう。
普通インデックス名はプレフィックスとyyyyMMdd等を組み合わせて、インデックスが肥大化しすぎないように一定期間毎に区切ったりするのですが今回はめんどくさいので"train"だけです。
指定するとインデックスを認識してTime-fieldの指定ができるようになります。
今回のログの場合タイムスタンプ型の項目は2つありましたが、項目仕様からdc:dateを選んでCreateボタンを押します。↓のような感じです。
f:id:rice_kokumutyoukan:20161207013445j:plain,w400
するとtrainインデックスの各フィールドが認識されて以下のような画面になりました。
f:id:rice_kokumutyoukan:20161207013448j:plain,w400

次に各路線の遅延グラフを作ってみます。
左のツールバーのVisualizeをクリックしてLine chartを選びます。
f:id:rice_kokumutyoukan:20161207013452j:plain,w400
Y軸、X軸などが指定できる画面になるので、ノリで設定していきます。
Y軸は遅延秒を表すodpt:delayをCountではなく、Sumで。
X軸はDate Histogramにしてインターバルは重複しないようにわざと分固定にします。
さらにY軸はSub AggregetionをTermsに、フィールドを路線名に指定して再生ボタンみたいなやつを押します。
f:id:rice_kokumutyoukan:20161207013456j:plain,w400
APIの叩き漏れなどでいまいちなグラフになってしまいましたが一応路線ごとの1日の遅延時間をグラフ化することができました!
地下鉄は外の路線と違ってあまり遅延しないかなと思っていたのですがこうして見るとなんだかんだどの路線もちょっとは遅延してますね、特にラッシュ時間。
この日は朝が千代田線、夕方は銀座線が10-15分くらい遅延していることがわかりました。
これをもっと長期でとって、遅延傾向もいろんな側面でグラフ化して、さらにかっこよくダッシュボード化すると面白そうですが夜も更けてきたので記事はこれくらいにしておこうと思います。

最初3時間でできたらいいなと書いていましたがJava Clientいじりで時間を消費し、3時間は少し厳しいものがありました。
とはいえ簡単にログ可視化ができるツールであることは間違いないと思うのでお試しあれ。