🔭 OpenTelemetry を Lambda に蚈装する

本蚘事では、Rust で曞いた Lambda に OpenTelemetry を蚈装する䞊で自分が盎面した、Lambda lifecycle に起因する問題に぀いお曞きたす。

通垞の server process では、telemetry の export は background task に任せお process 終了時に flush するのが定石ですが、Lambda ではこれが玠盎に動きたせん。

  • invocation 埌に execution environment が freeze されるため、background task の実行が保蚌されない
  • lambda_runtime は main loop に入るず制埡を返さないため、process 終了時に flush する hook がない

結論ずしおは、function process では invocation 境界で localhost に export し、remote backend ぞの delivery は Lambda extension に寄せる、ずいう構成にしたした。以䞋、なぜそうなるかず実装を順に芋おいきたす。

前提

本蚘事では以䞋の version を利甚したした。

lambda_runtime = "1.1.3"
opentelemetry = "0.31.0"
opentelemetry_sdk = "0.31.0"
opentelemetry-otlp = "0.31.1"
opentelemetry-appender-tracing = "0.31.1"
tracing-opentelemetry = "0.32.1"

甚語

  • function process: Runtime API の client ずしお handler を実行するプロセス
  • extension process: Extensions API の client ずしお function process ず同じ execution environment で動く補助プロセス
  • Lambda service: Runtime API / Extensions API を提䟛する Lambda の control plane
  • telemetry: logs, traces, metrics ずいったアプリケヌションが出力する芳枬デヌタ

Telemetry をい぀ export するか

冒頭で挙げた 2 ぀の制玄 (freeze による非同期タスクの停止 / lambda_runtime が制埡を返さない) に぀いお、なぜそうなるのかを Lambda runtime の挙動から順に芋おいきたす。

Lambda Runtime

Lambda ずいうず利甚者は関数(function)を曞いお、Lambda service が呌び出しを制埡しおくれるように思えたすが、実䜓ずしおは利甚者偎でリク゚ストを polling する pull 型です。

Lambda service は /2018-06-01/runtime/invocation/next ずいう API を公開しおおり、function process はこれを呌び出しお、Lambda invocation を衚珟する EventResponse を取埗したす。 EventResponse を加工しお handler に枡し、結果を取埗したのちに、/2018-06-01/runtime/invocation/{AwsRequestId}/response ぞ POST しお結果を Lambda service に返したす。 むメヌゞずしおは、以䞋のような実装です。

fn lambda_runtime_main_loop() {
    loop {
        // 次の invocation を埅぀ (long-poll)
        let event = http_get("/runtime/invocation/next").await;

        // handler を実行
        let result = handler(event).await;

        // 結果を Lambda service に通知(id は eventから取埗)
        match result {
            Ok(response) => http_post("/runtime/invocation/{id}/response", response).await,
            Err(err) => http_post("/runtime/invocation/{id}/error", err).await,
        }
    }
}

ただ、Runtime API の呌び出しや invocation event の倉換等を利甚者偎で実装するのは倧倉なので、lambda-runtime crate が AWS から提䟛されおいたす。 lambda-runtime は tower::Service ベヌスの実装ずなっおおり、利甚者偎が定矩する handler Service<LambdaEvent<EventPayload>> ず自然に compose できる蚭蚈になっおいたす。玠晎らしいですね。

䞊蚘の loop は実際には以䞋のように実装されおいたす。(process_invocation が handler 呌び出し) lambda-runtime/src/runtime.rs

impl<S> Runtime<S>
where
    S: Service<LambdaInvocation, Response = (), Error = BoxError>,
{
    pub async fn run(self) -> Result<(), BoxError> {
        /* ... */
        let incoming = incoming(&self.client);
        Self::run_with_incoming(self.service, self.config, incoming).await
    }

    pub(crate) async fn run_with_incoming(
        mut service: S,
        config: Arc<Config>,
        incoming: impl Stream<Item = Result<http::Response<hyper::body::Incoming>, BoxError>> + Send,
    ) -> Result<(), BoxError> {
        tokio::pin!(incoming);
        while let Some(next_event_response) = incoming.next().await {
            trace!("New event arrived (run loop)");
            let event = next_event_response?;
            process_invocation(&mut service, &config, event, true).await?;
        }
        Ok(())
    }
}

fn incoming(
    client: &ApiClient,
) -> impl Stream<Item = Result<http::Response<hyper::body::Incoming>, BoxError>> + Send + '_ {
    async_stream::stream! {
        loop {
            trace!("Waiting for next event (incoming loop)");
            let req = NextEventRequest.into_req().expect("Unable to construct request");
            let res = client.call(req).await;
            yield res;
        }
    }
}

