LIFULL Creators Blog

LIFULL Creators Blogとは、株式会社LIFULLの社員が記事を共有するブログです。自分の役立つ経験や知識を広めることで世界をもっとFULLにしていきます。

小さい経路最適化ミドルウェアを実装してあらゆるAZ間通信を削減する

KEELチームの相原です。

前回のエントリは「LLMを利用したPlatform Engineering」でした。

www.lifull.blog

今回は、小さい経路最適化ミドルウェアを実装してAZ間通信を削減した話を書きたいと思います。

背景

我々KEELチームはKubernetsベースの内製PaaSであるKEELを開発しており、LIFULLのほとんどのサービスがこのKEEL上で動いています。

www.lifull.blog

そして、KEELは巨大なマルチテナントのKubernetesクラスタとしてAWSの複数のAvailability Zone(以下AZ)に展開されていて、多くのmicroservicesが互いに通信しあっています。

そのためAZ間通信はプラットフォームとして重要な関心事の一つです。 レイテンシやAWSのAZ間通信に対する課金を最小限に抑えるため、なるべくAZ間通信を減らしたいという要求があります。

我々のプラットフォームでは現在LLMのサポートを強めており、それに伴い扱うペイロードのサイズも増加傾向にあることからも対応の優先度が上がってきています。

www.lifull.blog

実際にKubernetesのTopology Aware Routingをプラットフォーム側で配布してデフォルトで有効にしたり、IstioのLocality Load Balancing をクラスタ全体で有効にして、クラスタ内の通信の経路最適化に努めてきました。

※Istioが有効になっているPodではTopology Aware Routingは機能しないため、全てのPodにIstioを導入できていない場合はクラスタ全体として両方を有効にする必要があります

一方で、KEELはステートレスなクラスタとして永続性が必要とされるデータストアは基本的にクラスタ外に依存しており、未だ最適化できていない経路も存在しています。

そこでその残されたAZ間通信も削減すべく、小さい経路最適化ミドルウェアを実装する判断に至りました。

実装

さて、KubernetesやIstioが実現するクラスタ内通信の経路最適化はKubernetesのService Discoveryに依存しているため、クラスタ外への通信で真似することはできません。

つまりクライアントはサーバがどのトポロジに配置されているかを知る共通の手段を持っていないということです。

サーバ側がDNSのSRVレコードなどに対応していればそれをそのまま利用できますが、そうでもない場合全てのサーバに新規にService Discoveryを入れるということはあまりに大変です。

そこで我々はTCPサーバとして振る舞うConnection Poolのミドルウェアとして経路最適化を実現することに決めました。

LIFULLにはRubyのUnicornなど1リクエスト1プロセスなプロセスモデルのアプリケーションサーバがいくつかあり、元々プロセスをまたいだConnection Poolを実現したいという要求があります。

しかし、プラットフォームとしてProxySQLを提供するなど特定のユースケースでは対応できていたものの、あらゆるAZ間通信 には対応できていませんでした。

Connection Poolでの経路最適化のアイデアはシンプルで、維持している接続をトポロジごとに管理して、なるべくクライアントと同じトポロジの接続を取り出して使うだけです。 つまりConnection Poolが維持している接続を疑似的にService Discoveryとして利用するということです。

これが実現できれば色々と都合が良さそうなのでやっていきましょう。

シンプルなConnection Pool

まずは普通のTCPプロキシにシンプルなConnection Poolを実装していきます。接続を取り回すだけのソフトウェアになるのでGoが無難でしょう。

MinIdleConnections, MaxIdleConnections でアイドルな接続をコントロールできるようにして MaxIdleTime, MaxLifetime で生存期間を設定できるよくあるやつです。

設定は大体こういうインタフェースでしょうか。 ConnectionMaxIdleTime の判定で利用する returnedAt を持てるようにしただけの net.Conn の薄いラッパーです。

type ConnectionPoolStrategy int

const (
    FIFO ConnectionPoolStrategy = iota
    LIFO
)

type ConnectionPoolOption struct {
    MaxConnections         uint
    MaxIdleConnections     uint
    MinIdleConnections     uint
    MaxIdleTime            time.Duration
    MaxLifetime            time.Duration
    Dialer                 func(context.Context) (net.Conn, error)
    ConnectionPoolStrategy ConnectionPoolStrategy
}

