001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasException;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
025import static org.hamcrest.Matchers.allOf;
026import static org.hamcrest.Matchers.startsWith;
027
028import io.opentelemetry.api.trace.StatusCode;
029import io.opentelemetry.sdk.trace.data.SpanData;
030import java.util.List;
031import java.util.function.Supplier;
032import java.util.stream.Collectors;
033import java.util.stream.Stream;
034import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
035import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
036import org.apache.hadoop.hbase.testclassification.ClientTests;
037import org.apache.hadoop.hbase.testclassification.LargeTests;
038import org.hamcrest.Matcher;
039import org.junit.jupiter.api.Tag;
040import org.junit.jupiter.params.provider.Arguments;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044@Tag(LargeTests.TAG)
045@Tag(ClientTests.TAG)
046@HBaseParameterizedTestTemplate(name = "{index}: table={0}, scan={2}")
047public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan {
048  private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScanAll.class);
049
050  private Supplier<AsyncTable<?>> getTable;
051
052  private Supplier<Scan> scanCreator;
053
054  // tableType and scanType are only for displaying
055  public TestAsyncTableScanAll(String tableType, Supplier<AsyncTable<?>> getTable, String scanType,
056    Supplier<Scan> scanCreator) {
057    this.getTable = getTable;
058    this.scanCreator = scanCreator;
059  }
060
061  public static Stream<Arguments> parameters() {
062    return getTableAndScanCreatorParams();
063  }
064
065  @Override
066  protected Scan createScan() {
067    return scanCreator.get();
068  }
069
070  @Override
071  protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
072    List<Result> results = getTable.get().scanAll(scan).get();
073    if (scan.getBatch() > 0) {
074      results = convertFromBatchResult(results);
075    }
076    // we can't really close the scan early for scanAll, but to keep the assertions
077    // simple in AbstractTestAsyncTableScan we'll just sublist here instead.
078    if (closeAfter > 0 && closeAfter < results.size()) {
079      results = results.subList(0, closeAfter);
080    }
081    return results;
082  }
083
084  @Override
085  protected void assertTraceContinuity() {
086    final String parentSpanName = methodName;
087    final Matcher<SpanData> parentSpanMatcher =
088      allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
089    waitForSpan(parentSpanMatcher);
090
091    if (logger.isDebugEnabled()) {
092      StringTraceRenderer stringTraceRenderer =
093        new StringTraceRenderer(spanStream().collect(Collectors.toList()));
094      stringTraceRenderer.render(logger::debug);
095    }
096
097    final String parentSpanId = spanStream().filter(parentSpanMatcher::matches)
098      .max((a, b) -> Long.compare(a.getEndEpochNanos(), b.getEndEpochNanos()))
099      .map(SpanData::getSpanId).get();
100
101    final Matcher<SpanData> scanOperationSpanMatcher =
102      allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
103        hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
104    waitForSpan(scanOperationSpanMatcher);
105  }
106
107  @Override
108  protected void
109    assertTraceError(Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher) {
110    final String parentSpanName = methodName;
111    final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
112    waitForSpan(parentSpanMatcher);
113
114    if (logger.isDebugEnabled()) {
115      StringTraceRenderer stringTraceRenderer =
116        new StringTraceRenderer(spanStream().collect(Collectors.toList()));
117      stringTraceRenderer.render(logger::debug);
118    }
119
120    final String parentSpanId = spanStream().filter(parentSpanMatcher::matches)
121      .max((a, b) -> Long.compare(a.getEndEpochNanos(), b.getEndEpochNanos()))
122      .map(SpanData::getSpanId).get();
123
124    final Matcher<SpanData> scanOperationSpanMatcher =
125      allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
126        hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
127        hasException(exceptionMatcher), hasEnded());
128    waitForSpan(scanOperationSpanMatcher);
129  }
130}