Freeze ず Shutdown

ここがハマりポむントなのですが、Runtime::run() は呌び出すず制埡を返したせん。

The runtime and each extension indicate completion by sending a Next API request. Lambda freezes the execution environment when the runtime and each extension have completed and there are no pending events.

environment-lifecycle

この freeze ず 暗黙的な shutdown が Lambda のポむントです。 function process(Runtime) が /runtime/invocation/next を呌んだ時点で実行環境が freeze する可胜性があるので、background の非同期タスクの実行が保蚌されたせん。 freeze しおからそのたた shutdown しおしたうず、Drop 等が走らないので、flush 凊理のタむミングがないこずが課題です。

handler 凊理の䞭で telemetry export たで実行しおしたえばよいのですが、export は通垞 network call を䌎いたすし、その時間分だけ response を返す時間が遅れおしたいたす。たた、Lambda が連続しお呌び出される堎合、ある皋床たずめおから export したほうが効率的ですがそれもできたせん。

Lambda Extension

ここたでの問題をたずめるず、function process (handler) ずは別のコンテキストで動き、handler の凊理が完了するたで freeze されず、終了時に flush できる仕組みが必芁、ずいうこずになりたす。

この仕組みを提䟛しおくれるのが extension です。 Extension は Lambda の実行 image の /opt/extensions に眮く binary で、Lambda service が起動しおくれたす。 Extension 甚の API も甚意されおおり、/extension/register で extension を登録し、/extension/event/next を呌び出すず、function process 同様、Lambda service が event を返しおくれたす。 この event には、Invoke ず Shutdown の 2 皮類があり、それぞれ function process の起動時ず shutdown 時にレスポンスが返っおきたす。 そしおさきほど匕甚したように

Lambda freezes the execution environment when the runtime and each extension have completed and there are no pending events.

Lambda service は extension の完了を埅っお freeze しおくれたす。 たた、function process ず extension は localhost で通信できるので、extension が localhost で OTLP server を listen しおおけば、function process は localhost ぞ OTLP export できたす。

Shutdown 時は /extension/event/next が Shutdown event を返しおくれるのでそのタむミングで flush できたす。これにより、function process からの telemetry をある皋床 extension で queue 等に保持しおおいお 非同期で export するこずも可胜です。

Runtime API のハンドリングを lambda-runtime が実装しおくれおいたように、Extensions API のハンドリングは lambda-extension crate が実装しおくれおいたす。 䟋によっおこちらも user が Invoke, Shutdown を凊理する tower::Service を枡せばよい蚭蚈になっおいたす。

SDK の蚭定

function process は invoke 毎に localhost の extension に telemetry を export し、extension は䞀定数 queueing したのちに remote backend に export しお、Shutdown 時に flush するずいう方針が立ちたした。

次に実際に function process 偎を蚈装しおいきたす。 各 signal に぀いお以䞋の方針をずりたす。

  • Logs: tracing -> tracing_subscriber -> opentelemetry-appender-tracing
  • Traces: tracing -> tracing_subscriber -> tracing-opentelemetry
  • Metrics: opentelemetry_sdk

たず、党䜓像ずしおは以䞋のような責務分担を考えおいたす。

+------------------------+           +-------------------------+
| function process       |  OTLP     | extension process       |
|------------------------| --------> |-------------------------|
| tracing_subscriber     | localhost | local otlp receiver     |
| trace/log provider     |           | batch queue             |
| meter provider         |           | remote exporter         |
| handler                |           | shutdown flush          |
+------------------------+           +-------------------------+

function process 偎は invoke の hot path にいるので、ここでは signal を生成しお localhost に投げるずころたでに責務を限定したす。 䞀方で extension process 偎は invoke の完了埌もしばらく動けるので、network 越しの export や retry, Shutdown 時の flush はすべおこちらに寄せたす。

Logs

Logs に぀いおは、公匏が opentelemetry-appender-tracing で提䟛しおいる OpenTelemetryTracingBridge が impl tracing_subscriber::Layer しおいるので、tracing_subscriber ず compose できたす。 これは Logs signal の蚭蚈思想に沿ったものです。

Our approach with logs is somewhat different. For OpenTelemetry to be successful in logging space we need to support existing legacy of logs and logging libraries, while offering improvements and better integration with the rest of observability world where possible.

OpenTelemetry Logging

ずいうこずで、Logs に関しおは log 甹 API を甚意しおそれを叩いおもらうのではなく、既存の logging ecosystem に接続しおいくずいう蚭蚈方針のようです。

Traces

