LIFULL Creators Blog

LIFULL Creators Blogとは、株式会社LIFULLの社員が記事を共有するブログです。自分の役立つ経験や知識を広めることで世界をもっとFULLにしていきます。

Kubernetesクラスタの可観測性の隙間を埋めるeBPF

KEELチームの相原です。

今回はeBPFを利用してKubernetesクラスタの可観測性の隙間を埋めている話です。

前回のエントリではLLMにうつつを抜かしていたので本業(?)の話をしようと思います。

www.lifull.blog

LIFULLの可観測性の現在地

はじめに、LIFULLの可観測性の現在地について軽く書きます。

可観測性にはPrimary Signalsと呼ばれるLogs, Metrics, Tracesの3つの指標があり、我々が開発するKubernetesベースの内製PaaSであるKEELにはそれぞれに対応するプラットフォームが構築されています。

github.com

それぞれGrafana Loki, Thanos, Grafana Tempoを採用しており、Grafanaで横断的に閲覧可能です。

加えてContinuous ProfilingのためにPyroscopeも構築されており、Logs, Metrics, Traces, Profilesと4つの指標をプラットフォームとしてサポートしています。

(ここまで来ると全てGrafana製品で統一したいですが、Thanosはかれこれ5年以上運用していて十分に実績があるのでアーキテクチャにそれほど違いがないこともありGrafana Mimirへの移行は検討中です)

Logsはアプリケーションの標準出力・標準エラーに加えてService Meshのレイヤで取得した共通フォーマットのアクセスログを集めていて、TracesとProfilesはそれぞれ我々が管理する共通のアプリケーションフレームワークに事前に組み込まれているOpenTelemetryとPyroscope SDKによって自動で収集しています。

Metricsも同様にOpenTelemetryで取得していますが、その他にもアクセスログから集計したURIごとのレイテンシ・サクセスレートをfluentdで出力していたり、拙作のkube-trivy-exporterを使ってアプリケーションの脆弱性情報を収集していたりCore Web Vitalsを計測したりとPrometheus Exporterを適宜作りながらあらゆる情報を集めています。

クラスタレベルだとprometheus/node_exporterkubernetes/kube-state-metrics, kubernetes/node-problem-detectorの他に、Podごとの利用料を按分するPrometheus Exporterなどがあり、内製PaaSの利用者は様々な事象を観測できるようになっています。

しかし、これだけやっていてもまだ観測できないものがあります。LinuxカーネルレイヤのMetricsです。

そしてそれはeBPFを利用することで取得可能です。

eBPFとは

既にeBPFの説明はありふれていますが軽く説明しておきます。

eBPFとはカーネル空間で安全にプログラムを実行するためのサンドボックス技術です。

eBPF is a revolutionary technology that can run sandboxed programs in the Linux kernel without changing kernel source code or loading a kernel module.

ebpf.io

サンドボックス内で実行されるC言語のプログラムをBPFプログラムと呼ぶことが多いです。

BPFプログラムはイベント駆動でネットワーク上のイベントやカーネル上のイベントなどを起点として実行されます。

カーネル上のイベントは主に事前にLinuxカーネル上に定義されたフックポイントであるTracepointsの他に、カーネル空間の任意の関数の実行にフックを仕込むことのできるKprobesが利用できます。

Kprobesは任意の関数に仕込んでなんでもできる一方でカーネルのバージョンアップによって関数名が変わった際などに追従することが難しく、Tracepointsは事前に定義されているためカーネルのバージョンアップに左右されないものの定義されていない場所を起点に発火させることができないといった違いがあります。

eBPFはMapsというデータ構造を持っていてこれでユーザ空間と状態を共有できるため、KprobesやTracepointsをもとに発火したBPFプログラムでMetricsを収集し、Mapsを通してユーザ空間に出力することでLinuxカーネルレイヤのMetricsを観測できるようになります。

可観測性の隙間

では実際に観測したいLinuxカーネルレイヤのMetricsとはなんでしょうか。

例えばどんなMetricsが取れるかを知りたい場合はiovisor/bcc#toolsがお勧めです。

bccとは詳しくは後述しますが、BPF Compiler Collectionの略でeBPFを簡単に実行するための仕組みです。

同時に様々なeBPFを利用したツールも提供されていて、bcc: General Perfomance Checklistを見たことある方はいらっしゃるのではないでしょうか。

