LIFULL Creators Blog

LIFULL Creators Blogとは、株式会社LIFULLの社員が記事を共有するブログです。自分の役立つ経験や知識を広めることで世界をもっとFULLにしていきます。

KubernetesクラスタのPull型アプローチ由来のログ・メトリクス欠損を防いで信頼できる可観測性基盤をつくる

KEELチーム の相原です。

前回のエントリは「比較的安全にMCPサーバを動かす」でした。

www.lifull.blog

今回は信頼できる可観測性基盤を提供するべく、KubernetesクラスタにおけるPull型アプローチ由来のログ・メトリクス欠損と色々向き合った話を書きます。

Pull型アプローチとそのトレードオフ

まずPull型アプローチについて軽く触れておきます。

Pull型アプローチとは、ログやメトリクスをその持ち主がどこかに公開しておいて、FluentdやPrometheusなどの他の誰かに取得しに来てもらうアプローチを指します。

対となるPush型アプローチは逆に持ち主がログやメトリクスを直接対象に送りつけるというものです。

Push型アプローチはサーバ側で流量のコントロールが難しかったり、Rate Limit時などのリトライの責任がクライアント側に要求される一方で、Pull型アプローチはその点が単純という関係性にあります。

Kubernetesクラスタで素直にログとメトリクスを収集しようと思うと、大体Pull型のアプローチを採用することが多いのではないかと思います。

コンテナランタイムはコンテナの標準出力をファイルとして出力しているのでそれをFluentdやPromtailで収集し、メトリクスはPrometheus Exporterで公開してPrometheusがScrapeするといった感じです。

ここで考えたいリスクが、「Podが削除される時までに本当にそのログ・メトリクスは収集されているのか」というものです。

Kubernetes上でコンテナランタイムを司るkubeletは、Podの削除時にログのファイルを削除します。 Podが削除されれば当然Scrapeするためのエンドポイントも無効になるためメトリクスも収集できません。

削除直前に出力されたログやインクリメントされた Counter はどうなるのでしょうか。

恐らくそれらは欠損している可能性が高いです。 Fluentdは比較的すぐにログを読みますがそれでも読み込みが間に合わず欠損することはありますし、PrometheusのScrapeの間隔は一般に短くとも秒単位なので望みは薄いです。

kubeletにログファイルを削除される前にFluentdに読ませたい

まずはログの欠損から考えていきましょう。

最初に思いつく対処はkubeletがログを削除するまでに遅延を入れたり、削除を無効化にするといったことだと思います。

それはこの辺のIssuesで議論されていますが、今のところこれといった方法はありません。(Grafana AlloyとかはKubernetes API経由でのログ取得にも対応していますが、こちらの削除タイミングも同様のはずです)

github.com

github.com

かといって全てのPodにログ転送用のサイドカーをデプロイして送信というのも辛いのでどうにかプラットフォーム側で対処したいところです。

こんな時、eBPFはいつでも私達の銀の弾丸になってくれます。

(eBPFとは、という話は以前に書いたのでこちらをご覧ください)

www.lifull.blog

eBPFでunlinkat(2)を遅延実行させる

アプローチとしては、eBPFでkubeletが発行するファイル削除のunlinkat(2)をhookしたら bpf_override_return でファイルを削除することなく終了コードを偽装して返し、裏でFluentdが対象のファイルを読み切ったことを確認出来たら実際の削除を行うというものです。

Fluentdは *.pos ファイルで読み込んだファイルのバイト数を記録しているため、これを見ればファイルを読み切ったことを確認できます。

まずはこんな感じにunlinkat(2)をhookしましょう。 bpf_override_returnkprobeの一部の関数 からしか利用できないことに注意してください。

// /sys/kernel/debug/tracing/events/syscalls/sys_enter_unlinkat/format
SEC("tracepoint/syscalls/sys_enter_unlinkat") int sys_enter_unlinkat(struct trace_event_raw_sys_enter *ctx) {
    u64 __pid_tgid = bpf_get_current_pid_tgid();
    gid_t tgid = __pid_tgid >> 32;
    pid_t pid = __pid_tgid;

    struct task_struct *task = (struct task_struct *)bpf_get_current_task();
    u32 ppid = BPF_CORE_READ(task, real_parent, tgid);

    if (tool_config.this == tgid || tool_config.this == ppid) {
        return 0;
    }

    const char *pathname = (const char *)ctx->args[1];
    int flag = (int)ctx->args[2];

    // AT_REMOVEDIR
    // https://elixir.bootlin.com/linux/v6.10.6/source/include/uapi/linux/fcntl.h#L104
    if (flag & 0x200) {
        return 0;
    }

    struct arg arg = {
        .pathname = pathname,
    };

    bpf_map_update_elem(&args, &pid, &arg, BPF_ANY);
    return 0;
}

