001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
025import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
026import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher;
027import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher;
028import static org.hamcrest.MatcherAssert.assertThat;
029import static org.hamcrest.Matchers.allOf;
030import static org.hamcrest.Matchers.containsString;
031import static org.hamcrest.Matchers.hasItem;
032import static org.hamcrest.Matchers.hasSize;
033import static org.mockito.ArgumentMatchers.any;
034import static org.mockito.ArgumentMatchers.anyBoolean;
035import static org.mockito.ArgumentMatchers.anyInt;
036import static org.mockito.Mockito.doAnswer;
037import static org.mockito.Mockito.doNothing;
038import static org.mockito.Mockito.doReturn;
039import static org.mockito.Mockito.mock;
040import static org.mockito.Mockito.spy;
041
042import io.opentelemetry.api.trace.SpanKind;
043import io.opentelemetry.api.trace.StatusCode;
044import io.opentelemetry.sdk.trace.data.SpanData;
045import java.io.IOException;
046import java.util.Arrays;
047import java.util.Collections;
048import java.util.List;
049import java.util.concurrent.ForkJoinPool;
050import java.util.concurrent.atomic.AtomicInteger;
051import java.util.stream.Collectors;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.CellBuilderFactory;
054import org.apache.hadoop.hbase.CellBuilderType;
055import org.apache.hadoop.hbase.HBaseClassTestRule;
056import org.apache.hadoop.hbase.HRegionLocation;
057import org.apache.hadoop.hbase.MatcherPredicate;
058import org.apache.hadoop.hbase.ServerName;
059import org.apache.hadoop.hbase.TableName;
060import org.apache.hadoop.hbase.Waiter;
061import org.apache.hadoop.hbase.ipc.HBaseRpcController;
062import org.apache.hadoop.hbase.security.UserProvider;
063import org.apache.hadoop.hbase.testclassification.ClientTests;
064import org.apache.hadoop.hbase.testclassification.MediumTests;
065import org.apache.hadoop.hbase.util.Bytes;
066import org.hamcrest.Matcher;
067import org.hamcrest.core.IsAnything;
068import org.junit.After;
069import org.junit.Before;
070import org.junit.ClassRule;
071import org.junit.Test;
072import org.junit.experimental.categories.Category;
073import org.mockito.invocation.InvocationOnMock;
074import org.mockito.stubbing.Answer;
075
076import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
077import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
078
079import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
086import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
087import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
094
095@Category({ ClientTests.class, MediumTests.class })
096public class TestHTableTracing extends TestTracingBase {
097  @ClassRule
098  public static final HBaseClassTestRule CLASS_RULE =
099    HBaseClassTestRule.forClass(TestHTableTracing.class);
100
101  private ClientProtos.ClientService.BlockingInterface stub;
102  private ConnectionImplementation conn;
103  private Table table;
104
105  @Override
106  @Before
107  public void setUp() throws Exception {
108    super.setUp();
109
110    stub = mock(ClientService.BlockingInterface.class);
111
112    AtomicInteger scanNextCalled = new AtomicInteger(0);
113
114    doAnswer(new Answer<ScanResponse>() {
115      @Override
116      public ScanResponse answer(InvocationOnMock invocation) throws Throwable {
117        ScanRequest req = invocation.getArgument(1);
118        if (!req.hasScannerId()) {
119          return ScanResponse.newBuilder().setScannerId(1).setTtl(800).setMoreResultsInRegion(true)
120            .setMoreResults(true).build();
121        } else {
122          if (req.hasCloseScanner() && req.getCloseScanner()) {
123            return ScanResponse.getDefaultInstance();
124          } else {
125            Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
126              .setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
127              .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
128              .setValue(Bytes.toBytes("v")).build();
129            Result result = Result.create(Arrays.asList(cell));
130            ScanResponse.Builder builder = ScanResponse.newBuilder().setScannerId(1).setTtl(800)
131              .addResults(ProtobufUtil.toResult(result));
132            if (req.getLimitOfRows() == 1) {
133              builder.setMoreResultsInRegion(false).setMoreResults(false);
134            } else {
135              builder.setMoreResultsInRegion(true).setMoreResults(true);
136            }
137            return builder.build();
138          }
139        }
140      }
141    }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class));
142
143    doAnswer(new Answer<MultiResponse>() {
144      @Override
145      public MultiResponse answer(InvocationOnMock invocation) throws Throwable {
146        MultiResponse resp =
147          MultiResponse.newBuilder()
148            .addRegionActionResult(RegionActionResult.newBuilder().addResultOrException(
149              ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))))
150            .build();
151        return resp;
152      }
153    }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class));
154
155    doAnswer(new Answer<MutateResponse>() {
156      @Override
157      public MutateResponse answer(InvocationOnMock invocation) throws Throwable {
158        MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation();
159        MutateResponse resp;
160        switch (req.getMutateType()) {
161          case INCREMENT:
162            ColumnValue value = req.getColumnValue(0);
163            QualifierValue qvalue = value.getQualifierValue(0);
164            Cell cell =
165              CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
166                .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray())
167                .setQualifier(qvalue.getQualifier().toByteArray())
168                .setValue(qvalue.getValue().toByteArray()).build();
169            resp = MutateResponse.newBuilder()
170              .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build();
171            break;
172          default:
173            resp = MutateResponse.getDefaultInstance();
174            break;
175        }
176        return resp;
177      }
178    }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class));
179
180    doAnswer(new Answer<GetResponse>() {
181      @Override
182      public GetResponse answer(InvocationOnMock invocation) throws Throwable {
183        ClientProtos.Get req = ((GetRequest) invocation.getArgument(1)).getGet();
184        ColumnValue value = ColumnValue.getDefaultInstance();
185        QualifierValue qvalue = QualifierValue.getDefaultInstance();
186        Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
187          .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray())
188          .setQualifier(qvalue.getQualifier().toByteArray())
189          .setValue(qvalue.getValue().toByteArray()).build();
190        return GetResponse.newBuilder()
191          .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell), true))).build();
192      }
193    }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class));
194
195    conn = spy(new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent(),
196      Collections.emptyMap()) {
197      @Override
198      public RegionLocator getRegionLocator(TableName tableName) throws IOException {
199        RegionLocator locator = mock(HRegionLocator.class);
200        Answer<HRegionLocation> answer = new Answer<HRegionLocation>() {
201
202          @Override
203          public HRegionLocation answer(InvocationOnMock invocation) throws Throwable {
204            TableName tableName = TableName.META_TABLE_NAME;
205            RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
206            ServerName serverName = MASTER_HOST;
207            HRegionLocation loc = new HRegionLocation(info, serverName);
208            return loc;
209          }
210        };
211        doAnswer(answer).when(locator).getRegionLocation(any(byte[].class), anyInt(), anyBoolean());
212        doAnswer(answer).when(locator).getRegionLocation(any(byte[].class));
213        doAnswer(answer).when(locator).getRegionLocation(any(byte[].class), anyInt());
214        doAnswer(answer).when(locator).getRegionLocation(any(byte[].class), anyBoolean());
215        return locator;
216      }
217
218      @Override
219      public ClientService.BlockingInterface getClient(ServerName serverName) throws IOException {
220        return stub;
221      }
222    });
223    // this setup of AsyncProcess is for MultiResponse
224    AsyncProcess asyncProcess = mock(AsyncProcess.class);
225    AsyncRequestFuture asyncRequestFuture = mock(AsyncRequestFuture.class);
226    doNothing().when(asyncRequestFuture).waitUntilDone();
227    doReturn(asyncRequestFuture).when(asyncProcess).submit(any());
228    doReturn(asyncProcess).when(conn).getAsyncProcess();
229    // setup the table instance
230    table = conn.getTable(TableName.META_TABLE_NAME, ForkJoinPool.commonPool());
231  }
232
233  @After
234  public void tearDown() throws IOException {
235    Closeables.close(conn, true);
236  }
237
238  private void assertTrace(String tableOperation) {
239    assertTrace(tableOperation, new IsAnything<>());
240  }
241
242  private void assertTrace(String tableOperation, Matcher<SpanData> matcher) {
243    // n.b. this method implementation must match the one of the same name found in
244    // TestAsyncTableTracing
245    final TableName tableName = table.getName();
246    final Matcher<SpanData> spanLocator =
247      allOf(hasName(containsString(tableOperation)), hasEnded());
248    final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString();
249
250    Waiter.waitFor(conf, 1000, new MatcherPredicate<>("waiting for span to emit",
251      () -> TRACE_RULE.getSpans(), hasItem(spanLocator)));
252    List<SpanData> candidateSpans =
253      TRACE_RULE.getSpans().stream().filter(spanLocator::matches).collect(Collectors.toList());
254    assertThat(candidateSpans, hasSize(1));
255    SpanData data = candidateSpans.iterator().next();
256    assertThat(data,
257      allOf(hasName(expectedName), hasKind(SpanKind.CLIENT), hasStatusWithCode(StatusCode.OK),
258        buildConnectionAttributesMatcher(conn), buildTableAttributesMatcher(tableName), matcher));
259  }
260
261  @Test
262  public void testPut() throws IOException {
263    table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
264      Bytes.toBytes("v")));
265    assertTrace("PUT");
266  }
267
268  @Test
269  public void testExists() throws IOException {
270    table.exists(new Get(Bytes.toBytes(0)));
271    assertTrace("GET");
272  }
273
274  @Test
275  public void testGet() throws IOException {
276    table.get(new Get(Bytes.toBytes(0)));
277    assertTrace("GET");
278  }
279
280  @Test
281  public void testDelete() throws IOException {
282    table.delete(new Delete(Bytes.toBytes(0)));
283    assertTrace("DELETE");
284  }
285
286  @Test
287  public void testAppend() throws IOException {
288    table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
289      Bytes.toBytes("v")));
290    assertTrace("APPEND");
291  }
292
293  @Test
294  public void testIncrement() throws IOException {
295    table.increment(
296      new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1));
297    assertTrace("INCREMENT");
298  }
299
300  @Test
301  public void testIncrementColumnValue1() throws IOException {
302    table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1);
303    assertTrace("INCREMENT");
304  }
305
306  @Test
307  public void testIncrementColumnValue2() throws IOException {
308    table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
309      Durability.SYNC_WAL);
310    assertTrace("INCREMENT");
311  }
312
313  @Test
314  public void testCheckAndMutate() throws IOException {
315    table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0))
316      .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
317      .build(new Delete(Bytes.toBytes(0))));
318    assertTrace("CHECK_AND_MUTATE",
319      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations",
320        "CHECK_AND_MUTATE", "DELETE")));
321  }
322
323  @Test
324  public void testCheckAndMutateList() throws IOException {
325    table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
326      .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
327      .build(new Delete(Bytes.toBytes(0)))));
328    assertTrace("BATCH",
329      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations",
330        "CHECK_AND_MUTATE", "DELETE")));
331  }
332
333  @Test
334  public void testCheckAndMutateAll() throws IOException {
335    table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
336      .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
337      .build(new Delete(Bytes.toBytes(0)))));
338    assertTrace("BATCH",
339      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations",
340        "CHECK_AND_MUTATE", "DELETE")));
341  }
342
343  @Test
344  public void testMutateRow() throws Exception {
345    byte[] row = Bytes.toBytes(0);
346    table.mutateRow(RowMutations.of(Arrays.asList(new Delete(row))));
347    assertTrace("BATCH",
348      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
349  }
350
351  @Test
352  public void testExistsList() throws IOException {
353    table.exists(Arrays.asList(new Get(Bytes.toBytes(0))));
354    assertTrace("BATCH",
355      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
356  }
357
358  @Test
359  public void testExistsAll() throws IOException {
360    table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0))));
361    assertTrace("BATCH",
362      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
363  }
364
365  @Test
366  public void testGetList() throws IOException {
367    table.get(Arrays.asList(new Get(Bytes.toBytes(0))));
368    assertTrace("BATCH",
369      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
370  }
371
372  @Test
373  public void testPutList() throws IOException {
374    table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
375      Bytes.toBytes("cq"), Bytes.toBytes("v"))));
376    assertTrace("BATCH",
377      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
378  }
379
380  @Test
381  public void testDeleteList() throws IOException {
382    table.delete(Lists.newArrayList(new Delete(Bytes.toBytes(0))));
383    assertTrace("BATCH",
384      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
385  }
386
387  @Test
388  public void testBatchList() throws IOException, InterruptedException {
389    table.batch(Arrays.asList(new Delete(Bytes.toBytes(0))), null);
390    assertTrace("BATCH",
391      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
392  }
393
394  @Test
395  public void testTableClose() throws IOException {
396    table.close();
397    assertTrace(HTable.class.getSimpleName(), "close", null, TableName.META_TABLE_NAME);
398  }
399}