001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY;
021import static org.apache.hadoop.hbase.client.FromClientSideTest3.generateHugeValue;
022import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS;
023import static org.hamcrest.CoreMatchers.instanceOf;
024import static org.hamcrest.MatcherAssert.assertThat;
025import static org.junit.jupiter.api.Assertions.assertArrayEquals;
026import static org.junit.jupiter.api.Assertions.assertEquals;
027import static org.junit.jupiter.api.Assertions.assertFalse;
028import static org.junit.jupiter.api.Assertions.assertNotNull;
029import static org.junit.jupiter.api.Assertions.assertNull;
030import static org.junit.jupiter.api.Assertions.assertThrows;
031import static org.junit.jupiter.api.Assertions.assertTrue;
032
033import java.io.IOException;
034import java.util.ArrayList;
035import java.util.List;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.CompareOperator;
039import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.HRegionLocation;
042import org.apache.hadoop.hbase.HTestConst;
043import org.apache.hadoop.hbase.KeyValue;
044import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.TableNotFoundException;
047import org.apache.hadoop.hbase.client.Scan.ReadType;
048import org.apache.hadoop.hbase.exceptions.DeserializationException;
049import org.apache.hadoop.hbase.filter.BinaryComparator;
050import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
051import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
052import org.apache.hadoop.hbase.filter.FilterBase;
053import org.apache.hadoop.hbase.filter.QualifierFilter;
054import org.apache.hadoop.hbase.regionserver.HRegionServer;
055import org.apache.hadoop.hbase.testclassification.ClientTests;
056import org.apache.hadoop.hbase.testclassification.LargeTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
059import org.junit.jupiter.api.BeforeAll;
060import org.junit.jupiter.api.Tag;
061import org.junit.jupiter.api.TestTemplate;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * A client-side test, mostly testing scanners with various parameters. Parameterized on different
067 * registry implementations.
068 */
069@Tag(LargeTests.TAG)
070@Tag(ClientTests.TAG)
071@HBaseParameterizedTestTemplate(name = "{index}: registryImpl={0}, numHedgedReqs={1}")
072public class TestScannersFromClientSide extends FromClientSideTestBase {
073
074  public TestScannersFromClientSide(Class<? extends ConnectionRegistry> registryImpl,
075    int numHedgedReqs) {
076    super(registryImpl, numHedgedReqs);
077  }
078
079  private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class);
080
081  @BeforeAll
082  public static void setUpBeforeClass() throws Exception {
083    Configuration conf = TEST_UTIL.getConfiguration();
084    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
085    SLAVES = 3;
086    initialize();
087  }
088
089  /**
090   * Test from client side for batch of scan
091   */
092  @TestTemplate
093  public void testScanBatch() throws Exception {
094    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8);
095
096    TEST_UTIL.createTable(tableName, FAMILY);
097    try (Connection conn = getConnection(); Table ht = conn.getTable(tableName)) {
098      Put put;
099      Scan scan;
100      Delete delete;
101      Result result;
102      ResultScanner scanner;
103      boolean toLog = true;
104      List<Cell> kvListExp;
105
106      // table: row, family, c0:0, c1:1, ... , c7:7
107      put = new Put(ROW);
108      for (int i = 0; i < QUALIFIERS.length; i++) {
109        KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE);
110        put.add(kv);
111      }
112      ht.put(put);
113
114      // table: row, family, c0:0, c1:1, ..., c6:2, c6:6 , c7:7
115      put = new Put(ROW);
116      KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[6], 2, VALUE);
117      put.add(kv);
118      ht.put(put);
119
120      // delete upto ts: 3
121      delete = new Delete(ROW);
122      delete.addFamily(FAMILY, 3);
123      ht.delete(delete);
124
125      // without batch
126      scan = new Scan().withStartRow(ROW);
127      scan.readAllVersions();
128      scanner = ht.getScanner(scan);
129
130      // c4:4, c5:5, c6:6, c7:7
131      kvListExp = new ArrayList<>();
132      kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE));
133      kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
134      kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE));
135      kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
136      result = scanner.next();
137      verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
138
139      // with batch
140      scan = new Scan().withStartRow(ROW);
141      scan.readAllVersions();
142      scan.setBatch(2);
143      scanner = ht.getScanner(scan);
144
145      // First batch: c4:4, c5:5
146      kvListExp = new ArrayList<>();
147      kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE));
148      kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
149      result = scanner.next();
150      verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
151
152      // Second batch: c6:6, c7:7
153      kvListExp = new ArrayList<>();
154      kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE));
155      kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
156      result = scanner.next();
157      verifyResult(result, kvListExp, toLog, "Testing second batch of scan");
158    }
159  }
160
161  @TestTemplate
162  public void testMaxResultSizeIsSetToDefault() throws Exception {
163    TEST_UTIL.createTable(tableName, FAMILY);
164    try (Connection conn = getConnection(); Table ht = conn.getTable(tableName)) {
165      // The max result size we expect the scan to use by default.
166      long expectedMaxResultSize =
167        TEST_UTIL.getConfiguration().getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
168          HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
169
170      int numRows = 5;
171      byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
172
173      int numQualifiers = 10;
174      byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers);
175
176      // Specify the cell size such that a single row will be larger than the default
177      // value of maxResultSize. This means that Scan RPCs should return at most a single
178      // result back to the client.
179      int cellSize = (int) (expectedMaxResultSize / (numQualifiers - 1));
180      byte[] cellValue = Bytes.createMaxByteArray(cellSize);
181
182      Put put;
183      List<Put> puts = new ArrayList<>();
184      for (int row = 0; row < ROWS.length; row++) {
185        put = new Put(ROWS[row]);
186        for (int qual = 0; qual < QUALIFIERS.length; qual++) {
187          KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], cellValue);
188          put.add(kv);
189        }
190        puts.add(put);
191      }
192      ht.put(puts);
193
194      // Create a scan with the default configuration.
195      Scan scan = new Scan();
196
197      try (ResultScanner scanner = ht.getScanner(scan)) {
198        assertThat(scanner, instanceOf(AsyncTableResultScanner.class));
199        scanner.next();
200        AsyncTableResultScanner s = (AsyncTableResultScanner) scanner;
201        // The scanner should have, at most, a single result in its cache. If there more results
202        // exists in the cache it means that more than the expected max result size was fetched.
203        assertTrue(s.getCacheSize() <= 1, "The cache contains: " + s.getCacheSize() + " results");
204      }
205    }
206  }
207
208  /**
209   * Scan on not existing table should throw the exception with correct message
210   */
211  @TestTemplate
212  public void testScannerForNotExistingTable() throws Exception {
213    String[] tableNames = { "A", "Z", "A:A", "Z:Z" };
214    for (String tableName : tableNames) {
215      try (Connection conn = getConnection();
216        Table table = conn.getTable(TableName.valueOf(tableName))) {
217        TableNotFoundException e = assertThrows(TableNotFoundException.class,
218          () -> testSmallScan(table, true, 1, 5), "TableNotFoundException was not thrown");
219        // We expect that the message for TableNotFoundException would have only the table name only
220        // Otherwise that would mean that localeRegionInMeta doesn't work properly
221        assertEquals(e.getMessage(), tableName);
222      }
223    }
224  }
225
226  @TestTemplate
227  public void testSmallScan() throws Exception {
228    int numRows = 10;
229    byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
230
231    int numQualifiers = 10;
232    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers);
233
234    TEST_UTIL.createTable(tableName, FAMILY);
235    try (Connection conn = getConnection(); Table ht = conn.getTable(tableName)) {
236      Put put;
237      List<Put> puts = new ArrayList<>();
238      for (int row = 0; row < ROWS.length; row++) {
239        put = new Put(ROWS[row]);
240        for (int qual = 0; qual < QUALIFIERS.length; qual++) {
241          KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], VALUE);
242          put.add(kv);
243        }
244        puts.add(put);
245      }
246      ht.put(puts);
247
248      int expectedRows = numRows;
249      int expectedCols = numRows * numQualifiers;
250
251      // Test normal and reversed
252      testSmallScan(ht, true, expectedRows, expectedCols);
253      testSmallScan(ht, false, expectedRows, expectedCols);
254    }
255  }
256
257  /**
258   * Run through a variety of test configurations with a small scan
259   */
260  private void testSmallScan(Table table, boolean reversed, int rows, int columns)
261    throws Exception {
262    Scan baseScan = new Scan();
263    baseScan.setReversed(reversed);
264    baseScan.setReadType(ReadType.PREAD);
265
266    Scan scan = new Scan(baseScan);
267    verifyExpectedCounts(table, scan, rows, columns);
268
269    scan = new Scan(baseScan);
270    scan.setMaxResultSize(1);
271    verifyExpectedCounts(table, scan, rows, columns);
272
273    scan = new Scan(baseScan);
274    scan.setMaxResultSize(1);
275    scan.setCaching(Integer.MAX_VALUE);
276    verifyExpectedCounts(table, scan, rows, columns);
277  }
278
279  private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount,
280    int expectedCellCount) throws Exception {
281    try (ResultScanner scanner = table.getScanner(scan)) {
282
283      int rowCount = 0;
284      int cellCount = 0;
285      Result r = null;
286      while ((r = scanner.next()) != null) {
287        rowCount++;
288        cellCount += r.rawCells().length;
289      }
290
291      assertEquals(expectedRowCount, rowCount,
292        "Expected row count: " + expectedRowCount + " Actual row count: " + rowCount);
293      assertEquals(expectedCellCount, cellCount,
294        "Expected cell count: " + expectedCellCount + " Actual cell count: " + cellCount);
295    }
296  }
297
298  /**
299   * Test from client side for get with maxResultPerCF set
300   */
301  @TestTemplate
302  public void testGetMaxResults() throws Exception {
303    byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
304    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
305
306    TEST_UTIL.createTable(tableName, FAMILIES);
307    try (Connection conn = getConnection(); Table ht = conn.getTable(tableName)) {
308      Get get;
309      Put put;
310      Result result;
311      boolean toLog = true;
312      List<Cell> kvListExp;
313
314      kvListExp = new ArrayList<>();
315      // Insert one CF for row[0]
316      put = new Put(ROW);
317      for (int i = 0; i < 10; i++) {
318        KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
319        put.add(kv);
320        kvListExp.add(kv);
321      }
322      ht.put(put);
323
324      get = new Get(ROW);
325      result = ht.get(get);
326      verifyResult(result, kvListExp, toLog, "Testing without setting maxResults");
327
328      get = new Get(ROW);
329      get.setMaxResultsPerColumnFamily(2);
330      result = ht.get(get);
331      kvListExp = new ArrayList<>();
332      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[0], 1, VALUE));
333      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
334      verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults");
335
336      // Filters: ColumnRangeFilter
337      get = new Get(ROW);
338      get.setMaxResultsPerColumnFamily(5);
339      get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true));
340      result = ht.get(get);
341      kvListExp = new ArrayList<>();
342      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE));
343      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
344      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
345      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
346      verifyResult(result, kvListExp, toLog, "Testing single CF with CRF");
347
348      // Insert two more CF for row[0]
349      // 20 columns for CF2, 10 columns for CF1
350      put = new Put(ROW);
351      for (int i = 0; i < QUALIFIERS.length; i++) {
352        KeyValue kv = new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE);
353        put.add(kv);
354      }
355      ht.put(put);
356
357      put = new Put(ROW);
358      for (int i = 0; i < 10; i++) {
359        KeyValue kv = new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE);
360        put.add(kv);
361      }
362      ht.put(put);
363
364      get = new Get(ROW);
365      get.setMaxResultsPerColumnFamily(12);
366      get.addFamily(FAMILIES[1]);
367      get.addFamily(FAMILIES[2]);
368      result = ht.get(get);
369      kvListExp = new ArrayList<>();
370      // Exp: CF1:q0, ..., q9, CF2: q0, q1, q10, q11, ..., q19
371      for (int i = 0; i < 10; i++) {
372        kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
373      }
374      for (int i = 0; i < 2; i++) {
375        kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
376      }
377      for (int i = 10; i < 20; i++) {
378        kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
379      }
380      verifyResult(result, kvListExp, toLog, "Testing multiple CFs");
381
382      // Filters: ColumnRangeFilter and ColumnPrefixFilter
383      get = new Get(ROW);
384      get.setMaxResultsPerColumnFamily(3);
385      get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, null, true));
386      result = ht.get(get);
387      kvListExp = new ArrayList<>();
388      for (int i = 2; i < 5; i++) {
389        kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
390      }
391      for (int i = 2; i < 5; i++) {
392        kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
393      }
394      for (int i = 2; i < 5; i++) {
395        kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
396      }
397      verifyResult(result, kvListExp, toLog, "Testing multiple CFs + CRF");
398
399      get = new Get(ROW);
400      get.setMaxResultsPerColumnFamily(7);
401      get.setFilter(new ColumnPrefixFilter(QUALIFIERS[1]));
402      result = ht.get(get);
403      kvListExp = new ArrayList<>();
404      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE));
405      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[1], 1, VALUE));
406      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[1], 1, VALUE));
407      for (int i = 10; i < 16; i++) {
408        kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
409      }
410      verifyResult(result, kvListExp, toLog, "Testing multiple CFs + PFF");
411    }
412
413  }
414
415  /**
416   * Test from client side for scan with maxResultPerCF set
417   */
418  @TestTemplate
419  public void testScanMaxResults() throws Exception {
420    byte[][] ROWS = HTestConst.makeNAscii(ROW, 2);
421    byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
422    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
423
424    TEST_UTIL.createTable(tableName, FAMILIES);
425    try (Connection conn = getConnection(); Table ht = conn.getTable(tableName)) {
426      Put put;
427      Scan scan;
428      Result result;
429      boolean toLog = true;
430      List<Cell> kvListExp, kvListScan;
431
432      kvListExp = new ArrayList<>();
433
434      for (int r = 0; r < ROWS.length; r++) {
435        put = new Put(ROWS[r]);
436        for (int c = 0; c < FAMILIES.length; c++) {
437          for (int q = 0; q < QUALIFIERS.length; q++) {
438            KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE);
439            put.add(kv);
440            if (q < 4) {
441              kvListExp.add(kv);
442            }
443          }
444        }
445        ht.put(put);
446      }
447
448      scan = new Scan();
449      scan.setMaxResultsPerColumnFamily(4);
450      ResultScanner scanner = ht.getScanner(scan);
451      kvListScan = new ArrayList<>();
452      while ((result = scanner.next()) != null) {
453        for (Cell kv : result.listCells()) {
454          kvListScan.add(kv);
455        }
456      }
457      result = Result.create(kvListScan);
458      verifyResult(result, kvListExp, toLog, "Testing scan with maxResults");
459    }
460  }
461
462  /**
463   * Test from client side for get with rowOffset
464   */
465  @TestTemplate
466  public void testGetRowOffset() throws Exception {
467    byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
468    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
469
470    TEST_UTIL.createTable(tableName, FAMILIES);
471    try (Connection conn = getConnection(); Table ht = conn.getTable(tableName)) {
472      Get get;
473      Put put;
474      Result result;
475      boolean toLog = true;
476      List<Cell> kvListExp;
477
478      // Insert one CF for row
479      kvListExp = new ArrayList<>();
480      put = new Put(ROW);
481      for (int i = 0; i < 10; i++) {
482        KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
483        put.add(kv);
484        // skipping first two kvs
485        if (i < 2) {
486          continue;
487        }
488        kvListExp.add(kv);
489      }
490      ht.put(put);
491
492      // setting offset to 2
493      get = new Get(ROW);
494      get.setRowOffsetPerColumnFamily(2);
495      result = ht.get(get);
496      verifyResult(result, kvListExp, toLog, "Testing basic setRowOffset");
497
498      // setting offset to 20
499      get = new Get(ROW);
500      get.setRowOffsetPerColumnFamily(20);
501      result = ht.get(get);
502      kvListExp = new ArrayList<>();
503      verifyResult(result, kvListExp, toLog, "Testing offset > #kvs");
504
505      // offset + maxResultPerCF
506      get = new Get(ROW);
507      get.setRowOffsetPerColumnFamily(4);
508      get.setMaxResultsPerColumnFamily(5);
509      result = ht.get(get);
510      kvListExp = new ArrayList<>();
511      for (int i = 4; i < 9; i++) {
512        kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE));
513      }
514      verifyResult(result, kvListExp, toLog, "Testing offset + setMaxResultsPerCF");
515
516      // Filters: ColumnRangeFilter
517      get = new Get(ROW);
518      get.setRowOffsetPerColumnFamily(1);
519      get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true));
520      result = ht.get(get);
521      kvListExp = new ArrayList<>();
522      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE));
523      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE));
524      kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE));
525      verifyResult(result, kvListExp, toLog, "Testing offset with CRF");
526
527      // Insert into two more CFs for row
528      // 10 columns for CF2, 10 columns for CF1
529      for (int j = 2; j > 0; j--) {
530        put = new Put(ROW);
531        for (int i = 0; i < 10; i++) {
532          KeyValue kv = new KeyValue(ROW, FAMILIES[j], QUALIFIERS[i], 1, VALUE);
533          put.add(kv);
534        }
535        ht.put(put);
536      }
537
538      get = new Get(ROW);
539      get.setRowOffsetPerColumnFamily(4);
540      get.setMaxResultsPerColumnFamily(2);
541      get.addFamily(FAMILIES[1]);
542      get.addFamily(FAMILIES[2]);
543      result = ht.get(get);
544      kvListExp = new ArrayList<>();
545      // Exp: CF1:q4, q5, CF2: q4, q5
546      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[4], 1, VALUE));
547      kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[5], 1, VALUE));
548      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[4], 1, VALUE));
549      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE));
550      verifyResult(result, kvListExp, toLog, "Testing offset + multiple CFs + maxResults");
551    }
552  }
553
554  @TestTemplate
555  public void testRawScanExpiredCell() throws Exception {
556    TEST_UTIL.createTable(tableName, FAMILY);
557    try (Connection conn = getConnection(); Table table = conn.getTable(tableName)) {
558      final Put put = new Put(ROW);
559      put.addColumn(FAMILY, QUALIFIER, VALUE);
560      put.setTTL(0);
561      table.put(put);
562      final Scan scan = new Scan().setRaw(true);
563      try (final ResultScanner scanner = table.getScanner(scan)) {
564        final Result result = scanner.next();
565        assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
566        assertNull(scanner.next());
567      }
568    }
569  }
570
571  @TestTemplate
572  public void testScanRawDeleteFamilyVersion() throws Exception {
573    TEST_UTIL.createTable(tableName, FAMILY);
574    Configuration conf = getClientConf();
575    conf.set(RPC_CODEC_CONF_KEY, "");
576    conf.set(DEFAULT_CODEC_CLASS, "");
577    try (Connection connection = ConnectionFactory.createConnection(conf);
578      Table table = connection.getTable(tableName)) {
579      Delete delete = new Delete(ROW);
580      delete.addFamilyVersion(FAMILY, 0L);
581      table.delete(delete);
582      Scan scan = new Scan().withStartRow(ROW).setRaw(true);
583      ResultScanner scanner = table.getScanner(scan);
584      int count = 0;
585      while (scanner.next() != null) {
586        count++;
587      }
588      assertEquals(1, count);
589    }
590  }
591
592  /**
593   * Test from client side for scan while the region is reopened on the same region server.
594   */
595  @TestTemplate
596  public void testScanOnReopenedRegion() throws Exception {
597    byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2);
598
599    TEST_UTIL.createTable(tableName, FAMILY);
600    try (Connection conn = getConnection(); Table ht = conn.getTable(tableName)) {
601      Put put;
602      Scan scan;
603      Result result;
604      ResultScanner scanner;
605      boolean toLog = false;
606      List<Cell> kvListExp;
607
608      // table: row, family, c0:0, c1:1
609      put = new Put(ROW);
610      for (int i = 0; i < QUALIFIERS.length; i++) {
611        KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE);
612        put.add(kv);
613      }
614      ht.put(put);
615
616      scan = new Scan().withStartRow(ROW);
617      scanner = ht.getScanner(scan);
618
619      HRegionLocation loc;
620
621      try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
622        loc = locator.getRegionLocation(ROW);
623      }
624      RegionInfo hri = loc.getRegion();
625      SingleProcessHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
626      byte[] regionName = hri.getRegionName();
627      int i = cluster.getServerWith(regionName);
628      HRegionServer rs = cluster.getRegionServer(i);
629      LOG.info("Unassigning " + hri);
630      TEST_UTIL.getAdmin().unassign(hri.getRegionName(), true);
631      long startTime = EnvironmentEdgeManager.currentTime();
632      long timeOut = 10000;
633      boolean offline = false;
634      while (true) {
635        if (rs.getOnlineRegion(regionName) == null) {
636          offline = true;
637          break;
638        }
639        assertTrue(EnvironmentEdgeManager.currentTime() < startTime + timeOut,
640          "Timed out in closing the testing region");
641      }
642      assertTrue(offline);
643      LOG.info("Assigning " + hri);
644      TEST_UTIL.getAdmin().assign(hri.getRegionName());
645      startTime = EnvironmentEdgeManager.currentTime();
646      while (true) {
647        rs = cluster.getRegionServer(cluster.getServerWith(regionName));
648        if (rs != null && rs.getOnlineRegion(regionName) != null) {
649          offline = false;
650          break;
651        }
652        assertTrue(EnvironmentEdgeManager.currentTime() < startTime + timeOut,
653          "Timed out in open the testing region");
654      }
655      assertFalse(offline);
656
657      // c0:0, c1:1
658      kvListExp = new ArrayList<>();
659      kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[0], 0, VALUE));
660      kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[1], 1, VALUE));
661      result = scanner.next();
662      verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region");
663    }
664  }
665
666  static void verifyResult(Result result, List<Cell> expKvList, boolean toLog, String msg) {
667    LOG.info(msg);
668    LOG.info("Expected count: " + expKvList.size());
669    LOG.info("Actual count: " + result.size());
670    if (expKvList.isEmpty()) {
671      return;
672    }
673
674    int i = 0;
675    for (Cell kv : result.rawCells()) {
676      if (i >= expKvList.size()) {
677        break; // we will check the size later
678      }
679
680      Cell kvExp = expKvList.get(i++);
681      if (toLog) {
682        LOG.info("get kv is: " + kv.toString());
683        LOG.info("exp kv is: " + kvExp.toString());
684      }
685      assertTrue(kvExp.equals(kv), "Not equal");
686    }
687
688    assertEquals(expKvList.size(), result.size());
689  }
690
691  @TestTemplate
692  public void testReadExpiredDataForRawScan() throws IOException {
693    long ts = EnvironmentEdgeManager.currentTime() - 10000;
694    byte[] value = Bytes.toBytes("expired");
695    TEST_UTIL.createTable(tableName, FAMILY);
696    try (Connection conn = getConnection(); Table table = conn.getTable(tableName)) {
697      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, ts, value));
698      assertArrayEquals(value, table.get(new Get(ROW)).getValue(FAMILY, QUALIFIER));
699      TEST_UTIL.getAdmin().modifyColumnFamily(tableName,
700        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setTimeToLive(5).build());
701      try (ResultScanner scanner = table.getScanner(FAMILY)) {
702        assertNull(scanner.next());
703      }
704      try (ResultScanner scanner = table.getScanner(new Scan().setRaw(true))) {
705        assertArrayEquals(value, scanner.next().getValue(FAMILY, QUALIFIER));
706        assertNull(scanner.next());
707      }
708    }
709  }
710
711  @TestTemplate
712  public void testScanWithColumnsAndFilterAndVersion() throws IOException {
713    long now = EnvironmentEdgeManager.currentTime();
714    TEST_UTIL.createTable(tableName, FAMILY, 4);
715    try (Connection conn = getConnection(); Table table = conn.getTable(tableName)) {
716      for (int i = 0; i < 4; i++) {
717        Put put = new Put(ROW);
718        put.addColumn(FAMILY, QUALIFIER, now + i, VALUE);
719        table.put(put);
720      }
721
722      Scan scan = new Scan();
723      scan.addColumn(FAMILY, QUALIFIER);
724      scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(QUALIFIER)));
725      scan.readVersions(3);
726
727      try (ResultScanner scanner = table.getScanner(scan)) {
728        Result result = scanner.next();
729        assertEquals(3, result.size());
730      }
731    }
732  }
733
734  @TestTemplate
735  public void testScanWithSameStartRowStopRow() throws IOException {
736    TEST_UTIL.createTable(tableName, FAMILY);
737    try (Connection conn = getConnection(); Table table = conn.getTable(tableName)) {
738      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
739
740      Scan scan = new Scan().withStartRow(ROW).withStopRow(ROW);
741      try (ResultScanner scanner = table.getScanner(scan)) {
742        assertNull(scanner.next());
743      }
744
745      scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, true);
746      try (ResultScanner scanner = table.getScanner(scan)) {
747        Result result = scanner.next();
748        assertNotNull(result);
749        assertArrayEquals(ROW, result.getRow());
750        assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
751        assertNull(scanner.next());
752      }
753
754      scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, false);
755      try (ResultScanner scanner = table.getScanner(scan)) {
756        assertNull(scanner.next());
757      }
758
759      scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, false);
760      try (ResultScanner scanner = table.getScanner(scan)) {
761        assertNull(scanner.next());
762      }
763
764      scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, true);
765      try (ResultScanner scanner = table.getScanner(scan)) {
766        assertNull(scanner.next());
767      }
768    }
769  }
770
771  @TestTemplate
772  public void testReverseScanWithFlush() throws Exception {
773    final int BATCH_SIZE = 10;
774    final int ROWS_TO_INSERT = 100;
775    final byte[] LARGE_VALUE = generateHugeValue(128 * 1024);
776    TEST_UTIL.createTable(tableName, FAMILY);
777    try (Connection conn = getConnection(); Table table = conn.getTable(tableName);
778      Admin admin = TEST_UTIL.getAdmin()) {
779      List<Put> putList = new ArrayList<>();
780      for (long i = 0; i < ROWS_TO_INSERT; i++) {
781        Put put = new Put(Bytes.toBytes(i));
782        put.addColumn(FAMILY, QUALIFIER, LARGE_VALUE);
783        putList.add(put);
784
785        if (putList.size() >= BATCH_SIZE) {
786          table.put(putList);
787          admin.flush(tableName);
788          putList.clear();
789        }
790      }
791
792      if (!putList.isEmpty()) {
793        table.put(putList);
794        admin.flush(tableName);
795        putList.clear();
796      }
797
798      Scan scan = new Scan();
799      scan.setReversed(true);
800      int count = 0;
801
802      try (ResultScanner results = table.getScanner(scan)) {
803        while (results.next() != null) {
804          count++;
805        }
806      }
807      assertEquals(ROWS_TO_INSERT, count,
808        "Expected " + ROWS_TO_INSERT + " rows in the table but it is " + count);
809    }
810  }
811
812  @TestTemplate
813  public void testScannerWithPartialResults() throws Exception {
814    TEST_UTIL.createMultiRegionTable(tableName, Bytes.toBytes("c"), 4);
815    try (Connection conn = getConnection(); Table table = conn.getTable(tableName)) {
816      List<Put> puts = new ArrayList<>();
817      byte[] largeArray = new byte[10000];
818      Put put = new Put(Bytes.toBytes("aaaa0"));
819      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1"));
820      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), Bytes.toBytes("2"));
821      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), Bytes.toBytes("3"));
822      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("4"), Bytes.toBytes("4"));
823      puts.add(put);
824      put = new Put(Bytes.toBytes("aaaa1"));
825      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1"));
826      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), largeArray);
827      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), largeArray);
828      puts.add(put);
829      table.put(puts);
830      Scan scan = new Scan();
831      scan.addFamily(Bytes.toBytes("c"));
832      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName.getName());
833      scan.setMaxResultSize(10001);
834      scan.withStopRow(Bytes.toBytes("bbbb"));
835      scan.setFilter(new LimitKVsReturnFilter());
836      try (ResultScanner rs = table.getScanner(scan)) {
837        Result result;
838        int expectedKvNumber = 6;
839        int returnedKvNumber = 0;
840        while ((result = rs.next()) != null) {
841          returnedKvNumber += result.listCells().size();
842        }
843        assertEquals(expectedKvNumber, returnedKvNumber);
844      }
845    }
846  }
847
848  public static class LimitKVsReturnFilter extends FilterBase {
849
850    private int cellCount = 0;
851
852    @Override
853    public ReturnCode filterCell(Cell v) throws IOException {
854      if (cellCount >= 6) {
855        cellCount++;
856        return ReturnCode.SKIP;
857      }
858      cellCount++;
859      return ReturnCode.INCLUDE;
860    }
861
862    @Override
863    public boolean filterAllRemaining() throws IOException {
864      if (cellCount < 7) {
865        return false;
866      }
867      cellCount++;
868      return true;
869    }
870
871    @Override
872    public String toString() {
873      return this.getClass().getSimpleName();
874    }
875
876    public static LimitKVsReturnFilter parseFrom(final byte[] pbBytes)
877      throws DeserializationException {
878      return new LimitKVsReturnFilter();
879    }
880  }
881}