001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasException;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
025import static org.hamcrest.Matchers.allOf;
026import static org.hamcrest.Matchers.startsWith;
027
028import io.opentelemetry.api.trace.StatusCode;
029import io.opentelemetry.sdk.trace.data.SpanData;
030import java.util.ArrayList;
031import java.util.List;
032import java.util.concurrent.ForkJoinPool;
033import java.util.function.Supplier;
034import java.util.stream.Collectors;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
037import org.apache.hadoop.hbase.testclassification.ClientTests;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.hamcrest.Matcher;
040import org.junit.ClassRule;
041import org.junit.experimental.categories.Category;
042import org.junit.runner.RunWith;
043import org.junit.runners.Parameterized;
044import org.junit.runners.Parameterized.Parameter;
045import org.junit.runners.Parameterized.Parameters;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049@RunWith(Parameterized.class)
050@Category({ LargeTests.class, ClientTests.class })
051public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
052  private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScanner.class);
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056    HBaseClassTestRule.forClass(TestAsyncTableScanner.class);
057
058  @Parameter(0)
059  public String tableType;
060
061  @Parameter(1)
062  public Supplier<AsyncTable<?>> getTable;
063
064  @Parameter(2)
065  public String scanType;
066
067  @Parameter(3)
068  public Supplier<Scan> scanCreator;
069
070  @Parameters(name = "{index}: table={0}, scan={2}")
071  public static List<Object[]> params() {
072    return getTableAndScanCreatorParams();
073  }
074
075  @Override
076  protected Scan createScan() {
077    return scanCreator.get();
078  }
079
080  @Override
081  protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
082    AsyncTable<?> table =
083      CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
084    List<Result> results = new ArrayList<>();
085    // these tests batch settings with the sample data result in each result being
086    // split in two. so we must allow twice the expected results in order to reach
087    // our true limit. see convertFromBatchResult for details.
088    if (closeAfter > 0 && scan.getBatch() > 0) {
089      closeAfter = closeAfter * 2;
090    }
091    try (ResultScanner scanner = table.getScanner(scan)) {
092      for (Result result; (result = scanner.next()) != null;) {
093        results.add(result);
094        if (closeAfter > 0 && results.size() >= closeAfter) {
095          break;
096        }
097      }
098    }
099    if (scan.getBatch() > 0) {
100      results = convertFromBatchResult(results);
101    }
102    return results;
103  }
104
105  @Override
106  protected void assertTraceContinuity() {
107    final String parentSpanName = testName.getMethodName();
108    final Matcher<SpanData> parentSpanMatcher =
109      allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
110    waitForSpan(parentSpanMatcher);
111
112    if (logger.isDebugEnabled()) {
113      StringTraceRenderer stringTraceRenderer =
114        new StringTraceRenderer(spanStream().collect(Collectors.toList()));
115      stringTraceRenderer.render(logger::debug);
116    }
117
118    final String parentSpanId = spanStream().filter(parentSpanMatcher::matches)
119      .max((a, b) -> Long.compare(a.getEndEpochNanos(), b.getEndEpochNanos()))
120      .map(SpanData::getSpanId).get();
121
122    waitForSpan(allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
123      hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()));
124  }
125
126  @Override
127  protected void
128    assertTraceError(Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher) {
129    final String parentSpanName = testName.getMethodName();
130    final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
131    waitForSpan(parentSpanMatcher);
132
133    if (logger.isDebugEnabled()) {
134      StringTraceRenderer stringTraceRenderer =
135        new StringTraceRenderer(spanStream().collect(Collectors.toList()));
136      stringTraceRenderer.render(logger::debug);
137    }
138
139    final String parentSpanId = spanStream().filter(parentSpanMatcher::matches)
140      .max((a, b) -> Long.compare(a.getEndEpochNanos(), b.getEndEpochNanos()))
141      .map(SpanData::getSpanId).get();
142
143    final Matcher<SpanData> scanOperationSpanMatcher =
144      allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
145        hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
146        hasException(exceptionMatcher), hasEnded());
147    waitForSpan(scanOperationSpanMatcher);
148  }
149}