001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf; 021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes; 022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; 023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind; 024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; 025import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; 026import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher; 027import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher; 028import static org.hamcrest.MatcherAssert.assertThat; 029import static org.hamcrest.Matchers.allOf; 030import static org.hamcrest.Matchers.containsString; 031import static org.hamcrest.Matchers.greaterThan; 032import static org.hamcrest.Matchers.greaterThanOrEqualTo; 033import static org.hamcrest.Matchers.hasItem; 034import static org.hamcrest.Matchers.hasSize; 035import static org.junit.jupiter.api.Assertions.fail; 036import static org.mockito.ArgumentMatchers.any; 037import static org.mockito.ArgumentMatchers.anyInt; 038import static org.mockito.ArgumentMatchers.anyLong; 039import static org.mockito.Mockito.doAnswer; 040import static org.mockito.Mockito.mock; 041 042import io.opentelemetry.api.trace.SpanKind; 043import io.opentelemetry.api.trace.StatusCode; 044import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension; 045import io.opentelemetry.sdk.trace.data.SpanData; 046import java.io.IOException; 047import java.util.Arrays; 048import java.util.List; 049import java.util.concurrent.CompletableFuture; 050import java.util.concurrent.CountDownLatch; 051import java.util.concurrent.ForkJoinPool; 052import java.util.concurrent.atomic.AtomicInteger; 053import java.util.concurrent.atomic.AtomicReference; 054import java.util.stream.Collectors; 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.hbase.Cell; 057import org.apache.hadoop.hbase.CellBuilderFactory; 058import org.apache.hadoop.hbase.CellBuilderType; 059import org.apache.hadoop.hbase.HBaseConfiguration; 060import org.apache.hadoop.hbase.HRegionLocation; 061import org.apache.hadoop.hbase.MatcherPredicate; 062import org.apache.hadoop.hbase.ServerName; 063import org.apache.hadoop.hbase.TableName; 064import org.apache.hadoop.hbase.Waiter; 065import org.apache.hadoop.hbase.filter.PrefixFilter; 066import org.apache.hadoop.hbase.ipc.HBaseRpcController; 067import org.apache.hadoop.hbase.security.User; 068import org.apache.hadoop.hbase.security.UserProvider; 069import org.apache.hadoop.hbase.testclassification.ClientTests; 070import org.apache.hadoop.hbase.testclassification.MediumTests; 071import org.apache.hadoop.hbase.util.Bytes; 072import org.hamcrest.Matcher; 073import org.hamcrest.core.IsAnything; 074import org.junit.jupiter.api.AfterEach; 075import org.junit.jupiter.api.BeforeEach; 076import org.junit.jupiter.api.Tag; 077import org.junit.jupiter.api.Test; 078import org.junit.jupiter.api.extension.RegisterExtension; 079import org.mockito.invocation.InvocationOnMock; 080import org.mockito.stubbing.Answer; 081 082import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 083import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 084 085import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 086import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 087import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue; 094import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue; 095import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 096import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 097import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 099 100@Tag(ClientTests.TAG) 101@Tag(MediumTests.TAG) 102public class TestAsyncTableTracing { 103 104 private static Configuration CONF = HBaseConfiguration.create(); 105 106 private ClientService.Interface stub; 107 108 private AsyncConnectionImpl conn; 109 110 private AsyncTable<ScanResultConsumer> table; 111 112 @RegisterExtension 113 public static final OpenTelemetryExtension traceRule = OpenTelemetryExtension.create(); 114 115 @BeforeEach 116 public void setUp() throws IOException { 117 stub = mock(ClientService.Interface.class); 118 AtomicInteger scanNextCalled = new AtomicInteger(0); 119 doAnswer(new Answer<Void>() { 120 121 @Override 122 public Void answer(InvocationOnMock invocation) throws Throwable { 123 ScanRequest req = invocation.getArgument(1); 124 RpcCallback<ScanResponse> done = invocation.getArgument(2); 125 if (!req.hasScannerId()) { 126 done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800) 127 .setMoreResultsInRegion(true).setMoreResults(true).build()); 128 } else { 129 if (req.hasCloseScanner() && req.getCloseScanner()) { 130 done.run(ScanResponse.getDefaultInstance()); 131 } else { 132 Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 133 .setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())) 134 .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")) 135 .setValue(Bytes.toBytes("v")).build(); 136 Result result = Result.create(Arrays.asList(cell)); 137 ScanResponse.Builder builder = ScanResponse.newBuilder().setScannerId(1).setTtl(800) 138 .addResults(ProtobufUtil.toResult(result)); 139 if (req.getLimitOfRows() == 1) { 140 builder.setMoreResultsInRegion(false).setMoreResults(false); 141 } else { 142 builder.setMoreResultsInRegion(true).setMoreResults(true); 143 } 144 ForkJoinPool.commonPool().execute(() -> done.run(builder.build())); 145 } 146 } 147 return null; 148 } 149 }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any()); 150 doAnswer(new Answer<Void>() { 151 152 @Override 153 public Void answer(InvocationOnMock invocation) throws Throwable { 154 ClientProtos.MultiRequest req = invocation.getArgument(1); 155 ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder(); 156 for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) { 157 RegionActionResult.Builder raBuilder = RegionActionResult.newBuilder(); 158 for (ClientProtos.Action ignored : regionAction.getActionList()) { 159 raBuilder.addResultOrException( 160 ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))); 161 } 162 builder.addRegionActionResult(raBuilder); 163 } 164 ClientProtos.MultiResponse resp = builder.build(); 165 RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2); 166 ForkJoinPool.commonPool().execute(() -> done.run(resp)); 167 return null; 168 } 169 }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any()); 170 doAnswer(new Answer<Void>() { 171 172 @Override 173 public Void answer(InvocationOnMock invocation) throws Throwable { 174 MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation(); 175 MutateResponse resp; 176 switch (req.getMutateType()) { 177 case INCREMENT: 178 ColumnValue value = req.getColumnValue(0); 179 QualifierValue qvalue = value.getQualifierValue(0); 180 Cell cell = 181 CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put) 182 .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray()) 183 .setQualifier(qvalue.getQualifier().toByteArray()) 184 .setValue(qvalue.getValue().toByteArray()).build(); 185 resp = MutateResponse.newBuilder() 186 .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build(); 187 break; 188 default: 189 resp = MutateResponse.getDefaultInstance(); 190 break; 191 } 192 RpcCallback<MutateResponse> done = invocation.getArgument(2); 193 ForkJoinPool.commonPool().execute(() -> done.run(resp)); 194 return null; 195 } 196 }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any()); 197 doAnswer(new Answer<Void>() { 198 199 @Override 200 public Void answer(InvocationOnMock invocation) throws Throwable { 201 RpcCallback<GetResponse> done = invocation.getArgument(2); 202 ForkJoinPool.commonPool().execute(() -> done.run(GetResponse.getDefaultInstance())); 203 return null; 204 } 205 }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any()); 206 final User user = UserProvider.instantiate(CONF).getCurrent(); 207 conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF, user), "test", null, 208 user) { 209 210 @Override 211 AsyncRegionLocator getLocator() { 212 AsyncRegionLocator locator = mock(AsyncRegionLocator.class); 213 Answer<CompletableFuture<HRegionLocation>> answer = 214 new Answer<CompletableFuture<HRegionLocation>>() { 215 216 @Override 217 public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation) 218 throws Throwable { 219 TableName tableName = invocation.getArgument(0); 220 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 221 ServerName serverName = ServerName.valueOf("rs", 16010, 12345); 222 HRegionLocation loc = new HRegionLocation(info, serverName); 223 return CompletableFuture.completedFuture(loc); 224 } 225 }; 226 doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), 227 any(RegionLocateType.class), anyLong()); 228 doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), 229 anyInt(), any(RegionLocateType.class), anyLong()); 230 return locator; 231 } 232 233 @Override 234 ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { 235 return stub; 236 } 237 }; 238 table = conn.getTable(TableName.valueOf("table"), ForkJoinPool.commonPool()); 239 } 240 241 @AfterEach 242 public void tearDown() throws IOException { 243 Closeables.close(conn, true); 244 } 245 246 private void assertTrace(String tableOperation) { 247 assertTrace(tableOperation, new IsAnything<>()); 248 } 249 250 private void assertTrace(String tableOperation, Matcher<SpanData> matcher) { 251 final TableName tableName = table.getName(); 252 final Matcher<SpanData> spanLocator = 253 allOf(hasName(containsString(tableOperation)), hasEnded()); 254 final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString(); 255 256 Waiter.waitFor(CONF, 1000, new MatcherPredicate<>("waiting for span to emit", 257 () -> traceRule.getSpans(), hasItem(spanLocator))); 258 List<SpanData> candidateSpans = 259 traceRule.getSpans().stream().filter(spanLocator::matches).collect(Collectors.toList()); 260 assertThat(candidateSpans, hasSize(1)); 261 SpanData data = candidateSpans.iterator().next(); 262 assertThat(data, 263 allOf(hasName(expectedName), hasKind(SpanKind.CLIENT), hasStatusWithCode(StatusCode.OK), 264 buildConnectionAttributesMatcher(conn), buildTableAttributesMatcher(tableName), matcher)); 265 } 266 267 @Test 268 public void testExists() { 269 table.exists(new Get(Bytes.toBytes(0))).join(); 270 assertTrace("GET"); 271 } 272 273 @Test 274 public void testGet() { 275 table.get(new Get(Bytes.toBytes(0))).join(); 276 assertTrace("GET"); 277 } 278 279 @Test 280 public void testPut() { 281 table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 282 Bytes.toBytes("v"))).join(); 283 assertTrace("PUT"); 284 } 285 286 @Test 287 public void testDelete() { 288 table.delete(new Delete(Bytes.toBytes(0))).join(); 289 assertTrace("DELETE"); 290 } 291 292 @Test 293 public void testAppend() { 294 table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 295 Bytes.toBytes("v"))).join(); 296 assertTrace("APPEND"); 297 } 298 299 @Test 300 public void testIncrement() { 301 table 302 .increment( 303 new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)) 304 .join(); 305 assertTrace("INCREMENT"); 306 } 307 308 @Test 309 public void testIncrementColumnValue1() { 310 table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1) 311 .join(); 312 assertTrace("INCREMENT"); 313 } 314 315 @Test 316 public void testIncrementColumnValue2() { 317 table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 318 Durability.ASYNC_WAL).join(); 319 assertTrace("INCREMENT"); 320 } 321 322 @Test 323 public void testCheckAndMutate() { 324 table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0)) 325 .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) 326 .build(new Delete(Bytes.toBytes(0)))).join(); 327 assertTrace("CHECK_AND_MUTATE"); 328 } 329 330 @Test 331 public void testCheckAndMutateList() { 332 CompletableFuture 333 .allOf(table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) 334 .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) 335 .build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0])) 336 .join(); 337 assertTrace("BATCH", 338 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", 339 "CHECK_AND_MUTATE", "DELETE"))); 340 } 341 342 @Test 343 public void testCheckAndMutateAll() { 344 table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) 345 .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) 346 .build(new Delete(Bytes.toBytes(0))))).join(); 347 assertTrace("BATCH", 348 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", 349 "CHECK_AND_MUTATE", "DELETE"))); 350 } 351 352 private void testCheckAndMutateBuilder(Row op) { 353 AsyncTable.CheckAndMutateBuilder builder = 354 table.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) 355 .ifEquals(Bytes.toBytes("v")); 356 if (op instanceof Put) { 357 Put put = (Put) op; 358 builder.thenPut(put).join(); 359 } else if (op instanceof Delete) { 360 Delete delete = (Delete) op; 361 builder.thenDelete(delete).join(); 362 } else if (op instanceof RowMutations) { 363 RowMutations mutations = (RowMutations) op; 364 builder.thenMutate(mutations).join(); 365 } else { 366 fail("unsupported CheckAndPut operation " + op); 367 } 368 assertTrace("CHECK_AND_MUTATE"); 369 } 370 371 @Test 372 public void testCheckAndMutateBuilderThenPut() { 373 Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), 374 Bytes.toBytes("v")); 375 testCheckAndMutateBuilder(put); 376 } 377 378 @Test 379 public void testCheckAndMutateBuilderThenDelete() { 380 testCheckAndMutateBuilder(new Delete(Bytes.toBytes(0))); 381 } 382 383 @Test 384 public void testCheckAndMutateBuilderThenMutations() throws IOException { 385 RowMutations mutations = 386 new RowMutations(Bytes.toBytes(0)).add(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), 387 Bytes.toBytes("cq"), Bytes.toBytes("v"))).add(new Delete(Bytes.toBytes(0))); 388 testCheckAndMutateBuilder(mutations); 389 } 390 391 private void testCheckAndMutateWithFilterBuilder(Row op) { 392 // use of `PrefixFilter` is completely arbitrary here. 393 AsyncTable.CheckAndMutateWithFilterBuilder builder = 394 table.checkAndMutate(Bytes.toBytes(0), new PrefixFilter(Bytes.toBytes(0))); 395 if (op instanceof Put) { 396 Put put = (Put) op; 397 builder.thenPut(put).join(); 398 } else if (op instanceof Delete) { 399 Delete delete = (Delete) op; 400 builder.thenDelete(delete).join(); 401 } else if (op instanceof RowMutations) { 402 RowMutations mutations = (RowMutations) op; 403 builder.thenMutate(mutations).join(); 404 } else { 405 fail("unsupported CheckAndPut operation " + op); 406 } 407 assertTrace("CHECK_AND_MUTATE"); 408 } 409 410 @Test 411 public void testCheckAndMutateWithFilterBuilderThenPut() { 412 Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), 413 Bytes.toBytes("v")); 414 testCheckAndMutateWithFilterBuilder(put); 415 } 416 417 @Test 418 public void testCheckAndMutateWithFilterBuilderThenDelete() { 419 testCheckAndMutateWithFilterBuilder(new Delete(Bytes.toBytes(0))); 420 } 421 422 @Test 423 public void testCheckAndMutateWithFilterBuilderThenMutations() throws IOException { 424 RowMutations mutations = 425 new RowMutations(Bytes.toBytes(0)).add(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), 426 Bytes.toBytes("cq"), Bytes.toBytes("v"))).add(new Delete(Bytes.toBytes(0))); 427 testCheckAndMutateWithFilterBuilder(mutations); 428 } 429 430 @Test 431 public void testMutateRow() throws IOException { 432 final RowMutations mutations = new RowMutations(Bytes.toBytes(0)).add(new Put(Bytes.toBytes(0)) 433 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))) 434 .add(new Delete(Bytes.toBytes(0))); 435 table.mutateRow(mutations).join(); 436 assertTrace("BATCH", hasAttributes( 437 containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE", "PUT"))); 438 } 439 440 @Test 441 public void testScanAll() { 442 table.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join(); 443 assertTrace("SCAN"); 444 } 445 446 @Test 447 public void testScan() throws Throwable { 448 final CountDownLatch doneSignal = new CountDownLatch(1); 449 final AtomicInteger count = new AtomicInteger(); 450 final AtomicReference<Throwable> throwable = new AtomicReference<>(); 451 final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1); 452 table.scan(scan, new ScanResultConsumer() { 453 @Override 454 public boolean onNext(Result result) { 455 if (result.getRow() != null) { 456 count.incrementAndGet(); 457 } 458 return true; 459 } 460 461 @Override 462 public void onError(Throwable error) { 463 throwable.set(error); 464 doneSignal.countDown(); 465 } 466 467 @Override 468 public void onComplete() { 469 doneSignal.countDown(); 470 } 471 }); 472 doneSignal.await(); 473 if (throwable.get() != null) { 474 throw throwable.get(); 475 } 476 assertThat("user code did not run. check test setup.", count.get(), greaterThan(0)); 477 assertTrace("SCAN"); 478 } 479 480 @Test 481 public void testGetScanner() { 482 final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1); 483 try (ResultScanner scanner = table.getScanner(scan)) { 484 int count = 0; 485 for (Result result : scanner) { 486 if (result.getRow() != null) { 487 count++; 488 } 489 } 490 // do something with it. 491 assertThat(count, greaterThanOrEqualTo(0)); 492 } 493 assertTrace("SCAN"); 494 } 495 496 @Test 497 public void testExistsList() { 498 CompletableFuture 499 .allOf( 500 table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) 501 .join(); 502 assertTrace("BATCH", 503 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); 504 } 505 506 @Test 507 public void testExistsAll() { 508 table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); 509 assertTrace("BATCH", 510 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); 511 } 512 513 @Test 514 public void testGetList() { 515 CompletableFuture 516 .allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) 517 .join(); 518 assertTrace("BATCH", 519 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); 520 } 521 522 @Test 523 public void testGetAll() { 524 table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); 525 assertTrace("BATCH", 526 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); 527 } 528 529 @Test 530 public void testPutList() { 531 CompletableFuture 532 .allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), 533 Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0])) 534 .join(); 535 assertTrace("BATCH", 536 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT"))); 537 } 538 539 @Test 540 public void testPutAll() { 541 table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), 542 Bytes.toBytes("cq"), Bytes.toBytes("v")))).join(); 543 assertTrace("BATCH", 544 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT"))); 545 } 546 547 @Test 548 public void testDeleteList() { 549 CompletableFuture 550 .allOf( 551 table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) 552 .join(); 553 assertTrace("BATCH", 554 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); 555 } 556 557 @Test 558 public void testDeleteAll() { 559 table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); 560 assertTrace("BATCH", 561 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); 562 } 563 564 @Test 565 public void testBatch() { 566 CompletableFuture 567 .allOf( 568 table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) 569 .join(); 570 assertTrace("BATCH", 571 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); 572 } 573 574 @Test 575 public void testBatchAll() { 576 table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); 577 assertTrace("BATCH", 578 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); 579 } 580}