001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.trace;
019
020import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.CONTAINER_DB_OPERATIONS_KEY;
021import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION;
022
023import io.opentelemetry.api.common.AttributeKey;
024import io.opentelemetry.api.trace.Span;
025import io.opentelemetry.api.trace.SpanBuilder;
026import io.opentelemetry.api.trace.SpanKind;
027import java.util.Arrays;
028import java.util.Collection;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.List;
032import java.util.Map;
033import java.util.Set;
034import java.util.function.Supplier;
035import java.util.stream.Collectors;
036import java.util.stream.Stream;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Append;
039import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
040import org.apache.hadoop.hbase.client.CheckAndMutate;
041import org.apache.hadoop.hbase.client.Delete;
042import org.apache.hadoop.hbase.client.Get;
043import org.apache.hadoop.hbase.client.Increment;
044import org.apache.hadoop.hbase.client.Put;
045import org.apache.hadoop.hbase.client.RegionCoprocessorServiceExec;
046import org.apache.hadoop.hbase.client.Row;
047import org.apache.hadoop.hbase.client.RowMutations;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.Operation;
050import org.apache.hadoop.hbase.trace.TraceUtil;
051import org.apache.yetus.audience.InterfaceAudience;
052
053/**
054 * Construct {@link Span} instances originating from "table operations" -- the verbs in our public
055 * API that interact with data in tables.
056 */
057@InterfaceAudience.Private
058public class TableOperationSpanBuilder implements Supplier<Span> {
059
060  // n.b. The results of this class are tested implicitly by way of the likes of
061  // `TestAsyncTableTracing` and friends.
062
063  private static final String unknown = "UNKNOWN";
064
065  private TableName tableName;
066  private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();
067
068  public TableOperationSpanBuilder(final AsyncConnectionImpl conn) {
069    ConnectionSpanBuilder.populateConnectionAttributes(attributes, conn);
070  }
071
072  @Override
073  public Span get() {
074    return build();
075  }
076
077  public TableOperationSpanBuilder setOperation(final Scan scan) {
078    return setOperation(valueFrom(scan));
079  }
080
081  public TableOperationSpanBuilder setOperation(final Row row) {
082    return setOperation(valueFrom(row));
083  }
084
085  @SuppressWarnings("unused")
086  public TableOperationSpanBuilder setOperation(final Collection<? extends Row> operations) {
087    return setOperation(Operation.BATCH);
088  }
089
090  public TableOperationSpanBuilder setOperation(final Operation operation) {
091    attributes.put(DB_OPERATION, operation.name());
092    return this;
093  }
094
095  // `setContainerOperations` perform a recursive descent expansion of all the operations
096  // contained within the provided "batch" object.
097
098  public TableOperationSpanBuilder setContainerOperations(final RowMutations mutations) {
099    final Operation[] ops = mutations.getMutations().stream()
100      .flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream()))
101      .toArray(Operation[]::new);
102    return setContainerOperations(ops);
103  }
104
105  public TableOperationSpanBuilder setContainerOperations(final Row row) {
106    final Operation[] ops =
107      Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream())
108        .toArray(Operation[]::new);
109    return setContainerOperations(ops);
110  }
111
112  public TableOperationSpanBuilder
113    setContainerOperations(final Collection<? extends Row> operations) {
114    final Operation[] ops = operations.stream()
115      .flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream()))
116      .toArray(Operation[]::new);
117    return setContainerOperations(ops);
118  }
119
120  private static Set<Operation> unpackRowOperations(final Row row) {
121    final Set<Operation> ops = new HashSet<>();
122    if (row instanceof CheckAndMutate) {
123      final CheckAndMutate cam = (CheckAndMutate) row;
124      ops.addAll(unpackRowOperations(cam));
125    }
126    if (row instanceof RowMutations) {
127      final RowMutations mutations = (RowMutations) row;
128      final List<Operation> operations = mutations.getMutations().stream()
129        .map(TableOperationSpanBuilder::valueFrom).collect(Collectors.toList());
130      ops.addAll(operations);
131    }
132    return ops;
133  }
134
135  private static Set<Operation> unpackRowOperations(final CheckAndMutate cam) {
136    final Set<Operation> ops = new HashSet<>();
137    final Operation op = valueFrom(cam.getAction());
138    switch (op) {
139      case BATCH:
140      case CHECK_AND_MUTATE:
141        ops.addAll(unpackRowOperations(cam.getAction()));
142        break;
143      default:
144        ops.add(op);
145    }
146    return ops;
147  }
148
149  public TableOperationSpanBuilder setContainerOperations(final Operation... operations) {
150    final List<String> ops = Arrays.stream(operations).map(op -> op == null ? unknown : op.name())
151      .sorted().distinct().collect(Collectors.toList());
152    attributes.put(CONTAINER_DB_OPERATIONS_KEY, ops);
153    return this;
154  }
155
156  public TableOperationSpanBuilder setTableName(final TableName tableName) {
157    this.tableName = tableName;
158    TableSpanBuilder.populateTableNameAttributes(attributes, tableName);
159    return this;
160  }
161
162  @SuppressWarnings("unchecked")
163  public Span build() {
164    final String name = attributes.getOrDefault(DB_OPERATION, unknown) + " "
165      + (tableName != null ? tableName.getNameWithNamespaceInclAsString() : unknown);
166    final SpanBuilder builder = TraceUtil.getGlobalTracer().spanBuilder(name)
167      // TODO: what about clients embedded in Master/RegionServer/Gateways/&c?
168      .setSpanKind(SpanKind.CLIENT);
169    attributes.forEach((k, v) -> builder.setAttribute((AttributeKey<? super Object>) k, v));
170    return builder.startSpan();
171  }
172
173  private static Operation valueFrom(final Scan scan) {
174    if (scan == null) {
175      return null;
176    }
177    return Operation.SCAN;
178  }
179
180  private static Operation valueFrom(final Row row) {
181    if (row == null) {
182      return null;
183    }
184    if (row instanceof Append) {
185      return Operation.APPEND;
186    }
187    if (row instanceof CheckAndMutate) {
188      return Operation.CHECK_AND_MUTATE;
189    }
190    if (row instanceof Delete) {
191      return Operation.DELETE;
192    }
193    if (row instanceof Get) {
194      return Operation.GET;
195    }
196    if (row instanceof Increment) {
197      return Operation.INCREMENT;
198    }
199    if (row instanceof Put) {
200      return Operation.PUT;
201    }
202    if (row instanceof RegionCoprocessorServiceExec) {
203      return Operation.COPROC_EXEC;
204    }
205    if (row instanceof RowMutations) {
206      return Operation.BATCH;
207    }
208    return null;
209  }
210}