この中のうちあなたが管理するシステムの潜在的な問題にまつわるものが観測したいMetricsとなるわけですが、ここではLIFULLでの分かりやすい例を一つ紹介したいと思います。

私達が観測したいLinuxカーネルレイヤのMetricsの一つは、Kubernetesクラスタ内からのある接続先に対するプロセスごとの接続回数 でした。

なぜそんなMetricsを取得したいかを説明するためにはまずNAT Loopback(hairpinning)ついて説明する必要があります。

NAT Loopback

NAT Loopbackとはhairpinningとしても知られる機能で、NAT環境下においてLAN内のクライアントが自身に対してWANからアクセスする際にその通信をループバックさせるというものです。

これは利用しているルータやロードバランサによっては対応していないことがあり、実際にAWSのNetwork Load Balancerは Preserve client IP addresses を有効にしているとNAT Loopbackは機能せず接続がタイムアウトしてしまうということが知られています。

docs.aws.amazon.com

Kubernetesクラスタにおいてはクラスタ前段にIngress Controllerに紐づいた Type: LoadBalancer なNetwork Load Balancerを立ててクラスタ外のリクエストを受けるというものはよくあるパターンです。

この時、クラスタ内のPodからそのNetwork Load Balancerに接続してしまうとタイムアウトしてしまう可能性があるということになります。

KubernetesクラスタとしてもLAN内であればKubernetesのサービスディスカバリを使って接続した方がレイテンシが低いため、プラットフォーマーとしては Kubernetesクラスタ内からIngress Controllerに紐づいたNetwork Load Balancerに対して接続しているクライアント を検知する必要があります。

全てのPodにService Meshが入っていれば検知可能でしょうし、パケットキャプチャでもクライアントの存在自体は検知可能です。

しかし歴史的理由から私達のIstioは一部導入できていないアプリケーションがあったり、Kubernetesクラスタには複数のアプリケーションが載っているためクライアントの存在を検知できただけでは不十分でクライアントの特定まで行う必要があります。

そこでeBPFでTCPの接続処理にあたる tcp_v4_connect/tcp_v6_connect をフックして検知をしようということになりました。

(eBPFを利用すればパケットの向き先を勝手に変えてしまうこともできますが今回は検知のお話をします)

eBPFを実行するには

さて、それではeBPFを動かすにはどうしたらいいでしょうか。

eBPFはサンドボックス化されたVM上でBPFプログラムを実行することで安全性を担保しているため、そのVMが解釈できるバイトコードにBPFプログラムをコンパイルして実行する必要があります。

そこでよく使われていたものが先ほど紹介したBPF Compiler Collection、bccです。

bccはeBPFを簡単に実行するための仕組みで、bccをライブラリとして利用したソフトウェアを実行すると、ClangをフロントエンドとしたLLVMでBPFプログラムをコンパイルし成果物のバイトコードをVMにロードしてeBPFが実行されます。

これにより利用者はBPFプログラムだけを書けば簡単にeBPFを動かすことができるといったわけです。

しかし、ご存じの通りClangは重いバイナリですし実行時にコンパイルするというアプローチは実行時のオーバーヘッドを伴います。

監視対象のサーバの台数分だけClangをインストールしてコンパイルしてとなると支払うコストが大きくなるためプロダクション環境に手放しに導入できるものではありません。

"よく使われていた"とbccを過去形で紹介しましたが、現在はその問題を解決するためにBPF CO-REという仕組みがあります。

BPF CO-RE

BPF CO-REの説明もわざわざここでしなくても感がありますが一応簡単にしておきます。

BPF CO-REはBPF Compile Once - Run Everywhereの略で、その名の通りコンパイルを一度だけすれば成果物のバイナリをどこででも動かすことができるというものです。

詳細な仕組みについては省きますが、libbpf/libbpfというBPF CO-REをサポートしたライブラリを使うことで利用できます。

libbpfはC言語向けのライブラリですが、libbpf/libbpf-rsというRustバインディングも公式に提供されているためRustでも開発可能です。

我々KEELチームはproxy-wasmでEnvoyの拡張を書く際にもRustを利用しているため、ここからはlibbpf-rsを使って Kubernetesクラスタ内からIngress Controllerに紐づいたNetwork Load Balancerに対して接続しているクライアント を検知する方法を説明してきます。

今回はユーザ空間でも多少処理が必要となるためRustで書いた方が無難でしょう。