type ConnectionPool struct {
    option *ConnectionPoolOption

    connectionMutex      sync.Mutex
    connections          []*Connection
    idleConnections      []*Connection
}

コンストラクタは省略するとして、接続を取り出す部分はこんな感じになります。

func (p *ConnectionPool) Get(ctx context.Context) (*Connection, error) {
    p.connectionMutex.Lock()
    defer p.connectionMutex.Unlock()

    defer func() {
        go p.prepareIdleConnection(ctx)
    }()

    for {
        connection, err := p.getIdleConnection(ctx)
        if err != nil {
            return nil, xerrors.Errorf("failed to get idle connection: %w", err)
        }

        if connection == nil {
            break
        }

        now := nowFunc()
        if (p.option.MaxIdleTime > 0 && now.Sub(connection.returnedAt) > p.option.MaxIdleTime) ||
            (p.option.MaxLifetime > 0 && now.Sub(connection.createdAt) > p.option.MaxLifetime) ||
            !connection.isHealthy() {
            _ = connection.Close()
            p.removeConnection(ctx, connection)
            continue
        }

        return connection, nil
    }

    connection, err := p.newConnection(ctx)
    if err != nil {
        return nil, xerrors.Errorf("failed to create connection: %w", err)
    }

    return connection, nil
}

ロックを取りながらアイドルな接続を取り出し、生存期間を満たしていれば死活監視をしてから接続を返します。 アイドルな接続がなければ新規に接続を作成して返します。

prepareIdleConnectionMinIdleConnections を満たすようにアイドルな接続を用意していて、isHealthy はソケットに read(2) してEAGAIN or EWOULDBLOCKであることの確認です。

func (c *Connection) isHealthy() bool {
    // A zero time value disables the deadline.
    _ = c.conn.SetReadDeadline(time.Time{})

    syscallConnection, ok := c.conn.(syscall.Conn)
    if !ok {
        return false
    }

    rawConnection, err := syscallConnection.SyscallConn()
    if err != nil {
        return false
    }

    healthy := false

    if err := rawConnection.Read(func(fd uintptr) bool {
        b := make([]byte, 1)
        _, err := syscall.Read(int(fd), b)
        if errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EWOULDBLOCK) {
            healthy = true
        }

        return true
    }); err != nil {
        return false
    }

    return healthy
}

そして、使い終わった接続を維持するためのPutを実装します。 MaxIdleTime で判断するための returnedAt の更新も忘れないようにしましょう。

func (p *ConnectionPool) Put(ctx context.Context, connection *Connection) error {
    p.connectionMutex.Lock()
    defer p.connectionMutex.Unlock()

    if p.IdleConnections() < int(p.option.MaxIdleConnections) {
        connection.returnedAt = nowFunc()
        p.idleConnections = append(p.idleConnections, connection)
    } else {
        _ = connection.Close()
        p.removeConnection(ctx, connection)
    }

    return nil
}

ちなみに、ここまでの内容のTCPプロキシでの利用イメージはこんな感じになります。

func main() {
    ...

    semaphore := make(chan struct{}, maxConnections)
    shutdown := make(chan struct{}, 1)
    wg := sync.WaitGroup{}

    go func() {
        ctx := context.Background()
        for {
            local, err := listener.AcceptTCP()
            if err != nil {
                select {
                case <-shutdown:
                    return
                default:
                    continue
                }
            }
            semaphore <- struct{}{}
            wg.Add(1)
            go func() {
                defer func() {
                    <-semaphore
                    wg.Done()
                }()

                defer local.Close()

                remote, err := connectionPool.Get(ctx)
                if err != nil {
                    return
                }
                defer connectionPool.Put(ctx, remote)
                defer remote.Cancel()

                c := make(chan struct{}, 2)

                f := func(c chan struct{}, dst io.Writer, src io.Reader) {
                    _, _ = io.Copy(dst, src)
                    c <- struct{}{}
                }
                go f(c, remote, local)
                go f(c, local, remote)

                select {
                case <-c:
                case <-shutdown:
                    local.CloseWrite()
                }
            }()
        }
    }()

    // Graceful shutdown
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGTERM)
    <-quit
    time.Sleep(time.Duration(lameduck) * time.Second)

    close(shutdown)
    listener.Close()

    wg.Wait()
}

