/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.IntBinaryOperator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.util.Progressable;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Test class for the HStore
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestHStore {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHStore.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
  @Rule
  public TestName name = new TestName();

  HRegion region;
  HStore store;
  byte[] table = Bytes.toBytes("table");
  byte[] family = Bytes.toBytes("family");

  byte[] row = Bytes.toBytes("row");
  byte[] row2 = Bytes.toBytes("row2");
  byte[] qf1 = Bytes.toBytes("qf1");
  byte[] qf2 = Bytes.toBytes("qf2");
  byte[] qf3 = Bytes.toBytes("qf3");
  byte[] qf4 = Bytes.toBytes("qf4");
  byte[] qf5 = Bytes.toBytes("qf5");
  byte[] qf6 = Bytes.toBytes("qf6");

  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);

  List<Cell> expected = new ArrayList<>();
  List<Cell> result = new ArrayList<>();

  long id = EnvironmentEdgeManager.currentTime();
  Get get = new Get(row);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();

  /**
   * Resets {@link #qualifiers} to qf1/qf3/qf5 and, for each of them, adds the matching KeyValue to
   * {@link #expected} and the matching column to {@link #get}.
   */
  @Before
  public void setUp() throws IOException {
    qualifiers.clear();
    qualifiers.add(qf1);
    qualifiers.add(qf3);
    qualifiers.add(qf5);

    for (byte[] qualifier : qualifiers) {
      expected.add(new KeyValue(row, family, qualifier, 1, (byte[]) null));
      get.addColumn(family, qualifier);
    }
  }

  /**
   * Initializes {@link #region} and {@link #store} using the shared test configuration.
   * @param methodName used to derive the per-test base directory and WAL name
   */
  private void init(String methodName) throws IOException {
    init(methodName, TEST_UTIL.getConfiguration());
  }

  /**
   * Initializes {@link #region} and {@link #store} with a column family keeping 4 versions.
   * @return the newly created store (also assigned to {@link #store})
   */
  private HStore init(String methodName, Configuration conf) throws IOException {
    // some of the tests write 4 versions and then flush
    // (with HBASE-4241, lower versions are collected on flush)
    return init(methodName, conf,
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
  }

  /**
   * Initializes {@link #region} and {@link #store} for the given column family on a table named
   * after {@link #table}.
   */
  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
    throws IOException {
    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
  }

  /**
   * Initializes {@link #region} and {@link #store} from the given table builder, without a hook.
   */
  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
    ColumnFamilyDescriptor hcd) throws IOException {
    return init(methodName, conf, builder, hcd, null);
  }

  /**
   * Initializes {@link #region} and {@link #store} with an optional hook and
   * {@code switchToPread} set to {@code false}.
   */
  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
    ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
    return init(methodName, conf, builder, hcd, hook, false);
  }

  /**
   * Creates {@link #region} under {@code DIR + methodName}: deletes any previous WAL directory,
   * initializes the {@link ChunkCreator}, builds the region with a fresh WAL, and replaces the
   * region's services with a spy whose in-memory compaction pool is a single-thread executor.
   * <p>
   * {@code hook} and {@code switchToPread} are accepted but not read by this method.
   */
  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
    TableDescriptor htd = builder.setColumnFamily(hcd).build();
    Path basedir = new Path(DIR + methodName);
    Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName());
    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));

    FileSystem fs = FileSystem.get(conf);

    fs.delete(logdir, true);
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
    Configuration walConf = new Configuration(conf);
    CommonFSUtils.setRootDir(walConf, basedir);
    WALFactory wals = new WALFactory(walConf, methodName);
    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
      htd, null);
    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
    // NOTE(review): this pool is never shut down within the visible code; confirm whether that
    // is handled elsewhere or whether it should be tracked and stopped in tearDown.
    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
  }

  /**
   * Creates the region via {@code initHRegion}, then creates the store (a plain {@link HStore}
   * when {@code hook} is null, otherwise a {@code MyStore}) and registers it with the region.
   * @return the newly created store (also assigned to {@link #store})
   */
  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
    ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
    if (hook == null) {
      store = new HStore(region, hcd, conf, false);
    } else {
      store = new MyStore(region, hcd, conf, hook, switchToPread);
    }
    region.stores.put(store.getColumnFamilyDescriptor().getName(), store);
    return store;
  }

  /**
   * Test we do not lose data if we fail a flush and then close. Part of HBase-10466
   */
  @Test
  public void testFlushSizeSizing() throws Exception {
    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    // Only retry once.
    conf.setInt("hbase.hstore.flush.retries.number", 1);
    User user = User.createUserForTesting(conf, this.name.getMethodName(), new String[] { "foo" });
    // Inject our faulty LocalFileSystem
    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        // Make sure it worked (above is sensitive to caching details in hadoop core)
        FileSystem fs = FileSystem.get(conf);
        assertEquals(FaultyFileSystem.class, fs.getClass());
        FaultyFileSystem ffs = (FaultyFileSystem) fs;

        // Initialize region
        init(name.getMethodName(), conf);

        MemStoreSize mss = store.memstore.getFlushableSize();
        assertEquals(0, mss.getDataSize());
        LOG.info("Adding some data");
        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
        // add the heap size of active (mutable) segment
        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
        mss = store.memstore.getFlushableSize();
        assertEquals(kvSize.getMemStoreSize(), mss);
        // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
        try {
          LOG.info("Flushing");
          flushStore(store, id++);
          fail("Didn't bubble up IOE!");
        } catch (IOException ioe) {
          assertTrue(ioe.getMessage().contains("Fault injected"));
        }
        // due to snapshot, change mutable to immutable segment
        kvSize.incMemStoreSize(0,
          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
        mss = store.memstore.getFlushableSize();
        assertEquals(kvSize.getMemStoreSize(), mss);
        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
        // not yet cleared the snapshot -- the above flush failed.
        // Re-read the size: comparing against the value captured before the add would pass
        // trivially and never exercise the memstore after the second write.
        mss = store.memstore.getFlushableSize();
        assertEquals(kvSize.getMemStoreSize(), mss);
        ffs.fault.set(false);
        flushStore(store, id++);
        mss = store.memstore.getFlushableSize();
        // Size should be the foreground kv size.
        assertEquals(kvSize2.getMemStoreSize(), mss);
        flushStore(store, id++);
        mss = store.memstore.getFlushableSize();
        assertEquals(0, mss.getDataSize());
        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
        return null;
      }
    });
  }

  /**
   * Verify that compression and data block encoding are respected by the createWriter method, used
   * on store flush.
   */
  @Test
  public void testCreateWriter() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);

    ColumnFamilyDescriptor hcd =
      ColumnFamilyDescriptorBuilder.newBuilder(family).setCompressionType(Compression.Algorithm.GZ)
        .setDataBlockEncoding(DataBlockEncoding.DIFF).build();
    init(name.getMethodName(), conf, hcd);

    // Test createWriter
    StoreFileWriter writer = store.getStoreEngine()
      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4)
        .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true)
        .includesTag(false).shouldDropBehind(false));
    Path path = writer.getPath();
    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
    writer.close();

    // Verify that compression and encoding settings are respected.
    // try-with-resources so the reader is closed even when an assertion fails.
    try (HFile.Reader reader =
      HFile.createReader(fs, path, new CacheConfig(conf), true, conf)) {
      assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec());
      assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
    }
  }

  /**
   * Runs the expired-store-file deletion scenario with MIN_VERSIONS of 0 and then 1.
   */
  @Test
  public void testDeleteExpiredStoreFiles() throws Exception {
    testDeleteExpiredStoreFiles(0);
    testDeleteExpiredStoreFiles(1);
  }

  /**
   * Writes four store files spaced evenly across the TTL, then advances an injected clock and
   * checks which files remain after each compaction request.
   * @param minVersions the MIN_VERSIONS for the column family
   */
  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
    int storeFileNum = 4;
    int ttl = 4;
    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
    EnvironmentEdgeManagerTestHelper.injectEdge(edge);

    Configuration conf = HBaseConfiguration.create();
    // Enable the expired store file deletion
    conf.setBoolean("hbase.store.delete.expired.storefile", true);
    // Set the compaction threshold higher to avoid normal compactions.
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);

    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
      .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());

    long storeTtl = this.store.getScanInfo().getTtl();
    long sleepTime = storeTtl / storeFileNum;
    long timeStamp;
    // There are 4 store files and the max time stamp difference among these
    // store files will be (this.store.ttl / storeFileNum)
    for (int i = 1; i <= storeFileNum; i++) {
      LOG.info("Adding some data for the store file #" + i);
      timeStamp = EnvironmentEdgeManager.currentTime();
      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
      flush(i);
      edge.incrementTime(sleepTime);
    }

    // Verify the total number of store files
    assertEquals(storeFileNum, this.store.getStorefiles().size());

    // Each call will find one expired store file and delete it before compaction happens.
    // There will be no compaction due to threshold above. Last file will not be replaced.
    for (int i = 1; i <= storeFileNum - 1; i++) {
      // verify the expired store file.
      assertFalse(this.store.requestCompaction().isPresent());
      Collection<HStoreFile> sfs = this.store.getStorefiles();
      // Ensure i files are gone.
      if (minVersions == 0) {
        assertEquals(storeFileNum - i, sfs.size());
        // Ensure only non-expired files remain.
        for (HStoreFile sf : sfs) {
          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
        }
      } else {
        assertEquals(storeFileNum, sfs.size());
      }
      // Let the next store file expired.
      edge.incrementTime(sleepTime);
    }
    assertFalse(this.store.requestCompaction().isPresent());

    Collection<HStoreFile> sfs = this.store.getStorefiles();
    // Assert the last expired file is not removed.
    if (minVersions == 0) {
      assertEquals(1, sfs.size());
    }
    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
    assertTrue(ts < (edge.currentTime() - storeTtl));

    for (HStoreFile sf : sfs) {
      sf.closeStoreFile(true);
    }
  }

  /**
   * Checks that the lowest timestamp reported by {@link StoreUtils} matches the lowest file
   * modification time on the file system, both after flushing and after compacting.
   */
  @Test
  public void testLowestModificationTime() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    // Initialize region
    init(name.getMethodName(), conf);

    int storeFileNum = 4;
    for (int i = 1; i <= storeFileNum; i++) {
      LOG.info("Adding some data for the store file #" + i);
      this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null);
      flush(i);
    }
    // after flush; check the lowest time stamp
    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);

    // after compact; check the lowest time stamp
    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
  }

  /**
   * Returns the smallest modification time among the given store files as reported by
   * {@code fs}.
   * @return the minimum modification time, or {@link Long#MAX_VALUE} when {@code candidates} is
   *         empty or the file system returns no statuses
   */
  private static long getLowestTimeStampFromFS(FileSystem fs,
    final Collection<HStoreFile> candidates) throws IOException {
    long minTs = Long.MAX_VALUE;
    if (candidates.isEmpty()) {
      return minTs;
    }
    Path[] p = new Path[candidates.size()];
    int i = 0;
    for (HStoreFile sf : candidates) {
      p[i] = sf.getPath();
      ++i;
    }

    FileStatus[] stats = fs.listStatus(p);
    if (stats == null || stats.length == 0) {
      return minTs;
    }
    for (FileStatus s : stats) {
      minTs = Math.min(minTs, s.getModificationTime());
    }
    return minTs;
  }

  //////////////////////////////////////////////////////////////////////////////
  // Get tests
  //////////////////////////////////////////////////////////////////////////////

  private static final int BLOCKSIZE_SMALL = 8192;

  /**
   * Test for hbase-1686.
   */
  @Test
  public void testEmptyStoreFile() throws IOException {
    init(this.name.getMethodName());
    // Write a store file.
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
    flush(1);
    // Now put in place an empty store file. Its a little tricky. Have to
    // do manually with hacked in sequence id.
    HStoreFile f = this.store.getStorefiles().iterator().next();
    Path storedir = f.getPath().getParent();
    long seqid = f.getMaxSequenceId();
    Configuration c = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(c);
    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
      .withOutputDir(storedir).withFileContext(meta).build();
    w.appendMetadata(seqid + 1, false);
    w.close();
    this.store.close();
    // Reopen it... should pick up two files
    this.store =
      new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
    assertEquals(2, this.store.getStorefilesCount());

    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);
    assertEquals(1, result.size());
  }

  /**
   * Getting data from memstore only
   */
  @Test
  public void testGet_FromMemStoreOnly() throws IOException {
    init(this.name.getMethodName());

    // Put data in memstore
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);

    // Get
    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);

    // Compare
    assertCheck();
  }

  /**
   * Runs the dropped-cells time range scenario for max versions of 1, 3 and 5.
   */
  @Test
  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
  }

  /**
   * Writes {@code maxVersion + 1} versions of one cell, flushes, and asserts that the resulting
   * store file's time range starts at the second-written timestamp and ends at the last one.
   */
  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
    long currentTs = 100;
    long minTs = currentTs;
    // the extra cell won't be flushed to disk,
    // so the min of timerange will be different between memStore and hfile.
    for (int i = 0; i != (maxVersion + 1); ++i) {
      this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
      if (i == 1) {
        minTs = currentTs;
      }
    }
    flushStore(store, id++);

    Collection<HStoreFile> files = store.getStorefiles();
    assertEquals(1, files.size());
    HStoreFile f = files.iterator().next();
    f.initReader();
    StoreFileReader reader = f.getReader();
    assertEquals(minTs, reader.timeRange.getMin());
    assertEquals(currentTs, reader.timeRange.getMax());
  }

  /**
   * Getting data from files only
   */
  @Test
  public void testGet_FromFilesOnly() throws IOException {
    init(this.name.getMethodName());

    // Put data in memstore
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
    // flush
    flush(1);

    // Add more data
    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
    // flush
    flush(2);

    // Add more data
    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);
    // flush
    flush(3);

    // Get
    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);

    // Need to sort the result since multiple files
    Collections.sort(result, CellComparatorImpl.COMPARATOR);

    // Compare
    assertCheck();
  }

  /**
   * Getting data from memstore and files
   */
  @Test
  public void testGet_FromMemStoreAndFiles() throws IOException {
    init(this.name.getMethodName());

    // Put data in memstore
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
    // flush
    flush(1);

    // Add more data
    this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null);
    // flush
    flush(2);

    // Add more data
    this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null);
    this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null);

    // Get
    result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers);

    // Need to sort the result since multiple files
    Collections.sort(result, CellComparatorImpl.COMPARATOR);

    // Compare
    assertCheck();
  }

  /**
   * Flushes {@link #store} and asserts that it now holds the given number of store files and
   * that the active memstore segment is empty.
   * @param storeFilessize the expected number of store files after the flush
   */
  private void flush(int storeFilessize) throws IOException {
    flushStore(store, id++);
    assertEquals(storeFilessize, this.store.getStorefiles().size());
    assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount());
  }

  /**
   * Asserts that {@link #result} matches {@link #expected} element by element.
   */
  private void assertCheck() {
    assertEquals(expected.size(), result.size());
    for (int i = 0; i < expected.size(); i++) {
      assertEquals(expected.get(i), result.get(i));
    }
  }

  /**
   * Resets the injected environment edge and closes the store and region created by the test.
   */
  @After
  public void tearDown() throws Exception {
    EnvironmentEdgeManagerTestHelper.reset();
    if (store != null) {
      try {
        store.close();
      } catch (IOException e) {
        // Best effort: a failed close must not mask the test outcome, but leave a trace.
        LOG.warn("Failed to close store during tearDown", e);
      }
      store = null;
    }
    if (region != null) {
      region.close();
      region = null;
    }
  }

  /**
   * Removes the shared test data directory once all tests in the class have run.
   */
  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }

  /**
   * Verifies that a flush failing with an injected fault surfaces the IOException and leaves no
   * store files behind.
   */
  @Test
  public void testHandleErrorsInFlush() throws Exception {
    LOG.info("Setting up a faulty file system that cannot write");

    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    User user = User.createUserForTesting(conf, "testhandleerrorsinflush", new String[] { "foo" });
    // Inject our faulty LocalFileSystem
    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
    try {
      user.runAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          // Make sure it worked (above is sensitive to caching details in hadoop core)
          FileSystem fs = FileSystem.get(conf);
          assertEquals(FaultyFileSystem.class, fs.getClass());

          // Initialize region
          init(name.getMethodName(), conf);

          LOG.info("Adding some data");
          store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
          store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null);
          store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null);

          LOG.info("Before flush, we should have no files");

          Collection<StoreFileInfo> files =
            store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
          assertEquals(0, files != null ? files.size() : 0);

          // flush
          try {
            LOG.info("Flushing");
            flush(1);
            fail("Didn't bubble up IOE!");
          } catch (IOException ioe) {
            assertTrue(ioe.getMessage().contains("Fault injected"));
          }

          LOG.info("After failed flush, we should still have no files!");
          files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
          assertEquals(0, files != null ? files.size() : 0);
          store.getHRegion().getWAL().close();
          return null;
        }
      });
    } finally {
      // Release the file systems cached for the test user even when the test body fails.
      FileSystem.closeAllForUGI(user.getUGI());
    }
  }

  /**
   * Faulty file system that will fail if you write past its fault position the FIRST TIME only;
   * thereafter it will succeed. Used by {@link TestHRegion} too.
   */
  static class FaultyFileSystem extends FilterFileSystem {
    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
    private long faultPos = 200;
    AtomicBoolean fault = new AtomicBoolean(true);

    public FaultyFileSystem() {
      super(new LocalFileSystem());
      LOG.info("Creating faulty!");
    }

    @Override
    public FSDataOutputStream create(Path p) throws IOException {
      return new FaultyOutputStream(super.create(p), faultPos, fault);
    }

    @Override
    public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
      int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
      return new FaultyOutputStream(
        super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress),
        faultPos, fault);
    }

    @Override
    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize,
      short replication, long blockSize, Progressable progress) throws IOException {
      // Fake it. Call create instead. The default implementation throws an IOE
      // that this is not supported.
      return create(f, overwrite, bufferSize, replication, blockSize, progress);
    }
  }

  /**
   * Output stream that throws {@code IOException("Fault injected")} on a write once the stream
   * position has reached {@code faultPos}, for as long as the shared {@code fault} flag is set.
   */
  static class FaultyOutputStream extends FSDataOutputStream {
    volatile long faultPos = Long.MAX_VALUE;
    private final AtomicBoolean fault;

    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
      throws IOException {
      super(out, null);
      this.faultPos = faultPos;
      this.fault = fault;
    }

    @Override
    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
      LOG.info("faulty stream write at pos " + getPos());
      injectFault();
      super.write(buf, offset, length);
    }

    private void injectFault() throws IOException {
      if (this.fault.get() && getPos() >= faultPos) {
        throw new IOException("Fault injected");
      }
    }
  }

  /**
   * Runs a full flush cycle (prepare, flushCache, commit) on the given store.
   * @param store the store to flush
   * @param id    the id passed to {@code createFlushContext}
   * @return the flush context that was used
   */
  private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
    storeFlushCtx.prepare();
    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
    return storeFlushCtx;
  }

  /**
   * Generate a list of KeyValues for testing based on given parameters
   * @return the rows key-value list
   */
  private List<Cell> getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier,
    byte[] family) {
    List<Cell> kvList = new ArrayList<>();
    for (int i = 1; i <= numRows; i++) {
      byte[] b = Bytes.toBytes(i);
      for (long timestamp : timestamps) {
        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
      }
    }
    return kvList;
  }

  /**
   * Test to ensure correctness when using Stores with multiple timestamps
   */
  @Test
  public void testMultipleTimestamps() throws IOException {
    int numRows = 1;
    long[] timestamps1 = new long[] { 1, 5, 10, 20 };
    long[] timestamps2 = new long[] { 30, 80 };

    init(this.name.getMethodName());

    List<Cell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family);
    for (Cell kv : kvList1) {
      this.store.add(kv, null);
    }

    flushStore(store, id++);

    List<Cell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family);
    for (Cell kv : kvList2) {
      this.store.add(kv, null);
    }

    List<Cell> result;
    Get get = new Get(Bytes.toBytes(1));
    get.addColumn(family, qf1);

    get.setTimeRange(0, 15);
    result = HBaseTestingUtil.getFromStoreFile(store, get);
    assertFalse(result.isEmpty());

    get.setTimeRange(40, 90);
    result = HBaseTestingUtil.getFromStoreFile(store, get);
    assertFalse(result.isEmpty());

    get.setTimeRange(10, 45);
    result = HBaseTestingUtil.getFromStoreFile(store, get);
    assertFalse(result.isEmpty());

    get.setTimeRange(80, 145);
    result = HBaseTestingUtil.getFromStoreFile(store, get);
    assertFalse(result.isEmpty());

    get.setTimeRange(1, 2);
    result = HBaseTestingUtil.getFromStoreFile(store, get);
    assertFalse(result.isEmpty());

    get.setTimeRange(90, 200);
    result = HBaseTestingUtil.getFromStoreFile(store, get);
    assertEquals(0, result.size());
  }

  /**
   * Test for HBASE-3492 - Test split on empty colfam (no store files).
   * @throws IOException When the IO operations fail.
   */
  @Test
  public void testSplitWithEmptyColFam() throws IOException {
    init(this.name.getMethodName());
    assertFalse(store.getSplitPoint().isPresent());
  }

  @Test
  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
    long anyValue = 10;

    // We'll check that it uses correct config and propagates it appropriately by going thru
    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
    // a number we pass in is higher than some config value, inside compactionPolicy.
    Configuration conf = HBaseConfiguration.create();
    conf.setLong(CONFIG_KEY, anyValue);
    init(name.getMethodName() + "-xml", conf);
    assertTrue(store.throttleCompaction(anyValue + 1));
    assertFalse(store.throttleCompaction(anyValue));

    // HTD overrides XML.
    --anyValue;
    init(
      name.getMethodName() + "-htd", conf, TableDescriptorBuilder
        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
      ColumnFamilyDescriptorBuilder.of(family));
    assertTrue(store.throttleCompaction(anyValue + 1));
    assertFalse(store.throttleCompaction(anyValue));

    // HCD overrides them both.
