001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028import java.util.Map; 029import java.util.NavigableSet; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.CellComparatorImpl; 034import org.apache.hadoop.hbase.CompareOperator; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtil; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.KeepDeletedCells; 039import org.apache.hadoop.hbase.KeyValue; 040import org.apache.hadoop.hbase.KeyValueUtil; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.Result; 045import org.apache.hadoop.hbase.client.Scan; 046import org.apache.hadoop.hbase.client.TableDescriptor; 047import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 048import org.apache.hadoop.hbase.filter.Filter; 049import org.apache.hadoop.hbase.filter.FilterList; 050import org.apache.hadoop.hbase.filter.FilterList.Operator; 051import org.apache.hadoop.hbase.filter.PageFilter; 052import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 053import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 054import org.apache.hadoop.hbase.io.hfile.CacheConfig; 055import org.apache.hadoop.hbase.io.hfile.HFileContext; 056import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 057import org.apache.hadoop.hbase.testclassification.MediumTests; 058import org.apache.hadoop.hbase.testclassification.RegionServerTests; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 061import org.apache.hadoop.hbase.util.Pair; 062import org.junit.BeforeClass; 063import org.junit.ClassRule; 064import org.junit.Rule; 065import org.junit.Test; 066import org.junit.experimental.categories.Category; 067import org.junit.rules.TestName; 068import org.slf4j.Logger; 069import org.slf4j.LoggerFactory; 070 071import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 072 073/** 074 * Test cases against ReversibleKeyValueScanner 075 */ 076@Category({ RegionServerTests.class, MediumTests.class }) 077public class TestReversibleScanners { 078 079 @ClassRule 080 public static final HBaseClassTestRule CLASS_RULE = 081 HBaseClassTestRule.forClass(TestReversibleScanners.class); 082 083 private static final Logger LOG = LoggerFactory.getLogger(TestReversibleScanners.class); 084 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 085 086 private static byte[] FAMILYNAME = Bytes.toBytes("testCf"); 087 private static long TS = EnvironmentEdgeManager.currentTime(); 088 private static int MAXMVCC = 7; 089 private static byte[] ROW = Bytes.toBytes("testRow"); 090 private static final int ROWSIZE = 200; 091 private static byte[][] ROWS = makeN(ROW, ROWSIZE); 092 private static byte[] QUAL = Bytes.toBytes("testQual"); 093 private static final int QUALSIZE = 5; 094 private static byte[][] QUALS = makeN(QUAL, QUALSIZE); 095 private static byte[] VALUE = Bytes.toBytes("testValue"); 096 private static final int VALUESIZE = 3; 097 private static byte[][] VALUES = makeN(VALUE, VALUESIZE); 098 099 @Rule 100 public TestName name = new TestName(); 101 102 @BeforeClass 103 public static void setUp() { 104 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 105 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 106 } 107 108 @Test 109 public void testReversibleStoreFileScanner() throws IOException { 110 FileSystem fs = TEST_UTIL.getTestFileSystem(); 111 Path hfilePath = 112 new Path(new Path(TEST_UTIL.getDataTestDir("testReversibleStoreFileScanner"), "regionname"), 113 "familyname"); 114 CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); 115 for (DataBlockEncoding encoding : DataBlockEncoding.values()) { 116 HFileContextBuilder hcBuilder = new HFileContextBuilder(); 117 hcBuilder.withBlockSize(2 * 1024); 118 hcBuilder.withDataBlockEncoding(encoding); 119 HFileContext hFileContext = hcBuilder.build(); 120 StoreFileWriter writer = 121 new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs) 122 .withOutputDir(hfilePath).withFileContext(hFileContext).build(); 123 writeStoreFile(writer); 124 125 HStoreFile sf = new HStoreFile(fs, writer.getPath(), TEST_UTIL.getConfiguration(), cacheConf, 126 BloomType.NONE, true); 127 128 List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles( 129 Collections.singletonList(sf), false, true, false, false, Long.MAX_VALUE); 130 StoreFileScanner scanner = scanners.get(0); 131 seekTestOfReversibleKeyValueScanner(scanner); 132 for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { 133 LOG.info("Setting read point to " + readPoint); 134 scanners = StoreFileScanner.getScannersForStoreFiles(Collections.singletonList(sf), false, 135 true, false, false, readPoint); 136 seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint); 137 } 138 } 139 140 } 141 142 @Test 143 public void testReversibleMemstoreScanner() throws IOException { 144 MemStore memstore = new DefaultMemStore(); 145 writeMemstore(memstore); 146 List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE); 147 seekTestOfReversibleKeyValueScanner(scanners.get(0)); 148 for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { 149 LOG.info("Setting read point to " + readPoint); 150 scanners = memstore.getScanners(readPoint); 151 seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint); 152 } 153 154 } 155 156 @Test 157 public void testReversibleKeyValueHeap() throws IOException { 158 // write data to one memstore and two store files 159 FileSystem fs = TEST_UTIL.getTestFileSystem(); 160 Path hfilePath = new Path( 161 new Path(TEST_UTIL.getDataTestDir("testReversibleKeyValueHeap"), "regionname"), "familyname"); 162 CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); 163 HFileContextBuilder hcBuilder = new HFileContextBuilder(); 164 hcBuilder.withBlockSize(2 * 1024); 165 HFileContext hFileContext = hcBuilder.build(); 166 StoreFileWriter writer1 = 167 new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs) 168 .withOutputDir(hfilePath).withFileContext(hFileContext).build(); 169 StoreFileWriter writer2 = 170 new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs) 171 .withOutputDir(hfilePath).withFileContext(hFileContext).build(); 172 173 MemStore memstore = new DefaultMemStore(); 174 writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1, writer2 }); 175 176 HStoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf, 177 BloomType.NONE, true); 178 179 HStoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf, 180 BloomType.NONE, true); 181 /** 182 * Test without MVCC 183 */ 184 int startRowNum = ROWSIZE / 2; 185 ReversedKeyValueHeap kvHeap = 186 getReversibleKeyValueHeap(memstore, sf1, sf2, ROWS[startRowNum], MAXMVCC); 187 internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum); 188 189 startRowNum = ROWSIZE - 1; 190 kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, HConstants.EMPTY_START_ROW, MAXMVCC); 191 internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum); 192 193 /** 194 * Test with MVCC 195 */ 196 for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { 197 LOG.info("Setting read point to " + readPoint); 198 startRowNum = ROWSIZE - 1; 199 kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, HConstants.EMPTY_START_ROW, readPoint); 200 for (int i = startRowNum; i >= 0; i--) { 201 if (i - 2 < 0) break; 202 i = i - 2; 203 kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1])); 204 Pair<Integer, Integer> nextReadableNum = 205 getNextReadableNumWithBackwardScan(i, 0, readPoint); 206 if (nextReadableNum == null) break; 207 KeyValue expecedKey = makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond()); 208 assertEquals(expecedKey, kvHeap.peek()); 209 i = nextReadableNum.getFirst(); 210 int qualNum = nextReadableNum.getSecond(); 211 if (qualNum + 1 < QUALSIZE) { 212 kvHeap.backwardSeek(makeKV(i, qualNum + 1)); 213 nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, readPoint); 214 if (nextReadableNum == null) break; 215 expecedKey = makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond()); 216 assertEquals(expecedKey, kvHeap.peek()); 217 i = nextReadableNum.getFirst(); 218 qualNum = nextReadableNum.getSecond(); 219 } 220 221 kvHeap.next(); 222 223 if (qualNum + 1 >= QUALSIZE) { 224 nextReadableNum = getNextReadableNumWithBackwardScan(i - 1, 0, readPoint); 225 } else { 226 nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, readPoint); 227 } 228 if (nextReadableNum == null) break; 229 expecedKey = makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond()); 230 assertEquals(expecedKey, kvHeap.peek()); 231 i = nextReadableNum.getFirst(); 232 } 233 } 234 } 235 236 @Test 237 public void testReversibleStoreScanner() throws IOException { 238 // write data to one memstore and two store files 239 FileSystem fs = TEST_UTIL.getTestFileSystem(); 240 Path hfilePath = new Path( 241 new Path(TEST_UTIL.getDataTestDir("testReversibleStoreScanner"), "regionname"), "familyname"); 242 CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); 243 HFileContextBuilder hcBuilder = new HFileContextBuilder(); 244 hcBuilder.withBlockSize(2 * 1024); 245 HFileContext hFileContext = hcBuilder.build(); 246 StoreFileWriter writer1 = 247 new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs) 248 .withOutputDir(hfilePath).withFileContext(hFileContext).build(); 249 StoreFileWriter writer2 = 250 new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs) 251 .withOutputDir(hfilePath).withFileContext(hFileContext).build(); 252 253 MemStore memstore = new DefaultMemStore(); 254 writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1, writer2 }); 255 256 HStoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf, 257 BloomType.NONE, true); 258 259 HStoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf, 260 BloomType.NONE, true); 261 262 ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE, 263 Long.MAX_VALUE, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, 264 CellComparatorImpl.COMPARATOR, false); 265 266 // Case 1.Test a full reversed scan 267 Scan scan = new Scan(); 268 scan.setReversed(true); 269 StoreScanner storeScanner = 270 getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC); 271 verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false); 272 273 // Case 2.Test reversed scan with a specified start row 274 int startRowNum = ROWSIZE / 2; 275 byte[] startRow = ROWS[startRowNum]; 276 scan.withStartRow(startRow); 277 storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC); 278 verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1), startRowNum + 1, false); 279 280 // Case 3.Test reversed scan with a specified start row and specified 281 // qualifiers 282 assertTrue(QUALSIZE > 2); 283 scan.addColumn(FAMILYNAME, QUALS[0]); 284 scan.addColumn(FAMILYNAME, QUALS[2]); 285 storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC); 286 verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1, false); 287 288 // Case 4.Test reversed scan with mvcc based on case 3 289 for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { 290 LOG.info("Setting read point to " + readPoint); 291 storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, readPoint); 292 int expectedRowCount = 0; 293 int expectedKVCount = 0; 294 for (int i = startRowNum; i >= 0; i--) { 295 int kvCount = 0; 296 if (makeMVCC(i, 0) <= readPoint) { 297 kvCount++; 298 } 299 if (makeMVCC(i, 2) <= readPoint) { 300 kvCount++; 301 } 302 if (kvCount > 0) { 303 expectedRowCount++; 304 expectedKVCount += kvCount; 305 } 306 } 307 verifyCountAndOrder(storeScanner, expectedKVCount, expectedRowCount, false); 308 } 309 } 310 311 @Test 312 public void testReversibleRegionScanner() throws IOException { 313 byte[] FAMILYNAME2 = Bytes.toBytes("testCf2"); 314 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())) 315 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYNAME)) 316 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYNAME2)).build(); 317 HRegion region = TEST_UTIL.createLocalHRegion(htd, null, null); 318 loadDataToRegion(region, FAMILYNAME2); 319 320 // verify row count with forward scan 321 Scan scan = new Scan(); 322 InternalScanner scanner = region.getScanner(scan); 323 verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, true); 324 325 // Case1:Full reversed scan 326 scan.setReversed(true); 327 scanner = region.getScanner(scan); 328 verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, false); 329 330 // Case2:Full reversed scan with one family 331 scan = new Scan(); 332 scan.setReversed(true); 333 scan.addFamily(FAMILYNAME); 334 scanner = region.getScanner(scan); 335 verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE, ROWSIZE, false); 336 337 // Case3:Specify qualifiers + One family 338 byte[][] specifiedQualifiers = { QUALS[1], QUALS[2] }; 339 for (byte[] specifiedQualifier : specifiedQualifiers) 340 scan.addColumn(FAMILYNAME, specifiedQualifier); 341 scanner = region.getScanner(scan); 342 verifyCountAndOrder(scanner, ROWSIZE * 2, ROWSIZE, false); 343 344 // Case4:Specify qualifiers + Two families 345 for (byte[] specifiedQualifier : specifiedQualifiers) 346 scan.addColumn(FAMILYNAME2, specifiedQualifier); 347 scanner = region.getScanner(scan); 348 verifyCountAndOrder(scanner, ROWSIZE * 2 * 2, ROWSIZE, false); 349 350 // Case5: Case4 + specify start row 351 int startRowNum = ROWSIZE * 3 / 4; 352 scan.withStartRow(ROWS[startRowNum]); 353 scanner = region.getScanner(scan); 354 verifyCountAndOrder(scanner, (startRowNum + 1) * 2 * 2, (startRowNum + 1), false); 355 356 // Case6: Case4 + specify stop row 357 int stopRowNum = ROWSIZE / 4; 358 scan.withStartRow(HConstants.EMPTY_BYTE_ARRAY); 359 scan.withStopRow(ROWS[stopRowNum]); 360 scanner = region.getScanner(scan); 361 verifyCountAndOrder(scanner, (ROWSIZE - stopRowNum - 1) * 2 * 2, (ROWSIZE - stopRowNum - 1), 362 false); 363 364 // Case7: Case4 + specify start row + specify stop row 365 scan.withStartRow(ROWS[startRowNum]); 366 scanner = region.getScanner(scan); 367 verifyCountAndOrder(scanner, (startRowNum - stopRowNum) * 2 * 2, (startRowNum - stopRowNum), 368 false); 369 370 // Case8: Case7 + SingleColumnValueFilter 371 int valueNum = startRowNum % VALUESIZE; 372 Filter filter = new SingleColumnValueFilter(FAMILYNAME, specifiedQualifiers[0], 373 CompareOperator.EQUAL, VALUES[valueNum]); 374 scan.setFilter(filter); 375 scanner = region.getScanner(scan); 376 int unfilteredRowNum = 377 (startRowNum - stopRowNum) / VALUESIZE + (stopRowNum / VALUESIZE == valueNum ? 0 : 1); 378 verifyCountAndOrder(scanner, unfilteredRowNum * 2 * 2, unfilteredRowNum, false); 379 380 // Case9: Case7 + PageFilter 381 int pageSize = 10; 382 filter = new PageFilter(pageSize); 383 scan.setFilter(filter); 384 scanner = region.getScanner(scan); 385 int expectedRowNum = pageSize; 386 verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); 387 388 // Case10: Case7 + FilterList+MUST_PASS_ONE 389 SingleColumnValueFilter scvFilter1 = new SingleColumnValueFilter(FAMILYNAME, 390 specifiedQualifiers[0], CompareOperator.EQUAL, VALUES[0]); 391 SingleColumnValueFilter scvFilter2 = new SingleColumnValueFilter(FAMILYNAME, 392 specifiedQualifiers[0], CompareOperator.EQUAL, VALUES[1]); 393 expectedRowNum = 0; 394 for (int i = startRowNum; i > stopRowNum; i--) { 395 if (i % VALUESIZE == 0 || i % VALUESIZE == 1) { 396 expectedRowNum++; 397 } 398 } 399 filter = new FilterList(Operator.MUST_PASS_ONE, scvFilter1, scvFilter2); 400 scan.setFilter(filter); 401 scanner = region.getScanner(scan); 402 verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); 403 404 // Case10: Case7 + FilterList+MUST_PASS_ALL 405 filter = new FilterList(Operator.MUST_PASS_ALL, scvFilter1, scvFilter2); 406 expectedRowNum = 0; 407 scan.setFilter(filter); 408 scanner = region.getScanner(scan); 409 verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); 410 } 411 412 private StoreScanner getReversibleStoreScanner(MemStore memstore, HStoreFile sf1, HStoreFile sf2, 413 Scan scan, ScanInfo scanInfo, int readPoint) throws IOException { 414 List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null, false, readPoint); 415 NavigableSet<byte[]> columns = null; 416 for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { 417 // Should only one family 418 columns = entry.getValue(); 419 } 420 StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo, columns, scanners); 421 return storeScanner; 422 } 423 424 private void verifyCountAndOrder(InternalScanner scanner, int expectedKVCount, 425 int expectedRowCount, boolean forward) throws IOException { 426 List<Cell> kvList = new ArrayList<>(); 427 Result lastResult = null; 428 int rowCount = 0; 429 int kvCount = 0; 430 try { 431 while (scanner.next(kvList)) { 432 if (kvList.isEmpty()) continue; 433 rowCount++; 434 kvCount += kvList.size(); 435 if (lastResult != null) { 436 Result curResult = Result.create(kvList); 437 assertEquals("LastResult:" + lastResult + "CurResult:" + curResult, forward, 438 Bytes.compareTo(curResult.getRow(), lastResult.getRow()) > 0); 439 } 440 lastResult = Result.create(kvList); 441 kvList.clear(); 442 } 443 } finally { 444 scanner.close(); 445 } 446 if (!kvList.isEmpty()) { 447 rowCount++; 448 kvCount += kvList.size(); 449 kvList.clear(); 450 } 451 assertEquals(expectedKVCount, kvCount); 452 assertEquals(expectedRowCount, rowCount); 453 } 454 455 private void internalTestSeekAndNextForReversibleKeyValueHeap(ReversedKeyValueHeap kvHeap, 456 int startRowNum) throws IOException { 457 // Test next and seek 458 for (int i = startRowNum; i >= 0; i--) { 459 if (i % 2 == 1 && i - 2 >= 0) { 460 i = i - 2; 461 kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1])); 462 } 463 for (int j = 0; j < QUALSIZE; j++) { 464 if (j % 2 == 1 && (j + 1) < QUALSIZE) { 465 j = j + 1; 466 kvHeap.backwardSeek(makeKV(i, j)); 467 } 468 assertEquals(makeKV(i, j), kvHeap.peek()); 469 kvHeap.next(); 470 } 471 } 472 assertEquals(null, kvHeap.peek()); 473 } 474 475 private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore, HStoreFile sf1, 476 HStoreFile sf2, byte[] startRow, int readPoint) throws IOException { 477 List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, startRow, true, readPoint); 478 ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners, CellComparatorImpl.COMPARATOR); 479 return kvHeap; 480 } 481 482 private List<KeyValueScanner> getScanners(MemStore memstore, HStoreFile sf1, HStoreFile sf2, 483 byte[] startRow, boolean doSeek, int readPoint) throws IOException { 484 List<StoreFileScanner> fileScanners = StoreFileScanner 485 .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true, false, false, readPoint); 486 List<KeyValueScanner> memScanners = memstore.getScanners(readPoint); 487 List<KeyValueScanner> scanners = new ArrayList<>(fileScanners.size() + 1); 488 scanners.addAll(fileScanners); 489 scanners.addAll(memScanners); 490 491 if (doSeek) { 492 if (Bytes.equals(HConstants.EMPTY_START_ROW, startRow)) { 493 for (KeyValueScanner scanner : scanners) { 494 scanner.seekToLastRow(); 495 } 496 } else { 497 KeyValue startKey = KeyValueUtil.createFirstOnRow(startRow); 498 for (KeyValueScanner scanner : scanners) { 499 scanner.backwardSeek(startKey); 500 } 501 } 502 } 503 return scanners; 504 } 505 506 private void seekTestOfReversibleKeyValueScanner(KeyValueScanner scanner) throws IOException { 507 /** 508 * Test without MVCC 509 */ 510 // Test seek to last row 511 assertTrue(scanner.seekToLastRow()); 512 assertEquals(makeKV(ROWSIZE - 1, 0), scanner.peek()); 513 514 // Test backward seek in three cases 515 // Case1: seek in the same row in backwardSeek 516 KeyValue seekKey = makeKV(ROWSIZE - 2, QUALSIZE - 2); 517 assertTrue(scanner.backwardSeek(seekKey)); 518 assertEquals(seekKey, scanner.peek()); 519 520 // Case2: seek to the previous row in backwardSeek 521 int seekRowNum = ROWSIZE - 2; 522 assertTrue(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[seekRowNum]))); 523 KeyValue expectedKey = makeKV(seekRowNum - 1, 0); 524 assertEquals(expectedKey, scanner.peek()); 525 526 // Case3: unable to backward seek 527 assertFalse(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[0]))); 528 assertEquals(null, scanner.peek()); 529 530 // Test seek to previous row 531 seekRowNum = ROWSIZE - 4; 532 assertTrue(scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum]))); 533 expectedKey = makeKV(seekRowNum - 1, 0); 534 assertEquals(expectedKey, scanner.peek()); 535 536 // Test seek to previous row for the first row 537 assertFalse(scanner.seekToPreviousRow(makeKV(0, 0))); 538 assertEquals(null, scanner.peek()); 539 540 } 541 542 private void seekTestOfReversibleKeyValueScannerWithMVCC(List<? extends KeyValueScanner> scanners, 543 int readPoint) throws IOException { 544 /** 545 * Test with MVCC 546 */ 547 // Test seek to last row 548 KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 1, 0, readPoint); 549 boolean res = false; 550 for (KeyValueScanner scanner : scanners) { 551 res |= scanner.seekToLastRow(); 552 } 553 assertEquals(expectedKey != null, res); 554 res = false; 555 for (KeyValueScanner scanner : scanners) { 556 res |= (expectedKey.equals(scanner.peek())); 557 } 558 assertTrue(res); 559 560 // Test backward seek in two cases 561 // Case1: seek in the same row in backwardSeek 562 expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2, QUALSIZE - 2, readPoint); 563 res = false; 564 for (KeyValueScanner scanner : scanners) { 565 res |= scanner.backwardSeek(expectedKey); 566 } 567 assertEquals(expectedKey != null, res); 568 res = false; 569 for (KeyValueScanner scanner : scanners) { 570 res |= (expectedKey.equals(scanner.peek())); 571 } 572 assertTrue(res); 573 574 // Case2: seek to the previous row in backwardSeek 575 int seekRowNum = ROWSIZE - 3; 576 res = false; 577 for (KeyValueScanner scanner : scanners) { 578 res |= scanner.backwardSeek(expectedKey); 579 } 580 res = false; 581 for (KeyValueScanner scanner : scanners) { 582 res |= (expectedKey.equals(scanner.peek())); 583 } 584 assertTrue(res); 585 586 // Test seek to previous row 587 seekRowNum = ROWSIZE - 4; 588 expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0, readPoint); 589 res = false; 590 for (KeyValueScanner scanner : scanners) { 591 res |= scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum])); 592 } 593 assertEquals(expectedKey != null, res); 594 res = false; 595 for (KeyValueScanner scanner : scanners) { 596 res |= (expectedKey.equals(scanner.peek())); 597 } 598 assertTrue(res); 599 } 600 601 private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum, int startQualNum, 602 int readPoint) { 603 Pair<Integer, Integer> nextReadableNum = 604 getNextReadableNumWithBackwardScan(startRowNum, startQualNum, readPoint); 605 if (nextReadableNum == null) return null; 606 return makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond()); 607 } 608 609 private Pair<Integer, Integer> getNextReadableNumWithBackwardScan(int startRowNum, 610 int startQualNum, int readPoint) { 611 Pair<Integer, Integer> nextReadableNum = null; 612 boolean findExpected = false; 613 for (int i = startRowNum; i >= 0; i--) { 614 for (int j = (i == startRowNum ? startQualNum : 0); j < QUALSIZE; j++) { 615 if (makeMVCC(i, j) <= readPoint) { 616 nextReadableNum = new Pair<>(i, j); 617 findExpected = true; 618 break; 619 } 620 } 621 if (findExpected) break; 622 } 623 return nextReadableNum; 624 } 625 626 private static void loadDataToRegion(HRegion region, byte[] additionalFamily) throws IOException { 627 for (int i = 0; i < ROWSIZE; i++) { 628 Put put = new Put(ROWS[i]); 629 for (int j = 0; j < QUALSIZE; j++) { 630 put.add(makeKV(i, j)); 631 // put additional family 632 put.add(makeKV(i, j, additionalFamily)); 633 } 634 region.put(put); 635 if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) { 636 region.flush(true); 637 } 638 } 639 } 640 641 private static void writeMemstoreAndStoreFiles(MemStore memstore, final StoreFileWriter[] writers) 642 throws IOException { 643 try { 644 for (int i = 0; i < ROWSIZE; i++) { 645 for (int j = 0; j < QUALSIZE; j++) { 646 if (i % 2 == 0) { 647 memstore.add(makeKV(i, j), null); 648 } else { 649 writers[(i + j) % writers.length].append(makeKV(i, j)); 650 } 651 } 652 } 653 } finally { 654 for (int i = 0; i < writers.length; i++) { 655 writers[i].close(); 656 } 657 } 658 } 659 660 private static void writeStoreFile(final StoreFileWriter writer) throws IOException { 661 try { 662 for (int i = 0; i < ROWSIZE; i++) { 663 for (int j = 0; j < QUALSIZE; j++) { 664 writer.append(makeKV(i, j)); 665 } 666 } 667 } finally { 668 writer.close(); 669 } 670 } 671 672 private static void writeMemstore(MemStore memstore) throws IOException { 673 // Add half of the keyvalues to memstore 674 for (int i = 0; i < ROWSIZE; i++) { 675 for (int j = 0; j < QUALSIZE; j++) { 676 if ((i + j) % 2 == 0) { 677 memstore.add(makeKV(i, j), null); 678 } 679 } 680 } 681 memstore.snapshot(); 682 // Add another half of the keyvalues to snapshot 683 for (int i = 0; i < ROWSIZE; i++) { 684 for (int j = 0; j < QUALSIZE; j++) { 685 if ((i + j) % 2 == 1) { 686 memstore.add(makeKV(i, j), null); 687 } 688 } 689 } 690 } 691 692 private static KeyValue makeKV(int rowNum, int cqNum) { 693 return makeKV(rowNum, cqNum, FAMILYNAME); 694 } 695 696 private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) { 697 KeyValue kv = 698 new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS, VALUES[rowNum % VALUESIZE]); 699 kv.setSequenceId(makeMVCC(rowNum, cqNum)); 700 return kv; 701 } 702 703 private static long makeMVCC(int rowNum, int cqNum) { 704 return (rowNum + cqNum) % (MAXMVCC + 1); 705 } 706 707 private static byte[][] makeN(byte[] base, int n) { 708 byte[][] ret = new byte[n][]; 709 for (int i = 0; i < n; i++) { 710 ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i))); 711 } 712 return ret; 713 } 714}