001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf; 021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes; 022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; 023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind; 024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; 025import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; 026import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher; 027import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher; 028import static org.hamcrest.MatcherAssert.assertThat; 029import static org.hamcrest.Matchers.allOf; 030import static org.hamcrest.Matchers.containsString; 031import static org.hamcrest.Matchers.hasItem; 032import static org.hamcrest.Matchers.hasSize; 033import static org.mockito.ArgumentMatchers.any; 034import static org.mockito.ArgumentMatchers.anyBoolean; 035import static org.mockito.ArgumentMatchers.anyInt; 036import static org.mockito.Mockito.doAnswer; 037import static org.mockito.Mockito.doNothing; 038import static org.mockito.Mockito.doReturn; 039import static org.mockito.Mockito.mock; 040import static org.mockito.Mockito.spy; 041 042import io.opentelemetry.api.trace.SpanKind; 043import io.opentelemetry.api.trace.StatusCode; 044import io.opentelemetry.sdk.trace.data.SpanData; 045import java.io.IOException; 046import java.util.Arrays; 047import java.util.List; 048import java.util.concurrent.ForkJoinPool; 049import java.util.concurrent.atomic.AtomicInteger; 050import java.util.stream.Collectors; 051import org.apache.hadoop.hbase.Cell; 052import org.apache.hadoop.hbase.CellBuilderFactory; 053import org.apache.hadoop.hbase.CellBuilderType; 054import org.apache.hadoop.hbase.HBaseClassTestRule; 055import org.apache.hadoop.hbase.HRegionLocation; 056import org.apache.hadoop.hbase.MatcherPredicate; 057import org.apache.hadoop.hbase.ServerName; 058import org.apache.hadoop.hbase.TableName; 059import org.apache.hadoop.hbase.Waiter; 060import org.apache.hadoop.hbase.ipc.HBaseRpcController; 061import org.apache.hadoop.hbase.security.UserProvider; 062import org.apache.hadoop.hbase.testclassification.ClientTests; 063import org.apache.hadoop.hbase.testclassification.MediumTests; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.hamcrest.Matcher; 066import org.hamcrest.core.IsAnything; 067import org.junit.After; 068import org.junit.Before; 069import org.junit.ClassRule; 070import org.junit.Test; 071import org.junit.experimental.categories.Category; 072import org.mockito.invocation.InvocationOnMock; 073import org.mockito.stubbing.Answer; 074 075import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 076import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 077 078import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 086import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 087import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 093 094@Category({ ClientTests.class, MediumTests.class }) 095public class TestHTableTracing extends TestTracingBase { 096 @ClassRule 097 public static final HBaseClassTestRule CLASS_RULE = 098 HBaseClassTestRule.forClass(TestHTableTracing.class); 099 100 private ClientProtos.ClientService.BlockingInterface stub; 101 private ConnectionImplementation conn; 102 private Table table; 103 104 @Override 105 @Before 106 public void setUp() throws Exception { 107 super.setUp(); 108 109 stub = mock(ClientService.BlockingInterface.class); 110 111 AtomicInteger scanNextCalled = new AtomicInteger(0); 112 113 doAnswer(new Answer<ScanResponse>() { 114 @Override 115 public ScanResponse answer(InvocationOnMock invocation) throws Throwable { 116 ScanRequest req = invocation.getArgument(1); 117 if (!req.hasScannerId()) { 118 return ScanResponse.newBuilder().setScannerId(1).setTtl(800).setMoreResultsInRegion(true) 119 .setMoreResults(true).build(); 120 } else { 121 if (req.hasCloseScanner() && req.getCloseScanner()) { 122 return ScanResponse.getDefaultInstance(); 123 } else { 124 Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 125 .setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())) 126 .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")) 127 .setValue(Bytes.toBytes("v")).build(); 128 Result result = Result.create(Arrays.asList(cell)); 129 ScanResponse.Builder builder = ScanResponse.newBuilder().setScannerId(1).setTtl(800) 130 .addResults(ProtobufUtil.toResult(result)); 131 if (req.getLimitOfRows() == 1) { 132 builder.setMoreResultsInRegion(false).setMoreResults(false); 133 } else { 134 builder.setMoreResultsInRegion(true).setMoreResults(true); 135 } 136 return builder.build(); 137 } 138 } 139 } 140 }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class)); 141 142 doAnswer(new Answer<MultiResponse>() { 143 @Override 144 public MultiResponse answer(InvocationOnMock invocation) throws Throwable { 145 MultiResponse resp = 146 MultiResponse.newBuilder() 147 .addRegionActionResult(RegionActionResult.newBuilder().addResultOrException( 148 ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result())))) 149 .build(); 150 return resp; 151 } 152 }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class)); 153 154 doAnswer(new Answer<MutateResponse>() { 155 @Override 156 public MutateResponse answer(InvocationOnMock invocation) throws Throwable { 157 MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation(); 158 MutateResponse resp; 159 switch (req.getMutateType()) { 160 case INCREMENT: 161 ColumnValue value = req.getColumnValue(0); 162 QualifierValue qvalue = value.getQualifierValue(0); 163 Cell cell = 164 CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put) 165 .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray()) 166 .setQualifier(qvalue.getQualifier().toByteArray()) 167 .setValue(qvalue.getValue().toByteArray()).build(); 168 resp = MutateResponse.newBuilder() 169 .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build(); 170 break; 171 default: 172 resp = MutateResponse.getDefaultInstance(); 173 break; 174 } 175 return resp; 176 } 177 }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class)); 178 179 doAnswer(new Answer<GetResponse>() { 180 @Override 181 public GetResponse answer(InvocationOnMock invocation) throws Throwable { 182 ClientProtos.Get req = ((GetRequest) invocation.getArgument(1)).getGet(); 183 ColumnValue value = ColumnValue.getDefaultInstance(); 184 QualifierValue qvalue = QualifierValue.getDefaultInstance(); 185 Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put) 186 .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray()) 187 .setQualifier(qvalue.getQualifier().toByteArray()) 188 .setValue(qvalue.getValue().toByteArray()).build(); 189 return GetResponse.newBuilder() 190 .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell), true))).build(); 191 } 192 }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class)); 193 194 conn = 195 spy(new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent()) { 196 @Override 197 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 198 RegionLocator locator = mock(HRegionLocator.class); 199 Answer<HRegionLocation> answer = new Answer<HRegionLocation>() { 200 201 @Override 202 public HRegionLocation answer(InvocationOnMock invocation) throws Throwable { 203 TableName tableName = TableName.META_TABLE_NAME; 204 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 205 ServerName serverName = MASTER_HOST; 206 HRegionLocation loc = new HRegionLocation(info, serverName); 207 return loc; 208 } 209 }; 210 doAnswer(answer).when(locator).getRegionLocation(any(byte[].class), anyInt(), 211 anyBoolean()); 212 doAnswer(answer).when(locator).getRegionLocation(any(byte[].class)); 213 doAnswer(answer).when(locator).getRegionLocation(any(byte[].class), anyInt()); 214 doAnswer(answer).when(locator).getRegionLocation(any(byte[].class), anyBoolean()); 215 return locator; 216 } 217 218 @Override 219 public ClientService.BlockingInterface getClient(ServerName serverName) throws IOException { 220 return stub; 221 } 222 }); 223 // this setup of AsyncProcess is for MultiResponse 224 AsyncProcess asyncProcess = mock(AsyncProcess.class); 225 AsyncRequestFuture asyncRequestFuture = mock(AsyncRequestFuture.class); 226 doNothing().when(asyncRequestFuture).waitUntilDone(); 227 doReturn(asyncRequestFuture).when(asyncProcess).submit(any()); 228 doReturn(asyncProcess).when(conn).getAsyncProcess(); 229 // setup the table instance 230 table = conn.getTable(TableName.META_TABLE_NAME, ForkJoinPool.commonPool()); 231 } 232 233 @After 234 public void tearDown() throws IOException { 235 Closeables.close(conn, true); 236 } 237 238 private void assertTrace(String tableOperation) { 239 assertTrace(tableOperation, new IsAnything<>()); 240 } 241 242 private void assertTrace(String tableOperation, Matcher<SpanData> matcher) { 243 // n.b. this method implementation must match the one of the same name found in 244 // TestAsyncTableTracing 245 final TableName tableName = table.getName(); 246 final Matcher<SpanData> spanLocator = 247 allOf(hasName(containsString(tableOperation)), hasEnded()); 248 final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString(); 249 250 Waiter.waitFor(conf, 1000, new MatcherPredicate<>("waiting for span to emit", 251 () -> TRACE_RULE.getSpans(), hasItem(spanLocator))); 252 List<SpanData> candidateSpans = 253 TRACE_RULE.getSpans().stream().filter(spanLocator::matches).collect(Collectors.toList()); 254 assertThat(candidateSpans, hasSize(1)); 255 SpanData data = candidateSpans.iterator().next(); 256 assertThat(data, 257 allOf(hasName(expectedName), hasKind(SpanKind.CLIENT), hasStatusWithCode(StatusCode.OK), 258 buildConnectionAttributesMatcher(conn), buildTableAttributesMatcher(tableName), matcher)); 259 } 260 261 @Test 262 public void testPut() throws IOException { 263 table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 264 Bytes.toBytes("v"))); 265 assertTrace("PUT"); 266 } 267 268 @Test 269 public void testExists() throws IOException { 270 table.exists(new Get(Bytes.toBytes(0))); 271 assertTrace("GET"); 272 } 273 274 @Test 275 public void testGet() throws IOException { 276 table.get(new Get(Bytes.toBytes(0))); 277 assertTrace("GET"); 278 } 279 280 @Test 281 public void testDelete() throws IOException { 282 table.delete(new Delete(Bytes.toBytes(0))); 283 assertTrace("DELETE"); 284 } 285 286 @Test 287 public void testAppend() throws IOException { 288 table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 289 Bytes.toBytes("v"))); 290 assertTrace("APPEND"); 291 } 292 293 @Test 294 public void testIncrement() throws IOException { 295 table.increment( 296 new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)); 297 assertTrace("INCREMENT"); 298 } 299 300 @Test 301 public void testIncrementColumnValue1() throws IOException { 302 table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1); 303 assertTrace("INCREMENT"); 304 } 305 306 @Test 307 public void testIncrementColumnValue2() throws IOException { 308 table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 309 Durability.SYNC_WAL); 310 assertTrace("INCREMENT"); 311 } 312 313 @Test 314 public void testCheckAndMutate() throws IOException { 315 table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0)) 316 .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) 317 .build(new Delete(Bytes.toBytes(0)))); 318 assertTrace("CHECK_AND_MUTATE", 319 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", 320 "CHECK_AND_MUTATE", "DELETE"))); 321 } 322 323 @Test 324 public void testCheckAndMutateList() throws IOException { 325 table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) 326 .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) 327 .build(new Delete(Bytes.toBytes(0))))); 328 assertTrace("BATCH", 329 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", 330 "CHECK_AND_MUTATE", "DELETE"))); 331 } 332 333 @Test 334 public void testCheckAndMutateAll() throws IOException { 335 table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) 336 .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) 337 .build(new Delete(Bytes.toBytes(0))))); 338 assertTrace("BATCH", 339 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", 340 "CHECK_AND_MUTATE", "DELETE"))); 341 } 342 343 @Test 344 public void testMutateRow() throws Exception { 345 byte[] row = Bytes.toBytes(0); 346 table.mutateRow(RowMutations.of(Arrays.asList(new Delete(row)))); 347 assertTrace("BATCH", 348 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); 349 } 350 351 @Test 352 public void testExistsList() throws IOException { 353 table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))); 354 assertTrace("BATCH", 355 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); 356 } 357 358 @Test 359 public void testExistsAll() throws IOException { 360 table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))); 361 assertTrace("BATCH", 362 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); 363 } 364 365 @Test 366 public void testGetList() throws IOException { 367 table.get(Arrays.asList(new Get(Bytes.toBytes(0)))); 368 assertTrace("BATCH", 369 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); 370 } 371 372 @Test 373 public void testPutList() throws IOException { 374 table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), 375 Bytes.toBytes("cq"), Bytes.toBytes("v")))); 376 assertTrace("BATCH", 377 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT"))); 378 } 379 380 @Test 381 public void testDeleteList() throws IOException { 382 table.delete(Lists.newArrayList(new Delete(Bytes.toBytes(0)))); 383 assertTrace("BATCH", 384 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); 385 } 386 387 @Test 388 public void testBatchList() throws IOException, InterruptedException { 389 table.batch(Arrays.asList(new Delete(Bytes.toBytes(0))), null); 390 assertTrace("BATCH", 391 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); 392 } 393 394 @Test 395 public void testTableClose() throws IOException { 396 table.close(); 397 assertTrace(HTable.class.getSimpleName(), "close", null, TableName.META_TABLE_NAME); 398 } 399}