/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.trace;

import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.CONTAINER_DB_OPERATIONS_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionCoprocessorServiceExec;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.Operation;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Construct {@link Span} instances originating from "table operations" -- the verbs in our public
 * API that interact with data in tables.
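 * <p>
 * A typical call site might look something like the following; the {@code conn},
 * {@code tableName}, and {@code get} variables are illustrative rather than part of this class:
 * </p>
 * <pre>{@code
 * // conn, tableName and get are caller-supplied values (illustrative names).
 * Span span = new TableOperationSpanBuilder(conn)
 *   .setTableName(tableName)
 *   .setOperation(get)
 *   .build();
 * try (Scope ignored = span.makeCurrent()) {
 *   // ... issue the table operation ...
 * } finally {
 *   span.end();
 * }
 * }</pre>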
 */
@InterfaceAudience.Private
public class TableOperationSpanBuilder implements Supplier<Span> {

  // n.b. The results of this class are tested implicitly by tests such as `TestAsyncTableTracing`
  // and friends.

  private static final String unknown = "UNKNOWN";

  private TableName tableName;
  private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();

  public TableOperationSpanBuilder(final ClusterConnection conn) {
    ConnectionSpanBuilder.populateConnectionAttributes(attributes, conn);
  }

  public TableOperationSpanBuilder(final AsyncConnectionImpl conn) {
    ConnectionSpanBuilder.populateConnectionAttributes(attributes, conn);
  }

  @Override
  public Span get() {
    return build();
  }

  public TableOperationSpanBuilder setOperation(final Scan scan) {
    return setOperation(valueFrom(scan));
  }

  public TableOperationSpanBuilder setOperation(final Row row) {
    return setOperation(valueFrom(row));
  }

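  // A collection of Rows (as passed to a batch call) is always traced as a generic BATCH
  // operation; the individual operations can be recorded via setContainerOperations below.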
  @SuppressWarnings("unused")
  public TableOperationSpanBuilder setOperation(final Collection<? extends Row> operations) {
    return setOperation(Operation.BATCH);
  }

  public TableOperationSpanBuilder setOperation(final Operation operation) {
    attributes.put(DB_OPERATION, operation.name());
    return this;
  }

  // The `setContainerOperations` overloads perform a recursive descent expansion of all the
  // operations contained within the provided "batch" object.
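  // For example, a CheckAndMutate whose action is a RowMutations holding a Put and a Delete
  // expands to the container operations [CHECK_AND_MUTATE, DELETE, PUT].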

  public TableOperationSpanBuilder setContainerOperations(final RowMutations mutations) {
    final Operation[] ops = mutations.getMutations().stream()
      .flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream()))
      .toArray(Operation[]::new);
    return setContainerOperations(ops);
  }

  public TableOperationSpanBuilder setContainerOperations(final Row row) {
    final Operation[] ops =
      Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream())
        .toArray(Operation[]::new);
    return setContainerOperations(ops);
  }

  public TableOperationSpanBuilder
    setContainerOperations(final Collection<? extends Row> operations) {
    final Operation[] ops = operations.stream()
      .flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream()))
      .toArray(Operation[]::new);
    return setContainerOperations(ops);
  }

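  // Recursively collect the operations nested inside a composite Row (CheckAndMutate or
  // RowMutations); simple Rows contribute nothing here and yield an empty set.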
  private static Set<Operation> unpackRowOperations(final Row row) {
    final Set<Operation> ops = new HashSet<>();
    if (row instanceof CheckAndMutate) {
      final CheckAndMutate cam = (CheckAndMutate) row;
      ops.addAll(unpackRowOperations(cam));
    }
    if (row instanceof RowMutations) {
      final RowMutations mutations = (RowMutations) row;
      final List<Operation> operations = mutations.getMutations().stream()
        .map(TableOperationSpanBuilder::valueFrom).collect(Collectors.toList());
      ops.addAll(operations);
    }
    return ops;
  }

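  // Resolve the operation(s) wrapped by a CheckAndMutate. When the wrapped action is itself a
  // composite (a batch or another CheckAndMutate), keep descending rather than recording the
  // composite operation itself.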
  private static Set<Operation> unpackRowOperations(final CheckAndMutate cam) {
    final Set<Operation> ops = new HashSet<>();
    final Operation op = valueFrom(cam.getAction());
    switch (op) {
      case BATCH:
      case CHECK_AND_MUTATE:
        ops.addAll(unpackRowOperations(cam.getAction()));
        break;
      default:
        ops.add(op);
    }
    return ops;
  }

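  // Record the contained operations as a sorted, de-duplicated list of operation names; a null
  // operation is recorded as UNKNOWN.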
  public TableOperationSpanBuilder setContainerOperations(final Operation... operations) {
    final List<String> ops = Arrays.stream(operations).map(op -> op == null ? unknown : op.name())
      .sorted().distinct().collect(Collectors.toList());
    attributes.put(CONTAINER_DB_OPERATIONS_KEY, ops);
    return this;
  }

  public TableOperationSpanBuilder setTableName(final TableName tableName) {
    this.tableName = tableName;
    TableSpanBuilder.populateTableNameAttributes(attributes, tableName);
    return this;
  }

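  // Create and start the span. The span name follows the "<operation> <table>" convention, with
  // UNKNOWN standing in for any component that has not been set, and the span kind is always
  // CLIENT.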
  @SuppressWarnings("unchecked")
  public Span build() {
    final String name = attributes.getOrDefault(DB_OPERATION, unknown) + " "
      + (tableName != null ? tableName.getNameWithNamespaceInclAsString() : unknown);
    final SpanBuilder builder = TraceUtil.getGlobalTracer().spanBuilder(name)
      // TODO: what about clients embedded in Master/RegionServer/Gateways/&c?
      .setSpanKind(SpanKind.CLIENT);
    attributes.forEach((k, v) -> builder.setAttribute((AttributeKey<? super Object>) k, v));
    return builder.startSpan();
  }

  private static Operation valueFrom(final Scan scan) {
    if (scan == null) {
      return null;
    }
    return Operation.SCAN;
  }

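  // Map a concrete Row implementation to its Operation; returns null for a null row or an
  // unrecognized type.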
  private static Operation valueFrom(final Row row) {
    if (row == null) {
      return null;
    }
    if (row instanceof Append) {
      return Operation.APPEND;
    }
    if (row instanceof CheckAndMutate) {
      return Operation.CHECK_AND_MUTATE;
    }
    if (row instanceof Delete) {
      return Operation.DELETE;
    }
    if (row instanceof Get) {
      return Operation.GET;
    }
    if (row instanceof Increment) {
      return Operation.INCREMENT;
    }
    if (row instanceof Put) {
      return Operation.PUT;
    }
    if (row instanceof RegionCoprocessorServiceExec) {
      return Operation.COPROC_EXEC;
    }
    if (row instanceof RowMutations) {
      return Operation.BATCH;
    }
    return null;
  }
}