001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY; 021import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_ON_READ_KEY; 022import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_READ; 023import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_WRITE; 024import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_CLOSE; 025import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY; 026import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY; 027import static org.junit.Assert.assertArrayEquals; 028import static org.junit.Assert.assertEquals; 029import static org.junit.Assert.assertFalse; 030import static org.junit.Assert.assertNull; 031import static org.junit.Assert.assertTrue; 032import static org.junit.Assert.fail; 033import static org.mockito.ArgumentMatchers.any; 034import static org.mockito.Mockito.mock; 035import static org.mockito.Mockito.spy; 036import static org.mockito.Mockito.times; 037import static org.mockito.Mockito.verify; 038import static org.mockito.Mockito.when; 039 040import java.io.FileNotFoundException; 041import java.io.IOException; 042import java.lang.ref.SoftReference; 043import java.security.PrivilegedExceptionAction; 044import java.util.ArrayList; 045import java.util.Arrays; 046import java.util.Collection; 047import java.util.Collections; 048import java.util.Iterator; 049import java.util.List; 050import java.util.ListIterator; 051import java.util.NavigableSet; 052import java.util.Optional; 053import java.util.TreeSet; 054import java.util.concurrent.BrokenBarrierException; 055import java.util.concurrent.ConcurrentSkipListSet; 056import java.util.concurrent.CountDownLatch; 057import java.util.concurrent.CyclicBarrier; 058import java.util.concurrent.ExecutorService; 059import java.util.concurrent.Executors; 060import java.util.concurrent.Future; 061import java.util.concurrent.ThreadPoolExecutor; 062import java.util.concurrent.TimeUnit; 063import java.util.concurrent.atomic.AtomicBoolean; 064import java.util.concurrent.atomic.AtomicInteger; 065import java.util.concurrent.atomic.AtomicLong; 066import java.util.concurrent.atomic.AtomicReference; 067import java.util.concurrent.locks.ReentrantReadWriteLock; 068import java.util.function.Consumer; 069import java.util.function.IntBinaryOperator; 070import org.apache.hadoop.conf.Configuration; 071import org.apache.hadoop.fs.FSDataOutputStream; 072import org.apache.hadoop.fs.FileStatus; 073import org.apache.hadoop.fs.FileSystem; 074import org.apache.hadoop.fs.FilterFileSystem; 075import org.apache.hadoop.fs.LocalFileSystem; 076import org.apache.hadoop.fs.Path; 077import org.apache.hadoop.fs.permission.FsPermission; 078import org.apache.hadoop.hbase.Cell; 079import org.apache.hadoop.hbase.CellBuilderType; 080import org.apache.hadoop.hbase.CellComparator; 081import org.apache.hadoop.hbase.CellComparatorImpl; 082import org.apache.hadoop.hbase.CellUtil; 083import org.apache.hadoop.hbase.ExtendedCell; 084import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; 085import org.apache.hadoop.hbase.HBaseClassTestRule; 086import org.apache.hadoop.hbase.HBaseConfiguration; 087import org.apache.hadoop.hbase.HBaseTestingUtil; 088import org.apache.hadoop.hbase.HConstants; 089import org.apache.hadoop.hbase.KeyValue; 090import org.apache.hadoop.hbase.MemoryCompactionPolicy; 091import org.apache.hadoop.hbase.PrivateCellUtil; 092import org.apache.hadoop.hbase.TableName; 093import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 094import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 095import org.apache.hadoop.hbase.client.Get; 096import org.apache.hadoop.hbase.client.RegionInfo; 097import org.apache.hadoop.hbase.client.RegionInfoBuilder; 098import org.apache.hadoop.hbase.client.Scan; 099import org.apache.hadoop.hbase.client.Scan.ReadType; 100import org.apache.hadoop.hbase.client.TableDescriptor; 101import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 102import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 103import org.apache.hadoop.hbase.filter.Filter; 104import org.apache.hadoop.hbase.filter.FilterBase; 105import org.apache.hadoop.hbase.io.compress.Compression; 106import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 107import org.apache.hadoop.hbase.io.hfile.CacheConfig; 108import org.apache.hadoop.hbase.io.hfile.HFile; 109import org.apache.hadoop.hbase.io.hfile.HFileContext; 110import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 111import org.apache.hadoop.hbase.monitoring.MonitoredTask; 112import org.apache.hadoop.hbase.nio.RefCnt; 113import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl; 114import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType; 115import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action; 116import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 117import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 118import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 119import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy; 120import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 121import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 122import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 123import org.apache.hadoop.hbase.security.User; 124import org.apache.hadoop.hbase.testclassification.MediumTests; 125import org.apache.hadoop.hbase.testclassification.RegionServerTests; 126import org.apache.hadoop.hbase.util.BloomFilterUtil; 127import org.apache.hadoop.hbase.util.Bytes; 128import org.apache.hadoop.hbase.util.CommonFSUtils; 129import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 130import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 131import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 132import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 133import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 134import org.apache.hadoop.hbase.wal.WALFactory; 135import org.apache.hadoop.util.Progressable; 136import org.junit.After; 137import org.junit.AfterClass; 138import org.junit.Before; 139import org.junit.ClassRule; 140import org.junit.Rule; 141import org.junit.Test; 142import org.junit.experimental.categories.Category; 143import org.junit.rules.TestName; 144import org.mockito.Mockito; 145import org.slf4j.Logger; 146import org.slf4j.LoggerFactory; 147 148import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 149 150/** 151 * Test class for the HStore 152 */ 153@Category({ RegionServerTests.class, MediumTests.class }) 154public class TestHStore { 155 156 @ClassRule 157 public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHStore.class); 158 159 private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class); 160 @Rule 161 public TestName name = new TestName(); 162 163 HRegion region; 164 HStore store; 165 byte[] table = Bytes.toBytes("table"); 166 byte[] family = Bytes.toBytes("family"); 167 168 byte[] row = Bytes.toBytes("row"); 169 byte[] row2 = Bytes.toBytes("row2"); 170 byte[] qf1 = Bytes.toBytes("qf1"); 171 byte[] qf2 = Bytes.toBytes("qf2"); 172 byte[] qf3 = Bytes.toBytes("qf3"); 173 byte[] qf4 = Bytes.toBytes("qf4"); 174 byte[] qf5 = Bytes.toBytes("qf5"); 175 byte[] qf6 = Bytes.toBytes("qf6"); 176 177 NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR); 178 179 List<Cell> expected = new ArrayList<>(); 180 List<Cell> result = new ArrayList<>(); 181 182 long id = EnvironmentEdgeManager.currentTime(); 183 Get get = new Get(row); 184 185 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 186 private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString(); 187 188 @Before 189 public void setUp() throws IOException { 190 qualifiers.clear(); 191 qualifiers.add(qf1); 192 qualifiers.add(qf3); 193 qualifiers.add(qf5); 194 195 Iterator<byte[]> iter = qualifiers.iterator(); 196 while (iter.hasNext()) { 197 byte[] next = iter.next(); 198 expected.add(new KeyValue(row, family, next, 1, (byte[]) null)); 199 get.addColumn(family, next); 200 } 201 } 202 203 private void init(String methodName) throws IOException { 204 init(methodName, TEST_UTIL.getConfiguration()); 205 } 206 207 private HStore init(String methodName, Configuration conf) throws IOException { 208 // some of the tests write 4 versions and then flush 209 // (with HBASE-4241, lower versions are collected on flush) 210 return init(methodName, conf, 211 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build()); 212 } 213 214 private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd) 215 throws IOException { 216 return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd); 217 } 218 219 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 220 ColumnFamilyDescriptor hcd) throws IOException { 221 return init(methodName, conf, builder, hcd, null); 222 } 223 224 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 225 ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException { 226 return init(methodName, conf, builder, hcd, hook, false); 227 } 228 229 private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder, 230 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 231 TableDescriptor htd = builder.setColumnFamily(hcd).build(); 232 Path basedir = new Path(DIR + methodName); 233 Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName()); 234 final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName)); 235 236 FileSystem fs = FileSystem.get(conf); 237 238 fs.delete(logdir, true); 239 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 240 MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null, 241 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 242 RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 243 Configuration walConf = new Configuration(conf); 244 CommonFSUtils.setRootDir(walConf, basedir); 245 WALFactory wals = new WALFactory(walConf, methodName); 246 region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf, 247 htd, null); 248 region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); 249 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 250 Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 251 } 252 253 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 254 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 255 initHRegion(methodName, conf, builder, hcd, hook, switchToPread); 256 if (hook == null) { 257 store = new HStore(region, hcd, conf, false); 258 } else { 259 store = new MyStore(region, hcd, conf, hook, switchToPread); 260 } 261 region.stores.put(store.getColumnFamilyDescriptor().getName(), store); 262 return store; 263 } 264 265 /** 266 * Test we do not lose data if we fail a flush and then close. Part of HBase-10466 267 */ 268 @Test 269 public void testFlushSizeSizing() throws Exception { 270 LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName()); 271 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 272 // Only retry once. 273 conf.setInt("hbase.hstore.flush.retries.number", 1); 274 User user = User.createUserForTesting(conf, this.name.getMethodName(), new String[] { "foo" }); 275 // Inject our faulty LocalFileSystem 276 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 277 user.runAs(new PrivilegedExceptionAction<Object>() { 278 @Override 279 public Object run() throws Exception { 280 // Make sure it worked (above is sensitive to caching details in hadoop core) 281 FileSystem fs = FileSystem.get(conf); 282 assertEquals(FaultyFileSystem.class, fs.getClass()); 283 FaultyFileSystem ffs = (FaultyFileSystem) fs; 284 285 // Initialize region 286 init(name.getMethodName(), conf); 287 288 MemStoreSize mss = store.memstore.getFlushableSize(); 289 assertEquals(0, mss.getDataSize()); 290 LOG.info("Adding some data"); 291 MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing(); 292 store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize); 293 // add the heap size of active (mutable) segment 294 kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 295 mss = store.memstore.getFlushableSize(); 296 assertEquals(kvSize.getMemStoreSize(), mss); 297 // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. 298 try { 299 LOG.info("Flushing"); 300 flushStore(store, id++); 301 fail("Didn't bubble up IOE!"); 302 } catch (IOException ioe) { 303 assertTrue(ioe.getMessage().contains("Fault injected")); 304 } 305 // due to snapshot, change mutable to immutable segment 306 kvSize.incMemStoreSize(0, 307 CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0); 308 mss = store.memstore.getFlushableSize(); 309 assertEquals(kvSize.getMemStoreSize(), mss); 310 MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing(); 311 store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2); 312 kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 313 // Even though we add a new kv, we expect the flushable size to be 'same' since we have 314 // not yet cleared the snapshot -- the above flush failed. 315 assertEquals(kvSize.getMemStoreSize(), mss); 316 ffs.fault.set(false); 317 flushStore(store, id++); 318 mss = store.memstore.getFlushableSize(); 319 // Size should be the foreground kv size. 320 assertEquals(kvSize2.getMemStoreSize(), mss); 321 flushStore(store, id++); 322 mss = store.memstore.getFlushableSize(); 323 assertEquals(0, mss.getDataSize()); 324 assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize()); 325 return null; 326 } 327 }); 328 } 329 330 @Test 331 public void testStoreBloomFilterMetricsWithBloomRowCol() throws IOException { 332 int numStoreFiles = 5; 333 writeAndRead(BloomType.ROWCOL, numStoreFiles); 334 335 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 336 // hard to know exactly the numbers here, we are just trying to 337 // prove that they are incrementing 338 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 339 assertTrue(store.getBloomFilterNegativeResultsCount() > 0); 340 } 341 342 @Test 343 public void testStoreBloomFilterMetricsWithBloomRow() throws IOException { 344 int numStoreFiles = 5; 345 writeAndRead(BloomType.ROWCOL, numStoreFiles); 346 347 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 348 // hard to know exactly the numbers here, we are just trying to 349 // prove that they are incrementing 350 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 351 assertTrue(store.getBloomFilterNegativeResultsCount() > 0); 352 } 353 354 @Test 355 public void testStoreBloomFilterMetricsWithBloomRowPrefix() throws IOException { 356 int numStoreFiles = 5; 357 writeAndRead(BloomType.ROWPREFIX_FIXED_LENGTH, numStoreFiles); 358 359 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 360 // hard to know exactly the numbers here, we are just trying to 361 // prove that they are incrementing 362 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 363 } 364 365 @Test 366 public void testStoreBloomFilterMetricsWithBloomNone() throws IOException { 367 int numStoreFiles = 5; 368 writeAndRead(BloomType.NONE, numStoreFiles); 369 370 assertEquals(0, store.getBloomFilterRequestsCount()); 371 assertEquals(0, store.getBloomFilterNegativeResultsCount()); 372 373 // hard to know exactly the numbers here, we are just trying to 374 // prove that they are incrementing 375 assertTrue(store.getBloomFilterEligibleRequestsCount() >= numStoreFiles); 376 } 377 378 private void writeAndRead(BloomType bloomType, int numStoreFiles) throws IOException { 379 Configuration conf = HBaseConfiguration.create(); 380 FileSystem fs = FileSystem.get(conf); 381 382 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 383 .setCompressionType(Compression.Algorithm.GZ).setBloomFilterType(bloomType) 384 .setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "3").build(); 385 init(name.getMethodName(), conf, hcd); 386 387 for (int i = 1; i <= numStoreFiles; i++) { 388 byte[] row = Bytes.toBytes("row" + i); 389 LOG.info("Adding some data for the store file #" + i); 390 long timeStamp = EnvironmentEdgeManager.currentTime(); 391 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null); 392 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null); 393 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null); 394 flush(i); 395 } 396 397 // Verify the total number of store files 398 assertEquals(numStoreFiles, this.store.getStorefiles().size()); 399 400 TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR); 401 columns.add(qf1); 402 403 for (int i = 1; i <= numStoreFiles; i++) { 404 KeyValueScanner scanner = 405 store.getScanner(new Scan(new Get(Bytes.toBytes("row" + i))), columns, 0); 406 scanner.peek(); 407 } 408 } 409 410 /** 411 * Verify that compression and data block encoding are respected by the createWriter method, used 412 * on store flush. 413 */ 414 @Test 415 public void testCreateWriter() throws Exception { 416 Configuration conf = HBaseConfiguration.create(); 417 FileSystem fs = FileSystem.get(conf); 418 419 ColumnFamilyDescriptor hcd = 420 ColumnFamilyDescriptorBuilder.newBuilder(family).setCompressionType(Compression.Algorithm.GZ) 421 .setDataBlockEncoding(DataBlockEncoding.DIFF).build(); 422 init(name.getMethodName(), conf, hcd); 423 424 // Test createWriter 425 StoreFileWriter writer = store.getStoreEngine() 426 .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4) 427 .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true) 428 .includesTag(false).shouldDropBehind(false)); 429 Path path = writer.getPath(); 430 writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1))); 431 writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2))); 432 writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3))); 433 writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4))); 434 writer.close(); 435 436 // Verify that compression and encoding settings are respected 437 HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf); 438 assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec()); 439 assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding()); 440 reader.close(); 441 } 442 443 @Test 444 public void testDeleteExpiredStoreFiles() throws Exception { 445 testDeleteExpiredStoreFiles(0); 446 testDeleteExpiredStoreFiles(1); 447 } 448 449 /** 450 * @param minVersions the MIN_VERSIONS for the column family 451 */ 452 public void testDeleteExpiredStoreFiles(int minVersions) throws Exception { 453 int storeFileNum = 4; 454 int ttl = 4; 455 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 456 EnvironmentEdgeManagerTestHelper.injectEdge(edge); 457 458 Configuration conf = HBaseConfiguration.create(); 459 // Enable the expired store file deletion 460 conf.setBoolean("hbase.store.delete.expired.storefile", true); 461 // Set the compaction threshold higher to avoid normal compactions. 462 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5); 463 464 init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder 465 .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build()); 466 467 long storeTtl = this.store.getScanInfo().getTtl(); 468 long sleepTime = storeTtl / storeFileNum; 469 long timeStamp; 470 // There are 4 store files and the max time stamp difference among these 471 // store files will be (this.store.ttl / storeFileNum) 472 for (int i = 1; i <= storeFileNum; i++) { 473 LOG.info("Adding some data for the store file #" + i); 474 timeStamp = EnvironmentEdgeManager.currentTime(); 475 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null); 476 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null); 477 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null); 478 flush(i); 479 edge.incrementTime(sleepTime); 480 } 481 482 // Verify the total number of store files 483 assertEquals(storeFileNum, this.store.getStorefiles().size()); 484 485 // Each call will find one expired store file and delete it before compaction happens. 486 // There will be no compaction due to threshold above. Last file will not be replaced. 487 for (int i = 1; i <= storeFileNum - 1; i++) { 488 // verify the expired store file. 489 assertFalse(this.store.requestCompaction().isPresent()); 490 Collection<HStoreFile> sfs = this.store.getStorefiles(); 491 // Ensure i files are gone. 492 if (minVersions == 0) { 493 assertEquals(storeFileNum - i, sfs.size()); 494 // Ensure only non-expired files remain. 495 for (HStoreFile sf : sfs) { 496 assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl)); 497 } 498 } else { 499 assertEquals(storeFileNum, sfs.size()); 500 } 501 // Let the next store file expired. 502 edge.incrementTime(sleepTime); 503 } 504 assertFalse(this.store.requestCompaction().isPresent()); 505 506 Collection<HStoreFile> sfs = this.store.getStorefiles(); 507 // Assert the last expired file is not removed. 508 if (minVersions == 0) { 509 assertEquals(1, sfs.size()); 510 } 511 long ts = sfs.iterator().next().getReader().getMaxTimestamp(); 512 assertTrue(ts < (edge.currentTime() - storeTtl)); 513 514 for (HStoreFile sf : sfs) { 515 sf.closeStoreFile(true); 516 } 517 } 518 519 @Test 520 public void testLowestModificationTime() throws Exception { 521 Configuration conf = HBaseConfiguration.create(); 522 FileSystem fs = FileSystem.get(conf); 523 // Initialize region 524 init(name.getMethodName(), conf); 525 526 int storeFileNum = 4; 527 for (int i = 1; i <= storeFileNum; i++) { 528 LOG.info("Adding some data for the store file #" + i); 529 this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null); 530 this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null); 531 this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null); 532 flush(i); 533 } 534 // after flush; check the lowest time stamp 535 long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 536 long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 537 assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); 538 539 // after compact; check the lowest time stamp 540 store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null); 541 lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 542 lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 543 assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); 544 } 545 546 private static long getLowestTimeStampFromFS(FileSystem fs, 547 final Collection<HStoreFile> candidates) throws IOException { 548 long minTs = Long.MAX_VALUE; 549 if (candidates.isEmpty()) { 550 return minTs; 551 } 552 Path[] p = new Path[candidates.size()]; 553 int i = 0; 554 for (HStoreFile sf : candidates) { 555 p[i] = sf.getPath(); 556 ++i; 557 } 558 559 FileStatus[] stats = fs.listStatus(p); 560 if (stats == null || stats.length == 0) { 561 return minTs; 562 } 563 for (FileStatus s : stats) { 564 minTs = Math.min(minTs, s.getModificationTime()); 565 } 566 return minTs; 567 } 568 569 ////////////////////////////////////////////////////////////////////////////// 570 // Get tests 571 ////////////////////////////////////////////////////////////////////////////// 572 573 private static final int BLOCKSIZE_SMALL = 8192; 574 575 /** 576 * Test for hbase-1686. 577 */ 578 @Test 579 public void testEmptyStoreFile() throws IOException { 580 init(this.name.getMethodName()); 581 // Write a store file. 582 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 583 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 584 flush(1); 585 // Now put in place an empty store file. Its a little tricky. Have to 586 // do manually with hacked in sequence id. 587 HStoreFile f = this.store.getStorefiles().iterator().next(); 588 Path storedir = f.getPath().getParent(); 589 long seqid = f.getMaxSequenceId(); 590 Configuration c = HBaseConfiguration.create(); 591 FileSystem fs = FileSystem.get(c); 592 HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 593 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs) 594 .withOutputDir(storedir).withFileContext(meta).build(); 595 w.appendMetadata(seqid + 1, false); 596 w.close(); 597 this.store.close(); 598 // Reopen it... should pick up two files 599 this.store = 600 new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false); 601 assertEquals(2, this.store.getStorefilesCount()); 602 603 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 604 assertEquals(1, result.size()); 605 } 606 607 /** 608 * Getting data from memstore only 609 */ 610 @Test 611 public void testGet_FromMemStoreOnly() throws IOException { 612 init(this.name.getMethodName()); 613 614 // Put data in memstore 615 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 616 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 617 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 618 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 619 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 620 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 621 622 // Get 623 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 624 625 // Compare 626 assertCheck(); 627 } 628 629 @Test 630 public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException { 631 testTimeRangeIfSomeCellsAreDroppedInFlush(1); 632 testTimeRangeIfSomeCellsAreDroppedInFlush(3); 633 testTimeRangeIfSomeCellsAreDroppedInFlush(5); 634 } 635 636 private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException { 637 init(this.name.getMethodName(), TEST_UTIL.getConfiguration(), 638 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build()); 639 long currentTs = 100; 640 long minTs = currentTs; 641 // the extra cell won't be flushed to disk, 642 // so the min of timerange will be different between memStore and hfile. 643 for (int i = 0; i != (maxVersion + 1); ++i) { 644 this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null); 645 if (i == 1) { 646 minTs = currentTs; 647 } 648 } 649 flushStore(store, id++); 650 651 Collection<HStoreFile> files = store.getStorefiles(); 652 assertEquals(1, files.size()); 653 HStoreFile f = files.iterator().next(); 654 f.initReader(); 655 StoreFileReader reader = f.getReader(); 656 assertEquals(minTs, reader.timeRange.getMin()); 657 assertEquals(currentTs, reader.timeRange.getMax()); 658 } 659 660 /** 661 * Getting data from files only 662 */ 663 @Test 664 public void testGet_FromFilesOnly() throws IOException { 665 init(this.name.getMethodName()); 666 667 // Put data in memstore 668 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 669 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 670 // flush 671 flush(1); 672 673 // Add more data 674 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 675 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 676 // flush 677 flush(2); 678 679 // Add more data 680 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 681 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 682 // flush 683 flush(3); 684 685 // Get 686 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 687 // this.store.get(get, qualifiers, result); 688 689 // Need to sort the result since multiple files 690 Collections.sort(result, CellComparatorImpl.COMPARATOR); 691 692 // Compare 693 assertCheck(); 694 } 695 696 /** 697 * Getting data from memstore and files 698 */ 699 @Test 700 public void testGet_FromMemStoreAndFiles() throws IOException { 701 init(this.name.getMethodName()); 702 703 // Put data in memstore 704 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 705 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 706 // flush 707 flush(1); 708 709 // Add more data 710 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 711 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 712 // flush 713 flush(2); 714 715 // Add more data 716 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 717 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 718 719 // Get 720 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 721 722 // Need to sort the result since multiple files 723 Collections.sort(result, CellComparatorImpl.COMPARATOR); 724 725 // Compare 726 assertCheck(); 727 } 728 729 private void flush(int storeFilessize) throws IOException { 730 flushStore(store, id++); 731 assertEquals(storeFilessize, this.store.getStorefiles().size()); 732 assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount()); 733 } 734 735 private void assertCheck() { 736 assertEquals(expected.size(), result.size()); 737 for (int i = 0; i < expected.size(); i++) { 738 assertEquals(expected.get(i), result.get(i)); 739 } 740 } 741 742 @After 743 public void tearDown() throws Exception { 744 EnvironmentEdgeManagerTestHelper.reset(); 745 if (store != null) { 746 try { 747 store.close(); 748 } catch (IOException e) { 749 } 750 store = null; 751 } 752 if (region != null) { 753 region.close(); 754 region = null; 755 } 756 } 757 758 @AfterClass 759 public static void tearDownAfterClass() throws IOException { 760 TEST_UTIL.cleanupTestDir(); 761 } 762 763 @Test 764 public void testHandleErrorsInFlush() throws Exception { 765 LOG.info("Setting up a faulty file system that cannot write"); 766 767 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 768 User user = User.createUserForTesting(conf, "testhandleerrorsinflush", new String[] { "foo" }); 769 // Inject our faulty LocalFileSystem 770 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 771 user.runAs(new PrivilegedExceptionAction<Object>() { 772 @Override 773 public Object run() throws Exception { 774 // Make sure it worked (above is sensitive to caching details in hadoop core) 775 FileSystem fs = FileSystem.get(conf); 776 assertEquals(FaultyFileSystem.class, fs.getClass()); 777 778 // Initialize region 779 init(name.getMethodName(), conf); 780 781 LOG.info("Adding some data"); 782 store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 783 store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 784 store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 785 786 LOG.info("Before flush, we should have no files"); 787 788 Collection<StoreFileInfo> files = 789 store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); 790 assertEquals(0, files != null ? files.size() : 0); 791 792 // flush 793 try { 794 LOG.info("Flushing"); 795 flush(1); 796 fail("Didn't bubble up IOE!"); 797 } catch (IOException ioe) { 798 assertTrue(ioe.getMessage().contains("Fault injected")); 799 } 800 801 LOG.info("After failed flush, we should still have no files!"); 802 files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); 803 assertEquals(0, files != null ? files.size() : 0); 804 store.getHRegion().getWAL().close(); 805 return null; 806 } 807 }); 808 FileSystem.closeAllForUGI(user.getUGI()); 809 } 810 811 /** 812 * Faulty file system that will fail if you write past its fault position the FIRST TIME only; 813 * thereafter it will succeed. Used by {@link TestHRegion} too. 814 */ 815 static class FaultyFileSystem extends FilterFileSystem { 816 List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>(); 817 private long faultPos = 200; 818 AtomicBoolean fault = new AtomicBoolean(true); 819 820 public FaultyFileSystem() { 821 super(new LocalFileSystem()); 822 LOG.info("Creating faulty!"); 823 } 824 825 @Override 826 public FSDataOutputStream create(Path p) throws IOException { 827 return new FaultyOutputStream(super.create(p), faultPos, fault); 828 } 829 830 @Override 831 public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, 832 int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { 833 return new FaultyOutputStream( 834 super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress), 835 faultPos, fault); 836 } 837 838 @Override 839 public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize, 840 short replication, long blockSize, Progressable progress) throws IOException { 841 // Fake it. Call create instead. The default implementation throws an IOE 842 // that this is not supported. 843 return create(f, overwrite, bufferSize, replication, blockSize, progress); 844 } 845 } 846 847 static class FaultyOutputStream extends FSDataOutputStream { 848 volatile long faultPos = Long.MAX_VALUE; 849 private final AtomicBoolean fault; 850 851 public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault) 852 throws IOException { 853 super(out, null); 854 this.faultPos = faultPos; 855 this.fault = fault; 856 } 857 858 @Override 859 public synchronized void write(byte[] buf, int offset, int length) throws IOException { 860 LOG.info("faulty stream write at pos " + getPos()); 861 injectFault(); 862 super.write(buf, offset, length); 863 } 864 865 private void injectFault() throws IOException { 866 if (this.fault.get() && getPos() >= faultPos) { 867 throw new IOException("Fault injected"); 868 } 869 } 870 } 871 872 private static StoreFlushContext flushStore(HStore store, long id) throws IOException { 873 StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY); 874 storeFlushCtx.prepare(); 875 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 876 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 877 return storeFlushCtx; 878 } 879 880 /** 881 * Generate a list of KeyValues for testing based on given parameters 882 * @return the rows key-value list 883 */ 884 private List<ExtendedCell> getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier, 885 byte[] family) { 886 List<ExtendedCell> kvList = new ArrayList<>(); 887 for (int i = 1; i <= numRows; i++) { 888 byte[] b = Bytes.toBytes(i); 889 for (long timestamp : timestamps) { 890 kvList.add(new KeyValue(b, family, qualifier, timestamp, b)); 891 } 892 } 893 return kvList; 894 } 895 896 /** 897 * Test to ensure correctness when using Stores with multiple timestamps 898 */ 899 @Test 900 public void testMultipleTimestamps() throws IOException { 901 int numRows = 1; 902 long[] timestamps1 = new long[] { 1, 5, 10, 20 }; 903 long[] timestamps2 = new long[] { 30, 80 }; 904 905 init(this.name.getMethodName()); 906 907 List<ExtendedCell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family); 908 for (ExtendedCell kv : kvList1) { 909 this.store.add(kv, null); 910 } 911 912 flushStore(store, id++); 913 914 List<ExtendedCell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family); 915 for (ExtendedCell kv : kvList2) { 916 this.store.add(kv, null); 917 } 918 919 List<Cell> result; 920 Get get = new Get(Bytes.toBytes(1)); 921 get.addColumn(family, qf1); 922 923 get.setTimeRange(0, 15); 924 result = HBaseTestingUtil.getFromStoreFile(store, get); 925 assertTrue(result.size() > 0); 926 927 get.setTimeRange(40, 90); 928 result = HBaseTestingUtil.getFromStoreFile(store, get); 929 assertTrue(result.size() > 0); 930 931 get.setTimeRange(10, 45); 932 result = HBaseTestingUtil.getFromStoreFile(store, get); 933 assertTrue(result.size() > 0); 934 935 get.setTimeRange(80, 145); 936 result = HBaseTestingUtil.getFromStoreFile(store, get); 937 assertTrue(result.size() > 0); 938 939 get.setTimeRange(1, 2); 940 result = HBaseTestingUtil.getFromStoreFile(store, get); 941 assertTrue(result.size() > 0); 942 943 get.setTimeRange(90, 200); 944 result = HBaseTestingUtil.getFromStoreFile(store, get); 945 assertTrue(result.size() == 0); 946 } 947 948 /** 949 * Test for HBASE-3492 - Test split on empty colfam (no store files). 950 * @throws IOException When the IO operations fail. 951 */ 952 @Test 953 public void testSplitWithEmptyColFam() throws IOException { 954 init(this.name.getMethodName()); 955 assertFalse(store.getSplitPoint().isPresent()); 956 } 957 958 @Test 959 public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception { 960 final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle"; 961 long anyValue = 10; 962 963 // We'll check that it uses correct config and propagates it appropriately by going thru 964 // the simplest "real" path I can find - "throttleCompaction", which just checks whether 965 // a number we pass in is higher than some config value, inside compactionPolicy. 966 Configuration conf = HBaseConfiguration.create(); 967 conf.setLong(CONFIG_KEY, anyValue); 968 init(name.getMethodName() + "-xml", conf); 969 assertTrue(store.throttleCompaction(anyValue + 1)); 970 assertFalse(store.throttleCompaction(anyValue)); 971 972 // HTD overrides XML. 973 --anyValue; 974 init( 975 name.getMethodName() + "-htd", conf, TableDescriptorBuilder 976 .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)), 977 ColumnFamilyDescriptorBuilder.of(family)); 978 assertTrue(store.throttleCompaction(anyValue + 1)); 979 assertFalse(store.throttleCompaction(anyValue)); 980 981 // HCD overrides them both. 982 --anyValue; 983 init(name.getMethodName() + "-hcd", conf, 984 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, 985 Long.toString(anyValue)), 986 ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue)) 987 .build()); 988 assertTrue(store.throttleCompaction(anyValue + 1)); 989 assertFalse(store.throttleCompaction(anyValue)); 990 } 991 992 public static class DummyStoreEngine extends DefaultStoreEngine { 993 public static DefaultCompactor lastCreatedCompactor = null; 994 995 @Override 996 protected void createComponents(Configuration conf, HStore store, CellComparator comparator) 997 throws IOException { 998 super.createComponents(conf, store, comparator); 999 lastCreatedCompactor = this.compactor; 1000 } 1001 } 1002 1003 @Test 1004 public void testStoreUsesSearchEngineOverride() throws Exception { 1005 Configuration conf = HBaseConfiguration.create(); 1006 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName()); 1007 init(this.name.getMethodName(), conf); 1008 assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor()); 1009 } 1010 1011 private void addStoreFile() throws IOException { 1012 HStoreFile f = this.store.getStorefiles().iterator().next(); 1013 Path storedir = f.getPath().getParent(); 1014 long seqid = this.store.getMaxSequenceId().orElse(0L); 1015 Configuration c = TEST_UTIL.getConfiguration(); 1016 FileSystem fs = FileSystem.get(c); 1017 HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 1018 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs) 1019 .withOutputDir(storedir).withFileContext(fileContext).build(); 1020 w.appendMetadata(seqid + 1, false); 1021 w.close(); 1022 LOG.info("Added store file:" + w.getPath()); 1023 } 1024 1025 private void archiveStoreFile(int index) throws IOException { 1026 Collection<HStoreFile> files = this.store.getStorefiles(); 1027 HStoreFile sf = null; 1028 Iterator<HStoreFile> it = files.iterator(); 1029 for (int i = 0; i <= index; i++) { 1030 sf = it.next(); 1031 } 1032 store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), 1033 Lists.newArrayList(sf)); 1034 } 1035 1036 private void closeCompactedFile(int index) throws IOException { 1037 Collection<HStoreFile> files = 1038 this.store.getStoreEngine().getStoreFileManager().getCompactedfiles(); 1039 if (files.size() > 0) { 1040 HStoreFile sf = null; 1041 Iterator<HStoreFile> it = files.iterator(); 1042 for (int i = 0; i <= index; i++) { 1043 sf = it.next(); 1044 } 1045 sf.closeStoreFile(true); 1046 store.getStoreEngine().getStoreFileManager() 1047 .removeCompactedFiles(Collections.singletonList(sf)); 1048 } 1049 } 1050 1051 @Test 1052 public void testRefreshStoreFiles() throws Exception { 1053 init(name.getMethodName()); 1054 1055 assertEquals(0, this.store.getStorefilesCount()); 1056 1057 // Test refreshing store files when no store files are there 1058 store.refreshStoreFiles(); 1059 assertEquals(0, this.store.getStorefilesCount()); 1060 1061 // add some data, flush 1062 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 1063 flush(1); 1064 assertEquals(1, this.store.getStorefilesCount()); 1065 1066 // add one more file 1067 addStoreFile(); 1068 1069 assertEquals(1, this.store.getStorefilesCount()); 1070 store.refreshStoreFiles(); 1071 assertEquals(2, this.store.getStorefilesCount()); 1072 1073 // add three more files 1074 addStoreFile(); 1075 addStoreFile(); 1076 addStoreFile(); 1077 1078 assertEquals(2, this.store.getStorefilesCount()); 1079 store.refreshStoreFiles(); 1080 assertEquals(5, this.store.getStorefilesCount()); 1081 1082 closeCompactedFile(0); 1083 archiveStoreFile(0); 1084 1085 assertEquals(5, this.store.getStorefilesCount()); 1086 store.refreshStoreFiles(); 1087 assertEquals(4, this.store.getStorefilesCount()); 1088 1089 archiveStoreFile(0); 1090 archiveStoreFile(1); 1091 archiveStoreFile(2); 1092 1093 assertEquals(4, this.store.getStorefilesCount()); 1094 store.refreshStoreFiles(); 1095 assertEquals(1, this.store.getStorefilesCount()); 1096 1097 archiveStoreFile(0); 1098 store.refreshStoreFiles(); 1099 assertEquals(0, this.store.getStorefilesCount()); 1100 } 1101 1102 @Test 1103 public void testRefreshStoreFilesNotChanged() throws IOException { 1104 init(name.getMethodName()); 1105 1106 assertEquals(0, this.store.getStorefilesCount()); 1107 1108 // add some data, flush 1109 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 1110 flush(1); 1111 // add one more file 1112 addStoreFile(); 1113 1114 StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine()); 1115 1116 // call first time after files changed 1117 spiedStoreEngine.refreshStoreFiles(); 1118 assertEquals(2, this.store.getStorefilesCount()); 1119 verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any()); 1120 1121 // call second time 1122 spiedStoreEngine.refreshStoreFiles(); 1123 1124 // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not 1125 // refreshed, 1126 verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any()); 1127 } 1128 1129 @Test 1130 public void testScanWithCompactionAfterFlush() throws Exception { 1131 TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, 1132 EverythingPolicy.class.getName()); 1133 init(name.getMethodName()); 1134 1135 assertEquals(0, this.store.getStorefilesCount()); 1136 1137 KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null); 1138 // add some data, flush 1139 this.store.add(kv, null); 1140 flush(1); 1141 kv = new KeyValue(row, family, qf2, 1, (byte[]) null); 1142 // add some data, flush 1143 this.store.add(kv, null); 1144 flush(2); 1145 kv = new KeyValue(row, family, qf3, 1, (byte[]) null); 1146 // add some data, flush 1147 this.store.add(kv, null); 1148 flush(3); 1149 1150 ExecutorService service = Executors.newFixedThreadPool(2); 1151 1152 Scan scan = new Scan(new Get(row)); 1153 Future<KeyValueScanner> scanFuture = service.submit(() -> { 1154 try { 1155 LOG.info(">>>> creating scanner"); 1156 return this.store.createScanner(scan, 1157 new ScanInfo(HBaseConfiguration.create(), 1158 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(), 1159 Long.MAX_VALUE, 0, CellComparator.getInstance()), 1160 scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0); 1161 } catch (IOException e) { 1162 e.printStackTrace(); 1163 return null; 1164 } 1165 }); 1166 Future compactFuture = service.submit(() -> { 1167 try { 1168 LOG.info(">>>>>> starting compaction"); 1169 Optional<CompactionContext> opCompaction = this.store.requestCompaction(); 1170 assertTrue(opCompaction.isPresent()); 1171 store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent()); 1172 LOG.info(">>>>>> Compaction is finished"); 1173 this.store.closeAndArchiveCompactedFiles(); 1174 LOG.info(">>>>>> Compacted files deleted"); 1175 } catch (IOException e) { 1176 e.printStackTrace(); 1177 } 1178 }); 1179 1180 KeyValueScanner kvs = scanFuture.get(); 1181 compactFuture.get(); 1182 ((StoreScanner) kvs).currentScanners.forEach(s -> { 1183 if (s instanceof StoreFileScanner) { 1184 assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount()); 1185 } 1186 }); 1187 kvs.seek(kv); 1188 service.shutdownNow(); 1189 } 1190 1191 private long countMemStoreScanner(StoreScanner scanner) { 1192 if (scanner.currentScanners == null) { 1193 return 0; 1194 } 1195 return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count(); 1196 } 1197 1198 @Test 1199 public void testNumberOfMemStoreScannersAfterFlush() throws IOException { 1200 long seqId = 100; 1201 long timestamp = EnvironmentEdgeManager.currentTime(); 1202 ExtendedCell cell0 = 1203 ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1204 .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build(); 1205 PrivateCellUtil.setSequenceId(cell0, seqId); 1206 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList()); 1207 1208 ExtendedCell cell1 = 1209 ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1210 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build(); 1211 PrivateCellUtil.setSequenceId(cell1, seqId); 1212 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1)); 1213 1214 seqId = 101; 1215 timestamp = EnvironmentEdgeManager.currentTime(); 1216 ExtendedCell cell2 = 1217 ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family) 1218 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build(); 1219 PrivateCellUtil.setSequenceId(cell2, seqId); 1220 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2)); 1221 } 1222 1223 private void testNumberOfMemStoreScannersAfterFlush(List<ExtendedCell> inputCellsBeforeSnapshot, 1224 List<ExtendedCell> inputCellsAfterSnapshot) throws IOException { 1225 init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size()); 1226 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1227 long seqId = Long.MIN_VALUE; 1228 for (ExtendedCell c : inputCellsBeforeSnapshot) { 1229 quals.add(CellUtil.cloneQualifier(c)); 1230 seqId = Math.max(seqId, c.getSequenceId()); 1231 } 1232 for (ExtendedCell c : inputCellsAfterSnapshot) { 1233 quals.add(CellUtil.cloneQualifier(c)); 1234 seqId = Math.max(seqId, c.getSequenceId()); 1235 } 1236 inputCellsBeforeSnapshot.forEach(c -> store.add(c, null)); 1237 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1238 storeFlushCtx.prepare(); 1239 inputCellsAfterSnapshot.forEach(c -> store.add(c, null)); 1240 int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2; 1241 try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) { 1242 // snapshot + active (if inputCellsAfterSnapshot isn't empty) 1243 assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s)); 1244 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1245 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1246 // snapshot has no data after flush 1247 int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1; 1248 boolean more; 1249 int cellCount = 0; 1250 do { 1251 List<Cell> cells = new ArrayList<>(); 1252 more = s.next(cells); 1253 cellCount += cells.size(); 1254 assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s)); 1255 } while (more); 1256 assertEquals( 1257 "The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size() 1258 + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(), 1259 inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount); 1260 // the current scanners is cleared 1261 assertEquals(0, countMemStoreScanner(s)); 1262 } 1263 } 1264 1265 private ExtendedCell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) 1266 throws IOException { 1267 return createCell(row, qualifier, ts, sequenceId, value); 1268 } 1269 1270 private ExtendedCell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, 1271 byte[] value) throws IOException { 1272 return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row) 1273 .setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put) 1274 .setValue(value).setSequenceId(sequenceId).build(); 1275 } 1276 1277 @Test 1278 public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException { 1279 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1280 final int expectedSize = 3; 1281 testFlushBeforeCompletingScan(new MyListHook() { 1282 @Override 1283 public void hook(int currentSize) { 1284 if (currentSize == expectedSize - 1) { 1285 try { 1286 flushStore(store, id++); 1287 timeToGoNextRow.set(true); 1288 } catch (IOException e) { 1289 throw new RuntimeException(e); 1290 } 1291 } 1292 } 1293 }, new FilterBase() { 1294 @Override 1295 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1296 return ReturnCode.INCLUDE; 1297 } 1298 }, expectedSize); 1299 } 1300 1301 @Test 1302 public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException { 1303 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1304 final int expectedSize = 2; 1305 testFlushBeforeCompletingScan(new MyListHook() { 1306 @Override 1307 public void hook(int currentSize) { 1308 if (currentSize == expectedSize - 1) { 1309 try { 1310 flushStore(store, id++); 1311 timeToGoNextRow.set(true); 1312 } catch (IOException e) { 1313 throw new RuntimeException(e); 1314 } 1315 } 1316 } 1317 }, new FilterBase() { 1318 @Override 1319 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1320 if (timeToGoNextRow.get()) { 1321 timeToGoNextRow.set(false); 1322 return ReturnCode.NEXT_ROW; 1323 } else { 1324 return ReturnCode.INCLUDE; 1325 } 1326 } 1327 }, expectedSize); 1328 } 1329 1330 @Test 1331 public void testFlushBeforeCompletingScanWithFilterHint() 1332 throws IOException, InterruptedException { 1333 final AtomicBoolean timeToGetHint = new AtomicBoolean(false); 1334 final int expectedSize = 2; 1335 testFlushBeforeCompletingScan(new MyListHook() { 1336 @Override 1337 public void hook(int currentSize) { 1338 if (currentSize == expectedSize - 1) { 1339 try { 1340 flushStore(store, id++); 1341 timeToGetHint.set(true); 1342 } catch (IOException e) { 1343 throw new RuntimeException(e); 1344 } 1345 } 1346 } 1347 }, new FilterBase() { 1348 @Override 1349 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1350 if (timeToGetHint.get()) { 1351 timeToGetHint.set(false); 1352 return Filter.ReturnCode.SEEK_NEXT_USING_HINT; 1353 } else { 1354 return Filter.ReturnCode.INCLUDE; 1355 } 1356 } 1357 1358 @Override 1359 public Cell getNextCellHint(Cell currentCell) throws IOException { 1360 return currentCell; 1361 } 1362 }, expectedSize); 1363 } 1364 1365 private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize) 1366 throws IOException, InterruptedException { 1367 Configuration conf = HBaseConfiguration.create(); 1368 byte[] r0 = Bytes.toBytes("row0"); 1369 byte[] r1 = Bytes.toBytes("row1"); 1370 byte[] r2 = Bytes.toBytes("row2"); 1371 byte[] value0 = Bytes.toBytes("value0"); 1372 byte[] value1 = Bytes.toBytes("value1"); 1373 byte[] value2 = Bytes.toBytes("value2"); 1374 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1375 long ts = EnvironmentEdgeManager.currentTime(); 1376 long seqId = 100; 1377 init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1378 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(), 1379 new MyStoreHook() { 1380 @Override 1381 public long getSmallestReadPoint(HStore store) { 1382 return seqId + 3; 1383 } 1384 }); 1385 // The cells having the value0 won't be flushed to disk because the value of max version is 1 1386 store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing); 1387 store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing); 1388 store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing); 1389 store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing); 1390 store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing); 1391 store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing); 1392 store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing); 1393 store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing); 1394 store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing); 1395 store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing); 1396 store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing); 1397 store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing); 1398 List<Cell> myList = new MyList<>(hook); 1399 Scan scan = new Scan().withStartRow(r1).setFilter(filter); 1400 try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) { 1401 // r1 1402 scanner.next(myList); 1403 assertEquals(expectedSize, myList.size()); 1404 for (Cell c : myList) { 1405 byte[] actualValue = CellUtil.cloneValue(c); 1406 assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:" 1407 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1)); 1408 } 1409 List<Cell> normalList = new ArrayList<>(3); 1410 // r2 1411 scanner.next(normalList); 1412 assertEquals(3, normalList.size()); 1413 for (Cell c : normalList) { 1414 byte[] actualValue = CellUtil.cloneValue(c); 1415 assertTrue("expected:" + Bytes.toStringBinary(value2) + ", actual:" 1416 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value2)); 1417 } 1418 } 1419 } 1420 1421 @Test 1422 public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException { 1423 Configuration conf = HBaseConfiguration.create(); 1424 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName()); 1425 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1426 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1427 byte[] value = Bytes.toBytes("value"); 1428 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1429 long ts = EnvironmentEdgeManager.currentTime(); 1430 long seqId = 100; 1431 // older data whihc shouldn't be "seen" by client 1432 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1433 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1434 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1435 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1436 quals.add(qf1); 1437 quals.add(qf2); 1438 quals.add(qf3); 1439 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1440 MyCompactingMemStore.START_TEST.set(true); 1441 Runnable flush = () -> { 1442 // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5) 1443 // recreate the active memstore -- phase (4/5) 1444 storeFlushCtx.prepare(); 1445 }; 1446 ExecutorService service = Executors.newSingleThreadExecutor(); 1447 service.execute(flush); 1448 // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5) 1449 // this is blocked until we recreate the active memstore -- phase (3/5) 1450 // we get scanner from active memstore but it is empty -- phase (5/5) 1451 InternalScanner scanner = 1452 (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 1453 service.shutdown(); 1454 service.awaitTermination(20, TimeUnit.SECONDS); 1455 try { 1456 try { 1457 List<Cell> results = new ArrayList<>(); 1458 scanner.next(results); 1459 assertEquals(3, results.size()); 1460 for (Cell c : results) { 1461 byte[] actualValue = CellUtil.cloneValue(c); 1462 assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:" 1463 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value)); 1464 } 1465 } finally { 1466 scanner.close(); 1467 } 1468 } finally { 1469 MyCompactingMemStore.START_TEST.set(false); 1470 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1471 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1472 } 1473 } 1474 1475 @Test 1476 public void testScanWithDoubleFlush() throws IOException { 1477 Configuration conf = HBaseConfiguration.create(); 1478 // Initialize region 1479 MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() { 1480 @Override 1481 public void getScanners(MyStore store) throws IOException { 1482 final long tmpId = id++; 1483 ExecutorService s = Executors.newSingleThreadExecutor(); 1484 s.execute(() -> { 1485 try { 1486 // flush the store before storescanner updates the scanners from store. 1487 // The current data will be flushed into files, and the memstore will 1488 // be clear. 1489 // -- phase (4/4) 1490 flushStore(store, tmpId); 1491 } catch (IOException ex) { 1492 throw new RuntimeException(ex); 1493 } 1494 }); 1495 s.shutdown(); 1496 try { 1497 // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers. 1498 s.awaitTermination(3, TimeUnit.SECONDS); 1499 } catch (InterruptedException ex) { 1500 } 1501 } 1502 }); 1503 byte[] oldValue = Bytes.toBytes("oldValue"); 1504 byte[] currentValue = Bytes.toBytes("currentValue"); 1505 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1506 long ts = EnvironmentEdgeManager.currentTime(); 1507 long seqId = 100; 1508 // older data whihc shouldn't be "seen" by client 1509 myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing); 1510 myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing); 1511 myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing); 1512 long snapshotId = id++; 1513 // push older data into snapshot -- phase (1/4) 1514 StoreFlushContext storeFlushCtx = 1515 store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY); 1516 storeFlushCtx.prepare(); 1517 1518 // insert current data into active -- phase (2/4) 1519 myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing); 1520 myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing); 1521 myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing); 1522 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1523 quals.add(qf1); 1524 quals.add(qf2); 1525 quals.add(qf3); 1526 try (InternalScanner scanner = 1527 (InternalScanner) myStore.getScanner(new Scan(new Get(row)), quals, seqId + 1)) { 1528 // complete the flush -- phase (3/4) 1529 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1530 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1531 1532 List<Cell> results = new ArrayList<>(); 1533 scanner.next(results); 1534 assertEquals(3, results.size()); 1535 for (Cell c : results) { 1536 byte[] actualValue = CellUtil.cloneValue(c); 1537 assertTrue("expected:" + Bytes.toStringBinary(currentValue) + ", actual:" 1538 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, currentValue)); 1539 } 1540 } 1541 } 1542 1543 /** 1544 * This test is for HBASE-27519, when the {@link StoreScanner} is scanning,the Flush and the 1545 * Compaction execute concurrently and theCcompaction compact and archive the flushed 1546 * {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}.Before 1547 * HBASE-27519,{@link StoreScanner.updateReaders} would throw {@link FileNotFoundException}. 1548 */ 1549 @Test 1550 public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException { 1551 Configuration conf = HBaseConfiguration.create(); 1552 conf.setBoolean(WALFactory.WAL_ENABLED, false); 1553 conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName()); 1554 byte[] r0 = Bytes.toBytes("row0"); 1555 byte[] r1 = Bytes.toBytes("row1"); 1556 final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); 1557 final AtomicBoolean shouldWaitRef = new AtomicBoolean(false); 1558 // Initialize region 1559 final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() { 1560 @Override 1561 public void getScanners(MyStore store) throws IOException { 1562 try { 1563 // Here this method is called by StoreScanner.updateReaders which is invoked by the 1564 // following TestHStore.flushStore 1565 if (shouldWaitRef.get()) { 1566 // wait the following compaction Task start 1567 cyclicBarrier.await(); 1568 // wait the following HStore.closeAndArchiveCompactedFiles end. 1569 cyclicBarrier.await(); 1570 } 1571 } catch (BrokenBarrierException | InterruptedException e) { 1572 throw new RuntimeException(e); 1573 } 1574 } 1575 }); 1576 1577 final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null); 1578 Runnable compactionTask = () -> { 1579 try { 1580 // Only when the StoreScanner.updateReaders invoked by TestHStore.flushStore prepares for 1581 // entering the MyStore.getScanners, compactionTask could start. 1582 cyclicBarrier.await(); 1583 region.compactStore(family, new NoLimitThroughputController()); 1584 myStore.closeAndArchiveCompactedFiles(); 1585 // Notify StoreScanner.updateReaders could enter MyStore.getScanners. 1586 cyclicBarrier.await(); 1587 } catch (Throwable e) { 1588 compactionExceptionRef.set(e); 1589 } 1590 }; 1591 1592 long ts = EnvironmentEdgeManager.currentTime(); 1593 long seqId = 100; 1594 byte[] value = Bytes.toBytes("value"); 1595 // older data whihc shouldn't be "seen" by client 1596 myStore.add(createCell(r0, qf1, ts, seqId, value), null); 1597 flushStore(myStore, id++); 1598 myStore.add(createCell(r0, qf2, ts, seqId, value), null); 1599 flushStore(myStore, id++); 1600 myStore.add(createCell(r0, qf3, ts, seqId, value), null); 1601 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1602 quals.add(qf1); 1603 quals.add(qf2); 1604 quals.add(qf3); 1605 1606 myStore.add(createCell(r1, qf1, ts, seqId, value), null); 1607 myStore.add(createCell(r1, qf2, ts, seqId, value), null); 1608 myStore.add(createCell(r1, qf3, ts, seqId, value), null); 1609 1610 Thread.currentThread() 1611 .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread"); 1612 Scan scan = new Scan(); 1613 scan.withStartRow(r0, true); 1614 try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) { 1615 List<Cell> results = new MyList<>(size -> { 1616 switch (size) { 1617 case 1: 1618 shouldWaitRef.set(true); 1619 Thread thread = new Thread(compactionTask); 1620 thread.setName("MyCompacting Thread."); 1621 thread.start(); 1622 try { 1623 flushStore(myStore, id++); 1624 thread.join(); 1625 } catch (IOException | InterruptedException e) { 1626 throw new RuntimeException(e); 1627 } 1628 shouldWaitRef.set(false); 1629 break; 1630 default: 1631 break; 1632 } 1633 }); 1634 // Before HBASE-27519, here would throw java.io.FileNotFoundException because the storeFile 1635 // which used by StoreScanner.updateReaders is deleted by compactionTask. 1636 scanner.next(results); 1637 // The results is r0 row cells. 1638 assertEquals(3, results.size()); 1639 assertTrue(compactionExceptionRef.get() == null); 1640 } 1641 } 1642 1643 @Test 1644 public void testReclaimChunkWhenScaning() throws IOException { 1645 init("testReclaimChunkWhenScaning"); 1646 long ts = EnvironmentEdgeManager.currentTime(); 1647 long seqId = 100; 1648 byte[] value = Bytes.toBytes("value"); 1649 // older data whihc shouldn't be "seen" by client 1650 store.add(createCell(qf1, ts, seqId, value), null); 1651 store.add(createCell(qf2, ts, seqId, value), null); 1652 store.add(createCell(qf3, ts, seqId, value), null); 1653 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1654 quals.add(qf1); 1655 quals.add(qf2); 1656 quals.add(qf3); 1657 try (InternalScanner scanner = 1658 (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId)) { 1659 List<Cell> results = new MyList<>(size -> { 1660 switch (size) { 1661 // 1) we get the first cell (qf1) 1662 // 2) flush the data to have StoreScanner update inner scanners 1663 // 3) the chunk will be reclaimed after updaing 1664 case 1: 1665 try { 1666 flushStore(store, id++); 1667 } catch (IOException e) { 1668 throw new RuntimeException(e); 1669 } 1670 break; 1671 // 1) we get the second cell (qf2) 1672 // 2) add some cell to fill some byte into the chunk (we have only one chunk) 1673 case 2: 1674 try { 1675 byte[] newValue = Bytes.toBytes("newValue"); 1676 // older data whihc shouldn't be "seen" by client 1677 store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null); 1678 store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null); 1679 store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null); 1680 } catch (IOException e) { 1681 throw new RuntimeException(e); 1682 } 1683 break; 1684 default: 1685 break; 1686 } 1687 }); 1688 scanner.next(results); 1689 assertEquals(3, results.size()); 1690 for (Cell c : results) { 1691 byte[] actualValue = CellUtil.cloneValue(c); 1692 assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:" 1693 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value)); 1694 } 1695 } 1696 } 1697 1698 /** 1699 * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the 1700 * versionedList. And the first InMemoryFlushRunnable will use the chagned versionedList to remove 1701 * the corresponding segments. In short, there will be some segements which isn't in merge are 1702 * removed. 1703 */ 1704 @Test 1705 public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException { 1706 int flushSize = 500; 1707 Configuration conf = HBaseConfiguration.create(); 1708 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName()); 1709 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25); 1710 MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0); 1711 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize)); 1712 // Set the lower threshold to invoke the "MERGE" policy 1713 conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0)); 1714 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1715 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1716 byte[] value = Bytes.toBytes("thisisavarylargevalue"); 1717 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1718 long ts = EnvironmentEdgeManager.currentTime(); 1719 long seqId = 100; 1720 // older data whihc shouldn't be "seen" by client 1721 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1722 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1723 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1724 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1725 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1726 storeFlushCtx.prepare(); 1727 // This shouldn't invoke another in-memory flush because the first compactor thread 1728 // hasn't accomplished the in-memory compaction. 1729 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1730 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1731 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1732 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1733 // okay. Let the compaction be completed 1734 MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown(); 1735 CompactingMemStore mem = (CompactingMemStore) ((HStore) store).memstore; 1736 while (mem.isMemStoreFlushingInMemory()) { 1737 TimeUnit.SECONDS.sleep(1); 1738 } 1739 // This should invoke another in-memory flush. 1740 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1741 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1742 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1743 assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1744 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1745 String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE)); 1746 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1747 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1748 } 1749 1750 @Test 1751 public void testAge() throws IOException { 1752 long currentTime = EnvironmentEdgeManager.currentTime(); 1753 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 1754 edge.setValue(currentTime); 1755 EnvironmentEdgeManager.injectEdge(edge); 1756 Configuration conf = TEST_UTIL.getConfiguration(); 1757 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family); 1758 initHRegion(name.getMethodName(), conf, 1759 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false); 1760 HStore store = new HStore(region, hcd, conf, false) { 1761 1762 @Override 1763 protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf, 1764 CellComparator kvComparator) throws IOException { 1765 List<HStoreFile> storefiles = 1766 Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100), 1767 mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000)); 1768 StoreFileManager sfm = mock(StoreFileManager.class); 1769 when(sfm.getStoreFiles()).thenReturn(storefiles); 1770 StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class); 1771 when(storeEngine.getStoreFileManager()).thenReturn(sfm); 1772 return storeEngine; 1773 } 1774 }; 1775 assertEquals(10L, store.getMinStoreFileAge().getAsLong()); 1776 assertEquals(10000L, store.getMaxStoreFileAge().getAsLong()); 1777 assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4); 1778 } 1779 1780 private HStoreFile mockStoreFile(long createdTime) { 1781 StoreFileInfo info = mock(StoreFileInfo.class); 1782 when(info.getCreatedTimestamp()).thenReturn(createdTime); 1783 HStoreFile sf = mock(HStoreFile.class); 1784 when(sf.getReader()).thenReturn(mock(StoreFileReader.class)); 1785 when(sf.isHFile()).thenReturn(true); 1786 when(sf.getFileInfo()).thenReturn(info); 1787 return sf; 1788 } 1789 1790 private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) 1791 throws IOException { 1792 return (MyStore) init(methodName, conf, 1793 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1794 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook); 1795 } 1796 1797 private static class MyStore extends HStore { 1798 private final MyStoreHook hook; 1799 1800 MyStore(final HRegion region, final ColumnFamilyDescriptor family, 1801 final Configuration confParam, MyStoreHook hook, boolean switchToPread) throws IOException { 1802 super(region, family, confParam, false); 1803 this.hook = hook; 1804 } 1805 1806 @Override 1807 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1808 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, 1809 boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, 1810 boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException { 1811 hook.getScanners(this); 1812 return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, 1813 stopRow, false, readPt, includeMemstoreScanner, onlyLatestVersion); 1814 } 1815 1816 @Override 1817 public long getSmallestReadPoint() { 1818 return hook.getSmallestReadPoint(this); 1819 } 1820 } 1821 1822 private abstract static class MyStoreHook { 1823 1824 void getScanners(MyStore store) throws IOException { 1825 } 1826 1827 long getSmallestReadPoint(HStore store) { 1828 return store.getHRegion().getSmallestReadPoint(); 1829 } 1830 } 1831 1832 @Test 1833 public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception { 1834 Configuration conf = HBaseConfiguration.create(); 1835 conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); 1836 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0); 1837 // Set the lower threshold to invoke the "MERGE" policy 1838 MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() { 1839 }); 1840 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1841 long ts = EnvironmentEdgeManager.currentTime(); 1842 long seqID = 1L; 1843 // Add some data to the region and do some flushes 1844 for (int i = 1; i < 10; i++) { 1845 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1846 memStoreSizing); 1847 } 1848 // flush them 1849 flushStore(store, seqID); 1850 for (int i = 11; i < 20; i++) { 1851 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1852 memStoreSizing); 1853 } 1854 // flush them 1855 flushStore(store, seqID); 1856 for (int i = 21; i < 30; i++) { 1857 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1858 memStoreSizing); 1859 } 1860 // flush them 1861 flushStore(store, seqID); 1862 1863 assertEquals(3, store.getStorefilesCount()); 1864 Scan scan = new Scan(); 1865 scan.addFamily(family); 1866 Collection<HStoreFile> storefiles2 = store.getStorefiles(); 1867 ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2); 1868 StoreScanner storeScanner = 1869 (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); 1870 // get the current heap 1871 KeyValueHeap heap = storeScanner.heap; 1872 // create more store files 1873 for (int i = 31; i < 40; i++) { 1874 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1875 memStoreSizing); 1876 } 1877 // flush them 1878 flushStore(store, seqID); 1879 1880 for (int i = 41; i < 50; i++) { 1881 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1882 memStoreSizing); 1883 } 1884 // flush them 1885 flushStore(store, seqID); 1886 storefiles2 = store.getStorefiles(); 1887 ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2); 1888 actualStorefiles1.removeAll(actualStorefiles); 1889 // Do compaction 1890 MyThread thread = new MyThread(storeScanner); 1891 thread.start(); 1892 store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false); 1893 thread.join(); 1894 KeyValueHeap heap2 = thread.getHeap(); 1895 assertFalse(heap.equals(heap2)); 1896 } 1897 1898 @Test 1899 public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception { 1900 Configuration conf = HBaseConfiguration.create(); 1901 conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); 1902 // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type. 1903 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1); 1904 MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() { 1905 }); 1906 Scan scan = new Scan(); 1907 scan.addFamily(family); 1908 // ReadType on Scan is still DEFAULT only. 1909 assertEquals(ReadType.DEFAULT, scan.getReadType()); 1910 StoreScanner storeScanner = 1911 (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); 1912 assertFalse(storeScanner.isScanUsePread()); 1913 } 1914 1915 @Test 1916 public void testSpaceQuotaChangeAfterReplacement() throws IOException { 1917 final TableName tn = TableName.valueOf(name.getMethodName()); 1918 init(name.getMethodName()); 1919 1920 RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl(); 1921 1922 HStoreFile sf1 = mockStoreFileWithLength(1024L); 1923 HStoreFile sf2 = mockStoreFileWithLength(2048L); 1924 HStoreFile sf3 = mockStoreFileWithLength(4096L); 1925 HStoreFile sf4 = mockStoreFileWithLength(8192L); 1926 1927 RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a")) 1928 .setEndKey(Bytes.toBytes("b")).build(); 1929 1930 // Compacting two files down to one, reducing size 1931 sizeStore.put(regionInfo, 1024L + 4096L); 1932 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf1, sf3), 1933 Arrays.asList(sf2)); 1934 1935 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 1936 1937 // The same file length in and out should have no change 1938 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2), 1939 Arrays.asList(sf2)); 1940 1941 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 1942 1943 // Increase the total size used 1944 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2), 1945 Arrays.asList(sf3)); 1946 1947 assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize()); 1948 1949 RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b")) 1950 .setEndKey(Bytes.toBytes("c")).build(); 1951 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4)); 1952 1953 assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize()); 1954 } 1955 1956 @Test 1957 public void testHFileContextSetWithCFAndTable() throws Exception { 1958 init(this.name.getMethodName()); 1959 StoreFileWriter writer = store.getStoreEngine() 1960 .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L) 1961 .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true) 1962 .includesTag(false).shouldDropBehind(true)); 1963 HFileContext hFileContext = writer.getLiveFileWriter().getFileContext(); 1964 assertArrayEquals(family, hFileContext.getColumnFamily()); 1965 assertArrayEquals(table, hFileContext.getTableName()); 1966 } 1967 1968 // This test is for HBASE-26026, HBase Write be stuck when active segment has no cell 1969 // but its dataSize exceeds inmemoryFlushSize 1970 @Test 1971 public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize() 1972 throws IOException, InterruptedException { 1973 Configuration conf = HBaseConfiguration.create(); 1974 1975 byte[] smallValue = new byte[3]; 1976 byte[] largeValue = new byte[9]; 1977 final long timestamp = EnvironmentEdgeManager.currentTime(); 1978 final long seqId = 100; 1979 final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue); 1980 final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue); 1981 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 1982 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 1983 int flushByteSize = smallCellByteSize + largeCellByteSize - 2; 1984 1985 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 1986 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName()); 1987 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 1988 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 1989 1990 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1991 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1992 1993 MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore); 1994 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 1995 myCompactingMemStore.smallCellPreUpdateCounter.set(0); 1996 myCompactingMemStore.largeCellPreUpdateCounter.set(0); 1997 1998 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 1999 Thread smallCellThread = new Thread(() -> { 2000 try { 2001 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2002 } catch (Throwable exception) { 2003 exceptionRef.set(exception); 2004 } 2005 }); 2006 smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME); 2007 smallCellThread.start(); 2008 2009 String oldThreadName = Thread.currentThread().getName(); 2010 try { 2011 /** 2012 * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then 2013 * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread 2014 * invokes flushInMemory. 2015 * <p/> 2016 * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread 2017 * can add cell to currentActive . That is to say when largeCellThread called flushInMemory 2018 * method, CompactingMemStore.active has no cell. 2019 */ 2020 Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME); 2021 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2022 smallCellThread.join(); 2023 2024 for (int i = 0; i < 100; i++) { 2025 long currentTimestamp = timestamp + 100 + i; 2026 ExtendedCell cell = createCell(qf2, currentTimestamp, seqId, largeValue); 2027 store.add(cell, new NonThreadSafeMemStoreSizing()); 2028 } 2029 } finally { 2030 Thread.currentThread().setName(oldThreadName); 2031 } 2032 2033 assertTrue(exceptionRef.get() == null); 2034 2035 } 2036 2037 // This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds 2038 // InmemoryFlushSize 2039 @Test(timeout = 60000) 2040 public void testCompactingMemStoreCellExceedInmemoryFlushSize() throws Exception { 2041 Configuration conf = HBaseConfiguration.create(); 2042 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName()); 2043 2044 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2045 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2046 2047 MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore); 2048 2049 int size = (int) (myCompactingMemStore.getInmemoryFlushSize()); 2050 byte[] value = new byte[size + 1]; 2051 2052 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2053 long timestamp = EnvironmentEdgeManager.currentTime(); 2054 long seqId = 100; 2055 ExtendedCell cell = createCell(qf1, timestamp, seqId, value); 2056 int cellByteSize = MutableSegment.getCellLength(cell); 2057 store.add(cell, memStoreSizing); 2058 assertTrue(memStoreSizing.getCellsCount() == 1); 2059 assertTrue(memStoreSizing.getDataSize() == cellByteSize); 2060 // Waiting the in memory compaction completed, see HBASE-26438 2061 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2062 } 2063 2064 /** 2065 * This test is for HBASE-27464, before this JIRA,when init {@link CellChunkImmutableSegment} for 2066 * 'COMPACT' action, we not force copy to current MSLab. When cell size bigger than 2067 * {@link MemStoreLABImpl#maxAlloc}, cell will stay in previous chunk which will recycle after 2068 * segment replace, and we may read wrong data when these chunk reused by others. 2069 */ 2070 @Test 2071 public void testForceCloneOfBigCellForCellChunkImmutableSegment() throws Exception { 2072 Configuration conf = HBaseConfiguration.create(); 2073 int maxAllocByteSize = conf.getInt(MemStoreLAB.MAX_ALLOC_KEY, MemStoreLAB.MAX_ALLOC_DEFAULT); 2074 2075 // Construct big cell,which is large than {@link MemStoreLABImpl#maxAlloc}. 2076 byte[] cellValue = new byte[maxAllocByteSize + 1]; 2077 final long timestamp = EnvironmentEdgeManager.currentTime(); 2078 final long seqId = 100; 2079 final byte[] rowKey1 = Bytes.toBytes("rowKey1"); 2080 final ExtendedCell originalCell1 = createCell(rowKey1, qf1, timestamp, seqId, cellValue); 2081 final byte[] rowKey2 = Bytes.toBytes("rowKey2"); 2082 final ExtendedCell originalCell2 = createCell(rowKey2, qf1, timestamp, seqId, cellValue); 2083 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2084 quals.add(qf1); 2085 2086 int cellByteSize = MutableSegment.getCellLength(originalCell1); 2087 int inMemoryFlushByteSize = cellByteSize - 1; 2088 2089 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2090 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName()); 2091 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2092 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(inMemoryFlushByteSize * 200)); 2093 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2094 2095 // Use {@link MemoryCompactionPolicy#EAGER} for always compacting. 2096 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2097 .setInMemoryCompaction(MemoryCompactionPolicy.EAGER).build()); 2098 2099 MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore); 2100 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == inMemoryFlushByteSize); 2101 2102 // Data chunk Pool is disabled. 2103 assertTrue(ChunkCreator.getInstance().getMaxCount(ChunkType.DATA_CHUNK) == 0); 2104 2105 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2106 2107 // First compact 2108 store.add(originalCell1, memStoreSizing); 2109 // Waiting for the first in-memory compaction finished 2110 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2111 2112 StoreScanner storeScanner = 2113 (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1); 2114 SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2115 ExtendedCell resultCell1 = segmentScanner.next(); 2116 assertTrue(PrivateCellUtil.equals(resultCell1, originalCell1)); 2117 int cell1ChunkId = resultCell1.getChunkId(); 2118 assertTrue(cell1ChunkId != ExtendedCell.CELL_NOT_BASED_ON_CHUNK); 2119 assertNull(segmentScanner.next()); 2120 segmentScanner.close(); 2121 storeScanner.close(); 2122 Segment segment = segmentScanner.segment; 2123 assertTrue(segment instanceof CellChunkImmutableSegment); 2124 MemStoreLABImpl memStoreLAB1 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2125 assertTrue(!memStoreLAB1.isClosed()); 2126 assertTrue(!memStoreLAB1.chunks.isEmpty()); 2127 assertTrue(!memStoreLAB1.isReclaimed()); 2128 2129 // Second compact 2130 store.add(originalCell2, memStoreSizing); 2131 // Waiting for the second in-memory compaction finished 2132 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2133 2134 // Before HBASE-27464, here may throw java.lang.IllegalArgumentException: In CellChunkMap, cell 2135 // must be associated with chunk.. We were looking for a cell at index 0. 2136 // The cause for this exception is because the data chunk Pool is disabled,when the data chunks 2137 // are recycled after the second in-memory compaction finished,the 2138 // {@link ChunkCreator.putbackChunks} method does not put the chunks back to the data chunk 2139 // pool,it just removes them from {@link ChunkCreator#chunkIdMap},so in 2140 // {@link CellChunkMap#getCell} we could not get the data chunk by chunkId. 2141 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1); 2142 segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2143 ExtendedCell newResultCell1 = segmentScanner.next(); 2144 assertTrue(newResultCell1 != resultCell1); 2145 assertTrue(PrivateCellUtil.equals(newResultCell1, originalCell1)); 2146 2147 ExtendedCell resultCell2 = segmentScanner.next(); 2148 assertTrue(PrivateCellUtil.equals(resultCell2, originalCell2)); 2149 assertNull(segmentScanner.next()); 2150 segmentScanner.close(); 2151 storeScanner.close(); 2152 2153 segment = segmentScanner.segment; 2154 assertTrue(segment instanceof CellChunkImmutableSegment); 2155 MemStoreLABImpl memStoreLAB2 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2156 assertTrue(!memStoreLAB2.isClosed()); 2157 assertTrue(!memStoreLAB2.chunks.isEmpty()); 2158 assertTrue(!memStoreLAB2.isReclaimed()); 2159 assertTrue(memStoreLAB1.isClosed()); 2160 assertTrue(memStoreLAB1.chunks.isEmpty()); 2161 assertTrue(memStoreLAB1.isReclaimed()); 2162 } 2163 2164 // This test is for HBASE-26210 also, test write large cell and small cell concurrently when 2165 // InmemoryFlushSize is smaller,equal with and larger than cell size. 2166 @Test 2167 public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently() 2168 throws IOException, InterruptedException { 2169 doWriteTestLargeCellAndSmallCellConcurrently( 2170 (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1); 2171 doWriteTestLargeCellAndSmallCellConcurrently( 2172 (smallCellByteSize, largeCellByteSize) -> largeCellByteSize); 2173 doWriteTestLargeCellAndSmallCellConcurrently( 2174 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1); 2175 doWriteTestLargeCellAndSmallCellConcurrently( 2176 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize); 2177 doWriteTestLargeCellAndSmallCellConcurrently( 2178 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1); 2179 } 2180 2181 private void doWriteTestLargeCellAndSmallCellConcurrently(IntBinaryOperator getFlushByteSize) 2182 throws IOException, InterruptedException { 2183 2184 Configuration conf = HBaseConfiguration.create(); 2185 2186 byte[] smallValue = new byte[3]; 2187 byte[] largeValue = new byte[100]; 2188 final long timestamp = EnvironmentEdgeManager.currentTime(); 2189 final long seqId = 100; 2190 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2191 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2192 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2193 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2194 int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize); 2195 boolean flushByteSizeLessThanSmallAndLargeCellSize = 2196 flushByteSize < (smallCellByteSize + largeCellByteSize); 2197 2198 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName()); 2199 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2200 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2201 2202 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2203 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2204 2205 MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore); 2206 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2207 myCompactingMemStore.disableCompaction(); 2208 if (flushByteSizeLessThanSmallAndLargeCellSize) { 2209 myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true; 2210 } else { 2211 myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false; 2212 } 2213 2214 final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing(); 2215 final AtomicLong totalCellByteSize = new AtomicLong(0); 2216 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2217 Thread smallCellThread = new Thread(() -> { 2218 try { 2219 for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { 2220 long currentTimestamp = timestamp + i; 2221 ExtendedCell cell = createCell(qf1, currentTimestamp, seqId, smallValue); 2222 totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell)); 2223 store.add(cell, memStoreSizing); 2224 } 2225 } catch (Throwable exception) { 2226 exceptionRef.set(exception); 2227 2228 } 2229 }); 2230 smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME); 2231 smallCellThread.start(); 2232 2233 String oldThreadName = Thread.currentThread().getName(); 2234 try { 2235 /** 2236 * When flushByteSizeLessThanSmallAndLargeCellSize is true: 2237 * </p> 2238 * 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then 2239 * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then 2240 * largeCellThread invokes flushInMemory. 2241 * <p/> 2242 * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread 2243 * can run into MyCompactingMemStore3.checkAndAddToActiveSize again. 2244 * <p/> 2245 * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and 2246 * largeCellThread concurrently write one cell and wait each other, and then write another 2247 * cell etc. 2248 */ 2249 Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME); 2250 for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { 2251 long currentTimestamp = timestamp + i; 2252 ExtendedCell cell = createCell(qf2, currentTimestamp, seqId, largeValue); 2253 totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell)); 2254 store.add(cell, memStoreSizing); 2255 } 2256 smallCellThread.join(); 2257 2258 assertTrue(exceptionRef.get() == null); 2259 assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2)); 2260 assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get()); 2261 if (flushByteSizeLessThanSmallAndLargeCellSize) { 2262 assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT); 2263 } else { 2264 assertTrue( 2265 myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1)); 2266 } 2267 } finally { 2268 Thread.currentThread().setName(oldThreadName); 2269 } 2270 } 2271 2272 /** 2273 * <pre> 2274 * This test is for HBASE-26384, 2275 * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()} 2276 * execute concurrently. 2277 * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs 2278 * for both branch-2 and master): 2279 * 1. The {@link CompactingMemStore} size exceeds 2280 * {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new 2281 * {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a 2282 * in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}. 2283 * 2. The in memory compact thread starts and then stopping before 2284 * {@link CompactingMemStore#flattenOneSegment}. 2285 * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the 2286 * snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory 2287 * compact thread continues. 2288 * Assuming {@link VersionedSegmentsList#version} returned from 2289 * {@link CompactingMemStore#getImmutableSegments} is v. 2290 * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}. 2291 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment}, 2292 * {@link CompactionPipeline#version} is still v. 2293 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2294 * {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull} 2295 * thinks it is successful and continue flushing,but the {@link ImmutableSegment} in 2296 * {@link CompactionPipeline} has changed because 2297 * {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not 2298 * removed in fact and still remaining in {@link CompactionPipeline}. 2299 * 2300 * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior: 2301 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment}, 2302 * {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to 2303 * v+1. 2304 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2305 * {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull} 2306 * failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once 2307 * again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now, 2308 * {@link CompactingMemStore#swapPipelineWithNull} succeeds. 2309 * </pre> 2310 */ 2311 @Test 2312 public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception { 2313 Configuration conf = HBaseConfiguration.create(); 2314 2315 byte[] smallValue = new byte[3]; 2316 byte[] largeValue = new byte[9]; 2317 final long timestamp = EnvironmentEdgeManager.currentTime(); 2318 final long seqId = 100; 2319 final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2320 final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2321 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2322 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2323 int totalCellByteSize = (smallCellByteSize + largeCellByteSize); 2324 int flushByteSize = totalCellByteSize - 2; 2325 2326 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2327 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName()); 2328 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2329 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2330 2331 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2332 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2333 2334 MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore); 2335 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2336 2337 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2338 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2339 2340 String oldThreadName = Thread.currentThread().getName(); 2341 try { 2342 Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME); 2343 /** 2344 * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters 2345 * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot} 2346 * would invoke {@link CompactingMemStore#stopCompaction}. 2347 */ 2348 myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await(); 2349 2350 MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot(); 2351 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2352 2353 assertTrue(memStoreSnapshot.getCellsCount() == 2); 2354 assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize); 2355 VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments(); 2356 assertTrue(segments.getNumOfSegments() == 0); 2357 assertTrue(segments.getNumOfCells() == 0); 2358 assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1); 2359 assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2); 2360 } finally { 2361 Thread.currentThread().setName(oldThreadName); 2362 } 2363 } 2364 2365 /** 2366 * <pre> 2367 * This test is for HBASE-26384, 2368 * test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()} 2369 * and writeMemStore execute concurrently. 2370 * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs 2371 * for both branch-2 and master): 2372 * 1. The {@link CompactingMemStore} size exceeds 2373 * {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new 2374 * {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a 2375 * in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}. 2376 * 2. The in memory compact thread starts and then stopping before 2377 * {@link CompactingMemStore#flattenOneSegment}. 2378 * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the 2379 * snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory 2380 * compact thread continues. 2381 * Assuming {@link VersionedSegmentsList#version} returned from 2382 * {@link CompactingMemStore#getImmutableSegments} is v. 2383 * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}. 2384 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment}, 2385 * {@link CompactionPipeline#version} is still v. 2386 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2387 * {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull} 2388 * thinks it is successful and continue flushing,but the {@link ImmutableSegment} in 2389 * {@link CompactionPipeline} has changed because 2390 * {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not 2391 * removed in fact and still remaining in {@link CompactionPipeline}. 2392 * 2393 * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior, 2394 * and I add step 7-8 to test there is new segment added before retry. 2395 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment}, 2396 * {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to 2397 * v+1. 2398 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2399 * {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull} 2400 * failed and retry,{@link VersionedSegmentsList#version} returned from 2401 * {@link CompactingMemStore#getImmutableSegments} is v+1. 2402 * 7. The write thread continues writing to {@link CompactingMemStore} and 2403 * {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()}, 2404 * {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new 2405 * {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline}, 2406 * {@link CompactionPipeline#version} is still v+1. 2407 * 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2408 * {@link CompactionPipeline#version} is still v+1, 2409 * {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment} 2410 * remained at the head of {@link CompactingMemStore#pipeline},the old is removed by 2411 * {@link CompactingMemStore#swapPipelineWithNull}. 2412 * </pre> 2413 */ 2414 @Test 2415 public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception { 2416 Configuration conf = HBaseConfiguration.create(); 2417 2418 byte[] smallValue = new byte[3]; 2419 byte[] largeValue = new byte[9]; 2420 final long timestamp = EnvironmentEdgeManager.currentTime(); 2421 final long seqId = 100; 2422 final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2423 final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2424 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2425 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2426 int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize); 2427 int flushByteSize = firstWriteCellByteSize - 2; 2428 2429 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2430 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName()); 2431 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2432 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2433 2434 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2435 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2436 2437 final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore); 2438 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2439 2440 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2441 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2442 2443 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2444 final ExtendedCell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue); 2445 final ExtendedCell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue); 2446 final int writeAgainCellByteSize = 2447 MutableSegment.getCellLength(writeAgainCell1) + MutableSegment.getCellLength(writeAgainCell2); 2448 final Thread writeAgainThread = new Thread(() -> { 2449 try { 2450 myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await(); 2451 2452 store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing()); 2453 store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing()); 2454 2455 myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await(); 2456 } catch (Throwable exception) { 2457 exceptionRef.set(exception); 2458 } 2459 }); 2460 writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME); 2461 writeAgainThread.start(); 2462 2463 String oldThreadName = Thread.currentThread().getName(); 2464 try { 2465 Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME); 2466 /** 2467 * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters 2468 * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot} 2469 * would invoke {@link CompactingMemStore#stopCompaction}. 2470 */ 2471 myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await(); 2472 MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot(); 2473 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2474 writeAgainThread.join(); 2475 2476 assertTrue(memStoreSnapshot.getCellsCount() == 2); 2477 assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize); 2478 VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments(); 2479 assertTrue(segments.getNumOfSegments() == 1); 2480 assertTrue( 2481 ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize); 2482 assertTrue(segments.getNumOfCells() == 2); 2483 assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2); 2484 assertTrue(exceptionRef.get() == null); 2485 assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2); 2486 } finally { 2487 Thread.currentThread().setName(oldThreadName); 2488 } 2489 } 2490 2491 /** 2492 * <pre> 2493 * This test is for HBASE-26465, 2494 * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute 2495 * concurrently. The threads sequence before HBASE-26465 is: 2496 * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have be added to 2497 * {@link DefaultMemStore}. 2498 * 2.The flush thread stopping before {@link DefaultMemStore#clearSnapshot} in 2499 * {@link HStore#updateStorefiles} after completed flushing memStore to hfile. 2500 * 3.The scan thread starts and stopping after {@link DefaultMemStore#getSnapshotSegments} in 2501 * {@link DefaultMemStore#getScanners},here the scan thread gets the 2502 * {@link DefaultMemStore#snapshot} which is created by the flush thread. 2503 * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and close 2504 * {@link DefaultMemStore#snapshot},because the reference count of the corresponding 2505 * {@link MemStoreLABImpl} is 0, the {@link Chunk}s in corresponding {@link MemStoreLABImpl} 2506 * are recycled. 2507 * 5.The scan thread continues {@link DefaultMemStore#getScanners},and create a 2508 * {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increase the 2509 * reference count of the corresponding {@link MemStoreLABImpl}, but {@link Chunk}s in 2510 * corresponding {@link MemStoreLABImpl} are recycled by step 4, and these {@link Chunk}s may 2511 * be overwritten by other write threads,which may cause serious problem. 2512 * After HBASE-26465,{@link DefaultMemStore#getScanners} and 2513 * {@link DefaultMemStore#clearSnapshot} could not execute concurrently. 2514 * </pre> 2515 */ 2516 @Test 2517 public void testClearSnapshotGetScannerConcurrently() throws Exception { 2518 Configuration conf = HBaseConfiguration.create(); 2519 2520 byte[] smallValue = new byte[3]; 2521 byte[] largeValue = new byte[9]; 2522 final long timestamp = EnvironmentEdgeManager.currentTime(); 2523 final long seqId = 100; 2524 final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2525 final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2526 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2527 quals.add(qf1); 2528 quals.add(qf2); 2529 2530 conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName()); 2531 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2532 2533 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2534 MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore); 2535 myDefaultMemStore.store = store; 2536 2537 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2538 store.add(smallCell, memStoreSizing); 2539 store.add(largeCell, memStoreSizing); 2540 2541 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2542 final Thread flushThread = new Thread(() -> { 2543 try { 2544 flushStore(store, id++); 2545 } catch (Throwable exception) { 2546 exceptionRef.set(exception); 2547 } 2548 }); 2549 flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME); 2550 flushThread.start(); 2551 2552 String oldThreadName = Thread.currentThread().getName(); 2553 StoreScanner storeScanner = null; 2554 try { 2555 Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME); 2556 2557 /** 2558 * Wait flush thread stopping before {@link DefaultMemStore#doClearSnapshot} 2559 */ 2560 myDefaultMemStore.getScannerCyclicBarrier.await(); 2561 2562 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 2563 flushThread.join(); 2564 2565 if (myDefaultMemStore.shouldWait) { 2566 SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2567 MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2568 assertTrue(memStoreLAB.isClosed()); 2569 assertTrue(!memStoreLAB.chunks.isEmpty()); 2570 assertTrue(!memStoreLAB.isReclaimed()); 2571 2572 ExtendedCell cell1 = segmentScanner.next(); 2573 PrivateCellUtil.equals(smallCell, cell1); 2574 ExtendedCell cell2 = segmentScanner.next(); 2575 PrivateCellUtil.equals(largeCell, cell2); 2576 assertNull(segmentScanner.next()); 2577 } else { 2578 List<ExtendedCell> results = new ArrayList<>(); 2579 storeScanner.next((List) results); 2580 assertEquals(2, results.size()); 2581 PrivateCellUtil.equals(smallCell, results.get(0)); 2582 PrivateCellUtil.equals(largeCell, results.get(1)); 2583 } 2584 assertTrue(exceptionRef.get() == null); 2585 } finally { 2586 if (storeScanner != null) { 2587 storeScanner.close(); 2588 } 2589 Thread.currentThread().setName(oldThreadName); 2590 } 2591 } 2592 2593 @SuppressWarnings("unchecked") 2594 private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) { 2595 List<T> resultScanners = new ArrayList<T>(); 2596 for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) { 2597 if (keyValueScannerClass.isInstance(keyValueScanner)) { 2598 resultScanners.add((T) keyValueScanner); 2599 } 2600 } 2601 assertTrue(resultScanners.size() == 1); 2602 return resultScanners.get(0); 2603 } 2604 2605 @Test 2606 public void testOnConfigurationChange() throws IOException { 2607 final int COMMON_MAX_FILES_TO_COMPACT = 10; 2608 final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8; 2609 final int STORE_MAX_FILES_TO_COMPACT = 6; 2610 2611 // Build a table that its maxFileToCompact different from common configuration. 2612 Configuration conf = HBaseConfiguration.create(); 2613 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2614 COMMON_MAX_FILES_TO_COMPACT); 2615 conf.setBoolean(CACHE_DATA_ON_READ_KEY, false); 2616 conf.setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, true); 2617 conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true); 2618 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 2619 .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2620 String.valueOf(STORE_MAX_FILES_TO_COMPACT)) 2621 .build(); 2622 init(this.name.getMethodName(), conf, hcd); 2623 2624 // After updating common configuration, the conf in HStore itself must not be changed. 2625 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2626 NEW_COMMON_MAX_FILES_TO_COMPACT); 2627 this.store.onConfigurationChange(conf); 2628 2629 assertEquals(STORE_MAX_FILES_TO_COMPACT, 2630 store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact()); 2631 2632 assertEquals(conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ), false); 2633 assertEquals(conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), true); 2634 assertEquals(conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), true); 2635 2636 // reset to default values 2637 conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ); 2638 conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE); 2639 conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE); 2640 this.store.onConfigurationChange(conf); 2641 } 2642 2643 /** 2644 * This test is for HBASE-26476 2645 */ 2646 @Test 2647 public void testExtendsDefaultMemStore() throws Exception { 2648 Configuration conf = HBaseConfiguration.create(); 2649 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2650 2651 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2652 assertTrue(this.store.memstore.getClass() == DefaultMemStore.class); 2653 tearDown(); 2654 2655 conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName()); 2656 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2657 assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class); 2658 } 2659 2660 static class CustomDefaultMemStore extends DefaultMemStore { 2661 2662 public CustomDefaultMemStore(Configuration conf, CellComparator c, 2663 RegionServicesForStores regionServices) { 2664 super(conf, c, regionServices); 2665 } 2666 2667 } 2668 2669 /** 2670 * This test is for HBASE-26488 2671 */ 2672 @Test 2673 public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception { 2674 Configuration conf = HBaseConfiguration.create(); 2675 2676 byte[] smallValue = new byte[3]; 2677 byte[] largeValue = new byte[9]; 2678 final long timestamp = EnvironmentEdgeManager.currentTime(); 2679 final long seqId = 100; 2680 final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2681 final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2682 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2683 quals.add(qf1); 2684 quals.add(qf2); 2685 2686 conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName()); 2687 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2688 conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, 2689 MyDefaultStoreFlusher.class.getName()); 2690 2691 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2692 MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore); 2693 assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher); 2694 2695 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2696 store.add(smallCell, memStoreSizing); 2697 store.add(largeCell, memStoreSizing); 2698 flushStore(store, id++); 2699 2700 MemStoreLABImpl memStoreLAB = 2701 (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB()); 2702 assertTrue(memStoreLAB.isClosed()); 2703 assertTrue(memStoreLAB.getRefCntValue() == 0); 2704 assertTrue(memStoreLAB.isReclaimed()); 2705 assertTrue(memStoreLAB.chunks.isEmpty()); 2706 StoreScanner storeScanner = null; 2707 try { 2708 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 2709 assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1); 2710 assertTrue(store.memstore.size().getCellsCount() == 0); 2711 assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0); 2712 assertTrue(storeScanner.currentScanners.size() == 1); 2713 assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner); 2714 2715 List<ExtendedCell> results = new ArrayList<>(); 2716 storeScanner.next((List) results); 2717 assertEquals(2, results.size()); 2718 PrivateCellUtil.equals(smallCell, results.get(0)); 2719 PrivateCellUtil.equals(largeCell, results.get(1)); 2720 } finally { 2721 if (storeScanner != null) { 2722 storeScanner.close(); 2723 } 2724 } 2725 } 2726 2727 static class MyDefaultMemStore1 extends DefaultMemStore { 2728 2729 private ImmutableSegment snapshotImmutableSegment; 2730 2731 public MyDefaultMemStore1(Configuration conf, CellComparator c, 2732 RegionServicesForStores regionServices) { 2733 super(conf, c, regionServices); 2734 } 2735 2736 @Override 2737 public MemStoreSnapshot snapshot() { 2738 MemStoreSnapshot result = super.snapshot(); 2739 this.snapshotImmutableSegment = snapshot; 2740 return result; 2741 } 2742 2743 } 2744 2745 public static class MyDefaultStoreFlusher extends DefaultStoreFlusher { 2746 private static final AtomicInteger failCounter = new AtomicInteger(1); 2747 private static final AtomicInteger counter = new AtomicInteger(0); 2748 2749 public MyDefaultStoreFlusher(Configuration conf, HStore store) { 2750 super(conf, store); 2751 } 2752 2753 @Override 2754 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, 2755 MonitoredTask status, ThroughputController throughputController, 2756 FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException { 2757 counter.incrementAndGet(); 2758 return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker, 2759 writerCreationTracker); 2760 } 2761 2762 @Override 2763 protected void performFlush(InternalScanner scanner, final CellSink sink, 2764 ThroughputController throughputController) throws IOException { 2765 2766 final int currentCount = counter.get(); 2767 CellSink newCellSink = (cell) -> { 2768 if (currentCount <= failCounter.get()) { 2769 throw new IOException("Simulated exception by tests"); 2770 } 2771 sink.append(cell); 2772 }; 2773 super.performFlush(scanner, newCellSink, throughputController); 2774 } 2775 } 2776 2777 /** 2778 * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB} 2779 */ 2780 @Test 2781 public void testImmutableMemStoreLABRefCnt() throws Exception { 2782 Configuration conf = HBaseConfiguration.create(); 2783 2784 byte[] smallValue = new byte[3]; 2785 byte[] largeValue = new byte[9]; 2786 final long timestamp = EnvironmentEdgeManager.currentTime(); 2787 final long seqId = 100; 2788 final ExtendedCell smallCell1 = createCell(qf1, timestamp, seqId, smallValue); 2789 final ExtendedCell largeCell1 = createCell(qf2, timestamp, seqId, largeValue); 2790 final ExtendedCell smallCell2 = createCell(qf3, timestamp, seqId + 1, smallValue); 2791 final ExtendedCell largeCell2 = createCell(qf4, timestamp, seqId + 1, largeValue); 2792 final ExtendedCell smallCell3 = createCell(qf5, timestamp, seqId + 2, smallValue); 2793 final ExtendedCell largeCell3 = createCell(qf6, timestamp, seqId + 2, largeValue); 2794 2795 int smallCellByteSize = MutableSegment.getCellLength(smallCell1); 2796 int largeCellByteSize = MutableSegment.getCellLength(largeCell1); 2797 int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize); 2798 int flushByteSize = firstWriteCellByteSize - 2; 2799 2800 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2801 conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName()); 2802 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2803 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2804 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2805 2806 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2807 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2808 2809 final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore); 2810 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2811 myCompactingMemStore.allowCompaction.set(false); 2812 2813 NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2814 store.add(smallCell1, memStoreSizing); 2815 store.add(largeCell1, memStoreSizing); 2816 store.add(smallCell2, memStoreSizing); 2817 store.add(largeCell2, memStoreSizing); 2818 store.add(smallCell3, memStoreSizing); 2819 store.add(largeCell3, memStoreSizing); 2820 VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments(); 2821 assertTrue(versionedSegmentsList.getNumOfSegments() == 3); 2822 List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments(); 2823 List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size()); 2824 for (ImmutableSegment segment : segments) { 2825 memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB()); 2826 } 2827 List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE); 2828 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2829 assertTrue(memStoreLAB.getRefCntValue() == 2); 2830 } 2831 2832 myCompactingMemStore.allowCompaction.set(true); 2833 myCompactingMemStore.flushInMemory(); 2834 2835 versionedSegmentsList = myCompactingMemStore.getImmutableSegments(); 2836 assertTrue(versionedSegmentsList.getNumOfSegments() == 1); 2837 ImmutableMemStoreLAB immutableMemStoreLAB = 2838 (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB()); 2839 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2840 assertTrue(memStoreLAB.getRefCntValue() == 2); 2841 } 2842 2843 List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE); 2844 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2845 assertTrue(memStoreLAB.getRefCntValue() == 2); 2846 } 2847 assertTrue(immutableMemStoreLAB.getRefCntValue() == 2); 2848 for (KeyValueScanner scanner : scanners1) { 2849 scanner.close(); 2850 } 2851 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2852 assertTrue(memStoreLAB.getRefCntValue() == 1); 2853 } 2854 for (KeyValueScanner scanner : scanners2) { 2855 scanner.close(); 2856 } 2857 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2858 assertTrue(memStoreLAB.getRefCntValue() == 1); 2859 } 2860 assertTrue(immutableMemStoreLAB.getRefCntValue() == 1); 2861 flushStore(store, id++); 2862 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2863 assertTrue(memStoreLAB.getRefCntValue() == 0); 2864 } 2865 assertTrue(immutableMemStoreLAB.getRefCntValue() == 0); 2866 assertTrue(immutableMemStoreLAB.isClosed()); 2867 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2868 assertTrue(memStoreLAB.isClosed()); 2869 assertTrue(memStoreLAB.isReclaimed()); 2870 assertTrue(memStoreLAB.chunks.isEmpty()); 2871 } 2872 } 2873 2874 private HStoreFile mockStoreFileWithLength(long length) { 2875 HStoreFile sf = mock(HStoreFile.class); 2876 StoreFileReader sfr = mock(StoreFileReader.class); 2877 when(sf.isHFile()).thenReturn(true); 2878 when(sf.getReader()).thenReturn(sfr); 2879 when(sfr.length()).thenReturn(length); 2880 return sf; 2881 } 2882 2883 private static class MyThread extends Thread { 2884 private StoreScanner scanner; 2885 private KeyValueHeap heap; 2886 2887 public MyThread(StoreScanner scanner) { 2888 this.scanner = scanner; 2889 } 2890 2891 public KeyValueHeap getHeap() { 2892 return this.heap; 2893 } 2894 2895 @Override 2896 public void run() { 2897 scanner.trySwitchToStreamRead(); 2898 heap = scanner.heap; 2899 } 2900 } 2901 2902 private static class MyMemStoreCompactor extends MemStoreCompactor { 2903 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 2904 private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1); 2905 2906 public MyMemStoreCompactor(CompactingMemStore compactingMemStore, 2907 MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException { 2908 super(compactingMemStore, compactionPolicy); 2909 } 2910 2911 @Override 2912 public boolean start() throws IOException { 2913 boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0; 2914 if (isFirst) { 2915 try { 2916 START_COMPACTOR_LATCH.await(); 2917 return super.start(); 2918 } catch (InterruptedException ex) { 2919 throw new RuntimeException(ex); 2920 } 2921 } 2922 return super.start(); 2923 } 2924 } 2925 2926 public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore { 2927 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 2928 2929 public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c, 2930 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 2931 throws IOException { 2932 super(conf, c, store, regionServices, compactionPolicy); 2933 } 2934 2935 @Override 2936 protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) 2937 throws IllegalArgumentIOException { 2938 return new MyMemStoreCompactor(this, compactionPolicy); 2939 } 2940 2941 @Override 2942 protected boolean setInMemoryCompactionFlag() { 2943 boolean rval = super.setInMemoryCompactionFlag(); 2944 if (rval) { 2945 RUNNER_COUNT.incrementAndGet(); 2946 if (LOG.isDebugEnabled()) { 2947 LOG.debug("runner count: " + RUNNER_COUNT.get()); 2948 } 2949 } 2950 return rval; 2951 } 2952 } 2953 2954 public static class MyCompactingMemStore extends CompactingMemStore { 2955 private static final AtomicBoolean START_TEST = new AtomicBoolean(false); 2956 private final CountDownLatch getScannerLatch = new CountDownLatch(1); 2957 private final CountDownLatch snapshotLatch = new CountDownLatch(1); 2958 2959 public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, HStore store, 2960 RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 2961 throws IOException { 2962 super(conf, c, store, regionServices, compactionPolicy); 2963 } 2964 2965 @Override 2966 protected List<KeyValueScanner> createList(int capacity) { 2967 if (START_TEST.get()) { 2968 try { 2969 getScannerLatch.countDown(); 2970 snapshotLatch.await(); 2971 } catch (InterruptedException e) { 2972 throw new RuntimeException(e); 2973 } 2974 } 2975 return new ArrayList<>(capacity); 2976 } 2977 2978 @Override 2979 protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) { 2980 if (START_TEST.get()) { 2981 try { 2982 getScannerLatch.await(); 2983 } catch (InterruptedException e) { 2984 throw new RuntimeException(e); 2985 } 2986 } 2987 2988 super.pushActiveToPipeline(active, checkEmpty); 2989 if (START_TEST.get()) { 2990 snapshotLatch.countDown(); 2991 } 2992 } 2993 } 2994 2995 interface MyListHook { 2996 void hook(int currentSize); 2997 } 2998 2999 private static class MyList<T> implements List<T> { 3000 private final List<T> delegatee = new ArrayList<>(); 3001 private final MyListHook hookAtAdd; 3002 3003 MyList(final MyListHook hookAtAdd) { 3004 this.hookAtAdd = hookAtAdd; 3005 } 3006 3007 @Override 3008 public int size() { 3009 return delegatee.size(); 3010 } 3011 3012 @Override 3013 public boolean isEmpty() { 3014 return delegatee.isEmpty(); 3015 } 3016 3017 @Override 3018 public boolean contains(Object o) { 3019 return delegatee.contains(o); 3020 } 3021 3022 @Override 3023 public Iterator<T> iterator() { 3024 return delegatee.iterator(); 3025 } 3026 3027 @Override 3028 public Object[] toArray() { 3029 return delegatee.toArray(); 3030 } 3031 3032 @Override 3033 public <R> R[] toArray(R[] a) { 3034 return delegatee.toArray(a); 3035 } 3036 3037 @Override 3038 public boolean add(T e) { 3039 hookAtAdd.hook(size()); 3040 return delegatee.add(e); 3041 } 3042 3043 @Override 3044 public boolean remove(Object o) { 3045 return delegatee.remove(o); 3046 } 3047 3048 @Override 3049 public boolean containsAll(Collection<?> c) { 3050 return delegatee.containsAll(c); 3051 } 3052 3053 @Override 3054 public boolean addAll(Collection<? extends T> c) { 3055 return delegatee.addAll(c); 3056 } 3057 3058 @Override 3059 public boolean addAll(int index, Collection<? extends T> c) { 3060 return delegatee.addAll(index, c); 3061 } 3062 3063 @Override 3064 public boolean removeAll(Collection<?> c) { 3065 return delegatee.removeAll(c); 3066 } 3067 3068 @Override 3069 public boolean retainAll(Collection<?> c) { 3070 return delegatee.retainAll(c); 3071 } 3072 3073 @Override 3074 public void clear() { 3075 delegatee.clear(); 3076 } 3077 3078 @Override 3079 public T get(int index) { 3080 return delegatee.get(index); 3081 } 3082 3083 @Override 3084 public T set(int index, T element) { 3085 return delegatee.set(index, element); 3086 } 3087 3088 @Override 3089 public void add(int index, T element) { 3090 delegatee.add(index, element); 3091 } 3092 3093 @Override 3094 public T remove(int index) { 3095 return delegatee.remove(index); 3096 } 3097 3098 @Override 3099 public int indexOf(Object o) { 3100 return delegatee.indexOf(o); 3101 } 3102 3103 @Override 3104 public int lastIndexOf(Object o) { 3105 return delegatee.lastIndexOf(o); 3106 } 3107 3108 @Override 3109 public ListIterator<T> listIterator() { 3110 return delegatee.listIterator(); 3111 } 3112 3113 @Override 3114 public ListIterator<T> listIterator(int index) { 3115 return delegatee.listIterator(index); 3116 } 3117 3118 @Override 3119 public List<T> subList(int fromIndex, int toIndex) { 3120 return delegatee.subList(fromIndex, toIndex); 3121 } 3122 } 3123 3124 public static class MyCompactingMemStore2 extends CompactingMemStore { 3125 private static final String LARGE_CELL_THREAD_NAME = "largeCellThread"; 3126 private static final String SMALL_CELL_THREAD_NAME = "smallCellThread"; 3127 private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2); 3128 private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2); 3129 private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0); 3130 private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0); 3131 3132 public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator, 3133 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3134 throws IOException { 3135 super(conf, cellComparator, store, regionServices, compactionPolicy); 3136 } 3137 3138 @Override 3139 protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd, 3140 MemStoreSizing memstoreSizing) { 3141 if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { 3142 int currentCount = largeCellPreUpdateCounter.incrementAndGet(); 3143 if (currentCount <= 1) { 3144 try { 3145 /** 3146 * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then 3147 * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then 3148 * largeCellThread invokes flushInMemory. 3149 */ 3150 preCyclicBarrier.await(); 3151 } catch (Throwable e) { 3152 throw new RuntimeException(e); 3153 } 3154 } 3155 } 3156 3157 boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing); 3158 if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { 3159 try { 3160 preCyclicBarrier.await(); 3161 } catch (Throwable e) { 3162 throw new RuntimeException(e); 3163 } 3164 } 3165 return returnValue; 3166 } 3167 3168 @Override 3169 protected void doAdd(MutableSegment currentActive, ExtendedCell cell, 3170 MemStoreSizing memstoreSizing) { 3171 if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { 3172 try { 3173 /** 3174 * After largeCellThread finished flushInMemory method, smallCellThread can add cell to 3175 * currentActive . That is to say when largeCellThread called flushInMemory method, 3176 * currentActive has no cell. 3177 */ 3178 postCyclicBarrier.await(); 3179 } catch (Throwable e) { 3180 throw new RuntimeException(e); 3181 } 3182 } 3183 super.doAdd(currentActive, cell, memstoreSizing); 3184 } 3185 3186 @Override 3187 protected void flushInMemory(MutableSegment currentActiveMutableSegment) { 3188 super.flushInMemory(currentActiveMutableSegment); 3189 if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { 3190 if (largeCellPreUpdateCounter.get() <= 1) { 3191 try { 3192 postCyclicBarrier.await(); 3193 } catch (Throwable e) { 3194 throw new RuntimeException(e); 3195 } 3196 } 3197 } 3198 } 3199 3200 } 3201 3202 public static class MyCompactingMemStore3 extends CompactingMemStore { 3203 private static final String LARGE_CELL_THREAD_NAME = "largeCellThread"; 3204 private static final String SMALL_CELL_THREAD_NAME = "smallCellThread"; 3205 3206 private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2); 3207 private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2); 3208 private final AtomicInteger flushCounter = new AtomicInteger(0); 3209 private static final int CELL_COUNT = 5; 3210 private boolean flushByteSizeLessThanSmallAndLargeCellSize = true; 3211 3212 public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator, 3213 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3214 throws IOException { 3215 super(conf, cellComparator, store, regionServices, compactionPolicy); 3216 } 3217 3218 @Override 3219 protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd, 3220 MemStoreSizing memstoreSizing) { 3221 if (!flushByteSizeLessThanSmallAndLargeCellSize) { 3222 return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing); 3223 } 3224 if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { 3225 try { 3226 preCyclicBarrier.await(); 3227 } catch (Throwable e) { 3228 throw new RuntimeException(e); 3229 } 3230 } 3231 3232 boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing); 3233 if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { 3234 try { 3235 preCyclicBarrier.await(); 3236 } catch (Throwable e) { 3237 throw new RuntimeException(e); 3238 } 3239 } 3240 return returnValue; 3241 } 3242 3243 @Override 3244 protected void postUpdate(MutableSegment currentActiveMutableSegment) { 3245 super.postUpdate(currentActiveMutableSegment); 3246 if (!flushByteSizeLessThanSmallAndLargeCellSize) { 3247 try { 3248 postCyclicBarrier.await(); 3249 } catch (Throwable e) { 3250 throw new RuntimeException(e); 3251 } 3252 return; 3253 } 3254 3255 if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { 3256 try { 3257 postCyclicBarrier.await(); 3258 } catch (Throwable e) { 3259 throw new RuntimeException(e); 3260 } 3261 } 3262 } 3263 3264 @Override 3265 protected void flushInMemory(MutableSegment currentActiveMutableSegment) { 3266 super.flushInMemory(currentActiveMutableSegment); 3267 flushCounter.incrementAndGet(); 3268 if (!flushByteSizeLessThanSmallAndLargeCellSize) { 3269 return; 3270 } 3271 3272 assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)); 3273 try { 3274 postCyclicBarrier.await(); 3275 } catch (Throwable e) { 3276 throw new RuntimeException(e); 3277 } 3278 3279 } 3280 3281 void disableCompaction() { 3282 allowCompaction.set(false); 3283 } 3284 3285 void enableCompaction() { 3286 allowCompaction.set(true); 3287 } 3288 3289 } 3290 3291 public static class MyCompactingMemStore4 extends CompactingMemStore { 3292 private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread"; 3293 /** 3294 * {@link CompactingMemStore#flattenOneSegment} must execute after 3295 * {@link CompactingMemStore#getImmutableSegments} 3296 */ 3297 private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2); 3298 /** 3299 * Only after {@link CompactingMemStore#flattenOneSegment} completed, 3300 * {@link CompactingMemStore#swapPipelineWithNull} could execute. 3301 */ 3302 private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2); 3303 /** 3304 * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the 3305 * snapshot thread starts {@link CompactingMemStore#snapshot},because 3306 * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}. 3307 */ 3308 private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2); 3309 /** 3310 * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping. 3311 */ 3312 private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); 3313 private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0); 3314 private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0); 3315 private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0); 3316 private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0); 3317 3318 public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator, 3319 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3320 throws IOException { 3321 super(conf, cellComparator, store, regionServices, compactionPolicy); 3322 } 3323 3324 @Override 3325 public VersionedSegmentsList getImmutableSegments() { 3326 VersionedSegmentsList result = super.getImmutableSegments(); 3327 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3328 int currentCount = getImmutableSegmentsListCounter.incrementAndGet(); 3329 if (currentCount <= 1) { 3330 try { 3331 flattenOneSegmentPreCyclicBarrier.await(); 3332 } catch (Throwable e) { 3333 throw new RuntimeException(e); 3334 } 3335 } 3336 } 3337 return result; 3338 } 3339 3340 @Override 3341 protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { 3342 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3343 int currentCount = swapPipelineWithNullCounter.incrementAndGet(); 3344 if (currentCount <= 1) { 3345 try { 3346 flattenOneSegmentPostCyclicBarrier.await(); 3347 } catch (Throwable e) { 3348 throw new RuntimeException(e); 3349 } 3350 } 3351 } 3352 boolean result = super.swapPipelineWithNull(segments); 3353 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3354 int currentCount = swapPipelineWithNullCounter.get(); 3355 if (currentCount <= 1) { 3356 assertTrue(!result); 3357 } 3358 if (currentCount == 2) { 3359 assertTrue(result); 3360 } 3361 } 3362 return result; 3363 3364 } 3365 3366 @Override 3367 public void flattenOneSegment(long requesterVersion, Action action) { 3368 int currentCount = flattenOneSegmentCounter.incrementAndGet(); 3369 if (currentCount <= 1) { 3370 try { 3371 /** 3372 * {@link CompactingMemStore#snapshot} could start. 3373 */ 3374 snapShotStartCyclicCyclicBarrier.await(); 3375 flattenOneSegmentPreCyclicBarrier.await(); 3376 } catch (Throwable e) { 3377 throw new RuntimeException(e); 3378 } 3379 } 3380 super.flattenOneSegment(requesterVersion, action); 3381 if (currentCount <= 1) { 3382 try { 3383 flattenOneSegmentPostCyclicBarrier.await(); 3384 } catch (Throwable e) { 3385 throw new RuntimeException(e); 3386 } 3387 } 3388 } 3389 3390 @Override 3391 protected boolean setInMemoryCompactionFlag() { 3392 boolean result = super.setInMemoryCompactionFlag(); 3393 assertTrue(result); 3394 setInMemoryCompactionFlagCounter.incrementAndGet(); 3395 return result; 3396 } 3397 3398 @Override 3399 void inMemoryCompaction() { 3400 try { 3401 super.inMemoryCompaction(); 3402 } finally { 3403 try { 3404 inMemoryCompactionEndCyclicBarrier.await(); 3405 } catch (Throwable e) { 3406 throw new RuntimeException(e); 3407 } 3408 3409 } 3410 } 3411 3412 } 3413 3414 public static class MyCompactingMemStore5 extends CompactingMemStore { 3415 private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread"; 3416 private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread"; 3417 /** 3418 * {@link CompactingMemStore#flattenOneSegment} must execute after 3419 * {@link CompactingMemStore#getImmutableSegments} 3420 */ 3421 private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2); 3422 /** 3423 * Only after {@link CompactingMemStore#flattenOneSegment} completed, 3424 * {@link CompactingMemStore#swapPipelineWithNull} could execute. 3425 */ 3426 private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2); 3427 /** 3428 * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the 3429 * snapshot thread starts {@link CompactingMemStore#snapshot},because 3430 * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}. 3431 */ 3432 private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2); 3433 /** 3434 * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping. 3435 */ 3436 private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); 3437 private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0); 3438 private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0); 3439 private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0); 3440 private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0); 3441 /** 3442 * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain 3443 * thread could start. 3444 */ 3445 private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2); 3446 /** 3447 * This is used for snapshot thread,writeAgain thread and in memory compact thread. Only the 3448 * writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would 3449 * execute,and in memory compact thread would exit,because we expect that in memory compact 3450 * executing only once. 3451 */ 3452 private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3); 3453 3454 public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator, 3455 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3456 throws IOException { 3457 super(conf, cellComparator, store, regionServices, compactionPolicy); 3458 } 3459 3460 @Override 3461 public VersionedSegmentsList getImmutableSegments() { 3462 VersionedSegmentsList result = super.getImmutableSegments(); 3463 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3464 int currentCount = getImmutableSegmentsListCounter.incrementAndGet(); 3465 if (currentCount <= 1) { 3466 try { 3467 flattenOneSegmentPreCyclicBarrier.await(); 3468 } catch (Throwable e) { 3469 throw new RuntimeException(e); 3470 } 3471 } 3472 3473 } 3474 3475 return result; 3476 } 3477 3478 @Override 3479 protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { 3480 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3481 int currentCount = swapPipelineWithNullCounter.incrementAndGet(); 3482 if (currentCount <= 1) { 3483 try { 3484 flattenOneSegmentPostCyclicBarrier.await(); 3485 } catch (Throwable e) { 3486 throw new RuntimeException(e); 3487 } 3488 } 3489 3490 if (currentCount == 2) { 3491 try { 3492 /** 3493 * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, 3494 * writeAgain thread could start. 3495 */ 3496 writeMemStoreAgainStartCyclicBarrier.await(); 3497 /** 3498 * Only the writeAgain thread completes, retry 3499 * {@link CompactingMemStore#swapPipelineWithNull} would execute. 3500 */ 3501 writeMemStoreAgainEndCyclicBarrier.await(); 3502 } catch (Throwable e) { 3503 throw new RuntimeException(e); 3504 } 3505 } 3506 3507 } 3508 boolean result = super.swapPipelineWithNull(segments); 3509 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3510 int currentCount = swapPipelineWithNullCounter.get(); 3511 if (currentCount <= 1) { 3512 assertTrue(!result); 3513 } 3514 if (currentCount == 2) { 3515 assertTrue(result); 3516 } 3517 } 3518 return result; 3519 3520 } 3521 3522 @Override 3523 public void flattenOneSegment(long requesterVersion, Action action) { 3524 int currentCount = flattenOneSegmentCounter.incrementAndGet(); 3525 if (currentCount <= 1) { 3526 try { 3527 /** 3528 * {@link CompactingMemStore#snapshot} could start. 3529 */ 3530 snapShotStartCyclicCyclicBarrier.await(); 3531 flattenOneSegmentPreCyclicBarrier.await(); 3532 } catch (Throwable e) { 3533 throw new RuntimeException(e); 3534 } 3535 } 3536 super.flattenOneSegment(requesterVersion, action); 3537 if (currentCount <= 1) { 3538 try { 3539 flattenOneSegmentPostCyclicBarrier.await(); 3540 /** 3541 * Only the writeAgain thread completes, in memory compact thread would exit,because we 3542 * expect that in memory compact executing only once. 3543 */ 3544 writeMemStoreAgainEndCyclicBarrier.await(); 3545 } catch (Throwable e) { 3546 throw new RuntimeException(e); 3547 } 3548 3549 } 3550 } 3551 3552 @Override 3553 protected boolean setInMemoryCompactionFlag() { 3554 boolean result = super.setInMemoryCompactionFlag(); 3555 int count = setInMemoryCompactionFlagCounter.incrementAndGet(); 3556 if (count <= 1) { 3557 assertTrue(result); 3558 } 3559 if (count == 2) { 3560 assertTrue(!result); 3561 } 3562 return result; 3563 } 3564 3565 @Override 3566 void inMemoryCompaction() { 3567 try { 3568 super.inMemoryCompaction(); 3569 } finally { 3570 try { 3571 inMemoryCompactionEndCyclicBarrier.await(); 3572 } catch (Throwable e) { 3573 throw new RuntimeException(e); 3574 } 3575 3576 } 3577 } 3578 } 3579 3580 public static class MyCompactingMemStore6 extends CompactingMemStore { 3581 private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); 3582 3583 public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator, 3584 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3585 throws IOException { 3586 super(conf, cellComparator, store, regionServices, compactionPolicy); 3587 } 3588 3589 @Override 3590 void inMemoryCompaction() { 3591 try { 3592 super.inMemoryCompaction(); 3593 } finally { 3594 try { 3595 inMemoryCompactionEndCyclicBarrier.await(); 3596 } catch (Throwable e) { 3597 throw new RuntimeException(e); 3598 } 3599 3600 } 3601 } 3602 } 3603 3604 public static class MyDefaultMemStore extends DefaultMemStore { 3605 private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread"; 3606 private static final String FLUSH_THREAD_NAME = "flushMyThread"; 3607 /** 3608 * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner thread 3609 * could start. 3610 */ 3611 private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2); 3612 /** 3613 * Used by getScanner thread notifies flush thread {@link DefaultMemStore#getSnapshotSegments} 3614 * completed, {@link DefaultMemStore#doClearSnapShot} could continue. 3615 */ 3616 private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2); 3617 /** 3618 * Used by flush thread notifies getScanner thread {@link DefaultMemStore#doClearSnapShot} 3619 * completed, {@link DefaultMemStore#getScanners} could continue. 3620 */ 3621 private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2); 3622 private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0); 3623 private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0); 3624 private volatile boolean shouldWait = true; 3625 private volatile HStore store = null; 3626 3627 public MyDefaultMemStore(Configuration conf, CellComparator cellComparator, 3628 RegionServicesForStores regionServices) throws IOException { 3629 super(conf, cellComparator, regionServices); 3630 } 3631 3632 @Override 3633 protected List<Segment> getSnapshotSegments() { 3634 3635 List<Segment> result = super.getSnapshotSegments(); 3636 3637 if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) { 3638 int currentCount = getSnapshotSegmentsCounter.incrementAndGet(); 3639 if (currentCount == 1) { 3640 if (this.shouldWait) { 3641 try { 3642 /** 3643 * Notify flush thread {@link DefaultMemStore#getSnapshotSegments} completed, 3644 * {@link DefaultMemStore#doClearSnapShot} could continue. 3645 */ 3646 preClearSnapShotCyclicBarrier.await(); 3647 /** 3648 * Wait for {@link DefaultMemStore#doClearSnapShot} completed. 3649 */ 3650 postClearSnapShotCyclicBarrier.await(); 3651 3652 } catch (Throwable e) { 3653 throw new RuntimeException(e); 3654 } 3655 } 3656 } 3657 } 3658 return result; 3659 } 3660 3661 @Override 3662 protected void doClearSnapShot() { 3663 if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) { 3664 int currentCount = clearSnapshotCounter.incrementAndGet(); 3665 if (currentCount == 1) { 3666 try { 3667 if ( 3668 ((ReentrantReadWriteLock) store.getStoreEngine().getLock()) 3669 .isWriteLockedByCurrentThread() 3670 ) { 3671 shouldWait = false; 3672 } 3673 /** 3674 * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner 3675 * thread could start. 3676 */ 3677 getScannerCyclicBarrier.await(); 3678 3679 if (shouldWait) { 3680 /** 3681 * Wait for {@link DefaultMemStore#getSnapshotSegments} completed. 3682 */ 3683 preClearSnapShotCyclicBarrier.await(); 3684 } 3685 } catch (Throwable e) { 3686 throw new RuntimeException(e); 3687 } 3688 } 3689 } 3690 super.doClearSnapShot(); 3691 3692 if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) { 3693 int currentCount = clearSnapshotCounter.get(); 3694 if (currentCount == 1) { 3695 if (shouldWait) { 3696 try { 3697 /** 3698 * Notify getScanner thread {@link DefaultMemStore#doClearSnapShot} completed, 3699 * {@link DefaultMemStore#getScanners} could continue. 3700 */ 3701 postClearSnapShotCyclicBarrier.await(); 3702 } catch (Throwable e) { 3703 throw new RuntimeException(e); 3704 } 3705 } 3706 } 3707 } 3708 } 3709 } 3710}