烏滑稽

ブログはじめました(迫真)

KafkaのLog Compaction

KafkaにはLog Compactionという機能があります。バージョン0.8.1から導入されました。
このLog Compaction、かなりとっつきづらい機能なんじゃないかと思っています。Kafkaドキュメントのこのセクションhttps://kafka.apache.org/documentation/#compaction、ついつい読み飛ばしてしまうんじゃないでしょうか。
知らなくても普通のユースケースなら使わなそうですし。
しかしこいつ、ユースケースによっては使ってみるとなかなか便利なシロモノです。
細かい挙動とかも公式ドキュメントを確認しつつ自分の理解を書いていこうかと思います。*1

Log Compactionとはなにか

一言で言うと「常に流れ続けるデータをbrokerに貯めていく上で、使用ディスクサイズが永遠に増えていかないように自動的に制御する機能の、モードの一つ」です。
「の一つです」ということは複数あるということで、2つあります。

  • brokerに書き込まれたデータが指定期間以上たって期限切れになった、または指定サイズ以上肥大化した場合古いデータを削除する「delete」モード
  • Log Compactionによってbrokerに書き込まれたデータを圧縮(削除)する「compact」モード

このモードはトピック毎に設定できます。デフォルトは「delete」モードです。
「delete」モードはよく知られるところだと思います。時間とサイズで縛って古いデータを削除する。シンプルでわかりやすい。
「compact」モードは直感的にわかりづらいです。これから順を追って説明していきます。

ひとくちメモ

deleteモードとcompactモードは併用できます(方法は後述)。
実際のところcompactモードを使う場合は併用するパターンが多いのではないかと思います。
また、両方無効もできます。永遠にデータサイズがすごい勢いで増えていきます。

状態を持つデータ

Log Compactionを理解するためにはまずこういうデータの概念があるんだということを理解しなければなりません。
この文脈で話すうえで、Kafkaに流すストリームデータには2種類あります。

1つ目は一般的なユースケースのデータです。Webアクセスログとか、アプリケーションのシステムログとか、そういう各々が独立して意味を持つデータです。
brokerにproduceするときは、keyに何もいれず(つまりnull)にvalueにログの文字列なりを入れてproduceするのではないでしょうか。
ひょっとすると特定のデータは同じパーティションに入れさせたいといった要望があってkeyに何か入れているかもしれませんが、ユースケースとしては若干少ないように思います。
こういうデータにはLog Compactionの機能を使う必要はありません。

もう1つは状態を持つデータです。
このデータはkey毎に状態を持っています。状態はvalueに入っています。
例えばkeyにユーザID、valueにユーザメールアドレスが入っているようなデータがそれにあたります。
ユーザID 123であるAさんがメールアドレス 〇〇〇@example.com を設定したら
“key: 123, value: 〇〇〇@example.com” というレコードがbrokerに投げられ、
Aさんがメールアドレスを ×××@example.com に変更したら
“key: 123, value: ×××@example.com” というレコードがbrokerに投げられる感じです。
ユーザの現在のメールアドレスを表示するシステムだったら必要なレコードは同じkeyのレコード群の中で最新のレコードだけです。
これを聞くとそれってまさにKVSでは?と思われるかもしれません。それは正しくて、このデータの場合KafkaをKVSのように使います。
じゃあKVSでいいじゃん?
いやいや、Kafkaに流れるデータはあくまでストリームデータです。Kafkaの先にはKVSや時系列DBが居るかもしれませんが、状態を持つストリームデータはKafkaに保持されるのです。

ひとくちメモ

Kafkaの1レコードはkey, valueのペアから成り立っています(正確にはメタデータも入っている)。
valueだけしか使わない場合も結構多いのでピンとこない方もいるかもしれないのですが、
その場合でもkeyがnullになっているだけで、必ずkey, valueのペアとしてproduce, consumeすることになります。

実際のシステムで考える

こんなユースケースを考えてみましょう(あくまで想像の話です)。
フロントエンドがユーザから自分のメールアドレス設定を受け付け、その情報をさっきのレコード形式、key: ユーザID, value: メールアドレス としてKafkaのaddressトピックにproduceします。
その先にはKVSが居て、consumerアプリケーションがaddressトピックを吸ってユーザアドレスをKVSにputしていきます。KVSを参照すればユーザ毎最新のメールアドレスがわかるというシンプルなシステムです。
Kafkaの管理者であるあなたはこのシステムを実現するうえで、Kafka brokerのディスク使用量が増えすぎないように前述の"deleteモード"でログ保持期限半日としてaddressトピックを作りました。
しかしあなたは後で気づきました。実は別の部門が管理するKVSはメンテナンスのため半年に一回丸一日停止するらしいのです。
さて困りました。KVSのメンテナンス時でもフロントエンドを停止することはできません。KVSの停止時間は一日です。あるユーザAさんの最後のアドレス変更タイミングがKVSを停止してから半日以上たってしまった場合、このデータはKVSに書き込まれる前に保持期限を過ぎて削除され、データ損失となります。
仕方がない、ログの保持期限を半日から一日に伸ばすためにサーバディスクの増設を上司にお願いしよう・・・と席を立とうとしてあなたは思いました。
このシステムは同じユーザのアドレス変更が多く、ユーザ数自体は少ない(例えばの話です)。KVSの復旧のために必要なデータは各ユーザの最新の更新データだけなので、残しておくべきデータは少ない。残しておかなくていいデータが大量にあるせいでサーバディスクを増設しなければいけないのはもったいない・・・

