001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; 021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasException; 022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; 023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; 024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; 025import static org.hamcrest.MatcherAssert.assertThat; 026import static org.hamcrest.Matchers.allOf; 027import static org.hamcrest.Matchers.hasItem; 028import static org.hamcrest.Matchers.startsWith; 029 030import io.opentelemetry.api.trace.StatusCode; 031import io.opentelemetry.sdk.trace.data.SpanData; 032import java.util.ArrayList; 033import java.util.List; 034import java.util.Objects; 035import java.util.concurrent.ForkJoinPool; 036import java.util.function.Supplier; 037import java.util.stream.Collectors; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; 040import org.apache.hadoop.hbase.testclassification.ClientTests; 041import org.apache.hadoop.hbase.testclassification.LargeTests; 042import org.hamcrest.Matcher; 043import org.junit.ClassRule; 044import org.junit.experimental.categories.Category; 045import org.junit.runner.RunWith; 046import org.junit.runners.Parameterized; 047import org.junit.runners.Parameterized.Parameter; 048import org.junit.runners.Parameterized.Parameters; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052@RunWith(Parameterized.class) 053@Category({ LargeTests.class, ClientTests.class }) 054public class TestAsyncTableScanner extends AbstractTestAsyncTableScan { 055 private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScanner.class); 056 057 @ClassRule 058 public static final HBaseClassTestRule CLASS_RULE = 059 HBaseClassTestRule.forClass(TestAsyncTableScanner.class); 060 061 @Parameter(0) 062 public String tableType; 063 064 @Parameter(1) 065 public Supplier<AsyncTable<?>> getTable; 066 067 @Parameter(2) 068 public String scanType; 069 070 @Parameter(3) 071 public Supplier<Scan> scanCreator; 072 073 @Parameters(name = "{index}: table={0}, scan={2}") 074 public static List<Object[]> params() { 075 return getTableAndScanCreatorParams(); 076 } 077 078 @Override 079 protected Scan createScan() { 080 return scanCreator.get(); 081 } 082 083 @Override 084 protected List<Result> doScan(Scan scan, int closeAfter) throws Exception { 085 AsyncTable<?> table = 086 connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool()); 087 List<Result> results = new ArrayList<>(); 088 // these tests batch settings with the sample data result in each result being 089 // split in two. so we must allow twice the expected results in order to reach 090 // our true limit. see convertFromBatchResult for details. 091 if (closeAfter > 0 && scan.getBatch() > 0) { 092 closeAfter = closeAfter * 2; 093 } 094 try (ResultScanner scanner = table.getScanner(scan)) { 095 for (Result result; (result = scanner.next()) != null;) { 096 results.add(result); 097 if (closeAfter > 0 && results.size() >= closeAfter) { 098 break; 099 } 100 } 101 } 102 if (scan.getBatch() > 0) { 103 results = convertFromBatchResult(results); 104 } 105 return results; 106 } 107 108 @Override 109 protected void assertTraceContinuity() { 110 final String parentSpanName = testName.getMethodName(); 111 final Matcher<SpanData> parentSpanMatcher = 112 allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded()); 113 waitForSpan(parentSpanMatcher); 114 115 final List<SpanData> spans = 116 otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList()); 117 if (logger.isDebugEnabled()) { 118 StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); 119 stringTraceRenderer.render(logger::debug); 120 } 121 122 final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches) 123 .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); 124 125 assertThat(spans, 126 hasItem(allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), 127 hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()))); 128 } 129 130 @Override 131 protected void 132 assertTraceError(Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher) { 133 final String parentSpanName = testName.getMethodName(); 134 final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded()); 135 waitForSpan(parentSpanMatcher); 136 137 final List<SpanData> spans = 138 otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList()); 139 if (logger.isDebugEnabled()) { 140 StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); 141 stringTraceRenderer.render(logger::debug); 142 } 143 144 final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches) 145 .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); 146 147 final Matcher<SpanData> scanOperationSpanMatcher = 148 allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), 149 hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR), 150 hasException(exceptionMatcher), hasEnded()); 151 assertThat(spans, hasItem(scanOperationSpanMatcher)); 152 } 153}