001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024import static org.mockito.ArgumentMatchers.any; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.spy; 027import static org.mockito.Mockito.times; 028import static org.mockito.Mockito.verify; 029import static org.mockito.Mockito.when; 030 031import java.io.IOException; 032import java.lang.ref.SoftReference; 033import java.security.PrivilegedExceptionAction; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Collection; 037import java.util.Collections; 038import java.util.Iterator; 039import java.util.List; 040import java.util.ListIterator; 041import java.util.NavigableSet; 042import java.util.TreeSet; 043import java.util.concurrent.ConcurrentSkipListSet; 044import java.util.concurrent.CountDownLatch; 045import java.util.concurrent.ExecutorService; 046import java.util.concurrent.Executors; 047import java.util.concurrent.ThreadPoolExecutor; 048import java.util.concurrent.TimeUnit; 049import java.util.concurrent.atomic.AtomicBoolean; 050import java.util.concurrent.atomic.AtomicInteger; 051import org.apache.hadoop.conf.Configuration; 052import org.apache.hadoop.fs.FSDataOutputStream; 053import org.apache.hadoop.fs.FileStatus; 054import org.apache.hadoop.fs.FileSystem; 055import org.apache.hadoop.fs.FilterFileSystem; 056import org.apache.hadoop.fs.LocalFileSystem; 057import org.apache.hadoop.fs.Path; 058import org.apache.hadoop.fs.permission.FsPermission; 059import org.apache.hadoop.hbase.Cell; 060import org.apache.hadoop.hbase.CellBuilderFactory; 061import org.apache.hadoop.hbase.CellBuilderType; 062import org.apache.hadoop.hbase.CellComparator; 063import org.apache.hadoop.hbase.CellComparatorImpl; 064import org.apache.hadoop.hbase.CellUtil; 065import org.apache.hadoop.hbase.HBaseClassTestRule; 066import org.apache.hadoop.hbase.HBaseConfiguration; 067import org.apache.hadoop.hbase.HBaseTestingUtility; 068import org.apache.hadoop.hbase.HConstants; 069import org.apache.hadoop.hbase.KeyValue; 070import org.apache.hadoop.hbase.MemoryCompactionPolicy; 071import org.apache.hadoop.hbase.PrivateCellUtil; 072import org.apache.hadoop.hbase.TableName; 073import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 074import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 075import org.apache.hadoop.hbase.client.Get; 076import org.apache.hadoop.hbase.client.RegionInfo; 077import org.apache.hadoop.hbase.client.RegionInfoBuilder; 078import org.apache.hadoop.hbase.client.Scan; 079import org.apache.hadoop.hbase.client.TableDescriptor; 080import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 081import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 082import org.apache.hadoop.hbase.filter.Filter; 083import org.apache.hadoop.hbase.filter.FilterBase; 084import org.apache.hadoop.hbase.io.compress.Compression; 085import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 086import org.apache.hadoop.hbase.io.hfile.CacheConfig; 087import org.apache.hadoop.hbase.io.hfile.HFile; 088import org.apache.hadoop.hbase.io.hfile.HFileContext; 089import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 090import org.apache.hadoop.hbase.monitoring.MonitoredTask; 091import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 092import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 093import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 094import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 095import org.apache.hadoop.hbase.security.User; 096import org.apache.hadoop.hbase.testclassification.MediumTests; 097import org.apache.hadoop.hbase.testclassification.RegionServerTests; 098import org.apache.hadoop.hbase.util.Bytes; 099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 100import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 101import org.apache.hadoop.hbase.util.FSUtils; 102import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 103import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 104import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 105import org.apache.hadoop.hbase.wal.WALFactory; 106import org.apache.hadoop.util.Progressable; 107import org.junit.After; 108import org.junit.AfterClass; 109import org.junit.Before; 110import org.junit.ClassRule; 111import org.junit.Rule; 112import org.junit.Test; 113import org.junit.experimental.categories.Category; 114import org.junit.rules.TestName; 115import org.mockito.Mockito; 116import org.slf4j.Logger; 117import org.slf4j.LoggerFactory; 118 119import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 120 121/** 122 * Test class for the HStore 123 */ 124@Category({ RegionServerTests.class, MediumTests.class }) 125public class TestHStore { 126 127 @ClassRule 128 public static final HBaseClassTestRule CLASS_RULE = 129 HBaseClassTestRule.forClass(TestHStore.class); 130 131 private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class); 132 @Rule 133 public TestName name = new TestName(); 134 135 HRegion region; 136 HStore store; 137 byte [] table = Bytes.toBytes("table"); 138 byte [] family = Bytes.toBytes("family"); 139 140 byte [] row = Bytes.toBytes("row"); 141 byte [] row2 = Bytes.toBytes("row2"); 142 byte [] qf1 = Bytes.toBytes("qf1"); 143 byte [] qf2 = Bytes.toBytes("qf2"); 144 byte [] qf3 = Bytes.toBytes("qf3"); 145 byte [] qf4 = Bytes.toBytes("qf4"); 146 byte [] qf5 = Bytes.toBytes("qf5"); 147 byte [] qf6 = Bytes.toBytes("qf6"); 148 149 NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR); 150 151 List<Cell> expected = new ArrayList<>(); 152 List<Cell> result = new ArrayList<>(); 153 154 long id = System.currentTimeMillis(); 155 Get get = new Get(row); 156 157 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 158 private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString(); 159 160 161 /** 162 * Setup 163 * @throws IOException 164 */ 165 @Before 166 public void setUp() throws IOException { 167 qualifiers.add(qf1); 168 qualifiers.add(qf3); 169 qualifiers.add(qf5); 170 171 Iterator<byte[]> iter = qualifiers.iterator(); 172 while(iter.hasNext()){ 173 byte [] next = iter.next(); 174 expected.add(new KeyValue(row, family, next, 1, (byte[])null)); 175 get.addColumn(family, next); 176 } 177 } 178 179 private void init(String methodName) throws IOException { 180 init(methodName, TEST_UTIL.getConfiguration()); 181 } 182 183 private HStore init(String methodName, Configuration conf) throws IOException { 184 // some of the tests write 4 versions and then flush 185 // (with HBASE-4241, lower versions are collected on flush) 186 return init(methodName, conf, 187 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build()); 188 } 189 190 private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd) 191 throws IOException { 192 return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd); 193 } 194 195 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 196 ColumnFamilyDescriptor hcd) throws IOException { 197 return init(methodName, conf, builder, hcd, null); 198 } 199 200 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 201 ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException { 202 return init(methodName, conf, builder, hcd, hook, false); 203 } 204 205 private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder, 206 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 207 TableDescriptor htd = builder.setColumnFamily(hcd).build(); 208 Path basedir = new Path(DIR + methodName); 209 Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); 210 final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName)); 211 212 FileSystem fs = FileSystem.get(conf); 213 214 fs.delete(logdir, true); 215 ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 216 MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null); 217 RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 218 Configuration walConf = new Configuration(conf); 219 FSUtils.setRootDir(walConf, basedir); 220 WALFactory wals = new WALFactory(walConf, methodName); 221 region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf, 222 htd, null); 223 region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); 224 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 225 Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 226 } 227 228 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 229 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 230 initHRegion(methodName, conf, builder, hcd, hook, switchToPread); 231 if (hook == null) { 232 store = new HStore(region, hcd, conf); 233 } else { 234 store = new MyStore(region, hcd, conf, hook, switchToPread); 235 } 236 return store; 237 } 238 239 /** 240 * Test we do not lose data if we fail a flush and then close. 241 * Part of HBase-10466 242 * @throws Exception 243 */ 244 @Test 245 public void testFlushSizeSizing() throws Exception { 246 LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName()); 247 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 248 // Only retry once. 249 conf.setInt("hbase.hstore.flush.retries.number", 1); 250 User user = User.createUserForTesting(conf, this.name.getMethodName(), 251 new String[]{"foo"}); 252 // Inject our faulty LocalFileSystem 253 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 254 user.runAs(new PrivilegedExceptionAction<Object>() { 255 @Override 256 public Object run() throws Exception { 257 // Make sure it worked (above is sensitive to caching details in hadoop core) 258 FileSystem fs = FileSystem.get(conf); 259 assertEquals(FaultyFileSystem.class, fs.getClass()); 260 FaultyFileSystem ffs = (FaultyFileSystem)fs; 261 262 // Initialize region 263 init(name.getMethodName(), conf); 264 265 MemStoreSize mss = store.memstore.getFlushableSize(); 266 assertEquals(0, mss.getDataSize()); 267 LOG.info("Adding some data"); 268 MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing(); 269 store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize); 270 // add the heap size of active (mutable) segment 271 kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 272 mss = store.memstore.getFlushableSize(); 273 assertEquals(kvSize.getMemStoreSize(), mss); 274 // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. 275 try { 276 LOG.info("Flushing"); 277 flushStore(store, id++); 278 fail("Didn't bubble up IOE!"); 279 } catch (IOException ioe) { 280 assertTrue(ioe.getMessage().contains("Fault injected")); 281 } 282 // due to snapshot, change mutable to immutable segment 283 kvSize.incMemStoreSize(0, 284 CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0); 285 mss = store.memstore.getFlushableSize(); 286 assertEquals(kvSize.getMemStoreSize(), mss); 287 MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing(); 288 store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2); 289 kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 290 // Even though we add a new kv, we expect the flushable size to be 'same' since we have 291 // not yet cleared the snapshot -- the above flush failed. 292 assertEquals(kvSize.getMemStoreSize(), mss); 293 ffs.fault.set(false); 294 flushStore(store, id++); 295 mss = store.memstore.getFlushableSize(); 296 // Size should be the foreground kv size. 297 assertEquals(kvSize2.getMemStoreSize(), mss); 298 flushStore(store, id++); 299 mss = store.memstore.getFlushableSize(); 300 assertEquals(0, mss.getDataSize()); 301 assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize()); 302 return null; 303 } 304 }); 305 } 306 307 /** 308 * Verify that compression and data block encoding are respected by the 309 * Store.createWriterInTmp() method, used on store flush. 310 */ 311 @Test 312 public void testCreateWriter() throws Exception { 313 Configuration conf = HBaseConfiguration.create(); 314 FileSystem fs = FileSystem.get(conf); 315 316 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 317 .setCompressionType(Compression.Algorithm.GZ).setDataBlockEncoding(DataBlockEncoding.DIFF) 318 .build(); 319 init(name.getMethodName(), conf, hcd); 320 321 // Test createWriterInTmp() 322 StoreFileWriter writer = 323 store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false, false); 324 Path path = writer.getPath(); 325 writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1))); 326 writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2))); 327 writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3))); 328 writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4))); 329 writer.close(); 330 331 // Verify that compression and encoding settings are respected 332 HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf); 333 assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm()); 334 assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding()); 335 reader.close(); 336 } 337 338 @Test 339 public void testDeleteExpiredStoreFiles() throws Exception { 340 testDeleteExpiredStoreFiles(0); 341 testDeleteExpiredStoreFiles(1); 342 } 343 344 /* 345 * @param minVersions the MIN_VERSIONS for the column family 346 */ 347 public void testDeleteExpiredStoreFiles(int minVersions) throws Exception { 348 int storeFileNum = 4; 349 int ttl = 4; 350 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 351 EnvironmentEdgeManagerTestHelper.injectEdge(edge); 352 353 Configuration conf = HBaseConfiguration.create(); 354 // Enable the expired store file deletion 355 conf.setBoolean("hbase.store.delete.expired.storefile", true); 356 // Set the compaction threshold higher to avoid normal compactions. 357 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5); 358 359 init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder 360 .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build()); 361 362 long storeTtl = this.store.getScanInfo().getTtl(); 363 long sleepTime = storeTtl / storeFileNum; 364 long timeStamp; 365 // There are 4 store files and the max time stamp difference among these 366 // store files will be (this.store.ttl / storeFileNum) 367 for (int i = 1; i <= storeFileNum; i++) { 368 LOG.info("Adding some data for the store file #" + i); 369 timeStamp = EnvironmentEdgeManager.currentTime(); 370 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null); 371 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null); 372 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null); 373 flush(i); 374 edge.incrementTime(sleepTime); 375 } 376 377 // Verify the total number of store files 378 assertEquals(storeFileNum, this.store.getStorefiles().size()); 379 380 // Each call will find one expired store file and delete it before compaction happens. 381 // There will be no compaction due to threshold above. Last file will not be replaced. 382 for (int i = 1; i <= storeFileNum - 1; i++) { 383 // verify the expired store file. 384 assertFalse(this.store.requestCompaction().isPresent()); 385 Collection<HStoreFile> sfs = this.store.getStorefiles(); 386 // Ensure i files are gone. 387 if (minVersions == 0) { 388 assertEquals(storeFileNum - i, sfs.size()); 389 // Ensure only non-expired files remain. 390 for (HStoreFile sf : sfs) { 391 assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl)); 392 } 393 } else { 394 assertEquals(storeFileNum, sfs.size()); 395 } 396 // Let the next store file expired. 397 edge.incrementTime(sleepTime); 398 } 399 assertFalse(this.store.requestCompaction().isPresent()); 400 401 Collection<HStoreFile> sfs = this.store.getStorefiles(); 402 // Assert the last expired file is not removed. 403 if (minVersions == 0) { 404 assertEquals(1, sfs.size()); 405 } 406 long ts = sfs.iterator().next().getReader().getMaxTimestamp(); 407 assertTrue(ts < (edge.currentTime() - storeTtl)); 408 409 for (HStoreFile sf : sfs) { 410 sf.closeStoreFile(true); 411 } 412 } 413 414 @Test 415 public void testLowestModificationTime() throws Exception { 416 Configuration conf = HBaseConfiguration.create(); 417 FileSystem fs = FileSystem.get(conf); 418 // Initialize region 419 init(name.getMethodName(), conf); 420 421 int storeFileNum = 4; 422 for (int i = 1; i <= storeFileNum; i++) { 423 LOG.info("Adding some data for the store file #"+i); 424 this.store.add(new KeyValue(row, family, qf1, i, (byte[])null), null); 425 this.store.add(new KeyValue(row, family, qf2, i, (byte[])null), null); 426 this.store.add(new KeyValue(row, family, qf3, i, (byte[])null), null); 427 flush(i); 428 } 429 // after flush; check the lowest time stamp 430 long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 431 long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 432 assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS); 433 434 // after compact; check the lowest time stamp 435 store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null); 436 lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 437 lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 438 assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); 439 } 440 441 private static long getLowestTimeStampFromFS(FileSystem fs, 442 final Collection<HStoreFile> candidates) throws IOException { 443 long minTs = Long.MAX_VALUE; 444 if (candidates.isEmpty()) { 445 return minTs; 446 } 447 Path[] p = new Path[candidates.size()]; 448 int i = 0; 449 for (HStoreFile sf : candidates) { 450 p[i] = sf.getPath(); 451 ++i; 452 } 453 454 FileStatus[] stats = fs.listStatus(p); 455 if (stats == null || stats.length == 0) { 456 return minTs; 457 } 458 for (FileStatus s : stats) { 459 minTs = Math.min(minTs, s.getModificationTime()); 460 } 461 return minTs; 462 } 463 464 ////////////////////////////////////////////////////////////////////////////// 465 // Get tests 466 ////////////////////////////////////////////////////////////////////////////// 467 468 private static final int BLOCKSIZE_SMALL = 8192; 469 /** 470 * Test for hbase-1686. 471 * @throws IOException 472 */ 473 @Test 474 public void testEmptyStoreFile() throws IOException { 475 init(this.name.getMethodName()); 476 // Write a store file. 477 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 478 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 479 flush(1); 480 // Now put in place an empty store file. Its a little tricky. Have to 481 // do manually with hacked in sequence id. 482 HStoreFile f = this.store.getStorefiles().iterator().next(); 483 Path storedir = f.getPath().getParent(); 484 long seqid = f.getMaxSequenceId(); 485 Configuration c = HBaseConfiguration.create(); 486 FileSystem fs = FileSystem.get(c); 487 HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 488 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), 489 fs) 490 .withOutputDir(storedir) 491 .withFileContext(meta) 492 .build(); 493 w.appendMetadata(seqid + 1, false); 494 w.close(); 495 this.store.close(); 496 // Reopen it... should pick up two files 497 this.store = new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c); 498 assertEquals(2, this.store.getStorefilesCount()); 499 500 result = HBaseTestingUtility.getFromStoreFile(store, 501 get.getRow(), 502 qualifiers); 503 assertEquals(1, result.size()); 504 } 505 506 /** 507 * Getting data from memstore only 508 * @throws IOException 509 */ 510 @Test 511 public void testGet_FromMemStoreOnly() throws IOException { 512 init(this.name.getMethodName()); 513 514 //Put data in memstore 515 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 516 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 517 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); 518 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null); 519 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null); 520 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null); 521 522 //Get 523 result = HBaseTestingUtility.getFromStoreFile(store, 524 get.getRow(), qualifiers); 525 526 //Compare 527 assertCheck(); 528 } 529 530 @Test 531 public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException { 532 testTimeRangeIfSomeCellsAreDroppedInFlush(1); 533 testTimeRangeIfSomeCellsAreDroppedInFlush(3); 534 testTimeRangeIfSomeCellsAreDroppedInFlush(5); 535 } 536 537 private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException { 538 init(this.name.getMethodName(), TEST_UTIL.getConfiguration(), 539 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build()); 540 long currentTs = 100; 541 long minTs = currentTs; 542 // the extra cell won't be flushed to disk, 543 // so the min of timerange will be different between memStore and hfile. 544 for (int i = 0; i != (maxVersion + 1); ++i) { 545 this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null); 546 if (i == 1) { 547 minTs = currentTs; 548 } 549 } 550 flushStore(store, id++); 551 552 Collection<HStoreFile> files = store.getStorefiles(); 553 assertEquals(1, files.size()); 554 HStoreFile f = files.iterator().next(); 555 f.initReader(); 556 StoreFileReader reader = f.getReader(); 557 assertEquals(minTs, reader.timeRange.getMin()); 558 assertEquals(currentTs, reader.timeRange.getMax()); 559 } 560 561 /** 562 * Getting data from files only 563 * @throws IOException 564 */ 565 @Test 566 public void testGet_FromFilesOnly() throws IOException { 567 init(this.name.getMethodName()); 568 569 //Put data in memstore 570 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 571 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 572 //flush 573 flush(1); 574 575 //Add more data 576 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); 577 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null); 578 //flush 579 flush(2); 580 581 //Add more data 582 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null); 583 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null); 584 //flush 585 flush(3); 586 587 //Get 588 result = HBaseTestingUtility.getFromStoreFile(store, 589 get.getRow(), 590 qualifiers); 591 //this.store.get(get, qualifiers, result); 592 593 //Need to sort the result since multiple files 594 Collections.sort(result, CellComparatorImpl.COMPARATOR); 595 596 //Compare 597 assertCheck(); 598 } 599 600 /** 601 * Getting data from memstore and files 602 * @throws IOException 603 */ 604 @Test 605 public void testGet_FromMemStoreAndFiles() throws IOException { 606 init(this.name.getMethodName()); 607 608 //Put data in memstore 609 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 610 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 611 //flush 612 flush(1); 613 614 //Add more data 615 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); 616 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null); 617 //flush 618 flush(2); 619 620 //Add more data 621 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null); 622 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null); 623 624 //Get 625 result = HBaseTestingUtility.getFromStoreFile(store, 626 get.getRow(), qualifiers); 627 628 //Need to sort the result since multiple files 629 Collections.sort(result, CellComparatorImpl.COMPARATOR); 630 631 //Compare 632 assertCheck(); 633 } 634 635 private void flush(int storeFilessize) throws IOException{ 636 this.store.snapshot(); 637 flushStore(store, id++); 638 assertEquals(storeFilessize, this.store.getStorefiles().size()); 639 assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount()); 640 } 641 642 private void assertCheck() { 643 assertEquals(expected.size(), result.size()); 644 for(int i=0; i<expected.size(); i++) { 645 assertEquals(expected.get(i), result.get(i)); 646 } 647 } 648 649 @After 650 public void tearDown() throws Exception { 651 EnvironmentEdgeManagerTestHelper.reset(); 652 if (store != null) { 653 try { 654 store.close(); 655 } catch (IOException e) { 656 } 657 store = null; 658 } 659 if (region != null) { 660 region.close(); 661 region = null; 662 } 663 } 664 665 @AfterClass 666 public static void tearDownAfterClass() throws IOException { 667 TEST_UTIL.cleanupTestDir(); 668 } 669 670 @Test 671 public void testHandleErrorsInFlush() throws Exception { 672 LOG.info("Setting up a faulty file system that cannot write"); 673 674 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 675 User user = User.createUserForTesting(conf, 676 "testhandleerrorsinflush", new String[]{"foo"}); 677 // Inject our faulty LocalFileSystem 678 conf.setClass("fs.file.impl", FaultyFileSystem.class, 679 FileSystem.class); 680 user.runAs(new PrivilegedExceptionAction<Object>() { 681 @Override 682 public Object run() throws Exception { 683 // Make sure it worked (above is sensitive to caching details in hadoop core) 684 FileSystem fs = FileSystem.get(conf); 685 assertEquals(FaultyFileSystem.class, fs.getClass()); 686 687 // Initialize region 688 init(name.getMethodName(), conf); 689 690 LOG.info("Adding some data"); 691 store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 692 store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 693 store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); 694 695 LOG.info("Before flush, we should have no files"); 696 697 Collection<StoreFileInfo> files = 698 store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); 699 assertEquals(0, files != null ? files.size() : 0); 700 701 //flush 702 try { 703 LOG.info("Flushing"); 704 flush(1); 705 fail("Didn't bubble up IOE!"); 706 } catch (IOException ioe) { 707 assertTrue(ioe.getMessage().contains("Fault injected")); 708 } 709 710 LOG.info("After failed flush, we should still have no files!"); 711 files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); 712 assertEquals(0, files != null ? files.size() : 0); 713 store.getHRegion().getWAL().close(); 714 return null; 715 } 716 }); 717 FileSystem.closeAllForUGI(user.getUGI()); 718 } 719 720 /** 721 * Faulty file system that will fail if you write past its fault position the FIRST TIME 722 * only; thereafter it will succeed. Used by {@link TestHRegion} too. 723 */ 724 static class FaultyFileSystem extends FilterFileSystem { 725 List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>(); 726 private long faultPos = 200; 727 AtomicBoolean fault = new AtomicBoolean(true); 728 729 public FaultyFileSystem() { 730 super(new LocalFileSystem()); 731 System.err.println("Creating faulty!"); 732 } 733 734 @Override 735 public FSDataOutputStream create(Path p) throws IOException { 736 return new FaultyOutputStream(super.create(p), faultPos, fault); 737 } 738 739 @Override 740 public FSDataOutputStream create(Path f, FsPermission permission, 741 boolean overwrite, int bufferSize, short replication, long blockSize, 742 Progressable progress) throws IOException { 743 return new FaultyOutputStream(super.create(f, permission, 744 overwrite, bufferSize, replication, blockSize, progress), faultPos, fault); 745 } 746 747 @Override 748 public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, 749 int bufferSize, short replication, long blockSize, Progressable progress) 750 throws IOException { 751 // Fake it. Call create instead. The default implementation throws an IOE 752 // that this is not supported. 753 return create(f, overwrite, bufferSize, replication, blockSize, progress); 754 } 755 } 756 757 static class FaultyOutputStream extends FSDataOutputStream { 758 volatile long faultPos = Long.MAX_VALUE; 759 private final AtomicBoolean fault; 760 761 public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault) 762 throws IOException { 763 super(out, null); 764 this.faultPos = faultPos; 765 this.fault = fault; 766 } 767 768 @Override 769 public synchronized void write(byte[] buf, int offset, int length) throws IOException { 770 System.err.println("faulty stream write at pos " + getPos()); 771 injectFault(); 772 super.write(buf, offset, length); 773 } 774 775 private void injectFault() throws IOException { 776 if (this.fault.get() && getPos() >= faultPos) { 777 throw new IOException("Fault injected"); 778 } 779 } 780 } 781 782 private static void flushStore(HStore store, long id) throws IOException { 783 StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY); 784 storeFlushCtx.prepare(); 785 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 786 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 787 } 788 789 /** 790 * Generate a list of KeyValues for testing based on given parameters 791 * @param timestamps 792 * @param numRows 793 * @param qualifier 794 * @param family 795 * @return the rows key-value list 796 */ 797 List<Cell> getKeyValueSet(long[] timestamps, int numRows, 798 byte[] qualifier, byte[] family) { 799 List<Cell> kvList = new ArrayList<>(); 800 for (int i=1;i<=numRows;i++) { 801 byte[] b = Bytes.toBytes(i); 802 for (long timestamp: timestamps) { 803 kvList.add(new KeyValue(b, family, qualifier, timestamp, b)); 804 } 805 } 806 return kvList; 807 } 808 809 /** 810 * Test to ensure correctness when using Stores with multiple timestamps 811 * @throws IOException 812 */ 813 @Test 814 public void testMultipleTimestamps() throws IOException { 815 int numRows = 1; 816 long[] timestamps1 = new long[] {1,5,10,20}; 817 long[] timestamps2 = new long[] {30,80}; 818 819 init(this.name.getMethodName()); 820 821 List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family); 822 for (Cell kv : kvList1) { 823 this.store.add(kv, null); 824 } 825 826 this.store.snapshot(); 827 flushStore(store, id++); 828 829 List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family); 830 for(Cell kv : kvList2) { 831 this.store.add(kv, null); 832 } 833 834 List<Cell> result; 835 Get get = new Get(Bytes.toBytes(1)); 836 get.addColumn(family,qf1); 837 838 get.setTimeRange(0,15); 839 result = HBaseTestingUtility.getFromStoreFile(store, get); 840 assertTrue(result.size()>0); 841 842 get.setTimeRange(40,90); 843 result = HBaseTestingUtility.getFromStoreFile(store, get); 844 assertTrue(result.size()>0); 845 846 get.setTimeRange(10,45); 847 result = HBaseTestingUtility.getFromStoreFile(store, get); 848 assertTrue(result.size()>0); 849 850 get.setTimeRange(80,145); 851 result = HBaseTestingUtility.getFromStoreFile(store, get); 852 assertTrue(result.size()>0); 853 854 get.setTimeRange(1,2); 855 result = HBaseTestingUtility.getFromStoreFile(store, get); 856 assertTrue(result.size()>0); 857 858 get.setTimeRange(90,200); 859 result = HBaseTestingUtility.getFromStoreFile(store, get); 860 assertTrue(result.size()==0); 861 } 862 863 /** 864 * Test for HBASE-3492 - Test split on empty colfam (no store files). 865 * 866 * @throws IOException When the IO operations fail. 867 */ 868 @Test 869 public void testSplitWithEmptyColFam() throws IOException { 870 init(this.name.getMethodName()); 871 assertFalse(store.getSplitPoint().isPresent()); 872 store.getHRegion().forceSplit(null); 873 assertFalse(store.getSplitPoint().isPresent()); 874 store.getHRegion().clearSplit(); 875 } 876 877 @Test 878 public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception { 879 final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle"; 880 long anyValue = 10; 881 882 // We'll check that it uses correct config and propagates it appropriately by going thru 883 // the simplest "real" path I can find - "throttleCompaction", which just checks whether 884 // a number we pass in is higher than some config value, inside compactionPolicy. 885 Configuration conf = HBaseConfiguration.create(); 886 conf.setLong(CONFIG_KEY, anyValue); 887 init(name.getMethodName() + "-xml", conf); 888 assertTrue(store.throttleCompaction(anyValue + 1)); 889 assertFalse(store.throttleCompaction(anyValue)); 890 891 // HTD overrides XML. 892 --anyValue; 893 init(name.getMethodName() + "-htd", conf, TableDescriptorBuilder 894 .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)), 895 ColumnFamilyDescriptorBuilder.of(family)); 896 assertTrue(store.throttleCompaction(anyValue + 1)); 897 assertFalse(store.throttleCompaction(anyValue)); 898 899 // HCD overrides them both. 900 --anyValue; 901 init(name.getMethodName() + "-hcd", conf, 902 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, 903 Long.toString(anyValue)), 904 ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue)) 905 .build()); 906 assertTrue(store.throttleCompaction(anyValue + 1)); 907 assertFalse(store.throttleCompaction(anyValue)); 908 } 909 910 public static class DummyStoreEngine extends DefaultStoreEngine { 911 public static DefaultCompactor lastCreatedCompactor = null; 912 913 @Override 914 protected void createComponents(Configuration conf, HStore store, CellComparator comparator) 915 throws IOException { 916 super.createComponents(conf, store, comparator); 917 lastCreatedCompactor = this.compactor; 918 } 919 } 920 921 @Test 922 public void testStoreUsesSearchEngineOverride() throws Exception { 923 Configuration conf = HBaseConfiguration.create(); 924 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName()); 925 init(this.name.getMethodName(), conf); 926 assertEquals(DummyStoreEngine.lastCreatedCompactor, 927 this.store.storeEngine.getCompactor()); 928 } 929 930 private void addStoreFile() throws IOException { 931 HStoreFile f = this.store.getStorefiles().iterator().next(); 932 Path storedir = f.getPath().getParent(); 933 long seqid = this.store.getMaxSequenceId().orElse(0L); 934 Configuration c = TEST_UTIL.getConfiguration(); 935 FileSystem fs = FileSystem.get(c); 936 HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 937 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), 938 fs) 939 .withOutputDir(storedir) 940 .withFileContext(fileContext) 941 .build(); 942 w.appendMetadata(seqid + 1, false); 943 w.close(); 944 LOG.info("Added store file:" + w.getPath()); 945 } 946 947 private void archiveStoreFile(int index) throws IOException { 948 Collection<HStoreFile> files = this.store.getStorefiles(); 949 HStoreFile sf = null; 950 Iterator<HStoreFile> it = files.iterator(); 951 for (int i = 0; i <= index; i++) { 952 sf = it.next(); 953 } 954 store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf)); 955 } 956 957 private void closeCompactedFile(int index) throws IOException { 958 Collection<HStoreFile> files = 959 this.store.getStoreEngine().getStoreFileManager().getCompactedfiles(); 960 HStoreFile sf = null; 961 Iterator<HStoreFile> it = files.iterator(); 962 for (int i = 0; i <= index; i++) { 963 sf = it.next(); 964 } 965 sf.closeStoreFile(true); 966 store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf)); 967 } 968 969 @Test 970 public void testRefreshStoreFiles() throws Exception { 971 init(name.getMethodName()); 972 973 assertEquals(0, this.store.getStorefilesCount()); 974 975 // Test refreshing store files when no store files are there 976 store.refreshStoreFiles(); 977 assertEquals(0, this.store.getStorefilesCount()); 978 979 // add some data, flush 980 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 981 flush(1); 982 assertEquals(1, this.store.getStorefilesCount()); 983 984 // add one more file 985 addStoreFile(); 986 987 assertEquals(1, this.store.getStorefilesCount()); 988 store.refreshStoreFiles(); 989 assertEquals(2, this.store.getStorefilesCount()); 990 991 // add three more files 992 addStoreFile(); 993 addStoreFile(); 994 addStoreFile(); 995 996 assertEquals(2, this.store.getStorefilesCount()); 997 store.refreshStoreFiles(); 998 assertEquals(5, this.store.getStorefilesCount()); 999 1000 closeCompactedFile(0); 1001 archiveStoreFile(0); 1002 1003 assertEquals(5, this.store.getStorefilesCount()); 1004 store.refreshStoreFiles(); 1005 assertEquals(4, this.store.getStorefilesCount()); 1006 1007 archiveStoreFile(0); 1008 archiveStoreFile(1); 1009 archiveStoreFile(2); 1010 1011 assertEquals(4, this.store.getStorefilesCount()); 1012 store.refreshStoreFiles(); 1013 assertEquals(1, this.store.getStorefilesCount()); 1014 1015 archiveStoreFile(0); 1016 store.refreshStoreFiles(); 1017 assertEquals(0, this.store.getStorefilesCount()); 1018 } 1019 1020 @Test 1021 public void testRefreshStoreFilesNotChanged() throws IOException { 1022 init(name.getMethodName()); 1023 1024 assertEquals(0, this.store.getStorefilesCount()); 1025 1026 // add some data, flush 1027 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 1028 flush(1); 1029 // add one more file 1030 addStoreFile(); 1031 1032 HStore spiedStore = spy(store); 1033 1034 // call first time after files changed 1035 spiedStore.refreshStoreFiles(); 1036 assertEquals(2, this.store.getStorefilesCount()); 1037 verify(spiedStore, times(1)).replaceStoreFiles(any(), any()); 1038 1039 // call second time 1040 spiedStore.refreshStoreFiles(); 1041 1042 //ensure that replaceStoreFiles is not called if files are not refreshed 1043 verify(spiedStore, times(0)).replaceStoreFiles(null, null); 1044 } 1045 1046 private long countMemStoreScanner(StoreScanner scanner) { 1047 if (scanner.currentScanners == null) { 1048 return 0; 1049 } 1050 return scanner.currentScanners.stream() 1051 .filter(s -> !s.isFileScanner()) 1052 .count(); 1053 } 1054 1055 @Test 1056 public void testNumberOfMemStoreScannersAfterFlush() throws IOException { 1057 long seqId = 100; 1058 long timestamp = System.currentTimeMillis(); 1059 Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1060 .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put) 1061 .setValue(qf1).build(); 1062 PrivateCellUtil.setSequenceId(cell0, seqId); 1063 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList()); 1064 1065 Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1066 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put) 1067 .setValue(qf1).build(); 1068 PrivateCellUtil.setSequenceId(cell1, seqId); 1069 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1)); 1070 1071 seqId = 101; 1072 timestamp = System.currentTimeMillis(); 1073 Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family) 1074 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put) 1075 .setValue(qf1).build(); 1076 PrivateCellUtil.setSequenceId(cell2, seqId); 1077 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2)); 1078 } 1079 1080 private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot, 1081 List<Cell> inputCellsAfterSnapshot) throws IOException { 1082 init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size()); 1083 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1084 long seqId = Long.MIN_VALUE; 1085 for (Cell c : inputCellsBeforeSnapshot) { 1086 quals.add(CellUtil.cloneQualifier(c)); 1087 seqId = Math.max(seqId, c.getSequenceId()); 1088 } 1089 for (Cell c : inputCellsAfterSnapshot) { 1090 quals.add(CellUtil.cloneQualifier(c)); 1091 seqId = Math.max(seqId, c.getSequenceId()); 1092 } 1093 inputCellsBeforeSnapshot.forEach(c -> store.add(c, null)); 1094 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1095 storeFlushCtx.prepare(); 1096 inputCellsAfterSnapshot.forEach(c -> store.add(c, null)); 1097 int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2; 1098 try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) { 1099 // snapshot + active (if inputCellsAfterSnapshot isn't empty) 1100 assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s)); 1101 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1102 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1103 // snapshot has no data after flush 1104 int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1; 1105 boolean more; 1106 int cellCount = 0; 1107 do { 1108 List<Cell> cells = new ArrayList<>(); 1109 more = s.next(cells); 1110 cellCount += cells.size(); 1111 assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s)); 1112 } while (more); 1113 assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size() 1114 + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(), 1115 inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount); 1116 // the current scanners is cleared 1117 assertEquals(0, countMemStoreScanner(s)); 1118 } 1119 } 1120 1121 private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) 1122 throws IOException { 1123 return createCell(row, qualifier, ts, sequenceId, value); 1124 } 1125 1126 private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value) 1127 throws IOException { 1128 Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1129 .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put) 1130 .setValue(value).build(); 1131 PrivateCellUtil.setSequenceId(c, sequenceId); 1132 return c; 1133 } 1134 1135 @Test 1136 public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException { 1137 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1138 final int expectedSize = 3; 1139 testFlushBeforeCompletingScan(new MyListHook() { 1140 @Override 1141 public void hook(int currentSize) { 1142 if (currentSize == expectedSize - 1) { 1143 try { 1144 flushStore(store, id++); 1145 timeToGoNextRow.set(true); 1146 } catch (IOException e) { 1147 throw new RuntimeException(e); 1148 } 1149 } 1150 } 1151 }, new FilterBase() { 1152 @Override 1153 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1154 return ReturnCode.INCLUDE; 1155 } 1156 }, expectedSize); 1157 } 1158 1159 @Test 1160 public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException { 1161 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1162 final int expectedSize = 2; 1163 testFlushBeforeCompletingScan(new MyListHook() { 1164 @Override 1165 public void hook(int currentSize) { 1166 if (currentSize == expectedSize - 1) { 1167 try { 1168 flushStore(store, id++); 1169 timeToGoNextRow.set(true); 1170 } catch (IOException e) { 1171 throw new RuntimeException(e); 1172 } 1173 } 1174 } 1175 }, new FilterBase() { 1176 @Override 1177 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1178 if (timeToGoNextRow.get()) { 1179 timeToGoNextRow.set(false); 1180 return ReturnCode.NEXT_ROW; 1181 } else { 1182 return ReturnCode.INCLUDE; 1183 } 1184 } 1185 }, expectedSize); 1186 } 1187 1188 @Test 1189 public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, 1190 InterruptedException { 1191 final AtomicBoolean timeToGetHint = new AtomicBoolean(false); 1192 final int expectedSize = 2; 1193 testFlushBeforeCompletingScan(new MyListHook() { 1194 @Override 1195 public void hook(int currentSize) { 1196 if (currentSize == expectedSize - 1) { 1197 try { 1198 flushStore(store, id++); 1199 timeToGetHint.set(true); 1200 } catch (IOException e) { 1201 throw new RuntimeException(e); 1202 } 1203 } 1204 } 1205 }, new FilterBase() { 1206 @Override 1207 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1208 if (timeToGetHint.get()) { 1209 timeToGetHint.set(false); 1210 return Filter.ReturnCode.SEEK_NEXT_USING_HINT; 1211 } else { 1212 return Filter.ReturnCode.INCLUDE; 1213 } 1214 } 1215 @Override 1216 public Cell getNextCellHint(Cell currentCell) throws IOException { 1217 return currentCell; 1218 } 1219 }, expectedSize); 1220 } 1221 1222 private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize) 1223 throws IOException, InterruptedException { 1224 Configuration conf = HBaseConfiguration.create(); 1225 byte[] r0 = Bytes.toBytes("row0"); 1226 byte[] r1 = Bytes.toBytes("row1"); 1227 byte[] r2 = Bytes.toBytes("row2"); 1228 byte[] value0 = Bytes.toBytes("value0"); 1229 byte[] value1 = Bytes.toBytes("value1"); 1230 byte[] value2 = Bytes.toBytes("value2"); 1231 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1232 long ts = EnvironmentEdgeManager.currentTime(); 1233 long seqId = 100; 1234 init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1235 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(), 1236 new MyStoreHook() { 1237 @Override 1238 public long getSmallestReadPoint(HStore store) { 1239 return seqId + 3; 1240 } 1241 }); 1242 // The cells having the value0 won't be flushed to disk because the value of max version is 1 1243 store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing); 1244 store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing); 1245 store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing); 1246 store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing); 1247 store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing); 1248 store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing); 1249 store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing); 1250 store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing); 1251 store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing); 1252 store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing); 1253 store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing); 1254 store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing); 1255 List<Cell> myList = new MyList<>(hook); 1256 Scan scan = new Scan() 1257 .withStartRow(r1) 1258 .setFilter(filter); 1259 try (InternalScanner scanner = (InternalScanner) store.getScanner( 1260 scan, null, seqId + 3)){ 1261 // r1 1262 scanner.next(myList); 1263 assertEquals(expectedSize, myList.size()); 1264 for (Cell c : myList) { 1265 byte[] actualValue = CellUtil.cloneValue(c); 1266 assertTrue("expected:" + Bytes.toStringBinary(value1) 1267 + ", actual:" + Bytes.toStringBinary(actualValue) 1268 , Bytes.equals(actualValue, value1)); 1269 } 1270 List<Cell> normalList = new ArrayList<>(3); 1271 // r2 1272 scanner.next(normalList); 1273 assertEquals(3, normalList.size()); 1274 for (Cell c : normalList) { 1275 byte[] actualValue = CellUtil.cloneValue(c); 1276 assertTrue("expected:" + Bytes.toStringBinary(value2) 1277 + ", actual:" + Bytes.toStringBinary(actualValue) 1278 , Bytes.equals(actualValue, value2)); 1279 } 1280 } 1281 } 1282 1283 @Test 1284 public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException { 1285 Configuration conf = HBaseConfiguration.create(); 1286 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName()); 1287 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1288 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1289 byte[] value = Bytes.toBytes("value"); 1290 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1291 long ts = EnvironmentEdgeManager.currentTime(); 1292 long seqId = 100; 1293 // older data whihc shouldn't be "seen" by client 1294 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1295 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1296 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1297 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1298 quals.add(qf1); 1299 quals.add(qf2); 1300 quals.add(qf3); 1301 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1302 MyCompactingMemStore.START_TEST.set(true); 1303 Runnable flush = () -> { 1304 // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5) 1305 // recreate the active memstore -- phase (4/5) 1306 storeFlushCtx.prepare(); 1307 }; 1308 ExecutorService service = Executors.newSingleThreadExecutor(); 1309 service.submit(flush); 1310 // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5) 1311 // this is blocked until we recreate the active memstore -- phase (3/5) 1312 // we get scanner from active memstore but it is empty -- phase (5/5) 1313 InternalScanner scanner = (InternalScanner) store.getScanner( 1314 new Scan(new Get(row)), quals, seqId + 1); 1315 service.shutdown(); 1316 service.awaitTermination(20, TimeUnit.SECONDS); 1317 try { 1318 try { 1319 List<Cell> results = new ArrayList<>(); 1320 scanner.next(results); 1321 assertEquals(3, results.size()); 1322 for (Cell c : results) { 1323 byte[] actualValue = CellUtil.cloneValue(c); 1324 assertTrue("expected:" + Bytes.toStringBinary(value) 1325 + ", actual:" + Bytes.toStringBinary(actualValue) 1326 , Bytes.equals(actualValue, value)); 1327 } 1328 } finally { 1329 scanner.close(); 1330 } 1331 } finally { 1332 MyCompactingMemStore.START_TEST.set(false); 1333 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1334 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1335 } 1336 } 1337 1338 @Test 1339 public void testScanWithDoubleFlush() throws IOException { 1340 Configuration conf = HBaseConfiguration.create(); 1341 // Initialize region 1342 MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook(){ 1343 @Override 1344 public void getScanners(MyStore store) throws IOException { 1345 final long tmpId = id++; 1346 ExecutorService s = Executors.newSingleThreadExecutor(); 1347 s.submit(() -> { 1348 try { 1349 // flush the store before storescanner updates the scanners from store. 1350 // The current data will be flushed into files, and the memstore will 1351 // be clear. 1352 // -- phase (4/4) 1353 flushStore(store, tmpId); 1354 }catch (IOException ex) { 1355 throw new RuntimeException(ex); 1356 } 1357 }); 1358 s.shutdown(); 1359 try { 1360 // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers. 1361 s.awaitTermination(3, TimeUnit.SECONDS); 1362 } catch (InterruptedException ex) { 1363 } 1364 } 1365 }); 1366 byte[] oldValue = Bytes.toBytes("oldValue"); 1367 byte[] currentValue = Bytes.toBytes("currentValue"); 1368 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1369 long ts = EnvironmentEdgeManager.currentTime(); 1370 long seqId = 100; 1371 // older data whihc shouldn't be "seen" by client 1372 myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing); 1373 myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing); 1374 myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing); 1375 long snapshotId = id++; 1376 // push older data into snapshot -- phase (1/4) 1377 StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker 1378 .DUMMY); 1379 storeFlushCtx.prepare(); 1380 1381 // insert current data into active -- phase (2/4) 1382 myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing); 1383 myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing); 1384 myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing); 1385 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1386 quals.add(qf1); 1387 quals.add(qf2); 1388 quals.add(qf3); 1389 try (InternalScanner scanner = (InternalScanner) myStore.getScanner( 1390 new Scan(new Get(row)), quals, seqId + 1)) { 1391 // complete the flush -- phase (3/4) 1392 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1393 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1394 1395 List<Cell> results = new ArrayList<>(); 1396 scanner.next(results); 1397 assertEquals(3, results.size()); 1398 for (Cell c : results) { 1399 byte[] actualValue = CellUtil.cloneValue(c); 1400 assertTrue("expected:" + Bytes.toStringBinary(currentValue) 1401 + ", actual:" + Bytes.toStringBinary(actualValue) 1402 , Bytes.equals(actualValue, currentValue)); 1403 } 1404 } 1405 } 1406 1407 @Test 1408 public void testReclaimChunkWhenScaning() throws IOException { 1409 init("testReclaimChunkWhenScaning"); 1410 long ts = EnvironmentEdgeManager.currentTime(); 1411 long seqId = 100; 1412 byte[] value = Bytes.toBytes("value"); 1413 // older data whihc shouldn't be "seen" by client 1414 store.add(createCell(qf1, ts, seqId, value), null); 1415 store.add(createCell(qf2, ts, seqId, value), null); 1416 store.add(createCell(qf3, ts, seqId, value), null); 1417 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1418 quals.add(qf1); 1419 quals.add(qf2); 1420 quals.add(qf3); 1421 try (InternalScanner scanner = (InternalScanner) store.getScanner( 1422 new Scan(new Get(row)), quals, seqId)) { 1423 List<Cell> results = new MyList<>(size -> { 1424 switch (size) { 1425 // 1) we get the first cell (qf1) 1426 // 2) flush the data to have StoreScanner update inner scanners 1427 // 3) the chunk will be reclaimed after updaing 1428 case 1: 1429 try { 1430 flushStore(store, id++); 1431 } catch (IOException e) { 1432 throw new RuntimeException(e); 1433 } 1434 break; 1435 // 1) we get the second cell (qf2) 1436 // 2) add some cell to fill some byte into the chunk (we have only one chunk) 1437 case 2: 1438 try { 1439 byte[] newValue = Bytes.toBytes("newValue"); 1440 // older data whihc shouldn't be "seen" by client 1441 store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null); 1442 store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null); 1443 store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null); 1444 } catch (IOException e) { 1445 throw new RuntimeException(e); 1446 } 1447 break; 1448 default: 1449 break; 1450 } 1451 }); 1452 scanner.next(results); 1453 assertEquals(3, results.size()); 1454 for (Cell c : results) { 1455 byte[] actualValue = CellUtil.cloneValue(c); 1456 assertTrue("expected:" + Bytes.toStringBinary(value) 1457 + ", actual:" + Bytes.toStringBinary(actualValue) 1458 , Bytes.equals(actualValue, value)); 1459 } 1460 } 1461 } 1462 1463 /** 1464 * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable 1465 * may change the versionedList. And the first InMemoryFlushRunnable will use the chagned 1466 * versionedList to remove the corresponding segments. 1467 * In short, there will be some segements which isn't in merge are removed. 1468 * @throws IOException 1469 * @throws InterruptedException 1470 */ 1471 @Test 1472 public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException { 1473 int flushSize = 500; 1474 Configuration conf = HBaseConfiguration.create(); 1475 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName()); 1476 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25); 1477 MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0); 1478 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize)); 1479 // Set the lower threshold to invoke the "MERGE" policy 1480 conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0)); 1481 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1482 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1483 byte[] value = Bytes.toBytes("thisisavarylargevalue"); 1484 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1485 long ts = EnvironmentEdgeManager.currentTime(); 1486 long seqId = 100; 1487 // older data whihc shouldn't be "seen" by client 1488 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1489 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1490 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1491 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1492 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1493 storeFlushCtx.prepare(); 1494 // This shouldn't invoke another in-memory flush because the first compactor thread 1495 // hasn't accomplished the in-memory compaction. 1496 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1497 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1498 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1499 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1500 //okay. Let the compaction be completed 1501 MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown(); 1502 CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore; 1503 while (mem.isMemStoreFlushingInMemory()) { 1504 TimeUnit.SECONDS.sleep(1); 1505 } 1506 // This should invoke another in-memory flush. 1507 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1508 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1509 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1510 assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1511 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1512 String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE)); 1513 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1514 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1515 } 1516 1517 @Test 1518 public void testAge() throws IOException { 1519 long currentTime = System.currentTimeMillis(); 1520 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 1521 edge.setValue(currentTime); 1522 EnvironmentEdgeManager.injectEdge(edge); 1523 Configuration conf = TEST_UTIL.getConfiguration(); 1524 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family); 1525 initHRegion(name.getMethodName(), conf, 1526 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false); 1527 HStore store = new HStore(region, hcd, conf) { 1528 1529 @Override 1530 protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf, 1531 CellComparator kvComparator) throws IOException { 1532 List<HStoreFile> storefiles = 1533 Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100), 1534 mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000)); 1535 StoreFileManager sfm = mock(StoreFileManager.class); 1536 when(sfm.getStorefiles()).thenReturn(storefiles); 1537 StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class); 1538 when(storeEngine.getStoreFileManager()).thenReturn(sfm); 1539 return storeEngine; 1540 } 1541 }; 1542 assertEquals(10L, store.getMinStoreFileAge().getAsLong()); 1543 assertEquals(10000L, store.getMaxStoreFileAge().getAsLong()); 1544 assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4); 1545 } 1546 1547 private HStoreFile mockStoreFile(long createdTime) { 1548 StoreFileInfo info = mock(StoreFileInfo.class); 1549 when(info.getCreatedTimestamp()).thenReturn(createdTime); 1550 HStoreFile sf = mock(HStoreFile.class); 1551 when(sf.getReader()).thenReturn(mock(StoreFileReader.class)); 1552 when(sf.isHFile()).thenReturn(true); 1553 when(sf.getFileInfo()).thenReturn(info); 1554 return sf; 1555 } 1556 1557 private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) 1558 throws IOException { 1559 return (MyStore) init(methodName, conf, 1560 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1561 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook); 1562 } 1563 1564 private static class MyStore extends HStore { 1565 private final MyStoreHook hook; 1566 1567 MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration 1568 confParam, MyStoreHook hook, boolean switchToPread) throws IOException { 1569 super(region, family, confParam); 1570 this.hook = hook; 1571 } 1572 1573 @Override 1574 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1575 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, 1576 boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, 1577 boolean includeMemstoreScanner) throws IOException { 1578 hook.getScanners(this); 1579 return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, 1580 stopRow, false, readPt, includeMemstoreScanner); 1581 } 1582 1583 @Override 1584 public long getSmallestReadPoint() { 1585 return hook.getSmallestReadPoint(this); 1586 } 1587 } 1588 1589 private abstract static class MyStoreHook { 1590 1591 void getScanners(MyStore store) throws IOException { 1592 } 1593 1594 long getSmallestReadPoint(HStore store) { 1595 return store.getHRegion().getSmallestReadPoint(); 1596 } 1597 } 1598 1599 @Test 1600 public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception { 1601 Configuration conf = HBaseConfiguration.create(); 1602 conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); 1603 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0); 1604 // Set the lower threshold to invoke the "MERGE" policy 1605 MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {}); 1606 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1607 long ts = System.currentTimeMillis(); 1608 long seqID = 1L; 1609 // Add some data to the region and do some flushes 1610 for (int i = 1; i < 10; i++) { 1611 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1612 memStoreSizing); 1613 } 1614 // flush them 1615 flushStore(store, seqID); 1616 for (int i = 11; i < 20; i++) { 1617 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1618 memStoreSizing); 1619 } 1620 // flush them 1621 flushStore(store, seqID); 1622 for (int i = 21; i < 30; i++) { 1623 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1624 memStoreSizing); 1625 } 1626 // flush them 1627 flushStore(store, seqID); 1628 1629 assertEquals(3, store.getStorefilesCount()); 1630 Scan scan = new Scan(); 1631 scan.addFamily(family); 1632 Collection<HStoreFile> storefiles2 = store.getStorefiles(); 1633 ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2); 1634 StoreScanner storeScanner = 1635 (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); 1636 // get the current heap 1637 KeyValueHeap heap = storeScanner.heap; 1638 // create more store files 1639 for (int i = 31; i < 40; i++) { 1640 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1641 memStoreSizing); 1642 } 1643 // flush them 1644 flushStore(store, seqID); 1645 1646 for (int i = 41; i < 50; i++) { 1647 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1648 memStoreSizing); 1649 } 1650 // flush them 1651 flushStore(store, seqID); 1652 storefiles2 = store.getStorefiles(); 1653 ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2); 1654 actualStorefiles1.removeAll(actualStorefiles); 1655 // Do compaction 1656 MyThread thread = new MyThread(storeScanner); 1657 thread.start(); 1658 store.replaceStoreFiles(actualStorefiles, actualStorefiles1); 1659 thread.join(); 1660 KeyValueHeap heap2 = thread.getHeap(); 1661 assertFalse(heap.equals(heap2)); 1662 } 1663 1664 private static class MyThread extends Thread { 1665 private StoreScanner scanner; 1666 private KeyValueHeap heap; 1667 1668 public MyThread(StoreScanner scanner) { 1669 this.scanner = scanner; 1670 } 1671 1672 public KeyValueHeap getHeap() { 1673 return this.heap; 1674 } 1675 1676 @Override 1677 public void run() { 1678 scanner.trySwitchToStreamRead(); 1679 heap = scanner.heap; 1680 } 1681 } 1682 1683 private static class MyMemStoreCompactor extends MemStoreCompactor { 1684 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 1685 private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1); 1686 public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy 1687 compactionPolicy) throws IllegalArgumentIOException { 1688 super(compactingMemStore, compactionPolicy); 1689 } 1690 1691 @Override 1692 public boolean start() throws IOException { 1693 boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0; 1694 boolean rval = super.start(); 1695 if (isFirst) { 1696 try { 1697 START_COMPACTOR_LATCH.await(); 1698 } catch (InterruptedException ex) { 1699 throw new RuntimeException(ex); 1700 } 1701 } 1702 return rval; 1703 } 1704 } 1705 1706 public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore { 1707 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 1708 public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c, 1709 HStore store, RegionServicesForStores regionServices, 1710 MemoryCompactionPolicy compactionPolicy) throws IOException { 1711 super(conf, c, store, regionServices, compactionPolicy); 1712 } 1713 1714 @Override 1715 protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) 1716 throws IllegalArgumentIOException { 1717 return new MyMemStoreCompactor(this, compactionPolicy); 1718 } 1719 1720 @Override 1721 protected boolean shouldFlushInMemory() { 1722 boolean rval = super.shouldFlushInMemory(); 1723 if (rval) { 1724 RUNNER_COUNT.incrementAndGet(); 1725 if (LOG.isDebugEnabled()) { 1726 LOG.debug("runner count: " + RUNNER_COUNT.get()); 1727 } 1728 } 1729 return rval; 1730 } 1731 } 1732 1733 public static class MyCompactingMemStore extends CompactingMemStore { 1734 private static final AtomicBoolean START_TEST = new AtomicBoolean(false); 1735 private final CountDownLatch getScannerLatch = new CountDownLatch(1); 1736 private final CountDownLatch snapshotLatch = new CountDownLatch(1); 1737 public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, 1738 HStore store, RegionServicesForStores regionServices, 1739 MemoryCompactionPolicy compactionPolicy) throws IOException { 1740 super(conf, c, store, regionServices, compactionPolicy); 1741 } 1742 1743 @Override 1744 protected List<KeyValueScanner> createList(int capacity) { 1745 if (START_TEST.get()) { 1746 try { 1747 getScannerLatch.countDown(); 1748 snapshotLatch.await(); 1749 } catch (InterruptedException e) { 1750 throw new RuntimeException(e); 1751 } 1752 } 1753 return new ArrayList<>(capacity); 1754 } 1755 @Override 1756 protected void pushActiveToPipeline(MutableSegment active) { 1757 if (START_TEST.get()) { 1758 try { 1759 getScannerLatch.await(); 1760 } catch (InterruptedException e) { 1761 throw new RuntimeException(e); 1762 } 1763 } 1764 1765 super.pushActiveToPipeline(active); 1766 if (START_TEST.get()) { 1767 snapshotLatch.countDown(); 1768 } 1769 } 1770 } 1771 1772 interface MyListHook { 1773 void hook(int currentSize); 1774 } 1775 1776 private static class MyList<T> implements List<T> { 1777 private final List<T> delegatee = new ArrayList<>(); 1778 private final MyListHook hookAtAdd; 1779 MyList(final MyListHook hookAtAdd) { 1780 this.hookAtAdd = hookAtAdd; 1781 } 1782 @Override 1783 public int size() {return delegatee.size();} 1784 1785 @Override 1786 public boolean isEmpty() {return delegatee.isEmpty();} 1787 1788 @Override 1789 public boolean contains(Object o) {return delegatee.contains(o);} 1790 1791 @Override 1792 public Iterator<T> iterator() {return delegatee.iterator();} 1793 1794 @Override 1795 public Object[] toArray() {return delegatee.toArray();} 1796 1797 @Override 1798 public <R> R[] toArray(R[] a) {return delegatee.toArray(a);} 1799 1800 @Override 1801 public boolean add(T e) { 1802 hookAtAdd.hook(size()); 1803 return delegatee.add(e); 1804 } 1805 1806 @Override 1807 public boolean remove(Object o) {return delegatee.remove(o);} 1808 1809 @Override 1810 public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);} 1811 1812 @Override 1813 public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);} 1814 1815 @Override 1816 public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);} 1817 1818 @Override 1819 public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);} 1820 1821 @Override 1822 public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);} 1823 1824 @Override 1825 public void clear() {delegatee.clear();} 1826 1827 @Override 1828 public T get(int index) {return delegatee.get(index);} 1829 1830 @Override 1831 public T set(int index, T element) {return delegatee.set(index, element);} 1832 1833 @Override 1834 public void add(int index, T element) {delegatee.add(index, element);} 1835 1836 @Override 1837 public T remove(int index) {return delegatee.remove(index);} 1838 1839 @Override 1840 public int indexOf(Object o) {return delegatee.indexOf(o);} 1841 1842 @Override 1843 public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);} 1844 1845 @Override 1846 public ListIterator<T> listIterator() {return delegatee.listIterator();} 1847 1848 @Override 1849 public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);} 1850 1851 @Override 1852 public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);} 1853 } 1854}