001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; 021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; 022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; 023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; 024import static org.hamcrest.MatcherAssert.assertThat; 025import static org.hamcrest.Matchers.allOf; 026import static org.hamcrest.Matchers.emptyIterable; 027import static org.hamcrest.Matchers.hasItem; 028import static org.hamcrest.Matchers.is; 029import static org.hamcrest.Matchers.not; 030import static org.hamcrest.Matchers.nullValue; 031import static org.hamcrest.Matchers.startsWith; 032 033import io.opentelemetry.api.trace.StatusCode; 034import io.opentelemetry.sdk.trace.data.SpanData; 035import java.io.IOException; 036import java.util.ArrayList; 037import java.util.List; 038import java.util.Objects; 039import java.util.concurrent.TimeUnit; 040import java.util.function.Consumer; 041import java.util.function.Supplier; 042import java.util.stream.Collectors; 043import java.util.stream.IntStream; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.hbase.ConnectionRule; 046import org.apache.hadoop.hbase.HBaseClassTestRule; 047import org.apache.hadoop.hbase.HBaseTestingUtility; 048import org.apache.hadoop.hbase.MatcherPredicate; 049import org.apache.hadoop.hbase.MiniClusterRule; 050import org.apache.hadoop.hbase.StartMiniClusterOption; 051import org.apache.hadoop.hbase.TableName; 052import org.apache.hadoop.hbase.Waiter; 053import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; 054import org.apache.hadoop.hbase.testclassification.ClientTests; 055import org.apache.hadoop.hbase.testclassification.LargeTests; 056import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule; 057import org.apache.hadoop.hbase.trace.OpenTelemetryTestRule; 058import org.apache.hadoop.hbase.trace.TraceUtil; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.hamcrest.Matcher; 061import org.junit.Before; 062import org.junit.ClassRule; 063import org.junit.Rule; 064import org.junit.Test; 065import org.junit.experimental.categories.Category; 066import org.junit.rules.ExternalResource; 067import org.junit.rules.RuleChain; 068import org.junit.rules.TestName; 069import org.junit.rules.TestRule; 070import org.slf4j.Logger; 071import org.slf4j.LoggerFactory; 072 073@Category({ LargeTests.class, ClientTests.class }) 074public class TestResultScannerTracing { 075 private static final Logger LOG = LoggerFactory.getLogger(TestResultScannerTracing.class); 076 077 @ClassRule 078 public static final HBaseClassTestRule CLASS_RULE = 079 HBaseClassTestRule.forClass(TestResultScannerTracing.class); 080 081 private static final TableName TABLE_NAME = 082 TableName.valueOf(TestResultScannerTracing.class.getSimpleName()); 083 private static final byte[] FAMILY = Bytes.toBytes("f"); 084 private static final byte[] CQ = Bytes.toBytes("q"); 085 private static final int COUNT = 1000; 086 087 private static final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create(); 088 private static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder() 089 .setMiniClusterOption(StartMiniClusterOption.builder().numRegionServers(3).build()).build(); 090 091 private static final ConnectionRule connectionRule = 092 ConnectionRule.createConnectionRule(miniClusterRule::createConnection); 093 094 private static final class Setup extends ExternalResource { 095 096 private Connection conn; 097 098 @Override 099 protected void before() throws Throwable { 100 final HBaseTestingUtility testUtil = miniClusterRule.getTestingUtility(); 101 conn = testUtil.getConnection(); 102 103 byte[][] splitKeys = new byte[8][]; 104 for (int i = 111; i < 999; i += 111) { 105 splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); 106 } 107 testUtil.createTable(TABLE_NAME, FAMILY, splitKeys); 108 testUtil.waitTableAvailable(TABLE_NAME); 109 try (final Table table = conn.getTable(TABLE_NAME)) { 110 table.put( 111 IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) 112 .addColumn(FAMILY, CQ, Bytes.toBytes(i))).collect(Collectors.toList())); 113 } 114 } 115 116 @Override 117 protected void after() { 118 try (Admin admin = conn.getAdmin()) { 119 if (!admin.tableExists(TABLE_NAME)) { 120 return; 121 } 122 admin.disableTable(TABLE_NAME); 123 admin.deleteTable(TABLE_NAME); 124 } catch (IOException e) { 125 throw new RuntimeException(e); 126 } 127 } 128 } 129 130 @ClassRule 131 public static final TestRule classRule = RuleChain.outerRule(otelClassRule) 132 .around(miniClusterRule).around(connectionRule).around(new Setup()); 133 134 @Rule 135 public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(otelClassRule); 136 137 @Rule 138 public final TestName testName = new TestName(); 139 140 @Before 141 public void before() throws Exception { 142 final Connection conn = connectionRule.getConnection(); 143 try (final RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) { 144 locator.clearRegionLocationCache(); 145 } 146 } 147 148 private static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) { 149 final Configuration conf = miniClusterRule.getTestingUtility().getConfiguration(); 150 Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>( 151 "Span for test failed to complete.", otelClassRule::getSpans, hasItem(parentSpanMatcher))); 152 } 153 154 private Scan buildDefaultScan() { 155 return new Scan().withStartRow(Bytes.toBytes(String.format("%03d", 1))) 156 .withStopRow(Bytes.toBytes(String.format("%03d", 998))); 157 } 158 159 private void assertDefaultScan(final Scan scan) { 160 assertThat(scan.isReversed(), is(false)); 161 assertThat(scan.isAsyncPrefetch(), nullValue()); 162 } 163 164 private Scan buildAsyncPrefetchScan() { 165 return new Scan().withStartRow(Bytes.toBytes(String.format("%03d", 1))) 166 .withStopRow(Bytes.toBytes(String.format("%03d", 998))).setAsyncPrefetch(true); 167 } 168 169 private void assertAsyncPrefetchScan(final Scan scan) { 170 assertThat(scan.isReversed(), is(false)); 171 assertThat(scan.isAsyncPrefetch(), is(true)); 172 } 173 174 private Scan buildReversedScan() { 175 return new Scan().withStartRow(Bytes.toBytes(String.format("%03d", 998))) 176 .withStopRow(Bytes.toBytes(String.format("%03d", 1))).setReversed(true); 177 } 178 179 private void assertReversedScan(final Scan scan) { 180 assertThat(scan.isReversed(), is(true)); 181 assertThat(scan.isAsyncPrefetch(), nullValue()); 182 } 183 184 private void doScan(final Supplier<Scan> spanSupplier, final Consumer<Scan> scanAssertions) 185 throws Exception { 186 final Connection conn = connectionRule.getConnection(); 187 final Scan scan = spanSupplier.get(); 188 scanAssertions.accept(scan); 189 try (final Table table = conn.getTable(TABLE_NAME); 190 final ResultScanner scanner = table.getScanner(scan)) { 191 final List<Result> results = new ArrayList<>(COUNT); 192 scanner.forEach(results::add); 193 assertThat(results, not(emptyIterable())); 194 } 195 } 196 197 @Test 198 public void testNormalScan() throws Exception { 199 TraceUtil.trace(() -> doScan(this::buildDefaultScan, this::assertDefaultScan), 200 testName.getMethodName()); 201 202 final String parentSpanName = testName.getMethodName(); 203 final Matcher<SpanData> parentSpanMatcher = 204 allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded()); 205 waitForSpan(parentSpanMatcher); 206 207 final List<SpanData> spans = 208 otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList()); 209 if (LOG.isDebugEnabled()) { 210 StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); 211 stringTraceRenderer.render(LOG::debug); 212 } 213 214 final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches) 215 .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); 216 217 final Matcher<SpanData> scanOperationSpanMatcher = 218 allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), 219 hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()); 220 assertThat(spans, hasItem(scanOperationSpanMatcher)); 221 final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches) 222 .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); 223 224 final Matcher<SpanData> childMetaScanSpanMatcher = allOf(hasName(startsWith("SCAN hbase:meta")), 225 hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()); 226 assertThat("expected a scan of hbase:meta", spans, hasItem(childMetaScanSpanMatcher)); 227 } 228 229 @Test 230 public void testAsyncPrefetchScan() throws Exception { 231 TraceUtil.trace(() -> doScan(this::buildAsyncPrefetchScan, this::assertAsyncPrefetchScan), 232 testName.getMethodName()); 233 234 final String parentSpanName = testName.getMethodName(); 235 final Matcher<SpanData> parentSpanMatcher = 236 allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded()); 237 waitForSpan(parentSpanMatcher); 238 239 final List<SpanData> spans = 240 otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList()); 241 if (LOG.isDebugEnabled()) { 242 StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); 243 stringTraceRenderer.render(LOG::debug); 244 } 245 246 final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches) 247 .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); 248 249 final Matcher<SpanData> scanOperationSpanMatcher = 250 allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), 251 hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()); 252 assertThat(spans, hasItem(scanOperationSpanMatcher)); 253 final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches) 254 .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); 255 256 final Matcher<SpanData> childMetaScanSpanMatcher = allOf(hasName(startsWith("SCAN hbase:meta")), 257 hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()); 258 assertThat("expected a scan of hbase:meta", spans, hasItem(childMetaScanSpanMatcher)); 259 } 260 261 @Test 262 public void testReversedScan() throws Exception { 263 TraceUtil.trace(() -> doScan(this::buildReversedScan, this::assertReversedScan), 264 testName.getMethodName()); 265 266 final String parentSpanName = testName.getMethodName(); 267 final Matcher<SpanData> parentSpanMatcher = 268 allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded()); 269 waitForSpan(parentSpanMatcher); 270 271 final List<SpanData> spans = 272 otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList()); 273 if (LOG.isDebugEnabled()) { 274 StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); 275 stringTraceRenderer.render(LOG::debug); 276 } 277 278 final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches) 279 .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); 280 281 final Matcher<SpanData> scanOperationSpanMatcher = 282 allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), 283 hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()); 284 assertThat(spans, hasItem(scanOperationSpanMatcher)); 285 final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches) 286 .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); 287 288 final Matcher<SpanData> childMetaScanSpanMatcher = allOf(hasName(startsWith("SCAN hbase:meta")), 289 hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()); 290 assertThat("expected a scan of hbase:meta", spans, hasItem(childMetaScanSpanMatcher)); 291 } 292}