2021年当時はいくつかlibbpf-rsに不足している機能がありましたが今はlibbpfと遜色なく利用できるようになりました。

libbpf-rsを利用したNAT Loopbackの検知

まずは大まかな設計を決めましょう。

改めて、今回実現したいことは Kubernetesクラスタ内からIngress Controllerに紐づいたNetwork Load Balancerに対して接続しているクライアント の検知です。

この仕組みは他にも"退役予定のデータストアにクエリしているクライアントの洗い出し" などにも使えるため、今のNetwork Load BalancerはIPアドレスが変わらなくなりましたがDNSベースで汎用的に作ってみます。

大まかな処理の流れは以下といったところでしょうか。

  1. (ユーザ空間) コマンドライン引数として受け取ったDNSをTTLごとに名前解決してIPアドレスを取得する
  2. (ユーザ空間) IPアドレスに変化がある度にカーネル空間で動くBPFプログラムにそのIPアドレスのリストを渡す
  3. (カーネル空間) Kprobesで tcp_v4_connect/tcp_v6_connect にフックを仕込む
  4. (カーネル空間) 受け取ったIPアドレスに対する tcp_v4_connect/tcp_v6_connect があればユーザ空間に対してその実行元のプロセスIDとコマンド名を返す
  5. (ユーザ空間) カーネル空間から受け取ったプロセスIDからKubernetes上のコンテナIDを取得する
  6. (ユーザ空間) 得られたコンテナIDとコマンド名とともに接続先をPrometheusのMetricsとして公開する

今回はクライアントの特定まで行う必要があるため、プロセスIDからKubernetes上のコンテナIDを取得して公開することで、コンテナIDからPodを特定できるようにしています。

最終的にこのソフトウェアをKubernetes上にDaemonSetとしてデプロイするイメージです。

順番に見ていきます。

1. (ユーザ空間) コマンドライン引数として受け取ったDNSをTTLごとに名前解決してIPアドレスを取得する

ここは本筋ではないのでさらっと流します。

こちらが今回開発するソフトウェアのエントリポイントとなる main.rs です。

ご覧の通り、当然普通のRustアプリケーションとして開発できます。

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    let args: Args = Args::parse();

    <snip>

    let mut handles = vec![];

    let ns = args.nameserver.parse::<std::net::SocketAddr>()?;
    let conn = trust_dns_client::udp::UdpClientStream::<tokio::net::UdpSocket>::with_timeout(
        ns,
        std::time::Duration::from_secs(5),
    );
    let (client, bg) = trust_dns_client::client::AsyncClient::connect(conn).await?;
    handles.push(tokio::spawn(bg));

    let (tx, mut rx): (
        tokio::sync::mpsc::UnboundedSender<IPMap>,
        tokio::sync::mpsc::UnboundedReceiver<IPMap>,
    ) = tokio::sync::mpsc::unbounded_channel();
    let hm = std::sync::Arc::new(futures::lock::Mutex::new(IPMap::new()));

    for host in args.hosts {
        for record_type in [
            trust_dns_client::rr::RecordType::A,
            trust_dns_client::rr::RecordType::AAAA,
        ] {
            let mut cloned_client = client.clone();
            let cloned_hm = std::sync::Arc::clone(&hm);
            let cloned_tx = tx.clone();
            let host = host.clone();
            handles.push(tokio::spawn(async move {
                let name = trust_dns_client::rr::Name::from_str(&host).unwrap();
                let mut cache = IPCache::new();
                loop {
                    let response: trust_dns_client::op::DnsResponse = cloned_client
                        .query(
                            name.clone(),
                            trust_dns_client::rr::DNSClass::IN,
                            record_type,
                        )
                        .await
                        .unwrap();
                    let answers: &[trust_dns_client::rr::Record] = response.answers();
                    let mut max_ttl = 0;

                    match record_type {
                        trust_dns_client::proto::rr::RecordType::A => {
                            let mut new = vec![];
                            for record in answers {
                                if record.ttl() > max_ttl {
                                    max_ttl = record.ttl();
                                }
                                if let Some(trust_dns_client::proto::rr::RData::A(ref ip)) =
                                    record.data()
                                {
                                    new.push(u32::swap_bytes((*ip).into()))
                                }
                            }
                            new.sort();

                            let default = vec![];
                            let old = cache.ipv4.get(&host).unwrap_or(&default);
                            if old != &new {
                                let mut hm = cloned_hm.lock().await;
                                if let trust_dns_client::proto::rr::RecordType::A = record_type {
                                    for ip in old {
                                        hm.ipv4.remove(ip);
                                    }
                                    for ip in new.iter() {
                                        hm.ipv4.insert(*ip, host.clone());
                                    }
                                }
                                cloned_tx.send(hm.clone()).unwrap();

                                cache.ipv4.insert(host.clone(), new);
                            }
                        }
                        <snip>
                        _ => {
                            continue;
                        }
                    }

                    if max_ttl > 60 {
                        tokio::time::sleep(std::time::Duration::from_secs(max_ttl as u64)).await;
                    } else {
                        tokio::time::sleep(std::time::Duration::from_secs(60)).await;
                    }
                }
            }));
        }
    }

    <snip>

    Ok(())
}

