Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(envoy-rls): Update sentinel-cluster-server-envoy-rls to support v3 api #2336

Merged
merged 2 commits into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ After preparing the yaml template, you may deploy the Envoy instance:
kubectl apply -f sample/k8s/envoy.yml
```

for v3 api:

```bash
kubectl apply -f sample/k8s/envoy-v3-api.yml
```

## Test the rate limiting

Now it's show time! We could visit the URL `envoy-service:10000/json` in K8S pods.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: envoy-cm-17
data:
envoy-yml: |-
admin:
access_log_path: /tmp/admin_access.log
address:
socket_address:
protocol: TCP
address: 127.0.0.1
port_value: 9901
static_resources:
listeners:
- name: listener_0
address:
socket_address:
protocol: TCP
address: 0.0.0.0
port_value: 10000
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_http
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains: ["*"]
routes:
- match:
prefix: "/"
route:
cluster: service_httpbin
typed_per_filter_config:
envoy.filters.http.dynamic_forward_proxy:
"@type": type.googleapis.com/envoy.extensions.filters.http.dynamic_forward_proxy.v3.PerRouteConfig
host_rewrite_literal: httpbin.org
rate_limits:
- stage: 0
actions:
- {destination_cluster: {}}
http_filters:
- name: envoy.filters.http.ratelimit
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.ratelimit.v3.RateLimit
domain: foo
request_type: external
failure_mode_deny: false
stage: 0
rate_limit_service:
grpc_service:
envoy_grpc:
cluster_name: rate_limit_cluster
timeout: 2s
transport_api_version: V3
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
clusters:
- name: service_httpbin
connect_timeout: 0.5s
type: LOGICAL_DNS
# Comment out the following line to test on v6 networks
dns_lookup_family: V4_ONLY
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: service_httpbin
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: httpbin.org
port_value: 80
- name: rate_limit_cluster
type: STRICT_DNS
connect_timeout: 10s
lb_policy: ROUND_ROBIN
http2_protocol_options: {}
load_assignment:
cluster_name: rate_limit_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: sentinel-rls-service
port_value: 10245
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: envoy-deployment-basic-17
labels:
app: envoy-17
spec:
replicas: 1
selector:
matchLabels:
app: envoy-17
template:
metadata:
labels:
app: envoy-17
spec:
containers:
- name: envoy
image: envoyproxy/envoy:v1.17.3
ports:
- containerPort: 10000
command: ["envoy"]
args: ["-c", "/tmp/envoy/envoy.yaml"]
volumeMounts:
- name: envoy-config
mountPath: /tmp/envoy
volumes:
- name: envoy-config
configMap:
name: envoy-cm-17
items:
- key: envoy-yml
path: envoy.yaml
---
apiVersion: v1
kind: Service
metadata:
name: envoy-service-17
labels:
name: envoy-service-17
spec:
type: NodePort
ports:
- port: 10000
targetPort: 10000
protocol: TCP
selector:
app: envoy-17
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ public class SentinelRlsGrpcServer {

public SentinelRlsGrpcServer(int port) {
ServerBuilder<?> builder = ServerBuilder.forPort(port)
.addService(new com.alibaba.csp.sentinel.cluster.server.envoy.rls.service.v3.SentinelEnvoyRlsServiceImpl())
.addService(new SentinelEnvoyRlsServiceImpl());

server = builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package com.alibaba.csp.sentinel.cluster.server.envoy.rls.service.v3;

import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
import com.alibaba.csp.sentinel.cluster.server.envoy.rls.flow.SimpleClusterFlowChecker;
import com.alibaba.csp.sentinel.cluster.server.envoy.rls.log.RlsAccessLogger;
import com.alibaba.csp.sentinel.cluster.server.envoy.rls.rule.EnvoySentinelRuleConverter;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.util.function.Tuple2;
import com.google.protobuf.TextFormat;
import io.envoyproxy.envoy.extensions.common.ratelimit.v3.RateLimitDescriptor;
import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitRequest;
import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitResponse;
import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitResponse.Code;
import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitResponse.RateLimit;
import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitResponse.DescriptorStatus;
import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitServiceGrpc;
import io.grpc.stub.StreamObserver;

import java.util.ArrayList;
import java.util.List;

import static com.alibaba.csp.sentinel.cluster.server.envoy.rls.rule.EnvoySentinelRuleConverter.SEPARATOR;

/**
* gRPC限流入口,实现envoy rls v3 api
*
* @author Winjay chan
* @date 2021/8/4
*/
public class SentinelEnvoyRlsServiceImpl extends RateLimitServiceGrpc.RateLimitServiceImplBase {
@Override
public void shouldRateLimit(RateLimitRequest request, StreamObserver<RateLimitResponse> responseObserver) {
int acquireCount = request.getHitsAddend();
if (acquireCount < 0) {
responseObserver.onError(new IllegalArgumentException(
"acquireCount should be positive, but actual: " + acquireCount));
return;
}
if (acquireCount == 0) {
// Not present, use the default "1" by default.
acquireCount = 1;
}

String domain = request.getDomain();
boolean blocked = false;
List<DescriptorStatus> statusList = new ArrayList<>(request.getDescriptorsCount());
for (RateLimitDescriptor descriptor : request.getDescriptorsList()) {
Tuple2<FlowRule, TokenResult> t = checkToken(domain, descriptor, acquireCount);
TokenResult r = t.r2;

printAccessLogIfNecessary(domain, descriptor, r);

if (r.getStatus() == TokenResultStatus.NO_RULE_EXISTS) {
// If the rule of the descriptor is absent, the request will pass directly.
r.setStatus(TokenResultStatus.OK);
}

if (!blocked && r.getStatus() != TokenResultStatus.OK) {
blocked = true;
}

Code statusCode = r.getStatus() == TokenResultStatus.OK ? Code.OK : Code.OVER_LIMIT;
DescriptorStatus.Builder descriptorStatusBuilder = DescriptorStatus.newBuilder()
.setCode(statusCode);
if (t.r1 != null) {
descriptorStatusBuilder
.setCurrentLimit(RateLimit.newBuilder().setUnit(RateLimit.Unit.SECOND)
.setRequestsPerUnit((int)t.r1.getCount())
.build())
.setLimitRemaining(r.getRemaining());
}
statusList.add(descriptorStatusBuilder.build());
}

Code overallStatus = blocked ? Code.OVER_LIMIT :Code.OK;
RateLimitResponse response = RateLimitResponse.newBuilder()
.setOverallCode(overallStatus)
.addAllStatuses(statusList)
.build();

responseObserver.onNext(response);
responseObserver.onCompleted();
}

private void printAccessLogIfNecessary(String domain, RateLimitDescriptor descriptor, TokenResult result) {
if (!RlsAccessLogger.isEnabled()) {
return;
}
String message = new StringBuilder("[RlsAccessLog] domain=").append(domain)
.append(", descriptor=").append(TextFormat.shortDebugString(descriptor))
.append(", checkStatus=").append(result.getStatus())
.append(", remaining=").append(result.getRemaining())
.toString();
RlsAccessLogger.log(message);
}

protected Tuple2<FlowRule, TokenResult> checkToken(String domain, RateLimitDescriptor descriptor, int acquireCount) {
long ruleId = EnvoySentinelRuleConverter.generateFlowId(generateKey(domain, descriptor));

FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
if (rule == null) {
// Pass if the target rule is absent.
return Tuple2.of(null, new TokenResult(TokenResultStatus.NO_RULE_EXISTS));
}
// If the rule is present, it should be valid.
return Tuple2.of(rule, SimpleClusterFlowChecker.acquireClusterToken(rule, acquireCount));
}

private String generateKey(String domain, RateLimitDescriptor descriptor) {
StringBuilder sb = new StringBuilder(domain);
for (RateLimitDescriptor.Entry resource : descriptor.getEntriesList()) {
sb.append(SEPARATOR).append(resource.getKey()).append(SEPARATOR).append(resource.getValue());
}
return sb.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
syntax = "proto3";

package envoy.config.core.v3;

import "udpa/annotations/status.proto";
import "udpa/annotations/versioning.proto";
import "validate/validate.proto";

option java_package = "io.envoyproxy.envoy.config.core.v3";
option java_outer_classname = "BaseProto";
option java_multiple_files = true;
option (udpa.annotations.file_status).package_version_status = ACTIVE;



// Header name/value pair.
message HeaderValue {
option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.core.HeaderValue";

// Header name.
string key = 1
[(validate.rules).string =
{min_len: 1 max_bytes: 16384 well_known_regex: HTTP_HEADER_NAME strict: false}];

// Header value.
//
// The same :ref:`format specifier <config_access_log_format>` as used for
// :ref:`HTTP access logging <config_access_log>` applies here, however
// unknown header values are replaced with the empty string instead of `-`.
string value = 2 [
(validate.rules).string = {max_bytes: 16384 well_known_regex: HTTP_HEADER_VALUE strict: false}
];
}
Loading