001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; 021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasException; 022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; 023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; 024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; 025import static org.hamcrest.Matchers.allOf; 026import static org.hamcrest.Matchers.startsWith; 027 028import io.opentelemetry.api.trace.StatusCode; 029import io.opentelemetry.sdk.trace.data.SpanData; 030import java.util.List; 031import java.util.function.Supplier; 032import java.util.stream.Collectors; 033import java.util.stream.Stream; 034import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 035import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; 036import org.apache.hadoop.hbase.testclassification.ClientTests; 037import org.apache.hadoop.hbase.testclassification.LargeTests; 038import org.hamcrest.Matcher; 039import org.junit.jupiter.api.Tag; 040import org.junit.jupiter.params.provider.Arguments; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044@Tag(LargeTests.TAG) 045@Tag(ClientTests.TAG) 046@HBaseParameterizedTestTemplate(name = "{index}: table={0}, scan={2}") 047public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan { 048 private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScanAll.class); 049 050 private Supplier<AsyncTable<?>> getTable; 051 052 private Supplier<Scan> scanCreator; 053 054 // tableType and scanType are only for displaying 055 public TestAsyncTableScanAll(String tableType, Supplier<AsyncTable<?>> getTable, String scanType, 056 Supplier<Scan> scanCreator) { 057 this.getTable = getTable; 058 this.scanCreator = scanCreator; 059 } 060 061 public static Stream<Arguments> parameters() { 062 return getTableAndScanCreatorParams(); 063 } 064 065 @Override 066 protected Scan createScan() { 067 return scanCreator.get(); 068 } 069 070 @Override 071 protected List<Result> doScan(Scan scan, int closeAfter) throws Exception { 072 List<Result> results = getTable.get().scanAll(scan).get(); 073 if (scan.getBatch() > 0) { 074 results = convertFromBatchResult(results); 075 } 076 // we can't really close the scan early for scanAll, but to keep the assertions 077 // simple in AbstractTestAsyncTableScan we'll just sublist here instead. 078 if (closeAfter > 0 && closeAfter < results.size()) { 079 results = results.subList(0, closeAfter); 080 } 081 return results; 082 } 083 084 @Override 085 protected void assertTraceContinuity() { 086 final String parentSpanName = methodName; 087 final Matcher<SpanData> parentSpanMatcher = 088 allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded()); 089 waitForSpan(parentSpanMatcher); 090 091 if (logger.isDebugEnabled()) { 092 StringTraceRenderer stringTraceRenderer = 093 new StringTraceRenderer(spanStream().collect(Collectors.toList())); 094 stringTraceRenderer.render(logger::debug); 095 } 096 097 final String parentSpanId = spanStream().filter(parentSpanMatcher::matches) 098 .max((a, b) -> Long.compare(a.getEndEpochNanos(), b.getEndEpochNanos())) 099 .map(SpanData::getSpanId).get(); 100 101 final Matcher<SpanData> scanOperationSpanMatcher = 102 allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), 103 hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()); 104 waitForSpan(scanOperationSpanMatcher); 105 } 106 107 @Override 108 protected void 109 assertTraceError(Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher) { 110 final String parentSpanName = methodName; 111 final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded()); 112 waitForSpan(parentSpanMatcher); 113 114 if (logger.isDebugEnabled()) { 115 StringTraceRenderer stringTraceRenderer = 116 new StringTraceRenderer(spanStream().collect(Collectors.toList())); 117 stringTraceRenderer.render(logger::debug); 118 } 119 120 final String parentSpanId = spanStream().filter(parentSpanMatcher::matches) 121 .max((a, b) -> Long.compare(a.getEndEpochNanos(), b.getEndEpochNanos())) 122 .map(SpanData::getSpanId).get(); 123 124 final Matcher<SpanData> scanOperationSpanMatcher = 125 allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), 126 hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR), 127 hasException(exceptionMatcher), hasEnded()); 128 waitForSpan(scanOperationSpanMatcher); 129 } 130}