001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028import java.util.Map;
029import java.util.NavigableSet;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellComparatorImpl;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.HColumnDescriptor;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.HTableDescriptor;
039import org.apache.hadoop.hbase.KeepDeletedCells;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.KeyValueUtil;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
047import org.apache.hadoop.hbase.filter.Filter;
048import org.apache.hadoop.hbase.filter.FilterList;
049import org.apache.hadoop.hbase.filter.FilterList.Operator;
050import org.apache.hadoop.hbase.filter.PageFilter;
051import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
052import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
053import org.apache.hadoop.hbase.io.hfile.CacheConfig;
054import org.apache.hadoop.hbase.io.hfile.HFileContext;
055import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
056import org.apache.hadoop.hbase.testclassification.MediumTests;
057import org.apache.hadoop.hbase.testclassification.RegionServerTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.util.Pair;
061import org.junit.BeforeClass;
062import org.junit.ClassRule;
063import org.junit.Rule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.junit.rules.TestName;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
071
072/**
073 * Test cases against ReversibleKeyValueScanner
074 */
075@Category({ RegionServerTests.class, MediumTests.class })
076public class TestReversibleScanners {
077
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080    HBaseClassTestRule.forClass(TestReversibleScanners.class);
081
082  private static final Logger LOG = LoggerFactory.getLogger(TestReversibleScanners.class);
083  HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
084
085  private static byte[] FAMILYNAME = Bytes.toBytes("testCf");
086  private static long TS = EnvironmentEdgeManager.currentTime();
087  private static int MAXMVCC = 7;
088  private static byte[] ROW = Bytes.toBytes("testRow");
089  private static final int ROWSIZE = 200;
090  private static byte[][] ROWS = makeN(ROW, ROWSIZE);
091  private static byte[] QUAL = Bytes.toBytes("testQual");
092  private static final int QUALSIZE = 5;
093  private static byte[][] QUALS = makeN(QUAL, QUALSIZE);
094  private static byte[] VALUE = Bytes.toBytes("testValue");
095  private static final int VALUESIZE = 3;
096  private static byte[][] VALUES = makeN(VALUE, VALUESIZE);
097
098  @Rule
099  public TestName name = new TestName();
100
101  @BeforeClass
102  public static void setUp() {
103    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
104      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
105  }
106
107  @Test
108  public void testReversibleStoreFileScanner() throws IOException {
109    FileSystem fs = TEST_UTIL.getTestFileSystem();
110    Path hfilePath =
111      new Path(new Path(TEST_UTIL.getDataTestDir("testReversibleStoreFileScanner"), "regionname"),
112        "familyname");
113    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
114    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
115      HFileContextBuilder hcBuilder = new HFileContextBuilder();
116      hcBuilder.withBlockSize(2 * 1024);
117      hcBuilder.withDataBlockEncoding(encoding);
118      HFileContext hFileContext = hcBuilder.build();
119      StoreFileWriter writer =
120        new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
121          .withOutputDir(hfilePath).withFileContext(hFileContext).build();
122      writeStoreFile(writer);
123
124      HStoreFile sf = new HStoreFile(fs, writer.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
125        BloomType.NONE, true);
126
127      List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
128        Collections.singletonList(sf), false, true, false, false, Long.MAX_VALUE);
129      StoreFileScanner scanner = scanners.get(0);
130      seekTestOfReversibleKeyValueScanner(scanner);
131      for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
132        LOG.info("Setting read point to " + readPoint);
133        scanners = StoreFileScanner.getScannersForStoreFiles(Collections.singletonList(sf), false,
134          true, false, false, readPoint);
135        seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
136      }
137    }
138
139  }
140
141  @Test
142  public void testReversibleMemstoreScanner() throws IOException {
143    MemStore memstore = new DefaultMemStore();
144    writeMemstore(memstore);
145    List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
146    seekTestOfReversibleKeyValueScanner(scanners.get(0));
147    for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
148      LOG.info("Setting read point to " + readPoint);
149      scanners = memstore.getScanners(readPoint);
150      seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
151    }
152
153  }
154
155  @Test
156  public void testReversibleKeyValueHeap() throws IOException {
157    // write data to one memstore and two store files
158    FileSystem fs = TEST_UTIL.getTestFileSystem();
159    Path hfilePath = new Path(
160      new Path(TEST_UTIL.getDataTestDir("testReversibleKeyValueHeap"), "regionname"), "familyname");
161    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
162    HFileContextBuilder hcBuilder = new HFileContextBuilder();
163    hcBuilder.withBlockSize(2 * 1024);
164    HFileContext hFileContext = hcBuilder.build();
165    StoreFileWriter writer1 =
166      new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
167        .withOutputDir(hfilePath).withFileContext(hFileContext).build();
168    StoreFileWriter writer2 =
169      new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
170        .withOutputDir(hfilePath).withFileContext(hFileContext).build();
171
172    MemStore memstore = new DefaultMemStore();
173    writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1, writer2 });
174
175    HStoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
176      BloomType.NONE, true);
177
178    HStoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
179      BloomType.NONE, true);
180    /**
181     * Test without MVCC
182     */
183    int startRowNum = ROWSIZE / 2;
184    ReversedKeyValueHeap kvHeap =
185      getReversibleKeyValueHeap(memstore, sf1, sf2, ROWS[startRowNum], MAXMVCC);
186    internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
187
188    startRowNum = ROWSIZE - 1;
189    kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, HConstants.EMPTY_START_ROW, MAXMVCC);
190    internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
191
192    /**
193     * Test with MVCC
194     */
195    for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
196      LOG.info("Setting read point to " + readPoint);
197      startRowNum = ROWSIZE - 1;
198      kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, HConstants.EMPTY_START_ROW, readPoint);
199      for (int i = startRowNum; i >= 0; i--) {
200        if (i - 2 < 0) break;
201        i = i - 2;
202        kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
203        Pair<Integer, Integer> nextReadableNum =
204          getNextReadableNumWithBackwardScan(i, 0, readPoint);
205        if (nextReadableNum == null) break;
206        KeyValue expecedKey = makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
207        assertEquals(expecedKey, kvHeap.peek());
208        i = nextReadableNum.getFirst();
209        int qualNum = nextReadableNum.getSecond();
210        if (qualNum + 1 < QUALSIZE) {
211          kvHeap.backwardSeek(makeKV(i, qualNum + 1));
212          nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, readPoint);
213          if (nextReadableNum == null) break;
214          expecedKey = makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
215          assertEquals(expecedKey, kvHeap.peek());
216          i = nextReadableNum.getFirst();
217          qualNum = nextReadableNum.getSecond();
218        }
219
220        kvHeap.next();
221
222        if (qualNum + 1 >= QUALSIZE) {
223          nextReadableNum = getNextReadableNumWithBackwardScan(i - 1, 0, readPoint);
224        } else {
225          nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, readPoint);
226        }
227        if (nextReadableNum == null) break;
228        expecedKey = makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
229        assertEquals(expecedKey, kvHeap.peek());
230        i = nextReadableNum.getFirst();
231      }
232    }
233  }
234
235  @Test
236  public void testReversibleStoreScanner() throws IOException {
237    // write data to one memstore and two store files
238    FileSystem fs = TEST_UTIL.getTestFileSystem();
239    Path hfilePath = new Path(
240      new Path(TEST_UTIL.getDataTestDir("testReversibleStoreScanner"), "regionname"), "familyname");
241    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
242    HFileContextBuilder hcBuilder = new HFileContextBuilder();
243    hcBuilder.withBlockSize(2 * 1024);
244    HFileContext hFileContext = hcBuilder.build();
245    StoreFileWriter writer1 =
246      new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
247        .withOutputDir(hfilePath).withFileContext(hFileContext).build();
248    StoreFileWriter writer2 =
249      new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
250        .withOutputDir(hfilePath).withFileContext(hFileContext).build();
251
252    MemStore memstore = new DefaultMemStore();
253    writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1, writer2 });
254
255    HStoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
256      BloomType.NONE, true);
257
258    HStoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
259      BloomType.NONE, true);
260
261    ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE,
262      Long.MAX_VALUE, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0,
263      CellComparatorImpl.COMPARATOR, false);
264
265    // Case 1.Test a full reversed scan
266    Scan scan = new Scan();
267    scan.setReversed(true);
268    StoreScanner storeScanner =
269      getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
270    verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false);
271
272    // Case 2.Test reversed scan with a specified start row
273    int startRowNum = ROWSIZE / 2;
274    byte[] startRow = ROWS[startRowNum];
275    scan.withStartRow(startRow);
276    storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
277    verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1), startRowNum + 1, false);
278
279    // Case 3.Test reversed scan with a specified start row and specified
280    // qualifiers
281    assertTrue(QUALSIZE > 2);
282    scan.addColumn(FAMILYNAME, QUALS[0]);
283    scan.addColumn(FAMILYNAME, QUALS[2]);
284    storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
285    verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1, false);
286
287    // Case 4.Test reversed scan with mvcc based on case 3
288    for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
289      LOG.info("Setting read point to " + readPoint);
290      storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, readPoint);
291      int expectedRowCount = 0;
292      int expectedKVCount = 0;
293      for (int i = startRowNum; i >= 0; i--) {
294        int kvCount = 0;
295        if (makeMVCC(i, 0) <= readPoint) {
296          kvCount++;
297        }
298        if (makeMVCC(i, 2) <= readPoint) {
299          kvCount++;
300        }
301        if (kvCount > 0) {
302          expectedRowCount++;
303          expectedKVCount += kvCount;
304        }
305      }
306      verifyCountAndOrder(storeScanner, expectedKVCount, expectedRowCount, false);
307    }
308  }
309
310  @Test
311  public void testReversibleRegionScanner() throws IOException {
312    byte[] FAMILYNAME2 = Bytes.toBytes("testCf2");
313    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()))
314      .addFamily(new HColumnDescriptor(FAMILYNAME)).addFamily(new HColumnDescriptor(FAMILYNAME2));
315    HRegion region = TEST_UTIL.createLocalHRegion(htd, null, null);
316    loadDataToRegion(region, FAMILYNAME2);
317
318    // verify row count with forward scan
319    Scan scan = new Scan();
320    InternalScanner scanner = region.getScanner(scan);
321    verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, true);
322
323    // Case1:Full reversed scan
324    scan.setReversed(true);
325    scanner = region.getScanner(scan);
326    verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, false);
327
328    // Case2:Full reversed scan with one family
329    scan = new Scan();
330    scan.setReversed(true);
331    scan.addFamily(FAMILYNAME);
332    scanner = region.getScanner(scan);
333    verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE, ROWSIZE, false);
334
335    // Case3:Specify qualifiers + One family
336    byte[][] specifiedQualifiers = { QUALS[1], QUALS[2] };
337    for (byte[] specifiedQualifier : specifiedQualifiers)
338      scan.addColumn(FAMILYNAME, specifiedQualifier);
339    scanner = region.getScanner(scan);
340    verifyCountAndOrder(scanner, ROWSIZE * 2, ROWSIZE, false);
341
342    // Case4:Specify qualifiers + Two families
343    for (byte[] specifiedQualifier : specifiedQualifiers)
344      scan.addColumn(FAMILYNAME2, specifiedQualifier);
345    scanner = region.getScanner(scan);
346    verifyCountAndOrder(scanner, ROWSIZE * 2 * 2, ROWSIZE, false);
347
348    // Case5: Case4 + specify start row
349    int startRowNum = ROWSIZE * 3 / 4;
350    scan.withStartRow(ROWS[startRowNum]);
351    scanner = region.getScanner(scan);
352    verifyCountAndOrder(scanner, (startRowNum + 1) * 2 * 2, (startRowNum + 1), false);
353
354    // Case6: Case4 + specify stop row
355    int stopRowNum = ROWSIZE / 4;
356    scan.withStartRow(HConstants.EMPTY_BYTE_ARRAY);
357    scan.withStopRow(ROWS[stopRowNum]);
358    scanner = region.getScanner(scan);
359    verifyCountAndOrder(scanner, (ROWSIZE - stopRowNum - 1) * 2 * 2, (ROWSIZE - stopRowNum - 1),
360      false);
361
362    // Case7: Case4 + specify start row + specify stop row
363    scan.withStartRow(ROWS[startRowNum]);
364    scanner = region.getScanner(scan);
365    verifyCountAndOrder(scanner, (startRowNum - stopRowNum) * 2 * 2, (startRowNum - stopRowNum),
366      false);
367
368    // Case8: Case7 + SingleColumnValueFilter
369    int valueNum = startRowNum % VALUESIZE;
370    Filter filter = new SingleColumnValueFilter(FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL,
371      VALUES[valueNum]);
372    scan.setFilter(filter);
373    scanner = region.getScanner(scan);
374    int unfilteredRowNum =
375      (startRowNum - stopRowNum) / VALUESIZE + (stopRowNum / VALUESIZE == valueNum ? 0 : 1);
376    verifyCountAndOrder(scanner, unfilteredRowNum * 2 * 2, unfilteredRowNum, false);
377
378    // Case9: Case7 + PageFilter
379    int pageSize = 10;
380    filter = new PageFilter(pageSize);
381    scan.setFilter(filter);
382    scanner = region.getScanner(scan);
383    int expectedRowNum = pageSize;
384    verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
385
386    // Case10: Case7 + FilterList+MUST_PASS_ONE
387    SingleColumnValueFilter scvFilter1 =
388      new SingleColumnValueFilter(FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[0]);
389    SingleColumnValueFilter scvFilter2 =
390      new SingleColumnValueFilter(FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[1]);
391    expectedRowNum = 0;
392    for (int i = startRowNum; i > stopRowNum; i--) {
393      if (i % VALUESIZE == 0 || i % VALUESIZE == 1) {
394        expectedRowNum++;
395      }
396    }
397    filter = new FilterList(Operator.MUST_PASS_ONE, scvFilter1, scvFilter2);
398    scan.setFilter(filter);
399    scanner = region.getScanner(scan);
400    verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
401
402    // Case10: Case7 + FilterList+MUST_PASS_ALL
403    filter = new FilterList(Operator.MUST_PASS_ALL, scvFilter1, scvFilter2);
404    expectedRowNum = 0;
405    scan.setFilter(filter);
406    scanner = region.getScanner(scan);
407    verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
408  }
409
410  private StoreScanner getReversibleStoreScanner(MemStore memstore, HStoreFile sf1, HStoreFile sf2,
411    Scan scan, ScanInfo scanInfo, int readPoint) throws IOException {
412    List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null, false, readPoint);
413    NavigableSet<byte[]> columns = null;
414    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
415      // Should only one family
416      columns = entry.getValue();
417    }
418    StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo, columns, scanners);
419    return storeScanner;
420  }
421
422  private void verifyCountAndOrder(InternalScanner scanner, int expectedKVCount,
423    int expectedRowCount, boolean forward) throws IOException {
424    List<Cell> kvList = new ArrayList<>();
425    Result lastResult = null;
426    int rowCount = 0;
427    int kvCount = 0;
428    try {
429      while (scanner.next(kvList)) {
430        if (kvList.isEmpty()) continue;
431        rowCount++;
432        kvCount += kvList.size();
433        if (lastResult != null) {
434          Result curResult = Result.create(kvList);
435          assertEquals("LastResult:" + lastResult + "CurResult:" + curResult, forward,
436            Bytes.compareTo(curResult.getRow(), lastResult.getRow()) > 0);
437        }
438        lastResult = Result.create(kvList);
439        kvList.clear();
440      }
441    } finally {
442      scanner.close();
443    }
444    if (!kvList.isEmpty()) {
445      rowCount++;
446      kvCount += kvList.size();
447      kvList.clear();
448    }
449    assertEquals(expectedKVCount, kvCount);
450    assertEquals(expectedRowCount, rowCount);
451  }
452
453  private void internalTestSeekAndNextForReversibleKeyValueHeap(ReversedKeyValueHeap kvHeap,
454    int startRowNum) throws IOException {
455    // Test next and seek
456    for (int i = startRowNum; i >= 0; i--) {
457      if (i % 2 == 1 && i - 2 >= 0) {
458        i = i - 2;
459        kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
460      }
461      for (int j = 0; j < QUALSIZE; j++) {
462        if (j % 2 == 1 && (j + 1) < QUALSIZE) {
463          j = j + 1;
464          kvHeap.backwardSeek(makeKV(i, j));
465        }
466        assertEquals(makeKV(i, j), kvHeap.peek());
467        kvHeap.next();
468      }
469    }
470    assertEquals(null, kvHeap.peek());
471  }
472
473  private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore, HStoreFile sf1,
474    HStoreFile sf2, byte[] startRow, int readPoint) throws IOException {
475    List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, startRow, true, readPoint);
476    ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners, CellComparatorImpl.COMPARATOR);
477    return kvHeap;
478  }
479
480  private List<KeyValueScanner> getScanners(MemStore memstore, HStoreFile sf1, HStoreFile sf2,
481    byte[] startRow, boolean doSeek, int readPoint) throws IOException {
482    List<StoreFileScanner> fileScanners = StoreFileScanner
483      .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true, false, false, readPoint);
484    List<KeyValueScanner> memScanners = memstore.getScanners(readPoint);
485    List<KeyValueScanner> scanners = new ArrayList<>(fileScanners.size() + 1);
486    scanners.addAll(fileScanners);
487    scanners.addAll(memScanners);
488
489    if (doSeek) {
490      if (Bytes.equals(HConstants.EMPTY_START_ROW, startRow)) {
491        for (KeyValueScanner scanner : scanners) {
492          scanner.seekToLastRow();
493        }
494      } else {
495        KeyValue startKey = KeyValueUtil.createFirstOnRow(startRow);
496        for (KeyValueScanner scanner : scanners) {
497          scanner.backwardSeek(startKey);
498        }
499      }
500    }
501    return scanners;
502  }
503
504  private void seekTestOfReversibleKeyValueScanner(KeyValueScanner scanner) throws IOException {
505    /**
506     * Test without MVCC
507     */
508    // Test seek to last row
509    assertTrue(scanner.seekToLastRow());
510    assertEquals(makeKV(ROWSIZE - 1, 0), scanner.peek());
511
512    // Test backward seek in three cases
513    // Case1: seek in the same row in backwardSeek
514    KeyValue seekKey = makeKV(ROWSIZE - 2, QUALSIZE - 2);
515    assertTrue(scanner.backwardSeek(seekKey));
516    assertEquals(seekKey, scanner.peek());
517
518    // Case2: seek to the previous row in backwardSeek
519    int seekRowNum = ROWSIZE - 2;
520    assertTrue(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[seekRowNum])));
521    KeyValue expectedKey = makeKV(seekRowNum - 1, 0);
522    assertEquals(expectedKey, scanner.peek());
523
524    // Case3: unable to backward seek
525    assertFalse(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[0])));
526    assertEquals(null, scanner.peek());
527
528    // Test seek to previous row
529    seekRowNum = ROWSIZE - 4;
530    assertTrue(scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum])));
531    expectedKey = makeKV(seekRowNum - 1, 0);
532    assertEquals(expectedKey, scanner.peek());
533
534    // Test seek to previous row for the first row
535    assertFalse(scanner.seekToPreviousRow(makeKV(0, 0)));
536    assertEquals(null, scanner.peek());
537
538  }
539
540  private void seekTestOfReversibleKeyValueScannerWithMVCC(List<? extends KeyValueScanner> scanners,
541    int readPoint) throws IOException {
542    /**
543     * Test with MVCC
544     */
545    // Test seek to last row
546    KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 1, 0, readPoint);
547    boolean res = false;
548    for (KeyValueScanner scanner : scanners) {
549      res |= scanner.seekToLastRow();
550    }
551    assertEquals(expectedKey != null, res);
552    res = false;
553    for (KeyValueScanner scanner : scanners) {
554      res |= (expectedKey.equals(scanner.peek()));
555    }
556    assertTrue(res);
557
558    // Test backward seek in two cases
559    // Case1: seek in the same row in backwardSeek
560    expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2, QUALSIZE - 2, readPoint);
561    res = false;
562    for (KeyValueScanner scanner : scanners) {
563      res |= scanner.backwardSeek(expectedKey);
564    }
565    assertEquals(expectedKey != null, res);
566    res = false;
567    for (KeyValueScanner scanner : scanners) {
568      res |= (expectedKey.equals(scanner.peek()));
569    }
570    assertTrue(res);
571
572    // Case2: seek to the previous row in backwardSeek
573    int seekRowNum = ROWSIZE - 3;
574    res = false;
575    for (KeyValueScanner scanner : scanners) {
576      res |= scanner.backwardSeek(expectedKey);
577    }
578    res = false;
579    for (KeyValueScanner scanner : scanners) {
580      res |= (expectedKey.equals(scanner.peek()));
581    }
582    assertTrue(res);
583
584    // Test seek to previous row
585    seekRowNum = ROWSIZE - 4;
586    expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0, readPoint);
587    res = false;
588    for (KeyValueScanner scanner : scanners) {
589      res |= scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum]));
590    }
591    assertEquals(expectedKey != null, res);
592    res = false;
593    for (KeyValueScanner scanner : scanners) {
594      res |= (expectedKey.equals(scanner.peek()));
595    }
596    assertTrue(res);
597  }
598
599  private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum, int startQualNum,
600    int readPoint) {
601    Pair<Integer, Integer> nextReadableNum =
602      getNextReadableNumWithBackwardScan(startRowNum, startQualNum, readPoint);
603    if (nextReadableNum == null) return null;
604    return makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
605  }
606
607  private Pair<Integer, Integer> getNextReadableNumWithBackwardScan(int startRowNum,
608    int startQualNum, int readPoint) {
609    Pair<Integer, Integer> nextReadableNum = null;
610    boolean findExpected = false;
611    for (int i = startRowNum; i >= 0; i--) {
612      for (int j = (i == startRowNum ? startQualNum : 0); j < QUALSIZE; j++) {
613        if (makeMVCC(i, j) <= readPoint) {
614          nextReadableNum = new Pair<>(i, j);
615          findExpected = true;
616          break;
617        }
618      }
619      if (findExpected) break;
620    }
621    return nextReadableNum;
622  }
623
624  private static void loadDataToRegion(HRegion region, byte[] additionalFamily) throws IOException {
625    for (int i = 0; i < ROWSIZE; i++) {
626      Put put = new Put(ROWS[i]);
627      for (int j = 0; j < QUALSIZE; j++) {
628        put.add(makeKV(i, j));
629        // put additional family
630        put.add(makeKV(i, j, additionalFamily));
631      }
632      region.put(put);
633      if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) {
634        region.flush(true);
635      }
636    }
637  }
638
639  private static void writeMemstoreAndStoreFiles(MemStore memstore, final StoreFileWriter[] writers)
640    throws IOException {
641    try {
642      for (int i = 0; i < ROWSIZE; i++) {
643        for (int j = 0; j < QUALSIZE; j++) {
644          if (i % 2 == 0) {
645            memstore.add(makeKV(i, j), null);
646          } else {
647            writers[(i + j) % writers.length].append(makeKV(i, j));
648          }
649        }
650      }
651    } finally {
652      for (int i = 0; i < writers.length; i++) {
653        writers[i].close();
654      }
655    }
656  }
657
658  private static void writeStoreFile(final StoreFileWriter writer) throws IOException {
659    try {
660      for (int i = 0; i < ROWSIZE; i++) {
661        for (int j = 0; j < QUALSIZE; j++) {
662          writer.append(makeKV(i, j));
663        }
664      }
665    } finally {
666      writer.close();
667    }
668  }
669
670  private static void writeMemstore(MemStore memstore) throws IOException {
671    // Add half of the keyvalues to memstore
672    for (int i = 0; i < ROWSIZE; i++) {
673      for (int j = 0; j < QUALSIZE; j++) {
674        if ((i + j) % 2 == 0) {
675          memstore.add(makeKV(i, j), null);
676        }
677      }
678    }
679    memstore.snapshot();
680    // Add another half of the keyvalues to snapshot
681    for (int i = 0; i < ROWSIZE; i++) {
682      for (int j = 0; j < QUALSIZE; j++) {
683        if ((i + j) % 2 == 1) {
684          memstore.add(makeKV(i, j), null);
685        }
686      }
687    }
688  }
689
690  private static KeyValue makeKV(int rowNum, int cqNum) {
691    return makeKV(rowNum, cqNum, FAMILYNAME);
692  }
693
694  private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) {
695    KeyValue kv =
696      new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS, VALUES[rowNum % VALUESIZE]);
697    kv.setSequenceId(makeMVCC(rowNum, cqNum));
698    return kv;
699  }
700
701  private static long makeMVCC(int rowNum, int cqNum) {
702    return (rowNum + cqNum) % (MAXMVCC + 1);
703  }
704
705  private static byte[][] makeN(byte[] base, int n) {
706    byte[][] ret = new byte[n][];
707    for (int i = 0; i < n; i++) {
708      ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i)));
709    }
710    return ret;
711  }
712}