001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
025import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
026import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher;
027import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher;
028import static org.hamcrest.MatcherAssert.assertThat;
029import static org.hamcrest.Matchers.allOf;
030import static org.hamcrest.Matchers.containsString;
031import static org.hamcrest.Matchers.greaterThan;
032import static org.hamcrest.Matchers.greaterThanOrEqualTo;
033import static org.hamcrest.Matchers.hasItem;
034import static org.hamcrest.Matchers.hasSize;
035import static org.junit.jupiter.api.Assertions.fail;
036import static org.mockito.ArgumentMatchers.any;
037import static org.mockito.ArgumentMatchers.anyInt;
038import static org.mockito.ArgumentMatchers.anyLong;
039import static org.mockito.Mockito.doAnswer;
040import static org.mockito.Mockito.mock;
041
042import io.opentelemetry.api.trace.SpanKind;
043import io.opentelemetry.api.trace.StatusCode;
044import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
045import io.opentelemetry.sdk.trace.data.SpanData;
046import java.io.IOException;
047import java.util.Arrays;
048import java.util.List;
049import java.util.concurrent.CompletableFuture;
050import java.util.concurrent.CountDownLatch;
051import java.util.concurrent.ForkJoinPool;
052import java.util.concurrent.atomic.AtomicInteger;
053import java.util.concurrent.atomic.AtomicReference;
054import java.util.stream.Collectors;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.hbase.Cell;
057import org.apache.hadoop.hbase.CellBuilderFactory;
058import org.apache.hadoop.hbase.CellBuilderType;
059import org.apache.hadoop.hbase.HBaseConfiguration;
060import org.apache.hadoop.hbase.HRegionLocation;
061import org.apache.hadoop.hbase.MatcherPredicate;
062import org.apache.hadoop.hbase.ServerName;
063import org.apache.hadoop.hbase.TableName;
064import org.apache.hadoop.hbase.Waiter;
065import org.apache.hadoop.hbase.filter.PrefixFilter;
066import org.apache.hadoop.hbase.ipc.HBaseRpcController;
067import org.apache.hadoop.hbase.security.User;
068import org.apache.hadoop.hbase.security.UserProvider;
069import org.apache.hadoop.hbase.testclassification.ClientTests;
070import org.apache.hadoop.hbase.testclassification.MediumTests;
071import org.apache.hadoop.hbase.util.Bytes;
072import org.hamcrest.Matcher;
073import org.hamcrest.core.IsAnything;
074import org.junit.jupiter.api.AfterEach;
075import org.junit.jupiter.api.BeforeEach;
076import org.junit.jupiter.api.Tag;
077import org.junit.jupiter.api.Test;
078import org.junit.jupiter.api.extension.RegisterExtension;
079import org.mockito.invocation.InvocationOnMock;
080import org.mockito.stubbing.Answer;
081
082import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
083import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
084
085import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
086import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
087import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
099
100@Tag(ClientTests.TAG)
101@Tag(MediumTests.TAG)
102public class TestAsyncTableTracing {
103
104  private static Configuration CONF = HBaseConfiguration.create();
105
106  private ClientService.Interface stub;
107
108  private AsyncConnectionImpl conn;
109
110  private AsyncTable<ScanResultConsumer> table;
111
112  @RegisterExtension
113  public static final OpenTelemetryExtension traceRule = OpenTelemetryExtension.create();
114
115  @BeforeEach
116  public void setUp() throws IOException {
117    stub = mock(ClientService.Interface.class);
118    AtomicInteger scanNextCalled = new AtomicInteger(0);
119    doAnswer(new Answer<Void>() {
120
121      @Override
122      public Void answer(InvocationOnMock invocation) throws Throwable {
123        ScanRequest req = invocation.getArgument(1);
124        RpcCallback<ScanResponse> done = invocation.getArgument(2);
125        if (!req.hasScannerId()) {
126          done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800)
127            .setMoreResultsInRegion(true).setMoreResults(true).build());
128        } else {
129          if (req.hasCloseScanner() && req.getCloseScanner()) {
130            done.run(ScanResponse.getDefaultInstance());
131          } else {
132            Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
133              .setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
134              .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
135              .setValue(Bytes.toBytes("v")).build();
136            Result result = Result.create(Arrays.asList(cell));
137            ScanResponse.Builder builder = ScanResponse.newBuilder().setScannerId(1).setTtl(800)
138              .addResults(ProtobufUtil.toResult(result));
139            if (req.getLimitOfRows() == 1) {
140              builder.setMoreResultsInRegion(false).setMoreResults(false);
141            } else {
142              builder.setMoreResultsInRegion(true).setMoreResults(true);
143            }
144            ForkJoinPool.commonPool().execute(() -> done.run(builder.build()));
145          }
146        }
147        return null;
148      }
149    }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any());
150    doAnswer(new Answer<Void>() {
151
152      @Override
153      public Void answer(InvocationOnMock invocation) throws Throwable {
154        ClientProtos.MultiRequest req = invocation.getArgument(1);
155        ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
156        for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
157          RegionActionResult.Builder raBuilder = RegionActionResult.newBuilder();
158          for (ClientProtos.Action ignored : regionAction.getActionList()) {
159            raBuilder.addResultOrException(
160              ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result())));
161          }
162          builder.addRegionActionResult(raBuilder);
163        }
164        ClientProtos.MultiResponse resp = builder.build();
165        RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2);
166        ForkJoinPool.commonPool().execute(() -> done.run(resp));
167        return null;
168      }
169    }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any());
170    doAnswer(new Answer<Void>() {
171
172      @Override
173      public Void answer(InvocationOnMock invocation) throws Throwable {
174        MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation();
175        MutateResponse resp;
176        switch (req.getMutateType()) {
177          case INCREMENT:
178            ColumnValue value = req.getColumnValue(0);
179            QualifierValue qvalue = value.getQualifierValue(0);
180            Cell cell =
181              CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
182                .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray())
183                .setQualifier(qvalue.getQualifier().toByteArray())
184                .setValue(qvalue.getValue().toByteArray()).build();
185            resp = MutateResponse.newBuilder()
186              .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build();
187            break;
188          default:
189            resp = MutateResponse.getDefaultInstance();
190            break;
191        }
192        RpcCallback<MutateResponse> done = invocation.getArgument(2);
193        ForkJoinPool.commonPool().execute(() -> done.run(resp));
194        return null;
195      }
196    }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any());
197    doAnswer(new Answer<Void>() {
198
199      @Override
200      public Void answer(InvocationOnMock invocation) throws Throwable {
201        RpcCallback<GetResponse> done = invocation.getArgument(2);
202        ForkJoinPool.commonPool().execute(() -> done.run(GetResponse.getDefaultInstance()));
203        return null;
204      }
205    }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
206    final User user = UserProvider.instantiate(CONF).getCurrent();
207    conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF, user), "test", null,
208      user) {
209
210      @Override
211      AsyncRegionLocator getLocator() {
212        AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
213        Answer<CompletableFuture<HRegionLocation>> answer =
214          new Answer<CompletableFuture<HRegionLocation>>() {
215
216            @Override
217            public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation)
218              throws Throwable {
219              TableName tableName = invocation.getArgument(0);
220              RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
221              ServerName serverName = ServerName.valueOf("rs", 16010, 12345);
222              HRegionLocation loc = new HRegionLocation(info, serverName);
223              return CompletableFuture.completedFuture(loc);
224            }
225          };
226        doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
227          any(RegionLocateType.class), anyLong());
228        doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
229          anyInt(), any(RegionLocateType.class), anyLong());
230        return locator;
231      }
232
233      @Override
234      ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
235        return stub;
236      }
237    };
238    table = conn.getTable(TableName.valueOf("table"), ForkJoinPool.commonPool());
239  }
240
241  @AfterEach
242  public void tearDown() throws IOException {
243    Closeables.close(conn, true);
244  }
245
246  private void assertTrace(String tableOperation) {
247    assertTrace(tableOperation, new IsAnything<>());
248  }
249
250  private void assertTrace(String tableOperation, Matcher<SpanData> matcher) {
251    final TableName tableName = table.getName();
252    final Matcher<SpanData> spanLocator =
253      allOf(hasName(containsString(tableOperation)), hasEnded());
254    final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString();
255
256    Waiter.waitFor(CONF, 1000, new MatcherPredicate<>("waiting for span to emit",
257      () -> traceRule.getSpans(), hasItem(spanLocator)));
258    List<SpanData> candidateSpans =
259      traceRule.getSpans().stream().filter(spanLocator::matches).collect(Collectors.toList());
260    assertThat(candidateSpans, hasSize(1));
261    SpanData data = candidateSpans.iterator().next();
262    assertThat(data,
263      allOf(hasName(expectedName), hasKind(SpanKind.CLIENT), hasStatusWithCode(StatusCode.OK),
264        buildConnectionAttributesMatcher(conn), buildTableAttributesMatcher(tableName), matcher));
265  }
266
267  @Test
268  public void testExists() {
269    table.exists(new Get(Bytes.toBytes(0))).join();
270    assertTrace("GET");
271  }
272
273  @Test
274  public void testGet() {
275    table.get(new Get(Bytes.toBytes(0))).join();
276    assertTrace("GET");
277  }
278
279  @Test
280  public void testPut() {
281    table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
282      Bytes.toBytes("v"))).join();
283    assertTrace("PUT");
284  }
285
286  @Test
287  public void testDelete() {
288    table.delete(new Delete(Bytes.toBytes(0))).join();
289    assertTrace("DELETE");
290  }
291
292  @Test
293  public void testAppend() {
294    table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
295      Bytes.toBytes("v"))).join();
296    assertTrace("APPEND");
297  }
298
299  @Test
300  public void testIncrement() {
301    table
302      .increment(
303        new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1))
304      .join();
305    assertTrace("INCREMENT");
306  }
307
308  @Test
309  public void testIncrementColumnValue1() {
310    table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)
311      .join();
312    assertTrace("INCREMENT");
313  }
314
315  @Test
316  public void testIncrementColumnValue2() {
317    table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
318      Durability.ASYNC_WAL).join();
319    assertTrace("INCREMENT");
320  }
321
322  @Test
323  public void testCheckAndMutate() {
324    table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0))
325      .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
326      .build(new Delete(Bytes.toBytes(0)))).join();
327    assertTrace("CHECK_AND_MUTATE");
328  }
329
330  @Test
331  public void testCheckAndMutateList() {
332    CompletableFuture
333      .allOf(table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
334        .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
335        .build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0]))
336      .join();
337    assertTrace("BATCH",
338      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations",
339        "CHECK_AND_MUTATE", "DELETE")));
340  }
341
342  @Test
343  public void testCheckAndMutateAll() {
344    table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
345      .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
346      .build(new Delete(Bytes.toBytes(0))))).join();
347    assertTrace("BATCH",
348      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations",
349        "CHECK_AND_MUTATE", "DELETE")));
350  }
351
352  private void testCheckAndMutateBuilder(Row op) {
353    AsyncTable.CheckAndMutateBuilder builder =
354      table.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
355        .ifEquals(Bytes.toBytes("v"));
356    if (op instanceof Put) {
357      Put put = (Put) op;
358      builder.thenPut(put).join();
359    } else if (op instanceof Delete) {
360      Delete delete = (Delete) op;
361      builder.thenDelete(delete).join();
362    } else if (op instanceof RowMutations) {
363      RowMutations mutations = (RowMutations) op;
364      builder.thenMutate(mutations).join();
365    } else {
366      fail("unsupported CheckAndPut operation " + op);
367    }
368    assertTrace("CHECK_AND_MUTATE");
369  }
370
371  @Test
372  public void testCheckAndMutateBuilderThenPut() {
373    Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"),
374      Bytes.toBytes("v"));
375    testCheckAndMutateBuilder(put);
376  }
377
378  @Test
379  public void testCheckAndMutateBuilderThenDelete() {
380    testCheckAndMutateBuilder(new Delete(Bytes.toBytes(0)));
381  }
382
383  @Test
384  public void testCheckAndMutateBuilderThenMutations() throws IOException {
385    RowMutations mutations =
386      new RowMutations(Bytes.toBytes(0)).add(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"),
387        Bytes.toBytes("cq"), Bytes.toBytes("v"))).add(new Delete(Bytes.toBytes(0)));
388    testCheckAndMutateBuilder(mutations);
389  }
390
391  private void testCheckAndMutateWithFilterBuilder(Row op) {
392    // use of `PrefixFilter` is completely arbitrary here.
393    AsyncTable.CheckAndMutateWithFilterBuilder builder =
394      table.checkAndMutate(Bytes.toBytes(0), new PrefixFilter(Bytes.toBytes(0)));
395    if (op instanceof Put) {
396      Put put = (Put) op;
397      builder.thenPut(put).join();
398    } else if (op instanceof Delete) {
399      Delete delete = (Delete) op;
400      builder.thenDelete(delete).join();
401    } else if (op instanceof RowMutations) {
402      RowMutations mutations = (RowMutations) op;
403      builder.thenMutate(mutations).join();
404    } else {
405      fail("unsupported CheckAndPut operation " + op);
406    }
407    assertTrace("CHECK_AND_MUTATE");
408  }
409
410  @Test
411  public void testCheckAndMutateWithFilterBuilderThenPut() {
412    Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"),
413      Bytes.toBytes("v"));
414    testCheckAndMutateWithFilterBuilder(put);
415  }
416
417  @Test
418  public void testCheckAndMutateWithFilterBuilderThenDelete() {
419    testCheckAndMutateWithFilterBuilder(new Delete(Bytes.toBytes(0)));
420  }
421
422  @Test
423  public void testCheckAndMutateWithFilterBuilderThenMutations() throws IOException {
424    RowMutations mutations =
425      new RowMutations(Bytes.toBytes(0)).add(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"),
426        Bytes.toBytes("cq"), Bytes.toBytes("v"))).add(new Delete(Bytes.toBytes(0)));
427    testCheckAndMutateWithFilterBuilder(mutations);
428  }
429
430  @Test
431  public void testMutateRow() throws IOException {
432    final RowMutations mutations = new RowMutations(Bytes.toBytes(0)).add(new Put(Bytes.toBytes(0))
433      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
434      .add(new Delete(Bytes.toBytes(0)));
435    table.mutateRow(mutations).join();
436    assertTrace("BATCH", hasAttributes(
437      containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE", "PUT")));
438  }
439
440  @Test
441  public void testScanAll() {
442    table.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join();
443    assertTrace("SCAN");
444  }
445
446  @Test
447  public void testScan() throws Throwable {
448    final CountDownLatch doneSignal = new CountDownLatch(1);
449    final AtomicInteger count = new AtomicInteger();
450    final AtomicReference<Throwable> throwable = new AtomicReference<>();
451    final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1);
452    table.scan(scan, new ScanResultConsumer() {
453      @Override
454      public boolean onNext(Result result) {
455        if (result.getRow() != null) {
456          count.incrementAndGet();
457        }
458        return true;
459      }
460
461      @Override
462      public void onError(Throwable error) {
463        throwable.set(error);
464        doneSignal.countDown();
465      }
466
467      @Override
468      public void onComplete() {
469        doneSignal.countDown();
470      }
471    });
472    doneSignal.await();
473    if (throwable.get() != null) {
474      throw throwable.get();
475    }
476    assertThat("user code did not run. check test setup.", count.get(), greaterThan(0));
477    assertTrace("SCAN");
478  }
479
480  @Test
481  public void testGetScanner() {
482    final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1);
483    try (ResultScanner scanner = table.getScanner(scan)) {
484      int count = 0;
485      for (Result result : scanner) {
486        if (result.getRow() != null) {
487          count++;
488        }
489      }
490      // do something with it.
491      assertThat(count, greaterThanOrEqualTo(0));
492    }
493    assertTrace("SCAN");
494  }
495
496  @Test
497  public void testExistsList() {
498    CompletableFuture
499      .allOf(
500        table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
501      .join();
502    assertTrace("BATCH",
503      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
504  }
505
506  @Test
507  public void testExistsAll() {
508    table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
509    assertTrace("BATCH",
510      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
511  }
512
513  @Test
514  public void testGetList() {
515    CompletableFuture
516      .allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
517      .join();
518    assertTrace("BATCH",
519      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
520  }
521
522  @Test
523  public void testGetAll() {
524    table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
525    assertTrace("BATCH",
526      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
527  }
528
529  @Test
530  public void testPutList() {
531    CompletableFuture
532      .allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
533        Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0]))
534      .join();
535    assertTrace("BATCH",
536      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
537  }
538
539  @Test
540  public void testPutAll() {
541    table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
542      Bytes.toBytes("cq"), Bytes.toBytes("v")))).join();
543    assertTrace("BATCH",
544      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
545  }
546
547  @Test
548  public void testDeleteList() {
549    CompletableFuture
550      .allOf(
551        table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
552      .join();
553    assertTrace("BATCH",
554      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
555  }
556
557  @Test
558  public void testDeleteAll() {
559    table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
560    assertTrace("BATCH",
561      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
562  }
563
564  @Test
565  public void testBatch() {
566    CompletableFuture
567      .allOf(
568        table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
569      .join();
570    assertTrace("BATCH",
571      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
572  }
573
574  @Test
575  public void testBatchAll() {
576    table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
577    assertTrace("BATCH",
578      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
579  }
580}