886 --anyValue; 887 init(name.getMethodName() + "-hcd", conf, 888 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, 889 Long.toString(anyValue)), 890 ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue)) 891 .build()); 892 assertTrue(store.throttleCompaction(anyValue + 1)); 893 assertFalse(store.throttleCompaction(anyValue)); 894 } 895 896 public static class DummyStoreEngine extends DefaultStoreEngine { 897 public static DefaultCompactor lastCreatedCompactor = null; 898 899 @Override 900 protected void createComponents(Configuration conf, HStore store, CellComparator comparator) 901 throws IOException { 902 super.createComponents(conf, store, comparator); 903 lastCreatedCompactor = this.compactor; 904 } 905 } 906 907 @Test 908 public void testStoreUsesSearchEngineOverride() throws Exception { 909 Configuration conf = HBaseConfiguration.create(); 910 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName()); 911 init(this.name.getMethodName(), conf); 912 assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor()); 913 } 914 915 private void addStoreFile() throws IOException { 916 HStoreFile f = this.store.getStorefiles().iterator().next(); 917 Path storedir = f.getPath().getParent(); 918 long seqid = this.store.getMaxSequenceId().orElse(0L); 919 Configuration c = TEST_UTIL.getConfiguration(); 920 FileSystem fs = FileSystem.get(c); 921 HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 922 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs) 923 .withOutputDir(storedir).withFileContext(fileContext).build(); 924 w.appendMetadata(seqid + 1, false); 925 w.close(); 926 LOG.info("Added store file:" + w.getPath()); 927 } 928 929 private void archiveStoreFile(int index) throws IOException { 930 Collection<HStoreFile> files = this.store.getStorefiles(); 931 HStoreFile sf = null; 932 
Iterator<HStoreFile> it = files.iterator(); 933 for (int i = 0; i <= index; i++) { 934 sf = it.next(); 935 } 936 store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), 937 Lists.newArrayList(sf)); 938 } 939 940 private void closeCompactedFile(int index) throws IOException { 941 Collection<HStoreFile> files = 942 this.store.getStoreEngine().getStoreFileManager().getCompactedfiles(); 943 if (files.size() > 0) { 944 HStoreFile sf = null; 945 Iterator<HStoreFile> it = files.iterator(); 946 for (int i = 0; i <= index; i++) { 947 sf = it.next(); 948 } 949 sf.closeStoreFile(true); 950 store.getStoreEngine().getStoreFileManager() 951 .removeCompactedFiles(Collections.singletonList(sf)); 952 } 953 } 954 955 @Test 956 public void testRefreshStoreFiles() throws Exception { 957 init(name.getMethodName()); 958 959 assertEquals(0, this.store.getStorefilesCount()); 960 961 // Test refreshing store files when no store files are there 962 store.refreshStoreFiles(); 963 assertEquals(0, this.store.getStorefilesCount()); 964 965 // add some data, flush 966 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 967 flush(1); 968 assertEquals(1, this.store.getStorefilesCount()); 969 970 // add one more file 971 addStoreFile(); 972 973 assertEquals(1, this.store.getStorefilesCount()); 974 store.refreshStoreFiles(); 975 assertEquals(2, this.store.getStorefilesCount()); 976 977 // add three more files 978 addStoreFile(); 979 addStoreFile(); 980 addStoreFile(); 981 982 assertEquals(2, this.store.getStorefilesCount()); 983 store.refreshStoreFiles(); 984 assertEquals(5, this.store.getStorefilesCount()); 985 986 closeCompactedFile(0); 987 archiveStoreFile(0); 988 989 assertEquals(5, this.store.getStorefilesCount()); 990 store.refreshStoreFiles(); 991 assertEquals(4, this.store.getStorefilesCount()); 992 993 archiveStoreFile(0); 994 archiveStoreFile(1); 995 archiveStoreFile(2); 996 997 assertEquals(4, this.store.getStorefilesCount()); 998 
store.refreshStoreFiles(); 999 assertEquals(1, this.store.getStorefilesCount()); 1000 1001 archiveStoreFile(0); 1002 store.refreshStoreFiles(); 1003 assertEquals(0, this.store.getStorefilesCount()); 1004 } 1005 1006 @Test 1007 public void testRefreshStoreFilesNotChanged() throws IOException { 1008 init(name.getMethodName()); 1009 1010 assertEquals(0, this.store.getStorefilesCount()); 1011 1012 // add some data, flush 1013 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 1014 flush(1); 1015 // add one more file 1016 addStoreFile(); 1017 1018 StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine()); 1019 1020 // call first time after files changed 1021 spiedStoreEngine.refreshStoreFiles(); 1022 assertEquals(2, this.store.getStorefilesCount()); 1023 verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any()); 1024 1025 // call second time 1026 spiedStoreEngine.refreshStoreFiles(); 1027 1028 // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not 1029 // refreshed, 1030 verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any()); 1031 } 1032 1033 private long countMemStoreScanner(StoreScanner scanner) { 1034 if (scanner.currentScanners == null) { 1035 return 0; 1036 } 1037 return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count(); 1038 } 1039 1040 @Test 1041 public void testNumberOfMemStoreScannersAfterFlush() throws IOException { 1042 long seqId = 100; 1043 long timestamp = EnvironmentEdgeManager.currentTime(); 1044 Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1045 .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build(); 1046 PrivateCellUtil.setSequenceId(cell0, seqId); 1047 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList()); 1048 1049 Cell cell1 = 
CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1050 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build(); 1051 PrivateCellUtil.setSequenceId(cell1, seqId); 1052 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1)); 1053 1054 seqId = 101; 1055 timestamp = EnvironmentEdgeManager.currentTime(); 1056 Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family) 1057 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build(); 1058 PrivateCellUtil.setSequenceId(cell2, seqId); 1059 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2)); 1060 } 1061 1062 private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot, 1063 List<Cell> inputCellsAfterSnapshot) throws IOException { 1064 init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size()); 1065 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1066 long seqId = Long.MIN_VALUE; 1067 for (Cell c : inputCellsBeforeSnapshot) { 1068 quals.add(CellUtil.cloneQualifier(c)); 1069 seqId = Math.max(seqId, c.getSequenceId()); 1070 } 1071 for (Cell c : inputCellsAfterSnapshot) { 1072 quals.add(CellUtil.cloneQualifier(c)); 1073 seqId = Math.max(seqId, c.getSequenceId()); 1074 } 1075 inputCellsBeforeSnapshot.forEach(c -> store.add(c, null)); 1076 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1077 storeFlushCtx.prepare(); 1078 inputCellsAfterSnapshot.forEach(c -> store.add(c, null)); 1079 int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 
1 : 2; 1080 try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) { 1081 // snapshot + active (if inputCellsAfterSnapshot isn't empty) 1082 assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s)); 1083 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1084 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1085 // snapshot has no data after flush 1086 int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1; 1087 boolean more; 1088 int cellCount = 0; 1089 do { 1090 List<Cell> cells = new ArrayList<>(); 1091 more = s.next(cells); 1092 cellCount += cells.size(); 1093 assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s)); 1094 } while (more); 1095 assertEquals( 1096 "The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size() 1097 + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(), 1098 inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount); 1099 // the current scanners is cleared 1100 assertEquals(0, countMemStoreScanner(s)); 1101 } 1102 } 1103 1104 private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) 1105 throws IOException { 1106 return createCell(row, qualifier, ts, sequenceId, value); 1107 } 1108 1109 private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value) 1110 throws IOException { 1111 Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1112 .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put).setValue(value).build(); 1113 PrivateCellUtil.setSequenceId(c, sequenceId); 1114 return c; 1115 } 1116 1117 @Test 1118 public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException { 1119 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1120 final int expectedSize = 3; 1121 testFlushBeforeCompletingScan(new MyListHook() { 1122 
@Override 1123 public void hook(int currentSize) { 1124 if (currentSize == expectedSize - 1) { 1125 try { 1126 flushStore(store, id++); 1127 timeToGoNextRow.set(true); 1128 } catch (IOException e) { 1129 throw new RuntimeException(e); 1130 } 1131 } 1132 } 1133 }, new FilterBase() { 1134 @Override 1135 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1136 return ReturnCode.INCLUDE; 1137 } 1138 }, expectedSize); 1139 } 1140 1141 @Test 1142 public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException { 1143 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1144 final int expectedSize = 2; 1145 testFlushBeforeCompletingScan(new MyListHook() { 1146 @Override 1147 public void hook(int currentSize) { 1148 if (currentSize == expectedSize - 1) { 1149 try { 1150 flushStore(store, id++); 1151 timeToGoNextRow.set(true); 1152 } catch (IOException e) { 1153 throw new RuntimeException(e); 1154 } 1155 } 1156 } 1157 }, new FilterBase() { 1158 @Override 1159 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1160 if (timeToGoNextRow.get()) { 1161 timeToGoNextRow.set(false); 1162 return ReturnCode.NEXT_ROW; 1163 } else { 1164 return ReturnCode.INCLUDE; 1165 } 1166 } 1167 }, expectedSize); 1168 } 1169 1170 @Test 1171 public void testFlushBeforeCompletingScanWithFilterHint() 1172 throws IOException, InterruptedException { 1173 final AtomicBoolean timeToGetHint = new AtomicBoolean(false); 1174 final int expectedSize = 2; 1175 testFlushBeforeCompletingScan(new MyListHook() { 1176 @Override 1177 public void hook(int currentSize) { 1178 if (currentSize == expectedSize - 1) { 1179 try { 1180 flushStore(store, id++); 1181 timeToGetHint.set(true); 1182 } catch (IOException e) { 1183 throw new RuntimeException(e); 1184 } 1185 } 1186 } 1187 }, new FilterBase() { 1188 @Override 1189 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1190 if (timeToGetHint.get()) { 1191 
timeToGetHint.set(false); 1192 return Filter.ReturnCode.SEEK_NEXT_USING_HINT; 1193 } else { 1194 return Filter.ReturnCode.INCLUDE; 1195 } 1196 } 1197 1198 @Override 1199 public Cell getNextCellHint(Cell currentCell) throws IOException { 1200 return currentCell; 1201 } 1202 }, expectedSize); 1203 } 1204 1205 private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize) 1206 throws IOException, InterruptedException { 1207 Configuration conf = HBaseConfiguration.create(); 1208 byte[] r0 = Bytes.toBytes("row0"); 1209 byte[] r1 = Bytes.toBytes("row1"); 1210 byte[] r2 = Bytes.toBytes("row2"); 1211 byte[] value0 = Bytes.toBytes("value0"); 1212 byte[] value1 = Bytes.toBytes("value1"); 1213 byte[] value2 = Bytes.toBytes("value2"); 1214 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1215 long ts = EnvironmentEdgeManager.currentTime(); 1216 long seqId = 100; 1217 init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1218 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(), 1219 new MyStoreHook() { 1220 @Override 1221 public long getSmallestReadPoint(HStore store) { 1222 return seqId + 3; 1223 } 1224 }); 1225 // The cells having the value0 won't be flushed to disk because the value of max version is 1 1226 store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing); 1227 store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing); 1228 store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing); 1229 store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing); 1230 store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing); 1231 store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing); 1232 store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing); 1233 store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing); 1234 store.add(createCell(r2, qf3, ts + 2, seqId + 2, 
value2), memStoreSizing); 1235 store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing); 1236 store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing); 1237 store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing); 1238 List<Cell> myList = new MyList<>(hook); 1239 Scan scan = new Scan().withStartRow(r1).setFilter(filter); 1240 try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) { 1241 // r1 1242 scanner.next(myList); 1243 assertEquals(expectedSize, myList.size()); 1244 for (Cell c : myList) { 1245 byte[] actualValue = CellUtil.cloneValue(c); 1246 assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:" 1247 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1)); 1248 } 1249 List<Cell> normalList = new ArrayList<>(3); 1250 // r2 1251 scanner.next(normalList); 1252 assertEquals(3, normalList.size()); 1253 for (Cell c : normalList) { 1254 byte[] actualValue = CellUtil.cloneValue(c); 1255 assertTrue("expected:" + Bytes.toStringBinary(value2) + ", actual:" 1256 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value2)); 1257 } 1258 } 1259 } 1260 1261 @Test 1262 public void testPreventLoopRead() throws Exception { 1263 init(this.name.getMethodName()); 1264 Configuration conf = HBaseConfiguration.create(); 1265 // use small heart beat cells 1266 conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2); 1267 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 1268 EnvironmentEdgeManager.injectEdge(edge); 1269 byte[] r0 = Bytes.toBytes("row0"); 1270 byte[] value0 = Bytes.toBytes("value0"); 1271 byte[] value1 = Bytes.toBytes("value1"); 1272 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1273 long ts = EnvironmentEdgeManager.currentTime(); 1274 long seqId = 100; 1275 init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1276 
ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(10).build(), 1277 new MyStoreHook() { 1278 @Override 1279 public long getSmallestReadPoint(HStore store) { 1280 return seqId + 3; 1281 } 1282 }); 1283 // The cells having the value0 will be expired 1284 store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing); 1285 store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing); 1286 store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing); 1287 store.add(createCell(r0, qf4, ts + 10000 + 1, seqId, value1), memStoreSizing); 1288 store.add(createCell(r0, qf5, ts, seqId, value0), memStoreSizing); 1289 store.add(createCell(r0, qf6, ts + 10000 + 1, seqId, value1), memStoreSizing); 1290 1291 List<Cell> myList = new ArrayList<>(); 1292 Scan scan = new Scan().withStartRow(r0); 1293 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(false); 1294 // test normal scan, should return all the cells 1295 ScannerContext scannerContext = contextBuilder.build(); 1296 try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) { 1297 scanner.next(myList, scannerContext); 1298 assertEquals(6, myList.size()); 1299 } 1300 1301 // test skip two ttl cells and return with empty results, default prevent loop skip is on 1302 edge.incrementTime(10 * 1000); 1303 scannerContext = contextBuilder.build(); 1304 myList.clear(); 1305 try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) { 1306 // r0 1307 scanner.next(myList, scannerContext); 1308 assertEquals(0, myList.size()); 1309 } 1310 1311 // should scan all non-ttl expired cells by iterative next 1312 int resultCells = 0; 1313 try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) { 1314 boolean hasMore = true; 1315 while (hasMore) { 1316 myList.clear(); 1317 hasMore = scanner.next(myList, scannerContext); 1318 assertTrue(myList.size() < 6); 1319 resultCells += myList.size(); 1320 } 1321 for (Cell 
c : myList) { 1322 byte[] actualValue = CellUtil.cloneValue(c); 1323 assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:" 1324 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1)); 1325 } 1326 } 1327 assertEquals(2, resultCells); 1328 } 1329 1330 @Test 1331 public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException { 1332 Configuration conf = HBaseConfiguration.create(); 1333 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName()); 1334 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1335 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1336 byte[] value = Bytes.toBytes("value"); 1337 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1338 long ts = EnvironmentEdgeManager.currentTime(); 1339 long seqId = 100; 1340 // older data whihc shouldn't be "seen" by client 1341 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1342 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1343 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1344 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1345 quals.add(qf1); 1346 quals.add(qf2); 1347 quals.add(qf3); 1348 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1349 MyCompactingMemStore.START_TEST.set(true); 1350 Runnable flush = () -> { 1351 // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5) 1352 // recreate the active memstore -- phase (4/5) 1353 storeFlushCtx.prepare(); 1354 }; 1355 ExecutorService service = Executors.newSingleThreadExecutor(); 1356 service.submit(flush); 1357 // we get scanner from pipeline and snapshot but they are empty. 
-- phase (2/5) 1358 // this is blocked until we recreate the active memstore -- phase (3/5) 1359 // we get scanner from active memstore but it is empty -- phase (5/5) 1360 InternalScanner scanner = 1361 (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 1362 service.shutdown(); 1363 service.awaitTermination(20, TimeUnit.SECONDS); 1364 try { 1365 try { 1366 List<Cell> results = new ArrayList<>(); 1367 scanner.next(results); 1368 assertEquals(3, results.size()); 1369 for (Cell c : results) { 1370 byte[] actualValue = CellUtil.cloneValue(c); 1371 assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:" 1372 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value)); 1373 } 1374 } finally { 1375 scanner.close(); 1376 } 1377 } finally { 1378 MyCompactingMemStore.START_TEST.set(false); 1379 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1380 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1381 } 1382 } 1383 1384 @Test 1385 public void testScanWithDoubleFlush() throws IOException { 1386 Configuration conf = HBaseConfiguration.create(); 1387 // Initialize region 1388 MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() { 1389 @Override 1390 public void getScanners(MyStore store) throws IOException { 1391 final long tmpId = id++; 1392 ExecutorService s = Executors.newSingleThreadExecutor(); 1393 s.submit(() -> { 1394 try { 1395 // flush the store before storescanner updates the scanners from store. 1396 // The current data will be flushed into files, and the memstore will 1397 // be clear. 1398 // -- phase (4/4) 1399 flushStore(store, tmpId); 1400 } catch (IOException ex) { 1401 throw new RuntimeException(ex); 1402 } 1403 }); 1404 s.shutdown(); 1405 try { 1406 // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers. 
1407 s.awaitTermination(3, TimeUnit.SECONDS); 1408 } catch (InterruptedException ex) { 1409 } 1410 } 1411 }); 1412 byte[] oldValue = Bytes.toBytes("oldValue"); 1413 byte[] currentValue = Bytes.toBytes("currentValue"); 1414 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1415 long ts = EnvironmentEdgeManager.currentTime(); 1416 long seqId = 100; 1417 // older data whihc shouldn't be "seen" by client 1418 myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing); 1419 myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing); 1420 myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing); 1421 long snapshotId = id++; 1422 // push older data into snapshot -- phase (1/4) 1423 StoreFlushContext storeFlushCtx = 1424 store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY); 1425 storeFlushCtx.prepare(); 1426 1427 // insert current data into active -- phase (2/4) 1428 myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing); 1429 myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing); 1430 myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing); 1431 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1432 quals.add(qf1); 1433 quals.add(qf2); 1434 quals.add(qf3); 1435 try (InternalScanner scanner = 1436 (InternalScanner) myStore.getScanner(new Scan(new Get(row)), quals, seqId + 1)) { 1437 // complete the flush -- phase (3/4) 1438 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1439 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1440 1441 List<Cell> results = new ArrayList<>(); 1442 scanner.next(results); 1443 assertEquals(3, results.size()); 1444 for (Cell c : results) { 1445 byte[] actualValue = CellUtil.cloneValue(c); 1446 assertTrue("expected:" + Bytes.toStringBinary(currentValue) + ", actual:" 1447 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, currentValue)); 1448 } 1449 } 1450 } 1451 1452 @Test 1453 
public void testReclaimChunkWhenScaning() throws IOException { 1454 init("testReclaimChunkWhenScaning"); 1455 long ts = EnvironmentEdgeManager.currentTime(); 1456 long seqId = 100; 1457 byte[] value = Bytes.toBytes("value"); 1458 // older data whihc shouldn't be "seen" by client 1459 store.add(createCell(qf1, ts, seqId, value), null); 1460 store.add(createCell(qf2, ts, seqId, value), null); 1461 store.add(createCell(qf3, ts, seqId, value), null); 1462 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1463 quals.add(qf1); 1464 quals.add(qf2); 1465 quals.add(qf3); 1466 try (InternalScanner scanner = 1467 (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId)) { 1468 List<Cell> results = new MyList<>(size -> { 1469 switch (size) { 1470 // 1) we get the first cell (qf1) 1471 // 2) flush the data to have StoreScanner update inner scanners 1472 // 3) the chunk will be reclaimed after updaing 1473 case 1: 1474 try { 1475 flushStore(store, id++); 1476 } catch (IOException e) { 1477 throw new RuntimeException(e); 1478 } 1479 break; 1480 // 1) we get the second cell (qf2) 1481 // 2) add some cell to fill some byte into the chunk (we have only one chunk) 1482 case 2: 1483 try { 1484 byte[] newValue = Bytes.toBytes("newValue"); 1485 // older data whihc shouldn't be "seen" by client 1486 store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null); 1487 store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null); 1488 store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null); 1489 } catch (IOException e) { 1490 throw new RuntimeException(e); 1491 } 1492 break; 1493 default: 1494 break; 1495 } 1496 }); 1497 scanner.next(results); 1498 assertEquals(3, results.size()); 1499 for (Cell c : results) { 1500 byte[] actualValue = CellUtil.cloneValue(c); 1501 assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:" 1502 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value)); 1503 } 1504 } 1505 } 1506 1507 /** 1508 * If there 
are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the 1509 * versionedList. And the first InMemoryFlushRunnable will use the chagned versionedList to remove 1510 * the corresponding segments. In short, there will be some segements which isn't in merge are 1511 * removed. 1512 */ 1513 @Test 1514 public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException { 1515 int flushSize = 500; 1516 Configuration conf = HBaseConfiguration.create(); 1517 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName()); 1518 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25); 1519 MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0); 1520 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize)); 1521 // Set the lower threshold to invoke the "MERGE" policy 1522 conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0)); 1523 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1524 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1525 byte[] value = Bytes.toBytes("thisisavarylargevalue"); 1526 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1527 long ts = EnvironmentEdgeManager.currentTime(); 1528 long seqId = 100; 1529 // older data whihc shouldn't be "seen" by client 1530 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1531 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1532 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1533 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1534 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1535 storeFlushCtx.prepare(); 1536 // This shouldn't invoke another in-memory flush because the first compactor thread 1537 // hasn't accomplished the in-memory compaction. 
1538 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1539 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1540 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1541 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1542 // okay. Let the compaction be completed 1543 MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown(); 1544 CompactingMemStore mem = (CompactingMemStore) ((HStore) store).memstore; 1545 while (mem.isMemStoreFlushingInMemory()) { 1546 TimeUnit.SECONDS.sleep(1); 1547 } 1548 // This should invoke another in-memory flush. 1549 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1550 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1551 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1552 assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1553 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1554 String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE)); 1555 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1556 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1557 } 1558 1559 @Test 1560 public void testAge() throws IOException { 1561 long currentTime = EnvironmentEdgeManager.currentTime(); 1562 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 1563 edge.setValue(currentTime); 1564 EnvironmentEdgeManager.injectEdge(edge); 1565 Configuration conf = TEST_UTIL.getConfiguration(); 1566 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family); 1567 initHRegion(name.getMethodName(), conf, 1568 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false); 1569 HStore store = new HStore(region, hcd, conf, false) { 1570 1571 @Override 1572 protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf, 1573 CellComparator kvComparator) throws IOException { 1574 List<HStoreFile> storefiles = 1575 
Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100), 1576 mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000)); 1577 StoreFileManager sfm = mock(StoreFileManager.class); 1578 when(sfm.getStorefiles()).thenReturn(storefiles); 1579 StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class); 1580 when(storeEngine.getStoreFileManager()).thenReturn(sfm); 1581 return storeEngine; 1582 } 1583 }; 1584 assertEquals(10L, store.getMinStoreFileAge().getAsLong()); 1585 assertEquals(10000L, store.getMaxStoreFileAge().getAsLong()); 1586 assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4); 1587 } 1588 1589 private HStoreFile mockStoreFile(long createdTime) { 1590 StoreFileInfo info = mock(StoreFileInfo.class); 1591 when(info.getCreatedTimestamp()).thenReturn(createdTime); 1592 HStoreFile sf = mock(HStoreFile.class); 1593 when(sf.getReader()).thenReturn(mock(StoreFileReader.class)); 1594 when(sf.isHFile()).thenReturn(true); 1595 when(sf.getFileInfo()).thenReturn(info); 1596 return sf; 1597 } 1598 1599 private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) 1600 throws IOException { 1601 return (MyStore) init(methodName, conf, 1602 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1603 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook); 1604 } 1605 1606 private static class MyStore extends HStore { 1607 private final MyStoreHook hook; 1608 1609 MyStore(final HRegion region, final ColumnFamilyDescriptor family, 1610 final Configuration confParam, MyStoreHook hook, boolean switchToPread) throws IOException { 1611 super(region, family, confParam, false); 1612 this.hook = hook; 1613 } 1614 1615 @Override 1616 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1617 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, 1618 boolean includeStartRow, byte[] stopRow, 
      boolean includeStopRow, long readPt,
      boolean includeMemstoreScanner) throws IOException {
      // Let the test observe the call before delegating to the real implementation.
      hook.getScanners(this);
      // NOTE(review): includeStartRow / includeStopRow are not forwarded; the super call always
      // receives the literals true / false - confirm this is intentional.
      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
        stopRow, false, readPt, includeMemstoreScanner);
    }

    @Override
    public long getSmallestReadPoint() {
      return hook.getSmallestReadPoint(this);
    }
  }

  /**
   * Callbacks used by {@link MyStore}. The defaults do nothing in {@code getScanners} and return
   * the region's smallest read point in {@code getSmallestReadPoint}; tests override what they
   * need.
   */
  private abstract static class MyStoreHook {

    void getScanners(MyStore store) throws IOException {
    }

    long getSmallestReadPoint(HStore store) {
      return store.getHRegion().getSmallestReadPoint();
    }
  }

  /**
   * Flushes three store files, opens a {@link StoreScanner} over them and remembers its heap,
   * flushes two more files, then calls {@code replaceStoreFiles} (first three files out, the two
   * new files in) while a {@code MyThread} built on the scanner runs concurrently. The heap
   * reported by the thread must differ from the one captured before the replacement.
   */
  @Test
  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
    // Set the lower threshold to invoke the "MERGE" policy
    // NOTE(review): the comment above does not match any statement here (nothing configures a
    // MERGE policy); it looks copied from another test.
    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
    });
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqID = 1L;
    // Add some data to the region and do some flushes
    for (int i = 1; i < 10; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);
    for (int i = 11; i < 20; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);
    for (int i = 21; i < 30; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);

    assertEquals(3, store.getStorefilesCount());
    Scan scan = new Scan();
    scan.addFamily(family);
    // Snapshot of the three files that exist before the scanner is opened.
    Collection<HStoreFile> storefiles2 = store.getStorefiles();
    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
    StoreScanner storeScanner =
      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
    // get the current heap
    KeyValueHeap heap = storeScanner.heap;
    // create more store files
    for (int i = 31; i < 40; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);

    for (int i = 41; i < 50; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);
    storefiles2 = store.getStorefiles();
    // After removeAll, actualStorefiles1 holds only the files flushed after the scanner was opened.
    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
    actualStorefiles1.removeAll(actualStorefiles);
    // Do compaction
    MyThread thread = new MyThread(storeScanner);
    thread.start();
    store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false);
    thread.join();
    KeyValueHeap heap2 = thread.getHeap();
    assertFalse(heap.equals(heap2));
  }

  /**
   * With {@code StoreScanner.STORESCANNER_PREAD_MAX_BYTES} set to -1, a scan whose read type is
   * still {@code ReadType.DEFAULT} must produce a {@link StoreScanner} that does not use pread.
   */
  @Test
  public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
    // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type.
    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1);
    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
    });
    Scan scan = new Scan();
    scan.addFamily(family);
    // ReadType on Scan is still DEFAULT only.
    assertEquals(ReadType.DEFAULT, scan.getReadType());
    StoreScanner storeScanner =
      (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
    assertFalse(storeScanner.isScanUsePread());
  }

  /**
   * Exercises {@code HStore.updateSpaceQuotaAfterFileReplacement} against a
   * {@link RegionSizeStoreImpl}: replacing files by a smaller one, by one of the same size, and
   * by a larger one, and finally reporting new files for a region with no previous entry and a
   * {@code null} list of old files.
   */
  @Test
  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
    final TableName tn = TableName.valueOf(name.getMethodName());
    init(name.getMethodName());

    RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();

    // presumably mocks whose reported length is the argument - mockStoreFileWithLength is not
    // visible here
    HStoreFile sf1 = mockStoreFileWithLength(1024L);
    HStoreFile sf2 = mockStoreFileWithLength(2048L);
    HStoreFile sf3 = mockStoreFileWithLength(4096L);
    HStoreFile sf4 = mockStoreFileWithLength(8192L);

    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
      .setEndKey(Bytes.toBytes("b")).build();

    // Compacting two files down to one, reducing size
    // Start at 1024 + 4096 = 5120, swap sf1 + sf3 for sf2: expect 2048.
    sizeStore.put(regionInfo, 1024L + 4096L);
    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf1, sf3),
      Arrays.asList(sf2));

    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());

    // The same file length in and out should have no change
    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
      Arrays.asList(sf2));

    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());

    // Increase the total size used
    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2),
      Arrays.asList(sf3));

    assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());

    // A region without a previous size entry and a null list of old files: the recorded size
    // must be the size of the single new file.
    RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
      .setEndKey(Bytes.toBytes("c")).build();
    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));

    assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize());
  }

  /**
   * Creates a store file writer through the store engine and checks that its {@link HFileContext}
   * carries the column family and table name of the store under test.
   */
  @Test
  public void testHFileContextSetWithCFAndTable() throws Exception {
    init(this.name.getMethodName());
    StoreFileWriter writer = store.getStoreEngine()
      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L)
        .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true)
        .includesTag(false).shouldDropBehind(true));
    HFileContext hFileContext = writer.getHFileWriter().getFileContext();
    assertArrayEquals(family, hFileContext.getColumnFamily());
    assertArrayEquals(table, hFileContext.getTableName());
  }

  // This test is for HBASE-26026, HBase Write be stuck when active segment has no cell
  // but its dataSize exceeds inmemoryFlushSize
  @Test
  public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize()
    throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();

    byte[] smallValue = new byte[3];
    byte[] largeValue = new byte[9];
    final long timestamp = EnvironmentEdgeManager.currentTime();
    final long seqId = 100;
    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
    // Two bytes less than both cells together, so the pair of cells exceeds the threshold.
    int flushByteSize = smallCellByteSize + largeCellByteSize - 2;

    // set CompactingMemStore.inmemoryFlushSize to flushByteSize.
