001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf; 021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes; 022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; 023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind; 024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; 025import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; 026import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher; 027import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher; 028import static org.hamcrest.MatcherAssert.assertThat; 029import static org.hamcrest.Matchers.allOf; 030import static org.hamcrest.Matchers.containsString; 031import static org.hamcrest.Matchers.greaterThan; 032import static org.hamcrest.Matchers.greaterThanOrEqualTo; 033import static org.hamcrest.Matchers.hasItem; 034import static org.hamcrest.Matchers.hasSize; 035import static org.junit.Assert.fail; 036import static org.mockito.ArgumentMatchers.any; 037import static org.mockito.ArgumentMatchers.anyInt; 038import static org.mockito.ArgumentMatchers.anyLong; 039import static org.mockito.Mockito.doAnswer; 040import static org.mockito.Mockito.mock; 041 042import io.opentelemetry.api.trace.SpanKind; 043import io.opentelemetry.api.trace.StatusCode; 044import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; 045import io.opentelemetry.sdk.trace.data.SpanData; 046import java.io.IOException; 047import java.util.Arrays; 048import java.util.List; 049import java.util.concurrent.CompletableFuture; 050import java.util.concurrent.CountDownLatch; 051import java.util.concurrent.ForkJoinPool; 052import java.util.concurrent.atomic.AtomicInteger; 053import java.util.concurrent.atomic.AtomicReference; 054import java.util.stream.Collectors; 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.hbase.Cell; 057import org.apache.hadoop.hbase.CellBuilderFactory; 058import org.apache.hadoop.hbase.CellBuilderType; 059import org.apache.hadoop.hbase.HBaseClassTestRule; 060import org.apache.hadoop.hbase.HBaseConfiguration; 061import org.apache.hadoop.hbase.HRegionLocation; 062import org.apache.hadoop.hbase.MatcherPredicate; 063import org.apache.hadoop.hbase.ServerName; 064import org.apache.hadoop.hbase.TableName; 065import org.apache.hadoop.hbase.Waiter; 066import org.apache.hadoop.hbase.filter.PrefixFilter; 067import org.apache.hadoop.hbase.ipc.HBaseRpcController; 068import org.apache.hadoop.hbase.security.User; 069import org.apache.hadoop.hbase.security.UserProvider; 070import org.apache.hadoop.hbase.testclassification.ClientTests; 071import org.apache.hadoop.hbase.testclassification.MediumTests; 072import org.apache.hadoop.hbase.util.Bytes; 073import org.hamcrest.Matcher; 074import org.hamcrest.core.IsAnything; 075import org.junit.After; 076import org.junit.Before; 077import org.junit.ClassRule; 078import org.junit.Rule; 079import org.junit.Test; 080import org.junit.experimental.categories.Category; 081import org.mockito.invocation.InvocationOnMock; 082import org.mockito.stubbing.Answer; 083 084import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 085import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 086 087import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 094import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 095import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue; 096import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue; 097import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 099import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 100import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 101 102@Category({ ClientTests.class, MediumTests.class }) 103public class TestAsyncTableTracing { 104 105 @ClassRule 106 public static final HBaseClassTestRule CLASS_RULE = 107 HBaseClassTestRule.forClass(TestAsyncTableTracing.class); 108 109 private static Configuration CONF = HBaseConfiguration.create(); 110 111 private ClientService.Interface stub; 112 113 private AsyncConnectionImpl conn; 114 115 private AsyncTable<ScanResultConsumer> table; 116 117 @Rule 118 public OpenTelemetryRule traceRule = OpenTelemetryRule.create(); 119 120 @Before 121 public void setUp() throws IOException { 122 stub = mock(ClientService.Interface.class); 123 AtomicInteger scanNextCalled = new AtomicInteger(0); 124 doAnswer(new Answer<Void>() { 125 126 @Override 127 public Void answer(InvocationOnMock invocation) throws Throwable { 128 ScanRequest req = invocation.getArgument(1); 129 RpcCallback<ScanResponse> done = invocation.getArgument(2); 130 if (!req.hasScannerId()) { 131 done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800) 132 .setMoreResultsInRegion(true).setMoreResults(true).build()); 133 } else { 134 if (req.hasCloseScanner() && req.getCloseScanner()) { 135 done.run(ScanResponse.getDefaultInstance()); 136 } else { 137 Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 138 .setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())) 139 .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")) 140 .setValue(Bytes.toBytes("v")).build(); 141 Result result = Result.create(Arrays.asList(cell)); 142 ScanResponse.Builder builder = ScanResponse.newBuilder().setScannerId(1).setTtl(800) 143 .addResults(ProtobufUtil.toResult(result)); 144 if (req.getLimitOfRows() == 1) { 145 builder.setMoreResultsInRegion(false).setMoreResults(false); 146 } else { 147 builder.setMoreResultsInRegion(true).setMoreResults(true); 148 } 149 ForkJoinPool.commonPool().execute(() -> done.run(builder.build())); 150 } 151 } 152 return null; 153 } 154 }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any()); 155 doAnswer(new Answer<Void>() { 156 157 @Override 158 public Void answer(InvocationOnMock invocation) throws Throwable { 159 ClientProtos.MultiRequest req = invocation.getArgument(1); 160 ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder(); 161 for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) { 162 RegionActionResult.Builder raBuilder = RegionActionResult.newBuilder(); 163 for (ClientProtos.Action ignored : regionAction.getActionList()) { 164 raBuilder.addResultOrException( 165 ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))); 166 } 167 builder.addRegionActionResult(raBuilder); 168 } 169 ClientProtos.MultiResponse resp = builder.build(); 170 RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2); 171 ForkJoinPool.commonPool().execute(() -> done.run(resp)); 172 return null; 173 } 174 }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any()); 175 doAnswer(new Answer<Void>() { 176 177 @Override 178 public Void answer(InvocationOnMock invocation) throws Throwable { 179 MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation(); 180 MutateResponse resp; 181 switch (req.getMutateType()) { 182 case INCREMENT: 183 ColumnValue value = req.getColumnValue(0); 184 QualifierValue qvalue = value.getQualifierValue(0); 185 Cell cell = 186 CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put) 187 .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray()) 188 .setQualifier(qvalue.getQualifier().toByteArray()) 189 .setValue(qvalue.getValue().toByteArray()).build(); 190 resp = MutateResponse.newBuilder() 191 .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build(); 192 break; 193 default: 194 resp = MutateResponse.getDefaultInstance(); 195 break; 196 } 197 RpcCallback<MutateResponse> done = invocation.getArgument(2); 198 ForkJoinPool.commonPool().execute(() -> done.run(resp)); 199 return null; 200 } 201 }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any()); 202 doAnswer(new Answer<Void>() { 203 204 @Override 205 public Void answer(InvocationOnMock invocation) throws Throwable { 206 RpcCallback<GetResponse> done = invocation.getArgument(2); 207 ForkJoinPool.commonPool().execute(() -> done.run(GetResponse.getDefaultInstance())); 208 return null; 209 } 210 }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any()); 211 final User user = UserProvider.instantiate(CONF).getCurrent(); 212 conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test", user) { 213 214 @Override 215 AsyncRegionLocator getLocator() { 216 AsyncRegionLocator locator = mock(AsyncRegionLocator.class); 217 Answer<CompletableFuture<HRegionLocation>> answer = 218 new Answer<CompletableFuture<HRegionLocation>>() { 219 220 @Override 221 public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation) 222 throws Throwable { 223 TableName tableName = invocation.getArgument(0); 224 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 225 ServerName serverName = ServerName.valueOf("rs", 16010, 12345); 226 HRegionLocation loc = new HRegionLocation(info, serverName); 227 return CompletableFuture.completedFuture(loc); 228 } 229 }; 230 doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), 231 any(RegionLocateType.class), anyLong()); 232 doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), 233 anyInt(), any(RegionLocateType.class), anyLong()); 234 return locator; 235 } 236 237 @Override 238 ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { 239 return stub; 240 } 241 }; 242 table = conn.getTable(TableName.valueOf("table"), ForkJoinPool.commonPool()); 243 } 244 245 @After 246 public void tearDown() throws IOException { 247 Closeables.close(conn, true); 248 } 249 250 private void assertTrace(String tableOperation) { 251 assertTrace(tableOperation, new IsAnything<>()); 252 } 253 254 private void assertTrace(String tableOperation, Matcher<SpanData> matcher) { 255 // n.b. this method implementation must match the one of the same name found in 256 // TestHTableTracing 257 final TableName tableName = table.getName(); 258 final Matcher<SpanData> spanLocator = 259 allOf(hasName(containsString(tableOperation)), hasEnded()); 260 final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString(); 261 262 Waiter.waitFor(CONF, 1000, new MatcherPredicate<>("waiting for span to emit", 263 () -> traceRule.getSpans(), hasItem(spanLocator))); 264 List<SpanData> candidateSpans = 265 traceRule.getSpans().stream().filter(spanLocator::matches).collect(Collectors.toList()); 266 assertThat(candidateSpans, hasSize(1)); 267 SpanData data = candidateSpans.iterator().next(); 268 assertThat(data, 269 allOf(hasName(expectedName), hasKind(SpanKind.CLIENT), hasStatusWithCode(StatusCode.OK), 270 buildConnectionAttributesMatcher(conn), buildTableAttributesMatcher(tableName), matcher)); 271 } 272 273 @Test 274 public void testExists() { 275 table.exists(new Get(Bytes.toBytes(0))).join(); 276 assertTrace("GET"); 277 } 278 279 @Test 280 public void testGet() { 281 table.get(new Get(Bytes.toBytes(0))).join(); 282 assertTrace("GET"); 283 } 284 285 @Test 286 public void testPut() { 287 table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 288 Bytes.toBytes("v"))).join(); 289 assertTrace("PUT"); 290 } 291 292 @Test 293 public void testDelete() { 294 table.delete(new Delete(Bytes.toBytes(0))).join(); 295 assertTrace("DELETE"); 296 } 297 298 @Test 299 public void testAppend() { 300 table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 301 Bytes.toBytes("v"))).join(); 302 assertTrace("APPEND"); 303 } 304 305 @Test 306 public void testIncrement() { 307 table 308 .increment( 309 new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)) 310 .join(); 311 assertTrace("INCREMENT"); 312 } 313 314 @Test 315 public void testIncrementColumnValue1() { 316 table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1) 317 .join(); 318 assertTrace("INCREMENT"); 319 } 320 321 @Test 322 public void testIncrementColumnValue2() { 323 table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 324 Durability.ASYNC_WAL).join(); 325 assertTrace("INCREMENT"); 326 } 327 328 @Test 329 public void testCheckAndMutate() { 330 table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0)) 331 .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) 332 .build(new Delete(Bytes.toBytes(0)))).join(); 333 assertTrace("CHECK_AND_MUTATE"); 334 } 335 336 @Test 337 public void testCheckAndMutateList() { 338 CompletableFuture 339 .allOf(table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) 340 .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) 341 .build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0])) 342 .join(); 343 assertTrace("BATCH", 344 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", 345 "CHECK_AND_MUTATE", "DELETE"))); 346 } 347 348 @Test 349 public void testCheckAndMutateAll() { 350 table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) 351 .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) 352 .build(new Delete(Bytes.toBytes(0))))).join(); 353 assertTrace("BATCH", 354 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", 355 "CHECK_AND_MUTATE", "DELETE"))); 356 } 357 358 private void testCheckAndMutateBuilder(Row op) { 359 AsyncTable.CheckAndMutateBuilder builder = 360 table.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) 361 .ifEquals(Bytes.toBytes("v")); 362 if (op instanceof Put) { 363 Put put = (Put) op; 364 builder.thenPut(put).join(); 365 } else if (op instanceof Delete) { 366 Delete delete = (Delete) op; 367 builder.thenDelete(delete).join(); 368 } else if (op instanceof RowMutations) { 369 RowMutations mutations = (RowMutations) op; 370 builder.thenMutate(mutations).join(); 371 } else { 372 fail("unsupported CheckAndPut operation " + op); 373 } 374 assertTrace("CHECK_AND_MUTATE"); 375 } 376 377 @Test 378 public void testCheckAndMutateBuilderThenPut() { 379 Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), 380 Bytes.toBytes("v")); 381 testCheckAndMutateBuilder(put); 382 } 383 384 @Test 385 public void testCheckAndMutateBuilderThenDelete() { 386 testCheckAndMutateBuilder(new Delete(Bytes.toBytes(0))); 387 } 388 389 @Test 390 public void testCheckAndMutateBuilderThenMutations() throws IOException { 391 RowMutations mutations = new RowMutations(Bytes.toBytes(0)) 392 .add((Mutation) new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), 393 Bytes.toBytes("v"))) 394 .add((Mutation) new Delete(Bytes.toBytes(0))); 395 testCheckAndMutateBuilder(mutations); 396 } 397 398 private void testCheckAndMutateWithFilterBuilder(Row op) { 399 // use of `PrefixFilter` is completely arbitrary here. 400 AsyncTable.CheckAndMutateWithFilterBuilder builder = 401 table.checkAndMutate(Bytes.toBytes(0), new PrefixFilter(Bytes.toBytes(0))); 402 if (op instanceof Put) { 403 Put put = (Put) op; 404 builder.thenPut(put).join(); 405 } else if (op instanceof Delete) { 406 Delete delete = (Delete) op; 407 builder.thenDelete(delete).join(); 408 } else if (op instanceof RowMutations) { 409 RowMutations mutations = (RowMutations) op; 410 builder.thenMutate(mutations).join(); 411 } else { 412 fail("unsupported CheckAndPut operation " + op); 413 } 414 assertTrace("CHECK_AND_MUTATE"); 415 } 416 417 @Test 418 public void testCheckAndMutateWithFilterBuilderThenPut() { 419 Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), 420 Bytes.toBytes("v")); 421 testCheckAndMutateWithFilterBuilder(put); 422 } 423 424 @Test 425 public void testCheckAndMutateWithFilterBuilderThenDelete() { 426 testCheckAndMutateWithFilterBuilder(new Delete(Bytes.toBytes(0))); 427 } 428 429 @Test 430 public void testCheckAndMutateWithFilterBuilderThenMutations() throws IOException { 431 RowMutations mutations = new RowMutations(Bytes.toBytes(0)) 432 .add((Mutation) new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), 433 Bytes.toBytes("v"))) 434 .add((Mutation) new Delete(Bytes.toBytes(0))); 435 testCheckAndMutateWithFilterBuilder(mutations); 436 } 437 438 @Test 439 public void testMutateRow() throws IOException { 440 final RowMutations mutations = new RowMutations(Bytes.toBytes(0)) 441 .add((Mutation) new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 442 Bytes.toBytes("v"))) 443 .add((Mutation) new Delete(Bytes.toBytes(0))); 444 table.mutateRow(mutations).join(); 445 assertTrace("BATCH", hasAttributes( 446 containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE", "PUT"))); 447 } 448 449 @Test 450 public void testScanAll() { 451 table.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join(); 452 assertTrace("SCAN"); 453 } 454 455 @Test 456 public void testScan() throws Throwable { 457 final CountDownLatch doneSignal = new CountDownLatch(1); 458 final AtomicInteger count = new AtomicInteger(); 459 final AtomicReference<Throwable> throwable = new AtomicReference<>(); 460 final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1); 461 table.scan(scan, new ScanResultConsumer() { 462 @Override 463 public boolean onNext(Result result) { 464 if (result.getRow() != null) { 465 count.incrementAndGet(); 466 } 467 return true; 468 } 469 470 @Override 471 public void onError(Throwable error) { 472 throwable.set(error); 473 doneSignal.countDown(); 474 } 475 476 @Override 477 public void onComplete() { 478 doneSignal.countDown(); 479 } 480 }); 481 doneSignal.await(); 482 if (throwable.get() != null) { 483 throw throwable.get(); 484 } 485 assertThat("user code did not run. check test setup.", count.get(), greaterThan(0)); 486 assertTrace("SCAN"); 487 } 488 489 @Test 490 public void testGetScanner() { 491 final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1); 492 try (ResultScanner scanner = table.getScanner(scan)) { 493 int count = 0; 494 for (Result result : scanner) { 495 if (result.getRow() != null) { 496 count++; 497 } 498 } 499 // do something with it. 500 assertThat(count, greaterThanOrEqualTo(0)); 501 } 502 assertTrace("SCAN"); 503 } 504 505 @Test 506 public void testExistsList() { 507 CompletableFuture 508 .allOf( 509 table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) 510 .join(); 511 assertTrace("BATCH", 512 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); 513 } 514 515 @Test 516 public void testExistsAll() { 517 table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); 518 assertTrace("BATCH", 519 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); 520 } 521 522 @Test 523 public void testGetList() { 524 CompletableFuture 525 .allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) 526 .join(); 527 assertTrace("BATCH", 528 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); 529 } 530 531 @Test 532 public void testGetAll() { 533 table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); 534 assertTrace("BATCH", 535 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); 536 } 537 538 @Test 539 public void testPutList() { 540 CompletableFuture 541 .allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), 542 Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0])) 543 .join(); 544 assertTrace("BATCH", 545 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT"))); 546 } 547 548 @Test 549 public void testPutAll() { 550 table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), 551 Bytes.toBytes("cq"), Bytes.toBytes("v")))).join(); 552 assertTrace("BATCH", 553 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT"))); 554 } 555 556 @Test 557 public void testDeleteList() { 558 CompletableFuture 559 .allOf( 560 table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) 561 .join(); 562 assertTrace("BATCH", 563 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); 564 } 565 566 @Test 567 public void testDeleteAll() { 568 table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); 569 assertTrace("BATCH", 570 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); 571 } 572 573 @Test 574 public void testBatch() { 575 CompletableFuture 576 .allOf( 577 table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) 578 .join(); 579 assertTrace("BATCH", 580 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); 581 } 582 583 @Test 584 public void testBatchAll() { 585 table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); 586 assertTrace("BATCH", 587 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); 588 } 589}