余談: Connection Storm

この実装では、維持している接続が MaxLifetime に達した時に強制的にその接続を破棄します。

古い接続が使われ続けることを防ぐためのものですが、これには比較的大きなトレードオフがあります。

それはConnection Stormと呼ばれる現象で、維持されていた接続が一斉に MaxLifetime によって切断されることで、新規の接続が大量に確立して接続先に大きな負荷がかかるというものです。

それを防ぐためにはリトライなどでよく知られるJitterを入れるとよいです。

Jitterはランダム性を注入する概念で、生存期間の判定箇所にJitterを入れて一斉に切断されることを防ぎます。

リトライにおけるJitterは、接続先の障害発生時などに複数のリトライが同時に行われるとカスケード障害を起こしてしまうため、リトライにランダム性を持たせるために利用されています。

type ConnectionPoolOption struct {
        MinIdleConnections     uint
        MaxIdleTime            time.Duration
        MaxLifetime            time.Duration
+       Jitter                 func(time.Duration) time.Duration
        Dialer                 func(context.Context) (net.Conn, error)
        ConnectionPoolStrategy ConnectionPoolStrategy
 }
                now := nowFunc()
-               if (p.option.MaxIdleTime > 0 && now.Sub(connection.returnedAt) > p.option.MaxIdleTime) ||
-                       (p.option.MaxLifetime > 0 && now.Sub(connection.createdAt) > p.option.MaxLifetime) ||
+               if (p.option.MaxIdleTime > 0 && now.Sub(connection.returnedAt) > p.option.Jitter(p.option.MaxIdleTime)) ||
+                       (p.option.MaxLifetime > 0 && now.Sub(connection.createdAt) > p.option.Jitter(p.option.MaxLifetime)) ||
                        !connection.isHealthy() {
                        _ = connection.Close()
                        p.removeConnection(ctx, connection)

実際のJitterの実装はこんな感じになると思います。 jitterPercentage でどの程度ランダム性に開きを持たせるかを調整できるようにするイメージです。

テストの時には引数の duration をそのまま返せばいいでしょう。

func(duration time.Duration) time.Duration {
    jitter := time.Duration(float64(duration) * jitterPercentage * (rand.Float64()*2 - 1))
    return duration + jitter
}

閑話休題。

Topology Aware Routing

ここまででシンプルなConnection Poolの実装が完成したので、ここからようやく本題の経路最適化の実装に入ろうと思います。

アイデアは先に書いた通りシンプルで、維持している接続をトポロジごとに管理して、なるべくクライアントと同じトポロジの接続を取り出して使うだけのものを目指します。

トポロジはIPをベースに判断するとして、今回はサイドカーとしてクライアントのプログラムと同じサーバで動かすことを想定しているため、クライアントのトポロジは自身のIPから判断することができそうです。

トポロジの一覧を --topologies=a=192.168.0.0/24,b=192.168.1.0/24,c=192.168.2.0/24 のように受け取り、プログラム内では以下のように取り扱うことにします。

type Topology struct {
    Name string
    CIDR net.IPNet
}

var topologyList []Topology
var topologyName string
if topologyAwareRouting {
    for _, t := range strings.Split(topologies, ",") {
        if t == "" {
            continue
        }

        parts := strings.Split(t, "=")
        if len(parts) != 2 {
            log.Fatalf("invalid topologies: %s", topologies)
        }
        _, cidr, err := net.ParseCIDR(parts[1])
        if err != nil {
            log.Fatalf("failed to parse CIDR %s: %+v", parts[1], err)
        }
        topology := Topology{
            Name: parts[0],
            CIDR: *cidr,
        }
        topologyList = append(topologyList, topology)

        if topology.CIDR.Contains(net.ParseIP(ownIP)) {
            topologyName = topology.Name
        }
    }
}

CIDRを持ったトポロジの一覧が topologyList として管理されていて、自身のIPから判断したクライアントのトポロジを事前に topologyName として定義しておきます。 (topologyName を事前に定義するのは計算コストを嫌っているだけで、当然クライアントの接続から都度判断することも可能なのでサイドカーではなく独立してデプロイすることも可能です)

Kubernetesでは自身のIPをDownward APIから取得可能で、AWSでもインスタンスメタデータから取得できるのでこれを使います。

それでは先ほどのConnection Poolの実装にこれらを与えてTopology Aware Routingに対応していきましょう。 まずは topologyList を受け取るれるようにしつつ、維持している接続をHashMapでトポロジごとに管理できるように構造体を変更します。

type ConnectionPoolOption struct {
        MinIdleConnections     uint
        MaxIdleTime            time.Duration
        MaxLifetime            time.Duration
        Jitter                 func(time.Duration) time.Duration
+       TopologyList           []Topology
        Dialer                 func(context.Context) (net.Conn, error)
        ConnectionPoolStrategy ConnectionPoolStrategy
 }

type ConnectionPool struct {
        connectionMutex      sync.Mutex
        connections          []*Connection
-       idleConnections      []*Connection
+       idleConnections      map[string][]*Connection
 }

次に、接続を取り出す Get でクライアントのトポロジ topologyName を受け取って、同じトポロジの接続を取り出す部分です。

-func (p *ConnectionPool) Get(ctx context.Context) (*Connection, error) {
+func (p *ConnectionPool) Get(ctx context.Context, topologyName string) (*Connection, error) {
    p.connectionMutex.Lock()
    defer p.connectionMutex.Unlock()

    defer func() {
        go p.prepareIdleConnection(ctx)
    }()

    for {
-       connection, err := p.getIdleConnection(ctx)
+       connection, err := p.getIdleConnection(ctx, topologyName)
        if err != nil {
            return nil, xerrors.Errorf("failed to get idle connection: %w", err)
        }

        if connection == nil {
+           if n := p.pickRandomIdleTopologyName(); n != nil {
+               topologyName = *n
+               continue
+           }
            break
        }

        now := nowFunc()
        if (p.option.MaxIdleTime > 0 && now.Sub(connection.returnedAt) > p.option.Jitter(p.option.MaxIdleTime)) ||
            (p.option.MaxLifetime > 0 && now.Sub(connection.createdAt) > p.option.Jitter(p.option.MaxLifetime)) ||
            !connection.isHealthy() {
            _ = connection.Close()
-           p.removeConnection(ctx, connection)
+           p.removeConnection(ctx, topologyName, connection)
            continue
        }

        return connection, nil
    }

    connection, err := p.newConnection(ctx)
    if err != nil {
        return nil, xerrors.Errorf("failed to create connection: %w", err)
    }

    return connection, nil
}

pickRandomIdleTopologyName はフォールバックの処理で、同じトポロジの接続がなかった際に適当なトポロジの接続を代わりに使います。 フォールバックの戦略を外から与えられるようにしてもいいですね。

他はそれぞれ先のHashMapを扱って接続を取り出したり削除したりしてるだけです。 細かい話ですが、idleConnections がHashMapになることで総接続数を得るための計算コストが O(1) でなくなってしまうため、別でAtomicなカウンターを用意しておいた方がよいです。

最後に Put の実装です。

func (p *ConnectionPool) Put(ctx context.Context, connection *Connection) error
        p.connectionMutex.Lock()
        defer p.connectionMutex.Unlock()

+       topologyName := p.topologyName(connection.RemoteAddr())
+
        if p.IdleConnections() < int(p.option.MaxIdleConnections) {
                connection.returnedAt = nowFunc()
-               p.idleConnections = append(p.idleConnections, connection)
+               p.idleConnections[topologyName] = append(p.idleConnections[topologyName], connection)
        } else {
                _ = connection.Close()
-               p.removeConnection(ctx, connection)
+               p.removeConnection(ctx, topologyName, connection)
        }

        return nil
}

net.ConnRemoteAddr() で接続先のIPを取得し、以下の関数で topologyList として受け取ったCIDRから適切なトポロジを取得します。

func (p *ConnectionPool) topologyName(addr net.Addr) string {
    for _, t := range p.option.TopologyList {
        switch addr := addr.(type) {
        case *net.TCPAddr:
            if t.CIDR.Contains(addr.IP) {
                return t.Name
            }
        case *net.UDPAddr:
            if t.CIDR.Contains(addr.IP) {
                return t.Name
            }
        }
    }
    return ""
}

これで経路最適化を行うTopology Aware Routingも完成です。 簡易的な実装による楽観的な経路最適化ですが、接続先のService Discoveryに依存しない あらゆるAZ間通信 に対応するものができました。

例えばAWSのElastic Load BalancerはAZごとにIPを持っており、それがDNSラウンドロビンで返却されます。 このミドルウェアをクライアントに導入することで、AWSのロードバランサに対する接続であってもAZ間通信を削減することに成功しました。

なお、AWS Application Load Balancerはデフォルトでクロスゾーン負荷分散が有効になっていて、これを導入してもロードバランサとUpstreamの間ではAZ間通信が発生する可能性があります。 Upstreamが十分にAZに分散されていればクロスゾーン負荷分散を無効にする判断ができるかもしれません。

注意点

このミドルウェアに限らずこういった経路最適化で考慮しなければならないことがあります。

それはトポロジ間の分散です。

クライアントとサーバでトポロジ間の分散が不十分な場合、例えばクライアントが一つのトポロジに集中してデプロイされてしまっている場合に、そのトポロジのサーバに負荷が集中してしまいます。

これを防ぐためには慎重にトポロジ間で分散させる必要があり、それにはKubernetesのPod Topology Spread Constraintsを利用することができます。

これはデプロイ時にトポロジ間のばらつきに対する制約をかけられる機能で、LIFULLではKubernetes Manifestを生成するためのコードジェネレータでKubernetesのTopology Aware Routingの設定とともにデフォルトの設定として配布しています。

www.lifull.blog

ただしこれはあくまでデプロイ時の制約で、Podがスケールインした後のリバランスまでは面倒を見てくれません。

そこで、あわせてkubernetes-sigs/deschedulerを導入して RemovePodsViolatingTopologySpreadConstraint を有効にすることをお勧めします。

これにより Pod Topology Spread Constraints に違反しているPodを再スケジュールしてくれるため、その後の再デプロイで再度制約をかけることができます。

またこの制約を機能させるためにはノードのトポロジ間のバランスも重要で、cluster-autoscalerなどで適切に設定する必要があることにも注意してください。 (AWSのAuto Scaling Groupはスポットインスタンスを使っていなければいい感じにやってくれるので特に気にする必要はない気がします)

最後に

Connection Poolのミドルウェアに簡易的なTopology Aware Routingを実装することで あらゆるAZ間通信 を削減することに成功しました。

これによりレイテンシの改善とAZ間通信に対する課金を抑えることができます。 AZ間のレイテンシはLIFULLの環境では最悪ケースで5msにも及ぶことがあり、それなりのインパクトを期待しています。

マルチプロセスなアプリケーションサーバを運用しているとそれっぽいConnection Poolが欲しくなることはあって、手前味噌ですがこのアプローチはちょうどいい塩梅なのではないでしょうか。

最近は調子に乗ってこれにRedisのプロトコルパーサを載せて、RedisへのクエリのRead/Writeの分離機能を実装するなどもしてしまっています。

KEELではRead Heavyなキャッシュなどの用途に向けて運用が簡単なRedis Sentinelクラスタを提供していまして、クライアント実装に手を入れづらい際にこれを入れるだけでいい感じに動くというちょうどいいTCPプロキシとしての立ち位置を確立しつつあります。

KEELチームでは、いくつかの機能をProxy-Wasmで実装してIstio経由で配布していたりもしますが、こういう実装はProxy-Wasmでは実現しづらく第二のService Meshとして今後もこのちょうどいいTCPプロキシを改善していく予定です。

(オーバーエンジニアリングとの境目を気にしながら、)プラットフォームとして必要なものがあれば何でも実装するKEELチームにもし興味を持っていただけた場合は、是非カジュアル面談をさせてください!

hrmos.co

hrmos.co

次のエントリでは、KubernetesのPodの終了際のログが欠損してしまう問題をeBPFで unlink(2)bpf_override_return して対処した問題について書こうと思うので購読してお待ちください :D