001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY;
021import static org.apache.hadoop.hbase.client.TestFromClientSide3.generateHugeValue;
022import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS;
023import static org.junit.Assert.assertArrayEquals;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertNotNull;
027import static org.junit.Assert.assertNull;
028import static org.junit.Assert.assertTrue;
029import static org.junit.Assert.fail;
030
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Collection;
035import java.util.List;
036import java.util.concurrent.TimeUnit;
037import java.util.function.Consumer;
038import java.util.stream.IntStream;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.Cell;
041import org.apache.hadoop.hbase.CompareOperator;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseTestingUtility;
044import org.apache.hadoop.hbase.HColumnDescriptor;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.HRegionInfo;
047import org.apache.hadoop.hbase.HRegionLocation;
048import org.apache.hadoop.hbase.HTestConst;
049import org.apache.hadoop.hbase.KeyValue;
050import org.apache.hadoop.hbase.MiniHBaseCluster;
051import org.apache.hadoop.hbase.StartMiniClusterOption;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.TableNameTestRule;
054import org.apache.hadoop.hbase.TableNotFoundException;
055import org.apache.hadoop.hbase.exceptions.DeserializationException;
056import org.apache.hadoop.hbase.filter.BinaryComparator;
057import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
058import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
059import org.apache.hadoop.hbase.filter.FilterBase;
060import org.apache.hadoop.hbase.filter.QualifierFilter;
061import org.apache.hadoop.hbase.regionserver.HRegionServer;
062import org.apache.hadoop.hbase.testclassification.ClientTests;
063import org.apache.hadoop.hbase.testclassification.MediumTests;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
066import org.junit.AfterClass;
067import org.junit.Before;
068import org.junit.ClassRule;
069import org.junit.Rule;
070import org.junit.Test;
071import org.junit.experimental.categories.Category;
072import org.junit.runner.RunWith;
073import org.junit.runners.Parameterized;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
078
079/**
080 * A client-side test, mostly testing scanners with various parameters. Parameterized on different
081 * registry implementations.
082 */
083@Category({ MediumTests.class, ClientTests.class })
084@RunWith(Parameterized.class)
085public class TestScannersFromClientSide {
086
087  @ClassRule
088  public static final HBaseClassTestRule CLASS_RULE =
089    HBaseClassTestRule.forClass(TestScannersFromClientSide.class);
090
091  private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class);
092
093  private static HBaseTestingUtility TEST_UTIL;
094  private static byte[] ROW = Bytes.toBytes("testRow");
095  private static byte[] FAMILY = Bytes.toBytes("testFamily");
096  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
097  private static byte[] VALUE = Bytes.toBytes("testValue");
098
099  @Rule
100  public TableNameTestRule name = new TableNameTestRule();
101
102  /**
103   * @throws java.lang.Exception
104   */
105  @AfterClass
106  public static void tearDownAfterClass() throws Exception {
107    if (TEST_UTIL != null) {
108      TEST_UTIL.shutdownMiniCluster();
109    }
110  }
111
112  /**
113   * @throws java.lang.Exception
114   */
115  @Before
116  public void setUp() throws Exception {
117    // Nothing to do.
118  }
119
120  @Parameterized.Parameters
121  public static Collection<Object[]> parameters() {
122    return Arrays.asList(new Object[][] { { MasterRegistry.class, 1 }, { MasterRegistry.class, 2 },
123      { ZKConnectionRegistry.class, 1 } });
124  }
125
126  /**
127   * JUnit does not provide an easy way to run a hook after each parameterized run. Without that
128   * there is no easy way to restart the test cluster after each parameterized run. Annotation
129   * BeforeParam does not work either because it runs before parameterization and hence does not
130   * have access to the test parameters (which is weird). This *hack* checks if the current instance
131   * of test cluster configuration has the passed parameterized configs. In such a case, we can just
132   * reuse the cluster for test and do not need to initialize from scratch. While this is a hack, it
133   * saves a ton of time for the full test and de-flakes it.
134   */
135  private static boolean isSameParameterizedCluster(Class<?> registryImpl, int numHedgedReqs) {
136    // initialize() is called for every unit test, however we only want to reset the cluster state
137    // at the end of every parameterized run.
138    if (TEST_UTIL == null) {
139      return false;
140    }
141    Configuration conf = TEST_UTIL.getConfiguration();
142    Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
143      ZKConnectionRegistry.class);
144    int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
145      AbstractRpcBasedConnectionRegistry.HEDGED_REQS_FANOUT_DEFAULT);
146    return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
147  }
148
149  public TestScannersFromClientSide(Class<?> registryImpl, int numHedgedReqs) throws Exception {
150    if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) {
151      return;
152    }
153    if (TEST_UTIL != null) {
154      // We reached the end of a parameterized run, clean up the cluster.
155      TEST_UTIL.shutdownMiniCluster();
156    }
157    TEST_UTIL = new HBaseTestingUtility();
158    Configuration conf = TEST_UTIL.getConfiguration();
159    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
160    conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl,
161      ConnectionRegistry.class);
162    Preconditions.checkArgument(numHedgedReqs > 0);
163    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
164    StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
165    // Multiple masters needed only when hedged reads for master registry are enabled.
166    builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(3);
167    TEST_UTIL.startMiniCluster(builder.build());
168  }
169
170  @Test
171  public void testScanImmutable() throws IOException {
172    TableName tableName = name.getTableName();
173    Table table = TEST_UTIL.createTable(tableName, FAMILY);
174    TEST_UTIL.loadRandomRows(table, FAMILY, 100, 100);
175
176    Scan scan = new Scan().setCaching(-1).setMvccReadPoint(-1).setScanMetricsEnabled(true);
177
178    try (ResultScanner scanner = table.getScanner(scan)) {
179      scanner.next(1000);
180    }
181    // these 2 should be unchanged
182    assertEquals(-1, scan.getCaching());
183    assertEquals(-1, scan.getMvccReadPoint());
184    // scan metrics should be populated
185    assertNotNull(scan.getScanMetrics());
186    assertEquals(scan.getScanMetrics().countOfRegions.get(), 1);
187  }
188
189  /**
190   * Test from client side for batch of scan
191   */
192  @Test
193  public void testScanBatch() throws Exception {
194    final TableName tableName = name.getTableName();
195    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8);
196
197    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
198
199    Put put;
200    Scan scan;
201    Delete delete;
202    Result result;
203    ResultScanner scanner;
204    boolean toLog = true;
205    List<Cell> kvListExp;
206
207    // table: row, family, c0:0, c1:1, ... , c7:7
208    put = new Put(ROW);
209    for (int i = 0; i < QUALIFIERS.length; i++) {
210      KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE);
211      put.add(kv);
212    }
213    ht.put(put);
214
215    // table: row, family, c0:0, c1:1, ..., c6:2, c6:6 , c7:7
216    put = new Put(ROW);
217    KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[6], 2, VALUE);
218    put.add(kv);
219    ht.put(put);
220
221    // delete upto ts: 3
222    delete = new Delete(ROW);
223    delete.addFamily(FAMILY, 3);
224    ht.delete(delete);
225
226    // without batch
227    scan = new Scan().withStartRow(ROW);
228    scan.setMaxVersions();
229    scanner = ht.getScanner(scan);
230
231    // c4:4, c5:5, c6:6, c7:7
232    kvListExp = new ArrayList<>();
233    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE));
234    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
235    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE));
236    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
237    result = scanner.next();
238    verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
239
240    // with batch
241    scan = new Scan().withStartRow(ROW);
242    scan.setMaxVersions();
243    scan.setBatch(2);
244    scanner = ht.getScanner(scan);
245
246    // First batch: c4:4, c5:5
247    kvListExp = new ArrayList<>();
248    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE));
249    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
250    result = scanner.next();
251    verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
252
253    // Second batch: c6:6, c7:7
254    kvListExp = new ArrayList<>();
255    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE));
256    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
257    result = scanner.next();
258    verifyResult(result, kvListExp, toLog, "Testing second batch of scan");
259
260  }
261
262  @Test
263  public void testMaxResultSizeIsSetToDefault() throws Exception {
264    final TableName tableName = name.getTableName();
265    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
266
267    // The max result size we expect the scan to use by default.
268    long expectedMaxResultSize =
269      TEST_UTIL.getConfiguration().getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
270        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
271
272    int numRows = 5;
273    byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
274
275    int numQualifiers = 10;
276    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers);
277
278    // Specify the cell size such that a single row will be larger than the default
279    // value of maxResultSize. This means that Scan RPCs should return at most a single
280    // result back to the client.
281    int cellSize = (int) (expectedMaxResultSize / (numQualifiers - 1));
282    byte[] cellValue = Bytes.createMaxByteArray(cellSize);
283
284    Put put;
285    List<Put> puts = new ArrayList<>();
286    for (int row = 0; row < ROWS.length; row++) {
287      put = new Put(ROWS[row]);
288      for (int qual = 0; qual < QUALIFIERS.length; qual++) {
289        KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], cellValue);
290        put.add(kv);
291      }
292      puts.add(put);
293    }
294    ht.put(puts);
295
296    // Create a scan with the default configuration.
297    Scan scan = new Scan();
298
299    ResultScanner scanner = ht.getScanner(scan);
300    assertTrue(scanner instanceof ClientScanner);
301    ClientScanner clientScanner = (ClientScanner) scanner;
302
303    // Call next to issue a single RPC to the server
304    scanner.next();
305
306    // The scanner should have, at most, a single result in its cache. If there more results exists
307    // in the cache it means that more than the expected max result size was fetched.
308    assertTrue("The cache contains: " + clientScanner.getCacheSize() + " results",
309      clientScanner.getCacheSize() <= 1);
310  }
311
312  /**
313   * Scan on not existing table should throw the exception with correct message
314   */
315  @Test
316  public void testScannerForNotExistingTable() {
317    String[] tableNames = { "A", "Z", "A:A", "Z:Z" };
318    for (String tableName : tableNames) {
319      try {
320        Table table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName));
321        testSmallScan(table, true, 1, 5);
322        fail("TableNotFoundException was not thrown");
323      } catch (TableNotFoundException e) {
324        // We expect that the message for TableNotFoundException would have only the table name only
325        // Otherwise that would mean that localeRegionInMeta doesn't work properly
326        assertEquals(e.getMessage(), tableName);
327      } catch (Exception e) {
328        fail("Unexpected exception " + e.getMessage());
329      }
330    }
331  }
332
333  @Test
334  public void testSmallScan() throws Exception {
335    final TableName tableName = name.getTableName();
336
337    int numRows = 10;
338    byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
339
340    int numQualifiers = 10;
341    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers);
342
343    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
344
345    Put put;
346    List<Put> puts = new ArrayList<>();
347    for (int row = 0; row < ROWS.length; row++) {
348      put = new Put(ROWS[row]);
349      for (int qual = 0; qual < QUALIFIERS.length; qual++) {
350        KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], VALUE);
351        put.add(kv);
352      }
353      puts.add(put);
354    }
355    ht.put(puts);
356
357    int expectedRows = numRows;
358    int expectedCols = numRows * numQualifiers;
359
360    // Test normal and reversed
361    testSmallScan(ht, true, expectedRows, expectedCols);
362    testSmallScan(ht, false, expectedRows, expectedCols);
363  }
364
365  /**
366   * Run through a variety of test configurations with a small scan
367   */
368  private void testSmallScan(Table table, boolean reversed, int rows, int columns)
369    throws Exception {
370    Scan baseScan = new Scan();
371    baseScan.setReversed(reversed);
372    baseScan.setSmall(true);
373
374    Scan scan = new Scan(baseScan);
375    verifyExpectedCounts(table, scan, rows, columns);
376
377    scan = new Scan(baseScan);
378    scan.setMaxResultSize(1);
379    verifyExpectedCounts(table, scan, rows, columns);
380
381    scan = new Scan(baseScan);
382    scan.setMaxResultSize(1);
383    scan.setCaching(Integer.MAX_VALUE);
384    verifyExpectedCounts(table, scan, rows, columns);
385  }
386
387  private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount,
388    int expectedCellCount) throws Exception {
389    ResultScanner scanner = table.getScanner(scan);
390
391    int rowCount = 0;
392    int cellCount = 0;
393    Result r = null;
394    while ((r = scanner.next()) != null) {
395      rowCount++;
396      cellCount += r.rawCells().length;
397    }
398
399    assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount,
400      expectedRowCount == rowCount);
401    assertTrue("Expected cell count: " + expectedCellCount + " Actual cell count: " + cellCount,
402      expectedCellCount == cellCount);
403    scanner.close();
404  }
405
406  /**
407   * Test from client side for get with maxResultPerCF set
408   */
409  @Test
410  public void testGetMaxResults() throws Exception {
411    final TableName tableName = name.getTableName();
412    byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
413    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
414
415    Table ht = TEST_UTIL.createTable(tableName, FAMILIES);
416
417    Get get;
418    Put put;
419    Result result;
420    boolean toLog = true;
421    List<Cell> kvListExp;
422
423    kvListExp = new ArrayList<>();
424    // Insert one CF for row[0]
425    put = new Put(ROW);
426    for (int i = 0; i < 10; i++) {
427      KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
428      put.add(kv);
429      kvListExp.add(kv);
430    }
431    ht.put(put);
432
433    get = new Get(ROW);
434    result = ht.get(get);
435    verifyResult(result, kvListExp, toLog, "Testing without setting maxResults");
436
437    get = new Get(ROW);
438    get.setMaxResultsPerColumnFamily(2);
439    result = ht.get(get);
440    kvListExp = new ArrayList<>();
441    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[0], 1, VALUE));
442    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
443    verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults");
444
445    // Filters: ColumnRangeFilter
446    get = new Get(ROW);
447    get.setMaxResultsPerColumnFamily(5);
448    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true));
449    result = ht.get(get);
450    kvListExp = new ArrayList<>();
451    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE));
452    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
453    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
454    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
455    verifyResult(result, kvListExp, toLog, "Testing single CF with CRF");
456
457    // Insert two more CF for row[0]
458    // 20 columns for CF2, 10 columns for CF1
459    put = new Put(ROW);
460    for (int i = 0; i < QUALIFIERS.length; i++) {
461      KeyValue kv = new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE);
462      put.add(kv);
463    }
464    ht.put(put);
465
466    put = new Put(ROW);
467    for (int i = 0; i < 10; i++) {
468      KeyValue kv = new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE);
469      put.add(kv);
470    }
471    ht.put(put);
472
473    get = new Get(ROW);
474    get.setMaxResultsPerColumnFamily(12);
475    get.addFamily(FAMILIES[1]);
476    get.addFamily(FAMILIES[2]);
477    result = ht.get(get);
478    kvListExp = new ArrayList<>();
479    // Exp: CF1:q0, ..., q9, CF2: q0, q1, q10, q11, ..., q19
480    for (int i = 0; i < 10; i++) {
481      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
482    }
483    for (int i = 0; i < 2; i++) {
484      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
485    }
486    for (int i = 10; i < 20; i++) {
487      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
488    }
489    verifyResult(result, kvListExp, toLog, "Testing multiple CFs");
490
491    // Filters: ColumnRangeFilter and ColumnPrefixFilter
492    get = new Get(ROW);
493    get.setMaxResultsPerColumnFamily(3);
494    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, null, true));
495    result = ht.get(get);
496    kvListExp = new ArrayList<>();
497    for (int i = 2; i < 5; i++) {
498      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
499    }
500    for (int i = 2; i < 5; i++) {
501      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
502    }
503    for (int i = 2; i < 5; i++) {
504      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
505    }
506    verifyResult(result, kvListExp, toLog, "Testing multiple CFs + CRF");
507
508    get = new Get(ROW);
509    get.setMaxResultsPerColumnFamily(7);
510    get.setFilter(new ColumnPrefixFilter(QUALIFIERS[1]));
511    result = ht.get(get);
512    kvListExp = new ArrayList<>();
513    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
514    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[1], 1, VALUE));
515    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[1], 1, VALUE));
516    for (int i = 10; i < 16; i++) {
517      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
518    }
519    verifyResult(result, kvListExp, toLog, "Testing multiple CFs + PFF");
520
521  }
522
523  /**
524   * Test from client side for scan with maxResultPerCF set
525   */
526  @Test
527  public void testScanMaxResults() throws Exception {
528    final TableName tableName = name.getTableName();
529    byte[][] ROWS = HTestConst.makeNAscii(ROW, 2);
530    byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
531    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
532
533    Table ht = TEST_UTIL.createTable(tableName, FAMILIES);
534
535    Put put;
536    Scan scan;
537    Result result;
538    boolean toLog = true;
539    List<Cell> kvListExp, kvListScan;
540
541    kvListExp = new ArrayList<>();
542
543    for (int r = 0; r < ROWS.length; r++) {
544      put = new Put(ROWS[r]);
545      for (int c = 0; c < FAMILIES.length; c++) {
546        for (int q = 0; q < QUALIFIERS.length; q++) {
547          KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE);
548          put.add(kv);
549          if (q < 4) {
550            kvListExp.add(kv);
551          }
552        }
553      }
554      ht.put(put);
555    }
556
557    scan = new Scan();
558    scan.setMaxResultsPerColumnFamily(4);
559    ResultScanner scanner = ht.getScanner(scan);
560    kvListScan = new ArrayList<>();
561    while ((result = scanner.next()) != null) {
562      for (Cell kv : result.listCells()) {
563        kvListScan.add(kv);
564      }
565    }
566    result = Result.create(kvListScan);
567    verifyResult(result, kvListExp, toLog, "Testing scan with maxResults");
568
569  }
570
571  /**
572   * Test from client side for get with rowOffset
573   */
574  @Test
575  public void testGetRowOffset() throws Exception {
576    final TableName tableName = name.getTableName();
577    byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
578    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
579
580    Table ht = TEST_UTIL.createTable(tableName, FAMILIES);
581
582    Get get;
583    Put put;
584    Result result;
585    boolean toLog = true;
586    List<Cell> kvListExp;
587
588    // Insert one CF for row
589    kvListExp = new ArrayList<>();
590    put = new Put(ROW);
591    for (int i = 0; i < 10; i++) {
592      KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
593      put.add(kv);
594      // skipping first two kvs
595      if (i < 2) {
596        continue;
597      }
598      kvListExp.add(kv);
599    }
600    ht.put(put);
601
602    // setting offset to 2
603    get = new Get(ROW);
604    get.setRowOffsetPerColumnFamily(2);
605    result = ht.get(get);
606    verifyResult(result, kvListExp, toLog, "Testing basic setRowOffset");
607
608    // setting offset to 20
609    get = new Get(ROW);
610    get.setRowOffsetPerColumnFamily(20);
611    result = ht.get(get);
612    kvListExp = new ArrayList<>();
613    verifyResult(result, kvListExp, toLog, "Testing offset > #kvs");
614
615    // offset + maxResultPerCF
616    get = new Get(ROW);
617    get.setRowOffsetPerColumnFamily(4);
618    get.setMaxResultsPerColumnFamily(5);
619    result = ht.get(get);
620    kvListExp = new ArrayList<>();
621    for (int i = 4; i < 9; i++) {
622      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
623    }
624    verifyResult(result, kvListExp, toLog, "Testing offset + setMaxResultsPerCF");
625
626    // Filters: ColumnRangeFilter
627    get = new Get(ROW);
628    get.setRowOffsetPerColumnFamily(1);
629    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true));
630    result = ht.get(get);
631    kvListExp = new ArrayList<>();
632    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
633    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
634    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
635    verifyResult(result, kvListExp, toLog, "Testing offset with CRF");
636
637    // Insert into two more CFs for row
638    // 10 columns for CF2, 10 columns for CF1
639    for (int j = 2; j > 0; j--) {
640      put = new Put(ROW);
641      for (int i = 0; i < 10; i++) {
642        KeyValue kv = new KeyValue(ROW, FAMILIES[j], QUALIFIERS[i], 1, VALUE);
643        put.add(kv);
644      }
645      ht.put(put);
646    }
647
648    get = new Get(ROW);
649    get.setRowOffsetPerColumnFamily(4);
650    get.setMaxResultsPerColumnFamily(2);
651    get.addFamily(FAMILIES[1]);
652    get.addFamily(FAMILIES[2]);
653    result = ht.get(get);
654    kvListExp = new ArrayList<>();
655    // Exp: CF1:q4, q5, CF2: q4, q5
656    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[4], 1, VALUE));
657    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[5], 1, VALUE));
658    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[4], 1, VALUE));
659    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE));
660    verifyResult(result, kvListExp, toLog, "Testing offset + multiple CFs + maxResults");
661  }
662
663  @Test
664  public void testScanRawDeleteFamilyVersion() throws Exception {
665    TableName tableName = name.getTableName();
666    TEST_UTIL.createTable(tableName, FAMILY);
667    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
668    conf.set(RPC_CODEC_CONF_KEY, "");
669    conf.set(DEFAULT_CODEC_CLASS, "");
670    try (Connection connection = ConnectionFactory.createConnection(conf);
671      Table table = connection.getTable(tableName)) {
672      Delete delete = new Delete(ROW);
673      delete.addFamilyVersion(FAMILY, 0L);
674      table.delete(delete);
675      Scan scan = new Scan(ROW).setRaw(true);
676      ResultScanner scanner = table.getScanner(scan);
677      int count = 0;
678      while (scanner.next() != null) {
679        count++;
680      }
681      assertEquals(1, count);
682    } finally {
683      TEST_UTIL.deleteTable(tableName);
684    }
685  }
686
687  /**
688   * Test from client side for scan while the region is reopened on the same region server.
689   */
690  @Test
691  public void testScanOnReopenedRegion() throws Exception {
692    final TableName tableName = name.getTableName();
693    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2);
694
695    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
696
697    Put put;
698    Scan scan;
699    Result result;
700    ResultScanner scanner;
701    boolean toLog = false;
702    List<Cell> kvListExp;
703
704    // table: row, family, c0:0, c1:1
705    put = new Put(ROW);
706    for (int i = 0; i < QUALIFIERS.length; i++) {
707      KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE);
708      put.add(kv);
709    }
710    ht.put(put);
711
712    scan = new Scan().withStartRow(ROW);
713    scanner = ht.getScanner(scan);
714
715    HRegionLocation loc;
716
717    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
718      loc = locator.getRegionLocation(ROW);
719    }
720    HRegionInfo hri = loc.getRegionInfo();
721    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
722    byte[] regionName = hri.getRegionName();
723    int i = cluster.getServerWith(regionName);
724    HRegionServer rs = cluster.getRegionServer(i);
725    LOG.info("Unassigning " + hri);
726    TEST_UTIL.getAdmin().unassign(hri.getRegionName(), true);
727    long startTime = EnvironmentEdgeManager.currentTime();
728    long timeOut = 10000;
729    boolean offline = false;
730    while (true) {
731      if (rs.getOnlineRegion(regionName) == null) {
732        offline = true;
733        break;
734      }
735      assertTrue("Timed out in closing the testing region",
736        EnvironmentEdgeManager.currentTime() < startTime + timeOut);
737    }
738    assertTrue(offline);
739    LOG.info("Assigning " + hri);
740    TEST_UTIL.getAdmin().assign(hri.getRegionName());
741    startTime = EnvironmentEdgeManager.currentTime();
742    while (true) {
743      rs = cluster.getRegionServer(cluster.getServerWith(regionName));
744      if (rs != null && rs.getOnlineRegion(regionName) != null) {
745        offline = false;
746        break;
747      }
748      assertTrue("Timed out in open the testing region",
749        EnvironmentEdgeManager.currentTime() < startTime + timeOut);
750    }
751    assertFalse(offline);
752
753    // c0:0, c1:1
754    kvListExp = new ArrayList<>();
755    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[0], 0, VALUE));
756    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[1], 1, VALUE));
757    result = scanner.next();
758    verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region");
759  }
760
761  @Test
762  public void testAsyncScannerWithSmallData() throws Exception {
763    testAsyncScanner(name.getTableName(), 2, 3, 10, -1, null);
764  }
765
766  @Test
767  public void testAsyncScannerWithManyRows() throws Exception {
768    testAsyncScanner(name.getTableName(), 30000, 1, 1, -1, null);
769  }
770
771  @Test
772  public void testAsyncScannerWithoutCaching() throws Exception {
773    testAsyncScanner(name.getTableName(), 5, 1, 1, 1, (b) -> {
774      try {
775        TimeUnit.MILLISECONDS.sleep(500);
776      } catch (InterruptedException ex) {
777      }
778    });
779  }
780
781  private void testAsyncScanner(TableName table, int rowNumber, int familyNumber,
782    int qualifierNumber, int caching, Consumer<Boolean> listener) throws Exception {
783    assert rowNumber > 0;
784    assert familyNumber > 0;
785    assert qualifierNumber > 0;
786    byte[] row = Bytes.toBytes("r");
787    byte[] family = Bytes.toBytes("f");
788    byte[] qualifier = Bytes.toBytes("q");
789    byte[][] rows = makeNAsciiWithZeroPrefix(row, rowNumber);
790    byte[][] families = makeNAsciiWithZeroPrefix(family, familyNumber);
791    byte[][] qualifiers = makeNAsciiWithZeroPrefix(qualifier, qualifierNumber);
792
793    Table ht = TEST_UTIL.createTable(table, families);
794
795    boolean toLog = true;
796    List<Cell> kvListExp = new ArrayList<>();
797
798    List<Put> puts = new ArrayList<>();
799    for (byte[] r : rows) {
800      Put put = new Put(r);
801      for (byte[] f : families) {
802        for (byte[] q : qualifiers) {
803          KeyValue kv = new KeyValue(r, f, q, 1, VALUE);
804          put.add(kv);
805          kvListExp.add(kv);
806        }
807      }
808      puts.add(put);
809      if (puts.size() > 1000) {
810        ht.put(puts);
811        puts.clear();
812      }
813    }
814    if (!puts.isEmpty()) {
815      ht.put(puts);
816      puts.clear();
817    }
818
819    Scan scan = new Scan();
820    scan.setAsyncPrefetch(true);
821    if (caching > 0) {
822      scan.setCaching(caching);
823    }
824    try (ResultScanner scanner = ht.getScanner(scan)) {
825      assertTrue("Not instance of async scanner", scanner instanceof ClientAsyncPrefetchScanner);
826      ((ClientAsyncPrefetchScanner) scanner).setPrefetchListener(listener);
827      List<Cell> kvListScan = new ArrayList<>();
828      Result result;
829      boolean first = true;
830      int actualRows = 0;
831      while ((result = scanner.next()) != null) {
832        ++actualRows;
833        // waiting for cache. see HBASE-17376
834        if (first) {
835          TimeUnit.SECONDS.sleep(1);
836          first = false;
837        }
838        for (Cell kv : result.listCells()) {
839          kvListScan.add(kv);
840        }
841      }
842      assertEquals(rowNumber, actualRows);
843      // These cells may have different rows but it is ok. The Result#getRow
844      // isn't used in the verifyResult()
845      result = Result.create(kvListScan);
846      verifyResult(result, kvListExp, toLog, "Testing async scan");
847    }
848
849    TEST_UTIL.deleteTable(table);
850  }
851
852  private static byte[][] makeNAsciiWithZeroPrefix(byte[] base, int n) {
853    int maxLength = Integer.toString(n).length();
854    byte[][] ret = new byte[n][];
855    for (int i = 0; i < n; i++) {
856      int length = Integer.toString(i).length();
857      StringBuilder buf = new StringBuilder(Integer.toString(i));
858      IntStream.range(0, maxLength - length).forEach(v -> buf.insert(0, "0"));
859      byte[] tail = Bytes.toBytes(buf.toString());
860      ret[i] = Bytes.add(base, tail);
861    }
862    return ret;
863  }
864
865  static void verifyResult(Result result, List<Cell> expKvList, boolean toLog, String msg) {
866
867    LOG.info(msg);
868    LOG.info("Expected count: " + expKvList.size());
869    LOG.info("Actual count: " + result.size());
870    if (expKvList.isEmpty()) {
871      return;
872    }
873
874    int i = 0;
875    for (Cell kv : result.rawCells()) {
876      if (i >= expKvList.size()) {
877        break; // we will check the size later
878      }
879
880      Cell kvExp = expKvList.get(i++);
881      if (toLog) {
882        LOG.info("get kv is: " + kv.toString());
883        LOG.info("exp kv is: " + kvExp.toString());
884      }
885      assertTrue("Not equal", kvExp.equals(kv));
886    }
887
888    assertEquals(expKvList.size(), result.size());
889  }
890
891  @Test
892  public void testReadExpiredDataForRawScan() throws IOException {
893    TableName tableName = name.getTableName();
894    long ts = EnvironmentEdgeManager.currentTime() - 10000;
895    byte[] value = Bytes.toBytes("expired");
896    try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
897      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, ts, value));
898      assertArrayEquals(value, table.get(new Get(ROW)).getValue(FAMILY, QUALIFIER));
899      TEST_UTIL.getAdmin().modifyColumnFamily(tableName,
900        new HColumnDescriptor(FAMILY).setTimeToLive(5));
901      try (ResultScanner scanner = table.getScanner(FAMILY)) {
902        assertNull(scanner.next());
903      }
904      try (ResultScanner scanner = table.getScanner(new Scan().setRaw(true))) {
905        assertArrayEquals(value, scanner.next().getValue(FAMILY, QUALIFIER));
906        assertNull(scanner.next());
907      }
908    }
909  }
910
911  @Test
912  public void testScanWithColumnsAndFilterAndVersion() throws IOException {
913    TableName tableName = name.getTableName();
914    long now = EnvironmentEdgeManager.currentTime();
915    try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) {
916      for (int i = 0; i < 4; i++) {
917        Put put = new Put(ROW);
918        put.addColumn(FAMILY, QUALIFIER, now + i, VALUE);
919        table.put(put);
920      }
921
922      Scan scan = new Scan();
923      scan.addColumn(FAMILY, QUALIFIER);
924      scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(QUALIFIER)));
925      scan.readVersions(3);
926
927      try (ResultScanner scanner = table.getScanner(scan)) {
928        Result result = scanner.next();
929        assertEquals(3, result.size());
930      }
931    }
932  }
933
934  @Test
935  public void testScanWithSameStartRowStopRow() throws IOException {
936    TableName tableName = name.getTableName();
937    try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
938      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
939
940      Scan scan = new Scan().withStartRow(ROW).withStopRow(ROW);
941      try (ResultScanner scanner = table.getScanner(scan)) {
942        assertNull(scanner.next());
943      }
944
945      scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, true);
946      try (ResultScanner scanner = table.getScanner(scan)) {
947        Result result = scanner.next();
948        assertNotNull(result);
949        assertArrayEquals(ROW, result.getRow());
950        assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
951        assertNull(scanner.next());
952      }
953
954      scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, false);
955      try (ResultScanner scanner = table.getScanner(scan)) {
956        assertNull(scanner.next());
957      }
958
959      scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, false);
960      try (ResultScanner scanner = table.getScanner(scan)) {
961        assertNull(scanner.next());
962      }
963
964      scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, true);
965      try (ResultScanner scanner = table.getScanner(scan)) {
966        assertNull(scanner.next());
967      }
968    }
969  }
970
971  @Test
972  public void testReverseScanWithFlush() throws Exception {
973    TableName tableName = name.getTableName();
974    final int BATCH_SIZE = 10;
975    final int ROWS_TO_INSERT = 100;
976    final byte[] LARGE_VALUE = generateHugeValue(128 * 1024);
977
978    try (Table table = TEST_UTIL.createTable(tableName, FAMILY);
979      Admin admin = TEST_UTIL.getAdmin()) {
980      List<Put> putList = new ArrayList<>();
981      for (long i = 0; i < ROWS_TO_INSERT; i++) {
982        Put put = new Put(Bytes.toBytes(i));
983        put.addColumn(FAMILY, QUALIFIER, LARGE_VALUE);
984        putList.add(put);
985
986        if (putList.size() >= BATCH_SIZE) {
987          table.put(putList);
988          admin.flush(tableName);
989          putList.clear();
990        }
991      }
992
993      if (!putList.isEmpty()) {
994        table.put(putList);
995        admin.flush(tableName);
996        putList.clear();
997      }
998
999      Scan scan = new Scan();
1000      scan.setReversed(true);
1001      int count = 0;
1002
1003      try (ResultScanner results = table.getScanner(scan)) {
1004        for (Result result : results) {
1005          count++;
1006        }
1007      }
1008      assertEquals("Expected " + ROWS_TO_INSERT + " rows in the table but it is " + count,
1009        ROWS_TO_INSERT, count);
1010    }
1011  }
1012
1013  @Test
1014  public void testScannerWithPartialResults() throws Exception {
1015    TableName tableName = TableName.valueOf("testScannerWithPartialResults");
1016    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, Bytes.toBytes("c"), 4)) {
1017      List<Put> puts = new ArrayList<>();
1018      byte[] largeArray = new byte[10000];
1019      Put put = new Put(Bytes.toBytes("aaaa0"));
1020      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1"));
1021      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), Bytes.toBytes("2"));
1022      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), Bytes.toBytes("3"));
1023      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("4"), Bytes.toBytes("4"));
1024      puts.add(put);
1025      put = new Put(Bytes.toBytes("aaaa1"));
1026      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1"));
1027      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), largeArray);
1028      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), largeArray);
1029      puts.add(put);
1030      table.put(puts);
1031      Scan scan = new Scan();
1032      scan.addFamily(Bytes.toBytes("c"));
1033      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName.getName());
1034      scan.setMaxResultSize(10001);
1035      scan.setStopRow(Bytes.toBytes("bbbb"));
1036      scan.setFilter(new LimitKVsReturnFilter());
1037      ResultScanner rs = table.getScanner(scan);
1038      Result result;
1039      int expectedKvNumber = 6;
1040      int returnedKvNumber = 0;
1041      while ((result = rs.next()) != null) {
1042        returnedKvNumber += result.listCells().size();
1043      }
1044      rs.close();
1045      assertEquals(expectedKvNumber, returnedKvNumber);
1046    }
1047  }
1048
1049  public static class LimitKVsReturnFilter extends FilterBase {
1050
1051    private int cellCount = 0;
1052
1053    @Override
1054    public ReturnCode filterCell(Cell v) throws IOException {
1055      if (cellCount >= 6) {
1056        cellCount++;
1057        return ReturnCode.SKIP;
1058      }
1059      cellCount++;
1060      return ReturnCode.INCLUDE;
1061    }
1062
1063    @Override
1064    public boolean filterAllRemaining() throws IOException {
1065      if (cellCount < 7) {
1066        return false;
1067      }
1068      cellCount++;
1069      return true;
1070    }
1071
1072    @Override
1073    public String toString() {
1074      return this.getClass().getSimpleName();
1075    }
1076
1077    public static LimitKVsReturnFilter parseFrom(final byte[] pbBytes)
1078      throws DeserializationException {
1079      return new LimitKVsReturnFilter();
1080    }
1081  }
1082}