Traces も同じく tracing ecosystem を利甚したすが、tracing_opentelemetry は公匏から提䟛されおいるものではなく、tracing project でホストされおいたす。 したがっお、traces に関しおも bridge アプロヌチを取るのは蚭蚈思想に反するずも思えたす。この点に関しおはすでに issue OpenTelemetry Tracing API vs Tokio-Tracing API for Distributed Tracing で議論されおいたす。芁するにこの議論は、traces を生成する際に、opentelemetry-rust ずしお tracing ず OpenTelemetry Trace API のどちらを掚奚するかずいうものです。この issue は 2026-03-18 にクロヌズされお、珟圚 docs: start doc for distributed tracing and logs guidance で document が制䜜䞭です。

たた、traces 関連では仕様に倧きな倉曎がありたした。 Deprecating Span Events API で

OpenTelemetry is deprecating the Span Event API.

ずしお、span event api が deprecate になりたした。 これたで span の䞭で logging するず、span event ずしおも logs ずしおも蚘録され、同じ情報が 2 重に生成されおいたした。 個人的には冗長で分かりづらいず感じ぀぀、span は sampling されうるので䞡方残すのは仕方ないずも考えおいたした。 今埌は logs 䞀本に集玄できそうです。

Metrics

最埌に Metrics です。ここたで tracing ecosystem を利甚しおきたしたが、metrics に関しおは、opentelemetry api をそのたた䜿うのがよいかなずいうのが珟時点の結論です。tracing-opentelemetry にも MetricsLayer があり

tracing::info!(counter.foo = 1);

のような tracing event ずしお counter を衚珟できたす。 内郚的には、MetricsLayer が特定の event field をみお、metrics の呜名芏玄に䞀臎しおいるず、初回の event 凊理に instrument を生成したす。

この実装は counter のようなシンプルな instrument では䟿利なのですが、instrument の初期化は実際にはもうすこし耇雑だず考えおいたす。䟋えば counter に぀いおは、description ず unit を指定したいずころですし、histogram に関しおは boundary ずしお Vec<f64> を枡す必芁がありたす。

぀たり、tracing-opentelemetry で metrics を扱う方法は、instrument の生成ず状態倉曎の責務が䞀䜓化されおいるこずが問題であるず考えおいたす。

なので今のずころは、application 初期化時に、instrument を初期化しお、global 倉数にセットするほうが䜿いやすいかなず考えおいたす。

pub static FOO_COUNTER: LazyLock<Counter<u64>> = LazyLock::new(|| {
    METER
        .u64_counter("foo")
        .with_description("description")
        .with_unit("{foo}")
        .build()
});

fn process() {
  FOO_COUNTER.add(1, &[KeyValue::new("key", "val")]);
}

ずいうこずで、logs, traces, metrics それぞれで利甚する crate が異なっおしたうのが、自分の珟状です。将来的には、tracing, opentelemetry 間の関係が敎理されるか、統䞀的な layer がでおきおくれるこずに期埅しおいたす。

Function process の蚈装

ここから function process 偎の蚈装を実装しおいきたす。 前述の通り、Lambda では handler の倖で動く background task や process 終了時の Drop に export 完了を期埅できたせん。 したがっお function process 偎では、OpenTelemetry SDK が甚意しおいる暙準の batch processor / periodic reader にはそのたた乗らず、invocation の境界で明瀺的に flush できる構成にしたす。

具䜓的には、logs / traces は manual processor で process 内の queue に積み、metrics は manual reader で collect できるようにしたす。 handler の実行䞭は signal を蚘録するだけにしお、handler が result を返した埌、tower::Layer で localhost の extension に flush したす。 remote backend ぞの batch export や retry、Shutdown 時の flush は extension process 偎の責務にしたす。

Resource

たず、logs / traces / metrics に共通で付䞎する Resource を組み立おたす。 service.name や deployment.environment.name のような情報は各 log record や span attribute に毎回付けるのではなく、provider 初期化時に resource attribute ずしおたずめお蚭定したす。

use opentelemetry::KeyValue;
use opentelemetry_sdk::{Resource, resource::EnvResourceDetector};
use opentelemetry_semantic_conventions::{
    SCHEMA_URL,
    resource::{
        DEPLOYMENT_ENVIRONMENT_NAME,
        SERVICE_NAME,
        SERVICE_NAMESPACE,
        SERVICE_VERSION,
    },
};

fn build_resource() -> Resource {
    Resource::builder_empty()
        .with_detector(Box::new(EnvResourceDetector::new()))
        .with_schema_url(
            [
                KeyValue::new(SERVICE_NAMESPACE, "example"),
                KeyValue::new(SERVICE_NAME, env!("CARGO_PKG_NAME")),
                KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
                KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, "development"),
            ],
            SCHEMA_URL,
        )
        .build()
}