SEC("kprobe/" SYS_PREFIX "sys_unlinkat") int BPF_KPROBE(sys_unlinkat) {
    u64 __pid_tgid = bpf_get_current_pid_tgid();
    gid_t tgid = __pid_tgid >> 32;
    pid_t pid = __pid_tgid;

    struct arg *argp = bpf_map_lookup_elem(&args, &pid);
    if (!argp) {
        return 0;
    }

    int zero = 0;
    struct event *eventp = bpf_map_lookup_elem(&unlinkat_heap, &zero);
    if (!eventp) {
        goto end;
    }

    eventp->tgid = tgid;
    eventp->pid = pid;
    eventp->uid = bpf_get_current_uid_gid();
    eventp->pathname[0] = '\0';
    if (bpf_probe_read_user(eventp->pathname, sizeof(eventp->pathname), argp->pathname) < 0) {
        goto end;
    }

    u8 directory[DIRECTORY_MAX];
    if (bpf_probe_read_kernel(directory, sizeof(directory), tool_config.directory) < 0) {
        goto end;
    }

    if (!filter_directory(directory, eventp->pathname)) {
        goto end;
    }

    bpf_override_return(ctx, 0);

    bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, eventp, sizeof(*eventp));
end:
    bpf_map_delete_elem(&args, &pid);
    return 0;
}

bpf_override_return(ctx, 0); によって即座にkubeletに返るので、これでファイルの削除処理をスキップすることができます。

unlinkat_heapPERCPU_ARRAY にしているのでnull byteを区切り文字にしています。 Kubernetesのノード上で実行することになり余計なunlinkat(2)も流れてくるため、filter_directory でcontainerdのログディレクトリ以外のイベントは無視しなければなりません。

if (tool_config.this == tgid || tool_config.this == ppid) は、ファイル削除を遅延実行する関係上unlinkat(2)を自身も実行するため、無限ループ防止のために hostPID: true における自身のpidを this として受け取ってスキップしています。

あとは std::ffi::CStr::from_bytes_until_nul でユーザ空間から pathname を取得して、*.pos ファイルと突合しながらFluentdが読み切るまで待機してファイルを削除するだけです。

そこそこ流量は多くなるので、mtimeを見て適当に *.pos ファイルはキャッシュしておきます。

while let Some(pathname) = unlink_rx.recv().await {
    let current_mtime = std::fs::metadata(&args.pos_file)
        .and_then(|m| m.modified())
        .unwrap_or(std::time::SystemTime::UNIX_EPOCH);

    let needs_refresh = pos_cache.read().await.mtime != current_mtime;
    if needs_refresh {
        let new_pos = parse_pos_file(&args.pos_file);
        let mut cache = pos_cache.write().await;
        cache.mtime = current_mtime;
        cache.entries = new_pos;
    }

    if let Some(offset) = pos_cache
        .read()
        .await
        .entries
        .get(&pathname)
        .map(|(offset, _)| *offset)
    {
        let path = std::path::Path::new(&pathname);
        if let Ok(metadata) = path.metadata()
            && metadata.len() != offset
        {
            let unlink_tx = unlink_tx.clone();
            tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_secs(
                    args.delayed_seconds,
                ))
                .await;
                if let Err(e) = unlink_tx.send(pathname.clone()) {
                    eprintln!("Failed to re-queue {}: {}", pathname, e);
                }
            });
            continue;
        }
    }

    let result = std::fs::remove_file(&pathname).or_else(|e| {
        if e.kind() == std::io::ErrorKind::IsADirectory {
            std::fs::remove_dir(&pathname)
        } else {
            Err(e)
        }
    });
    if let Err(e) = result {
        if e.kind() == std::io::ErrorKind::DirectoryNotEmpty {
            if let Ok(entries) = std::fs::read_dir(&pathname) {
                for entry in entries.flatten() {
                    let entry_path = entry.path().to_string_lossy().to_string();
                    if let Err(e) = unlink_tx.send(entry_path.clone()) {
                        eprintln!("Failed to queue {}: {}", entry_path, e);
                    }
                }
            }

            let unlink_tx = unlink_tx.clone();
            tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_secs(
                    args.delayed_seconds,
                ))
                .await;
                if let Err(e) = unlink_tx.send(pathname.clone()) {
                    eprintln!("Failed to re-queue {}: {}", pathname, e);
                }
            });
        } else {
            eprintln!("Failed to remove {}: {}", pathname, e);
        }
    }
}

