001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client.trace; 019 020import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.CONTAINER_DB_OPERATIONS_KEY; 021import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION; 022 023import io.opentelemetry.api.common.AttributeKey; 024import io.opentelemetry.api.trace.Span; 025import io.opentelemetry.api.trace.SpanBuilder; 026import io.opentelemetry.api.trace.SpanKind; 027import java.util.Arrays; 028import java.util.Collection; 029import java.util.HashMap; 030import java.util.HashSet; 031import java.util.List; 032import java.util.Map; 033import java.util.Set; 034import java.util.function.Supplier; 035import java.util.stream.Collectors; 036import java.util.stream.Stream; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.Append; 039import org.apache.hadoop.hbase.client.AsyncConnectionImpl; 040import org.apache.hadoop.hbase.client.CheckAndMutate; 041import org.apache.hadoop.hbase.client.Delete; 042import org.apache.hadoop.hbase.client.Get; 043import org.apache.hadoop.hbase.client.Increment; 044import org.apache.hadoop.hbase.client.Put; 045import org.apache.hadoop.hbase.client.RegionCoprocessorServiceExec; 046import org.apache.hadoop.hbase.client.Row; 047import org.apache.hadoop.hbase.client.RowMutations; 048import org.apache.hadoop.hbase.client.Scan; 049import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.Operation; 050import org.apache.hadoop.hbase.trace.TraceUtil; 051import org.apache.yetus.audience.InterfaceAudience; 052 053/** 054 * Construct {@link Span} instances originating from "table operations" -- the verbs in our public 055 * API that interact with data in tables. 056 */ 057@InterfaceAudience.Private 058public class TableOperationSpanBuilder implements Supplier<Span> { 059 060 // n.b. The results of this class are tested implicitly by way of the likes of 061 // `TestAsyncTableTracing` and friends. 062 063 private static final String unknown = "UNKNOWN"; 064 065 private TableName tableName; 066 private final Map<AttributeKey<?>, Object> attributes = new HashMap<>(); 067 068 public TableOperationSpanBuilder(final AsyncConnectionImpl conn) { 069 ConnectionSpanBuilder.populateConnectionAttributes(attributes, conn); 070 } 071 072 @Override 073 public Span get() { 074 return build(); 075 } 076 077 public TableOperationSpanBuilder setOperation(final Scan scan) { 078 return setOperation(valueFrom(scan)); 079 } 080 081 public TableOperationSpanBuilder setOperation(final Row row) { 082 return setOperation(valueFrom(row)); 083 } 084 085 @SuppressWarnings("unused") 086 public TableOperationSpanBuilder setOperation(final Collection<? extends Row> operations) { 087 return setOperation(Operation.BATCH); 088 } 089 090 public TableOperationSpanBuilder setOperation(final Operation operation) { 091 attributes.put(DB_OPERATION, operation.name()); 092 return this; 093 } 094 095 // `setContainerOperations` perform a recursive descent expansion of all the operations 096 // contained within the provided "batch" object. 097 098 public TableOperationSpanBuilder setContainerOperations(final RowMutations mutations) { 099 final Operation[] ops = mutations.getMutations().stream() 100 .flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream())) 101 .toArray(Operation[]::new); 102 return setContainerOperations(ops); 103 } 104 105 public TableOperationSpanBuilder setContainerOperations(final Row row) { 106 final Operation[] ops = 107 Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream()) 108 .toArray(Operation[]::new); 109 return setContainerOperations(ops); 110 } 111 112 public TableOperationSpanBuilder 113 setContainerOperations(final Collection<? extends Row> operations) { 114 final Operation[] ops = operations.stream() 115 .flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream())) 116 .toArray(Operation[]::new); 117 return setContainerOperations(ops); 118 } 119 120 private static Set<Operation> unpackRowOperations(final Row row) { 121 final Set<Operation> ops = new HashSet<>(); 122 if (row instanceof CheckAndMutate) { 123 final CheckAndMutate cam = (CheckAndMutate) row; 124 ops.addAll(unpackRowOperations(cam)); 125 } 126 if (row instanceof RowMutations) { 127 final RowMutations mutations = (RowMutations) row; 128 final List<Operation> operations = mutations.getMutations().stream() 129 .map(TableOperationSpanBuilder::valueFrom).collect(Collectors.toList()); 130 ops.addAll(operations); 131 } 132 return ops; 133 } 134 135 private static Set<Operation> unpackRowOperations(final CheckAndMutate cam) { 136 final Set<Operation> ops = new HashSet<>(); 137 final Operation op = valueFrom(cam.getAction()); 138 switch (op) { 139 case BATCH: 140 case CHECK_AND_MUTATE: 141 ops.addAll(unpackRowOperations(cam.getAction())); 142 break; 143 default: 144 ops.add(op); 145 } 146 return ops; 147 } 148 149 public TableOperationSpanBuilder setContainerOperations(final Operation... operations) { 150 final List<String> ops = Arrays.stream(operations).map(op -> op == null ? unknown : op.name()) 151 .sorted().distinct().collect(Collectors.toList()); 152 attributes.put(CONTAINER_DB_OPERATIONS_KEY, ops); 153 return this; 154 } 155 156 public TableOperationSpanBuilder setTableName(final TableName tableName) { 157 this.tableName = tableName; 158 TableSpanBuilder.populateTableNameAttributes(attributes, tableName); 159 return this; 160 } 161 162 @SuppressWarnings("unchecked") 163 public Span build() { 164 final String name = attributes.getOrDefault(DB_OPERATION, unknown) + " " 165 + (tableName != null ? tableName.getNameWithNamespaceInclAsString() : unknown); 166 final SpanBuilder builder = TraceUtil.getGlobalTracer().spanBuilder(name) 167 // TODO: what about clients embedded in Master/RegionServer/Gateways/&c? 168 .setSpanKind(SpanKind.CLIENT); 169 attributes.forEach((k, v) -> builder.setAttribute((AttributeKey<? super Object>) k, v)); 170 return builder.startSpan(); 171 } 172 173 private static Operation valueFrom(final Scan scan) { 174 if (scan == null) { 175 return null; 176 } 177 return Operation.SCAN; 178 } 179 180 private static Operation valueFrom(final Row row) { 181 if (row == null) { 182 return null; 183 } 184 if (row instanceof Append) { 185 return Operation.APPEND; 186 } 187 if (row instanceof CheckAndMutate) { 188 return Operation.CHECK_AND_MUTATE; 189 } 190 if (row instanceof Delete) { 191 return Operation.DELETE; 192 } 193 if (row instanceof Get) { 194 return Operation.GET; 195 } 196 if (row instanceof Increment) { 197 return Operation.INCREMENT; 198 } 199 if (row instanceof Put) { 200 return Operation.PUT; 201 } 202 if (row instanceof RegionCoprocessorServiceExec) { 203 return Operation.COPROC_EXEC; 204 } 205 if (row instanceof RowMutations) { 206 return Operation.BATCH; 207 } 208 return null; 209 } 210}