処理内容は単純で、行儀よくTTLごとに名前解決をしながらIPアドレスに変更があればそれをチャネルで送信しています。

実際には AAAA レコードの実装もしてIPv6に対応する必要がある点にご注意ください。

エラーハンドリングも省略しているので必要に応じて修正する必要があります。

2. (ユーザ空間) IPアドレスに変化がある度にカーネル空間で動くBPFプログラムにそのIPアドレスのリストを渡す

次は本題となるカーネル空間との接合部分です。

libbpf-rs 周辺のエコシステムには libbpf-cargo というBPFプログラムからRustのスケルトンをビルド時に生成してくれるツールがあります。

以下のような build.rs を書いておくと、

fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    libbpf_cargo::SkeletonBuilder::new()
        .source("src/bpf/connect.bpf.c")
        .build_and_generate(std::path::Path::new("src/bpf/skel.rs"))?;
    Ok(())
}

src/bpf/skel.rs が生成されて src ディレクトリ内でこのように利用できるというものです。

mod skel {
    include!("bpf/skel.rs");
}

そうすると skel モジュール以下に *Builder が生えてくるのでこれを使ってカーネル空間で動くBPFプログラムにそのIPアドレスのリストを渡していきましょう。

use skel::*;

unsafe impl plain::Plain for connect_bss_types::event {}

pub fn watch(
    map: crate::IPMap,
    stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    let builder = ConnectSkelBuilder::default();
    let mut open = builder.open()?;

    let v4_keys = map.ipv4.keys();
    let mut v4_keys_array: [u32; 16] = [0; 16];
    let v4_keys_len = v4_keys.len();
    for (i, key) in v4_keys.enumerate() {
        v4_keys_array[i] = *key;
    }
    open.rodata().tool_config.daddr_v4 = v4_keys_array;
    open.rodata().tool_config.daddr_v4_len = v4_keys_len as u32;

    let mut load = open.load()?;
    load.attach()?;

    <snip>
}

この watch 関数はチャネルから送られてきた crate::IPMap を受け取ってBPFプログラムとやり取りをするというものです。

BPFプログラムに値を渡すためには、先に説明したMapsの他に .rodata セクションを利用できます。

open.rodata().rodata セクションに書き込まれた値はC言語のBPFプログラムで const として参照できるというものです。(感覚的には逆に思いますがそういうものみたいです)

本来IPアドレスのリストは動的に変化するためReadOnlyな .rodata セクションではなくMapsが望ましいですが今回は単純化して .rodata セクションを利用しています。

(Arc<AtomicBool>stop という変数でIPMapに変更があった際に古いIPMapを持った watch を止めるみたいなことをイメージしています)

そして load.attach() でBPFプログラムをカーネルにロードしたらようやくBPFプログラムです。

3. (カーネル空間) Kprobesで tcp_v4_connect/tcp_v6_connect にフックを仕込む

今回フックしたい tcp_v4_connect/tcp_v6_connect には事前定義されたTracepointsがないためKprobesを使います。

メインの処理はこのようになります。

SEC("kprobe/tcp_v4_connect") int BPF_KPROBE(tcp_v4_connect, struct sock *sk, struct sockaddr *uaddr, int addr_len) {
    u64 __pid_tgid = bpf_get_current_pid_tgid();
    gid_t tgid = __pid_tgid >> 32;
    pid_t pid = __pid_tgid;

    bpf_map_update_elem(&sockets, &pid, &sk, 0);
    return 0;
}

