001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028import java.util.Map;
029import java.util.NavigableSet;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellComparatorImpl;
034import org.apache.hadoop.hbase.CompareOperator;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.KeepDeletedCells;
038import org.apache.hadoop.hbase.KeyValue;
039import org.apache.hadoop.hbase.KeyValueUtil;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.Result;
044import org.apache.hadoop.hbase.client.Scan;
045import org.apache.hadoop.hbase.client.TableDescriptor;
046import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
047import org.apache.hadoop.hbase.filter.Filter;
048import org.apache.hadoop.hbase.filter.FilterList;
049import org.apache.hadoop.hbase.filter.FilterList.Operator;
050import org.apache.hadoop.hbase.filter.PageFilter;
051import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
052import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
053import org.apache.hadoop.hbase.io.hfile.CacheConfig;
054import org.apache.hadoop.hbase.io.hfile.HFileContext;
055import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
056import org.apache.hadoop.hbase.testclassification.MediumTests;
057import org.apache.hadoop.hbase.testclassification.RegionServerTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.util.Pair;
061import org.junit.jupiter.api.BeforeAll;
062import org.junit.jupiter.api.Tag;
063import org.junit.jupiter.api.Test;
064import org.junit.jupiter.api.TestInfo;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
069
070/**
071 * Test cases against ReversibleKeyValueScanner
072 */
073@Tag(RegionServerTests.TAG)
074@Tag(MediumTests.TAG)
075public class TestReversibleScanners {
076
077  private static final Logger LOG = LoggerFactory.getLogger(TestReversibleScanners.class);
078  HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
079
080  private static byte[] FAMILYNAME = Bytes.toBytes("testCf");
081  private static long TS = EnvironmentEdgeManager.currentTime();
082  private static int MAXMVCC = 7;
083  private static byte[] ROW = Bytes.toBytes("testRow");
084  private static final int ROWSIZE = 200;
085  private static byte[][] ROWS = makeN(ROW, ROWSIZE);
086  private static byte[] QUAL = Bytes.toBytes("testQual");
087  private static final int QUALSIZE = 5;
088  private static byte[][] QUALS = makeN(QUAL, QUALSIZE);
089  private static byte[] VALUE = Bytes.toBytes("testValue");
090  private static final int VALUESIZE = 3;
091  private static byte[][] VALUES = makeN(VALUE, VALUESIZE);
092
093  @BeforeAll
094  public static void setUp() {
095    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
096      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
097  }
098
099  @Test
100  public void testReversibleStoreFileScanner() throws IOException {
101    FileSystem fs = TEST_UTIL.getTestFileSystem();
102    Path hfilePath =
103      new Path(new Path(TEST_UTIL.getDataTestDir("testReversibleStoreFileScanner"), "regionname"),
104        "familyname");
105    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
106    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
107      HFileContextBuilder hcBuilder = new HFileContextBuilder();
108      hcBuilder.withBlockSize(2 * 1024);
109      hcBuilder.withDataBlockEncoding(encoding);
110      HFileContext hFileContext = hcBuilder.build();
111      StoreFileWriter writer =
112        new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
113          .withOutputDir(hfilePath).withFileContext(hFileContext).build();
114      writeStoreFile(writer);
115
116      StoreFileInfo storeFileInfo = StoreFileInfo
117        .createStoreFileInfoForHFile(TEST_UTIL.getConfiguration(), fs, writer.getPath(), true);
118      HStoreFile sf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
119
120      List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
121        Collections.singletonList(sf), false, true, false, false, Long.MAX_VALUE);
122      StoreFileScanner scanner = scanners.get(0);
123      seekTestOfReversibleKeyValueScanner(scanner);
124      for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
125        LOG.info("Setting read point to " + readPoint);
126        scanners = StoreFileScanner.getScannersForStoreFiles(Collections.singletonList(sf), false,
127          true, false, false, readPoint);
128        seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
129      }
130    }
131
132  }
133
134  @Test
135  public void testReversibleMemstoreScanner() throws IOException {
136    MemStore memstore = new DefaultMemStore();
137    writeMemstore(memstore);
138    List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
139    seekTestOfReversibleKeyValueScanner(scanners.get(0));
140    for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
141      LOG.info("Setting read point to " + readPoint);
142      scanners = memstore.getScanners(readPoint);
143      seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
144    }
145
146  }
147
148  @Test
149  public void testReversibleKeyValueHeap() throws IOException {
150    // write data to one memstore and two store files
151    FileSystem fs = TEST_UTIL.getTestFileSystem();
152    Path hfilePath = new Path(
153      new Path(TEST_UTIL.getDataTestDir("testReversibleKeyValueHeap"), "regionname"), "familyname");
154    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
155    HFileContextBuilder hcBuilder = new HFileContextBuilder();
156    hcBuilder.withBlockSize(2 * 1024);
157    HFileContext hFileContext = hcBuilder.build();
158    StoreFileWriter writer1 =
159      new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
160        .withOutputDir(hfilePath).withFileContext(hFileContext).build();
161    StoreFileWriter writer2 =
162      new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
163        .withOutputDir(hfilePath).withFileContext(hFileContext).build();
164
165    MemStore memstore = new DefaultMemStore();
166    writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1, writer2 });
167
168    StoreFileInfo storeFileInfo1 = StoreFileInfo
169      .createStoreFileInfoForHFile(TEST_UTIL.getConfiguration(), fs, writer1.getPath(), true);
170    HStoreFile sf1 = new HStoreFile(storeFileInfo1, BloomType.NONE, cacheConf);
171
172    StoreFileInfo storeFileInfo2 = StoreFileInfo
173      .createStoreFileInfoForHFile(TEST_UTIL.getConfiguration(), fs, writer2.getPath(), true);
174    HStoreFile sf2 = new HStoreFile(storeFileInfo2, BloomType.NONE, cacheConf);
175    /**
176     * Test without MVCC
177     */
178    int startRowNum = ROWSIZE / 2;
179    ReversedKeyValueHeap kvHeap =
180      getReversibleKeyValueHeap(memstore, sf1, sf2, ROWS[startRowNum], MAXMVCC);
181    internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
182
183    startRowNum = ROWSIZE - 1;
184    kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, HConstants.EMPTY_START_ROW, MAXMVCC);
185    internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
186
187    /**
188     * Test with MVCC
189     */
190    for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
191      LOG.info("Setting read point to " + readPoint);
192      startRowNum = ROWSIZE - 1;
193      kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, HConstants.EMPTY_START_ROW, readPoint);
194      for (int i = startRowNum; i >= 0; i--) {
195        if (i - 2 < 0) break;
196        i = i - 2;
197        kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
198        Pair<Integer, Integer> nextReadableNum =
199          getNextReadableNumWithBackwardScan(i, 0, readPoint);
200        if (nextReadableNum == null) break;
201        KeyValue expecedKey = makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
202        assertEquals(expecedKey, kvHeap.peek());
203        i = nextReadableNum.getFirst();
204        int qualNum = nextReadableNum.getSecond();
205        if (qualNum + 1 < QUALSIZE) {
206          kvHeap.backwardSeek(makeKV(i, qualNum + 1));
207          nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, readPoint);
208          if (nextReadableNum == null) break;
209          expecedKey = makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
210          assertEquals(expecedKey, kvHeap.peek());
211          i = nextReadableNum.getFirst();
212          qualNum = nextReadableNum.getSecond();
213        }
214
215        kvHeap.next();
216
217        if (qualNum + 1 >= QUALSIZE) {
218          nextReadableNum = getNextReadableNumWithBackwardScan(i - 1, 0, readPoint);
219        } else {
220          nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, readPoint);
221        }
222        if (nextReadableNum == null) break;
223        expecedKey = makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
224        assertEquals(expecedKey, kvHeap.peek());
225        i = nextReadableNum.getFirst();
226      }
227    }
228  }
229
230  @Test
231  public void testReversibleStoreScanner() throws IOException {
232    // write data to one memstore and two store files
233    FileSystem fs = TEST_UTIL.getTestFileSystem();
234    Path hfilePath = new Path(
235      new Path(TEST_UTIL.getDataTestDir("testReversibleStoreScanner"), "regionname"), "familyname");
236    CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
237    HFileContextBuilder hcBuilder = new HFileContextBuilder();
238    hcBuilder.withBlockSize(2 * 1024);
239    HFileContext hFileContext = hcBuilder.build();
240    StoreFileWriter writer1 =
241      new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
242        .withOutputDir(hfilePath).withFileContext(hFileContext).build();
243    StoreFileWriter writer2 =
244      new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs)
245        .withOutputDir(hfilePath).withFileContext(hFileContext).build();
246
247    MemStore memstore = new DefaultMemStore();
248    writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1, writer2 });
249
250    StoreFileInfo storeFileInfo1 = StoreFileInfo
251      .createStoreFileInfoForHFile(TEST_UTIL.getConfiguration(), fs, writer1.getPath(), true);
252    HStoreFile sf1 = new HStoreFile(storeFileInfo1, BloomType.NONE, cacheConf);
253
254    StoreFileInfo storeFileInfo2 = StoreFileInfo
255      .createStoreFileInfoForHFile(TEST_UTIL.getConfiguration(), fs, writer2.getPath(), true);
256    HStoreFile sf2 = new HStoreFile(storeFileInfo2, BloomType.NONE, cacheConf);
257
258    ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE,
259      Long.MAX_VALUE, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0,
260      CellComparatorImpl.COMPARATOR, false);
261
262    // Case 1.Test a full reversed scan
263    Scan scan = new Scan();
264    scan.setReversed(true);
265    StoreScanner storeScanner =
266      getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
267    verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false);
268
269    // Case 2.Test reversed scan with a specified start row
270    int startRowNum = ROWSIZE / 2;
271    byte[] startRow = ROWS[startRowNum];
272    scan.withStartRow(startRow);
273    storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
274    verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1), startRowNum + 1, false);
275
276    // Case 3.Test reversed scan with a specified start row and specified
277    // qualifiers
278    assertTrue(QUALSIZE > 2);
279    scan.addColumn(FAMILYNAME, QUALS[0]);
280    scan.addColumn(FAMILYNAME, QUALS[2]);
281    storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
282    verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1, false);
283
284    // Case 4.Test reversed scan with mvcc based on case 3
285    for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
286      LOG.info("Setting read point to " + readPoint);
287      storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, readPoint);
288      int expectedRowCount = 0;
289      int expectedKVCount = 0;
290      for (int i = startRowNum; i >= 0; i--) {
291        int kvCount = 0;
292        if (makeMVCC(i, 0) <= readPoint) {
293          kvCount++;
294        }
295        if (makeMVCC(i, 2) <= readPoint) {
296          kvCount++;
297        }
298        if (kvCount > 0) {
299          expectedRowCount++;
300          expectedKVCount += kvCount;
301        }
302      }
303      verifyCountAndOrder(storeScanner, expectedKVCount, expectedRowCount, false);
304    }
305  }
306
307  @Test
308  public void testReversibleRegionScanner(TestInfo testInfo) throws IOException {
309    byte[] FAMILYNAME2 = Bytes.toBytes("testCf2");
310    TableDescriptor htd =
311      TableDescriptorBuilder.newBuilder(TableName.valueOf(testInfo.getTestMethod().get().getName()))
312        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYNAME))
313        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYNAME2)).build();
314    HRegion region = TEST_UTIL.createLocalHRegion(htd, null, null);
315    loadDataToRegion(region, FAMILYNAME2);
316
317    // verify row count with forward scan
318    Scan scan = new Scan();
319    InternalScanner scanner = region.getScanner(scan);
320    verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, true);
321
322    // Case1:Full reversed scan
323    scan.setReversed(true);
324    scanner = region.getScanner(scan);
325    verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, false);
326
327    // Case2:Full reversed scan with one family
328    scan = new Scan();
329    scan.setReversed(true);
330    scan.addFamily(FAMILYNAME);
331    scanner = region.getScanner(scan);
332    verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE, ROWSIZE, false);
333
334    // Case3:Specify qualifiers + One family
335    byte[][] specifiedQualifiers = { QUALS[1], QUALS[2] };
336    for (byte[] specifiedQualifier : specifiedQualifiers)
337      scan.addColumn(FAMILYNAME, specifiedQualifier);
338    scanner = region.getScanner(scan);
339    verifyCountAndOrder(scanner, ROWSIZE * 2, ROWSIZE, false);
340
341    // Case4:Specify qualifiers + Two families
342    for (byte[] specifiedQualifier : specifiedQualifiers)
343      scan.addColumn(FAMILYNAME2, specifiedQualifier);
344    scanner = region.getScanner(scan);
345    verifyCountAndOrder(scanner, ROWSIZE * 2 * 2, ROWSIZE, false);
346
347    // Case5: Case4 + specify start row
348    int startRowNum = ROWSIZE * 3 / 4;
349    scan.withStartRow(ROWS[startRowNum]);
350    scanner = region.getScanner(scan);
351    verifyCountAndOrder(scanner, (startRowNum + 1) * 2 * 2, (startRowNum + 1), false);
352
353    // Case6: Case4 + specify stop row
354    int stopRowNum = ROWSIZE / 4;
355    scan.withStartRow(HConstants.EMPTY_BYTE_ARRAY);
356    scan.withStopRow(ROWS[stopRowNum]);
357    scanner = region.getScanner(scan);
358    verifyCountAndOrder(scanner, (ROWSIZE - stopRowNum - 1) * 2 * 2, (ROWSIZE - stopRowNum - 1),
359      false);
360
361    // Case7: Case4 + specify start row + specify stop row
362    scan.withStartRow(ROWS[startRowNum]);
363    scanner = region.getScanner(scan);
364    verifyCountAndOrder(scanner, (startRowNum - stopRowNum) * 2 * 2, (startRowNum - stopRowNum),
365      false);
366
367    // Case8: Case7 + SingleColumnValueFilter
368    int valueNum = startRowNum % VALUESIZE;
369    Filter filter = new SingleColumnValueFilter(FAMILYNAME, specifiedQualifiers[0],
370      CompareOperator.EQUAL, VALUES[valueNum]);
371    scan.setFilter(filter);
372    scanner = region.getScanner(scan);
373    int unfilteredRowNum =
374      (startRowNum - stopRowNum) / VALUESIZE + (stopRowNum / VALUESIZE == valueNum ? 0 : 1);
375    verifyCountAndOrder(scanner, unfilteredRowNum * 2 * 2, unfilteredRowNum, false);
376
377    // Case9: Case7 + PageFilter
378    int pageSize = 10;
379    filter = new PageFilter(pageSize);
380    scan.setFilter(filter);
381    scanner = region.getScanner(scan);
382    int expectedRowNum = pageSize;
383    verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
384
385    // Case10: Case7 + FilterList+MUST_PASS_ONE
386    SingleColumnValueFilter scvFilter1 = new SingleColumnValueFilter(FAMILYNAME,
387      specifiedQualifiers[0], CompareOperator.EQUAL, VALUES[0]);
388    SingleColumnValueFilter scvFilter2 = new SingleColumnValueFilter(FAMILYNAME,
389      specifiedQualifiers[0], CompareOperator.EQUAL, VALUES[1]);
390    expectedRowNum = 0;
391    for (int i = startRowNum; i > stopRowNum; i--) {
392      if (i % VALUESIZE == 0 || i % VALUESIZE == 1) {
393        expectedRowNum++;
394      }
395    }
396    filter = new FilterList(Operator.MUST_PASS_ONE, scvFilter1, scvFilter2);
397    scan.setFilter(filter);
398    scanner = region.getScanner(scan);
399    verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
400
401    // Case10: Case7 + FilterList+MUST_PASS_ALL
402    filter = new FilterList(Operator.MUST_PASS_ALL, scvFilter1, scvFilter2);
403    expectedRowNum = 0;
404    scan.setFilter(filter);
405    scanner = region.getScanner(scan);
406    verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
407  }
408
409  private StoreScanner getReversibleStoreScanner(MemStore memstore, HStoreFile sf1, HStoreFile sf2,
410    Scan scan, ScanInfo scanInfo, int readPoint) throws IOException {
411    List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null, false, readPoint);
412    NavigableSet<byte[]> columns = null;
413    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
414      // Should only one family
415      columns = entry.getValue();
416    }
417    StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo, columns, scanners);
418    return storeScanner;
419  }
420
421  private void verifyCountAndOrder(InternalScanner scanner, int expectedKVCount,
422    int expectedRowCount, boolean forward) throws IOException {
423    List<Cell> kvList = new ArrayList<>();
424    Result lastResult = null;
425    int rowCount = 0;
426    int kvCount = 0;
427    try {
428      while (scanner.next(kvList)) {
429        if (kvList.isEmpty()) continue;
430        rowCount++;
431        kvCount += kvList.size();
432        if (lastResult != null) {
433          Result curResult = Result.create(kvList);
434          assertEquals(forward, Bytes.compareTo(curResult.getRow(), lastResult.getRow()) > 0,
435            "LastResult:" + lastResult + "CurResult:" + curResult);
436        }
437        lastResult = Result.create(kvList);
438        kvList.clear();
439      }
440    } finally {
441      scanner.close();
442    }
443    if (!kvList.isEmpty()) {
444      rowCount++;
445      kvCount += kvList.size();
446      kvList.clear();
447    }
448    assertEquals(expectedKVCount, kvCount);
449    assertEquals(expectedRowCount, rowCount);
450  }
451
452  private void internalTestSeekAndNextForReversibleKeyValueHeap(ReversedKeyValueHeap kvHeap,
453    int startRowNum) throws IOException {
454    // Test next and seek
455    for (int i = startRowNum; i >= 0; i--) {
456      if (i % 2 == 1 && i - 2 >= 0) {
457        i = i - 2;
458        kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
459      }
460      for (int j = 0; j < QUALSIZE; j++) {
461        if (j % 2 == 1 && (j + 1) < QUALSIZE) {
462          j = j + 1;
463          kvHeap.backwardSeek(makeKV(i, j));
464        }
465        assertEquals(makeKV(i, j), kvHeap.peek());
466        kvHeap.next();
467      }
468    }
469    assertEquals(null, kvHeap.peek());
470  }
471
472  private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore, HStoreFile sf1,
473    HStoreFile sf2, byte[] startRow, int readPoint) throws IOException {
474    List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, startRow, true, readPoint);
475    ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners, CellComparatorImpl.COMPARATOR);
476    return kvHeap;
477  }
478
479  private List<KeyValueScanner> getScanners(MemStore memstore, HStoreFile sf1, HStoreFile sf2,
480    byte[] startRow, boolean doSeek, int readPoint) throws IOException {
481    List<StoreFileScanner> fileScanners = StoreFileScanner
482      .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true, false, false, readPoint);
483    List<KeyValueScanner> memScanners = memstore.getScanners(readPoint);
484    List<KeyValueScanner> scanners = new ArrayList<>(fileScanners.size() + 1);
485    scanners.addAll(fileScanners);
486    scanners.addAll(memScanners);
487
488    if (doSeek) {
489      if (Bytes.equals(HConstants.EMPTY_START_ROW, startRow)) {
490        for (KeyValueScanner scanner : scanners) {
491          scanner.seekToLastRow();
492        }
493      } else {
494        KeyValue startKey = KeyValueUtil.createFirstOnRow(startRow);
495        for (KeyValueScanner scanner : scanners) {
496          scanner.backwardSeek(startKey);
497        }
498      }
499    }
500    return scanners;
501  }
502
503  private void seekTestOfReversibleKeyValueScanner(KeyValueScanner scanner) throws IOException {
504    /**
505     * Test without MVCC
506     */
507    // Test seek to last row
508    assertTrue(scanner.seekToLastRow());
509    assertEquals(makeKV(ROWSIZE - 1, 0), scanner.peek());
510
511    // Test backward seek in three cases
512    // Case1: seek in the same row in backwardSeek
513    KeyValue seekKey = makeKV(ROWSIZE - 2, QUALSIZE - 2);
514    assertTrue(scanner.backwardSeek(seekKey));
515    assertEquals(seekKey, scanner.peek());
516
517    // Case2: seek to the previous row in backwardSeek
518    int seekRowNum = ROWSIZE - 2;
519    assertTrue(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[seekRowNum])));
520    KeyValue expectedKey = makeKV(seekRowNum - 1, 0);
521    assertEquals(expectedKey, scanner.peek());
522
523    // Case3: unable to backward seek
524    assertFalse(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[0])));
525    assertEquals(null, scanner.peek());
526
527    // Test seek to previous row
528    seekRowNum = ROWSIZE - 4;
529    assertTrue(scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum])));
530    expectedKey = makeKV(seekRowNum - 1, 0);
531    assertEquals(expectedKey, scanner.peek());
532
533    // Test seek to previous row for the first row
534    assertFalse(scanner.seekToPreviousRow(makeKV(0, 0)));
535    assertEquals(null, scanner.peek());
536
537  }
538
539  private void seekTestOfReversibleKeyValueScannerWithMVCC(List<? extends KeyValueScanner> scanners,
540    int readPoint) throws IOException {
541    /**
542     * Test with MVCC
543     */
544    // Test seek to last row
545    KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 1, 0, readPoint);
546    boolean res = false;
547    for (KeyValueScanner scanner : scanners) {
548      res |= scanner.seekToLastRow();
549    }
550    assertEquals(expectedKey != null, res);
551    res = false;
552    for (KeyValueScanner scanner : scanners) {
553      res |= (expectedKey.equals(scanner.peek()));
554    }
555    assertTrue(res);
556
557    // Test backward seek in two cases
558    // Case1: seek in the same row in backwardSeek
559    expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2, QUALSIZE - 2, readPoint);
560    res = false;
561    for (KeyValueScanner scanner : scanners) {
562      res |= scanner.backwardSeek(expectedKey);
563    }
564    assertEquals(expectedKey != null, res);
565    res = false;
566    for (KeyValueScanner scanner : scanners) {
567      res |= (expectedKey.equals(scanner.peek()));
568    }
569    assertTrue(res);
570
571    // Case2: seek to the previous row in backwardSeek
572    int seekRowNum = ROWSIZE - 3;
573    res = false;
574    for (KeyValueScanner scanner : scanners) {
575      res |= scanner.backwardSeek(expectedKey);
576    }
577    res = false;
578    for (KeyValueScanner scanner : scanners) {
579      res |= (expectedKey.equals(scanner.peek()));
580    }
581    assertTrue(res);
582
583    // Test seek to previous row
584    seekRowNum = ROWSIZE - 4;
585    expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0, readPoint);
586    res = false;
587    for (KeyValueScanner scanner : scanners) {
588      res |= scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum]));
589    }
590    assertEquals(expectedKey != null, res);
591    res = false;
592    for (KeyValueScanner scanner : scanners) {
593      res |= (expectedKey.equals(scanner.peek()));
594    }
595    assertTrue(res);
596  }
597
598  private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum, int startQualNum,
599    int readPoint) {
600    Pair<Integer, Integer> nextReadableNum =
601      getNextReadableNumWithBackwardScan(startRowNum, startQualNum, readPoint);
602    if (nextReadableNum == null) return null;
603    return makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
604  }
605
606  private Pair<Integer, Integer> getNextReadableNumWithBackwardScan(int startRowNum,
607    int startQualNum, int readPoint) {
608    Pair<Integer, Integer> nextReadableNum = null;
609    boolean findExpected = false;
610    for (int i = startRowNum; i >= 0; i--) {
611      for (int j = (i == startRowNum ? startQualNum : 0); j < QUALSIZE; j++) {
612        if (makeMVCC(i, j) <= readPoint) {
613          nextReadableNum = new Pair<>(i, j);
614          findExpected = true;
615          break;
616        }
617      }
618      if (findExpected) break;
619    }
620    return nextReadableNum;
621  }
622
623  private static void loadDataToRegion(HRegion region, byte[] additionalFamily) throws IOException {
624    for (int i = 0; i < ROWSIZE; i++) {
625      Put put = new Put(ROWS[i]);
626      for (int j = 0; j < QUALSIZE; j++) {
627        put.add(makeKV(i, j));
628        // put additional family
629        put.add(makeKV(i, j, additionalFamily));
630      }
631      region.put(put);
632      if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) {
633        region.flush(true);
634      }
635    }
636  }
637
638  private static void writeMemstoreAndStoreFiles(MemStore memstore, final StoreFileWriter[] writers)
639    throws IOException {
640    try {
641      for (int i = 0; i < ROWSIZE; i++) {
642        for (int j = 0; j < QUALSIZE; j++) {
643          if (i % 2 == 0) {
644            memstore.add(makeKV(i, j), null);
645          } else {
646            writers[(i + j) % writers.length].append(makeKV(i, j));
647          }
648        }
649      }
650    } finally {
651      for (int i = 0; i < writers.length; i++) {
652        writers[i].close();
653      }
654    }
655  }
656
657  private static void writeStoreFile(final StoreFileWriter writer) throws IOException {
658    try {
659      for (int i = 0; i < ROWSIZE; i++) {
660        for (int j = 0; j < QUALSIZE; j++) {
661          writer.append(makeKV(i, j));
662        }
663      }
664    } finally {
665      writer.close();
666    }
667  }
668
669  private static void writeMemstore(MemStore memstore) throws IOException {
670    // Add half of the keyvalues to memstore
671    for (int i = 0; i < ROWSIZE; i++) {
672      for (int j = 0; j < QUALSIZE; j++) {
673        if ((i + j) % 2 == 0) {
674          memstore.add(makeKV(i, j), null);
675        }
676      }
677    }
678    memstore.snapshot();
679    // Add another half of the keyvalues to snapshot
680    for (int i = 0; i < ROWSIZE; i++) {
681      for (int j = 0; j < QUALSIZE; j++) {
682        if ((i + j) % 2 == 1) {
683          memstore.add(makeKV(i, j), null);
684        }
685      }
686    }
687  }
688
689  private static KeyValue makeKV(int rowNum, int cqNum) {
690    return makeKV(rowNum, cqNum, FAMILYNAME);
691  }
692
693  private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) {
694    KeyValue kv =
695      new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS, VALUES[rowNum % VALUESIZE]);
696    kv.setSequenceId(makeMVCC(rowNum, cqNum));
697    return kv;
698  }
699
700  private static long makeMVCC(int rowNum, int cqNum) {
701    return (rowNum + cqNum) % (MAXMVCC + 1);
702  }
703
704  private static byte[][] makeN(byte[] base, int n) {
705    byte[][] ret = new byte[n][];
706    for (int i = 0; i < n; i++) {
707      ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i)));
708    }
709    return ret;
710  }
711}