001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028import java.util.Map;
029import java.util.NavigableSet;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellComparatorImpl;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.HColumnDescriptor;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.HTableDescriptor;
039import org.apache.hadoop.hbase.KeepDeletedCells;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.KeyValueUtil;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
047import org.apache.hadoop.hbase.filter.Filter;
048import org.apache.hadoop.hbase.filter.FilterList;
049import org.apache.hadoop.hbase.filter.FilterList.Operator;
050import org.apache.hadoop.hbase.filter.PageFilter;
051import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
052import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
053import org.apache.hadoop.hbase.io.hfile.CacheConfig;
054import org.apache.hadoop.hbase.io.hfile.HFileContext;
055import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
056import org.apache.hadoop.hbase.testclassification.MediumTests;
057import org.apache.hadoop.hbase.testclassification.RegionServerTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.Pair;
060import org.junit.BeforeClass;
061import org.junit.ClassRule;
062import org.junit.Rule;
063import org.junit.Test;
064import org.junit.experimental.categories.Category;
065import org.junit.rules.TestName;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
070
071/**
072 * Test cases against ReversibleKeyValueScanner
073 */
074@Category({RegionServerTests.class, MediumTests.class})
075public class TestReversibleScanners {
076
077  @ClassRule
078  public static final HBaseClassTestRule CLASS_RULE =
079      HBaseClassTestRule.forClass(TestReversibleScanners.class);
080
081  private static final Logger LOG = LoggerFactory.getLogger(TestReversibleScanners.class);
082  HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
083
084  private static byte[] FAMILYNAME = Bytes.toBytes("testCf");
085  private static long TS = System.currentTimeMillis();
086  private static int MAXMVCC = 7;
087  private static byte[] ROW = Bytes.toBytes("testRow");
088  private static final int ROWSIZE = 200;
089  private static byte[][] ROWS = makeN(ROW, ROWSIZE);
090  private static byte[] QUAL = Bytes.toBytes("testQual");
091  private static final int QUALSIZE = 5;
092  private static byte[][] QUALS = makeN(QUAL, QUALSIZE);
093  private static byte[] VALUE = Bytes.toBytes("testValue");
094  private static final int VALUESIZE = 3;
095  private static byte[][] VALUES = makeN(VALUE, VALUESIZE);
096
097  @Rule
098  public TestName name = new TestName();
099
100  @BeforeClass
101  public static void setUp() {
102    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
103  }
104  @Test
105  public void testReversibleStoreFileScanner() throws IOException {
106    FileSystem fs = TEST_UTIL.getTestFileSystem();
107    Path hfilePath = new Path(new Path(
108        TEST_UTIL.getDataTestDir("testReversibleStoreFileScanner"),
109        "regionname"), "familyname");
110    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
111    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
112      HFileContextBuilder hcBuilder = new HFileContextBuilder();
113      hcBuilder.withBlockSize(2 * 1024);
114      hcBuilder.withDataBlockEncoding(encoding);
115      HFileContext hFileContext = hcBuilder.build();
116      StoreFileWriter writer = new StoreFileWriter.Builder(
117          TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(hfilePath)
118          .withFileContext(hFileContext).build();
119      writeStoreFile(writer);
120
121      HStoreFile sf = new HStoreFile(fs, writer.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
122          BloomType.NONE, true);
123
124      List<StoreFileScanner> scanners = StoreFileScanner
125          .getScannersForStoreFiles(Collections.singletonList(sf),
126              false, true, false, false, Long.MAX_VALUE);
127      StoreFileScanner scanner = scanners.get(0);
128      seekTestOfReversibleKeyValueScanner(scanner);
129      for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
130        LOG.info("Setting read point to " + readPoint);
131        scanners = StoreFileScanner.getScannersForStoreFiles(
132            Collections.singletonList(sf), false, true, false, false, readPoint);
133        seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
134      }
135    }
136
137  }
138
139  @Test
140  public void testReversibleMemstoreScanner() throws IOException {
141    MemStore memstore = new DefaultMemStore();
142    writeMemstore(memstore);
143    List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
144    seekTestOfReversibleKeyValueScanner(scanners.get(0));
145    for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
146      LOG.info("Setting read point to " + readPoint);
147      scanners = memstore.getScanners(readPoint);
148      seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
149    }
150
151  }
152
153  @Test
154  public void testReversibleKeyValueHeap() throws IOException {
155    // write data to one memstore and two store files
156    FileSystem fs = TEST_UTIL.getTestFileSystem();
157    Path hfilePath = new Path(new Path(
158        TEST_UTIL.getDataTestDir("testReversibleKeyValueHeap"), "regionname"),
159        "familyname");
160    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
161    HFileContextBuilder hcBuilder = new HFileContextBuilder();
162    hcBuilder.withBlockSize(2 * 1024);
163    HFileContext hFileContext = hcBuilder.build();
164    StoreFileWriter writer1 = new StoreFileWriter.Builder(
165        TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
166        hfilePath).withFileContext(hFileContext).build();
167    StoreFileWriter writer2 = new StoreFileWriter.Builder(
168        TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
169        hfilePath).withFileContext(hFileContext).build();
170
171    MemStore memstore = new DefaultMemStore();
172    writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1,
173        writer2 });
174
175    HStoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
176        BloomType.NONE, true);
177
178    HStoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
179        BloomType.NONE, true);
180    /**
181     * Test without MVCC
182     */
183    int startRowNum = ROWSIZE / 2;
184    ReversedKeyValueHeap kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
185        ROWS[startRowNum], MAXMVCC);
186    internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
187
188    startRowNum = ROWSIZE - 1;
189    kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
190        HConstants.EMPTY_START_ROW, MAXMVCC);
191    internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
192
193    /**
194     * Test with MVCC
195     */
196    for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
197      LOG.info("Setting read point to " + readPoint);
198      startRowNum = ROWSIZE - 1;
199      kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
200          HConstants.EMPTY_START_ROW, readPoint);
201      for (int i = startRowNum; i >= 0; i--) {
202        if (i - 2 < 0) break;
203        i = i - 2;
204        kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
205        Pair<Integer, Integer> nextReadableNum = getNextReadableNumWithBackwardScan(
206            i, 0, readPoint);
207        if (nextReadableNum == null) break;
208        KeyValue expecedKey = makeKV(nextReadableNum.getFirst(),
209            nextReadableNum.getSecond());
210        assertEquals(expecedKey, kvHeap.peek());
211        i = nextReadableNum.getFirst();
212        int qualNum = nextReadableNum.getSecond();
213        if (qualNum + 1 < QUALSIZE) {
214          kvHeap.backwardSeek(makeKV(i, qualNum + 1));
215          nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1,
216              readPoint);
217          if (nextReadableNum == null) break;
218          expecedKey = makeKV(nextReadableNum.getFirst(),
219              nextReadableNum.getSecond());
220          assertEquals(expecedKey, kvHeap.peek());
221          i = nextReadableNum.getFirst();
222          qualNum = nextReadableNum.getSecond();
223        }
224
225        kvHeap.next();
226
227        if (qualNum + 1 >= QUALSIZE) {
228          nextReadableNum = getNextReadableNumWithBackwardScan(i - 1, 0,
229              readPoint);
230        } else {
231          nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1,
232              readPoint);
233        }
234        if (nextReadableNum == null) break;
235        expecedKey = makeKV(nextReadableNum.getFirst(),
236            nextReadableNum.getSecond());
237        assertEquals(expecedKey, kvHeap.peek());
238        i = nextReadableNum.getFirst();
239      }
240    }
241  }
242
243  @Test
244  public void testReversibleStoreScanner() throws IOException {
245    // write data to one memstore and two store files
246    FileSystem fs = TEST_UTIL.getTestFileSystem();
247    Path hfilePath = new Path(new Path(
248        TEST_UTIL.getDataTestDir("testReversibleStoreScanner"), "regionname"),
249        "familyname");
250    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
251    HFileContextBuilder hcBuilder = new HFileContextBuilder();
252    hcBuilder.withBlockSize(2 * 1024);
253    HFileContext hFileContext = hcBuilder.build();
254    StoreFileWriter writer1 = new StoreFileWriter.Builder(
255        TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
256        hfilePath).withFileContext(hFileContext).build();
257    StoreFileWriter writer2 = new StoreFileWriter.Builder(
258        TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
259        hfilePath).withFileContext(hFileContext).build();
260
261    MemStore memstore = new DefaultMemStore();
262    writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1,
263        writer2 });
264
265    HStoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
266        BloomType.NONE, true);
267
268    HStoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
269        BloomType.NONE, true);
270
271    ScanInfo scanInfo =
272        new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
273            KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparatorImpl.COMPARATOR, false);
274
275    // Case 1.Test a full reversed scan
276    Scan scan = new Scan();
277    scan.setReversed(true);
278    StoreScanner storeScanner =
279        getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
280    verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false);
281
282    // Case 2.Test reversed scan with a specified start row
283    int startRowNum = ROWSIZE / 2;
284    byte[] startRow = ROWS[startRowNum];
285    scan.withStartRow(startRow);
286    storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
287    verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1),
288        startRowNum + 1, false);
289
290    // Case 3.Test reversed scan with a specified start row and specified
291    // qualifiers
292    assertTrue(QUALSIZE > 2);
293    scan.addColumn(FAMILYNAME, QUALS[0]);
294    scan.addColumn(FAMILYNAME, QUALS[2]);
295    storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
296    verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1,
297        false);
298
299    // Case 4.Test reversed scan with mvcc based on case 3
300    for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
301      LOG.info("Setting read point to " + readPoint);
302      storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, readPoint);
303      int expectedRowCount = 0;
304      int expectedKVCount = 0;
305      for (int i = startRowNum; i >= 0; i--) {
306        int kvCount = 0;
307        if (makeMVCC(i, 0) <= readPoint) {
308          kvCount++;
309        }
310        if (makeMVCC(i, 2) <= readPoint) {
311          kvCount++;
312        }
313        if (kvCount > 0) {
314          expectedRowCount++;
315          expectedKVCount += kvCount;
316        }
317      }
318      verifyCountAndOrder(storeScanner, expectedKVCount, expectedRowCount,
319          false);
320    }
321  }
322
323  @Test
324  public void testReversibleRegionScanner() throws IOException {
325    byte[] FAMILYNAME2 = Bytes.toBytes("testCf2");
326    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()))
327        .addFamily(new HColumnDescriptor(FAMILYNAME))
328        .addFamily(new HColumnDescriptor(FAMILYNAME2));
329    HRegion region = TEST_UTIL.createLocalHRegion(htd, null, null);
330    loadDataToRegion(region, FAMILYNAME2);
331
332    // verify row count with forward scan
333    Scan scan = new Scan();
334    InternalScanner scanner = region.getScanner(scan);
335    verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, true);
336
337    // Case1:Full reversed scan
338    scan.setReversed(true);
339    scanner = region.getScanner(scan);
340    verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, false);
341
342    // Case2:Full reversed scan with one family
343    scan = new Scan();
344    scan.setReversed(true);
345    scan.addFamily(FAMILYNAME);
346    scanner = region.getScanner(scan);
347    verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE, ROWSIZE, false);
348
349    // Case3:Specify qualifiers + One family
350    byte[][] specifiedQualifiers = { QUALS[1], QUALS[2] };
351    for (byte[] specifiedQualifier : specifiedQualifiers)
352      scan.addColumn(FAMILYNAME, specifiedQualifier);
353    scanner = region.getScanner(scan);
354    verifyCountAndOrder(scanner, ROWSIZE * 2, ROWSIZE, false);
355
356    // Case4:Specify qualifiers + Two families
357    for (byte[] specifiedQualifier : specifiedQualifiers)
358      scan.addColumn(FAMILYNAME2, specifiedQualifier);
359    scanner = region.getScanner(scan);
360    verifyCountAndOrder(scanner, ROWSIZE * 2 * 2, ROWSIZE, false);
361
362    // Case5: Case4 + specify start row
363    int startRowNum = ROWSIZE * 3 / 4;
364    scan.withStartRow(ROWS[startRowNum]);
365    scanner = region.getScanner(scan);
366    verifyCountAndOrder(scanner, (startRowNum + 1) * 2 * 2, (startRowNum + 1),
367        false);
368
369    // Case6: Case4 + specify stop row
370    int stopRowNum = ROWSIZE / 4;
371    scan.withStartRow(HConstants.EMPTY_BYTE_ARRAY);
372    scan.withStopRow(ROWS[stopRowNum]);
373    scanner = region.getScanner(scan);
374    verifyCountAndOrder(scanner, (ROWSIZE - stopRowNum - 1) * 2 * 2, (ROWSIZE
375        - stopRowNum - 1), false);
376
377    // Case7: Case4 + specify start row + specify stop row
378    scan.withStartRow(ROWS[startRowNum]);
379    scanner = region.getScanner(scan);
380    verifyCountAndOrder(scanner, (startRowNum - stopRowNum) * 2 * 2,
381        (startRowNum - stopRowNum), false);
382
383    // Case8: Case7 + SingleColumnValueFilter
384    int valueNum = startRowNum % VALUESIZE;
385    Filter filter = new SingleColumnValueFilter(FAMILYNAME,
386        specifiedQualifiers[0], CompareOp.EQUAL, VALUES[valueNum]);
387    scan.setFilter(filter);
388    scanner = region.getScanner(scan);
389    int unfilteredRowNum = (startRowNum - stopRowNum) / VALUESIZE
390        + (stopRowNum / VALUESIZE == valueNum ? 0 : 1);
391    verifyCountAndOrder(scanner, unfilteredRowNum * 2 * 2, unfilteredRowNum,
392        false);
393
394    // Case9: Case7 + PageFilter
395    int pageSize = 10;
396    filter = new PageFilter(pageSize);
397    scan.setFilter(filter);
398    scanner = region.getScanner(scan);
399    int expectedRowNum = pageSize;
400    verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
401
402    // Case10: Case7 + FilterList+MUST_PASS_ONE
403    SingleColumnValueFilter scvFilter1 = new SingleColumnValueFilter(
404        FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[0]);
405    SingleColumnValueFilter scvFilter2 = new SingleColumnValueFilter(
406        FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[1]);
407    expectedRowNum = 0;
408    for (int i = startRowNum; i > stopRowNum; i--) {
409      if (i % VALUESIZE == 0 || i % VALUESIZE == 1) {
410        expectedRowNum++;
411      }
412    }
413    filter = new FilterList(Operator.MUST_PASS_ONE, scvFilter1, scvFilter2);
414    scan.setFilter(filter);
415    scanner = region.getScanner(scan);
416    verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
417
418    // Case10: Case7 + FilterList+MUST_PASS_ALL
419    filter = new FilterList(Operator.MUST_PASS_ALL, scvFilter1, scvFilter2);
420    expectedRowNum = 0;
421    scan.setFilter(filter);
422    scanner = region.getScanner(scan);
423    verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
424  }
425
426  private StoreScanner getReversibleStoreScanner(MemStore memstore, HStoreFile sf1, HStoreFile sf2,
427      Scan scan, ScanInfo scanInfo, int readPoint) throws IOException {
428    List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null, false, readPoint);
429    NavigableSet<byte[]> columns = null;
430    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
431      // Should only one family
432      columns = entry.getValue();
433    }
434    StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo, columns, scanners);
435    return storeScanner;
436  }
437
438  private void verifyCountAndOrder(InternalScanner scanner,
439      int expectedKVCount, int expectedRowCount, boolean forward)
440      throws IOException {
441    List<Cell> kvList = new ArrayList<>();
442    Result lastResult = null;
443    int rowCount = 0;
444    int kvCount = 0;
445    try {
446      while (scanner.next(kvList)) {
447        if (kvList.isEmpty()) continue;
448        rowCount++;
449        kvCount += kvList.size();
450        if (lastResult != null) {
451          Result curResult = Result.create(kvList);
452          assertEquals("LastResult:" + lastResult + "CurResult:" + curResult,
453              forward,
454              Bytes.compareTo(curResult.getRow(), lastResult.getRow()) > 0);
455        }
456        lastResult = Result.create(kvList);
457        kvList.clear();
458      }
459    } finally {
460      scanner.close();
461    }
462    if (!kvList.isEmpty()) {
463      rowCount++;
464      kvCount += kvList.size();
465      kvList.clear();
466    }
467    assertEquals(expectedKVCount, kvCount);
468    assertEquals(expectedRowCount, rowCount);
469  }
470
471  private void internalTestSeekAndNextForReversibleKeyValueHeap(
472      ReversedKeyValueHeap kvHeap, int startRowNum) throws IOException {
473    // Test next and seek
474    for (int i = startRowNum; i >= 0; i--) {
475      if (i % 2 == 1 && i - 2 >= 0) {
476        i = i - 2;
477        kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
478      }
479      for (int j = 0; j < QUALSIZE; j++) {
480        if (j % 2 == 1 && (j + 1) < QUALSIZE) {
481          j = j + 1;
482          kvHeap.backwardSeek(makeKV(i, j));
483        }
484        assertEquals(makeKV(i, j), kvHeap.peek());
485        kvHeap.next();
486      }
487    }
488    assertEquals(null, kvHeap.peek());
489  }
490
491  private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore, HStoreFile sf1,
492      HStoreFile sf2, byte[] startRow, int readPoint) throws IOException {
493    List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, startRow, true, readPoint);
494    ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners, CellComparatorImpl.COMPARATOR);
495    return kvHeap;
496  }
497
498  private List<KeyValueScanner> getScanners(MemStore memstore, HStoreFile sf1, HStoreFile sf2,
499      byte[] startRow, boolean doSeek, int readPoint) throws IOException {
500    List<StoreFileScanner> fileScanners = StoreFileScanner.getScannersForStoreFiles(
501      Lists.newArrayList(sf1, sf2), false, true, false, false, readPoint);
502    List<KeyValueScanner> memScanners = memstore.getScanners(readPoint);
503    List<KeyValueScanner> scanners = new ArrayList<>(fileScanners.size() + 1);
504    scanners.addAll(fileScanners);
505    scanners.addAll(memScanners);
506
507    if (doSeek) {
508      if (Bytes.equals(HConstants.EMPTY_START_ROW, startRow)) {
509        for (KeyValueScanner scanner : scanners) {
510          scanner.seekToLastRow();
511        }
512      } else {
513        KeyValue startKey = KeyValueUtil.createFirstOnRow(startRow);
514        for (KeyValueScanner scanner : scanners) {
515          scanner.backwardSeek(startKey);
516        }
517      }
518    }
519    return scanners;
520  }
521
522  private void seekTestOfReversibleKeyValueScanner(KeyValueScanner scanner)
523      throws IOException {
524    /**
525     * Test without MVCC
526     */
527    // Test seek to last row
528    assertTrue(scanner.seekToLastRow());
529    assertEquals(makeKV(ROWSIZE - 1, 0), scanner.peek());
530
531    // Test backward seek in three cases
532    // Case1: seek in the same row in backwardSeek
533    KeyValue seekKey = makeKV(ROWSIZE - 2, QUALSIZE - 2);
534    assertTrue(scanner.backwardSeek(seekKey));
535    assertEquals(seekKey, scanner.peek());
536
537    // Case2: seek to the previous row in backwardSeek
538    int seekRowNum = ROWSIZE - 2;
539    assertTrue(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[seekRowNum])));
540    KeyValue expectedKey = makeKV(seekRowNum - 1, 0);
541    assertEquals(expectedKey, scanner.peek());
542
543    // Case3: unable to backward seek
544    assertFalse(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[0])));
545    assertEquals(null, scanner.peek());
546
547    // Test seek to previous row
548    seekRowNum = ROWSIZE - 4;
549    assertTrue(scanner.seekToPreviousRow(KeyValueUtil
550        .createFirstOnRow(ROWS[seekRowNum])));
551    expectedKey = makeKV(seekRowNum - 1, 0);
552    assertEquals(expectedKey, scanner.peek());
553
554    // Test seek to previous row for the first row
555    assertFalse(scanner.seekToPreviousRow(makeKV(0, 0)));
556    assertEquals(null, scanner.peek());
557
558  }
559
560  private void seekTestOfReversibleKeyValueScannerWithMVCC(
561      List<? extends KeyValueScanner> scanners, int readPoint) throws IOException {
562  /**
563   * Test with MVCC
564   */
565    // Test seek to last row
566    KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(
567        ROWSIZE - 1, 0, readPoint);
568    boolean res = false;
569    for (KeyValueScanner scanner : scanners) {
570      res |= scanner.seekToLastRow();
571    }
572    assertEquals(expectedKey != null, res);
573    res = false;
574    for (KeyValueScanner scanner : scanners) {
575      res |= (expectedKey.equals(scanner.peek()));
576    }
577    assertTrue(res);
578
579      // Test backward seek in two cases
580      // Case1: seek in the same row in backwardSeek
581      expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2,
582          QUALSIZE - 2, readPoint);
583    res = false;
584    for (KeyValueScanner scanner : scanners) {
585      res |= scanner.backwardSeek(expectedKey);
586    }
587    assertEquals(expectedKey != null, res);
588    res = false;
589    for (KeyValueScanner scanner : scanners) {
590      res |= (expectedKey.equals(scanner.peek()));
591    }
592    assertTrue(res);
593
594      // Case2: seek to the previous row in backwardSeek
595    int seekRowNum = ROWSIZE - 3;
596    res = false;
597    for (KeyValueScanner scanner : scanners) {
598      res |= scanner.backwardSeek(expectedKey);
599    }
600    res = false;
601    for (KeyValueScanner scanner : scanners) {
602      res |= (expectedKey.equals(scanner.peek()));
603    }
604    assertTrue(res);
605
606      // Test seek to previous row
607      seekRowNum = ROWSIZE - 4;
608      expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
609          readPoint);
610    res = false;
611    for (KeyValueScanner scanner : scanners) {
612      res |= scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum]));
613    }
614    assertEquals(expectedKey != null, res);
615    res = false;
616    for (KeyValueScanner scanner : scanners) {
617      res |= (expectedKey.equals(scanner.peek()));
618    }
619    assertTrue(res);
620  }
621
622  private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum,
623      int startQualNum, int readPoint) {
624    Pair<Integer, Integer> nextReadableNum = getNextReadableNumWithBackwardScan(
625        startRowNum, startQualNum, readPoint);
626    if (nextReadableNum == null)
627      return null;
628    return makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
629  }
630
631  private Pair<Integer, Integer> getNextReadableNumWithBackwardScan(
632      int startRowNum, int startQualNum, int readPoint) {
633    Pair<Integer, Integer> nextReadableNum = null;
634    boolean findExpected = false;
635    for (int i = startRowNum; i >= 0; i--) {
636      for (int j = (i == startRowNum ? startQualNum : 0); j < QUALSIZE; j++) {
637        if (makeMVCC(i, j) <= readPoint) {
638          nextReadableNum = new Pair<>(i, j);
639          findExpected = true;
640          break;
641        }
642      }
643      if (findExpected)
644        break;
645    }
646    return nextReadableNum;
647  }
648
649  private static void loadDataToRegion(HRegion region, byte[] additionalFamily)
650      throws IOException {
651    for (int i = 0; i < ROWSIZE; i++) {
652      Put put = new Put(ROWS[i]);
653      for (int j = 0; j < QUALSIZE; j++) {
654        put.add(makeKV(i, j));
655        // put additional family
656        put.add(makeKV(i, j, additionalFamily));
657      }
658      region.put(put);
659      if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) {
660        region.flush(true);
661      }
662    }
663  }
664
665  private static void writeMemstoreAndStoreFiles(MemStore memstore,
666      final StoreFileWriter[] writers) throws IOException {
667    try {
668      for (int i = 0; i < ROWSIZE; i++) {
669        for (int j = 0; j < QUALSIZE; j++) {
670          if (i % 2 == 0) {
671            memstore.add(makeKV(i, j), null);
672          } else {
673            writers[(i + j) % writers.length].append(makeKV(i, j));
674          }
675        }
676      }
677    } finally {
678      for (int i = 0; i < writers.length; i++) {
679        writers[i].close();
680      }
681    }
682  }
683
684  private static void writeStoreFile(final StoreFileWriter writer)
685      throws IOException {
686    try {
687      for (int i = 0; i < ROWSIZE; i++) {
688        for (int j = 0; j < QUALSIZE; j++) {
689          writer.append(makeKV(i, j));
690        }
691      }
692    } finally {
693      writer.close();
694    }
695  }
696
697  private static void writeMemstore(MemStore memstore) throws IOException {
698    // Add half of the keyvalues to memstore
699    for (int i = 0; i < ROWSIZE; i++) {
700      for (int j = 0; j < QUALSIZE; j++) {
701        if ((i + j) % 2 == 0) {
702          memstore.add(makeKV(i, j), null);
703        }
704      }
705    }
706    memstore.snapshot();
707    // Add another half of the keyvalues to snapshot
708    for (int i = 0; i < ROWSIZE; i++) {
709      for (int j = 0; j < QUALSIZE; j++) {
710        if ((i + j) % 2 == 1) {
711          memstore.add(makeKV(i, j), null);
712        }
713      }
714    }
715  }
716
717  private static KeyValue makeKV(int rowNum, int cqNum) {
718    return makeKV(rowNum, cqNum, FAMILYNAME);
719  }
720
721  private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) {
722    KeyValue kv = new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS,
723        VALUES[rowNum % VALUESIZE]);
724    kv.setSequenceId(makeMVCC(rowNum, cqNum));
725    return kv;
726  }
727
728  private static long makeMVCC(int rowNum, int cqNum) {
729    return (rowNum + cqNum) % (MAXMVCC + 1);
730  }
731
732  private static byte[][] makeN(byte[] base, int n) {
733    byte[][] ret = new byte[n][];
734    for (int i = 0; i < n; i++) {
735      ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i)));
736    }
737    return ret;
738  }
739}