SEC("kretprobe/tcp_v4_connect") int BPF_KRETPROBE(tcp_v4_connect_ret, int ret) {
    u64 __pid_tgid = bpf_get_current_pid_tgid();
    gid_t tgid = __pid_tgid >> 32;
    pid_t pid = __pid_tgid;

    struct sock **skpp = bpf_map_lookup_elem(&sockets, &pid);
    if (!skpp) {
        return 0;
    }

    if (ret) {
        goto end;
    }

   <snip>
}

Kprobesには kprobekretprobe という2つのエントリがありそれぞれ関数の開始と終了に紐づいています。

この実装では tcp_v4_connect 関数の開始と終了をフックしているというわけです。

関数が呼び出された時点では実際に接続が行われたかどうかは判断できないため、終了時にMetricsを送信したいところですが kretprobe では終了ステータスしか取ることができません。

そのため、kprobebpf_map_update_elem で引数をMapsで保持しつつ kretprobebpf_map_lookup_elem でそれを取り出して処理をします。

実際には tcp_v6_connect の実装もしてIPv6に対応する必要がある点にご注意ください。

kprobe で保存する引数は実際にLinuxカーネルの関数のシグネチャと一致している必要があり、それを調べるためにはLinuxクロスリファレンスがお勧めです。

いくつか候補がありますが私は https://elixir.bootlin.com/ を使っていて、関数名で検索するとこのように定義元にジャンプできます。(この時カーネルのバージョンによる差異に注意する必要があります)

https://elixir.bootlin.com/linux/v6.6.1/source/net/ipv4/tcp_ipv4.c#L201

4. (カーネル空間) 受け取ったIPアドレスに対する tcp_v4_connect/tcp_v6_connect があればユーザ空間に対してその実行元のプロセスIDとコマンド名を返す

以下は kretprobe の完全版です。

bpf/bpf_helpers.hbpf/bpf_core_read.h にヘルパ関数が色々入っているのでそれらを使いながら必要な情報を取り出しています。 詳細な説明は省きますが関数名からなんとなく雰囲気はつかめるはずです。

#include "../../vmlinux.h"
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_core_read.h>
#include <bpf/bpf_tracing.h>

SEC("kretprobe/tcp_v4_connect") int BPF_KRETPROBE(tcp_v4_connect_ret, int ret) {
    u64 __pid_tgid = bpf_get_current_pid_tgid();
    gid_t tgid = __pid_tgid >> 32;
    pid_t pid = __pid_tgid;

    struct sock **skpp = bpf_map_lookup_elem(&sockets, &pid);
    if (!skpp) {
        return 0;
    }

    if (ret) {
        goto end;
    }

    struct sock *sk = *skpp;

    u32 daddr_v4 = BPF_CORE_READ(sk, __sk_common.skc_daddr);
    if (!filter_daddr_v4(daddr_v4)) {
        goto end;
    }

    uid_t uid = bpf_get_current_uid_gid();
    struct event event = {
        .tgid = tgid,
        .pid = pid,
        .uid = uid,
        .protocol = ipv4,
    };

    BPF_CORE_READ_INTO(&event.daddr_v4, sk, __sk_common.skc_daddr);

    bpf_get_current_comm(event.comm, sizeof(event.comm));
    bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &event, sizeof(event));
end:
    bpf_map_delete_elem(&sockets, &pid);
    return 0;
}

sock のメンバも同様にLinuxクロスリファレンスで検索して確認することができ、__sk_common.skc_daddr で向き先のIPアドレスを取得できます。

https://elixir.bootlin.com/linux/v6.6.1/source/include/net/sock.h#L357

filter_daddr_v4 関数は先ほど .rodata セクション経由で渡した tool_config を使いながら対象のIPアドレスへの接続をフィルタリングする関数です。

const volatile struct {
    u32 daddr_v4[ADDR_LEN];
    u32 daddr_v4_len;
    u8 daddr_v6[ADDR_LEN][16];
    u32 daddr_v6_len;
} tool_config;

static __always_inline bool filter_daddr_v4(u32 daddr) {
    if (tool_config.daddr_v4_len == 0) {
        return true;
    }

    for (int i = 0; i < tool_config.daddr_v4_len; i++) {
        if (daddr == tool_config.daddr_v4[i]) {
            return true;
        }
    }
    return false;
}

