001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY;
021import static org.apache.hadoop.hbase.client.TestFromClientSide3.generateHugeValue;
022import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS;
023import static org.junit.Assert.assertArrayEquals;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertNotNull;
027import static org.junit.Assert.assertNull;
028import static org.junit.Assert.assertTrue;
029import static org.junit.Assert.fail;
030
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Collection;
035import java.util.List;
036import java.util.concurrent.TimeUnit;
037import java.util.function.Consumer;
038import java.util.stream.IntStream;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.Cell;
041import org.apache.hadoop.hbase.CompareOperator;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseTestingUtility;
044import org.apache.hadoop.hbase.HColumnDescriptor;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.HRegionInfo;
047import org.apache.hadoop.hbase.HRegionLocation;
048import org.apache.hadoop.hbase.HTestConst;
049import org.apache.hadoop.hbase.KeyValue;
050import org.apache.hadoop.hbase.MiniHBaseCluster;
051import org.apache.hadoop.hbase.StartMiniClusterOption;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.TableNameTestRule;
054import org.apache.hadoop.hbase.TableNotFoundException;
055import org.apache.hadoop.hbase.exceptions.DeserializationException;
056import org.apache.hadoop.hbase.filter.BinaryComparator;
057import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
058import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
059import org.apache.hadoop.hbase.filter.FilterBase;
060import org.apache.hadoop.hbase.filter.QualifierFilter;
061import org.apache.hadoop.hbase.regionserver.HRegionServer;
062import org.apache.hadoop.hbase.testclassification.ClientTests;
063import org.apache.hadoop.hbase.testclassification.MediumTests;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
066import org.junit.AfterClass;
067import org.junit.Before;
068import org.junit.ClassRule;
069import org.junit.Rule;
070import org.junit.Test;
071import org.junit.experimental.categories.Category;
072import org.junit.runner.RunWith;
073import org.junit.runners.Parameterized;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
078
079/**
080 * A client-side test, mostly testing scanners with various parameters. Parameterized on different
081 * registry implementations.
082 */
083@Category({ MediumTests.class, ClientTests.class })
084@RunWith(Parameterized.class)
085public class TestScannersFromClientSide {
086
  /** Class-level JUnit rule built by {@link HBaseClassTestRule#forClass(Class)} for this class. */
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestScannersFromClientSide.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class);

  // Shared testing utility. It is created (and re-created) by the constructor whenever the
  // parameterized registry configuration changes, hence not final.
  private static HBaseTestingUtility TEST_UTIL;
  // Default row, family, qualifier and value used by the tests in this class.
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] VALUE = Bytes.toBytes("testValue");

  /** Rule from which each test obtains its table name via {@code getTableName()}. */
  @Rule
  public TableNameTestRule name = new TableNameTestRule();
101
102  /**
103   * @throws java.lang.Exception
104   */
105  @AfterClass
106  public static void tearDownAfterClass() throws Exception {
107    if (TEST_UTIL != null) {
108      TEST_UTIL.shutdownMiniCluster();
109    }
110  }
111
  /**
   * Per-test setup hook. Intentionally empty: the cluster is created by the constructor of this
   * class rather than here.
   * @throws Exception never thrown by the current (empty) body
   */
  @Before
  public void setUp() throws Exception {
    // Nothing to do.
  }
