Skip to content

Commit f0b67b9

Browse files
committed
feat: add destinations new option
1 parent a4026f9 commit f0b67b9

File tree

11 files changed

+640
-6
lines changed

11 files changed

+640
-6
lines changed

index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
export { configure } from './configure.js'
1111
export { defineConfig } from './src/define_config.js'
1212
export { OtelManager } from './src/otel.js'
13+
export { destinations } from './src/destinations.js'
1314

1415
/**
1516
* Re-export OTLP exporters so users don't need to install those 100 packages

package.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,14 @@
6969
"@opentelemetry/api": "^1.9.0",
7070
"@opentelemetry/auto-instrumentations-node": "^0.67.3",
7171
"@opentelemetry/core": "^2.5.0",
72+
"@opentelemetry/exporter-logs-otlp-http": "^0.211.0",
73+
"@opentelemetry/exporter-metrics-otlp-http": "^0.211.0",
7274
"@opentelemetry/exporter-metrics-otlp-grpc": "^0.211.0",
75+
"@opentelemetry/exporter-trace-otlp-http": "^0.211.0",
7376
"@opentelemetry/exporter-trace-otlp-grpc": "^0.211.0",
7477
"@opentelemetry/instrumentation": "^0.211.0",
7578
"@opentelemetry/resources": "^2.5.0",
79+
"@opentelemetry/sdk-logs": "^0.211.0",
7680
"@opentelemetry/sdk-metrics": "^2.5.0",
7781
"@opentelemetry/sdk-node": "^0.211.0",
7882
"@opentelemetry/sdk-trace-base": "^2.5.0",

src/destination_manager.ts

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
import { OTLPLogExporter } from '@opentelemetry/exporter-logs-otlp-http'
2+
import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-http'
3+
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http'
4+
import { BatchLogRecordProcessor } from '@opentelemetry/sdk-logs'
5+
import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics'
6+
import type { NodeSDKConfiguration } from '@opentelemetry/sdk-node'
7+
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base'
8+
9+
import type { DestinationConfig, DestinationMap, DestinationSignal } from './types/destinations.js'
10+
import debug from './debug.js'
11+
12+
/**
13+
* OTel SDK primitives built from configured destinations.
14+
*
15+
* These arrays are merged inside `OtelManager` with user-provided SDK options.
16+
*/
17+
export interface DestinationPipelines {
18+
spanProcessors?: NonNullable<NodeSDKConfiguration['spanProcessors']>
19+
metricReaders?: NonNullable<NodeSDKConfiguration['metricReaders']>
20+
logRecordProcessors?: NonNullable<NodeSDKConfiguration['logRecordProcessors']>
21+
}
22+
23+
/**
24+
* Transforms high-level `destinations` config into concrete OTel exporters/processors.
25+
*/
26+
export class DestinationManager {
27+
readonly destinations: DestinationMap
28+
29+
/**
30+
* @param destinations Destination map from `OtelConfig.destinations`.
31+
*/
32+
constructor(destinations: DestinationMap | undefined) {
33+
if (!destinations) {
34+
this.destinations = {}
35+
return
36+
}
37+
38+
this.destinations = destinations
39+
}
40+
41+
#resolveSignals(destination: DestinationConfig): Set<DestinationSignal> {
42+
if (destination.signals === 'all') {
43+
return new Set<DestinationSignal>(['traces', 'metrics', 'logs'])
44+
}
45+
46+
return new Set(destination.signals)
47+
}
48+
49+
/**
50+
* Resolve the final endpoint for one signal.
51+
*
52+
* Priority:
53+
* 1) `destination.endpoints[signal]`
54+
* 2) `destination.endpoint` + `/v1/{signal}`
55+
* 3) `undefined` (OTel exporter applies env/default fallback)
56+
*/
57+
#resolveEndpoint(destination: DestinationConfig, signal: DestinationSignal): string | undefined {
58+
const signalEndpoint = destination.endpoints?.[signal]
59+
if (signalEndpoint) return signalEndpoint
60+
61+
if (destination.endpoint) {
62+
return `${destination.endpoint.replace(/\/$/, '')}/v1/${signal}`
63+
}
64+
65+
return undefined
66+
}
67+
68+
/**
69+
* Batch processor options shared by trace and logs processors.
70+
*/
71+
#resolveBatchProcessorConfig(destination: DestinationConfig) {
72+
return {
73+
...(destination.maxExportBatchSize !== undefined && {
74+
maxExportBatchSize: destination.maxExportBatchSize,
75+
}),
76+
...(destination.scheduledDelayMillis !== undefined && {
77+
scheduledDelayMillis: destination.scheduledDelayMillis,
78+
}),
79+
...(destination.exportTimeoutMillis !== undefined && {
80+
exportTimeoutMillis: destination.exportTimeoutMillis,
81+
}),
82+
...(destination.maxQueueSize !== undefined && {
83+
maxQueueSize: destination.maxQueueSize,
84+
}),
85+
}
86+
}
87+
88+
/**
89+
* Build span processors, metric readers and log processors for every enabled destination.
90+
*
91+
* One destination can receive all signals or only a subset via `signals`.
92+
* Multiple destinations create multiple exporters, enabling fan-out.
93+
*/
94+
buildPipelines(): DestinationPipelines {
95+
const spanProcessors: NonNullable<NodeSDKConfiguration['spanProcessors']> = []
96+
const metricReaders: NonNullable<NodeSDKConfiguration['metricReaders']> = []
97+
const logRecordProcessors: NonNullable<NodeSDKConfiguration['logRecordProcessors']> = []
98+
99+
for (const [destinationKey, destination] of Object.entries(this.destinations)) {
100+
if (!destination.enabled) continue
101+
102+
const signals = this.#resolveSignals(destination)
103+
const destinationName = destination.name ?? destinationKey
104+
const headers = destination.headers
105+
const timeoutMillis = destination.timeoutMillis
106+
const concurrencyLimit = destination.concurrencyLimit
107+
const compression = destination.compression
108+
const batchProcessorConfig = this.#resolveBatchProcessorConfig(destination)
109+
110+
if (signals.has('traces')) {
111+
const url = this.#resolveEndpoint(destination, 'traces')
112+
113+
const exporter = new OTLPTraceExporter({
114+
...(url && { url }),
115+
...(headers && { headers }),
116+
...(timeoutMillis !== undefined && { timeoutMillis }),
117+
...(concurrencyLimit !== undefined && { concurrencyLimit }),
118+
...(compression !== undefined && { compression }),
119+
})
120+
121+
spanProcessors.push(new BatchSpanProcessor(exporter, batchProcessorConfig))
122+
}
123+
124+
if (signals.has('metrics')) {
125+
const url = this.#resolveEndpoint(destination, 'metrics')
126+
127+
const exporter = new OTLPMetricExporter({
128+
...(url && { url }),
129+
...(headers && { headers }),
130+
...(timeoutMillis !== undefined && { timeoutMillis }),
131+
...(concurrencyLimit !== undefined && { concurrencyLimit }),
132+
...(compression !== undefined && { compression }),
133+
})
134+
135+
metricReaders.push(
136+
new PeriodicExportingMetricReader({
137+
exporter,
138+
...(destination.metricExportIntervalMillis !== undefined && {
139+
exportIntervalMillis: destination.metricExportIntervalMillis,
140+
}),
141+
...(destination.metricExportTimeoutMillis !== undefined && {
142+
exportTimeoutMillis: destination.metricExportTimeoutMillis,
143+
}),
144+
})
145+
)
146+
}
147+
148+
if (signals.has('logs')) {
149+
const url = this.#resolveEndpoint(destination, 'logs')
150+
151+
const exporter = new OTLPLogExporter({
152+
...(url && { url }),
153+
...(headers && { headers }),
154+
...(timeoutMillis !== undefined && { timeoutMillis }),
155+
...(concurrencyLimit !== undefined && { concurrencyLimit }),
156+
...(compression !== undefined && { compression }),
157+
})
158+
159+
logRecordProcessors.push(new BatchLogRecordProcessor(exporter, batchProcessorConfig))
160+
}
161+
162+
debug('configured destination "%s" for signals: %O', destinationName, [...signals])
163+
}
164+
165+
return {
166+
spanProcessors: spanProcessors.length > 0 ? spanProcessors : undefined,
167+
metricReaders: metricReaders.length > 0 ? metricReaders : undefined,
168+
logRecordProcessors: logRecordProcessors.length > 0 ? logRecordProcessors : undefined,
169+
}
170+
}
171+
}

