001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY;
021import static org.apache.hadoop.hbase.client.TestFromClientSide3.generateHugeValue;
022import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS;
023import static org.hamcrest.CoreMatchers.instanceOf;
024import static org.hamcrest.MatcherAssert.assertThat;
025import static org.junit.Assert.assertArrayEquals;
026import static org.junit.Assert.assertEquals;
027import static org.junit.Assert.assertFalse;
028import static org.junit.Assert.assertNotNull;
029import static org.junit.Assert.assertNull;
030import static org.junit.Assert.assertTrue;
031import static org.junit.Assert.fail;
032
033import java.io.IOException;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collection;
037import java.util.List;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.Cell;
040import org.apache.hadoop.hbase.CompareOperator;
041import org.apache.hadoop.hbase.HBaseClassTestRule;
042import org.apache.hadoop.hbase.HBaseTestingUtil;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.HRegionLocation;
045import org.apache.hadoop.hbase.HTestConst;
046import org.apache.hadoop.hbase.KeyValue;
047import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
048import org.apache.hadoop.hbase.StartTestingClusterOption;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.TableNameTestRule;
051import org.apache.hadoop.hbase.TableNotFoundException;
052import org.apache.hadoop.hbase.client.Scan.ReadType;
053import org.apache.hadoop.hbase.exceptions.DeserializationException;
054import org.apache.hadoop.hbase.filter.BinaryComparator;
055import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
056import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
057import org.apache.hadoop.hbase.filter.FilterBase;
058import org.apache.hadoop.hbase.filter.QualifierFilter;
059import org.apache.hadoop.hbase.regionserver.HRegionServer;
060import org.apache.hadoop.hbase.testclassification.ClientTests;
061import org.apache.hadoop.hbase.testclassification.MediumTests;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
064import org.junit.AfterClass;
065import org.junit.ClassRule;
066import org.junit.Rule;
067import org.junit.Test;
068import org.junit.experimental.categories.Category;
069import org.junit.runner.RunWith;
070import org.junit.runners.Parameterized;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
075
076/**
077 * A client-side test, mostly testing scanners with various parameters. Parameterized on different
078 * registry implementations.
079 */
080@Category({ MediumTests.class, ClientTests.class })
081@RunWith(Parameterized.class)
082public class TestScannersFromClientSide {
083
084  @ClassRule
085  public static final HBaseClassTestRule CLASS_RULE =
086    HBaseClassTestRule.forClass(TestScannersFromClientSide.class);
087
088  private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class);
089
090  private static HBaseTestingUtil TEST_UTIL;
091  private static byte[] ROW = Bytes.toBytes("testRow");
092  private static byte[] FAMILY = Bytes.toBytes("testFamily");
093  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
094  private static byte[] VALUE = Bytes.toBytes("testValue");
095
096  @Rule
097  public TableNameTestRule name = new TableNameTestRule();
098
099  @AfterClass
100  public static void tearDownAfterClass() throws Exception {
101    if (TEST_UTIL != null) {
102      TEST_UTIL.shutdownMiniCluster();
103    }
104  }
105
106  @Parameterized.Parameters
107  public static Collection<Object[]> parameters() {
108    return Arrays.asList(new Object[][] { { MasterRegistry.class, 1 }, { MasterRegistry.class, 2 },
109      { ZKConnectionRegistry.class, 1 } });
110  }
111
112  /**
113   * JUnit does not provide an easy way to run a hook after each parameterized run. Without that
114   * there is no easy way to restart the test cluster after each parameterized run. Annotation
115   * BeforeParam does not work either because it runs before parameterization and hence does not
116   * have access to the test parameters (which is weird). This *hack* checks if the current instance
117   * of test cluster configuration has the passed parameterized configs. In such a case, we can just
118   * reuse the cluster for test and do not need to initialize from scratch. While this is a hack, it
119   * saves a ton of time for the full test and de-flakes it.
120   */
121  private static boolean isSameParameterizedCluster(Class<?> registryImpl, int numHedgedReqs) {
122    // initialize() is called for every unit test, however we only want to reset the cluster state
123    // at the end of every parameterized run.
124    if (TEST_UTIL == null) {
125      return false;
126    }
127    Configuration conf = TEST_UTIL.getConfiguration();
128    Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
129      ZKConnectionRegistry.class);
130    int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
131      AbstractRpcBasedConnectionRegistry.HEDGED_REQS_FANOUT_DEFAULT);
132    return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
133  }
134
135  public TestScannersFromClientSide(Class<?> registryImpl, int numHedgedReqs) throws Exception {
136    if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) {
137      return;
138    }
139    if (TEST_UTIL != null) {
140      // We reached the end of a parameterized run, clean up the cluster.
141      TEST_UTIL.shutdownMiniCluster();
142    }
143    TEST_UTIL = new HBaseTestingUtil();
144    Configuration conf = TEST_UTIL.getConfiguration();
145    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
146    conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl,
147      ConnectionRegistry.class);
148    Preconditions.checkArgument(numHedgedReqs > 0);
149    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
150    StartTestingClusterOption.Builder builder = StartTestingClusterOption.builder();
151    // Multiple masters needed only when hedged reads for master registry are enabled.
152    builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(3);
153    TEST_UTIL.startMiniCluster(builder.build());
154  }
155
156  /**
157   * Test from client side for batch of scan
158   */
159  @Test
160  public void testScanBatch() throws Exception {
161    final TableName tableName = name.getTableName();
162    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8);
163
164    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
165
166    Put put;
167    Scan scan;
168    Delete delete;
169    Result result;
170    ResultScanner scanner;
171    boolean toLog = true;
172    List<Cell> kvListExp;
173
174    // table: row, family, c0:0, c1:1, ... , c7:7
175    put = new Put(ROW);
176    for (int i = 0; i < QUALIFIERS.length; i++) {
177      KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE);
178      put.add(kv);
179    }
180    ht.put(put);
181
182    // table: row, family, c0:0, c1:1, ..., c6:2, c6:6 , c7:7
183    put = new Put(ROW);
184    KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[6], 2, VALUE);
185    put.add(kv);
186    ht.put(put);
187
188    // delete upto ts: 3
189    delete = new Delete(ROW);
190    delete.addFamily(FAMILY, 3);
191    ht.delete(delete);
192
193    // without batch
194    scan = new Scan().withStartRow(ROW);
195    scan.readAllVersions();
196    scanner = ht.getScanner(scan);
197
198    // c4:4, c5:5, c6:6, c7:7
199    kvListExp = new ArrayList<>();
200    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE));
201    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
202    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE));
203    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
204    result = scanner.next();
205    verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
206
207    // with batch
208    scan = new Scan().withStartRow(ROW);
209    scan.readAllVersions();
210    scan.setBatch(2);
211    scanner = ht.getScanner(scan);
212
213    // First batch: c4:4, c5:5
214    kvListExp = new ArrayList<>();
215    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE));
216    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
217    result = scanner.next();
218    verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
219
220    // Second batch: c6:6, c7:7
221    kvListExp = new ArrayList<>();
222    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE));
223    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
224    result = scanner.next();
225    verifyResult(result, kvListExp, toLog, "Testing second batch of scan");
226
227  }
228
229  @Test
230  public void testMaxResultSizeIsSetToDefault() throws Exception {
231    final TableName tableName = name.getTableName();
232    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
233
234    // The max result size we expect the scan to use by default.
235    long expectedMaxResultSize =
236      TEST_UTIL.getConfiguration().getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
237        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
238
239    int numRows = 5;
240    byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
241
242    int numQualifiers = 10;
243    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers);
244
245    // Specify the cell size such that a single row will be larger than the default
246    // value of maxResultSize. This means that Scan RPCs should return at most a single
247    // result back to the client.
248    int cellSize = (int) (expectedMaxResultSize / (numQualifiers - 1));
249    byte[] cellValue = Bytes.createMaxByteArray(cellSize);
250
251    Put put;
252    List<Put> puts = new ArrayList<>();
253    for (int row = 0; row < ROWS.length; row++) {
254      put = new Put(ROWS[row]);
255      for (int qual = 0; qual < QUALIFIERS.length; qual++) {
256        KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], cellValue);
257        put.add(kv);
258      }
259      puts.add(put);
260    }
261    ht.put(puts);
262
263    // Create a scan with the default configuration.
264    Scan scan = new Scan();
265
266    try (ResultScanner scanner = ht.getScanner(scan)) {
267      assertThat(scanner, instanceOf(AsyncTableResultScanner.class));
268      scanner.next();
269      AsyncTableResultScanner s = (AsyncTableResultScanner) scanner;
270      // The scanner should have, at most, a single result in its cache. If there more results
271      // exists
272      // in the cache it means that more than the expected max result size was fetched.
273      assertTrue("The cache contains: " + s.getCacheSize() + " results", s.getCacheSize() <= 1);
274    }
275  }
276
277  /**
278   * Scan on not existing table should throw the exception with correct message
279   */
280  @Test
281  public void testScannerForNotExistingTable() {
282    String[] tableNames = { "A", "Z", "A:A", "Z:Z" };
283    for (String tableName : tableNames) {
284      try {
285        Table table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName));
286        testSmallScan(table, true, 1, 5);
287        fail("TableNotFoundException was not thrown");
288      } catch (TableNotFoundException e) {
289        // We expect that the message for TableNotFoundException would have only the table name only
290        // Otherwise that would mean that localeRegionInMeta doesn't work properly
291        assertEquals(e.getMessage(), tableName);
292      } catch (Exception e) {
293        fail("Unexpected exception " + e.getMessage());
294      }
295    }
296  }
297
298  @Test
299  public void testSmallScan() throws Exception {
300    final TableName tableName = name.getTableName();
301
302    int numRows = 10;
303    byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
304
305    int numQualifiers = 10;
306    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers);
307
308    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
309
310    Put put;
311    List<Put> puts = new ArrayList<>();
312    for (int row = 0; row < ROWS.length; row++) {
313      put = new Put(ROWS[row]);
314      for (int qual = 0; qual < QUALIFIERS.length; qual++) {
315        KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], VALUE);
316        put.add(kv);
317      }
318      puts.add(put);
319    }
320    ht.put(puts);
321
322    int expectedRows = numRows;
323    int expectedCols = numRows * numQualifiers;
324
325    // Test normal and reversed
326    testSmallScan(ht, true, expectedRows, expectedCols);
327    testSmallScan(ht, false, expectedRows, expectedCols);
328  }
329
330  /**
331   * Run through a variety of test configurations with a small scan
332   */
333  private void testSmallScan(Table table, boolean reversed, int rows, int columns)
334    throws Exception {
335    Scan baseScan = new Scan();
336    baseScan.setReversed(reversed);
337    baseScan.setReadType(ReadType.PREAD);
338
339    Scan scan = new Scan(baseScan);
340    verifyExpectedCounts(table, scan, rows, columns);
341
342    scan = new Scan(baseScan);
343    scan.setMaxResultSize(1);
344    verifyExpectedCounts(table, scan, rows, columns);
345
346    scan = new Scan(baseScan);
347    scan.setMaxResultSize(1);
348    scan.setCaching(Integer.MAX_VALUE);
349    verifyExpectedCounts(table, scan, rows, columns);
350  }
351
352  private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount,
353    int expectedCellCount) throws Exception {
354    ResultScanner scanner = table.getScanner(scan);
355
356    int rowCount = 0;
357    int cellCount = 0;
358    Result r = null;
359    while ((r = scanner.next()) != null) {
360      rowCount++;
361      cellCount += r.rawCells().length;
362    }
363
364    assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount,
365      expectedRowCount == rowCount);
366    assertTrue("Expected cell count: " + expectedCellCount + " Actual cell count: " + cellCount,
367      expectedCellCount == cellCount);
368    scanner.close();
369  }
370
371  /**
372   * Test from client side for get with maxResultPerCF set
373   */
374  @Test
375  public void testGetMaxResults() throws Exception {
376    final TableName tableName = name.getTableName();
377    byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
378    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
379
380    Table ht = TEST_UTIL.createTable(tableName, FAMILIES);
381
382    Get get;
383    Put put;
384    Result result;
385    boolean toLog = true;
386    List<Cell> kvListExp;
387
388    kvListExp = new ArrayList<>();
389    // Insert one CF for row[0]
390    put = new Put(ROW);
391    for (int i = 0; i < 10; i++) {
392      KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
393      put.add(kv);
394      kvListExp.add(kv);
395    }
396    ht.put(put);
397
398    get = new Get(ROW);
399    result = ht.get(get);
400    verifyResult(result, kvListExp, toLog, "Testing without setting maxResults");
401
402    get = new Get(ROW);
403    get.setMaxResultsPerColumnFamily(2);
404    result = ht.get(get);
405    kvListExp = new ArrayList<>();
406    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[0], 1, VALUE));
407    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
408    verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults");
409
410    // Filters: ColumnRangeFilter
411    get = new Get(ROW);
412    get.setMaxResultsPerColumnFamily(5);
413    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true));
414    result = ht.get(get);
415    kvListExp = new ArrayList<>();
416    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE));
417    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
418    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
419    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
420    verifyResult(result, kvListExp, toLog, "Testing single CF with CRF");
421
422    // Insert two more CF for row[0]
423    // 20 columns for CF2, 10 columns for CF1
424    put = new Put(ROW);
425    for (int i = 0; i < QUALIFIERS.length; i++) {
426      KeyValue kv = new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE);
427      put.add(kv);
428    }
429    ht.put(put);
430
431    put = new Put(ROW);
432    for (int i = 0; i < 10; i++) {
433      KeyValue kv = new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE);
434      put.add(kv);
435    }
436    ht.put(put);
437
438    get = new Get(ROW);
439    get.setMaxResultsPerColumnFamily(12);
440    get.addFamily(FAMILIES[1]);
441    get.addFamily(FAMILIES[2]);
442    result = ht.get(get);
443    kvListExp = new ArrayList<>();
444    // Exp: CF1:q0, ..., q9, CF2: q0, q1, q10, q11, ..., q19
445    for (int i = 0; i < 10; i++) {
446      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
447    }
448    for (int i = 0; i < 2; i++) {
449      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
450    }
451    for (int i = 10; i < 20; i++) {
452      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
453    }
454    verifyResult(result, kvListExp, toLog, "Testing multiple CFs");
455
456    // Filters: ColumnRangeFilter and ColumnPrefixFilter
457    get = new Get(ROW);
458    get.setMaxResultsPerColumnFamily(3);
459    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, null, true));
460    result = ht.get(get);
461    kvListExp = new ArrayList<>();
462    for (int i = 2; i < 5; i++) {
463      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
464    }
465    for (int i = 2; i < 5; i++) {
466      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
467    }
468    for (int i = 2; i < 5; i++) {
469      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
470    }
471    verifyResult(result, kvListExp, toLog, "Testing multiple CFs + CRF");
472
473    get = new Get(ROW);
474    get.setMaxResultsPerColumnFamily(7);
475    get.setFilter(new ColumnPrefixFilter(QUALIFIERS[1]));
476    result = ht.get(get);
477    kvListExp = new ArrayList<>();
478    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
479    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[1], 1, VALUE));
480    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[1], 1, VALUE));
481    for (int i = 10; i < 16; i++) {
482      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
483    }
484    verifyResult(result, kvListExp, toLog, "Testing multiple CFs + PFF");
485
486  }
487
488  /**
489   * Test from client side for scan with maxResultPerCF set
490   */
491  @Test
492  public void testScanMaxResults() throws Exception {
493    final TableName tableName = name.getTableName();
494    byte[][] ROWS = HTestConst.makeNAscii(ROW, 2);
495    byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
496    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
497
498    Table ht = TEST_UTIL.createTable(tableName, FAMILIES);
499
500    Put put;
501    Scan scan;
502    Result result;
503    boolean toLog = true;
504    List<Cell> kvListExp, kvListScan;
505
506    kvListExp = new ArrayList<>();
507
508    for (int r = 0; r < ROWS.length; r++) {
509      put = new Put(ROWS[r]);
510      for (int c = 0; c < FAMILIES.length; c++) {
511        for (int q = 0; q < QUALIFIERS.length; q++) {
512          KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE);
513          put.add(kv);
514          if (q < 4) {
515            kvListExp.add(kv);
516          }
517        }
518      }
519      ht.put(put);
520    }
521
522    scan = new Scan();
523    scan.setMaxResultsPerColumnFamily(4);
524    ResultScanner scanner = ht.getScanner(scan);
525    kvListScan = new ArrayList<>();
526    while ((result = scanner.next()) != null) {
527      for (Cell kv : result.listCells()) {
528        kvListScan.add(kv);
529      }
530    }
531    result = Result.create(kvListScan);
532    verifyResult(result, kvListExp, toLog, "Testing scan with maxResults");
533
534  }
535
536  /**
537   * Test from client side for get with rowOffset
538   */
539  @Test
540  public void testGetRowOffset() throws Exception {
541    final TableName tableName = name.getTableName();
542    byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
543    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
544
545    Table ht = TEST_UTIL.createTable(tableName, FAMILIES);
546
547    Get get;
548    Put put;
549    Result result;
550    boolean toLog = true;
551    List<Cell> kvListExp;
552
553    // Insert one CF for row
554    kvListExp = new ArrayList<>();
555    put = new Put(ROW);
556    for (int i = 0; i < 10; i++) {
557      KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
558      put.add(kv);
559      // skipping first two kvs
560      if (i < 2) {
561        continue;
562      }
563      kvListExp.add(kv);
564    }
565    ht.put(put);
566
567    // setting offset to 2
568    get = new Get(ROW);
569    get.setRowOffsetPerColumnFamily(2);
570    result = ht.get(get);
571    verifyResult(result, kvListExp, toLog, "Testing basic setRowOffset");
572
573    // setting offset to 20
574    get = new Get(ROW);
575    get.setRowOffsetPerColumnFamily(20);
576    result = ht.get(get);
577    kvListExp = new ArrayList<>();
578    verifyResult(result, kvListExp, toLog, "Testing offset > #kvs");
579
580    // offset + maxResultPerCF
581    get = new Get(ROW);
582    get.setRowOffsetPerColumnFamily(4);
583    get.setMaxResultsPerColumnFamily(5);
584    result = ht.get(get);
585    kvListExp = new ArrayList<>();
586    for (int i = 4; i < 9; i++) {
587      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
588    }
589    verifyResult(result, kvListExp, toLog, "Testing offset + setMaxResultsPerCF");
590
591    // Filters: ColumnRangeFilter
592    get = new Get(ROW);
593    get.setRowOffsetPerColumnFamily(1);
594    get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true));
595    result = ht.get(get);
596    kvListExp = new ArrayList<>();
597    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
598    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
599    kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
600    verifyResult(result, kvListExp, toLog, "Testing offset with CRF");
601
602    // Insert into two more CFs for row
603    // 10 columns for CF2, 10 columns for CF1
604    for (int j = 2; j > 0; j--) {
605      put = new Put(ROW);
606      for (int i = 0; i < 10; i++) {
607        KeyValue kv = new KeyValue(ROW, FAMILIES[j], QUALIFIERS[i], 1, VALUE);
608        put.add(kv);
609      }
610      ht.put(put);
611    }
612
613    get = new Get(ROW);
614    get.setRowOffsetPerColumnFamily(4);
615    get.setMaxResultsPerColumnFamily(2);
616    get.addFamily(FAMILIES[1]);
617    get.addFamily(FAMILIES[2]);
618    result = ht.get(get);
619    kvListExp = new ArrayList<>();
620    // Exp: CF1:q4, q5, CF2: q4, q5
621    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[4], 1, VALUE));
622    kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[5], 1, VALUE));
623    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[4], 1, VALUE));
624    kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE));
625    verifyResult(result, kvListExp, toLog, "Testing offset + multiple CFs + maxResults");
626  }
627
628  @Test
629  public void testScanRawDeleteFamilyVersion() throws Exception {
630    TableName tableName = name.getTableName();
631    TEST_UTIL.createTable(tableName, FAMILY);
632    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
633    conf.set(RPC_CODEC_CONF_KEY, "");
634    conf.set(DEFAULT_CODEC_CLASS, "");
635    try (Connection connection = ConnectionFactory.createConnection(conf);
636      Table table = connection.getTable(tableName)) {
637      Delete delete = new Delete(ROW);
638      delete.addFamilyVersion(FAMILY, 0L);
639      table.delete(delete);
640      Scan scan = new Scan().withStartRow(ROW).setRaw(true);
641      ResultScanner scanner = table.getScanner(scan);
642      int count = 0;
643      while (scanner.next() != null) {
644        count++;
645      }
646      assertEquals(1, count);
647    } finally {
648      TEST_UTIL.deleteTable(tableName);
649    }
650  }
651
652  /**
653   * Test from client side for scan while the region is reopened on the same region server.
654   */
655  @Test
656  public void testScanOnReopenedRegion() throws Exception {
657    final TableName tableName = name.getTableName();
658    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2);
659
660    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
661
662    Put put;
663    Scan scan;
664    Result result;
665    ResultScanner scanner;
666    boolean toLog = false;
667    List<Cell> kvListExp;
668
669    // table: row, family, c0:0, c1:1
670    put = new Put(ROW);
671    for (int i = 0; i < QUALIFIERS.length; i++) {
672      KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE);
673      put.add(kv);
674    }
675    ht.put(put);
676
677    scan = new Scan().withStartRow(ROW);
678    scanner = ht.getScanner(scan);
679
680    HRegionLocation loc;
681
682    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
683      loc = locator.getRegionLocation(ROW);
684    }
685    RegionInfo hri = loc.getRegion();
686    SingleProcessHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
687    byte[] regionName = hri.getRegionName();
688    int i = cluster.getServerWith(regionName);
689    HRegionServer rs = cluster.getRegionServer(i);
690    LOG.info("Unassigning " + hri);
691    TEST_UTIL.getAdmin().unassign(hri.getRegionName(), true);
692    long startTime = EnvironmentEdgeManager.currentTime();
693    long timeOut = 10000;
694    boolean offline = false;
695    while (true) {
696      if (rs.getOnlineRegion(regionName) == null) {
697        offline = true;
698        break;
699      }
700      assertTrue("Timed out in closing the testing region",
701        EnvironmentEdgeManager.currentTime() < startTime + timeOut);
702    }
703    assertTrue(offline);
704    LOG.info("Assigning " + hri);
705    TEST_UTIL.getAdmin().assign(hri.getRegionName());
706    startTime = EnvironmentEdgeManager.currentTime();
707    while (true) {
708      rs = cluster.getRegionServer(cluster.getServerWith(regionName));
709      if (rs != null && rs.getOnlineRegion(regionName) != null) {
710        offline = false;
711        break;
712      }
713      assertTrue("Timed out in open the testing region",
714        EnvironmentEdgeManager.currentTime() < startTime + timeOut);
715    }
716    assertFalse(offline);
717
718    // c0:0, c1:1
719    kvListExp = new ArrayList<>();
720    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[0], 0, VALUE));
721    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[1], 1, VALUE));
722    result = scanner.next();
723    verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region");
724  }
725
726  static void verifyResult(Result result, List<Cell> expKvList, boolean toLog, String msg) {
727
728    LOG.info(msg);
729    LOG.info("Expected count: " + expKvList.size());
730    LOG.info("Actual count: " + result.size());
731    if (expKvList.isEmpty()) {
732      return;
733    }
734
735    int i = 0;
736    for (Cell kv : result.rawCells()) {
737      if (i >= expKvList.size()) {
738        break; // we will check the size later
739      }
740
741      Cell kvExp = expKvList.get(i++);
742      if (toLog) {
743        LOG.info("get kv is: " + kv.toString());
744        LOG.info("exp kv is: " + kvExp.toString());
745      }
746      assertTrue("Not equal", kvExp.equals(kv));
747    }
748
749    assertEquals(expKvList.size(), result.size());
750  }
751
752  @Test
753  public void testReadExpiredDataForRawScan() throws IOException {
754    TableName tableName = name.getTableName();
755    long ts = EnvironmentEdgeManager.currentTime() - 10000;
756    byte[] value = Bytes.toBytes("expired");
757    try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
758      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, ts, value));
759      assertArrayEquals(value, table.get(new Get(ROW)).getValue(FAMILY, QUALIFIER));
760      TEST_UTIL.getAdmin().modifyColumnFamily(tableName,
761        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setTimeToLive(5).build());
762      try (ResultScanner scanner = table.getScanner(FAMILY)) {
763        assertNull(scanner.next());
764      }
765      try (ResultScanner scanner = table.getScanner(new Scan().setRaw(true))) {
766        assertArrayEquals(value, scanner.next().getValue(FAMILY, QUALIFIER));
767        assertNull(scanner.next());
768      }
769    }
770  }
771
772  @Test
773  public void testScanWithColumnsAndFilterAndVersion() throws IOException {
774    TableName tableName = name.getTableName();
775    long now = EnvironmentEdgeManager.currentTime();
776    try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) {
777      for (int i = 0; i < 4; i++) {
778        Put put = new Put(ROW);
779        put.addColumn(FAMILY, QUALIFIER, now + i, VALUE);
780        table.put(put);
781      }
782
783      Scan scan = new Scan();
784      scan.addColumn(FAMILY, QUALIFIER);
785      scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(QUALIFIER)));
786      scan.readVersions(3);
787
788      try (ResultScanner scanner = table.getScanner(scan)) {
789        Result result = scanner.next();
790        assertEquals(3, result.size());
791      }
792    }
793  }
794
795  @Test
796  public void testScanWithSameStartRowStopRow() throws IOException {
797    TableName tableName = name.getTableName();
798    try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
799      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
800
801      Scan scan = new Scan().withStartRow(ROW).withStopRow(ROW);
802      try (ResultScanner scanner = table.getScanner(scan)) {
803        assertNull(scanner.next());
804      }
805
806      scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, true);
807      try (ResultScanner scanner = table.getScanner(scan)) {
808        Result result = scanner.next();
809        assertNotNull(result);
810        assertArrayEquals(ROW, result.getRow());
811        assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
812        assertNull(scanner.next());
813      }
814
815      scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, false);
816      try (ResultScanner scanner = table.getScanner(scan)) {
817        assertNull(scanner.next());
818      }
819
820      scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, false);
821      try (ResultScanner scanner = table.getScanner(scan)) {
822        assertNull(scanner.next());
823      }
824
825      scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, true);
826      try (ResultScanner scanner = table.getScanner(scan)) {
827        assertNull(scanner.next());
828      }
829    }
830  }
831
832  @Test
833  public void testReverseScanWithFlush() throws Exception {
834    TableName tableName = name.getTableName();
835    final int BATCH_SIZE = 10;
836    final int ROWS_TO_INSERT = 100;
837    final byte[] LARGE_VALUE = generateHugeValue(128 * 1024);
838
839    try (Table table = TEST_UTIL.createTable(tableName, FAMILY);
840      Admin admin = TEST_UTIL.getAdmin()) {
841      List<Put> putList = new ArrayList<>();
842      for (long i = 0; i < ROWS_TO_INSERT; i++) {
843        Put put = new Put(Bytes.toBytes(i));
844        put.addColumn(FAMILY, QUALIFIER, LARGE_VALUE);
845        putList.add(put);
846
847        if (putList.size() >= BATCH_SIZE) {
848          table.put(putList);
849          admin.flush(tableName);
850          putList.clear();
851        }
852      }
853
854      if (!putList.isEmpty()) {
855        table.put(putList);
856        admin.flush(tableName);
857        putList.clear();
858      }
859
860      Scan scan = new Scan();
861      scan.setReversed(true);
862      int count = 0;
863
864      try (ResultScanner results = table.getScanner(scan)) {
865        for (Result result : results) {
866          count++;
867        }
868      }
869      assertEquals("Expected " + ROWS_TO_INSERT + " rows in the table but it is " + count,
870        ROWS_TO_INSERT, count);
871    }
872  }
873
874  @Test
875  public void testScannerWithPartialResults() throws Exception {
876    TableName tableName = TableName.valueOf("testScannerWithPartialResults");
877    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, Bytes.toBytes("c"), 4)) {
878      List<Put> puts = new ArrayList<>();
879      byte[] largeArray = new byte[10000];
880      Put put = new Put(Bytes.toBytes("aaaa0"));
881      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1"));
882      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), Bytes.toBytes("2"));
883      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), Bytes.toBytes("3"));
884      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("4"), Bytes.toBytes("4"));
885      puts.add(put);
886      put = new Put(Bytes.toBytes("aaaa1"));
887      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1"));
888      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), largeArray);
889      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), largeArray);
890      puts.add(put);
891      table.put(puts);
892      Scan scan = new Scan();
893      scan.addFamily(Bytes.toBytes("c"));
894      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName.getName());
895      scan.setMaxResultSize(10001);
896      scan.withStopRow(Bytes.toBytes("bbbb"));
897      scan.setFilter(new LimitKVsReturnFilter());
898      ResultScanner rs = table.getScanner(scan);
899      Result result;
900      int expectedKvNumber = 6;
901      int returnedKvNumber = 0;
902      while ((result = rs.next()) != null) {
903        returnedKvNumber += result.listCells().size();
904      }
905      rs.close();
906      assertEquals(expectedKvNumber, returnedKvNumber);
907    }
908  }
909
910  public static class LimitKVsReturnFilter extends FilterBase {
911
912    private int cellCount = 0;
913
914    @Override
915    public ReturnCode filterCell(Cell v) throws IOException {
916      if (cellCount >= 6) {
917        cellCount++;
918        return ReturnCode.SKIP;
919      }
920      cellCount++;
921      return ReturnCode.INCLUDE;
922    }
923
924    @Override
925    public boolean filterAllRemaining() throws IOException {
926      if (cellCount < 7) {
927        return false;
928      }
929      cellCount++;
930      return true;
931    }
932
933    @Override
934    public String toString() {
935      return this.getClass().getSimpleName();
936    }
937
938    public static LimitKVsReturnFilter parseFrom(final byte[] pbBytes)
939      throws DeserializationException {
940      return new LimitKVsReturnFilter();
941    }
942  }
943}