kubeletが実行するGoの os.RemoveAll はファイルとディレクトリを区別せずにとりあえずunlinkat(2)してくるので、ユーザ空間側では中身を削除しながらディレクトリが空になるまで待機するようにしています。

本来はEISDIRを受け取ってからAT_REMOVEDIR付きのunlinkat(2)にフォールバックするんだと思いますが、今回はbpf_override_returnでEISDIRを握り潰してしまっているので、 os.RemoveAll 前提の気持ち悪さはありつつもこちらで削除まで責任を持ちます。 (ファイルのみが渡ってくる vfs_unlink を使えるといいんですが、ALLOW_ERROR_INJECTIONされておらずこちらは bpf_override_return が使えません)

また一つだけ注意点があって、containerdは /var/log/containers 以下にログファイルを作成しますが、実際にはこれは /var/log/pods へのシンボリックリンクとなっているため、unlinkat(2)が呼ばれる pathname とFluentdの *.pos ファイルが指すファイルが異なる可能性があります。

私はFluentd側を修正せず *.pos ファイルを読む時についでにシンボリックリンクも解決してしまいました。 この辺もあって *.pos ファイルはキャッシュしています。

fn parse_pos_file(path: &std::path::Path) -> std::collections::HashMap<String, (u64, u64)> {
    let mut pos = std::collections::HashMap::new();
    if let Ok(file) = std::fs::read_to_string(path) {
        for line in file.lines() {
            let mut parts = line.split_whitespace();
            if let (Some(path), Some(offset), Some(inode)) =
                (parts.next(), parts.next(), parts.next())
            {
                let resolved_path = std::fs::canonicalize(path)
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_else(|_| path.to_string());
                pos.insert(
                    resolved_path,
                    (
                        u64::from_str_radix(offset, 16).unwrap_or_default(),
                        u64::from_str_radix(inode, 16).unwrap_or_default(),
                    ),
                );
            }
        }
    }
    pos
}

これで晴れてkubeletの実行するunlinkat(2)が握りつぶされ、ユーザ空間で安全にFluentdのin_tailを待ってから遅延削除されるようになりました。

同じようなアプローチはPromtailなどでも有効なはずです。

PrometheusがCounterのインクリメントをScrapeするまでPodを待機させたい

次はメトリクスの欠損です。

Gauge(UpDownCounterの実装がGaugeなこともありますが、用途としてのGauge)などのMetric typeは割とどうでもいいんですが、Counterなどは意外と重要なケースもあってインクリメントが欠損してしまうと困ることがあります。

LIFULLでは大きめのKubernetesクラスタをマルチテナントで運用しているということもあり、PrometheusがScrapeする間隔は30秒程度が限界で、Podが削除されるまでの30秒間のインクリメントが失われてしまうという問題がありました。

もっと短い間隔で運用できるのであれば、Kubernetes 1.29から利用できるようになった KEP-3960: Introducing Sleep Action for PreStop Hook で雑にsleepしてもいいですが、問答無用で30秒も待ってしまうとPodのロールアウトが遅くなってしまうためそうもいきません。

なるべくロールアウトに影響を与えないよう、きっちりScrapeされるまでを待てると理想です。

SIGTERMを受け取ったら次回のScrapeまで待機するプロキシを挟む

素直にはScrapeされたことを検知できないと思うので、間にプロキシを挟ことにしましょう。

アイデアはシンプルで、以下のようなPrometheusとPrometheus Exporterの間に挟んでScrape時刻を記録するプロキシを作ります。

Prometheus ExporterはPod削除時に送られてくるSIGTERMでそのままGraceful Shutdownされてしまうので、終了直前のメトリクスを cachedMetrics に入れておいてSingleHostReverseProxyのErrorHandlerでPrometheus Exporterがシャットダウンして疎通しなくなったらそれを返すようにしています。

type CachedResponse struct {
    Body        []byte
    ContentType string
}

var (
    lastScrape    atomic.Value
    terminating   atomic.Bool
    scrapeChan    chan struct{}
    cachedMetrics atomic.Pointer[CachedResponse]
)

targetURL, err := url.Parse(a.TargetURL)
if err != nil {
    return xerrors.Errorf("failed to parse target URL: %w", err)
}

proxy := httputil.NewSingleHostReverseProxy(targetURL)
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
    if cached := cachedMetrics.Load(); cached != nil {
        w.Header().Set("Content-Type", cached.ContentType)
        w.WriteHeader(http.StatusOK)
        _, _ = w.Write(cached.Body)
        return
    }
    http.Error(w, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable)
}

server := &http.Server{
    Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        lastScrape.Store(time.Now())
        if terminating.Load() {
            select {
            case scrapeChan <- struct{}{}:
            default:
            }
        }
        proxy.ServeHTTP(w, r)
    }),
}

