001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY; 021import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_ON_READ_KEY; 022import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_READ; 023import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_WRITE; 024import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_CLOSE; 025import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY; 026import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY; 027import static org.junit.Assert.assertArrayEquals; 028import static org.junit.Assert.assertEquals; 029import static org.junit.Assert.assertFalse; 030import static org.junit.Assert.assertNull; 031import static org.junit.Assert.assertTrue; 032import static org.junit.Assert.fail; 033import static org.mockito.ArgumentMatchers.any; 034import static org.mockito.Mockito.mock; 035import static org.mockito.Mockito.spy; 036import static 
org.mockito.Mockito.times; 037import static org.mockito.Mockito.verify; 038import static org.mockito.Mockito.when; 039 040import java.io.FileNotFoundException; 041import java.io.IOException; 042import java.lang.ref.SoftReference; 043import java.security.PrivilegedExceptionAction; 044import java.util.ArrayList; 045import java.util.Arrays; 046import java.util.Collection; 047import java.util.Collections; 048import java.util.Iterator; 049import java.util.List; 050import java.util.ListIterator; 051import java.util.NavigableSet; 052import java.util.Optional; 053import java.util.TreeSet; 054import java.util.concurrent.BrokenBarrierException; 055import java.util.concurrent.ConcurrentSkipListSet; 056import java.util.concurrent.CountDownLatch; 057import java.util.concurrent.CyclicBarrier; 058import java.util.concurrent.ExecutorService; 059import java.util.concurrent.Executors; 060import java.util.concurrent.Future; 061import java.util.concurrent.ThreadPoolExecutor; 062import java.util.concurrent.TimeUnit; 063import java.util.concurrent.atomic.AtomicBoolean; 064import java.util.concurrent.atomic.AtomicInteger; 065import java.util.concurrent.atomic.AtomicLong; 066import java.util.concurrent.atomic.AtomicReference; 067import java.util.concurrent.locks.ReentrantReadWriteLock; 068import java.util.function.Consumer; 069import java.util.function.IntBinaryOperator; 070import java.util.function.IntConsumer; 071import org.apache.hadoop.conf.Configuration; 072import org.apache.hadoop.fs.FSDataOutputStream; 073import org.apache.hadoop.fs.FileStatus; 074import org.apache.hadoop.fs.FileSystem; 075import org.apache.hadoop.fs.FilterFileSystem; 076import org.apache.hadoop.fs.LocalFileSystem; 077import org.apache.hadoop.fs.Path; 078import org.apache.hadoop.fs.permission.FsPermission; 079import org.apache.hadoop.hbase.Cell; 080import org.apache.hadoop.hbase.CellBuilderType; 081import org.apache.hadoop.hbase.CellComparator; 082import org.apache.hadoop.hbase.CellComparatorImpl; 083import 
org.apache.hadoop.hbase.CellUtil; 084import org.apache.hadoop.hbase.ExtendedCell; 085import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; 086import org.apache.hadoop.hbase.HBaseClassTestRule; 087import org.apache.hadoop.hbase.HBaseConfiguration; 088import org.apache.hadoop.hbase.HBaseTestingUtil; 089import org.apache.hadoop.hbase.HConstants; 090import org.apache.hadoop.hbase.KeyValue; 091import org.apache.hadoop.hbase.MemoryCompactionPolicy; 092import org.apache.hadoop.hbase.NamespaceDescriptor; 093import org.apache.hadoop.hbase.PrivateCellUtil; 094import org.apache.hadoop.hbase.TableName; 095import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 096import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 097import org.apache.hadoop.hbase.client.Get; 098import org.apache.hadoop.hbase.client.RegionInfo; 099import org.apache.hadoop.hbase.client.RegionInfoBuilder; 100import org.apache.hadoop.hbase.client.Scan; 101import org.apache.hadoop.hbase.client.Scan.ReadType; 102import org.apache.hadoop.hbase.client.TableDescriptor; 103import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 104import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 105import org.apache.hadoop.hbase.filter.Filter; 106import org.apache.hadoop.hbase.filter.FilterBase; 107import org.apache.hadoop.hbase.io.compress.Compression; 108import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 109import org.apache.hadoop.hbase.io.hfile.CacheConfig; 110import org.apache.hadoop.hbase.io.hfile.HFile; 111import org.apache.hadoop.hbase.io.hfile.HFileContext; 112import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 113import org.apache.hadoop.hbase.monitoring.MonitoredTask; 114import org.apache.hadoop.hbase.nio.RefCnt; 115import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl; 116import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType; 117import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action; 118import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 119import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 120import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 121import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy; 122import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 123import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; 124import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 125import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 126import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 127import org.apache.hadoop.hbase.security.User; 128import org.apache.hadoop.hbase.testclassification.MediumTests; 129import org.apache.hadoop.hbase.testclassification.RegionServerTests; 130import org.apache.hadoop.hbase.util.BloomFilterUtil; 131import org.apache.hadoop.hbase.util.Bytes; 132import org.apache.hadoop.hbase.util.CommonFSUtils; 133import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 134import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 135import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 136import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 137import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 138import org.apache.hadoop.hbase.wal.WALFactory; 139import org.apache.hadoop.util.Progressable; 140import org.junit.After; 141import org.junit.AfterClass; 142import org.junit.Before; 143import org.junit.ClassRule; 144import org.junit.Rule; 145import org.junit.Test; 146import org.junit.experimental.categories.Category; 147import org.junit.rules.TestName; 148import org.mockito.Mockito; 149import org.slf4j.Logger; 150import org.slf4j.LoggerFactory; 151 152import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 153 154/** 155 * Test class for the HStore 156 */ 157@Category({ 
RegionServerTests.class, MediumTests.class }) 158public class TestHStore { 159 160 @ClassRule 161 public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHStore.class); 162 163 private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class); 164 @Rule 165 public TestName name = new TestName(); 166 167 HRegion region; 168 HStore store; 169 byte[] table = Bytes.toBytes("table"); 170 byte[] family = Bytes.toBytes("family"); 171 172 byte[] row = Bytes.toBytes("row"); 173 byte[] row2 = Bytes.toBytes("row2"); 174 byte[] qf1 = Bytes.toBytes("qf1"); 175 byte[] qf2 = Bytes.toBytes("qf2"); 176 byte[] qf3 = Bytes.toBytes("qf3"); 177 byte[] qf4 = Bytes.toBytes("qf4"); 178 byte[] qf5 = Bytes.toBytes("qf5"); 179 byte[] qf6 = Bytes.toBytes("qf6"); 180 181 NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR); 182 183 List<Cell> expected = new ArrayList<>(); 184 List<Cell> result = new ArrayList<>(); 185 186 long id = EnvironmentEdgeManager.currentTime(); 187 Get get = new Get(row); 188 189 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 190 private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString(); 191 192 @Before 193 public void setUp() throws IOException { 194 qualifiers.clear(); 195 qualifiers.add(qf1); 196 qualifiers.add(qf3); 197 qualifiers.add(qf5); 198 199 Iterator<byte[]> iter = qualifiers.iterator(); 200 while (iter.hasNext()) { 201 byte[] next = iter.next(); 202 expected.add(new KeyValue(row, family, next, 1, (byte[]) null)); 203 get.addColumn(family, next); 204 } 205 } 206 207 private void init(String methodName) throws IOException { 208 init(methodName, TEST_UTIL.getConfiguration()); 209 } 210 211 private HStore init(String methodName, Configuration conf) throws IOException { 212 // some of the tests write 4 versions and then flush 213 // (with HBASE-4241, lower versions are collected on flush) 214 return init(methodName, conf, 215 
ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build()); 216 } 217 218 private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd) 219 throws IOException { 220 return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd); 221 } 222 223 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 224 ColumnFamilyDescriptor hcd) throws IOException { 225 return init(methodName, conf, builder, hcd, null); 226 } 227 228 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 229 ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException { 230 return init(methodName, conf, builder, hcd, hook, false); 231 } 232 233 private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder, 234 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 235 TableDescriptor htd = builder.setColumnFamily(hcd).build(); 236 Path basedir = new Path(DIR + methodName); 237 Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName()); 238 final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName)); 239 240 FileSystem fs = FileSystem.get(conf); 241 242 fs.delete(logdir, true); 243 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 244 MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null, 245 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 246 RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 247 Configuration walConf = new Configuration(conf); 248 CommonFSUtils.setRootDir(walConf, basedir); 249 WALFactory wals = new WALFactory(walConf, methodName); 250 region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf, 251 htd, null); 252 region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); 253 ThreadPoolExecutor pool = (ThreadPoolExecutor) 
Executors.newFixedThreadPool(1); 254 Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 255 } 256 257 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 258 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 259 initHRegion(methodName, conf, builder, hcd, hook, switchToPread); 260 if (hook == null) { 261 store = new HStore(region, hcd, conf, false); 262 } else { 263 store = new MyStore(region, hcd, conf, hook, switchToPread); 264 } 265 region.stores.put(store.getColumnFamilyDescriptor().getName(), store); 266 return store; 267 } 268 269 /** 270 * Test we do not lose data if we fail a flush and then close. Part of HBase-10466 271 */ 272 @Test 273 public void testFlushSizeSizing() throws Exception { 274 LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName()); 275 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 276 // Only retry once. 
277 conf.setInt("hbase.hstore.flush.retries.number", 1); 278 User user = User.createUserForTesting(conf, this.name.getMethodName(), new String[] { "foo" }); 279 // Inject our faulty LocalFileSystem 280 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 281 user.runAs(new PrivilegedExceptionAction<Object>() { 282 @Override 283 public Object run() throws Exception { 284 // Make sure it worked (above is sensitive to caching details in hadoop core) 285 FileSystem fs = FileSystem.get(conf); 286 assertEquals(FaultyFileSystem.class, fs.getClass()); 287 FaultyFileSystem ffs = (FaultyFileSystem) fs; 288 289 // Initialize region 290 init(name.getMethodName(), conf); 291 292 MemStoreSize mss = store.memstore.getFlushableSize(); 293 assertEquals(0, mss.getDataSize()); 294 LOG.info("Adding some data"); 295 MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing(); 296 store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize); 297 // add the heap size of active (mutable) segment 298 kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 299 mss = store.memstore.getFlushableSize(); 300 assertEquals(kvSize.getMemStoreSize(), mss); 301 // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. 
302 try { 303 LOG.info("Flushing"); 304 flushStore(store, id++); 305 fail("Didn't bubble up IOE!"); 306 } catch (IOException ioe) { 307 assertTrue(ioe.getMessage().contains("Fault injected")); 308 } 309 // due to snapshot, change mutable to immutable segment 310 kvSize.incMemStoreSize(0, 311 CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0); 312 mss = store.memstore.getFlushableSize(); 313 assertEquals(kvSize.getMemStoreSize(), mss); 314 MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing(); 315 store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2); 316 kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 317 // Even though we add a new kv, we expect the flushable size to be 'same' since we have 318 // not yet cleared the snapshot -- the above flush failed. 319 assertEquals(kvSize.getMemStoreSize(), mss); 320 ffs.fault.set(false); 321 flushStore(store, id++); 322 mss = store.memstore.getFlushableSize(); 323 // Size should be the foreground kv size. 
324 assertEquals(kvSize2.getMemStoreSize(), mss); 325 flushStore(store, id++); 326 mss = store.memstore.getFlushableSize(); 327 assertEquals(0, mss.getDataSize()); 328 assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize()); 329 return null; 330 } 331 }); 332 } 333 334 @Test 335 public void testStoreBloomFilterMetricsWithBloomRowCol() throws IOException { 336 int numStoreFiles = 5; 337 writeAndRead(BloomType.ROWCOL, numStoreFiles); 338 339 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 340 // hard to know exactly the numbers here, we are just trying to 341 // prove that they are incrementing 342 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 343 assertTrue(store.getBloomFilterNegativeResultsCount() > 0); 344 } 345 346 @Test 347 public void testStoreBloomFilterMetricsWithBloomRow() throws IOException { 348 int numStoreFiles = 5; 349 writeAndRead(BloomType.ROWCOL, numStoreFiles); 350 351 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 352 // hard to know exactly the numbers here, we are just trying to 353 // prove that they are incrementing 354 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 355 assertTrue(store.getBloomFilterNegativeResultsCount() > 0); 356 } 357 358 @Test 359 public void testStoreBloomFilterMetricsWithBloomRowPrefix() throws IOException { 360 int numStoreFiles = 5; 361 writeAndRead(BloomType.ROWPREFIX_FIXED_LENGTH, numStoreFiles); 362 363 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 364 // hard to know exactly the numbers here, we are just trying to 365 // prove that they are incrementing 366 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 367 } 368 369 @Test 370 public void testStoreBloomFilterMetricsWithBloomNone() throws IOException { 371 int numStoreFiles = 5; 372 writeAndRead(BloomType.NONE, numStoreFiles); 373 374 assertEquals(0, store.getBloomFilterRequestsCount()); 375 assertEquals(0, store.getBloomFilterNegativeResultsCount()); 376 
377 // hard to know exactly the numbers here, we are just trying to 378 // prove that they are incrementing 379 assertTrue(store.getBloomFilterEligibleRequestsCount() >= numStoreFiles); 380 } 381 382 private void writeAndRead(BloomType bloomType, int numStoreFiles) throws IOException { 383 Configuration conf = HBaseConfiguration.create(); 384 FileSystem fs = FileSystem.get(conf); 385 386 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 387 .setCompressionType(Compression.Algorithm.GZ).setBloomFilterType(bloomType) 388 .setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "3").build(); 389 init(name.getMethodName(), conf, hcd); 390 391 for (int i = 1; i <= numStoreFiles; i++) { 392 byte[] row = Bytes.toBytes("row" + i); 393 LOG.info("Adding some data for the store file #" + i); 394 long timeStamp = EnvironmentEdgeManager.currentTime(); 395 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null); 396 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null); 397 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null); 398 flush(i); 399 } 400 401 // Verify the total number of store files 402 assertEquals(numStoreFiles, this.store.getStorefiles().size()); 403 404 TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR); 405 columns.add(qf1); 406 407 for (int i = 1; i <= numStoreFiles; i++) { 408 KeyValueScanner scanner = 409 store.getScanner(new Scan(new Get(Bytes.toBytes("row" + i))), columns, 0); 410 scanner.peek(); 411 } 412 } 413 414 /** 415 * Verify that compression and data block encoding are respected by the createWriter method, used 416 * on store flush. 
417 */ 418 @Test 419 public void testCreateWriter() throws Exception { 420 Configuration conf = HBaseConfiguration.create(); 421 FileSystem fs = FileSystem.get(conf); 422 423 ColumnFamilyDescriptor hcd = 424 ColumnFamilyDescriptorBuilder.newBuilder(family).setCompressionType(Compression.Algorithm.GZ) 425 .setDataBlockEncoding(DataBlockEncoding.DIFF).build(); 426 init(name.getMethodName(), conf, hcd); 427 428 // Test createWriter 429 StoreFileWriter writer = store.getStoreEngine() 430 .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4) 431 .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true) 432 .includesTag(false).shouldDropBehind(false)); 433 Path path = writer.getPath(); 434 writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1))); 435 writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2))); 436 writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3))); 437 writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4))); 438 writer.close(); 439 440 // Verify that compression and encoding settings are respected 441 HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf); 442 assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec()); 443 assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding()); 444 reader.close(); 445 } 446 447 @Test 448 public void testDeleteExpiredStoreFiles() throws Exception { 449 testDeleteExpiredStoreFiles(0); 450 testDeleteExpiredStoreFiles(1); 451 } 452 453 /** 454 * @param minVersions the MIN_VERSIONS for the column family 455 */ 456 public void testDeleteExpiredStoreFiles(int minVersions) throws Exception { 457 int storeFileNum = 4; 458 int ttl = 4; 459 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 460 EnvironmentEdgeManagerTestHelper.injectEdge(edge); 461 462 Configuration conf = HBaseConfiguration.create(); 463 // Enable the expired store file deletion 464 
conf.setBoolean("hbase.store.delete.expired.storefile", true); 465 // Set the compaction threshold higher to avoid normal compactions. 466 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5); 467 468 init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder 469 .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build()); 470 471 long storeTtl = this.store.getScanInfo().getTtl(); 472 long sleepTime = storeTtl / storeFileNum; 473 long timeStamp; 474 // There are 4 store files and the max time stamp difference among these 475 // store files will be (this.store.ttl / storeFileNum) 476 for (int i = 1; i <= storeFileNum; i++) { 477 LOG.info("Adding some data for the store file #" + i); 478 timeStamp = EnvironmentEdgeManager.currentTime(); 479 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null); 480 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null); 481 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null); 482 flush(i); 483 edge.incrementTime(sleepTime); 484 } 485 486 // Verify the total number of store files 487 assertEquals(storeFileNum, this.store.getStorefiles().size()); 488 489 // Each call will find one expired store file and delete it before compaction happens. 490 // There will be no compaction due to threshold above. Last file will not be replaced. 491 for (int i = 1; i <= storeFileNum - 1; i++) { 492 // verify the expired store file. 493 assertFalse(this.store.requestCompaction().isPresent()); 494 Collection<HStoreFile> sfs = this.store.getStorefiles(); 495 // Ensure i files are gone. 496 if (minVersions == 0) { 497 assertEquals(storeFileNum - i, sfs.size()); 498 // Ensure only non-expired files remain. 
499 for (HStoreFile sf : sfs) { 500 assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl)); 501 } 502 } else { 503 assertEquals(storeFileNum, sfs.size()); 504 } 505 // Let the next store file expired. 506 edge.incrementTime(sleepTime); 507 } 508 assertFalse(this.store.requestCompaction().isPresent()); 509 510 Collection<HStoreFile> sfs = this.store.getStorefiles(); 511 // Assert the last expired file is not removed. 512 if (minVersions == 0) { 513 assertEquals(1, sfs.size()); 514 } 515 long ts = sfs.iterator().next().getReader().getMaxTimestamp(); 516 assertTrue(ts < (edge.currentTime() - storeTtl)); 517 518 for (HStoreFile sf : sfs) { 519 sf.closeStoreFile(true); 520 } 521 } 522 523 @Test 524 public void testLowestModificationTime() throws Exception { 525 Configuration conf = HBaseConfiguration.create(); 526 FileSystem fs = FileSystem.get(conf); 527 // Initialize region 528 init(name.getMethodName(), conf); 529 530 int storeFileNum = 4; 531 for (int i = 1; i <= storeFileNum; i++) { 532 LOG.info("Adding some data for the store file #" + i); 533 this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null); 534 this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null); 535 this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null); 536 flush(i); 537 } 538 // after flush; check the lowest time stamp 539 long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 540 long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 541 assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); 542 543 // after compact; check the lowest time stamp 544 store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null); 545 lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 546 lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 547 assertEquals(lowestTimeStampFromManager, 
lowestTimeStampFromFS); 548 } 549 550 private static long getLowestTimeStampFromFS(FileSystem fs, 551 final Collection<HStoreFile> candidates) throws IOException { 552 long minTs = Long.MAX_VALUE; 553 if (candidates.isEmpty()) { 554 return minTs; 555 } 556 Path[] p = new Path[candidates.size()]; 557 int i = 0; 558 for (HStoreFile sf : candidates) { 559 p[i] = sf.getPath(); 560 ++i; 561 } 562 563 FileStatus[] stats = fs.listStatus(p); 564 if (stats == null || stats.length == 0) { 565 return minTs; 566 } 567 for (FileStatus s : stats) { 568 minTs = Math.min(minTs, s.getModificationTime()); 569 } 570 return minTs; 571 } 572 573 ////////////////////////////////////////////////////////////////////////////// 574 // Get tests 575 ////////////////////////////////////////////////////////////////////////////// 576 577 private static final int BLOCKSIZE_SMALL = 8192; 578 579 /** 580 * Test for hbase-1686. 581 */ 582 @Test 583 public void testEmptyStoreFile() throws IOException { 584 init(this.name.getMethodName()); 585 // Write a store file. 586 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 587 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 588 flush(1); 589 // Now put in place an empty store file. Its a little tricky. Have to 590 // do manually with hacked in sequence id. 591 HStoreFile f = this.store.getStorefiles().iterator().next(); 592 Path storedir = f.getPath().getParent(); 593 long seqid = f.getMaxSequenceId(); 594 Configuration c = HBaseConfiguration.create(); 595 FileSystem fs = FileSystem.get(c); 596 HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 597 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs) 598 .withOutputDir(storedir).withFileContext(meta).build(); 599 w.appendMetadata(seqid + 1, false); 600 w.close(); 601 this.store.close(); 602 // Reopen it... 
should pick up two files 603 this.store = 604 new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false); 605 assertEquals(2, this.store.getStorefilesCount()); 606 607 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 608 assertEquals(1, result.size()); 609 } 610 611 /** 612 * Getting data from memstore only 613 */ 614 @Test 615 public void testGet_FromMemStoreOnly() throws IOException { 616 init(this.name.getMethodName()); 617 618 // Put data in memstore 619 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 620 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 621 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 622 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 623 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 624 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 625 626 // Get 627 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 628 629 // Compare 630 assertCheck(); 631 } 632 633 @Test 634 public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException { 635 testTimeRangeIfSomeCellsAreDroppedInFlush(1); 636 testTimeRangeIfSomeCellsAreDroppedInFlush(3); 637 testTimeRangeIfSomeCellsAreDroppedInFlush(5); 638 } 639 640 private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException { 641 init(this.name.getMethodName(), TEST_UTIL.getConfiguration(), 642 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build()); 643 long currentTs = 100; 644 long minTs = currentTs; 645 // the extra cell won't be flushed to disk, 646 // so the min of timerange will be different between memStore and hfile. 
647 for (int i = 0; i != (maxVersion + 1); ++i) { 648 this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null); 649 if (i == 1) { 650 minTs = currentTs; 651 } 652 } 653 flushStore(store, id++); 654 655 Collection<HStoreFile> files = store.getStorefiles(); 656 assertEquals(1, files.size()); 657 HStoreFile f = files.iterator().next(); 658 f.initReader(); 659 StoreFileReader reader = f.getReader(); 660 assertEquals(minTs, reader.timeRange.getMin()); 661 assertEquals(currentTs, reader.timeRange.getMax()); 662 } 663 664 /** 665 * Getting data from files only 666 */ 667 @Test 668 public void testGet_FromFilesOnly() throws IOException { 669 init(this.name.getMethodName()); 670 671 // Put data in memstore 672 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 673 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 674 // flush 675 flush(1); 676 677 // Add more data 678 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 679 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 680 // flush 681 flush(2); 682 683 // Add more data 684 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 685 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 686 // flush 687 flush(3); 688 689 // Get 690 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 691 // this.store.get(get, qualifiers, result); 692 693 // Need to sort the result since multiple files 694 Collections.sort(result, CellComparatorImpl.COMPARATOR); 695 696 // Compare 697 assertCheck(); 698 } 699 700 /** 701 * Getting data from memstore and files 702 */ 703 @Test 704 public void testGet_FromMemStoreAndFiles() throws IOException { 705 init(this.name.getMethodName()); 706 707 // Put data in memstore 708 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 709 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 710 // flush 
711 flush(1); 712 713 // Add more data 714 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 715 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 716 // flush 717 flush(2); 718 719 // Add more data 720 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 721 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 722 723 // Get 724 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 725 726 // Need to sort the result since multiple files 727 Collections.sort(result, CellComparatorImpl.COMPARATOR); 728 729 // Compare 730 assertCheck(); 731 } 732 733 private void flush(int storeFilessize) throws IOException { 734 flushStore(store, id++); 735 assertEquals(storeFilessize, this.store.getStorefiles().size()); 736 assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount()); 737 } 738 739 private void assertCheck() { 740 assertEquals(expected.size(), result.size()); 741 for (int i = 0; i < expected.size(); i++) { 742 assertEquals(expected.get(i), result.get(i)); 743 } 744 } 745 746 @After 747 public void tearDown() throws Exception { 748 EnvironmentEdgeManagerTestHelper.reset(); 749 if (store != null) { 750 try { 751 store.close(); 752 } catch (IOException e) { 753 } 754 store = null; 755 } 756 if (region != null) { 757 region.close(); 758 region = null; 759 } 760 } 761 762 @AfterClass 763 public static void tearDownAfterClass() throws IOException { 764 TEST_UTIL.cleanupTestDir(); 765 } 766 767 @Test 768 public void testHandleErrorsInFlush() throws Exception { 769 LOG.info("Setting up a faulty file system that cannot write"); 770 771 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 772 User user = User.createUserForTesting(conf, "testhandleerrorsinflush", new String[] { "foo" }); 773 // Inject our faulty LocalFileSystem 774 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 775 
user.runAs(new PrivilegedExceptionAction<Object>() { 776 @Override 777 public Object run() throws Exception { 778 // Make sure it worked (above is sensitive to caching details in hadoop core) 779 FileSystem fs = FileSystem.get(conf); 780 assertEquals(FaultyFileSystem.class, fs.getClass()); 781 782 // Initialize region 783 init(name.getMethodName(), conf); 784 785 LOG.info("Adding some data"); 786 store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 787 store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 788 store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 789 790 LOG.info("Before flush, we should have no files"); 791 792 StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, store.getStoreContext()); 793 Collection<StoreFileInfo> files = sft.load(); 794 assertEquals(0, files != null ? files.size() : 0); 795 796 // flush 797 try { 798 LOG.info("Flushing"); 799 flush(1); 800 fail("Didn't bubble up IOE!"); 801 } catch (IOException ioe) { 802 assertTrue(ioe.getMessage().contains("Fault injected")); 803 } 804 805 LOG.info("After failed flush, we should still have no files!"); 806 files = sft.load(); 807 assertEquals(0, files != null ? files.size() : 0); 808 store.getHRegion().getWAL().close(); 809 return null; 810 } 811 }); 812 FileSystem.closeAllForUGI(user.getUGI()); 813 } 814 815 /** 816 * Faulty file system that will fail if you write past its fault position the FIRST TIME only; 817 * thereafter it will succeed. Used by {@link TestHRegion} too. 
818 */ 819 static class FaultyFileSystem extends FilterFileSystem { 820 List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>(); 821 private long faultPos = 200; 822 AtomicBoolean fault = new AtomicBoolean(true); 823 824 public FaultyFileSystem() { 825 super(new LocalFileSystem()); 826 LOG.info("Creating faulty!"); 827 } 828 829 @Override 830 public FSDataOutputStream create(Path p) throws IOException { 831 return new FaultyOutputStream(super.create(p), faultPos, fault); 832 } 833 834 @Override 835 public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, 836 int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { 837 return new FaultyOutputStream( 838 super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress), 839 faultPos, fault); 840 } 841 842 @Override 843 public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize, 844 short replication, long blockSize, Progressable progress) throws IOException { 845 // Fake it. Call create instead. The default implementation throws an IOE 846 // that this is not supported. 
847 return create(f, overwrite, bufferSize, replication, blockSize, progress); 848 } 849 } 850 851 static class FaultyOutputStream extends FSDataOutputStream { 852 volatile long faultPos = Long.MAX_VALUE; 853 private final AtomicBoolean fault; 854 855 public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault) 856 throws IOException { 857 super(out, null); 858 this.faultPos = faultPos; 859 this.fault = fault; 860 } 861 862 @Override 863 public synchronized void write(byte[] buf, int offset, int length) throws IOException { 864 LOG.info("faulty stream write at pos " + getPos()); 865 injectFault(); 866 super.write(buf, offset, length); 867 } 868 869 private void injectFault() throws IOException { 870 if (this.fault.get() && getPos() >= faultPos) { 871 throw new IOException("Fault injected"); 872 } 873 } 874 } 875 876 private static StoreFlushContext flushStore(HStore store, long id) throws IOException { 877 StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY); 878 storeFlushCtx.prepare(); 879 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 880 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 881 return storeFlushCtx; 882 } 883 884 /** 885 * Generate a list of KeyValues for testing based on given parameters 886 * @return the rows key-value list 887 */ 888 private List<ExtendedCell> getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier, 889 byte[] family) { 890 List<ExtendedCell> kvList = new ArrayList<>(); 891 for (int i = 1; i <= numRows; i++) { 892 byte[] b = Bytes.toBytes(i); 893 for (long timestamp : timestamps) { 894 kvList.add(new KeyValue(b, family, qualifier, timestamp, b)); 895 } 896 } 897 return kvList; 898 } 899 900 /** 901 * Test to ensure correctness when using Stores with multiple timestamps 902 */ 903 @Test 904 public void testMultipleTimestamps() throws IOException { 905 int numRows = 1; 906 long[] timestamps1 = new long[] { 1, 5, 10, 20 }; 907 long[] 
timestamps2 = new long[] { 30, 80 }; 908 909 init(this.name.getMethodName()); 910 911 List<ExtendedCell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family); 912 for (ExtendedCell kv : kvList1) { 913 this.store.add(kv, null); 914 } 915 916 flushStore(store, id++); 917 918 List<ExtendedCell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family); 919 for (ExtendedCell kv : kvList2) { 920 this.store.add(kv, null); 921 } 922 923 List<Cell> result; 924 Get get = new Get(Bytes.toBytes(1)); 925 get.addColumn(family, qf1); 926 927 get.setTimeRange(0, 15); 928 result = HBaseTestingUtil.getFromStoreFile(store, get); 929 assertTrue(result.size() > 0); 930 931 get.setTimeRange(40, 90); 932 result = HBaseTestingUtil.getFromStoreFile(store, get); 933 assertTrue(result.size() > 0); 934 935 get.setTimeRange(10, 45); 936 result = HBaseTestingUtil.getFromStoreFile(store, get); 937 assertTrue(result.size() > 0); 938 939 get.setTimeRange(80, 145); 940 result = HBaseTestingUtil.getFromStoreFile(store, get); 941 assertTrue(result.size() > 0); 942 943 get.setTimeRange(1, 2); 944 result = HBaseTestingUtil.getFromStoreFile(store, get); 945 assertTrue(result.size() > 0); 946 947 get.setTimeRange(90, 200); 948 result = HBaseTestingUtil.getFromStoreFile(store, get); 949 assertTrue(result.size() == 0); 950 } 951 952 /** 953 * Test for HBASE-3492 - Test split on empty colfam (no store files). 954 * @throws IOException When the IO operations fail. 
955 */ 956 @Test 957 public void testSplitWithEmptyColFam() throws IOException { 958 init(this.name.getMethodName()); 959 assertFalse(store.getSplitPoint().isPresent()); 960 } 961 962 @Test 963 public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception { 964 final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle"; 965 long anyValue = 10; 966 967 // We'll check that it uses correct config and propagates it appropriately by going thru 968 // the simplest "real" path I can find - "throttleCompaction", which just checks whether 969 // a number we pass in is higher than some config value, inside compactionPolicy. 970 Configuration conf = HBaseConfiguration.create(); 971 conf.setLong(CONFIG_KEY, anyValue); 972 init(name.getMethodName() + "-xml", conf); 973 assertTrue(store.throttleCompaction(anyValue + 1)); 974 assertFalse(store.throttleCompaction(anyValue)); 975 976 // HTD overrides XML. 977 --anyValue; 978 init( 979 name.getMethodName() + "-htd", conf, TableDescriptorBuilder 980 .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)), 981 ColumnFamilyDescriptorBuilder.of(family)); 982 assertTrue(store.throttleCompaction(anyValue + 1)); 983 assertFalse(store.throttleCompaction(anyValue)); 984 985 // HCD overrides them both. 
986 --anyValue; 987 init(name.getMethodName() + "-hcd", conf, 988 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, 989 Long.toString(anyValue)), 990 ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue)) 991 .build()); 992 assertTrue(store.throttleCompaction(anyValue + 1)); 993 assertFalse(store.throttleCompaction(anyValue)); 994 } 995 996 public static class DummyStoreEngine extends DefaultStoreEngine { 997 public static DefaultCompactor lastCreatedCompactor = null; 998 999 @Override 1000 protected void createComponents(Configuration conf, HStore store, CellComparator comparator) 1001 throws IOException { 1002 super.createComponents(conf, store, comparator); 1003 lastCreatedCompactor = this.compactor; 1004 } 1005 } 1006 1007 @Test 1008 public void testStoreUsesSearchEngineOverride() throws Exception { 1009 Configuration conf = HBaseConfiguration.create(); 1010 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName()); 1011 init(this.name.getMethodName(), conf); 1012 assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor()); 1013 } 1014 1015 private void addStoreFile() throws IOException { 1016 HStoreFile f = this.store.getStorefiles().iterator().next(); 1017 Path storedir = f.getPath().getParent(); 1018 long seqid = this.store.getMaxSequenceId().orElse(0L); 1019 Configuration c = TEST_UTIL.getConfiguration(); 1020 FileSystem fs = FileSystem.get(c); 1021 HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 1022 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs) 1023 .withOutputDir(storedir).withFileContext(fileContext).build(); 1024 w.appendMetadata(seqid + 1, false); 1025 w.close(); 1026 LOG.info("Added store file:" + w.getPath()); 1027 } 1028 1029 private void archiveStoreFile(int index) throws IOException { 1030 Collection<HStoreFile> files = this.store.getStorefiles(); 1031 
HStoreFile sf = null; 1032 Iterator<HStoreFile> it = files.iterator(); 1033 for (int i = 0; i <= index; i++) { 1034 sf = it.next(); 1035 } 1036 StoreFileTracker sft = 1037 StoreFileTrackerFactory.create(store.getRegionFileSystem().getFileSystem().getConf(), 1038 store.isPrimaryReplicaStore(), store.getStoreContext()); 1039 sft.removeStoreFiles(Lists.newArrayList(sf)); 1040 } 1041 1042 private void closeCompactedFile(int index) throws IOException { 1043 Collection<HStoreFile> files = 1044 this.store.getStoreEngine().getStoreFileManager().getCompactedfiles(); 1045 if (files.size() > 0) { 1046 HStoreFile sf = null; 1047 Iterator<HStoreFile> it = files.iterator(); 1048 for (int i = 0; i <= index; i++) { 1049 sf = it.next(); 1050 } 1051 sf.closeStoreFile(true); 1052 store.getStoreEngine().getStoreFileManager() 1053 .removeCompactedFiles(Collections.singletonList(sf)); 1054 } 1055 } 1056 1057 @Test 1058 public void testRefreshStoreFiles() throws Exception { 1059 init(name.getMethodName()); 1060 1061 assertEquals(0, this.store.getStorefilesCount()); 1062 1063 // Test refreshing store files when no store files are there 1064 store.refreshStoreFiles(); 1065 assertEquals(0, this.store.getStorefilesCount()); 1066 1067 // add some data, flush 1068 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 1069 flush(1); 1070 assertEquals(1, this.store.getStorefilesCount()); 1071 1072 // add one more file 1073 addStoreFile(); 1074 1075 assertEquals(1, this.store.getStorefilesCount()); 1076 store.refreshStoreFiles(); 1077 assertEquals(2, this.store.getStorefilesCount()); 1078 1079 // add three more files 1080 addStoreFile(); 1081 addStoreFile(); 1082 addStoreFile(); 1083 1084 assertEquals(2, this.store.getStorefilesCount()); 1085 store.refreshStoreFiles(); 1086 assertEquals(5, this.store.getStorefilesCount()); 1087 1088 closeCompactedFile(0); 1089 archiveStoreFile(0); 1090 1091 assertEquals(5, this.store.getStorefilesCount()); 1092 store.refreshStoreFiles(); 1093 
assertEquals(4, this.store.getStorefilesCount()); 1094 1095 archiveStoreFile(0); 1096 archiveStoreFile(1); 1097 archiveStoreFile(2); 1098 1099 assertEquals(4, this.store.getStorefilesCount()); 1100 store.refreshStoreFiles(); 1101 assertEquals(1, this.store.getStorefilesCount()); 1102 1103 archiveStoreFile(0); 1104 store.refreshStoreFiles(); 1105 assertEquals(0, this.store.getStorefilesCount()); 1106 } 1107 1108 @Test 1109 public void testRefreshStoreFilesNotChanged() throws IOException { 1110 init(name.getMethodName()); 1111 1112 assertEquals(0, this.store.getStorefilesCount()); 1113 1114 // add some data, flush 1115 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 1116 flush(1); 1117 // add one more file 1118 addStoreFile(); 1119 1120 StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine()); 1121 1122 // call first time after files changed 1123 spiedStoreEngine.refreshStoreFiles(); 1124 assertEquals(2, this.store.getStorefilesCount()); 1125 verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any()); 1126 1127 // call second time 1128 spiedStoreEngine.refreshStoreFiles(); 1129 1130 // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not 1131 // refreshed, 1132 verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any()); 1133 } 1134 1135 @Test 1136 public void testScanWithCompactionAfterFlush() throws Exception { 1137 TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, 1138 EverythingPolicy.class.getName()); 1139 init(name.getMethodName()); 1140 1141 assertEquals(0, this.store.getStorefilesCount()); 1142 1143 KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null); 1144 // add some data, flush 1145 this.store.add(kv, null); 1146 flush(1); 1147 kv = new KeyValue(row, family, qf2, 1, (byte[]) null); 1148 // add some data, flush 1149 this.store.add(kv, null); 1150 flush(2); 1151 kv = new KeyValue(row, family, qf3, 1, (byte[]) 
null); 1152 // add some data, flush 1153 this.store.add(kv, null); 1154 flush(3); 1155 1156 ExecutorService service = Executors.newFixedThreadPool(2); 1157 1158 Scan scan = new Scan(new Get(row)); 1159 Future<KeyValueScanner> scanFuture = service.submit(() -> { 1160 try { 1161 LOG.info(">>>> creating scanner"); 1162 return this.store.createScanner(scan, 1163 new ScanInfo(HBaseConfiguration.create(), 1164 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(), 1165 Long.MAX_VALUE, 0, CellComparator.getInstance()), 1166 scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0); 1167 } catch (IOException e) { 1168 e.printStackTrace(); 1169 return null; 1170 } 1171 }); 1172 Future compactFuture = service.submit(() -> { 1173 try { 1174 LOG.info(">>>>>> starting compaction"); 1175 Optional<CompactionContext> opCompaction = this.store.requestCompaction(); 1176 assertTrue(opCompaction.isPresent()); 1177 store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent()); 1178 LOG.info(">>>>>> Compaction is finished"); 1179 this.store.closeAndArchiveCompactedFiles(); 1180 LOG.info(">>>>>> Compacted files deleted"); 1181 } catch (IOException e) { 1182 e.printStackTrace(); 1183 } 1184 }); 1185 1186 KeyValueScanner kvs = scanFuture.get(); 1187 compactFuture.get(); 1188 ((StoreScanner) kvs).currentScanners.forEach(s -> { 1189 if (s instanceof StoreFileScanner) { 1190 assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount()); 1191 } 1192 }); 1193 kvs.seek(kv); 1194 service.shutdownNow(); 1195 } 1196 1197 private long countMemStoreScanner(StoreScanner scanner) { 1198 if (scanner.currentScanners == null) { 1199 return 0; 1200 } 1201 return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count(); 1202 } 1203 1204 @Test 1205 public void testNumberOfMemStoreScannersAfterFlush() throws IOException { 1206 long seqId = 100; 1207 long timestamp = EnvironmentEdgeManager.currentTime(); 1208 ExtendedCell cell0 = 
1209 ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1210 .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build(); 1211 PrivateCellUtil.setSequenceId(cell0, seqId); 1212 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList()); 1213 1214 ExtendedCell cell1 = 1215 ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1216 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build(); 1217 PrivateCellUtil.setSequenceId(cell1, seqId); 1218 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1)); 1219 1220 seqId = 101; 1221 timestamp = EnvironmentEdgeManager.currentTime(); 1222 ExtendedCell cell2 = 1223 ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family) 1224 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build(); 1225 PrivateCellUtil.setSequenceId(cell2, seqId); 1226 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2)); 1227 } 1228 1229 private void testNumberOfMemStoreScannersAfterFlush(List<ExtendedCell> inputCellsBeforeSnapshot, 1230 List<ExtendedCell> inputCellsAfterSnapshot) throws IOException { 1231 init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size()); 1232 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1233 long seqId = Long.MIN_VALUE; 1234 for (ExtendedCell c : inputCellsBeforeSnapshot) { 1235 quals.add(CellUtil.cloneQualifier(c)); 1236 seqId = Math.max(seqId, c.getSequenceId()); 1237 } 1238 for (ExtendedCell c : inputCellsAfterSnapshot) { 1239 quals.add(CellUtil.cloneQualifier(c)); 1240 seqId = Math.max(seqId, c.getSequenceId()); 1241 } 1242 inputCellsBeforeSnapshot.forEach(c -> store.add(c, null)); 1243 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1244 storeFlushCtx.prepare(); 
1245 inputCellsAfterSnapshot.forEach(c -> store.add(c, null)); 1246 int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2; 1247 try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) { 1248 // snapshot + active (if inputCellsAfterSnapshot isn't empty) 1249 assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s)); 1250 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1251 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1252 // snapshot has no data after flush 1253 int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1; 1254 boolean more; 1255 int cellCount = 0; 1256 do { 1257 List<Cell> cells = new ArrayList<>(); 1258 more = s.next(cells); 1259 cellCount += cells.size(); 1260 assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s)); 1261 } while (more); 1262 assertEquals( 1263 "The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size() 1264 + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(), 1265 inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount); 1266 // the current scanners is cleared 1267 assertEquals(0, countMemStoreScanner(s)); 1268 } 1269 } 1270 1271 private ExtendedCell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) 1272 throws IOException { 1273 return createCell(row, qualifier, ts, sequenceId, value); 1274 } 1275 1276 private ExtendedCell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, 1277 byte[] value) throws IOException { 1278 return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row) 1279 .setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put) 1280 .setValue(value).setSequenceId(sequenceId).build(); 1281 } 1282 1283 private ExtendedCell createDeleteCell(byte[] row, byte[] qualifier, long ts, long sequenceId) { 1284 return 
ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row) 1285 .setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.DeleteColumn) 1286 .setSequenceId(sequenceId).build(); 1287 } 1288 1289 @Test 1290 public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException { 1291 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1292 final int expectedSize = 3; 1293 testFlushBeforeCompletingScan(new MyListHook() { 1294 @Override 1295 public void hook(int currentSize) { 1296 if (currentSize == expectedSize - 1) { 1297 try { 1298 flushStore(store, id++); 1299 timeToGoNextRow.set(true); 1300 } catch (IOException e) { 1301 throw new RuntimeException(e); 1302 } 1303 } 1304 } 1305 }, new FilterBase() { 1306 @Override 1307 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1308 return ReturnCode.INCLUDE; 1309 } 1310 }, expectedSize); 1311 } 1312 1313 @Test 1314 public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException { 1315 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1316 final int expectedSize = 2; 1317 testFlushBeforeCompletingScan(new MyListHook() { 1318 @Override 1319 public void hook(int currentSize) { 1320 if (currentSize == expectedSize - 1) { 1321 try { 1322 flushStore(store, id++); 1323 timeToGoNextRow.set(true); 1324 } catch (IOException e) { 1325 throw new RuntimeException(e); 1326 } 1327 } 1328 } 1329 }, new FilterBase() { 1330 @Override 1331 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1332 if (timeToGoNextRow.get()) { 1333 timeToGoNextRow.set(false); 1334 return ReturnCode.NEXT_ROW; 1335 } else { 1336 return ReturnCode.INCLUDE; 1337 } 1338 } 1339 }, expectedSize); 1340 } 1341 1342 @Test 1343 public void testFlushBeforeCompletingScanWithFilterHint() 1344 throws IOException, InterruptedException { 1345 final AtomicBoolean timeToGetHint = new AtomicBoolean(false); 1346 final int 
expectedSize = 2; 1347 testFlushBeforeCompletingScan(new MyListHook() { 1348 @Override 1349 public void hook(int currentSize) { 1350 if (currentSize == expectedSize - 1) { 1351 try { 1352 flushStore(store, id++); 1353 timeToGetHint.set(true); 1354 } catch (IOException e) { 1355 throw new RuntimeException(e); 1356 } 1357 } 1358 } 1359 }, new FilterBase() { 1360 @Override 1361 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1362 if (timeToGetHint.get()) { 1363 timeToGetHint.set(false); 1364 return Filter.ReturnCode.SEEK_NEXT_USING_HINT; 1365 } else { 1366 return Filter.ReturnCode.INCLUDE; 1367 } 1368 } 1369 1370 @Override 1371 public Cell getNextCellHint(Cell currentCell) throws IOException { 1372 return currentCell; 1373 } 1374 }, expectedSize); 1375 } 1376 1377 private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize) 1378 throws IOException, InterruptedException { 1379 Configuration conf = HBaseConfiguration.create(); 1380 byte[] r0 = Bytes.toBytes("row0"); 1381 byte[] r1 = Bytes.toBytes("row1"); 1382 byte[] r2 = Bytes.toBytes("row2"); 1383 byte[] value0 = Bytes.toBytes("value0"); 1384 byte[] value1 = Bytes.toBytes("value1"); 1385 byte[] value2 = Bytes.toBytes("value2"); 1386 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1387 long ts = EnvironmentEdgeManager.currentTime(); 1388 long seqId = 100; 1389 init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1390 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(), 1391 new MyStoreHook() { 1392 @Override 1393 public long getSmallestReadPoint(HStore store) { 1394 return seqId + 3; 1395 } 1396 }); 1397 // The cells having the value0 won't be flushed to disk because the value of max version is 1 1398 store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing); 1399 store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing); 1400 store.add(createCell(r0, qf3, ts, seqId, 
value0), memStoreSizing); 1401 store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing); 1402 store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing); 1403 store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing); 1404 store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing); 1405 store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing); 1406 store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing); 1407 store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing); 1408 store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing); 1409 store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing); 1410 List<Cell> myList = new MyList<>(hook); 1411 Scan scan = new Scan().withStartRow(r1).setFilter(filter); 1412 try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) { 1413 // r1 1414 scanner.next(myList); 1415 assertEquals(expectedSize, myList.size()); 1416 for (Cell c : myList) { 1417 byte[] actualValue = CellUtil.cloneValue(c); 1418 assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:" 1419 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1)); 1420 } 1421 List<Cell> normalList = new ArrayList<>(3); 1422 // r2 1423 scanner.next(normalList); 1424 assertEquals(3, normalList.size()); 1425 for (Cell c : normalList) { 1426 byte[] actualValue = CellUtil.cloneValue(c); 1427 assertTrue("expected:" + Bytes.toStringBinary(value2) + ", actual:" 1428 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value2)); 1429 } 1430 } 1431 } 1432 1433 @Test 1434 public void testFlushBeforeCompletingScanWithDeleteCell() throws IOException { 1435 final Configuration conf = HBaseConfiguration.create(); 1436 1437 byte[] r1 = Bytes.toBytes("row1"); 1438 byte[] r2 = Bytes.toBytes("row2"); 1439 1440 byte[] value1 = Bytes.toBytes("value1"); 1441 byte[] value2 = 
Bytes.toBytes("value2"); 1442 1443 final MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1444 final long ts = EnvironmentEdgeManager.currentTime(); 1445 final long seqId = 100; 1446 1447 init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1448 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(), 1449 new MyStoreHook() { 1450 @Override 1451 long getSmallestReadPoint(HStore store) { 1452 return seqId + 3; 1453 } 1454 }); 1455 1456 store.add(createCell(r1, qf1, ts + 1, seqId + 1, value2), memStoreSizing); 1457 store.add(createCell(r1, qf2, ts + 1, seqId + 1, value2), memStoreSizing); 1458 store.add(createCell(r1, qf3, ts + 1, seqId + 1, value2), memStoreSizing); 1459 1460 store.add(createDeleteCell(r1, qf1, ts + 2, seqId + 2), memStoreSizing); 1461 store.add(createDeleteCell(r1, qf2, ts + 2, seqId + 2), memStoreSizing); 1462 store.add(createDeleteCell(r1, qf3, ts + 2, seqId + 2), memStoreSizing); 1463 1464 store.add(createCell(r2, qf1, ts + 3, seqId + 3, value1), memStoreSizing); 1465 store.add(createCell(r2, qf2, ts + 3, seqId + 3, value1), memStoreSizing); 1466 store.add(createCell(r2, qf3, ts + 3, seqId + 3, value1), memStoreSizing); 1467 1468 Scan scan = new Scan().withStartRow(r1); 1469 1470 try (final InternalScanner scanner = 1471 new StoreScanner(store, store.getScanInfo(), scan, null, seqId + 3) { 1472 @Override 1473 protected KeyValueHeap newKVHeap(List<? 
extends KeyValueScanner> scanners, 1474 CellComparator comparator) throws IOException { 1475 return new MyKeyValueHeap(scanners, comparator, recordBlockSizeCallCount -> { 1476 if (recordBlockSizeCallCount == 6) { 1477 try { 1478 flushStore(store, id++); 1479 } catch (IOException e) { 1480 throw new RuntimeException(e); 1481 } 1482 } 1483 }); 1484 } 1485 }) { 1486 List<Cell> cellResult = new ArrayList<>(); 1487 1488 scanner.next(cellResult); 1489 assertEquals(0, cellResult.size()); 1490 1491 cellResult.clear(); 1492 1493 scanner.next(cellResult); 1494 assertEquals(3, cellResult.size()); 1495 for (Cell cell : cellResult) { 1496 assertArrayEquals(r2, CellUtil.cloneRow(cell)); 1497 } 1498 } 1499 } 1500 1501 @Test 1502 public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException { 1503 Configuration conf = HBaseConfiguration.create(); 1504 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName()); 1505 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1506 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1507 byte[] value = Bytes.toBytes("value"); 1508 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1509 long ts = EnvironmentEdgeManager.currentTime(); 1510 long seqId = 100; 1511 // older data whihc shouldn't be "seen" by client 1512 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1513 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1514 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1515 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1516 quals.add(qf1); 1517 quals.add(qf2); 1518 quals.add(qf3); 1519 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1520 MyCompactingMemStore.START_TEST.set(true); 1521 Runnable flush = () -> { 1522 // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5) 1523 // recreate the active 
memstore -- phase (4/5) 1524 storeFlushCtx.prepare(); 1525 }; 1526 ExecutorService service = Executors.newSingleThreadExecutor(); 1527 service.execute(flush); 1528 // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5) 1529 // this is blocked until we recreate the active memstore -- phase (3/5) 1530 // we get scanner from active memstore but it is empty -- phase (5/5) 1531 InternalScanner scanner = 1532 (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 1533 service.shutdown(); 1534 service.awaitTermination(20, TimeUnit.SECONDS); 1535 try { 1536 try { 1537 List<Cell> results = new ArrayList<>(); 1538 scanner.next(results); 1539 assertEquals(3, results.size()); 1540 for (Cell c : results) { 1541 byte[] actualValue = CellUtil.cloneValue(c); 1542 assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:" 1543 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value)); 1544 } 1545 } finally { 1546 scanner.close(); 1547 } 1548 } finally { 1549 MyCompactingMemStore.START_TEST.set(false); 1550 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1551 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1552 } 1553 } 1554 1555 @Test 1556 public void testScanWithDoubleFlush() throws IOException { 1557 Configuration conf = HBaseConfiguration.create(); 1558 // Initialize region 1559 MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() { 1560 @Override 1561 public void getScanners(MyStore store) throws IOException { 1562 final long tmpId = id++; 1563 ExecutorService s = Executors.newSingleThreadExecutor(); 1564 s.execute(() -> { 1565 try { 1566 // flush the store before storescanner updates the scanners from store. 1567 // The current data will be flushed into files, and the memstore will 1568 // be clear. 
1569 // -- phase (4/4) 1570 flushStore(store, tmpId); 1571 } catch (IOException ex) { 1572 throw new RuntimeException(ex); 1573 } 1574 }); 1575 s.shutdown(); 1576 try { 1577 // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers. 1578 s.awaitTermination(3, TimeUnit.SECONDS); 1579 } catch (InterruptedException ex) { 1580 } 1581 } 1582 }); 1583 byte[] oldValue = Bytes.toBytes("oldValue"); 1584 byte[] currentValue = Bytes.toBytes("currentValue"); 1585 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1586 long ts = EnvironmentEdgeManager.currentTime(); 1587 long seqId = 100; 1588 // older data whihc shouldn't be "seen" by client 1589 myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing); 1590 myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing); 1591 myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing); 1592 long snapshotId = id++; 1593 // push older data into snapshot -- phase (1/4) 1594 StoreFlushContext storeFlushCtx = 1595 store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY); 1596 storeFlushCtx.prepare(); 1597 1598 // insert current data into active -- phase (2/4) 1599 myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing); 1600 myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing); 1601 myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing); 1602 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1603 quals.add(qf1); 1604 quals.add(qf2); 1605 quals.add(qf3); 1606 try (InternalScanner scanner = 1607 (InternalScanner) myStore.getScanner(new Scan(new Get(row)), quals, seqId + 1)) { 1608 // complete the flush -- phase (3/4) 1609 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1610 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1611 1612 List<Cell> results = new ArrayList<>(); 1613 scanner.next(results); 1614 assertEquals(3, results.size()); 1615 for (Cell c : 
results) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(currentValue) + ", actual:"
          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, currentValue));
      }
    }
  }

  /**
   * This test is for HBASE-27519: while the {@link StoreScanner} is scanning, a flush and a
   * compaction execute concurrently, and the compaction compacts and archives the flushed
   * {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}. Before HBASE-27519,
   * {@link StoreScanner#updateReaders} would throw {@link FileNotFoundException}.
   */
  @Test
  public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(WALFactory.WAL_ENABLED, false);
    conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName());
    byte[] r0 = Bytes.toBytes("row0");
    byte[] r1 = Bytes.toBytes("row1");
    // Two parties: the thread running the hook below and the compaction thread.
    final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
    // The hook only takes part in the barrier hand-shake while this flag is set.
    final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
    // Initialize region
    final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
      @Override
      public void getScanners(MyStore store) throws IOException {
        try {
          // Here this method is called by StoreScanner.updateReaders which is invoked by the
          // following TestHStore.flushStore
          if (shouldWaitRef.get()) {
            // wait the following compaction Task start
            cyclicBarrier.await();
            // wait the following HStore.closeAndArchiveCompactedFiles end.
            cyclicBarrier.await();
          }
        } catch (BrokenBarrierException | InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
    });

    // Holds whatever the compaction thread throws so the test thread can assert on it.
    final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null);
    Runnable compactionTask = () -> {
      try {
        // Only when the StoreScanner.updateReaders invoked by TestHStore.flushStore prepares for
        // entering the MyStore.getScanners, compactionTask could start.
        cyclicBarrier.await();
        region.compactStore(family, new NoLimitThroughputController());
        myStore.closeAndArchiveCompactedFiles();
        // Notify StoreScanner.updateReaders could enter MyStore.getScanners.
        cyclicBarrier.await();
      } catch (Throwable e) {
        compactionExceptionRef.set(e);
      }
    };

    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    byte[] value = Bytes.toBytes("value");
    // r0: qf1 and qf2 are each flushed by their own flushStore call, qf3 is added without a flush
    myStore.add(createCell(r0, qf1, ts, seqId, value), null);
    flushStore(myStore, id++);
    myStore.add(createCell(r0, qf2, ts, seqId, value), null);
    flushStore(myStore, id++);
    myStore.add(createCell(r0, qf3, ts, seqId, value), null);
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    quals.add(qf1);
    quals.add(qf2);
    quals.add(qf3);

    // r1: added after the last flush above
    myStore.add(createCell(r1, qf1, ts, seqId, value), null);
    myStore.add(createCell(r1, qf2, ts, seqId, value), null);
    myStore.add(createCell(r1, qf3, ts, seqId, value), null);

    Thread.currentThread()
      .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread");
    Scan scan = new Scan();
    scan.withStartRow(r0, true);
    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) {
      List<Cell> results = new MyList<>(size -> {
        switch (size) {
          case 1:
            shouldWaitRef.set(true);
            Thread thread = new
Thread(compactionTask); 1700 thread.setName("MyCompacting Thread."); 1701 thread.start(); 1702 try { 1703 flushStore(myStore, id++); 1704 thread.join(); 1705 } catch (IOException | InterruptedException e) { 1706 throw new RuntimeException(e); 1707 } 1708 shouldWaitRef.set(false); 1709 break; 1710 default: 1711 break; 1712 } 1713 }); 1714 // Before HBASE-27519, here would throw java.io.FileNotFoundException because the storeFile 1715 // which used by StoreScanner.updateReaders is deleted by compactionTask. 1716 scanner.next(results); 1717 // The results is r0 row cells. 1718 assertEquals(3, results.size()); 1719 assertTrue(compactionExceptionRef.get() == null); 1720 } 1721 } 1722 1723 @Test 1724 public void testReclaimChunkWhenScaning() throws IOException { 1725 init("testReclaimChunkWhenScaning"); 1726 long ts = EnvironmentEdgeManager.currentTime(); 1727 long seqId = 100; 1728 byte[] value = Bytes.toBytes("value"); 1729 // older data whihc shouldn't be "seen" by client 1730 store.add(createCell(qf1, ts, seqId, value), null); 1731 store.add(createCell(qf2, ts, seqId, value), null); 1732 store.add(createCell(qf3, ts, seqId, value), null); 1733 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1734 quals.add(qf1); 1735 quals.add(qf2); 1736 quals.add(qf3); 1737 try (InternalScanner scanner = 1738 (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId)) { 1739 List<Cell> results = new MyList<>(size -> { 1740 switch (size) { 1741 // 1) we get the first cell (qf1) 1742 // 2) flush the data to have StoreScanner update inner scanners 1743 // 3) the chunk will be reclaimed after updaing 1744 case 1: 1745 try { 1746 flushStore(store, id++); 1747 } catch (IOException e) { 1748 throw new RuntimeException(e); 1749 } 1750 break; 1751 // 1) we get the second cell (qf2) 1752 // 2) add some cell to fill some byte into the chunk (we have only one chunk) 1753 case 2: 1754 try { 1755 byte[] newValue = Bytes.toBytes("newValue"); 1756 // older data whihc 
shouldn't be "seen" by client 1757 store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null); 1758 store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null); 1759 store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null); 1760 } catch (IOException e) { 1761 throw new RuntimeException(e); 1762 } 1763 break; 1764 default: 1765 break; 1766 } 1767 }); 1768 scanner.next(results); 1769 assertEquals(3, results.size()); 1770 for (Cell c : results) { 1771 byte[] actualValue = CellUtil.cloneValue(c); 1772 assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:" 1773 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value)); 1774 } 1775 } 1776 } 1777 1778 /** 1779 * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the 1780 * versionedList. And the first InMemoryFlushRunnable will use the chagned versionedList to remove 1781 * the corresponding segments. In short, there will be some segements which isn't in merge are 1782 * removed. 
*/
  @Test
  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
    int flushSize = 500;
    Configuration conf = HBaseConfiguration.create();
    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
    // Set the lower threshold to invoke the "MERGE" policy
    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
    byte[] value = Bytes.toBytes("thisisavarylargevalue");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    // first batch of cells; RUNNER_COUNT is expected to be 1 once they are added
    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    storeFlushCtx.prepare();
    // This shouldn't invoke another in-memory flush because the first compactor thread
    // hasn't accomplished the in-memory compaction.
    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
    // okay. Let the compaction be completed
    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
    CompactingMemStore mem = (CompactingMemStore) ((HStore) store).memstore;
    // Poll once per second until isMemStoreFlushingInMemory() reports false.
    // NOTE(review): the loop has no upper bound of its own.
    while (mem.isMemStoreFlushingInMemory()) {
      TimeUnit.SECONDS.sleep(1);
    }
    // This should invoke another in-memory flush.
    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
  }

  /**
   * Checks the min / max / average store file age reported by an {@link HStore} whose store
   * engine is mocked to expose four store files created 10, 100, 1000 and 10000 before the value
   * of the injected manual clock.
   */
  @Test
  public void testAge() throws IOException {
    long currentTime = EnvironmentEdgeManager.currentTime();
    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
    edge.setValue(currentTime);
    // NOTE(review): the injected edge is not reset inside this method; presumably a teardown
    // elsewhere in the class restores the default edge -- confirm.
    EnvironmentEdgeManager.injectEdge(edge);
    Configuration conf = TEST_UTIL.getConfiguration();
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
    initHRegion(name.getMethodName(), conf,
      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
    HStore store = new HStore(region, hcd, conf, false) {

      @Override
      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
        CellComparator kvComparator) throws IOException {
        List<HStoreFile> storefiles =
          Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
            mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
        StoreFileManager sfm = mock(StoreFileManager.class);
        when(sfm.getStoreFiles()).thenReturn(storefiles);
        StoreEngine<?, ?, ?, ?> storeEngine =
mock(StoreEngine.class); 1851 when(storeEngine.getStoreFileManager()).thenReturn(sfm); 1852 return storeEngine; 1853 } 1854 }; 1855 assertEquals(10L, store.getMinStoreFileAge().getAsLong()); 1856 assertEquals(10000L, store.getMaxStoreFileAge().getAsLong()); 1857 assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4); 1858 } 1859 1860 private HStoreFile mockStoreFile(long createdTime) { 1861 StoreFileInfo info = mock(StoreFileInfo.class); 1862 when(info.getCreatedTimestamp()).thenReturn(createdTime); 1863 HStoreFile sf = mock(HStoreFile.class); 1864 when(sf.getReader()).thenReturn(mock(StoreFileReader.class)); 1865 when(sf.isHFile()).thenReturn(true); 1866 when(sf.getFileInfo()).thenReturn(info); 1867 return sf; 1868 } 1869 1870 private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) 1871 throws IOException { 1872 return (MyStore) init(methodName, conf, 1873 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1874 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook); 1875 } 1876 1877 private static class MyStore extends HStore { 1878 private final MyStoreHook hook; 1879 1880 MyStore(final HRegion region, final ColumnFamilyDescriptor family, 1881 final Configuration confParam, MyStoreHook hook, boolean switchToPread) throws IOException { 1882 super(region, family, confParam, false); 1883 this.hook = hook; 1884 } 1885 1886 @Override 1887 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1888 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, 1889 boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, 1890 boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException { 1891 hook.getScanners(this); 1892 return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, 1893 stopRow, false, readPt, includeMemstoreScanner, 
onlyLatestVersion);
    }

    // Delegates to the hook so a test can substitute the read point.
    @Override
    public long getSmallestReadPoint() {
      return hook.getSmallestReadPoint(this);
    }
  }

  /**
   * Extension points consulted by {@link MyStore}; tests subclass it anonymously and override
   * only what they need.
   */
  private abstract static class MyStoreHook {

    /** Called by {@link MyStore} before it delegates to the real getScanners; no-op by default. */
    void getScanners(MyStore store) throws IOException {
    }

    /** Returns the smallest read point to report; by default the one of the store's region. */
    long getSmallestReadPoint(HStore store) {
      return store.getHRegion().getSmallestReadPoint();
    }
  }

  @Test
  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
    // The hook overrides nothing, so the default MyStoreHook behaviour applies.
    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
    });
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqID = 1L;
    // Add some data to the region and do some flushes
    for (int i = 1; i < 10; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);
    for (int i = 11; i < 20; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);
    for (int i = 21; i < 30; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);

    assertEquals(3, store.getStorefilesCount());
    Scan scan = new Scan();
    scan.addFamily(family);
    // Remember the three store files that exist when the scanner is opened.
    Collection<HStoreFile> storefiles2 = store.getStorefiles();
    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
    StoreScanner storeScanner =
      (StoreScanner)
