From 261daa37cfe4abbae6e12ef5706a941b4357b815 Mon Sep 17 00:00:00 2001 From: mrproliu <741550557@qq.com> Date: Thu, 18 Jan 2024 22:51:04 +0800 Subject: [PATCH] Support transmit eBPF Access Log Protocol (#150) --- CHANGES.md | 15 +-- changes/changes-1.2.0.md | 21 +++ configs/satellite_config.yaml | 26 ++++ ...er_native-ebpf-accesslog-grpc-forwarder.md | 9 ++ docs/en/setup/plugins/plugin-list.md | 6 +- ...ver_grpc-native-ebpf-accesslog-receiver.md | 11 ++ docs/menu.yml | 12 +- go.mod | 2 +- go.sum | 14 +- plugins/forwarder/forwarder_repository.go | 2 + .../grpc/nativeebpfaccesslog/forwarder.go | 124 ++++++++++++++++++ .../grpc/nativeebpfaccesslog/access_log.go | 67 ++++++++++ .../grpc/nativeebpfaccesslog/receiver.go | 77 +++++++++++ plugins/receiver/receiver_repository.go | 2 + 14 files changed, 367 insertions(+), 21 deletions(-) create mode 100644 changes/changes-1.2.0.md create mode 100755 docs/en/setup/plugins/forwarder_native-ebpf-accesslog-grpc-forwarder.md create mode 100755 docs/en/setup/plugins/receiver_grpc-native-ebpf-accesslog-receiver.md create mode 100644 plugins/forwarder/grpc/nativeebpfaccesslog/forwarder.go create mode 100644 plugins/receiver/grpc/nativeebpfaccesslog/access_log.go create mode 100644 plugins/receiver/grpc/nativeebpfaccesslog/receiver.go diff --git a/CHANGES.md b/CHANGES.md index 427bcb26..6f4bd810 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,20 +2,13 @@ Changes by Version ================== Release Notes. -1.2.0 +1.3.0 ------------------ #### Features -* Introduce `pprof` module. -* Support export multiple `telemetry` service. -* Update the base docker image. -* Add timeout configuration for gRPC client. -* Reduce log print when the enqueue data to the pipeline error. -* Support transmit the Continuous Profiling protocol. +* Support native eBPF Access Log protocol. #### Bug Fixes -* Fix [CVE-2022-41721](https://avd.aquasec.com/nvd/cve-2022-41721). -* Use Go 19 to build the Docker image to fix CVEs. #### Issues and PR -- All issues are [here](https://github.com/apache/skywalking/milestone/170?closed=1) -- All and pull requests are [here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Apr+milestone%3A1.2.0+is%3Aclosed) +- All issues are [here](https://github.com/apache/skywalking/milestone/188?closed=1) +- All and pull requests are [here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Apr+milestone%3A1.3.0+is%3Aclosed) diff --git a/changes/changes-1.2.0.md b/changes/changes-1.2.0.md new file mode 100644 index 00000000..427bcb26 --- /dev/null +++ b/changes/changes-1.2.0.md @@ -0,0 +1,21 @@ +Changes by Version +================== +Release Notes. + +1.2.0 +------------------ +#### Features +* Introduce `pprof` module. +* Support export multiple `telemetry` service. +* Update the base docker image. +* Add timeout configuration for gRPC client. +* Reduce log print when the enqueue data to the pipeline error. +* Support transmit the Continuous Profiling protocol. + +#### Bug Fixes +* Fix [CVE-2022-41721](https://avd.aquasec.com/nvd/cve-2022-41721). +* Use Go 19 to build the Docker image to fix CVEs. + +#### Issues and PR +- All issues are [here](https://github.com/apache/skywalking/milestone/170?closed=1) +- All and pull requests are [here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Apr+milestone%3A1.2.0+is%3Aclosed) diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml index fa9e559b..e1acb676 100644 --- a/configs/satellite_config.yaml +++ b/configs/satellite_config.yaml @@ -543,6 +543,32 @@ pipes: client_name: grpc-client forwarders: - plugin_name: native-ebpf-profiling-grpc-forwarder + - common_config: + pipe_name: ebpf-accesslog-pipe + gatherer: + server_name: "grpc-server" + receiver: + plugin_name: "grpc-native-ebpf-accesslog-receiver" + queue: + plugin_name: "memory-queue" + # The maximum buffer event size. + event_buffer_size: ${SATELLITE_QUEUE_EVENT_BUFFER_SIZE:5000} + # The partition count of queue. + partition: ${SATELLITE_QUEUE_PARTITION:4} + processor: + filters: + sender: + fallbacker: + plugin_name: none-fallbacker + # The time interval between two flush operations. And the time unit is millisecond. + flush_time: ${SATELLITE_EBPFACCESSLOG_SENDER_FLUSH_TIME:1000} + # The maximum buffer elements. + max_buffer_size: ${SATELLITE_EBPFPACCESSLOG_SENDER_MAX_BUFFER_SIZE:200} + # The minimum flush elements. + min_flush_events: ${SATELLITE_EBPFACCESSLOG_SENDER_MIN_FLUSH_EVENTS:1} + client_name: grpc-client + forwarders: + - plugin_name: native-ebpf-accesslog-grpc-forwarder - common_config: pipe_name: otlp-metrics-v1-pipe gatherer: diff --git a/docs/en/setup/plugins/forwarder_native-ebpf-accesslog-grpc-forwarder.md b/docs/en/setup/plugins/forwarder_native-ebpf-accesslog-grpc-forwarder.md new file mode 100755 index 00000000..80083e15 --- /dev/null +++ b/docs/en/setup/plugins/forwarder_native-ebpf-accesslog-grpc-forwarder.md @@ -0,0 +1,9 @@ +# Forwarder/native-ebpf-accesslog-grpc-forwarder +## Description +This is a synchronization grpc forwarder with the SkyWalking native eBPF access log protocol. +## DefaultConfig +```yaml``` +## Configuration +|Name|Type|Description| +|----|----|-----------| + diff --git a/docs/en/setup/plugins/plugin-list.md b/docs/en/setup/plugins/plugin-list.md index 1c2b47cd..ab92c433 100755 --- a/docs/en/setup/plugins/plugin-list.md +++ b/docs/en/setup/plugins/plugin-list.md @@ -13,10 +13,11 @@ - [Envoy Metrics v2 GRPC Forwarder](./forwarder_envoy-metrics-v2-grpc-forwarder.md) - [Envoy Metrics v3 GRPC Forwarder](./forwarder_envoy-metrics-v3-grpc-forwarder.md) - [Native CDS GRPC Forwarder](./forwarder_native-cds-grpc-forwarder.md) + - [Native CLR GRPC Forwarder](./forwarder_native-clr-grpc-forwarder.md) + - [GRPC Native EBFP Access Log Forwarder](./forwarder_native-ebpf-accesslog-grpc-forwarder.md) - [Native EBPF Profiling GRPC Forwarder](./forwarder_native-ebpf-profiling-grpc-forwarder.md) - [Native Event GRPC Forwarder](./forwarder_native-event-grpc-forwarder.md) - [Native JVM GRPC Forwarder](./forwarder_native-jvm-grpc-forwarder.md) - - [Native CLR GRPC Forwarder](./forwarder_native-clr-grpc-forwarder.md) - [Native Log GRPC Forwarder](./forwarder_native-log-grpc-forwarder.md) - [Native Log Kafka Forwarder](./forwarder_native-log-kafka-forwarder.md) - [Native Management GRPC Forwarder](./forwarder_native-management-grpc-forwarder.md) @@ -36,10 +37,11 @@ - [GRPC Envoy Metrics v2 Receiver](./receiver_grpc-envoy-metrics-v2-receiver.md) - [GRPC Envoy Metrics v3 Receiver](./receiver_grpc-envoy-metrics-v3-receiver.md) - [GRPC Native CDS Receiver](./receiver_grpc-native-cds-receiver.md) + - [GRPC Native CLR Receiver](./receiver_grpc-native-clr-receiver.md) + - [GRPC Native EBFP Accesslog Receiver](./receiver_grpc-native-ebpf-accesslog-receiver.md) - [GRPC Native EBFP Profiling Receiver](./receiver_grpc-native-ebpf-profiling-receiver.md) - [GRPC Native Event Receiver](./receiver_grpc-native-event-receiver.md) - [GRPC Native JVM Receiver](./receiver_grpc-native-jvm-receiver.md) - - [GRPC Native CLR Receiver](./receiver_grpc-native-clr-receiver.md) - [GRPC Native Log Receiver](./receiver_grpc-native-log-receiver.md) - [GRPC Native Management Receiver](./receiver_grpc-native-management-receiver.md) - [GRPC Native Meter Receiver](./receiver_grpc-native-meter-receiver.md) diff --git a/docs/en/setup/plugins/receiver_grpc-native-ebpf-accesslog-receiver.md b/docs/en/setup/plugins/receiver_grpc-native-ebpf-accesslog-receiver.md new file mode 100755 index 00000000..e55fa3e1 --- /dev/null +++ b/docs/en/setup/plugins/receiver_grpc-native-ebpf-accesslog-receiver.md @@ -0,0 +1,11 @@ +# Receiver/grpc-native-ebpf-accesslog-receiver +## Description +This is a receiver for SkyWalking native accesslog format, which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/ebpf/accesslog.proto. +## Support Forwarders + - [native-ebpf-accesslog-grpc-forwarder](forwarder_native-ebpf-accesslog-grpc-forwarder.md) +## DefaultConfig +```yaml``` +## Configuration +|Name|Type|Description| +|----|----|-----------| + diff --git a/docs/menu.yml b/docs/menu.yml index 917c858d..0196bf39 100644 --- a/docs/menu.yml +++ b/docs/menu.yml @@ -93,14 +93,16 @@ catalog: path: /en/setup/plugins/forwarder_envoy-metrics-v3-grpc-forwarder - name: Native CDS GRPC Forwarder path: /en/setup/plugins/forwarder_native-cds-grpc-forwarder + - name: Native CLR GRPC Forwarder + path: /en/setup/plugins/forwarder_native-clr-grpc-forwarder + - name: GRPC Native EBFP Access Log Forwarder + path: /en/setup/plugins/forwarder_native-ebpf-accesslog-grpc-forwarder - name: Native EBPF Profiling GRPC Forwarder path: /en/setup/plugins/forwarder_native-ebpf-profiling-grpc-forwarder - name: Native Event GRPC Forwarder path: /en/setup/plugins/forwarder_native-event-grpc-forwarder - name: Native JVM GRPC Forwarder path: /en/setup/plugins/forwarder_native-jvm-grpc-forwarder - - name: Native CLR GRPC Forwarder - path: /en/setup/plugins/forwarder_native-clr-grpc-forwarder - name: Native Log GRPC Forwarder path: /en/setup/plugins/forwarder_native-log-grpc-forwarder - name: Native Log Kafka Forwarder @@ -137,14 +139,16 @@ catalog: path: /en/setup/plugins/receiver_grpc-envoy-metrics-v3-receiver - name: GRPC Native CDS Receiver path: /en/setup/plugins/receiver_grpc-native-cds-receiver + - name: GRPC Native CLR Receiver + path: /en/setup/plugins/receiver_grpc-native-clr-receiver + - name: GRPC Native EBFP Accesslog Receiver + path: /en/setup/plugins/receiver_grpc-native-ebpf-accesslog-receiver - name: GRPC Native EBFP Profiling Receiver path: /en/setup/plugins/receiver_grpc-native-ebpf-profiling-receiver - name: GRPC Native Event Receiver path: /en/setup/plugins/receiver_grpc-native-event-receiver - name: GRPC Native JVM Receiver path: /en/setup/plugins/receiver_grpc-native-jvm-receiver - - name: GRPC Native CLR Receiver - path: /en/setup/plugins/receiver_grpc-native-clr-receiver - name: GRPC Native Log Receiver path: /en/setup/plugins/receiver_grpc-native-log-receiver - name: GRPC Native Management Receiver diff --git a/go.mod b/go.mod index 51cd87cb..ef8475fd 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( google.golang.org/protobuf v1.31.0 gopkg.in/yaml.v3 v3.0.1 k8s.io/apimachinery v0.26.2 - skywalking.apache.org/repo/goapi v0.0.0-20230531132709-826aefddf3cb + skywalking.apache.org/repo/goapi v0.0.0-20240118085825-a7fa70cc38b7 ) require ( diff --git a/go.sum b/go.sum index 621a3023..f60d5926 100644 --- a/go.sum +++ b/go.sum @@ -489,6 +489,7 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs= golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -531,6 +532,7 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -556,6 +558,7 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -605,11 +608,14 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -622,6 +628,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -683,6 +691,7 @@ golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -790,7 +799,6 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= @@ -858,5 +866,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kF sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E= sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= -skywalking.apache.org/repo/goapi v0.0.0-20230531132709-826aefddf3cb h1:rsExxPGSCqiTScUfph4R3uNfQbVvaqMXYz84Hx3W6NI= -skywalking.apache.org/repo/goapi v0.0.0-20230531132709-826aefddf3cb/go.mod h1:bW4dg0GUN4rMCMS8DLlaY3ZiKUAJ1fQYKoZ91Bl0kTk= +skywalking.apache.org/repo/goapi v0.0.0-20240118085825-a7fa70cc38b7 h1:iUx3ovyKy4IMlXv0hB/qRYvUsCIQkAU6DLnLoCt6Qck= +skywalking.apache.org/repo/goapi v0.0.0-20240118085825-a7fa70cc38b7/go.mod h1:oD2dxcDAHVIt95Ee7kJHgZ5f64QNhrqTjQYARwfafc4= diff --git a/plugins/forwarder/forwarder_repository.go b/plugins/forwarder/forwarder_repository.go index 2579552a..fe19e937 100644 --- a/plugins/forwarder/forwarder_repository.go +++ b/plugins/forwarder/forwarder_repository.go @@ -26,6 +26,7 @@ import ( "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/envoymetricsv3" grpc_nativecds "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativecds" grpc_nativeclr "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeclr" + grpc_nativeebpfaccesslog "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeebpfaccesslog" grpc_nativeebpfprofiling "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeebpfprofiling" grpc_nativeevent "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeevent" grpc_nativejvm "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativejvm" @@ -64,6 +65,7 @@ func RegisterForwarderPlugins() { new(envoymetricsv2.Forwarder), new(envoymetricsv3.Forwarder), new(otlpmetricsv1.Forwarder), + new(grpc_nativeebpfaccesslog.Forwarder), } for _, forwarder := range forwarders { plugin.RegisterPlugin(forwarder) diff --git a/plugins/forwarder/grpc/nativeebpfaccesslog/forwarder.go b/plugins/forwarder/grpc/nativeebpfaccesslog/forwarder.go new file mode 100644 index 00000000..a2ec1f21 --- /dev/null +++ b/plugins/forwarder/grpc/nativeebpfaccesslog/forwarder.go @@ -0,0 +1,124 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package nativeebpfaccesslog + +import ( + "context" + "fmt" + "io" + "reflect" + + "google.golang.org/grpc" + + "github.com/apache/skywalking-satellite/internal/pkg/config" + "github.com/apache/skywalking-satellite/internal/pkg/log" + "github.com/apache/skywalking-satellite/internal/satellite/event" + server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc" + + v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3" + v1 "skywalking.apache.org/repo/goapi/satellite/data/v1" +) + +const ( + Name = "native-ebpf-accesslog-grpc-forwarder" + ShowName = "GRPC Native EBFP Access Log Forwarder" +) + +type Forwarder struct { + config.CommonFields + + accessClient v3.EBPFAccessLogServiceClient +} + +func (f *Forwarder) Name() string { + return Name +} + +func (f *Forwarder) ShowName() string { + return ShowName +} + +func (f *Forwarder) Description() string { + return "This is a synchronization grpc forwarder with the SkyWalking native eBPF access log protocol." +} + +func (f *Forwarder) DefaultConfig() string { + return `` +} + +func (f *Forwarder) Prepare(connection interface{}) error { + client, ok := connection.(*grpc.ClientConn) + if !ok { + return fmt.Errorf("the %s only accepts a grpc client, but received a %s", + f.Name(), reflect.TypeOf(connection).String()) + } + f.accessClient = v3.NewEBPFAccessLogServiceClient(client) + return nil +} + +func (f *Forwarder) Forward(batch event.BatchEvents) (err error) { + var stream v3.EBPFAccessLogService_CollectClient + for _, e := range batch { + data, ok := e.GetData().(*v1.SniffData_EBPFAccessLogList) + if !ok { + continue + } + + stream, err = f.accessClient.Collect(context.Background()) + if err != nil { + log.Logger.Errorf("open grpc stream error: %v", err) + return err + } + + streamClosed := false + for _, message := range data.EBPFAccessLogList.Messages { + err := stream.SendMsg(server_grpc.NewOriginalData(message)) + if err != nil { + log.Logger.Errorf("%s send log data error: %v", f.Name(), err) + f.closeStream(stream) + streamClosed = true + break + } + } + + if !streamClosed { + f.closeStream(stream) + } + } + + return nil +} + +func (f *Forwarder) closeStream(stream v3.EBPFAccessLogService_CollectClient) { + _, err := stream.CloseAndRecv() + if err != nil && err != io.EOF { + log.Logger.Errorf("%s close stream error: %v", f.Name(), err) + } +} + +func (f *Forwarder) ForwardType() v1.SniffType { + return v1.SniffType_EBPFAccessLogType +} + +func (f *Forwarder) SyncForward(e *v1.SniffData) (*v1.SniffData, error) { + return nil, fmt.Errorf("unsupport sync forward") +} + +func (f *Forwarder) SupportedSyncInvoke() bool { + return false +} diff --git a/plugins/receiver/grpc/nativeebpfaccesslog/access_log.go b/plugins/receiver/grpc/nativeebpfaccesslog/access_log.go new file mode 100644 index 00000000..f1375dd6 --- /dev/null +++ b/plugins/receiver/grpc/nativeebpfaccesslog/access_log.go @@ -0,0 +1,67 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package nativeebpfaccesslog + +import ( + "io" + "time" + + "github.com/apache/skywalking-satellite/plugins/server/grpc" + + v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3" + v1 "skywalking.apache.org/repo/goapi/satellite/data/v1" +) + +var eventName = "grpc-accesslog-event" + +type AccessLogService struct { + OutputChannel chan *v1.SniffData + + v3.UnimplementedEBPFAccessLogServiceServer +} + +func (a *AccessLogService) Collect(stream v3.EBPFAccessLogService_CollectServer) error { + result := make([][]byte, 0) + originalData := grpc.NewOriginalData(nil) + for { + if err := stream.RecvMsg(originalData); err == io.EOF { + a.sendData(result) + return stream.SendAndClose(&v3.EBPFAccessLogDownstream{}) + } else if err != nil { + a.sendData(result) + return err + } + result = append(result, originalData.Content) + } +} + +func (a *AccessLogService) sendData(dataList [][]byte) { + e := &v1.SniffData{ + Name: eventName, + Timestamp: time.Now().UnixNano() / 1e6, + Meta: nil, + Type: v1.SniffType_EBPFAccessLogType, + Remote: true, + Data: &v1.SniffData_EBPFAccessLogList{ + EBPFAccessLogList: &v1.EBPFAccessLogList{ + Messages: dataList, + }, + }, + } + a.OutputChannel <- e +} diff --git a/plugins/receiver/grpc/nativeebpfaccesslog/receiver.go b/plugins/receiver/grpc/nativeebpfaccesslog/receiver.go new file mode 100644 index 00000000..46ce5dc0 --- /dev/null +++ b/plugins/receiver/grpc/nativeebpfaccesslog/receiver.go @@ -0,0 +1,77 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package nativeebpfaccesslog + +import ( + "github.com/apache/skywalking-satellite/internal/pkg/config" + module "github.com/apache/skywalking-satellite/internal/satellite/module/api" + forwarder "github.com/apache/skywalking-satellite/plugins/forwarder/api" + forwarder_nativeebpfaccesslog "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeebpfaccesslog" + grpcreceiver "github.com/apache/skywalking-satellite/plugins/receiver/grpc" + + v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3" + v1 "skywalking.apache.org/repo/goapi/satellite/data/v1" +) + +const ( + Name = "grpc-native-ebpf-accesslog-receiver" + ShowName = "GRPC Native EBFP Accesslog Receiver" +) + +type Receiver struct { + config.CommonFields + grpcreceiver.CommonGRPCReceiverFields + + accesslogService *AccessLogService +} + +func (r *Receiver) Name() string { + return Name +} + +func (r *Receiver) ShowName() string { + return ShowName +} + +func (r *Receiver) Description() string { + return "This is a receiver for SkyWalking native accesslog format, " + + "which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/ebpf/accesslog.proto." +} + +func (r *Receiver) DefaultConfig() string { + return "" +} + +func (r *Receiver) RegisterHandler(server interface{}) { + r.CommonGRPCReceiverFields = *grpcreceiver.InitCommonGRPCReceiverFields(server) + r.accesslogService = &AccessLogService{OutputChannel: r.OutputChannel} + v3.RegisterEBPFAccessLogServiceServer(r.Server, r.accesslogService) +} + +func (r *Receiver) RegisterSyncInvoker(invoker module.SyncInvoker) { +} + +func (r *Receiver) Channel() <-chan *v1.SniffData { + return r.OutputChannel +} + +func (r *Receiver) SupportForwarders() []forwarder.Forwarder { + return []forwarder.Forwarder{ + &forwarder_nativeebpfaccesslog.Forwarder{}, + } +} diff --git a/plugins/receiver/receiver_repository.go b/plugins/receiver/receiver_repository.go index d75ea41d..7a8da897 100644 --- a/plugins/receiver/receiver_repository.go +++ b/plugins/receiver/receiver_repository.go @@ -28,6 +28,7 @@ import ( "github.com/apache/skywalking-satellite/plugins/receiver/grpc/envoymetricsv3" grpcnativecds "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativecds" grpcnativeclr "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativeclr" + grpcnativeebpfaccesslog "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativeebpfaccesslog" grpcnativeebpfprofiling "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativeebpfprofiling" grpcnativeevent "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativeevent" grpcnativejvm "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativejvm" @@ -63,6 +64,7 @@ func RegisterReceiverPlugins() { new(envoymetricsv2.Receiver), new(envoymetricsv3.Receiver), new(otlpmetricsv1.Receiver), + new(grpcnativeebpfaccesslog.Receiver), } for _, receiver := range receivers { plugin.RegisterPlugin(receiver)