烏滑稽

ブログはじめました(迫真)

Kafka Java Consumer Client小技

Distributed computing Advent Calendar 2017の12/16分の投稿です。

qiita.com

clientコードでよく使われるテクニック、というわけではなく
自分が勝手によく使っている小技の紹介です。
今回扱うKafka Java Clientのバージョンは1.0ではなく残念ながら0.10.1.1です・・・今キャッチアップしてないんですよね。
他にいい小技や改善案などあればコメントで教えてください。

RPS調整(EPS)

Kafkaは一般に流量に強く、大トラフィックも捌いてくれますが後続のコンポーネントがそうとは限りません。
後続コンポーネントはシステム都合や検証用途のために500RPS以上でリクエストを投げると死んでしまうかもしれません。
そうしたときに下流が溢れないようにMax流量を絞るための便利UtilがguavaのRateLimiterです。使い方はJavadocに心底丁寧に書かれています。

Guava: Google Core Libraries for Java 23.0 API

Kafkaには特定のユーザやクライアントのMax流量を制限するquotaの機能がありますが、これはあくまでKafkaの管理側の視点で調整するもので、クライアントアプリケーション都合でちょいちょい変えるものではありません。
RateLimiterのいいところは

  • クライアントコードレベルで調整できる
  • トラフィックbpsとか)ではなくイベント数レベル(EPS)で調節できる
  • オブジェクトはスレッドセーフで、スレッド内だけでなくプロセス全体で流量調整できる
  • 手軽で直感的
  • すごく正確、しかもwaitが偏らない
// consume毎の処理を最大500EPSに絞るサンプルコード. 2行の追加で調整可能.
RateLimiter limiter = RateLimiter.create(500d);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
// 今回はconsumer 1threadだが,複数スレッドだとしても同じRateLimiterオブジェクトを書くスレッドに配れば全体で最大500EPSになる
while(true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
    limiter.acquire();
    // 後続コンポーネントへのリクエストなど
  }
}

特にconsumerに便利というだけでproducerでも使えますしそもそもUtilなのでなんにでも使えます。

サンプリング

これも流量調整と似ているのですが、データの検証段階だったり、本番だとしてもデータの傾向さえ見れればいいので全量はいらない、というときがよくあります。例えば全量の10%で後続にリクエストしてくれと言われます。
これの実現方法は簡単です。consumerは全量吸ってランダムに10回に9回は何もせずメッセージを捨てればいいです。

// 10%サンプリングのサンプルコード
Random RND = new Random();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while(true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
    if (RND.nextInt(10) != 0) continue;
    // 後続コンポーネントへのリクエストなど
  }
}

しかしこのシンプルな方法には課題もあります。

  • 後続コンポーネントへの流入量は確かに減るが、consumerは結局全量吸うのでネットワークトラフィックを減らしたい場合は効果がない
  • RPSレベルの調整はできず、Kafkaへ想定以上の流入があるとサンプリングしていても大量のリクエストを後続に投げてしまう

RPS調整とサンプリングの合わせ技

これは上記サンプリングの課題を解決するためにRPS調整ベースでサンプリングする方法です。 RateLimiterで単純にRPSを調整すると、問題になるのはKafkaへの流入量がその最大RPSを上回っているとconsumeがどんどん遅れていく(ラグる)ことです。
検証デモを見せて、これは検証用途で流量を抑えているので遅れていますと言っても見せる相手は満足してくれないかもしれません。なぜなら偉い人はリアルタイムになにかが更新されているのを見るのが好きだからです。
これの解決方法はRPS調整しつつ定期的にオフセットを最新に飛ばすというものです。

// 500RPSに流量を抑えつつ最新のデータを後続にリクエストし続けるサンプルコード
RateLimiter limiter = RateLimiter.create(500d);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
long lastCheckTime = 0;
while(true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
    limiter.acquire();
    // 後続コンポーネントへのリクエストなど
    long now = System.currentTimeMillis();
    if (now - lastCheckTime > 5_000) {
      consumer.seekToEnd(consumer.assignment());
      lastCheckTime = now;
    }
  }
}

これのいいところはまさに上記サンプリングの課題を解決しているところです。

  • consumerはオフセットを飛ばすことによって全量を吸っていないので、ネットワークトラフィックも減る
  • 後続コンポーネントへのMax RPSは確実に制限できる

ただこれはこれで課題もあり

  • サンプリングがランダムほどまばらではなく、時間によって偏るので期待する傾向が得られない可能性がある
  • 流入がMax RPSに引っかかっておらず、consumeが間に合ってるときでもサンプリングされる可能性がある(これはラグチェックの時にラグが一定以下だったら飛ばさない、などすれば回避可能)