119
120  @Parameterized.Parameters
121  public static Collection<Object[]> parameters() {
122    return Arrays.asList(new Object[][] { { MasterRegistry.class, 1 }, { MasterRegistry.class, 2 },
123      { ZKConnectionRegistry.class, 1 } });
124  }
125
126  /**
127   * JUnit does not provide an easy way to run a hook after each parameterized run. Without that
128   * there is no easy way to restart the test cluster after each parameterized run. Annotation
129   * BeforeParam does not work either because it runs before parameterization and hence does not
130   * have access to the test parameters (which is weird). This *hack* checks if the current instance
131   * of test cluster configuration has the passed parameterized configs. In such a case, we can just
132   * reuse the cluster for test and do not need to initialize from scratch. While this is a hack, it
133   * saves a ton of time for the full test and de-flakes it.
134   */
135  private static boolean isSameParameterizedCluster(Class<?> registryImpl, int numHedgedReqs) {
136    // initialize() is called for every unit test, however we only want to reset the cluster state
137    // at the end of every parameterized run.
138    if (TEST_UTIL == null) {
139      return false;
140    }
141    Configuration conf = TEST_UTIL.getConfiguration();
142    Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
143      ZKConnectionRegistry.class);
144    int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
145      AbstractRpcBasedConnectionRegistry.HEDGED_REQS_FANOUT_DEFAULT);
146    return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
147  }
148
149  public TestScannersFromClientSide(Class<?> registryImpl, int numHedgedReqs) throws Exception {
150    if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) {
151      return;
152    }
153    if (TEST_UTIL != null) {
154      // We reached the end of a parameterized run, clean up the cluster.
155      TEST_UTIL.shutdownMiniCluster();
156    }
157    TEST_UTIL = new HBaseTestingUtility();
158    Configuration conf = TEST_UTIL.getConfiguration();
159    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
160    conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl,
161      ConnectionRegistry.class);
162    Preconditions.checkArgument(numHedgedReqs > 0);
163    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
164    StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
165    // Multiple masters needed only when hedged reads for master registry are enabled.
166    builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(3);
167    TEST_UTIL.startMiniCluster(builder.build());
168  }
169
170  /**
171   * Test from client side for batch of scan n
172   */
173  @Test
174  public void testScanBatch() throws Exception {
175    final TableName tableName = name.getTableName();
176    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8);
177
178    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
179
180    Put put;
181    Scan scan;
182    Delete delete;
183    Result result;
184    ResultScanner scanner;
185    boolean toLog = true;
186    List<Cell> kvListExp;
187
188    // table: row, family, c0:0, c1:1, ... , c7:7
189    put = new Put(ROW);
190    for (int i = 0; i < QUALIFIERS.length; i++) {
191      KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE);
192      put.add(kv);
193    }
194    ht.put(put);
195
196    // table: row, family, c0:0, c1:1, ..., c6:2, c6:6 , c7:7
197    put = new Put(ROW);
198    KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[6], 2, VALUE);
199    put.add(kv);
200    ht.put(put);
201
202    // delete upto ts: 3
203    delete = new Delete(ROW);
204    delete.addFamily(FAMILY, 3);
205    ht.delete(delete);
206
207    // without batch
208    scan = new Scan().withStartRow(ROW);
209    scan.setMaxVersions();
210    scanner = ht.getScanner(scan);
211
212    // c4:4, c5:5, c6:6, c7:7
213    kvListExp = new ArrayList<>();
214    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE));
215    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
216    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE));
217    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
218    result = scanner.next();
219    verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
220
221    // with batch
222    scan = new Scan().withStartRow(ROW);
223    scan.setMaxVersions();
224    scan.setBatch(2);
225    scanner = ht.getScanner(scan);
226
227    // First batch: c4:4, c5:5
228    kvListExp = new ArrayList<>();
229    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE));
230    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
231    result = scanner.next();
232    verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
233
234    // Second batch: c6:6, c7:7
235    kvListExp = new ArrayList<>();
236    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE));
237    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
238    result = scanner.next();
239    verifyResult(result, kvListExp, toLog, "Testing second batch of scan");
240
241  }
242
243  @Test
244  public void testMaxResultSizeIsSetToDefault() throws Exception {
245    final TableName tableName = name.getTableName();
246    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
247
248    // The max result size we expect the scan to use by default.
249    long expectedMaxResultSize =
250      TEST_UTIL.getConfiguration().getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
251        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
252
253    int numRows = 5;
254    byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
255
256    int numQualifiers = 10;
257    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers);
258
259    // Specify the cell size such that a single row will be larger than the default
260    // value of maxResultSize. This means that Scan RPCs should return at most a single
261    // result back to the client.
262    int cellSize = (int) (expectedMaxResultSize / (numQualifiers - 1));
263    byte[] cellValue = Bytes.createMaxByteArray(cellSize);
264
265    Put put;
266    List<Put> puts = new ArrayList<>();
267    for (int row = 0; row < ROWS.length; row++) {
268      put = new Put(ROWS[row]);
269      for (int qual = 0; qual < QUALIFIERS.length; qual++) {
270        KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], cellValue);
271        put.add(kv);
272      }
273      puts.add(put);
274    }
275    ht.put(puts);
276
277    // Create a scan with the default configuration.
278    Scan scan = new Scan();
279
280    ResultScanner scanner = ht.getScanner(scan);
281    assertTrue(scanner instanceof ClientScanner);
282    ClientScanner clientScanner = (ClientScanner) scanner;
283
284    // Call next to issue a single RPC to the server
285    scanner.next();
286
287    // The scanner should have, at most, a single result in its cache. If there more results exists
288    // in the cache it means that more than the expected max result size was fetched.
289    assertTrue("The cache contains: " + clientScanner.getCacheSize() + " results",
290      clientScanner.getCacheSize() <= 1);
291  }
292
293  /**
294   * Scan on not existing table should throw the exception with correct message
295   */
296  @Test
297  public void testScannerForNotExistingTable() {
298    String[] tableNames = { "A", "Z", "A:A", "Z:Z" };
299    for (String tableName : tableNames) {
300      try {
301        Table table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName));
302        testSmallScan(table, true, 1, 5);
303        fail("TableNotFoundException was not thrown");
304      } catch (TableNotFoundException e) {
305        // We expect that the message for TableNotFoundException would have only the table name only
306        // Otherwise that would mean that localeRegionInMeta doesn't work properly
307        assertEquals(e.getMessage(), tableName);
308      } catch (Exception e) {
309        fail("Unexpected exception " + e.getMessage());
310      }
311    }
312  }
313
314  @Test
315  public void testSmallScan() throws Exception {
316    final TableName tableName = name.getTableName();
317
318    int numRows = 10;
319    byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
320
321    int numQualifiers = 10;
322    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers);
323
324    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
325
326    Put put;
327    List<Put> puts = new ArrayList<>();
328    for (int row = 0; row < ROWS.length; row++) {
329      put = new Put(ROWS[row]);
330      for (int qual = 0; qual < QUALIFIERS.length; qual++) {
331        KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], VALUE);
332        put.add(kv);
333      }
334      puts.add(put);
335    }
336    ht.put(puts);
337
338    int expectedRows = numRows;
339    int expectedCols = numRows * numQualifiers;
340
341    // Test normal and reversed
342    testSmallScan(ht, true, expectedRows, expectedCols);
343    testSmallScan(ht, false, expectedRows, expectedCols);
344  }
345
346  /**
347   * Run through a variety of test configurations with a small scan nnnnn
348   */
349  private void testSmallScan(Table table, boolean reversed, int rows, int columns)
350    throws Exception {
351    Scan baseScan = new Scan();
352    baseScan.setReversed(reversed);
353    baseScan.setSmall(true);
354
355    Scan scan = new Scan(baseScan);
356    verifyExpectedCounts(table, scan, rows, columns);
357
358    scan = new Scan(baseScan);
359    scan.setMaxResultSize(1);
360    verifyExpectedCounts(table, scan, rows, columns);
361
362    scan = new Scan(baseScan);
363    scan.setMaxResultSize(1);
364    scan.setCaching(Integer.MAX_VALUE);
365    verifyExpectedCounts(table, scan, rows, columns);
366  }
367
368  private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount,
369    int expectedCellCount) throws Exception {
370    ResultScanner scanner = table.getScanner(scan);
371
372    int rowCount = 0;
373    int cellCount = 0;
374    Result r = null;
375    while ((r = scanner.next()) != null) {
376      rowCount++;
377      cellCount += r.rawCells().length;
378    }
379
380    assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount,
381      expectedRowCount == rowCount);
382    assertTrue("Expected cell count: " + expectedCellCount + " Actual cell count: " + cellCount,
383      expectedCellCount == cellCount);
384    scanner.close();
385  }
386
387  /**
388   * Test from client side for get with maxResultPerCF set n
389   */
390  @Test
391  public void testGetMaxResults() throws Exception {
392    final TableName tableName = name.getTableName();
393    byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
394    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
395
396    Table ht = TEST_UTIL.createTable(tableName, FAMILIES);
397
398    Get get;
399    Put put;
400    Result result;
401    boolean toLog = true;
402    List<Cell> kvListExp;
403
404    kvListExp = new ArrayList<>();
405    // Insert one CF for row[0]
406    put = new Put(ROW);
407    for (int i = 0; i < 10; i++) {
408      KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
409      put.add(kv);
410      kvListExp.add(kv);
411    }
412    ht.put(put);
413
414    get = new Get(ROW);
415    result = ht.get(get);
416    verifyResult(result, kvListExp, toLog, "Testing without setting maxResults");
417
418    get = new Get(ROW);
419    get.setMaxResultsPerColumnFamily(2);
420    result = ht.get(get);
421    kvListExp = new ArrayList<>();
422    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[0], 1, VALUE));
423    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
424    verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults");
425
426    // Filters: ColumnRangeFilter
427    get = new Get(ROW);
428    get.setMaxResultsPerColumnFamily(5);
429    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true));
430    result = ht.get(get);
431    kvListExp = new ArrayList<>();
432    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE));
433    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
434    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
435    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
436    verifyResult(result, kvListExp, toLog, "Testing single CF with CRF");
437
438    // Insert two more CF for row[0]
439    // 20 columns for CF2, 10 columns for CF1
440    put = new Put(ROW);
441    for (int i = 0; i < QUALIFIERS.length; i++) {
442      KeyValue kv = new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE);
443      put.add(kv);
444    }
445    ht.put(put);
446
447    put = new Put(ROW);
448    for (int i = 0; i < 10; i++) {
449      KeyValue kv = new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE);
450      put.add(kv);
451    }
452    ht.put(put);
453
454    get = new Get(ROW);
455    get.setMaxResultsPerColumnFamily(12);
456    get.addFamily(FAMILIES[1]);
457    get.addFamily(FAMILIES[2]);
458    result = ht.get(get);
459    kvListExp = new ArrayList<>();
460    // Exp: CF1:q0, ..., q9, CF2: q0, q1, q10, q11, ..., q19
461    for (int i = 0; i < 10; i++) {
462      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
463    }
464    for (int i = 0; i < 2; i++) {
465      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
466    }
467    for (int i = 10; i < 20; i++) {
468      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
469    }
470    verifyResult(result, kvListExp, toLog, "Testing multiple CFs");
471
472    // Filters: ColumnRangeFilter and ColumnPrefixFilter
473    get = new Get(ROW);
474    get.setMaxResultsPerColumnFamily(3);
475    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, null, true));
476    result = ht.get(get);
477    kvListExp = new ArrayList<>();
478    for (int i = 2; i < 5; i++) {
479      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
480    }
481    for (int i = 2; i < 5; i++) {
482      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
483    }
484    for (int i = 2; i < 5; i++) {
485      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
486    }
487    verifyResult(result, kvListExp, toLog, "Testing multiple CFs + CRF");
488
489    get = new Get(ROW);
490    get.setMaxResultsPerColumnFamily(7);
491    get.setFilter(new ColumnPrefixFilter(QUALIFIERS[1]));
492    result = ht.get(get);
493    kvListExp = new ArrayList<>();
494    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
495    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[1], 1, VALUE));
496    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[1], 1, VALUE));
497    for (int i = 10; i < 16; i++) {
498      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
499    }
500    verifyResult(result, kvListExp, toLog, "Testing multiple CFs + PFF");
501
502  }
503
504  /**
505   * Test from client side for scan with maxResultPerCF set n
506   */
507  @Test
508  public void testScanMaxResults() throws Exception {
509    final TableName tableName = name.getTableName();
510    byte[][] ROWS = HTestConst.makeNAscii(ROW, 2);
511    byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
512    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
513
514    Table ht = TEST_UTIL.createTable(tableName, FAMILIES);
515
516    Put put;
517    Scan scan;
518    Result result;
519    boolean toLog = true;
520    List<Cell> kvListExp, kvListScan;
521
522    kvListExp = new ArrayList<>();
523
524    for (int r = 0; r < ROWS.length; r++) {
525      put = new Put(ROWS[r]);
526      for (int c = 0; c < FAMILIES.length; c++) {
527        for (int q = 0; q < QUALIFIERS.length; q++) {
528          KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE);
529          put.add(kv);
530          if (q < 4) {
531            kvListExp.add(kv);
532          }
533        }
534      }
535      ht.put(put);
536    }
537
538    scan = new Scan();
539    scan.setMaxResultsPerColumnFamily(4);
540    ResultScanner scanner = ht.getScanner(scan);
541    kvListScan = new ArrayList<>();
542    while ((result = scanner.next()) != null) {
543      for (Cell kv : result.listCells()) {
544        kvListScan.add(kv);
545      }
546    }
547    result = Result.create(kvListScan);
548    verifyResult(result, kvListExp, toLog, "Testing scan with maxResults");
549
550  }
551
552  /**
553   * Test from client side for get with rowOffset n
554   */
555  @Test
556  public void testGetRowOffset() throws Exception {
557    final TableName tableName = name.getTableName();
558    byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
559    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
560
561    Table ht = TEST_UTIL.createTable(tableName, FAMILIES);
562
563    Get get;
564    Put put;
565    Result result;
566    boolean toLog = true;
567    List<Cell> kvListExp;
568
569    // Insert one CF for row
570    kvListExp = new ArrayList<>();
571    put = new Put(ROW);
572    for (int i = 0; i < 10; i++) {
573      KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
574      put.add(kv);
575      // skipping first two kvs
576      if (i < 2) {
577        continue;
578      }
579      kvListExp.add(kv);
580    }
581    ht.put(put);
582
583    // setting offset to 2
584    get = new Get(ROW);
585    get.setRowOffsetPerColumnFamily(2);
586    result = ht.get(get);
587    verifyResult(result, kvListExp, toLog, "Testing basic setRowOffset");
588
589    // setting offset to 20
590    get = new Get(ROW);
591    get.setRowOffsetPerColumnFamily(20);
592    result = ht.get(get);
593    kvListExp = new ArrayList<>();
594    verifyResult(result, kvListExp, toLog, "Testing offset > #kvs");
595
596    // offset + maxResultPerCF
597    get = new Get(ROW);
598    get.setRowOffsetPerColumnFamily(4);
599    get.setMaxResultsPerColumnFamily(5);
600    result = ht.get(get);
601    kvListExp = new ArrayList<>();
602    for (int i = 4; i < 9; i++) {
603      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
604    }
605    verifyResult(result, kvListExp, toLog, "Testing offset + setMaxResultsPerCF");
606
607    // Filters: ColumnRangeFilter
608    get = new Get(ROW);
609    get.setRowOffsetPerColumnFamily(1);
610    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true));
611    result = ht.get(get);
612    kvListExp = new ArrayList<>();
613    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
614    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
615    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
616    verifyResult(result, kvListExp, toLog, "Testing offset with CRF");
617
618    // Insert into two more CFs for row
619    // 10 columns for CF2, 10 columns for CF1
620    for (int j = 2; j > 0; j--) {
621      put = new Put(ROW);
622      for (int i = 0; i < 10; i++) {
623        KeyValue kv = new KeyValue(ROW, FAMILIES[j], QUALIFIERS[i], 1, VALUE);
624        put.add(kv);
625      }
626      ht.put(put);
627    }
628
629    get = new Get(ROW);
630    get.setRowOffsetPerColumnFamily(4);
631    get.setMaxResultsPerColumnFamily(2);
632    get.addFamily(FAMILIES[1]);
633    get.addFamily(FAMILIES[2]);
634    result = ht.get(get);
635    kvListExp = new ArrayList<>();
636    // Exp: CF1:q4, q5, CF2: q4, q5
637    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[4], 1, VALUE));
638    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[5], 1, VALUE));
639    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[4], 1, VALUE));
640    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE));
641    verifyResult(result, kvListExp, toLog, "Testing offset + multiple CFs + maxResults");
642  }
643
644  @Test
645  public void testScanRawDeleteFamilyVersion() throws Exception {
646    TableName tableName = name.getTableName();
647    TEST_UTIL.createTable(tableName, FAMILY);
648    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
649    conf.set(RPC_CODEC_CONF_KEY, "");
650    conf.set(DEFAULT_CODEC_CLASS, "");
651    try (Connection connection = ConnectionFactory.createConnection(conf);
652      Table table = connection.getTable(tableName)) {
653      Delete delete = new Delete(ROW);
654      delete.addFamilyVersion(FAMILY, 0L);
655      table.delete(delete);
656      Scan scan = new Scan(ROW).setRaw(true);
657      ResultScanner scanner = table.getScanner(scan);
658      int count = 0;
659      while (scanner.next() != null) {
660        count++;
661      }
662      assertEquals(1, count);
663    } finally {
664      TEST_UTIL.deleteTable(tableName);
665    }
666  }
667
668  /**
669   * Test from client side for scan while the region is reopened on the same region server. n
670   */
671  @Test
672  public void testScanOnReopenedRegion() throws Exception {
673    final TableName tableName = name.getTableName();
674    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2);
675
676    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
677
678    Put put;
679    Scan scan;
680    Result result;
681    ResultScanner scanner;
682    boolean toLog = false;
683    List<Cell> kvListExp;
684
685    // table: row, family, c0:0, c1:1
686    put = new Put(ROW);
687    for (int i = 0; i < QUALIFIERS.length; i++) {
688      KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE);
689      put.add(kv);
690    }
691    ht.put(put);
692
693    scan = new Scan().withStartRow(ROW);
694    scanner = ht.getScanner(scan);
695
696    HRegionLocation loc;
697
698    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
699      loc = locator.getRegionLocation(ROW);
700    }
701    HRegionInfo hri = loc.getRegionInfo();
702    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
703    byte[] regionName = hri.getRegionName();
704    int i = cluster.getServerWith(regionName);
705    HRegionServer rs = cluster.getRegionServer(i);
706    LOG.info("Unassigning " + hri);
707    TEST_UTIL.getAdmin().unassign(hri.getRegionName(), true);
708    long startTime = EnvironmentEdgeManager.currentTime();
709    long timeOut = 10000;
710    boolean offline = false;
711    while (true) {
712      if (rs.getOnlineRegion(regionName) == null) {
713        offline = true;
714        break;
715      }
716      assertTrue("Timed out in closing the testing region",
717        EnvironmentEdgeManager.currentTime() < startTime + timeOut);
718    }
719    assertTrue(offline);
720    LOG.info("Assigning " + hri);
721    TEST_UTIL.getAdmin().assign(hri.getRegionName());
722    startTime = EnvironmentEdgeManager.currentTime();
723    while (true) {
724      rs = cluster.getRegionServer(cluster.getServerWith(regionName));
725      if (rs != null && rs.getOnlineRegion(regionName) != null) {
726        offline = false;
727        break;
728      }
729      assertTrue("Timed out in open the testing region",
730        EnvironmentEdgeManager.currentTime() < startTime + timeOut);
731    }
732    assertFalse(offline);
733
734    // c0:0, c1:1
735    kvListExp = new ArrayList<>();
736    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[0], 0, VALUE));
737    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[1], 1, VALUE));
738    result = scanner.next();
739    verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region");
740  }
741
742  @Test
743  public void testAsyncScannerWithSmallData() throws Exception {
744    testAsyncScanner(name.getTableName(), 2, 3, 10, -1, null);
745  }
746
747  @Test
748  public void testAsyncScannerWithManyRows() throws Exception {
749    testAsyncScanner(name.getTableName(), 30000, 1, 1, -1, null);
750  }
751
752  @Test
753  public void testAsyncScannerWithoutCaching() throws Exception {
754    testAsyncScanner(name.getTableName(), 5, 1, 1, 1, (b) -> {
755      try {
756        TimeUnit.MILLISECONDS.sleep(500);
757      } catch (InterruptedException ex) {
758      }
759    });
760  }
761
762  private void testAsyncScanner(TableName table, int rowNumber, int familyNumber,
763    int qualifierNumber, int caching, Consumer<Boolean> listener) throws Exception {
764    assert rowNumber > 0;
765    assert familyNumber > 0;
766    assert qualifierNumber > 0;
767    byte[] row = Bytes.toBytes("r");
768    byte[] family = Bytes.toBytes("f");
769    byte[] qualifier = Bytes.toBytes("q");
770    byte[][] rows = makeNAsciiWithZeroPrefix(row, rowNumber);
771    byte[][] families = makeNAsciiWithZeroPrefix(family, familyNumber);
772    byte[][] qualifiers = makeNAsciiWithZeroPrefix(qualifier, qualifierNumber);
773
774    Table ht = TEST_UTIL.createTable(table, families);
775
776    boolean toLog = true;
777    List<Cell> kvListExp = new ArrayList<>();
778
779    List<Put> puts = new ArrayList<>();
780    for (byte[] r : rows) {
781      Put put = new Put(r);
782      for (byte[] f : families) {
783        for (byte[] q : qualifiers) {
784          KeyValue kv = new KeyValue(r, f, q, 1, VALUE);
785          put.add(kv);
786          kvListExp.add(kv);
787        }
788      }
789      puts.add(put);
790      if (puts.size() > 1000) {
791        ht.put(puts);
792        puts.clear();
793      }
794    }
795    if (!puts.isEmpty()) {
796      ht.put(puts);
797      puts.clear();
798    }
799
800    Scan scan = new Scan();
801    scan.setAsyncPrefetch(true);
802    if (caching > 0) {
803      scan.setCaching(caching);
804    }
805    try (ResultScanner scanner = ht.getScanner(scan)) {
806      assertTrue("Not instance of async scanner", scanner instanceof ClientAsyncPrefetchScanner);
807      ((ClientAsyncPrefetchScanner) scanner).setPrefetchListener(listener);
808      List<Cell> kvListScan = new ArrayList<>();
809      Result result;
810      boolean first = true;
811      int actualRows = 0;
812      while ((result = scanner.next()) != null) {
813        ++actualRows;
814        // waiting for cache. see HBASE-17376
815        if (first) {
816          TimeUnit.SECONDS.sleep(1);
817          first = false;
818        }
819        for (Cell kv : result.listCells()) {
820          kvListScan.add(kv);
821        }
822      }
823      assertEquals(rowNumber, actualRows);
824      // These cells may have different rows but it is ok. The Result#getRow
825      // isn't used in the verifyResult()
826      result = Result.create(kvListScan);
827      verifyResult(result, kvListExp, toLog, "Testing async scan");
828    }
829
830    TEST_UTIL.deleteTable(table);
831  }
832
833  private static byte[][] makeNAsciiWithZeroPrefix(byte[] base, int n) {
834    int maxLength = Integer.toString(n).length();
835    byte[][] ret = new byte[n][];
836    for (int i = 0; i < n; i++) {
837      int length = Integer.toString(i).length();
838      StringBuilder buf = new StringBuilder(Integer.toString(i));
839      IntStream.range(0, maxLength - length).forEach(v -> buf.insert(0, "0"));
840      byte[] tail = Bytes.toBytes(buf.toString());
841      ret[i] = Bytes.add(base, tail);
842    }
843    return ret;
844  }
845
846  static void verifyResult(Result result, List<Cell> expKvList, boolean toLog, String msg) {
847
848    LOG.info(msg);
849    LOG.info("Expected count: " + expKvList.size());
850    LOG.info("Actual count: " + result.size());
851    if (expKvList.isEmpty()) {
852      return;
853    }
854
855    int i = 0;
856    for (Cell kv : result.rawCells()) {
857      if (i >= expKvList.size()) {
858        break; // we will check the size later
859      }
860
861      Cell kvExp = expKvList.get(i++);
862      if (toLog) {
863        LOG.info("get kv is: " + kv.toString());
864        LOG.info("exp kv is: " + kvExp.toString());
865      }
866      assertTrue("Not equal", kvExp.equals(kv));
867    }
868
869    assertEquals(expKvList.size(), result.size());
870  }
871
872  @Test
873  public void testReadExpiredDataForRawScan() throws IOException {
874    TableName tableName = name.getTableName();
875    long ts = EnvironmentEdgeManager.currentTime() - 10000;
876    byte[] value = Bytes.toBytes("expired");
877    try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
878      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, ts, value));
879      assertArrayEquals(value, table.get(new Get(ROW)).getValue(FAMILY, QUALIFIER));
880      TEST_UTIL.getAdmin().modifyColumnFamily(tableName,
881        new HColumnDescriptor(FAMILY).setTimeToLive(5));
882      try (ResultScanner scanner = table.getScanner(FAMILY)) {
883        assertNull(scanner.next());
884      }
885      try (ResultScanner scanner = table.getScanner(new Scan().setRaw(true))) {
886        assertArrayEquals(value, scanner.next().getValue(FAMILY, QUALIFIER));
887        assertNull(scanner.next());
888      }
889    }
890  }
891
892  @Test
893  public void testScanWithColumnsAndFilterAndVersion() throws IOException {
894    TableName tableName = name.getTableName();
895    long now = EnvironmentEdgeManager.currentTime();
896    try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) {
897      for (int i = 0; i < 4; i++) {
898        Put put = new Put(ROW);
899        put.addColumn(FAMILY, QUALIFIER, now + i, VALUE);
900        table.put(put);
901      }
902
903      Scan scan = new Scan();
904      scan.addColumn(FAMILY, QUALIFIER);
905      scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(QUALIFIER)));
906      scan.readVersions(3);
907
908      try (ResultScanner scanner = table.getScanner(scan)) {
909        Result result = scanner.next();
910        assertEquals(3, result.size());
911      }
912    }
913  }
914
915  @Test
916  public void testScanWithSameStartRowStopRow() throws IOException {
917    TableName tableName = name.getTableName();
918    try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
919      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
920
921      Scan scan = new Scan().withStartRow(ROW).withStopRow(ROW);
922      try (ResultScanner scanner = table.getScanner(scan)) {
923        assertNull(scanner.next());
924      }
925
926      scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, true);
927      try (ResultScanner scanner = table.getScanner(scan)) {
928        Result result = scanner.next();
929        assertNotNull(result);
930        assertArrayEquals(ROW, result.getRow());
931        assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
932        assertNull(scanner.next());
933      }
934
935      scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, false);
936      try (ResultScanner scanner = table.getScanner(scan)) {
937        assertNull(scanner.next());
938      }
939
940      scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, false);
941      try (ResultScanner scanner = table.getScanner(scan)) {
942        assertNull(scanner.next());
943      }
944
945      scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, true);
946      try (ResultScanner scanner = table.getScanner(scan)) {
947        assertNull(scanner.next());
948      }
949    }
950  }
951
952  @Test
953  public void testReverseScanWithFlush() throws Exception {
954    TableName tableName = name.getTableName();
955    final int BATCH_SIZE = 10;
956    final int ROWS_TO_INSERT = 100;
957    final byte[] LARGE_VALUE = generateHugeValue(128 * 1024);
958
959    try (Table table = TEST_UTIL.createTable(tableName, FAMILY);
960      Admin admin = TEST_UTIL.getAdmin()) {
961      List<Put> putList = new ArrayList<>();
962      for (long i = 0; i < ROWS_TO_INSERT; i++) {
963        Put put = new Put(Bytes.toBytes(i));
964        put.addColumn(FAMILY, QUALIFIER, LARGE_VALUE);
965        putList.add(put);
966
967        if (putList.size() >= BATCH_SIZE) {
968          table.put(putList);
969          admin.flush(tableName);
970          putList.clear();
971        }
972      }
973
974      if (!putList.isEmpty()) {
975        table.put(putList);
976        admin.flush(tableName);
977        putList.clear();
978      }
979
980      Scan scan = new Scan();
981      scan.setReversed(true);
982      int count = 0;
983
984      try (ResultScanner results = table.getScanner(scan)) {
985        for (Result result : results) {
986          count++;
987        }
988      }
989      assertEquals("Expected " + ROWS_TO_INSERT + " rows in the table but it is " + count,
990        ROWS_TO_INSERT, count);
991    }
992  }
993
994  @Test
995  public void testScannerWithPartialResults() throws Exception {
996    TableName tableName = TableName.valueOf("testScannerWithPartialResults");
997    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, Bytes.toBytes("c"), 4)) {
998      List<Put> puts = new ArrayList<>();
999      byte[] largeArray = new byte[10000];
1000      Put put = new Put(Bytes.toBytes("aaaa0"));
1001      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1"));
1002      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), Bytes.toBytes("2"));
1003      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), Bytes.toBytes("3"));
1004      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("4"), Bytes.toBytes("4"));
1005      puts.add(put);
1006      put = new Put(Bytes.toBytes("aaaa1"));
1007      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1"));
1008      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), largeArray);
1009      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), largeArray);
1010      puts.add(put);
1011      table.put(puts);
1012      Scan scan = new Scan();
1013      scan.addFamily(Bytes.toBytes("c"));
1014      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName.getName());
1015      scan.setMaxResultSize(10001);
1016      scan.setStopRow(Bytes.toBytes("bbbb"));
1017      scan.setFilter(new LimitKVsReturnFilter());
1018      ResultScanner rs = table.getScanner(scan);
1019      Result result;
1020      int expectedKvNumber = 6;
1021      int returnedKvNumber = 0;
1022      while ((result = rs.next()) != null) {
1023        returnedKvNumber += result.listCells().size();
1024      }
1025      rs.close();
1026      assertEquals(expectedKvNumber, returnedKvNumber);
1027    }
1028  }
1029
1030  public static class LimitKVsReturnFilter extends FilterBase {
1031
1032    private int cellCount = 0;
1033
1034    @Override
1035    public ReturnCode filterCell(Cell v) throws IOException {
1036      if (cellCount >= 6) {
1037        cellCount++;
1038        return ReturnCode.SKIP;
1039      }
1040      cellCount++;
1041      return ReturnCode.INCLUDE;
1042    }
1043
1044    @Override
1045    public boolean filterAllRemaining() throws IOException {
1046      if (cellCount < 7) {
1047        return false;
1048      }
1049      cellCount++;
1050      return true;
1051    }
1052
1053    @Override
1054    public String toString() {
1055      return this.getClass().getSimpleName();
1056    }
1057
1058    public static LimitKVsReturnFilter parseFrom(final byte[] pbBytes)
1059      throws DeserializationException {
1060      return new LimitKVsReturnFilter();
1061    }
1062  }
1063}