001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasException;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
025import static org.hamcrest.MatcherAssert.assertThat;
026import static org.hamcrest.Matchers.allOf;
027import static org.hamcrest.Matchers.hasItem;
028import static org.hamcrest.Matchers.startsWith;
029
030import io.opentelemetry.api.trace.StatusCode;
031import io.opentelemetry.sdk.trace.data.SpanData;
032import java.util.List;
033import java.util.Objects;
034import java.util.concurrent.ForkJoinPool;
035import java.util.function.Supplier;
036import java.util.stream.Collectors;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
039import org.apache.hadoop.hbase.testclassification.ClientTests;
040import org.apache.hadoop.hbase.testclassification.LargeTests;
041import org.hamcrest.Matcher;
042import org.junit.ClassRule;
043import org.junit.experimental.categories.Category;
044import org.junit.runner.RunWith;
045import org.junit.runners.Parameterized;
046import org.junit.runners.Parameterized.Parameter;
047import org.junit.runners.Parameterized.Parameters;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051@RunWith(Parameterized.class)
052@Category({ LargeTests.class, ClientTests.class })
053public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
054  private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScan.class);
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058    HBaseClassTestRule.forClass(TestAsyncTableScan.class);
059
060  @Parameter(0)
061  public String scanType;
062
063  @Parameter(1)
064  public Supplier<Scan> scanCreater;
065
066  @Parameters(name = "{index}: scan={0}")
067  public static List<Object[]> params() {
068    return getScanCreatorParams();
069  }
070
071  @Override
072  protected Scan createScan() {
073    return scanCreater.get();
074  }
075
076  @Override
077  protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
078    AsyncTable<ScanResultConsumer> table =
079      connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
080    List<Result> results;
081    if (closeAfter > 0) {
082      // these tests batch settings with the sample data result in each result being
083      // split in two. so we must allow twice the expected results in order to reach
084      // our true limit. see convertFromBatchResult for details.
085      if (scan.getBatch() > 0) {
086        closeAfter = closeAfter * 2;
087      }
088      TracedScanResultConsumer consumer =
089        new TracedScanResultConsumer(new LimitedScanResultConsumer(closeAfter));
090      table.scan(scan, consumer);
091      results = consumer.getAll();
092    } else {
093      TracedScanResultConsumer consumer =
094        new TracedScanResultConsumer(new SimpleScanResultConsumerImpl());
095      table.scan(scan, consumer);
096      results = consumer.getAll();
097    }
098    if (scan.getBatch() > 0) {
099      results = convertFromBatchResult(results);
100    }
101    return results;
102  }
103
104  @Override
105  protected void assertTraceContinuity() {
106    final String parentSpanName = testName.getMethodName();
107    final Matcher<SpanData> parentSpanMatcher =
108      allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
109    waitForSpan(parentSpanMatcher);
110
111    final List<SpanData> spans =
112      otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
113    if (logger.isDebugEnabled()) {
114      StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
115      stringTraceRenderer.render(logger::debug);
116    }
117
118    final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
119      .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
120
121    final Matcher<SpanData> scanOperationSpanMatcher =
122      allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
123        hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
124    assertThat(spans, hasItem(scanOperationSpanMatcher));
125    final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
126      .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
127
128    final Matcher<SpanData> onScanMetricsCreatedMatcher =
129      hasName("TracedScanResultConsumer#onScanMetricsCreated");
130    assertThat(spans, hasItem(onScanMetricsCreatedMatcher));
131    spans.stream().filter(onScanMetricsCreatedMatcher::matches).forEach(span -> assertThat(span,
132      allOf(onScanMetricsCreatedMatcher, hasParentSpanId(scanOperationSpanId), hasEnded())));
133
134    final Matcher<SpanData> onNextMatcher = hasName("TracedScanResultConsumer#onNext");
135    assertThat(spans, hasItem(onNextMatcher));
136    spans.stream().filter(onNextMatcher::matches)
137      .forEach(span -> assertThat(span, allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId),
138        hasStatusWithCode(StatusCode.OK), hasEnded())));
139
140    final Matcher<SpanData> onCompleteMatcher = hasName("TracedScanResultConsumer#onComplete");
141    assertThat(spans, hasItem(onCompleteMatcher));
142    spans.stream().filter(onCompleteMatcher::matches)
143      .forEach(span -> assertThat(span, allOf(onCompleteMatcher,
144        hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
145  }
146
147  @Override
148  protected void
149    assertTraceError(Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher) {
150    final String parentSpanName = testName.getMethodName();
151    final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
152    waitForSpan(parentSpanMatcher);
153
154    final List<SpanData> spans =
155      otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
156    if (logger.isDebugEnabled()) {
157      StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
158      stringTraceRenderer.render(logger::debug);
159    }
160
161    final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
162      .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
163
164    final Matcher<SpanData> scanOperationSpanMatcher =
165      allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
166        hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
167        hasException(exceptionMatcher), hasEnded());
168    assertThat(spans, hasItem(scanOperationSpanMatcher));
169    final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
170      .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
171
172    final Matcher<SpanData> onErrorMatcher = hasName("TracedScanResultConsumer#onError");
173    assertThat(spans, hasItem(onErrorMatcher));
174    spans.stream().filter(onErrorMatcher::matches)
175      .forEach(span -> assertThat(span, allOf(onErrorMatcher, hasParentSpanId(scanOperationSpanId),
176        hasStatusWithCode(StatusCode.OK), hasEnded())));
177  }
178}