サンプルコードでは5秒に1回オフセット飛ばしのチェックをしているので、偏りを減らしたければこの5秒をもっと短くすればいいんじゃないかと思うかもしれませんが、KafkaConsmer.seekToEndメソッドはそれなりにブロック時間がかかるのであまり短くしすぎるとRPSがぶれます。

トピック毎の流量調整(おまけ)

Aトピックは100EPSで、Bトピックは200EPSでconsumeしたいときがあります。
単純にはAトピック用のConsumerオブジェクトとスレッド群、Bトピック用のConsumerオブジェクトとスレッド群をそれぞれ作って(プロセスレベルで分けているのと同じ)、それぞれ上述のRPS調整をすればいいです。
しかし、1つのConsumerオブジェクトがA, B両方共subscribeするようなシステムのRPSを調整しようとする場合、難易度が激増します。
これは何故かと言うと1つのconsumerオブジェクト(スレッドといってもいい)はAを読んでいるときもあればBを読んでいるときもあるからです。
そこまでせんでも感ありますが実際こういうことはあったので紹介します(コードは長いので書きません、考え方だけです)。やや長いので興味がない人は次の節まで飛ばしてください。

考え方

Consumerスレッドは先の通り今吸っているトピックを認識できないので以下のように書くことはできません。Aトピックだけのconsumeなら問題ないですがBトピックを吸う可能性もあるからです。

RateLimiter limiter = RateLimiter.create(100d);
while(true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
    limiter.acquire();

consume側で流量調整するのは不可能なので処理側(この後出てくる節と関連しますが)で調整することを考えます。
トピック事になんらかのQueue、たとえばLinkedBlockingQueueを使います。
consumer側はメッセージのトピックを見て、AトピックならAトピックのQueue、BトピックならBトピックのQueueに投げ入れるようにします(Queueの数は実際には動的に決まる)。
で、それぞれのQueue用のRateLimiterオブジェクトを作って(A:100, B:200)関連付けます。
後続の処理スレッドはそれぞれのRateLimiter.acquireを実行しながらQueueからpollして実際の処理を行います。
これで一応トピック毎のRPS調整ができるようになりますが、問題が2個あります。

  • pollが遅いのにconsumeは全力なのでQueueがあっという間に溢れること
  • Consumerオブジェクトは特定のトピックを吸うことに専念して、他のトピックを吸う機会が割り当てられないかもしれないこと(例えばBトピックばかり集中して吸ってしまい、Aトピックが100RPSを担保できるほどConsumerから流してもらえないかもしれない)。

これを解決するためにKafkaConsumerのメソッドであるpause(), resume()を使います。
このメソッドは特定のトピック、パーティションを指定してconsumeを停止したり再開させたりできます。
これ何のためのメソッドなんだと以前僕は思っていたのですがJavadocを見るとわかります。Consumption Flow Controlのためです。

kafka 0.10.1.1 API

なので割と想定にあった使い方です。
pollスピードを抑えてconsumeを全力で行うので普通に考えればQueueのサイズがどんどん増えていきます。
これを例えば全容量の80%(QueueのMaxサイズを1万にしていたら8千)に到達したらそのトピックのConsumeはpause()します。するとQueueは逆に消費されるだけなのでどんどんサイズが減っていきます。この間にConsumerオブジェクトはほかのトピックのconsumeをするようになるでしょう。そしてQueueが20%まで減ったら止めていたトピックのconsumeをresume()します。
閾値の調整もうまくやらないといけないところではるのですが、これで先の2点の問題を解決できます。

処理スレッドを増やす

今度はスピードを抑える話ではなく、頑張ってスピードを出す話です。
Kafkaのconsumerは大抵無チューニングでも十分に速いです。
そのためconsumer threadがボトルネックになることはほとんどありません。
もし今consumer threadが4本で、1本2000EPSしか処理できないので、現状8000EPSしか処理できない。要件では2万EPSを処理したいのでconsumer theradを10本にしたいんだ、ということがあったとしたらそれはconsumer threadを10本にする必要はありません(必要はないというだけで、増やすことが簡単であればその対応でも全く問題ないです)。
もちろん1個1個のデータサイズや物理リソースにもよるのですが、1 consumer threadのconsumeだけの能力を考えれば2000EPSは遅すぎです。そのスピードを2000EPSに縛っているのはconsume能力ではなく恐らくビジネスロジックです。
consumer thread数はトピックのpartition数に縛られるので増やすのはそう簡単ではない場合があります。その場合の解決策は、今consumer threadに一緒に載せているビジネスロジック部をビジネスロジック専用のスレッドに分けてしまうことです。
この話は我流ではなく、KafkaConsumerのJavadoc(再掲)に書いてある内容です(Decouple Consumtion and Processing)。

// consumer threadは1個だが、処理スレッドは5個にしている
LinkedBlockingQueue<ConsumerRecord<String, String>> queue = new LinkedBlockingQueue<>(100_000);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
ExecutorService pool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
  pool.submit(new ProcessTask(queue));
}
while(true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
    queue.put(record);
  }
}