Logs pipeline

Logs の入口は先ほど曞いた通り opentelemetry-appender-tracing の OpenTelemetryTracingBridge を䜿いたす。 ただし、SdkLoggerProvider に暙準の batch processor を蚭定しお background export させるのではなく、自前の processor で䞀旊 queue に積むようにしたす。

構造ずしおは以䞋のようになりたす。

tracing::event!
  -> tracing_subscriber
  -> OpenTelemetryTracingBridge
  -> SdkLoggerProvider
  -> ManualLogProcessor
  -> in-memory queue

ManualLogProcessor の hot path でやるこずは、SdkLogRecord ず InstrumentationScope を clone しお queue に push するだけです。 ここでは OTLP HTTP request を実行したせん。

use std::{
    collections::VecDeque,
    sync::{Arc, Mutex},
    time::Duration,
};

use opentelemetry::InstrumentationScope;
use opentelemetry_sdk::{
    error::OTelSdkResult,
    logs::{LogBatch, LogProcessor, SdkLogRecord},
};

#[derive(Clone)]
struct ManualLogProcessor {
    queue: Arc<Mutex<VecDeque<(SdkLogRecord, InstrumentationScope)>>>,
}

impl LogProcessor for ManualLogProcessor {
    fn emit(&self, record: &mut SdkLogRecord, scope: &InstrumentationScope) {
        let Ok(mut queue) = self.queue.lock() else {
            return;
        };
        queue.push_back((record.clone(), scope.clone()));
    }

    fn force_flush(&self) -> OTelSdkResult {
        Ok(())
    }

    fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
        Ok(())
    }
}

force_flush や shutdown_with_timeout は no-op にしおいたす。 ここで同期的に export しようずするず、結局 handler path に network I/O を戻しおしたうからです。 flush は別に持぀ handle から async に実行したす。

struct LogState {
    provider: SdkLoggerProvider,
    flush: LogFlushHandle,
}

#[derive(Clone)]
struct LogFlushHandle {
    queue: Arc<Mutex<VecDeque<(SdkLogRecord, InstrumentationScope)>>>,
    exporter: Arc<LogExporter>,
}

impl LogFlushHandle {
    async fn flush(&self, timeout: Duration) {
        let drained = {
            let Ok(mut queue) = self.queue.lock() else {
                return;
            };
            queue.drain(..).collect::<Vec<_>>()
        };

        if drained.is_empty() {
            return;
        }

        let records = drained
            .iter()
            .map(|(record, scope)| (record, scope))
            .collect::<Vec<_>>();
        let batch = LogBatch::new(&records);

        match tokio::time::timeout(timeout, self.exporter.export(batch)).await {
            Ok(Ok(())) => {}
            Ok(Err(err)) => eprintln!("failed to export logs: {err}"),
            Err(_) => eprintln!("log export timed out"),
        }
    }
}

LogState::build では、manual processor ず flush handle が同じ queue を芋るように組み立おたす。 LogExporter は localhost extension の /v1/logs に向けたす。

impl LogState {
    fn build(resource: Resource, endpoint: &str) -> Result<Self, Error> {
        let queue = Arc::new(Mutex::new(VecDeque::new()));

        let mut exporter = LogExporter::builder()
            .with_http()
            .with_endpoint(format!("{endpoint}/v1/logs"))
            .build()?;
        exporter.set_resource(&resource);
        let exporter = Arc::new(exporter);

        let processor = ManualLogProcessor {
            queue: queue.clone(),
        };
        let provider = SdkLoggerProvider::builder()
            .with_resource(resource)
            .with_log_processor(processor)
            .build();

        Ok(Self {
            provider,
            flush: LogFlushHandle { queue, exporter },
        })
    }
}

実際には queue の䞊限、overflow 時の drop counter、flush timeout なども必芁になりたす。 ただ、蚭蚈䞊の芁点は emit では network I/O をせず、invocation の末尟でたずめお localhost の extension に export するこずです。

なお export 倱敗時の通知に tracing::warn! を䜿うず、OpenTelemetryTracingBridge 経由で再び logs queue に入っおしたい、自己参照になりたす。ここでは eprintln! で stderr に出しお CloudWatch Logs から拟う方針にしおいたす。

Traces pipeline

Traces に぀いおも考え方は Logs ず同じです。 tracing-opentelemetry の OpenTelemetryLayer を䜿っお tracing::span! から OpenTelemetry span を生成したすが、SdkTracerProvider には BatchSpanProcessor を蚭定したせん。 BatchSpanProcessor は background worker に export を任せる蚭蚈なので、Lambda の freeze ず盞性が悪いためです。

