001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; 021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasException; 022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; 023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; 024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; 025import static org.hamcrest.MatcherAssert.assertThat; 026import static org.hamcrest.Matchers.allOf; 027import static org.hamcrest.Matchers.hasItem; 028import static org.hamcrest.Matchers.not; 029import static org.hamcrest.Matchers.startsWith; 030 031import io.opentelemetry.api.trace.StatusCode; 032import io.opentelemetry.sdk.trace.data.SpanData; 033import java.util.ArrayList; 034import java.util.List; 035import java.util.Objects; 036import java.util.function.Supplier; 037import java.util.stream.Collectors; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; 040import org.apache.hadoop.hbase.testclassification.ClientTests; 041import org.apache.hadoop.hbase.testclassification.LargeTests; 042import org.hamcrest.Matcher; 043import org.junit.ClassRule; 044import org.junit.experimental.categories.Category; 045import org.junit.runner.RunWith; 046import org.junit.runners.Parameterized; 047import org.junit.runners.Parameterized.Parameter; 048import org.junit.runners.Parameterized.Parameters; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052@RunWith(Parameterized.class) 053@Category({ LargeTests.class, ClientTests.class }) 054public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan { 055 private static final Logger logger = LoggerFactory.getLogger(TestRawAsyncTableScan.class); 056 057 @ClassRule 058 public static final HBaseClassTestRule CLASS_RULE = 059 HBaseClassTestRule.forClass(TestRawAsyncTableScan.class); 060 061 @Parameter(0) 062 public String scanType; 063 064 @Parameter(1) 065 public Supplier<Scan> scanCreater; 066 067 @Parameters(name = "{index}: type={0}") 068 public static List<Object[]> params() { 069 return getScanCreatorParams(); 070 } 071 072 @Override 073 protected Scan createScan() { 074 return scanCreater.get(); 075 } 076 077 @Override 078 protected List<Result> doScan(Scan scan, int closeAfter) throws Exception { 079 TracedAdvancedScanResultConsumer scanConsumer = new TracedAdvancedScanResultConsumer(); 080 connectionRule.getAsyncConnection().getTable(TABLE_NAME).scan(scan, scanConsumer); 081 List<Result> results = new ArrayList<>(); 082 // these tests batch settings with the sample data result in each result being 083 // split in two. so we must allow twice the expected results in order to reach 084 // our true limit. see convertFromBatchResult for details. 085 if (closeAfter > 0 && scan.getBatch() > 0) { 086 closeAfter = closeAfter * 2; 087 } 088 for (Result result; (result = scanConsumer.take()) != null;) { 089 results.add(result); 090 if (closeAfter > 0 && results.size() >= closeAfter) { 091 break; 092 } 093 } 094 if (scan.getBatch() > 0) { 095 results = convertFromBatchResult(results); 096 } 097 return results; 098 } 099 100 @Override 101 protected void assertTraceContinuity() { 102 final String parentSpanName = testName.getMethodName(); 103 final Matcher<SpanData> parentSpanMatcher = 104 allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded()); 105 waitForSpan(parentSpanMatcher); 106 107 final List<SpanData> spans = 108 otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList()); 109 if (logger.isDebugEnabled()) { 110 StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); 111 stringTraceRenderer.render(logger::debug); 112 } 113 114 final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches) 115 .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); 116 117 final Matcher<SpanData> scanOperationSpanMatcher = 118 allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), 119 hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()); 120 assertThat(spans, hasItem(scanOperationSpanMatcher)); 121 final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches) 122 .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); 123 124 // RawAsyncTableImpl never invokes the callback to `onScanMetricsCreated` -- bug? 125 final Matcher<SpanData> onScanMetricsCreatedMatcher = 126 hasName("TracedAdvancedScanResultConsumer#onScanMetricsCreated"); 127 assertThat(spans, not(hasItem(onScanMetricsCreatedMatcher))); 128 129 final Matcher<SpanData> onNextMatcher = hasName("TracedAdvancedScanResultConsumer#onNext"); 130 assertThat(spans, hasItem(onNextMatcher)); 131 spans.stream().filter(onNextMatcher::matches) 132 .forEach(span -> assertThat(span, hasParentSpanId(scanOperationSpanId))); 133 assertThat(spans, hasItem(allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId), 134 hasStatusWithCode(StatusCode.OK), hasEnded()))); 135 136 final Matcher<SpanData> onCompleteMatcher = 137 hasName("TracedAdvancedScanResultConsumer#onComplete"); 138 assertThat(spans, hasItem(onCompleteMatcher)); 139 spans.stream().filter(onCompleteMatcher::matches) 140 .forEach(span -> assertThat(span, allOf(onCompleteMatcher, 141 hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()))); 142 } 143 144 @Override 145 protected void 146 assertTraceError(Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher) { 147 final String parentSpanName = testName.getMethodName(); 148 final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded()); 149 waitForSpan(parentSpanMatcher); 150 151 final List<SpanData> spans = 152 otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList()); 153 if (logger.isDebugEnabled()) { 154 StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); 155 stringTraceRenderer.render(logger::debug); 156 } 157 158 final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches) 159 .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); 160 161 final Matcher<SpanData> scanOperationSpanMatcher = 162 allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), 163 hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR), 164 hasException(exceptionMatcher), hasEnded()); 165 assertThat(spans, hasItem(scanOperationSpanMatcher)); 166 final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches) 167 .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); 168 169 final Matcher<SpanData> onCompleteMatcher = hasName("TracedAdvancedScanResultConsumer#onError"); 170 assertThat(spans, hasItem(onCompleteMatcher)); 171 spans.stream().filter(onCompleteMatcher::matches) 172 .forEach(span -> assertThat(span, allOf(onCompleteMatcher, 173 hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()))); 174 } 175}