store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
    // get the current heap
    KeyValueHeap heap = storeScanner.heap;
    // create more store files
    for (int i = 31; i < 40; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);

    for (int i = 41; i < 50; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);
    storefiles2 = store.getStorefiles();
    // actualStorefiles1 = the current store files minus the ones remembered before the scanner
    // was opened, i.e. the files produced by the two flushes above.
    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
    actualStorefiles1.removeAll(actualStorefiles);
    // Do compaction: replace the original files with the newer ones while MyThread runs against
    // the scanner (what MyThread does with it is defined outside this method).
    MyThread thread = new MyThread(storeScanner);
    thread.start();
    store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false);
    thread.join();
    KeyValueHeap heap2 = thread.getHeap();
    // The heap obtained by MyThread must differ from the one captured right after opening.
    // NOTE(review): storeScanner is not closed in this method.
    assertFalse(heap.equals(heap2));
  }

  /**
   * Configures {@code StoreScanner.STORESCANNER_PREAD_MAX_BYTES} to -1 and checks that the scan's
   * read type stays {@code DEFAULT} while the created {@link StoreScanner} does not use pread.
   */
  @Test
  public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
    // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type.
    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1);
    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
    });
    Scan scan = new Scan();
    scan.addFamily(family);
    // ReadType on Scan is still DEFAULT only.
