|
| 1 | +/* |
| 2 | + * Copyright 2024 The gRPC Authors |
| 3 | + * |
| 4 | + * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | + * you may not use this file except in compliance with the License. |
| 6 | + * You may obtain a copy of the License at |
| 7 | + * |
| 8 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | + * |
| 10 | + * Unless required by applicable law or agreed to in writing, software |
| 11 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | + * See the License for the specific language governing permissions and |
| 14 | + * limitations under the License. |
| 15 | + */ |
| 16 | + |
| 17 | +package io.grpc.gcp.csm.observability; |
| 18 | + |
| 19 | +import com.google.common.annotations.VisibleForTesting; |
| 20 | +import com.google.common.base.Preconditions; |
| 21 | +import com.google.common.io.BaseEncoding; |
| 22 | +import com.google.protobuf.Struct; |
| 23 | +import com.google.protobuf.Value; |
| 24 | +import io.grpc.CallOptions; |
| 25 | +import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; |
| 26 | +import io.grpc.Metadata; |
| 27 | +import io.grpc.ServerBuilder; |
| 28 | +import io.grpc.ServerCall; |
| 29 | +import io.grpc.ServerCallHandler; |
| 30 | +import io.grpc.ServerInterceptor; |
| 31 | +import io.grpc.Status; |
| 32 | +import io.grpc.internal.JsonParser; |
| 33 | +import io.grpc.internal.JsonUtil; |
| 34 | +import io.grpc.opentelemetry.InternalOpenTelemetryPlugin; |
| 35 | +import io.grpc.protobuf.ProtoUtils; |
| 36 | +import io.grpc.xds.ClusterImplLoadBalancerProvider; |
| 37 | +import io.grpc.xds.InternalGrpcBootstrapperImpl; |
| 38 | +import io.opentelemetry.api.common.AttributeKey; |
| 39 | +import io.opentelemetry.api.common.Attributes; |
| 40 | +import io.opentelemetry.api.common.AttributesBuilder; |
| 41 | +import io.opentelemetry.contrib.gcp.resource.GCPResourceProvider; |
| 42 | +import java.net.URI; |
| 43 | +import java.util.Map; |
| 44 | +import java.util.function.Consumer; |
| 45 | +import java.util.logging.Level; |
| 46 | +import java.util.logging.Logger; |
| 47 | + |
| 48 | +/** |
| 49 | + * OpenTelemetryPlugin implementing metadata-based workload property exchange for both client and |
| 50 | + * server. Is responsible for determining the metadata, communicating the metadata, and adding local |
| 51 | + * and remote details to metrics. |
| 52 | + */ |
| 53 | +final class MetadataExchanger implements InternalOpenTelemetryPlugin { |
| 54 | + private static final Logger logger = Logger.getLogger(MetadataExchanger.class.getName()); |
| 55 | + |
| 56 | + private static final AttributeKey<String> CLOUD_PLATFORM = |
| 57 | + AttributeKey.stringKey("cloud.platform"); |
| 58 | + private static final AttributeKey<String> K8S_NAMESPACE_NAME = |
| 59 | + AttributeKey.stringKey("k8s.namespace.name"); |
| 60 | + private static final AttributeKey<String> K8S_CLUSTER_NAME = |
| 61 | + AttributeKey.stringKey("k8s.cluster.name"); |
| 62 | + private static final AttributeKey<String> CLOUD_AVAILABILITY_ZONE = |
| 63 | + AttributeKey.stringKey("cloud.availability_zone"); |
| 64 | + private static final AttributeKey<String> CLOUD_REGION = |
| 65 | + AttributeKey.stringKey("cloud.region"); |
| 66 | + private static final AttributeKey<String> CLOUD_ACCOUNT_ID = |
| 67 | + AttributeKey.stringKey("cloud.account.id"); |
| 68 | + |
| 69 | + private static final Metadata.Key<String> SEND_KEY = |
| 70 | + Metadata.Key.of("x-envoy-peer-metadata", Metadata.ASCII_STRING_MARSHALLER); |
| 71 | + private static final Metadata.Key<Struct> RECV_KEY = |
| 72 | + Metadata.Key.of("x-envoy-peer-metadata", new BinaryToAsciiMarshaller<>( |
| 73 | + ProtoUtils.metadataMarshaller(Struct.getDefaultInstance()))); |
| 74 | + |
| 75 | + private static final String EXCHANGE_TYPE = "type"; |
| 76 | + private static final String EXCHANGE_CANONICAL_SERVICE = "canonical_service"; |
| 77 | + private static final String EXCHANGE_PROJECT_ID = "project_id"; |
| 78 | + private static final String EXCHANGE_LOCATION = "location"; |
| 79 | + private static final String EXCHANGE_CLUSTER_NAME = "cluster_name"; |
| 80 | + private static final String EXCHANGE_NAMESPACE_NAME = "namespace_name"; |
| 81 | + private static final String EXCHANGE_WORKLOAD_NAME = "workload_name"; |
| 82 | + private static final String TYPE_GKE = "gcp_kubernetes_engine"; |
| 83 | + private static final String TYPE_GCE = "gcp_compute_engine"; |
| 84 | + |
| 85 | + private final String localMetadata; |
| 86 | + private final Attributes localAttributes; |
| 87 | + |
| 88 | + public MetadataExchanger() { |
| 89 | + this( |
| 90 | + new GCPResourceProvider().getAttributes(), |
| 91 | + System::getenv, |
| 92 | + InternalGrpcBootstrapperImpl::getJsonContent); |
| 93 | + } |
| 94 | + |
| 95 | + MetadataExchanger(Attributes platformAttributes, Lookup env, Supplier<String> xdsBootstrap) { |
| 96 | + String type = platformAttributes.get(CLOUD_PLATFORM); |
| 97 | + String canonicalService = env.get("CSM_CANONICAL_SERVICE_NAME"); |
| 98 | + Struct.Builder struct = Struct.newBuilder(); |
| 99 | + put(struct, EXCHANGE_TYPE, type); |
| 100 | + put(struct, EXCHANGE_CANONICAL_SERVICE, canonicalService); |
| 101 | + if (TYPE_GKE.equals(type)) { |
| 102 | + String location = platformAttributes.get(CLOUD_AVAILABILITY_ZONE); |
| 103 | + if (location == null) { |
| 104 | + location = platformAttributes.get(CLOUD_REGION); |
| 105 | + } |
| 106 | + put(struct, EXCHANGE_WORKLOAD_NAME, env.get("CSM_WORKLOAD_NAME")); |
| 107 | + put(struct, EXCHANGE_NAMESPACE_NAME, platformAttributes.get(K8S_NAMESPACE_NAME)); |
| 108 | + put(struct, EXCHANGE_CLUSTER_NAME, platformAttributes.get(K8S_CLUSTER_NAME)); |
| 109 | + put(struct, EXCHANGE_LOCATION, location); |
| 110 | + put(struct, EXCHANGE_PROJECT_ID, platformAttributes.get(CLOUD_ACCOUNT_ID)); |
| 111 | + } else if (TYPE_GCE.equals(type)) { |
| 112 | + String location = platformAttributes.get(CLOUD_AVAILABILITY_ZONE); |
| 113 | + if (location == null) { |
| 114 | + location = platformAttributes.get(CLOUD_REGION); |
| 115 | + } |
| 116 | + put(struct, EXCHANGE_WORKLOAD_NAME, env.get("CSM_WORKLOAD_NAME")); |
| 117 | + put(struct, EXCHANGE_LOCATION, location); |
| 118 | + put(struct, EXCHANGE_PROJECT_ID, platformAttributes.get(CLOUD_ACCOUNT_ID)); |
| 119 | + } |
| 120 | + localMetadata = BaseEncoding.base64().encode(struct.build().toByteArray()); |
| 121 | + |
| 122 | + localAttributes = Attributes.builder() |
| 123 | + .put("csm.mesh_id", nullIsUnknown(getMeshId(xdsBootstrap))) |
| 124 | + .put("csm.workload_canonical_service", nullIsUnknown(canonicalService)) |
| 125 | + .build(); |
| 126 | + } |
| 127 | + |
| 128 | + private static String nullIsUnknown(String value) { |
| 129 | + return value == null ? "unknown" : value; |
| 130 | + } |
| 131 | + |
| 132 | + private static void put(Struct.Builder struct, String key, String value) { |
| 133 | + value = nullIsUnknown(value); |
| 134 | + struct.putFields(key, Value.newBuilder().setStringValue(value).build()); |
| 135 | + } |
| 136 | + |
| 137 | + private static void put(AttributesBuilder attributes, String key, Value value) { |
| 138 | + attributes.put(key, nullIsUnknown(fromValue(value))); |
| 139 | + } |
| 140 | + |
| 141 | + private static String fromValue(Value value) { |
| 142 | + if (value == null) { |
| 143 | + return null; |
| 144 | + } |
| 145 | + if (value.getKindCase() != Value.KindCase.STRING_VALUE) { |
| 146 | + return null; |
| 147 | + } |
| 148 | + return value.getStringValue(); |
| 149 | + } |
| 150 | + |
| 151 | + @VisibleForTesting |
| 152 | + static String getMeshId(Supplier<String> xdsBootstrap) { |
| 153 | + try { |
| 154 | + @SuppressWarnings("unchecked") |
| 155 | + Map<String, ?> rawBootstrap = (Map<String, ?>) JsonParser.parse(xdsBootstrap.get()); |
| 156 | + Map<String, ?> node = JsonUtil.getObject(rawBootstrap, "node"); |
| 157 | + String id = JsonUtil.getString(node, "id"); |
| 158 | + Preconditions.checkNotNull(id, "id"); |
| 159 | + String[] parts = id.split("/", 6); |
| 160 | + if (!(parts.length == 6 |
| 161 | + && parts[0].equals("projects") |
| 162 | + && parts[2].equals("networks") |
| 163 | + && parts[3].startsWith("mesh:") |
| 164 | + && parts[4].equals("nodes"))) { |
| 165 | + throw new Exception("node id didn't match mesh format: " + id); |
| 166 | + } |
| 167 | + return parts[3].substring("mesh:".length()); |
| 168 | + } catch (Exception e) { |
| 169 | + logger.log(Level.INFO, "Failed to determine mesh ID for CSM", e); |
| 170 | + return null; |
| 171 | + } |
| 172 | + } |
| 173 | + |
| 174 | + private void addLabels(AttributesBuilder to, Struct struct) { |
| 175 | + to.putAll(localAttributes); |
| 176 | + Map<String, Value> remote = struct.getFieldsMap(); |
| 177 | + Value typeValue = remote.get(EXCHANGE_TYPE); |
| 178 | + String type = fromValue(typeValue); |
| 179 | + put(to, "csm.remote_workload_type", typeValue); |
| 180 | + put(to, "csm.remote_workload_canonical_service", remote.get(EXCHANGE_CANONICAL_SERVICE)); |
| 181 | + if (TYPE_GKE.equals(type)) { |
| 182 | + put(to, "csm.remote_workload_project_id", remote.get(EXCHANGE_PROJECT_ID)); |
| 183 | + put(to, "csm.remote_workload_location", remote.get(EXCHANGE_LOCATION)); |
| 184 | + put(to, "csm.remote_workload_cluster_name", remote.get(EXCHANGE_CLUSTER_NAME)); |
| 185 | + put(to, "csm.remote_workload_namespace_name", remote.get(EXCHANGE_NAMESPACE_NAME)); |
| 186 | + put(to, "csm.remote_workload_name", remote.get(EXCHANGE_WORKLOAD_NAME)); |
| 187 | + } else if (TYPE_GCE.equals(type)) { |
| 188 | + put(to, "csm.remote_workload_project_id", remote.get(EXCHANGE_PROJECT_ID)); |
| 189 | + put(to, "csm.remote_workload_location", remote.get(EXCHANGE_LOCATION)); |
| 190 | + put(to, "csm.remote_workload_name", remote.get(EXCHANGE_WORKLOAD_NAME)); |
| 191 | + } |
| 192 | + } |
| 193 | + |
| 194 | + @Override |
| 195 | + public boolean enablePluginForChannel(String target) { |
| 196 | + URI uri; |
| 197 | + try { |
| 198 | + uri = new URI(target); |
| 199 | + } catch (Exception ex) { |
| 200 | + return false; |
| 201 | + } |
| 202 | + String authority = uri.getAuthority(); |
| 203 | + return "xds".equals(uri.getScheme()) |
| 204 | + && (authority == null || "traffic-director-global.xds.googleapis.com".equals(authority)); |
| 205 | + } |
| 206 | + |
| 207 | + @Override |
| 208 | + public ClientCallPlugin newClientCallPlugin() { |
| 209 | + return new ClientCallState(); |
| 210 | + } |
| 211 | + |
| 212 | + public void configureServerBuilder(ServerBuilder<?> serverBuilder) { |
| 213 | + serverBuilder.intercept(new ServerCallInterceptor()); |
| 214 | + } |
| 215 | + |
| 216 | + @Override |
| 217 | + public ServerStreamPlugin newServerStreamPlugin(Metadata inboundMetadata) { |
| 218 | + return new ServerStreamState(inboundMetadata.get(RECV_KEY)); |
| 219 | + } |
| 220 | + |
| 221 | + final class ClientCallState implements ClientCallPlugin { |
| 222 | + private volatile Value serviceName; |
| 223 | + private volatile Value serviceNamespace; |
| 224 | + |
| 225 | + @Override |
| 226 | + public ClientStreamPlugin newClientStreamPlugin() { |
| 227 | + return new ClientStreamState(); |
| 228 | + } |
| 229 | + |
| 230 | + @Override |
| 231 | + public CallOptions filterCallOptions(CallOptions options) { |
| 232 | + Consumer<Map<String, Struct>> existingConsumer = |
| 233 | + options.getOption(ClusterImplLoadBalancerProvider.FILTER_METADATA_CONSUMER); |
| 234 | + return options.withOption( |
| 235 | + ClusterImplLoadBalancerProvider.FILTER_METADATA_CONSUMER, |
| 236 | + (Map<String, Struct> clusterMetadata) -> { |
| 237 | + metadataConsumer(clusterMetadata); |
| 238 | + existingConsumer.accept(clusterMetadata); |
| 239 | + }); |
| 240 | + } |
| 241 | + |
| 242 | + private void metadataConsumer(Map<String, Struct> clusterMetadata) { |
| 243 | + Struct struct = clusterMetadata.get("com.google.csm.telemetry_labels"); |
| 244 | + if (struct == null) { |
| 245 | + struct = Struct.getDefaultInstance(); |
| 246 | + } |
| 247 | + serviceName = struct.getFieldsMap().get("service_name"); |
| 248 | + serviceNamespace = struct.getFieldsMap().get("service_namespace"); |
| 249 | + } |
| 250 | + |
| 251 | + @Override |
| 252 | + public void addMetadata(Metadata toMetadata) { |
| 253 | + toMetadata.put(SEND_KEY, localMetadata); |
| 254 | + } |
| 255 | + |
| 256 | + class ClientStreamState implements ClientStreamPlugin { |
| 257 | + private Struct receivedExchange; |
| 258 | + |
| 259 | + @Override |
| 260 | + public void inboundHeaders(Metadata headers) { |
| 261 | + setExchange(headers); |
| 262 | + } |
| 263 | + |
| 264 | + @Override |
| 265 | + public void inboundTrailers(Metadata trailers) { |
| 266 | + if (receivedExchange != null) { |
| 267 | + return; // Received headers |
| 268 | + } |
| 269 | + setExchange(trailers); |
| 270 | + } |
| 271 | + |
| 272 | + private void setExchange(Metadata metadata) { |
| 273 | + Struct received = metadata.get(RECV_KEY); |
| 274 | + if (received == null) { |
| 275 | + receivedExchange = Struct.getDefaultInstance(); |
| 276 | + } else { |
| 277 | + receivedExchange = received; |
| 278 | + } |
| 279 | + } |
| 280 | + |
| 281 | + @Override |
| 282 | + public void addLabels(AttributesBuilder to) { |
| 283 | + put(to, "csm.service_name", serviceName); |
| 284 | + put(to, "csm.service_namespace", serviceNamespace); |
| 285 | + Struct exchange = receivedExchange; |
| 286 | + if (exchange == null) { |
| 287 | + exchange = Struct.getDefaultInstance(); |
| 288 | + } |
| 289 | + MetadataExchanger.this.addLabels(to, exchange); |
| 290 | + } |
| 291 | + } |
| 292 | + } |
| 293 | + |
| 294 | + final class ServerCallInterceptor implements ServerInterceptor { |
| 295 | + @Override |
| 296 | + public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( |
| 297 | + ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { |
| 298 | + if (!headers.containsKey(RECV_KEY)) { |
| 299 | + return next.startCall(call, headers); |
| 300 | + } else { |
| 301 | + return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) { |
| 302 | + private boolean headersSent; |
| 303 | + |
| 304 | + @Override |
| 305 | + public void sendHeaders(Metadata headers) { |
| 306 | + headersSent = true; |
| 307 | + headers.put(SEND_KEY, localMetadata); |
| 308 | + super.sendHeaders(headers); |
| 309 | + } |
| 310 | + |
| 311 | + @Override |
| 312 | + public void close(Status status, Metadata trailers) { |
| 313 | + if (!headersSent) { |
| 314 | + trailers.put(SEND_KEY, localMetadata); |
| 315 | + } |
| 316 | + super.close(status, trailers); |
| 317 | + } |
| 318 | + }, headers); |
| 319 | + } |
| 320 | + } |
| 321 | + } |
| 322 | + |
| 323 | + final class ServerStreamState implements ServerStreamPlugin { |
| 324 | + private final Struct receivedExchange; |
| 325 | + |
| 326 | + ServerStreamState(Struct exchange) { |
| 327 | + if (exchange == null) { |
| 328 | + exchange = Struct.getDefaultInstance(); |
| 329 | + } |
| 330 | + receivedExchange = exchange; |
| 331 | + } |
| 332 | + |
| 333 | + @Override |
| 334 | + public void addLabels(AttributesBuilder to) { |
| 335 | + MetadataExchanger.this.addLabels(to, receivedExchange); |
| 336 | + } |
| 337 | + } |
| 338 | + |
| 339 | + interface Lookup { |
| 340 | + String get(String name); |
| 341 | + } |
| 342 | + |
| 343 | + interface Supplier<T> { |
| 344 | + T get() throws Exception; |
| 345 | + } |
| 346 | + |
| 347 | + static final class BinaryToAsciiMarshaller<T> implements Metadata.AsciiMarshaller<T> { |
| 348 | + private final Metadata.BinaryMarshaller<T> delegate; |
| 349 | + |
| 350 | + public BinaryToAsciiMarshaller(Metadata.BinaryMarshaller<T> delegate) { |
| 351 | + this.delegate = Preconditions.checkNotNull(delegate, "delegate"); |
| 352 | + } |
| 353 | + |
| 354 | + @Override |
| 355 | + public T parseAsciiString(String serialized) { |
| 356 | + return delegate.parseBytes(BaseEncoding.base64().decode(serialized)); |
| 357 | + } |
| 358 | + |
| 359 | + @Override |
| 360 | + public String toAsciiString(T value) { |
| 361 | + return BaseEncoding.base64().encode(delegate.toBytes(value)); |
| 362 | + } |
| 363 | + } |
| 364 | +} |
0 commit comments