1795 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName()); 1796 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 1797 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 1798 1799 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1800 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1801 1802 MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore); 1803 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 1804 myCompactingMemStore.smallCellPreUpdateCounter.set(0); 1805 myCompactingMemStore.largeCellPreUpdateCounter.set(0); 1806 1807 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 1808 Thread smallCellThread = new Thread(() -> { 1809 try { 1810 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 1811 } catch (Throwable exception) { 1812 exceptionRef.set(exception); 1813 } 1814 }); 1815 smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME); 1816 smallCellThread.start(); 1817 1818 String oldThreadName = Thread.currentThread().getName(); 1819 try { 1820 /** 1821 * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then 1822 * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread 1823 * invokes flushInMemory. 1824 * <p/> 1825 * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread 1826 * can add cell to currentActive . That is to say when largeCellThread called flushInMemory 1827 * method, CompactingMemStore.active has no cell. 
1828 */ 1829 Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME); 1830 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 1831 smallCellThread.join(); 1832 1833 for (int i = 0; i < 100; i++) { 1834 long currentTimestamp = timestamp + 100 + i; 1835 Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue); 1836 store.add(cell, new NonThreadSafeMemStoreSizing()); 1837 } 1838 } finally { 1839 Thread.currentThread().setName(oldThreadName); 1840 } 1841 1842 assertTrue(exceptionRef.get() == null); 1843 1844 } 1845 1846 // This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds 1847 // InmemoryFlushSize 1848 @Test(timeout = 60000) 1849 public void testCompactingMemStoreCellExceedInmemoryFlushSize() throws Exception { 1850 Configuration conf = HBaseConfiguration.create(); 1851 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName()); 1852 1853 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1854 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1855 1856 MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore); 1857 1858 int size = (int) (myCompactingMemStore.getInmemoryFlushSize()); 1859 byte[] value = new byte[size + 1]; 1860 1861 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1862 long timestamp = EnvironmentEdgeManager.currentTime(); 1863 long seqId = 100; 1864 Cell cell = createCell(qf1, timestamp, seqId, value); 1865 int cellByteSize = MutableSegment.getCellLength(cell); 1866 store.add(cell, memStoreSizing); 1867 assertTrue(memStoreSizing.getCellsCount() == 1); 1868 assertTrue(memStoreSizing.getDataSize() == cellByteSize); 1869 // Waiting the in memory compaction completed, see HBASE-26438 1870 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 1871 } 1872 1873 // This test is for HBASE-26210 also, test write large cell and small cell concurrently when 1874 // 
InmemoryFlushSize is smaller,equal with and larger than cell size. 1875 @Test 1876 public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently() 1877 throws IOException, InterruptedException { 1878 doWriteTestLargeCellAndSmallCellConcurrently( 1879 (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1); 1880 doWriteTestLargeCellAndSmallCellConcurrently( 1881 (smallCellByteSize, largeCellByteSize) -> largeCellByteSize); 1882 doWriteTestLargeCellAndSmallCellConcurrently( 1883 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1); 1884 doWriteTestLargeCellAndSmallCellConcurrently( 1885 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize); 1886 doWriteTestLargeCellAndSmallCellConcurrently( 1887 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1); 1888 } 1889 1890 private void doWriteTestLargeCellAndSmallCellConcurrently(IntBinaryOperator getFlushByteSize) 1891 throws IOException, InterruptedException { 1892 1893 Configuration conf = HBaseConfiguration.create(); 1894 1895 byte[] smallValue = new byte[3]; 1896 byte[] largeValue = new byte[100]; 1897 final long timestamp = EnvironmentEdgeManager.currentTime(); 1898 final long seqId = 100; 1899 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 1900 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 1901 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 1902 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 1903 int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize); 1904 boolean flushByteSizeLessThanSmallAndLargeCellSize = 1905 flushByteSize < (smallCellByteSize + largeCellByteSize); 1906 1907 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName()); 1908 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 1909 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 
String.valueOf(flushByteSize * 200)); 1910 1911 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1912 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1913 1914 MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore); 1915 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 1916 myCompactingMemStore.disableCompaction(); 1917 if (flushByteSizeLessThanSmallAndLargeCellSize) { 1918 myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true; 1919 } else { 1920 myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false; 1921 } 1922 1923 final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing(); 1924 final AtomicLong totalCellByteSize = new AtomicLong(0); 1925 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 1926 Thread smallCellThread = new Thread(() -> { 1927 try { 1928 for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { 1929 long currentTimestamp = timestamp + i; 1930 Cell cell = createCell(qf1, currentTimestamp, seqId, smallValue); 1931 totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell)); 1932 store.add(cell, memStoreSizing); 1933 } 1934 } catch (Throwable exception) { 1935 exceptionRef.set(exception); 1936 1937 } 1938 }); 1939 smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME); 1940 smallCellThread.start(); 1941 1942 String oldThreadName = Thread.currentThread().getName(); 1943 try { 1944 /** 1945 * When flushByteSizeLessThanSmallAndLargeCellSize is true: 1946 * </p> 1947 * 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then 1948 * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then 1949 * largeCellThread invokes flushInMemory. 1950 * <p/> 1951 * 2. 
After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread 1952 * can run into MyCompactingMemStore3.checkAndAddToActiveSize again. 1953 * <p/> 1954 * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and 1955 * largeCellThread concurrently write one cell and wait each other, and then write another 1956 * cell etc. 1957 */ 1958 Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME); 1959 for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { 1960 long currentTimestamp = timestamp + i; 1961 Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue); 1962 totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell)); 1963 store.add(cell, memStoreSizing); 1964 } 1965 smallCellThread.join(); 1966 1967 assertTrue(exceptionRef.get() == null); 1968 assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2)); 1969 assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get()); 1970 if (flushByteSizeLessThanSmallAndLargeCellSize) { 1971 assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT); 1972 } else { 1973 assertTrue( 1974 myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1)); 1975 } 1976 } finally { 1977 Thread.currentThread().setName(oldThreadName); 1978 } 1979 } 1980 1981 /** 1982 * <pre> 1983 * This test is for HBASE-26384, 1984 * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()} 1985 * execute concurrently. 1986 * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs 1987 * for both branch-2 and master): 1988 * 1. 
   *    The {@link CompactingMemStore} size exceeds
   * {@link CompactingMemStore#getInmemoryFlushSize()}, the write thread adds a new
   * {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline}, and starts an
   * in-memory compaction thread to execute {@link CompactingMemStore#inMemoryCompaction}.
   * 2. The in-memory compaction thread starts and then stops before
   * {@link CompactingMemStore#flattenOneSegment}.
   * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently; after the
   * snapshot thread has executed {@link CompactingMemStore#getImmutableSegments}, the in-memory
   * compaction thread continues.
   * Assume the {@link VersionedSegmentsList#version} returned from
   * {@link CompactingMemStore#getImmutableSegments} is v.
   * 4. The snapshot thread stops before {@link CompactingMemStore#swapPipelineWithNull}.
   * 5. The in-memory compaction thread completes {@link CompactingMemStore#flattenOneSegment};
   * {@link CompactionPipeline#version} is still v.
   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
   * {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
   * thinks it succeeded and continues flushing, but the {@link ImmutableSegment} in
   * {@link CompactionPipeline} has been changed by
   * {@link CompactingMemStore#flattenOneSegment}, so the {@link ImmutableSegment} is in fact
   * not removed and still remains in {@link CompactionPipeline}.
   *
   * After HBASE-26384, steps 5-6 change to the following, which is the expected behavior:
   * 5. The in-memory compaction thread completes {@link CompactingMemStore#flattenOneSegment},
   * and {@link CompactingMemStore#flattenOneSegment} changes {@link CompactionPipeline#version}
   * to v+1.
   * 6.
The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2014 * {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull} 2015 * failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once 2016 * again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now, 2017 * {@link CompactingMemStore#swapPipelineWithNull} succeeds. 2018 * </pre> 2019 */ 2020 @Test 2021 public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception { 2022 Configuration conf = HBaseConfiguration.create(); 2023 2024 byte[] smallValue = new byte[3]; 2025 byte[] largeValue = new byte[9]; 2026 final long timestamp = EnvironmentEdgeManager.currentTime(); 2027 final long seqId = 100; 2028 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2029 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2030 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2031 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2032 int totalCellByteSize = (smallCellByteSize + largeCellByteSize); 2033 int flushByteSize = totalCellByteSize - 2; 2034 2035 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 
2036 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName()); 2037 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2038 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2039 2040 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2041 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2042 2043 MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore); 2044 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2045 2046 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2047 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2048 2049 String oldThreadName = Thread.currentThread().getName(); 2050 try { 2051 Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME); 2052 /** 2053 * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters 2054 * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot} 2055 * would invoke {@link CompactingMemStore#stopCompaction}. 
2056 */ 2057 myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await(); 2058 2059 MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot(); 2060 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2061 2062 assertTrue(memStoreSnapshot.getCellsCount() == 2); 2063 assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize); 2064 VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments(); 2065 assertTrue(segments.getNumOfSegments() == 0); 2066 assertTrue(segments.getNumOfCells() == 0); 2067 assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1); 2068 assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2); 2069 } finally { 2070 Thread.currentThread().setName(oldThreadName); 2071 } 2072 } 2073 2074 /** 2075 * <pre> 2076 * This test is for HBASE-26384, 2077 * test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()} 2078 * and writeMemStore execute concurrently. 2079 * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs 2080 * for both branch-2 and master): 2081 * 1. The {@link CompactingMemStore} size exceeds 2082 * {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new 2083 * {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a 2084 * in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}. 2085 * 2. The in memory compact thread starts and then stopping before 2086 * {@link CompactingMemStore#flattenOneSegment}. 2087 * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the 2088 * snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory 2089 * compact thread continues. 2090 * Assuming {@link VersionedSegmentsList#version} returned from 2091 * {@link CompactingMemStore#getImmutableSegments} is v. 2092 * 4. 
   *    The snapshot thread stops before {@link CompactingMemStore#swapPipelineWithNull}.
   * 5. The in-memory compaction thread completes {@link CompactingMemStore#flattenOneSegment};
   * {@link CompactionPipeline#version} is still v.
   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
   * {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
   * thinks it succeeded and continues flushing, but the {@link ImmutableSegment} in
   * {@link CompactionPipeline} has been changed by
   * {@link CompactingMemStore#flattenOneSegment}, so the {@link ImmutableSegment} is in fact
   * not removed and still remains in {@link CompactionPipeline}.
   *
   * After HBASE-26384, steps 5-6 change to the following, which is the expected behavior,
   * and steps 7-8 are added to test that a new segment is added before the retry.
   * 5. The in-memory compaction thread completes {@link CompactingMemStore#flattenOneSegment},
   * and {@link CompactingMemStore#flattenOneSegment} changes {@link CompactionPipeline#version}
   * to v+1.
   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
   * {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
   * fails and retries; the {@link VersionedSegmentsList#version} returned from
   * {@link CompactingMemStore#getImmutableSegments} is v+1.
   * 7. The write thread continues writing to {@link CompactingMemStore} and the
   * {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
   * {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
   * {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline};
   * {@link CompactionPipeline#version} is still v+1.
   * 8.
The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2117 * {@link CompactionPipeline#version} is still v+1, 2118 * {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment} 2119 * remained at the head of {@link CompactingMemStore#pipeline},the old is removed by 2120 * {@link CompactingMemStore#swapPipelineWithNull}. 2121 * </pre> 2122 */ 2123 @Test 2124 public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception { 2125 Configuration conf = HBaseConfiguration.create(); 2126 2127 byte[] smallValue = new byte[3]; 2128 byte[] largeValue = new byte[9]; 2129 final long timestamp = EnvironmentEdgeManager.currentTime(); 2130 final long seqId = 100; 2131 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2132 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2133 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2134 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2135 int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize); 2136 int flushByteSize = firstWriteCellByteSize - 2; 2137 2138 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 
2139 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName()); 2140 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2141 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2142 2143 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2144 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2145 2146 final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore); 2147 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2148 2149 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2150 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2151 2152 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2153 final Cell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue); 2154 final Cell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue); 2155 final int writeAgainCellByteSize = 2156 MutableSegment.getCellLength(writeAgainCell1) + MutableSegment.getCellLength(writeAgainCell2); 2157 final Thread writeAgainThread = new Thread(() -> { 2158 try { 2159 myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await(); 2160 2161 store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing()); 2162 store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing()); 2163 2164 myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await(); 2165 } catch (Throwable exception) { 2166 exceptionRef.set(exception); 2167 } 2168 }); 2169 writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME); 2170 writeAgainThread.start(); 2171 2172 String oldThreadName = Thread.currentThread().getName(); 2173 try { 2174 Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME); 2175 /** 2176 * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters 2177 * {@link 
CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot} 2178 * would invoke {@link CompactingMemStore#stopCompaction}. 2179 */ 2180 myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await(); 2181 MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot(); 2182 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2183 writeAgainThread.join(); 2184 2185 assertTrue(memStoreSnapshot.getCellsCount() == 2); 2186 assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize); 2187 VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments(); 2188 assertTrue(segments.getNumOfSegments() == 1); 2189 assertTrue( 2190 ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize); 2191 assertTrue(segments.getNumOfCells() == 2); 2192 assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2); 2193 assertTrue(exceptionRef.get() == null); 2194 assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2); 2195 } finally { 2196 Thread.currentThread().setName(oldThreadName); 2197 } 2198 } 2199 2200 /** 2201 * <pre> 2202 * This test is for HBASE-26465, 2203 * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute 2204 * concurrently. The threads sequence before HBASE-26465 is: 2205 * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have be added to 2206 * {@link DefaultMemStore}. 2207 * 2.The flush thread stopping before {@link DefaultMemStore#clearSnapshot} in 2208 * {@link HStore#updateStorefiles} after completed flushing memStore to hfile. 2209 * 3.The scan thread starts and stopping after {@link DefaultMemStore#getSnapshotSegments} in 2210 * {@link DefaultMemStore#getScanners},here the scan thread gets the 2211 * {@link DefaultMemStore#snapshot} which is created by the flush thread. 
2212 * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and close 2213 * {@link DefaultMemStore#snapshot},because the reference count of the corresponding 2214 * {@link MemStoreLABImpl} is 0, the {@link Chunk}s in corresponding {@link MemStoreLABImpl} 2215 * are recycled. 2216 * 5.The scan thread continues {@link DefaultMemStore#getScanners},and create a 2217 * {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increase the 2218 * reference count of the corresponding {@link MemStoreLABImpl}, but {@link Chunk}s in 2219 * corresponding {@link MemStoreLABImpl} are recycled by step 4, and these {@link Chunk}s may 2220 * be overwritten by other write threads,which may cause serious problem. 2221 * After HBASE-26465,{@link DefaultMemStore#getScanners} and 2222 * {@link DefaultMemStore#clearSnapshot} could not execute concurrently. 2223 * </pre> 2224 */ 2225 @Test 2226 public void testClearSnapshotGetScannerConcurrently() throws Exception { 2227 Configuration conf = HBaseConfiguration.create(); 2228 2229 byte[] smallValue = new byte[3]; 2230 byte[] largeValue = new byte[9]; 2231 final long timestamp = EnvironmentEdgeManager.currentTime(); 2232 final long seqId = 100; 2233 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2234 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2235 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2236 quals.add(qf1); 2237 quals.add(qf2); 2238 2239 conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName()); 2240 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2241 2242 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2243 MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore); 2244 myDefaultMemStore.store = store; 2245 2246 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2247 store.add(smallCell, memStoreSizing); 2248 store.add(largeCell, memStoreSizing); 
2249 2250 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2251 final Thread flushThread = new Thread(() -> { 2252 try { 2253 flushStore(store, id++); 2254 } catch (Throwable exception) { 2255 exceptionRef.set(exception); 2256 } 2257 }); 2258 flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME); 2259 flushThread.start(); 2260 2261 String oldThreadName = Thread.currentThread().getName(); 2262 StoreScanner storeScanner = null; 2263 try { 2264 Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME); 2265 2266 /** 2267 * Wait flush thread stopping before {@link DefaultMemStore#doClearSnapshot} 2268 */ 2269 myDefaultMemStore.getScannerCyclicBarrier.await(); 2270 2271 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 2272 flushThread.join(); 2273 2274 if (myDefaultMemStore.shouldWait) { 2275 SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2276 MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2277 assertTrue(memStoreLAB.isClosed()); 2278 assertTrue(!memStoreLAB.chunks.isEmpty()); 2279 assertTrue(!memStoreLAB.isReclaimed()); 2280 2281 Cell cell1 = segmentScanner.next(); 2282 CellUtil.equals(smallCell, cell1); 2283 Cell cell2 = segmentScanner.next(); 2284 CellUtil.equals(largeCell, cell2); 2285 assertNull(segmentScanner.next()); 2286 } else { 2287 List<Cell> results = new ArrayList<>(); 2288 storeScanner.next(results); 2289 assertEquals(2, results.size()); 2290 CellUtil.equals(smallCell, results.get(0)); 2291 CellUtil.equals(largeCell, results.get(1)); 2292 } 2293 assertTrue(exceptionRef.get() == null); 2294 } finally { 2295 if (storeScanner != null) { 2296 storeScanner.close(); 2297 } 2298 Thread.currentThread().setName(oldThreadName); 2299 } 2300 } 2301 2302 @SuppressWarnings("unchecked") 2303 private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) { 2304 
List<T> resultScanners = new ArrayList<T>(); 2305 for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) { 2306 if (keyValueScannerClass.isInstance(keyValueScanner)) { 2307 resultScanners.add((T) keyValueScanner); 2308 } 2309 } 2310 assertTrue(resultScanners.size() == 1); 2311 return resultScanners.get(0); 2312 } 2313 2314 @Test 2315 public void testOnConfigurationChange() throws IOException { 2316 final int COMMON_MAX_FILES_TO_COMPACT = 10; 2317 final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8; 2318 final int STORE_MAX_FILES_TO_COMPACT = 6; 2319 2320 // Build a table that its maxFileToCompact different from common configuration. 2321 Configuration conf = HBaseConfiguration.create(); 2322 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2323 COMMON_MAX_FILES_TO_COMPACT); 2324 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 2325 .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2326 String.valueOf(STORE_MAX_FILES_TO_COMPACT)) 2327 .build(); 2328 init(this.name.getMethodName(), conf, hcd); 2329 2330 // After updating common configuration, the conf in HStore itself must not be changed. 
2331 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2332 NEW_COMMON_MAX_FILES_TO_COMPACT); 2333 this.store.onConfigurationChange(conf); 2334 assertEquals(STORE_MAX_FILES_TO_COMPACT, 2335 store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact()); 2336 } 2337 2338 /** 2339 * This test is for HBASE-26476 2340 */ 2341 @Test 2342 public void testExtendsDefaultMemStore() throws Exception { 2343 Configuration conf = HBaseConfiguration.create(); 2344 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2345 2346 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2347 assertTrue(this.store.memstore.getClass() == DefaultMemStore.class); 2348 tearDown(); 2349 2350 conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName()); 2351 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2352 assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class); 2353 } 2354 2355 static class CustomDefaultMemStore extends DefaultMemStore { 2356 2357 public CustomDefaultMemStore(Configuration conf, CellComparator c, 2358 RegionServicesForStores regionServices) { 2359 super(conf, c, regionServices); 2360 } 2361 2362 } 2363 2364 /** 2365 * This test is for HBASE-26488 2366 */ 2367 @Test 2368 public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception { 2369 2370 Configuration conf = HBaseConfiguration.create(); 2371 2372 byte[] smallValue = new byte[3]; 2373 byte[] largeValue = new byte[9]; 2374 final long timestamp = EnvironmentEdgeManager.currentTime(); 2375 final long seqId = 100; 2376 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2377 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2378 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2379 quals.add(qf1); 2380 quals.add(qf2); 2381 2382 conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName()); 2383 
conf.setBoolean(WALFactory.WAL_ENABLED, false); 2384 conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, 2385 MyDefaultStoreFlusher.class.getName()); 2386 2387 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2388 MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore); 2389 assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher); 2390 2391 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2392 store.add(smallCell, memStoreSizing); 2393 store.add(largeCell, memStoreSizing); 2394 flushStore(store, id++); 2395 2396 MemStoreLABImpl memStoreLAB = 2397 (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB()); 2398 assertTrue(memStoreLAB.isClosed()); 2399 assertTrue(memStoreLAB.getRefCntValue() == 0); 2400 assertTrue(memStoreLAB.isReclaimed()); 2401 assertTrue(memStoreLAB.chunks.isEmpty()); 2402 StoreScanner storeScanner = null; 2403 try { 2404 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 2405 assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1); 2406 assertTrue(store.memstore.size().getCellsCount() == 0); 2407 assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0); 2408 assertTrue(storeScanner.currentScanners.size() == 1); 2409 assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner); 2410 2411 List<Cell> results = new ArrayList<>(); 2412 storeScanner.next(results); 2413 assertEquals(2, results.size()); 2414 CellUtil.equals(smallCell, results.get(0)); 2415 CellUtil.equals(largeCell, results.get(1)); 2416 } finally { 2417 if (storeScanner != null) { 2418 storeScanner.close(); 2419 } 2420 } 2421 } 2422 2423 static class MyDefaultMemStore1 extends DefaultMemStore { 2424 2425 private ImmutableSegment snapshotImmutableSegment; 2426 2427 public MyDefaultMemStore1(Configuration conf, CellComparator c, 2428 RegionServicesForStores 
regionServices) { 2429 super(conf, c, regionServices); 2430 } 2431 2432 @Override 2433 public MemStoreSnapshot snapshot() { 2434 MemStoreSnapshot result = super.snapshot(); 2435 this.snapshotImmutableSegment = snapshot; 2436 return result; 2437 } 2438 2439 } 2440 2441 public static class MyDefaultStoreFlusher extends DefaultStoreFlusher { 2442 private static final AtomicInteger failCounter = new AtomicInteger(1); 2443 private static final AtomicInteger counter = new AtomicInteger(0); 2444 2445 public MyDefaultStoreFlusher(Configuration conf, HStore store) { 2446 super(conf, store); 2447 } 2448 2449 @Override 2450 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, 2451 MonitoredTask status, ThroughputController throughputController, 2452 FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException { 2453 counter.incrementAndGet(); 2454 return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker, 2455 writerCreationTracker); 2456 } 2457 2458 @Override 2459 protected void performFlush(InternalScanner scanner, final CellSink sink, 2460 ThroughputController throughputController) throws IOException { 2461 2462 final int currentCount = counter.get(); 2463 CellSink newCellSink = (cell) -> { 2464 if (currentCount <= failCounter.get()) { 2465 throw new IOException("Simulated exception by tests"); 2466 } 2467 sink.append(cell); 2468 }; 2469 super.performFlush(scanner, newCellSink, throughputController); 2470 } 2471 } 2472 2473 /** 2474 * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB} 2475 */ 2476 @Test 2477 public void testImmutableMemStoreLABRefCnt() throws Exception { 2478 Configuration conf = HBaseConfiguration.create(); 2479 2480 byte[] smallValue = new byte[3]; 2481 byte[] largeValue = new byte[9]; 2482 final long timestamp = EnvironmentEdgeManager.currentTime(); 2483 final long seqId = 100; 2484 final Cell smallCell1 = createCell(qf1, 
timestamp, seqId, smallValue); 2485 final Cell largeCell1 = createCell(qf2, timestamp, seqId, largeValue); 2486 final Cell smallCell2 = createCell(qf3, timestamp, seqId + 1, smallValue); 2487 final Cell largeCell2 = createCell(qf4, timestamp, seqId + 1, largeValue); 2488 final Cell smallCell3 = createCell(qf5, timestamp, seqId + 2, smallValue); 2489 final Cell largeCell3 = createCell(qf6, timestamp, seqId + 2, largeValue); 2490 2491 int smallCellByteSize = MutableSegment.getCellLength(smallCell1); 2492 int largeCellByteSize = MutableSegment.getCellLength(largeCell1); 2493 int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize); 2494 int flushByteSize = firstWriteCellByteSize - 2; 2495 2496 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2497 conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName()); 2498 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2499 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2500 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2501 2502 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2503 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2504 2505 final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore); 2506 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2507 myCompactingMemStore.allowCompaction.set(false); 2508 2509 NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2510 store.add(smallCell1, memStoreSizing); 2511 store.add(largeCell1, memStoreSizing); 2512 store.add(smallCell2, memStoreSizing); 2513 store.add(largeCell2, memStoreSizing); 2514 store.add(smallCell3, memStoreSizing); 2515 store.add(largeCell3, memStoreSizing); 2516 VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments(); 2517 assertTrue(versionedSegmentsList.getNumOfSegments() == 3); 2518 
List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments(); 2519 List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size()); 2520 for (ImmutableSegment segment : segments) { 2521 memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB()); 2522 } 2523 List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE); 2524 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2525 assertTrue(memStoreLAB.getRefCntValue() == 2); 2526 } 2527 2528 myCompactingMemStore.allowCompaction.set(true); 2529 myCompactingMemStore.flushInMemory(); 2530 2531 versionedSegmentsList = myCompactingMemStore.getImmutableSegments(); 2532 assertTrue(versionedSegmentsList.getNumOfSegments() == 1); 2533 ImmutableMemStoreLAB immutableMemStoreLAB = 2534 (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB()); 2535 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2536 assertTrue(memStoreLAB.getRefCntValue() == 2); 2537 } 2538 2539 List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE); 2540 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2541 assertTrue(memStoreLAB.getRefCntValue() == 2); 2542 } 2543 assertTrue(immutableMemStoreLAB.getRefCntValue() == 2); 2544 for (KeyValueScanner scanner : scanners1) { 2545 scanner.close(); 2546 } 2547 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2548 assertTrue(memStoreLAB.getRefCntValue() == 1); 2549 } 2550 for (KeyValueScanner scanner : scanners2) { 2551 scanner.close(); 2552 } 2553 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2554 assertTrue(memStoreLAB.getRefCntValue() == 1); 2555 } 2556 assertTrue(immutableMemStoreLAB.getRefCntValue() == 1); 2557 flushStore(store, id++); 2558 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2559 assertTrue(memStoreLAB.getRefCntValue() == 0); 2560 } 2561 assertTrue(immutableMemStoreLAB.getRefCntValue() == 0); 2562 assertTrue(immutableMemStoreLAB.isClosed()); 2563 for (MemStoreLABImpl 
memStoreLAB : memStoreLABs) { 2564 assertTrue(memStoreLAB.isClosed()); 2565 assertTrue(memStoreLAB.isReclaimed()); 2566 assertTrue(memStoreLAB.chunks.isEmpty()); 2567 } 2568 } 2569 2570 private HStoreFile mockStoreFileWithLength(long length) { 2571 HStoreFile sf = mock(HStoreFile.class); 2572 StoreFileReader sfr = mock(StoreFileReader.class); 2573 when(sf.isHFile()).thenReturn(true); 2574 when(sf.getReader()).thenReturn(sfr); 2575 when(sfr.length()).thenReturn(length); 2576 return sf; 2577 } 2578 2579 private static class MyThread extends Thread { 2580 private StoreScanner scanner; 2581 private KeyValueHeap heap; 2582 2583 public MyThread(StoreScanner scanner) { 2584 this.scanner = scanner; 2585 } 2586 2587 public KeyValueHeap getHeap() { 2588 return this.heap; 2589 } 2590 2591 @Override 2592 public void run() { 2593 scanner.trySwitchToStreamRead(); 2594 heap = scanner.heap; 2595 } 2596 } 2597 2598 private static class MyMemStoreCompactor extends MemStoreCompactor { 2599 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 2600 private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1); 2601 2602 public MyMemStoreCompactor(CompactingMemStore compactingMemStore, 2603 MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException { 2604 super(compactingMemStore, compactionPolicy); 2605 } 2606 2607 @Override 2608 public boolean start() throws IOException { 2609 boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0; 2610 if (isFirst) { 2611 try { 2612 START_COMPACTOR_LATCH.await(); 2613 return super.start(); 2614 } catch (InterruptedException ex) { 2615 throw new RuntimeException(ex); 2616 } 2617 } 2618 return super.start(); 2619 } 2620 } 2621 2622 public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore { 2623 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 2624 2625 public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c, 2626 
HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 2627 throws IOException { 2628 super(conf, c, store, regionServices, compactionPolicy); 2629 } 2630 2631 @Override 2632 protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) 2633 throws IllegalArgumentIOException { 2634 return new MyMemStoreCompactor(this, compactionPolicy); 2635 } 2636 2637 @Override 2638 protected boolean setInMemoryCompactionFlag() { 2639 boolean rval = super.setInMemoryCompactionFlag(); 2640 if (rval) { 2641 RUNNER_COUNT.incrementAndGet(); 2642 if (LOG.isDebugEnabled()) { 2643 LOG.debug("runner count: " + RUNNER_COUNT.get()); 2644 } 2645 } 2646 return rval; 2647 } 2648 } 2649 2650 public static class MyCompactingMemStore extends CompactingMemStore { 2651 private static final AtomicBoolean START_TEST = new AtomicBoolean(false); 2652 private final CountDownLatch getScannerLatch = new CountDownLatch(1); 2653 private final CountDownLatch snapshotLatch = new CountDownLatch(1); 2654 2655 public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, HStore store, 2656 RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 2657 throws IOException { 2658 super(conf, c, store, regionServices, compactionPolicy); 2659 } 2660 2661 @Override 2662 protected List<KeyValueScanner> createList(int capacity) { 2663 if (START_TEST.get()) { 2664 try { 2665 getScannerLatch.countDown(); 2666 snapshotLatch.await(); 2667 } catch (InterruptedException e) { 2668 throw new RuntimeException(e); 2669 } 2670 } 2671 return new ArrayList<>(capacity); 2672 } 2673 2674 @Override 2675 protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) { 2676 if (START_TEST.get()) { 2677 try { 2678 getScannerLatch.await(); 2679 } catch (InterruptedException e) { 2680 throw new RuntimeException(e); 2681 } 2682 } 2683 2684 super.pushActiveToPipeline(active, checkEmpty); 2685 if (START_TEST.get()) { 2686 
snapshotLatch.countDown(); 2687 } 2688 } 2689 } 2690 2691 interface MyListHook { 2692 void hook(int currentSize); 2693 } 2694 2695 private static class MyList<T> implements List<T> { 2696 private final List<T> delegatee = new ArrayList<>(); 2697 private final MyListHook hookAtAdd; 2698 2699 MyList(final MyListHook hookAtAdd) { 2700 this.hookAtAdd = hookAtAdd; 2701 } 2702 2703 @Override 2704 public int size() { 2705 return delegatee.size(); 2706 } 2707 2708 @Override 2709 public boolean isEmpty() { 2710 return delegatee.isEmpty(); 2711 } 2712 2713 @Override 2714 public boolean contains(Object o) { 2715 return delegatee.contains(o); 2716 } 2717 2718 @Override 2719 public Iterator<T> iterator() { 2720 return delegatee.iterator(); 2721 } 2722 2723 @Override 2724 public Object[] toArray() { 2725 return delegatee.toArray(); 2726 } 2727 2728 @Override 2729 public <R> R[] toArray(R[] a) { 2730 return delegatee.toArray(a); 2731 } 2732 2733 @Override 2734 public boolean add(T e) { 2735 hookAtAdd.hook(size()); 2736 return delegatee.add(e); 2737 } 2738 2739 @Override 2740 public boolean remove(Object o) { 2741 return delegatee.remove(o); 2742 } 2743 2744 @Override 2745 public boolean containsAll(Collection<?> c) { 2746 return delegatee.containsAll(c); 2747 } 2748 2749 @Override 2750 public boolean addAll(Collection<? extends T> c) { 2751 return delegatee.addAll(c); 2752 } 2753 2754 @Override 2755 public boolean addAll(int index, Collection<? 
/**
 * {@link CompactingMemStore} that orders two writer threads, identified by the thread names
 * {@code "largeCellThread"} and {@code "smallCellThread"}, with two two-party barriers.
 * {@code preCyclicBarrier} is awaited by the large-cell thread before its first
 * {@code checkAndAddToActiveSize} and by the small-cell thread after its own
 * {@code checkAndAddToActiveSize}. {@code postCyclicBarrier} is awaited by the large-cell thread
 * after {@code flushInMemory} and by the small-cell thread before {@code doAdd}. Any failure
 * while waiting is rethrown wrapped in a {@link RuntimeException}.
 */
public static class MyCompactingMemStore2 extends CompactingMemStore {
  private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
  private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
  private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
  private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
  // Number of checkAndAddToActiveSize calls made by the large-cell thread.
  private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
  // Never updated inside this class.
  private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);

  public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
    HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
    throws IOException {
    super(conf, cellComparator, store, regionServices, compactionPolicy);
  }

  /**
   * Delegates to the super implementation. The large-cell thread waits on
   * {@code preCyclicBarrier} before its first call only; the small-cell thread waits on it after
   * every call.
   */
  @Override
  protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
    MemStoreSizing memstoreSizing) {
    if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
      int currentCount = largeCellPreUpdateCounter.incrementAndGet();
      if (currentCount <= 1) {
        try {
          /**
           * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
           * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
           * largeCellThread invokes flushInMemory.
           */
          preCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
    }

    boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
    if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
      try {
        preCyclicBarrier.await();
      } catch (Throwable e) {
        throw new RuntimeException(e);
      }
    }
    return returnValue;
  }

  /**
   * Delegates to the super implementation; the small-cell thread first waits on
   * {@code postCyclicBarrier}.
   */
  @Override
  protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
    if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
      try {
        /**
         * After largeCellThread has finished the flushInMemory method, smallCellThread can add
         * its cell to currentActive. That is to say, when largeCellThread called flushInMemory,
         * currentActive had no cell.
         */
        postCyclicBarrier.await();
      } catch (Throwable e) {
        throw new RuntimeException(e);
      }
    }
    super.doAdd(currentActive, cell, memstoreSizing);
  }

  /**
   * Delegates to the super implementation; afterwards the large-cell thread waits on
   * {@code postCyclicBarrier} as long as it has made at most one {@code checkAndAddToActiveSize}
   * call.
   */
  @Override
  protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
    super.flushInMemory(currentActiveMutableSegment);
    if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
      if (largeCellPreUpdateCounter.get() <= 1) {
        try {
          postCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
    }
  }

}
/**
 * {@link CompactingMemStore} that uses two-party barriers to interleave the thread named
 * {@code "takeSnapShotThread"} with whichever thread runs {@code flattenOneSegment} and
 * {@code inMemoryCompaction}. Each overridden method delegates to the super implementation and
 * adds barrier waits, counters and assertions around it. Any failure while waiting on a barrier
 * is rethrown wrapped in a {@link RuntimeException}.
 */