class ProcessTask implements Runnable {
  private LinkedBlockingQueue<ConsumerRecord<String, String>> queue;
  public ProcessTask(LinkedBlockingQueue queue) {
    this.queue = queue;
  }
  public void run() {
    while(true) {
      ConsumerRecord<String, String> record = queue.take();
      // 重い処理
    }
  }
}

ただここで非常に注意すべき点があります。
処理スレッドを分けるとat least onceの確保を自分でやる必要があります(Javadocの当該説のconsに書いてあります)。
ビジネスロジックをconsumer threadに載せている限りは、すべてのビジネスロジックが終わらない限りオフセットが進まないので、定期自動のデフォルトオフセットコミットに任せておけばat least onceを確保できました。
しかし、処理スレッドを分けると処理が全て完了するかどうかに関わらずオフセットが進むので、データロストの可能性がでてきます。
もしat least onceを確保しつつ処理スレッドを分けたい場合はオフセットコミットは自動ではなく手動でする必要があります。
これのやり方の定番は実は自分はわかっていないのですが、とりあえず自分の場合はどこかのタイミングでconsumeをブロックしてキューが空になってから手動コミットしてます。

オフセット強制最新

スレッドの話はさておき、今度はオフセットコントロールの話です。(この節の実装は実は既にふれていますが・・・)
ネットワークの問題や後続コンポーネントのトラブルでconsumerアプリケーションのconsumeがリアルタイムから遅れることがあります。 そして問題が解決した後リアルタイムに追いつくまでに数時間かかったりします。
これがデイリーバッチのためにデータを補充するシステムなどであればそんなに問題にならないのですが、提供するデータの鮮度がサービス満足度に直結するようなシステム(トレンドワード生成システムなど)だと非常にまずいことになります。
そんなとき一旦今読んでるリアルタイムから遅れているデータはスルーしていいからオフセットを最新に飛ばしてリアルタイムでconsumeしたいということがままあります。
現在のKafkaだとadminツールからconsumer groupを指定してオフセットをいじれたと思うのですが、今回はconsumer clientで対応する方法です。
と言っても簡単で、KafkaConsumerのseekToEndメソッドを呼ぶだけです。

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
consumer.poll(100); // consumer groupにconsumerを参加させておく
consumer.seekToEnd(consumer.assignment()); // 次pollするときに対象assign済partitionのオフセットを最新に飛ばす
while(true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  // 処理
}

通常時ではこの機能は使わず、何か起動引数にlatestなど加えて起動したときのみ緊急でオフセットを飛ばす、などのように作っておくと便利です。

batch consumer(startとstop)

メッセージキューではよく再送要件が重視されます。
いくらconsumerアプリケーションが正しく処理し、後続コンポーネントにフィードしてackが返ったと言っても
後続コンポーネントはトラブルや手オペによるミスで特定時間帯のデータをふっ飛ばしてしまうかもしれません。
ackは返していたが実はfeedされていなかったなんていうこともあるでしょう。そもそもackが返るまでブロックしない設計にする場合もあります。
また、上述のオフセット強制最新をやったときにスルーしたデータを別口(別consumer group)で補充したい場合があります。
そうしたときに、オフセットの最古から最新までconsumeし直しでは時間がかかりすぎますし、大量の重複がでます。
「10:45のデータから12:00のデータまでを再送」というようにstartとstopの時間を指定することができれば重複と補充時間を最小限に抑えることができます。
stopの時間が指定できるというのは重要です。これができないと人間がそのあたりの時間を読んでいるときに勘で止めなければなりません。
早く止めすぎると損失しますし、遅く止めすぎると大量に重複します。
これをclientで実現する方法が以下です。

