001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
025import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
026import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher;
027import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher;
028import static org.hamcrest.MatcherAssert.assertThat;
029import static org.hamcrest.Matchers.allOf;
030import static org.hamcrest.Matchers.containsString;
031import static org.hamcrest.Matchers.greaterThan;
032import static org.hamcrest.Matchers.greaterThanOrEqualTo;
033import static org.hamcrest.Matchers.hasItem;
034import static org.hamcrest.Matchers.hasSize;
035import static org.junit.Assert.fail;
036import static org.mockito.ArgumentMatchers.any;
037import static org.mockito.ArgumentMatchers.anyInt;
038import static org.mockito.ArgumentMatchers.anyLong;
039import static org.mockito.Mockito.doAnswer;
040import static org.mockito.Mockito.mock;
041
042import io.opentelemetry.api.trace.SpanKind;
043import io.opentelemetry.api.trace.StatusCode;
044import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
045import io.opentelemetry.sdk.trace.data.SpanData;
046import java.io.IOException;
047import java.util.Arrays;
048import java.util.List;
049import java.util.concurrent.CompletableFuture;
050import java.util.concurrent.CountDownLatch;
051import java.util.concurrent.ForkJoinPool;
052import java.util.concurrent.atomic.AtomicInteger;
053import java.util.concurrent.atomic.AtomicReference;
054import java.util.stream.Collectors;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.hbase.Cell;
057import org.apache.hadoop.hbase.CellBuilderFactory;
058import org.apache.hadoop.hbase.CellBuilderType;
059import org.apache.hadoop.hbase.HBaseClassTestRule;
060import org.apache.hadoop.hbase.HBaseConfiguration;
061import org.apache.hadoop.hbase.HRegionLocation;
062import org.apache.hadoop.hbase.MatcherPredicate;
063import org.apache.hadoop.hbase.ServerName;
064import org.apache.hadoop.hbase.TableName;
065import org.apache.hadoop.hbase.Waiter;
066import org.apache.hadoop.hbase.filter.PrefixFilter;
067import org.apache.hadoop.hbase.ipc.HBaseRpcController;
068import org.apache.hadoop.hbase.security.User;
069import org.apache.hadoop.hbase.security.UserProvider;
070import org.apache.hadoop.hbase.testclassification.ClientTests;
071import org.apache.hadoop.hbase.testclassification.MediumTests;
072import org.apache.hadoop.hbase.util.Bytes;
073import org.hamcrest.Matcher;
074import org.hamcrest.core.IsAnything;
075import org.junit.After;
076import org.junit.Before;
077import org.junit.ClassRule;
078import org.junit.Rule;
079import org.junit.Test;
080import org.junit.experimental.categories.Category;
081import org.mockito.invocation.InvocationOnMock;
082import org.mockito.stubbing.Answer;
083
084import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
085import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
086
087import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
101
102@Category({ ClientTests.class, MediumTests.class })
103public class TestAsyncTableTracing {
104
105  @ClassRule
106  public static final HBaseClassTestRule CLASS_RULE =
107    HBaseClassTestRule.forClass(TestAsyncTableTracing.class);
108
109  private static Configuration CONF = HBaseConfiguration.create();
110
111  private ClientService.Interface stub;
112
113  private AsyncConnectionImpl conn;
114
115  private AsyncTable<ScanResultConsumer> table;
116
117  @Rule
118  public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
119
120  @Before
121  public void setUp() throws IOException {
122    stub = mock(ClientService.Interface.class);
123    AtomicInteger scanNextCalled = new AtomicInteger(0);
124    doAnswer(new Answer<Void>() {
125
126      @Override
127      public Void answer(InvocationOnMock invocation) throws Throwable {
128        ScanRequest req = invocation.getArgument(1);
129        RpcCallback<ScanResponse> done = invocation.getArgument(2);
130        if (!req.hasScannerId()) {
131          done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800)
132            .setMoreResultsInRegion(true).setMoreResults(true).build());
133        } else {
134          if (req.hasCloseScanner() && req.getCloseScanner()) {
135            done.run(ScanResponse.getDefaultInstance());
136          } else {
137            Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
138              .setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
139              .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
140              .setValue(Bytes.toBytes("v")).build();
141            Result result = Result.create(Arrays.asList(cell));
142            ScanResponse.Builder builder = ScanResponse.newBuilder().setScannerId(1).setTtl(800)
143              .addResults(ProtobufUtil.toResult(result));
144            if (req.getLimitOfRows() == 1) {
145              builder.setMoreResultsInRegion(false).setMoreResults(false);
146            } else {
147              builder.setMoreResultsInRegion(true).setMoreResults(true);
148            }
149            ForkJoinPool.commonPool().execute(() -> done.run(builder.build()));
150          }
151        }
152        return null;
153      }
154    }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any());
155    doAnswer(new Answer<Void>() {
156
157      @Override
158      public Void answer(InvocationOnMock invocation) throws Throwable {
159        ClientProtos.MultiRequest req = invocation.getArgument(1);
160        ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
161        for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
162          RegionActionResult.Builder raBuilder = RegionActionResult.newBuilder();
163          for (ClientProtos.Action ignored : regionAction.getActionList()) {
164            raBuilder.addResultOrException(
165              ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result())));
166          }
167          builder.addRegionActionResult(raBuilder);
168        }
169        ClientProtos.MultiResponse resp = builder.build();
170        RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2);
171        ForkJoinPool.commonPool().execute(() -> done.run(resp));
172        return null;
173      }
174    }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any());
175    doAnswer(new Answer<Void>() {
176
177      @Override
178      public Void answer(InvocationOnMock invocation) throws Throwable {
179        MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation();
180        MutateResponse resp;
181        switch (req.getMutateType()) {
182          case INCREMENT:
183            ColumnValue value = req.getColumnValue(0);
184            QualifierValue qvalue = value.getQualifierValue(0);
185            Cell cell =
186              CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
187                .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray())
188                .setQualifier(qvalue.getQualifier().toByteArray())
189                .setValue(qvalue.getValue().toByteArray()).build();
190            resp = MutateResponse.newBuilder()
191              .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build();
192            break;
193          default:
194            resp = MutateResponse.getDefaultInstance();
195            break;
196        }
197        RpcCallback<MutateResponse> done = invocation.getArgument(2);
198        ForkJoinPool.commonPool().execute(() -> done.run(resp));
199        return null;
200      }
201    }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any());
202    doAnswer(new Answer<Void>() {
203
204      @Override
205      public Void answer(InvocationOnMock invocation) throws Throwable {
206        RpcCallback<GetResponse> done = invocation.getArgument(2);
207        ForkJoinPool.commonPool().execute(() -> done.run(GetResponse.getDefaultInstance()));
208        return null;
209      }
210    }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
211    final User user = UserProvider.instantiate(CONF).getCurrent();
212    conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF, user), "test", null,
213      user) {
214
215      @Override
216      AsyncRegionLocator getLocator() {
217        AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
218        Answer<CompletableFuture<HRegionLocation>> answer =
219          new Answer<CompletableFuture<HRegionLocation>>() {
220
221            @Override
222            public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation)
223              throws Throwable {
224              TableName tableName = invocation.getArgument(0);
225              RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
226              ServerName serverName = ServerName.valueOf("rs", 16010, 12345);
227              HRegionLocation loc = new HRegionLocation(info, serverName);
228              return CompletableFuture.completedFuture(loc);
229            }
230          };
231        doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
232          any(RegionLocateType.class), anyLong());
233        doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
234          anyInt(), any(RegionLocateType.class), anyLong());
235        return locator;
236      }
237
238      @Override
239      ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
240        return stub;
241      }
242    };
243    table = conn.getTable(TableName.valueOf("table"), ForkJoinPool.commonPool());
244  }
245
246  @After
247  public void tearDown() throws IOException {
248    Closeables.close(conn, true);
249  }
250
251  private void assertTrace(String tableOperation) {
252    assertTrace(tableOperation, new IsAnything<>());
253  }
254
255  private void assertTrace(String tableOperation, Matcher<SpanData> matcher) {
256    final TableName tableName = table.getName();
257    final Matcher<SpanData> spanLocator =
258      allOf(hasName(containsString(tableOperation)), hasEnded());
259    final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString();
260
261    Waiter.waitFor(CONF, 1000, new MatcherPredicate<>("waiting for span to emit",
262      () -> traceRule.getSpans(), hasItem(spanLocator)));
263    List<SpanData> candidateSpans =
264      traceRule.getSpans().stream().filter(spanLocator::matches).collect(Collectors.toList());
265    assertThat(candidateSpans, hasSize(1));
266    SpanData data = candidateSpans.iterator().next();
267    assertThat(data,
268      allOf(hasName(expectedName), hasKind(SpanKind.CLIENT), hasStatusWithCode(StatusCode.OK),
269        buildConnectionAttributesMatcher(conn), buildTableAttributesMatcher(tableName), matcher));
270  }
271
272  @Test
273  public void testExists() {
274    table.exists(new Get(Bytes.toBytes(0))).join();
275    assertTrace("GET");
276  }
277
278  @Test
279  public void testGet() {
280    table.get(new Get(Bytes.toBytes(0))).join();
281    assertTrace("GET");
282  }
283
284  @Test
285  public void testPut() {
286    table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
287      Bytes.toBytes("v"))).join();
288    assertTrace("PUT");
289  }
290
291  @Test
292  public void testDelete() {
293    table.delete(new Delete(Bytes.toBytes(0))).join();
294    assertTrace("DELETE");
295  }
296
297  @Test
298  public void testAppend() {
299    table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
300      Bytes.toBytes("v"))).join();
301    assertTrace("APPEND");
302  }
303
304  @Test
305  public void testIncrement() {
306    table
307      .increment(
308        new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1))
309      .join();
310    assertTrace("INCREMENT");
311  }
312
313  @Test
314  public void testIncrementColumnValue1() {
315    table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)
316      .join();
317    assertTrace("INCREMENT");
318  }
319
320  @Test
321  public void testIncrementColumnValue2() {
322    table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
323      Durability.ASYNC_WAL).join();
324    assertTrace("INCREMENT");
325  }
326
327  @Test
328  public void testCheckAndMutate() {
329    table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0))
330      .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
331      .build(new Delete(Bytes.toBytes(0)))).join();
332    assertTrace("CHECK_AND_MUTATE");
333  }
334
335  @Test
336  public void testCheckAndMutateList() {
337    CompletableFuture
338      .allOf(table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
339        .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
340        .build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0]))
341      .join();
342    assertTrace("BATCH",
343      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations",
344        "CHECK_AND_MUTATE", "DELETE")));
345  }
346
347  @Test
348  public void testCheckAndMutateAll() {
349    table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
350      .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
351      .build(new Delete(Bytes.toBytes(0))))).join();
352    assertTrace("BATCH",
353      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations",
354        "CHECK_AND_MUTATE", "DELETE")));
355  }
356
357  private void testCheckAndMutateBuilder(Row op) {
358    AsyncTable.CheckAndMutateBuilder builder =
359      table.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
360        .ifEquals(Bytes.toBytes("v"));
361    if (op instanceof Put) {
362      Put put = (Put) op;
363      builder.thenPut(put).join();
364    } else if (op instanceof Delete) {
365      Delete delete = (Delete) op;
366      builder.thenDelete(delete).join();
367    } else if (op instanceof RowMutations) {
368      RowMutations mutations = (RowMutations) op;
369      builder.thenMutate(mutations).join();
370    } else {
371      fail("unsupported CheckAndPut operation " + op);
372    }
373    assertTrace("CHECK_AND_MUTATE");
374  }
375
376  @Test
377  public void testCheckAndMutateBuilderThenPut() {
378    Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"),
379      Bytes.toBytes("v"));
380    testCheckAndMutateBuilder(put);
381  }
382
383  @Test
384  public void testCheckAndMutateBuilderThenDelete() {
385    testCheckAndMutateBuilder(new Delete(Bytes.toBytes(0)));
386  }
387
388  @Test
389  public void testCheckAndMutateBuilderThenMutations() throws IOException {
390    RowMutations mutations =
391      new RowMutations(Bytes.toBytes(0)).add(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"),
392        Bytes.toBytes("cq"), Bytes.toBytes("v"))).add(new Delete(Bytes.toBytes(0)));
393    testCheckAndMutateBuilder(mutations);
394  }
395
396  private void testCheckAndMutateWithFilterBuilder(Row op) {
397    // use of `PrefixFilter` is completely arbitrary here.
398    AsyncTable.CheckAndMutateWithFilterBuilder builder =
399      table.checkAndMutate(Bytes.toBytes(0), new PrefixFilter(Bytes.toBytes(0)));
400    if (op instanceof Put) {
401      Put put = (Put) op;
402      builder.thenPut(put).join();
403    } else if (op instanceof Delete) {
404      Delete delete = (Delete) op;
405      builder.thenDelete(delete).join();
406    } else if (op instanceof RowMutations) {
407      RowMutations mutations = (RowMutations) op;
408      builder.thenMutate(mutations).join();
409    } else {
410      fail("unsupported CheckAndPut operation " + op);
411    }
412    assertTrace("CHECK_AND_MUTATE");
413  }
414
415  @Test
416  public void testCheckAndMutateWithFilterBuilderThenPut() {
417    Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"),
418      Bytes.toBytes("v"));
419    testCheckAndMutateWithFilterBuilder(put);
420  }
421
422  @Test
423  public void testCheckAndMutateWithFilterBuilderThenDelete() {
424    testCheckAndMutateWithFilterBuilder(new Delete(Bytes.toBytes(0)));
425  }
426
427  @Test
428  public void testCheckAndMutateWithFilterBuilderThenMutations() throws IOException {
429    RowMutations mutations =
430      new RowMutations(Bytes.toBytes(0)).add(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"),
431        Bytes.toBytes("cq"), Bytes.toBytes("v"))).add(new Delete(Bytes.toBytes(0)));
432    testCheckAndMutateWithFilterBuilder(mutations);
433  }
434
435  @Test
436  public void testMutateRow() throws IOException {
437    final RowMutations mutations = new RowMutations(Bytes.toBytes(0)).add(new Put(Bytes.toBytes(0))
438      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
439      .add(new Delete(Bytes.toBytes(0)));
440    table.mutateRow(mutations).join();
441    assertTrace("BATCH", hasAttributes(
442      containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE", "PUT")));
443  }
444
445  @Test
446  public void testScanAll() {
447    table.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join();
448    assertTrace("SCAN");
449  }
450
451  @Test
452  public void testScan() throws Throwable {
453    final CountDownLatch doneSignal = new CountDownLatch(1);
454    final AtomicInteger count = new AtomicInteger();
455    final AtomicReference<Throwable> throwable = new AtomicReference<>();
456    final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1);
457    table.scan(scan, new ScanResultConsumer() {
458      @Override
459      public boolean onNext(Result result) {
460        if (result.getRow() != null) {
461          count.incrementAndGet();
462        }
463        return true;
464      }
465
466      @Override
467      public void onError(Throwable error) {
468        throwable.set(error);
469        doneSignal.countDown();
470      }
471
472      @Override
473      public void onComplete() {
474        doneSignal.countDown();
475      }
476    });
477    doneSignal.await();
478    if (throwable.get() != null) {
479      throw throwable.get();
480    }
481    assertThat("user code did not run. check test setup.", count.get(), greaterThan(0));
482    assertTrace("SCAN");
483  }
484
485  @Test
486  public void testGetScanner() {
487    final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1);
488    try (ResultScanner scanner = table.getScanner(scan)) {
489      int count = 0;
490      for (Result result : scanner) {
491        if (result.getRow() != null) {
492          count++;
493        }
494      }
495      // do something with it.
496      assertThat(count, greaterThanOrEqualTo(0));
497    }
498    assertTrace("SCAN");
499  }
500
501  @Test
502  public void testExistsList() {
503    CompletableFuture
504      .allOf(
505        table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
506      .join();
507    assertTrace("BATCH",
508      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
509  }
510
511  @Test
512  public void testExistsAll() {
513    table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
514    assertTrace("BATCH",
515      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
516  }
517
518  @Test
519  public void testGetList() {
520    CompletableFuture
521      .allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
522      .join();
523    assertTrace("BATCH",
524      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
525  }
526
527  @Test
528  public void testGetAll() {
529    table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
530    assertTrace("BATCH",
531      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
532  }
533
534  @Test
535  public void testPutList() {
536    CompletableFuture
537      .allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
538        Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0]))
539      .join();
540    assertTrace("BATCH",
541      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
542  }
543
544  @Test
545  public void testPutAll() {
546    table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
547      Bytes.toBytes("cq"), Bytes.toBytes("v")))).join();
548    assertTrace("BATCH",
549      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
550  }
551
552  @Test
553  public void testDeleteList() {
554    CompletableFuture
555      .allOf(
556        table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
557      .join();
558    assertTrace("BATCH",
559      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
560  }
561
562  @Test
563  public void testDeleteAll() {
564    table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
565    assertTrace("BATCH",
566      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
567  }
568
569  @Test
570  public void testBatch() {
571    CompletableFuture
572      .allOf(
573        table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
574      .join();
575    assertTrace("BATCH",
576      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
577  }
578
579  @Test
580  public void testBatchAll() {
581    table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
582    assertTrace("BATCH",
583      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
584  }
585}