Kafka Java Consumer Client小技
Distributed computing Advent Calendar 2017の12/16分の投稿です。
clientコードでよく使われるテクニック、というわけではなく
自分が勝手によく使っている小技の紹介です。
今回扱うKafka Java Clientのバージョンは1.0ではなく残念ながら0.10.1.1です・・・今キャッチアップしてないんですよね。
他にいい小技や改善案などあればコメントで教えてください。
- RPS調整(EPS)
- サンプリング
- RPS調整とサンプリングの合わせ技
- トピック毎の流量調整(おまけ)
- 考え方
- 処理スレッドを増やす
- オフセット強制最新
- batch consumer(startとstop)
- consumerの割当はconsumer groupの機能を使い、それ以降は使わずに固定する
- おわり
Javaのオブジェクト複製の動作確認
今更ながら躓いたので動作チェックのメモ。
import org.junit.Test; import java.util.ArrayList; import java.util.List; ・・・ /** * オブジェクト同士が同じオブジェクトか判断するとき * hashCode()がオーバーライドされていて使えないときは * System.identityHashCode()を使うといい. */ @Test public final void objectCheck() { List original = new ArrayList(); List original2 = original; List copyConst = new ArrayList(original); System.out.println("original = " + System.identityHashCode(original)); System.out.println("original2 = " + System.identityHashCode(original2)); System.out.println("copyConst = " + System.identityHashCode(copyConst)); } /** * Objectクラスのclone()はあくまでもshallowコピーなので * メンバが参照型でインスタンスが入っている場合clone()を使うと共有してしまう. */ @Test public final void shallowCopyTest() { CopySareru shallow1 = new CopySareru(); CopySareru shallow2 = shallow1.clone(); System.out.println("shallow1.getMemberAddress() = " + shallow1.getMemberAddress()); System.out.println("shallow2.getMemberAddress() = " + shallow2.getMemberAddress()); } /** * shallowコピーとはいえコピー対象同士のオブジェクトは別物なので * clone()後にメンバをインスタンス化すればメンバを共有することはない. */ @Test public final void copyAndInitTest() { CopySareru deep1 = new CopySareru(); CopySareru deep2 = deep1.clone(); deep1.init(); deep2.init(); System.out.println("deep1.getMemberAddress() = " + deep1.getMemberAddress()); System.out.println("deep2.getMemberAddress() = " + deep2.getMemberAddress()); } class CopySareru implements Cloneable { private List member; public CopySareru() { member = new ArrayList(); } public void init() { member = new ArrayList(); } public int getMemberAddress() { return System.identityHashCode(member); } public CopySareru clone() { CopySareru cs = null; try { cs = (CopySareru) super.clone(); } catch (CloneNotSupportedException e) { e.printStackTrace(); } return cs; } }
結果
Tests run: 1, Failures: 0, Errors: 0, Skipped: 1, Time elapsed: 0.021 sec original = 1689843956 original2 = 1689843956 copyConst = 977993101 shallow1.getMemberAddress() = 5592464 shallow2.getMemberAddress() = 5592464 deep1.getMemberAddress() = 1830712962 deep2.getMemberAddress() = 1112280004
KafkaのLog Compaction
KafkaにはLog Compactionという機能があります。バージョン0.8.1から導入されました。
このLog Compaction、かなりとっつきづらい機能なんじゃないかと思っています。Kafkaドキュメントのこのセクションhttps://kafka.apache.org/documentation/#compaction、ついつい読み飛ばしてしまうんじゃないでしょうか。
知らなくても普通のユースケースなら使わなそうですし。
しかしこいつ、ユースケースによっては使ってみるとなかなか便利なシロモノです。
細かい挙動とかも公式ドキュメントを確認しつつ自分の理解を書いていこうかと思います。*1
- Log Compactionとはなにか
- 状態を持つデータ
- 実際のシステムで考える
- compactモード
- compactionはいつどうやって起こる?どのデータが対象になる?
*1:これを書いたときはバージョンは0.10.1.1で、そのときのドキュメントを見ながら書いていました。
Kafkaアプリケーションのユニットテスト
10日0時を回ってしまった気がしますが Distributed Computing Advent Calendar 12/9 分、書いていきます。寝なければセーフ!
Kafkaアプリケーションのユニットテストについてですが具体的な方法ではなく概論のような形になります。
Kafkaアプリケーション is 何
今回話すKafkaアプリケーションは、Kafka brokerそのものの上で動くなにかのプログラムではなく、
Kafka brokerに対してconsumeしたりproduceしたりするクライアントアプリケーションのことです。
例えば
- Kafkaから読んだデータをフィルタリングしたり情報を付加した後に別トピックに書き戻すアプリケーション(ETL)
- Kafkaから読んだデータを集約して結果をKVSなどに書き出すアプリケーション
- Kafkaから読んだデータを使って異常検知し、何らかのAPIを叩くアプリケーション
- StormやKafka Streamsのアプリケーション
Kafkaの利用目的がストリーム処理であれデータバスであれ、 クライアントアプリケーションで何かを実現したいからKafkaを使うわけで、基本的にはこれらの開発をすることになります。
続きを読む東京メトロのオープンデータをElasticsearch + Kibana (5.0.2)で可視化してみる(Java Client)
JJUG CCC 2016 fallブーストークで大谷さんがElastic Advent Calendar枠まだありますって言ってて、勢いで登録してしまった。
実は時間もネタもなかったことに後から気付きつつ速攻でなんかやっていきます!
3時間くらいでなんとかなったらいいな!
何かいいサンプルデータないかなー
電車の遅延傾向とかパッとグラフで見れたらいいなーと思いつき、
以前どこかで聞いた気がして調べて見つけたのが東京メトロのオープンデータです。
東京メトロの路線情報や駅情報をAPIで参照できます。
APIを利用するためには開発者登録をすると最大2営業日ほどでアクセストークンが取得できるとのことだったんですが自分がやったら半日で登録してくれました、ありがたい。
APIは路線IDで路線を検索したりLat Lonで駅検索したりいろいろできるんですが、
とりあえず全路線の現在状況が知りたければ
https://api.tokyometroapp.jp/api/v2/datapoints?rdf:type=odpt:Train&acl:consumerKey=<アクセストークン>
を叩いておけば大丈夫です。公式ドキュメントはなんだかちょっと結構難しく書いてありますがHTTPリクエストでJSON arrayが返ってくるだけです。
レスポンス項目は開発者登録後に読めるドキュメントにありますが、よく読まなくてもとりあえず可視化すればなにかいい感じでわかるでしょう!わかるといいな!
ストリームでのウィンドウ集計:タンブリング(Tumbling)、ホッピング(Hopping)、スライディング(Sliding)ウィンドウ
リアルタイムストリーム処理の話でよく出てくる以下のウィンドウ集計について、パッとわかる日本語の説明がなかったから認識を書いてみた。
各項目最初の2, 3行でつまりなんなのかを説明しようとしているが、それ以降はちょっと細かい話なので混乱したくない場合は見ない方がいいかも。
時間軸での集計前提。
タンブリングウィンドウ集計
一言でいうと「一定時間ごとの集計」。
例えば10秒毎の流れてくるツイートを知りたいといったときにはタンブリングウィンドウ集計を選択することになる。
このウィンドウ関数を起動してから10秒たったらその10秒間のツイート数が、さらにその10秒後にはその10秒間のツイート数が取得できる感じ。
ポイントは1つのイベントは1つのウィンドウにしか属さないということ。「オーバーラップしない」と表現されるかもしれない。