001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasException;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
025import static org.hamcrest.MatcherAssert.assertThat;
026import static org.hamcrest.Matchers.allOf;
027import static org.hamcrest.Matchers.hasItem;
028import static org.hamcrest.Matchers.not;
029import static org.hamcrest.Matchers.startsWith;
030
031import io.opentelemetry.api.trace.StatusCode;
032import io.opentelemetry.sdk.trace.data.SpanData;
033import java.util.ArrayList;
034import java.util.List;
035import java.util.Objects;
036import java.util.function.Supplier;
037import java.util.stream.Collectors;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
040import org.apache.hadoop.hbase.testclassification.ClientTests;
041import org.apache.hadoop.hbase.testclassification.LargeTests;
042import org.hamcrest.Matcher;
043import org.junit.ClassRule;
044import org.junit.experimental.categories.Category;
045import org.junit.runner.RunWith;
046import org.junit.runners.Parameterized;
047import org.junit.runners.Parameterized.Parameter;
048import org.junit.runners.Parameterized.Parameters;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052@RunWith(Parameterized.class)
053@Category({ LargeTests.class, ClientTests.class })
054public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
055  private static final Logger logger = LoggerFactory.getLogger(TestRawAsyncTableScan.class);
056
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059    HBaseClassTestRule.forClass(TestRawAsyncTableScan.class);
060
061  @Parameter(0)
062  public String scanType;
063
064  @Parameter(1)
065  public Supplier<Scan> scanCreater;
066
067  @Parameters(name = "{index}: type={0}")
068  public static List<Object[]> params() {
069    return getScanCreatorParams();
070  }
071
072  @Override
073  protected Scan createScan() {
074    return scanCreater.get();
075  }
076
077  @Override
078  protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
079    TracedAdvancedScanResultConsumer scanConsumer = new TracedAdvancedScanResultConsumer();
080    connectionRule.getAsyncConnection().getTable(TABLE_NAME).scan(scan, scanConsumer);
081    List<Result> results = new ArrayList<>();
082    // these tests batch settings with the sample data result in each result being
083    // split in two. so we must allow twice the expected results in order to reach
084    // our true limit. see convertFromBatchResult for details.
085    if (closeAfter > 0 && scan.getBatch() > 0) {
086      closeAfter = closeAfter * 2;
087    }
088    for (Result result; (result = scanConsumer.take()) != null;) {
089      results.add(result);
090      if (closeAfter > 0 && results.size() >= closeAfter) {
091        break;
092      }
093    }
094    if (scan.getBatch() > 0) {
095      results = convertFromBatchResult(results);
096    }
097    return results;
098  }
099
100  @Override
101  protected void assertTraceContinuity() {
102    final String parentSpanName = testName.getMethodName();
103    final Matcher<SpanData> parentSpanMatcher =
104      allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
105    waitForSpan(parentSpanMatcher);
106
107    final List<SpanData> spans =
108      otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
109    if (logger.isDebugEnabled()) {
110      StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
111      stringTraceRenderer.render(logger::debug);
112    }
113
114    final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
115      .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
116
117    final Matcher<SpanData> scanOperationSpanMatcher =
118      allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
119        hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
120    assertThat(spans, hasItem(scanOperationSpanMatcher));
121    final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
122      .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
123
124    // RawAsyncTableImpl never invokes the callback to `onScanMetricsCreated` -- bug?
125    final Matcher<SpanData> onScanMetricsCreatedMatcher =
126      hasName("TracedAdvancedScanResultConsumer#onScanMetricsCreated");
127    assertThat(spans, not(hasItem(onScanMetricsCreatedMatcher)));
128
129    final Matcher<SpanData> onNextMatcher = hasName("TracedAdvancedScanResultConsumer#onNext");
130    assertThat(spans, hasItem(onNextMatcher));
131    spans.stream().filter(onNextMatcher::matches)
132      .forEach(span -> assertThat(span, hasParentSpanId(scanOperationSpanId)));
133    assertThat(spans, hasItem(allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId),
134      hasStatusWithCode(StatusCode.OK), hasEnded())));
135
136    final Matcher<SpanData> onCompleteMatcher =
137      hasName("TracedAdvancedScanResultConsumer#onComplete");
138    assertThat(spans, hasItem(onCompleteMatcher));
139    spans.stream().filter(onCompleteMatcher::matches)
140      .forEach(span -> assertThat(span, allOf(onCompleteMatcher,
141        hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
142  }
143
144  @Override
145  protected void
146    assertTraceError(Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher) {
147    final String parentSpanName = testName.getMethodName();
148    final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
149    waitForSpan(parentSpanMatcher);
150
151    final List<SpanData> spans =
152      otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
153    if (logger.isDebugEnabled()) {
154      StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
155      stringTraceRenderer.render(logger::debug);
156    }
157
158    final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
159      .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
160
161    final Matcher<SpanData> scanOperationSpanMatcher =
162      allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
163        hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
164        hasException(exceptionMatcher), hasEnded());
165    assertThat(spans, hasItem(scanOperationSpanMatcher));
166    final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
167      .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
168
169    final Matcher<SpanData> onCompleteMatcher = hasName("TracedAdvancedScanResultConsumer#onError");
170    assertThat(spans, hasItem(onCompleteMatcher));
171    spans.stream().filter(onCompleteMatcher::matches)
172      .forEach(span -> assertThat(span, allOf(onCompleteMatcher,
173        hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
174  }
175}