Skip to content

Commit

Permalink
Revert ext_proc: send attributes #30781 (#31017)
Browse files Browse the repository at this point in the history
Signed-off-by: Yanjun Xiang <yanjunxiang@google.com>
  • Loading branch information
yanjunxiang-google committed Nov 28, 2023
1 parent 1f1f605 commit 7730f8b
Show file tree
Hide file tree
Showing 16 changed files with 47 additions and 276 deletions.
4 changes: 4 additions & 0 deletions api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,15 @@ message ExternalProcessor {
// for a reply.
bool async_mode = 4;

// [#not-implemented-hide:]
// Envoy provides a number of :ref:`attributes <arch_overview_attributes>`
// for expressive policies. Each attribute name provided in this field will be
// matched against that list and populated in the request_headers message.
// See the :ref:`attribute documentation <arch_overview_request_attributes>`
// for the list of supported attributes and their types.
repeated string request_attributes = 5;

// [#not-implemented-hide:]
// Envoy provides a number of :ref:`attributes <arch_overview_attributes>`
// for expressive policies. Each attribute name provided in this field will be
// matched against that list and populated in the response_headers message.
Expand Down Expand Up @@ -255,10 +257,12 @@ message ExtProcOverrides {
// Set a different asynchronous processing option than the default.
bool async_mode = 2;

// [#not-implemented-hide:]
// Set different optional attributes than the default setting of the
// ``request_attributes`` field.
repeated string request_attributes = 3;

// [#not-implemented-hide:]
// Set different optional properties than the default setting of the
// ``response_attributes`` field.
repeated string response_attributes = 4;
Expand Down
7 changes: 0 additions & 7 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,6 @@ new_features:
- area: tracing
change: |
Provide initial span attributes to a sampler used in the OpenTelemetry tracer.
- area: ext_proc
change: |
implemented
:ref:`request_attributes <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.request_attributes>`
and
:ref:`response_attributes <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.response_attributes>`
config APIs to enable sending and receiving attributes from/to the external processing server.
- area: tracing
change: |
Added support to configure a Dynatrace resource detector for the OpenTelemetry tracer.
Expand Down
25 changes: 1 addition & 24 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@ envoy_cc_library(
"ext_proc.h",
"processor_state.h",
],
copts = select({
"//bazel:windows_x86_64": [],
"//conditions:default": [
"-DUSE_CEL_PARSER",
],
}),
deps = [
":client_interface",
":mutation_utils_lib",
Expand All @@ -35,41 +29,24 @@ envoy_cc_library(
"//source/common/buffer:buffer_lib",
"//source/common/protobuf",
"//source/common/runtime:runtime_features_lib",
"//source/extensions/filters/common/expr:evaluator_lib",
"//source/extensions/filters/common/mutation_rules:mutation_rules_lib",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings:str_format",
"@com_google_cel_cpp//eval/public:builtin_func_registrar",
"@com_google_cel_cpp//eval/public:cel_expr_builder_factory",
"@envoy_api//envoy/config/common/mutation_rules/v3:pkg_cc_proto",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto",
"@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto",
] + select(
{
"//bazel:windows_x86_64": [],
"//conditions:default": [
"@com_google_cel_cpp//parser",
],
},
),
],
)

envoy_cc_extension(
name = "config",
srcs = ["config.cc"],
hdrs = ["config.h"],
copts = select({
"//bazel:windows_x86_64": [],
"//conditions:default": [
"-DUSE_CEL_PARSER",
],
}),
deps = [
":client_lib",
":ext_proc",
"//source/extensions/filters/common/expr:evaluator_lib",
"//source/extensions/filters/http/common:factory_base_lib",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto",
],
Expand Down
13 changes: 6 additions & 7 deletions source/extensions/filters/http/ext_proc/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ Http::FilterFactoryCb ExternalProcessingFilterConfig::createFilterFactoryFromPro
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, message_timeout, DefaultMessageTimeoutMs);
const uint32_t max_message_timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, max_message_timeout, DefaultMaxMessageTimeoutMs);
const auto filter_config = std::make_shared<FilterConfig>(
proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms,
context.scope(), stats_prefix, Envoy::Extensions::Filters::Common::Expr::getBuilder(context));
const auto filter_config =
std::make_shared<FilterConfig>(proto_config, std::chrono::milliseconds(message_timeout_ms),
max_message_timeout_ms, context.scope(), stats_prefix);

