001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
025import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
026import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher;
027import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher;
028import static org.hamcrest.MatcherAssert.assertThat;
029import static org.hamcrest.Matchers.allOf;
030import static org.hamcrest.Matchers.containsString;
031import static org.hamcrest.Matchers.greaterThan;
032import static org.hamcrest.Matchers.greaterThanOrEqualTo;
033import static org.hamcrest.Matchers.hasItem;
034import static org.hamcrest.Matchers.hasSize;
035import static org.junit.Assert.fail;
036import static org.mockito.ArgumentMatchers.any;
037import static org.mockito.ArgumentMatchers.anyInt;
038import static org.mockito.ArgumentMatchers.anyLong;
039import static org.mockito.Mockito.doAnswer;
040import static org.mockito.Mockito.mock;
041
042import io.opentelemetry.api.trace.SpanKind;
043import io.opentelemetry.api.trace.StatusCode;
044import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
045import io.opentelemetry.sdk.trace.data.SpanData;
046import java.io.IOException;
047import java.util.Arrays;
048import java.util.List;
049import java.util.concurrent.CompletableFuture;
050import java.util.concurrent.CountDownLatch;
051import java.util.concurrent.ForkJoinPool;
052import java.util.concurrent.atomic.AtomicInteger;
053import java.util.concurrent.atomic.AtomicReference;
054import java.util.stream.Collectors;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.hbase.Cell;
057import org.apache.hadoop.hbase.CellBuilderFactory;
058import org.apache.hadoop.hbase.CellBuilderType;
059import org.apache.hadoop.hbase.HBaseClassTestRule;
060import org.apache.hadoop.hbase.HBaseConfiguration;
061import org.apache.hadoop.hbase.HRegionLocation;
062import org.apache.hadoop.hbase.MatcherPredicate;
063import org.apache.hadoop.hbase.ServerName;
064import org.apache.hadoop.hbase.TableName;
065import org.apache.hadoop.hbase.Waiter;
066import org.apache.hadoop.hbase.filter.PrefixFilter;
067import org.apache.hadoop.hbase.ipc.HBaseRpcController;
068import org.apache.hadoop.hbase.security.User;
069import org.apache.hadoop.hbase.security.UserProvider;
070import org.apache.hadoop.hbase.testclassification.ClientTests;
071import org.apache.hadoop.hbase.testclassification.MediumTests;
072import org.apache.hadoop.hbase.util.Bytes;
073import org.hamcrest.Matcher;
074import org.hamcrest.core.IsAnything;
075import org.junit.After;
076import org.junit.Before;
077import org.junit.ClassRule;
078import org.junit.Rule;
079import org.junit.Test;
080import org.junit.experimental.categories.Category;
081import org.mockito.invocation.InvocationOnMock;
082import org.mockito.stubbing.Answer;
083
084import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
085import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
086
087import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
101
102@Category({ ClientTests.class, MediumTests.class })
103public class TestAsyncTableTracing {
104
105  @ClassRule
106  public static final HBaseClassTestRule CLASS_RULE =
107    HBaseClassTestRule.forClass(TestAsyncTableTracing.class);
108
109  private static Configuration CONF = HBaseConfiguration.create();
110
111  private ClientService.Interface stub;
112
113  private AsyncConnectionImpl conn;
114
115  private AsyncTable<ScanResultConsumer> table;
116
117  @Rule
118  public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
119
120  @Before
121  public void setUp() throws IOException {
122    stub = mock(ClientService.Interface.class);
123    AtomicInteger scanNextCalled = new AtomicInteger(0);
124    doAnswer(new Answer<Void>() {
125
126      @Override
127      public Void answer(InvocationOnMock invocation) throws Throwable {
128        ScanRequest req = invocation.getArgument(1);
129        RpcCallback<ScanResponse> done = invocation.getArgument(2);
130        if (!req.hasScannerId()) {
131          done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800)
132            .setMoreResultsInRegion(true).setMoreResults(true).build());
133        } else {
134          if (req.hasCloseScanner() && req.getCloseScanner()) {
135            done.run(ScanResponse.getDefaultInstance());
136          } else {
137            Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
138              .setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
139              .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
140              .setValue(Bytes.toBytes("v")).build();
141            Result result = Result.create(Arrays.asList(cell));
142            ScanResponse.Builder builder = ScanResponse.newBuilder().setScannerId(1).setTtl(800)
143              .addResults(ProtobufUtil.toResult(result));
144            if (req.getLimitOfRows() == 1) {
145              builder.setMoreResultsInRegion(false).setMoreResults(false);
146            } else {
147              builder.setMoreResultsInRegion(true).setMoreResults(true);
148            }
149            ForkJoinPool.commonPool().execute(() -> done.run(builder.build()));
150          }
151        }
152        return null;
153      }
154    }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any());
155    doAnswer(new Answer<Void>() {
156
157      @Override
158      public Void answer(InvocationOnMock invocation) throws Throwable {
159        ClientProtos.MultiRequest req = invocation.getArgument(1);
160        ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
161        for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
162          RegionActionResult.Builder raBuilder = RegionActionResult.newBuilder();
163          for (ClientProtos.Action ignored : regionAction.getActionList()) {
164            raBuilder.addResultOrException(
165              ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result())));
166          }
167          builder.addRegionActionResult(raBuilder);
168        }
169        ClientProtos.MultiResponse resp = builder.build();
170        RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2);
171        ForkJoinPool.commonPool().execute(() -> done.run(resp));
172        return null;
173      }
174    }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any());
175    doAnswer(new Answer<Void>() {
176
177      @Override
178      public Void answer(InvocationOnMock invocation) throws Throwable {
179        MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation();
180        MutateResponse resp;
181        switch (req.getMutateType()) {
182          case INCREMENT:
183            ColumnValue value = req.getColumnValue(0);
184            QualifierValue qvalue = value.getQualifierValue(0);
185            Cell cell =
186              CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
187                .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray())
188                .setQualifier(qvalue.getQualifier().toByteArray())
189                .setValue(qvalue.getValue().toByteArray()).build();
190            resp = MutateResponse.newBuilder()
191              .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build();
192            break;
193          default:
194            resp = MutateResponse.getDefaultInstance();
195            break;
196        }
197        RpcCallback<MutateResponse> done = invocation.getArgument(2);
198        ForkJoinPool.commonPool().execute(() -> done.run(resp));
199        return null;
200      }
201    }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any());
202    doAnswer(new Answer<Void>() {
203
204      @Override
205      public Void answer(InvocationOnMock invocation) throws Throwable {
206        RpcCallback<GetResponse> done = invocation.getArgument(2);
207        ForkJoinPool.commonPool().execute(() -> done.run(GetResponse.getDefaultInstance()));
208        return null;
209      }
210    }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
211    final User user = UserProvider.instantiate(CONF).getCurrent();
212    conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test", user) {
213
214      @Override
215      AsyncRegionLocator getLocator() {
216        AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
217        Answer<CompletableFuture<HRegionLocation>> answer =
218          new Answer<CompletableFuture<HRegionLocation>>() {
219
220            @Override
221            public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation)
222              throws Throwable {
223              TableName tableName = invocation.getArgument(0);
224              RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
225              ServerName serverName = ServerName.valueOf("rs", 16010, 12345);
226              HRegionLocation loc = new HRegionLocation(info, serverName);
227              return CompletableFuture.completedFuture(loc);
228            }
229          };
230        doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
231          any(RegionLocateType.class), anyLong());
232        doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
233          anyInt(), any(RegionLocateType.class), anyLong());
234        return locator;
235      }
236
237      @Override
238      ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
239        return stub;
240      }
241    };
242    table = conn.getTable(TableName.valueOf("table"), ForkJoinPool.commonPool());
243  }
244
245  @After
246  public void tearDown() throws IOException {
247    Closeables.close(conn, true);
248  }
249
250  private void assertTrace(String tableOperation) {
251    assertTrace(tableOperation, new IsAnything<>());
252  }
253
254  private void assertTrace(String tableOperation, Matcher<SpanData> matcher) {
255    // n.b. this method implementation must match the one of the same name found in
256    // TestHTableTracing
257    final TableName tableName = table.getName();
258    final Matcher<SpanData> spanLocator =
259      allOf(hasName(containsString(tableOperation)), hasEnded());
260    final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString();
261
262    Waiter.waitFor(CONF, 1000, new MatcherPredicate<>("waiting for span to emit",
263      () -> traceRule.getSpans(), hasItem(spanLocator)));
264    List<SpanData> candidateSpans =
265      traceRule.getSpans().stream().filter(spanLocator::matches).collect(Collectors.toList());
266    assertThat(candidateSpans, hasSize(1));
267    SpanData data = candidateSpans.iterator().next();
268    assertThat(data,
269      allOf(hasName(expectedName), hasKind(SpanKind.CLIENT), hasStatusWithCode(StatusCode.OK),
270        buildConnectionAttributesMatcher(conn), buildTableAttributesMatcher(tableName), matcher));
271  }
272
273  @Test
274  public void testExists() {
275    table.exists(new Get(Bytes.toBytes(0))).join();
276    assertTrace("GET");
277  }
278
279  @Test
280  public void testGet() {
281    table.get(new Get(Bytes.toBytes(0))).join();
282    assertTrace("GET");
283  }
284
285  @Test
286  public void testPut() {
287    table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
288      Bytes.toBytes("v"))).join();
289    assertTrace("PUT");
290  }
291
292  @Test
293  public void testDelete() {
294    table.delete(new Delete(Bytes.toBytes(0))).join();
295    assertTrace("DELETE");
296  }
297
298  @Test
299  public void testAppend() {
300    table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
301      Bytes.toBytes("v"))).join();
302    assertTrace("APPEND");
303  }
304
305  @Test
306  public void testIncrement() {
307    table
308      .increment(
309        new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1))
310      .join();
311    assertTrace("INCREMENT");
312  }
313
314  @Test
315  public void testIncrementColumnValue1() {
316    table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)
317      .join();
318    assertTrace("INCREMENT");
319  }
320
321  @Test
322  public void testIncrementColumnValue2() {
323    table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
324      Durability.ASYNC_WAL).join();
325    assertTrace("INCREMENT");
326  }
327
328  @Test
329  public void testCheckAndMutate() {
330    table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0))
331      .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
332      .build(new Delete(Bytes.toBytes(0)))).join();
333    assertTrace("CHECK_AND_MUTATE");
334  }
335
336  @Test
337  public void testCheckAndMutateList() {
338    CompletableFuture
339      .allOf(table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
340        .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
341        .build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0]))
342      .join();
343    assertTrace("BATCH",
344      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations",
345        "CHECK_AND_MUTATE", "DELETE")));
346  }
347
348  @Test
349  public void testCheckAndMutateAll() {
350    table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
351      .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
352      .build(new Delete(Bytes.toBytes(0))))).join();
353    assertTrace("BATCH",
354      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations",
355        "CHECK_AND_MUTATE", "DELETE")));
356  }
357
358  private void testCheckAndMutateBuilder(Row op) {
359    AsyncTable.CheckAndMutateBuilder builder =
360      table.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
361        .ifEquals(Bytes.toBytes("v"));
362    if (op instanceof Put) {
363      Put put = (Put) op;
364      builder.thenPut(put).join();
365    } else if (op instanceof Delete) {
366      Delete delete = (Delete) op;
367      builder.thenDelete(delete).join();
368    } else if (op instanceof RowMutations) {
369      RowMutations mutations = (RowMutations) op;
370      builder.thenMutate(mutations).join();
371    } else {
372      fail("unsupported CheckAndPut operation " + op);
373    }
374    assertTrace("CHECK_AND_MUTATE");
375  }
376
377  @Test
378  public void testCheckAndMutateBuilderThenPut() {
379    Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"),
380      Bytes.toBytes("v"));
381    testCheckAndMutateBuilder(put);
382  }
383
384  @Test
385  public void testCheckAndMutateBuilderThenDelete() {
386    testCheckAndMutateBuilder(new Delete(Bytes.toBytes(0)));
387  }
388
389  @Test
390  public void testCheckAndMutateBuilderThenMutations() throws IOException {
391    RowMutations mutations = new RowMutations(Bytes.toBytes(0))
392      .add((Mutation) new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"),
393        Bytes.toBytes("v")))
394      .add((Mutation) new Delete(Bytes.toBytes(0)));
395    testCheckAndMutateBuilder(mutations);
396  }
397
398  private void testCheckAndMutateWithFilterBuilder(Row op) {
399    // use of `PrefixFilter` is completely arbitrary here.
400    AsyncTable.CheckAndMutateWithFilterBuilder builder =
401      table.checkAndMutate(Bytes.toBytes(0), new PrefixFilter(Bytes.toBytes(0)));
402    if (op instanceof Put) {
403      Put put = (Put) op;
404      builder.thenPut(put).join();
405    } else if (op instanceof Delete) {
406      Delete delete = (Delete) op;
407      builder.thenDelete(delete).join();
408    } else if (op instanceof RowMutations) {
409      RowMutations mutations = (RowMutations) op;
410      builder.thenMutate(mutations).join();
411    } else {
412      fail("unsupported CheckAndPut operation " + op);
413    }
414    assertTrace("CHECK_AND_MUTATE");
415  }
416
417  @Test
418  public void testCheckAndMutateWithFilterBuilderThenPut() {
419    Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"),
420      Bytes.toBytes("v"));
421    testCheckAndMutateWithFilterBuilder(put);
422  }
423
424  @Test
425  public void testCheckAndMutateWithFilterBuilderThenDelete() {
426    testCheckAndMutateWithFilterBuilder(new Delete(Bytes.toBytes(0)));
427  }
428
429  @Test
430  public void testCheckAndMutateWithFilterBuilderThenMutations() throws IOException {
431    RowMutations mutations = new RowMutations(Bytes.toBytes(0))
432      .add((Mutation) new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"),
433        Bytes.toBytes("v")))
434      .add((Mutation) new Delete(Bytes.toBytes(0)));
435    testCheckAndMutateWithFilterBuilder(mutations);
436  }
437
438  @Test
439  public void testMutateRow() throws IOException {
440    final RowMutations mutations = new RowMutations(Bytes.toBytes(0))
441      .add((Mutation) new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
442        Bytes.toBytes("v")))
443      .add((Mutation) new Delete(Bytes.toBytes(0)));
444    table.mutateRow(mutations).join();
445    assertTrace("BATCH", hasAttributes(
446      containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE", "PUT")));
447  }
448
449  @Test
450  public void testScanAll() {
451    table.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join();
452    assertTrace("SCAN");
453  }
454
455  @Test
456  public void testScan() throws Throwable {
457    final CountDownLatch doneSignal = new CountDownLatch(1);
458    final AtomicInteger count = new AtomicInteger();
459    final AtomicReference<Throwable> throwable = new AtomicReference<>();
460    final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1);
461    table.scan(scan, new ScanResultConsumer() {
462      @Override
463      public boolean onNext(Result result) {
464        if (result.getRow() != null) {
465          count.incrementAndGet();
466        }
467        return true;
468      }
469
470      @Override
471      public void onError(Throwable error) {
472        throwable.set(error);
473        doneSignal.countDown();
474      }
475
476      @Override
477      public void onComplete() {
478        doneSignal.countDown();
479      }
480    });
481    doneSignal.await();
482    if (throwable.get() != null) {
483      throw throwable.get();
484    }
485    assertThat("user code did not run. check test setup.", count.get(), greaterThan(0));
486    assertTrace("SCAN");
487  }
488
489  @Test
490  public void testGetScanner() {
491    final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1);
492    try (ResultScanner scanner = table.getScanner(scan)) {
493      int count = 0;
494      for (Result result : scanner) {
495        if (result.getRow() != null) {
496          count++;
497        }
498      }
499      // do something with it.
500      assertThat(count, greaterThanOrEqualTo(0));
501    }
502    assertTrace("SCAN");
503  }
504
505  @Test
506  public void testExistsList() {
507    CompletableFuture
508      .allOf(
509        table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
510      .join();
511    assertTrace("BATCH",
512      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
513  }
514
515  @Test
516  public void testExistsAll() {
517    table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
518    assertTrace("BATCH",
519      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
520  }
521
522  @Test
523  public void testGetList() {
524    CompletableFuture
525      .allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
526      .join();
527    assertTrace("BATCH",
528      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
529  }
530
531  @Test
532  public void testGetAll() {
533    table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
534    assertTrace("BATCH",
535      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
536  }
537
538  @Test
539  public void testPutList() {
540    CompletableFuture
541      .allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
542        Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0]))
543      .join();
544    assertTrace("BATCH",
545      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
546  }
547
548  @Test
549  public void testPutAll() {
550    table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
551      Bytes.toBytes("cq"), Bytes.toBytes("v")))).join();
552    assertTrace("BATCH",
553      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
554  }
555
556  @Test
557  public void testDeleteList() {
558    CompletableFuture
559      .allOf(
560        table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
561      .join();
562    assertTrace("BATCH",
563      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
564  }
565
566  @Test
567  public void testDeleteAll() {
568    table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
569    assertTrace("BATCH",
570      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
571  }
572
573  @Test
574  public void testBatch() {
575    CompletableFuture
576      .allOf(
577        table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
578      .join();
579    assertTrace("BATCH",
580      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
581  }
582
583  @Test
584  public void testBatchAll() {
585    table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
586    assertTrace("BATCH",
587      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
588  }
589}