001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasException;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
025import static org.hamcrest.MatcherAssert.assertThat;
026import static org.hamcrest.Matchers.allOf;
027import static org.hamcrest.Matchers.startsWith;
028
029import io.opentelemetry.api.trace.StatusCode;
030import io.opentelemetry.sdk.trace.data.SpanData;
031import java.util.List;
032import java.util.concurrent.ForkJoinPool;
033import java.util.function.Supplier;
034import java.util.stream.Collectors;
035import java.util.stream.Stream;
036import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
037import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
038import org.apache.hadoop.hbase.testclassification.ClientTests;
039import org.apache.hadoop.hbase.testclassification.LargeTests;
040import org.hamcrest.Matcher;
041import org.junit.jupiter.api.Tag;
042import org.junit.jupiter.params.provider.Arguments;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046@Tag(LargeTests.TAG)
047@Tag(ClientTests.TAG)
048@HBaseParameterizedTestTemplate(name = "{index}: scan={0}")
049public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
050  private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScan.class);
051
052  private Supplier<Scan> scanCreator;
053
054  // scanType is only for displaying
055  public TestAsyncTableScan(String scanType, Supplier<Scan> scanCreator) {
056    this.scanCreator = scanCreator;
057  }
058
059  public static Stream<Arguments> parameters() {
060    return getScanCreatorParams();
061  }
062
063  @Override
064  protected Scan createScan() {
065    return scanCreator.get();
066  }
067
068  @Override
069  protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
070    AsyncTable<ScanResultConsumer> table = CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
071    List<Result> results;
072    if (closeAfter > 0) {
073      // these tests batch settings with the sample data result in each result being
074      // split in two. so we must allow twice the expected results in order to reach
075      // our true limit. see convertFromBatchResult for details.
076      if (scan.getBatch() > 0) {
077        closeAfter = closeAfter * 2;
078      }
079      TracedScanResultConsumer consumer =
080        new TracedScanResultConsumer(new LimitedScanResultConsumer(closeAfter));
081      table.scan(scan, consumer);
082      results = consumer.getAll();
083    } else {
084      TracedScanResultConsumer consumer =
085        new TracedScanResultConsumer(new SimpleScanResultConsumerImpl());
086      table.scan(scan, consumer);
087      results = consumer.getAll();
088    }
089    if (scan.getBatch() > 0) {
090      results = convertFromBatchResult(results);
091    }
092    return results;
093  }
094
095  @Override
096  protected void assertTraceContinuity() {
097    final String parentSpanName = methodName;
098    final Matcher<SpanData> parentSpanMatcher =
099      allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
100    waitForSpan(parentSpanMatcher);
101
102    if (logger.isDebugEnabled()) {
103      StringTraceRenderer stringTraceRenderer =
104        new StringTraceRenderer(spanStream().collect(Collectors.toList()));
105      stringTraceRenderer.render(logger::debug);
106    }
107
108    final String parentSpanId = spanStream().filter(parentSpanMatcher::matches)
109      .max((a, b) -> Long.compare(a.getEndEpochNanos(), b.getEndEpochNanos()))
110      .map(SpanData::getSpanId).get();
111
112    final Matcher<SpanData> scanOperationSpanMatcher =
113      allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
114        hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
115    waitForSpan(scanOperationSpanMatcher);
116    final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
117      .max((a, b) -> Long.compare(a.getEndEpochNanos(), b.getEndEpochNanos()))
118      .map(SpanData::getSpanId).get();
119
120    final Matcher<SpanData> onNextMatcher = hasName("TracedScanResultConsumer#onNext");
121    waitForSpan(onNextMatcher);
122    spanStream().filter(onNextMatcher::matches)
123      .forEach(span -> assertThat(span, allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId),
124        hasStatusWithCode(StatusCode.OK), hasEnded())));
125
126    final Matcher<SpanData> onCompleteMatcher = hasName("TracedScanResultConsumer#onComplete");
127    waitForSpan(onCompleteMatcher);
128    spanStream().filter(onCompleteMatcher::matches)
129      .forEach(span -> assertThat(span, allOf(onCompleteMatcher,
130        hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
131  }
132
133  @Override
134  protected void
135    assertTraceError(Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher) {
136    final String parentSpanName = methodName;
137    final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
138    waitForSpan(parentSpanMatcher);
139
140    if (logger.isDebugEnabled()) {
141      StringTraceRenderer stringTraceRenderer =
142        new StringTraceRenderer(spanStream().collect(Collectors.toList()));
143      stringTraceRenderer.render(logger::debug);
144    }
145
146    final String parentSpanId = spanStream().filter(parentSpanMatcher::matches)
147      .max((a, b) -> Long.compare(a.getEndEpochNanos(), b.getEndEpochNanos()))
148      .map(SpanData::getSpanId).get();
149
150    final Matcher<SpanData> scanOperationSpanMatcher =
151      allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
152        hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
153        hasException(exceptionMatcher), hasEnded());
154    waitForSpan(scanOperationSpanMatcher);
155    final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
156      .max((a, b) -> Long.compare(a.getEndEpochNanos(), b.getEndEpochNanos()))
157      .map(SpanData::getSpanId).get();
158
159    final Matcher<SpanData> onErrorMatcher = hasName("TracedScanResultConsumer#onError");
160    waitForSpan(onErrorMatcher);
161    spanStream().filter(onErrorMatcher::matches)
162      .forEach(span -> assertThat(span, allOf(onErrorMatcher, hasParentSpanId(scanOperationSpanId),
163        hasStatusWithCode(StatusCode.OK), hasEnded())));
164  }
165}