001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028import java.util.Map; 029import java.util.NavigableSet; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.CellComparatorImpl; 034import org.apache.hadoop.hbase.CompareOperator; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.KeepDeletedCells; 038import org.apache.hadoop.hbase.KeyValue; 039import org.apache.hadoop.hbase.KeyValueUtil; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.Result; 044import org.apache.hadoop.hbase.client.Scan; 045import org.apache.hadoop.hbase.client.TableDescriptor; 046import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 047import org.apache.hadoop.hbase.filter.Filter; 048import org.apache.hadoop.hbase.filter.FilterList; 049import org.apache.hadoop.hbase.filter.FilterList.Operator; 050import org.apache.hadoop.hbase.filter.PageFilter; 051import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 052import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 053import org.apache.hadoop.hbase.io.hfile.CacheConfig; 054import org.apache.hadoop.hbase.io.hfile.HFileContext; 055import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 056import org.apache.hadoop.hbase.testclassification.MediumTests; 057import org.apache.hadoop.hbase.testclassification.RegionServerTests; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 060import org.apache.hadoop.hbase.util.Pair; 061import org.junit.jupiter.api.BeforeAll; 062import org.junit.jupiter.api.Tag; 063import org.junit.jupiter.api.Test; 064import org.junit.jupiter.api.TestInfo; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 069 070/** 071 * Test cases against ReversibleKeyValueScanner 072 */ 073@Tag(RegionServerTests.TAG) 074@Tag(MediumTests.TAG) 075public class TestReversibleScanners { 076 077 private static final Logger LOG = LoggerFactory.getLogger(TestReversibleScanners.class); 078 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 079 080 private static byte[] FAMILYNAME = Bytes.toBytes("testCf"); 081 private static long TS = EnvironmentEdgeManager.currentTime(); 082 private static int MAXMVCC = 7; 083 private static byte[] ROW = Bytes.toBytes("testRow"); 084 private static final int ROWSIZE = 200; 085 private static byte[][] ROWS = makeN(ROW, ROWSIZE); 086 private static byte[] QUAL = Bytes.toBytes("testQual"); 087 private static final int QUALSIZE = 5; 088 private static byte[][] QUALS = makeN(QUAL, QUALSIZE); 089 private static byte[] VALUE = Bytes.toBytes("testValue"); 090 private static final int VALUESIZE = 3; 091 private static byte[][] VALUES = makeN(VALUE, VALUESIZE); 092 093 @BeforeAll 094 public static void setUp() { 095 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 096 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 097 } 098 099 @Test 100 public void testReversibleStoreFileScanner() throws IOException { 101 FileSystem fs = TEST_UTIL.getTestFileSystem(); 102 Path hfilePath = 103 new Path(new Path(TEST_UTIL.getDataTestDir("testReversibleStoreFileScanner"), "regionname"), 104 "familyname"); 105 CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); 106 for (DataBlockEncoding encoding : DataBlockEncoding.values()) { 107 HFileContextBuilder hcBuilder = new HFileContextBuilder(); 108 hcBuilder.withBlockSize(2 * 1024); 109 hcBuilder.withDataBlockEncoding(encoding); 110 HFileContext hFileContext = hcBuilder.build(); 111 StoreFileWriter writer = 112 new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs) 113 .withOutputDir(hfilePath).withFileContext(hFileContext).build(); 114 writeStoreFile(writer); 115 116 StoreFileInfo storeFileInfo = StoreFileInfo 117 .createStoreFileInfoForHFile(TEST_UTIL.getConfiguration(), fs, writer.getPath(), true); 118 HStoreFile sf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf); 119 120 List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles( 121 Collections.singletonList(sf), false, true, false, false, Long.MAX_VALUE); 122 StoreFileScanner scanner = scanners.get(0); 123 seekTestOfReversibleKeyValueScanner(scanner); 124 for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { 125 LOG.info("Setting read point to " + readPoint); 126 scanners = StoreFileScanner.getScannersForStoreFiles(Collections.singletonList(sf), false, 127 true, false, false, readPoint); 128 seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint); 129 } 130 } 131 132 } 133 134 @Test 135 public void testReversibleMemstoreScanner() throws IOException { 136 MemStore memstore = new DefaultMemStore(); 137 writeMemstore(memstore); 138 List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE); 139 seekTestOfReversibleKeyValueScanner(scanners.get(0)); 140 for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { 141 LOG.info("Setting read point to " + readPoint); 142 scanners = memstore.getScanners(readPoint); 143 seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint); 144 } 145 146 } 147 148 @Test 149 public void testReversibleKeyValueHeap() throws IOException { 150 // write data to one memstore and two store files 151 FileSystem fs = TEST_UTIL.getTestFileSystem(); 152 Path hfilePath = new Path( 153 new Path(TEST_UTIL.getDataTestDir("testReversibleKeyValueHeap"), "regionname"), "familyname"); 154 CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); 155 HFileContextBuilder hcBuilder = new HFileContextBuilder(); 156 hcBuilder.withBlockSize(2 * 1024); 157 HFileContext hFileContext = hcBuilder.build(); 158 StoreFileWriter writer1 = 159 new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs) 160 .withOutputDir(hfilePath).withFileContext(hFileContext).build(); 161 StoreFileWriter writer2 = 162 new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs) 163 .withOutputDir(hfilePath).withFileContext(hFileContext).build(); 164 165 MemStore memstore = new DefaultMemStore(); 166 writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1, writer2 }); 167 168 StoreFileInfo storeFileInfo1 = StoreFileInfo 169 .createStoreFileInfoForHFile(TEST_UTIL.getConfiguration(), fs, writer1.getPath(), true); 170 HStoreFile sf1 = new HStoreFile(storeFileInfo1, BloomType.NONE, cacheConf); 171 172 StoreFileInfo storeFileInfo2 = StoreFileInfo 173 .createStoreFileInfoForHFile(TEST_UTIL.getConfiguration(), fs, writer2.getPath(), true); 174 HStoreFile sf2 = new HStoreFile(storeFileInfo2, BloomType.NONE, cacheConf); 175 /** 176 * Test without MVCC 177 */ 178 int startRowNum = ROWSIZE / 2; 179 ReversedKeyValueHeap kvHeap = 180 getReversibleKeyValueHeap(memstore, sf1, sf2, ROWS[startRowNum], MAXMVCC); 181 internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum); 182 183 startRowNum = ROWSIZE - 1; 184 kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, HConstants.EMPTY_START_ROW, MAXMVCC); 185 internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum); 186 187 /** 188 * Test with MVCC 189 */ 190 for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { 191 LOG.info("Setting read point to " + readPoint); 192 startRowNum = ROWSIZE - 1; 193 kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, HConstants.EMPTY_START_ROW, readPoint); 194 for (int i = startRowNum; i >= 0; i--) { 195 if (i - 2 < 0) break; 196 i = i - 2; 197 kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1])); 198 Pair<Integer, Integer> nextReadableNum = 199 getNextReadableNumWithBackwardScan(i, 0, readPoint); 200 if (nextReadableNum == null) break; 201 KeyValue expecedKey = makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond()); 202 assertEquals(expecedKey, kvHeap.peek()); 203 i = nextReadableNum.getFirst(); 204 int qualNum = nextReadableNum.getSecond(); 205 if (qualNum + 1 < QUALSIZE) { 206 kvHeap.backwardSeek(makeKV(i, qualNum + 1)); 207 nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, readPoint); 208 if (nextReadableNum == null) break; 209 expecedKey = makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond()); 210 assertEquals(expecedKey, kvHeap.peek()); 211 i = nextReadableNum.getFirst(); 212 qualNum = nextReadableNum.getSecond(); 213 } 214 215 kvHeap.next(); 216 217 if (qualNum + 1 >= QUALSIZE) { 218 nextReadableNum = getNextReadableNumWithBackwardScan(i - 1, 0, readPoint); 219 } else { 220 nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, readPoint); 221 } 222 if (nextReadableNum == null) break; 223 expecedKey = makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond()); 224 assertEquals(expecedKey, kvHeap.peek()); 225 i = nextReadableNum.getFirst(); 226 } 227 } 228 } 229 230 @Test 231 public void testReversibleStoreScanner() throws IOException { 232 // write data to one memstore and two store files 233 FileSystem fs = TEST_UTIL.getTestFileSystem(); 234 Path hfilePath = new Path( 235 new Path(TEST_UTIL.getDataTestDir("testReversibleStoreScanner"), "regionname"), "familyname"); 236 CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); 237 HFileContextBuilder hcBuilder = new HFileContextBuilder(); 238 hcBuilder.withBlockSize(2 * 1024); 239 HFileContext hFileContext = hcBuilder.build(); 240 StoreFileWriter writer1 = 241 new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs) 242 .withOutputDir(hfilePath).withFileContext(hFileContext).build(); 243 StoreFileWriter writer2 = 244 new StoreFileWriter.Builder(TEST_UTIL.getConfiguration(), cacheConf, fs) 245 .withOutputDir(hfilePath).withFileContext(hFileContext).build(); 246 247 MemStore memstore = new DefaultMemStore(); 248 writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1, writer2 }); 249 250 StoreFileInfo storeFileInfo1 = StoreFileInfo 251 .createStoreFileInfoForHFile(TEST_UTIL.getConfiguration(), fs, writer1.getPath(), true); 252 HStoreFile sf1 = new HStoreFile(storeFileInfo1, BloomType.NONE, cacheConf); 253 254 StoreFileInfo storeFileInfo2 = StoreFileInfo 255 .createStoreFileInfoForHFile(TEST_UTIL.getConfiguration(), fs, writer2.getPath(), true); 256 HStoreFile sf2 = new HStoreFile(storeFileInfo2, BloomType.NONE, cacheConf); 257 258 ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE, 259 Long.MAX_VALUE, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, 260 CellComparatorImpl.COMPARATOR, false); 261 262 // Case 1.Test a full reversed scan 263 Scan scan = new Scan(); 264 scan.setReversed(true); 265 StoreScanner storeScanner = 266 getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC); 267 verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false); 268 269 // Case 2.Test reversed scan with a specified start row 270 int startRowNum = ROWSIZE / 2; 271 byte[] startRow = ROWS[startRowNum]; 272 scan.withStartRow(startRow); 273 storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC); 274 verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1), startRowNum + 1, false); 275 276 // Case 3.Test reversed scan with a specified start row and specified 277 // qualifiers 278 assertTrue(QUALSIZE > 2); 279 scan.addColumn(FAMILYNAME, QUALS[0]); 280 scan.addColumn(FAMILYNAME, QUALS[2]); 281 storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC); 282 verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1, false); 283 284 // Case 4.Test reversed scan with mvcc based on case 3 285 for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { 286 LOG.info("Setting read point to " + readPoint); 287 storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, readPoint); 288 int expectedRowCount = 0; 289 int expectedKVCount = 0; 290 for (int i = startRowNum; i >= 0; i--) { 291 int kvCount = 0; 292 if (makeMVCC(i, 0) <= readPoint) { 293 kvCount++; 294 } 295 if (makeMVCC(i, 2) <= readPoint) { 296 kvCount++; 297 } 298 if (kvCount > 0) { 299 expectedRowCount++; 300 expectedKVCount += kvCount; 301 } 302 } 303 verifyCountAndOrder(storeScanner, expectedKVCount, expectedRowCount, false); 304 } 305 } 306 307 @Test 308 public void testReversibleRegionScanner(TestInfo testInfo) throws IOException { 309 byte[] FAMILYNAME2 = Bytes.toBytes("testCf2"); 310 TableDescriptor htd = 311 TableDescriptorBuilder.newBuilder(TableName.valueOf(testInfo.getTestMethod().get().getName())) 312 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYNAME)) 313 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILYNAME2)).build(); 314 HRegion region = TEST_UTIL.createLocalHRegion(htd, null, null); 315 loadDataToRegion(region, FAMILYNAME2); 316 317 // verify row count with forward scan 318 Scan scan = new Scan(); 319 InternalScanner scanner = region.getScanner(scan); 320 verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, true); 321 322 // Case1:Full reversed scan 323 scan.setReversed(true); 324 scanner = region.getScanner(scan); 325 verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, false); 326 327 // Case2:Full reversed scan with one family 328 scan = new Scan(); 329 scan.setReversed(true); 330 scan.addFamily(FAMILYNAME); 331 scanner = region.getScanner(scan); 332 verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE, ROWSIZE, false); 333 334 // Case3:Specify qualifiers + One family 335 byte[][] specifiedQualifiers = { QUALS[1], QUALS[2] }; 336 for (byte[] specifiedQualifier : specifiedQualifiers) 337 scan.addColumn(FAMILYNAME, specifiedQualifier); 338 scanner = region.getScanner(scan); 339 verifyCountAndOrder(scanner, ROWSIZE * 2, ROWSIZE, false); 340 341 // Case4:Specify qualifiers + Two families 342 for (byte[] specifiedQualifier : specifiedQualifiers) 343 scan.addColumn(FAMILYNAME2, specifiedQualifier); 344 scanner = region.getScanner(scan); 345 verifyCountAndOrder(scanner, ROWSIZE * 2 * 2, ROWSIZE, false); 346 347 // Case5: Case4 + specify start row 348 int startRowNum = ROWSIZE * 3 / 4; 349 scan.withStartRow(ROWS[startRowNum]); 350 scanner = region.getScanner(scan); 351 verifyCountAndOrder(scanner, (startRowNum + 1) * 2 * 2, (startRowNum + 1), false); 352 353 // Case6: Case4 + specify stop row 354 int stopRowNum = ROWSIZE / 4; 355 scan.withStartRow(HConstants.EMPTY_BYTE_ARRAY); 356 scan.withStopRow(ROWS[stopRowNum]); 357 scanner = region.getScanner(scan); 358 verifyCountAndOrder(scanner, (ROWSIZE - stopRowNum - 1) * 2 * 2, (ROWSIZE - stopRowNum - 1), 359 false); 360 361 // Case7: Case4 + specify start row + specify stop row 362 scan.withStartRow(ROWS[startRowNum]); 363 scanner = region.getScanner(scan); 364 verifyCountAndOrder(scanner, (startRowNum - stopRowNum) * 2 * 2, (startRowNum - stopRowNum), 365 false); 366 367 // Case8: Case7 + SingleColumnValueFilter 368 int valueNum = startRowNum % VALUESIZE; 369 Filter filter = new SingleColumnValueFilter(FAMILYNAME, specifiedQualifiers[0], 370 CompareOperator.EQUAL, VALUES[valueNum]); 371 scan.setFilter(filter); 372 scanner = region.getScanner(scan); 373 int unfilteredRowNum = 374 (startRowNum - stopRowNum) / VALUESIZE + (stopRowNum / VALUESIZE == valueNum ? 0 : 1); 375 verifyCountAndOrder(scanner, unfilteredRowNum * 2 * 2, unfilteredRowNum, false); 376 377 // Case9: Case7 + PageFilter 378 int pageSize = 10; 379 filter = new PageFilter(pageSize); 380 scan.setFilter(filter); 381 scanner = region.getScanner(scan); 382 int expectedRowNum = pageSize; 383 verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); 384 385 // Case10: Case7 + FilterList+MUST_PASS_ONE 386 SingleColumnValueFilter scvFilter1 = new SingleColumnValueFilter(FAMILYNAME, 387 specifiedQualifiers[0], CompareOperator.EQUAL, VALUES[0]); 388 SingleColumnValueFilter scvFilter2 = new SingleColumnValueFilter(FAMILYNAME, 389 specifiedQualifiers[0], CompareOperator.EQUAL, VALUES[1]); 390 expectedRowNum = 0; 391 for (int i = startRowNum; i > stopRowNum; i--) { 392 if (i % VALUESIZE == 0 || i % VALUESIZE == 1) { 393 expectedRowNum++; 394 } 395 } 396 filter = new FilterList(Operator.MUST_PASS_ONE, scvFilter1, scvFilter2); 397 scan.setFilter(filter); 398 scanner = region.getScanner(scan); 399 verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); 400 401 // Case10: Case7 + FilterList+MUST_PASS_ALL 402 filter = new FilterList(Operator.MUST_PASS_ALL, scvFilter1, scvFilter2); 403 expectedRowNum = 0; 404 scan.setFilter(filter); 405 scanner = region.getScanner(scan); 406 verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); 407 } 408 409 private StoreScanner getReversibleStoreScanner(MemStore memstore, HStoreFile sf1, HStoreFile sf2, 410 Scan scan, ScanInfo scanInfo, int readPoint) throws IOException { 411 List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null, false, readPoint); 412 NavigableSet<byte[]> columns = null; 413 for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { 414 // Should only one family 415 columns = entry.getValue(); 416 } 417 StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo, columns, scanners); 418 return storeScanner; 419 } 420 421 private void verifyCountAndOrder(InternalScanner scanner, int expectedKVCount, 422 int expectedRowCount, boolean forward) throws IOException { 423 List<Cell> kvList = new ArrayList<>(); 424 Result lastResult = null; 425 int rowCount = 0; 426 int kvCount = 0; 427 try { 428 while (scanner.next(kvList)) { 429 if (kvList.isEmpty()) continue; 430 rowCount++; 431 kvCount += kvList.size(); 432 if (lastResult != null) { 433 Result curResult = Result.create(kvList); 434 assertEquals(forward, Bytes.compareTo(curResult.getRow(), lastResult.getRow()) > 0, 435 "LastResult:" + lastResult + "CurResult:" + curResult); 436 } 437 lastResult = Result.create(kvList); 438 kvList.clear(); 439 } 440 } finally { 441 scanner.close(); 442 } 443 if (!kvList.isEmpty()) { 444 rowCount++; 445 kvCount += kvList.size(); 446 kvList.clear(); 447 } 448 assertEquals(expectedKVCount, kvCount); 449 assertEquals(expectedRowCount, rowCount); 450 } 451 452 private void internalTestSeekAndNextForReversibleKeyValueHeap(ReversedKeyValueHeap kvHeap, 453 int startRowNum) throws IOException { 454 // Test next and seek 455 for (int i = startRowNum; i >= 0; i--) { 456 if (i % 2 == 1 && i - 2 >= 0) { 457 i = i - 2; 458 kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1])); 459 } 460 for (int j = 0; j < QUALSIZE; j++) { 461 if (j % 2 == 1 && (j + 1) < QUALSIZE) { 462 j = j + 1; 463 kvHeap.backwardSeek(makeKV(i, j)); 464 } 465 assertEquals(makeKV(i, j), kvHeap.peek()); 466 kvHeap.next(); 467 } 468 } 469 assertEquals(null, kvHeap.peek()); 470 } 471 472 private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore, HStoreFile sf1, 473 HStoreFile sf2, byte[] startRow, int readPoint) throws IOException { 474 List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, startRow, true, readPoint); 475 ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners, CellComparatorImpl.COMPARATOR); 476 return kvHeap; 477 } 478 479 private List<KeyValueScanner> getScanners(MemStore memstore, HStoreFile sf1, HStoreFile sf2, 480 byte[] startRow, boolean doSeek, int readPoint) throws IOException { 481 List<StoreFileScanner> fileScanners = StoreFileScanner 482 .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true, false, false, readPoint); 483 List<KeyValueScanner> memScanners = memstore.getScanners(readPoint); 484 List<KeyValueScanner> scanners = new ArrayList<>(fileScanners.size() + 1); 485 scanners.addAll(fileScanners); 486 scanners.addAll(memScanners); 487 488 if (doSeek) { 489 if (Bytes.equals(HConstants.EMPTY_START_ROW, startRow)) { 490 for (KeyValueScanner scanner : scanners) { 491 scanner.seekToLastRow(); 492 } 493 } else { 494 KeyValue startKey = KeyValueUtil.createFirstOnRow(startRow); 495 for (KeyValueScanner scanner : scanners) { 496 scanner.backwardSeek(startKey); 497 } 498 } 499 } 500 return scanners; 501 } 502 503 private void seekTestOfReversibleKeyValueScanner(KeyValueScanner scanner) throws IOException { 504 /** 505 * Test without MVCC 506 */ 507 // Test seek to last row 508 assertTrue(scanner.seekToLastRow()); 509 assertEquals(makeKV(ROWSIZE - 1, 0), scanner.peek()); 510 511 // Test backward seek in three cases 512 // Case1: seek in the same row in backwardSeek 513 KeyValue seekKey = makeKV(ROWSIZE - 2, QUALSIZE - 2); 514 assertTrue(scanner.backwardSeek(seekKey)); 515 assertEquals(seekKey, scanner.peek()); 516 517 // Case2: seek to the previous row in backwardSeek 518 int seekRowNum = ROWSIZE - 2; 519 assertTrue(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[seekRowNum]))); 520 KeyValue expectedKey = makeKV(seekRowNum - 1, 0); 521 assertEquals(expectedKey, scanner.peek()); 522 523 // Case3: unable to backward seek 524 assertFalse(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[0]))); 525 assertEquals(null, scanner.peek()); 526 527 // Test seek to previous row 528 seekRowNum = ROWSIZE - 4; 529 assertTrue(scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum]))); 530 expectedKey = makeKV(seekRowNum - 1, 0); 531 assertEquals(expectedKey, scanner.peek()); 532 533 // Test seek to previous row for the first row 534 assertFalse(scanner.seekToPreviousRow(makeKV(0, 0))); 535 assertEquals(null, scanner.peek()); 536 537 } 538 539 private void seekTestOfReversibleKeyValueScannerWithMVCC(List<? extends KeyValueScanner> scanners, 540 int readPoint) throws IOException { 541 /** 542 * Test with MVCC 543 */ 544 // Test seek to last row 545 KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 1, 0, readPoint); 546 boolean res = false; 547 for (KeyValueScanner scanner : scanners) { 548 res |= scanner.seekToLastRow(); 549 } 550 assertEquals(expectedKey != null, res); 551 res = false; 552 for (KeyValueScanner scanner : scanners) { 553 res |= (expectedKey.equals(scanner.peek())); 554 } 555 assertTrue(res); 556 557 // Test backward seek in two cases 558 // Case1: seek in the same row in backwardSeek 559 expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2, QUALSIZE - 2, readPoint); 560 res = false; 561 for (KeyValueScanner scanner : scanners) { 562 res |= scanner.backwardSeek(expectedKey); 563 } 564 assertEquals(expectedKey != null, res); 565 res = false; 566 for (KeyValueScanner scanner : scanners) { 567 res |= (expectedKey.equals(scanner.peek())); 568 } 569 assertTrue(res); 570 571 // Case2: seek to the previous row in backwardSeek 572 int seekRowNum = ROWSIZE - 3; 573 res = false; 574 for (KeyValueScanner scanner : scanners) { 575 res |= scanner.backwardSeek(expectedKey); 576 } 577 res = false; 578 for (KeyValueScanner scanner : scanners) { 579 res |= (expectedKey.equals(scanner.peek())); 580 } 581 assertTrue(res); 582 583 // Test seek to previous row 584 seekRowNum = ROWSIZE - 4; 585 expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0, readPoint); 586 res = false; 587 for (KeyValueScanner scanner : scanners) { 588 res |= scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum])); 589 } 590 assertEquals(expectedKey != null, res); 591 res = false; 592 for (KeyValueScanner scanner : scanners) { 593 res |= (expectedKey.equals(scanner.peek())); 594 } 595 assertTrue(res); 596 } 597 598 private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum, int startQualNum, 599 int readPoint) { 600 Pair<Integer, Integer> nextReadableNum = 601 getNextReadableNumWithBackwardScan(startRowNum, startQualNum, readPoint); 602 if (nextReadableNum == null) return null; 603 return makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond()); 604 } 605 606 private Pair<Integer, Integer> getNextReadableNumWithBackwardScan(int startRowNum, 607 int startQualNum, int readPoint) { 608 Pair<Integer, Integer> nextReadableNum = null; 609 boolean findExpected = false; 610 for (int i = startRowNum; i >= 0; i--) { 611 for (int j = (i == startRowNum ? startQualNum : 0); j < QUALSIZE; j++) { 612 if (makeMVCC(i, j) <= readPoint) { 613 nextReadableNum = new Pair<>(i, j); 614 findExpected = true; 615 break; 616 } 617 } 618 if (findExpected) break; 619 } 620 return nextReadableNum; 621 } 622 623 private static void loadDataToRegion(HRegion region, byte[] additionalFamily) throws IOException { 624 for (int i = 0; i < ROWSIZE; i++) { 625 Put put = new Put(ROWS[i]); 626 for (int j = 0; j < QUALSIZE; j++) { 627 put.add(makeKV(i, j)); 628 // put additional family 629 put.add(makeKV(i, j, additionalFamily)); 630 } 631 region.put(put); 632 if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) { 633 region.flush(true); 634 } 635 } 636 } 637 638 private static void writeMemstoreAndStoreFiles(MemStore memstore, final StoreFileWriter[] writers) 639 throws IOException { 640 try { 641 for (int i = 0; i < ROWSIZE; i++) { 642 for (int j = 0; j < QUALSIZE; j++) { 643 if (i % 2 == 0) { 644 memstore.add(makeKV(i, j), null); 645 } else { 646 writers[(i + j) % writers.length].append(makeKV(i, j)); 647 } 648 } 649 } 650 } finally { 651 for (int i = 0; i < writers.length; i++) { 652 writers[i].close(); 653 } 654 } 655 } 656 657 private static void writeStoreFile(final StoreFileWriter writer) throws IOException { 658 try { 659 for (int i = 0; i < ROWSIZE; i++) { 660 for (int j = 0; j < QUALSIZE; j++) { 661 writer.append(makeKV(i, j)); 662 } 663 } 664 } finally { 665 writer.close(); 666 } 667 } 668 669 private static void writeMemstore(MemStore memstore) throws IOException { 670 // Add half of the keyvalues to memstore 671 for (int i = 0; i < ROWSIZE; i++) { 672 for (int j = 0; j < QUALSIZE; j++) { 673 if ((i + j) % 2 == 0) { 674 memstore.add(makeKV(i, j), null); 675 } 676 } 677 } 678 memstore.snapshot(); 679 // Add another half of the keyvalues to snapshot 680 for (int i = 0; i < ROWSIZE; i++) { 681 for (int j = 0; j < QUALSIZE; j++) { 682 if ((i + j) % 2 == 1) { 683 memstore.add(makeKV(i, j), null); 684 } 685 } 686 } 687 } 688 689 private static KeyValue makeKV(int rowNum, int cqNum) { 690 return makeKV(rowNum, cqNum, FAMILYNAME); 691 } 692 693 private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) { 694 KeyValue kv = 695 new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS, VALUES[rowNum % VALUESIZE]); 696 kv.setSequenceId(makeMVCC(rowNum, cqNum)); 697 return kv; 698 } 699 700 private static long makeMVCC(int rowNum, int cqNum) { 701 return (rowNum + cqNum) % (MAXMVCC + 1); 702 } 703 704 private static byte[][] makeN(byte[] base, int n) { 705 byte[][] ret = new byte[n][]; 706 for (int i = 0; i < n; i++) { 707 ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i))); 708 } 709 return ret; 710 } 711}