long startMsec = 1513302300L; // 12/15 10:45
long stopMsec = 1513306800L; // 12/15 12:00

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
consumer.poll(100); // consumer groupにconsumerを参加させておく
Map<TopicParition, Long> timestampsToSearch = consumer.assiginment().stream.collect(Collectors.toMap(tp -> tp, tp -> startMsec));
Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes(timestampsToSearch);
offsetMap.forEach((tp, offsetTIme) -> consumer.seek(tp, offsetTime.offset())); // ここでオフセットの位置を矯正
timestampsToSearch = consumer.assiginment().stream().collect(Collectors.toMap(tp -> tp, tp -> stopMsec));
Map<TopicPartition, OffsetAndTimestamp> endOffsetMap = consumer.offsetsForTimes(timestampsToSearch); // 終了オフセットを持っておく
Set<TopicPartition> completeTopicPart = new HashSet<>(); // 完了済topic partitionをとっておく

while(true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
    // 処理
    if (record.offset() >= endOffsetMap.get(tp).offset()) {
      Set<TopicPartition> completed = Sets.newHashSet(tp);
      consumer.pause(completed); // 終わったpartitionはpauseしておくのがポイント
      completeTopicPart.addAll(completed);
    }
  }
  if (consumer.assignment().equals(completeTopicPart)) {
    break; // 担当する全てのtopic partitionがend offsetまでいったらbreakする
  }
}

開始地点と終了地点を決めて処理するこの形はストリーム的ではなく、バッチ的な感じがします。
この機構ではretention期限を過ぎない限りstartとstopを同じ数値で指定すれば必ず同じデータが同じ分だけ投入されます。
これはアプリケーションの検証時に再現性のあるテストをするために同じデータを何回も投げたい場合にも都合が良いです。

consumerの割当はconsumer groupの機能を使い、それ以降は使わずに固定する

さっきconsumeのstart stopを実現する方法を紹介しましたが、実はこれConsumerオブジェクトが複数あったとき、ほとんど同時に終わる場合のみを想定しています。
実際には多くのbatch consumeは上記の方法でうまくいくと思っているのですが、本当に大量のデータを複数のマシンでbatch consumeする、といったときにはうまくいきません。自分の担当分が終わったconsumerがbreakして処理を終えてもまだほかのconsumerがしばらく動いている場合、Coordinatorは一部のconsumerが死んだと判断してリバランスしてしまいます。
consumerにうまいことparititonを割り当ててくれるGroupCoordinatorは便利ですが、リバランスの機能が時として邪魔になることがあります。
リバランスをclientで無効にする方法は簡単で、KafkaConsumer.assign(Collection paritions)メソッドでconsumerに直接partitionを割り当ててしまえばよいです。ただ、これをやると最初のpartition割り当てもしないので人間が手で設定してやる必要がでてきます。実行の度にいちいち計算してpartitionを指定するのは面倒です。
以下は最初のpartition割当だけはGroupCoordinatorに任せ、一度決まったら固定化してリバランスさせないconsumeの方法です。

long fixPartitionWaitMsec = 30_000; // 待ち合わせ時間30秒

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
consumer.poll(100); // consumer groupにconsumerを参加させておく
long fixPartitionWaitStart = System.currentTimeMillis();
// groupへのconsumer参加が一番多かった時(つまり担当partitionが一番少ないとき)のアサインが確定partitionになる
Set<TopicPartition> minAssignTp = new HashSet<>();
while (System.currentTimeMillis() - fixParitionWaitStart < fixPartitionWaitMasec) {
  Thread.sleep(100L);
  consumer.poll(100); // ハートビート
  Set<TopicParitition> nowAssign = consumer.assignment();
  if (minAssignTp.size() == 0 || minAssignTp.size() > nowAssign.size()) {
    minAssignTp = nowAssign;
  }
}
consumer.unsubscribe();
consumer.assign(minAssignTp);

while(true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  // 処理
}

この実装で重要なことは、groupに参加しているConsumerをいつ確定させるかということです。 特にconsumerを載せているマシンが複数にわたる場合は、batch consumeの起動に時間差がついてしまい、
本当はもっとconsumerを参加させたかったのに打ち切られて少ないconsumerしか使えなかったとかが発生しないようにしたいです。
つまり、待ち合わせ時間の考えが必要です。
最初のプロセスを起動してから何秒以内だったら全てのconsumerを起動できるかは調整できるようにしておくべきです。
この時間を30秒としたら、最初に起動されたconsumerは30秒間ハートビートを送るだけで何もせず待ちます。30秒たった瞬間に今割り当てられているpartitionを確定として処理を開始します。

おわり

というわけでconsumer clientの小技紹介でした。記事が少しでもお役にたてれば幸いです。