001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028import java.util.Map;
029import java.util.NavigableSet;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellComparatorImpl;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.HColumnDescriptor;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.HTableDescriptor;
039import org.apache.hadoop.hbase.KeepDeletedCells;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.KeyValueUtil;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
047import org.apache.hadoop.hbase.filter.Filter;
048import org.apache.hadoop.hbase.filter.FilterList;
049import org.apache.hadoop.hbase.filter.FilterList.Operator;
050import org.apache.hadoop.hbase.filter.PageFilter;
051import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
052import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
053import org.apache.hadoop.hbase.io.hfile.CacheConfig;
054import org.apache.hadoop.hbase.io.hfile.HFileContext;
055import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
056import org.apache.hadoop.hbase.testclassification.MediumTests;
057import org.apache.hadoop.hbase.testclassification.RegionServerTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.Pair;
060import org.junit.BeforeClass;
061import org.junit.ClassRule;
062import org.junit.Rule;
063import org.junit.Test;
064import org.junit.experimental.categories.Category;
065import org.junit.rules.TestName;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
070
071/**
072 * Test cases against ReversibleKeyValueScanner
073 */
074@Category({RegionServerTests.class, MediumTests.class})
075public class TestReversibleScanners {
076
077  @ClassRule
078  public static final HBaseClassTestRule CLASS_RULE =
079      HBaseClassTestRule.forClass(TestReversibleScanners.class);
080
081  private static final Logger LOG = LoggerFactory.getLogger(TestReversibleScanners.class);
082  HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
083
084  private static byte[] FAMILYNAME = Bytes.toBytes("testCf");
085  private static long TS = System.currentTimeMillis();
086  private static int MAXMVCC = 7;
087  private static byte[] ROW = Bytes.toBytes("testRow");
088  private static final int ROWSIZE = 200;
089  private static byte[][] ROWS = makeN(ROW, ROWSIZE);
090  private static byte[] QUAL = Bytes.toBytes("testQual");
091  private static final int QUALSIZE = 5;
092  private static byte[][] QUALS = makeN(QUAL, QUALSIZE);
093  private static byte[] VALUE = Bytes.toBytes("testValue");
094  private static final int VALUESIZE = 3;
095  private static byte[][] VALUES = makeN(VALUE, VALUESIZE);
096
097  @Rule
098  public TestName name = new TestName();
099
100  @BeforeClass
101  public static void setUp() {
102    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
103      0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
104  }
105  @Test
106  public void testReversibleStoreFileScanner() throws IOException {
107    FileSystem fs = TEST_UTIL.getTestFileSystem();
108    Path hfilePath = new Path(new Path(
109        TEST_UTIL.getDataTestDir("testReversibleStoreFileScanner"),
110        "regionname"), "familyname");
111    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
112    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
113      HFileContextBuilder hcBuilder = new HFileContextBuilder();
114      hcBuilder.withBlockSize(2 * 1024);
115      hcBuilder.withDataBlockEncoding(encoding);
116      HFileContext hFileContext = hcBuilder.build();
117      StoreFileWriter writer = new StoreFileWriter.Builder(
118          TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(hfilePath)
119          .withFileContext(hFileContext).build();
120      writeStoreFile(writer);
121
122      HStoreFile sf = new HStoreFile(fs, writer.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
123          BloomType.NONE, true);
124
125      List<StoreFileScanner> scanners = StoreFileScanner
126          .getScannersForStoreFiles(Collections.singletonList(sf),
127              false, true, false, false, Long.MAX_VALUE);
128      StoreFileScanner scanner = scanners.get(0);
129      seekTestOfReversibleKeyValueScanner(scanner);
130      for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
131        LOG.info("Setting read point to " + readPoint);
132        scanners = StoreFileScanner.getScannersForStoreFiles(
133            Collections.singletonList(sf), false, true, false, false, readPoint);
134        seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
135      }
136    }
137
138  }
139
140  @Test
141  public void testReversibleMemstoreScanner() throws IOException {
142    MemStore memstore = new DefaultMemStore();
143    writeMemstore(memstore);
144    List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
145    seekTestOfReversibleKeyValueScanner(scanners.get(0));
146    for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
147      LOG.info("Setting read point to " + readPoint);
148      scanners = memstore.getScanners(readPoint);
149      seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
150    }
151
152  }
153
154  @Test
155  public void testReversibleKeyValueHeap() throws IOException {
156    // write data to one memstore and two store files
157    FileSystem fs = TEST_UTIL.getTestFileSystem();
158    Path hfilePath = new Path(new Path(
159        TEST_UTIL.getDataTestDir("testReversibleKeyValueHeap"), "regionname"),
160        "familyname");
161    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
162    HFileContextBuilder hcBuilder = new HFileContextBuilder();
163    hcBuilder.withBlockSize(2 * 1024);
164    HFileContext hFileContext = hcBuilder.build();
165    StoreFileWriter writer1 = new StoreFileWriter.Builder(
166        TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
167        hfilePath).withFileContext(hFileContext).build();
168    StoreFileWriter writer2 = new StoreFileWriter.Builder(
169        TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
170        hfilePath).withFileContext(hFileContext).build();
171
172    MemStore memstore = new DefaultMemStore();
173    writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1,
174        writer2 });
175
176    HStoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
177        BloomType.NONE, true);
178
179    HStoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
180        BloomType.NONE, true);
181    /**
182     * Test without MVCC
183     */
184    int startRowNum = ROWSIZE / 2;
185    ReversedKeyValueHeap kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
186        ROWS[startRowNum], MAXMVCC);
187    internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
188
189    startRowNum = ROWSIZE - 1;
190    kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
191        HConstants.EMPTY_START_ROW, MAXMVCC);
192    internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
193
194    /**
195     * Test with MVCC
196     */
197    for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
198      LOG.info("Setting read point to " + readPoint);
199      startRowNum = ROWSIZE - 1;
200      kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
201          HConstants.EMPTY_START_ROW, readPoint);
202      for (int i = startRowNum; i >= 0; i--) {
203        if (i - 2 < 0) break;
204        i = i - 2;
205        kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
206        Pair<Integer, Integer> nextReadableNum = getNextReadableNumWithBackwardScan(
207            i, 0, readPoint);
208        if (nextReadableNum == null) break;
209        KeyValue expecedKey = makeKV(nextReadableNum.getFirst(),
210            nextReadableNum.getSecond());
211        assertEquals(expecedKey, kvHeap.peek());
212        i = nextReadableNum.getFirst();
213        int qualNum = nextReadableNum.getSecond();
214        if (qualNum + 1 < QUALSIZE) {
215          kvHeap.backwardSeek(makeKV(i, qualNum + 1));
216          nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1,
217              readPoint);
218          if (nextReadableNum == null) break;
219          expecedKey = makeKV(nextReadableNum.getFirst(),
220              nextReadableNum.getSecond());
221          assertEquals(expecedKey, kvHeap.peek());
222          i = nextReadableNum.getFirst();
223          qualNum = nextReadableNum.getSecond();
224        }
225
226        kvHeap.next();
227
228        if (qualNum + 1 >= QUALSIZE) {
229          nextReadableNum = getNextReadableNumWithBackwardScan(i - 1, 0,
230              readPoint);
231        } else {
232          nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1,
233              readPoint);
234        }
235        if (nextReadableNum == null) break;
236        expecedKey = makeKV(nextReadableNum.getFirst(),
237            nextReadableNum.getSecond());
238        assertEquals(expecedKey, kvHeap.peek());
239        i = nextReadableNum.getFirst();
240      }
241    }
242  }
243
244  @Test
245  public void testReversibleStoreScanner() throws IOException {
246    // write data to one memstore and two store files
247    FileSystem fs = TEST_UTIL.getTestFileSystem();
248    Path hfilePath = new Path(new Path(
249        TEST_UTIL.getDataTestDir("testReversibleStoreScanner"), "regionname"),
250        "familyname");
251    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
252    HFileContextBuilder hcBuilder = new HFileContextBuilder();
253    hcBuilder.withBlockSize(2 * 1024);
254    HFileContext hFileContext = hcBuilder.build();
255    StoreFileWriter writer1 = new StoreFileWriter.Builder(
256        TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
257        hfilePath).withFileContext(hFileContext).build();
258    StoreFileWriter writer2 = new StoreFileWriter.Builder(
259        TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
260        hfilePath).withFileContext(hFileContext).build();
261
262    MemStore memstore = new DefaultMemStore();
263    writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1,
264        writer2 });
265
266    HStoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
267        BloomType.NONE, true);
268
269    HStoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
270        BloomType.NONE, true);
271
272    ScanInfo scanInfo =
273        new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
274            KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparatorImpl.COMPARATOR, false);
275
276    // Case 1.Test a full reversed scan
277    Scan scan = new Scan();
278    scan.setReversed(true);
279    StoreScanner storeScanner =
280        getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
281    verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false);
282
283    // Case 2.Test reversed scan with a specified start row
284    int startRowNum = ROWSIZE / 2;
285    byte[] startRow = ROWS[startRowNum];
286    scan.withStartRow(startRow);
287    storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
288    verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1),
289        startRowNum + 1, false);
290
291    // Case 3.Test reversed scan with a specified start row and specified
292    // qualifiers
293    assertTrue(QUALSIZE > 2);
294    scan.addColumn(FAMILYNAME, QUALS[0]);
295    scan.addColumn(FAMILYNAME, QUALS[2]);
296    storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
297    verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1,
298        false);
299
300    // Case 4.Test reversed scan with mvcc based on case 3
301    for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
302      LOG.info("Setting read point to " + readPoint);
303      storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, readPoint);
304      int expectedRowCount = 0;
305      int expectedKVCount = 0;
306      for (int i = startRowNum; i >= 0; i--) {
307        int kvCount = 0;
308        if (makeMVCC(i, 0) <= readPoint) {
309          kvCount++;
310        }
311        if (makeMVCC(i, 2) <= readPoint) {
312          kvCount++;
313        }
314        if (kvCount > 0) {
315          expectedRowCount++;
316          expectedKVCount += kvCount;
317        }
318      }
319      verifyCountAndOrder(storeScanner, expectedKVCount, expectedRowCount,
320          false);
321    }
322  }
323
324  @Test
325  public void testReversibleRegionScanner() throws IOException {
326    byte[] FAMILYNAME2 = Bytes.toBytes("testCf2");
327    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()))
328        .addFamily(new HColumnDescriptor(FAMILYNAME))
329        .addFamily(new HColumnDescriptor(FAMILYNAME2));
330    HRegion region = TEST_UTIL.createLocalHRegion(htd, null, null);
331    loadDataToRegion(region, FAMILYNAME2);
332
333    // verify row count with forward scan
334    Scan scan = new Scan();
335    InternalScanner scanner = region.getScanner(scan);
336    verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, true);
337
338    // Case1:Full reversed scan
339    scan.setReversed(true);
340    scanner = region.getScanner(scan);
341    verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, false);
342
343    // Case2:Full reversed scan with one family
344    scan = new Scan();
345    scan.setReversed(true);
346    scan.addFamily(FAMILYNAME);
347    scanner = region.getScanner(scan);
348    verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE, ROWSIZE, false);
349
350    // Case3:Specify qualifiers + One family
351    byte[][] specifiedQualifiers = { QUALS[1], QUALS[2] };
352    for (byte[] specifiedQualifier : specifiedQualifiers)
353      scan.addColumn(FAMILYNAME, specifiedQualifier);
354    scanner = region.getScanner(scan);
355    verifyCountAndOrder(scanner, ROWSIZE * 2, ROWSIZE, false);
356
357    // Case4:Specify qualifiers + Two families
358    for (byte[] specifiedQualifier : specifiedQualifiers)
359      scan.addColumn(FAMILYNAME2, specifiedQualifier);
360    scanner = region.getScanner(scan);
361    verifyCountAndOrder(scanner, ROWSIZE * 2 * 2, ROWSIZE, false);
362
363    // Case5: Case4 + specify start row
364    int startRowNum = ROWSIZE * 3 / 4;
365    scan.withStartRow(ROWS[startRowNum]);
366    scanner = region.getScanner(scan);
367    verifyCountAndOrder(scanner, (startRowNum + 1) * 2 * 2, (startRowNum + 1),
368        false);
369
370    // Case6: Case4 + specify stop row
371    int stopRowNum = ROWSIZE / 4;
372    scan.withStartRow(HConstants.EMPTY_BYTE_ARRAY);
373    scan.withStopRow(ROWS[stopRowNum]);
374    scanner = region.getScanner(scan);
375    verifyCountAndOrder(scanner, (ROWSIZE - stopRowNum - 1) * 2 * 2, (ROWSIZE
376        - stopRowNum - 1), false);
377
378    // Case7: Case4 + specify start row + specify stop row
379    scan.withStartRow(ROWS[startRowNum]);
380    scanner = region.getScanner(scan);
381    verifyCountAndOrder(scanner, (startRowNum - stopRowNum) * 2 * 2,
382        (startRowNum - stopRowNum), false);
383
384    // Case8: Case7 + SingleColumnValueFilter
385    int valueNum = startRowNum % VALUESIZE;
386    Filter filter = new SingleColumnValueFilter(FAMILYNAME,
387        specifiedQualifiers[0], CompareOp.EQUAL, VALUES[valueNum]);
388    scan.setFilter(filter);
389    scanner = region.getScanner(scan);
390    int unfilteredRowNum = (startRowNum - stopRowNum) / VALUESIZE
391        + (stopRowNum / VALUESIZE == valueNum ? 0 : 1);
392    verifyCountAndOrder(scanner, unfilteredRowNum * 2 * 2, unfilteredRowNum,
393        false);
394
395    // Case9: Case7 + PageFilter
396    int pageSize = 10;
397    filter = new PageFilter(pageSize);
398    scan.setFilter(filter);
399    scanner = region.getScanner(scan);
400    int expectedRowNum = pageSize;
401    verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
402
403    // Case10: Case7 + FilterList+MUST_PASS_ONE
404    SingleColumnValueFilter scvFilter1 = new SingleColumnValueFilter(
405        FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[0]);
406    SingleColumnValueFilter scvFilter2 = new SingleColumnValueFilter(
407        FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[1]);
408    expectedRowNum = 0;
409    for (int i = startRowNum; i > stopRowNum; i--) {
410      if (i % VALUESIZE == 0 || i % VALUESIZE == 1) {
411        expectedRowNum++;
412      }
413    }
414    filter = new FilterList(Operator.MUST_PASS_ONE, scvFilter1, scvFilter2);
415    scan.setFilter(filter);
416    scanner = region.getScanner(scan);
417    verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
418
419    // Case10: Case7 + FilterList+MUST_PASS_ALL
420    filter = new FilterList(Operator.MUST_PASS_ALL, scvFilter1, scvFilter2);
421    expectedRowNum = 0;
422    scan.setFilter(filter);
423    scanner = region.getScanner(scan);
424    verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
425  }
426
427  private StoreScanner getReversibleStoreScanner(MemStore memstore, HStoreFile sf1, HStoreFile sf2,
428      Scan scan, ScanInfo scanInfo, int readPoint) throws IOException {
429    List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null, false, readPoint);
430    NavigableSet<byte[]> columns = null;
431    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
432      // Should only one family
433      columns = entry.getValue();
434    }
435    StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo, columns, scanners);
436    return storeScanner;
437  }
438
439  private void verifyCountAndOrder(InternalScanner scanner,
440      int expectedKVCount, int expectedRowCount, boolean forward)
441      throws IOException {
442    List<Cell> kvList = new ArrayList<>();
443    Result lastResult = null;
444    int rowCount = 0;
445    int kvCount = 0;
446    try {
447      while (scanner.next(kvList)) {
448        if (kvList.isEmpty()) continue;
449        rowCount++;
450        kvCount += kvList.size();
451        if (lastResult != null) {
452          Result curResult = Result.create(kvList);
453          assertEquals("LastResult:" + lastResult + "CurResult:" + curResult,
454              forward,
455              Bytes.compareTo(curResult.getRow(), lastResult.getRow()) > 0);
456        }
457        lastResult = Result.create(kvList);
458        kvList.clear();
459      }
460    } finally {
461      scanner.close();
462    }
463    if (!kvList.isEmpty()) {
464      rowCount++;
465      kvCount += kvList.size();
466      kvList.clear();
467    }
468    assertEquals(expectedKVCount, kvCount);
469    assertEquals(expectedRowCount, rowCount);
470  }
471
472  private void internalTestSeekAndNextForReversibleKeyValueHeap(
473      ReversedKeyValueHeap kvHeap, int startRowNum) throws IOException {
474    // Test next and seek
475    for (int i = startRowNum; i >= 0; i--) {
476      if (i % 2 == 1 && i - 2 >= 0) {
477        i = i - 2;
478        kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
479      }
480      for (int j = 0; j < QUALSIZE; j++) {
481        if (j % 2 == 1 && (j + 1) < QUALSIZE) {
482          j = j + 1;
483          kvHeap.backwardSeek(makeKV(i, j));
484        }
485        assertEquals(makeKV(i, j), kvHeap.peek());
486        kvHeap.next();
487      }
488    }
489    assertEquals(null, kvHeap.peek());
490  }
491
492  private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore, HStoreFile sf1,
493      HStoreFile sf2, byte[] startRow, int readPoint) throws IOException {
494    List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, startRow, true, readPoint);
495    ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners, CellComparatorImpl.COMPARATOR);
496    return kvHeap;
497  }
498
499  private List<KeyValueScanner> getScanners(MemStore memstore, HStoreFile sf1, HStoreFile sf2,
500      byte[] startRow, boolean doSeek, int readPoint) throws IOException {
501    List<StoreFileScanner> fileScanners = StoreFileScanner.getScannersForStoreFiles(
502      Lists.newArrayList(sf1, sf2), false, true, false, false, readPoint);
503    List<KeyValueScanner> memScanners = memstore.getScanners(readPoint);
504    List<KeyValueScanner> scanners = new ArrayList<>(fileScanners.size() + 1);
505    scanners.addAll(fileScanners);
506    scanners.addAll(memScanners);
507
508    if (doSeek) {
509      if (Bytes.equals(HConstants.EMPTY_START_ROW, startRow)) {
510        for (KeyValueScanner scanner : scanners) {
511          scanner.seekToLastRow();
512        }
513      } else {
514        KeyValue startKey = KeyValueUtil.createFirstOnRow(startRow);
515        for (KeyValueScanner scanner : scanners) {
516          scanner.backwardSeek(startKey);
517        }
518      }
519    }
520    return scanners;
521  }
522
523  private void seekTestOfReversibleKeyValueScanner(KeyValueScanner scanner)
524      throws IOException {
525    /**
526     * Test without MVCC
527     */
528    // Test seek to last row
529    assertTrue(scanner.seekToLastRow());
530    assertEquals(makeKV(ROWSIZE - 1, 0), scanner.peek());
531
532    // Test backward seek in three cases
533    // Case1: seek in the same row in backwardSeek
534    KeyValue seekKey = makeKV(ROWSIZE - 2, QUALSIZE - 2);
535    assertTrue(scanner.backwardSeek(seekKey));
536    assertEquals(seekKey, scanner.peek());
537
538    // Case2: seek to the previous row in backwardSeek
539    int seekRowNum = ROWSIZE - 2;
540    assertTrue(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[seekRowNum])));
541    KeyValue expectedKey = makeKV(seekRowNum - 1, 0);
542    assertEquals(expectedKey, scanner.peek());
543
544    // Case3: unable to backward seek
545    assertFalse(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[0])));
546    assertEquals(null, scanner.peek());
547
548    // Test seek to previous row
549    seekRowNum = ROWSIZE - 4;
550    assertTrue(scanner.seekToPreviousRow(KeyValueUtil
551        .createFirstOnRow(ROWS[seekRowNum])));
552    expectedKey = makeKV(seekRowNum - 1, 0);
553    assertEquals(expectedKey, scanner.peek());
554
555    // Test seek to previous row for the first row
556    assertFalse(scanner.seekToPreviousRow(makeKV(0, 0)));
557    assertEquals(null, scanner.peek());
558
559  }
560
561  private void seekTestOfReversibleKeyValueScannerWithMVCC(
562      List<? extends KeyValueScanner> scanners, int readPoint) throws IOException {
563  /**
564   * Test with MVCC
565   */
566    // Test seek to last row
567    KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(
568        ROWSIZE - 1, 0, readPoint);
569    boolean res = false;
570    for (KeyValueScanner scanner : scanners) {
571      res |= scanner.seekToLastRow();
572    }
573    assertEquals(expectedKey != null, res);
574    res = false;
575    for (KeyValueScanner scanner : scanners) {
576      res |= (expectedKey.equals(scanner.peek()));
577    }
578    assertTrue(res);
579
580      // Test backward seek in two cases
581      // Case1: seek in the same row in backwardSeek
582      expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2,
583          QUALSIZE - 2, readPoint);
584    res = false;
585    for (KeyValueScanner scanner : scanners) {
586      res |= scanner.backwardSeek(expectedKey);
587    }
588    assertEquals(expectedKey != null, res);
589    res = false;
590    for (KeyValueScanner scanner : scanners) {
591      res |= (expectedKey.equals(scanner.peek()));
592    }
593    assertTrue(res);
594
595      // Case2: seek to the previous row in backwardSeek
596    int seekRowNum = ROWSIZE - 3;
597    res = false;
598    for (KeyValueScanner scanner : scanners) {
599      res |= scanner.backwardSeek(expectedKey);
600    }
601    res = false;
602    for (KeyValueScanner scanner : scanners) {
603      res |= (expectedKey.equals(scanner.peek()));
604    }
605    assertTrue(res);
606
607      // Test seek to previous row
608      seekRowNum = ROWSIZE - 4;
609      expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
610          readPoint);
611    res = false;
612    for (KeyValueScanner scanner : scanners) {
613      res |= scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum]));
614    }
615    assertEquals(expectedKey != null, res);
616    res = false;
617    for (KeyValueScanner scanner : scanners) {
618      res |= (expectedKey.equals(scanner.peek()));
619    }
620    assertTrue(res);
621  }
622
623  private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum,
624      int startQualNum, int readPoint) {
625    Pair<Integer, Integer> nextReadableNum = getNextReadableNumWithBackwardScan(
626        startRowNum, startQualNum, readPoint);
627    if (nextReadableNum == null)
628      return null;
629    return makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
630  }
631
632  private Pair<Integer, Integer> getNextReadableNumWithBackwardScan(
633      int startRowNum, int startQualNum, int readPoint) {
634    Pair<Integer, Integer> nextReadableNum = null;
635    boolean findExpected = false;
636    for (int i = startRowNum; i >= 0; i--) {
637      for (int j = (i == startRowNum ? startQualNum : 0); j < QUALSIZE; j++) {
638        if (makeMVCC(i, j) <= readPoint) {
639          nextReadableNum = new Pair<>(i, j);
640          findExpected = true;
641          break;
642        }
643      }
644      if (findExpected)
645        break;
646    }
647    return nextReadableNum;
648  }
649
650  private static void loadDataToRegion(HRegion region, byte[] additionalFamily)
651      throws IOException {
652    for (int i = 0; i < ROWSIZE; i++) {
653      Put put = new Put(ROWS[i]);
654      for (int j = 0; j < QUALSIZE; j++) {
655        put.add(makeKV(i, j));
656        // put additional family
657        put.add(makeKV(i, j, additionalFamily));
658      }
659      region.put(put);
660      if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) {
661        region.flush(true);
662      }
663    }
664  }
665
666  private static void writeMemstoreAndStoreFiles(MemStore memstore,
667      final StoreFileWriter[] writers) throws IOException {
668    try {
669      for (int i = 0; i < ROWSIZE; i++) {
670        for (int j = 0; j < QUALSIZE; j++) {
671          if (i % 2 == 0) {
672            memstore.add(makeKV(i, j), null);
673          } else {
674            writers[(i + j) % writers.length].append(makeKV(i, j));
675          }
676        }
677      }
678    } finally {
679      for (int i = 0; i < writers.length; i++) {
680        writers[i].close();
681      }
682    }
683  }
684
685  private static void writeStoreFile(final StoreFileWriter writer)
686      throws IOException {
687    try {
688      for (int i = 0; i < ROWSIZE; i++) {
689        for (int j = 0; j < QUALSIZE; j++) {
690          writer.append(makeKV(i, j));
691        }
692      }
693    } finally {
694      writer.close();
695    }
696  }
697
698  private static void writeMemstore(MemStore memstore) throws IOException {
699    // Add half of the keyvalues to memstore
700    for (int i = 0; i < ROWSIZE; i++) {
701      for (int j = 0; j < QUALSIZE; j++) {
702        if ((i + j) % 2 == 0) {
703          memstore.add(makeKV(i, j), null);
704        }
705      }
706    }
707    memstore.snapshot();
708    // Add another half of the keyvalues to snapshot
709    for (int i = 0; i < ROWSIZE; i++) {
710      for (int j = 0; j < QUALSIZE; j++) {
711        if ((i + j) % 2 == 1) {
712          memstore.add(makeKV(i, j), null);
713        }
714      }
715    }
716  }
717
718  private static KeyValue makeKV(int rowNum, int cqNum) {
719    return makeKV(rowNum, cqNum, FAMILYNAME);
720  }
721
722  private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) {
723    KeyValue kv = new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS,
724        VALUES[rowNum % VALUESIZE]);
725    kv.setSequenceId(makeMVCC(rowNum, cqNum));
726    return kv;
727  }
728
729  private static long makeMVCC(int rowNum, int cqNum) {
730    return (rowNum + cqNum) % (MAXMVCC + 1);
731  }
732
733  private static byte[][] makeN(byte[] base, int n) {
734    byte[][] ret = new byte[n][];
735    for (int i = 0; i < n; i++) {
736      ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i)));
737    }
738    return ret;
739  }
740}