bpf_perf_event_outputBPF_MAP_TYPE_PERF_EVENT_ARRAY というリングバッファのMapsを使ってユーザ空間に値を送信するための関数で、Mapsは以下のように events として定義されています。

SEC(".maps") struct {
    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
    __uint(key_size, sizeof(u32));
    __uint(value_size, sizeof(u32));
} events;

これを使うことでユーザ空間に対して実行元のプロセスIDやコマンド名を event という構造体に詰めて返すことができます。

eBPFで利用できるリングバッファには BPF_MAP_TYPE_RINGBUF もありますがLinuxカーネルのバージョンが5.8以上でないと利用できず、例えばUbuntu 20.04とかでは利用できないためご注意ください。(今回紹介している事例は2021年のものであるため BPF_MAP_TYPE_PERF_EVENT_ARRAY を利用していました)

sockets というMapsは kprobekretprobe の間で引数を持ち回すためだけのものなので用が済んだら中身を削除しています。

5. (ユーザ空間) カーネル空間から受け取ったプロセスIDからKubernetes上のコンテナIDを取得する

プロセスIDからコンテナIDを取得する方法は少なくとも2021年時点ではあまり情報がなかった記憶があるので説明しておきます。 コンテナランタイムはcri-o想定です。

コードを見ていただくと早いでしょう。

use std::io::BufRead;

pub struct Metadata {
    container_id: String,
}

pub fn from_pid(pid: i32) -> Option<Metadata> {
    let var = std::env::var("PROCFS_PATH");
    let path = if let Ok(ref path) = var {
        std::path::Path::new(path)
    } else {
        std::path::Path::new("/proc")
    };
    let cgroup = path.join(pid.to_string()).join("cgroup");

    if let Ok(file) = std::fs::File::open(cgroup) {
        let mut reader = std::io::BufReader::new(file);
        let mut buf = String::new();
        let _ = reader.read_line(&mut buf);
        return buf
            .trim_end()
            .split(':')
            .last()
            .and_then(extract_container_id)
            .map(|container_id| Metadata { container_id });
    }
    None
}

enum CgroupDriver {
    Cgroupfs,
    Systemd,
}

fn detect_cgroup_driver<T: AsRef<str>>(cgroup_path: T) -> CgroupDriver {
    if cgroup_path.as_ref().starts_with("/kubepods.slice") {
        // https://github.com/kubernetes/kubernetes/blob/v1.26.1/pkg/kubelet/cm/cgroup_manager_linux.go#L82
        CgroupDriver::Systemd
    } else {
        // https://github.com/kubernetes/kubernetes/blob/v1.26.1/pkg/kubelet/cm/cgroup_manager_linux.go#L111
        CgroupDriver::Cgroupfs
    }
}

fn extract_container_id<T: AsRef<str>>(cgroup_path: T) -> Option<String> {
    // https://github.com/kubernetes/kubernetes/blob/v1.26.1/pkg/kubelet/cm/node_container_manager_linux.go#L40
    if !cgroup_path.as_ref().starts_with("/kubepods") {
        return None;
    }

    match detect_cgroup_driver(&cgroup_path) {
        // https://github.com/cri-o/cri-o/blob/v1.26.1/internal/config/cgmgr/cgroupfs.go#L65
        CgroupDriver::Cgroupfs => cgroup_path
            .as_ref()
            .split('/')
            .last()
            .map(|s| s.to_string()),
        // https://github.com/cri-o/cri-o/blob/v1.26.1/internal/config/cgmgr/systemd.go#L80
        CgroupDriver::Systemd => cgroup_path
            .as_ref()
            .split('/')
            .last()
            .and_then(|unit| unit.trim_end_matches(".scope").split('-').last())
            .map(|s| s.to_string()),
    }
}

基本的にはprocfsからcgroupの情報にアクセスして、cgroupドライバに応じて判断するという流れになっています。

cgroupのパスの中にコンテナIDが含まれているのでそれを取り出すだけです。

このソフトウェアはDaemonSetとしてKubernetesクラスタにデプロイすることを想定しており、その際にPodにはホストのprocfsをマウントする必要があるため環境変数 PROCFS_PATH からprocfsのマウントポイントを受け取れるようにしています。

6. (ユーザ空間) 得られたコンテナIDとコマンド名とともに接続先をPrometheusのMetricsとして公開する