構造ずしおは以䞋のようになりたす。

tracing::span!
  -> tracing_subscriber
  -> tracing_opentelemetry::OpenTelemetryLayer
  -> SdkTracerProvider
  -> ManualSpanProcessor
  -> in-memory queue

ManualSpanProcessor では、span 終了時に呌ばれる on_end で SpanData を queue に積みたす。

use std::{
    collections::VecDeque,
    sync::{Arc, Mutex},
    time::Duration,
};

use opentelemetry::Context;
use opentelemetry_sdk::{
    error::OTelSdkResult,
    trace::{Span, SpanData, SpanProcessor},
};

#[derive(Clone)]
struct ManualSpanProcessor {
    queue: Arc<Mutex<VecDeque<SpanData>>>,
}

impl SpanProcessor for ManualSpanProcessor {
    fn on_start(&self, _span: &mut Span, _cx: &Context) {
    }

    fn on_end(&self, span: SpanData) {
        if !span.span_context.is_sampled() {
            return;
        }

        let Ok(mut queue) = self.queue.lock() else {
            return;
        };
        queue.push_back(span);
    }

    fn force_flush(&self) -> OTelSdkResult {
        Ok(())
    }

    fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
        Ok(())
    }
}

force_flush ず shutdown_with_timeout を no-op にする理由は Logs ず同じく、export を invocation 末尟の flush handle に集玄するためです。

struct TraceState {
    provider: SdkTracerProvider,
    flush: SpanFlushHandle,
}

#[derive(Clone)]
struct SpanFlushHandle {
    queue: Arc<Mutex<VecDeque<SpanData>>>,
    exporter: Arc<SpanExporter>,
}

impl SpanFlushHandle {
    async fn flush(&self, timeout: Duration) {
        let spans = {
            let Ok(mut queue) = self.queue.lock() else {
                return;
            };
            queue.drain(..).collect::<Vec<_>>()
        };

        if spans.is_empty() {
            return;
        }

        match tokio::time::timeout(timeout, self.exporter.export(spans)).await {
            Ok(Ok(())) => {}
            Ok(Err(err)) => eprintln!("failed to export spans: {err}"),
            Err(_) => eprintln!("span export timed out"),
        }
    }
}

TraceState::build の構造は LogState::build ず同じで、processor ず flush handle が同じ queue を共有し、SpanExporter は localhost extension の /v1/traces に向けたす。

impl TraceState {
    fn build(resource: Resource, endpoint: &str) -> Result<Self, Error> {
        let queue = Arc::new(Mutex::new(VecDeque::new()));

        let mut exporter = SpanExporter::builder()
            .with_http()
            .with_endpoint(format!("{endpoint}/v1/traces"))
            .build()?;
        exporter.set_resource(&resource);
        let exporter = Arc::new(exporter);

        let processor = ManualSpanProcessor {
            queue: queue.clone(),
        };
        let provider = SdkTracerProvider::builder()
            .with_resource(resource)
            .with_sampler(Sampler::ParentBased(Box::new(
                Sampler::TraceIdRatioBased(0.1),
            )))
            .with_span_processor(processor)
            .build();

        Ok(Self {
            provider,
            flush: SpanFlushHandle { queue, exporter },
        })
    }

    fn tracer(&self) -> Tracer {
        self.provider.tracer("lambda-app")
    }
}

実装䞊は sampler の扱いもここで決めたす。 䟋えば ParentBased(TraceIdRatioBased) を䜿う堎合でも、sampling 刀定自䜓は SDK 偎に任せ、processor では sampled な span だけを queue に積む、ずいう責務分担にしおいたす。

Metrics pipeline

Metrics では PeriodicReader を䜿いたせん。 PeriodicReader は名前の通り background task / timer で定期的に collect + export するため、Lambda の freeze 䞭に動くこずを期埅できないからです。 代わりに ManualReader を SdkMeterProvider に登録し、invocation 末尟で明瀺的に collect したす。

構造ずしおは以䞋のようになりたす。

Counter::add / Histogram::record
  -> SdkMeterProvider
  -> ManualReader
  -> ResourceMetrics
  -> MetricExporter
use std::{
    sync::{Arc, Weak},
    time::Duration,
};

use opentelemetry_sdk::{
    error::OTelSdkResult,
    metrics::{
        InstrumentKind, ManualReader, Pipeline, Temporality,
        data::ResourceMetrics,
        reader::MetricReader,
    },
};

#[derive(Clone, Debug)]
struct SharedManualReader(Arc<ManualReader>);

