001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY;
021import static org.apache.hadoop.hbase.client.TestFromClientSide3.generateHugeValue;
022import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS;
023import static org.junit.Assert.assertArrayEquals;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertNotNull;
027import static org.junit.Assert.assertNull;
028import static org.junit.Assert.assertTrue;
029import static org.junit.Assert.fail;
030
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Collection;
035import java.util.List;
036import java.util.concurrent.TimeUnit;
037import java.util.function.Consumer;
038import java.util.stream.IntStream;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.Cell;
041import org.apache.hadoop.hbase.CompareOperator;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseTestingUtility;
044import org.apache.hadoop.hbase.HColumnDescriptor;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.HRegionInfo;
047import org.apache.hadoop.hbase.HRegionLocation;
048import org.apache.hadoop.hbase.HTestConst;
049import org.apache.hadoop.hbase.KeyValue;
050import org.apache.hadoop.hbase.MiniHBaseCluster;
051import org.apache.hadoop.hbase.StartMiniClusterOption;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.TableNameTestRule;
054import org.apache.hadoop.hbase.TableNotFoundException;
055import org.apache.hadoop.hbase.exceptions.DeserializationException;
056import org.apache.hadoop.hbase.filter.BinaryComparator;
057import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
058import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
059import org.apache.hadoop.hbase.filter.FilterBase;
060import org.apache.hadoop.hbase.filter.QualifierFilter;
061import org.apache.hadoop.hbase.regionserver.HRegionServer;
062import org.apache.hadoop.hbase.testclassification.ClientTests;
063import org.apache.hadoop.hbase.testclassification.MediumTests;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
066import org.junit.AfterClass;
067import org.junit.Before;
068import org.junit.ClassRule;
069import org.junit.Rule;
070import org.junit.Test;
071import org.junit.experimental.categories.Category;
072import org.junit.runner.RunWith;
073import org.junit.runners.Parameterized;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
078
079/**
080 * A client-side test, mostly testing scanners with various parameters. Parameterized on different
081 * registry implementations.
082 */
083@Category({MediumTests.class, ClientTests.class})
084@RunWith(Parameterized.class)
085public class TestScannersFromClientSide {
086
  // Class-level JUnit rule built for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestScannersFromClientSide.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class);

  // Shared mini-cluster utility. Assigned by the constructor, which replaces it whenever the
  // parameterized configuration changes, and shut down in tearDownAfterClass().
  private static HBaseTestingUtility TEST_UTIL;
  // Default row key, column family, qualifier and cell value used by the tests below.
  private static byte [] ROW = Bytes.toBytes("testRow");
  private static byte [] FAMILY = Bytes.toBytes("testFamily");
  private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte [] VALUE = Bytes.toBytes("testValue");

  // Supplies the table name each test uses, via name.getTableName().
  @Rule public TableNameTestRule name = new TableNameTestRule();
100
101  /**
102   * @throws java.lang.Exception
103   */
104  @AfterClass
105  public static void tearDownAfterClass() throws Exception {
106    if (TEST_UTIL != null) {
107      TEST_UTIL.shutdownMiniCluster();
108    }
109  }
110
  /**
   * Per-test setup hook. Intentionally empty: the mini cluster is prepared by the constructor.
   *
   * @throws java.lang.Exception declared for the hook signature; the empty body throws nothing
   */
  @Before
  public void setUp() throws Exception {
    // Nothing to do.
  }
118
119  @Parameterized.Parameters
120  public static Collection<Object[]> parameters() {
121    return Arrays.asList(new Object[][] {
122        { MasterRegistry.class, 1},
123        { MasterRegistry.class, 2},
124        { ZKConnectionRegistry.class, 1}
125    });
126  }
127
128  /**
129   * JUnit does not provide an easy way to run a hook after each parameterized run. Without that
130   * there is no easy way to restart the test cluster after each parameterized run. Annotation
131   * BeforeParam does not work either because it runs before parameterization and hence does not
132   * have access to the test parameters (which is weird).
133   *
134   * This *hack* checks if the current instance of test cluster configuration has the passed
135   * parameterized configs. In such a case, we can just reuse the cluster for test and do not need
136   * to initialize from scratch. While this is a hack, it saves a ton of time for the full
137   * test and de-flakes it.
138   */
139  private static boolean isSameParameterizedCluster(Class<?> registryImpl, int numHedgedReqs) {
140    // initialize() is called for every unit test, however we only want to reset the cluster state
141    // at the end of every parameterized run.
142    if (TEST_UTIL == null) {
143      return false;
144    }
145    Configuration conf = TEST_UTIL.getConfiguration();
146    Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
147      ZKConnectionRegistry.class);
148    int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
149      MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT);
150    return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
151  }
152
153  public TestScannersFromClientSide(Class<?> registryImpl, int numHedgedReqs) throws Exception {
154    if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) {
155      return;
156    }
157    if (TEST_UTIL != null) {
158      // We reached the end of a parameterized run, clean up the cluster.
159      TEST_UTIL.shutdownMiniCluster();
160    }
161    TEST_UTIL = new HBaseTestingUtility();
162    Configuration conf = TEST_UTIL.getConfiguration();
163    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
164    conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl,
165        ConnectionRegistry.class);
166    Preconditions.checkArgument(numHedgedReqs > 0);
167    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
168    StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
169    // Multiple masters needed only when hedged reads for master registry are enabled.
170    builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(3);
171    TEST_UTIL.startMiniCluster(builder.build());
172  }
173
174  /**
175   * Test from client side for batch of scan
176   *
177   * @throws Exception
178   */
179  @Test
180  public void testScanBatch() throws Exception {
181    final TableName tableName = name.getTableName();
182    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8);
183
184    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
185
186    Put put;
187    Scan scan;
188    Delete delete;
189    Result result;
190    ResultScanner scanner;
191    boolean toLog = true;
192    List<Cell> kvListExp;
193
194    // table: row, family, c0:0, c1:1, ... , c7:7
195    put = new Put(ROW);
196    for (int i=0; i < QUALIFIERS.length; i++) {
197      KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE);
198      put.add(kv);
199    }
200    ht.put(put);
201
202    // table: row, family, c0:0, c1:1, ..., c6:2, c6:6 , c7:7
203    put = new Put(ROW);
204    KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[6], 2, VALUE);
205    put.add(kv);
206    ht.put(put);
207
208    // delete upto ts: 3
209    delete = new Delete(ROW);
210    delete.addFamily(FAMILY, 3);
211    ht.delete(delete);
212
213    // without batch
214    scan = new Scan().withStartRow(ROW);
215    scan.setMaxVersions();
216    scanner = ht.getScanner(scan);
217
218    // c4:4, c5:5, c6:6, c7:7
219    kvListExp = new ArrayList<>();
220    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE));
221    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
222    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE));
223    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
224    result = scanner.next();
225    verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
226
227    // with batch
228    scan =  new Scan().withStartRow(ROW);
229    scan.setMaxVersions();
230    scan.setBatch(2);
231    scanner = ht.getScanner(scan);
232
233    // First batch: c4:4, c5:5
234    kvListExp = new ArrayList<>();
235    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE));
236    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
237    result = scanner.next();
238    verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
239
240    // Second batch: c6:6, c7:7
241    kvListExp = new ArrayList<>();
242    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE));
243    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
244    result = scanner.next();
245    verifyResult(result, kvListExp, toLog, "Testing second batch of scan");
246
247  }
248
249  @Test
250  public void testMaxResultSizeIsSetToDefault() throws Exception {
251    final TableName tableName = name.getTableName();
252    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
253
254    // The max result size we expect the scan to use by default.
255    long expectedMaxResultSize =
256        TEST_UTIL.getConfiguration().getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
257          HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
258
259    int numRows = 5;
260    byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
261
262    int numQualifiers = 10;
263    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers);
264
265    // Specify the cell size such that a single row will be larger than the default
266    // value of maxResultSize. This means that Scan RPCs should return at most a single
267    // result back to the client.
268    int cellSize = (int) (expectedMaxResultSize / (numQualifiers - 1));
269    byte[] cellValue = Bytes.createMaxByteArray(cellSize);
270
271    Put put;
272    List<Put> puts = new ArrayList<>();
273    for (int row = 0; row < ROWS.length; row++) {
274      put = new Put(ROWS[row]);
275      for (int qual = 0; qual < QUALIFIERS.length; qual++) {
276        KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], cellValue);
277        put.add(kv);
278      }
279      puts.add(put);
280    }
281    ht.put(puts);
282
283    // Create a scan with the default configuration.
284    Scan scan = new Scan();
285
286    ResultScanner scanner = ht.getScanner(scan);
287    assertTrue(scanner instanceof ClientScanner);
288    ClientScanner clientScanner = (ClientScanner) scanner;
289
290    // Call next to issue a single RPC to the server
291    scanner.next();
292
293    // The scanner should have, at most, a single result in its cache. If there more results exists
294    // in the cache it means that more than the expected max result size was fetched.
295    assertTrue("The cache contains: " + clientScanner.getCacheSize() + " results",
296      clientScanner.getCacheSize() <= 1);
297  }
298
299  /**
300   * Scan on not existing table should throw the exception with correct message
301   */
302  @Test
303  public void testScannerForNotExistingTable() {
304    String[] tableNames = {"A", "Z", "A:A", "Z:Z"};
305    for(String tableName : tableNames) {
306      try {
307        Table table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName));
308        testSmallScan(table, true, 1, 5);
309        fail("TableNotFoundException was not thrown");
310      } catch (TableNotFoundException e) {
311        // We expect that the message for TableNotFoundException would have only the table name only
312        // Otherwise that would mean that localeRegionInMeta doesn't work properly
313        assertEquals(e.getMessage(), tableName);
314      } catch (Exception e) {
315        fail("Unexpected exception " + e.getMessage());
316      }
317    }
318  }
319
320  @Test
321  public void testSmallScan() throws Exception {
322    final TableName tableName = name.getTableName();
323
324    int numRows = 10;
325    byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
326
327    int numQualifiers = 10;
328    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers);
329
330    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
331
332    Put put;
333    List<Put> puts = new ArrayList<>();
334    for (int row = 0; row < ROWS.length; row++) {
335      put = new Put(ROWS[row]);
336      for (int qual = 0; qual < QUALIFIERS.length; qual++) {
337        KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], VALUE);
338        put.add(kv);
339      }
340      puts.add(put);
341    }
342    ht.put(puts);
343
344    int expectedRows = numRows;
345    int expectedCols = numRows * numQualifiers;
346
347    // Test normal and reversed
348    testSmallScan(ht, true, expectedRows, expectedCols);
349    testSmallScan(ht, false, expectedRows, expectedCols);
350  }
351
352  /**
353   * Run through a variety of test configurations with a small scan
354   * @param table
355   * @param reversed
356   * @param rows
357   * @param columns
358   * @throws Exception
359   */
360  private void testSmallScan(
361      Table table, boolean reversed, int rows, int columns) throws Exception {
362    Scan baseScan = new Scan();
363    baseScan.setReversed(reversed);
364    baseScan.setSmall(true);
365
366    Scan scan = new Scan(baseScan);
367    verifyExpectedCounts(table, scan, rows, columns);
368
369    scan = new Scan(baseScan);
370    scan.setMaxResultSize(1);
371    verifyExpectedCounts(table, scan, rows, columns);
372
373    scan = new Scan(baseScan);
374    scan.setMaxResultSize(1);
375    scan.setCaching(Integer.MAX_VALUE);
376    verifyExpectedCounts(table, scan, rows, columns);
377  }
378
379  private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount,
380      int expectedCellCount) throws Exception {
381    ResultScanner scanner = table.getScanner(scan);
382
383    int rowCount = 0;
384    int cellCount = 0;
385    Result r = null;
386    while ((r = scanner.next()) != null) {
387      rowCount++;
388      cellCount += r.rawCells().length;
389    }
390
391    assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount,
392        expectedRowCount == rowCount);
393    assertTrue("Expected cell count: " + expectedCellCount + " Actual cell count: " + cellCount,
394        expectedCellCount == cellCount);
395    scanner.close();
396  }
397
398  /**
399   * Test from client side for get with maxResultPerCF set
400   *
401   * @throws Exception
402   */
403  @Test
404  public void testGetMaxResults() throws Exception {
405    final TableName tableName = name.getTableName();
406    byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
407    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
408
409    Table ht = TEST_UTIL.createTable(tableName, FAMILIES);
410
411    Get get;
412    Put put;
413    Result result;
414    boolean toLog = true;
415    List<Cell> kvListExp;
416
417    kvListExp = new ArrayList<>();
418    // Insert one CF for row[0]
419    put = new Put(ROW);
420    for (int i=0; i < 10; i++) {
421      KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
422      put.add(kv);
423      kvListExp.add(kv);
424    }
425    ht.put(put);
426
427    get = new Get(ROW);
428    result = ht.get(get);
429    verifyResult(result, kvListExp, toLog, "Testing without setting maxResults");
430
431    get = new Get(ROW);
432    get.setMaxResultsPerColumnFamily(2);
433    result = ht.get(get);
434    kvListExp = new ArrayList<>();
435    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[0], 1, VALUE));
436    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
437    verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults");
438
439    // Filters: ColumnRangeFilter
440    get = new Get(ROW);
441    get.setMaxResultsPerColumnFamily(5);
442    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5],
443                                        true));
444    result = ht.get(get);
445    kvListExp = new ArrayList<>();
446    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE));
447    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
448    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
449    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
450    verifyResult(result, kvListExp, toLog, "Testing single CF with CRF");
451
452    // Insert two more CF for row[0]
453    // 20 columns for CF2, 10 columns for CF1
454    put = new Put(ROW);
455    for (int i=0; i < QUALIFIERS.length; i++) {
456      KeyValue kv = new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE);
457      put.add(kv);
458    }
459    ht.put(put);
460
461    put = new Put(ROW);
462    for (int i=0; i < 10; i++) {
463      KeyValue kv = new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE);
464      put.add(kv);
465    }
466    ht.put(put);
467
468    get = new Get(ROW);
469    get.setMaxResultsPerColumnFamily(12);
470    get.addFamily(FAMILIES[1]);
471    get.addFamily(FAMILIES[2]);
472    result = ht.get(get);
473    kvListExp = new ArrayList<>();
474    //Exp: CF1:q0, ..., q9, CF2: q0, q1, q10, q11, ..., q19
475    for (int i=0; i < 10; i++) {
476      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
477    }
478    for (int i=0; i < 2; i++) {
479      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
480    }
481    for (int i=10; i < 20; i++) {
482      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
483    }
484    verifyResult(result, kvListExp, toLog, "Testing multiple CFs");
485
486    // Filters: ColumnRangeFilter and ColumnPrefixFilter
487    get = new Get(ROW);
488    get.setMaxResultsPerColumnFamily(3);
489    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, null, true));
490    result = ht.get(get);
491    kvListExp = new ArrayList<>();
492    for (int i=2; i < 5; i++) {
493      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
494    }
495    for (int i=2; i < 5; i++) {
496      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
497    }
498    for (int i=2; i < 5; i++) {
499      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
500    }
501    verifyResult(result, kvListExp, toLog, "Testing multiple CFs + CRF");
502
503    get = new Get(ROW);
504    get.setMaxResultsPerColumnFamily(7);
505    get.setFilter(new ColumnPrefixFilter(QUALIFIERS[1]));
506    result = ht.get(get);
507    kvListExp = new ArrayList<>();
508    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
509    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[1], 1, VALUE));
510    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[1], 1, VALUE));
511    for (int i=10; i < 16; i++) {
512      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
513    }
514    verifyResult(result, kvListExp, toLog, "Testing multiple CFs + PFF");
515
516  }
517
518  /**
519   * Test from client side for scan with maxResultPerCF set
520   *
521   * @throws Exception
522   */
523  @Test
524  public void testScanMaxResults() throws Exception {
525    final TableName tableName = name.getTableName();
526    byte [][] ROWS = HTestConst.makeNAscii(ROW, 2);
527    byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
528    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
529
530    Table ht = TEST_UTIL.createTable(tableName, FAMILIES);
531
532    Put put;
533    Scan scan;
534    Result result;
535    boolean toLog = true;
536    List<Cell> kvListExp, kvListScan;
537
538    kvListExp = new ArrayList<>();
539
540    for (int r=0; r < ROWS.length; r++) {
541      put = new Put(ROWS[r]);
542      for (int c=0; c < FAMILIES.length; c++) {
543        for (int q=0; q < QUALIFIERS.length; q++) {
544          KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE);
545          put.add(kv);
546          if (q < 4) {
547            kvListExp.add(kv);
548          }
549        }
550      }
551      ht.put(put);
552    }
553
554    scan = new Scan();
555    scan.setMaxResultsPerColumnFamily(4);
556    ResultScanner scanner = ht.getScanner(scan);
557    kvListScan = new ArrayList<>();
558    while ((result = scanner.next()) != null) {
559      for (Cell kv : result.listCells()) {
560        kvListScan.add(kv);
561      }
562    }
563    result = Result.create(kvListScan);
564    verifyResult(result, kvListExp, toLog, "Testing scan with maxResults");
565
566  }
567
568  /**
569   * Test from client side for get with rowOffset
570   *
571   * @throws Exception
572   */
573  @Test
574  public void testGetRowOffset() throws Exception {
575    final TableName tableName = name.getTableName();
576    byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
577    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
578
579    Table ht = TEST_UTIL.createTable(tableName, FAMILIES);
580
581    Get get;
582    Put put;
583    Result result;
584    boolean toLog = true;
585    List<Cell> kvListExp;
586
587    // Insert one CF for row
588    kvListExp = new ArrayList<>();
589    put = new Put(ROW);
590    for (int i=0; i < 10; i++) {
591      KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
592      put.add(kv);
593      // skipping first two kvs
594      if (i < 2) {
595        continue;
596      }
597      kvListExp.add(kv);
598    }
599    ht.put(put);
600
601    //setting offset to 2
602    get = new Get(ROW);
603    get.setRowOffsetPerColumnFamily(2);
604    result = ht.get(get);
605    verifyResult(result, kvListExp, toLog, "Testing basic setRowOffset");
606
607    //setting offset to 20
608    get = new Get(ROW);
609    get.setRowOffsetPerColumnFamily(20);
610    result = ht.get(get);
611    kvListExp = new ArrayList<>();
612    verifyResult(result, kvListExp, toLog, "Testing offset > #kvs");
613
614    //offset + maxResultPerCF
615    get = new Get(ROW);
616    get.setRowOffsetPerColumnFamily(4);
617    get.setMaxResultsPerColumnFamily(5);
618    result = ht.get(get);
619    kvListExp = new ArrayList<>();
620    for (int i=4; i < 9; i++) {
621      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
622    }
623    verifyResult(result, kvListExp, toLog,
624      "Testing offset + setMaxResultsPerCF");
625
626    // Filters: ColumnRangeFilter
627    get = new Get(ROW);
628    get.setRowOffsetPerColumnFamily(1);
629    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5],
630                                        true));
631    result = ht.get(get);
632    kvListExp = new ArrayList<>();
633    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
634    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
635    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
636    verifyResult(result, kvListExp, toLog, "Testing offset with CRF");
637
638    // Insert into two more CFs for row
639    // 10 columns for CF2, 10 columns for CF1
640    for(int j=2; j > 0; j--) {
641      put = new Put(ROW);
642      for (int i=0; i < 10; i++) {
643        KeyValue kv = new KeyValue(ROW, FAMILIES[j], QUALIFIERS[i], 1, VALUE);
644        put.add(kv);
645      }
646      ht.put(put);
647    }
648
649    get = new Get(ROW);
650    get.setRowOffsetPerColumnFamily(4);
651    get.setMaxResultsPerColumnFamily(2);
652    get.addFamily(FAMILIES[1]);
653    get.addFamily(FAMILIES[2]);
654    result = ht.get(get);
655    kvListExp = new ArrayList<>();
656    //Exp: CF1:q4, q5, CF2: q4, q5
657    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[4], 1, VALUE));
658    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[5], 1, VALUE));
659    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[4], 1, VALUE));
660    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE));
661    verifyResult(result, kvListExp, toLog,
662       "Testing offset + multiple CFs + maxResults");
663  }
664
665  @Test
666  public void testScanRawDeleteFamilyVersion() throws Exception {
667    TableName tableName = name.getTableName();
668    TEST_UTIL.createTable(tableName, FAMILY);
669    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
670    conf.set(RPC_CODEC_CONF_KEY, "");
671    conf.set(DEFAULT_CODEC_CLASS, "");
672    try (Connection connection = ConnectionFactory.createConnection(conf);
673        Table table = connection.getTable(tableName)) {
674      Delete delete = new Delete(ROW);
675      delete.addFamilyVersion(FAMILY, 0L);
676      table.delete(delete);
677      Scan scan = new Scan(ROW).setRaw(true);
678      ResultScanner scanner = table.getScanner(scan);
679      int count = 0;
680      while (scanner.next() != null) {
681        count++;
682      }
683      assertEquals(1, count);
684    } finally {
685      TEST_UTIL.deleteTable(tableName);
686    }
687  }
688
689  /**
690   * Test from client side for scan while the region is reopened
691   * on the same region server.
692   *
693   * @throws Exception
694   */
695  @Test
696  public void testScanOnReopenedRegion() throws Exception {
697    final TableName tableName = name.getTableName();
698    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2);
699
700    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
701
702    Put put;
703    Scan scan;
704    Result result;
705    ResultScanner scanner;
706    boolean toLog = false;
707    List<Cell> kvListExp;
708
709    // table: row, family, c0:0, c1:1
710    put = new Put(ROW);
711    for (int i=0; i < QUALIFIERS.length; i++) {
712      KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE);
713      put.add(kv);
714    }
715    ht.put(put);
716
717    scan = new Scan().withStartRow(ROW);
718    scanner = ht.getScanner(scan);
719
720    HRegionLocation loc;
721
722    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
723      loc = locator.getRegionLocation(ROW);
724    }
725    HRegionInfo hri = loc.getRegionInfo();
726    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
727    byte[] regionName = hri.getRegionName();
728    int i = cluster.getServerWith(regionName);
729    HRegionServer rs = cluster.getRegionServer(i);
730    LOG.info("Unassigning " + hri);
731    TEST_UTIL.getAdmin().unassign(hri.getRegionName(), true);
732    long startTime = EnvironmentEdgeManager.currentTime();
733    long timeOut = 10000;
734    boolean offline = false;
735    while (true) {
736      if (rs.getOnlineRegion(regionName) == null) {
737        offline = true;
738        break;
739      }
740      assertTrue("Timed out in closing the testing region",
741        EnvironmentEdgeManager.currentTime() < startTime + timeOut);
742    }
743    assertTrue(offline);
744    LOG.info("Assigning " + hri);
745    TEST_UTIL.getAdmin().assign(hri.getRegionName());
746    startTime = EnvironmentEdgeManager.currentTime();
747    while (true) {
748      rs = cluster.getRegionServer(cluster.getServerWith(regionName));
749      if (rs != null && rs.getOnlineRegion(regionName) != null) {
750        offline = false;
751        break;
752      }
753      assertTrue("Timed out in open the testing region",
754        EnvironmentEdgeManager.currentTime() < startTime + timeOut);
755    }
756    assertFalse(offline);
757
758    // c0:0, c1:1
759    kvListExp = new ArrayList<>();
760    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[0], 0, VALUE));
761    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[1], 1, VALUE));
762    result = scanner.next();
763    verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region");
764  }
765
766  @Test
767  public void testAsyncScannerWithSmallData() throws Exception {
768    testAsyncScanner(name.getTableName(),
769      2,
770      3,
771      10,
772      -1,
773      null);
774  }
775
776  @Test
777  public void testAsyncScannerWithManyRows() throws Exception {
778    testAsyncScanner(name.getTableName(),
779      30000,
780      1,
781      1,
782      -1,
783      null);
784  }
785
786  @Test
787  public void testAsyncScannerWithoutCaching() throws Exception {
788    testAsyncScanner(name.getTableName(),
789      5,
790      1,
791      1,
792      1,
793      (b) -> {
794        try {
795          TimeUnit.MILLISECONDS.sleep(500);
796        } catch (InterruptedException ex) {
797        }
798      });
799  }
800
801  private void testAsyncScanner(TableName table, int rowNumber, int familyNumber,
802      int qualifierNumber, int caching, Consumer<Boolean> listener) throws Exception {
803    assert rowNumber > 0;
804    assert familyNumber > 0;
805    assert qualifierNumber > 0;
806    byte[] row = Bytes.toBytes("r");
807    byte[] family = Bytes.toBytes("f");
808    byte[] qualifier = Bytes.toBytes("q");
809    byte[][] rows = makeNAsciiWithZeroPrefix(row, rowNumber);
810    byte[][] families = makeNAsciiWithZeroPrefix(family, familyNumber);
811    byte[][] qualifiers = makeNAsciiWithZeroPrefix(qualifier, qualifierNumber);
812
813    Table ht = TEST_UTIL.createTable(table, families);
814
815    boolean toLog = true;
816    List<Cell> kvListExp = new ArrayList<>();
817
818    List<Put> puts = new ArrayList<>();
819    for (byte[] r : rows) {
820      Put put = new Put(r);
821      for (byte[] f : families) {
822        for (byte[] q : qualifiers) {
823          KeyValue kv = new KeyValue(r, f, q, 1, VALUE);
824          put.add(kv);
825          kvListExp.add(kv);
826        }
827      }
828      puts.add(put);
829      if (puts.size() > 1000) {
830        ht.put(puts);
831        puts.clear();
832      }
833    }
834    if (!puts.isEmpty()) {
835      ht.put(puts);
836      puts.clear();
837    }
838
839    Scan scan = new Scan();
840    scan.setAsyncPrefetch(true);
841    if (caching > 0) {
842      scan.setCaching(caching);
843    }
844    try (ResultScanner scanner = ht.getScanner(scan)) {
845      assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner);
846      ((ClientAsyncPrefetchScanner) scanner).setPrefetchListener(listener);
847      List<Cell> kvListScan = new ArrayList<>();
848      Result result;
849      boolean first = true;
850      int actualRows = 0;
851      while ((result = scanner.next()) != null) {
852        ++actualRows;
853        // waiting for cache. see HBASE-17376
854        if (first) {
855          TimeUnit.SECONDS.sleep(1);
856          first = false;
857        }
858        for (Cell kv : result.listCells()) {
859          kvListScan.add(kv);
860        }
861      }
862      assertEquals(rowNumber, actualRows);
863      // These cells may have different rows but it is ok. The Result#getRow
864      // isn't used in the verifyResult()
865      result = Result.create(kvListScan);
866      verifyResult(result, kvListExp, toLog, "Testing async scan");
867    }
868
869    TEST_UTIL.deleteTable(table);
870  }
871
872  private static byte[][] makeNAsciiWithZeroPrefix(byte[] base, int n) {
873    int maxLength = Integer.toString(n).length();
874    byte [][] ret = new byte[n][];
875    for (int i = 0; i < n; i++) {
876      int length = Integer.toString(i).length();
877      StringBuilder buf = new StringBuilder(Integer.toString(i));
878      IntStream.range(0, maxLength - length).forEach(v -> buf.insert(0, "0"));
879      byte[] tail = Bytes.toBytes(buf.toString());
880      ret[i] = Bytes.add(base, tail);
881    }
882    return ret;
883  }
884
885  static void verifyResult(Result result, List<Cell> expKvList, boolean toLog,
886      String msg) {
887
888    LOG.info(msg);
889    LOG.info("Expected count: " + expKvList.size());
890    LOG.info("Actual count: " + result.size());
891    if (expKvList.isEmpty()) {
892      return;
893    }
894
895    int i = 0;
896    for (Cell kv : result.rawCells()) {
897      if (i >= expKvList.size()) {
898        break;  // we will check the size later
899      }
900
901      Cell kvExp = expKvList.get(i++);
902      if (toLog) {
903        LOG.info("get kv is: " + kv.toString());
904        LOG.info("exp kv is: " + kvExp.toString());
905      }
906      assertTrue("Not equal", kvExp.equals(kv));
907    }
908
909    assertEquals(expKvList.size(), result.size());
910  }
911
912  @Test
913  public void testReadExpiredDataForRawScan() throws IOException {
914    TableName tableName = name.getTableName();
915    long ts = System.currentTimeMillis() - 10000;
916    byte[] value = Bytes.toBytes("expired");
917    try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
918      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, ts, value));
919      assertArrayEquals(value, table.get(new Get(ROW)).getValue(FAMILY, QUALIFIER));
920      TEST_UTIL.getAdmin().modifyColumnFamily(tableName,
921        new HColumnDescriptor(FAMILY).setTimeToLive(5));
922      try (ResultScanner scanner = table.getScanner(FAMILY)) {
923        assertNull(scanner.next());
924      }
925      try (ResultScanner scanner = table.getScanner(new Scan().setRaw(true))) {
926        assertArrayEquals(value, scanner.next().getValue(FAMILY, QUALIFIER));
927        assertNull(scanner.next());
928      }
929    }
930  }
931
932  @Test
933  public void testScanWithColumnsAndFilterAndVersion() throws IOException {
934    TableName tableName = name.getTableName();
935    long now = System.currentTimeMillis();
936    try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) {
937      for (int i = 0; i < 4; i++) {
938        Put put = new Put(ROW);
939        put.addColumn(FAMILY, QUALIFIER, now + i, VALUE);
940        table.put(put);
941      }
942
943      Scan scan = new Scan();
944      scan.addColumn(FAMILY, QUALIFIER);
945      scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(QUALIFIER)));
946      scan.readVersions(3);
947
948      try (ResultScanner scanner = table.getScanner(scan)) {
949        Result result = scanner.next();
950        assertEquals(3, result.size());
951      }
952    }
953  }
954
955  @Test
956  public void testScanWithSameStartRowStopRow() throws IOException {
957    TableName tableName = name.getTableName();
958    try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
959      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
960
961      Scan scan = new Scan().withStartRow(ROW).withStopRow(ROW);
962      try (ResultScanner scanner = table.getScanner(scan)) {
963        assertNull(scanner.next());
964      }
965
966      scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, true);
967      try (ResultScanner scanner = table.getScanner(scan)) {
968        Result result = scanner.next();
969        assertNotNull(result);
970        assertArrayEquals(ROW, result.getRow());
971        assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
972        assertNull(scanner.next());
973      }
974
975      scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, false);
976      try (ResultScanner scanner = table.getScanner(scan)) {
977        assertNull(scanner.next());
978      }
979
980      scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, false);
981      try (ResultScanner scanner = table.getScanner(scan)) {
982        assertNull(scanner.next());
983      }
984
985      scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, true);
986      try (ResultScanner scanner = table.getScanner(scan)) {
987        assertNull(scanner.next());
988      }
989    }
990  }
991
992  @Test
993  public void testReverseScanWithFlush() throws Exception {
994    TableName tableName = name.getTableName();
995    final int BATCH_SIZE = 10;
996    final int ROWS_TO_INSERT = 100;
997    final byte[] LARGE_VALUE = generateHugeValue(128 * 1024);
998
999    try (Table table = TEST_UTIL.createTable(tableName, FAMILY);
1000      Admin admin = TEST_UTIL.getAdmin()) {
1001      List<Put> putList = new ArrayList<>();
1002      for (long i = 0; i < ROWS_TO_INSERT; i++) {
1003        Put put = new Put(Bytes.toBytes(i));
1004        put.addColumn(FAMILY, QUALIFIER, LARGE_VALUE);
1005        putList.add(put);
1006
1007        if (putList.size() >= BATCH_SIZE) {
1008          table.put(putList);
1009          admin.flush(tableName);
1010          putList.clear();
1011        }
1012      }
1013
1014      if (!putList.isEmpty()) {
1015        table.put(putList);
1016        admin.flush(tableName);
1017        putList.clear();
1018      }
1019
1020      Scan scan = new Scan();
1021      scan.setReversed(true);
1022      int count = 0;
1023
1024      try (ResultScanner results = table.getScanner(scan)) {
1025        for (Result result : results) {
1026          count++;
1027        }
1028      }
1029      assertEquals("Expected " + ROWS_TO_INSERT + " rows in the table but it is " + count,
1030        ROWS_TO_INSERT, count);
1031    }
1032  }
1033
1034  @Test
1035  public void testScannerWithPartialResults() throws Exception {
1036    TableName tableName = TableName.valueOf("testScannerWithPartialResults");
1037    try (Table table = TEST_UTIL.createMultiRegionTable(tableName,
1038      Bytes.toBytes("c"), 4)) {
1039      List<Put> puts = new ArrayList<>();
1040      byte[] largeArray = new byte[10000];
1041      Put put = new Put(Bytes.toBytes("aaaa0"));
1042      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1"));
1043      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), Bytes.toBytes("2"));
1044      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), Bytes.toBytes("3"));
1045      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("4"), Bytes.toBytes("4"));
1046      puts.add(put);
1047      put = new Put(Bytes.toBytes("aaaa1"));
1048      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1"));
1049      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), largeArray);
1050      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), largeArray);
1051      puts.add(put);
1052      table.put(puts);
1053      Scan scan = new Scan();
1054      scan.addFamily(Bytes.toBytes("c"));
1055      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName.getName());
1056      scan.setMaxResultSize(10001);
1057      scan.setStopRow(Bytes.toBytes("bbbb"));
1058      scan.setFilter(new LimitKVsReturnFilter());
1059      ResultScanner rs = table.getScanner(scan);
1060      Result result;
1061      int expectedKvNumber = 6;
1062      int returnedKvNumber = 0;
1063      while((result = rs.next()) != null) {
1064        returnedKvNumber += result.listCells().size();
1065      }
1066      rs.close();
1067      assertEquals(expectedKvNumber, returnedKvNumber);
1068    }
1069  }
1070
1071  public static class LimitKVsReturnFilter extends FilterBase {
1072
1073    private int cellCount = 0;
1074
1075    @Override
1076    public ReturnCode filterCell(Cell v) throws IOException {
1077      if (cellCount >= 6) {
1078        cellCount++;
1079        return ReturnCode.SKIP;
1080      }
1081      cellCount++;
1082      return ReturnCode.INCLUDE;
1083    }
1084
1085    @Override
1086    public boolean filterAllRemaining() throws IOException {
1087      if (cellCount < 7) {
1088        return false;
1089      }
1090      cellCount++;
1091      return true;
1092    }
1093
1094    @Override
1095    public String toString() {
1096      return this.getClass().getSimpleName();
1097    }
1098
1099    public static LimitKVsReturnFilter parseFrom(final byte [] pbBytes)
1100        throws DeserializationException {
1101      return new LimitKVsReturnFilter();
1102    }
1103  }
1104}