1989 assertEquals(ReadType.DEFAULT, scan.getReadType()); 1990 StoreScanner storeScanner = 1991 (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); 1992 assertFalse(storeScanner.isScanUsePread()); 1993 } 1994 1995 @Test 1996 public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException { 1997 Configuration conf = HBaseConfiguration.create(); 1998 conf.set("hbase.systemtables.compacting.memstore.type", "eager"); 1999 init(name.getMethodName(), conf, 2000 TableDescriptorBuilder.newBuilder( 2001 TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, "meta".getBytes())), 2002 ColumnFamilyDescriptorBuilder.newBuilder(family) 2003 .setInMemoryCompaction(MemoryCompactionPolicy.NONE).build()); 2004 assertTrue(((MemStoreCompactor) ((CompactingMemStore) store.memstore).compactor).toString() 2005 .startsWith("eager".toUpperCase())); 2006 } 2007 2008 @Test 2009 public void testSpaceQuotaChangeAfterReplacement() throws IOException { 2010 final TableName tn = TableName.valueOf(name.getMethodName()); 2011 init(name.getMethodName()); 2012 2013 RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl(); 2014 2015 HStoreFile sf1 = mockStoreFileWithLength(1024L); 2016 HStoreFile sf2 = mockStoreFileWithLength(2048L); 2017 HStoreFile sf3 = mockStoreFileWithLength(4096L); 2018 HStoreFile sf4 = mockStoreFileWithLength(8192L); 2019 2020 RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a")) 2021 .setEndKey(Bytes.toBytes("b")).build(); 2022 2023 // Compacting two files down to one, reducing size 2024 sizeStore.put(regionInfo, 1024L + 4096L); 2025 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf1, sf3), 2026 Arrays.asList(sf2)); 2027 2028 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 2029 2030 // The same file length in and out should have no change 2031 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, 
Arrays.asList(sf2), 2032 Arrays.asList(sf2)); 2033 2034 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 2035 2036 // Increase the total size used 2037 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2), 2038 Arrays.asList(sf3)); 2039 2040 assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize()); 2041 2042 RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b")) 2043 .setEndKey(Bytes.toBytes("c")).build(); 2044 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4)); 2045 2046 assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize()); 2047 } 2048 2049 @Test 2050 public void testHFileContextSetWithCFAndTable() throws Exception { 2051 init(this.name.getMethodName()); 2052 StoreFileWriter writer = store.getStoreEngine() 2053 .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L) 2054 .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true) 2055 .includesTag(false).shouldDropBehind(true)); 2056 HFileContext hFileContext = writer.getLiveFileWriter().getFileContext(); 2057 assertArrayEquals(family, hFileContext.getColumnFamily()); 2058 assertArrayEquals(table, hFileContext.getTableName()); 2059 } 2060 2061 // This test is for HBASE-26026, HBase Write be stuck when active segment has no cell 2062 // but its dataSize exceeds inmemoryFlushSize 2063 @Test 2064 public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize() 2065 throws IOException, InterruptedException { 2066 Configuration conf = HBaseConfiguration.create(); 2067 2068 byte[] smallValue = new byte[3]; 2069 byte[] largeValue = new byte[9]; 2070 final long timestamp = EnvironmentEdgeManager.currentTime(); 2071 final long seqId = 100; 2072 final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2073 final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2074 int 
smallCellByteSize = MutableSegment.getCellLength(smallCell); 2075 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2076 int flushByteSize = smallCellByteSize + largeCellByteSize - 2; 2077 2078 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2079 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName()); 2080 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2081 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2082 2083 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2084 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2085 2086 MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore); 2087 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2088 myCompactingMemStore.smallCellPreUpdateCounter.set(0); 2089 myCompactingMemStore.largeCellPreUpdateCounter.set(0); 2090 2091 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2092 Thread smallCellThread = new Thread(() -> { 2093 try { 2094 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2095 } catch (Throwable exception) { 2096 exceptionRef.set(exception); 2097 } 2098 }); 2099 smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME); 2100 smallCellThread.start(); 2101 2102 String oldThreadName = Thread.currentThread().getName(); 2103 try { 2104 /** 2105 * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then 2106 * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread 2107 * invokes flushInMemory. 2108 * <p/> 2109 * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread 2110 * can add cell to currentActive . That is to say when largeCellThread called flushInMemory 2111 * method, CompactingMemStore.active has no cell. 
2112 */ 2113 Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME); 2114 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2115 smallCellThread.join(); 2116 2117 for (int i = 0; i < 100; i++) { 2118 long currentTimestamp = timestamp + 100 + i; 2119 ExtendedCell cell = createCell(qf2, currentTimestamp, seqId, largeValue); 2120 store.add(cell, new NonThreadSafeMemStoreSizing()); 2121 } 2122 } finally { 2123 Thread.currentThread().setName(oldThreadName); 2124 } 2125 2126 assertTrue(exceptionRef.get() == null); 2127 2128 } 2129 2130 // This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds 2131 // InmemoryFlushSize 2132 @Test(timeout = 60000) 2133 public void testCompactingMemStoreCellExceedInmemoryFlushSize() throws Exception { 2134 Configuration conf = HBaseConfiguration.create(); 2135 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName()); 2136 2137 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2138 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2139 2140 MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore); 2141 2142 int size = (int) (myCompactingMemStore.getInmemoryFlushSize()); 2143 byte[] value = new byte[size + 1]; 2144 2145 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2146 long timestamp = EnvironmentEdgeManager.currentTime(); 2147 long seqId = 100; 2148 ExtendedCell cell = createCell(qf1, timestamp, seqId, value); 2149 int cellByteSize = MutableSegment.getCellLength(cell); 2150 store.add(cell, memStoreSizing); 2151 assertTrue(memStoreSizing.getCellsCount() == 1); 2152 assertTrue(memStoreSizing.getDataSize() == cellByteSize); 2153 // Waiting the in memory compaction completed, see HBASE-26438 2154 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2155 } 2156 2157 /** 2158 * This test is for HBASE-27464, before this JIRA,when init {@link 
CellChunkImmutableSegment} for 2159 * 'COMPACT' action, we not force copy to current MSLab. When cell size bigger than 2160 * {@link MemStoreLABImpl#maxAlloc}, cell will stay in previous chunk which will recycle after 2161 * segment replace, and we may read wrong data when these chunk reused by others. 2162 */ 2163 @Test 2164 public void testForceCloneOfBigCellForCellChunkImmutableSegment() throws Exception { 2165 Configuration conf = HBaseConfiguration.create(); 2166 int maxAllocByteSize = conf.getInt(MemStoreLAB.MAX_ALLOC_KEY, MemStoreLAB.MAX_ALLOC_DEFAULT); 2167 2168 // Construct big cell,which is large than {@link MemStoreLABImpl#maxAlloc}. 2169 byte[] cellValue = new byte[maxAllocByteSize + 1]; 2170 final long timestamp = EnvironmentEdgeManager.currentTime(); 2171 final long seqId = 100; 2172 final byte[] rowKey1 = Bytes.toBytes("rowKey1"); 2173 final ExtendedCell originalCell1 = createCell(rowKey1, qf1, timestamp, seqId, cellValue); 2174 final byte[] rowKey2 = Bytes.toBytes("rowKey2"); 2175 final ExtendedCell originalCell2 = createCell(rowKey2, qf1, timestamp, seqId, cellValue); 2176 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2177 quals.add(qf1); 2178 2179 int cellByteSize = MutableSegment.getCellLength(originalCell1); 2180 int inMemoryFlushByteSize = cellByteSize - 1; 2181 2182 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2183 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName()); 2184 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2185 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(inMemoryFlushByteSize * 200)); 2186 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2187 2188 // Use {@link MemoryCompactionPolicy#EAGER} for always compacting. 
2189 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2190 .setInMemoryCompaction(MemoryCompactionPolicy.EAGER).build()); 2191 2192 MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore); 2193 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == inMemoryFlushByteSize); 2194 2195 // Data chunk Pool is disabled. 2196 assertTrue(ChunkCreator.getInstance().getMaxCount(ChunkType.DATA_CHUNK) == 0); 2197 2198 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2199 2200 // First compact 2201 store.add(originalCell1, memStoreSizing); 2202 // Waiting for the first in-memory compaction finished 2203 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2204 2205 StoreScanner storeScanner = 2206 (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1); 2207 SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2208 ExtendedCell resultCell1 = segmentScanner.next(); 2209 assertTrue(PrivateCellUtil.equals(resultCell1, originalCell1)); 2210 int cell1ChunkId = resultCell1.getChunkId(); 2211 assertTrue(cell1ChunkId != ExtendedCell.CELL_NOT_BASED_ON_CHUNK); 2212 assertNull(segmentScanner.next()); 2213 segmentScanner.close(); 2214 storeScanner.close(); 2215 Segment segment = segmentScanner.segment; 2216 assertTrue(segment instanceof CellChunkImmutableSegment); 2217 MemStoreLABImpl memStoreLAB1 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2218 assertTrue(!memStoreLAB1.isClosed()); 2219 assertTrue(!memStoreLAB1.chunks.isEmpty()); 2220 assertTrue(!memStoreLAB1.isReclaimed()); 2221 2222 // Second compact 2223 store.add(originalCell2, memStoreSizing); 2224 // Waiting for the second in-memory compaction finished 2225 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2226 2227 // Before HBASE-27464, here may throw java.lang.IllegalArgumentException: In CellChunkMap, cell 2228 // must be associated 
with chunk.. We were looking for a cell at index 0. 2229 // The cause for this exception is because the data chunk Pool is disabled,when the data chunks 2230 // are recycled after the second in-memory compaction finished,the 2231 // {@link ChunkCreator.putbackChunks} method does not put the chunks back to the data chunk 2232 // pool,it just removes them from {@link ChunkCreator#chunkIdMap},so in 2233 // {@link CellChunkMap#getCell} we could not get the data chunk by chunkId. 2234 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1); 2235 segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2236 ExtendedCell newResultCell1 = segmentScanner.next(); 2237 assertTrue(newResultCell1 != resultCell1); 2238 assertTrue(PrivateCellUtil.equals(newResultCell1, originalCell1)); 2239 2240 ExtendedCell resultCell2 = segmentScanner.next(); 2241 assertTrue(PrivateCellUtil.equals(resultCell2, originalCell2)); 2242 assertNull(segmentScanner.next()); 2243 segmentScanner.close(); 2244 storeScanner.close(); 2245 2246 segment = segmentScanner.segment; 2247 assertTrue(segment instanceof CellChunkImmutableSegment); 2248 MemStoreLABImpl memStoreLAB2 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2249 assertTrue(!memStoreLAB2.isClosed()); 2250 assertTrue(!memStoreLAB2.chunks.isEmpty()); 2251 assertTrue(!memStoreLAB2.isReclaimed()); 2252 assertTrue(memStoreLAB1.isClosed()); 2253 assertTrue(memStoreLAB1.chunks.isEmpty()); 2254 assertTrue(memStoreLAB1.isReclaimed()); 2255 } 2256 2257 // This test is for HBASE-26210 also, test write large cell and small cell concurrently when 2258 // InmemoryFlushSize is smaller,equal with and larger than cell size. 
2259 @Test 2260 public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently() 2261 throws IOException, InterruptedException { 2262 doWriteTestLargeCellAndSmallCellConcurrently( 2263 (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1); 2264 doWriteTestLargeCellAndSmallCellConcurrently( 2265 (smallCellByteSize, largeCellByteSize) -> largeCellByteSize); 2266 doWriteTestLargeCellAndSmallCellConcurrently( 2267 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1); 2268 doWriteTestLargeCellAndSmallCellConcurrently( 2269 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize); 2270 doWriteTestLargeCellAndSmallCellConcurrently( 2271 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1); 2272 } 2273 2274 private void doWriteTestLargeCellAndSmallCellConcurrently(IntBinaryOperator getFlushByteSize) 2275 throws IOException, InterruptedException { 2276 2277 Configuration conf = HBaseConfiguration.create(); 2278 2279 byte[] smallValue = new byte[3]; 2280 byte[] largeValue = new byte[100]; 2281 final long timestamp = EnvironmentEdgeManager.currentTime(); 2282 final long seqId = 100; 2283 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2284 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2285 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2286 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2287 int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize); 2288 boolean flushByteSizeLessThanSmallAndLargeCellSize = 2289 flushByteSize < (smallCellByteSize + largeCellByteSize); 2290 2291 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName()); 2292 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2293 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2294 2295 
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2296 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2297 2298 MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore); 2299 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2300 myCompactingMemStore.disableCompaction(); 2301 if (flushByteSizeLessThanSmallAndLargeCellSize) { 2302 myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true; 2303 } else { 2304 myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false; 2305 } 2306 2307 final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing(); 2308 final AtomicLong totalCellByteSize = new AtomicLong(0); 2309 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2310 Thread smallCellThread = new Thread(() -> { 2311 try { 2312 for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { 2313 long currentTimestamp = timestamp + i; 2314 ExtendedCell cell = createCell(qf1, currentTimestamp, seqId, smallValue); 2315 totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell)); 2316 store.add(cell, memStoreSizing); 2317 } 2318 } catch (Throwable exception) { 2319 exceptionRef.set(exception); 2320 2321 } 2322 }); 2323 smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME); 2324 smallCellThread.start(); 2325 2326 String oldThreadName = Thread.currentThread().getName(); 2327 try { 2328 /** 2329 * When flushByteSizeLessThanSmallAndLargeCellSize is true: 2330 * </p> 2331 * 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then 2332 * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then 2333 * largeCellThread invokes flushInMemory. 2334 * <p/> 2335 * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread 2336 * can run into MyCompactingMemStore3.checkAndAddToActiveSize again. 
2337 * <p/> 2338 * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and 2339 * largeCellThread concurrently write one cell and wait each other, and then write another 2340 * cell etc. 2341 */ 2342 Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME); 2343 for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { 2344 long currentTimestamp = timestamp + i; 2345 ExtendedCell cell = createCell(qf2, currentTimestamp, seqId, largeValue); 2346 totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell)); 2347 store.add(cell, memStoreSizing); 2348 } 2349 smallCellThread.join(); 2350 2351 assertTrue(exceptionRef.get() == null); 2352 assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2)); 2353 assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get()); 2354 if (flushByteSizeLessThanSmallAndLargeCellSize) { 2355 assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT); 2356 } else { 2357 assertTrue( 2358 myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1)); 2359 } 2360 } finally { 2361 Thread.currentThread().setName(oldThreadName); 2362 } 2363 } 2364 2365 /** 2366 * <pre> 2367 * This test is for HBASE-26384, 2368 * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()} 2369 * execute concurrently. 2370 * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs 2371 * for both branch-2 and master): 2372 * 1. The {@link CompactingMemStore} size exceeds 2373 * {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new 2374 * {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a 2375 * in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}. 2376 * 2. The in memory compact thread starts and then stopping before 2377 * {@link CompactingMemStore#flattenOneSegment}. 2378 * 3. 
The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently, after the
 *    snapshot thread executes {@link CompactingMemStore#getImmutableSegments}, the in memory
 *    compact thread continues.
 *    Assuming {@link VersionedSegmentsList#version} returned from
 *    {@link CompactingMemStore#getImmutableSegments} is v.
 * 4. The snapshot thread stops before {@link CompactingMemStore#swapPipelineWithNull}.
 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
 *    {@link CompactionPipeline#version} is still v.
 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
 *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
 *    thinks it is successful and continues flushing, but the {@link ImmutableSegment} in
 *    {@link CompactionPipeline} has changed because of
 *    {@link CompactingMemStore#flattenOneSegment}, so the {@link ImmutableSegment} is not
 *    removed in fact and still remains in {@link CompactionPipeline}.
 *
 * After HBASE-26384, steps 5-6 are changed to the following, which is the expected behavior:
 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
 *    {@link CompactingMemStore#flattenOneSegment} changes {@link CompactionPipeline#version} to
 *    v+1.
 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
 *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
 *    fails and retries the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once
 *    again; because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now,
 *    {@link CompactingMemStore#swapPipelineWithNull} succeeds.
2402 * </pre> 2403 */ 2404 @Test 2405 public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception { 2406 Configuration conf = HBaseConfiguration.create(); 2407 2408 byte[] smallValue = new byte[3]; 2409 byte[] largeValue = new byte[9]; 2410 final long timestamp = EnvironmentEdgeManager.currentTime(); 2411 final long seqId = 100; 2412 final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2413 final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2414 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2415 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2416 int totalCellByteSize = (smallCellByteSize + largeCellByteSize); 2417 int flushByteSize = totalCellByteSize - 2; 2418 2419 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2420 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName()); 2421 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2422 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2423 2424 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2425 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2426 2427 MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore); 2428 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2429 2430 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2431 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2432 2433 String oldThreadName = Thread.currentThread().getName(); 2434 try { 2435 Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME); 2436 /** 2437 * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters 2438 * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot} 2439 * would invoke {@link 
CompactingMemStore#stopCompaction}. 2440 */ 2441 myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await(); 2442 2443 MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot(); 2444 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2445 2446 assertTrue(memStoreSnapshot.getCellsCount() == 2); 2447 assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize); 2448 VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments(); 2449 assertTrue(segments.getNumOfSegments() == 0); 2450 assertTrue(segments.getNumOfCells() == 0); 2451 assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1); 2452 assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2); 2453 } finally { 2454 Thread.currentThread().setName(oldThreadName); 2455 } 2456 } 2457 2458 /** 2459 * <pre> 2460 * This test is for HBASE-26384, 2461 * test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()} 2462 * and writeMemStore execute concurrently. 2463 * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs 2464 * for both branch-2 and master): 2465 * 1. The {@link CompactingMemStore} size exceeds 2466 * {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new 2467 * {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a 2468 * in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}. 2469 * 2. The in memory compact thread starts and then stopping before 2470 * {@link CompactingMemStore#flattenOneSegment}. 2471 * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the 2472 * snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory 2473 * compact thread continues. 2474 * Assuming {@link VersionedSegmentsList#version} returned from 2475 * {@link CompactingMemStore#getImmutableSegments} is v. 2476 * 4. 
The snapshot thread stops before {@link CompactingMemStore#swapPipelineWithNull}.
 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
 *    {@link CompactionPipeline#version} is still v.
 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
 *    {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
 *    thinks it is successful and continues flushing, but the {@link ImmutableSegment} in
 *    {@link CompactionPipeline} has changed because of
 *    {@link CompactingMemStore#flattenOneSegment}, so the {@link ImmutableSegment} is not
 *    removed in fact and still remains in {@link CompactionPipeline}.
 *
 * After HBASE-26384, steps 5-6 are changed to the following, which is the expected behavior,
 * and steps 7-8 are added to test that a new segment is added before the retry.
 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
 *    {@link CompactingMemStore#flattenOneSegment} changes {@link CompactionPipeline#version} to
 *    v+1.
 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
 *    {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
 *    fails and retries; {@link VersionedSegmentsList#version} returned from
 *    {@link CompactingMemStore#getImmutableSegments} is v+1.
 * 7. The write thread continues writing to {@link CompactingMemStore} and the
 *    {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
 *    {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
 *    {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline},
 *    {@link CompactionPipeline#version} is still v+1.
 * 8.
The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2501 * {@link CompactionPipeline#version} is still v+1, 2502 * {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment} 2503 * remained at the head of {@link CompactingMemStore#pipeline},the old is removed by 2504 * {@link CompactingMemStore#swapPipelineWithNull}. 2505 * </pre> 2506 */ 2507 @Test 2508 public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception { 2509 Configuration conf = HBaseConfiguration.create(); 2510 2511 byte[] smallValue = new byte[3]; 2512 byte[] largeValue = new byte[9]; 2513 final long timestamp = EnvironmentEdgeManager.currentTime(); 2514 final long seqId = 100; 2515 final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2516 final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2517 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2518 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2519 int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize); 2520 int flushByteSize = firstWriteCellByteSize - 2; 2521 2522 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 
2523 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName()); 2524 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2525 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2526 2527 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2528 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2529 2530 final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore); 2531 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2532 2533 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2534 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2535 2536 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2537 final ExtendedCell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue); 2538 final ExtendedCell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue); 2539 final int writeAgainCellByteSize = 2540 MutableSegment.getCellLength(writeAgainCell1) + MutableSegment.getCellLength(writeAgainCell2); 2541 final Thread writeAgainThread = new Thread(() -> { 2542 try { 2543 myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await(); 2544 2545 store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing()); 2546 store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing()); 2547 2548 myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await(); 2549 } catch (Throwable exception) { 2550 exceptionRef.set(exception); 2551 } 2552 }); 2553 writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME); 2554 writeAgainThread.start(); 2555 2556 String oldThreadName = Thread.currentThread().getName(); 2557 try { 2558 Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME); 2559 /** 2560 * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters 2561 * {@link 
CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot} 2562 * would invoke {@link CompactingMemStore#stopCompaction}. 2563 */ 2564 myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await(); 2565 MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot(); 2566 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2567 writeAgainThread.join(); 2568 2569 assertTrue(memStoreSnapshot.getCellsCount() == 2); 2570 assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize); 2571 VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments(); 2572 assertTrue(segments.getNumOfSegments() == 1); 2573 assertTrue( 2574 ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize); 2575 assertTrue(segments.getNumOfCells() == 2); 2576 assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2); 2577 assertTrue(exceptionRef.get() == null); 2578 assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2); 2579 } finally { 2580 Thread.currentThread().setName(oldThreadName); 2581 } 2582 } 2583 2584 /** 2585 * <pre> 2586 * This test is for HBASE-26465, 2587 * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute 2588 * concurrently. The threads sequence before HBASE-26465 is: 2589 * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have be added to 2590 * {@link DefaultMemStore}. 2591 * 2.The flush thread stopping before {@link DefaultMemStore#clearSnapshot} in 2592 * {@link HStore#updateStorefiles} after completed flushing memStore to hfile. 2593 * 3.The scan thread starts and stopping after {@link DefaultMemStore#getSnapshotSegments} in 2594 * {@link DefaultMemStore#getScanners},here the scan thread gets the 2595 * {@link DefaultMemStore#snapshot} which is created by the flush thread. 
2596 * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and close 2597 * {@link DefaultMemStore#snapshot},because the reference count of the corresponding 2598 * {@link MemStoreLABImpl} is 0, the {@link Chunk}s in corresponding {@link MemStoreLABImpl} 2599 * are recycled. 2600 * 5.The scan thread continues {@link DefaultMemStore#getScanners},and create a 2601 * {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increase the 2602 * reference count of the corresponding {@link MemStoreLABImpl}, but {@link Chunk}s in 2603 * corresponding {@link MemStoreLABImpl} are recycled by step 4, and these {@link Chunk}s may 2604 * be overwritten by other write threads,which may cause serious problem. 2605 * After HBASE-26465,{@link DefaultMemStore#getScanners} and 2606 * {@link DefaultMemStore#clearSnapshot} could not execute concurrently. 2607 * </pre> 2608 */ 2609 @Test 2610 public void testClearSnapshotGetScannerConcurrently() throws Exception { 2611 Configuration conf = HBaseConfiguration.create(); 2612 2613 byte[] smallValue = new byte[3]; 2614 byte[] largeValue = new byte[9]; 2615 final long timestamp = EnvironmentEdgeManager.currentTime(); 2616 final long seqId = 100; 2617 final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2618 final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2619 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2620 quals.add(qf1); 2621 quals.add(qf2); 2622 2623 conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName()); 2624 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2625 2626 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2627 MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore); 2628 myDefaultMemStore.store = store; 2629 2630 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2631 store.add(smallCell, memStoreSizing); 2632 store.add(largeCell, 
memStoreSizing); 2633 2634 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2635 final Thread flushThread = new Thread(() -> { 2636 try { 2637 flushStore(store, id++); 2638 } catch (Throwable exception) { 2639 exceptionRef.set(exception); 2640 } 2641 }); 2642 flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME); 2643 flushThread.start(); 2644 2645 String oldThreadName = Thread.currentThread().getName(); 2646 StoreScanner storeScanner = null; 2647 try { 2648 Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME); 2649 2650 /** 2651 * Wait flush thread stopping before {@link DefaultMemStore#doClearSnapshot} 2652 */ 2653 myDefaultMemStore.getScannerCyclicBarrier.await(); 2654 2655 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 2656 flushThread.join(); 2657 2658 if (myDefaultMemStore.shouldWait) { 2659 SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2660 MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2661 assertTrue(memStoreLAB.isClosed()); 2662 assertTrue(!memStoreLAB.chunks.isEmpty()); 2663 assertTrue(!memStoreLAB.isReclaimed()); 2664 2665 ExtendedCell cell1 = segmentScanner.next(); 2666 PrivateCellUtil.equals(smallCell, cell1); 2667 ExtendedCell cell2 = segmentScanner.next(); 2668 PrivateCellUtil.equals(largeCell, cell2); 2669 assertNull(segmentScanner.next()); 2670 } else { 2671 List<ExtendedCell> results = new ArrayList<>(); 2672 storeScanner.next(results); 2673 assertEquals(2, results.size()); 2674 PrivateCellUtil.equals(smallCell, results.get(0)); 2675 PrivateCellUtil.equals(largeCell, results.get(1)); 2676 } 2677 assertTrue(exceptionRef.get() == null); 2678 } finally { 2679 if (storeScanner != null) { 2680 storeScanner.close(); 2681 } 2682 Thread.currentThread().setName(oldThreadName); 2683 } 2684 } 2685 2686 @SuppressWarnings("unchecked") 2687 private <T> T 
getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) { 2688 List<T> resultScanners = new ArrayList<T>(); 2689 for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) { 2690 if (keyValueScannerClass.isInstance(keyValueScanner)) { 2691 resultScanners.add((T) keyValueScanner); 2692 } 2693 } 2694 assertTrue(resultScanners.size() == 1); 2695 return resultScanners.get(0); 2696 } 2697 2698 @Test 2699 public void testOnConfigurationChange() throws IOException { 2700 final int COMMON_MAX_FILES_TO_COMPACT = 10; 2701 final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8; 2702 final int STORE_MAX_FILES_TO_COMPACT = 6; 2703 2704 // Build a table that its maxFileToCompact different from common configuration. 2705 Configuration conf = HBaseConfiguration.create(); 2706 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2707 COMMON_MAX_FILES_TO_COMPACT); 2708 conf.setBoolean(CACHE_DATA_ON_READ_KEY, false); 2709 conf.setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, true); 2710 conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true); 2711 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 2712 .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2713 String.valueOf(STORE_MAX_FILES_TO_COMPACT)) 2714 .build(); 2715 init(this.name.getMethodName(), conf, hcd); 2716 2717 // After updating common configuration, the conf in HStore itself must not be changed. 
2718 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2719 NEW_COMMON_MAX_FILES_TO_COMPACT); 2720 this.store.onConfigurationChange(conf); 2721 2722 assertEquals(STORE_MAX_FILES_TO_COMPACT, 2723 store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact()); 2724 2725 assertEquals(conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ), false); 2726 assertEquals(conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), true); 2727 assertEquals(conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), true); 2728 2729 // reset to default values 2730 conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ); 2731 conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE); 2732 conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE); 2733 this.store.onConfigurationChange(conf); 2734 } 2735 2736 /** 2737 * This test is for HBASE-26476 2738 */ 2739 @Test 2740 public void testExtendsDefaultMemStore() throws Exception { 2741 Configuration conf = HBaseConfiguration.create(); 2742 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2743 2744 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2745 assertTrue(this.store.memstore.getClass() == DefaultMemStore.class); 2746 tearDown(); 2747 2748 conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName()); 2749 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2750 assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class); 2751 } 2752 2753 static class CustomDefaultMemStore extends DefaultMemStore { 2754 2755 public CustomDefaultMemStore(Configuration conf, CellComparator c, 2756 RegionServicesForStores regionServices) { 2757 super(conf, c, regionServices); 2758 } 2759 2760 } 2761 2762 /** 2763 * This test is for HBASE-26488 2764 */ 2765 @Test 2766 public void testMemoryLeakWhenFlushMemStoreRetrying() throws 
Exception { 2767 Configuration conf = HBaseConfiguration.create(); 2768 2769 byte[] smallValue = new byte[3]; 2770 byte[] largeValue = new byte[9]; 2771 final long timestamp = EnvironmentEdgeManager.currentTime(); 2772 final long seqId = 100; 2773 final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2774 final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2775 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2776 quals.add(qf1); 2777 quals.add(qf2); 2778 2779 conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName()); 2780 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2781 conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, 2782 MyDefaultStoreFlusher.class.getName()); 2783 2784 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2785 MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore); 2786 assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher); 2787 2788 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2789 store.add(smallCell, memStoreSizing); 2790 store.add(largeCell, memStoreSizing); 2791 flushStore(store, id++); 2792 2793 MemStoreLABImpl memStoreLAB = 2794 (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB()); 2795 assertTrue(memStoreLAB.isClosed()); 2796 assertTrue(memStoreLAB.getRefCntValue() == 0); 2797 assertTrue(memStoreLAB.isReclaimed()); 2798 assertTrue(memStoreLAB.chunks.isEmpty()); 2799 StoreScanner storeScanner = null; 2800 try { 2801 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 2802 assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1); 2803 assertTrue(store.memstore.size().getCellsCount() == 0); 2804 assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0); 2805 assertTrue(storeScanner.currentScanners.size() == 1); 2806 
assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner); 2807 2808 List<ExtendedCell> results = new ArrayList<>(); 2809 storeScanner.next(results); 2810 assertEquals(2, results.size()); 2811 PrivateCellUtil.equals(smallCell, results.get(0)); 2812 PrivateCellUtil.equals(largeCell, results.get(1)); 2813 } finally { 2814 if (storeScanner != null) { 2815 storeScanner.close(); 2816 } 2817 } 2818 } 2819 2820 static class MyDefaultMemStore1 extends DefaultMemStore { 2821 2822 private ImmutableSegment snapshotImmutableSegment; 2823 2824 public MyDefaultMemStore1(Configuration conf, CellComparator c, 2825 RegionServicesForStores regionServices) { 2826 super(conf, c, regionServices); 2827 } 2828 2829 @Override 2830 public MemStoreSnapshot snapshot() { 2831 MemStoreSnapshot result = super.snapshot(); 2832 this.snapshotImmutableSegment = snapshot; 2833 return result; 2834 } 2835 2836 } 2837 2838 public static class MyDefaultStoreFlusher extends DefaultStoreFlusher { 2839 private static final AtomicInteger failCounter = new AtomicInteger(1); 2840 private static final AtomicInteger counter = new AtomicInteger(0); 2841 2842 public MyDefaultStoreFlusher(Configuration conf, HStore store) { 2843 super(conf, store); 2844 } 2845 2846 @Override 2847 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, 2848 MonitoredTask status, ThroughputController throughputController, 2849 FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException { 2850 counter.incrementAndGet(); 2851 return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker, 2852 writerCreationTracker); 2853 } 2854 2855 @Override 2856 protected void performFlush(InternalScanner scanner, final CellSink sink, 2857 ThroughputController throughputController) throws IOException { 2858 2859 final int currentCount = counter.get(); 2860 CellSink newCellSink = (cell) -> { 2861 if (currentCount <= failCounter.get()) { 2862 throw new 
IOException("Simulated exception by tests"); 2863 } 2864 sink.append(cell); 2865 }; 2866 super.performFlush(scanner, newCellSink, throughputController); 2867 } 2868 } 2869 2870 /** 2871 * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB} 2872 */ 2873 @Test 2874 public void testImmutableMemStoreLABRefCnt() throws Exception { 2875 Configuration conf = HBaseConfiguration.create(); 2876 2877 byte[] smallValue = new byte[3]; 2878 byte[] largeValue = new byte[9]; 2879 final long timestamp = EnvironmentEdgeManager.currentTime(); 2880 final long seqId = 100; 2881 final ExtendedCell smallCell1 = createCell(qf1, timestamp, seqId, smallValue); 2882 final ExtendedCell largeCell1 = createCell(qf2, timestamp, seqId, largeValue); 2883 final ExtendedCell smallCell2 = createCell(qf3, timestamp, seqId + 1, smallValue); 2884 final ExtendedCell largeCell2 = createCell(qf4, timestamp, seqId + 1, largeValue); 2885 final ExtendedCell smallCell3 = createCell(qf5, timestamp, seqId + 2, smallValue); 2886 final ExtendedCell largeCell3 = createCell(qf6, timestamp, seqId + 2, largeValue); 2887 2888 int smallCellByteSize = MutableSegment.getCellLength(smallCell1); 2889 int largeCellByteSize = MutableSegment.getCellLength(largeCell1); 2890 int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize); 2891 int flushByteSize = firstWriteCellByteSize - 2; 2892 2893 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 
2894 conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName()); 2895 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2896 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2897 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2898 2899 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2900 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2901 2902 final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore); 2903 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2904 myCompactingMemStore.allowCompaction.set(false); 2905 2906 NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2907 store.add(smallCell1, memStoreSizing); 2908 store.add(largeCell1, memStoreSizing); 2909 store.add(smallCell2, memStoreSizing); 2910 store.add(largeCell2, memStoreSizing); 2911 store.add(smallCell3, memStoreSizing); 2912 store.add(largeCell3, memStoreSizing); 2913 VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments(); 2914 assertTrue(versionedSegmentsList.getNumOfSegments() == 3); 2915 List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments(); 2916 List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size()); 2917 for (ImmutableSegment segment : segments) { 2918 memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB()); 2919 } 2920 List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE); 2921 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2922 assertTrue(memStoreLAB.getRefCntValue() == 2); 2923 } 2924 2925 myCompactingMemStore.allowCompaction.set(true); 2926 myCompactingMemStore.flushInMemory(); 2927 2928 versionedSegmentsList = myCompactingMemStore.getImmutableSegments(); 2929 assertTrue(versionedSegmentsList.getNumOfSegments() == 1); 2930 ImmutableMemStoreLAB 
immutableMemStoreLAB = 2931 (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB()); 2932 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2933 assertTrue(memStoreLAB.getRefCntValue() == 2); 2934 } 2935 2936 List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE); 2937 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2938 assertTrue(memStoreLAB.getRefCntValue() == 2); 2939 } 2940 assertTrue(immutableMemStoreLAB.getRefCntValue() == 2); 2941 for (KeyValueScanner scanner : scanners1) { 2942 scanner.close(); 2943 } 2944 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2945 assertTrue(memStoreLAB.getRefCntValue() == 1); 2946 } 2947 for (KeyValueScanner scanner : scanners2) { 2948 scanner.close(); 2949 } 2950 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2951 assertTrue(memStoreLAB.getRefCntValue() == 1); 2952 } 2953 assertTrue(immutableMemStoreLAB.getRefCntValue() == 1); 2954 flushStore(store, id++); 2955 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2956 assertTrue(memStoreLAB.getRefCntValue() == 0); 2957 } 2958 assertTrue(immutableMemStoreLAB.getRefCntValue() == 0); 2959 assertTrue(immutableMemStoreLAB.isClosed()); 2960 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2961 assertTrue(memStoreLAB.isClosed()); 2962 assertTrue(memStoreLAB.isReclaimed()); 2963 assertTrue(memStoreLAB.chunks.isEmpty()); 2964 } 2965 } 2966 2967 private HStoreFile mockStoreFileWithLength(long length) { 2968 HStoreFile sf = mock(HStoreFile.class); 2969 StoreFileReader sfr = mock(StoreFileReader.class); 2970 when(sf.isHFile()).thenReturn(true); 2971 when(sf.getReader()).thenReturn(sfr); 2972 when(sfr.length()).thenReturn(length); 2973 return sf; 2974 } 2975 2976 private static class MyThread extends Thread { 2977 private StoreScanner scanner; 2978 private KeyValueHeap heap; 2979 2980 public MyThread(StoreScanner scanner) { 2981 this.scanner = scanner; 2982 } 2983 2984 public KeyValueHeap getHeap() { 2985 return 
this.heap; 2986 } 2987 2988 @Override 2989 public void run() { 2990 scanner.trySwitchToStreamRead(); 2991 heap = scanner.heap; 2992 } 2993 } 2994 2995 private static class MyMemStoreCompactor extends MemStoreCompactor { 2996 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 2997 private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1); 2998 2999 public MyMemStoreCompactor(CompactingMemStore compactingMemStore, 3000 MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException { 3001 super(compactingMemStore, compactionPolicy); 3002 } 3003 3004 @Override 3005 public boolean start() throws IOException { 3006 boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0; 3007 if (isFirst) { 3008 try { 3009 START_COMPACTOR_LATCH.await(); 3010 return super.start(); 3011 } catch (InterruptedException ex) { 3012 throw new RuntimeException(ex); 3013 } 3014 } 3015 return super.start(); 3016 } 3017 } 3018 3019 public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore { 3020 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 3021 3022 public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c, 3023 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3024 throws IOException { 3025 super(conf, c, store, regionServices, compactionPolicy); 3026 } 3027 3028 @Override 3029 protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) 3030 throws IllegalArgumentIOException { 3031 return new MyMemStoreCompactor(this, compactionPolicy); 3032 } 3033 3034 @Override 3035 protected boolean setInMemoryCompactionFlag() { 3036 boolean rval = super.setInMemoryCompactionFlag(); 3037 if (rval) { 3038 RUNNER_COUNT.incrementAndGet(); 3039 if (LOG.isDebugEnabled()) { 3040 LOG.debug("runner count: " + RUNNER_COUNT.get()); 3041 } 3042 } 3043 return rval; 3044 } 3045 } 3046 3047 public static class 
MyCompactingMemStore extends CompactingMemStore { 3048 private static final AtomicBoolean START_TEST = new AtomicBoolean(false); 3049 private final CountDownLatch getScannerLatch = new CountDownLatch(1); 3050 private final CountDownLatch snapshotLatch = new CountDownLatch(1); 3051 3052 public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, HStore store, 3053 RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3054 throws IOException { 3055 super(conf, c, store, regionServices, compactionPolicy); 3056 } 3057 3058 @Override 3059 protected List<KeyValueScanner> createList(int capacity) { 3060 if (START_TEST.get()) { 3061 try { 3062 getScannerLatch.countDown(); 3063 snapshotLatch.await(); 3064 } catch (InterruptedException e) { 3065 throw new RuntimeException(e); 3066 } 3067 } 3068 return new ArrayList<>(capacity); 3069 } 3070 3071 @Override 3072 protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) { 3073 if (START_TEST.get()) { 3074 try { 3075 getScannerLatch.await(); 3076 } catch (InterruptedException e) { 3077 throw new RuntimeException(e); 3078 } 3079 } 3080 3081 super.pushActiveToPipeline(active, checkEmpty); 3082 if (START_TEST.get()) { 3083 snapshotLatch.countDown(); 3084 } 3085 } 3086 } 3087 3088 interface MyListHook { 3089 void hook(int currentSize); 3090 } 3091 3092 private static class MyList<T> implements List<T> { 3093 private final List<T> delegatee = new ArrayList<>(); 3094 private final MyListHook hookAtAdd; 3095 3096 MyList(final MyListHook hookAtAdd) { 3097 this.hookAtAdd = hookAtAdd; 3098 } 3099 3100 @Override 3101 public int size() { 3102 return delegatee.size(); 3103 } 3104 3105 @Override 3106 public boolean isEmpty() { 3107 return delegatee.isEmpty(); 3108 } 3109 3110 @Override 3111 public boolean contains(Object o) { 3112 return delegatee.contains(o); 3113 } 3114 3115 @Override 3116 public Iterator<T> iterator() { 3117 return delegatee.iterator(); 3118 } 3119 3120 @Override 
3121 public Object[] toArray() { 3122 return delegatee.toArray(); 3123 } 3124 3125 @Override 3126 public <R> R[] toArray(R[] a) { 3127 return delegatee.toArray(a); 3128 } 3129 3130 @Override 3131 public boolean add(T e) { 3132 hookAtAdd.hook(size()); 3133 return delegatee.add(e); 3134 } 3135 3136 @Override 3137 public boolean remove(Object o) { 3138 return delegatee.remove(o); 3139 } 3140 3141 @Override 3142 public boolean containsAll(Collection<?> c) { 3143 return delegatee.containsAll(c); 3144 } 3145 3146 @Override 3147 public boolean addAll(Collection<? extends T> c) { 3148 return delegatee.addAll(c); 3149 } 3150 3151 @Override 3152 public boolean addAll(int index, Collection<? extends T> c) { 3153 return delegatee.addAll(index, c); 3154 } 3155 3156 @Override 3157 public boolean removeAll(Collection<?> c) { 3158 return delegatee.removeAll(c); 3159 } 3160 3161 @Override 3162 public boolean retainAll(Collection<?> c) { 3163 return delegatee.retainAll(c); 3164 } 3165 3166 @Override 3167 public void clear() { 3168 delegatee.clear(); 3169 } 3170 3171 @Override 3172 public T get(int index) { 3173 return delegatee.get(index); 3174 } 3175 3176 @Override 3177 public T set(int index, T element) { 3178 return delegatee.set(index, element); 3179 } 3180 3181 @Override 3182 public void add(int index, T element) { 3183 delegatee.add(index, element); 3184 } 3185 3186 @Override 3187 public T remove(int index) { 3188 return delegatee.remove(index); 3189 } 3190 3191 @Override 3192 public int indexOf(Object o) { 3193 return delegatee.indexOf(o); 3194 } 3195 3196 @Override 3197 public int lastIndexOf(Object o) { 3198 return delegatee.lastIndexOf(o); 3199 } 3200 3201 @Override 3202 public ListIterator<T> listIterator() { 3203 return delegatee.listIterator(); 3204 } 3205 3206 @Override 3207 public ListIterator<T> listIterator(int index) { 3208 return delegatee.listIterator(index); 3209 } 3210 3211 @Override 3212 public List<T> subList(int fromIndex, int toIndex) { 3213 return 
delegatee.subList(fromIndex, toIndex); 3214 } 3215 } 3216 3217 private interface MyKeyValueHeapHook { 3218 void onRecordBlockSize(int recordBlockSizeCallCount); 3219 } 3220 3221 private static class MyKeyValueHeap extends KeyValueHeap { 3222 private final MyKeyValueHeapHook hook; 3223 private int recordBlockSizeCallCount; 3224 3225 public MyKeyValueHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator, 3226 MyKeyValueHeapHook hook) throws IOException { 3227 super(scanners, comparator); 3228 this.hook = hook; 3229 } 3230 3231 @Override 3232 public void recordBlockSize(IntConsumer blockSizeConsumer) { 3233 recordBlockSizeCallCount++; 3234 hook.onRecordBlockSize(recordBlockSizeCallCount); 3235 super.recordBlockSize(blockSizeConsumer); 3236 } 3237 } 3238 3239 public static class MyCompactingMemStore2 extends CompactingMemStore { 3240 private static final String LARGE_CELL_THREAD_NAME = "largeCellThread"; 3241 private static final String SMALL_CELL_THREAD_NAME = "smallCellThread"; 3242 private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2); 3243 private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2); 3244 private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0); 3245 private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0); 3246 3247 public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator, 3248 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3249 throws IOException { 3250 super(conf, cellComparator, store, regionServices, compactionPolicy); 3251 } 3252 3253 @Override 3254 protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd, 3255 MemStoreSizing memstoreSizing) { 3256 if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { 3257 int currentCount = largeCellPreUpdateCounter.incrementAndGet(); 3258 if (currentCount <= 1) { 3259 try { 3260 /** 3261 * smallCellThread enters 
CompactingMemStore.checkAndAddToActiveSize first, then 3262 * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then 3263 * largeCellThread invokes flushInMemory. 3264 */ 3265 preCyclicBarrier.await(); 3266 } catch (Throwable e) { 3267 throw new RuntimeException(e); 3268 } 3269 } 3270 } 3271 3272 boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing); 3273 if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { 3274 try { 3275 preCyclicBarrier.await(); 3276 } catch (Throwable e) { 3277 throw new RuntimeException(e); 3278 } 3279 } 3280 return returnValue; 3281 } 3282 3283 @Override 3284 protected void doAdd(MutableSegment currentActive, ExtendedCell cell, 3285 MemStoreSizing memstoreSizing) { 3286 if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { 3287 try { 3288 /** 3289 * After largeCellThread finished flushInMemory method, smallCellThread can add cell to 3290 * currentActive . That is to say when largeCellThread called flushInMemory method, 3291 * currentActive has no cell. 
3292 */ 3293 postCyclicBarrier.await(); 3294 } catch (Throwable e) { 3295 throw new RuntimeException(e); 3296 } 3297 } 3298 super.doAdd(currentActive, cell, memstoreSizing); 3299 } 3300 3301 @Override 3302 protected void flushInMemory(MutableSegment currentActiveMutableSegment) { 3303 super.flushInMemory(currentActiveMutableSegment); 3304 if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { 3305 if (largeCellPreUpdateCounter.get() <= 1) { 3306 try { 3307 postCyclicBarrier.await(); 3308 } catch (Throwable e) { 3309 throw new RuntimeException(e); 3310 } 3311 } 3312 } 3313 } 3314 3315 } 3316 3317 public static class MyCompactingMemStore3 extends CompactingMemStore { 3318 private static final String LARGE_CELL_THREAD_NAME = "largeCellThread"; 3319 private static final String SMALL_CELL_THREAD_NAME = "smallCellThread"; 3320 3321 private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2); 3322 private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2); 3323 private final AtomicInteger flushCounter = new AtomicInteger(0); 3324 private static final int CELL_COUNT = 5; 3325 private boolean flushByteSizeLessThanSmallAndLargeCellSize = true; 3326 3327 public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator, 3328 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3329 throws IOException { 3330 super(conf, cellComparator, store, regionServices, compactionPolicy); 3331 } 3332 3333 @Override 3334 protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd, 3335 MemStoreSizing memstoreSizing) { 3336 if (!flushByteSizeLessThanSmallAndLargeCellSize) { 3337 return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing); 3338 } 3339 if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { 3340 try { 3341 preCyclicBarrier.await(); 3342 } catch (Throwable e) { 3343 throw new RuntimeException(e); 3344 } 3345 } 3346 3347 boolean 
returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing); 3348 if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { 3349 try { 3350 preCyclicBarrier.await(); 3351 } catch (Throwable e) { 3352 throw new RuntimeException(e); 3353 } 3354 } 3355 return returnValue; 3356 } 3357 3358 @Override 3359 protected void postUpdate(MutableSegment currentActiveMutableSegment) { 3360 super.postUpdate(currentActiveMutableSegment); 3361 if (!flushByteSizeLessThanSmallAndLargeCellSize) { 3362 try { 3363 postCyclicBarrier.await(); 3364 } catch (Throwable e) { 3365 throw new RuntimeException(e); 3366 } 3367 return; 3368 } 3369 3370 if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { 3371 try { 3372 postCyclicBarrier.await(); 3373 } catch (Throwable e) { 3374 throw new RuntimeException(e); 3375 } 3376 } 3377 } 3378 3379 @Override 3380 protected void flushInMemory(MutableSegment currentActiveMutableSegment) { 3381 super.flushInMemory(currentActiveMutableSegment); 3382 flushCounter.incrementAndGet(); 3383 if (!flushByteSizeLessThanSmallAndLargeCellSize) { 3384 return; 3385 } 3386 3387 assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)); 3388 try { 3389 postCyclicBarrier.await(); 3390 } catch (Throwable e) { 3391 throw new RuntimeException(e); 3392 } 3393 3394 } 3395 3396 void disableCompaction() { 3397 allowCompaction.set(false); 3398 } 3399 3400 void enableCompaction() { 3401 allowCompaction.set(true); 3402 } 3403 3404 } 3405 3406 public static class MyCompactingMemStore4 extends CompactingMemStore { 3407 private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread"; 3408 /** 3409 * {@link CompactingMemStore#flattenOneSegment} must execute after 3410 * {@link CompactingMemStore#getImmutableSegments} 3411 */ 3412 private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2); 3413 /** 3414 * Only after {@link CompactingMemStore#flattenOneSegment} completed, 3415 * 
{@link CompactingMemStore#swapPipelineWithNull} could execute. 3416 */ 3417 private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2); 3418 /** 3419 * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the 3420 * snapshot thread starts {@link CompactingMemStore#snapshot},because 3421 * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}. 3422 */ 3423 private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2); 3424 /** 3425 * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping. 3426 */ 3427 private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); 3428 private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0); 3429 private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0); 3430 private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0); 3431 private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0); 3432 3433 public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator, 3434 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3435 throws IOException { 3436 super(conf, cellComparator, store, regionServices, compactionPolicy); 3437 } 3438 3439 @Override 3440 public VersionedSegmentsList getImmutableSegments() { 3441 VersionedSegmentsList result = super.getImmutableSegments(); 3442 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3443 int currentCount = getImmutableSegmentsListCounter.incrementAndGet(); 3444 if (currentCount <= 1) { 3445 try { 3446 flattenOneSegmentPreCyclicBarrier.await(); 3447 } catch (Throwable e) { 3448 throw new RuntimeException(e); 3449 } 3450 } 3451 } 3452 return result; 3453 } 3454 3455 @Override 3456 protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { 3457 if 
(Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3458 int currentCount = swapPipelineWithNullCounter.incrementAndGet(); 3459 if (currentCount <= 1) { 3460 try { 3461 flattenOneSegmentPostCyclicBarrier.await(); 3462 } catch (Throwable e) { 3463 throw new RuntimeException(e); 3464 } 3465 } 3466 } 3467 boolean result = super.swapPipelineWithNull(segments); 3468 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3469 int currentCount = swapPipelineWithNullCounter.get(); 3470 if (currentCount <= 1) { 3471 assertTrue(!result); 3472 } 3473 if (currentCount == 2) { 3474 assertTrue(result); 3475 } 3476 } 3477 return result; 3478 3479 } 3480 3481 @Override 3482 public void flattenOneSegment(long requesterVersion, Action action) { 3483 int currentCount = flattenOneSegmentCounter.incrementAndGet(); 3484 if (currentCount <= 1) { 3485 try { 3486 /** 3487 * {@link CompactingMemStore#snapshot} could start. 3488 */ 3489 snapShotStartCyclicCyclicBarrier.await(); 3490 flattenOneSegmentPreCyclicBarrier.await(); 3491 } catch (Throwable e) { 3492 throw new RuntimeException(e); 3493 } 3494 } 3495 super.flattenOneSegment(requesterVersion, action); 3496 if (currentCount <= 1) { 3497 try { 3498 flattenOneSegmentPostCyclicBarrier.await(); 3499 } catch (Throwable e) { 3500 throw new RuntimeException(e); 3501 } 3502 } 3503 } 3504 3505 @Override 3506 protected boolean setInMemoryCompactionFlag() { 3507 boolean result = super.setInMemoryCompactionFlag(); 3508 assertTrue(result); 3509 setInMemoryCompactionFlagCounter.incrementAndGet(); 3510 return result; 3511 } 3512 3513 @Override 3514 void inMemoryCompaction() { 3515 try { 3516 super.inMemoryCompaction(); 3517 } finally { 3518 try { 3519 inMemoryCompactionEndCyclicBarrier.await(); 3520 } catch (Throwable e) { 3521 throw new RuntimeException(e); 3522 } 3523 3524 } 3525 } 3526 3527 } 3528 3529 public static class MyCompactingMemStore5 extends CompactingMemStore { 3530 private static final String 
TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread"; 3531 private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread"; 3532 /** 3533 * {@link CompactingMemStore#flattenOneSegment} must execute after 3534 * {@link CompactingMemStore#getImmutableSegments} 3535 */ 3536 private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2); 3537 /** 3538 * Only after {@link CompactingMemStore#flattenOneSegment} completed, 3539 * {@link CompactingMemStore#swapPipelineWithNull} could execute. 3540 */ 3541 private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2); 3542 /** 3543 * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the 3544 * snapshot thread starts {@link CompactingMemStore#snapshot},because 3545 * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}. 3546 */ 3547 private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2); 3548 /** 3549 * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping. 3550 */ 3551 private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); 3552 private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0); 3553 private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0); 3554 private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0); 3555 private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0); 3556 /** 3557 * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain 3558 * thread could start. 3559 */ 3560 private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2); 3561 /** 3562 * This is used for snapshot thread,writeAgain thread and in memory compact thread. 
Only the 3563 * writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would 3564 * execute,and in memory compact thread would exit,because we expect that in memory compact 3565 * executing only once. 3566 */ 3567 private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3); 3568 3569 public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator, 3570 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3571 throws IOException { 3572 super(conf, cellComparator, store, regionServices, compactionPolicy); 3573 } 3574 3575 @Override 3576 public VersionedSegmentsList getImmutableSegments() { 3577 VersionedSegmentsList result = super.getImmutableSegments(); 3578 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3579 int currentCount = getImmutableSegmentsListCounter.incrementAndGet(); 3580 if (currentCount <= 1) { 3581 try { 3582 flattenOneSegmentPreCyclicBarrier.await(); 3583 } catch (Throwable e) { 3584 throw new RuntimeException(e); 3585 } 3586 } 3587 3588 } 3589 3590 return result; 3591 } 3592 3593 @Override 3594 protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { 3595 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3596 int currentCount = swapPipelineWithNullCounter.incrementAndGet(); 3597 if (currentCount <= 1) { 3598 try { 3599 flattenOneSegmentPostCyclicBarrier.await(); 3600 } catch (Throwable e) { 3601 throw new RuntimeException(e); 3602 } 3603 } 3604 3605 if (currentCount == 2) { 3606 try { 3607 /** 3608 * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, 3609 * writeAgain thread could start. 3610 */ 3611 writeMemStoreAgainStartCyclicBarrier.await(); 3612 /** 3613 * Only the writeAgain thread completes, retry 3614 * {@link CompactingMemStore#swapPipelineWithNull} would execute. 
3615 */ 3616 writeMemStoreAgainEndCyclicBarrier.await(); 3617 } catch (Throwable e) { 3618 throw new RuntimeException(e); 3619 } 3620 } 3621 3622 } 3623 boolean result = super.swapPipelineWithNull(segments); 3624 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3625 int currentCount = swapPipelineWithNullCounter.get(); 3626 if (currentCount <= 1) { 3627 assertTrue(!result); 3628 } 3629 if (currentCount == 2) { 3630 assertTrue(result); 3631 } 3632 } 3633 return result; 3634 3635 } 3636 3637 @Override 3638 public void flattenOneSegment(long requesterVersion, Action action) { 3639 int currentCount = flattenOneSegmentCounter.incrementAndGet(); 3640 if (currentCount <= 1) { 3641 try { 3642 /** 3643 * {@link CompactingMemStore#snapshot} could start. 3644 */ 3645 snapShotStartCyclicCyclicBarrier.await(); 3646 flattenOneSegmentPreCyclicBarrier.await(); 3647 } catch (Throwable e) { 3648 throw new RuntimeException(e); 3649 } 3650 } 3651 super.flattenOneSegment(requesterVersion, action); 3652 if (currentCount <= 1) { 3653 try { 3654 flattenOneSegmentPostCyclicBarrier.await(); 3655 /** 3656 * Only the writeAgain thread completes, in memory compact thread would exit,because we 3657 * expect that in memory compact executing only once. 
3658 */ 3659 writeMemStoreAgainEndCyclicBarrier.await(); 3660 } catch (Throwable e) { 3661 throw new RuntimeException(e); 3662 } 3663 3664 } 3665 } 3666 3667 @Override 3668 protected boolean setInMemoryCompactionFlag() { 3669 boolean result = super.setInMemoryCompactionFlag(); 3670 int count = setInMemoryCompactionFlagCounter.incrementAndGet(); 3671 if (count <= 1) { 3672 assertTrue(result); 3673 } 3674 if (count == 2) { 3675 assertTrue(!result); 3676 } 3677 return result; 3678 } 3679 3680 @Override 3681 void inMemoryCompaction() { 3682 try { 3683 super.inMemoryCompaction(); 3684 } finally { 3685 try { 3686 inMemoryCompactionEndCyclicBarrier.await(); 3687 } catch (Throwable e) { 3688 throw new RuntimeException(e); 3689 } 3690 3691 } 3692 } 3693 } 3694 3695 public static class MyCompactingMemStore6 extends CompactingMemStore { 3696 private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); 3697 3698 public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator, 3699 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3700 throws IOException { 3701 super(conf, cellComparator, store, regionServices, compactionPolicy); 3702 } 3703 3704 @Override 3705 void inMemoryCompaction() { 3706 try { 3707 super.inMemoryCompaction(); 3708 } finally { 3709 try { 3710 inMemoryCompactionEndCyclicBarrier.await(); 3711 } catch (Throwable e) { 3712 throw new RuntimeException(e); 3713 } 3714 3715 } 3716 } 3717 } 3718 3719 public static class MyDefaultMemStore extends DefaultMemStore { 3720 private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread"; 3721 private static final String FLUSH_THREAD_NAME = "flushMyThread"; 3722 /** 3723 * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner thread 3724 * could start. 
3725 */ 3726 private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2); 3727 /** 3728 * Used by getScanner thread notifies flush thread {@link DefaultMemStore#getSnapshotSegments} 3729 * completed, {@link DefaultMemStore#doClearSnapShot} could continue. 3730 */ 3731 private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2); 3732 /** 3733 * Used by flush thread notifies getScanner thread {@link DefaultMemStore#doClearSnapShot} 3734 * completed, {@link DefaultMemStore#getScanners} could continue. 3735 */ 3736 private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2); 3737 private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0); 3738 private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0); 3739 private volatile boolean shouldWait = true; 3740 private volatile HStore store = null; 3741 3742 public MyDefaultMemStore(Configuration conf, CellComparator cellComparator, 3743 RegionServicesForStores regionServices) throws IOException { 3744 super(conf, cellComparator, regionServices); 3745 } 3746 3747 @Override 3748 protected List<Segment> getSnapshotSegments() { 3749 3750 List<Segment> result = super.getSnapshotSegments(); 3751 3752 if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) { 3753 int currentCount = getSnapshotSegmentsCounter.incrementAndGet(); 3754 if (currentCount == 1) { 3755 if (this.shouldWait) { 3756 try { 3757 /** 3758 * Notify flush thread {@link DefaultMemStore#getSnapshotSegments} completed, 3759 * {@link DefaultMemStore#doClearSnapShot} could continue. 3760 */ 3761 preClearSnapShotCyclicBarrier.await(); 3762 /** 3763 * Wait for {@link DefaultMemStore#doClearSnapShot} completed. 
3764 */ 3765 postClearSnapShotCyclicBarrier.await(); 3766 3767 } catch (Throwable e) { 3768 throw new RuntimeException(e); 3769 } 3770 } 3771 } 3772 } 3773 return result; 3774 } 3775 3776 @Override 3777 protected void doClearSnapShot() { 3778 if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) { 3779 int currentCount = clearSnapshotCounter.incrementAndGet(); 3780 if (currentCount == 1) { 3781 try { 3782 if ( 3783 ((ReentrantReadWriteLock) store.getStoreEngine().getLock()) 3784 .isWriteLockedByCurrentThread() 3785 ) { 3786 shouldWait = false; 3787 } 3788 /** 3789 * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner 3790 * thread could start. 3791 */ 3792 getScannerCyclicBarrier.await(); 3793 3794 if (shouldWait) { 3795 /** 3796 * Wait for {@link DefaultMemStore#getSnapshotSegments} completed. 3797 */ 3798 preClearSnapShotCyclicBarrier.await(); 3799 } 3800 } catch (Throwable e) { 3801 throw new RuntimeException(e); 3802 } 3803 } 3804 } 3805 super.doClearSnapShot(); 3806 3807 if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) { 3808 int currentCount = clearSnapshotCounter.get(); 3809 if (currentCount == 1) { 3810 if (shouldWait) { 3811 try { 3812 /** 3813 * Notify getScanner thread {@link DefaultMemStore#doClearSnapShot} completed, 3814 * {@link DefaultMemStore#getScanners} could continue. 3815 */ 3816 postClearSnapShotCyclicBarrier.await(); 3817 } catch (Throwable e) { 3818 throw new RuntimeException(e); 3819 } 3820 } 3821 } 3822 } 3823 } 3824 } 3825}