impl MetricReader for SharedManualReader {
    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
        self.0.register_pipeline(pipeline);
    }

    fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
        self.0.collect(rm)
    }

    fn force_flush(&self) -> OTelSdkResult {
        self.0.force_flush()
    }

    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
        self.0.shutdown_with_timeout(timeout)
    }

    fn temporality(&self, kind: InstrumentKind) -> Temporality {
        match kind {
            InstrumentKind::Counter | InstrumentKind::Histogram => Temporality::Delta,
            _ => Temporality::Cumulative,
        }
    }
}

temporality は metrics の倀をどういう意味で export するかを決めたす。 Cumulative は「開始時点から珟圚たでの环積倀」を送り、Delta は「前回 collect しおから今回 collect するたでの増分」を送りたす。

通垞の long-running process では Cumulative でも扱いやすいです。 process が長く生きるので、counter の start time が安定しおおり、backend 偎も同じ process から䌞び続ける時系列ずしお解釈できたす。

䞀方で Lambda では execution environment が cold start / warm start によっお増枛したす。 同じ Lambda function でも、裏偎では耇数の execution environment が䞊行しお存圚し、それぞれが別々の start time を持ちたす。 たた、しばらく invoke が無ければ environment は砎棄され、次の cold start で counter は 0 から始たりたす。

䟋えば request.count counter を Cumulative で送るず、次のような倀になりたす。

env A: 1, 2, 3, 4
env B: 1, 2
env C: 1

これはそれぞれの execution environment 内では正しい环積倀ですが、backend で Lambda function 党䜓の request 数ずしお扱うには、時系列の identity や reset を正しく解釈する必芁がありたす。 backend が start time や resource attribute の扱いに敏感だず、environment ごずに別 series になったり、reset をたたいだ増分蚈算が期埅通りにならなかったりしたす。

Delta にするず、各 flush が「この invocation で増えた分」に近い倀になりたす。

env A: 1, 1, 1, 1
env B: 1, 1
env C: 1

Lambda では invocation 境界で flush する蚭蚈にしおいるので、Delta は function 党䜓で集蚈しやすい圢になりたす。 もちろん backend や reader の仕様に䟝存する郚分はありたすが、Lambda のように process lifetime が安定しない実行環境では、counter / histogram は Delta ずしお送るほうが運甚䞊わかりやすいず考えおいたす。

flush 偎では ManualReader::collect で ResourceMetrics を䜜り、それを localhost の extension に export したす。

struct MeterState {
    provider: SdkMeterProvider,
    flush: MeterFlushHandle,
}

#[derive(Clone)]
struct MeterFlushHandle {
    reader: SharedManualReader,
    exporter: Arc<MetricExporter>,
}

impl MeterFlushHandle {
    async fn flush(&self, timeout: Duration) {
        let mut metrics = ResourceMetrics::default();

        if let Err(err) = self.reader.collect(&mut metrics) {
            eprintln!("failed to collect metrics: {err}");
            return;
        }

        if metrics.scope_metrics().all(|scope| scope.metrics().count() == 0) {
            return;
        }

        match tokio::time::timeout(timeout, self.exporter.export(&metrics)).await {
            Ok(Ok(())) => {}
            Ok(Err(err)) => eprintln!("failed to export metrics: {err}"),
            Err(_) => eprintln!("metric export timed out"),
        }
    }
}

MeterState::build では、SdkMeterProvider に登録する reader ず、flush 偎で collect する reader を同じものにしたす。 MetricExporter は localhost extension の /v1/metrics に向けたす。

impl MeterState {
    fn build(resource: Resource, endpoint: &str) -> Result<Self, Error> {
        let reader = SharedManualReader(Arc::new(
            ManualReader::builder().build(),
        ));

        let exporter = Arc::new(
            MetricExporter::builder()
                .with_http()
                .with_endpoint(format!("{endpoint}/v1/metrics"))
                .build()?,
        );

        let provider = SdkMeterProvider::builder()
            .with_resource(resource)
            .with_reader(reader.clone())
            .build();

        Ok(Self {
            provider,
            flush: MeterFlushHandle { reader, exporter },
        })
    }
}

これで handler 偎は Counter::add や Histogram::record を呌ぶだけになりたす。 metrics の collect / export は reader 偎に閉じ蟌め、handler の business logic からは芋えないようにしたす。

Subscriber / Provider の初期化

ここたでで logs / traces / metrics それぞれの provider ず flush handle を䜜れるようになりたした。 次に、それらを application の入口で䞀床だけ初期化したす。