public static class MyCompactingMemStore4 extends CompactingMemStore {
  private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
  /**
   * {@link CompactingMemStore#flattenOneSegment} must execute after
   * {@link CompactingMemStore#getImmutableSegments}
   */
  private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
  /**
   * Only after {@link CompactingMemStore#flattenOneSegment} completed,
   * {@link CompactingMemStore#swapPipelineWithNull} could execute.
   */
  private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
  /**
   * Only after the in-memory compact thread enters {@link CompactingMemStore#flattenOneSegment}
   * may the snapshot thread start {@link CompactingMemStore#snapshot}, because
   * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
   * The second party of this barrier is not inside this class.
   */
  private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
  /**
   * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping. The second party
   * of this barrier is not inside this class.
   */
  private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
  // Calls to getImmutableSegments made by the snapshot thread.
  private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
  // Calls to swapPipelineWithNull made by the snapshot thread.
  private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
  // Calls to flattenOneSegment made by any thread.
  private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
  // Calls to setInMemoryCompactionFlag made by any thread.
  private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);

  public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
    HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
    throws IOException {
    super(conf, cellComparator, store, regionServices, compactionPolicy);
  }

  /**
   * Returns the super result. On the snapshot thread's first call it then waits on
   * {@code flattenOneSegmentPreCyclicBarrier} before returning.
   */
  @Override
  public VersionedSegmentsList getImmutableSegments() {
    VersionedSegmentsList result = super.getImmutableSegments();
    if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
      int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
      if (currentCount <= 1) {
        try {
          flattenOneSegmentPreCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
    }
    return result;
  }

  /**
   * On the snapshot thread's first call, waits on {@code flattenOneSegmentPostCyclicBarrier}
   * before delegating. After delegating, asserts that the snapshot thread's first call returned
   * {@code false} and its second call returned {@code true}.
   */
  @Override
  protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
    if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
      int currentCount = swapPipelineWithNullCounter.incrementAndGet();
      if (currentCount <= 1) {
        try {
          flattenOneSegmentPostCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
    }
    boolean result = super.swapPipelineWithNull(segments);
    if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
      int currentCount = swapPipelineWithNullCounter.get();
      if (currentCount <= 1) {
        assertTrue(!result);
      }
      if (currentCount == 2) {
        assertTrue(result);
      }
    }
    return result;

  }

  /**
   * On the first call only: waits on {@code snapShotStartCyclicCyclicBarrier} and then on
   * {@code flattenOneSegmentPreCyclicBarrier} before delegating, and waits on
   * {@code flattenOneSegmentPostCyclicBarrier} after delegating. Later calls just delegate.
   */
  @Override
  public void flattenOneSegment(long requesterVersion, Action action) {
    int currentCount = flattenOneSegmentCounter.incrementAndGet();
    if (currentCount <= 1) {
      try {
        /**
         * {@link CompactingMemStore#snapshot} could start.
         */
        snapShotStartCyclicCyclicBarrier.await();
        flattenOneSegmentPreCyclicBarrier.await();
      } catch (Throwable e) {
        throw new RuntimeException(e);
      }
    }
    super.flattenOneSegment(requesterVersion, action);
    if (currentCount <= 1) {
      try {
        flattenOneSegmentPostCyclicBarrier.await();
      } catch (Throwable e) {
        throw new RuntimeException(e);
      }
    }
  }

  /** Delegates, asserts that the super call returned {@code true}, and counts the call. */
  @Override
  protected boolean setInMemoryCompactionFlag() {
    boolean result = super.setInMemoryCompactionFlag();
    assertTrue(result);
    setInMemoryCompactionFlagCounter.incrementAndGet();
    return result;
  }

  /**
   * Delegates, then waits on {@code inMemoryCompactionEndCyclicBarrier} whether or not the super
   * call threw.
   */
  @Override
  void inMemoryCompaction() {
    try {
      super.inMemoryCompaction();
    } finally {
      try {
        inMemoryCompactionEndCyclicBarrier.await();
      } catch (Throwable e) {
        throw new RuntimeException(e);
      }

    }
  }

}
TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
    private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread";
    /**
     * Makes {@link CompactingMemStore#flattenOneSegment} run only after the snapshot thread has
     * returned from {@link CompactingMemStore#getImmutableSegments}.
     */
    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
    /**
     * Makes the snapshot thread's first {@link CompactingMemStore#swapPipelineWithNull} run only
     * after {@link CompactingMemStore#flattenOneSegment} has completed.
     */
    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
    /**
     * Released once the in-memory compaction thread has entered
     * {@link CompactingMemStore#flattenOneSegment}. Only then may the snapshot thread start
     * {@link CompactingMemStore#snapshot}, because {@link CompactingMemStore#snapshot} would
     * invoke {@link CompactingMemStore#stopCompaction}.
     */
    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
    /**
     * Used to wait for {@link CompactingMemStore.InMemoryCompactionRunnable} to stop.
     */
    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);
    /**
     * Released when the snapshot thread retries {@link CompactingMemStore#swapPipelineWithNull};
     * only then may the writeAgain thread start.
     */
    private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2);
    /**
     * Shared by the snapshot thread, the writeAgain thread and the in-memory compaction thread.
     * Only after the writeAgain thread completes does the retried
     * {@link CompactingMemStore#swapPipelineWithNull} execute and the in-memory compaction thread
     * exit, because the in-memory compaction is expected to execute only once.
     */
    private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3);

    public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator,
      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
      throws IOException {
      super(conf, cellComparator, store, regionServices, compactionPolicy);
    }

    /**
     * Delegates to the superclass and, on the snapshot thread's first call only, then waits on
     * {@link #flattenOneSegmentPreCyclicBarrier} so that the pending flatten may proceed.
     */
    @Override
    public VersionedSegmentsList getImmutableSegments() {
      VersionedSegmentsList result = super.getImmutableSegments();
      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
        if (currentCount <= 1) {
          awaitBarrierUnchecked(flattenOneSegmentPreCyclicBarrier);
        }
      }
      return result;
    }

    /**
     * On the snapshot thread, the first call waits for the flatten to finish and asserts that the
     * swap then fails; the second call (the retry) waits for the writeAgain thread to start and
     * to complete and asserts that the swap then succeeds. Calls from other threads are passed
     * straight through.
     */
    @Override
    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
        if (currentCount <= 1) {
          awaitBarrierUnchecked(flattenOneSegmentPostCyclicBarrier);
        }
        if (currentCount == 2) {
          // Only when the snapshot thread retries swapPipelineWithNull may the writeAgain
          // thread start.
          awaitBarrierUnchecked(writeMemStoreAgainStartCyclicBarrier);
          // Only after the writeAgain thread completes does the retried swapPipelineWithNull
          // execute.
          awaitBarrierUnchecked(writeMemStoreAgainEndCyclicBarrier);
        }
      }

      boolean result = super.swapPipelineWithNull(segments);

      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = swapPipelineWithNullCounter.get();
        if (currentCount <= 1) {
          assertFalse(result);
        }
        if (currentCount == 2) {
          assertTrue(result);
        }
      }
      return result;
    }

    /**
     * The first invocation is bracketed by barriers: before delegating it lets the snapshot start
     * and waits for {@link CompactingMemStore#getImmutableSegments}; after delegating it releases
     * the first {@link CompactingMemStore#swapPipelineWithNull} and waits for the writeAgain
     * thread to complete. Later invocations delegate without any synchronization.
     */
    @Override
    public void flattenOneSegment(long requesterVersion, Action action) {
      int currentCount = flattenOneSegmentCounter.incrementAndGet();
      if (currentCount <= 1) {
        // CompactingMemStore#snapshot may start now.
        awaitBarrierUnchecked(snapShotStartCyclicCyclicBarrier);
        awaitBarrierUnchecked(flattenOneSegmentPreCyclicBarrier);
      }
      super.flattenOneSegment(requesterVersion, action);
      if (currentCount <= 1) {
        awaitBarrierUnchecked(flattenOneSegmentPostCyclicBarrier);
        // Only after the writeAgain thread completes may the in-memory compaction thread exit,
        // because the in-memory compaction is expected to execute only once.
        awaitBarrierUnchecked(writeMemStoreAgainEndCyclicBarrier);
      }
    }

    /**
     * Delegates to the superclass and asserts that the first call returns {@code true} and the
     * second call returns {@code false}.
     */
    @Override
    protected boolean setInMemoryCompactionFlag() {
      boolean result = super.setInMemoryCompactionFlag();
      int count = setInMemoryCompactionFlagCounter.incrementAndGet();
      if (count <= 1) {
        assertTrue(result);
      }
      if (count == 2) {
        assertFalse(result);
      }
      return result;
    }

    /**
     * Runs the superclass compaction and then, whether it returned or threw, waits on
     * {@link #inMemoryCompactionEndCyclicBarrier}.
     */
    @Override
    void inMemoryCompaction() {
      try {
        super.inMemoryCompaction();
      } finally {
        awaitBarrierUnchecked(inMemoryCompactionEndCyclicBarrier);
      }
    }
  }

  /**
   * A {@link CompactingMemStore} whose {@link #inMemoryCompaction()} waits on a two-party barrier
   * once the superclass compaction has finished, whether it returned or threw.
   */
  public static class MyCompactingMemStore6 extends CompactingMemStore {
    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);

    public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator,
      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
      throws IOException {
      super(conf, cellComparator, store, regionServices, compactionPolicy);
    }

    @Override
    void inMemoryCompaction() {
      try {
        super.inMemoryCompaction();
      } finally {
        awaitBarrierUnchecked(inMemoryCompactionEndCyclicBarrier);
      }
    }
  }

  /**
   * A {@link DefaultMemStore} that interleaves the first
   * {@link DefaultMemStore#getSnapshotSegments} of the getScanner thread with the first
   * {@link DefaultMemStore#doClearSnapShot} of the flush thread. The interleaving is skipped when
   * the flush thread already holds the store engine's write lock.
   */
  public static class MyDefaultMemStore extends DefaultMemStore {
    private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread";
    private static final String FLUSH_THREAD_NAME = "flushMyThread";
    /**
     * Released once the flush thread has entered {@link DefaultMemStore#doClearSnapShot}; only
     * then may the getScanner thread start.
     */
    private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2);
    /**
     * Used by the getScanner thread to notify the flush thread that
     * {@link DefaultMemStore#getSnapshotSegments} has completed, so that
     * {@link DefaultMemStore#doClearSnapShot} may continue.
     */
    private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2);
    /**
     * Used by the flush thread to notify the getScanner thread that
     * {@link DefaultMemStore#doClearSnapShot} has completed, so that
     * {@link DefaultMemStore#getScanners} may continue.
     */
    private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2);
    private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0);
    private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0);
    private volatile boolean shouldWait = true;
    private volatile HStore store = null;

    public MyDefaultMemStore(Configuration conf, CellComparator cellComparator,
      RegionServicesForStores regionServices) throws IOException {
      super(conf, cellComparator, regionServices);
    }

    /**
     * Delegates to the superclass. On the getScanner thread's first call, and only while
     * {@link #shouldWait} is set, it then signals the flush thread and blocks until the snapshot
     * has been cleared.
     */
    @Override
    protected List<Segment> getSnapshotSegments() {
      List<Segment> result = super.getSnapshotSegments();

      if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) {
        int currentCount = getSnapshotSegmentsCounter.incrementAndGet();
        if (currentCount == 1 && this.shouldWait) {
          // Notify the flush thread that getSnapshotSegments has completed, so doClearSnapShot
          // may continue.
          awaitBarrierUnchecked(preClearSnapShotCyclicBarrier);
          // Wait for doClearSnapShot to complete.
          awaitBarrierUnchecked(postClearSnapShotCyclicBarrier);
        }
      }
      return result;
    }

    /**
     * On the flush thread's first call: turns {@link #shouldWait} off if this thread holds the
     * store engine's write lock, releases the getScanner thread, and (if still waiting) blocks
     * until {@link DefaultMemStore#getSnapshotSegments} has completed. After delegating to the
     * superclass it notifies the getScanner thread that the snapshot has been cleared.
     */
    @Override
    protected void doClearSnapShot() {
      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
        int currentCount = clearSnapshotCounter.incrementAndGet();
        if (currentCount == 1) {
          boolean writeLockHeld;
          try {
            ReentrantReadWriteLock storeLock =
              (ReentrantReadWriteLock) store.getStoreEngine().getLock();
            writeLockHeld = storeLock.isWriteLockedByCurrentThread();
          } catch (Throwable e) {
            // Same wrapping as the barrier waits, e.g. when the store field was never assigned.
            throw new RuntimeException(e);
          }
          if (writeLockHeld) {
            shouldWait = false;
          }
          // Only when the flush thread has entered doClearSnapShot may the getScanner thread
          // start.
          awaitBarrierUnchecked(getScannerCyclicBarrier);

          if (shouldWait) {
            // Wait for getSnapshotSegments to complete.
            awaitBarrierUnchecked(preClearSnapShotCyclicBarrier);
          }
        }
      }

      super.doClearSnapShot();

      if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) {
        int currentCount = clearSnapshotCounter.get();
        if (currentCount == 1 && shouldWait) {
          // Notify the getScanner thread that doClearSnapShot has completed, so getScanners may
          // continue.
          awaitBarrierUnchecked(postClearSnapShotCyclicBarrier);
        }
      }
    }
  }

  /**
   * Waits on {@code barrier} and rethrows any failure as a {@link RuntimeException} carrying the
   * original cause, so that it can be called from overrides that declare no checked exceptions.
   * If the wait was interrupted, the thread's interrupt status is restored before rethrowing.
   * @param barrier the barrier to wait on
   * @throws RuntimeException if the wait fails for any reason
   */
  private static void awaitBarrierUnchecked(CyclicBarrier barrier) {
    try {
      barrier.await();
    } catch (Throwable e) {
      if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }
      throw new RuntimeException(e);
    }
  }
}