001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY;
021import static org.apache.hadoop.hbase.client.TestFromClientSide3.generateHugeValue;
022import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS;
023import static org.junit.Assert.assertArrayEquals;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertNotNull;
027import static org.junit.Assert.assertNull;
028import static org.junit.Assert.assertTrue;
029import static org.junit.Assert.fail;
030
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.List;
034import java.util.concurrent.TimeUnit;
035import java.util.function.Consumer;
036import java.util.stream.IntStream;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.CompareOperator;
040import org.apache.hadoop.hbase.HBaseClassTestRule;
041import org.apache.hadoop.hbase.HBaseTestingUtility;
042import org.apache.hadoop.hbase.HColumnDescriptor;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.HRegionInfo;
045import org.apache.hadoop.hbase.HRegionLocation;
046import org.apache.hadoop.hbase.HTestConst;
047import org.apache.hadoop.hbase.KeyValue;
048import org.apache.hadoop.hbase.MiniHBaseCluster;
049import org.apache.hadoop.hbase.exceptions.DeserializationException;
050import org.apache.hadoop.hbase.filter.FilterBase;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.TableNotFoundException;
054import org.apache.hadoop.hbase.filter.BinaryComparator;
055import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
056import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
057import org.apache.hadoop.hbase.filter.QualifierFilter;
058import org.apache.hadoop.hbase.regionserver.HRegionServer;
059import org.apache.hadoop.hbase.testclassification.ClientTests;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
062import org.junit.After;
063import org.junit.AfterClass;
064import org.junit.Before;
065import org.junit.BeforeClass;
066import org.junit.ClassRule;
067import org.junit.Rule;
068import org.junit.Test;
069import org.junit.experimental.categories.Category;
070import org.junit.rules.TestName;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074/**
075 * A client-side test, mostly testing scanners with various parameters.
076 */
077@Category({MediumTests.class, ClientTests.class})
078public class TestScannersFromClientSide {
079
080  @ClassRule
081  public static final HBaseClassTestRule CLASS_RULE =
082      HBaseClassTestRule.forClass(TestScannersFromClientSide.class);
083
084  private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class);
085
086  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
087  private static byte [] ROW = Bytes.toBytes("testRow");
088  private static byte [] FAMILY = Bytes.toBytes("testFamily");
089  private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
090  private static byte [] VALUE = Bytes.toBytes("testValue");
091
092  @Rule
093  public TestName name = new TestName();
094
095  /**
096   * @throws java.lang.Exception
097   */
098  @BeforeClass
099  public static void setUpBeforeClass() throws Exception {
100    Configuration conf = TEST_UTIL.getConfiguration();
101    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
102    TEST_UTIL.startMiniCluster(3);
103  }
104
105  /**
106   * @throws java.lang.Exception
107   */
108  @AfterClass
109  public static void tearDownAfterClass() throws Exception {
110    TEST_UTIL.shutdownMiniCluster();
111  }
112
113  /**
114   * @throws java.lang.Exception
115   */
116  @Before
117  public void setUp() throws Exception {
118    // Nothing to do.
119  }
120
121  /**
122   * @throws java.lang.Exception
123   */
124  @After
125  public void tearDown() throws Exception {
126    // Nothing to do.
127  }
128
129  /**
130   * Test from client side for batch of scan
131   *
132   * @throws Exception
133   */
134  @Test
135  public void testScanBatch() throws Exception {
136    final TableName tableName = TableName.valueOf(name.getMethodName());
137    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8);
138
139    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
140
141    Put put;
142    Scan scan;
143    Delete delete;
144    Result result;
145    ResultScanner scanner;
146    boolean toLog = true;
147    List<Cell> kvListExp;
148
149    // table: row, family, c0:0, c1:1, ... , c7:7
150    put = new Put(ROW);
151    for (int i=0; i < QUALIFIERS.length; i++) {
152      KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE);
153      put.add(kv);
154    }
155    ht.put(put);
156
157    // table: row, family, c0:0, c1:1, ..., c6:2, c6:6 , c7:7
158    put = new Put(ROW);
159    KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[6], 2, VALUE);
160    put.add(kv);
161    ht.put(put);
162
163    // delete upto ts: 3
164    delete = new Delete(ROW);
165    delete.addFamily(FAMILY, 3);
166    ht.delete(delete);
167
168    // without batch
169    scan = new Scan().withStartRow(ROW);
170    scan.setMaxVersions();
171    scanner = ht.getScanner(scan);
172
173    // c4:4, c5:5, c6:6, c7:7
174    kvListExp = new ArrayList<>();
175    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE));
176    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
177    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE));
178    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
179    result = scanner.next();
180    verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
181
182    // with batch
183    scan =  new Scan().withStartRow(ROW);
184    scan.setMaxVersions();
185    scan.setBatch(2);
186    scanner = ht.getScanner(scan);
187
188    // First batch: c4:4, c5:5
189    kvListExp = new ArrayList<>();
190    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE));
191    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
192    result = scanner.next();
193    verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
194
195    // Second batch: c6:6, c7:7
196    kvListExp = new ArrayList<>();
197    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE));
198    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
199    result = scanner.next();
200    verifyResult(result, kvListExp, toLog, "Testing second batch of scan");
201
202  }
203
204  @Test
205  public void testMaxResultSizeIsSetToDefault() throws Exception {
206    final TableName tableName = TableName.valueOf(name.getMethodName());
207    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
208
209    // The max result size we expect the scan to use by default.
210    long expectedMaxResultSize =
211        TEST_UTIL.getConfiguration().getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
212          HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
213
214    int numRows = 5;
215    byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
216
217    int numQualifiers = 10;
218    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers);
219
220    // Specify the cell size such that a single row will be larger than the default
221    // value of maxResultSize. This means that Scan RPCs should return at most a single
222    // result back to the client.
223    int cellSize = (int) (expectedMaxResultSize / (numQualifiers - 1));
224    byte[] cellValue = Bytes.createMaxByteArray(cellSize);
225
226    Put put;
227    List<Put> puts = new ArrayList<>();
228    for (int row = 0; row < ROWS.length; row++) {
229      put = new Put(ROWS[row]);
230      for (int qual = 0; qual < QUALIFIERS.length; qual++) {
231        KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], cellValue);
232        put.add(kv);
233      }
234      puts.add(put);
235    }
236    ht.put(puts);
237
238    // Create a scan with the default configuration.
239    Scan scan = new Scan();
240
241    ResultScanner scanner = ht.getScanner(scan);
242    assertTrue(scanner instanceof ClientScanner);
243    ClientScanner clientScanner = (ClientScanner) scanner;
244
245    // Call next to issue a single RPC to the server
246    scanner.next();
247
248    // The scanner should have, at most, a single result in its cache. If there more results exists
249    // in the cache it means that more than the expected max result size was fetched.
250    assertTrue("The cache contains: " + clientScanner.getCacheSize() + " results",
251      clientScanner.getCacheSize() <= 1);
252  }
253
254  /**
255   * Scan on not existing table should throw the exception with correct message
256   */
257  @Test
258  public void testScannerForNotExistingTable() {
259    String[] tableNames = {"A", "Z", "A:A", "Z:Z"};
260    for(String tableName : tableNames) {
261      try {
262        Table table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName));
263        testSmallScan(table, true, 1, 5);
264        fail("TableNotFoundException was not thrown");
265      } catch (TableNotFoundException e) {
266        // We expect that the message for TableNotFoundException would have only the table name only
267        // Otherwise that would mean that localeRegionInMeta doesn't work properly
268        assertEquals(e.getMessage(), tableName);
269      } catch (Exception e) {
270        fail("Unexpected exception " + e.getMessage());
271      }
272    }
273  }
274
275  @Test
276  public void testSmallScan() throws Exception {
277    final TableName tableName = TableName.valueOf(name.getMethodName());
278
279    int numRows = 10;
280    byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
281
282    int numQualifiers = 10;
283    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers);
284
285    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
286
287    Put put;
288    List<Put> puts = new ArrayList<>();
289    for (int row = 0; row < ROWS.length; row++) {
290      put = new Put(ROWS[row]);
291      for (int qual = 0; qual < QUALIFIERS.length; qual++) {
292        KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], VALUE);
293        put.add(kv);
294      }
295      puts.add(put);
296    }
297    ht.put(puts);
298
299    int expectedRows = numRows;
300    int expectedCols = numRows * numQualifiers;
301
302    // Test normal and reversed
303    testSmallScan(ht, true, expectedRows, expectedCols);
304    testSmallScan(ht, false, expectedRows, expectedCols);
305  }
306
307  /**
308   * Run through a variety of test configurations with a small scan
309   * @param table
310   * @param reversed
311   * @param rows
312   * @param columns
313   * @throws Exception
314   */
315  private void testSmallScan(Table table, boolean reversed, int rows, int columns) throws Exception {
316    Scan baseScan = new Scan();
317    baseScan.setReversed(reversed);
318    baseScan.setSmall(true);
319
320    Scan scan = new Scan(baseScan);
321    verifyExpectedCounts(table, scan, rows, columns);
322
323    scan = new Scan(baseScan);
324    scan.setMaxResultSize(1);
325    verifyExpectedCounts(table, scan, rows, columns);
326
327    scan = new Scan(baseScan);
328    scan.setMaxResultSize(1);
329    scan.setCaching(Integer.MAX_VALUE);
330    verifyExpectedCounts(table, scan, rows, columns);
331  }
332
333  private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount,
334      int expectedCellCount) throws Exception {
335    ResultScanner scanner = table.getScanner(scan);
336
337    int rowCount = 0;
338    int cellCount = 0;
339    Result r = null;
340    while ((r = scanner.next()) != null) {
341      rowCount++;
342      cellCount += r.rawCells().length;
343    }
344
345    assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount,
346        expectedRowCount == rowCount);
347    assertTrue("Expected cell count: " + expectedCellCount + " Actual cell count: " + cellCount,
348        expectedCellCount == cellCount);
349    scanner.close();
350  }
351
352  /**
353   * Test from client side for get with maxResultPerCF set
354   *
355   * @throws Exception
356   */
357  @Test
358  public void testGetMaxResults() throws Exception {
359    final TableName tableName = TableName.valueOf(name.getMethodName());
360    byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
361    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
362
363    Table ht = TEST_UTIL.createTable(tableName, FAMILIES);
364
365    Get get;
366    Put put;
367    Result result;
368    boolean toLog = true;
369    List<Cell> kvListExp;
370
371    kvListExp = new ArrayList<>();
372    // Insert one CF for row[0]
373    put = new Put(ROW);
374    for (int i=0; i < 10; i++) {
375      KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
376      put.add(kv);
377      kvListExp.add(kv);
378    }
379    ht.put(put);
380
381    get = new Get(ROW);
382    result = ht.get(get);
383    verifyResult(result, kvListExp, toLog, "Testing without setting maxResults");
384
385    get = new Get(ROW);
386    get.setMaxResultsPerColumnFamily(2);
387    result = ht.get(get);
388    kvListExp = new ArrayList<>();
389    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[0], 1, VALUE));
390    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
391    verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults");
392
393    // Filters: ColumnRangeFilter
394    get = new Get(ROW);
395    get.setMaxResultsPerColumnFamily(5);
396    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5],
397                                        true));
398    result = ht.get(get);
399    kvListExp = new ArrayList<>();
400    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE));
401    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
402    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
403    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
404    verifyResult(result, kvListExp, toLog, "Testing single CF with CRF");
405
406    // Insert two more CF for row[0]
407    // 20 columns for CF2, 10 columns for CF1
408    put = new Put(ROW);
409    for (int i=0; i < QUALIFIERS.length; i++) {
410      KeyValue kv = new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE);
411      put.add(kv);
412    }
413    ht.put(put);
414
415    put = new Put(ROW);
416    for (int i=0; i < 10; i++) {
417      KeyValue kv = new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE);
418      put.add(kv);
419    }
420    ht.put(put);
421
422    get = new Get(ROW);
423    get.setMaxResultsPerColumnFamily(12);
424    get.addFamily(FAMILIES[1]);
425    get.addFamily(FAMILIES[2]);
426    result = ht.get(get);
427    kvListExp = new ArrayList<>();
428    //Exp: CF1:q0, ..., q9, CF2: q0, q1, q10, q11, ..., q19
429    for (int i=0; i < 10; i++) {
430      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
431    }
432    for (int i=0; i < 2; i++) {
433        kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
434      }
435    for (int i=10; i < 20; i++) {
436      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
437    }
438    verifyResult(result, kvListExp, toLog, "Testing multiple CFs");
439
440    // Filters: ColumnRangeFilter and ColumnPrefixFilter
441    get = new Get(ROW);
442    get.setMaxResultsPerColumnFamily(3);
443    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, null, true));
444    result = ht.get(get);
445    kvListExp = new ArrayList<>();
446    for (int i=2; i < 5; i++) {
447      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
448    }
449    for (int i=2; i < 5; i++) {
450      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
451    }
452    for (int i=2; i < 5; i++) {
453      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
454    }
455    verifyResult(result, kvListExp, toLog, "Testing multiple CFs + CRF");
456
457    get = new Get(ROW);
458    get.setMaxResultsPerColumnFamily(7);
459    get.setFilter(new ColumnPrefixFilter(QUALIFIERS[1]));
460    result = ht.get(get);
461    kvListExp = new ArrayList<>();
462    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
463    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[1], 1, VALUE));
464    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[1], 1, VALUE));
465    for (int i=10; i < 16; i++) {
466      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
467    }
468    verifyResult(result, kvListExp, toLog, "Testing multiple CFs + PFF");
469
470  }
471
472  /**
473   * Test from client side for scan with maxResultPerCF set
474   *
475   * @throws Exception
476   */
477  @Test
478  public void testScanMaxResults() throws Exception {
479    final TableName tableName = TableName.valueOf(name.getMethodName());
480    byte [][] ROWS = HTestConst.makeNAscii(ROW, 2);
481    byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
482    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
483
484    Table ht = TEST_UTIL.createTable(tableName, FAMILIES);
485
486    Put put;
487    Scan scan;
488    Result result;
489    boolean toLog = true;
490    List<Cell> kvListExp, kvListScan;
491
492    kvListExp = new ArrayList<>();
493
494    for (int r=0; r < ROWS.length; r++) {
495      put = new Put(ROWS[r]);
496      for (int c=0; c < FAMILIES.length; c++) {
497        for (int q=0; q < QUALIFIERS.length; q++) {
498          KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE);
499          put.add(kv);
500          if (q < 4) {
501            kvListExp.add(kv);
502          }
503        }
504      }
505      ht.put(put);
506    }
507
508    scan = new Scan();
509    scan.setMaxResultsPerColumnFamily(4);
510    ResultScanner scanner = ht.getScanner(scan);
511    kvListScan = new ArrayList<>();
512    while ((result = scanner.next()) != null) {
513      for (Cell kv : result.listCells()) {
514        kvListScan.add(kv);
515      }
516    }
517    result = Result.create(kvListScan);
518    verifyResult(result, kvListExp, toLog, "Testing scan with maxResults");
519
520  }
521
522  /**
523   * Test from client side for get with rowOffset
524   *
525   * @throws Exception
526   */
527  @Test
528  public void testGetRowOffset() throws Exception {
529    final TableName tableName = TableName.valueOf(name.getMethodName());
530    byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
531    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
532
533    Table ht = TEST_UTIL.createTable(tableName, FAMILIES);
534
535    Get get;
536    Put put;
537    Result result;
538    boolean toLog = true;
539    List<Cell> kvListExp;
540
541    // Insert one CF for row
542    kvListExp = new ArrayList<>();
543    put = new Put(ROW);
544    for (int i=0; i < 10; i++) {
545      KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
546      put.add(kv);
547      // skipping first two kvs
548      if (i < 2) continue;
549      kvListExp.add(kv);
550    }
551    ht.put(put);
552
553    //setting offset to 2
554    get = new Get(ROW);
555    get.setRowOffsetPerColumnFamily(2);
556    result = ht.get(get);
557    verifyResult(result, kvListExp, toLog, "Testing basic setRowOffset");
558
559    //setting offset to 20
560    get = new Get(ROW);
561    get.setRowOffsetPerColumnFamily(20);
562    result = ht.get(get);
563    kvListExp = new ArrayList<>();
564    verifyResult(result, kvListExp, toLog, "Testing offset > #kvs");
565
566    //offset + maxResultPerCF
567    get = new Get(ROW);
568    get.setRowOffsetPerColumnFamily(4);
569    get.setMaxResultsPerColumnFamily(5);
570    result = ht.get(get);
571    kvListExp = new ArrayList<>();
572    for (int i=4; i < 9; i++) {
573      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
574    }
575    verifyResult(result, kvListExp, toLog,
576      "Testing offset + setMaxResultsPerCF");
577
578    // Filters: ColumnRangeFilter
579    get = new Get(ROW);
580    get.setRowOffsetPerColumnFamily(1);
581    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5],
582                                        true));
583    result = ht.get(get);
584    kvListExp = new ArrayList<>();
585    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
586    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
587    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
588    verifyResult(result, kvListExp, toLog, "Testing offset with CRF");
589
590    // Insert into two more CFs for row
591    // 10 columns for CF2, 10 columns for CF1
592    for(int j=2; j > 0; j--) {
593      put = new Put(ROW);
594      for (int i=0; i < 10; i++) {
595        KeyValue kv = new KeyValue(ROW, FAMILIES[j], QUALIFIERS[i], 1, VALUE);
596        put.add(kv);
597      }
598      ht.put(put);
599    }
600
601    get = new Get(ROW);
602    get.setRowOffsetPerColumnFamily(4);
603    get.setMaxResultsPerColumnFamily(2);
604    get.addFamily(FAMILIES[1]);
605    get.addFamily(FAMILIES[2]);
606    result = ht.get(get);
607    kvListExp = new ArrayList<>();
608    //Exp: CF1:q4, q5, CF2: q4, q5
609    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[4], 1, VALUE));
610    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[5], 1, VALUE));
611    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[4], 1, VALUE));
612    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE));
613    verifyResult(result, kvListExp, toLog,
614       "Testing offset + multiple CFs + maxResults");
615  }
616
617  @Test
618  public void testScanRawDeleteFamilyVersion() throws Exception {
619    TableName tableName = TableName.valueOf(name.getMethodName());
620    TEST_UTIL.createTable(tableName, FAMILY);
621    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
622    conf.set(RPC_CODEC_CONF_KEY, "");
623    conf.set(DEFAULT_CODEC_CLASS, "");
624    try (Connection connection = ConnectionFactory.createConnection(conf);
625        Table table = connection.getTable(tableName)) {
626      Delete delete = new Delete(ROW);
627      delete.addFamilyVersion(FAMILY, 0L);
628      table.delete(delete);
629      Scan scan = new Scan(ROW).setRaw(true);
630      ResultScanner scanner = table.getScanner(scan);
631      int count = 0;
632      while (scanner.next() != null) {
633        count++;
634      }
635      assertEquals(1, count);
636    } finally {
637      TEST_UTIL.deleteTable(tableName);
638    }
639  }
640
641  /**
642   * Test from client side for scan while the region is reopened
643   * on the same region server.
644   *
645   * @throws Exception
646   */
647  @Test
648  public void testScanOnReopenedRegion() throws Exception {
649    final TableName tableName = TableName.valueOf(name.getMethodName());
650    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2);
651
652    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
653
654    Put put;
655    Scan scan;
656    Result result;
657    ResultScanner scanner;
658    boolean toLog = false;
659    List<Cell> kvListExp;
660
661    // table: row, family, c0:0, c1:1
662    put = new Put(ROW);
663    for (int i=0; i < QUALIFIERS.length; i++) {
664      KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE);
665      put.add(kv);
666    }
667    ht.put(put);
668
669    scan = new Scan().withStartRow(ROW);
670    scanner = ht.getScanner(scan);
671
672    HRegionLocation loc;
673
674    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
675      loc = locator.getRegionLocation(ROW);
676    }
677    HRegionInfo hri = loc.getRegionInfo();
678    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
679    byte[] regionName = hri.getRegionName();
680    int i = cluster.getServerWith(regionName);
681    HRegionServer rs = cluster.getRegionServer(i);
682    LOG.info("Unassigning " + hri);
683    TEST_UTIL.getAdmin().unassign(hri.getRegionName(), true);
684    long startTime = EnvironmentEdgeManager.currentTime();
685    long timeOut = 10000;
686    boolean offline = false;
687    while (true) {
688      if (rs.getOnlineRegion(regionName) == null) {
689        offline = true;
690        break;
691      }
692      assertTrue("Timed out in closing the testing region",
693        EnvironmentEdgeManager.currentTime() < startTime + timeOut);
694    }
695    assertTrue(offline);
696    LOG.info("Assigning " + hri);
697    TEST_UTIL.getAdmin().assign(hri.getRegionName());
698    startTime = EnvironmentEdgeManager.currentTime();
699    while (true) {
700      rs = cluster.getRegionServer(cluster.getServerWith(regionName));
701      if (rs != null && rs.getOnlineRegion(regionName) != null) {
702        offline = false;
703        break;
704      }
705      assertTrue("Timed out in open the testing region",
706        EnvironmentEdgeManager.currentTime() < startTime + timeOut);
707    }
708    assertFalse(offline);
709
710    // c0:0, c1:1
711    kvListExp = new ArrayList<>();
712    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[0], 0, VALUE));
713    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[1], 1, VALUE));
714    result = scanner.next();
715    verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region");
716  }
717
718  @Test
719  public void testAsyncScannerWithSmallData() throws Exception {
720    testAsyncScanner(TableName.valueOf(name.getMethodName()),
721      2,
722      3,
723      10,
724      -1,
725      null);
726  }
727
728  @Test
729  public void testAsyncScannerWithManyRows() throws Exception {
730    testAsyncScanner(TableName.valueOf(name.getMethodName()),
731      30000,
732      1,
733      1,
734      -1,
735      null);
736  }
737
738  @Test
739  public void testAsyncScannerWithoutCaching() throws Exception {
740    testAsyncScanner(TableName.valueOf(name.getMethodName()),
741      5,
742      1,
743      1,
744      1,
745      (b) -> {
746        try {
747          TimeUnit.MILLISECONDS.sleep(500);
748        } catch (InterruptedException ex) {
749        }
750      });
751  }
752
753  private void testAsyncScanner(TableName table, int rowNumber, int familyNumber,
754      int qualifierNumber, int caching, Consumer<Boolean> listener) throws Exception {
755    assert rowNumber > 0;
756    assert familyNumber > 0;
757    assert qualifierNumber > 0;
758    byte[] row = Bytes.toBytes("r");
759    byte[] family = Bytes.toBytes("f");
760    byte[] qualifier = Bytes.toBytes("q");
761    byte[][] rows = makeNAsciiWithZeroPrefix(row, rowNumber);
762    byte[][] families = makeNAsciiWithZeroPrefix(family, familyNumber);
763    byte[][] qualifiers = makeNAsciiWithZeroPrefix(qualifier, qualifierNumber);
764
765    Table ht = TEST_UTIL.createTable(table, families);
766
767    boolean toLog = true;
768    List<Cell> kvListExp = new ArrayList<>();
769
770    List<Put> puts = new ArrayList<>();
771    for (byte[] r : rows) {
772      Put put = new Put(r);
773      for (byte[] f : families) {
774        for (byte[] q : qualifiers) {
775          KeyValue kv = new KeyValue(r, f, q, 1, VALUE);
776          put.add(kv);
777          kvListExp.add(kv);
778        }
779      }
780      puts.add(put);
781      if (puts.size() > 1000) {
782        ht.put(puts);
783        puts.clear();
784      }
785    }
786    if (!puts.isEmpty()) {
787      ht.put(puts);
788      puts.clear();
789    }
790
791    Scan scan = new Scan();
792    scan.setAsyncPrefetch(true);
793    if (caching > 0) {
794      scan.setCaching(caching);
795    }
796    try (ResultScanner scanner = ht.getScanner(scan)) {
797      assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner);
798      ((ClientAsyncPrefetchScanner) scanner).setPrefetchListener(listener);
799      List<Cell> kvListScan = new ArrayList<>();
800      Result result;
801      boolean first = true;
802      int actualRows = 0;
803      while ((result = scanner.next()) != null) {
804        ++actualRows;
805        // waiting for cache. see HBASE-17376
806        if (first) {
807          TimeUnit.SECONDS.sleep(1);
808          first = false;
809        }
810        for (Cell kv : result.listCells()) {
811          kvListScan.add(kv);
812        }
813      }
814      assertEquals(rowNumber, actualRows);
815      // These cells may have different rows but it is ok. The Result#getRow
816      // isn't used in the verifyResult()
817      result = Result.create(kvListScan);
818      verifyResult(result, kvListExp, toLog, "Testing async scan");
819    }
820
821    TEST_UTIL.deleteTable(table);
822  }
823
824  private static byte[][] makeNAsciiWithZeroPrefix(byte[] base, int n) {
825    int maxLength = Integer.toString(n).length();
826    byte [][] ret = new byte[n][];
827    for (int i = 0; i < n; i++) {
828      int length = Integer.toString(i).length();
829      StringBuilder buf = new StringBuilder(Integer.toString(i));
830      IntStream.range(0, maxLength - length).forEach(v -> buf.insert(0, "0"));
831      byte[] tail = Bytes.toBytes(buf.toString());
832      ret[i] = Bytes.add(base, tail);
833    }
834    return ret;
835  }
836
837  static void verifyResult(Result result, List<Cell> expKvList, boolean toLog,
838      String msg) {
839
840    LOG.info(msg);
841    LOG.info("Expected count: " + expKvList.size());
842    LOG.info("Actual count: " + result.size());
843    if (expKvList.isEmpty())
844      return;
845
846    int i = 0;
847    for (Cell kv : result.rawCells()) {
848      if (i >= expKvList.size()) {
849        break;  // we will check the size later
850      }
851
852      Cell kvExp = expKvList.get(i++);
853      if (toLog) {
854        LOG.info("get kv is: " + kv.toString());
855        LOG.info("exp kv is: " + kvExp.toString());
856      }
857      assertTrue("Not equal", kvExp.equals(kv));
858    }
859
860    assertEquals(expKvList.size(), result.size());
861  }
862
863  @Test
864  public void testReadExpiredDataForRawScan() throws IOException {
865    TableName tableName = TableName.valueOf(name.getMethodName());
866    long ts = System.currentTimeMillis() - 10000;
867    byte[] value = Bytes.toBytes("expired");
868    try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
869      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, ts, value));
870      assertArrayEquals(value, table.get(new Get(ROW)).getValue(FAMILY, QUALIFIER));
871      TEST_UTIL.getAdmin().modifyColumnFamily(tableName,
872        new HColumnDescriptor(FAMILY).setTimeToLive(5));
873      try (ResultScanner scanner = table.getScanner(FAMILY)) {
874        assertNull(scanner.next());
875      }
876      try (ResultScanner scanner = table.getScanner(new Scan().setRaw(true))) {
877        assertArrayEquals(value, scanner.next().getValue(FAMILY, QUALIFIER));
878        assertNull(scanner.next());
879      }
880    }
881  }
882
883  @Test
884  public void testScanWithColumnsAndFilterAndVersion() throws IOException {
885    TableName tableName = TableName.valueOf(name.getMethodName());
886    try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) {
887      for (int i = 0; i < 4; i++) {
888        Put put = new Put(ROW);
889        put.addColumn(FAMILY, QUALIFIER, VALUE);
890        table.put(put);
891      }
892
893      Scan scan = new Scan();
894      scan.addColumn(FAMILY, QUALIFIER);
895      scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(QUALIFIER)));
896      scan.readVersions(3);
897
898      try (ResultScanner scanner = table.getScanner(scan)) {
899        Result result = scanner.next();
900        assertEquals(3, result.size());
901      }
902    }
903  }
904
905  @Test
906  public void testScanWithSameStartRowStopRow() throws IOException {
907    TableName tableName = TableName.valueOf(name.getMethodName());
908    try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
909      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
910
911      Scan scan = new Scan().withStartRow(ROW).withStopRow(ROW);
912      try (ResultScanner scanner = table.getScanner(scan)) {
913        assertNull(scanner.next());
914      }
915
916      scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, true);
917      try (ResultScanner scanner = table.getScanner(scan)) {
918        Result result = scanner.next();
919        assertNotNull(result);
920        assertArrayEquals(ROW, result.getRow());
921        assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
922        assertNull(scanner.next());
923      }
924
925      scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, false);
926      try (ResultScanner scanner = table.getScanner(scan)) {
927        assertNull(scanner.next());
928      }
929
930      scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, false);
931      try (ResultScanner scanner = table.getScanner(scan)) {
932        assertNull(scanner.next());
933      }
934
935      scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, true);
936      try (ResultScanner scanner = table.getScanner(scan)) {
937        assertNull(scanner.next());
938      }
939    }
940  }
941
942  @Test
943  public void testReverseScanWithFlush() throws Exception {
944    TableName tableName = TableName.valueOf(name.getMethodName());
945    final int BATCH_SIZE = 10;
946    final int ROWS_TO_INSERT = 100;
947    final byte[] LARGE_VALUE = generateHugeValue(128 * 1024);
948
949    try (Table table = TEST_UTIL.createTable(tableName, FAMILY);
950      Admin admin = TEST_UTIL.getAdmin()) {
951      List<Put> putList = new ArrayList<>();
952      for (long i = 0; i < ROWS_TO_INSERT; i++) {
953        Put put = new Put(Bytes.toBytes(i));
954        put.addColumn(FAMILY, QUALIFIER, LARGE_VALUE);
955        putList.add(put);
956
957        if (putList.size() >= BATCH_SIZE) {
958          table.put(putList);
959          admin.flush(tableName);
960          putList.clear();
961        }
962      }
963
964      if (!putList.isEmpty()) {
965        table.put(putList);
966        admin.flush(tableName);
967        putList.clear();
968      }
969
970      Scan scan = new Scan();
971      scan.setReversed(true);
972      int count = 0;
973
974      try (ResultScanner results = table.getScanner(scan)) {
975        for (Result result : results) {
976          count++;
977        }
978      }
979      assertEquals("Expected " + ROWS_TO_INSERT + " rows in the table but it is " + count,
980        ROWS_TO_INSERT, count);
981    }
982  }
983
984  @Test
985  public void testScannerWithPartialResults() throws Exception {
986    TableName tableName = TableName.valueOf("testScannerWithPartialResults");
987    try (Table table = TEST_UTIL.createMultiRegionTable(tableName,
988      Bytes.toBytes("c"), 4)) {
989      List<Put> puts = new ArrayList<>();
990      byte[] largeArray = new byte[10000];
991      Put put = new Put(Bytes.toBytes("aaaa0"));
992      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1"));
993      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), Bytes.toBytes("2"));
994      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), Bytes.toBytes("3"));
995      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("4"), Bytes.toBytes("4"));
996      puts.add(put);
997      put = new Put(Bytes.toBytes("aaaa1"));
998      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1"));
999      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), largeArray);
1000      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), largeArray);
1001      puts.add(put);
1002      table.put(puts);
1003      Scan scan = new Scan();
1004      scan.addFamily(Bytes.toBytes("c"));
1005      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName.getName());
1006      scan.setMaxResultSize(10001);
1007      scan.setStopRow(Bytes.toBytes("bbbb"));
1008      scan.setFilter(new LimitKVsReturnFilter());
1009      ResultScanner rs = table.getScanner(scan);
1010      Result result;
1011      int expectedKvNumber = 6;
1012      int returnedKvNumber = 0;
1013      while((result = rs.next()) != null) {
1014        returnedKvNumber += result.listCells().size();
1015      }
1016      rs.close();
1017      assertEquals(expectedKvNumber, returnedKvNumber);
1018    }
1019  }
1020
1021  public static class LimitKVsReturnFilter extends FilterBase {
1022
1023    private int cellCount = 0;
1024
1025    @Override
1026    public ReturnCode filterCell(Cell v) throws IOException {
1027      if (cellCount >= 6) {
1028        cellCount++;
1029        return ReturnCode.SKIP;
1030      }
1031      cellCount++;
1032      return ReturnCode.INCLUDE;
1033    }
1034
1035    @Override
1036    public boolean filterAllRemaining() throws IOException {
1037      if (cellCount < 7) {
1038        return false;
1039      }
1040      cellCount++;
1041      return true;
1042    }
1043
1044    @Override
1045    public String toString() {
1046      return this.getClass().getSimpleName();
1047    }
1048
1049    public static LimitKVsReturnFilter parseFrom(final byte [] pbBytes)
1050        throws DeserializationException {
1051      return new LimitKVsReturnFilter();
1052    }
1053  }
1054}