001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasException;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
025import static org.hamcrest.MatcherAssert.assertThat;
026import static org.hamcrest.Matchers.allOf;
027import static org.hamcrest.Matchers.startsWith;
028
029import io.opentelemetry.api.trace.StatusCode;
030import io.opentelemetry.sdk.trace.data.SpanData;
031import java.util.List;
032import java.util.concurrent.ForkJoinPool;
033import java.util.function.Supplier;
034import java.util.stream.Collectors;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
037import org.apache.hadoop.hbase.testclassification.ClientTests;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.hamcrest.Matcher;
040import org.junit.ClassRule;
041import org.junit.experimental.categories.Category;
042import org.junit.runner.RunWith;
043import org.junit.runners.Parameterized;
044import org.junit.runners.Parameterized.Parameter;
045import org.junit.runners.Parameterized.Parameters;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049@RunWith(Parameterized.class)
050@Category({ LargeTests.class, ClientTests.class })
051public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
052  private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScan.class);
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056    HBaseClassTestRule.forClass(TestAsyncTableScan.class);
057
058  @Parameter(0)
059  public String scanType;
060
061  @Parameter(1)
062  public Supplier<Scan> scanCreater;
063
064  @Parameters(name = "{index}: scan={0}")
065  public static List<Object[]> params() {
066    return getScanCreatorParams();
067  }
068
069  @Override
070  protected Scan createScan() {
071    return scanCreater.get();
072  }
073
074  @Override
075  protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
076    AsyncTable<ScanResultConsumer> table =
077      CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
078    List<Result> results;
079    if (closeAfter > 0) {
080      // these tests batch settings with the sample data result in each result being
081      // split in two. so we must allow twice the expected results in order to reach
082      // our true limit. see convertFromBatchResult for details.
083      if (scan.getBatch() > 0) {
084        closeAfter = closeAfter * 2;
085      }
086      TracedScanResultConsumer consumer =
087        new TracedScanResultConsumer(new LimitedScanResultConsumer(closeAfter));
088      table.scan(scan, consumer);
089      results = consumer.getAll();
090    } else {
091      TracedScanResultConsumer consumer =
092        new TracedScanResultConsumer(new SimpleScanResultConsumerImpl());
093      table.scan(scan, consumer);
094      results = consumer.getAll();
095    }
096    if (scan.getBatch() > 0) {
097      results = convertFromBatchResult(results);
098    }
099    return results;
100  }
101
102  @Override
103  protected void assertTraceContinuity() {
104    final String parentSpanName = testName.getMethodName();
105    final Matcher<SpanData> parentSpanMatcher =
106      allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
107    waitForSpan(parentSpanMatcher);
108
109    if (logger.isDebugEnabled()) {
110      StringTraceRenderer stringTraceRenderer =
111        new StringTraceRenderer(spanStream().collect(Collectors.toList()));
112      stringTraceRenderer.render(logger::debug);
113    }
114
115    final String parentSpanId =
116      spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
117
118    final Matcher<SpanData> scanOperationSpanMatcher =
119      allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
120        hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
121    waitForSpan(scanOperationSpanMatcher);
122    final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
123      .map(SpanData::getSpanId).findAny().get();
124
125    final Matcher<SpanData> onNextMatcher = hasName("TracedScanResultConsumer#onNext");
126    waitForSpan(onNextMatcher);
127    spanStream().filter(onNextMatcher::matches)
128      .forEach(span -> assertThat(span, allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId),
129        hasStatusWithCode(StatusCode.OK), hasEnded())));
130
131    final Matcher<SpanData> onCompleteMatcher = hasName("TracedScanResultConsumer#onComplete");
132    waitForSpan(onCompleteMatcher);
133    spanStream().filter(onCompleteMatcher::matches)
134      .forEach(span -> assertThat(span, allOf(onCompleteMatcher,
135        hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
136  }
137
138  @Override
139  protected void
140    assertTraceError(Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher) {
141    final String parentSpanName = testName.getMethodName();
142    final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
143    waitForSpan(parentSpanMatcher);
144
145    if (logger.isDebugEnabled()) {
146      StringTraceRenderer stringTraceRenderer =
147        new StringTraceRenderer(spanStream().collect(Collectors.toList()));
148      stringTraceRenderer.render(logger::debug);
149    }
150
151    final String parentSpanId =
152      spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();
153
154    final Matcher<SpanData> scanOperationSpanMatcher =
155      allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
156        hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
157        hasException(exceptionMatcher), hasEnded());
158    waitForSpan(scanOperationSpanMatcher);
159    final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
160      .map(SpanData::getSpanId).findAny().get();
161
162    final Matcher<SpanData> onErrorMatcher = hasName("TracedScanResultConsumer#onError");
163    waitForSpan(onErrorMatcher);
164    spanStream().filter(onErrorMatcher::matches)
165      .forEach(span -> assertThat(span, allOf(onErrorMatcher, hasParentSpanId(scanOperationSpanId),
166        hasStatusWithCode(StatusCode.OK), hasEnded())));
167  }
168}