001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
025import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
026import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher;
027import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher;
028import static org.hamcrest.MatcherAssert.assertThat;
029import static org.hamcrest.Matchers.allOf;
030import static org.hamcrest.Matchers.containsString;
031import static org.hamcrest.Matchers.hasItem;
032import static org.hamcrest.Matchers.hasSize;
033import static org.mockito.ArgumentMatchers.any;
034import static org.mockito.ArgumentMatchers.anyBoolean;
035import static org.mockito.ArgumentMatchers.anyInt;
036import static org.mockito.Mockito.doAnswer;
037import static org.mockito.Mockito.doNothing;
038import static org.mockito.Mockito.doReturn;
039import static org.mockito.Mockito.mock;
040import static org.mockito.Mockito.spy;
041
042import io.opentelemetry.api.trace.SpanKind;
043import io.opentelemetry.api.trace.StatusCode;
044import io.opentelemetry.sdk.trace.data.SpanData;
045import java.io.IOException;
046import java.util.Arrays;
047import java.util.List;
048import java.util.concurrent.ForkJoinPool;
049import java.util.concurrent.atomic.AtomicInteger;
050import java.util.stream.Collectors;
051import org.apache.hadoop.hbase.Cell;
052import org.apache.hadoop.hbase.CellBuilderFactory;
053import org.apache.hadoop.hbase.CellBuilderType;
054import org.apache.hadoop.hbase.HBaseClassTestRule;
055import org.apache.hadoop.hbase.HRegionLocation;
056import org.apache.hadoop.hbase.MatcherPredicate;
057import org.apache.hadoop.hbase.ServerName;
058import org.apache.hadoop.hbase.TableName;
059import org.apache.hadoop.hbase.Waiter;
060import org.apache.hadoop.hbase.ipc.HBaseRpcController;
061import org.apache.hadoop.hbase.security.UserProvider;
062import org.apache.hadoop.hbase.testclassification.ClientTests;
063import org.apache.hadoop.hbase.testclassification.MediumTests;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.hamcrest.Matcher;
066import org.hamcrest.core.IsAnything;
067import org.junit.After;
068import org.junit.Before;
069import org.junit.ClassRule;
070import org.junit.Test;
071import org.junit.experimental.categories.Category;
072import org.mockito.invocation.InvocationOnMock;
073import org.mockito.stubbing.Answer;
074
075import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
076import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
077
078import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
086import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
087import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
093
094@Category({ ClientTests.class, MediumTests.class })
095public class TestHTableTracing extends TestTracingBase {
096  @ClassRule
097  public static final HBaseClassTestRule CLASS_RULE =
098    HBaseClassTestRule.forClass(TestHTableTracing.class);
099
100  private ClientProtos.ClientService.BlockingInterface stub;
101  private ConnectionImplementation conn;
102  private Table table;
103
104  @Override
105  @Before
106  public void setUp() throws Exception {
107    super.setUp();
108
109    stub = mock(ClientService.BlockingInterface.class);
110
111    AtomicInteger scanNextCalled = new AtomicInteger(0);
112
113    doAnswer(new Answer<ScanResponse>() {
114      @Override
115      public ScanResponse answer(InvocationOnMock invocation) throws Throwable {
116        ScanRequest req = invocation.getArgument(1);
117        if (!req.hasScannerId()) {
118          return ScanResponse.newBuilder().setScannerId(1).setTtl(800).setMoreResultsInRegion(true)
119            .setMoreResults(true).build();
120        } else {
121          if (req.hasCloseScanner() && req.getCloseScanner()) {
122            return ScanResponse.getDefaultInstance();
123          } else {
124            Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
125              .setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
126              .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
127              .setValue(Bytes.toBytes("v")).build();
128            Result result = Result.create(Arrays.asList(cell));
129            ScanResponse.Builder builder = ScanResponse.newBuilder().setScannerId(1).setTtl(800)
130              .addResults(ProtobufUtil.toResult(result));
131            if (req.getLimitOfRows() == 1) {
132              builder.setMoreResultsInRegion(false).setMoreResults(false);
133            } else {
134              builder.setMoreResultsInRegion(true).setMoreResults(true);
135            }
136            return builder.build();
137          }
138        }
139      }
140    }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class));
141
142    doAnswer(new Answer<MultiResponse>() {
143      @Override
144      public MultiResponse answer(InvocationOnMock invocation) throws Throwable {
145        MultiResponse resp =
146          MultiResponse.newBuilder()
147            .addRegionActionResult(RegionActionResult.newBuilder().addResultOrException(
148              ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))))
149            .build();
150        return resp;
151      }
152    }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class));
153
154    doAnswer(new Answer<MutateResponse>() {
155      @Override
156      public MutateResponse answer(InvocationOnMock invocation) throws Throwable {
157        MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation();
158        MutateResponse resp;
159        switch (req.getMutateType()) {
160          case INCREMENT:
161            ColumnValue value = req.getColumnValue(0);
162            QualifierValue qvalue = value.getQualifierValue(0);
163            Cell cell =
164              CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
165                .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray())
166                .setQualifier(qvalue.getQualifier().toByteArray())
167                .setValue(qvalue.getValue().toByteArray()).build();
168            resp = MutateResponse.newBuilder()
169              .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build();
170            break;
171          default:
172            resp = MutateResponse.getDefaultInstance();
173            break;
174        }
175        return resp;
176      }
177    }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class));
178
179    doAnswer(new Answer<GetResponse>() {
180      @Override
181      public GetResponse answer(InvocationOnMock invocation) throws Throwable {
182        ClientProtos.Get req = ((GetRequest) invocation.getArgument(1)).getGet();
183        ColumnValue value = ColumnValue.getDefaultInstance();
184        QualifierValue qvalue = QualifierValue.getDefaultInstance();
185        Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
186          .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray())
187          .setQualifier(qvalue.getQualifier().toByteArray())
188          .setValue(qvalue.getValue().toByteArray()).build();
189        return GetResponse.newBuilder()
190          .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell), true))).build();
191      }
192    }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class));
193
194    conn =
195      spy(new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent()) {
196        @Override
197        public RegionLocator getRegionLocator(TableName tableName) throws IOException {
198          RegionLocator locator = mock(HRegionLocator.class);
199          Answer<HRegionLocation> answer = new Answer<HRegionLocation>() {
200
201            @Override
202            public HRegionLocation answer(InvocationOnMock invocation) throws Throwable {
203              TableName tableName = TableName.META_TABLE_NAME;
204              RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
205              ServerName serverName = MASTER_HOST;
206              HRegionLocation loc = new HRegionLocation(info, serverName);
207              return loc;
208            }
209          };
210          doAnswer(answer).when(locator).getRegionLocation(any(byte[].class), anyInt(),
211            anyBoolean());
212          doAnswer(answer).when(locator).getRegionLocation(any(byte[].class));
213          doAnswer(answer).when(locator).getRegionLocation(any(byte[].class), anyInt());
214          doAnswer(answer).when(locator).getRegionLocation(any(byte[].class), anyBoolean());
215          return locator;
216        }
217
218        @Override
219        public ClientService.BlockingInterface getClient(ServerName serverName) throws IOException {
220          return stub;
221        }
222      });
223    // this setup of AsyncProcess is for MultiResponse
224    AsyncProcess asyncProcess = mock(AsyncProcess.class);
225    AsyncRequestFuture asyncRequestFuture = mock(AsyncRequestFuture.class);
226    doNothing().when(asyncRequestFuture).waitUntilDone();
227    doReturn(asyncRequestFuture).when(asyncProcess).submit(any());
228    doReturn(asyncProcess).when(conn).getAsyncProcess();
229    // setup the table instance
230    table = conn.getTable(TableName.META_TABLE_NAME, ForkJoinPool.commonPool());
231  }
232
233  @After
234  public void tearDown() throws IOException {
235    Closeables.close(conn, true);
236  }
237
238  private void assertTrace(String tableOperation) {
239    assertTrace(tableOperation, new IsAnything<>());
240  }
241
242  private void assertTrace(String tableOperation, Matcher<SpanData> matcher) {
243    // n.b. this method implementation must match the one of the same name found in
244    // TestAsyncTableTracing
245    final TableName tableName = table.getName();
246    final Matcher<SpanData> spanLocator =
247      allOf(hasName(containsString(tableOperation)), hasEnded());
248    final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString();
249
250    Waiter.waitFor(conf, 1000, new MatcherPredicate<>("waiting for span to emit",
251      () -> TRACE_RULE.getSpans(), hasItem(spanLocator)));
252    List<SpanData> candidateSpans =
253      TRACE_RULE.getSpans().stream().filter(spanLocator::matches).collect(Collectors.toList());
254    assertThat(candidateSpans, hasSize(1));
255    SpanData data = candidateSpans.iterator().next();
256    assertThat(data,
257      allOf(hasName(expectedName), hasKind(SpanKind.CLIENT), hasStatusWithCode(StatusCode.OK),
258        buildConnectionAttributesMatcher(conn), buildTableAttributesMatcher(tableName), matcher));
259  }
260
261  @Test
262  public void testPut() throws IOException {
263    table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
264      Bytes.toBytes("v")));
265    assertTrace("PUT");
266  }
267
268  @Test
269  public void testExists() throws IOException {
270    table.exists(new Get(Bytes.toBytes(0)));
271    assertTrace("GET");
272  }
273
274  @Test
275  public void testGet() throws IOException {
276    table.get(new Get(Bytes.toBytes(0)));
277    assertTrace("GET");
278  }
279
280  @Test
281  public void testDelete() throws IOException {
282    table.delete(new Delete(Bytes.toBytes(0)));
283    assertTrace("DELETE");
284  }
285
286  @Test
287  public void testAppend() throws IOException {
288    table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
289      Bytes.toBytes("v")));
290    assertTrace("APPEND");
291  }
292
293  @Test
294  public void testIncrement() throws IOException {
295    table.increment(
296      new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1));
297    assertTrace("INCREMENT");
298  }
299
300  @Test
301  public void testIncrementColumnValue1() throws IOException {
302    table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1);
303    assertTrace("INCREMENT");
304  }
305
306  @Test
307  public void testIncrementColumnValue2() throws IOException {
308    table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
309      Durability.SYNC_WAL);
310    assertTrace("INCREMENT");
311  }
312
313  @Test
314  public void testCheckAndMutate() throws IOException {
315    table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0))
316      .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
317      .build(new Delete(Bytes.toBytes(0))));
318    assertTrace("CHECK_AND_MUTATE",
319      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations",
320        "CHECK_AND_MUTATE", "DELETE")));
321  }
322
323  @Test
324  public void testCheckAndMutateList() throws IOException {
325    table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
326      .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
327      .build(new Delete(Bytes.toBytes(0)))));
328    assertTrace("BATCH",
329      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations",
330        "CHECK_AND_MUTATE", "DELETE")));
331  }
332
333  @Test
334  public void testCheckAndMutateAll() throws IOException {
335    table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
336      .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
337      .build(new Delete(Bytes.toBytes(0)))));
338    assertTrace("BATCH",
339      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations",
340        "CHECK_AND_MUTATE", "DELETE")));
341  }
342
343  @Test
344  public void testMutateRow() throws Exception {
345    byte[] row = Bytes.toBytes(0);
346    table.mutateRow(RowMutations.of(Arrays.asList(new Delete(row))));
347    assertTrace("BATCH",
348      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
349  }
350
351  @Test
352  public void testExistsList() throws IOException {
353    table.exists(Arrays.asList(new Get(Bytes.toBytes(0))));
354    assertTrace("BATCH",
355      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
356  }
357
358  @Test
359  public void testExistsAll() throws IOException {
360    table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0))));
361    assertTrace("BATCH",
362      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
363  }
364
365  @Test
366  public void testGetList() throws IOException {
367    table.get(Arrays.asList(new Get(Bytes.toBytes(0))));
368    assertTrace("BATCH",
369      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
370  }
371
372  @Test
373  public void testPutList() throws IOException {
374    table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
375      Bytes.toBytes("cq"), Bytes.toBytes("v"))));
376    assertTrace("BATCH",
377      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
378  }
379
380  @Test
381  public void testDeleteList() throws IOException {
382    table.delete(Lists.newArrayList(new Delete(Bytes.toBytes(0))));
383    assertTrace("BATCH",
384      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
385  }
386
387  @Test
388  public void testBatchList() throws IOException, InterruptedException {
389    table.batch(Arrays.asList(new Delete(Bytes.toBytes(0))), null);
390    assertTrace("BATCH",
391      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
392  }
393
394  @Test
395  public void testTableClose() throws IOException {
396    table.close();
397    assertTrace(HTable.class.getSimpleName(), "close", null, TableName.META_TABLE_NAME);
398  }
399}