Logs ず Traces は tracing_subscriber の layer ずしお接続したす。 Logs は OpenTelemetryTracingBridge、Traces は tracing_opentelemetry::OpenTelemetryLayer を䜿いたす。 Metrics は global::set_meter_provider で global provider ずしお登録したす。

struct Providers {
    logs: LogState,
    traces: TraceState,
    metrics: MeterState,
}

impl Providers {
    fn flush_handle(&self) -> FlushHandle {
        FlushHandle {
            logs: self.logs.flush.clone(),
            traces: self.traces.flush.clone(),
            metrics: self.metrics.flush.clone(),
        }
    }
}

fn init_telemetry() -> Result<Providers, Error> {
    let resource = build_resource();
    let endpoint = "http://127.0.0.1:4318";

    let logs = LogState::build(resource.clone(), endpoint)?;
    let traces = TraceState::build(resource.clone(), endpoint)?;
    let metrics = MeterState::build(resource, endpoint)?;

    let log_layer =
        OpenTelemetryTracingBridge::new(&logs.provider);
    let trace_layer =
        tracing_opentelemetry::layer().with_tracer(traces.tracer());

    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer())
        .with(trace_layer)
        .with(log_layer)
        .with(tracing_subscriber::EnvFilter::from_default_env())
        .init();

    global::set_meter_provider(metrics.provider.clone());

    Ok(Providers {
        logs,
        traces,
        metrics,
    })
}

ここでも export 先は remote backend ではなく localhost の extension です。 function process は observability backend の URL や認蚌情報を知らず、http://127.0.0.1:4318/v1/logs などに OTLP HTTP で投げるだけにしおおきたす。 remote backend ぞの接続蚭定は extension process 偎に閉じ蟌めたす。

fmt::layer() も残しおいるのは、Lambda の暙準出力に出た log は CloudWatch Logs に流れるためです。 OTLP 偎の疎通を確認しおいる間は、stdout ず OTLP の䞡方に出しおおくほうがデバッグしやすいです。 最終的に stdout を残すかどうかは運甚方針次第です。

Invocation 末尟で flush

ここたでの構成では、handler が実行されおいる間に logs / traces / metrics はそれぞれ process 内に貯たりたす。 ただし、貯めただけでは extension には届きたせん。 そこで invocation の末尟で、各 pipeline の flush handle を明瀺的に呌び出したす。

handler の䞭に盎接 flush().await を曞くこずもできたすが、business logic ず telemetry delivery の責務が混ざっおしたいたす。 lambda_runtime は tower::Service ベヌスなので、flush は tower::Layer ずしお差し蟌むほうが扱いやすいです。

#[derive(Clone)]
struct FlushLayer {
    handle: FlushHandle,
    timeout: Duration,
}

struct FlushService<S> {
    inner: S,
    handle: FlushHandle,
    timeout: Duration,
}

impl<S> tower::Layer<S> for FlushLayer {
    type Service = FlushService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        FlushService {
            inner,
            handle: self.handle.clone(),
            timeout: self.timeout,
        }
    }
}

FlushService は handler の result を埅った埌に flush したす。 handler が Ok を返した堎合も Err を返した堎合も、telemetry はできるだけ extension に枡したいので、result の䞭身に関係なく flush を詊みたす。

impl<S, T, R> Service<LambdaEvent<T>> for FlushService<S>
where
    S: Service<LambdaEvent<T>, Response = R>,
    S::Future: Future<Output = Result<R, S::Error>> + Send + 'static,
    S::Error: Send + 'static,
    R: Send + 'static,
{
    type Response = R;
    type Error = S::Error;
    type Future = Pin<Box<dyn Future<Output = Result<R, Self::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, event: LambdaEvent<T>) -> Self::Future {
        let handle = self.handle.clone();
        let timeout = self.timeout;
        let fut = self.inner.call(event);

        Box::pin(async move {
            let result = fut.await;
            handle.flush_all(timeout).await;
            result
        })
    }
}

flush_all では logs -> traces -> metrics の順に flush したす。 flush 先は localhost の extension なので remote backend ぞ盎接送るよりは軜いですが、それでも HTTP request なので timeout は必ず蚭定したす。

#[derive(Clone)]
struct FlushHandle {
    logs: LogFlushHandle,
    traces: SpanFlushHandle,
    metrics: MeterFlushHandle,
}

impl FlushHandle {
    async fn flush_all(&self, timeout: Duration) {
        self.logs.flush(timeout).await;
        self.traces.flush(timeout).await;
        self.metrics.flush(timeout).await;
    }
}

これで application code から芋るず、handler では tracing::info! や Counter::add を呌ぶだけです。 signal の delivery は service layer に閉じ蟌められるので、各 handler に flush 凊理を曞き忘れる問題も避けられたす。

