001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client.trace; 019 020import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.CONTAINER_DB_OPERATIONS_KEY; 021import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION; 022 023import io.opentelemetry.api.common.AttributeKey; 024import io.opentelemetry.api.trace.Span; 025import io.opentelemetry.api.trace.SpanBuilder; 026import io.opentelemetry.api.trace.SpanKind; 027import java.util.Arrays; 028import java.util.Collection; 029import java.util.HashMap; 030import java.util.HashSet; 031import java.util.List; 032import java.util.Map; 033import java.util.Set; 034import java.util.function.Supplier; 035import java.util.stream.Collectors; 036import java.util.stream.Stream; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.Append; 039import org.apache.hadoop.hbase.client.AsyncConnectionImpl; 040import org.apache.hadoop.hbase.client.CheckAndMutate; 041import org.apache.hadoop.hbase.client.ClusterConnection; 042import org.apache.hadoop.hbase.client.Delete; 043import org.apache.hadoop.hbase.client.Get; 044import org.apache.hadoop.hbase.client.Increment; 045import org.apache.hadoop.hbase.client.Put; 046import org.apache.hadoop.hbase.client.RegionCoprocessorServiceExec; 047import org.apache.hadoop.hbase.client.Row; 048import org.apache.hadoop.hbase.client.RowMutations; 049import org.apache.hadoop.hbase.client.Scan; 050import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.Operation; 051import org.apache.hadoop.hbase.trace.TraceUtil; 052import org.apache.yetus.audience.InterfaceAudience; 053 054/** 055 * Construct {@link Span} instances originating from "table operations" -- the verbs in our public 056 * API that interact with data in tables. 057 */ 058@InterfaceAudience.Private 059public class TableOperationSpanBuilder implements Supplier<Span> { 060 061 // n.b. The results of this class are tested implicitly by way of the likes of 062 // `TestAsyncTableTracing` and friends. 063 064 private static final String unknown = "UNKNOWN"; 065 066 private TableName tableName; 067 private final Map<AttributeKey<?>, Object> attributes = new HashMap<>(); 068 069 public TableOperationSpanBuilder(final ClusterConnection conn) { 070 ConnectionSpanBuilder.populateConnectionAttributes(attributes, conn); 071 } 072 073 public TableOperationSpanBuilder(final AsyncConnectionImpl conn) { 074 ConnectionSpanBuilder.populateConnectionAttributes(attributes, conn); 075 } 076 077 @Override 078 public Span get() { 079 return build(); 080 } 081 082 public TableOperationSpanBuilder setOperation(final Scan scan) { 083 return setOperation(valueFrom(scan)); 084 } 085 086 public TableOperationSpanBuilder setOperation(final Row row) { 087 return setOperation(valueFrom(row)); 088 } 089 090 @SuppressWarnings("unused") 091 public TableOperationSpanBuilder setOperation(final Collection<? extends Row> operations) { 092 return setOperation(Operation.BATCH); 093 } 094 095 public TableOperationSpanBuilder setOperation(final Operation operation) { 096 attributes.put(DB_OPERATION, operation.name()); 097 return this; 098 } 099 100 // `setContainerOperations` perform a recursive descent expansion of all the operations 101 // contained within the provided "batch" object. 102 103 public TableOperationSpanBuilder setContainerOperations(final RowMutations mutations) { 104 final Operation[] ops = mutations.getMutations().stream() 105 .flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream())) 106 .toArray(Operation[]::new); 107 return setContainerOperations(ops); 108 } 109 110 public TableOperationSpanBuilder setContainerOperations(final Row row) { 111 final Operation[] ops = 112 Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream()) 113 .toArray(Operation[]::new); 114 return setContainerOperations(ops); 115 } 116 117 public TableOperationSpanBuilder 118 setContainerOperations(final Collection<? extends Row> operations) { 119 final Operation[] ops = operations.stream() 120 .flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream())) 121 .toArray(Operation[]::new); 122 return setContainerOperations(ops); 123 } 124 125 private static Set<Operation> unpackRowOperations(final Row row) { 126 final Set<Operation> ops = new HashSet<>(); 127 if (row instanceof CheckAndMutate) { 128 final CheckAndMutate cam = (CheckAndMutate) row; 129 ops.addAll(unpackRowOperations(cam)); 130 } 131 if (row instanceof RowMutations) { 132 final RowMutations mutations = (RowMutations) row; 133 final List<Operation> operations = mutations.getMutations().stream() 134 .map(TableOperationSpanBuilder::valueFrom).collect(Collectors.toList()); 135 ops.addAll(operations); 136 } 137 return ops; 138 } 139 140 private static Set<Operation> unpackRowOperations(final CheckAndMutate cam) { 141 final Set<Operation> ops = new HashSet<>(); 142 final Operation op = valueFrom(cam.getAction()); 143 switch (op) { 144 case BATCH: 145 case CHECK_AND_MUTATE: 146 ops.addAll(unpackRowOperations(cam.getAction())); 147 break; 148 default: 149 ops.add(op); 150 } 151 return ops; 152 } 153 154 public TableOperationSpanBuilder setContainerOperations(final Operation... operations) { 155 final List<String> ops = Arrays.stream(operations).map(op -> op == null ? unknown : op.name()) 156 .sorted().distinct().collect(Collectors.toList()); 157 attributes.put(CONTAINER_DB_OPERATIONS_KEY, ops); 158 return this; 159 } 160 161 public TableOperationSpanBuilder setTableName(final TableName tableName) { 162 this.tableName = tableName; 163 TableSpanBuilder.populateTableNameAttributes(attributes, tableName); 164 return this; 165 } 166 167 @SuppressWarnings("unchecked") 168 public Span build() { 169 final String name = attributes.getOrDefault(DB_OPERATION, unknown) + " " 170 + (tableName != null ? tableName.getNameWithNamespaceInclAsString() : unknown); 171 final SpanBuilder builder = TraceUtil.getGlobalTracer().spanBuilder(name) 172 // TODO: what about clients embedded in Master/RegionServer/Gateways/&c? 173 .setSpanKind(SpanKind.CLIENT); 174 attributes.forEach((k, v) -> builder.setAttribute((AttributeKey<? super Object>) k, v)); 175 return builder.startSpan(); 176 } 177 178 private static Operation valueFrom(final Scan scan) { 179 if (scan == null) { 180 return null; 181 } 182 return Operation.SCAN; 183 } 184 185 private static Operation valueFrom(final Row row) { 186 if (row == null) { 187 return null; 188 } 189 if (row instanceof Append) { 190 return Operation.APPEND; 191 } 192 if (row instanceof CheckAndMutate) { 193 return Operation.CHECK_AND_MUTATE; 194 } 195 if (row instanceof Delete) { 196 return Operation.DELETE; 197 } 198 if (row instanceof Get) { 199 return Operation.GET; 200 } 201 if (row instanceof Increment) { 202 return Operation.INCREMENT; 203 } 204 if (row instanceof Put) { 205 return Operation.PUT; 206 } 207 if (row instanceof RegionCoprocessorServiceExec) { 208 return Operation.COPROC_EXEC; 209 } 210 if (row instanceof RowMutations) { 211 return Operation.BATCH; 212 } 213 return null; 214 } 215}