001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.hamcrest.Matchers.allOf;
023import static org.hamcrest.Matchers.anyOf;
024import static org.hamcrest.Matchers.containsString;
025import static org.hamcrest.Matchers.endsWith;
026import static org.hamcrest.Matchers.hasItem;
027import static org.hamcrest.Matchers.hasProperty;
028import static org.hamcrest.Matchers.is;
029import static org.hamcrest.Matchers.isA;
030import static org.junit.Assert.assertEquals;
031import static org.junit.Assert.assertThrows;
032import static org.junit.Assert.fail;
033
034import io.opentelemetry.sdk.trace.data.SpanData;
035import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
036import java.io.IOException;
037import java.io.UncheckedIOException;
038import java.util.Arrays;
039import java.util.List;
040import java.util.concurrent.ExecutionException;
041import java.util.concurrent.ForkJoinPool;
042import java.util.concurrent.TimeUnit;
043import java.util.function.Supplier;
044import java.util.stream.Collectors;
045import java.util.stream.IntStream;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.hbase.ConnectionRule;
048import org.apache.hadoop.hbase.HBaseTestingUtility;
049import org.apache.hadoop.hbase.MatcherPredicate;
050import org.apache.hadoop.hbase.MiniClusterRule;
051import org.apache.hadoop.hbase.StartMiniClusterOption;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.Waiter;
054import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
055import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
056import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule;
057import org.apache.hadoop.hbase.trace.OpenTelemetryTestRule;
058import org.apache.hadoop.hbase.trace.TraceUtil;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.JVMClusterUtil;
061import org.apache.hadoop.hbase.util.Pair;
062import org.hamcrest.Matcher;
063import org.junit.ClassRule;
064import org.junit.Rule;
065import org.junit.Test;
066import org.junit.rules.ExternalResource;
067import org.junit.rules.RuleChain;
068import org.junit.rules.TestName;
069import org.junit.rules.TestRule;
070
071public abstract class AbstractTestAsyncTableScan {
072
073  protected static final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create();
074  protected static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder()
075    .setMiniClusterOption(StartMiniClusterOption.builder().numWorkers(3).build()).build();
076
077  protected static final ConnectionRule connectionRule =
078    ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
079
080  private static final class Setup extends ExternalResource {
081    @Override
082    protected void before() throws Throwable {
083      final HBaseTestingUtility testingUtil = miniClusterRule.getTestingUtility();
084      final AsyncConnection conn = connectionRule.getAsyncConnection();
085
086      byte[][] splitKeys = new byte[8][];
087      for (int i = 111; i < 999; i += 111) {
088        splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
089      }
090      testingUtil.createTable(TABLE_NAME, FAMILY, splitKeys);
091      testingUtil.waitTableAvailable(TABLE_NAME);
092      conn.getTable(TABLE_NAME)
093        .putAll(IntStream.range(0, COUNT)
094          .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
095            .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))
096          .collect(Collectors.toList()))
097        .get();
098    }
099  }
100
101  @ClassRule
102  public static final TestRule classRule = RuleChain.outerRule(otelClassRule)
103    .around(miniClusterRule).around(connectionRule).around(new Setup());
104
105  @Rule
106  public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(otelClassRule);
107
108  @Rule
109  public final TestName testName = new TestName();
110
111  protected static TableName TABLE_NAME = TableName.valueOf("async");
112
113  protected static byte[] FAMILY = Bytes.toBytes("cf");
114
115  protected static byte[] CQ1 = Bytes.toBytes("cq1");
116
117  protected static byte[] CQ2 = Bytes.toBytes("cq2");
118
119  protected static int COUNT = 1000;
120
121  private static Scan createNormalScan() {
122    return new Scan();
123  }
124
125  private static Scan createBatchScan() {
126    return new Scan().setBatch(1);
127  }
128
129  // set a small result size for testing flow control
130  private static Scan createSmallResultSizeScan() {
131    return new Scan().setMaxResultSize(1);
132  }
133
134  private static Scan createBatchSmallResultSizeScan() {
135    return new Scan().setBatch(1).setMaxResultSize(1);
136  }
137
138  private static AsyncTable<?> getRawTable() {
139    return connectionRule.getAsyncConnection().getTable(TABLE_NAME);
140  }
141
142  private static AsyncTable<?> getTable() {
143    return connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
144  }
145
146  private static List<Pair<String, Supplier<Scan>>> getScanCreator() {
147    return Arrays.asList(Pair.newPair("normal", AbstractTestAsyncTableScan::createNormalScan),
148      Pair.newPair("batch", AbstractTestAsyncTableScan::createBatchScan),
149      Pair.newPair("smallResultSize", AbstractTestAsyncTableScan::createSmallResultSizeScan),
150      Pair.newPair("batchSmallResultSize",
151        AbstractTestAsyncTableScan::createBatchSmallResultSizeScan));
152  }
153
154  protected static List<Object[]> getScanCreatorParams() {
155    return getScanCreator().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() })
156      .collect(Collectors.toList());
157  }
158
159  private static List<Pair<String, Supplier<AsyncTable<?>>>> getTableCreator() {
160    return Arrays.asList(Pair.newPair("raw", AbstractTestAsyncTableScan::getRawTable),
161      Pair.newPair("normal", AbstractTestAsyncTableScan::getTable));
162  }
163
164  protected static List<Object[]> getTableAndScanCreatorParams() {
165    List<Pair<String, Supplier<AsyncTable<?>>>> tableCreator = getTableCreator();
166    List<Pair<String, Supplier<Scan>>> scanCreator = getScanCreator();
167    return tableCreator.stream()
168      .flatMap(tp -> scanCreator.stream()
169        .map(sp -> new Object[] { tp.getFirst(), tp.getSecond(), sp.getFirst(), sp.getSecond() }))
170      .collect(Collectors.toList());
171  }
172
173  protected abstract Scan createScan();
174
175  protected abstract List<Result> doScan(Scan scan, int closeAfter) throws Exception;
176
177  /**
178   * Used by implementation classes to assert the correctness of spans produced under test.
179   */
180  protected abstract void assertTraceContinuity();
181
182  /**
183   * Used by implementation classes to assert the correctness of spans having errors.
184   */
185  protected abstract void
186    assertTraceError(final Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher);
187
188  protected final List<Result> convertFromBatchResult(List<Result> results) {
189    assertEquals(0, results.size() % 2);
190    return IntStream.range(0, results.size() / 2).mapToObj(i -> {
191      try {
192        return Result
193          .createCompleteResult(Arrays.asList(results.get(2 * i), results.get(2 * i + 1)));
194      } catch (IOException e) {
195        throw new UncheckedIOException(e);
196      }
197    }).collect(Collectors.toList());
198  }
199
200  protected static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) {
201    final Configuration conf = miniClusterRule.getTestingUtility().getConfiguration();
202    Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
203      "Span for test failed to complete.", otelClassRule::getSpans, hasItem(parentSpanMatcher)));
204  }
205
206  @Test
207  public void testScanAll() throws Exception {
208    List<Result> results = doScan(createScan(), -1);
209    // make sure all scanners are closed at RS side
210    miniClusterRule.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream()
211      .map(JVMClusterUtil.RegionServerThread::getRegionServer).forEach(
212        rs -> assertEquals(
213          "The scanner count of " + rs.getServerName() + " is "
214            + rs.getRSRpcServices().getScannersCount(),
215          0, rs.getRSRpcServices().getScannersCount()));
216    assertEquals(COUNT, results.size());
217    IntStream.range(0, COUNT).forEach(i -> {
218      Result result = results.get(i);
219      assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
220      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1)));
221    });
222  }
223
224  private void assertResultEquals(Result result, int i) {
225    assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
226    assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1)));
227    assertEquals(i * i, Bytes.toInt(result.getValue(FAMILY, CQ2)));
228  }
229
230  @Test
231  public void testReversedScanAll() throws Exception {
232    List<Result> results =
233      TraceUtil.trace(() -> doScan(createScan().setReversed(true), -1), testName.getMethodName());
234    assertEquals(COUNT, results.size());
235    IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1));
236    assertTraceContinuity();
237  }
238
239  @Test
240  public void testScanNoStopKey() throws Exception {
241    int start = 345;
242    List<Result> results = TraceUtil.trace(
243      () -> doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1),
244      testName.getMethodName());
245    assertEquals(COUNT - start, results.size());
246    IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i));
247    assertTraceContinuity();
248  }
249
250  @Test
251  public void testReverseScanNoStopKey() throws Exception {
252    int start = 765;
253    final Scan scan =
254      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true);
255    List<Result> results = TraceUtil.trace(() -> doScan(scan, -1), testName.getMethodName());
256    assertEquals(start + 1, results.size());
257    IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
258    assertTraceContinuity();
259  }
260
261  @Test
262  public void testScanWrongColumnFamily() {
263    final Exception e = assertThrows(Exception.class,
264      () -> TraceUtil.trace(
265        () -> doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1),
266        testName.getMethodName()));
267    // hamcrest generic enforcement for `anyOf` is a pain; skip it
268    // but -- don't we always unwrap ExecutionExceptions -- bug?
269    if (e instanceof NoSuchColumnFamilyException) {
270      final NoSuchColumnFamilyException ex = (NoSuchColumnFamilyException) e;
271      assertThat(ex, isA(NoSuchColumnFamilyException.class));
272    } else if (e instanceof ExecutionException) {
273      final ExecutionException ex = (ExecutionException) e;
274      assertThat(ex, allOf(isA(ExecutionException.class),
275        hasProperty("cause", isA(NoSuchColumnFamilyException.class))));
276    } else {
277      fail("Found unexpected Exception " + e);
278    }
279    assertTraceError(anyOf(
280      containsEntry(is(SemanticAttributes.EXCEPTION_TYPE),
281        endsWith(NoSuchColumnFamilyException.class.getName())),
282      allOf(
283        containsEntry(is(SemanticAttributes.EXCEPTION_TYPE),
284          endsWith(RemoteWithExtrasException.class.getName())),
285        containsEntry(is(SemanticAttributes.EXCEPTION_MESSAGE),
286          containsString(NoSuchColumnFamilyException.class.getName())))));
287  }
288
289  private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
290    int limit) throws Exception {
291    testScan(start, startInclusive, stop, stopInclusive, limit, -1);
292  }
293
294  private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
295    int limit, int closeAfter) throws Exception {
296    Scan scan =
297      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
298        .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive);
299    if (limit > 0) {
300      scan.setLimit(limit);
301    }
302    List<Result> results = doScan(scan, closeAfter);
303    int actualStart = startInclusive ? start : start + 1;
304    int actualStop = stopInclusive ? stop + 1 : stop;
305    int count = actualStop - actualStart;
306    if (limit > 0) {
307      count = Math.min(count, limit);
308    }
309    if (closeAfter > 0) {
310      count = Math.min(count, closeAfter);
311    }
312    assertEquals(count, results.size());
313    IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i));
314  }
315
316  private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
317    int limit) throws Exception {
318    Scan scan =
319      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
320        .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true);
321    if (limit > 0) {
322      scan.setLimit(limit);
323    }
324    List<Result> results = doScan(scan, -1);
325    int actualStart = startInclusive ? start : start - 1;
326    int actualStop = stopInclusive ? stop - 1 : stop;
327    int count = actualStart - actualStop;
328    if (limit > 0) {
329      count = Math.min(count, limit);
330    }
331    assertEquals(count, results.size());
332    IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i));
333  }
334
335  @Test
336  public void testScanWithStartKeyAndStopKey() throws Exception {
337    testScan(1, true, 998, false, -1); // from first region to last region
338    testScan(123, true, 345, true, -1);
339    testScan(234, true, 456, false, -1);
340    testScan(345, false, 567, true, -1);
341    testScan(456, false, 678, false, -1);
342  }
343
344  @Test
345  public void testReversedScanWithStartKeyAndStopKey() throws Exception {
346    testReversedScan(998, true, 1, false, -1); // from last region to first region
347    testReversedScan(543, true, 321, true, -1);
348    testReversedScan(654, true, 432, false, -1);
349    testReversedScan(765, false, 543, true, -1);
350    testReversedScan(876, false, 654, false, -1);
351  }
352
353  @Test
354  public void testScanAtRegionBoundary() throws Exception {
355    testScan(222, true, 333, true, -1);
356    testScan(333, true, 444, false, -1);
357    testScan(444, false, 555, true, -1);
358    testScan(555, false, 666, false, -1);
359  }
360
361  @Test
362  public void testReversedScanAtRegionBoundary() throws Exception {
363    testReversedScan(333, true, 222, true, -1);
364    testReversedScan(444, true, 333, false, -1);
365    testReversedScan(555, false, 444, true, -1);
366    testReversedScan(666, false, 555, false, -1);
367  }
368
369  @Test
370  public void testScanWithLimit() throws Exception {
371    testScan(1, true, 998, false, 900); // from first region to last region
372    testScan(123, true, 234, true, 100);
373    testScan(234, true, 456, false, 100);
374    testScan(345, false, 567, true, 100);
375    testScan(456, false, 678, false, 100);
376  }
377
378  @Test
379  public void testScanWithLimitGreaterThanActualCount() throws Exception {
380    testScan(1, true, 998, false, 1000); // from first region to last region
381    testScan(123, true, 345, true, 200);
382    testScan(234, true, 456, false, 200);
383    testScan(345, false, 567, true, 200);
384    testScan(456, false, 678, false, 200);
385  }
386
387  @Test
388  public void testReversedScanWithLimit() throws Exception {
389    testReversedScan(998, true, 1, false, 900); // from last region to first region
390    testReversedScan(543, true, 321, true, 100);
391    testReversedScan(654, true, 432, false, 100);
392    testReversedScan(765, false, 543, true, 100);
393    testReversedScan(876, false, 654, false, 100);
394  }
395
396  @Test
397  public void testReversedScanWithLimitGreaterThanActualCount() throws Exception {
398    testReversedScan(998, true, 1, false, 1000); // from last region to first region
399    testReversedScan(543, true, 321, true, 200);
400    testReversedScan(654, true, 432, false, 200);
401    testReversedScan(765, false, 543, true, 200);
402    testReversedScan(876, false, 654, false, 200);
403  }
404
405  @Test
406  public void testScanEndingEarly() throws Exception {
407    testScan(1, true, 998, false, 0, 900); // from first region to last region
408    testScan(123, true, 234, true, 0, 100);
409    testScan(234, true, 456, false, 0, 100);
410    testScan(345, false, 567, true, 0, 100);
411    testScan(456, false, 678, false, 0, 100);
412  }
413}