001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
024import static org.hamcrest.MatcherAssert.assertThat;
025import static org.hamcrest.Matchers.allOf;
026import static org.hamcrest.Matchers.emptyIterable;
027import static org.hamcrest.Matchers.hasItem;
028import static org.hamcrest.Matchers.is;
029import static org.hamcrest.Matchers.not;
030import static org.hamcrest.Matchers.nullValue;
031import static org.hamcrest.Matchers.startsWith;
032
033import io.opentelemetry.api.trace.StatusCode;
034import io.opentelemetry.sdk.trace.data.SpanData;
035import java.io.IOException;
036import java.util.ArrayList;
037import java.util.List;
038import java.util.Objects;
039import java.util.concurrent.TimeUnit;
040import java.util.function.Consumer;
041import java.util.function.Supplier;
042import java.util.stream.Collectors;
043import java.util.stream.IntStream;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.hbase.ConnectionRule;
046import org.apache.hadoop.hbase.HBaseClassTestRule;
047import org.apache.hadoop.hbase.HBaseTestingUtility;
048import org.apache.hadoop.hbase.MatcherPredicate;
049import org.apache.hadoop.hbase.MiniClusterRule;
050import org.apache.hadoop.hbase.StartMiniClusterOption;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.Waiter;
053import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
054import org.apache.hadoop.hbase.testclassification.ClientTests;
055import org.apache.hadoop.hbase.testclassification.LargeTests;
056import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule;
057import org.apache.hadoop.hbase.trace.OpenTelemetryTestRule;
058import org.apache.hadoop.hbase.trace.TraceUtil;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.hamcrest.Matcher;
061import org.junit.Before;
062import org.junit.ClassRule;
063import org.junit.Rule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.junit.rules.ExternalResource;
067import org.junit.rules.RuleChain;
068import org.junit.rules.TestName;
069import org.junit.rules.TestRule;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073@Category({ LargeTests.class, ClientTests.class })
074public class TestResultScannerTracing {
075  private static final Logger LOG = LoggerFactory.getLogger(TestResultScannerTracing.class);
076
077  @ClassRule
078  public static final HBaseClassTestRule CLASS_RULE =
079    HBaseClassTestRule.forClass(TestResultScannerTracing.class);
080
081  private static final TableName TABLE_NAME =
082    TableName.valueOf(TestResultScannerTracing.class.getSimpleName());
083  private static final byte[] FAMILY = Bytes.toBytes("f");
084  private static final byte[] CQ = Bytes.toBytes("q");
085  private static final int COUNT = 1000;
086
087  private static final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create();
088  private static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder()
089    .setMiniClusterOption(StartMiniClusterOption.builder().numRegionServers(3).build()).build();
090
091  private static final ConnectionRule connectionRule =
092    ConnectionRule.createConnectionRule(miniClusterRule::createConnection);
093
094  private static final class Setup extends ExternalResource {
095
096    private Connection conn;
097
098    @Override
099    protected void before() throws Throwable {
100      final HBaseTestingUtility testUtil = miniClusterRule.getTestingUtility();
101      conn = testUtil.getConnection();
102
103      byte[][] splitKeys = new byte[8][];
104      for (int i = 111; i < 999; i += 111) {
105        splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
106      }
107      testUtil.createTable(TABLE_NAME, FAMILY, splitKeys);
108      testUtil.waitTableAvailable(TABLE_NAME);
109      try (final Table table = conn.getTable(TABLE_NAME)) {
110        table.put(
111          IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
112            .addColumn(FAMILY, CQ, Bytes.toBytes(i))).collect(Collectors.toList()));
113      }
114    }
115
116    @Override
117    protected void after() {
118      try (Admin admin = conn.getAdmin()) {
119        if (!admin.tableExists(TABLE_NAME)) {
120          return;
121        }
122        admin.disableTable(TABLE_NAME);
123        admin.deleteTable(TABLE_NAME);
124      } catch (IOException e) {
125        throw new RuntimeException(e);
126      }
127    }
128  }
129
130  @ClassRule
131  public static final TestRule classRule = RuleChain.outerRule(otelClassRule)
132    .around(miniClusterRule).around(connectionRule).around(new Setup());
133
134  @Rule
135  public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(otelClassRule);
136
137  @Rule
138  public final TestName testName = new TestName();
139
140  @Before
141  public void before() throws Exception {
142    final Connection conn = connectionRule.getConnection();
143    try (final RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) {
144      locator.clearRegionLocationCache();
145    }
146  }
147
148  private static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) {
149    final Configuration conf = miniClusterRule.getTestingUtility().getConfiguration();
150    Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
151      "Span for test failed to complete.", otelClassRule::getSpans, hasItem(parentSpanMatcher)));
152  }
153
154  private Scan buildDefaultScan() {
155    return new Scan().withStartRow(Bytes.toBytes(String.format("%03d", 1)))
156      .withStopRow(Bytes.toBytes(String.format("%03d", 998)));
157  }
158
159  private void assertDefaultScan(final Scan scan) {
160    assertThat(scan.isReversed(), is(false));
161    assertThat(scan.isAsyncPrefetch(), nullValue());
162  }
163
164  private Scan buildAsyncPrefetchScan() {
165    return new Scan().withStartRow(Bytes.toBytes(String.format("%03d", 1)))
166      .withStopRow(Bytes.toBytes(String.format("%03d", 998))).setAsyncPrefetch(true);
167  }
168
169  private void assertAsyncPrefetchScan(final Scan scan) {
170    assertThat(scan.isReversed(), is(false));
171    assertThat(scan.isAsyncPrefetch(), is(true));
172  }
173
174  private Scan buildReversedScan() {
175    return new Scan().withStartRow(Bytes.toBytes(String.format("%03d", 998)))
176      .withStopRow(Bytes.toBytes(String.format("%03d", 1))).setReversed(true);
177  }
178
179  private void assertReversedScan(final Scan scan) {
180    assertThat(scan.isReversed(), is(true));
181    assertThat(scan.isAsyncPrefetch(), nullValue());
182  }
183
184  private void doScan(final Supplier<Scan> spanSupplier, final Consumer<Scan> scanAssertions)
185    throws Exception {
186    final Connection conn = connectionRule.getConnection();
187    final Scan scan = spanSupplier.get();
188    scanAssertions.accept(scan);
189    try (final Table table = conn.getTable(TABLE_NAME);
190      final ResultScanner scanner = table.getScanner(scan)) {
191      final List<Result> results = new ArrayList<>(COUNT);
192      scanner.forEach(results::add);
193      assertThat(results, not(emptyIterable()));
194    }
195  }
196
197  @Test
198  public void testNormalScan() throws Exception {
199    TraceUtil.trace(() -> doScan(this::buildDefaultScan, this::assertDefaultScan),
200      testName.getMethodName());
201
202    final String parentSpanName = testName.getMethodName();
203    final Matcher<SpanData> parentSpanMatcher =
204      allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
205    waitForSpan(parentSpanMatcher);
206
207    final List<SpanData> spans =
208      otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
209    if (LOG.isDebugEnabled()) {
210      StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
211      stringTraceRenderer.render(LOG::debug);
212    }
213
214    final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
215      .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
216
217    final Matcher<SpanData> scanOperationSpanMatcher =
218      allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
219        hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
220    assertThat(spans, hasItem(scanOperationSpanMatcher));
221    final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
222      .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
223
224    final Matcher<SpanData> childMetaScanSpanMatcher = allOf(hasName(startsWith("SCAN hbase:meta")),
225      hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
226    assertThat("expected a scan of hbase:meta", spans, hasItem(childMetaScanSpanMatcher));
227  }
228
229  @Test
230  public void testAsyncPrefetchScan() throws Exception {
231    TraceUtil.trace(() -> doScan(this::buildAsyncPrefetchScan, this::assertAsyncPrefetchScan),
232      testName.getMethodName());
233
234    final String parentSpanName = testName.getMethodName();
235    final Matcher<SpanData> parentSpanMatcher =
236      allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
237    waitForSpan(parentSpanMatcher);
238
239    final List<SpanData> spans =
240      otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
241    if (LOG.isDebugEnabled()) {
242      StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
243      stringTraceRenderer.render(LOG::debug);
244    }
245
246    final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
247      .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
248
249    final Matcher<SpanData> scanOperationSpanMatcher =
250      allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
251        hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
252    assertThat(spans, hasItem(scanOperationSpanMatcher));
253    final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
254      .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
255
256    final Matcher<SpanData> childMetaScanSpanMatcher = allOf(hasName(startsWith("SCAN hbase:meta")),
257      hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
258    assertThat("expected a scan of hbase:meta", spans, hasItem(childMetaScanSpanMatcher));
259  }
260
261  @Test
262  public void testReversedScan() throws Exception {
263    TraceUtil.trace(() -> doScan(this::buildReversedScan, this::assertReversedScan),
264      testName.getMethodName());
265
266    final String parentSpanName = testName.getMethodName();
267    final Matcher<SpanData> parentSpanMatcher =
268      allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
269    waitForSpan(parentSpanMatcher);
270
271    final List<SpanData> spans =
272      otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
273    if (LOG.isDebugEnabled()) {
274      StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
275      stringTraceRenderer.render(LOG::debug);
276    }
277
278    final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
279      .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
280
281    final Matcher<SpanData> scanOperationSpanMatcher =
282      allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
283        hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
284    assertThat(spans, hasItem(scanOperationSpanMatcher));
285    final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
286      .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
287
288    final Matcher<SpanData> childMetaScanSpanMatcher = allOf(hasName(startsWith("SCAN hbase:meta")),
289      hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
290    assertThat("expected a scan of hbase:meta", spans, hasItem(childMetaScanSpanMatcher));
291  }
292}