001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasException;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
025import static org.hamcrest.MatcherAssert.assertThat;
026import static org.hamcrest.Matchers.allOf;
027import static org.hamcrest.Matchers.startsWith;
028
029import io.opentelemetry.api.trace.StatusCode;
030import io.opentelemetry.sdk.trace.data.SpanData;
031import java.util.ArrayList;
032import java.util.List;
033import java.util.function.Supplier;
034import java.util.stream.Collectors;
035import java.util.stream.Stream;
036import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
037import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
038import org.apache.hadoop.hbase.testclassification.ClientTests;
039import org.apache.hadoop.hbase.testclassification.LargeTests;
040import org.hamcrest.Matcher;
041import org.junit.jupiter.api.Tag;
042import org.junit.jupiter.params.provider.Arguments;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046@Tag(LargeTests.TAG)
047@Tag(ClientTests.TAG)
048@HBaseParameterizedTestTemplate(name = "{index}: scan={0}")
049public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
050  private static final Logger logger = LoggerFactory.getLogger(TestRawAsyncTableScan.class);
051
052  private Supplier<Scan> scanCreator;
053
054  // scanType is only for displaying
055  public TestRawAsyncTableScan(String scanType, Supplier<Scan> scanCreator) {
056    this.scanCreator = scanCreator;
057  }
058
059  public static Stream<Arguments> parameters() {
060    return getScanCreatorParams();
061  }
062
063  @Override
064  protected Scan createScan() {
065    return scanCreator.get();
066  }
067
068  @Override
069  protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
070    TracedAdvancedScanResultConsumer scanConsumer = new TracedAdvancedScanResultConsumer();
071    CONN.getTable(TABLE_NAME).scan(scan, scanConsumer);
072    List<Result> results = new ArrayList<>();
073    // these tests batch settings with the sample data result in each result being
074    // split in two. so we must allow twice the expected results in order to reach
075    // our true limit. see convertFromBatchResult for details.
076    if (closeAfter > 0 && scan.getBatch() > 0) {
077      closeAfter = closeAfter * 2;
078    }
079    for (Result result; (result = scanConsumer.take()) != null;) {
080      results.add(result);
081      if (closeAfter > 0 && results.size() >= closeAfter) {
082        break;
083      }
084    }
085    if (scan.getBatch() > 0) {
086      results = convertFromBatchResult(results);
087    }
088    return results;
089  }
090
091  @Override
092  protected void assertTraceContinuity() {
093    final String parentSpanName = methodName;
094    final Matcher<SpanData> parentSpanMatcher =
095      allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
096    waitForSpan(parentSpanMatcher);
097
098    if (logger.isDebugEnabled()) {
099      StringTraceRenderer stringTraceRenderer =
100        new StringTraceRenderer(spanStream().collect(Collectors.toList()));
101      stringTraceRenderer.render(logger::debug);
102    }
103
104    final String parentSpanId = spanStream().filter(parentSpanMatcher::matches)
105      .max((a, b) -> Long.compare(a.getEndEpochNanos(), b.getEndEpochNanos()))
106      .map(SpanData::getSpanId).get();
107
108    final Matcher<SpanData> scanOperationSpanMatcher =
109      allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
110        hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
111    waitForSpan(scanOperationSpanMatcher);
112
113    final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
114      .max((a, b) -> Long.compare(a.getEndEpochNanos(), b.getEndEpochNanos()))
115      .map(SpanData::getSpanId).get();
116    final Matcher<SpanData> onNextMatcher = hasName("TracedAdvancedScanResultConsumer#onNext");
117    waitForSpan(onNextMatcher);
118    spanStream().filter(onNextMatcher::matches)
119      .forEach(span -> assertThat(span, hasParentSpanId(scanOperationSpanId)));
120    waitForSpan(allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId),
121      hasStatusWithCode(StatusCode.OK), hasEnded()));
122
123    final Matcher<SpanData> onCompleteMatcher =
124      hasName("TracedAdvancedScanResultConsumer#onComplete");
125    waitForSpan(onCompleteMatcher);
126    spanStream().filter(onCompleteMatcher::matches)
127      .forEach(span -> assertThat(span, allOf(onCompleteMatcher,
128        hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
129  }
130
131  @Override
132  protected void
133    assertTraceError(Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher) {
134    final String parentSpanName = methodName;
135    final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
136    waitForSpan(parentSpanMatcher);
137
138    if (logger.isDebugEnabled()) {
139      StringTraceRenderer stringTraceRenderer =
140        new StringTraceRenderer(spanStream().collect(Collectors.toList()));
141      stringTraceRenderer.render(logger::debug);
142    }
143
144    final String parentSpanId = spanStream().filter(parentSpanMatcher::matches)
145      .max((a, b) -> Long.compare(a.getEndEpochNanos(), b.getEndEpochNanos()))
146      .map(SpanData::getSpanId).get();
147
148    final Matcher<SpanData> scanOperationSpanMatcher =
149      allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
150        hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
151        hasException(exceptionMatcher), hasEnded());
152    waitForSpan(scanOperationSpanMatcher);
153    final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
154      .max((a, b) -> Long.compare(a.getEndEpochNanos(), b.getEndEpochNanos()))
155      .map(SpanData::getSpanId).get();
156
157    final Matcher<SpanData> onCompleteMatcher = hasName("TracedAdvancedScanResultConsumer#onError");
158    waitForSpan(onCompleteMatcher);
159    spanStream().filter(onCompleteMatcher::matches)
160      .forEach(span -> assertThat(span, allOf(onCompleteMatcher,
161        hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
162  }
163}