This article covers the problems I ran into while instrumenting a Lambda function written in Rust with OpenTelemetry, all of which stem from the Lambda lifecycle.
In a regular server process, the standard approach is to hand telemetry export to a background task and flush it when the process exits. On Lambda, this does not work as-is:
- After each invocation the execution environment is frozen, so background tasks are not guaranteed to run
- Once lambda_runtime enters its main loop it never returns control, so there is no hook for flushing at process exit
The conclusion I reached: the function process exports to localhost at each invocation boundary, and delivery to the remote backend is delegated to a Lambda extension. The rest of the article walks through why, and then the implementation.
Prerequisites
This article uses the following versions:
lambda_runtime = "1.1.3"
opentelemetry = "0.31.0"
opentelemetry_sdk = "0.31.0"
opentelemetry-otlp = "0.31.1"
opentelemetry-appender-tracing = "0.31.1"
tracing-opentelemetry = "0.32.1"
Terminology
- function process: the process that runs the handler as a client of the Runtime API
- extension process: a helper process that runs in the same execution environment as the function process, as a client of the Extensions API
- Lambda service: the Lambda control plane that provides the Runtime API and the Extensions API
- telemetry: observability data emitted by an application, such as logs, traces, and metrics
When to export telemetry
Let's look at how the Lambda runtime behaves to see where the two constraints mentioned at the start come from (asynchronous tasks stopped by freezing, and lambda_runtime never returning control).
Lambda Runtime
With Lambda, it feels as though you just write a function and the Lambda service takes care of calling it. In reality, it is a pull model in which your side polls for requests.
The Lambda service exposes the /2018-06-01/runtime/invocation/next API, and the function process calls it to obtain an EventResponse representing a Lambda invocation. It transforms the EventResponse, passes it to the handler, obtains the result, and then returns the result to the Lambda service by POSTing to /2018-06-01/runtime/invocation/{AwsRequestId}/response. Conceptually, it looks like this:
```rust
// Pseudocode: http_get, http_post, request_id and handler are placeholders, not real APIs.
async fn lambda_runtime_main_loop() {
    loop {
        // Wait for the next invocation (long-poll)
        let event = http_get("/runtime/invocation/next").await;
        // The request ID comes with the event
        let id = request_id(&event);
        // Run the handler
        let result = handler(event).await;
        // Report the result to the Lambda service
        match result {
            Ok(response) => http_post(&format!("/runtime/invocation/{id}/response"), response).await,
            Err(err) => http_post(&format!("/runtime/invocation/{id}/error"), err).await,
        }
    }
}
```
That said, implementing the Runtime API calls and the invocation event conversion yourself is a lot of work, so AWS provides the lambda-runtime crate. lambda-runtime is built on tower::Service, and it composes naturally with the handler you define as a Service<LambdaEvent<EventPayload>>. Very nice.
The loop above is actually implemented as follows in lambda-runtime/src/runtime.rs (process_invocation is where the handler gets called):
```rust
impl<S> Runtime<S>
where
    S: Service<LambdaInvocation, Response = (), Error = BoxError>,
{
    pub async fn run(self) -> Result<(), BoxError> {
        /* ... */
        let incoming = incoming(&self.client);
        Self::run_with_incoming(self.service, self.config, incoming).await
    }

    pub(crate) async fn run_with_incoming(
        mut service: S,
        config: Arc<Config>,
        incoming: impl Stream<Item = Result<http::Response<hyper::body::Incoming>, BoxError>> + Send,
    ) -> Result<(), BoxError> {
        tokio::pin!(incoming);
        while let Some(next_event_response) = incoming.next().await {
            trace!("New event arrived (run loop)");
            let event = next_event_response?;
            process_invocation(&mut service, &config, event, true).await?;
        }
        Ok(())
    }
}

fn incoming(
    client: &ApiClient,
) -> impl Stream<Item = Result<http::Response<hyper::body::Incoming>, BoxError>> + Send + '_ {
    async_stream::stream! {
        loop {
            trace!("Waiting for next event (incoming loop)");
            let req = NextEventRequest.into_req().expect("Unable to construct request");
            let res = client.call(req).await;
            yield res;
        }
    }
}
```
Freeze and Shutdown
This is the crucial point: once called, Runtime::run() never returns control. The incoming stream above loops forever, so run() only returns early on an error.
The runtime and each extension indicate completion by sending a Next API request. Lambda freezes the execution environment when the runtime and each extension have completed and there are no pending events.
This freeze, and the implicit shutdown that can follow it, are what matter on Lambda. Once the function process (the runtime) calls /runtime/invocation/next, the execution environment may be frozen, so background asynchronous tasks are not guaranteed to run. And if the environment is frozen and then shut down, Drop and similar cleanup never run, which leaves no opportunity to flush.
One option is to run the telemetry export inside the handler itself. However, export normally involves a network call, and the response is delayed by however long that call takes. Also, when the function is invoked in quick succession, it is more efficient to batch telemetry for a while before exporting, and that is what I want to do.
Lambda Extension
To sum up the problems so far: we need a mechanism that runs in a separate context from the function process (handler), is not frozen until the handler's work has finished, and can flush at shutdown.
Extensions provide exactly that. An extension is a binary placed under /opt/extensions in the Lambda execution image, and the Lambda service starts it. Extensions have their own API: you register with /extension/register, and calling /extension/event/next returns an event from the Lambda service, much as the Runtime API does for the function process. There are two event types, Invoke and Shutdown, delivered when the function is invoked and when the environment shuts down, respectively. And, as quoted earlier:
Lambda freezes the execution environment when the runtime and each extension have completed and there are no pending events.
The Lambda service waits for the extensions to finish before freezing. In addition, the function process and the extension can communicate over localhost, so if the extension listens as an OTLP server on localhost, the function process can export OTLP to localhost.
At shutdown, /extension/event/next returns a Shutdown event, which gives us a point at which to flush. This also makes it possible for the extension to hold telemetry from the function process in something like a queue for a while and export it asynchronously.
Just as lambda-runtime handles the Runtime API, the lambda-extension crate handles the Extensions API. As usual, the design lets the user supply a tower::Service that processes Invoke and Shutdown events.
SDK configuration
The plan is now in place: the function process exports telemetry to the extension on localhost at every invocation, and the extension queues a certain amount, exports it to the remote backend, and flushes at Shutdown.
Next, let's actually instrument the function process. For each signal, I take the following approach:
- Logs: tracing -> tracing_subscriber -> opentelemetry-appender-tracing
- Traces: tracing -> tracing_subscriber -> tracing-opentelemetry
- Metrics: opentelemetry_sdk
The overall division of responsibilities I have in mind looks like this:
```text
+------------------------+             +-------------------------+
| function process       |    OTLP     | extension process       |
|------------------------| ----------> |-------------------------|
| tracing_subscriber     |  localhost  | local otlp receiver     |
| trace/log provider     |             | batch queue             |
| meter provider         |             | remote exporter         |
| handler                |             | shutdown flush          |
+------------------------+             +-------------------------+
```
The function process sits on the invoke hot path, so its responsibility is limited to generating signals and sending them to localhost. The extension process, on the other hand, can keep running for a while after an invocation completes, so exporting over the network, retries, and flushing at Shutdown all go there.
Logs
For logs, the official opentelemetry-appender-tracing crate provides OpenTelemetryTracingBridge, which implements tracing_subscriber::Layer, so it composes with tracing_subscriber. This follows the design philosophy of the Logs signal:
Our approach with logs is somewhat different. For OpenTelemetry to be successful in logging space we need to support existing legacy of logs and logging libraries, while offering improvements and better integration with the rest of observability world where possible.
In other words, for logs the direction is not to provide a dedicated logging API for people to adopt, but to connect to the existing logging ecosystem.
Traces
Traces also use the tracing ecosystem, but tracing_opentelemetry is not provided by the OpenTelemetry project; it is hosted by the tracing project. So I also wondered whether taking a bridge approach for traces goes against that design philosophy. This has already been discussed in the issue OpenTelemetry Tracing API vs Tokio-Tracing API for Distributed Tracing. In short, the discussion is about which of tracing and the OpenTelemetry Trace API opentelemetry-rust should recommend for generating traces. The issue was closed on 2026-03-18, and documentation is now being written in docs: start doc for distributed tracing and logs guidance.
There has also been a major change to the spec around traces. The Deprecating Span Events API announcement states:
OpenTelemetry is deprecating the Span Event API.
With that, the Span Event API has been deprecated. Until now, logging inside a span recorded the information both as a span event and as a log, so the same information was produced twice. I personally found that redundant and confusing, but also thought keeping both was unavoidable since spans are sampled. Going forward, it looks like everything can be consolidated into logs.
Metrics
Finally, metrics. So far I have relied on the tracing ecosystem, but for metrics my current conclusion is that using the opentelemetry API directly is the better choice. tracing-opentelemetry also has a MetricsLayer, which lets you express a counter as a tracing event like this:
```rust
tracing::info!(counter.foo = 1);
```
Internally, MetricsLayer inspects specific event fields and, when a field name matches the metrics naming convention, creates the instrument the first time such an event is processed.
This is convenient for simple instruments like counters, but I think initializing instruments is actually somewhat more involved. For a counter you may want to specify a description and a unit, and for a histogram you need to pass bucket boundaries as a Vec<f64>.
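To make this concrete, here is a simplified, std-only model of prefix-based instrument creation. It is not tracing-opentelemetry's implementation: InstrumentRegistry, Instrument, and on_event are hypothetical names, and only the `counter.` and `histogram.` prefixes are modeled (the real MetricsLayer's exact prefix set may differ). Notice that the instrument is built from the field name alone, so there is nowhere to pass a description, a unit, or bucket boundaries.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for an OpenTelemetry instrument.
#[derive(Debug, PartialEq)]
pub enum Instrument {
    Counter { total: u64 },
    Histogram { samples: Vec<f64> },
}

/// Hypothetical registry that creates instruments lazily from field names.
#[derive(Default)]
pub struct InstrumentRegistry {
    instruments: HashMap<String, Instrument>,
}

impl InstrumentRegistry {
    /// Handles one event field. Fields that do not follow the naming
    /// convention are ignored; a matching field creates its instrument on
    /// first sight (with no description, unit, or boundaries) and updates it.
    pub fn on_event(&mut self, field: &str, value: f64) {
        if let Some(name) = field.strip_prefix("counter.") {
            let entry = self
                .instruments
                .entry(name.to_string())
                .or_insert(Instrument::Counter { total: 0 });
            if let Instrument::Counter { total } = entry {
                *total += value as u64;
            }
        } else if let Some(name) = field.strip_prefix("histogram.") {
            let entry = self
                .instruments
                .entry(name.to_string())
                .or_insert(Instrument::Histogram { samples: Vec::new() });
            if let Instrument::Histogram { samples } = entry {
                samples.push(value);
            }
        }
    }

    pub fn get(&self, name: &str) -> Option<&Instrument> {
        self.instruments.get(name)
    }
}
```

Creation and update happen in the same call, which is exactly the merging of responsibilities discussed next.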
In short, I think the problem with handling metrics through tracing-opentelemetry is that the responsibility for creating an instrument and the responsibility for updating its state are merged into one.
So for now, I find it easier to define instruments once and keep them in global variables:
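```rust
use std::sync::LazyLock;

use opentelemetry::{global, metrics::{Counter, Meter}, KeyValue};

// The meter name "lambda-app" is illustrative. LazyLock defers creation until first
// use, which should happen after global::set_meter_provider has been called.
pub static METER: LazyLock<Meter> = LazyLock::new(|| global::meter("lambda-app"));

pub static FOO_COUNTER: LazyLock<Counter<u64>> = LazyLock::new(|| {
    METER
        .u64_counter("foo")
        .with_description("description")
        .with_unit("{foo}")
        .build()
});

fn process() {
    FOO_COUNTER.add(1, &[KeyValue::new("key", "val")]);
}
```
As a result, logs, traces, and metrics each use a different crate, which is where I currently stand. I hope the relationship between tracing and opentelemetry gets sorted out and a more unified layer appears in the future.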
Instrumenting the function process
Now let's implement the instrumentation on the function process side. As discussed above, on Lambda we cannot count on background tasks running outside the handler, or on Drop at process exit, to complete an export. So the function process does not use the standard batch processor / periodic reader provided by the OpenTelemetry SDK as-is; instead, it is set up so that it can flush explicitly at invocation boundaries.
Concretely, logs and traces are pushed onto an in-process queue by a manual processor, and metrics are made collectable through a manual reader. While the handler runs, signals are only recorded; after the handler returns its result, a tower::Layer flushes them to the extension on localhost. Batch export to the remote backend, retries, and flushing at Shutdown are the responsibility of the extension process.
Resource
First, build the Resource shared by logs, traces, and metrics. Information such as service.name and deployment.environment.name is not attached to every log record or span as attributes; it is set once as resource attributes when the providers are initialized.
```rust
use opentelemetry::KeyValue;
use opentelemetry_sdk::{Resource, resource::EnvResourceDetector};
use opentelemetry_semantic_conventions::{
    SCHEMA_URL,
    resource::{
        DEPLOYMENT_ENVIRONMENT_NAME,
        SERVICE_NAME,
        SERVICE_NAMESPACE,
        SERVICE_VERSION,
    },
};

fn build_resource() -> Resource {
    Resource::builder_empty()
        // Reads extra attributes from the OTEL_RESOURCE_ATTRIBUTES environment variable.
        .with_detector(Box::new(EnvResourceDetector::new()))
        .with_schema_url(
            [
                KeyValue::new(SERVICE_NAMESPACE, "example"),
                KeyValue::new(SERVICE_NAME, env!("CARGO_PKG_NAME")),
                KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
                KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, "development"),
            ],
            SCHEMA_URL,
        )
        .build()
}
```
Logs pipeline
As mentioned earlier, the entry point for logs is OpenTelemetryTracingBridge from opentelemetry-appender-tracing. However, instead of configuring SdkLoggerProvider with the standard batch processor and letting it export in the background, a custom processor first pushes records onto a queue.
The structure looks like this:
```text
tracing::event!
  -> tracing_subscriber
  -> OpenTelemetryTracingBridge
  -> SdkLoggerProvider
  -> ManualLogProcessor
  -> in-memory queue
```
On the hot path, all ManualLogProcessor does is clone the SdkLogRecord and the InstrumentationScope and push them onto the queue. No OTLP HTTP request is made here.
```rust
use std::{
    collections::VecDeque,
    sync::{Arc, Mutex},
    time::Duration,
};

use opentelemetry::InstrumentationScope;
use opentelemetry_sdk::{
    error::OTelSdkResult,
    logs::{LogBatch, LogProcessor, SdkLogRecord},
};

// LogProcessor requires Debug, so derive it alongside Clone.
#[derive(Clone, Debug)]
struct ManualLogProcessor {
    queue: Arc<Mutex<VecDeque<(SdkLogRecord, InstrumentationScope)>>>,
}

impl LogProcessor for ManualLogProcessor {
    // Hot path: clone the record and scope into the queue; no I/O here.
    fn emit(&self, record: &mut SdkLogRecord, scope: &InstrumentationScope) {
        let Ok(mut queue) = self.queue.lock() else {
            return;
        };
        queue.push_back((record.clone(), scope.clone()));
    }

    fn force_flush(&self) -> OTelSdkResult {
        Ok(())
    }

    fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
        Ok(())
    }
}
```
force_flush and shutdown_with_timeout are no-ops. Exporting synchronously there would just bring network I/O back onto the handler path. Flushing is instead done asynchronously from a separate handle.
```rust
use opentelemetry_otlp::LogExporter;
// Brings the LogExporter trait's export() and set_resource() into scope.
use opentelemetry_sdk::logs::{LogExporter as _, SdkLoggerProvider};

struct LogState {
    provider: SdkLoggerProvider,
    flush: LogFlushHandle,
}

#[derive(Clone)]
struct LogFlushHandle {
    queue: Arc<Mutex<VecDeque<(SdkLogRecord, InstrumentationScope)>>>,
    exporter: Arc<LogExporter>,
}

impl LogFlushHandle {
    async fn flush(&self, timeout: Duration) {
        // Take everything queued so far; the lock is released before any await.
        let drained = {
            let Ok(mut queue) = self.queue.lock() else {
                return;
            };
            queue.drain(..).collect::<Vec<_>>()
        };
        if drained.is_empty() {
            return;
        }
        // LogBatch borrows (record, scope) pairs.
        let records = drained
            .iter()
            .map(|(record, scope)| (record, scope))
            .collect::<Vec<_>>();
        let batch = LogBatch::new(&records);
        match tokio::time::timeout(timeout, self.exporter.export(batch)).await {
            Ok(Ok(())) => {}
            Ok(Err(err)) => eprintln!("failed to export logs: {err}"),
            Err(_) => eprintln!("log export timed out"),
        }
    }
}
```
LogState::build wires the manual processor and the flush handle to the same queue. The LogExporter points at /v1/logs on the localhost extension.
```rust
use lambda_runtime::Error;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::Resource;

impl LogState {
    fn build(resource: Resource, endpoint: &str) -> Result<Self, Error> {
        // Shared by the processor (producer) and the flush handle (consumer).
        let queue = Arc::new(Mutex::new(VecDeque::new()));
        let mut exporter = LogExporter::builder()
            .with_http()
            .with_endpoint(format!("{endpoint}/v1/logs"))
            .build()?;
        // The exporter is not owned by the provider, so set the resource on it directly.
        exporter.set_resource(&resource);
        let exporter = Arc::new(exporter);
        let processor = ManualLogProcessor {
            queue: queue.clone(),
        };
        let provider = SdkLoggerProvider::builder()
            .with_resource(resource)
            .with_log_processor(processor)
            .build();
        Ok(Self {
            provider,
            flush: LogFlushHandle { queue, exporter },
        })
    }
}
```
In practice you also need an upper bound on the queue, a drop counter for overflow, a flush timeout, and so on. But the key design point is that emit performs no network I/O, and everything is exported to the localhost extension in one go at the end of the invocation.
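The queue bound and drop counter could look like the following std-only sketch. BoundedQueue is a hypothetical type, not part of opentelemetry_sdk, and dropping the newest item when full is only one possible policy (dropping the oldest is another). Under this assumption, emit would call push, and the flush handle would call drain and report the dropped count, for example with eprintln!.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Hypothetical bounded queue shared between emit() and flush().
pub struct BoundedQueue<T> {
    inner: Mutex<Inner<T>>,
    capacity: usize,
}

struct Inner<T> {
    items: VecDeque<T>,
    dropped: u64,
}

impl<T> BoundedQueue<T> {
    pub fn new(capacity: usize) -> Arc<Self> {
        Arc::new(Self {
            inner: Mutex::new(Inner { items: VecDeque::new(), dropped: 0 }),
            capacity,
        })
    }

    /// Hot path: never does I/O. When the queue is full, the new item is
    /// discarded and counted instead of letting memory grow without bound.
    pub fn push(&self, item: T) {
        let Ok(mut inner) = self.inner.lock() else { return };
        if inner.items.len() >= self.capacity {
            inner.dropped += 1;
        } else {
            inner.items.push_back(item);
        }
    }

    /// Flush path: returns everything queued plus the number of items dropped
    /// since the previous drain, and resets the drop counter.
    pub fn drain(&self) -> (Vec<T>, u64) {
        let Ok(mut inner) = self.inner.lock() else { return (Vec::new(), 0) };
        let items: Vec<T> = inner.items.drain(..).collect();
        let dropped = std::mem::take(&mut inner.dropped);
        (items, dropped)
    }
}
```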
Note that using tracing::warn! to report export failures would route the message back through OpenTelemetryTracingBridge into the logs queue, creating a feedback loop. Here I write to stderr with eprintln! instead and pick it up from CloudWatch Logs.
Traces pipeline
Traces follow the same idea as logs. tracing-opentelemetry's OpenTelemetryLayer generates OpenTelemetry spans from tracing::span!, but SdkTracerProvider is not given a BatchSpanProcessor. BatchSpanProcessor is designed to leave export to a background worker, which does not play well with Lambda's freeze.
The structure looks like this:
```text
tracing::span!
  -> tracing_subscriber
  -> tracing_opentelemetry::OpenTelemetryLayer
  -> SdkTracerProvider
  -> ManualSpanProcessor
  -> in-memory queue
```
ManualSpanProcessor pushes SpanData onto the queue in on_end, which is called when a span ends.
```rust
use std::{
    collections::VecDeque,
    sync::{Arc, Mutex},
    time::Duration,
};

use opentelemetry::Context;
use opentelemetry_sdk::{
    error::OTelSdkResult,
    trace::{Span, SpanData, SpanProcessor},
};

// SpanProcessor requires Debug, so derive it alongside Clone.
#[derive(Clone, Debug)]
struct ManualSpanProcessor {
    queue: Arc<Mutex<VecDeque<SpanData>>>,
}

impl SpanProcessor for ManualSpanProcessor {
    fn on_start(&self, _span: &mut Span, _cx: &Context) {}

    // Called when a span ends: queue sampled spans only, with no I/O.
    fn on_end(&self, span: SpanData) {
        if !span.span_context.is_sampled() {
            return;
        }
        let Ok(mut queue) = self.queue.lock() else {
            return;
        };
        queue.push_back(span);
    }

    fn force_flush(&self) -> OTelSdkResult {
        Ok(())
    }

    fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
        Ok(())
    }
}
```
force_flush and shutdown_with_timeout are no-ops for the same reason as with logs: export is consolidated into the flush handle at the end of the invocation.
```rust
use opentelemetry_otlp::SpanExporter;
// Brings the SpanExporter trait's export() and set_resource() into scope.
use opentelemetry_sdk::trace::{SdkTracerProvider, SpanExporter as _};

struct TraceState {
    provider: SdkTracerProvider,
    flush: SpanFlushHandle,
}

#[derive(Clone)]
struct SpanFlushHandle {
    queue: Arc<Mutex<VecDeque<SpanData>>>,
    exporter: Arc<SpanExporter>,
}

impl SpanFlushHandle {
    async fn flush(&self, timeout: Duration) {
        // Take all queued spans; the lock is released before any await.
        let spans = {
            let Ok(mut queue) = self.queue.lock() else {
                return;
            };
            queue.drain(..).collect::<Vec<_>>()
        };
        if spans.is_empty() {
            return;
        }
        match tokio::time::timeout(timeout, self.exporter.export(spans)).await {
            Ok(Ok(())) => {}
            Ok(Err(err)) => eprintln!("failed to export spans: {err}"),
            Err(_) => eprintln!("span export timed out"),
        }
    }
}
```
TraceState::build has the same structure as LogState::build: the processor and the flush handle share the same queue, and the SpanExporter points at /v1/traces on the localhost extension.
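```rust
use lambda_runtime::Error;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
    Resource,
    trace::{Sampler, Tracer},
};

impl TraceState {
    fn build(resource: Resource, endpoint: &str) -> Result<Self, Error> {
        // Shared by the processor (producer) and the flush handle (consumer).
        let queue = Arc::new(Mutex::new(VecDeque::new()));
        let mut exporter = SpanExporter::builder()
            .with_http()
            .with_endpoint(format!("{endpoint}/v1/traces"))
            .build()?;
        // The exporter is not owned by the provider, so set the resource on it directly.
        exporter.set_resource(&resource);
        let exporter = Arc::new(exporter);
        let processor = ManualSpanProcessor {
            queue: queue.clone(),
        };
        let provider = SdkTracerProvider::builder()
            .with_resource(resource)
            // Root spans: keep 10% by trace ID; child spans follow their parent.
            .with_sampler(Sampler::ParentBased(Box::new(
                Sampler::TraceIdRatioBased(0.1),
            )))
            .with_span_processor(processor)
            .build();
        Ok(Self {
            provider,
            flush: SpanFlushHandle { queue, exporter },
        })
    }

    fn tracer(&self) -> Tracer {
        self.provider.tracer("lambda-app")
    }
}
```
This is also where the sampler is chosen. Even with something like ParentBased(TraceIdRatioBased), the sampling decision itself is left to the SDK, and the processor only queues spans that were sampled; that is the division of responsibility.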
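To make that division concrete, here is a std-only model of what ParentBased(TraceIdRatioBased(ratio)) decides. It is a simplified illustration, not the SDK's code: the function name and the exact way a number is derived from the trace ID are assumptions, though the idea that a root decision is derived deterministically from the trace ID matches the TraceIdRatioBased sampler's purpose.

```rust
/// Simplified model of a ParentBased(TraceIdRatioBased(ratio)) decision.
/// The bit layout used to turn the trace ID into a number is an assumption
/// for illustration; the SDK's exact computation may differ.
pub fn should_sample(parent_sampled: Option<bool>, trace_id: u128, ratio: f64) -> bool {
    match parent_sampled {
        // ParentBased: a span with a parent follows the parent's decision,
        // so all spans of one trace share the same outcome.
        Some(sampled) => sampled,
        // Root span: decide from the trace ID, so the same trace ID always
        // yields the same decision.
        None => {
            let ratio = ratio.clamp(0.0, 1.0);
            let upper_bound = (ratio * (1u64 << 63) as f64) as u64;
            // Low 64 bits of the trace ID, reduced to 63 bits.
            let value = (trace_id as u64) >> 1;
            value < upper_bound
        }
    }
}
```

Spans for which this returns false are not handed to ManualSpanProcessor as sampled spans, so the is_sampled() check in on_end acts as a safeguard rather than as the sampling logic itself.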
Metrics pipeline
Metrics do not use PeriodicReader. As the name suggests, PeriodicReader collects and exports periodically from a background task / timer, so it cannot be expected to run while Lambda is frozen. Instead, a ManualReader is registered with SdkMeterProvider and collected explicitly at the end of each invocation.
The structure looks like this:
```text
Counter::add / Histogram::record
  -> SdkMeterProvider
  -> ManualReader
  -> ResourceMetrics
  -> MetricExporter
```
```rust
use std::{
    sync::{Arc, Weak},
    time::Duration,
};

use opentelemetry_sdk::{
    error::OTelSdkResult,
    metrics::{
        InstrumentKind, ManualReader, Pipeline, Temporality,
        data::ResourceMetrics,
        reader::MetricReader,
    },
};

// Wraps ManualReader in an Arc so the same reader can be registered with the
// provider and collected by the flush handle.
#[derive(Clone, Debug)]
struct SharedManualReader(Arc<ManualReader>);

impl MetricReader for SharedManualReader {
    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
        self.0.register_pipeline(pipeline);
    }

    fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
        self.0.collect(rm)
    }

    fn force_flush(&self) -> OTelSdkResult {
        self.0.force_flush()
    }

    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
        self.0.shutdown_with_timeout(timeout)
    }

    fn temporality(&self, kind: InstrumentKind) -> Temporality {
        // Counter and Histogram report Delta; all other instruments stay Cumulative.
        match kind {
            InstrumentKind::Counter | InstrumentKind::Histogram => Temporality::Delta,
            _ => Temporality::Cumulative,
        }
    }
}
```
Temporality determines what the exported metric values mean. Cumulative sends the accumulated value from the start time until now, while Delta sends the increase since the previous collect.
For a regular long-running process, Cumulative is easy to work with. The process lives a long time, so the counter's start time is stable, and the backend can interpret the data as a single time series that keeps growing from the same process.
On Lambda, however, execution environments come and go with cold and warm starts. Even for a single Lambda function, several execution environments may exist in parallel behind the scenes, each with its own start time. And if there are no invocations for a while, an environment is discarded, and on the next cold start the counter starts again from 0.
For example, sending a request.count counter as Cumulative gives values like these:
```text
env A: 1, 2, 3, 4
env B: 1, 2
env C: 1
```
These are correct cumulative values within each execution environment, but to treat them as the request count for the Lambda function as a whole, the backend has to interpret series identity and resets correctly. If the backend is sensitive to how start times and resource attributes are handled, each environment may end up as a separate series, or increases computed across a reset may not come out as expected.
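As an illustration of what interpreting resets involves, here is a std-only sketch of a common backend heuristic: treat any decrease in a cumulative series as a reset. The function name is hypothetical, and real backends also rely on start times and series labels. The last test case shows why series identity matters: if env A and env B end up in the same series, their interleaved samples hide each other's increases.

```rust
/// Total increase represented by one cumulative series, treating any drop
/// in value as a counter reset (for example, a new execution environment
/// reporting under the same series identity).
pub fn increase_with_resets(samples: &[u64]) -> u64 {
    let mut total = 0;
    let mut previous: Option<u64> = None;
    for &value in samples {
        total += match previous {
            // First sample: assume the series started from zero.
            None => value,
            // Value went down: assume a reset and count from zero again.
            Some(prev) if value < prev => value,
            Some(prev) => value - prev,
        };
        previous = Some(value);
    }
    total
}
```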
With Delta, each flush carries roughly the amount added during that invocation:
```text
env A: 1, 1, 1, 1
env B: 1, 1
env C: 1
```
Since this design flushes at each invocation boundary, Delta values are easy to aggregate across the whole function. Some of this of course depends on the backend and reader specifics, but in an environment like Lambda where process lifetime is unstable, I think sending counters and histograms as Delta is easier to operate.
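The same example worked through with Delta, as a std-only sketch with hypothetical helper names: each environment's cumulative values are turned into per-flush deltas, and the function-wide total is a plain sum with no series identity or reset handling.

```rust
/// Converts one environment's cumulative samples into per-flush deltas,
/// which is what a Delta-temporality reader reports at each collect.
/// Assumes the samples come from a single environment, so they never decrease.
pub fn to_deltas(cumulative: &[u64]) -> Vec<u64> {
    let mut previous = 0;
    cumulative
        .iter()
        .map(|&value| {
            let delta = value - previous;
            previous = value;
            delta
        })
        .collect()
}

/// Aggregating deltas needs no series identity: add everything up.
pub fn total_requests(envs: &[Vec<u64>]) -> u64 {
    envs.iter().flat_map(|deltas| deltas.iter()).sum()
}
```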
On the flush side, ManualReader::collect builds a ResourceMetrics, which is then exported to the extension on localhost.
```rust
use opentelemetry_otlp::MetricExporter;
// Brings PushMetricExporter::export() into scope.
use opentelemetry_sdk::metrics::{SdkMeterProvider, exporter::PushMetricExporter};

struct MeterState {
    provider: SdkMeterProvider,
    flush: MeterFlushHandle,
}

#[derive(Clone)]
struct MeterFlushHandle {
    reader: SharedManualReader,
    exporter: Arc<MetricExporter>,
}

impl MeterFlushHandle {
    async fn flush(&self, timeout: Duration) {
        let mut metrics = ResourceMetrics::default();
        if let Err(err) = self.reader.collect(&mut metrics) {
            eprintln!("failed to collect metrics: {err}");
            return;
        }
        // Skip the HTTP request when nothing was recorded.
        if metrics.scope_metrics().all(|scope| scope.metrics().count() == 0) {
            return;
        }
        match tokio::time::timeout(timeout, self.exporter.export(&metrics)).await {
            Ok(Ok(())) => {}
            Ok(Err(err)) => eprintln!("failed to export metrics: {err}"),
            Err(_) => eprintln!("metric export timed out"),
        }
    }
}
```
MeterState::build makes the reader registered with SdkMeterProvider and the reader collected on the flush side one and the same. The MetricExporter points at /v1/metrics on the localhost extension.
```rust
use lambda_runtime::Error;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::Resource;

impl MeterState {
    fn build(resource: Resource, endpoint: &str) -> Result<Self, Error> {
        // One reader instance: registered with the provider, collected by the flush handle.
        let reader = SharedManualReader(Arc::new(
            ManualReader::builder().build(),
        ));
        let exporter = Arc::new(
            MetricExporter::builder()
                .with_http()
                .with_endpoint(format!("{endpoint}/v1/metrics"))
                .build()?,
        );
        let provider = SdkMeterProvider::builder()
            .with_resource(resource)
            .with_reader(reader.clone())
            .build();
        Ok(Self {
            provider,
            flush: MeterFlushHandle { reader, exporter },
        })
    }
}
```
Now the handler only needs to call Counter::add or Histogram::record. Collecting and exporting metrics is confined to the reader side and is invisible to the handler's business logic.
Initializing the subscriber and providers
At this point we can build the provider and flush handle for each of logs, traces, and metrics. Next, we initialize them exactly once at the application's entry point.
Logs and traces are attached as tracing_subscriber layers: OpenTelemetryTracingBridge for logs and tracing_opentelemetry::OpenTelemetryLayer for traces. Metrics are registered as the global provider with global::set_meter_provider.
```rust
use lambda_runtime::Error;
use opentelemetry::global;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use tracing_subscriber::prelude::*;

struct Providers {
    logs: LogState,
    traces: TraceState,
    metrics: MeterState,
}

impl Providers {
    // Clones only the flush handles (Arc clones); the providers stay here.
    fn flush_handle(&self) -> FlushHandle {
        FlushHandle {
            logs: self.logs.flush.clone(),
            traces: self.traces.flush.clone(),
            metrics: self.metrics.flush.clone(),
        }
    }
}

fn init_telemetry() -> Result<Providers, Error> {
    let resource = build_resource();
    // The extension's OTLP/HTTP receiver on localhost (4318 is the standard OTLP/HTTP port).
    let endpoint = "http://127.0.0.1:4318";
    let logs = LogState::build(resource.clone(), endpoint)?;
    let traces = TraceState::build(resource.clone(), endpoint)?;
    let metrics = MeterState::build(resource, endpoint)?;

    let log_layer = OpenTelemetryTracingBridge::new(&logs.provider);
    let trace_layer = tracing_opentelemetry::layer().with_tracer(traces.tracer());
    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer())
        .with(trace_layer)
        .with(log_layer)
        .with(tracing_subscriber::EnvFilter::from_default_env())
        .init();
    global::set_meter_provider(metrics.provider.clone());

    Ok(Providers {
        logs,
        traces,
        metrics,
    })
}
```
Here too, the export destination is not the remote backend but the extension on localhost. The function process knows nothing about the observability backend's URL or credentials; it simply sends OTLP over HTTP to endpoints such as http://127.0.0.1:4318/v1/logs. Connection settings for the remote backend are confined to the extension process.
fmt::layer() is kept because logs written to Lambda's standard output flow into CloudWatch Logs. While verifying connectivity on the OTLP side, emitting to both stdout and OTLP makes debugging easier. Whether to keep stdout in the end depends on your operational policy.
Flushing at the end of each invocation
With the setup so far, logs, traces, and metrics each accumulate inside the process while the handler runs. But accumulating them alone does not get them to the extension. So at the end of each invocation, we explicitly call each pipeline's flush handle.
You could write flush().await directly inside the handler, but that mixes business logic with telemetry delivery. Since lambda_runtime is built on tower::Service, it is cleaner to insert the flush as a tower::Layer.
```rust
use std::time::Duration;

#[derive(Clone)]
struct FlushLayer {
    handle: FlushHandle,
    timeout: Duration,
}

struct FlushService<S> {
    inner: S,
    handle: FlushHandle,
    timeout: Duration,
}

impl<S> tower::Layer<S> for FlushLayer {
    type Service = FlushService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        FlushService {
            inner,
            handle: self.handle.clone(),
            timeout: self.timeout,
        }
    }
}
```
FlushService flushes after awaiting the handler's result. Whether the handler returns Ok or Err, we want to hand as much telemetry as possible to the extension, so a flush is attempted regardless of the result.
```rust
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

use lambda_runtime::LambdaEvent;
use tower::Service;

impl<S, T, R> Service<LambdaEvent<T>> for FlushService<S>
where
    S: Service<LambdaEvent<T>, Response = R>,
    S::Future: Future<Output = Result<R, S::Error>> + Send + 'static,
    S::Error: Send + 'static,
    R: Send + 'static,
{
    type Response = R;
    type Error = S::Error;
    type Future = Pin<Box<dyn Future<Output = Result<R, Self::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, event: LambdaEvent<T>) -> Self::Future {
        let handle = self.handle.clone();
        let timeout = self.timeout;
        let fut = self.inner.call(event);
        Box::pin(async move {
            let result = fut.await;
            // Flush on both Ok and Err, before the result goes back to the runtime.
            handle.flush_all(timeout).await;
            result
        })
    }
}
```
flush_all flushes logs, then traces, then metrics. The destination is the extension on localhost, which is lighter than sending directly to the remote backend, but it is still an HTTP request, so a timeout is always set. Since each of the three flushes applies its own timeout, the worst case adds up to three times that value to the invocation.
```rust
#[derive(Clone)]
struct FlushHandle {
    logs: LogFlushHandle,
    traces: SpanFlushHandle,
    metrics: MeterFlushHandle,
}

impl FlushHandle {
    // Flushes each pipeline in turn; each call applies its own timeout.
    async fn flush_all(&self, timeout: Duration) {
        self.logs.flush(timeout).await;
        self.traces.flush(timeout).await;
        self.metrics.flush(timeout).await;
    }
}
```
From the application code's point of view, the handler now only calls tracing::info! or Counter::add. Signal delivery is confined to the service layer, which also avoids the problem of forgetting to write flush logic in each handler.
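The essential control flow of FlushService, stripped of tower and async, is small enough to check directly. This std-only sketch uses a hypothetical call_then_flush helper: the handler runs first, the flush always runs afterwards, and the handler's Ok or Err is returned unchanged. Like FlushService, it does not flush if the handler panics.

```rust
/// Synchronous analogue of FlushService::call (hypothetical helper, not part
/// of lambda_runtime): run the handler, then flush, then return its result.
pub fn call_then_flush<T, E>(
    handler: impl FnOnce() -> Result<T, E>,
    flush: impl FnOnce(),
) -> Result<T, E> {
    let result = handler();
    // Flush on both Ok and Err so telemetry describing failures is not lost.
    flush();
    result
}
```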
Finally, main takes the flush handle from the initialized Providers and layers it onto the handler service.
```rust
use std::time::Duration;

use lambda_runtime::{Error, Runtime, service_fn};
use tower::ServiceBuilder;

#[tokio::main]
async fn main() -> Result<(), Error> {
    // `providers` owns the providers for the whole process lifetime;
    // the layer only receives clones of the flush handles.
    let providers = init_telemetry()?;
    let handler = service_fn(handler);
    let service = ServiceBuilder::new()
        .layer(FlushLayer {
            handle: providers.flush_handle(),
            timeout: Duration::from_millis(500),
        })
        .service(handler);
    Runtime::new(service).run().await?;
    Ok(())
}
```
providers stays alive in main's scope. FlushHandle clones the internal Arcs and hands them to the layer, but the lifetime of the providers themselves is owned by providers. Flush timing is managed solely by FlushLayer; nothing relies on Drop or process shutdown.
Implementing the Lambda extension
At this point, all the function process does is hand telemetry to the extension on localhost at the end of each invocation. Next, let's implement the Lambda extension that receives it.
The extension process is responsible for the following:
- Registering the extension through the Extensions API so that it can receive Invoke/Shutdown events
- Listening as an OTLP receiver on localhost and receiving telemetry sent from the function process
- Buffering received telemetry inside the extension process and exporting it to the remote backend in suitable units
- Flushing any remaining telemetry when a Shutdown event arrives
In the function process, we avoided relying on background workers or process shutdown. The extension process, on the other hand, has its own Extensions API event loop, so it can handle the Shutdown event explicitly. That is why batch export to the remote backend, retries, and the shutdown flush are delegated to the extension process.
In terms of implementation, the OTLP receiver and the Extensions API event loop must run concurrently. The receiver accepts /v1/logs, /v1/traces, and /v1/metrics from the function process. The event loop waits for the next Invoke or Shutdown event via /extension/event/next.
The extension's receiver must already be listening before the first invocation starts. The function process flushes to the extension at the end of each invocation, so if the receiver starts late, the first telemetry cannot be received. The extension's initialization therefore registers with the Extensions API first, and then starts listening on the receiver before entering the first event poll.
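Why the ordering matters can be shown with std alone. This sketch is a deliberate simplification under stated assumptions: a plain TcpListener stands in for the OTLP receiver, and an OS-assigned port replaces 4318. Once the socket is bound and listening, a connection made before the accept loop runs is queued by the kernel rather than refused, whereas connecting to a port nobody listens on fails.

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};

/// Stand-in for starting the receiver: TcpListener::bind both binds and
/// starts listening, so connections are queued from this point on.
pub fn bind_receiver() -> std::io::Result<TcpListener> {
    // Port 0 lets the OS choose a free port for this sketch.
    TcpListener::bind("127.0.0.1:0")
}

/// A "function process" connects and sends before accept() is called;
/// the late accept loop still receives the data.
pub fn early_client_is_not_lost() -> std::io::Result<String> {
    let listener = bind_receiver()?;
    let addr = listener.local_addr()?;
    let mut client = TcpStream::connect(addr)?;
    client.write_all(b"otlp-payload")?;
    drop(client);
    let (mut conn, _) = listener.accept()?;
    let mut buf = String::new();
    conn.read_to_string(&mut buf)?;
    Ok(buf)
}
```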
Conceptually, the extension's main loop looks like this:
```rust
// Pseudocode: register_extension, TelemetryBuffer, start_otlp_receiver,
// RemoteExporter and ExtensionEvent are placeholders, not real APIs.
async fn extension_main_loop() -> Result<(), Error> {
    // Register with the Extensions API first.
    let extension = register_extension().await?;
    let telemetry = TelemetryBuffer::new();
    // Start listening before the first event poll.
    let receiver = start_otlp_receiver("127.0.0.1:4318", telemetry.clone()).await?;
    let exporter = RemoteExporter::new();
    loop {
        // Wait for the next Invoke or Shutdown event.
        let event = extension.next_event().await?;
        match event {
            ExtensionEvent::Invoke { .. } => {
                telemetry.export_ready_batch(&exporter).await?;
            }
            ExtensionEvent::Shutdown { .. } => {
                receiver.shutdown().await?;
                telemetry.flush_all(&exporter).await?;
                break;
            }
        }
    }
    Ok(())
}
```
In practice, note that when the Invoke event arrives, the function has not yet finished processing. So the Invoke event is treated not as the moment to flush that invocation's telemetry, but as an opportunity to send telemetry accumulated up to the previous invocation to the backend. Telemetry from the invocation currently running is sent to the localhost receiver by the function process after the handler finishes.
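A std-only, synchronous sketch of that buffering policy, roughly corresponding to export_ready_batch and flush_all in the pseudocode above. Event, Buffer, and the batch-size rule are hypothetical; exporting is modeled as moving items into exported, and a real extension would also need retries and a size limit.

```rust
/// Hypothetical extension events.
pub enum Event {
    Invoke,
    Shutdown,
}

/// Hypothetical buffer: `exported` records each batch that would be sent.
pub struct Buffer {
    pending: Vec<String>,
    batch_size: usize,
    pub exported: Vec<Vec<String>>,
}

impl Buffer {
    pub fn new(batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be positive");
        Self { pending: Vec::new(), batch_size, exported: Vec::new() }
    }

    /// Called by the OTLP receiver whenever the function process flushes.
    pub fn receive(&mut self, item: &str) {
        self.pending.push(item.to_string());
    }

    /// Returns true while the event loop should keep polling.
    pub fn on_event(&mut self, event: Event) -> bool {
        match event {
            // Invoke: the current invocation is still running, so only
            // telemetry received earlier is exported, and only in full batches.
            Event::Invoke => {
                while self.pending.len() >= self.batch_size {
                    let batch: Vec<String> = self.pending.drain(..self.batch_size).collect();
                    self.exported.push(batch);
                }
                true
            }
            // Shutdown: ship whatever is left, even a partial batch.
            Event::Shutdown => {
                if !self.pending.is_empty() {
                    let rest = std::mem::take(&mut self.pending);
                    self.exported.push(rest);
                }
                false
            }
        }
    }
}
```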
As a reference implementation, there is opentelemetry-lambda-extension in lambda-observability. It also uses lambda-extension to handle the Extensions API.
Its ExtensionRuntime::run() combines starting the OTLP receiver with Extension::run(). For my requirements I wanted to control the flush logic and the versions of dependent crates (especially opentelemetry and tokio) myself, so I did not use it directly, but it was very helpful in thinking through the architecture.
Summary
To instrument OpenTelemetry in line with the Lambda lifecycle, I ended up with this setup: do not leave export to background tasks or Drop, consolidate flushing at invocation boundaries with a manual processor / reader, and delegate delivery to the remote backend to an extension.
Along the way I learned how extensions work and how manual processors / readers work. On the other hand, using a different crate for each signal remains an open issue. Rust's opentelemetry is steadily moving toward a stable release, so I hope things become simpler while coexisting with the tracing ecosystem.