@lquerel lquerel commented Nov 4, 2025

Align OTLP/OTAP receivers on shared gRPC config

This PR is a subset of #1357 and focuses on the OTLP and OTAP receivers.

  • Both gRPC-based receivers now share the same configuration structure (GrpcServerSettings) for greater consistency. The configuration has also been significantly extended to allow more precise tuning; parameters representing byte quantities can now be expressed either as raw byte counts or with units such as MB, MiB, etc. (see the sketch after this list).
  • Adds flexible compression deserialization (single value, list, or none) and reuses it across request/response compression settings. By default, response compression is no longer enabled since responses are typically very small. Both zstd and gzip are enabled for requests.
  • Pushes max-concurrent-request tuning into both receivers (and exposes a helper) so that downstream capacity directly drives the gRPC concurrency clamp.
  • Updates the OTLP/OTAP servers to honor the new settings: compression preferences, adaptive windows, and message-size limits are applied once when the services are built.
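
As an illustration only (field names, the Single variant, and the defaults shown below are assumptions, not the actual GrpcServerSettings API; the document only confirms a CompressionConfigValue type with a List variant), a shared config along these lines could deserialize a single/list/absent compression setting roughly like this:

use serde::Deserialize;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
enum CompressionMethod {
    Gzip,
    Zstd,
}

// Accept either a single method or a list of methods; an absent field
// (Option::None at the call site) means compression is disabled for that direction.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum CompressionConfigValue {
    Single(CompressionMethod),
    List(Vec<CompressionMethod>),
}

#[derive(Debug, Deserialize)]
struct GrpcServerSettingsSketch {
    // In the real settings this accepts either a raw byte count or a string with
    // units such as "4MiB"; kept as a plain string here for brevity.
    max_recv_msg_size: Option<String>,
    request_compression: Option<CompressionConfigValue>,
    // Disabled by default, since responses are typically very small.
    response_compression: Option<CompressionConfigValue>,
    max_concurrent_requests: Option<usize>,
}

With such a shape, a fragment like request_compression: [zstd, gzip] with response_compression omitted would match the defaults described above.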

Key changes for the OTAP receiver

  • Response streaming is now driven by an async state machine instead of spawning a new task per request. This removes the extra mpsc hop and preserves backpressure.
  • Ack/Nack correlation slots are protected by a parking_lot::Mutex, providing fast, non-poisoning locking in async contexts where a poisoned std::sync::Mutex would otherwise stall the Tokio worker (a sketch of the idea follows this list).
  • Compression preferences, concurrency limits, and middleware (such as zstd header handling) are applied once per service construction so that hot-path processing remains lean.
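
For reviewers unfamiliar with the pattern, here is a rough sketch of the slot idea (the AckSlot contents, method names, and error type are assumptions; only the slots/free_stack shape appears in this PR):

use parking_lot::Mutex;
use tokio::sync::oneshot;

#[derive(Default)]
struct AckSlot {
    waiter: Option<oneshot::Sender<Result<(), String>>>,
}

struct AckRegistry {
    inner: Mutex<AckRegistryInner>,
}

struct AckRegistryInner {
    slots: Box<[AckSlot]>,
    free_stack: Vec<usize>,
}

impl AckRegistry {
    fn new(capacity: usize) -> Self {
        Self {
            inner: Mutex::new(AckRegistryInner {
                slots: (0..capacity).map(|_| AckSlot::default()).collect(),
                free_stack: (0..capacity).rev().collect(),
            }),
        }
    }

    // Reserve a slot for an in-flight batch; the returned receiver resolves when
    // the batch is acked or nacked downstream.
    fn register(&self) -> Option<(usize, oneshot::Receiver<Result<(), String>>)> {
        let (tx, rx) = oneshot::channel();
        let mut inner = self.inner.lock(); // parking_lot: no poisoning to unwrap
        let idx = inner.free_stack.pop()?;
        inner.slots[idx].waiter = Some(tx);
        Some((idx, rx))
    }

    // Resolve a slot and return it to the free stack.
    fn complete(&self, idx: usize, result: Result<(), String>) {
        let mut inner = self.inner.lock();
        if let Some(tx) = inner.slots[idx].waiter.take() {
            let _ = tx.send(result);
        }
        inner.free_stack.push(idx);
    }
}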

New experimental receiver (non-Tonic)

This PR also introduces an experimental OTAP receiver (otel_receiver) that does not rely on Tonic. The intent is to:

  • Better align our gRPC-based receivers with the thread-per-core design of the OTAP pipeline engine
  • Improve control over the internal mechanics of these receivers. A first version of the admission controller is included; a future PR will extend it to protect the engine and keep CPU/memory usage under control
  • Improve overall performance
  • Support OTAP and OTLP on the same port

This experimental receiver does not support OTLP yet, but that is coming soon.

The following diagram describes the overall design to ease the review process.

[Design diagram of the experimental receiver]

@lquerel lquerel self-assigned this Nov 4, 2025
@github-actions github-actions bot added the rust label (Pull requests that update Rust code) Nov 4, 2025

codecov bot commented Nov 4, 2025

Codecov Report

❌ Patch coverage is 82.09203% with 755 lines in your changes missing coverage. Please review.
✅ Project coverage is 83.91%. Comparing base (e771ca9) to head (8664c2f).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1382      +/-   ##
==========================================
- Coverage   83.91%   83.91%   -0.01%     
==========================================
  Files         398      409      +11     
  Lines      108993   112861    +3868     
==========================================
+ Hits        91466    94708    +3242     
- Misses      16993    17619     +626     
  Partials      534      534              
Components Coverage Δ
otap-dataflow 85.62% <82.09%> (-0.12%) ⬇️
query_abstraction 80.61% <ø> (ø)
query_engine 90.61% <ø> (ø)
syslog_cef_receivers ∅ <ø> (∅)
otel-arrow-go 53.50% <ø> (ø)

@lquerel lquerel marked this pull request as ready for review November 4, 2025 18:51
@lquerel lquerel requested a review from a team as a code owner November 4, 2025 18:51
Co-authored-by: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com>
Comment on lines +75 to +80
let mut deduped = Vec::with_capacity(methods.len());
for method in methods {
if !deduped.contains(&method) {
deduped.push(method);
}
}
Contributor

This logic is only needed for CompressionConfigValue::List. We could move it inside the match statement to make the code simpler.
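
For illustration, the refactor being suggested might look roughly like this (assuming the CompressionConfigValue shape sketched earlier in the PR description, with absence handled as an Option at the call site):

let methods: Vec<CompressionMethod> = match value {
    // Only the List arm can contain duplicates, so the dedup lives here.
    CompressionConfigValue::List(methods) => {
        let mut deduped = Vec::with_capacity(methods.len());
        for method in methods {
            if !deduped.contains(&method) {
                deduped.push(method);
            }
        }
        deduped
    }
    CompressionConfigValue::Single(method) => vec![method],
};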

Comment on lines +140 to +144
// Note: without status set, the OTAP encoder fails at runtime
.status(otap_df_pdata::proto::opentelemetry::trace::v1::Status::new(
StatusCode::Ok,
"ok",
))
Contributor

Suggested change
-// Note: without status set, the OTAP encoder fails at runtime
-.status(otap_df_pdata::proto::opentelemetry::trace::v1::Status::new(
-    StatusCode::Ok,
-    "ok",
-))
+.status(otap_df_pdata::proto::opentelemetry::trace::v1::Status::new(
+    StatusCode::Ok,
+    "ok",
+))

Maybe fixed in #1436

use otap_df_pdata::proto::opentelemetry::collector::logs::v1::ExportLogsServiceResponse;
use otap_df_pdata::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceResponse;
use otap_df_pdata::proto::opentelemetry::collector::trace::v1::ExportTraceServiceResponse;
use parking_lot::Mutex;
Contributor

👍 TIL

}
}

/// Applies the shared server tuning options to a tonic server builder.
Contributor

Suggested change
-/// Applies the shared server tuning options to a tonic server builder.
+/// Applies the shared server tuning options to a server builder.

Comment on lines 57 to 60
struct AckRegistryInner {
slots: Box<[AckSlot]>,
free_stack: Vec<usize>,
}
Contributor

I'm a tiny bit confused. Is this new experimental code intended to replace something else, or have you already replaced something with it? It looks like the data type in crates/otap/src/accessory, which uses slotmap and which I see is still referenced from otap_grpc/otlp/server.rs. Is this an alternative to slotmap?

}
}
}
}
Contributor

You can write your own gRPC implementation, but it better be 1000 lines or less!

Member

lol yeah we gotta delete all the comments to get under the line limit

F: Fn(T) -> OtapArrowRecords + Send + Copy + 'static,
{
stream::unfold(state, |mut state| async move {
match state.next_item().await {
Contributor

It seems the same problem persists. The receiver requires the client to poll the output stream. Even though fill_inflight() reads data eagerly from the input stream, it won’t run until the client polls the output stream, right?

client polls the output stream -> `ArrowBatchStreamState::next_item()` -> `ArrowBatchStreamState::fill_inflight()`
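
For illustration only (not code from this PR), the laziness being described is inherent to stream::unfold: the producer closure runs only when the consumer polls the resulting stream.

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    let s = stream::unfold(0u32, |n| async move {
        // Runs only when the consumer polls the stream, just as a
        // fill_inflight-style read inside next_item() would.
        println!("producer side ran for item {n}");
        if n < 3 { Some((n, n + 1)) } else { None }
    });
    tokio::pin!(s);
    // Nothing above has executed yet; each producer step runs during this poll:
    while let Some(item) = s.next().await {
        println!("consumer received {item}");
    }
}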

}

#[test]
#[ignore = "temporarily disabled while investigating produce_bar failure"]
Member

@lquerel since #1447 and #1446 are merged this should pass now. I tested locally by merging main into your branch and it seems to work

@utpilla utpilla left a comment (Contributor)

Thanks for the effort on this! I wanted to share a few thoughts and concerns. To be upfront, I don’t have deep expertise in HTTP/2 or gRPC internals, so I can’t fully evaluate this implementation and that’s part of why I’m raising these points.

I understand the motivation behind avoiding Send bounds for a thread-per-core design, and that seems like a valid goal. My concern is more about long-term maintainability than the immediate code. HTTP/2 and gRPC have subtle requirements around flow control, connection state, error handling, deadline propagation, and header compression. My question is less about whether this works now than whether it will still work correctly down the road when someone encounters a weird client, a specific network condition, or a new gRPC feature. Tonic has years of production exposure and an active community finding and fixing edge cases. With a custom implementation, we'd be building up that production experience from scratch. I'm not sure how many of us (myself included) feel comfortable debugging HTTP/2 frame-level issues if they arise later. Just something to consider on the maintenance side.

Maybe we could feature flag this implementation rather than trying to make it default? This would let teams who need the thread-per-core performance benefit opt in, while keeping Tonic as the battle-tested default. It would also give time to harden the h2 implementation with early adopters before broader rollout.

Contributor Author

lquerel commented Nov 22, 2025

@utpilla We are on the same page. I plan to keep both the OTLP and OTAP receivers for quite some time, in addition to this experimental receiver.

@jmacd jmacd marked this pull request as draft December 3, 2025 17:46