001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY; 021import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_ON_READ_KEY; 022import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_READ; 023import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_CACHE_DATA_ON_WRITE; 024import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_CLOSE; 025import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY; 026import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY; 027import static org.junit.jupiter.api.Assertions.assertArrayEquals; 028import static org.junit.jupiter.api.Assertions.assertEquals; 029import static org.junit.jupiter.api.Assertions.assertFalse; 030import static org.junit.jupiter.api.Assertions.assertNull; 031import static org.junit.jupiter.api.Assertions.assertTrue; 032import static org.junit.jupiter.api.Assertions.fail; 033import static org.mockito.ArgumentMatchers.any; 034import static org.mockito.Mockito.mock; 035import static org.mockito.Mockito.spy; 036import static org.mockito.Mockito.times; 037import static org.mockito.Mockito.verify; 038import static org.mockito.Mockito.when; 039 040import java.io.FileNotFoundException; 041import java.io.IOException; 042import java.lang.ref.SoftReference; 043import java.security.PrivilegedExceptionAction; 044import java.util.ArrayList; 045import java.util.Arrays; 046import java.util.Collection; 047import java.util.Collections; 048import java.util.Iterator; 049import java.util.List; 050import java.util.ListIterator; 051import java.util.NavigableSet; 052import java.util.Optional; 053import java.util.TreeSet; 054import java.util.concurrent.BrokenBarrierException; 055import java.util.concurrent.ConcurrentSkipListSet; 056import java.util.concurrent.CountDownLatch; 057import java.util.concurrent.CyclicBarrier; 058import java.util.concurrent.ExecutorService; 059import java.util.concurrent.Executors; 060import java.util.concurrent.Future; 061import java.util.concurrent.ThreadPoolExecutor; 062import java.util.concurrent.TimeUnit; 063import java.util.concurrent.atomic.AtomicBoolean; 064import java.util.concurrent.atomic.AtomicInteger; 065import java.util.concurrent.atomic.AtomicLong; 066import java.util.concurrent.atomic.AtomicReference; 067import java.util.concurrent.locks.ReentrantReadWriteLock; 068import java.util.function.Consumer; 069import java.util.function.IntBinaryOperator; 070import java.util.function.IntConsumer; 071import org.apache.hadoop.conf.Configuration; 072import org.apache.hadoop.fs.FSDataOutputStream; 073import org.apache.hadoop.fs.FileStatus; 074import org.apache.hadoop.fs.FileSystem; 075import org.apache.hadoop.fs.FilterFileSystem; 076import org.apache.hadoop.fs.LocalFileSystem; 077import org.apache.hadoop.fs.Path; 078import org.apache.hadoop.fs.permission.FsPermission; 079import org.apache.hadoop.hbase.Cell; 080import org.apache.hadoop.hbase.CellBuilderType; 081import org.apache.hadoop.hbase.CellComparator; 082import org.apache.hadoop.hbase.CellComparatorImpl; 083import org.apache.hadoop.hbase.CellUtil; 084import org.apache.hadoop.hbase.ExtendedCell; 085import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; 086import org.apache.hadoop.hbase.HBaseConfiguration; 087import org.apache.hadoop.hbase.HBaseTestingUtil; 088import org.apache.hadoop.hbase.HConstants; 089import org.apache.hadoop.hbase.KeyValue; 090import org.apache.hadoop.hbase.MemoryCompactionPolicy; 091import org.apache.hadoop.hbase.NamespaceDescriptor; 092import org.apache.hadoop.hbase.PrivateCellUtil; 093import org.apache.hadoop.hbase.TableName; 094import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 095import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 096import org.apache.hadoop.hbase.client.Get; 097import org.apache.hadoop.hbase.client.RegionInfo; 098import org.apache.hadoop.hbase.client.RegionInfoBuilder; 099import org.apache.hadoop.hbase.client.Scan; 100import org.apache.hadoop.hbase.client.Scan.ReadType; 101import org.apache.hadoop.hbase.client.TableDescriptor; 102import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 103import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 104import org.apache.hadoop.hbase.filter.Filter; 105import org.apache.hadoop.hbase.filter.FilterBase; 106import org.apache.hadoop.hbase.io.compress.Compression; 107import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 108import org.apache.hadoop.hbase.io.hfile.CacheConfig; 109import org.apache.hadoop.hbase.io.hfile.HFile; 110import org.apache.hadoop.hbase.io.hfile.HFileContext; 111import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 112import org.apache.hadoop.hbase.monitoring.MonitoredTask; 113import org.apache.hadoop.hbase.nio.RefCnt; 114import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl; 115import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType; 116import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action; 117import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 118import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 119import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 120import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy; 121import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 122import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; 123import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 124import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 125import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 126import org.apache.hadoop.hbase.security.User; 127import org.apache.hadoop.hbase.testclassification.MediumTests; 128import org.apache.hadoop.hbase.testclassification.RegionServerTests; 129import org.apache.hadoop.hbase.util.BloomFilterUtil; 130import org.apache.hadoop.hbase.util.Bytes; 131import org.apache.hadoop.hbase.util.CommonFSUtils; 132import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 133import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 134import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 135import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 136import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 137import org.apache.hadoop.hbase.wal.WALFactory; 138import org.apache.hadoop.util.Progressable; 139import org.junit.jupiter.api.AfterAll; 140import org.junit.jupiter.api.AfterEach; 141import org.junit.jupiter.api.BeforeEach; 142import org.junit.jupiter.api.Tag; 143import org.junit.jupiter.api.Test; 144import org.junit.jupiter.api.TestInfo; 145import org.junit.jupiter.api.Timeout; 146import org.mockito.Mockito; 147import org.slf4j.Logger; 148import org.slf4j.LoggerFactory; 149 150import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 151 152/** 153 * Test class for the HStore 154 */ 155@Tag(RegionServerTests.TAG) 156@Tag(MediumTests.TAG) 157public class TestHStore { 158 159 private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class); 160 private String name; 161 162 HRegion region; 163 HStore store; 164 byte[] table = Bytes.toBytes("table"); 165 byte[] family = Bytes.toBytes("family"); 166 167 byte[] row = Bytes.toBytes("row"); 168 byte[] row2 = Bytes.toBytes("row2"); 169 byte[] qf1 = Bytes.toBytes("qf1"); 170 byte[] qf2 = Bytes.toBytes("qf2"); 171 byte[] qf3 = Bytes.toBytes("qf3"); 172 byte[] qf4 = Bytes.toBytes("qf4"); 173 byte[] qf5 = Bytes.toBytes("qf5"); 174 byte[] qf6 = Bytes.toBytes("qf6"); 175 176 NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR); 177 178 List<Cell> expected = new ArrayList<>(); 179 List<Cell> result = new ArrayList<>(); 180 181 long id = EnvironmentEdgeManager.currentTime(); 182 Get get = new Get(row); 183 184 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 185 private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString(); 186 187 @BeforeEach 188 public void setUp(TestInfo testInfo) throws IOException { 189 this.name = testInfo.getTestMethod().get().getName(); 190 qualifiers.clear(); 191 qualifiers.add(qf1); 192 qualifiers.add(qf3); 193 qualifiers.add(qf5); 194 195 Iterator<byte[]> iter = qualifiers.iterator(); 196 while (iter.hasNext()) { 197 byte[] next = iter.next(); 198 expected.add(new KeyValue(row, family, next, 1, (byte[]) null)); 199 get.addColumn(family, next); 200 } 201 } 202 203 private void init(String methodName) throws IOException { 204 init(methodName, TEST_UTIL.getConfiguration()); 205 } 206 207 private HStore init(String methodName, Configuration conf) throws IOException { 208 // some of the tests write 4 versions and then flush 209 // (with HBASE-4241, lower versions are collected on flush) 210 return init(methodName, conf, 211 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build()); 212 } 213 214 private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd) 215 throws IOException { 216 return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd); 217 } 218 219 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 220 ColumnFamilyDescriptor hcd) throws IOException { 221 return init(methodName, conf, builder, hcd, null); 222 } 223 224 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 225 ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException { 226 return init(methodName, conf, builder, hcd, hook, false); 227 } 228 229 private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder, 230 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 231 TableDescriptor htd = builder.setColumnFamily(hcd).build(); 232 Path basedir = new Path(DIR + methodName); 233 Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName()); 234 final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName)); 235 236 FileSystem fs = FileSystem.get(conf); 237 238 fs.delete(logdir, true); 239 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 240 MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null, 241 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 242 RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 243 Configuration walConf = new Configuration(conf); 244 CommonFSUtils.setRootDir(walConf, basedir); 245 WALFactory wals = new WALFactory(walConf, methodName); 246 region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf, 247 htd, null); 248 region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); 249 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 250 Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 251 } 252 253 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 254 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 255 initHRegion(methodName, conf, builder, hcd, hook, switchToPread); 256 if (hook == null) { 257 store = new HStore(region, hcd, conf, false); 258 } else { 259 store = new MyStore(region, hcd, conf, hook, switchToPread); 260 } 261 region.stores.put(store.getColumnFamilyDescriptor().getName(), store); 262 return store; 263 } 264 265 /** 266 * Test we do not lose data if we fail a flush and then close. Part of HBase-10466 267 */ 268 269 @Test 270 public void testFlushSizeSizing() throws Exception { 271 LOG.info("Setting up a faulty file system that cannot write in " + name); 272 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 273 // Only retry once. 274 conf.setInt("hbase.hstore.flush.retries.number", 1); 275 User user = User.createUserForTesting(conf, name, new String[] { "foo" }); 276 // Inject our faulty LocalFileSystem 277 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 278 user.runAs(new PrivilegedExceptionAction<Object>() { 279 @Override 280 public Object run() throws Exception { 281 // Make sure it worked (above is sensitive to caching details in hadoop core) 282 FileSystem fs = FileSystem.get(conf); 283 assertEquals(FaultyFileSystem.class, fs.getClass()); 284 FaultyFileSystem ffs = (FaultyFileSystem) fs; 285 286 // Initialize region 287 init(name, conf); 288 289 MemStoreSize mss = store.memstore.getFlushableSize(); 290 assertEquals(0, mss.getDataSize()); 291 LOG.info("Adding some data"); 292 MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing(); 293 store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize); 294 // add the heap size of active (mutable) segment 295 kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 296 mss = store.memstore.getFlushableSize(); 297 assertEquals(kvSize.getMemStoreSize(), mss); 298 // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. 299 try { 300 LOG.info("Flushing"); 301 flushStore(store, id++); 302 fail("Didn't bubble up IOE!"); 303 } catch (IOException ioe) { 304 assertTrue(ioe.getMessage().contains("Fault injected")); 305 } 306 // due to snapshot, change mutable to immutable segment 307 kvSize.incMemStoreSize(0, 308 CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0); 309 mss = store.memstore.getFlushableSize(); 310 assertEquals(kvSize.getMemStoreSize(), mss); 311 MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing(); 312 store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2); 313 kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 314 // Even though we add a new kv, we expect the flushable size to be 'same' since we have 315 // not yet cleared the snapshot -- the above flush failed. 316 assertEquals(kvSize.getMemStoreSize(), mss); 317 ffs.fault.set(false); 318 flushStore(store, id++); 319 mss = store.memstore.getFlushableSize(); 320 // Size should be the foreground kv size. 321 assertEquals(kvSize2.getMemStoreSize(), mss); 322 flushStore(store, id++); 323 mss = store.memstore.getFlushableSize(); 324 assertEquals(0, mss.getDataSize()); 325 assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize()); 326 return null; 327 } 328 }); 329 } 330 331 @Test 332 public void testStoreBloomFilterMetricsWithBloomRowCol() throws IOException { 333 int numStoreFiles = 5; 334 writeAndRead(BloomType.ROWCOL, numStoreFiles); 335 336 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 337 // hard to know exactly the numbers here, we are just trying to 338 // prove that they are incrementing 339 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 340 assertTrue(store.getBloomFilterNegativeResultsCount() > 0); 341 } 342 343 @Test 344 public void testStoreBloomFilterMetricsWithBloomRow() throws IOException { 345 int numStoreFiles = 5; 346 writeAndRead(BloomType.ROWCOL, numStoreFiles); 347 348 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 349 // hard to know exactly the numbers here, we are just trying to 350 // prove that they are incrementing 351 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 352 assertTrue(store.getBloomFilterNegativeResultsCount() > 0); 353 } 354 355 @Test 356 public void testStoreBloomFilterMetricsWithBloomRowPrefix() throws IOException { 357 int numStoreFiles = 5; 358 writeAndRead(BloomType.ROWPREFIX_FIXED_LENGTH, numStoreFiles); 359 360 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 361 // hard to know exactly the numbers here, we are just trying to 362 // prove that they are incrementing 363 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 364 } 365 366 @Test 367 public void testStoreBloomFilterMetricsWithBloomNone() throws IOException { 368 int numStoreFiles = 5; 369 writeAndRead(BloomType.NONE, numStoreFiles); 370 371 assertEquals(0, store.getBloomFilterRequestsCount()); 372 assertEquals(0, store.getBloomFilterNegativeResultsCount()); 373 374 // hard to know exactly the numbers here, we are just trying to 375 // prove that they are incrementing 376 assertTrue(store.getBloomFilterEligibleRequestsCount() >= numStoreFiles); 377 } 378 379 private void writeAndRead(BloomType bloomType, int numStoreFiles) throws IOException { 380 Configuration conf = HBaseConfiguration.create(); 381 FileSystem fs = FileSystem.get(conf); 382 383 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 384 .setCompressionType(Compression.Algorithm.GZ).setBloomFilterType(bloomType) 385 .setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "3").build(); 386 init(name, conf, hcd); 387 388 for (int i = 1; i <= numStoreFiles; i++) { 389 byte[] row = Bytes.toBytes("row" + i); 390 LOG.info("Adding some data for the store file #" + i); 391 long timeStamp = EnvironmentEdgeManager.currentTime(); 392 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null); 393 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null); 394 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null); 395 flush(i); 396 } 397 398 // Verify the total number of store files 399 assertEquals(numStoreFiles, this.store.getStorefiles().size()); 400 401 TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR); 402 columns.add(qf1); 403 404 for (int i = 1; i <= numStoreFiles; i++) { 405 KeyValueScanner scanner = 406 store.getScanner(new Scan(new Get(Bytes.toBytes("row" + i))), columns, 0); 407 scanner.peek(); 408 } 409 } 410 411 /** 412 * Verify that compression and data block encoding are respected by the createWriter method, used 413 * on store flush. 414 */ 415 @Test 416 public void testCreateWriter() throws Exception { 417 Configuration conf = HBaseConfiguration.create(); 418 FileSystem fs = FileSystem.get(conf); 419 420 ColumnFamilyDescriptor hcd = 421 ColumnFamilyDescriptorBuilder.newBuilder(family).setCompressionType(Compression.Algorithm.GZ) 422 .setDataBlockEncoding(DataBlockEncoding.DIFF).build(); 423 init(name, conf, hcd); 424 425 // Test createWriter 426 StoreFileWriter writer = store.getStoreEngine() 427 .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4) 428 .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true) 429 .includesTag(false).shouldDropBehind(false)); 430 Path path = writer.getPath(); 431 writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1))); 432 writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2))); 433 writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3))); 434 writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4))); 435 writer.close(); 436 437 // Verify that compression and encoding settings are respected 438 HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf); 439 assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec()); 440 assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding()); 441 reader.close(); 442 } 443 444 @Test 445 public void testDeleteExpiredStoreFiles() throws Exception { 446 testDeleteExpiredStoreFiles(0); 447 testDeleteExpiredStoreFiles(1); 448 } 449 450 /** 451 * @param minVersions the MIN_VERSIONS for the column family 452 */ 453 public void testDeleteExpiredStoreFiles(int minVersions) throws Exception { 454 int storeFileNum = 4; 455 int ttl = 4; 456 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 457 EnvironmentEdgeManagerTestHelper.injectEdge(edge); 458 459 Configuration conf = HBaseConfiguration.create(); 460 // Enable the expired store file deletion 461 conf.setBoolean("hbase.store.delete.expired.storefile", true); 462 // Set the compaction threshold higher to avoid normal compactions. 463 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5); 464 465 init(name + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 466 .setMinVersions(minVersions).setTimeToLive(ttl).build()); 467 468 long storeTtl = this.store.getScanInfo().getTtl(); 469 long sleepTime = storeTtl / storeFileNum; 470 long timeStamp; 471 // There are 4 store files and the max time stamp difference among these 472 // store files will be (this.store.ttl / storeFileNum) 473 for (int i = 1; i <= storeFileNum; i++) { 474 LOG.info("Adding some data for the store file #" + i); 475 timeStamp = EnvironmentEdgeManager.currentTime(); 476 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null); 477 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null); 478 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null); 479 flush(i); 480 edge.incrementTime(sleepTime); 481 } 482 483 // Verify the total number of store files 484 assertEquals(storeFileNum, this.store.getStorefiles().size()); 485 486 // Each call will find one expired store file and delete it before compaction happens. 487 // There will be no compaction due to threshold above. Last file will not be replaced. 488 for (int i = 1; i <= storeFileNum - 1; i++) { 489 // verify the expired store file. 490 assertFalse(this.store.requestCompaction().isPresent()); 491 Collection<HStoreFile> sfs = this.store.getStorefiles(); 492 // Ensure i files are gone. 493 if (minVersions == 0) { 494 assertEquals(storeFileNum - i, sfs.size()); 495 // Ensure only non-expired files remain. 496 for (HStoreFile sf : sfs) { 497 assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl)); 498 } 499 } else { 500 assertEquals(storeFileNum, sfs.size()); 501 } 502 // Let the next store file expired. 503 edge.incrementTime(sleepTime); 504 } 505 assertFalse(this.store.requestCompaction().isPresent()); 506 507 Collection<HStoreFile> sfs = this.store.getStorefiles(); 508 // Assert the last expired file is not removed. 509 if (minVersions == 0) { 510 assertEquals(1, sfs.size()); 511 } 512 long ts = sfs.iterator().next().getReader().getMaxTimestamp(); 513 assertTrue(ts < (edge.currentTime() - storeTtl)); 514 515 for (HStoreFile sf : sfs) { 516 sf.closeStoreFile(true); 517 } 518 } 519 520 @Test 521 public void testLowestModificationTime() throws Exception { 522 Configuration conf = HBaseConfiguration.create(); 523 FileSystem fs = FileSystem.get(conf); 524 // Initialize region 525 init(name, conf); 526 527 int storeFileNum = 4; 528 for (int i = 1; i <= storeFileNum; i++) { 529 LOG.info("Adding some data for the store file #" + i); 530 this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null); 531 this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null); 532 this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null); 533 flush(i); 534 } 535 // after flush; check the lowest time stamp 536 long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 537 long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 538 assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); 539 540 // after compact; check the lowest time stamp 541 store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null); 542 lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 543 lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 544 assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); 545 } 546 547 private static long getLowestTimeStampFromFS(FileSystem fs, 548 final Collection<HStoreFile> candidates) throws IOException { 549 long minTs = Long.MAX_VALUE; 550 if (candidates.isEmpty()) { 551 return minTs; 552 } 553 Path[] p = new Path[candidates.size()]; 554 int i = 0; 555 for (HStoreFile sf : candidates) { 556 p[i] = sf.getPath(); 557 ++i; 558 } 559 560 FileStatus[] stats = fs.listStatus(p); 561 if (stats == null || stats.length == 0) { 562 return minTs; 563 } 564 for (FileStatus s : stats) { 565 minTs = Math.min(minTs, s.getModificationTime()); 566 } 567 return minTs; 568 } 569 570 ////////////////////////////////////////////////////////////////////////////// 571 // Get tests 572 ////////////////////////////////////////////////////////////////////////////// 573 574 private static final int BLOCKSIZE_SMALL = 8192; 575 576 /** 577 * Test for hbase-1686. 578 */ 579 @Test 580 public void testEmptyStoreFile() throws IOException { 581 init(name); 582 // Write a store file. 583 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 584 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 585 flush(1); 586 // Now put in place an empty store file. Its a little tricky. Have to 587 // do manually with hacked in sequence id. 588 HStoreFile f = this.store.getStorefiles().iterator().next(); 589 Path storedir = f.getPath().getParent(); 590 long seqid = f.getMaxSequenceId(); 591 Configuration c = HBaseConfiguration.create(); 592 FileSystem fs = FileSystem.get(c); 593 HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 594 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs) 595 .withOutputDir(storedir).withFileContext(meta).build(); 596 w.appendMetadata(seqid + 1, false); 597 w.close(); 598 this.store.close(); 599 // Reopen it... should pick up two files 600 this.store = 601 new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false); 602 assertEquals(2, this.store.getStorefilesCount()); 603 604 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 605 assertEquals(1, result.size()); 606 } 607 608 /** 609 * Getting data from memstore only 610 */ 611 @Test 612 public void testGet_FromMemStoreOnly() throws IOException { 613 init(name); 614 615 // Put data in memstore 616 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 617 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 618 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 619 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 620 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 621 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 622 623 // Get 624 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 625 626 // Compare 627 assertCheck(); 628 } 629 630 @Test 631 public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException { 632 testTimeRangeIfSomeCellsAreDroppedInFlush(1); 633 testTimeRangeIfSomeCellsAreDroppedInFlush(3); 634 testTimeRangeIfSomeCellsAreDroppedInFlush(5); 635 } 636 637 private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException { 638 init(name, TEST_UTIL.getConfiguration(), 639 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build()); 640 long currentTs = 100; 641 long minTs = currentTs; 642 // the extra cell won't be flushed to disk, 643 // so the min of timerange will be different between memStore and hfile. 644 for (int i = 0; i != (maxVersion + 1); ++i) { 645 this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null); 646 if (i == 1) { 647 minTs = currentTs; 648 } 649 } 650 flushStore(store, id++); 651 652 Collection<HStoreFile> files = store.getStorefiles(); 653 assertEquals(1, files.size()); 654 HStoreFile f = files.iterator().next(); 655 f.initReader(); 656 StoreFileReader reader = f.getReader(); 657 assertEquals(minTs, reader.timeRange.getMin()); 658 assertEquals(currentTs, reader.timeRange.getMax()); 659 } 660 661 /** 662 * Getting data from files only 663 */ 664 @Test 665 public void testGet_FromFilesOnly() throws IOException { 666 init(name); 667 668 // Put data in memstore 669 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 670 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 671 // flush 672 flush(1); 673 674 // Add more data 675 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 676 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 677 // flush 678 flush(2); 679 680 // Add more data 681 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 682 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 683 // flush 684 flush(3); 685 686 // Get 687 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 688 // this.store.get(get, qualifiers, result); 689 690 // Need to sort the result since multiple files 691 Collections.sort(result, CellComparatorImpl.COMPARATOR); 692 693 // Compare 694 assertCheck(); 695 } 696 697 /** 698 * Getting data from memstore and files 699 */ 700 @Test 701 public void testGet_FromMemStoreAndFiles() throws IOException { 702 init(name); 703 704 // Put data in memstore 705 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 706 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 707 // flush 708 flush(1); 709 710 // Add more data 711 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 712 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 713 // flush 714 flush(2); 715 716 // Add more data 717 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 718 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 719 720 // Get 721 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 722 723 // Need to sort the result since multiple files 724 Collections.sort(result, CellComparatorImpl.COMPARATOR); 725 726 // Compare 727 assertCheck(); 728 } 729 730 private void flush(int storeFilessize) throws IOException { 731 flushStore(store, id++); 732 assertEquals(storeFilessize, this.store.getStorefiles().size()); 733 assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount()); 734 } 735 736 private void assertCheck() { 737 assertEquals(expected.size(), result.size()); 738 for (int i = 0; i < expected.size(); i++) { 739 assertEquals(expected.get(i), result.get(i)); 740 } 741 } 742 743 @AfterEach 744 public void tearDown() throws Exception { 745 EnvironmentEdgeManagerTestHelper.reset(); 746 if (store != null) { 747 try { 748 store.close(); 749 } catch (IOException e) { 750 } 751 store = null; 752 } 753 if (region != null) { 754 region.close(); 755 region = null; 756 } 757 } 758 759 @AfterAll 760 public static void tearDownAfterClass() throws IOException { 761 TEST_UTIL.cleanupTestDir(); 762 } 763 764 @Test 765 public void testHandleErrorsInFlush() throws Exception { 766 LOG.info("Setting up a faulty file system that cannot write"); 767 768 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 769 User user = User.createUserForTesting(conf, "testhandleerrorsinflush", new String[] { "foo" }); 770 // Inject our faulty LocalFileSystem 771 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 772 user.runAs(new PrivilegedExceptionAction<Object>() { 773 @Override 774 public Object run() throws Exception { 775 // Make sure it worked (above is sensitive to caching details in hadoop core) 776 FileSystem fs = FileSystem.get(conf); 777 assertEquals(FaultyFileSystem.class, fs.getClass()); 778 779 // Initialize region 780 init(name, conf); 781 782 LOG.info("Adding some data"); 783 store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 784 store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 785 store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 786 787 LOG.info("Before flush, we should have no files"); 788 789 StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, store.getStoreContext()); 790 Collection<StoreFileInfo> files = sft.load(); 791 assertEquals(0, files != null ? files.size() : 0); 792 793 // flush 794 try { 795 LOG.info("Flushing"); 796 flush(1); 797 fail("Didn't bubble up IOE!"); 798 } catch (IOException ioe) { 799 assertTrue(ioe.getMessage().contains("Fault injected")); 800 } 801 802 LOG.info("After failed flush, we should still have no files!"); 803 files = sft.load(); 804 assertEquals(0, files != null ? files.size() : 0); 805 store.getHRegion().getWAL().close(); 806 return null; 807 } 808 }); 809 FileSystem.closeAllForUGI(user.getUGI()); 810 } 811 812 /** 813 * Faulty file system that will fail if you write past its fault position the FIRST TIME only; 814 * thereafter it will succeed. Used by {@link TestHRegion} too. 815 */ 816 static class FaultyFileSystem extends FilterFileSystem { 817 List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>(); 818 private long faultPos = 200; 819 AtomicBoolean fault = new AtomicBoolean(true); 820 821 public FaultyFileSystem() { 822 super(new LocalFileSystem()); 823 LOG.info("Creating faulty!"); 824 } 825 826 @Override 827 public FSDataOutputStream create(Path p) throws IOException { 828 return new FaultyOutputStream(super.create(p), faultPos, fault); 829 } 830 831 @Override 832 public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, 833 int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { 834 return new FaultyOutputStream( 835 super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress), 836 faultPos, fault); 837 } 838 839 @Override 840 public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize, 841 short replication, long blockSize, Progressable progress) throws IOException { 842 // Fake it. Call create instead. The default implementation throws an IOE 843 // that this is not supported. 844 return create(f, overwrite, bufferSize, replication, blockSize, progress); 845 } 846 } 847 848 static class FaultyOutputStream extends FSDataOutputStream { 849 volatile long faultPos = Long.MAX_VALUE; 850 private final AtomicBoolean fault; 851 852 public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault) 853 throws IOException { 854 super(out, null); 855 this.faultPos = faultPos; 856 this.fault = fault; 857 } 858 859 @Override 860 public synchronized void write(byte[] buf, int offset, int length) throws IOException { 861 LOG.info("faulty stream write at pos " + getPos()); 862 injectFault(); 863 super.write(buf, offset, length); 864 } 865 866 private void injectFault() throws IOException { 867 if (this.fault.get() && getPos() >= faultPos) { 868 throw new IOException("Fault injected"); 869 } 870 } 871 } 872 873 private static StoreFlushContext flushStore(HStore store, long id) throws IOException { 874 StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY); 875 storeFlushCtx.prepare(); 876 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 877 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 878 return storeFlushCtx; 879 } 880 881 /** 882 * Generate a list of KeyValues for testing based on given parameters 883 * @return the rows key-value list 884 */ 885 private List<ExtendedCell> getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier, 886 byte[] family) { 887 List<ExtendedCell> kvList = new ArrayList<>(); 888 for (int i = 1; i <= numRows; i++) { 889 byte[] b = Bytes.toBytes(i); 890 for (long timestamp : timestamps) { 891 kvList.add(new KeyValue(b, family, qualifier, timestamp, b)); 892 } 893 } 894 return kvList; 895 } 896 897 /** 898 * Test to ensure correctness when using Stores with multiple timestamps 899 */ 900 @Test 901 public void testMultipleTimestamps() throws IOException { 902 int numRows = 1; 903 long[] timestamps1 = new long[] { 1, 5, 10, 20 }; 904 long[] timestamps2 = new long[] { 30, 80 }; 905 906 init(name); 907 908 List<ExtendedCell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family); 909 for (ExtendedCell kv : kvList1) { 910 this.store.add(kv, null); 911 } 912 913 flushStore(store, id++); 914 915 List<ExtendedCell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family); 916 for (ExtendedCell kv : kvList2) { 917 this.store.add(kv, null); 918 } 919 920 List<Cell> result; 921 Get get = new Get(Bytes.toBytes(1)); 922 get.addColumn(family, qf1); 923 924 get.setTimeRange(0, 15); 925 result = HBaseTestingUtil.getFromStoreFile(store, get); 926 assertTrue(result.size() > 0); 927 928 get.setTimeRange(40, 90); 929 result = HBaseTestingUtil.getFromStoreFile(store, get); 930 assertTrue(result.size() > 0); 931 932 get.setTimeRange(10, 45); 933 result = HBaseTestingUtil.getFromStoreFile(store, get); 934 assertTrue(result.size() > 0); 935 936 get.setTimeRange(80, 145); 937 result = HBaseTestingUtil.getFromStoreFile(store, get); 938 assertTrue(result.size() > 0); 939 940 get.setTimeRange(1, 2); 941 result = HBaseTestingUtil.getFromStoreFile(store, get); 942 assertTrue(result.size() > 0); 943 944 get.setTimeRange(90, 200); 945 result = HBaseTestingUtil.getFromStoreFile(store, get); 946 assertTrue(result.size() == 0); 947 } 948 949 /** 950 * Test for HBASE-3492 - Test split on empty colfam (no store files). 951 * @throws IOException When the IO operations fail. 952 */ 953 @Test 954 public void testSplitWithEmptyColFam() throws IOException { 955 init(name); 956 assertFalse(store.getSplitPoint().isPresent()); 957 } 958 959 @Test 960 public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception { 961 final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle"; 962 long anyValue = 10; 963 964 // We'll check that it uses correct config and propagates it appropriately by going thru 965 // the simplest "real" path I can find - "throttleCompaction", which just checks whether 966 // a number we pass in is higher than some config value, inside compactionPolicy. 967 Configuration conf = HBaseConfiguration.create(); 968 conf.setLong(CONFIG_KEY, anyValue); 969 init(name + "-xml", conf); 970 assertTrue(store.throttleCompaction(anyValue + 1)); 971 assertFalse(store.throttleCompaction(anyValue)); 972 973 // HTD overrides XML. 974 --anyValue; 975 init(name + "-htd", conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)) 976 .setValue(CONFIG_KEY, Long.toString(anyValue)), ColumnFamilyDescriptorBuilder.of(family)); 977 assertTrue(store.throttleCompaction(anyValue + 1)); 978 assertFalse(store.throttleCompaction(anyValue)); 979 980 // HCD overrides them both. 981 --anyValue; 982 init(name + "-hcd", conf, 983 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, 984 Long.toString(anyValue)), 985 ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue)) 986 .build()); 987 assertTrue(store.throttleCompaction(anyValue + 1)); 988 assertFalse(store.throttleCompaction(anyValue)); 989 } 990 991 public static class DummyStoreEngine extends DefaultStoreEngine { 992 public static DefaultCompactor lastCreatedCompactor = null; 993 994 @Override 995 protected void createComponents(Configuration conf, HStore store, CellComparator comparator) 996 throws IOException { 997 super.createComponents(conf, store, comparator); 998 lastCreatedCompactor = this.compactor; 999 } 1000 } 1001 1002 @Test 1003 public void testStoreUsesSearchEngineOverride() throws Exception { 1004 Configuration conf = HBaseConfiguration.create(); 1005 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName()); 1006 init(name, conf); 1007 assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor()); 1008 } 1009 1010 private void addStoreFile() throws IOException { 1011 HStoreFile f = this.store.getStorefiles().iterator().next(); 1012 Path storedir = f.getPath().getParent(); 1013 long seqid = this.store.getMaxSequenceId().orElse(0L); 1014 Configuration c = TEST_UTIL.getConfiguration(); 1015 FileSystem fs = FileSystem.get(c); 1016 HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 1017 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs) 1018 .withOutputDir(storedir).withFileContext(fileContext).build(); 1019 w.appendMetadata(seqid + 1, false); 1020 w.close(); 1021 LOG.info("Added store file:" + w.getPath()); 1022 } 1023 1024 private void archiveStoreFile(int index) throws IOException { 1025 Collection<HStoreFile> files = this.store.getStorefiles(); 1026 HStoreFile sf = null; 1027 Iterator<HStoreFile> it = files.iterator(); 1028 for (int i = 0; i <= index; i++) { 1029 sf = it.next(); 1030 } 1031 StoreFileTracker sft = 1032 StoreFileTrackerFactory.create(store.getRegionFileSystem().getFileSystem().getConf(), 1033 store.isPrimaryReplicaStore(), store.getStoreContext()); 1034 sft.removeStoreFiles(Lists.newArrayList(sf)); 1035 } 1036 1037 private void closeCompactedFile(int index) throws IOException { 1038 Collection<HStoreFile> files = 1039 this.store.getStoreEngine().getStoreFileManager().getCompactedfiles(); 1040 if (files.size() > 0) { 1041 HStoreFile sf = null; 1042 Iterator<HStoreFile> it = files.iterator(); 1043 for (int i = 0; i <= index; i++) { 1044 sf = it.next(); 1045 } 1046 sf.closeStoreFile(true); 1047 store.getStoreEngine().getStoreFileManager() 1048 .removeCompactedFiles(Collections.singletonList(sf)); 1049 } 1050 } 1051 1052 @Test 1053 public void testRefreshStoreFiles() throws Exception { 1054 init(name); 1055 1056 assertEquals(0, this.store.getStorefilesCount()); 1057 1058 // Test refreshing store files when no store files are there 1059 store.refreshStoreFiles(); 1060 assertEquals(0, this.store.getStorefilesCount()); 1061 1062 // add some data, flush 1063 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 1064 flush(1); 1065 assertEquals(1, this.store.getStorefilesCount()); 1066 1067 // add one more file 1068 addStoreFile(); 1069 1070 assertEquals(1, this.store.getStorefilesCount()); 1071 store.refreshStoreFiles(); 1072 assertEquals(2, this.store.getStorefilesCount()); 1073 1074 // add three more files 1075 addStoreFile(); 1076 addStoreFile(); 1077 addStoreFile(); 1078 1079 assertEquals(2, this.store.getStorefilesCount()); 1080 store.refreshStoreFiles(); 1081 assertEquals(5, this.store.getStorefilesCount()); 1082 1083 closeCompactedFile(0); 1084 archiveStoreFile(0); 1085 1086 assertEquals(5, this.store.getStorefilesCount()); 1087 store.refreshStoreFiles(); 1088 assertEquals(4, this.store.getStorefilesCount()); 1089 1090 archiveStoreFile(0); 1091 archiveStoreFile(1); 1092 archiveStoreFile(2); 1093 1094 assertEquals(4, this.store.getStorefilesCount()); 1095 store.refreshStoreFiles(); 1096 assertEquals(1, this.store.getStorefilesCount()); 1097 1098 archiveStoreFile(0); 1099 store.refreshStoreFiles(); 1100 assertEquals(0, this.store.getStorefilesCount()); 1101 } 1102 1103 @Test 1104 public void testRefreshStoreFilesNotChanged() throws IOException { 1105 init(name); 1106 1107 assertEquals(0, this.store.getStorefilesCount()); 1108 1109 // add some data, flush 1110 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 1111 flush(1); 1112 // add one more file 1113 addStoreFile(); 1114 1115 StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine()); 1116 1117 // call first time after files changed 1118 spiedStoreEngine.refreshStoreFiles(); 1119 assertEquals(2, this.store.getStorefilesCount()); 1120 verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any()); 1121 1122 // call second time 1123 spiedStoreEngine.refreshStoreFiles(); 1124 1125 // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not 1126 // refreshed, 1127 verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any()); 1128 } 1129 1130 @Test 1131 public void testScanWithCompactionAfterFlush() throws Exception { 1132 TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, 1133 EverythingPolicy.class.getName()); 1134 init(name); 1135 1136 assertEquals(0, this.store.getStorefilesCount()); 1137 1138 KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null); 1139 // add some data, flush 1140 this.store.add(kv, null); 1141 flush(1); 1142 kv = new KeyValue(row, family, qf2, 1, (byte[]) null); 1143 // add some data, flush 1144 this.store.add(kv, null); 1145 flush(2); 1146 kv = new KeyValue(row, family, qf3, 1, (byte[]) null); 1147 // add some data, flush 1148 this.store.add(kv, null); 1149 flush(3); 1150 1151 ExecutorService service = Executors.newFixedThreadPool(2); 1152 1153 Scan scan = new Scan(new Get(row)); 1154 Future<KeyValueScanner> scanFuture = service.submit(() -> { 1155 try { 1156 LOG.info(">>>> creating scanner"); 1157 return this.store.createScanner(scan, 1158 new ScanInfo(HBaseConfiguration.create(), 1159 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(), 1160 Long.MAX_VALUE, 0, CellComparator.getInstance()), 1161 scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0); 1162 } catch (IOException e) { 1163 e.printStackTrace(); 1164 return null; 1165 } 1166 }); 1167 Future compactFuture = service.submit(() -> { 1168 try { 1169 LOG.info(">>>>>> starting compaction"); 1170 Optional<CompactionContext> opCompaction = this.store.requestCompaction(); 1171 assertTrue(opCompaction.isPresent()); 1172 store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent()); 1173 LOG.info(">>>>>> Compaction is finished"); 1174 this.store.closeAndArchiveCompactedFiles(); 1175 LOG.info(">>>>>> Compacted files deleted"); 1176 } catch (IOException e) { 1177 e.printStackTrace(); 1178 } 1179 }); 1180 1181 KeyValueScanner kvs = scanFuture.get(); 1182 compactFuture.get(); 1183 ((StoreScanner) kvs).currentScanners.forEach(s -> { 1184 if (s instanceof StoreFileScanner) { 1185 assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount()); 1186 } 1187 }); 1188 kvs.seek(kv); 1189 service.shutdownNow(); 1190 } 1191 1192 private long countMemStoreScanner(StoreScanner scanner) { 1193 if (scanner.currentScanners == null) { 1194 return 0; 1195 } 1196 return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count(); 1197 } 1198 1199 @Test 1200 public void testNumberOfMemStoreScannersAfterFlush() throws IOException { 1201 long seqId = 100; 1202 long timestamp = EnvironmentEdgeManager.currentTime(); 1203 ExtendedCell cell0 = 1204 ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1205 .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build(); 1206 PrivateCellUtil.setSequenceId(cell0, seqId); 1207 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList()); 1208 1209 ExtendedCell cell1 = 1210 ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1211 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build(); 1212 PrivateCellUtil.setSequenceId(cell1, seqId); 1213 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1)); 1214 1215 seqId = 101; 1216 timestamp = EnvironmentEdgeManager.currentTime(); 1217 ExtendedCell cell2 = 1218 ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family) 1219 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build(); 1220 PrivateCellUtil.setSequenceId(cell2, seqId); 1221 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2)); 1222 } 1223 1224 private void testNumberOfMemStoreScannersAfterFlush(List<ExtendedCell> inputCellsBeforeSnapshot, 1225 List<ExtendedCell> inputCellsAfterSnapshot) throws IOException { 1226 init(name + "-" + inputCellsBeforeSnapshot.size()); 1227 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1228 long seqId = Long.MIN_VALUE; 1229 for (ExtendedCell c : inputCellsBeforeSnapshot) { 1230 quals.add(CellUtil.cloneQualifier(c)); 1231 seqId = Math.max(seqId, c.getSequenceId()); 1232 } 1233 for (ExtendedCell c : inputCellsAfterSnapshot) { 1234 quals.add(CellUtil.cloneQualifier(c)); 1235 seqId = Math.max(seqId, c.getSequenceId()); 1236 } 1237 inputCellsBeforeSnapshot.forEach(c -> store.add(c, null)); 1238 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1239 storeFlushCtx.prepare(); 1240 inputCellsAfterSnapshot.forEach(c -> store.add(c, null)); 1241 int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2; 1242 try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) { 1243 // snapshot + active (if inputCellsAfterSnapshot isn't empty) 1244 assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s)); 1245 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1246 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1247 // snapshot has no data after flush 1248 int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1; 1249 boolean more; 1250 int cellCount = 0; 1251 do { 1252 List<Cell> cells = new ArrayList<>(); 1253 more = s.next(cells); 1254 cellCount += cells.size(); 1255 assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s)); 1256 } while (more); 1257 assertEquals(inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount, 1258 "The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size() 1259 + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size()); 1260 // the current scanners is cleared 1261 assertEquals(0, countMemStoreScanner(s)); 1262 } 1263 } 1264 1265 private ExtendedCell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) 1266 throws IOException { 1267 return createCell(row, qualifier, ts, sequenceId, value); 1268 } 1269 1270 private ExtendedCell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, 1271 byte[] value) throws IOException { 1272 return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row) 1273 .setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put) 1274 .setValue(value).setSequenceId(sequenceId).build(); 1275 } 1276 1277 private ExtendedCell createDeleteCell(byte[] row, byte[] qualifier, long ts, long sequenceId) { 1278 return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row) 1279 .setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.DeleteColumn) 1280 .setSequenceId(sequenceId).build(); 1281 } 1282 1283 @Test 1284 public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException { 1285 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1286 final int expectedSize = 3; 1287 testFlushBeforeCompletingScan(new MyListHook() { 1288 @Override 1289 public void hook(int currentSize) { 1290 if (currentSize == expectedSize - 1) { 1291 try { 1292 flushStore(store, id++); 1293 timeToGoNextRow.set(true); 1294 } catch (IOException e) { 1295 throw new RuntimeException(e); 1296 } 1297 } 1298 } 1299 }, new FilterBase() { 1300 @Override 1301 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1302 return ReturnCode.INCLUDE; 1303 } 1304 }, expectedSize); 1305 } 1306 1307 @Test 1308 public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException { 1309 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1310 final int expectedSize = 2; 1311 testFlushBeforeCompletingScan(new MyListHook() { 1312 @Override 1313 public void hook(int currentSize) { 1314 if (currentSize == expectedSize - 1) { 1315 try { 1316 flushStore(store, id++); 1317 timeToGoNextRow.set(true); 1318 } catch (IOException e) { 1319 throw new RuntimeException(e); 1320 } 1321 } 1322 } 1323 }, new FilterBase() { 1324 @Override 1325 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1326 if (timeToGoNextRow.get()) { 1327 timeToGoNextRow.set(false); 1328 return ReturnCode.NEXT_ROW; 1329 } else { 1330 return ReturnCode.INCLUDE; 1331 } 1332 } 1333 }, expectedSize); 1334 } 1335 1336 @Test 1337 public void testFlushBeforeCompletingScanWithFilterHint() 1338 throws IOException, InterruptedException { 1339 final AtomicBoolean timeToGetHint = new AtomicBoolean(false); 1340 final int expectedSize = 2; 1341 testFlushBeforeCompletingScan(new MyListHook() { 1342 @Override 1343 public void hook(int currentSize) { 1344 if (currentSize == expectedSize - 1) { 1345 try { 1346 flushStore(store, id++); 1347 timeToGetHint.set(true); 1348 } catch (IOException e) { 1349 throw new RuntimeException(e); 1350 } 1351 } 1352 } 1353 }, new FilterBase() { 1354 @Override 1355 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1356 if (timeToGetHint.get()) { 1357 timeToGetHint.set(false); 1358 return Filter.ReturnCode.SEEK_NEXT_USING_HINT; 1359 } else { 1360 return Filter.ReturnCode.INCLUDE; 1361 } 1362 } 1363 1364 @Override 1365 public Cell getNextCellHint(Cell currentCell) throws IOException { 1366 return currentCell; 1367 } 1368 }, expectedSize); 1369 } 1370 1371 private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize) 1372 throws IOException, InterruptedException { 1373 Configuration conf = HBaseConfiguration.create(); 1374 byte[] r0 = Bytes.toBytes("row0"); 1375 byte[] r1 = Bytes.toBytes("row1"); 1376 byte[] r2 = Bytes.toBytes("row2"); 1377 byte[] value0 = Bytes.toBytes("value0"); 1378 byte[] value1 = Bytes.toBytes("value1"); 1379 byte[] value2 = Bytes.toBytes("value2"); 1380 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1381 long ts = EnvironmentEdgeManager.currentTime(); 1382 long seqId = 100; 1383 init(name, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1384 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(), 1385 new MyStoreHook() { 1386 @Override 1387 public long getSmallestReadPoint(HStore store) { 1388 return seqId + 3; 1389 } 1390 }); 1391 // The cells having the value0 won't be flushed to disk because the value of max version is 1 1392 store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing); 1393 store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing); 1394 store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing); 1395 store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing); 1396 store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing); 1397 store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing); 1398 store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing); 1399 store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing); 1400 store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing); 1401 store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing); 1402 store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing); 1403 store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing); 1404 List<Cell> myList = new MyList<>(hook); 1405 Scan scan = new Scan().withStartRow(r1).setFilter(filter); 1406 try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) { 1407 // r1 1408 scanner.next(myList); 1409 assertEquals(expectedSize, myList.size()); 1410 for (Cell c : myList) { 1411 byte[] actualValue = CellUtil.cloneValue(c); 1412 assertTrue(Bytes.equals(actualValue, value1), "expected:" + Bytes.toStringBinary(value1) 1413 + ", actual:" + Bytes.toStringBinary(actualValue)); 1414 } 1415 List<Cell> normalList = new ArrayList<>(3); 1416 // r2 1417 scanner.next(normalList); 1418 assertEquals(3, normalList.size()); 1419 for (Cell c : normalList) { 1420 byte[] actualValue = CellUtil.cloneValue(c); 1421 assertTrue(Bytes.equals(actualValue, value2), "expected:" + Bytes.toStringBinary(value2) 1422 + ", actual:" + Bytes.toStringBinary(actualValue)); 1423 } 1424 } 1425 } 1426 1427 @Test 1428 public void testFlushBeforeCompletingScanWithDeleteCell() throws IOException { 1429 final Configuration conf = HBaseConfiguration.create(); 1430 1431 byte[] r1 = Bytes.toBytes("row1"); 1432 byte[] r2 = Bytes.toBytes("row2"); 1433 1434 byte[] value1 = Bytes.toBytes("value1"); 1435 byte[] value2 = Bytes.toBytes("value2"); 1436 1437 final MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1438 final long ts = EnvironmentEdgeManager.currentTime(); 1439 final long seqId = 100; 1440 1441 init(name, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1442 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(), 1443 new MyStoreHook() { 1444 @Override 1445 long getSmallestReadPoint(HStore store) { 1446 return seqId + 3; 1447 } 1448 }); 1449 1450 store.add(createCell(r1, qf1, ts + 1, seqId + 1, value2), memStoreSizing); 1451 store.add(createCell(r1, qf2, ts + 1, seqId + 1, value2), memStoreSizing); 1452 store.add(createCell(r1, qf3, ts + 1, seqId + 1, value2), memStoreSizing); 1453 1454 store.add(createDeleteCell(r1, qf1, ts + 2, seqId + 2), memStoreSizing); 1455 store.add(createDeleteCell(r1, qf2, ts + 2, seqId + 2), memStoreSizing); 1456 store.add(createDeleteCell(r1, qf3, ts + 2, seqId + 2), memStoreSizing); 1457 1458 store.add(createCell(r2, qf1, ts + 3, seqId + 3, value1), memStoreSizing); 1459 store.add(createCell(r2, qf2, ts + 3, seqId + 3, value1), memStoreSizing); 1460 store.add(createCell(r2, qf3, ts + 3, seqId + 3, value1), memStoreSizing); 1461 1462 Scan scan = new Scan().withStartRow(r1); 1463 1464 try (final InternalScanner scanner = 1465 new StoreScanner(store, store.getScanInfo(), scan, null, seqId + 3) { 1466 @Override 1467 protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners, 1468 CellComparator comparator) throws IOException { 1469 return new MyKeyValueHeap(scanners, comparator, recordBlockSizeCallCount -> { 1470 if (recordBlockSizeCallCount == 6) { 1471 try { 1472 flushStore(store, id++); 1473 } catch (IOException e) { 1474 throw new RuntimeException(e); 1475 } 1476 } 1477 }); 1478 } 1479 }) { 1480 List<Cell> cellResult = new ArrayList<>(); 1481 1482 scanner.next(cellResult); 1483 assertEquals(0, cellResult.size()); 1484 1485 cellResult.clear(); 1486 1487 scanner.next(cellResult); 1488 assertEquals(3, cellResult.size()); 1489 for (Cell cell : cellResult) { 1490 assertArrayEquals(r2, CellUtil.cloneRow(cell)); 1491 } 1492 } 1493 } 1494 1495 @Test 1496 public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException { 1497 Configuration conf = HBaseConfiguration.create(); 1498 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName()); 1499 init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1500 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1501 byte[] value = Bytes.toBytes("value"); 1502 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1503 long ts = EnvironmentEdgeManager.currentTime(); 1504 long seqId = 100; 1505 // older data whihc shouldn't be "seen" by client 1506 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1507 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1508 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1509 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1510 quals.add(qf1); 1511 quals.add(qf2); 1512 quals.add(qf3); 1513 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1514 MyCompactingMemStore.START_TEST.set(true); 1515 Runnable flush = () -> { 1516 // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5) 1517 // recreate the active memstore -- phase (4/5) 1518 storeFlushCtx.prepare(); 1519 }; 1520 ExecutorService service = Executors.newSingleThreadExecutor(); 1521 service.execute(flush); 1522 // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5) 1523 // this is blocked until we recreate the active memstore -- phase (3/5) 1524 // we get scanner from active memstore but it is empty -- phase (5/5) 1525 InternalScanner scanner = 1526 (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 1527 service.shutdown(); 1528 service.awaitTermination(20, TimeUnit.SECONDS); 1529 try { 1530 try { 1531 List<Cell> results = new ArrayList<>(); 1532 scanner.next(results); 1533 assertEquals(3, results.size()); 1534 for (Cell c : results) { 1535 byte[] actualValue = CellUtil.cloneValue(c); 1536 assertTrue(Bytes.equals(actualValue, value), "expected:" + Bytes.toStringBinary(value) 1537 + ", actual:" + Bytes.toStringBinary(actualValue)); 1538 } 1539 } finally { 1540 scanner.close(); 1541 } 1542 } finally { 1543 MyCompactingMemStore.START_TEST.set(false); 1544 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1545 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1546 } 1547 } 1548 1549 @Test 1550 public void testScanWithDoubleFlush() throws IOException { 1551 Configuration conf = HBaseConfiguration.create(); 1552 // Initialize region 1553 MyStore myStore = initMyStore(name, conf, new MyStoreHook() { 1554 @Override 1555 public void getScanners(MyStore store) throws IOException { 1556 final long tmpId = id++; 1557 ExecutorService s = Executors.newSingleThreadExecutor(); 1558 s.execute(() -> { 1559 try { 1560 // flush the store before storescanner updates the scanners from store. 1561 // The current data will be flushed into files, and the memstore will 1562 // be clear. 1563 // -- phase (4/4) 1564 flushStore(store, tmpId); 1565 } catch (IOException ex) { 1566 throw new RuntimeException(ex); 1567 } 1568 }); 1569 s.shutdown(); 1570 try { 1571 // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers. 1572 s.awaitTermination(3, TimeUnit.SECONDS); 1573 } catch (InterruptedException ex) { 1574 } 1575 } 1576 }); 1577 byte[] oldValue = Bytes.toBytes("oldValue"); 1578 byte[] currentValue = Bytes.toBytes("currentValue"); 1579 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1580 long ts = EnvironmentEdgeManager.currentTime(); 1581 long seqId = 100; 1582 // older data whihc shouldn't be "seen" by client 1583 myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing); 1584 myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing); 1585 myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing); 1586 long snapshotId = id++; 1587 // push older data into snapshot -- phase (1/4) 1588 StoreFlushContext storeFlushCtx = 1589 store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY); 1590 storeFlushCtx.prepare(); 1591 1592 // insert current data into active -- phase (2/4) 1593 myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing); 1594 myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing); 1595 myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing); 1596 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1597 quals.add(qf1); 1598 quals.add(qf2); 1599 quals.add(qf3); 1600 try (InternalScanner scanner = 1601 (InternalScanner) myStore.getScanner(new Scan(new Get(row)), quals, seqId + 1)) { 1602 // complete the flush -- phase (3/4) 1603 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1604 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1605 1606 List<Cell> results = new ArrayList<>(); 1607 scanner.next(results); 1608 assertEquals(3, results.size()); 1609 for (Cell c : results) { 1610 byte[] actualValue = CellUtil.cloneValue(c); 1611 assertTrue(Bytes.equals(actualValue, currentValue), "expected:" 1612 + Bytes.toStringBinary(currentValue) + ", actual:" + Bytes.toStringBinary(actualValue)); 1613 } 1614 } 1615 } 1616 1617 /** 1618 * This test is for HBASE-27519, when the {@link StoreScanner} is scanning,the Flush and the 1619 * Compaction execute concurrently and theCcompaction compact and archive the flushed 1620 * {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}.Before 1621 * HBASE-27519,{@link StoreScanner.updateReaders} would throw {@link FileNotFoundException}. 1622 */ 1623 @Test 1624 public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException { 1625 Configuration conf = HBaseConfiguration.create(); 1626 conf.setBoolean(WALFactory.WAL_ENABLED, false); 1627 conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName()); 1628 byte[] r0 = Bytes.toBytes("row0"); 1629 byte[] r1 = Bytes.toBytes("row1"); 1630 final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); 1631 final AtomicBoolean shouldWaitRef = new AtomicBoolean(false); 1632 // Initialize region 1633 final MyStore myStore = initMyStore(name, conf, new MyStoreHook() { 1634 @Override 1635 public void getScanners(MyStore store) throws IOException { 1636 try { 1637 // Here this method is called by StoreScanner.updateReaders which is invoked by the 1638 // following TestHStore.flushStore 1639 if (shouldWaitRef.get()) { 1640 // wait the following compaction Task start 1641 cyclicBarrier.await(); 1642 // wait the following HStore.closeAndArchiveCompactedFiles end. 1643 cyclicBarrier.await(); 1644 } 1645 } catch (BrokenBarrierException | InterruptedException e) { 1646 throw new RuntimeException(e); 1647 } 1648 } 1649 }); 1650 1651 final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null); 1652 Runnable compactionTask = () -> { 1653 try { 1654 // Only when the StoreScanner.updateReaders invoked by TestHStore.flushStore prepares for 1655 // entering the MyStore.getScanners, compactionTask could start. 1656 cyclicBarrier.await(); 1657 region.compactStore(family, new NoLimitThroughputController()); 1658 myStore.closeAndArchiveCompactedFiles(); 1659 // Notify StoreScanner.updateReaders could enter MyStore.getScanners. 1660 cyclicBarrier.await(); 1661 } catch (Throwable e) { 1662 compactionExceptionRef.set(e); 1663 } 1664 }; 1665 1666 long ts = EnvironmentEdgeManager.currentTime(); 1667 long seqId = 100; 1668 byte[] value = Bytes.toBytes("value"); 1669 // older data whihc shouldn't be "seen" by client 1670 myStore.add(createCell(r0, qf1, ts, seqId, value), null); 1671 flushStore(myStore, id++); 1672 myStore.add(createCell(r0, qf2, ts, seqId, value), null); 1673 flushStore(myStore, id++); 1674 myStore.add(createCell(r0, qf3, ts, seqId, value), null); 1675 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1676 quals.add(qf1); 1677 quals.add(qf2); 1678 quals.add(qf3); 1679 1680 myStore.add(createCell(r1, qf1, ts, seqId, value), null); 1681 myStore.add(createCell(r1, qf2, ts, seqId, value), null); 1682 myStore.add(createCell(r1, qf3, ts, seqId, value), null); 1683 1684 Thread.currentThread() 1685 .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread"); 1686 Scan scan = new Scan(); 1687 scan.withStartRow(r0, true); 1688 try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) { 1689 List<Cell> results = new MyList<>(size -> { 1690 switch (size) { 1691 case 1: 1692 shouldWaitRef.set(true); 1693 Thread thread = new Thread(compactionTask); 1694 thread.setName("MyCompacting Thread."); 1695 thread.start(); 1696 try { 1697 flushStore(myStore, id++); 1698 thread.join(); 1699 } catch (IOException | InterruptedException e) { 1700 throw new RuntimeException(e); 1701 } 1702 shouldWaitRef.set(false); 1703 break; 1704 default: 1705 break; 1706 } 1707 }); 1708 // Before HBASE-27519, here would throw java.io.FileNotFoundException because the storeFile 1709 // which used by StoreScanner.updateReaders is deleted by compactionTask. 1710 scanner.next(results); 1711 // The results is r0 row cells. 1712 assertEquals(3, results.size()); 1713 assertTrue(compactionExceptionRef.get() == null); 1714 } 1715 } 1716 1717 @Test 1718 public void testReclaimChunkWhenScaning() throws IOException { 1719 init("testReclaimChunkWhenScaning"); 1720 long ts = EnvironmentEdgeManager.currentTime(); 1721 long seqId = 100; 1722 byte[] value = Bytes.toBytes("value"); 1723 // older data whihc shouldn't be "seen" by client 1724 store.add(createCell(qf1, ts, seqId, value), null); 1725 store.add(createCell(qf2, ts, seqId, value), null); 1726 store.add(createCell(qf3, ts, seqId, value), null); 1727 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1728 quals.add(qf1); 1729 quals.add(qf2); 1730 quals.add(qf3); 1731 try (InternalScanner scanner = 1732 (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId)) { 1733 List<Cell> results = new MyList<>(size -> { 1734 switch (size) { 1735 // 1) we get the first cell (qf1) 1736 // 2) flush the data to have StoreScanner update inner scanners 1737 // 3) the chunk will be reclaimed after updaing 1738 case 1: 1739 try { 1740 flushStore(store, id++); 1741 } catch (IOException e) { 1742 throw new RuntimeException(e); 1743 } 1744 break; 1745 // 1) we get the second cell (qf2) 1746 // 2) add some cell to fill some byte into the chunk (we have only one chunk) 1747 case 2: 1748 try { 1749 byte[] newValue = Bytes.toBytes("newValue"); 1750 // older data whihc shouldn't be "seen" by client 1751 store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null); 1752 store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null); 1753 store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null); 1754 } catch (IOException e) { 1755 throw new RuntimeException(e); 1756 } 1757 break; 1758 default: 1759 break; 1760 } 1761 }); 1762 scanner.next(results); 1763 assertEquals(3, results.size()); 1764 for (Cell c : results) { 1765 byte[] actualValue = CellUtil.cloneValue(c); 1766 assertTrue(Bytes.equals(actualValue, value), "expected:" + Bytes.toStringBinary(value) 1767 + ", actual:" + Bytes.toStringBinary(actualValue)); 1768 } 1769 } 1770 } 1771 1772 /** 1773 * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the 1774 * versionedList. And the first InMemoryFlushRunnable will use the chagned versionedList to remove 1775 * the corresponding segments. In short, there will be some segements which isn't in merge are 1776 * removed. 1777 */ 1778 @Test 1779 public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException { 1780 int flushSize = 500; 1781 Configuration conf = HBaseConfiguration.create(); 1782 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName()); 1783 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25); 1784 MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0); 1785 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize)); 1786 // Set the lower threshold to invoke the "MERGE" policy 1787 conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0)); 1788 init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1789 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1790 byte[] value = Bytes.toBytes("thisisavarylargevalue"); 1791 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1792 long ts = EnvironmentEdgeManager.currentTime(); 1793 long seqId = 100; 1794 // older data whihc shouldn't be "seen" by client 1795 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1796 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1797 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1798 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1799 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1800 storeFlushCtx.prepare(); 1801 // This shouldn't invoke another in-memory flush because the first compactor thread 1802 // hasn't accomplished the in-memory compaction. 1803 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1804 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1805 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1806 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1807 // okay. Let the compaction be completed 1808 MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown(); 1809 CompactingMemStore mem = (CompactingMemStore) ((HStore) store).memstore; 1810 while (mem.isMemStoreFlushingInMemory()) { 1811 TimeUnit.SECONDS.sleep(1); 1812 } 1813 // This should invoke another in-memory flush. 1814 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1815 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1816 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1817 assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1818 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1819 String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE)); 1820 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1821 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1822 } 1823 1824 @Test 1825 public void testAge() throws IOException { 1826 long currentTime = EnvironmentEdgeManager.currentTime(); 1827 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 1828 edge.setValue(currentTime); 1829 EnvironmentEdgeManager.injectEdge(edge); 1830 Configuration conf = TEST_UTIL.getConfiguration(); 1831 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family); 1832 initHRegion(name, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, 1833 false); 1834 HStore store = new HStore(region, hcd, conf, false) { 1835 1836 @Override 1837 protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf, 1838 CellComparator kvComparator) throws IOException { 1839 List<HStoreFile> storefiles = 1840 Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100), 1841 mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000)); 1842 StoreFileManager sfm = mock(StoreFileManager.class); 1843 when(sfm.getStoreFiles()).thenReturn(storefiles); 1844 StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class); 1845 when(storeEngine.getStoreFileManager()).thenReturn(sfm); 1846 return storeEngine; 1847 } 1848 }; 1849 assertEquals(10L, store.getMinStoreFileAge().getAsLong()); 1850 assertEquals(10000L, store.getMaxStoreFileAge().getAsLong()); 1851 assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4); 1852 } 1853 1854 private HStoreFile mockStoreFile(long createdTime) { 1855 StoreFileInfo info = mock(StoreFileInfo.class); 1856 when(info.getCreatedTimestamp()).thenReturn(createdTime); 1857 HStoreFile sf = mock(HStoreFile.class); 1858 when(sf.getReader()).thenReturn(mock(StoreFileReader.class)); 1859 when(sf.isHFile()).thenReturn(true); 1860 when(sf.getFileInfo()).thenReturn(info); 1861 return sf; 1862 } 1863 1864 private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) 1865 throws IOException { 1866 return (MyStore) init(methodName, conf, 1867 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1868 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook); 1869 } 1870 1871 private static class MyStore extends HStore { 1872 private final MyStoreHook hook; 1873 1874 MyStore(final HRegion region, final ColumnFamilyDescriptor family, 1875 final Configuration confParam, MyStoreHook hook, boolean switchToPread) throws IOException { 1876 super(region, family, confParam, false); 1877 this.hook = hook; 1878 } 1879 1880 @Override 1881 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1882 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, 1883 boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, 1884 boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException { 1885 hook.getScanners(this); 1886 return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, 1887 stopRow, false, readPt, includeMemstoreScanner, onlyLatestVersion); 1888 } 1889 1890 @Override 1891 public long getSmallestReadPoint() { 1892 return hook.getSmallestReadPoint(this); 1893 } 1894 } 1895 1896 private abstract static class MyStoreHook { 1897 1898 void getScanners(MyStore store) throws IOException { 1899 } 1900 1901 long getSmallestReadPoint(HStore store) { 1902 return store.getHRegion().getSmallestReadPoint(); 1903 } 1904 } 1905 1906 @Test 1907 public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception { 1908 Configuration conf = HBaseConfiguration.create(); 1909 conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); 1910 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0); 1911 // Set the lower threshold to invoke the "MERGE" policy 1912 MyStore store = initMyStore(name, conf, new MyStoreHook() { 1913 }); 1914 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1915 long ts = EnvironmentEdgeManager.currentTime(); 1916 long seqID = 1L; 1917 // Add some data to the region and do some flushes 1918 for (int i = 1; i < 10; i++) { 1919 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1920 memStoreSizing); 1921 } 1922 // flush them 1923 flushStore(store, seqID); 1924 for (int i = 11; i < 20; i++) { 1925 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1926 memStoreSizing); 1927 } 1928 // flush them 1929 flushStore(store, seqID); 1930 for (int i = 21; i < 30; i++) { 1931 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1932 memStoreSizing); 1933 } 1934 // flush them 1935 flushStore(store, seqID); 1936 1937 assertEquals(3, store.getStorefilesCount()); 1938 Scan scan = new Scan(); 1939 scan.addFamily(family); 1940 Collection<HStoreFile> storefiles2 = store.getStorefiles(); 1941 ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2); 1942 StoreScanner storeScanner = 1943 (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); 1944 // get the current heap 1945 KeyValueHeap heap = storeScanner.heap; 1946 // create more store files 1947 for (int i = 31; i < 40; i++) { 1948 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1949 memStoreSizing); 1950 } 1951 // flush them 1952 flushStore(store, seqID); 1953 1954 for (int i = 41; i < 50; i++) { 1955 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1956 memStoreSizing); 1957 } 1958 // flush them 1959 flushStore(store, seqID); 1960 storefiles2 = store.getStorefiles(); 1961 ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2); 1962 actualStorefiles1.removeAll(actualStorefiles); 1963 // Do compaction 1964 MyThread thread = new MyThread(storeScanner); 1965 thread.start(); 1966 store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false); 1967 thread.join(); 1968 KeyValueHeap heap2 = thread.getHeap(); 1969 assertFalse(heap.equals(heap2)); 1970 } 1971 1972 @Test 1973 public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception { 1974 Configuration conf = HBaseConfiguration.create(); 1975 conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); 1976 // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type. 1977 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1); 1978 MyStore store = initMyStore(name, conf, new MyStoreHook() { 1979 }); 1980 Scan scan = new Scan(); 1981 scan.addFamily(family); 1982 // ReadType on Scan is still DEFAULT only. 1983 assertEquals(ReadType.DEFAULT, scan.getReadType()); 1984 StoreScanner storeScanner = 1985 (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); 1986 assertFalse(storeScanner.isScanUsePread()); 1987 } 1988 1989 @Test 1990 public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException { 1991 Configuration conf = HBaseConfiguration.create(); 1992 conf.set("hbase.systemtables.compacting.memstore.type", "eager"); 1993 init(name, conf, 1994 TableDescriptorBuilder.newBuilder( 1995 TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, "meta".getBytes())), 1996 ColumnFamilyDescriptorBuilder.newBuilder(family) 1997 .setInMemoryCompaction(MemoryCompactionPolicy.NONE).build()); 1998 assertTrue(((MemStoreCompactor) ((CompactingMemStore) store.memstore).compactor).toString() 1999 .startsWith("eager".toUpperCase())); 2000 } 2001 2002 @Test 2003 public void testSpaceQuotaChangeAfterReplacement() throws IOException { 2004 final TableName tn = TableName.valueOf(name); 2005 init(name); 2006 2007 RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl(); 2008 2009 HStoreFile sf1 = mockStoreFileWithLength(1024L); 2010 HStoreFile sf2 = mockStoreFileWithLength(2048L); 2011 HStoreFile sf3 = mockStoreFileWithLength(4096L); 2012 HStoreFile sf4 = mockStoreFileWithLength(8192L); 2013 2014 RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a")) 2015 .setEndKey(Bytes.toBytes("b")).build(); 2016 2017 // Compacting two files down to one, reducing size 2018 sizeStore.put(regionInfo, 1024L + 4096L); 2019 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf1, sf3), 2020 Arrays.asList(sf2)); 2021 2022 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 2023 2024 // The same file length in and out should have no change 2025 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2), 2026 Arrays.asList(sf2)); 2027 2028 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 2029 2030 // Increase the total size used 2031 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2), 2032 Arrays.asList(sf3)); 2033 2034 assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize()); 2035 2036 RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b")) 2037 .setEndKey(Bytes.toBytes("c")).build(); 2038 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4)); 2039 2040 assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize()); 2041 } 2042 2043 @Test 2044 public void testHFileContextSetWithCFAndTable() throws Exception { 2045 init(name); 2046 StoreFileWriter writer = store.getStoreEngine() 2047 .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L) 2048 .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true) 2049 .includesTag(false).shouldDropBehind(true)); 2050 HFileContext hFileContext = writer.getLiveFileWriter().getFileContext(); 2051 assertArrayEquals(family, hFileContext.getColumnFamily()); 2052 assertArrayEquals(table, hFileContext.getTableName()); 2053 } 2054 2055 // This test is for HBASE-26026, HBase Write be stuck when active segment has no cell 2056 // but its dataSize exceeds inmemoryFlushSize 2057 @Test 2058 public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize() 2059 throws IOException, InterruptedException { 2060 Configuration conf = HBaseConfiguration.create(); 2061 2062 byte[] smallValue = new byte[3]; 2063 byte[] largeValue = new byte[9]; 2064 final long timestamp = EnvironmentEdgeManager.currentTime(); 2065 final long seqId = 100; 2066 final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2067 final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2068 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2069 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2070 int flushByteSize = smallCellByteSize + largeCellByteSize - 2; 2071 2072 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2073 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName()); 2074 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2075 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2076 2077 init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2078 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2079 2080 MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore); 2081 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2082 myCompactingMemStore.smallCellPreUpdateCounter.set(0); 2083 myCompactingMemStore.largeCellPreUpdateCounter.set(0); 2084 2085 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2086 Thread smallCellThread = new Thread(() -> { 2087 try { 2088 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2089 } catch (Throwable exception) { 2090 exceptionRef.set(exception); 2091 } 2092 }); 2093 smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME); 2094 smallCellThread.start(); 2095 2096 String oldThreadName = Thread.currentThread().getName(); 2097 try { 2098 /** 2099 * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then 2100 * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread 2101 * invokes flushInMemory. 2102 * <p/> 2103 * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread 2104 * can add cell to currentActive . That is to say when largeCellThread called flushInMemory 2105 * method, CompactingMemStore.active has no cell. 2106 */ 2107 Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME); 2108 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2109 smallCellThread.join(); 2110 2111 for (int i = 0; i < 100; i++) { 2112 long currentTimestamp = timestamp + 100 + i; 2113 ExtendedCell cell = createCell(qf2, currentTimestamp, seqId, largeValue); 2114 store.add(cell, new NonThreadSafeMemStoreSizing()); 2115 } 2116 } finally { 2117 Thread.currentThread().setName(oldThreadName); 2118 } 2119 2120 assertTrue(exceptionRef.get() == null); 2121 2122 } 2123 2124 // This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds 2125 // InmemoryFlushSize 2126 @Test 2127 @Timeout(value = 60000, unit = TimeUnit.MILLISECONDS) 2128 public void testCompactingMemStoreCellExceedInmemoryFlushSize() throws Exception { 2129 Configuration conf = HBaseConfiguration.create(); 2130 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName()); 2131 2132 init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2133 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2134 2135 MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore); 2136 2137 int size = (int) (myCompactingMemStore.getInmemoryFlushSize()); 2138 byte[] value = new byte[size + 1]; 2139 2140 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2141 long timestamp = EnvironmentEdgeManager.currentTime(); 2142 long seqId = 100; 2143 ExtendedCell cell = createCell(qf1, timestamp, seqId, value); 2144 int cellByteSize = MutableSegment.getCellLength(cell); 2145 store.add(cell, memStoreSizing); 2146 assertTrue(memStoreSizing.getCellsCount() == 1); 2147 assertTrue(memStoreSizing.getDataSize() == cellByteSize); 2148 // Waiting the in memory compaction completed, see HBASE-26438 2149 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2150 } 2151 2152 /** 2153 * This test is for HBASE-27464, before this JIRA,when init {@link CellChunkImmutableSegment} for 2154 * 'COMPACT' action, we not force copy to current MSLab. When cell size bigger than 2155 * {@link MemStoreLABImpl#maxAlloc}, cell will stay in previous chunk which will recycle after 2156 * segment replace, and we may read wrong data when these chunk reused by others. 2157 */ 2158 @Test 2159 public void testForceCloneOfBigCellForCellChunkImmutableSegment() throws Exception { 2160 Configuration conf = HBaseConfiguration.create(); 2161 int maxAllocByteSize = conf.getInt(MemStoreLAB.MAX_ALLOC_KEY, MemStoreLAB.MAX_ALLOC_DEFAULT); 2162 2163 // Construct big cell,which is large than {@link MemStoreLABImpl#maxAlloc}. 2164 byte[] cellValue = new byte[maxAllocByteSize + 1]; 2165 final long timestamp = EnvironmentEdgeManager.currentTime(); 2166 final long seqId = 100; 2167 final byte[] rowKey1 = Bytes.toBytes("rowKey1"); 2168 final ExtendedCell originalCell1 = createCell(rowKey1, qf1, timestamp, seqId, cellValue); 2169 final byte[] rowKey2 = Bytes.toBytes("rowKey2"); 2170 final ExtendedCell originalCell2 = createCell(rowKey2, qf1, timestamp, seqId, cellValue); 2171 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2172 quals.add(qf1); 2173 2174 int cellByteSize = MutableSegment.getCellLength(originalCell1); 2175 int inMemoryFlushByteSize = cellByteSize - 1; 2176 2177 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2178 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName()); 2179 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2180 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(inMemoryFlushByteSize * 200)); 2181 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2182 2183 // Use {@link MemoryCompactionPolicy#EAGER} for always compacting. 2184 init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2185 .setInMemoryCompaction(MemoryCompactionPolicy.EAGER).build()); 2186 2187 MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore); 2188 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == inMemoryFlushByteSize); 2189 2190 // Data chunk Pool is disabled. 2191 assertTrue(ChunkCreator.getInstance().getMaxCount(ChunkType.DATA_CHUNK) == 0); 2192 2193 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2194 2195 // First compact 2196 store.add(originalCell1, memStoreSizing); 2197 // Waiting for the first in-memory compaction finished 2198 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2199 2200 StoreScanner storeScanner = 2201 (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1); 2202 SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2203 ExtendedCell resultCell1 = segmentScanner.next(); 2204 assertTrue(PrivateCellUtil.equals(resultCell1, originalCell1)); 2205 int cell1ChunkId = resultCell1.getChunkId(); 2206 assertTrue(cell1ChunkId != ExtendedCell.CELL_NOT_BASED_ON_CHUNK); 2207 assertNull(segmentScanner.next()); 2208 segmentScanner.close(); 2209 storeScanner.close(); 2210 Segment segment = segmentScanner.segment; 2211 assertTrue(segment instanceof CellChunkImmutableSegment); 2212 MemStoreLABImpl memStoreLAB1 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2213 assertTrue(!memStoreLAB1.isClosed()); 2214 assertTrue(!memStoreLAB1.chunks.isEmpty()); 2215 assertTrue(!memStoreLAB1.isReclaimed()); 2216 2217 // Second compact 2218 store.add(originalCell2, memStoreSizing); 2219 // Waiting for the second in-memory compaction finished 2220 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2221 2222 // Before HBASE-27464, here may throw java.lang.IllegalArgumentException: In CellChunkMap, cell 2223 // must be associated with chunk.. We were looking for a cell at index 0. 2224 // The cause for this exception is because the data chunk Pool is disabled,when the data chunks 2225 // are recycled after the second in-memory compaction finished,the 2226 // {@link ChunkCreator.putbackChunks} method does not put the chunks back to the data chunk 2227 // pool,it just removes them from {@link ChunkCreator#chunkIdMap},so in 2228 // {@link CellChunkMap#getCell} we could not get the data chunk by chunkId. 2229 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1); 2230 segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2231 ExtendedCell newResultCell1 = segmentScanner.next(); 2232 assertTrue(newResultCell1 != resultCell1); 2233 assertTrue(PrivateCellUtil.equals(newResultCell1, originalCell1)); 2234 2235 ExtendedCell resultCell2 = segmentScanner.next(); 2236 assertTrue(PrivateCellUtil.equals(resultCell2, originalCell2)); 2237 assertNull(segmentScanner.next()); 2238 segmentScanner.close(); 2239 storeScanner.close(); 2240 2241 segment = segmentScanner.segment; 2242 assertTrue(segment instanceof CellChunkImmutableSegment); 2243 MemStoreLABImpl memStoreLAB2 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2244 assertTrue(!memStoreLAB2.isClosed()); 2245 assertTrue(!memStoreLAB2.chunks.isEmpty()); 2246 assertTrue(!memStoreLAB2.isReclaimed()); 2247 assertTrue(memStoreLAB1.isClosed()); 2248 assertTrue(memStoreLAB1.chunks.isEmpty()); 2249 assertTrue(memStoreLAB1.isReclaimed()); 2250 } 2251 2252 // This test is for HBASE-26210 also, test write large cell and small cell concurrently when 2253 // InmemoryFlushSize is smaller,equal with and larger than cell size. 2254 @Test 2255 public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently() 2256 throws IOException, InterruptedException { 2257 doWriteTestLargeCellAndSmallCellConcurrently( 2258 (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1); 2259 doWriteTestLargeCellAndSmallCellConcurrently( 2260 (smallCellByteSize, largeCellByteSize) -> largeCellByteSize); 2261 doWriteTestLargeCellAndSmallCellConcurrently( 2262 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1); 2263 doWriteTestLargeCellAndSmallCellConcurrently( 2264 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize); 2265 doWriteTestLargeCellAndSmallCellConcurrently( 2266 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1); 2267 } 2268 2269 private void doWriteTestLargeCellAndSmallCellConcurrently(IntBinaryOperator getFlushByteSize) 2270 throws IOException, InterruptedException { 2271 2272 Configuration conf = HBaseConfiguration.create(); 2273 2274 byte[] smallValue = new byte[3]; 2275 byte[] largeValue = new byte[100]; 2276 final long timestamp = EnvironmentEdgeManager.currentTime(); 2277 final long seqId = 100; 2278 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2279 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2280 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2281 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2282 int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize); 2283 boolean flushByteSizeLessThanSmallAndLargeCellSize = 2284 flushByteSize < (smallCellByteSize + largeCellByteSize); 2285 2286 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName()); 2287 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2288 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2289 2290 init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2291 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2292 2293 MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore); 2294 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2295 myCompactingMemStore.disableCompaction(); 2296 if (flushByteSizeLessThanSmallAndLargeCellSize) { 2297 myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true; 2298 } else { 2299 myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false; 2300 } 2301 2302 final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing(); 2303 final AtomicLong totalCellByteSize = new AtomicLong(0); 2304 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2305 Thread smallCellThread = new Thread(() -> { 2306 try { 2307 for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { 2308 long currentTimestamp = timestamp + i; 2309 ExtendedCell cell = createCell(qf1, currentTimestamp, seqId, smallValue); 2310 totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell)); 2311 store.add(cell, memStoreSizing); 2312 } 2313 } catch (Throwable exception) { 2314 exceptionRef.set(exception); 2315 2316 } 2317 }); 2318 smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME); 2319 smallCellThread.start(); 2320 2321 String oldThreadName = Thread.currentThread().getName(); 2322 try { 2323 /** 2324 * When flushByteSizeLessThanSmallAndLargeCellSize is true: 2325 * </p> 2326 * 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then 2327 * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then 2328 * largeCellThread invokes flushInMemory. 2329 * <p/> 2330 * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread 2331 * can run into MyCompactingMemStore3.checkAndAddToActiveSize again. 2332 * <p/> 2333 * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and 2334 * largeCellThread concurrently write one cell and wait each other, and then write another 2335 * cell etc. 2336 */ 2337 Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME); 2338 for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { 2339 long currentTimestamp = timestamp + i; 2340 ExtendedCell cell = createCell(qf2, currentTimestamp, seqId, largeValue); 2341 totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell)); 2342 store.add(cell, memStoreSizing); 2343 } 2344 smallCellThread.join(); 2345 2346 assertTrue(exceptionRef.get() == null); 2347 assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2)); 2348 assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get()); 2349 if (flushByteSizeLessThanSmallAndLargeCellSize) { 2350 assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT); 2351 } else { 2352 assertTrue( 2353 myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1)); 2354 } 2355 } finally { 2356 Thread.currentThread().setName(oldThreadName); 2357 } 2358 } 2359 2360 /** 2361 * <pre> 2362 * This test is for HBASE-26384, 2363 * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()} 2364 * execute concurrently. 2365 * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs 2366 * for both branch-2 and master): 2367 * 1. The {@link CompactingMemStore} size exceeds 2368 * {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new 2369 * {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a 2370 * in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}. 2371 * 2. The in memory compact thread starts and then stopping before 2372 * {@link CompactingMemStore#flattenOneSegment}. 2373 * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the 2374 * snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory 2375 * compact thread continues. 2376 * Assuming {@link VersionedSegmentsList#version} returned from 2377 * {@link CompactingMemStore#getImmutableSegments} is v. 2378 * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}. 2379 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment}, 2380 * {@link CompactionPipeline#version} is still v. 2381 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2382 * {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull} 2383 * thinks it is successful and continue flushing,but the {@link ImmutableSegment} in 2384 * {@link CompactionPipeline} has changed because 2385 * {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not 2386 * removed in fact and still remaining in {@link CompactionPipeline}. 2387 * 2388 * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior: 2389 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment}, 2390 * {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to 2391 * v+1. 2392 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2393 * {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull} 2394 * failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once 2395 * again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now, 2396 * {@link CompactingMemStore#swapPipelineWithNull} succeeds. 2397 * </pre> 2398 */ 2399 @Test 2400 public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception { 2401 Configuration conf = HBaseConfiguration.create(); 2402 2403 byte[] smallValue = new byte[3]; 2404 byte[] largeValue = new byte[9]; 2405 final long timestamp = EnvironmentEdgeManager.currentTime(); 2406 final long seqId = 100; 2407 final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2408 final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2409 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2410 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2411 int totalCellByteSize = (smallCellByteSize + largeCellByteSize); 2412 int flushByteSize = totalCellByteSize - 2; 2413 2414 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2415 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName()); 2416 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2417 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2418 2419 init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2420 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2421 2422 MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore); 2423 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2424 2425 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2426 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2427 2428 String oldThreadName = Thread.currentThread().getName(); 2429 try { 2430 Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME); 2431 /** 2432 * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters 2433 * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot} 2434 * would invoke {@link CompactingMemStore#stopCompaction}. 2435 */ 2436 myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await(); 2437 2438 MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot(); 2439 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2440 2441 assertTrue(memStoreSnapshot.getCellsCount() == 2); 2442 assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize); 2443 VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments(); 2444 assertTrue(segments.getNumOfSegments() == 0); 2445 assertTrue(segments.getNumOfCells() == 0); 2446 assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1); 2447 assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2); 2448 } finally { 2449 Thread.currentThread().setName(oldThreadName); 2450 } 2451 } 2452 2453 /** 2454 * <pre> 2455 * This test is for HBASE-26384, 2456 * test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()} 2457 * and writeMemStore execute concurrently. 2458 * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs 2459 * for both branch-2 and master): 2460 * 1. The {@link CompactingMemStore} size exceeds 2461 * {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new 2462 * {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a 2463 * in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}. 2464 * 2. The in memory compact thread starts and then stopping before 2465 * {@link CompactingMemStore#flattenOneSegment}. 2466 * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the 2467 * snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory 2468 * compact thread continues. 2469 * Assuming {@link VersionedSegmentsList#version} returned from 2470 * {@link CompactingMemStore#getImmutableSegments} is v. 2471 * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}. 2472 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment}, 2473 * {@link CompactionPipeline#version} is still v. 2474 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2475 * {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull} 2476 * thinks it is successful and continue flushing,but the {@link ImmutableSegment} in 2477 * {@link CompactionPipeline} has changed because 2478 * {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not 2479 * removed in fact and still remaining in {@link CompactionPipeline}. 2480 * 2481 * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior, 2482 * and I add step 7-8 to test there is new segment added before retry. 2483 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment}, 2484 * {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to 2485 * v+1. 2486 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2487 * {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull} 2488 * failed and retry,{@link VersionedSegmentsList#version} returned from 2489 * {@link CompactingMemStore#getImmutableSegments} is v+1. 2490 * 7. The write thread continues writing to {@link CompactingMemStore} and 2491 * {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()}, 2492 * {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new 2493 * {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline}, 2494 * {@link CompactionPipeline#version} is still v+1. 2495 * 8. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2496 * {@link CompactionPipeline#version} is still v+1, 2497 * {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment} 2498 * remained at the head of {@link CompactingMemStore#pipeline},the old is removed by 2499 * {@link CompactingMemStore#swapPipelineWithNull}. 2500 * </pre> 2501 */ 2502 @Test 2503 public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception { 2504 Configuration conf = HBaseConfiguration.create(); 2505 2506 byte[] smallValue = new byte[3]; 2507 byte[] largeValue = new byte[9]; 2508 final long timestamp = EnvironmentEdgeManager.currentTime(); 2509 final long seqId = 100; 2510 final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2511 final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2512 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2513 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2514 int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize); 2515 int flushByteSize = firstWriteCellByteSize - 2; 2516 2517 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2518 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName()); 2519 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2520 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2521 2522 init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2523 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2524 2525 final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore); 2526 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2527 2528 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2529 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2530 2531 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2532 final ExtendedCell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue); 2533 final ExtendedCell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue); 2534 final int writeAgainCellByteSize = 2535 MutableSegment.getCellLength(writeAgainCell1) + MutableSegment.getCellLength(writeAgainCell2); 2536 final Thread writeAgainThread = new Thread(() -> { 2537 try { 2538 myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await(); 2539 2540 store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing()); 2541 store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing()); 2542 2543 myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await(); 2544 } catch (Throwable exception) { 2545 exceptionRef.set(exception); 2546 } 2547 }); 2548 writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME); 2549 writeAgainThread.start(); 2550 2551 String oldThreadName = Thread.currentThread().getName(); 2552 try { 2553 Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME); 2554 /** 2555 * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters 2556 * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot} 2557 * would invoke {@link CompactingMemStore#stopCompaction}. 2558 */ 2559 myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await(); 2560 MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot(); 2561 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2562 writeAgainThread.join(); 2563 2564 assertTrue(memStoreSnapshot.getCellsCount() == 2); 2565 assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize); 2566 VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments(); 2567 assertTrue(segments.getNumOfSegments() == 1); 2568 assertTrue( 2569 ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize); 2570 assertTrue(segments.getNumOfCells() == 2); 2571 assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2); 2572 assertTrue(exceptionRef.get() == null); 2573 assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2); 2574 } finally { 2575 Thread.currentThread().setName(oldThreadName); 2576 } 2577 } 2578 2579 /** 2580 * <pre> 2581 * This test is for HBASE-26465, 2582 * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute 2583 * concurrently. The threads sequence before HBASE-26465 is: 2584 * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have be added to 2585 * {@link DefaultMemStore}. 2586 * 2.The flush thread stopping before {@link DefaultMemStore#clearSnapshot} in 2587 * {@link HStore#updateStorefiles} after completed flushing memStore to hfile. 2588 * 3.The scan thread starts and stopping after {@link DefaultMemStore#getSnapshotSegments} in 2589 * {@link DefaultMemStore#getScanners},here the scan thread gets the 2590 * {@link DefaultMemStore#snapshot} which is created by the flush thread. 2591 * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and close 2592 * {@link DefaultMemStore#snapshot},because the reference count of the corresponding 2593 * {@link MemStoreLABImpl} is 0, the {@link Chunk}s in corresponding {@link MemStoreLABImpl} 2594 * are recycled. 2595 * 5.The scan thread continues {@link DefaultMemStore#getScanners},and create a 2596 * {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increase the 2597 * reference count of the corresponding {@link MemStoreLABImpl}, but {@link Chunk}s in 2598 * corresponding {@link MemStoreLABImpl} are recycled by step 4, and these {@link Chunk}s may 2599 * be overwritten by other write threads,which may cause serious problem. 2600 * After HBASE-26465,{@link DefaultMemStore#getScanners} and 2601 * {@link DefaultMemStore#clearSnapshot} could not execute concurrently. 2602 * </pre> 2603 */ 2604 @Test 2605 public void testClearSnapshotGetScannerConcurrently() throws Exception { 2606 Configuration conf = HBaseConfiguration.create(); 2607 2608 byte[] smallValue = new byte[3]; 2609 byte[] largeValue = new byte[9]; 2610 final long timestamp = EnvironmentEdgeManager.currentTime(); 2611 final long seqId = 100; 2612 final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2613 final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2614 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2615 quals.add(qf1); 2616 quals.add(qf2); 2617 2618 conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName()); 2619 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2620 2621 init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2622 MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore); 2623 myDefaultMemStore.store = store; 2624 2625 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2626 store.add(smallCell, memStoreSizing); 2627 store.add(largeCell, memStoreSizing); 2628 2629 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2630 final Thread flushThread = new Thread(() -> { 2631 try { 2632 flushStore(store, id++); 2633 } catch (Throwable exception) { 2634 exceptionRef.set(exception); 2635 } 2636 }); 2637 flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME); 2638 flushThread.start(); 2639 2640 String oldThreadName = Thread.currentThread().getName(); 2641 StoreScanner storeScanner = null; 2642 try { 2643 Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME); 2644 2645 /** 2646 * Wait flush thread stopping before {@link DefaultMemStore#doClearSnapshot} 2647 */ 2648 myDefaultMemStore.getScannerCyclicBarrier.await(); 2649 2650 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 2651 flushThread.join(); 2652 2653 if (myDefaultMemStore.shouldWait) { 2654 SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2655 MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2656 assertTrue(memStoreLAB.isClosed()); 2657 assertTrue(!memStoreLAB.chunks.isEmpty()); 2658 assertTrue(!memStoreLAB.isReclaimed()); 2659 2660 ExtendedCell cell1 = segmentScanner.next(); 2661 PrivateCellUtil.equals(smallCell, cell1); 2662 ExtendedCell cell2 = segmentScanner.next(); 2663 PrivateCellUtil.equals(largeCell, cell2); 2664 assertNull(segmentScanner.next()); 2665 } else { 2666 List<ExtendedCell> results = new ArrayList<>(); 2667 storeScanner.next(results); 2668 assertEquals(2, results.size()); 2669 PrivateCellUtil.equals(smallCell, results.get(0)); 2670 PrivateCellUtil.equals(largeCell, results.get(1)); 2671 } 2672 assertTrue(exceptionRef.get() == null); 2673 } finally { 2674 if (storeScanner != null) { 2675 storeScanner.close(); 2676 } 2677 Thread.currentThread().setName(oldThreadName); 2678 } 2679 } 2680 2681 @SuppressWarnings("unchecked") 2682 private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) { 2683 List<T> resultScanners = new ArrayList<T>(); 2684 for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) { 2685 if (keyValueScannerClass.isInstance(keyValueScanner)) { 2686 resultScanners.add((T) keyValueScanner); 2687 } 2688 } 2689 assertTrue(resultScanners.size() == 1); 2690 return resultScanners.get(0); 2691 } 2692 2693 @Test 2694 public void testOnConfigurationChange() throws IOException { 2695 final int COMMON_MAX_FILES_TO_COMPACT = 10; 2696 final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8; 2697 final int STORE_MAX_FILES_TO_COMPACT = 6; 2698 2699 // Build a table that its maxFileToCompact different from common configuration. 2700 Configuration conf = HBaseConfiguration.create(); 2701 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2702 COMMON_MAX_FILES_TO_COMPACT); 2703 conf.setBoolean(CACHE_DATA_ON_READ_KEY, false); 2704 conf.setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, true); 2705 conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true); 2706 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 2707 .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2708 String.valueOf(STORE_MAX_FILES_TO_COMPACT)) 2709 .build(); 2710 init(name, conf, hcd); 2711 2712 // After updating common configuration, the conf in HStore itself must not be changed. 2713 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2714 NEW_COMMON_MAX_FILES_TO_COMPACT); 2715 this.store.onConfigurationChange(conf); 2716 2717 assertEquals(STORE_MAX_FILES_TO_COMPACT, 2718 store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact()); 2719 2720 assertEquals(conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ), false); 2721 assertEquals(conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), true); 2722 assertEquals(conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), true); 2723 2724 // reset to default values 2725 conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ); 2726 conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE); 2727 conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE); 2728 this.store.onConfigurationChange(conf); 2729 } 2730 2731 /** 2732 * This test is for HBASE-26476 2733 */ 2734 @Test 2735 public void testExtendsDefaultMemStore() throws Exception { 2736 Configuration conf = HBaseConfiguration.create(); 2737 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2738 2739 init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2740 assertTrue(this.store.memstore.getClass() == DefaultMemStore.class); 2741 tearDown(); 2742 2743 conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName()); 2744 init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2745 assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class); 2746 } 2747 2748 static class CustomDefaultMemStore extends DefaultMemStore { 2749 2750 public CustomDefaultMemStore(Configuration conf, CellComparator c, 2751 RegionServicesForStores regionServices) { 2752 super(conf, c, regionServices); 2753 } 2754 2755 } 2756 2757 /** 2758 * This test is for HBASE-26488 2759 */ 2760 @Test 2761 public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception { 2762 Configuration conf = HBaseConfiguration.create(); 2763 2764 byte[] smallValue = new byte[3]; 2765 byte[] largeValue = new byte[9]; 2766 final long timestamp = EnvironmentEdgeManager.currentTime(); 2767 final long seqId = 100; 2768 final ExtendedCell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2769 final ExtendedCell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2770 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2771 quals.add(qf1); 2772 quals.add(qf2); 2773 2774 conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName()); 2775 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2776 conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, 2777 MyDefaultStoreFlusher.class.getName()); 2778 2779 init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2780 MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore); 2781 assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher); 2782 2783 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2784 store.add(smallCell, memStoreSizing); 2785 store.add(largeCell, memStoreSizing); 2786 flushStore(store, id++); 2787 2788 MemStoreLABImpl memStoreLAB = 2789 (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB()); 2790 assertTrue(memStoreLAB.isClosed()); 2791 assertTrue(memStoreLAB.getRefCntValue() == 0); 2792 assertTrue(memStoreLAB.isReclaimed()); 2793 assertTrue(memStoreLAB.chunks.isEmpty()); 2794 StoreScanner storeScanner = null; 2795 try { 2796 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 2797 assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1); 2798 assertTrue(store.memstore.size().getCellsCount() == 0); 2799 assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0); 2800 assertTrue(storeScanner.currentScanners.size() == 1); 2801 assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner); 2802 2803 List<ExtendedCell> results = new ArrayList<>(); 2804 storeScanner.next(results); 2805 assertEquals(2, results.size()); 2806 PrivateCellUtil.equals(smallCell, results.get(0)); 2807 PrivateCellUtil.equals(largeCell, results.get(1)); 2808 } finally { 2809 if (storeScanner != null) { 2810 storeScanner.close(); 2811 } 2812 } 2813 } 2814 2815 static class MyDefaultMemStore1 extends DefaultMemStore { 2816 2817 private ImmutableSegment snapshotImmutableSegment; 2818 2819 public MyDefaultMemStore1(Configuration conf, CellComparator c, 2820 RegionServicesForStores regionServices) { 2821 super(conf, c, regionServices); 2822 } 2823 2824 @Override 2825 public MemStoreSnapshot snapshot() { 2826 MemStoreSnapshot result = super.snapshot(); 2827 this.snapshotImmutableSegment = snapshot; 2828 return result; 2829 } 2830 2831 } 2832 2833 public static class MyDefaultStoreFlusher extends DefaultStoreFlusher { 2834 private static final AtomicInteger failCounter = new AtomicInteger(1); 2835 private static final AtomicInteger counter = new AtomicInteger(0); 2836 2837 public MyDefaultStoreFlusher(Configuration conf, HStore store) { 2838 super(conf, store); 2839 } 2840 2841 @Override 2842 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, 2843 MonitoredTask status, ThroughputController throughputController, 2844 FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException { 2845 counter.incrementAndGet(); 2846 return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker, 2847 writerCreationTracker); 2848 } 2849 2850 @Override 2851 protected void performFlush(InternalScanner scanner, final CellSink sink, 2852 ThroughputController throughputController) throws IOException { 2853 2854 final int currentCount = counter.get(); 2855 CellSink newCellSink = (cell) -> { 2856 if (currentCount <= failCounter.get()) { 2857 throw new IOException("Simulated exception by tests"); 2858 } 2859 sink.append(cell); 2860 }; 2861 super.performFlush(scanner, newCellSink, throughputController); 2862 } 2863 } 2864 2865 /** 2866 * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB} 2867 */ 2868 @Test 2869 public void testImmutableMemStoreLABRefCnt() throws Exception { 2870 Configuration conf = HBaseConfiguration.create(); 2871 2872 byte[] smallValue = new byte[3]; 2873 byte[] largeValue = new byte[9]; 2874 final long timestamp = EnvironmentEdgeManager.currentTime(); 2875 final long seqId = 100; 2876 final ExtendedCell smallCell1 = createCell(qf1, timestamp, seqId, smallValue); 2877 final ExtendedCell largeCell1 = createCell(qf2, timestamp, seqId, largeValue); 2878 final ExtendedCell smallCell2 = createCell(qf3, timestamp, seqId + 1, smallValue); 2879 final ExtendedCell largeCell2 = createCell(qf4, timestamp, seqId + 1, largeValue); 2880 final ExtendedCell smallCell3 = createCell(qf5, timestamp, seqId + 2, smallValue); 2881 final ExtendedCell largeCell3 = createCell(qf6, timestamp, seqId + 2, largeValue); 2882 2883 int smallCellByteSize = MutableSegment.getCellLength(smallCell1); 2884 int largeCellByteSize = MutableSegment.getCellLength(largeCell1); 2885 int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize); 2886 int flushByteSize = firstWriteCellByteSize - 2; 2887 2888 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2889 conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName()); 2890 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2891 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2892 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2893 2894 init(name, conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2895 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2896 2897 final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore); 2898 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2899 myCompactingMemStore.allowCompaction.set(false); 2900 2901 NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2902 store.add(smallCell1, memStoreSizing); 2903 store.add(largeCell1, memStoreSizing); 2904 store.add(smallCell2, memStoreSizing); 2905 store.add(largeCell2, memStoreSizing); 2906 store.add(smallCell3, memStoreSizing); 2907 store.add(largeCell3, memStoreSizing); 2908 VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments(); 2909 assertTrue(versionedSegmentsList.getNumOfSegments() == 3); 2910 List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments(); 2911 List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size()); 2912 for (ImmutableSegment segment : segments) { 2913 memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB()); 2914 } 2915 List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE); 2916 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2917 assertTrue(memStoreLAB.getRefCntValue() == 2); 2918 } 2919 2920 myCompactingMemStore.allowCompaction.set(true); 2921 myCompactingMemStore.flushInMemory(); 2922 2923 versionedSegmentsList = myCompactingMemStore.getImmutableSegments(); 2924 assertTrue(versionedSegmentsList.getNumOfSegments() == 1); 2925 ImmutableMemStoreLAB immutableMemStoreLAB = 2926 (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB()); 2927 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2928 assertTrue(memStoreLAB.getRefCntValue() == 2); 2929 } 2930 2931 List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE); 2932 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2933 assertTrue(memStoreLAB.getRefCntValue() == 2); 2934 } 2935 assertTrue(immutableMemStoreLAB.getRefCntValue() == 2); 2936 for (KeyValueScanner scanner : scanners1) { 2937 scanner.close(); 2938 } 2939 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2940 assertTrue(memStoreLAB.getRefCntValue() == 1); 2941 } 2942 for (KeyValueScanner scanner : scanners2) { 2943 scanner.close(); 2944 } 2945 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2946 assertTrue(memStoreLAB.getRefCntValue() == 1); 2947 } 2948 assertTrue(immutableMemStoreLAB.getRefCntValue() == 1); 2949 flushStore(store, id++); 2950 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2951 assertTrue(memStoreLAB.getRefCntValue() == 0); 2952 } 2953 assertTrue(immutableMemStoreLAB.getRefCntValue() == 0); 2954 assertTrue(immutableMemStoreLAB.isClosed()); 2955 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2956 assertTrue(memStoreLAB.isClosed()); 2957 assertTrue(memStoreLAB.isReclaimed()); 2958 assertTrue(memStoreLAB.chunks.isEmpty()); 2959 } 2960 } 2961 2962 private HStoreFile mockStoreFileWithLength(long length) { 2963 HStoreFile sf = mock(HStoreFile.class); 2964 StoreFileReader sfr = mock(StoreFileReader.class); 2965 when(sf.isHFile()).thenReturn(true); 2966 when(sf.getReader()).thenReturn(sfr); 2967 when(sfr.length()).thenReturn(length); 2968 return sf; 2969 } 2970 2971 private static class MyThread extends Thread { 2972 private StoreScanner scanner; 2973 private KeyValueHeap heap; 2974 2975 public MyThread(StoreScanner scanner) { 2976 this.scanner = scanner; 2977 } 2978 2979 public KeyValueHeap getHeap() { 2980 return this.heap; 2981 } 2982 2983 @Override 2984 public void run() { 2985 scanner.trySwitchToStreamRead(); 2986 heap = scanner.heap; 2987 } 2988 } 2989 2990 private static class MyMemStoreCompactor extends MemStoreCompactor { 2991 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 2992 private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1); 2993 2994 public MyMemStoreCompactor(CompactingMemStore compactingMemStore, 2995 MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException { 2996 super(compactingMemStore, compactionPolicy); 2997 } 2998 2999 @Override 3000 public boolean start() throws IOException { 3001 boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0; 3002 if (isFirst) { 3003 try { 3004 START_COMPACTOR_LATCH.await(); 3005 return super.start(); 3006 } catch (InterruptedException ex) { 3007 throw new RuntimeException(ex); 3008 } 3009 } 3010 return super.start(); 3011 } 3012 } 3013 3014 public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore { 3015 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 3016 3017 public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c, 3018 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3019 throws IOException { 3020 super(conf, c, store, regionServices, compactionPolicy); 3021 } 3022 3023 @Override 3024 protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) 3025 throws IllegalArgumentIOException { 3026 return new MyMemStoreCompactor(this, compactionPolicy); 3027 } 3028 3029 @Override 3030 protected boolean setInMemoryCompactionFlag() { 3031 boolean rval = super.setInMemoryCompactionFlag(); 3032 if (rval) { 3033 RUNNER_COUNT.incrementAndGet(); 3034 if (LOG.isDebugEnabled()) { 3035 LOG.debug("runner count: " + RUNNER_COUNT.get()); 3036 } 3037 } 3038 return rval; 3039 } 3040 } 3041 3042 public static class MyCompactingMemStore extends CompactingMemStore { 3043 private static final AtomicBoolean START_TEST = new AtomicBoolean(false); 3044 private final CountDownLatch getScannerLatch = new CountDownLatch(1); 3045 private final CountDownLatch snapshotLatch = new CountDownLatch(1); 3046 3047 public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, HStore store, 3048 RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3049 throws IOException { 3050 super(conf, c, store, regionServices, compactionPolicy); 3051 } 3052 3053 @Override 3054 protected List<KeyValueScanner> createList(int capacity) { 3055 if (START_TEST.get()) { 3056 try { 3057 getScannerLatch.countDown(); 3058 snapshotLatch.await(); 3059 } catch (InterruptedException e) { 3060 throw new RuntimeException(e); 3061 } 3062 } 3063 return new ArrayList<>(capacity); 3064 } 3065 3066 @Override 3067 protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) { 3068 if (START_TEST.get()) { 3069 try { 3070 getScannerLatch.await(); 3071 } catch (InterruptedException e) { 3072 throw new RuntimeException(e); 3073 } 3074 } 3075 3076 super.pushActiveToPipeline(active, checkEmpty); 3077 if (START_TEST.get()) { 3078 snapshotLatch.countDown(); 3079 } 3080 } 3081 } 3082 3083 interface MyListHook { 3084 void hook(int currentSize); 3085 } 3086 3087 private static class MyList<T> implements List<T> { 3088 private final List<T> delegatee = new ArrayList<>(); 3089 private final MyListHook hookAtAdd; 3090 3091 MyList(final MyListHook hookAtAdd) { 3092 this.hookAtAdd = hookAtAdd; 3093 } 3094 3095 @Override 3096 public int size() { 3097 return delegatee.size(); 3098 } 3099 3100 @Override 3101 public boolean isEmpty() { 3102 return delegatee.isEmpty(); 3103 } 3104 3105 @Override 3106 public boolean contains(Object o) { 3107 return delegatee.contains(o); 3108 } 3109 3110 @Override 3111 public Iterator<T> iterator() { 3112 return delegatee.iterator(); 3113 } 3114 3115 @Override 3116 public Object[] toArray() { 3117 return delegatee.toArray(); 3118 } 3119 3120 @Override 3121 public <R> R[] toArray(R[] a) { 3122 return delegatee.toArray(a); 3123 } 3124 3125 @Override 3126 public boolean add(T e) { 3127 hookAtAdd.hook(size()); 3128 return delegatee.add(e); 3129 } 3130 3131 @Override 3132 public boolean remove(Object o) { 3133 return delegatee.remove(o); 3134 } 3135 3136 @Override 3137 public boolean containsAll(Collection<?> c) { 3138 return delegatee.containsAll(c); 3139 } 3140 3141 @Override 3142 public boolean addAll(Collection<? extends T> c) { 3143 return delegatee.addAll(c); 3144 } 3145 3146 @Override 3147 public boolean addAll(int index, Collection<? extends T> c) { 3148 return delegatee.addAll(index, c); 3149 } 3150 3151 @Override 3152 public boolean removeAll(Collection<?> c) { 3153 return delegatee.removeAll(c); 3154 } 3155 3156 @Override 3157 public boolean retainAll(Collection<?> c) { 3158 return delegatee.retainAll(c); 3159 } 3160 3161 @Override 3162 public void clear() { 3163 delegatee.clear(); 3164 } 3165 3166 @Override 3167 public T get(int index) { 3168 return delegatee.get(index); 3169 } 3170 3171 @Override 3172 public T set(int index, T element) { 3173 return delegatee.set(index, element); 3174 } 3175 3176 @Override 3177 public void add(int index, T element) { 3178 delegatee.add(index, element); 3179 } 3180 3181 @Override 3182 public T remove(int index) { 3183 return delegatee.remove(index); 3184 } 3185 3186 @Override 3187 public int indexOf(Object o) { 3188 return delegatee.indexOf(o); 3189 } 3190 3191 @Override 3192 public int lastIndexOf(Object o) { 3193 return delegatee.lastIndexOf(o); 3194 } 3195 3196 @Override 3197 public ListIterator<T> listIterator() { 3198 return delegatee.listIterator(); 3199 } 3200 3201 @Override 3202 public ListIterator<T> listIterator(int index) { 3203 return delegatee.listIterator(index); 3204 } 3205 3206 @Override 3207 public List<T> subList(int fromIndex, int toIndex) { 3208 return delegatee.subList(fromIndex, toIndex); 3209 } 3210 } 3211 3212 private interface MyKeyValueHeapHook { 3213 void onRecordBlockSize(int recordBlockSizeCallCount); 3214 } 3215 3216 private static class MyKeyValueHeap extends KeyValueHeap { 3217 private final MyKeyValueHeapHook hook; 3218 private int recordBlockSizeCallCount; 3219 3220 public MyKeyValueHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator, 3221 MyKeyValueHeapHook hook) throws IOException { 3222 super(scanners, comparator); 3223 this.hook = hook; 3224 } 3225 3226 @Override 3227 public void recordBlockSize(IntConsumer blockSizeConsumer) { 3228 recordBlockSizeCallCount++; 3229 hook.onRecordBlockSize(recordBlockSizeCallCount); 3230 super.recordBlockSize(blockSizeConsumer); 3231 } 3232 } 3233 3234 public static class MyCompactingMemStore2 extends CompactingMemStore { 3235 private static final String LARGE_CELL_THREAD_NAME = "largeCellThread"; 3236 private static final String SMALL_CELL_THREAD_NAME = "smallCellThread"; 3237 private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2); 3238 private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2); 3239 private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0); 3240 private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0); 3241 3242 public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator, 3243 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3244 throws IOException { 3245 super(conf, cellComparator, store, regionServices, compactionPolicy); 3246 } 3247 3248 @Override 3249 protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd, 3250 MemStoreSizing memstoreSizing) { 3251 if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { 3252 int currentCount = largeCellPreUpdateCounter.incrementAndGet(); 3253 if (currentCount <= 1) { 3254 try { 3255 /** 3256 * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then 3257 * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then 3258 * largeCellThread invokes flushInMemory. 3259 */ 3260 preCyclicBarrier.await(); 3261 } catch (Throwable e) { 3262 throw new RuntimeException(e); 3263 } 3264 } 3265 } 3266 3267 boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing); 3268 if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { 3269 try { 3270 preCyclicBarrier.await(); 3271 } catch (Throwable e) { 3272 throw new RuntimeException(e); 3273 } 3274 } 3275 return returnValue; 3276 } 3277 3278 @Override 3279 protected void doAdd(MutableSegment currentActive, ExtendedCell cell, 3280 MemStoreSizing memstoreSizing) { 3281 if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { 3282 try { 3283 /** 3284 * After largeCellThread finished flushInMemory method, smallCellThread can add cell to 3285 * currentActive . That is to say when largeCellThread called flushInMemory method, 3286 * currentActive has no cell. 3287 */ 3288 postCyclicBarrier.await(); 3289 } catch (Throwable e) { 3290 throw new RuntimeException(e); 3291 } 3292 } 3293 super.doAdd(currentActive, cell, memstoreSizing); 3294 } 3295 3296 @Override 3297 protected void flushInMemory(MutableSegment currentActiveMutableSegment) { 3298 super.flushInMemory(currentActiveMutableSegment); 3299 if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { 3300 if (largeCellPreUpdateCounter.get() <= 1) { 3301 try { 3302 postCyclicBarrier.await(); 3303 } catch (Throwable e) { 3304 throw new RuntimeException(e); 3305 } 3306 } 3307 } 3308 } 3309 3310 } 3311 3312 public static class MyCompactingMemStore3 extends CompactingMemStore { 3313 private static final String LARGE_CELL_THREAD_NAME = "largeCellThread"; 3314 private static final String SMALL_CELL_THREAD_NAME = "smallCellThread"; 3315 3316 private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2); 3317 private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2); 3318 private final AtomicInteger flushCounter = new AtomicInteger(0); 3319 private static final int CELL_COUNT = 5; 3320 private boolean flushByteSizeLessThanSmallAndLargeCellSize = true; 3321 3322 public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator, 3323 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3324 throws IOException { 3325 super(conf, cellComparator, store, regionServices, compactionPolicy); 3326 } 3327 3328 @Override 3329 protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd, 3330 MemStoreSizing memstoreSizing) { 3331 if (!flushByteSizeLessThanSmallAndLargeCellSize) { 3332 return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing); 3333 } 3334 if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { 3335 try { 3336 preCyclicBarrier.await(); 3337 } catch (Throwable e) { 3338 throw new RuntimeException(e); 3339 } 3340 } 3341 3342 boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing); 3343 if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { 3344 try { 3345 preCyclicBarrier.await(); 3346 } catch (Throwable e) { 3347 throw new RuntimeException(e); 3348 } 3349 } 3350 return returnValue; 3351 } 3352 3353 @Override 3354 protected void postUpdate(MutableSegment currentActiveMutableSegment) { 3355 super.postUpdate(currentActiveMutableSegment); 3356 if (!flushByteSizeLessThanSmallAndLargeCellSize) { 3357 try { 3358 postCyclicBarrier.await(); 3359 } catch (Throwable e) { 3360 throw new RuntimeException(e); 3361 } 3362 return; 3363 } 3364 3365 if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { 3366 try { 3367 postCyclicBarrier.await(); 3368 } catch (Throwable e) { 3369 throw new RuntimeException(e); 3370 } 3371 } 3372 } 3373 3374 @Override 3375 protected void flushInMemory(MutableSegment currentActiveMutableSegment) { 3376 super.flushInMemory(currentActiveMutableSegment); 3377 flushCounter.incrementAndGet(); 3378 if (!flushByteSizeLessThanSmallAndLargeCellSize) { 3379 return; 3380 } 3381 3382 assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)); 3383 try { 3384 postCyclicBarrier.await(); 3385 } catch (Throwable e) { 3386 throw new RuntimeException(e); 3387 } 3388 3389 } 3390 3391 void disableCompaction() { 3392 allowCompaction.set(false); 3393 } 3394 3395 void enableCompaction() { 3396 allowCompaction.set(true); 3397 } 3398 3399 } 3400 3401 public static class MyCompactingMemStore4 extends CompactingMemStore { 3402 private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread"; 3403 /** 3404 * {@link CompactingMemStore#flattenOneSegment} must execute after 3405 * {@link CompactingMemStore#getImmutableSegments} 3406 */ 3407 private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2); 3408 /** 3409 * Only after {@link CompactingMemStore#flattenOneSegment} completed, 3410 * {@link CompactingMemStore#swapPipelineWithNull} could execute. 3411 */ 3412 private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2); 3413 /** 3414 * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the 3415 * snapshot thread starts {@link CompactingMemStore#snapshot},because 3416 * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}. 3417 */ 3418 private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2); 3419 /** 3420 * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping. 3421 */ 3422 private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); 3423 private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0); 3424 private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0); 3425 private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0); 3426 private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0); 3427 3428 public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator, 3429 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3430 throws IOException { 3431 super(conf, cellComparator, store, regionServices, compactionPolicy); 3432 } 3433 3434 @Override 3435 public VersionedSegmentsList getImmutableSegments() { 3436 VersionedSegmentsList result = super.getImmutableSegments(); 3437 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3438 int currentCount = getImmutableSegmentsListCounter.incrementAndGet(); 3439 if (currentCount <= 1) { 3440 try { 3441 flattenOneSegmentPreCyclicBarrier.await(); 3442 } catch (Throwable e) { 3443 throw new RuntimeException(e); 3444 } 3445 } 3446 } 3447 return result; 3448 } 3449 3450 @Override 3451 protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { 3452 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3453 int currentCount = swapPipelineWithNullCounter.incrementAndGet(); 3454 if (currentCount <= 1) { 3455 try { 3456 flattenOneSegmentPostCyclicBarrier.await(); 3457 } catch (Throwable e) { 3458 throw new RuntimeException(e); 3459 } 3460 } 3461 } 3462 boolean result = super.swapPipelineWithNull(segments); 3463 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3464 int currentCount = swapPipelineWithNullCounter.get(); 3465 if (currentCount <= 1) { 3466 assertTrue(!result); 3467 } 3468 if (currentCount == 2) { 3469 assertTrue(result); 3470 } 3471 } 3472 return result; 3473 3474 } 3475 3476 @Override 3477 public void flattenOneSegment(long requesterVersion, Action action) { 3478 int currentCount = flattenOneSegmentCounter.incrementAndGet(); 3479 if (currentCount <= 1) { 3480 try { 3481 /** 3482 * {@link CompactingMemStore#snapshot} could start. 3483 */ 3484 snapShotStartCyclicCyclicBarrier.await(); 3485 flattenOneSegmentPreCyclicBarrier.await(); 3486 } catch (Throwable e) { 3487 throw new RuntimeException(e); 3488 } 3489 } 3490 super.flattenOneSegment(requesterVersion, action); 3491 if (currentCount <= 1) { 3492 try { 3493 flattenOneSegmentPostCyclicBarrier.await(); 3494 } catch (Throwable e) { 3495 throw new RuntimeException(e); 3496 } 3497 } 3498 } 3499 3500 @Override 3501 protected boolean setInMemoryCompactionFlag() { 3502 boolean result = super.setInMemoryCompactionFlag(); 3503 assertTrue(result); 3504 setInMemoryCompactionFlagCounter.incrementAndGet(); 3505 return result; 3506 } 3507 3508 @Override 3509 void inMemoryCompaction() { 3510 try { 3511 super.inMemoryCompaction(); 3512 } finally { 3513 try { 3514 inMemoryCompactionEndCyclicBarrier.await(); 3515 } catch (Throwable e) { 3516 throw new RuntimeException(e); 3517 } 3518 3519 } 3520 } 3521 3522 } 3523 3524 public static class MyCompactingMemStore5 extends CompactingMemStore { 3525 private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread"; 3526 private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread"; 3527 /** 3528 * {@link CompactingMemStore#flattenOneSegment} must execute after 3529 * {@link CompactingMemStore#getImmutableSegments} 3530 */ 3531 private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2); 3532 /** 3533 * Only after {@link CompactingMemStore#flattenOneSegment} completed, 3534 * {@link CompactingMemStore#swapPipelineWithNull} could execute. 3535 */ 3536 private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2); 3537 /** 3538 * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the 3539 * snapshot thread starts {@link CompactingMemStore#snapshot},because 3540 * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}. 3541 */ 3542 private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2); 3543 /** 3544 * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping. 3545 */ 3546 private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); 3547 private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0); 3548 private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0); 3549 private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0); 3550 private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0); 3551 /** 3552 * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain 3553 * thread could start. 3554 */ 3555 private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2); 3556 /** 3557 * This is used for snapshot thread,writeAgain thread and in memory compact thread. Only the 3558 * writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would 3559 * execute,and in memory compact thread would exit,because we expect that in memory compact 3560 * executing only once. 3561 */ 3562 private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3); 3563 3564 public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator, 3565 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3566 throws IOException { 3567 super(conf, cellComparator, store, regionServices, compactionPolicy); 3568 } 3569 3570 @Override 3571 public VersionedSegmentsList getImmutableSegments() { 3572 VersionedSegmentsList result = super.getImmutableSegments(); 3573 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3574 int currentCount = getImmutableSegmentsListCounter.incrementAndGet(); 3575 if (currentCount <= 1) { 3576 try { 3577 flattenOneSegmentPreCyclicBarrier.await(); 3578 } catch (Throwable e) { 3579 throw new RuntimeException(e); 3580 } 3581 } 3582 3583 } 3584 3585 return result; 3586 } 3587 3588 @Override 3589 protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { 3590 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3591 int currentCount = swapPipelineWithNullCounter.incrementAndGet(); 3592 if (currentCount <= 1) { 3593 try { 3594 flattenOneSegmentPostCyclicBarrier.await(); 3595 } catch (Throwable e) { 3596 throw new RuntimeException(e); 3597 } 3598 } 3599 3600 if (currentCount == 2) { 3601 try { 3602 /** 3603 * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, 3604 * writeAgain thread could start. 3605 */ 3606 writeMemStoreAgainStartCyclicBarrier.await(); 3607 /** 3608 * Only the writeAgain thread completes, retry 3609 * {@link CompactingMemStore#swapPipelineWithNull} would execute. 3610 */ 3611 writeMemStoreAgainEndCyclicBarrier.await(); 3612 } catch (Throwable e) { 3613 throw new RuntimeException(e); 3614 } 3615 } 3616 3617 } 3618 boolean result = super.swapPipelineWithNull(segments); 3619 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3620 int currentCount = swapPipelineWithNullCounter.get(); 3621 if (currentCount <= 1) { 3622 assertTrue(!result); 3623 } 3624 if (currentCount == 2) { 3625 assertTrue(result); 3626 } 3627 } 3628 return result; 3629 3630 } 3631 3632 @Override 3633 public void flattenOneSegment(long requesterVersion, Action action) { 3634 int currentCount = flattenOneSegmentCounter.incrementAndGet(); 3635 if (currentCount <= 1) { 3636 try { 3637 /** 3638 * {@link CompactingMemStore#snapshot} could start. 3639 */ 3640 snapShotStartCyclicCyclicBarrier.await(); 3641 flattenOneSegmentPreCyclicBarrier.await(); 3642 } catch (Throwable e) { 3643 throw new RuntimeException(e); 3644 } 3645 } 3646 super.flattenOneSegment(requesterVersion, action); 3647 if (currentCount <= 1) { 3648 try { 3649 flattenOneSegmentPostCyclicBarrier.await(); 3650 /** 3651 * Only the writeAgain thread completes, in memory compact thread would exit,because we 3652 * expect that in memory compact executing only once. 3653 */ 3654 writeMemStoreAgainEndCyclicBarrier.await(); 3655 } catch (Throwable e) { 3656 throw new RuntimeException(e); 3657 } 3658 3659 } 3660 } 3661 3662 @Override 3663 protected boolean setInMemoryCompactionFlag() { 3664 boolean result = super.setInMemoryCompactionFlag(); 3665 int count = setInMemoryCompactionFlagCounter.incrementAndGet(); 3666 if (count <= 1) { 3667 assertTrue(result); 3668 } 3669 if (count == 2) { 3670 assertTrue(!result); 3671 } 3672 return result; 3673 } 3674 3675 @Override 3676 void inMemoryCompaction() { 3677 try { 3678 super.inMemoryCompaction(); 3679 } finally { 3680 try { 3681 inMemoryCompactionEndCyclicBarrier.await(); 3682 } catch (Throwable e) { 3683 throw new RuntimeException(e); 3684 } 3685 3686 } 3687 } 3688 } 3689 3690 public static class MyCompactingMemStore6 extends CompactingMemStore { 3691 private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); 3692 3693 public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator, 3694 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3695 throws IOException { 3696 super(conf, cellComparator, store, regionServices, compactionPolicy); 3697 } 3698 3699 @Override 3700 void inMemoryCompaction() { 3701 try { 3702 super.inMemoryCompaction(); 3703 } finally { 3704 try { 3705 inMemoryCompactionEndCyclicBarrier.await(); 3706 } catch (Throwable e) { 3707 throw new RuntimeException(e); 3708 } 3709 3710 } 3711 } 3712 } 3713 3714 public static class MyDefaultMemStore extends DefaultMemStore { 3715 private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread"; 3716 private static final String FLUSH_THREAD_NAME = "flushMyThread"; 3717 /** 3718 * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner thread 3719 * could start. 3720 */ 3721 private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2); 3722 /** 3723 * Used by getScanner thread notifies flush thread {@link DefaultMemStore#getSnapshotSegments} 3724 * completed, {@link DefaultMemStore#doClearSnapShot} could continue. 3725 */ 3726 private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2); 3727 /** 3728 * Used by flush thread notifies getScanner thread {@link DefaultMemStore#doClearSnapShot} 3729 * completed, {@link DefaultMemStore#getScanners} could continue. 3730 */ 3731 private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2); 3732 private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0); 3733 private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0); 3734 private volatile boolean shouldWait = true; 3735 private volatile HStore store = null; 3736 3737 public MyDefaultMemStore(Configuration conf, CellComparator cellComparator, 3738 RegionServicesForStores regionServices) throws IOException { 3739 super(conf, cellComparator, regionServices); 3740 } 3741 3742 @Override 3743 protected List<Segment> getSnapshotSegments() { 3744 3745 List<Segment> result = super.getSnapshotSegments(); 3746 3747 if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) { 3748 int currentCount = getSnapshotSegmentsCounter.incrementAndGet(); 3749 if (currentCount == 1) { 3750 if (this.shouldWait) { 3751 try { 3752 /** 3753 * Notify flush thread {@link DefaultMemStore#getSnapshotSegments} completed, 3754 * {@link DefaultMemStore#doClearSnapShot} could continue. 3755 */ 3756 preClearSnapShotCyclicBarrier.await(); 3757 /** 3758 * Wait for {@link DefaultMemStore#doClearSnapShot} completed. 3759 */ 3760 postClearSnapShotCyclicBarrier.await(); 3761 3762 } catch (Throwable e) { 3763 throw new RuntimeException(e); 3764 } 3765 } 3766 } 3767 } 3768 return result; 3769 } 3770 3771 @Override 3772 protected void doClearSnapShot() { 3773 if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) { 3774 int currentCount = clearSnapshotCounter.incrementAndGet(); 3775 if (currentCount == 1) { 3776 try { 3777 if ( 3778 ((ReentrantReadWriteLock) store.getStoreEngine().getLock()) 3779 .isWriteLockedByCurrentThread() 3780 ) { 3781 shouldWait = false; 3782 } 3783 /** 3784 * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner 3785 * thread could start. 3786 */ 3787 getScannerCyclicBarrier.await(); 3788 3789 if (shouldWait) { 3790 /** 3791 * Wait for {@link DefaultMemStore#getSnapshotSegments} completed. 3792 */ 3793 preClearSnapShotCyclicBarrier.await(); 3794 } 3795 } catch (Throwable e) { 3796 throw new RuntimeException(e); 3797 } 3798 } 3799 } 3800 super.doClearSnapShot(); 3801 3802 if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) { 3803 int currentCount = clearSnapshotCounter.get(); 3804 if (currentCount == 1) { 3805 if (shouldWait) { 3806 try { 3807 /** 3808 * Notify getScanner thread {@link DefaultMemStore#doClearSnapShot} completed, 3809 * {@link DefaultMemStore#getScanners} could continue. 3810 */ 3811 postClearSnapShotCyclicBarrier.await(); 3812 } catch (Throwable e) { 3813 throw new RuntimeException(e); 3814 } 3815 } 3816 } 3817 } 3818 } 3819 } 3820}