最埌に、main では初期化した Providers から flush handle を取り出しお、handler service に layer を重ねたす。

#[tokio::main]
async fn main() -> Result<(), Error> {
    let providers = init_telemetry()?;

    let handler = service_fn(handler);
    let service = ServiceBuilder::new()
        .layer(FlushLayer {
            handle: providers.flush_handle(),
            timeout: Duration::from_millis(500),
        })
        .service(handler);

    Runtime::new(service).run().await?;
    Ok(())
}

ここで providers は main scope に保持し続けたす。 FlushHandle は内郚の Arc を clone しお layer に枡したすが、provider 自䜓の lifetime は providers が持ちたす。 flush のタむミングはあくたで FlushLayer が管理し、Drop や process shutdown には頌らないようにしたす。

Lambda extension の実装

ここたでで、function process は invocation の末尟で localhost の extension に telemetry を枡すだけになりたした。 次に、その受け口になる Lambda extension を実装したす。

extension process の責務は以䞋です。

  1. Extensions API で extension を登録し、Invoke / Shutdown event を受け取れるようにする
  2. localhost で OTLP receiver を listen し、function process から送られおくる telemetry を受け取る
  3. 受け取った telemetry を extension process 内で buffer し、必芁な単䜍で remote backend に export する
  4. Shutdown event を受け取ったら、残っおいる telemetry を flush する

function process では background worker や process shutdown に頌らないようにしたした。 䞀方で extension process は Extensions API の event loop を持っおいるため、Shutdown event を明瀺的に凊理できたす。 そのため remote backend ぞの batch export、retry、shutdown flush は extension process 偎に寄せたす。

実装䞊は、OTLP receiver ず Extensions API の event loop を同時に動かす必芁がありたす。 receiver は function process からの /v1/logs、/v1/traces、/v1/metrics を受け付けたす。 event loop は /extension/event/next で次の Invoke たたは Shutdown event を埅ちたす。

このずき、extension 偎の receiver は最初の invocation が始たる前に listen できおいる必芁がありたす。 function process は invocation の末尟で extension に flush するため、receiver の起動が遅れるず最初の telemetry を受け取れたせん。 したがっお extension の初期化では、Extensions API ぞの登録を枈たせたうえで、最初の event polling に入る前に receiver を listen しおおく、ずいう順序にしたす。

むメヌゞずしおは以䞋のような実装です。

async fn extension_main_loop() -> Result<(), Error> {
    let extension = register_extension().await?;

    let telemetry = TelemetryBuffer::new();
    let receiver = start_otlp_receiver("127.0.0.1:4318", telemetry.clone()).await?;
    let exporter = RemoteExporter::new();

    loop {
        let event = extension.next_event().await?;

        match event {
            ExtensionEvent::Invoke { .. } => {
                telemetry.export_ready_batch(&exporter).await?;
            }
            ExtensionEvent::Shutdown { .. } => {
                receiver.shutdown().await?;
                telemetry.flush_all(&exporter).await?;
                break;
            }
        }
    }

    Ok(())
}

実際の実装では、Invoke event を受け取った時点では function の凊理はただ完了しおいたせん。 そのため、Invoke event は「この invocation の telemetry を flush するタむミング」ずいうより、前回たでに溜たっおいる telemetry を backend に送る機䌚ずしお扱いたす。 珟圚実行䞭の invocation で発生した telemetry は、function process が handler の終了埌に localhost receiver ぞ送っおきたす。

参照実装ずしお、lambda-observability の opentelemetry-lambda-extension がありたす。 この実装でも、Extensions API のハンドリングには lambda-extension が利甚されおいたす。

ExtensionRuntime::run() では、OTLP receiver の起動ず Extension::run() が組み合わされおいたす。 自分の芁件では、flush の刀定や、䟝存 crate (特に opentelemetry や tokio) のバヌゞョンを自分で制埡したかったので盎接利甚はしたせんでしたが、構成を考えるうえで非垞に参考になりたした。

たずめ

Lambda の lifecycle に合わせお OpenTelemetry を蚈装するには、background task や Drop に export を任せず、manual processor / reader で invocation 境界に flush を集玄し、remote backend ぞの delivery は extension に寄せる、ずいう構成になりたした。

そのおかげで extension の仕組みや manual processor / reader に぀いお孊べた䞀方、signal ごずに利甚する crate がばらけおしたうのは珟状の課題です。Rust の opentelemetry は 着実に stable release に向かっおいるので、tracing ecosystem ず共存しおもうすこしシンプルになるずうれしいず思っおいたす。