001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.hamcrest.Matchers.allOf;
023import static org.hamcrest.Matchers.anyOf;
024import static org.hamcrest.Matchers.containsString;
025import static org.hamcrest.Matchers.endsWith;
026import static org.hamcrest.Matchers.hasItem;
027import static org.hamcrest.Matchers.hasProperty;
028import static org.hamcrest.Matchers.is;
029import static org.hamcrest.Matchers.isA;
030import static org.junit.Assert.assertEquals;
031import static org.junit.Assert.assertThrows;
032import static org.junit.Assert.fail;
033
034import io.opentelemetry.sdk.trace.data.SpanData;
035import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
036import java.io.IOException;
037import java.io.UncheckedIOException;
038import java.util.Arrays;
039import java.util.List;
040import java.util.Objects;
041import java.util.concurrent.ExecutionException;
042import java.util.concurrent.ForkJoinPool;
043import java.util.concurrent.TimeUnit;
044import java.util.function.Supplier;
045import java.util.stream.Collectors;
046import java.util.stream.IntStream;
047import java.util.stream.Stream;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.hbase.ConnectionRule;
050import org.apache.hadoop.hbase.HBaseTestingUtil;
051import org.apache.hadoop.hbase.MatcherPredicate;
052import org.apache.hadoop.hbase.MiniClusterRule;
053import org.apache.hadoop.hbase.StartTestingClusterOption;
054import org.apache.hadoop.hbase.TableName;
055import org.apache.hadoop.hbase.Waiter;
056import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
057import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
058import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule;
059import org.apache.hadoop.hbase.trace.OpenTelemetryTestRule;
060import org.apache.hadoop.hbase.trace.TraceUtil;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.apache.hadoop.hbase.util.JVMClusterUtil;
063import org.apache.hadoop.hbase.util.Pair;
064import org.hamcrest.Matcher;
065import org.junit.ClassRule;
066import org.junit.Rule;
067import org.junit.Test;
068import org.junit.rules.ExternalResource;
069import org.junit.rules.RuleChain;
070import org.junit.rules.TestName;
071import org.junit.rules.TestRule;
072
073public abstract class AbstractTestAsyncTableScan {
074
075  protected static final OpenTelemetryClassRule OTEL_CLASS_RULE = OpenTelemetryClassRule.create();
076  protected static final MiniClusterRule MINI_CLUSTER_RULE = MiniClusterRule.newBuilder()
077    .setMiniClusterOption(StartTestingClusterOption.builder().numWorkers(3).build()).build();
078
079  protected static final ConnectionRule CONN_RULE =
080    ConnectionRule.createAsyncConnectionRule(MINI_CLUSTER_RULE::createAsyncConnection);
081
082  private static final class Setup extends ExternalResource {
083    @Override
084    protected void before() throws Throwable {
085      final HBaseTestingUtil testingUtil = MINI_CLUSTER_RULE.getTestingUtility();
086      final AsyncConnection conn = CONN_RULE.getAsyncConnection();
087
088      byte[][] splitKeys = new byte[8][];
089      for (int i = 111; i < 999; i += 111) {
090        splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
091      }
092      testingUtil.createTable(TABLE_NAME, FAMILY, splitKeys);
093      testingUtil.waitTableAvailable(TABLE_NAME);
094      conn.getTable(TABLE_NAME)
095        .putAll(IntStream.range(0, COUNT)
096          .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
097            .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))
098          .collect(Collectors.toList()))
099        .get();
100    }
101  }
102
103  @ClassRule
104  public static final TestRule classRule = RuleChain.outerRule(OTEL_CLASS_RULE)
105    .around(MINI_CLUSTER_RULE).around(CONN_RULE).around(new Setup());
106
107  @Rule
108  public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(OTEL_CLASS_RULE);
109
110  @Rule
111  public final TestName testName = new TestName();
112
113  protected static TableName TABLE_NAME = TableName.valueOf("async");
114
115  protected static byte[] FAMILY = Bytes.toBytes("cf");
116
117  protected static byte[] CQ1 = Bytes.toBytes("cq1");
118
119  protected static byte[] CQ2 = Bytes.toBytes("cq2");
120
121  protected static int COUNT = 1000;
122
123  private static Scan createNormalScan() {
124    return new Scan();
125  }
126
127  private static Scan createBatchScan() {
128    return new Scan().setBatch(1);
129  }
130
131  // set a small result size for testing flow control
132  private static Scan createSmallResultSizeScan() {
133    return new Scan().setMaxResultSize(1);
134  }
135
136  private static Scan createBatchSmallResultSizeScan() {
137    return new Scan().setBatch(1).setMaxResultSize(1);
138  }
139
140  private static AsyncTable<?> getRawTable() {
141    return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME);
142  }
143
144  private static AsyncTable<?> getTable() {
145    return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
146  }
147
148  private static List<Pair<String, Supplier<Scan>>> getScanCreator() {
149    return Arrays.asList(Pair.newPair("normal", AbstractTestAsyncTableScan::createNormalScan),
150      Pair.newPair("batch", AbstractTestAsyncTableScan::createBatchScan),
151      Pair.newPair("smallResultSize", AbstractTestAsyncTableScan::createSmallResultSizeScan),
152      Pair.newPair("batchSmallResultSize",
153        AbstractTestAsyncTableScan::createBatchSmallResultSizeScan));
154  }
155
156  protected static List<Object[]> getScanCreatorParams() {
157    return getScanCreator().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() })
158      .collect(Collectors.toList());
159  }
160
161  private static List<Pair<String, Supplier<AsyncTable<?>>>> getTableCreator() {
162    return Arrays.asList(Pair.newPair("raw", AbstractTestAsyncTableScan::getRawTable),
163      Pair.newPair("normal", AbstractTestAsyncTableScan::getTable));
164  }
165
166  protected static List<Object[]> getTableAndScanCreatorParams() {
167    List<Pair<String, Supplier<AsyncTable<?>>>> tableCreator = getTableCreator();
168    List<Pair<String, Supplier<Scan>>> scanCreator = getScanCreator();
169    return tableCreator.stream()
170      .flatMap(tp -> scanCreator.stream()
171        .map(sp -> new Object[] { tp.getFirst(), tp.getSecond(), sp.getFirst(), sp.getSecond() }))
172      .collect(Collectors.toList());
173  }
174
175  protected abstract Scan createScan();
176
177  protected abstract List<Result> doScan(Scan scan, int closeAfter) throws Exception;
178
179  /**
180   * Used by implementation classes to assert the correctness of spans produced under test.
181   */
182  protected abstract void assertTraceContinuity();
183
184  /**
185   * Used by implementation classes to assert the correctness of spans having errors.
186   */
187  protected abstract void
188    assertTraceError(final Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher);
189
190  protected final List<Result> convertFromBatchResult(List<Result> results) {
191    assertEquals(0, results.size() % 2);
192    return IntStream.range(0, results.size() / 2).mapToObj(i -> {
193      try {
194        return Result
195          .createCompleteResult(Arrays.asList(results.get(2 * i), results.get(2 * i + 1)));
196      } catch (IOException e) {
197        throw new UncheckedIOException(e);
198      }
199    }).collect(Collectors.toList());
200  }
201
202  protected static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) {
203    final Configuration conf = MINI_CLUSTER_RULE.getTestingUtility().getConfiguration();
204    Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
205      "Span for test failed to complete.", OTEL_CLASS_RULE::getSpans, hasItem(parentSpanMatcher)));
206  }
207
208  protected static Stream<SpanData> spanStream() {
209    return OTEL_CLASS_RULE.getSpans().stream().filter(Objects::nonNull);
210  }
211
212  @Test
213  public void testScanAll() throws Exception {
214    List<Result> results = doScan(createScan(), -1);
215    // make sure all scanners are closed at RS side
216    MINI_CLUSTER_RULE.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream()
217      .map(JVMClusterUtil.RegionServerThread::getRegionServer).forEach(
218        rs -> assertEquals(
219          "The scanner count of " + rs.getServerName() + " is "
220            + rs.getRSRpcServices().getScannersCount(),
221          0, rs.getRSRpcServices().getScannersCount()));
222    assertEquals(COUNT, results.size());
223    IntStream.range(0, COUNT).forEach(i -> {
224      Result result = results.get(i);
225      assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
226      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1)));
227    });
228  }
229
230  private void assertResultEquals(Result result, int i) {
231    assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
232    assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1)));
233    assertEquals(i * i, Bytes.toInt(result.getValue(FAMILY, CQ2)));
234  }
235
236  @Test
237  public void testReversedScanAll() throws Exception {
238    List<Result> results =
239      TraceUtil.trace(() -> doScan(createScan().setReversed(true), -1), testName.getMethodName());
240    assertEquals(COUNT, results.size());
241    IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1));
242    assertTraceContinuity();
243  }
244
245  @Test
246  public void testScanNoStopKey() throws Exception {
247    int start = 345;
248    List<Result> results = TraceUtil.trace(
249      () -> doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1),
250      testName.getMethodName());
251    assertEquals(COUNT - start, results.size());
252    IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i));
253    assertTraceContinuity();
254  }
255
256  @Test
257  public void testReverseScanNoStopKey() throws Exception {
258    int start = 765;
259    final Scan scan =
260      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true);
261    List<Result> results = TraceUtil.trace(() -> doScan(scan, -1), testName.getMethodName());
262    assertEquals(start + 1, results.size());
263    IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
264    assertTraceContinuity();
265  }
266
267  @Test
268  public void testScanWrongColumnFamily() {
269    final Exception e = assertThrows(Exception.class,
270      () -> TraceUtil.trace(
271        () -> doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1),
272        testName.getMethodName()));
273    // hamcrest generic enforcement for `anyOf` is a pain; skip it
274    // but -- don't we always unwrap ExecutionExceptions -- bug?
275    if (e instanceof NoSuchColumnFamilyException) {
276      final NoSuchColumnFamilyException ex = (NoSuchColumnFamilyException) e;
277      assertThat(ex, isA(NoSuchColumnFamilyException.class));
278    } else if (e instanceof ExecutionException) {
279      final ExecutionException ex = (ExecutionException) e;
280      assertThat(ex, allOf(isA(ExecutionException.class),
281        hasProperty("cause", isA(NoSuchColumnFamilyException.class))));
282    } else {
283      fail("Found unexpected Exception " + e);
284    }
285    assertTraceError(anyOf(
286      containsEntry(is(SemanticAttributes.EXCEPTION_TYPE),
287        endsWith(NoSuchColumnFamilyException.class.getName())),
288      allOf(
289        containsEntry(is(SemanticAttributes.EXCEPTION_TYPE),
290          endsWith(RemoteWithExtrasException.class.getName())),
291        containsEntry(is(SemanticAttributes.EXCEPTION_MESSAGE),
292          containsString(NoSuchColumnFamilyException.class.getName())))));
293  }
294
295  private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
296    int limit) throws Exception {
297    testScan(start, startInclusive, stop, stopInclusive, limit, -1);
298  }
299
300  private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
301    int limit, int closeAfter) throws Exception {
302    Scan scan =
303      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
304        .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive);
305    if (limit > 0) {
306      scan.setLimit(limit);
307    }
308    List<Result> results = doScan(scan, closeAfter);
309    int actualStart = startInclusive ? start : start + 1;
310    int actualStop = stopInclusive ? stop + 1 : stop;
311    int count = actualStop - actualStart;
312    if (limit > 0) {
313      count = Math.min(count, limit);
314    }
315    if (closeAfter > 0) {
316      count = Math.min(count, closeAfter);
317    }
318    assertEquals(count, results.size());
319    IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i));
320  }
321
322  private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
323    int limit) throws Exception {
324    Scan scan =
325      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
326        .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true);
327    if (limit > 0) {
328      scan.setLimit(limit);
329    }
330    List<Result> results = doScan(scan, -1);
331    int actualStart = startInclusive ? start : start - 1;
332    int actualStop = stopInclusive ? stop - 1 : stop;
333    int count = actualStart - actualStop;
334    if (limit > 0) {
335      count = Math.min(count, limit);
336    }
337    assertEquals(count, results.size());
338    IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i));
339  }
340
341  @Test
342  public void testScanWithStartKeyAndStopKey() throws Exception {
343    testScan(1, true, 998, false, -1); // from first region to last region
344    testScan(123, true, 345, true, -1);
345    testScan(234, true, 456, false, -1);
346    testScan(345, false, 567, true, -1);
347    testScan(456, false, 678, false, -1);
348  }
349
350  @Test
351  public void testReversedScanWithStartKeyAndStopKey() throws Exception {
352    testReversedScan(998, true, 1, false, -1); // from last region to first region
353    testReversedScan(543, true, 321, true, -1);
354    testReversedScan(654, true, 432, false, -1);
355    testReversedScan(765, false, 543, true, -1);
356    testReversedScan(876, false, 654, false, -1);
357  }
358
359  @Test
360  public void testScanAtRegionBoundary() throws Exception {
361    testScan(222, true, 333, true, -1);
362    testScan(333, true, 444, false, -1);
363    testScan(444, false, 555, true, -1);
364    testScan(555, false, 666, false, -1);
365  }
366
367  @Test
368  public void testReversedScanAtRegionBoundary() throws Exception {
369    testReversedScan(333, true, 222, true, -1);
370    testReversedScan(444, true, 333, false, -1);
371    testReversedScan(555, false, 444, true, -1);
372    testReversedScan(666, false, 555, false, -1);
373  }
374
375  @Test
376  public void testScanWithLimit() throws Exception {
377    testScan(1, true, 998, false, 900); // from first region to last region
378    testScan(123, true, 234, true, 100);
379    testScan(234, true, 456, false, 100);
380    testScan(345, false, 567, true, 100);
381    testScan(456, false, 678, false, 100);
382  }
383
384  @Test
385  public void testScanWithLimitGreaterThanActualCount() throws Exception {
386    testScan(1, true, 998, false, 1000); // from first region to last region
387    testScan(123, true, 345, true, 200);
388    testScan(234, true, 456, false, 200);
389    testScan(345, false, 567, true, 200);
390    testScan(456, false, 678, false, 200);
391  }
392
393  @Test
394  public void testReversedScanWithLimit() throws Exception {
395    testReversedScan(998, true, 1, false, 900); // from last region to first region
396    testReversedScan(543, true, 321, true, 100);
397    testReversedScan(654, true, 432, false, 100);
398    testReversedScan(765, false, 543, true, 100);
399    testReversedScan(876, false, 654, false, 100);
400  }
401
402  @Test
403  public void testReversedScanWithLimitGreaterThanActualCount() throws Exception {
404    testReversedScan(998, true, 1, false, 1000); // from last region to first region
405    testReversedScan(543, true, 321, true, 200);
406    testReversedScan(654, true, 432, false, 200);
407    testReversedScan(765, false, 543, true, 200);
408    testReversedScan(876, false, 654, false, 200);
409  }
410
411  @Test
412  public void testScanEndingEarly() throws Exception {
413    testScan(1, true, 998, false, 0, 900); // from first region to last region
414    testScan(123, true, 234, true, 0, 100);
415    testScan(234, true, 456, false, 0, 100);
416    testScan(345, false, 567, true, 0, 100);
417    testScan(456, false, 678, false, 0, 100);
418  }
419}