001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028import java.util.Map;
029import java.util.NavigableSet;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellComparatorImpl;
034import org.apache.hadoop.hbase.CompareOperator;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.KeepDeletedCells;
039import org.apache.hadoop.hbase.KeyValue;
040import org.apache.hadoop.hbase.KeyValueUtil;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.client.TableDescriptor;
047import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
048import org.apache.hadoop.hbase.filter.Filter;
049import org.apache.hadoop.hbase.filter.FilterList;
050import org.apache.hadoop.hbase.filter.FilterList.Operator;
051import org.apache.hadoop.hbase.filter.PageFilter;
052import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
053import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
054import org.apache.hadoop.hbase.io.hfile.CacheConfig;
055import org.apache.hadoop.hbase.io.hfile.HFileContext;
056import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
057import org.apache.hadoop.hbase.testclassification.MediumTests;
058import org.apache.hadoop.hbase.testclassification.RegionServerTests;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
061import org.apache.hadoop.hbase.util.Pair;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Rule;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.junit.rules.TestName;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
072
073/**
074 * Test cases against ReversibleKeyValueScanner
075 */
076@Category({ RegionServerTests.class, MediumTests.class })
077public class TestReversibleScanners {
078
079  @ClassRule
080  public static final HBaseClassTestRule CLASS_RULE =
081    HBaseClassTestRule.forClass(TestReversibleScanners.class);
082
083  private static final Logger LOG = LoggerFactory.getLogger(TestReversibleScanners.class);
084  HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
085
086  private static byte[] FAMILYNAME = Bytes.toBytes("testCf");
087  private static long TS = EnvironmentEdgeManager.currentTime();
088  private static int MAXMVCC = 7;
089  private static byte[] ROW = Bytes.toBytes("testRow");
090  private static final int ROWSIZE = 200;
091  private static byte[][] ROWS = makeN(ROW, ROWSIZE);
092  private static byte[] QUAL = Bytes.toBytes("testQual");
093  private static final int QUALSIZE = 5;
094  private static byte[][] QUALS = makeN(QUAL, QUALSIZE);
095  private static byte[] VALUE = Bytes.toBytes("testValue");
096  private static final int VALUESIZE = 3;
097  private static byte[][] VALUES = makeN(VALUE, VALUESIZE);
098
099  @Rule
100  public TestName name = new TestName();
101
102  @BeforeClass
103  public static void setUp() {
104    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
105      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
106  }
107
108  @Test
109  public void testReversibleStoreFileScanner() throws IOException {
110    FileSystem fs = TEST_UTIL.getTestFileSystem();
111    Path hfilePath =
112      new Path(new Path(TEST_UTIL.getDataTestDir("testReversibleStoreFileScanner"), "regionname"),
113        "familyname");
114    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
115    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
116      HFileContextBuilder hcBuilder = new HFileContextBuilder();
117      hcBuilder.withBlockSize(2 * 1024);
118      hcBuilder.withDataBlockEncoding(encoding);
119      HFileContext hFileContext = hcBuilder.build();
120      StoreFileWriter writer =
121        new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
122          .withOutputDir(hfilePath).withFileContext(hFileContext).build();
123      writeStoreFile(writer);
124
125      HStoreFile sf = new HStoreFile(fs, writer.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
126        BloomType.NONE, true);
127
128      List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
129        Collections.singletonList(sf), false, true, false, false, Long.MAX_VALUE);
130      StoreFileScanner scanner = scanners.get(0);
131      seekTestOfReversibleKeyValueScanner(scanner);
132      for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
133        LOG.info("Setting read point to " + readPoint);
134        scanners = StoreFileScanner.getScannersForStoreFiles(Collections.singletonList(sf), false,
135          true, false, false, readPoint);
136        seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
137      }
138    }
139
140  }
141
142  @Test
143  public void testReversibleMemstoreScanner() throws IOException {
144    MemStore memstore = new DefaultMemStore();
145    writeMemstore(memstore);
146    List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
147    seekTestOfReversibleKeyValueScanner(scanners.get(0));
148    for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
149      LOG.info("Setting read point to " + readPoint);
150      scanners = memstore.getScanners(readPoint);
151      seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
152    }
153
154  }
155
156  @Test
157  public void testReversibleKeyValueHeap() throws IOException {
158    // write data to one memstore and two store files
159    FileSystem fs = TEST_UTIL.getTestFileSystem();
160    Path hfilePath = new Path(
161      new Path(TEST_UTIL.getDataTestDir("testReversibleKeyValueHeap"), "regionname"), "familyname");
162    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
163    HFileContextBuilder hcBuilder = new HFileContextBuilder();
164    hcBuilder.withBlockSize(2 * 1024);
165    HFileContext hFileContext = hcBuilder.build();
166    StoreFileWriter writer1 =
167      new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
168        .withOutputDir(hfilePath).withFileContext(hFileContext).build();
169    StoreFileWriter writer2 =
170      new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
171        .withOutputDir(hfilePath).withFileContext(hFileContext).build();
172
173    MemStore memstore = new DefaultMemStore();
174    writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1, writer2 });
175
176    HStoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
177      BloomType.NONE, true);
178
179    HStoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
180      BloomType.NONE, true);
181    /**
182     * Test without MVCC
183     */
184    int startRowNum = ROWSIZE / 2;
185    ReversedKeyValueHeap kvHeap =
186      getReversibleKeyValueHeap(memstore, sf1, sf2, ROWS[startRowNum], MAXMVCC);
187    internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
188
189    startRowNum = ROWSIZE - 1;
190    kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, HConstants.EMPTY_START_ROW, MAXMVCC);
191    internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
192
193    /**
194     * Test with MVCC
195     */
196    for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
197      LOG.info("Setting read point to " + readPoint);
198      startRowNum = ROWSIZE - 1;
199      kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, HConstants.EMPTY_START_ROW, readPoint);
200      for (int i = startRowNum; i >= 0; i--) {
201        if (i - 2 < 0) break;
202        i = i - 2;
203        kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
204        Pair<Integer, Integer> nextReadableNum =
205          getNextReadableNumWithBackwardScan(i, 0, readPoint);
206        if (nextReadableNum == null) break;
207        KeyValue expecedKey = makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
208        assertEquals(expecedKey, kvHeap.peek());
209        i = nextReadableNum.getFirst();
210        int qualNum = nextReadableNum.getSecond();
211        if (qualNum + 1 < QUALSIZE) {
212          kvHeap.backwardSeek(makeKV(i, qualNum + 1));
213          nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, readPoint);
214          if (nextReadableNum == null) break;
215          expecedKey = makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
216          assertEquals(expecedKey, kvHeap.peek());
217          i = nextReadableNum.getFirst();
218          qualNum = nextReadableNum.getSecond();
219        }
220
221        kvHeap.next();
222
223        if (qualNum + 1 >= QUALSIZE) {
224          nextReadableNum = getNextReadableNumWithBackwardScan(i - 1, 0, readPoint);
225        } else {
226          nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, readPoint);
227        }
228        if (nextReadableNum == null) break;
229        expecedKey = makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
230        assertEquals(expecedKey, kvHeap.peek());
231        i = nextReadableNum.getFirst();
232      }
233    }
234  }
235
236  @Test
237  public void testReversibleStoreScanner() throws IOException {
238    // write data to one memstore and two store files
239    FileSystem fs = TEST_UTIL.getTestFileSystem();
240    Path hfilePath = new Path(
241      new Path(TEST_UTIL.getDataTestDir("testReversibleStoreScanner"), "regionname"), "familyname");
242    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
243    HFileContextBuilder hcBuilder = new HFileContextBuilder();
244    hcBuilder.withBlockSize(2 * 1024);
245    HFileContext hFileContext = hcBuilder.build();
246    StoreFileWriter writer1 =
247      new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
248        .withOutputDir(hfilePath).withFileContext(hFileContext).build();
249    StoreFileWriter writer2 =
250      new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
251        .withOutputDir(hfilePath).withFileContext(hFileContext).build();
252
253    MemStore memstore = new DefaultMemStore();
254    writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1, writer2 });
255
256    HStoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
257      BloomType.NONE, true);
258
259    HStoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
260      BloomType.NONE, true);
261
262    ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE,
263      Long.MAX_VALUE, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0,
264      CellComparatorImpl.COMPARATOR, false);
265
266    // Case 1.Test a full reversed scan
267    Scan scan = new Scan();
268    scan.setReversed(true);
269    StoreScanner storeScanner =
270      getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
271    verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false);
272
273    // Case 2.Test reversed scan with a specified start row
274    int startRowNum = ROWSIZE / 2;
275    byte[] startRow = ROWS[startRowNum];
276    scan.withStartRow(startRow);
277    storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
278    verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1), startRowNum + 1, false);
279
280    // Case 3.Test reversed scan with a specified start row and specified
281    // qualifiers
282    assertTrue(QUALSIZE > 2);
283    scan.addColumn(FAMILYNAME, QUALS[0]);
284    scan.addColumn(FAMILYNAME, QUALS[2]);
285    storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
286    verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1, false);
287
288    // Case 4.Test reversed scan with mvcc based on case 3
289    for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
290      LOG.info("Setting read point to " + readPoint);
291      storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, readPoint);
292      int expectedRowCount = 0;
293      int expectedKVCount = 0;
294      for (int i = startRowNum; i >= 0; i--) {
295        int kvCount = 0;
296        if (makeMVCC(i, 0) <= readPoint) {
297          kvCount++;
298        }
299        if (makeMVCC(i, 2) <= readPoint) {
300          kvCount++;
301        }
302        if (kvCount > 0) {
303          expectedRowCount++;
304          expectedKVCount += kvCount;
305        }
306      }
307      verifyCountAndOrder(storeScanner, expectedKVCount, expectedRowCount, false);
308    }
309  }
310
311  @Test
312  public void testReversibleRegionScanner() throws IOException {
313    byte[] FAMILYNAME2 = Bytes.toBytes("testCf2");
314    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
315      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYNAME))
316      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYNAME2)).build();
317    HRegion region = TEST_UTIL.createLocalHRegion(htd, null, null);
318    loadDataToRegion(region, FAMILYNAME2);
319
320    // verify row count with forward scan
321    Scan scan = new Scan();
322    InternalScanner scanner = region.getScanner(scan);
323    verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, true);
324
325    // Case1:Full reversed scan
326    scan.setReversed(true);
327    scanner = region.getScanner(scan);
328    verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, false);
329
330    // Case2:Full reversed scan with one family
331    scan = new Scan();
332    scan.setReversed(true);
333    scan.addFamily(FAMILYNAME);
334    scanner = region.getScanner(scan);
335    verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE, ROWSIZE, false);
336
337    // Case3:Specify qualifiers + One family
338    byte[][] specifiedQualifiers = { QUALS[1], QUALS[2] };
339    for (byte[] specifiedQualifier : specifiedQualifiers)
340      scan.addColumn(FAMILYNAME, specifiedQualifier);
341    scanner = region.getScanner(scan);
342    verifyCountAndOrder(scanner, ROWSIZE * 2, ROWSIZE, false);
343
344    // Case4:Specify qualifiers + Two families
345    for (byte[] specifiedQualifier : specifiedQualifiers)
346      scan.addColumn(FAMILYNAME2, specifiedQualifier);
347    scanner = region.getScanner(scan);
348    verifyCountAndOrder(scanner, ROWSIZE * 2 * 2, ROWSIZE, false);
349
350    // Case5: Case4 + specify start row
351    int startRowNum = ROWSIZE * 3 / 4;
352    scan.withStartRow(ROWS[startRowNum]);
353    scanner = region.getScanner(scan);
354    verifyCountAndOrder(scanner, (startRowNum + 1) * 2 * 2, (startRowNum + 1), false);
355
356    // Case6: Case4 + specify stop row
357    int stopRowNum = ROWSIZE / 4;
358    scan.withStartRow(HConstants.EMPTY_BYTE_ARRAY);
359    scan.withStopRow(ROWS[stopRowNum]);
360    scanner = region.getScanner(scan);
361    verifyCountAndOrder(scanner, (ROWSIZE - stopRowNum - 1) * 2 * 2, (ROWSIZE - stopRowNum - 1),
362      false);
363
364    // Case7: Case4 + specify start row + specify stop row
365    scan.withStartRow(ROWS[startRowNum]);
366    scanner = region.getScanner(scan);
367    verifyCountAndOrder(scanner, (startRowNum - stopRowNum) * 2 * 2, (startRowNum - stopRowNum),
368      false);
369
370    // Case8: Case7 + SingleColumnValueFilter
371    int valueNum = startRowNum % VALUESIZE;
372    Filter filter = new SingleColumnValueFilter(FAMILYNAME, specifiedQualifiers[0],
373      CompareOperator.EQUAL, VALUES[valueNum]);
374    scan.setFilter(filter);
375    scanner = region.getScanner(scan);
376    int unfilteredRowNum =
377      (startRowNum - stopRowNum) / VALUESIZE + (stopRowNum / VALUESIZE == valueNum ? 0 : 1);
378    verifyCountAndOrder(scanner, unfilteredRowNum * 2 * 2, unfilteredRowNum, false);
379
380    // Case9: Case7 + PageFilter
381    int pageSize = 10;
382    filter = new PageFilter(pageSize);
383    scan.setFilter(filter);
384    scanner = region.getScanner(scan);
385    int expectedRowNum = pageSize;
386    verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
387
388    // Case10: Case7 + FilterList+MUST_PASS_ONE
389    SingleColumnValueFilter scvFilter1 = new SingleColumnValueFilter(FAMILYNAME,
390      specifiedQualifiers[0], CompareOperator.EQUAL, VALUES[0]);
391    SingleColumnValueFilter scvFilter2 = new SingleColumnValueFilter(FAMILYNAME,
392      specifiedQualifiers[0], CompareOperator.EQUAL, VALUES[1]);
393    expectedRowNum = 0;
394    for (int i = startRowNum; i > stopRowNum; i--) {
395      if (i % VALUESIZE == 0 || i % VALUESIZE == 1) {
396        expectedRowNum++;
397      }
398    }
399    filter = new FilterList(Operator.MUST_PASS_ONE, scvFilter1, scvFilter2);
400    scan.setFilter(filter);
401    scanner = region.getScanner(scan);
402    verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
403
404    // Case10: Case7 + FilterList+MUST_PASS_ALL
405    filter = new FilterList(Operator.MUST_PASS_ALL, scvFilter1, scvFilter2);
406    expectedRowNum = 0;
407    scan.setFilter(filter);
408    scanner = region.getScanner(scan);
409    verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
410  }
411
412  private StoreScanner getReversibleStoreScanner(MemStore memstore, HStoreFile sf1, HStoreFile sf2,
413    Scan scan, ScanInfo scanInfo, int readPoint) throws IOException {
414    List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null, false, readPoint);
415    NavigableSet<byte[]> columns = null;
416    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
417      // Should only one family
418      columns = entry.getValue();
419    }
420    StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo, columns, scanners);
421    return storeScanner;
422  }
423
424  private void verifyCountAndOrder(InternalScanner scanner, int expectedKVCount,
425    int expectedRowCount, boolean forward) throws IOException {
426    List<Cell> kvList = new ArrayList<>();
427    Result lastResult = null;
428    int rowCount = 0;
429    int kvCount = 0;
430    try {
431      while (scanner.next(kvList)) {
432        if (kvList.isEmpty()) continue;
433        rowCount++;
434        kvCount += kvList.size();
435        if (lastResult != null) {
436          Result curResult = Result.create(kvList);
437          assertEquals("LastResult:" + lastResult + "CurResult:" + curResult, forward,
438            Bytes.compareTo(curResult.getRow(), lastResult.getRow()) > 0);
439        }
440        lastResult = Result.create(kvList);
441        kvList.clear();
442      }
443    } finally {
444      scanner.close();
445    }
446    if (!kvList.isEmpty()) {
447      rowCount++;
448      kvCount += kvList.size();
449      kvList.clear();
450    }
451    assertEquals(expectedKVCount, kvCount);
452    assertEquals(expectedRowCount, rowCount);
453  }
454
455  private void internalTestSeekAndNextForReversibleKeyValueHeap(ReversedKeyValueHeap kvHeap,
456    int startRowNum) throws IOException {
457    // Test next and seek
458    for (int i = startRowNum; i >= 0; i--) {
459      if (i % 2 == 1 && i - 2 >= 0) {
460        i = i - 2;
461        kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
462      }
463      for (int j = 0; j < QUALSIZE; j++) {
464        if (j % 2 == 1 && (j + 1) < QUALSIZE) {
465          j = j + 1;
466          kvHeap.backwardSeek(makeKV(i, j));
467        }
468        assertEquals(makeKV(i, j), kvHeap.peek());
469        kvHeap.next();
470      }
471    }
472    assertEquals(null, kvHeap.peek());
473  }
474
475  private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore, HStoreFile sf1,
476    HStoreFile sf2, byte[] startRow, int readPoint) throws IOException {
477    List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, startRow, true, readPoint);
478    ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners, CellComparatorImpl.COMPARATOR);
479    return kvHeap;
480  }
481
482  private List<KeyValueScanner> getScanners(MemStore memstore, HStoreFile sf1, HStoreFile sf2,
483    byte[] startRow, boolean doSeek, int readPoint) throws IOException {
484    List<StoreFileScanner> fileScanners = StoreFileScanner
485      .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true, false, false, readPoint);
486    List<KeyValueScanner> memScanners = memstore.getScanners(readPoint);
487    List<KeyValueScanner> scanners = new ArrayList<>(fileScanners.size() + 1);
488    scanners.addAll(fileScanners);
489    scanners.addAll(memScanners);
490
491    if (doSeek) {
492      if (Bytes.equals(HConstants.EMPTY_START_ROW, startRow)) {
493        for (KeyValueScanner scanner : scanners) {
494          scanner.seekToLastRow();
495        }
496      } else {
497        KeyValue startKey = KeyValueUtil.createFirstOnRow(startRow);
498        for (KeyValueScanner scanner : scanners) {
499          scanner.backwardSeek(startKey);
500        }
501      }
502    }
503    return scanners;
504  }
505
506  private void seekTestOfReversibleKeyValueScanner(KeyValueScanner scanner) throws IOException {
507    /**
508     * Test without MVCC
509     */
510    // Test seek to last row
511    assertTrue(scanner.seekToLastRow());
512    assertEquals(makeKV(ROWSIZE - 1, 0), scanner.peek());
513
514    // Test backward seek in three cases
515    // Case1: seek in the same row in backwardSeek
516    KeyValue seekKey = makeKV(ROWSIZE - 2, QUALSIZE - 2);
517    assertTrue(scanner.backwardSeek(seekKey));
518    assertEquals(seekKey, scanner.peek());
519
520    // Case2: seek to the previous row in backwardSeek
521    int seekRowNum = ROWSIZE - 2;
522    assertTrue(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[seekRowNum])));
523    KeyValue expectedKey = makeKV(seekRowNum - 1, 0);
524    assertEquals(expectedKey, scanner.peek());
525
526    // Case3: unable to backward seek
527    assertFalse(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[0])));
528    assertEquals(null, scanner.peek());
529
530    // Test seek to previous row
531    seekRowNum = ROWSIZE - 4;
532    assertTrue(scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum])));
533    expectedKey = makeKV(seekRowNum - 1, 0);
534    assertEquals(expectedKey, scanner.peek());
535
536    // Test seek to previous row for the first row
537    assertFalse(scanner.seekToPreviousRow(makeKV(0, 0)));
538    assertEquals(null, scanner.peek());
539
540  }
541
542  private void seekTestOfReversibleKeyValueScannerWithMVCC(List<? extends KeyValueScanner> scanners,
543    int readPoint) throws IOException {
544    /**
545     * Test with MVCC
546     */
547    // Test seek to last row
548    KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 1, 0, readPoint);
549    boolean res = false;
550    for (KeyValueScanner scanner : scanners) {
551      res |= scanner.seekToLastRow();
552    }
553    assertEquals(expectedKey != null, res);
554    res = false;
555    for (KeyValueScanner scanner : scanners) {
556      res |= (expectedKey.equals(scanner.peek()));
557    }
558    assertTrue(res);
559
560    // Test backward seek in two cases
561    // Case1: seek in the same row in backwardSeek
562    expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2, QUALSIZE - 2, readPoint);
563    res = false;
564    for (KeyValueScanner scanner : scanners) {
565      res |= scanner.backwardSeek(expectedKey);
566    }
567    assertEquals(expectedKey != null, res);
568    res = false;
569    for (KeyValueScanner scanner : scanners) {
570      res |= (expectedKey.equals(scanner.peek()));
571    }
572    assertTrue(res);
573
574    // Case2: seek to the previous row in backwardSeek
575    int seekRowNum = ROWSIZE - 3;
576    res = false;
577    for (KeyValueScanner scanner : scanners) {
578      res |= scanner.backwardSeek(expectedKey);
579    }
580    res = false;
581    for (KeyValueScanner scanner : scanners) {
582      res |= (expectedKey.equals(scanner.peek()));
583    }
584    assertTrue(res);
585
586    // Test seek to previous row
587    seekRowNum = ROWSIZE - 4;
588    expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0, readPoint);
589    res = false;
590    for (KeyValueScanner scanner : scanners) {
591      res |= scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum]));
592    }
593    assertEquals(expectedKey != null, res);
594    res = false;
595    for (KeyValueScanner scanner : scanners) {
596      res |= (expectedKey.equals(scanner.peek()));
597    }
598    assertTrue(res);
599  }
600
601  private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum, int startQualNum,
602    int readPoint) {
603    Pair<Integer, Integer> nextReadableNum =
604      getNextReadableNumWithBackwardScan(startRowNum, startQualNum, readPoint);
605    if (nextReadableNum == null) return null;
606    return makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
607  }
608
609  private Pair<Integer, Integer> getNextReadableNumWithBackwardScan(int startRowNum,
610    int startQualNum, int readPoint) {
611    Pair<Integer, Integer> nextReadableNum = null;
612    boolean findExpected = false;
613    for (int i = startRowNum; i >= 0; i--) {
614      for (int j = (i == startRowNum ? startQualNum : 0); j < QUALSIZE; j++) {
615        if (makeMVCC(i, j) <= readPoint) {
616          nextReadableNum = new Pair<>(i, j);
617          findExpected = true;
618          break;
619        }
620      }
621      if (findExpected) break;
622    }
623    return nextReadableNum;
624  }
625
626  private static void loadDataToRegion(HRegion region, byte[] additionalFamily) throws IOException {
627    for (int i = 0; i < ROWSIZE; i++) {
628      Put put = new Put(ROWS[i]);
629      for (int j = 0; j < QUALSIZE; j++) {
630        put.add(makeKV(i, j));
631        // put additional family
632        put.add(makeKV(i, j, additionalFamily));
633      }
634      region.put(put);
635      if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) {
636        region.flush(true);
637      }
638    }
639  }
640
641  private static void writeMemstoreAndStoreFiles(MemStore memstore, final StoreFileWriter[] writers)
642    throws IOException {
643    try {
644      for (int i = 0; i < ROWSIZE; i++) {
645        for (int j = 0; j < QUALSIZE; j++) {
646          if (i % 2 == 0) {
647            memstore.add(makeKV(i, j), null);
648          } else {
649            writers[(i + j) % writers.length].append(makeKV(i, j));
650          }
651        }
652      }
653    } finally {
654      for (int i = 0; i < writers.length; i++) {
655        writers[i].close();
656      }
657    }
658  }
659
660  private static void writeStoreFile(final StoreFileWriter writer) throws IOException {
661    try {
662      for (int i = 0; i < ROWSIZE; i++) {
663        for (int j = 0; j < QUALSIZE; j++) {
664          writer.append(makeKV(i, j));
665        }
666      }
667    } finally {
668      writer.close();
669    }
670  }
671
672  private static void writeMemstore(MemStore memstore) throws IOException {
673    // Add half of the keyvalues to memstore
674    for (int i = 0; i < ROWSIZE; i++) {
675      for (int j = 0; j < QUALSIZE; j++) {
676        if ((i + j) % 2 == 0) {
677          memstore.add(makeKV(i, j), null);
678        }
679      }
680    }
681    memstore.snapshot();
682    // Add another half of the keyvalues to snapshot
683    for (int i = 0; i < ROWSIZE; i++) {
684      for (int j = 0; j < QUALSIZE; j++) {
685        if ((i + j) % 2 == 1) {
686          memstore.add(makeKV(i, j), null);
687        }
688      }
689    }
690  }
691
692  private static KeyValue makeKV(int rowNum, int cqNum) {
693    return makeKV(rowNum, cqNum, FAMILYNAME);
694  }
695
696  private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) {
697    KeyValue kv =
698      new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS, VALUES[rowNum % VALUESIZE]);
699    kv.setSequenceId(makeMVCC(rowNum, cqNum));
700    return kv;
701  }
702
703  private static long makeMVCC(int rowNum, int cqNum) {
704    return (rowNum + cqNum) % (MAXMVCC + 1);
705  }
706
707  private static byte[][] makeN(byte[] base, int n) {
708    byte[][] ret = new byte[n][];
709    for (int i = 0; i < n; i++) {
710      ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i)));
711    }
712    return ret;
713  }
714}