“deleteモード"ではこの場合の「残しておかなくていいデータ」をうまいこと消せないのです。
そして"compactモード"はこの「残しておかなくていいデータ」だけうまいこと消してくれるモードです。

compactモード

compactモードはパーティションに保持されるログのサイズや時間を指定して超過分のセグメントを削除するのではなく、
(同じパーティションの中で)同じkeyを持つレコードの内最新以外を削除するモードです。
deleteモードと違い、compactモードではセグメントそのものは削除されずサイズが減って瘦せたようなセグメントになっていきます。
ここで重要なことは、compactionにより削除されなかったレコードのオフセットはcompaction後に繰り上がったりせず、変わらないということです。
そのためconsumerが読んでいる間にcompactionが走ってもconsumerオフセットの齟齬が起こらず、急にconsumerが読み飛ばしてしまったり逆に巻き戻って同じデータを読んだりすることはありません。パーティション内のデータの順番がcompactionの前後で変わることもありません。

compactionはいつどうやって起こる?どのデータが対象になる?

broker設定項目 “log.cleaner.enable” の値が true のとき(デフォルトtrueです)、topic設定項目 “cleanup.policy” の値が “compact” になっているトピックに対して実行されます。
ちなみに “cleanup.policy” の値を “compact,delete” とカンマ区切りで続けることによってcompactモードとdeleteモードの併用ができます。
brokerプロセス中のバックグラウンドスレッドが、compaction対象のログがないかチェックして回っています。
compaction対象のログが無い場合broker設定項目 “log.cleaner.backoff.ms” 時間のスリープが入るため、compactionを期待してから実行されるまで最大この時間かかる可能性があります。
compaction対象になるデータは以下の条件を全て満たしたものです。

  • トピック:topic設定項目 “cleanup.policy” の値が “compact”
  • パーティションパーティションの全ログ中の内、未compactionのデータがトピック設定項目 “min.cleanable.dirty.ratio” の割合以上存在する
  • セグメント:セグメントはアクティブセグメントではない
  • レコード:レコードは書き込まれてからtopic設定項目 “min.compaction.lag.ms” 以上の時間が経過している

本当にcompactionが走るか試してみようと自分でテストするとき引っかかりそうなのはセグメントの条件です。
“min.cleanable.dirty.ratio” はデフォルト0.5, “min.compaction.lag.ms” はデフォルト0なのであまり気にしないでも “cleanup.policy” さえ “compact” にして同じkeyでvalueが変わるレコードをいくらかproduceして、"log.cleaner.backoff.ms" のデフォルトである15秒待てば compaction が走るはず、と思ってearliestオフセットからconsumeすると全部データが読めてしまう。
こういう場合はセグメント条件を考慮していないからかもしれません(僕は引っかかった)。
アクティブセグメントとは今書き込みが発生しているセグメントのことで、このセグメントはcompactionの対象になりません。
トピック設定項目 “segment.ms” を短くしてセグメントを頻繁にロールするようにしてやるとテストがしやすいかと思います。

compactionはバックグラウンドスレッドがアクティブでないセグメントをコピーして削除対象のレコードを削除したセグメントにした上でファイルに書き出し、両者を直ちにスワップすることで実現されます。
この処理に使うリソースはbroker設定項目で制限することができ(デフォルトで制限されています)、brokerの性能に影響を与えてproduceやconsume処理が滞るようなことを避けることができます。

ひとくちメモ

実はcompactモードはkeyの最新レコードが残り続けるため、keyのユニーク数が増えていくデータではbrokerにログが永遠に増えていきます。
keyのユニーク数の増え方がわずかなものならそれでもいいのですが、増え方が激しい場合はディスクをいつか食い尽くしてしまうかもしれません。
アプリケーション仕様で、keyが最新だったとしても書き込まれてからこれだけ時間がたったら削除してもよい、ということが許容できるのであればcompactモードとdeleteモードを併用するのが良いです。

 

ふたくちメモ

Kafkaの文脈では「レコード」・「セグメント」・「ログ」という用語は以下の定義で言及しています(Kafkaの公式ドキュメント通り)。

  • レコード:データ1個1個を指しています。RDBで言うところのタプル、「行」のイメージです。
  • セグメント:Brokerの各パーティションディレクトリ配下に書き込まれるファイルそのものです。"オフセット番号.log" というファイル名で確認することができます。1セグメントとは1ファイルのことです。
  • ログ:パーティション毎のセグメント群のことを指しています。パーティションをまたいだ場合はログとは言いません。

*1:これを書いたときはバージョンは0.10.1.1で、そのときのドキュメントを見ながら書いていました。