src/destinations.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import type { OtlpDestinationConfig, OtlpDestinationOptions } from './types/destinations.js'
2+
3+
/**
4+
* Create an OTLP destination config with sensible defaults.
5+
*
6+
* Defaults:
7+
* - `enabled: true`
8+
* - `signals: 'all'`
9+
*/
10+
export function otlp(options: OtlpDestinationOptions): OtlpDestinationConfig {
11+
return {
12+
type: 'otlp',
13+
...options,
14+
enabled: options.enabled ?? true,
15+
signals: options.signals ?? 'all',
16+
}
17+
}
18+
19+
/**
20+
* Helper namespace for destination factories.
21+
*
22+
* @example
23+
* ```ts
24+
* destinations: {
25+
* lgtm: destinations.otlp({ endpoint: 'http://localhost:4318', signals: 'all' }),
26+
* }
27+
* ```
28+
*/
29+
export const destinations = {
30+
otlp,
31+
}

src/otel.ts

Lines changed: 94 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type { Instrumentation } from '@opentelemetry/instrumentation'
66
import { resourceFromAttributes } from '@opentelemetry/resources'
77
import { NodeSDK } from '@opentelemetry/sdk-node'
88
import {
9+
BatchSpanProcessor,
910
ConsoleSpanExporter,
1011
ParentBasedSampler,
1112
SimpleSpanProcessor,
@@ -28,6 +29,7 @@ import type {
2829
} from './types/instrumentations.js'
2930
import { HttpContext } from '@adonisjs/core/http'
3031
import { HttpUrlFilter } from './http_url_filter.js'
32+
import { DestinationManager, type DestinationPipelines } from './destination_manager.js'
3133
import debug from './debug.js'
3234

3335
/**
@@ -273,22 +275,108 @@ export class OtelManager {
273275
return processors.length > 0 ? processors : undefined
274276
}
275277

278+
/**
279+
* Build destination pipelines from the `destinations` config.
280+
*/
281+
#buildDestinationPipelines(): DestinationPipelines {
282+
return new DestinationManager(this.#config.destinations).buildPipelines()
283+
}
284+
285+
/**
286+
* Whether we should preserve legacy trace exporter settings when destinations add traces.
287+
*/
288+
#shouldPreserveTraceExporter(
289+
destinationSpanProcessors: DestinationPipelines['spanProcessors']
290+
): boolean {
291+
const hasDestinationTraceProcessors = (destinationSpanProcessors?.length ?? 0) > 0
292+
const hasExplicitSpanProcessors = (this.#config.spanProcessors?.length ?? 0) > 0
293+
return hasDestinationTraceProcessors && !hasExplicitSpanProcessors
294+
}
295+
296+
/**
297+
* Merge configured span processors with destination trace processors.
298+
*/
299+
#mergeSpanProcessors(
300+
destinationSpanProcessors: DestinationPipelines['spanProcessors']
301+
): OtelConfig['spanProcessors'] {
302+
const spanProcessors = [
303+
...(this.#buildSpanProcessors() ?? []),
304+
...(destinationSpanProcessors ?? []),
305+
]
306+
307+
/**
308+
* When destinations add trace span processors, NodeSDK ignores `traceExporter`
309+
* and deprecated `spanProcessor`. Re-inject them to preserve existing behavior.
310+
*/
311+
if (this.#shouldPreserveTraceExporter(destinationSpanProcessors)) {
312+
if (this.#config.traceExporter) {
313+
spanProcessors.unshift(new BatchSpanProcessor(this.#config.traceExporter))
314+
}
315+
316+
if (this.#config.spanProcessor) {
317+
spanProcessors.unshift(this.#config.spanProcessor)
318+
}
319+
}
320+
321+
return spanProcessors.length > 0 ? spanProcessors : undefined
322+
}
323+
324+
/**
325+
* Merge configured metric readers with destination metric readers.
326+
*/
327+
#mergeMetricReaders(destinationMetricReaders: DestinationPipelines['metricReaders']) {
328+
const metricReaders = [
329+
...(this.#config.metricReaders ?? []),
330+
...(destinationMetricReaders ?? []),
331+
]
332+
333+
return metricReaders.length > 0 ? metricReaders : undefined
334+
}
335+
336+
/**
337+
* Merge configured log record processors with destination log processors.
338+
*/
339+
#mergeLogRecordProcessors(
340+
destinationLogRecordProcessors: DestinationPipelines['logRecordProcessors']
341+
) {
342+
const logRecordProcessors = [
343+
...(this.#config.logRecordProcessors ?? []),
344+
...(destinationLogRecordProcessors ?? []),
345+
]
346+
347+
return logRecordProcessors.length > 0 ? logRecordProcessors : undefined
348+
}
349+
350+
/**
351+
* Return NodeSDK user config without internal-only keys.
352+
*/
353+
#getNodeSdkUserConfig() {
354+
const sdkConfig = { ...this.#config }
355+
delete sdkConfig.destinations
356+
return sdkConfig
357+
}
358+
276359
/**
277360
* Create the NodeSDK instance with all configuration
278361
*/
279362
#createSdk(): NodeSDK {
280-
const resource = this.#buildResource()
281-
const instrumentations = this.#buildInstrumentations()
363+
const destinationPipelines = this.#buildDestinationPipelines()
282364
const sampler = this.#buildSampler()
283-
const spanProcessors = this.#buildSpanProcessors()
365+
const spanProcessors = this.#mergeSpanProcessors(destinationPipelines.spanProcessors)
366+
const metricReaders = this.#mergeMetricReaders(destinationPipelines.metricReaders)
367+
const logRecordProcessors = this.#mergeLogRecordProcessors(
368+
destinationPipelines.logRecordProcessors
369+
)
284370

285371
return new NodeSDK({
286-
...this.#config,
287-
resource,
372+
...this.#getNodeSdkUserConfig(),
373+
resource: this.#buildResource(),
288374
serviceName: this.serviceName,
289-
instrumentations,
375+
instrumentations: this.#buildInstrumentations(),
290376
...(sampler && { sampler }),
291377
...(spanProcessors && { spanProcessors }),
378+
...(metricReaders && { metricReaders }),
379+
...(logRecordProcessors && { logRecordProcessors }),
292380
})
293381
}
294382

0 commit comments

Comments
 (0)