そのためにこのプロキシはSIGTERMを受け取った時にPrometheus Exporterから最新のメトリクスを取得してからキャッシュし、 scrapeChan で待ち受けて次回のScrapeまで待機します。

SIGTERMの送信とEndpoint ControllerがEndpointを削除する処理は同時に行われるということは広く知られていて、LIFULLではこれへの対処のために全てのPodは終了時に数秒sleepするようにしています。

kubernetes.io

最新のメトリクスをSIGTERM時に取得する処理は、この設定とGraceful Shutdownの実装に依存している部分があることには注意したいです。

signal.Notify(quit, syscall.SIGTERM)
<-quit

ctx, cancel := context.WithTimeout(context.Background(), a.TerminationGracePeriod)
defer cancel()

t, ok := lastScrape.Load().(time.Time)
if ok && !t.IsZero() {
    timeSinceLastScrape := time.Since(t)

    if timeSinceLastScrape > a.ScrapeWaitThreshold {
        if request, err := http.NewRequestWithContext(ctx, http.MethodGet, a.TargetURL, nil); err == nil {
            if response, err := http.DefaultClient.Do(request); err == nil {
                defer func() {
                    _ = response.Body.Close()
                }()
                if response.StatusCode < 400 {
                    if body, err := io.ReadAll(response.Body); err == nil {
                        cachedMetrics.Store(&CachedResponse{
                            Body:        body,
                            ContentType: response.Header.Get("Content-Type"),
                        })
                    }
                }
            }
        }

        terminating.Store(true)

        select {
        case <-scrapeChan:
        case <-ctx.Done():
        }
    }
}

まとめるとこのプロキシは以下のように動きます。

  1. 常にPrometheusとPrometheus Exporterの間に入って最終Scrape時刻を記録しておく
  2. SIGTERMを受け取った時にScrapeWaitThreshold以内にScrapeされていなければ、Prometheus Exporterから最新のメトリクスを取得してキャッシュする
  3. 次回Scrape時にそのキャッシュからメトリクスを返して終了する

これでPod終了直前にインクリメントされたCounterなどの値がPrometheusにScrapeされず欠損する問題を解決できました。

あとはPrometheus Operatorを使っているなら PodMonitor を、PodのアノテーションによるService Discoveryを利用している場合は metadata.annotations をこのプロキシに向けるだけです。

LIFULLではPodのアノテーションによるService Discoveryを利用しているため、prometheus.io/wait: true と付与するとこのプロキシを注入するMutating Admission Webhookを開発して、このタイミングで prometheus.io/port もプロキシのポートに書き換えるようにしました。

これにより、利用者はPodアノテーションの付与をするだけで最小限のロールアウト遅延と引き換えに信頼性の高いメトリクスを得ることができます。

なお、KubernetesがSIGTERM送信後に諦めてSIGKILLを送るまでの spec.terminationGracePeriodSeconds のデフォルト値は30秒であるため、Scrape間隔が長いなどでこれを超えてプロキシが待機しなければならない場合はPodの spec.terminationGracePeriodSeconds を伸ばす必要もあります。

まとめ

このエントリではKubernetesクラスタの可観測性基盤でよく採用されるPull型アプローチのトレードオフと、そのトレードオフへの対処を紹介しました。

一部コミュニティでは問題視されているものの、素直に利用していると意外と見落としがちな問題だと思っていて、お手元のKubernetesクラスタの監視を見直すきっかけになれば幸いです。

メトリクスはともかくログは欠損してしまうとそれなりに問題があるはずで、特にCronJobとかは実行ログを出力してから比較的すぐPodが終了することが多く、 successfulJobsHistoryLimit, failedJobsHistoryLimit を0にしていると直前のログはほぼ欠損していると考えてよいでしょう。

最近はトレースのSpanにログを持たせてしまうことも増えましたが、依然ログは重要な可観測性のシグナルだと思うので欠損がないように運用していきたいところです。

Fluentdのバッファはちゃんと設計しましょうとか、Prometheusの冗長化とか、Grafana Lokiは max_chunk_age の値に応じて送信が遅延してしまった同一ストリーム内の古いログを捨ててしまうので、fluent-plugin-grafana-lokiにパッチ当てて該当エラーの場合にUnrecoverableErrorを返すことでFluentdのsecondaryに流して、別ストリームとして送信し直すことで取りこぼさないようにしましょうとか、Pull型アプローチ由来以外の欠損を防ぐ話はまた別の機会に書きたいと思います。

ご興味をお持ちいただけましたら、ぜひ以下のページもご覧ください。

hrmos.co

hrmos.co