return [filter_config, grpc_service = proto_config.grpc_service(),
&context](Http::FilterChainFactoryCallbacks& callbacks) {
Expand Down Expand Up @@ -45,10 +45,9 @@ ExternalProcessingFilterConfig::createFilterFactoryFromProtoWithServerContextTyp
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, message_timeout, DefaultMessageTimeoutMs);
const uint32_t max_message_timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, max_message_timeout, DefaultMaxMessageTimeoutMs);
const auto filter_config = std::make_shared<FilterConfig>(
proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms,
server_context.scope(), stats_prefix,
Envoy::Extensions::Filters::Common::Expr::getBuilder(server_context));
const auto filter_config =
std::make_shared<FilterConfig>(proto_config, std::chrono::milliseconds(message_timeout_ms),
max_message_timeout_ms, server_context.scope(), stats_prefix);

return [filter_config, grpc_service = proto_config.grpc_service(),
&server_context](Http::FilterChainFactoryCallbacks& callbacks) {
Expand Down
3 changes: 1 addition & 2 deletions source/extensions/filters/http/ext_proc/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.h"
#include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.validate.h"

#include "source/extensions/filters/common/expr/evaluator.h"
#include "source/extensions/filters/http/common/factory_base.h"

namespace Envoy {
Expand All @@ -30,7 +29,7 @@ class ExternalProcessingFilterConfig

Router::RouteSpecificFilterConfigConstSharedPtr createRouteSpecificFilterConfigTyped(
const envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute& proto_config,
Server::Configuration::ServerFactoryContext&,
Server::Configuration::ServerFactoryContext& context,
ProtobufMessage::ValidationVisitor& validator) override;

Http::FilterFactoryCb createFilterFactoryFromProtoWithServerContextTyped(
Expand Down
91 changes: 3 additions & 88 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@

#include "absl/strings/str_format.h"

#if defined(USE_CEL_PARSER)
#include "parser/parser.h"
#endif

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
Expand Down Expand Up @@ -117,27 +113,6 @@ ExtProcLoggingInfo::grpcCalls(envoy::config::core::v3::TrafficDirection traffic_
: encoding_processor_grpc_calls_;
}

absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>
FilterConfig::initExpressions(const Protobuf::RepeatedPtrField<std::string>& matchers) const {
absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr> expressions;
#if defined(USE_CEL_PARSER)
for (const auto& matcher : matchers) {
auto parse_status = google::api::expr::parser::Parse(matcher);
if (!parse_status.ok()) {
throw EnvoyException("Unable to parse descriptor expression: " +
parse_status.status().ToString());
}
expressions.emplace(matcher, Extensions::Filters::Common::Expr::createExpression(
builder_->builder(), parse_status.value().expr()));
}
#else
ENVOY_LOG(warn, "CEL expression parsing is not available for use in this environment."
" Attempted to parse " +
std::to_string(matchers.size()) + " expressions");
#endif
return expressions;
}

FilterConfigPerRoute::FilterConfigPerRoute(const ExtProcPerRoute& config)
: disabled_(config.disabled()) {
if (config.has_overrides()) {
Expand Down Expand Up @@ -226,8 +201,7 @@ void Filter::onDestroy() {
}

FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
Http::RequestOrResponseHeaderMap& headers, bool end_stream,
absl::optional<ProtobufWkt::Struct> proto) {
Http::RequestOrResponseHeaderMap& headers, bool end_stream) {
switch (openStream()) {
case StreamOpenState::Error:
return FilterHeadersStatus::StopIteration;
Expand All @@ -245,9 +219,6 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
MutationUtils::headersToProto(headers, config_->allowedHeaders(), config_->disallowedHeaders(),
*headers_req->mutable_headers());
headers_req->set_end_of_stream(end_stream);
if (proto.has_value()) {
(*headers_req->mutable_attributes())[FilterName] = proto.value();
}
state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(),
ProcessorState::CallbackState::HeadersCallback);
ENVOY_LOG(debug, "Sending headers message");
Expand All @@ -257,48 +228,6 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
return FilterHeadersStatus::StopIteration;
}

const absl::optional<ProtobufWkt::Struct> Filter::evaluateAttributes(
Filters::Common::Expr::ActivationPtr activation,
const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>&
expr) {
absl::optional<ProtobufWkt::Struct> proto;
if (expr.size() > 0) {
proto.emplace(ProtobufWkt::Struct{});
for (const auto& hash_entry : expr) {
ProtobufWkt::Arena arena;
const auto result = hash_entry.second.get()->Evaluate(*activation, &arena);
if (!result.ok()) {
// TODO: Stats?
continue;
}

if (result.value().IsError()) {
ENVOY_LOG(trace, "error parsing cel expression {}", hash_entry.first);
continue;
}

ProtobufWkt::Value value;
switch (result.value().type()) {
case google::api::expr::runtime::CelValue::Type::kBool:
value.set_bool_value(result.value().BoolOrDie());
break;
case google::api::expr::runtime::CelValue::Type::kNullType:
value.set_null_value(ProtobufWkt::NullValue{});
break;
case google::api::expr::runtime::CelValue::Type::kDouble:
value.set_number_value(result.value().DoubleOrDie());
break;
default:
value.set_string_value(Filters::Common::Expr::print(result.value()));
}

(*(proto.value()).mutable_fields())[hash_entry.first] = value;
}
}

return proto;
}

FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_stream) {
ENVOY_LOG(trace, "decodeHeaders: end_stream = {}", end_stream);
mergePerRouteConfig();
Expand All @@ -308,14 +237,7 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_st

FilterHeadersStatus status = FilterHeadersStatus::Continue;
if (decoding_state_.sendHeaders()) {
absl::optional<Envoy::ProtobufWkt::Struct> proto;
if (!config_->requestExpr().empty()) {
auto activation_ptr = Filters::Common::Expr::createActivation(decoding_state_.streamInfo(),
&headers, nullptr, nullptr);
proto = evaluateAttributes(std::move(activation_ptr), config_->requestExpr());
}

status = onHeaders(decoding_state_, headers, end_stream, proto);
status = onHeaders(decoding_state_, headers, end_stream);
ENVOY_LOG(trace, "onHeaders returning {}", static_cast<int>(status));
} else {
ENVOY_LOG(trace, "decodeHeaders: Skipped header processing");
Expand Down Expand Up @@ -593,14 +515,7 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s

FilterHeadersStatus status = FilterHeadersStatus::Continue;
if (!processing_complete_ && encoding_state_.sendHeaders()) {
absl::optional<Envoy::ProtobufWkt::Struct> proto;
if (!config_->responseExpr().empty()) {
auto activation_ptr = Filters::Common::Expr::createActivation(encoding_state_.streamInfo(),
nullptr, &headers, nullptr);
proto = evaluateAttributes(std::move(activation_ptr), config_->responseExpr());
}

status = onHeaders(encoding_state_, headers, end_stream, proto);
status = onHeaders(encoding_state_, headers, end_stream);
ENVOY_LOG(trace, "onHeaders returns {}", static_cast<int>(status));
} else {
ENVOY_LOG(trace, "encodeHeaders: Skipped header processing");
Expand Down
38 changes: 4 additions & 34 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "source/common/common/logger.h"
#include "source/common/common/matchers.h"
#include "source/common/protobuf/protobuf.h"
#include "source/extensions/filters/common/expr/evaluator.h"
#include "source/extensions/filters/common/mutation_rules/mutation_rules.h"
#include "source/extensions/filters/http/common/pass_through_filter.h"
#include "source/extensions/filters/http/ext_proc/client.h"
Expand Down Expand Up @@ -122,13 +121,12 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object {
Upstream::HostDescriptionConstSharedPtr upstream_host_;
};

class FilterConfig : public Logger::Loggable<Logger::Id::ext_proc> {
class FilterConfig {
public:
FilterConfig(const envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor& config,
const std::chrono::milliseconds message_timeout,
const uint32_t max_message_timeout_ms, Stats::Scope& scope,
const std::string& stats_prefix,
Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder)
const std::string& stats_prefix)
: failure_mode_allow_(config.failure_mode_allow()),
disable_clear_route_cache_(config.disable_clear_route_cache()),
message_timeout_(message_timeout), max_message_timeout_ms_(max_message_timeout_ms),
Expand All @@ -138,9 +136,7 @@ class FilterConfig : public Logger::Loggable<Logger::Id::ext_proc> {
allow_mode_override_(config.allow_mode_override()),
disable_immediate_response_(config.disable_immediate_response()),
allowed_headers_(initHeaderMatchers(config.forward_rules().allowed_headers())),
disallowed_headers_(initHeaderMatchers(config.forward_rules().disallowed_headers())),
builder_(builder), request_expr_(initExpressions(config.request_attributes())),
response_expr_(initExpressions(config.response_attributes())) {}
disallowed_headers_(initHeaderMatchers(config.forward_rules().disallowed_headers())) {}

bool failureModeAllow() const { return failure_mode_allow_; }

Expand Down Expand Up @@ -170,16 +166,6 @@ class FilterConfig : public Logger::Loggable<Logger::Id::ext_proc> {

const Envoy::ProtobufWkt::Struct& filterMetadata() const { return filter_metadata_; }

const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>&
requestExpr() const {
return request_expr_;
}

const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>&
responseExpr() const {
return response_expr_;
}

private:
ExtProcFilterStats generateStats(const std::string& prefix,
const std::string& filter_stats_prefix, Stats::Scope& scope) {
Expand All @@ -197,9 +183,6 @@ class FilterConfig : public Logger::Loggable<Logger::Id::ext_proc> {
return header_matchers;
}

absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>
initExpressions(const Protobuf::RepeatedPtrField<std::string>& matchers) const;

const bool failure_mode_allow_;
const bool disable_clear_route_cache_;
const std::chrono::milliseconds message_timeout_;
Expand All @@ -218,13 +201,6 @@ class FilterConfig : public Logger::Loggable<Logger::Id::ext_proc> {
const std::vector<Matchers::StringMatcherPtr> allowed_headers_;
// Empty disallowed_header_ means disallow nothing, i.e, allow all.
const std::vector<Matchers::StringMatcherPtr> disallowed_headers_;

Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder_;

const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>
request_expr_;
const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>
response_expr_;
};

using FilterConfigSharedPtr = std::shared_ptr<FilterConfig>;
Expand Down Expand Up @@ -324,13 +300,7 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
void sendImmediateResponse(const envoy::service::ext_proc::v3::ImmediateResponse& response);

Http::FilterHeadersStatus onHeaders(ProcessorState& state,
Http::RequestOrResponseHeaderMap& headers, bool end_stream,
absl::optional<ProtobufWkt::Struct> proto);

const absl::optional<ProtobufWkt::Struct> evaluateAttributes(
Filters::Common::Expr::ActivationPtr activation,
const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>&
expr);
Http::RequestOrResponseHeaderMap& headers, bool end_stream);
// Return a pair of whether to terminate returning the current result.
std::pair<bool, Http::FilterDataStatus> sendStreamChunk(ProcessorState& state);
Http::FilterDataStatus onData(ProcessorState& state, Buffer::Instance& data, bool end_stream);
Expand Down
Loading

0 comments on commit 7730f8b

Please sign in to comment.