001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.TestFromClientSide3.generateHugeValue;
021import static org.junit.Assert.assertArrayEquals;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertNull;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.List;
032import java.util.concurrent.TimeUnit;
033import java.util.function.Consumer;
034import java.util.stream.IntStream;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.CompareOperator;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseTestingUtility;
040import org.apache.hadoop.hbase.HColumnDescriptor;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.HRegionInfo;
043import org.apache.hadoop.hbase.HRegionLocation;
044import org.apache.hadoop.hbase.HTestConst;
045import org.apache.hadoop.hbase.KeyValue;
046import org.apache.hadoop.hbase.MiniHBaseCluster;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.TableNotFoundException;
049import org.apache.hadoop.hbase.filter.BinaryComparator;
050import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
051import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
052import org.apache.hadoop.hbase.filter.QualifierFilter;
053import org.apache.hadoop.hbase.regionserver.HRegionServer;
054import org.apache.hadoop.hbase.testclassification.ClientTests;
055import org.apache.hadoop.hbase.testclassification.MediumTests;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.junit.After;
059import org.junit.AfterClass;
060import org.junit.Before;
061import org.junit.BeforeClass;
062import org.junit.ClassRule;
063import org.junit.Rule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.junit.rules.TestName;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070/**
071 * A client-side test, mostly testing scanners with various parameters.
072 */
073@Category({MediumTests.class, ClientTests.class})
074public class TestScannersFromClientSide {
075
076  @ClassRule
077  public static final HBaseClassTestRule CLASS_RULE =
078      HBaseClassTestRule.forClass(TestScannersFromClientSide.class);
079
080  private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class);
081
082  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
083  private static byte [] ROW = Bytes.toBytes("testRow");
084  private static byte [] FAMILY = Bytes.toBytes("testFamily");
085  private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
086  private static byte [] VALUE = Bytes.toBytes("testValue");
087
088  @Rule
089  public TestName name = new TestName();
090
091  /**
092   * @throws java.lang.Exception
093   */
094  @BeforeClass
095  public static void setUpBeforeClass() throws Exception {
096    Configuration conf = TEST_UTIL.getConfiguration();
097    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
098    TEST_UTIL.startMiniCluster(3);
099  }
100
101  /**
102   * @throws java.lang.Exception
103   */
104  @AfterClass
105  public static void tearDownAfterClass() throws Exception {
106    TEST_UTIL.shutdownMiniCluster();
107  }
108
109  /**
110   * @throws java.lang.Exception
111   */
112  @Before
113  public void setUp() throws Exception {
114    // Nothing to do.
115  }
116
117  /**
118   * @throws java.lang.Exception
119   */
120  @After
121  public void tearDown() throws Exception {
122    // Nothing to do.
123  }
124
125  /**
126   * Test from client side for batch of scan
127   *
128   * @throws Exception
129   */
130  @Test
131  public void testScanBatch() throws Exception {
132    final TableName tableName = TableName.valueOf(name.getMethodName());
133    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8);
134
135    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
136
137    Put put;
138    Scan scan;
139    Delete delete;
140    Result result;
141    ResultScanner scanner;
142    boolean toLog = true;
143    List<Cell> kvListExp;
144
145    // table: row, family, c0:0, c1:1, ... , c7:7
146    put = new Put(ROW);
147    for (int i=0; i < QUALIFIERS.length; i++) {
148      KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE);
149      put.add(kv);
150    }
151    ht.put(put);
152
153    // table: row, family, c0:0, c1:1, ..., c6:2, c6:6 , c7:7
154    put = new Put(ROW);
155    KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[6], 2, VALUE);
156    put.add(kv);
157    ht.put(put);
158
159    // delete upto ts: 3
160    delete = new Delete(ROW);
161    delete.addFamily(FAMILY, 3);
162    ht.delete(delete);
163
164    // without batch
165    scan = new Scan().withStartRow(ROW);
166    scan.setMaxVersions();
167    scanner = ht.getScanner(scan);
168
169    // c4:4, c5:5, c6:6, c7:7
170    kvListExp = new ArrayList<>();
171    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE));
172    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
173    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE));
174    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
175    result = scanner.next();
176    verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
177
178    // with batch
179    scan =  new Scan().withStartRow(ROW);
180    scan.setMaxVersions();
181    scan.setBatch(2);
182    scanner = ht.getScanner(scan);
183
184    // First batch: c4:4, c5:5
185    kvListExp = new ArrayList<>();
186    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE));
187    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
188    result = scanner.next();
189    verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
190
191    // Second batch: c6:6, c7:7
192    kvListExp = new ArrayList<>();
193    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE));
194    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
195    result = scanner.next();
196    verifyResult(result, kvListExp, toLog, "Testing second batch of scan");
197
198  }
199
200  @Test
201  public void testMaxResultSizeIsSetToDefault() throws Exception {
202    final TableName tableName = TableName.valueOf(name.getMethodName());
203    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
204
205    // The max result size we expect the scan to use by default.
206    long expectedMaxResultSize =
207        TEST_UTIL.getConfiguration().getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
208          HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
209
210    int numRows = 5;
211    byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
212
213    int numQualifiers = 10;
214    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers);
215
216    // Specify the cell size such that a single row will be larger than the default
217    // value of maxResultSize. This means that Scan RPCs should return at most a single
218    // result back to the client.
219    int cellSize = (int) (expectedMaxResultSize / (numQualifiers - 1));
220    byte[] cellValue = Bytes.createMaxByteArray(cellSize);
221
222    Put put;
223    List<Put> puts = new ArrayList<>();
224    for (int row = 0; row < ROWS.length; row++) {
225      put = new Put(ROWS[row]);
226      for (int qual = 0; qual < QUALIFIERS.length; qual++) {
227        KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], cellValue);
228        put.add(kv);
229      }
230      puts.add(put);
231    }
232    ht.put(puts);
233
234    // Create a scan with the default configuration.
235    Scan scan = new Scan();
236
237    ResultScanner scanner = ht.getScanner(scan);
238    assertTrue(scanner instanceof ClientScanner);
239    ClientScanner clientScanner = (ClientScanner) scanner;
240
241    // Call next to issue a single RPC to the server
242    scanner.next();
243
244    // The scanner should have, at most, a single result in its cache. If there more results exists
245    // in the cache it means that more than the expected max result size was fetched.
246    assertTrue("The cache contains: " + clientScanner.getCacheSize() + " results",
247      clientScanner.getCacheSize() <= 1);
248  }
249
250  /**
251   * Scan on not existing table should throw the exception with correct message
252   */
253  @Test
254  public void testScannerForNotExistingTable() {
255    String[] tableNames = {"A", "Z", "A:A", "Z:Z"};
256    for(String tableName : tableNames) {
257      try {
258        Table table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName));
259        testSmallScan(table, true, 1, 5);
260        fail("TableNotFoundException was not thrown");
261      } catch (TableNotFoundException e) {
262        // We expect that the message for TableNotFoundException would have only the table name only
263        // Otherwise that would mean that localeRegionInMeta doesn't work properly
264        assertEquals(e.getMessage(), tableName);
265      } catch (Exception e) {
266        fail("Unexpected exception " + e.getMessage());
267      }
268    }
269  }
270
271  @Test
272  public void testSmallScan() throws Exception {
273    final TableName tableName = TableName.valueOf(name.getMethodName());
274
275    int numRows = 10;
276    byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
277
278    int numQualifiers = 10;
279    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers);
280
281    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
282
283    Put put;
284    List<Put> puts = new ArrayList<>();
285    for (int row = 0; row < ROWS.length; row++) {
286      put = new Put(ROWS[row]);
287      for (int qual = 0; qual < QUALIFIERS.length; qual++) {
288        KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], VALUE);
289        put.add(kv);
290      }
291      puts.add(put);
292    }
293    ht.put(puts);
294
295    int expectedRows = numRows;
296    int expectedCols = numRows * numQualifiers;
297
298    // Test normal and reversed
299    testSmallScan(ht, true, expectedRows, expectedCols);
300    testSmallScan(ht, false, expectedRows, expectedCols);
301  }
302
303  /**
304   * Run through a variety of test configurations with a small scan
305   * @param table
306   * @param reversed
307   * @param rows
308   * @param columns
309   * @throws Exception
310   */
311  private void testSmallScan(Table table, boolean reversed, int rows, int columns) throws Exception {
312    Scan baseScan = new Scan();
313    baseScan.setReversed(reversed);
314    baseScan.setSmall(true);
315
316    Scan scan = new Scan(baseScan);
317    verifyExpectedCounts(table, scan, rows, columns);
318
319    scan = new Scan(baseScan);
320    scan.setMaxResultSize(1);
321    verifyExpectedCounts(table, scan, rows, columns);
322
323    scan = new Scan(baseScan);
324    scan.setMaxResultSize(1);
325    scan.setCaching(Integer.MAX_VALUE);
326    verifyExpectedCounts(table, scan, rows, columns);
327  }
328
329  private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount,
330      int expectedCellCount) throws Exception {
331    ResultScanner scanner = table.getScanner(scan);
332
333    int rowCount = 0;
334    int cellCount = 0;
335    Result r = null;
336    while ((r = scanner.next()) != null) {
337      rowCount++;
338      cellCount += r.rawCells().length;
339    }
340
341    assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount,
342        expectedRowCount == rowCount);
343    assertTrue("Expected cell count: " + expectedCellCount + " Actual cell count: " + cellCount,
344        expectedCellCount == cellCount);
345    scanner.close();
346  }
347
348  /**
349   * Test from client side for get with maxResultPerCF set
350   *
351   * @throws Exception
352   */
353  @Test
354  public void testGetMaxResults() throws Exception {
355    final TableName tableName = TableName.valueOf(name.getMethodName());
356    byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
357    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
358
359    Table ht = TEST_UTIL.createTable(tableName, FAMILIES);
360
361    Get get;
362    Put put;
363    Result result;
364    boolean toLog = true;
365    List<Cell> kvListExp;
366
367    kvListExp = new ArrayList<>();
368    // Insert one CF for row[0]
369    put = new Put(ROW);
370    for (int i=0; i < 10; i++) {
371      KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
372      put.add(kv);
373      kvListExp.add(kv);
374    }
375    ht.put(put);
376
377    get = new Get(ROW);
378    result = ht.get(get);
379    verifyResult(result, kvListExp, toLog, "Testing without setting maxResults");
380
381    get = new Get(ROW);
382    get.setMaxResultsPerColumnFamily(2);
383    result = ht.get(get);
384    kvListExp = new ArrayList<>();
385    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[0], 1, VALUE));
386    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
387    verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults");
388
389    // Filters: ColumnRangeFilter
390    get = new Get(ROW);
391    get.setMaxResultsPerColumnFamily(5);
392    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5],
393                                        true));
394    result = ht.get(get);
395    kvListExp = new ArrayList<>();
396    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE));
397    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
398    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
399    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
400    verifyResult(result, kvListExp, toLog, "Testing single CF with CRF");
401
402    // Insert two more CF for row[0]
403    // 20 columns for CF2, 10 columns for CF1
404    put = new Put(ROW);
405    for (int i=0; i < QUALIFIERS.length; i++) {
406      KeyValue kv = new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE);
407      put.add(kv);
408    }
409    ht.put(put);
410
411    put = new Put(ROW);
412    for (int i=0; i < 10; i++) {
413      KeyValue kv = new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE);
414      put.add(kv);
415    }
416    ht.put(put);
417
418    get = new Get(ROW);
419    get.setMaxResultsPerColumnFamily(12);
420    get.addFamily(FAMILIES[1]);
421    get.addFamily(FAMILIES[2]);
422    result = ht.get(get);
423    kvListExp = new ArrayList<>();
424    //Exp: CF1:q0, ..., q9, CF2: q0, q1, q10, q11, ..., q19
425    for (int i=0; i < 10; i++) {
426      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
427    }
428    for (int i=0; i < 2; i++) {
429        kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
430      }
431    for (int i=10; i < 20; i++) {
432      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
433    }
434    verifyResult(result, kvListExp, toLog, "Testing multiple CFs");
435
436    // Filters: ColumnRangeFilter and ColumnPrefixFilter
437    get = new Get(ROW);
438    get.setMaxResultsPerColumnFamily(3);
439    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, null, true));
440    result = ht.get(get);
441    kvListExp = new ArrayList<>();
442    for (int i=2; i < 5; i++) {
443      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
444    }
445    for (int i=2; i < 5; i++) {
446      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
447    }
448    for (int i=2; i < 5; i++) {
449      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
450    }
451    verifyResult(result, kvListExp, toLog, "Testing multiple CFs + CRF");
452
453    get = new Get(ROW);
454    get.setMaxResultsPerColumnFamily(7);
455    get.setFilter(new ColumnPrefixFilter(QUALIFIERS[1]));
456    result = ht.get(get);
457    kvListExp = new ArrayList<>();
458    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
459    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[1], 1, VALUE));
460    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[1], 1, VALUE));
461    for (int i=10; i < 16; i++) {
462      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
463    }
464    verifyResult(result, kvListExp, toLog, "Testing multiple CFs + PFF");
465
466  }
467
468  /**
469   * Test from client side for scan with maxResultPerCF set
470   *
471   * @throws Exception
472   */
473  @Test
474  public void testScanMaxResults() throws Exception {
475    final TableName tableName = TableName.valueOf(name.getMethodName());
476    byte [][] ROWS = HTestConst.makeNAscii(ROW, 2);
477    byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
478    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
479
480    Table ht = TEST_UTIL.createTable(tableName, FAMILIES);
481
482    Put put;
483    Scan scan;
484    Result result;
485    boolean toLog = true;
486    List<Cell> kvListExp, kvListScan;
487
488    kvListExp = new ArrayList<>();
489
490    for (int r=0; r < ROWS.length; r++) {
491      put = new Put(ROWS[r]);
492      for (int c=0; c < FAMILIES.length; c++) {
493        for (int q=0; q < QUALIFIERS.length; q++) {
494          KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE);
495          put.add(kv);
496          if (q < 4) {
497            kvListExp.add(kv);
498          }
499        }
500      }
501      ht.put(put);
502    }
503
504    scan = new Scan();
505    scan.setMaxResultsPerColumnFamily(4);
506    ResultScanner scanner = ht.getScanner(scan);
507    kvListScan = new ArrayList<>();
508    while ((result = scanner.next()) != null) {
509      for (Cell kv : result.listCells()) {
510        kvListScan.add(kv);
511      }
512    }
513    result = Result.create(kvListScan);
514    verifyResult(result, kvListExp, toLog, "Testing scan with maxResults");
515
516  }
517
518  /**
519   * Test from client side for get with rowOffset
520   *
521   * @throws Exception
522   */
523  @Test
524  public void testGetRowOffset() throws Exception {
525    final TableName tableName = TableName.valueOf(name.getMethodName());
526    byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
527    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
528
529    Table ht = TEST_UTIL.createTable(tableName, FAMILIES);
530
531    Get get;
532    Put put;
533    Result result;
534    boolean toLog = true;
535    List<Cell> kvListExp;
536
537    // Insert one CF for row
538    kvListExp = new ArrayList<>();
539    put = new Put(ROW);
540    for (int i=0; i < 10; i++) {
541      KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
542      put.add(kv);
543      // skipping first two kvs
544      if (i < 2) continue;
545      kvListExp.add(kv);
546    }
547    ht.put(put);
548
549    //setting offset to 2
550    get = new Get(ROW);
551    get.setRowOffsetPerColumnFamily(2);
552    result = ht.get(get);
553    verifyResult(result, kvListExp, toLog, "Testing basic setRowOffset");
554
555    //setting offset to 20
556    get = new Get(ROW);
557    get.setRowOffsetPerColumnFamily(20);
558    result = ht.get(get);
559    kvListExp = new ArrayList<>();
560    verifyResult(result, kvListExp, toLog, "Testing offset > #kvs");
561
562    //offset + maxResultPerCF
563    get = new Get(ROW);
564    get.setRowOffsetPerColumnFamily(4);
565    get.setMaxResultsPerColumnFamily(5);
566    result = ht.get(get);
567    kvListExp = new ArrayList<>();
568    for (int i=4; i < 9; i++) {
569      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
570    }
571    verifyResult(result, kvListExp, toLog,
572      "Testing offset + setMaxResultsPerCF");
573
574    // Filters: ColumnRangeFilter
575    get = new Get(ROW);
576    get.setRowOffsetPerColumnFamily(1);
577    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5],
578                                        true));
579    result = ht.get(get);
580    kvListExp = new ArrayList<>();
581    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
582    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
583    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
584    verifyResult(result, kvListExp, toLog, "Testing offset with CRF");
585
586    // Insert into two more CFs for row
587    // 10 columns for CF2, 10 columns for CF1
588    for(int j=2; j > 0; j--) {
589      put = new Put(ROW);
590      for (int i=0; i < 10; i++) {
591        KeyValue kv = new KeyValue(ROW, FAMILIES[j], QUALIFIERS[i], 1, VALUE);
592        put.add(kv);
593      }
594      ht.put(put);
595    }
596
597    get = new Get(ROW);
598    get.setRowOffsetPerColumnFamily(4);
599    get.setMaxResultsPerColumnFamily(2);
600    get.addFamily(FAMILIES[1]);
601    get.addFamily(FAMILIES[2]);
602    result = ht.get(get);
603    kvListExp = new ArrayList<>();
604    //Exp: CF1:q4, q5, CF2: q4, q5
605    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[4], 1, VALUE));
606    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[5], 1, VALUE));
607    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[4], 1, VALUE));
608    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE));
609    verifyResult(result, kvListExp, toLog,
610       "Testing offset + multiple CFs + maxResults");
611  }
612
613  /**
614   * Test from client side for scan while the region is reopened
615   * on the same region server.
616   *
617   * @throws Exception
618   */
619  @Test
620  public void testScanOnReopenedRegion() throws Exception {
621    final TableName tableName = TableName.valueOf(name.getMethodName());
622    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2);
623
624    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
625
626    Put put;
627    Scan scan;
628    Result result;
629    ResultScanner scanner;
630    boolean toLog = false;
631    List<Cell> kvListExp;
632
633    // table: row, family, c0:0, c1:1
634    put = new Put(ROW);
635    for (int i=0; i < QUALIFIERS.length; i++) {
636      KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE);
637      put.add(kv);
638    }
639    ht.put(put);
640
641    scan = new Scan().withStartRow(ROW);
642    scanner = ht.getScanner(scan);
643
644    HRegionLocation loc;
645
646    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
647      loc = locator.getRegionLocation(ROW);
648    }
649    HRegionInfo hri = loc.getRegionInfo();
650    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
651    byte[] regionName = hri.getRegionName();
652    int i = cluster.getServerWith(regionName);
653    HRegionServer rs = cluster.getRegionServer(i);
654    LOG.info("Unassigning " + hri);
655    TEST_UTIL.getAdmin().unassign(hri.getRegionName(), true);
656    long startTime = EnvironmentEdgeManager.currentTime();
657    long timeOut = 10000;
658    boolean offline = false;
659    while (true) {
660      if (rs.getOnlineRegion(regionName) == null) {
661        offline = true;
662        break;
663      }
664      assertTrue("Timed out in closing the testing region",
665        EnvironmentEdgeManager.currentTime() < startTime + timeOut);
666    }
667    assertTrue(offline);
668    LOG.info("Assigning " + hri);
669    TEST_UTIL.getAdmin().assign(hri.getRegionName());
670    startTime = EnvironmentEdgeManager.currentTime();
671    while (true) {
672      rs = cluster.getRegionServer(cluster.getServerWith(regionName));
673      if (rs != null && rs.getOnlineRegion(regionName) != null) {
674        offline = false;
675        break;
676      }
677      assertTrue("Timed out in open the testing region",
678        EnvironmentEdgeManager.currentTime() < startTime + timeOut);
679    }
680    assertFalse(offline);
681
682    // c0:0, c1:1
683    kvListExp = new ArrayList<>();
684    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[0], 0, VALUE));
685    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[1], 1, VALUE));
686    result = scanner.next();
687    verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region");
688  }
689
690  @Test
691  public void testAsyncScannerWithSmallData() throws Exception {
692    testAsyncScanner(TableName.valueOf(name.getMethodName()),
693      2,
694      3,
695      10,
696      -1,
697      null);
698  }
699
700  @Test
701  public void testAsyncScannerWithManyRows() throws Exception {
702    testAsyncScanner(TableName.valueOf(name.getMethodName()),
703      30000,
704      1,
705      1,
706      -1,
707      null);
708  }
709
710  @Test
711  public void testAsyncScannerWithoutCaching() throws Exception {
712    testAsyncScanner(TableName.valueOf(name.getMethodName()),
713      5,
714      1,
715      1,
716      1,
717      (b) -> {
718        try {
719          TimeUnit.MILLISECONDS.sleep(500);
720        } catch (InterruptedException ex) {
721        }
722      });
723  }
724
725  private void testAsyncScanner(TableName table, int rowNumber, int familyNumber,
726      int qualifierNumber, int caching, Consumer<Boolean> listener) throws Exception {
727    assert rowNumber > 0;
728    assert familyNumber > 0;
729    assert qualifierNumber > 0;
730    byte[] row = Bytes.toBytes("r");
731    byte[] family = Bytes.toBytes("f");
732    byte[] qualifier = Bytes.toBytes("q");
733    byte[][] rows = makeNAsciiWithZeroPrefix(row, rowNumber);
734    byte[][] families = makeNAsciiWithZeroPrefix(family, familyNumber);
735    byte[][] qualifiers = makeNAsciiWithZeroPrefix(qualifier, qualifierNumber);
736
737    Table ht = TEST_UTIL.createTable(table, families);
738
739    boolean toLog = true;
740    List<Cell> kvListExp = new ArrayList<>();
741
742    List<Put> puts = new ArrayList<>();
743    for (byte[] r : rows) {
744      Put put = new Put(r);
745      for (byte[] f : families) {
746        for (byte[] q : qualifiers) {
747          KeyValue kv = new KeyValue(r, f, q, 1, VALUE);
748          put.add(kv);
749          kvListExp.add(kv);
750        }
751      }
752      puts.add(put);
753      if (puts.size() > 1000) {
754        ht.put(puts);
755        puts.clear();
756      }
757    }
758    if (!puts.isEmpty()) {
759      ht.put(puts);
760      puts.clear();
761    }
762
763    Scan scan = new Scan();
764    scan.setAsyncPrefetch(true);
765    if (caching > 0) {
766      scan.setCaching(caching);
767    }
768    try (ResultScanner scanner = ht.getScanner(scan)) {
769      assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner);
770      ((ClientAsyncPrefetchScanner) scanner).setPrefetchListener(listener);
771      List<Cell> kvListScan = new ArrayList<>();
772      Result result;
773      boolean first = true;
774      int actualRows = 0;
775      while ((result = scanner.next()) != null) {
776        ++actualRows;
777        // waiting for cache. see HBASE-17376
778        if (first) {
779          TimeUnit.SECONDS.sleep(1);
780          first = false;
781        }
782        for (Cell kv : result.listCells()) {
783          kvListScan.add(kv);
784        }
785      }
786      assertEquals(rowNumber, actualRows);
787      // These cells may have different rows but it is ok. The Result#getRow
788      // isn't used in the verifyResult()
789      result = Result.create(kvListScan);
790      verifyResult(result, kvListExp, toLog, "Testing async scan");
791    }
792
793    TEST_UTIL.deleteTable(table);
794  }
795
796  private static byte[][] makeNAsciiWithZeroPrefix(byte[] base, int n) {
797    int maxLength = Integer.toString(n).length();
798    byte [][] ret = new byte[n][];
799    for (int i = 0; i < n; i++) {
800      int length = Integer.toString(i).length();
801      StringBuilder buf = new StringBuilder(Integer.toString(i));
802      IntStream.range(0, maxLength - length).forEach(v -> buf.insert(0, "0"));
803      byte[] tail = Bytes.toBytes(buf.toString());
804      ret[i] = Bytes.add(base, tail);
805    }
806    return ret;
807  }
808
809  static void verifyResult(Result result, List<Cell> expKvList, boolean toLog,
810      String msg) {
811
812    LOG.info(msg);
813    LOG.info("Expected count: " + expKvList.size());
814    LOG.info("Actual count: " + result.size());
815    if (expKvList.isEmpty())
816      return;
817
818    int i = 0;
819    for (Cell kv : result.rawCells()) {
820      if (i >= expKvList.size()) {
821        break;  // we will check the size later
822      }
823
824      Cell kvExp = expKvList.get(i++);
825      if (toLog) {
826        LOG.info("get kv is: " + kv.toString());
827        LOG.info("exp kv is: " + kvExp.toString());
828      }
829      assertTrue("Not equal", kvExp.equals(kv));
830    }
831
832    assertEquals(expKvList.size(), result.size());
833  }
834
835  @Test
836  public void testReadExpiredDataForRawScan() throws IOException {
837    TableName tableName = TableName.valueOf(name.getMethodName());
838    long ts = System.currentTimeMillis() - 10000;
839    byte[] value = Bytes.toBytes("expired");
840    try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
841      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, ts, value));
842      assertArrayEquals(value, table.get(new Get(ROW)).getValue(FAMILY, QUALIFIER));
843      TEST_UTIL.getAdmin().modifyColumnFamily(tableName,
844        new HColumnDescriptor(FAMILY).setTimeToLive(5));
845      try (ResultScanner scanner = table.getScanner(FAMILY)) {
846        assertNull(scanner.next());
847      }
848      try (ResultScanner scanner = table.getScanner(new Scan().setRaw(true))) {
849        assertArrayEquals(value, scanner.next().getValue(FAMILY, QUALIFIER));
850        assertNull(scanner.next());
851      }
852    }
853  }
854
855  @Test
856  public void testScanWithColumnsAndFilterAndVersion() throws IOException {
857    TableName tableName = TableName.valueOf(name.getMethodName());
858    try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) {
859      for (int i = 0; i < 4; i++) {
860        Put put = new Put(ROW);
861        put.addColumn(FAMILY, QUALIFIER, VALUE);
862        table.put(put);
863      }
864
865      Scan scan = new Scan();
866      scan.addColumn(FAMILY, QUALIFIER);
867      scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(QUALIFIER)));
868      scan.readVersions(3);
869
870      try (ResultScanner scanner = table.getScanner(scan)) {
871        Result result = scanner.next();
872        assertEquals(3, result.size());
873      }
874    }
875  }
876
877  @Test
878  public void testScanWithSameStartRowStopRow() throws IOException {
879    TableName tableName = TableName.valueOf(name.getMethodName());
880    try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
881      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
882
883      Scan scan = new Scan().withStartRow(ROW).withStopRow(ROW);
884      try (ResultScanner scanner = table.getScanner(scan)) {
885        assertNull(scanner.next());
886      }
887
888      scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, true);
889      try (ResultScanner scanner = table.getScanner(scan)) {
890        Result result = scanner.next();
891        assertNotNull(result);
892        assertArrayEquals(ROW, result.getRow());
893        assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
894        assertNull(scanner.next());
895      }
896
897      scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, false);
898      try (ResultScanner scanner = table.getScanner(scan)) {
899        assertNull(scanner.next());
900      }
901
902      scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, false);
903      try (ResultScanner scanner = table.getScanner(scan)) {
904        assertNull(scanner.next());
905      }
906
907      scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, true);
908      try (ResultScanner scanner = table.getScanner(scan)) {
909        assertNull(scanner.next());
910      }
911    }
912  }
913
914  @Test
915  public void testReverseScanWithFlush() throws Exception {
916    TableName tableName = TableName.valueOf(name.getMethodName());
917    final int BATCH_SIZE = 10;
918    final int ROWS_TO_INSERT = 100;
919    final byte[] LARGE_VALUE = generateHugeValue(128 * 1024);
920
921    try (Table table = TEST_UTIL.createTable(tableName, FAMILY);
922        Admin admin = TEST_UTIL.getAdmin()) {
923      List<Put> putList = new ArrayList<>();
924      for (long i = 0; i < ROWS_TO_INSERT; i++) {
925        Put put = new Put(Bytes.toBytes(i));
926        put.addColumn(FAMILY, QUALIFIER, LARGE_VALUE);
927        putList.add(put);
928
929        if (putList.size() >= BATCH_SIZE) {
930          table.put(putList);
931          admin.flush(tableName);
932          putList.clear();
933        }
934      }
935
936      if (!putList.isEmpty()) {
937        table.put(putList);
938        admin.flush(tableName);
939        putList.clear();
940      }
941
942      Scan scan = new Scan();
943      scan.setReversed(true);
944      int count = 0;
945
946      try (ResultScanner results = table.getScanner(scan)) {
947        for (Result result : results) {
948          count++;
949        }
950      }
951      assertEquals("Expected " + ROWS_TO_INSERT + " rows in the table but it is " + count,
952          ROWS_TO_INSERT, count);
953    }
954  }
955}