BPF_MAP_TYPE_PERF_EVENT_ARRAY から送信されてきた値は、ユーザ空間では libbpf_rs::PerfBufferBuilder のコールバックとして取得できます。

use skel::*;

unsafe impl plain::Plain for connect_bss_types::event {}

pub fn watch(
    map: crate::IPMap,
    stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    <snip>

    let meter = opentelemetry::global::meter("connectracer");
    let counter = meter.u64_counter("connect_total").init();

    let buffer = libbpf_rs::PerfBufferBuilder::new(load.maps_mut().events())
        .sample_cb(move |_cpu: i32, data: &[u8]| {
            let mut event = connect_bss_types::event::default();
            plain::copy_from_bytes(&mut event, data).expect("Data buffer was too short");

            if let Some(host) = match event.protocol {
                connect_bss_types::protocol::ipv4 => map.ipv4.get(&event.daddr_v4),
                connect_bss_types::protocol::ipv6 => {
                    map.ipv6.get(&u128::from_be_bytes(event.daddr_v6))
                }
            } {
                let command = if let Ok(s) = std::str::from_utf8(&event.comm) {
                    s.trim_end_matches(char::from(0))
                } else {
                    ""
                };

                let mut attributes = vec![
                    opentelemetry::KeyValue::new("host", host.clone()),
                    opentelemetry::KeyValue::new("command", command.to_string()),
                ];

                if let Some(metadata) = crate::metadata::kubernetes::from_pid(event.pid) {
                    let mut m = metadata.into();
                    attributes.append(&mut m);
                }

                counter.add(&opentelemetry::Context::current(), 1, &attributes);
            }
        })
        .build()?;

    <snip>
}

(先ほどのKubernetesの Metadata は以下のようなFromトレイトを実装しているため、そのまま metadata.into() できます)

impl From<Metadata> for Vec<opentelemetry::KeyValue> {
    fn from(metadata: Metadata) -> Self {
        vec![opentelemetry::KeyValue::new(
            "container_id",
            metadata.container_id,
        )]
    }
}

PrometheusのMetricsとして公開するためにはOpenTelemetryを利用するとして、あとは取得したコンテナIDとともにインクリメントするだけです。

コンテナIDさえ取得できてしまえばkubernetes/kube-state-metricsが出力する kube_pod_container_info と組み合わせて以下のようなクエリでPodと紐づけることができるため、ここではそれ以上のことはしません。

tcp_v4_connect_total * on(container_id) group_left(namespace, pod) label_replace(kube_pod_container_info{container_id!=""}, "container_id", "$2", "container_id", "(.+)://(.+)")

最後に

少し長くなってしまいましたが、あとはOpenTelemetryのregistryのMetricsを公開するサーバを書けば、晴れてeBPFによる Kubernetesクラスタ内からIngress Controllerに紐づいたNetwork Load Balancerに対して接続しているクライアント の検知が完成です。

このように、eBPFを利用することでKubernetesクラスタの可観測性の隙間を埋めることができました。

コンテナIDの取得など、実際にKubernetesクラスタで利用するイメージもついたのではないでしょうか。

(一部のbccベースのトレーシングツールと異なり)ユーザ空間のリソース消費は非常に軽微で、このソフトウェアの場合はメモリ使用量が6MB未満程度でCPUも処理内容をご覧の通りほとんど使わないためご安心ください。

BPF CO-REで可搬性のあるバイナリにすることでbccの時にあったClangへの依存や実行時コンパイルを取り払うことができ、プロダクション環境でも比較的気軽にeBPFを導入できます。

eBPFは kretprobe で返り値を上書きできたりと副作用があったり、パフォーマンスのオーバーヘッドも0ではないため導入には慎重になるべきですが、実際に数年のプロダクション環境での運用の中で今のところ問題は発生していません。

(LIFULLでは kretprobe で返り値が上書きできることを利用して簡単なCircuit Breakerの仕組みを準備していたりもします)

あわせて、Network Load BalancerのNAT Loopback問題についてもくれぐれもご注意ください。 性質上クラスタが巨大になるほど発生率が低くなるため、しっかり監視していないと謎のTail Latencyに悩まされることになります。

ブログを書くのをサボってしまいeBPFの旬はとっくに過ぎてしまった感がありますが、時に(当時の)最新技術を使いながらPlatform Engineeringすることに興味がある方がいれば是非こちらからお問い合わせください!

hrmos.co