|
| 1 | +import { OTLPLogExporter } from '@opentelemetry/exporter-logs-otlp-http' |
| 2 | +import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-http' |
| 3 | +import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http' |
| 4 | +import { BatchLogRecordProcessor } from '@opentelemetry/sdk-logs' |
| 5 | +import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics' |
| 6 | +import type { NodeSDKConfiguration } from '@opentelemetry/sdk-node' |
| 7 | +import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base' |
| 8 | + |
| 9 | +import type { DestinationConfig, DestinationMap, DestinationSignal } from './types/destinations.js' |
| 10 | +import debug from './debug.js' |
| 11 | + |
| 12 | +/** |
| 13 | + * OTel SDK primitives built from configured destinations. |
| 14 | + * |
| 15 | + * These arrays are merged inside `OtelManager` with user-provided SDK options. |
| 16 | + */ |
| 17 | +export interface DestinationPipelines { |
| 18 | + spanProcessors?: NonNullable<NodeSDKConfiguration['spanProcessors']> |
| 19 | + metricReaders?: NonNullable<NodeSDKConfiguration['metricReaders']> |
| 20 | + logRecordProcessors?: NonNullable<NodeSDKConfiguration['logRecordProcessors']> |
| 21 | +} |
| 22 | + |
| 23 | +/** |
| 24 | + * Transforms high-level `destinations` config into concrete OTel exporters/processors. |
| 25 | + */ |
| 26 | +export class DestinationManager { |
| 27 | + readonly destinations: DestinationMap |
| 28 | + |
| 29 | + /** |
| 30 | + * @param destinations Destination map from `OtelConfig.destinations`. |
| 31 | + */ |
| 32 | + constructor(destinations: DestinationMap | undefined) { |
| 33 | + if (!destinations) { |
| 34 | + this.destinations = {} |
| 35 | + return |
| 36 | + } |
| 37 | + |
| 38 | + this.destinations = destinations |
| 39 | + } |
| 40 | + |
| 41 | + #resolveSignals(destination: DestinationConfig): Set<DestinationSignal> { |
| 42 | + if (destination.signals === 'all') { |
| 43 | + return new Set<DestinationSignal>(['traces', 'metrics', 'logs']) |
| 44 | + } |
| 45 | + |
| 46 | + return new Set(destination.signals) |
| 47 | + } |
| 48 | + |
| 49 | + /** |
| 50 | + * Resolve the final endpoint for one signal. |
| 51 | + * |
| 52 | + * Priority: |
| 53 | + * 1) `destination.endpoints[signal]` |
| 54 | + * 2) `destination.endpoint` + `/v1/{signal}` |
| 55 | + * 3) `undefined` (OTel exporter applies env/default fallback) |
| 56 | + */ |
| 57 | + #resolveEndpoint(destination: DestinationConfig, signal: DestinationSignal): string | undefined { |
| 58 | + const signalEndpoint = destination.endpoints?.[signal] |
| 59 | + if (signalEndpoint) return signalEndpoint |
| 60 | + |
| 61 | + if (destination.endpoint) { |
| 62 | + return `${destination.endpoint.replace(/\/$/, '')}/v1/${signal}` |
| 63 | + } |
| 64 | + |
| 65 | + return undefined |
| 66 | + } |
| 67 | + |
| 68 | + /** |
| 69 | + * Batch processor options shared by trace and logs processors. |
| 70 | + */ |
| 71 | + #resolveBatchProcessorConfig(destination: DestinationConfig) { |
| 72 | + return { |
| 73 | + ...(destination.maxExportBatchSize !== undefined && { |
| 74 | + maxExportBatchSize: destination.maxExportBatchSize, |
| 75 | + }), |
| 76 | + ...(destination.scheduledDelayMillis !== undefined && { |
| 77 | + scheduledDelayMillis: destination.scheduledDelayMillis, |
| 78 | + }), |
| 79 | + ...(destination.exportTimeoutMillis !== undefined && { |
| 80 | + exportTimeoutMillis: destination.exportTimeoutMillis, |
| 81 | + }), |
| 82 | + ...(destination.maxQueueSize !== undefined && { |
| 83 | + maxQueueSize: destination.maxQueueSize, |
| 84 | + }), |
| 85 | + } |
| 86 | + } |
| 87 | + |
| 88 | + /** |
| 89 | + * Build span processors, metric readers and log processors for every enabled destination. |
| 90 | + * |
| 91 | + * One destination can receive all signals or only a subset via `signals`. |
| 92 | + * Multiple destinations create multiple exporters, enabling fan-out. |
| 93 | + */ |
| 94 | + buildPipelines(): DestinationPipelines { |
| 95 | + const spanProcessors: NonNullable<NodeSDKConfiguration['spanProcessors']> = [] |
| 96 | + const metricReaders: NonNullable<NodeSDKConfiguration['metricReaders']> = [] |
| 97 | + const logRecordProcessors: NonNullable<NodeSDKConfiguration['logRecordProcessors']> = [] |
| 98 | + |
| 99 | + for (const [destinationKey, destination] of Object.entries(this.destinations)) { |
| 100 | + if (!destination.enabled) continue |
| 101 | + |
| 102 | + const signals = this.#resolveSignals(destination) |
| 103 | + const destinationName = destination.name ?? destinationKey |
| 104 | + const headers = destination.headers |
| 105 | + const timeoutMillis = destination.timeoutMillis |
| 106 | + const concurrencyLimit = destination.concurrencyLimit |
| 107 | + const compression = destination.compression |
| 108 | + const batchProcessorConfig = this.#resolveBatchProcessorConfig(destination) |
| 109 | + |
| 110 | + if (signals.has('traces')) { |
| 111 | + const url = this.#resolveEndpoint(destination, 'traces') |
| 112 | + |
| 113 | + const exporter = new OTLPTraceExporter({ |
| 114 | + ...(url && { url }), |
| 115 | + ...(headers && { headers }), |
| 116 | + ...(timeoutMillis !== undefined && { timeoutMillis }), |
| 117 | + ...(concurrencyLimit !== undefined && { concurrencyLimit }), |
| 118 | + ...(compression !== undefined && { compression }), |
| 119 | + }) |
| 120 | + |
| 121 | + spanProcessors.push(new BatchSpanProcessor(exporter, batchProcessorConfig)) |
| 122 | + } |
| 123 | + |
| 124 | + if (signals.has('metrics')) { |
| 125 | + const url = this.#resolveEndpoint(destination, 'metrics') |
| 126 | + |
| 127 | + const exporter = new OTLPMetricExporter({ |
| 128 | + ...(url && { url }), |
| 129 | + ...(headers && { headers }), |
| 130 | + ...(timeoutMillis !== undefined && { timeoutMillis }), |
| 131 | + ...(concurrencyLimit !== undefined && { concurrencyLimit }), |
| 132 | + ...(compression !== undefined && { compression }), |
| 133 | + }) |
| 134 | + |
| 135 | + metricReaders.push( |
| 136 | + new PeriodicExportingMetricReader({ |
| 137 | + exporter, |
| 138 | + ...(destination.metricExportIntervalMillis !== undefined && { |
| 139 | + exportIntervalMillis: destination.metricExportIntervalMillis, |
| 140 | + }), |
| 141 | + ...(destination.metricExportTimeoutMillis !== undefined && { |
| 142 | + exportTimeoutMillis: destination.metricExportTimeoutMillis, |
| 143 | + }), |
| 144 | + }) |
| 145 | + ) |
| 146 | + } |
| 147 | + |
| 148 | + if (signals.has('logs')) { |
| 149 | + const url = this.#resolveEndpoint(destination, 'logs') |
| 150 | + |
| 151 | + const exporter = new OTLPLogExporter({ |
| 152 | + ...(url && { url }), |
| 153 | + ...(headers && { headers }), |
| 154 | + ...(timeoutMillis !== undefined && { timeoutMillis }), |
| 155 | + ...(concurrencyLimit !== undefined && { concurrencyLimit }), |
| 156 | + ...(compression !== undefined && { compression }), |
| 157 | + }) |
| 158 | + |
| 159 | + logRecordProcessors.push(new BatchLogRecordProcessor(exporter, batchProcessorConfig)) |
| 160 | + } |
| 161 | + |
| 162 | + debug('configured destination "%s" for signals: %O', destinationName, [...signals]) |
| 163 | + } |
| 164 | + |
| 165 | + return { |
| 166 | + spanProcessors: spanProcessors.length > 0 ? spanProcessors : undefined, |
| 167 | + metricReaders: metricReaders.length > 0 ? metricReaders : undefined, |
| 168 | + logRecordProcessors: logRecordProcessors.length > 0 ? logRecordProcessors : undefined, |
| 169 | + } |
| 170 | + } |
| 171 | +} |
0 commit comments