001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY; 021import static org.junit.Assert.assertArrayEquals; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027import static org.mockito.ArgumentMatchers.any; 028import static org.mockito.Mockito.mock; 029import static org.mockito.Mockito.spy; 030import static org.mockito.Mockito.times; 031import static org.mockito.Mockito.verify; 032import static org.mockito.Mockito.when; 033 034import java.io.FileNotFoundException; 035import java.io.IOException; 036import java.lang.ref.SoftReference; 037import java.security.PrivilegedExceptionAction; 038import java.util.ArrayList; 039import java.util.Arrays; 040import java.util.Collection; 041import java.util.Collections; 042import java.util.Iterator; 043import java.util.List; 044import java.util.ListIterator; 045import java.util.NavigableSet; 046import 
java.util.Optional; 047import java.util.TreeSet; 048import java.util.concurrent.BrokenBarrierException; 049import java.util.concurrent.ConcurrentSkipListSet; 050import java.util.concurrent.CountDownLatch; 051import java.util.concurrent.CyclicBarrier; 052import java.util.concurrent.ExecutorService; 053import java.util.concurrent.Executors; 054import java.util.concurrent.Future; 055import java.util.concurrent.ThreadPoolExecutor; 056import java.util.concurrent.TimeUnit; 057import java.util.concurrent.atomic.AtomicBoolean; 058import java.util.concurrent.atomic.AtomicInteger; 059import java.util.concurrent.atomic.AtomicLong; 060import java.util.concurrent.atomic.AtomicReference; 061import java.util.concurrent.locks.ReentrantReadWriteLock; 062import java.util.function.Consumer; 063import java.util.function.IntBinaryOperator; 064import org.apache.hadoop.conf.Configuration; 065import org.apache.hadoop.fs.FSDataOutputStream; 066import org.apache.hadoop.fs.FileStatus; 067import org.apache.hadoop.fs.FileSystem; 068import org.apache.hadoop.fs.FilterFileSystem; 069import org.apache.hadoop.fs.LocalFileSystem; 070import org.apache.hadoop.fs.Path; 071import org.apache.hadoop.fs.permission.FsPermission; 072import org.apache.hadoop.hbase.Cell; 073import org.apache.hadoop.hbase.CellBuilderFactory; 074import org.apache.hadoop.hbase.CellBuilderType; 075import org.apache.hadoop.hbase.CellComparator; 076import org.apache.hadoop.hbase.CellComparatorImpl; 077import org.apache.hadoop.hbase.CellUtil; 078import org.apache.hadoop.hbase.ExtendedCell; 079import org.apache.hadoop.hbase.HBaseClassTestRule; 080import org.apache.hadoop.hbase.HBaseConfiguration; 081import org.apache.hadoop.hbase.HBaseTestingUtil; 082import org.apache.hadoop.hbase.HConstants; 083import org.apache.hadoop.hbase.KeyValue; 084import org.apache.hadoop.hbase.MemoryCompactionPolicy; 085import org.apache.hadoop.hbase.PrivateCellUtil; 086import org.apache.hadoop.hbase.TableName; 087import 
org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 088import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 089import org.apache.hadoop.hbase.client.Get; 090import org.apache.hadoop.hbase.client.RegionInfo; 091import org.apache.hadoop.hbase.client.RegionInfoBuilder; 092import org.apache.hadoop.hbase.client.Scan; 093import org.apache.hadoop.hbase.client.Scan.ReadType; 094import org.apache.hadoop.hbase.client.TableDescriptor; 095import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 096import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 097import org.apache.hadoop.hbase.filter.Filter; 098import org.apache.hadoop.hbase.filter.FilterBase; 099import org.apache.hadoop.hbase.io.compress.Compression; 100import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 101import org.apache.hadoop.hbase.io.hfile.CacheConfig; 102import org.apache.hadoop.hbase.io.hfile.HFile; 103import org.apache.hadoop.hbase.io.hfile.HFileContext; 104import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 105import org.apache.hadoop.hbase.monitoring.MonitoredTask; 106import org.apache.hadoop.hbase.nio.RefCnt; 107import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl; 108import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType; 109import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action; 110import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 111import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 112import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 113import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy; 114import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 115import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 116import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 117import org.apache.hadoop.hbase.security.User; 118import 
org.apache.hadoop.hbase.testclassification.MediumTests; 119import org.apache.hadoop.hbase.testclassification.RegionServerTests; 120import org.apache.hadoop.hbase.util.BloomFilterUtil; 121import org.apache.hadoop.hbase.util.Bytes; 122import org.apache.hadoop.hbase.util.CommonFSUtils; 123import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 124import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 125import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 126import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 127import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 128import org.apache.hadoop.hbase.wal.WALFactory; 129import org.apache.hadoop.util.Progressable; 130import org.junit.After; 131import org.junit.AfterClass; 132import org.junit.Before; 133import org.junit.ClassRule; 134import org.junit.Rule; 135import org.junit.Test; 136import org.junit.experimental.categories.Category; 137import org.junit.rules.TestName; 138import org.mockito.Mockito; 139import org.slf4j.Logger; 140import org.slf4j.LoggerFactory; 141 142import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 143 144/** 145 * Test class for the HStore 146 */ 147@Category({ RegionServerTests.class, MediumTests.class }) 148public class TestHStore { 149 150 @ClassRule 151 public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHStore.class); 152 153 private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class); 154 @Rule 155 public TestName name = new TestName(); 156 157 HRegion region; 158 HStore store; 159 byte[] table = Bytes.toBytes("table"); 160 byte[] family = Bytes.toBytes("family"); 161 162 byte[] row = Bytes.toBytes("row"); 163 byte[] row2 = Bytes.toBytes("row2"); 164 byte[] qf1 = Bytes.toBytes("qf1"); 165 byte[] qf2 = Bytes.toBytes("qf2"); 166 byte[] qf3 = Bytes.toBytes("qf3"); 167 byte[] qf4 = Bytes.toBytes("qf4"); 168 byte[] qf5 = Bytes.toBytes("qf5"); 169 byte[] qf6 = Bytes.toBytes("qf6"); 170 171 
NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR); 172 173 List<Cell> expected = new ArrayList<>(); 174 List<Cell> result = new ArrayList<>(); 175 176 long id = EnvironmentEdgeManager.currentTime(); 177 Get get = new Get(row); 178 179 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 180 private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString(); 181 182 @Before 183 public void setUp() throws IOException { 184 qualifiers.clear(); 185 qualifiers.add(qf1); 186 qualifiers.add(qf3); 187 qualifiers.add(qf5); 188 189 Iterator<byte[]> iter = qualifiers.iterator(); 190 while (iter.hasNext()) { 191 byte[] next = iter.next(); 192 expected.add(new KeyValue(row, family, next, 1, (byte[]) null)); 193 get.addColumn(family, next); 194 } 195 } 196 197 private void init(String methodName) throws IOException { 198 init(methodName, TEST_UTIL.getConfiguration()); 199 } 200 201 private HStore init(String methodName, Configuration conf) throws IOException { 202 // some of the tests write 4 versions and then flush 203 // (with HBASE-4241, lower versions are collected on flush) 204 return init(methodName, conf, 205 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build()); 206 } 207 208 private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd) 209 throws IOException { 210 return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd); 211 } 212 213 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 214 ColumnFamilyDescriptor hcd) throws IOException { 215 return init(methodName, conf, builder, hcd, null); 216 } 217 218 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 219 ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException { 220 return init(methodName, conf, builder, hcd, hook, false); 221 } 222 223 private void initHRegion(String 
methodName, Configuration conf, TableDescriptorBuilder builder, 224 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 225 TableDescriptor htd = builder.setColumnFamily(hcd).build(); 226 Path basedir = new Path(DIR + methodName); 227 Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName()); 228 final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName)); 229 230 FileSystem fs = FileSystem.get(conf); 231 232 fs.delete(logdir, true); 233 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 234 MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null, 235 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 236 RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 237 Configuration walConf = new Configuration(conf); 238 CommonFSUtils.setRootDir(walConf, basedir); 239 WALFactory wals = new WALFactory(walConf, methodName); 240 region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf, 241 htd, null); 242 region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); 243 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 244 Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 245 } 246 247 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 248 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 249 initHRegion(methodName, conf, builder, hcd, hook, switchToPread); 250 if (hook == null) { 251 store = new HStore(region, hcd, conf, false); 252 } else { 253 store = new MyStore(region, hcd, conf, hook, switchToPread); 254 } 255 region.stores.put(store.getColumnFamilyDescriptor().getName(), store); 256 return store; 257 } 258 259 /** 260 * Test we do not lose data if we fail a flush and then close. 
Part of HBase-10466 261 */ 262 @Test 263 public void testFlushSizeSizing() throws Exception { 264 LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName()); 265 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 266 // Only retry once. 267 conf.setInt("hbase.hstore.flush.retries.number", 1); 268 User user = User.createUserForTesting(conf, this.name.getMethodName(), new String[] { "foo" }); 269 // Inject our faulty LocalFileSystem 270 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 271 user.runAs(new PrivilegedExceptionAction<Object>() { 272 @Override 273 public Object run() throws Exception { 274 // Make sure it worked (above is sensitive to caching details in hadoop core) 275 FileSystem fs = FileSystem.get(conf); 276 assertEquals(FaultyFileSystem.class, fs.getClass()); 277 FaultyFileSystem ffs = (FaultyFileSystem) fs; 278 279 // Initialize region 280 init(name.getMethodName(), conf); 281 282 MemStoreSize mss = store.memstore.getFlushableSize(); 283 assertEquals(0, mss.getDataSize()); 284 LOG.info("Adding some data"); 285 MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing(); 286 store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize); 287 // add the heap size of active (mutable) segment 288 kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 289 mss = store.memstore.getFlushableSize(); 290 assertEquals(kvSize.getMemStoreSize(), mss); 291 // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. 
292 try { 293 LOG.info("Flushing"); 294 flushStore(store, id++); 295 fail("Didn't bubble up IOE!"); 296 } catch (IOException ioe) { 297 assertTrue(ioe.getMessage().contains("Fault injected")); 298 } 299 // due to snapshot, change mutable to immutable segment 300 kvSize.incMemStoreSize(0, 301 CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0); 302 mss = store.memstore.getFlushableSize(); 303 assertEquals(kvSize.getMemStoreSize(), mss); 304 MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing(); 305 store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2); 306 kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 307 // Even though we add a new kv, we expect the flushable size to be 'same' since we have 308 // not yet cleared the snapshot -- the above flush failed. 309 assertEquals(kvSize.getMemStoreSize(), mss); 310 ffs.fault.set(false); 311 flushStore(store, id++); 312 mss = store.memstore.getFlushableSize(); 313 // Size should be the foreground kv size. 
314 assertEquals(kvSize2.getMemStoreSize(), mss); 315 flushStore(store, id++); 316 mss = store.memstore.getFlushableSize(); 317 assertEquals(0, mss.getDataSize()); 318 assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize()); 319 return null; 320 } 321 }); 322 } 323 324 @Test 325 public void testStoreBloomFilterMetricsWithBloomRowCol() throws IOException { 326 int numStoreFiles = 5; 327 writeAndRead(BloomType.ROWCOL, numStoreFiles); 328 329 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 330 // hard to know exactly the numbers here, we are just trying to 331 // prove that they are incrementing 332 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 333 assertTrue(store.getBloomFilterNegativeResultsCount() > 0); 334 } 335 336 @Test 337 public void testStoreBloomFilterMetricsWithBloomRow() throws IOException { 338 int numStoreFiles = 5; 339 writeAndRead(BloomType.ROWCOL, numStoreFiles); 340 341 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 342 // hard to know exactly the numbers here, we are just trying to 343 // prove that they are incrementing 344 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 345 assertTrue(store.getBloomFilterNegativeResultsCount() > 0); 346 } 347 348 @Test 349 public void testStoreBloomFilterMetricsWithBloomRowPrefix() throws IOException { 350 int numStoreFiles = 5; 351 writeAndRead(BloomType.ROWPREFIX_FIXED_LENGTH, numStoreFiles); 352 353 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 354 // hard to know exactly the numbers here, we are just trying to 355 // prove that they are incrementing 356 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 357 } 358 359 @Test 360 public void testStoreBloomFilterMetricsWithBloomNone() throws IOException { 361 int numStoreFiles = 5; 362 writeAndRead(BloomType.NONE, numStoreFiles); 363 364 assertEquals(0, store.getBloomFilterRequestsCount()); 365 assertEquals(0, store.getBloomFilterNegativeResultsCount()); 366 
367 // hard to know exactly the numbers here, we are just trying to 368 // prove that they are incrementing 369 assertTrue(store.getBloomFilterEligibleRequestsCount() >= numStoreFiles); 370 } 371 372 private void writeAndRead(BloomType bloomType, int numStoreFiles) throws IOException { 373 Configuration conf = HBaseConfiguration.create(); 374 FileSystem fs = FileSystem.get(conf); 375 376 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 377 .setCompressionType(Compression.Algorithm.GZ).setBloomFilterType(bloomType) 378 .setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "3").build(); 379 init(name.getMethodName(), conf, hcd); 380 381 for (int i = 1; i <= numStoreFiles; i++) { 382 byte[] row = Bytes.toBytes("row" + i); 383 LOG.info("Adding some data for the store file #" + i); 384 long timeStamp = EnvironmentEdgeManager.currentTime(); 385 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null); 386 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null); 387 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null); 388 flush(i); 389 } 390 391 // Verify the total number of store files 392 assertEquals(numStoreFiles, this.store.getStorefiles().size()); 393 394 TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR); 395 columns.add(qf1); 396 397 for (int i = 1; i <= numStoreFiles; i++) { 398 KeyValueScanner scanner = 399 store.getScanner(new Scan(new Get(Bytes.toBytes("row" + i))), columns, 0); 400 scanner.peek(); 401 } 402 } 403 404 /** 405 * Verify that compression and data block encoding are respected by the createWriter method, used 406 * on store flush. 
407 */ 408 @Test 409 public void testCreateWriter() throws Exception { 410 Configuration conf = HBaseConfiguration.create(); 411 FileSystem fs = FileSystem.get(conf); 412 413 ColumnFamilyDescriptor hcd = 414 ColumnFamilyDescriptorBuilder.newBuilder(family).setCompressionType(Compression.Algorithm.GZ) 415 .setDataBlockEncoding(DataBlockEncoding.DIFF).build(); 416 init(name.getMethodName(), conf, hcd); 417 418 // Test createWriter 419 StoreFileWriter writer = store.getStoreEngine() 420 .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4) 421 .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true) 422 .includesTag(false).shouldDropBehind(false)); 423 Path path = writer.getPath(); 424 writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1))); 425 writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2))); 426 writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3))); 427 writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4))); 428 writer.close(); 429 430 // Verify that compression and encoding settings are respected 431 HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf); 432 assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec()); 433 assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding()); 434 reader.close(); 435 } 436 437 @Test 438 public void testDeleteExpiredStoreFiles() throws Exception { 439 testDeleteExpiredStoreFiles(0); 440 testDeleteExpiredStoreFiles(1); 441 } 442 443 /** 444 * @param minVersions the MIN_VERSIONS for the column family 445 */ 446 public void testDeleteExpiredStoreFiles(int minVersions) throws Exception { 447 int storeFileNum = 4; 448 int ttl = 4; 449 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 450 EnvironmentEdgeManagerTestHelper.injectEdge(edge); 451 452 Configuration conf = HBaseConfiguration.create(); 453 // Enable the expired store file deletion 454 
conf.setBoolean("hbase.store.delete.expired.storefile", true); 455 // Set the compaction threshold higher to avoid normal compactions. 456 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5); 457 458 init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder 459 .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build()); 460 461 long storeTtl = this.store.getScanInfo().getTtl(); 462 long sleepTime = storeTtl / storeFileNum; 463 long timeStamp; 464 // There are 4 store files and the max time stamp difference among these 465 // store files will be (this.store.ttl / storeFileNum) 466 for (int i = 1; i <= storeFileNum; i++) { 467 LOG.info("Adding some data for the store file #" + i); 468 timeStamp = EnvironmentEdgeManager.currentTime(); 469 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null); 470 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null); 471 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null); 472 flush(i); 473 edge.incrementTime(sleepTime); 474 } 475 476 // Verify the total number of store files 477 assertEquals(storeFileNum, this.store.getStorefiles().size()); 478 479 // Each call will find one expired store file and delete it before compaction happens. 480 // There will be no compaction due to threshold above. Last file will not be replaced. 481 for (int i = 1; i <= storeFileNum - 1; i++) { 482 // verify the expired store file. 483 assertFalse(this.store.requestCompaction().isPresent()); 484 Collection<HStoreFile> sfs = this.store.getStorefiles(); 485 // Ensure i files are gone. 486 if (minVersions == 0) { 487 assertEquals(storeFileNum - i, sfs.size()); 488 // Ensure only non-expired files remain. 
489 for (HStoreFile sf : sfs) { 490 assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl)); 491 } 492 } else { 493 assertEquals(storeFileNum, sfs.size()); 494 } 495 // Let the next store file expired. 496 edge.incrementTime(sleepTime); 497 } 498 assertFalse(this.store.requestCompaction().isPresent()); 499 500 Collection<HStoreFile> sfs = this.store.getStorefiles(); 501 // Assert the last expired file is not removed. 502 if (minVersions == 0) { 503 assertEquals(1, sfs.size()); 504 } 505 long ts = sfs.iterator().next().getReader().getMaxTimestamp(); 506 assertTrue(ts < (edge.currentTime() - storeTtl)); 507 508 for (HStoreFile sf : sfs) { 509 sf.closeStoreFile(true); 510 } 511 } 512 513 @Test 514 public void testLowestModificationTime() throws Exception { 515 Configuration conf = HBaseConfiguration.create(); 516 FileSystem fs = FileSystem.get(conf); 517 // Initialize region 518 init(name.getMethodName(), conf); 519 520 int storeFileNum = 4; 521 for (int i = 1; i <= storeFileNum; i++) { 522 LOG.info("Adding some data for the store file #" + i); 523 this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null); 524 this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null); 525 this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null); 526 flush(i); 527 } 528 // after flush; check the lowest time stamp 529 long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 530 long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 531 assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); 532 533 // after compact; check the lowest time stamp 534 store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null); 535 lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 536 lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 537 assertEquals(lowestTimeStampFromManager, 
lowestTimeStampFromFS); 538 } 539 540 private static long getLowestTimeStampFromFS(FileSystem fs, 541 final Collection<HStoreFile> candidates) throws IOException { 542 long minTs = Long.MAX_VALUE; 543 if (candidates.isEmpty()) { 544 return minTs; 545 } 546 Path[] p = new Path[candidates.size()]; 547 int i = 0; 548 for (HStoreFile sf : candidates) { 549 p[i] = sf.getPath(); 550 ++i; 551 } 552 553 FileStatus[] stats = fs.listStatus(p); 554 if (stats == null || stats.length == 0) { 555 return minTs; 556 } 557 for (FileStatus s : stats) { 558 minTs = Math.min(minTs, s.getModificationTime()); 559 } 560 return minTs; 561 } 562 563 ////////////////////////////////////////////////////////////////////////////// 564 // Get tests 565 ////////////////////////////////////////////////////////////////////////////// 566 567 private static final int BLOCKSIZE_SMALL = 8192; 568 569 /** 570 * Test for hbase-1686. 571 */ 572 @Test 573 public void testEmptyStoreFile() throws IOException { 574 init(this.name.getMethodName()); 575 // Write a store file. 576 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 577 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 578 flush(1); 579 // Now put in place an empty store file. Its a little tricky. Have to 580 // do manually with hacked in sequence id. 581 HStoreFile f = this.store.getStorefiles().iterator().next(); 582 Path storedir = f.getPath().getParent(); 583 long seqid = f.getMaxSequenceId(); 584 Configuration c = HBaseConfiguration.create(); 585 FileSystem fs = FileSystem.get(c); 586 HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 587 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs) 588 .withOutputDir(storedir).withFileContext(meta).build(); 589 w.appendMetadata(seqid + 1, false); 590 w.close(); 591 this.store.close(); 592 // Reopen it... 
should pick up two files 593 this.store = 594 new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false); 595 assertEquals(2, this.store.getStorefilesCount()); 596 597 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 598 assertEquals(1, result.size()); 599 } 600 601 /** 602 * Getting data from memstore only 603 */ 604 @Test 605 public void testGet_FromMemStoreOnly() throws IOException { 606 init(this.name.getMethodName()); 607 608 // Put data in memstore 609 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 610 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 611 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 612 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 613 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 614 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 615 616 // Get 617 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 618 619 // Compare 620 assertCheck(); 621 } 622 623 @Test 624 public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException { 625 testTimeRangeIfSomeCellsAreDroppedInFlush(1); 626 testTimeRangeIfSomeCellsAreDroppedInFlush(3); 627 testTimeRangeIfSomeCellsAreDroppedInFlush(5); 628 } 629 630 private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException { 631 init(this.name.getMethodName(), TEST_UTIL.getConfiguration(), 632 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build()); 633 long currentTs = 100; 634 long minTs = currentTs; 635 // the extra cell won't be flushed to disk, 636 // so the min of timerange will be different between memStore and hfile. 
637 for (int i = 0; i != (maxVersion + 1); ++i) { 638 this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null); 639 if (i == 1) { 640 minTs = currentTs; 641 } 642 } 643 flushStore(store, id++); 644 645 Collection<HStoreFile> files = store.getStorefiles(); 646 assertEquals(1, files.size()); 647 HStoreFile f = files.iterator().next(); 648 f.initReader(); 649 StoreFileReader reader = f.getReader(); 650 assertEquals(minTs, reader.timeRange.getMin()); 651 assertEquals(currentTs, reader.timeRange.getMax()); 652 } 653 654 /** 655 * Getting data from files only 656 */ 657 @Test 658 public void testGet_FromFilesOnly() throws IOException { 659 init(this.name.getMethodName()); 660 661 // Put data in memstore 662 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 663 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 664 // flush 665 flush(1); 666 667 // Add more data 668 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 669 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 670 // flush 671 flush(2); 672 673 // Add more data 674 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 675 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 676 // flush 677 flush(3); 678 679 // Get 680 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 681 // this.store.get(get, qualifiers, result); 682 683 // Need to sort the result since multiple files 684 Collections.sort(result, CellComparatorImpl.COMPARATOR); 685 686 // Compare 687 assertCheck(); 688 } 689 690 /** 691 * Getting data from memstore and files 692 */ 693 @Test 694 public void testGet_FromMemStoreAndFiles() throws IOException { 695 init(this.name.getMethodName()); 696 697 // Put data in memstore 698 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 699 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 700 // flush 
701 flush(1); 702 703 // Add more data 704 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 705 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 706 // flush 707 flush(2); 708 709 // Add more data 710 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 711 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 712 713 // Get 714 result = HBaseTestingUtil.getFromStoreFile(store, get.getRow(), qualifiers); 715 716 // Need to sort the result since multiple files 717 Collections.sort(result, CellComparatorImpl.COMPARATOR); 718 719 // Compare 720 assertCheck(); 721 } 722 723 private void flush(int storeFilessize) throws IOException { 724 flushStore(store, id++); 725 assertEquals(storeFilessize, this.store.getStorefiles().size()); 726 assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount()); 727 } 728 729 private void assertCheck() { 730 assertEquals(expected.size(), result.size()); 731 for (int i = 0; i < expected.size(); i++) { 732 assertEquals(expected.get(i), result.get(i)); 733 } 734 } 735 736 @After 737 public void tearDown() throws Exception { 738 EnvironmentEdgeManagerTestHelper.reset(); 739 if (store != null) { 740 try { 741 store.close(); 742 } catch (IOException e) { 743 } 744 store = null; 745 } 746 if (region != null) { 747 region.close(); 748 region = null; 749 } 750 } 751 752 @AfterClass 753 public static void tearDownAfterClass() throws IOException { 754 TEST_UTIL.cleanupTestDir(); 755 } 756 757 @Test 758 public void testHandleErrorsInFlush() throws Exception { 759 LOG.info("Setting up a faulty file system that cannot write"); 760 761 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 762 User user = User.createUserForTesting(conf, "testhandleerrorsinflush", new String[] { "foo" }); 763 // Inject our faulty LocalFileSystem 764 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 765 
user.runAs(new PrivilegedExceptionAction<Object>() { 766 @Override 767 public Object run() throws Exception { 768 // Make sure it worked (above is sensitive to caching details in hadoop core) 769 FileSystem fs = FileSystem.get(conf); 770 assertEquals(FaultyFileSystem.class, fs.getClass()); 771 772 // Initialize region 773 init(name.getMethodName(), conf); 774 775 LOG.info("Adding some data"); 776 store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 777 store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 778 store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 779 780 LOG.info("Before flush, we should have no files"); 781 782 Collection<StoreFileInfo> files = 783 store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); 784 assertEquals(0, files != null ? files.size() : 0); 785 786 // flush 787 try { 788 LOG.info("Flushing"); 789 flush(1); 790 fail("Didn't bubble up IOE!"); 791 } catch (IOException ioe) { 792 assertTrue(ioe.getMessage().contains("Fault injected")); 793 } 794 795 LOG.info("After failed flush, we should still have no files!"); 796 files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); 797 assertEquals(0, files != null ? files.size() : 0); 798 store.getHRegion().getWAL().close(); 799 return null; 800 } 801 }); 802 FileSystem.closeAllForUGI(user.getUGI()); 803 } 804 805 /** 806 * Faulty file system that will fail if you write past its fault position the FIRST TIME only; 807 * thereafter it will succeed. Used by {@link TestHRegion} too. 
*/
  static class FaultyFileSystem extends FilterFileSystem {
    // NOTE(review): not referenced anywhere in the code visible here -- presumably read by other
    // tests; verify before removing.
    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
    // Stream position at or beyond which a write fails while the fault flag is set.
    private long faultPos = 200;
    // Shared with every stream this file system creates.
    // NOTE(review): nothing visible here ever clears this flag, so "first time only" presumably
    // relies on a caller flipping it -- confirm.
    AtomicBoolean fault = new AtomicBoolean(true);

    public FaultyFileSystem() {
      super(new LocalFileSystem());
      LOG.info("Creating faulty!");
    }

    /** Wraps the stream created by the parent class in a {@link FaultyOutputStream}. */
    @Override
    public FSDataOutputStream create(Path p) throws IOException {
      return new FaultyOutputStream(super.create(p), faultPos, fault);
    }

    /** Wraps the stream created by the parent class in a {@link FaultyOutputStream}. */
    @Override
    public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
      int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
      return new FaultyOutputStream(
        super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress),
        faultPos, fault);
    }

    @Override
    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize,
      short replication, long blockSize, Progressable progress) throws IOException {
      // Fake it. Call create instead. The default implementation throws an IOE
      // that this is not supported.
      return create(f, overwrite, bufferSize, replication, blockSize, progress);
    }
  }

  /**
   * Output stream wrapper whose {@link #write(byte[], int, int)} throws an {@link IOException}
   * with the message "Fault injected" when the shared fault flag is set and the current position
   * has reached {@code faultPos}. Only that {@code write} overload is intercepted here.
   */
  static class FaultyOutputStream extends FSDataOutputStream {
    // Overwritten by the constructor; the initializer only matters before that assignment.
    volatile long faultPos = Long.MAX_VALUE;
    private final AtomicBoolean fault;

    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
      throws IOException {
      super(out, null);
      this.faultPos = faultPos;
      this.fault = fault;
    }

    @Override
    public synchronized void write(byte[] buf, int offset, int length) throws IOException {
      LOG.info("faulty stream write at pos " + getPos());
      // The check happens before the bytes are handed to the wrapped stream.
      injectFault();
      super.write(buf, offset, length);
    }

    /** Throws when the fault flag is set and {@code getPos() >= faultPos}. */
    private void injectFault() throws IOException {
      if (this.fault.get() && getPos() >= faultPos) {
        throw new IOException("Fault injected");
      }
    }
  }

  /**
   * Runs one complete flush cycle on {@code store}: creates a flush context for {@code id}, then
   * calls {@code prepare}, {@code flushCache} and {@code commit} in that order, passing mocked
   * {@code MonitoredTask} instances.
   * @param store the store to flush
   * @param id    the flush id handed to {@code createFlushContext}
   * @return the flush context that was used
   * @throws IOException if any flush step fails
   */
  private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
    StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
    storeFlushCtx.prepare();
    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
    return storeFlushCtx;
  }

  /**
   * Generate a list of KeyValues for testing based on given parameters. One KeyValue is created
   * per (row, timestamp) pair; rows are {@code Bytes.toBytes(i)} for {@code i} in
   * {@code 1..numRows}, and each value is the same byte array as its row.
   * @param timestamps timestamps to create for every row
   * @param numRows    number of rows to generate
   * @param qualifier  qualifier used for every KeyValue
   * @param family     family used for every KeyValue
   * @return the rows key-value list
   */
  private List<Cell> getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier,
    byte[] family) {
    List<Cell> kvList = new ArrayList<>();
    for (int i = 1; i <= numRows; i++) {
      byte[] b = Bytes.toBytes(i);
      for (long timestamp : timestamps) {
        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
      }
    }
    return kvList;
  }

  /**
   * Test to ensure correctness when using Stores with multiple timestamps
   */
  @Test
  public void testMultipleTimestamps() throws IOException {
    int numRows = 1;
    long[] timestamps1 = new long[] { 1, 5, 10, 20 };
    long[] timestamps2 = new long[] { 30, 80 };

    init(this.name.getMethodName());

    // timestamps1 cells are flushed below; timestamps2 cells are added afterwards with no flush.
    List<Cell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family);
    for (Cell kv : kvList1) {
      this.store.add(kv, null);
    }

    flushStore(store, id++);

    List<Cell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family);
    for (Cell kv : kvList2) {
      this.store.add(kv, null);
    }

    List<Cell> result;
    Get get = new Get(Bytes.toBytes(1));
    get.addColumn(family, qf1);

    // Candidate timestamps written above: 1, 5, 10
    get.setTimeRange(0, 15);
    result = HBaseTestingUtil.getFromStoreFile(store, get);
    assertTrue(result.size() > 0);

    // Candidate timestamp written above: 80
    get.setTimeRange(40, 90);
    result = HBaseTestingUtil.getFromStoreFile(store, get);
    assertTrue(result.size() > 0);

    // Candidate timestamps written above: 10, 20, 30
    get.setTimeRange(10, 45);
    result = HBaseTestingUtil.getFromStoreFile(store, get);
    assertTrue(result.size() > 0);

    // 80 is the only written timestamp that can match, so this relies on the lower bound being
    // inclusive.
    get.setTimeRange(80, 145);
    result = HBaseTestingUtil.getFromStoreFile(store, get);
    assertTrue(result.size() > 0);

    // 1 is the only written timestamp that can match.
    get.setTimeRange(1, 2);
    result = HBaseTestingUtil.getFromStoreFile(store, get);
    assertTrue(result.size() > 0);

    // None of the written timestamps is 90 or greater.
    get.setTimeRange(90, 200);
    result = HBaseTestingUtil.getFromStoreFile(store, get);
    assertTrue(result.size() == 0);
  }

  /**
   * Test for HBASE-3492 - Test split on empty colfam (no store files).
   * @throws IOException When the IO operations fail.
*/
  @Test
  public void testSplitWithEmptyColFam() throws IOException {
    init(this.name.getMethodName());
    assertFalse(store.getSplitPoint().isPresent());
  }

  /**
   * Checks that the threshold used by {@code store.throttleCompaction} follows the value set in
   * the {@link Configuration}, then the value set on the table descriptor, then the value set on
   * the column family descriptor.
   */
  @Test
  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
    long anyValue = 10;

    // We'll check that it uses correct config and propagates it appropriately by going thru
    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
    // a number we pass in is higher than some config value, inside compactionPolicy.
    Configuration conf = HBaseConfiguration.create();
    conf.setLong(CONFIG_KEY, anyValue);
    init(name.getMethodName() + "-xml", conf);
    assertTrue(store.throttleCompaction(anyValue + 1));
    assertFalse(store.throttleCompaction(anyValue));

    // HTD overrides XML.
    --anyValue;
    init(
      name.getMethodName() + "-htd", conf, TableDescriptorBuilder
        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
      ColumnFamilyDescriptorBuilder.of(family));
    assertTrue(store.throttleCompaction(anyValue + 1));
    assertFalse(store.throttleCompaction(anyValue));

    // HCD overrides them both.
    // NOTE(review): the table descriptor and the column family descriptor carry the same value
    // below, so this case cannot tell HCD precedence apart from HTD precedence -- confirm intent.
    --anyValue;
    init(name.getMethodName() + "-hcd", conf,
      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
        Long.toString(anyValue)),
      ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
        .build());
    assertTrue(store.throttleCompaction(anyValue + 1));
    assertFalse(store.throttleCompaction(anyValue));
  }

  /**
   * Store engine that records, in a static field, the compactor produced by the most recent
   * {@code createComponents} call so a test can compare it with the store's compactor.
   */
  public static class DummyStoreEngine extends DefaultStoreEngine {
    public static DefaultCompactor lastCreatedCompactor = null;

    @Override
    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
      throws IOException {
      super.createComponents(conf, store, comparator);
      lastCreatedCompactor = this.compactor;
    }
  }

  /**
   * Configures {@link DummyStoreEngine} as the store engine class and checks that the store's
   * compactor is the one that engine recorded.
   */
  @Test
  public void testStoreUsesSearchEngineOverride() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
    init(this.name.getMethodName(), conf);
    assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor());
  }

  /**
   * Writes one more store file into the directory of the store's first store file, bypassing the
   * store itself. The new file only carries metadata, with a sequence id one above the store's
   * current maximum (or 1 when there is none). Requires at least one existing store file.
   */
  private void addStoreFile() throws IOException {
    HStoreFile f = this.store.getStorefiles().iterator().next();
    Path storedir = f.getPath().getParent();
    long seqid = this.store.getMaxSequenceId().orElse(0L);
    Configuration c = TEST_UTIL.getConfiguration();
    FileSystem fs = FileSystem.get(c);
    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs)
      .withOutputDir(storedir).withFileContext(fileContext).build();
    w.appendMetadata(seqid + 1, false);
    w.close();
    LOG.info("Added store file:" + w.getPath());
  }

  /**
   * Removes, through the region file system, the store file at position {@code index} in the
   * iteration order of {@code store.getStorefiles()}. The store's own file list is not touched.
   * @param index zero-based position of the store file to remove
   */
  private void archiveStoreFile(int index) throws IOException {
    Collection<HStoreFile> files = this.store.getStorefiles();
    HStoreFile sf = null;
    Iterator<HStoreFile> it = files.iterator();
    // Advance the iterator index + 1 times; sf ends up as the element at position index.
    for (int i = 0; i <= index; i++) {
      sf = it.next();
    }
    store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(),
      Lists.newArrayList(sf));
  }

  /**
   * Closes the compacted file at position {@code index} and removes it from the store file
   * manager's compacted files. Does nothing when there are no compacted files.
   * @param index zero-based position within the compacted files collection
   */
  private void closeCompactedFile(int index) throws IOException {
    Collection<HStoreFile> files =
      this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    if (files.size() > 0) {
      HStoreFile sf = null;
      Iterator<HStoreFile> it = files.iterator();
      for (int i = 0; i <= index; i++) {
        sf = it.next();
      }
      sf.closeStoreFile(true);
      store.getStoreEngine().getStoreFileManager()
        .removeCompactedFiles(Collections.singletonList(sf));
    }
  }

  /**
   * Exercises {@code refreshStoreFiles}: files added with {@link #addStoreFile()} or removed with
   * {@link #archiveStoreFile(int)} change {@code getStorefilesCount()} only after
   * {@code refreshStoreFiles} has been called.
   */
  @Test
  public void testRefreshStoreFiles() throws Exception {
    init(name.getMethodName());

    assertEquals(0, this.store.getStorefilesCount());

    // Test refreshing store files when no store files are there
    store.refreshStoreFiles();
    assertEquals(0, this.store.getStorefilesCount());

    // add some data, flush
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    flush(1);
    assertEquals(1, this.store.getStorefilesCount());

    // add one more file
    addStoreFile();

    assertEquals(1, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(2, this.store.getStorefilesCount());

    // add three more files
    addStoreFile();
    addStoreFile();
    addStoreFile();

    assertEquals(2, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(5, this.store.getStorefilesCount());

    closeCompactedFile(0);
    archiveStoreFile(0);

    assertEquals(5, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(4, this.store.getStorefilesCount());

    archiveStoreFile(0);
    archiveStoreFile(1);
    archiveStoreFile(2);

    assertEquals(4, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(1, this.store.getStorefilesCount());

    archiveStoreFile(0);
    store.refreshStoreFiles();
    assertEquals(0, this.store.getStorefilesCount());
  }

  /**
   * Verifies, with a Mockito spy on the store engine, that a second {@code refreshStoreFiles}
   * call made while the files are unchanged does not invoke {@code replaceStoreFiles} again.
   */
  @Test
  public void testRefreshStoreFilesNotChanged() throws IOException {
    init(name.getMethodName());

    assertEquals(0, this.store.getStorefilesCount());

    // add some data, flush
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null);
    flush(1);
    // add one more file
    addStoreFile();

    StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine());

    // call first time after files changed
    spiedStoreEngine.refreshStoreFiles();
    assertEquals(2, this.store.getStorefilesCount());
    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());

    // call second time
    spiedStoreEngine.refreshStoreFiles();

    // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not
    // refreshed,
    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
  }

  /**
   * After three flushes, creates a scanner and runs a compaction (followed by
   * {@code closeAndArchiveCompactedFiles}) on two pool threads, then asserts that the reader of
   * every {@code StoreFileScanner} held by the scanner has a reference count of 1 and that the
   * scanner can still seek.
   */
  @Test
  public void testScanWithCompactionAfterFlush() throws Exception {
    // Note: this sets the policy on the shared TEST_UTIL configuration, not on a copy.
    TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
      EverythingPolicy.class.getName());
    init(name.getMethodName());

    assertEquals(0, this.store.getStorefilesCount());

    KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null);
    // add some data, flush
    this.store.add(kv, null);
    flush(1);
    kv = new KeyValue(row, family, qf2, 1, (byte[]) null);
    // add some data, flush
    this.store.add(kv, null);
    flush(2);
    kv = new KeyValue(row, family, qf3, 1, (byte[]) null);
    // add some data, flush
    this.store.add(kv, null);
    flush(3);

    ExecutorService service = Executors.newFixedThreadPool(2);

    Scan scan = new Scan(new Get(row));
    Future<KeyValueScanner> scanFuture = service.submit(() -> {
      try {
        LOG.info(">>>> creating scanner");
        return this.store.createScanner(scan,
          new ScanInfo(HBaseConfiguration.create(),
            ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(),
            Long.MAX_VALUE, 0, CellComparator.getInstance()),
          scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0);
      } catch (IOException e) {
        // NOTE(review): the failure is only printed; the null result then surfaces as a
        // NullPointerException when kvs is used below.
        e.printStackTrace();
        return null;
      }
    });
    // NOTE(review): raw Future type; only get() is called on it.
    Future compactFuture = service.submit(() -> {
      try {
        LOG.info(">>>>>> starting compaction");
        Optional<CompactionContext> opCompaction = this.store.requestCompaction();
        assertTrue(opCompaction.isPresent());
        store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent());
        LOG.info(">>>>>> Compaction is finished");
        this.store.closeAndArchiveCompactedFiles();
        LOG.info(">>>>>> Compacted files deleted");
      } catch (IOException e) {
        // NOTE(review): an IOException here is printed and does not fail the test.
        e.printStackTrace();
      }
    });

    // Wait for both tasks before inspecting the scanner.
    KeyValueScanner kvs = scanFuture.get();
    compactFuture.get();
    ((StoreScanner) kvs).currentScanners.forEach(s -> {
      if (s instanceof StoreFileScanner) {
        assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount());
      }
    });
    // kv is the last KeyValue written above (qf3).
    kvs.seek(kv);
    service.shutdownNow();
  }

  /**
   * Counts the scanners in {@code scanner.currentScanners} for which {@code isFileScanner()} is
   * false.
   * @return that count, or 0 when {@code currentScanners} is null
   */
  private long countMemStoreScanner(StoreScanner scanner) {
    if (scanner.currentScanners == null) {
      return 0;
    }
    return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count();
  }

  /**
   * Runs the memstore scanner count scenario three times: with no cells added after the snapshot,
   * with one cell added after it, and with two cells (one on a second row) added after it.
   */
  @Test
  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
    long seqId = 100;
    long timestamp = EnvironmentEdgeManager.currentTime();
    Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
      .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
    PrivateCellUtil.setSequenceId(cell0, seqId);
    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());

    Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
      .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
    PrivateCellUtil.setSequenceId(cell1, seqId);
    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));

    seqId = 101;
    timestamp = EnvironmentEdgeManager.currentTime();
    Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
      .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build();
    PrivateCellUtil.setSequenceId(cell2, seqId);
    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
  }

  /**
   * Adds the first list of cells, prepares a flush, adds the second list, opens a scanner, and
   * completes the flush while the scanner is open. It then drains the scanner, asserting the
   * number of non-file scanners before the flush, after each {@code next} call and at the end,
   * and that every added cell was returned.
   * @param inputCellsBeforeSnapshot cells added before {@code prepare()} is called
   * @param inputCellsAfterSnapshot  cells added after {@code prepare()} is called
   */
  private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
    List<Cell> inputCellsAfterSnapshot) throws IOException {
    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    // Collect every qualifier and the highest sequence id across both lists.
    long seqId = Long.MIN_VALUE;
    for (Cell c : inputCellsBeforeSnapshot) {
      quals.add(CellUtil.cloneQualifier(c));
      seqId = Math.max(seqId, c.getSequenceId());
    }
    for (Cell c : inputCellsAfterSnapshot) {
      quals.add(CellUtil.cloneQualifier(c));
      seqId = Math.max(seqId, c.getSequenceId());
    }
    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    storeFlushCtx.prepare();
    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
      // snapshot has no data after flush
      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
      boolean more;
      int cellCount = 0;
      do {
        List<Cell> cells = new ArrayList<>();
        more = s.next(cells);
        cellCount += cells.size();
        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
      } while (more);
      assertEquals(
        "The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
        inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
      // the current scanners is cleared
      assertEquals(0, countMemStoreScanner(s));
    }
  }

  /** Same as {@link #createCell(byte[], byte[], long, long, byte[])} using the {@code row} field. */
  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
    throws IOException {
    return createCell(row, qualifier, ts, sequenceId, value);
  }

  /**
   * Builds a Put cell in {@code family} from the given row, qualifier, timestamp and value, and
   * sets its sequence id.
   * @return the new cell
   */
  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
    throws IOException {
    Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
      .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put).setValue(value).build();
    PrivateCellUtil.setSequenceId(c, sequenceId);
    return c;
  }

  /**
   * Flushes the store from the list hook when the result list size is {@code expectedSize - 1},
   * using a filter that includes every cell, and expects 3 cells for the first row scanned.
   */
  @Test
  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
    // NOTE(review): this flag is set by the hook but never read by the filter in this test.
    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
    final int expectedSize = 3;
    testFlushBeforeCompletingScan(new MyListHook() {
      @Override
      public void hook(int currentSize) {
        if (currentSize == expectedSize - 1) {
          try {
            flushStore(store, id++);
            timeToGoNextRow.set(true);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    }, new FilterBase() {
      @Override
      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
        return ReturnCode.INCLUDE;
      }
    }, expectedSize);
  }

  /**
   * Flushes the store from the list hook when the result list size is {@code expectedSize - 1};
   * the filter then answers {@code NEXT_ROW} once for the next cell it sees, so 2 cells are
   * expected for the first row scanned.
   */
  @Test
  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
    final int expectedSize = 2;
    testFlushBeforeCompletingScan(new MyListHook() {
      @Override
      public void hook(int currentSize) {
        if (currentSize == expectedSize - 1) {
          try {
            flushStore(store, id++);
            timeToGoNextRow.set(true);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    }, new FilterBase() {
      @Override
      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
        if (timeToGoNextRow.get()) {
          // One-shot: reset the flag so later cells are included again.
          timeToGoNextRow.set(false);
          return ReturnCode.NEXT_ROW;
        } else {
          return ReturnCode.INCLUDE;
        }
      }
    }, expectedSize);
  }

  /**
   * Flushes the store from the list hook when the result list size is {@code expectedSize - 1};
   * the filter then answers {@code SEEK_NEXT_USING_HINT} once, with the current cell as the hint,
   * and 2 cells are expected for the first row scanned.
   */
  @Test
  public void testFlushBeforeCompletingScanWithFilterHint()
    throws IOException, InterruptedException {
    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
    final int expectedSize = 2;
    testFlushBeforeCompletingScan(new MyListHook() {
      @Override
      public void hook(int currentSize) {
        if (currentSize == expectedSize - 1) {
          try {
            flushStore(store, id++);
            timeToGetHint.set(true);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    }, new FilterBase() {
      @Override
      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
        if (timeToGetHint.get()) {
          // One-shot: reset the flag so later cells are included again.
          timeToGetHint.set(false);
          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
        } else {
          return Filter.ReturnCode.INCLUDE;
        }
      }

      @Override
      public Cell getNextCellHint(Cell currentCell) throws IOException {
        return currentCell;
      }
    }, expectedSize);
  }

  /**
   * Shared scenario for the tests above. Writes three rows (row1 twice, at {@code ts + 1} and
   * {@code ts + 3}), scans from row1 into a {@code MyList} driven by {@code hook}, and asserts
   * that the first {@code next} call yields {@code expectedSize} cells with "value1" and the
   * second yields 3 cells with "value2".
   * @param hook         callback handed to the {@code MyList} that receives the first row;
   *                     presumably invoked with the list size as cells are added -- see
   *                     {@code MyList}
   * @param filter       filter set on the scan
   * @param expectedSize number of cells expected from the first {@code next} call
   */
  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
    throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    byte[] r0 = Bytes.toBytes("row0");
    byte[] r1 = Bytes.toBytes("row1");
    byte[] r2 = Bytes.toBytes("row2");
    byte[] value0 = Bytes.toBytes("value0");
    byte[] value1 = Bytes.toBytes("value1");
    byte[] value2 = Bytes.toBytes("value2");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
      new MyStoreHook() {
        @Override
        public long getSmallestReadPoint(HStore store) {
          return seqId + 3;
        }
      });
    // The cells having the value0 won't be flushed to disk because the value of max version is 1
    // NOTE(review): value0 is only written to r0, which has a single version per column here;
    // the row written twice is r1 (ts + 1 and ts + 3) -- confirm what the comment above means.
    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
    List<Cell> myList = new MyList<>(hook);
    Scan scan = new Scan().withStartRow(r1).setFilter(filter);
    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
      // r1
      scanner.next(myList);
      assertEquals(expectedSize, myList.size());
      for (Cell c : myList) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:"
          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
      }
      List<Cell> normalList = new ArrayList<>(3);
      // r2
      scanner.next(normalList);
      assertEquals(3, normalList.size());
      for (Cell c : normalList) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(value2) + ", actual:"
          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value2));
      }
    }
  }

  /**
   * Creates a scanner on the main thread while {@code prepare()} of a flush runs on another
   * thread, using {@code MyCompactingMemStore} as the memstore class, and asserts that the
   * scanner still returns the three cells written beforehand. The phase comments below describe
   * the interleaving that {@code MyCompactingMemStore} (defined elsewhere in this file) is
   * expected to enforce.
   */
  @Test
  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
    byte[] value = Bytes.toBytes("value");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    // older data which shouldn't be "seen" by client
    // NOTE(review): the assertions below expect exactly these three cells to be returned, so the
    // comment above looks like a copy-paste leftover -- confirm.
    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    quals.add(qf1);
    quals.add(qf2);
    quals.add(qf3);
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    MyCompactingMemStore.START_TEST.set(true);
    Runnable flush = () -> {
      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
      // recreate the active memstore -- phase (4/5)
      storeFlushCtx.prepare();
    };
    ExecutorService service = Executors.newSingleThreadExecutor();
    service.execute(flush);
    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
    // this is blocked until we recreate the active memstore -- phase (3/5)
    // we get scanner from active memstore but it is empty -- phase (5/5)
    InternalScanner scanner =
      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
    service.shutdown();
    // NOTE(review): the boolean result (whether prepare() finished within 20s) is not checked.
    service.awaitTermination(20, TimeUnit.SECONDS);
    try {
      try {
        List<Cell> results = new ArrayList<>();
        scanner.next(results);
        assertEquals(3, results.size());
        for (Cell c : results) {
          byte[] actualValue = CellUtil.cloneValue(c);
          assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
            + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
        }
      } finally {
        scanner.close();
      }
    } finally {
      // Always switch the hook off and finish the flush that prepare() started.
      MyCompactingMemStore.START_TEST.set(false);
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
    }
  }

  /**
   * Scans while two flushes happen: the first one is prepared before the scanner is created and
   * completed after, and the second is started from the {@code getScanners} hook on another
   * thread. The scanner must return the three cells carrying "currentValue".
   */
  @Test
  public void testScanWithDoubleFlush() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Initialize region
    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
      @Override
      public void getScanners(MyStore store) throws IOException {
        final long tmpId = id++;
        ExecutorService s = Executors.newSingleThreadExecutor();
        s.execute(() -> {
          try {
            // flush the store before storescanner updates the scanners from store.
            // The current data will be flushed into files, and the memstore will
            // be clear.
            // -- phase (4/4)
            flushStore(store, tmpId);
          } catch (IOException ex) {
            throw new RuntimeException(ex);
          }
        });
        s.shutdown();
        try {
          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
          s.awaitTermination(3, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
          // NOTE(review): the interrupt is swallowed and the interrupt flag is not restored.
        }
      }
    });
    byte[] oldValue = Bytes.toBytes("oldValue");
    byte[] currentValue = Bytes.toBytes("currentValue");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    // older data which shouldn't be "seen" by client
    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
    long snapshotId = id++;
    // push older data into snapshot -- phase (1/4)
    // NOTE(review): this goes through the store field while the rest of the test uses myStore;
    // presumably initMyStore assigns the same instance to the field -- verify.
    StoreFlushContext storeFlushCtx =
      store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY);
    storeFlushCtx.prepare();

    // insert current data into active -- phase (2/4)
    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    quals.add(qf1);
    quals.add(qf2);
    quals.add(qf3);
    try (InternalScanner scanner =
      (InternalScanner) myStore.getScanner(new Scan(new Get(row)), quals, seqId + 1)) {
      // complete the flush -- phase (3/4)
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));

      List<Cell> results = new ArrayList<>();
      scanner.next(results);
      assertEquals(3, results.size());
      for (Cell c : results) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(currentValue) + ", actual:"
          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, currentValue));
      }
    }
  }

  /**
   * This test is for HBASE-27519: while the {@link StoreScanner} is scanning, the flush and the
   * compaction execute concurrently, and the compaction compacts and archives the flushed
   * {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}. Before HBASE-27519,
   * {@link StoreScanner#updateReaders} would throw {@link FileNotFoundException}.
   */
  @Test
  public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(WALFactory.WAL_ENABLED, false);
    conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName());
    byte[] r0 = Bytes.toBytes("row0");
    byte[] r1 = Bytes.toBytes("row1");
    // Two parties: the thread inside getScanners and the compaction thread.
    final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
    final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
    // Initialize region
    final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
      @Override
      public void getScanners(MyStore store) throws IOException {
        try {
          // Here this method is called by StoreScanner.updateReaders which is invoked by the
          // following TestHStore.flushStore
          if (shouldWaitRef.get()) {
            // wait the following compaction Task start
            cyclicBarrier.await();
            // wait the following HStore.closeAndArchiveCompactedFiles end.
            cyclicBarrier.await();
          }
        } catch (BrokenBarrierException | InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
    });

    // Holds whatever the compaction thread threw, so the main thread can assert on it.
    final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null);
    Runnable compactionTask = () -> {
      try {
        // Only when the StoreScanner.updateReaders invoked by TestHStore.flushStore prepares for
        // entering the MyStore.getScanners, compactionTask could start.
        cyclicBarrier.await();
        region.compactStore(family, new NoLimitThroughputController());
        myStore.closeAndArchiveCompactedFiles();
        // Notify StoreScanner.updateReaders could enter MyStore.getScanners.
        cyclicBarrier.await();
      } catch (Throwable e) {
        compactionExceptionRef.set(e);
      }
    };

    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    byte[] value = Bytes.toBytes("value");
    // older data which shouldn't be "seen" by client
    myStore.add(createCell(r0, qf1, ts, seqId, value), null);
    flushStore(myStore, id++);
    myStore.add(createCell(r0, qf2, ts, seqId, value), null);
    flushStore(myStore, id++);
    myStore.add(createCell(r0, qf3, ts, seqId, value), null);
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    quals.add(qf1);
    quals.add(qf2);
    quals.add(qf3);

    myStore.add(createCell(r1, qf1, ts, seqId, value), null);
    myStore.add(createCell(r1, qf2, ts, seqId, value), null);
    myStore.add(createCell(r1, qf3, ts, seqId, value), null);

    Thread.currentThread()
      .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread");
    Scan scan = new Scan();
    scan.withStartRow(r0, true);
    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) {
      List<Cell> results = new MyList<>(size -> {
        switch (size) {
          case 1:
            // Arm the barrier in getScanners only for the flush triggered right here.
            shouldWaitRef.set(true);
            Thread thread = new Thread(compactionTask);
            thread.setName("MyCompacting Thread.");
            thread.start();
            try {
              flushStore(myStore, id++);
              thread.join();
            } catch (IOException | InterruptedException e) {
              throw new RuntimeException(e);
            }
            shouldWaitRef.set(false);
            break;
          default:
            break;
        }
      });
      // Before HBASE-27519, here would throw java.io.FileNotFoundException because the storeFile
      // which used by StoreScanner.updateReaders is deleted by compactionTask.
      scanner.next(results);
      // The results is r0 row cells.
      assertEquals(3, results.size());
      assertTrue(compactionExceptionRef.get() == null);
    }
  }

  /**
   * Flushes the store after the first cell has been read and adds newer cells after the second,
   * then asserts that the scanner still returns the three original cells with their original
   * value.
   */
  @Test
  public void testReclaimChunkWhenScaning() throws IOException {
    init("testReclaimChunkWhenScaning");
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    byte[] value = Bytes.toBytes("value");
    // older data which shouldn't be "seen" by client
    store.add(createCell(qf1, ts, seqId, value), null);
    store.add(createCell(qf2, ts, seqId, value), null);
    store.add(createCell(qf3, ts, seqId, value), null);
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    quals.add(qf1);
    quals.add(qf2);
    quals.add(qf3);
    try (InternalScanner scanner =
      (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId)) {
      List<Cell> results = new MyList<>(size -> {
        switch (size) {
          // 1) we get the first cell (qf1)
          // 2) flush the data to have StoreScanner update inner scanners
          // 3) the chunk will be reclaimed after updating
          case 1:
            try {
              flushStore(store, id++);
            } catch (IOException e) {
              throw new RuntimeException(e);
            }
            break;
          // 1) we get the second cell (qf2)
          // 2) add some cell to fill some byte into the chunk (we have only one chunk)
          case 2:
            try {
              byte[] newValue = Bytes.toBytes("newValue");
              // newer data which shouldn't be "seen" by client (the assertions below expect only
              // the original value)
              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
            } catch (IOException e) {
              throw new RuntimeException(e);
            }
            break;
          default:
            break;
        }
      });
      scanner.next(results);
      assertEquals(3, results.size());
      for (Cell c : results) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:"
          + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value));
      }
    }
  }

  /**
   * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the
   * versionedList. And the first InMemoryFlushRunnable will use the changed versionedList to remove
   * the corresponding segments. In short, there will be some segments which aren't in merge are
   * removed.
1695 */ 1696 @Test 1697 public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException { 1698 int flushSize = 500; 1699 Configuration conf = HBaseConfiguration.create(); 1700 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName()); 1701 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25); 1702 MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0); 1703 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize)); 1704 // Set the lower threshold to invoke the "MERGE" policy 1705 conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0)); 1706 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1707 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1708 byte[] value = Bytes.toBytes("thisisavarylargevalue"); 1709 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1710 long ts = EnvironmentEdgeManager.currentTime(); 1711 long seqId = 100; 1712 // older data whihc shouldn't be "seen" by client 1713 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1714 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1715 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1716 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1717 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1718 storeFlushCtx.prepare(); 1719 // This shouldn't invoke another in-memory flush because the first compactor thread 1720 // hasn't accomplished the in-memory compaction. 1721 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1722 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1723 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1724 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1725 // okay. 
Let the compaction be completed 1726 MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown(); 1727 CompactingMemStore mem = (CompactingMemStore) ((HStore) store).memstore; 1728 while (mem.isMemStoreFlushingInMemory()) { 1729 TimeUnit.SECONDS.sleep(1); 1730 } 1731 // This should invoke another in-memory flush. 1732 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1733 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1734 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1735 assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1736 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1737 String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE)); 1738 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1739 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1740 } 1741 1742 @Test 1743 public void testAge() throws IOException { 1744 long currentTime = EnvironmentEdgeManager.currentTime(); 1745 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 1746 edge.setValue(currentTime); 1747 EnvironmentEdgeManager.injectEdge(edge); 1748 Configuration conf = TEST_UTIL.getConfiguration(); 1749 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family); 1750 initHRegion(name.getMethodName(), conf, 1751 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false); 1752 HStore store = new HStore(region, hcd, conf, false) { 1753 1754 @Override 1755 protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf, 1756 CellComparator kvComparator) throws IOException { 1757 List<HStoreFile> storefiles = 1758 Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100), 1759 mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000)); 1760 StoreFileManager sfm = mock(StoreFileManager.class); 1761 when(sfm.getStorefiles()).thenReturn(storefiles); 1762 StoreEngine<?, ?, ?, ?> storeEngine = 
mock(StoreEngine.class); 1763 when(storeEngine.getStoreFileManager()).thenReturn(sfm); 1764 return storeEngine; 1765 } 1766 }; 1767 assertEquals(10L, store.getMinStoreFileAge().getAsLong()); 1768 assertEquals(10000L, store.getMaxStoreFileAge().getAsLong()); 1769 assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4); 1770 } 1771 1772 private HStoreFile mockStoreFile(long createdTime) { 1773 StoreFileInfo info = mock(StoreFileInfo.class); 1774 when(info.getCreatedTimestamp()).thenReturn(createdTime); 1775 HStoreFile sf = mock(HStoreFile.class); 1776 when(sf.getReader()).thenReturn(mock(StoreFileReader.class)); 1777 when(sf.isHFile()).thenReturn(true); 1778 when(sf.getFileInfo()).thenReturn(info); 1779 return sf; 1780 } 1781 1782 private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) 1783 throws IOException { 1784 return (MyStore) init(methodName, conf, 1785 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1786 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook); 1787 } 1788 1789 private static class MyStore extends HStore { 1790 private final MyStoreHook hook; 1791 1792 MyStore(final HRegion region, final ColumnFamilyDescriptor family, 1793 final Configuration confParam, MyStoreHook hook, boolean switchToPread) throws IOException { 1794 super(region, family, confParam, false); 1795 this.hook = hook; 1796 } 1797 1798 @Override 1799 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1800 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, 1801 boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, 1802 boolean includeMemstoreScanner) throws IOException { 1803 hook.getScanners(this); 1804 return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, 1805 stopRow, false, readPt, includeMemstoreScanner); 1806 } 1807 1808 @Override 1809 public 
long getSmallestReadPoint() { 1810 return hook.getSmallestReadPoint(this); 1811 } 1812 } 1813 1814 private abstract static class MyStoreHook { 1815 1816 void getScanners(MyStore store) throws IOException { 1817 } 1818 1819 long getSmallestReadPoint(HStore store) { 1820 return store.getHRegion().getSmallestReadPoint(); 1821 } 1822 } 1823 1824 @Test 1825 public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception { 1826 Configuration conf = HBaseConfiguration.create(); 1827 conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); 1828 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0); 1829 // Set the lower threshold to invoke the "MERGE" policy 1830 MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() { 1831 }); 1832 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1833 long ts = EnvironmentEdgeManager.currentTime(); 1834 long seqID = 1L; 1835 // Add some data to the region and do some flushes 1836 for (int i = 1; i < 10; i++) { 1837 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1838 memStoreSizing); 1839 } 1840 // flush them 1841 flushStore(store, seqID); 1842 for (int i = 11; i < 20; i++) { 1843 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1844 memStoreSizing); 1845 } 1846 // flush them 1847 flushStore(store, seqID); 1848 for (int i = 21; i < 30; i++) { 1849 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1850 memStoreSizing); 1851 } 1852 // flush them 1853 flushStore(store, seqID); 1854 1855 assertEquals(3, store.getStorefilesCount()); 1856 Scan scan = new Scan(); 1857 scan.addFamily(family); 1858 Collection<HStoreFile> storefiles2 = store.getStorefiles(); 1859 ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2); 1860 StoreScanner storeScanner = 1861 (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); 
1862 // get the current heap 1863 KeyValueHeap heap = storeScanner.heap; 1864 // create more store files 1865 for (int i = 31; i < 40; i++) { 1866 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1867 memStoreSizing); 1868 } 1869 // flush them 1870 flushStore(store, seqID); 1871 1872 for (int i = 41; i < 50; i++) { 1873 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1874 memStoreSizing); 1875 } 1876 // flush them 1877 flushStore(store, seqID); 1878 storefiles2 = store.getStorefiles(); 1879 ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2); 1880 actualStorefiles1.removeAll(actualStorefiles); 1881 // Do compaction 1882 MyThread thread = new MyThread(storeScanner); 1883 thread.start(); 1884 store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false); 1885 thread.join(); 1886 KeyValueHeap heap2 = thread.getHeap(); 1887 assertFalse(heap.equals(heap2)); 1888 } 1889 1890 @Test 1891 public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception { 1892 Configuration conf = HBaseConfiguration.create(); 1893 conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); 1894 // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type. 1895 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1); 1896 MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() { 1897 }); 1898 Scan scan = new Scan(); 1899 scan.addFamily(family); 1900 // ReadType on Scan is still DEFAULT only. 
1901 assertEquals(ReadType.DEFAULT, scan.getReadType()); 1902 StoreScanner storeScanner = 1903 (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); 1904 assertFalse(storeScanner.isScanUsePread()); 1905 } 1906 1907 @Test 1908 public void testSpaceQuotaChangeAfterReplacement() throws IOException { 1909 final TableName tn = TableName.valueOf(name.getMethodName()); 1910 init(name.getMethodName()); 1911 1912 RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl(); 1913 1914 HStoreFile sf1 = mockStoreFileWithLength(1024L); 1915 HStoreFile sf2 = mockStoreFileWithLength(2048L); 1916 HStoreFile sf3 = mockStoreFileWithLength(4096L); 1917 HStoreFile sf4 = mockStoreFileWithLength(8192L); 1918 1919 RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a")) 1920 .setEndKey(Bytes.toBytes("b")).build(); 1921 1922 // Compacting two files down to one, reducing size 1923 sizeStore.put(regionInfo, 1024L + 4096L); 1924 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf1, sf3), 1925 Arrays.asList(sf2)); 1926 1927 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 1928 1929 // The same file length in and out should have no change 1930 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2), 1931 Arrays.asList(sf2)); 1932 1933 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 1934 1935 // Increase the total size used 1936 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2), 1937 Arrays.asList(sf3)); 1938 1939 assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize()); 1940 1941 RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b")) 1942 .setEndKey(Bytes.toBytes("c")).build(); 1943 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4)); 1944 1945 assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize()); 
1946 } 1947 1948 @Test 1949 public void testHFileContextSetWithCFAndTable() throws Exception { 1950 init(this.name.getMethodName()); 1951 StoreFileWriter writer = store.getStoreEngine() 1952 .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L) 1953 .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true) 1954 .includesTag(false).shouldDropBehind(true)); 1955 HFileContext hFileContext = writer.getHFileWriter().getFileContext(); 1956 assertArrayEquals(family, hFileContext.getColumnFamily()); 1957 assertArrayEquals(table, hFileContext.getTableName()); 1958 } 1959 1960 // This test is for HBASE-26026, HBase Write be stuck when active segment has no cell 1961 // but its dataSize exceeds inmemoryFlushSize 1962 @Test 1963 public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize() 1964 throws IOException, InterruptedException { 1965 Configuration conf = HBaseConfiguration.create(); 1966 1967 byte[] smallValue = new byte[3]; 1968 byte[] largeValue = new byte[9]; 1969 final long timestamp = EnvironmentEdgeManager.currentTime(); 1970 final long seqId = 100; 1971 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 1972 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 1973 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 1974 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 1975 int flushByteSize = smallCellByteSize + largeCellByteSize - 2; 1976 1977 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 
1978 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName()); 1979 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 1980 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 1981 1982 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1983 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1984 1985 MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore); 1986 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 1987 myCompactingMemStore.smallCellPreUpdateCounter.set(0); 1988 myCompactingMemStore.largeCellPreUpdateCounter.set(0); 1989 1990 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 1991 Thread smallCellThread = new Thread(() -> { 1992 try { 1993 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 1994 } catch (Throwable exception) { 1995 exceptionRef.set(exception); 1996 } 1997 }); 1998 smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME); 1999 smallCellThread.start(); 2000 2001 String oldThreadName = Thread.currentThread().getName(); 2002 try { 2003 /** 2004 * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then 2005 * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread 2006 * invokes flushInMemory. 2007 * <p/> 2008 * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread 2009 * can add cell to currentActive . That is to say when largeCellThread called flushInMemory 2010 * method, CompactingMemStore.active has no cell. 
2011 */ 2012 Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME); 2013 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2014 smallCellThread.join(); 2015 2016 for (int i = 0; i < 100; i++) { 2017 long currentTimestamp = timestamp + 100 + i; 2018 Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue); 2019 store.add(cell, new NonThreadSafeMemStoreSizing()); 2020 } 2021 } finally { 2022 Thread.currentThread().setName(oldThreadName); 2023 } 2024 2025 assertTrue(exceptionRef.get() == null); 2026 2027 } 2028 2029 // This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds 2030 // InmemoryFlushSize 2031 @Test(timeout = 60000) 2032 public void testCompactingMemStoreCellExceedInmemoryFlushSize() throws Exception { 2033 Configuration conf = HBaseConfiguration.create(); 2034 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName()); 2035 2036 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2037 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2038 2039 MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore); 2040 2041 int size = (int) (myCompactingMemStore.getInmemoryFlushSize()); 2042 byte[] value = new byte[size + 1]; 2043 2044 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2045 long timestamp = EnvironmentEdgeManager.currentTime(); 2046 long seqId = 100; 2047 Cell cell = createCell(qf1, timestamp, seqId, value); 2048 int cellByteSize = MutableSegment.getCellLength(cell); 2049 store.add(cell, memStoreSizing); 2050 assertTrue(memStoreSizing.getCellsCount() == 1); 2051 assertTrue(memStoreSizing.getDataSize() == cellByteSize); 2052 // Waiting the in memory compaction completed, see HBASE-26438 2053 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2054 } 2055 2056 /** 2057 * This test is for HBASE-27464, before this JIRA,when init {@link CellChunkImmutableSegment} for 2058 
* 'COMPACT' action, we not force copy to current MSLab. When cell size bigger than 2059 * {@link MemStoreLABImpl#maxAlloc}, cell will stay in previous chunk which will recycle after 2060 * segment replace, and we may read wrong data when these chunk reused by others. 2061 */ 2062 @Test 2063 public void testForceCloneOfBigCellForCellChunkImmutableSegment() throws Exception { 2064 Configuration conf = HBaseConfiguration.create(); 2065 int maxAllocByteSize = conf.getInt(MemStoreLAB.MAX_ALLOC_KEY, MemStoreLAB.MAX_ALLOC_DEFAULT); 2066 2067 // Construct big cell,which is large than {@link MemStoreLABImpl#maxAlloc}. 2068 byte[] cellValue = new byte[maxAllocByteSize + 1]; 2069 final long timestamp = EnvironmentEdgeManager.currentTime(); 2070 final long seqId = 100; 2071 final byte[] rowKey1 = Bytes.toBytes("rowKey1"); 2072 final Cell originalCell1 = createCell(rowKey1, qf1, timestamp, seqId, cellValue); 2073 final byte[] rowKey2 = Bytes.toBytes("rowKey2"); 2074 final Cell originalCell2 = createCell(rowKey2, qf1, timestamp, seqId, cellValue); 2075 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2076 quals.add(qf1); 2077 2078 int cellByteSize = MutableSegment.getCellLength(originalCell1); 2079 int inMemoryFlushByteSize = cellByteSize - 1; 2080 2081 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2082 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName()); 2083 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2084 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(inMemoryFlushByteSize * 200)); 2085 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2086 2087 // Use {@link MemoryCompactionPolicy#EAGER} for always compacting. 
2088 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2089 .setInMemoryCompaction(MemoryCompactionPolicy.EAGER).build()); 2090 2091 MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore); 2092 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == inMemoryFlushByteSize); 2093 2094 // Data chunk Pool is disabled. 2095 assertTrue(ChunkCreator.getInstance().getMaxCount(ChunkType.DATA_CHUNK) == 0); 2096 2097 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2098 2099 // First compact 2100 store.add(originalCell1, memStoreSizing); 2101 // Waiting for the first in-memory compaction finished 2102 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2103 2104 StoreScanner storeScanner = 2105 (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1); 2106 SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2107 Cell resultCell1 = segmentScanner.next(); 2108 assertTrue(CellUtil.equals(resultCell1, originalCell1)); 2109 int cell1ChunkId = ((ExtendedCell) resultCell1).getChunkId(); 2110 assertTrue(cell1ChunkId != ExtendedCell.CELL_NOT_BASED_ON_CHUNK); 2111 assertNull(segmentScanner.next()); 2112 segmentScanner.close(); 2113 storeScanner.close(); 2114 Segment segment = segmentScanner.segment; 2115 assertTrue(segment instanceof CellChunkImmutableSegment); 2116 MemStoreLABImpl memStoreLAB1 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2117 assertTrue(!memStoreLAB1.isClosed()); 2118 assertTrue(!memStoreLAB1.chunks.isEmpty()); 2119 assertTrue(!memStoreLAB1.isReclaimed()); 2120 2121 // Second compact 2122 store.add(originalCell2, memStoreSizing); 2123 // Waiting for the second in-memory compaction finished 2124 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2125 2126 // Before HBASE-27464, here may throw java.lang.IllegalArgumentException: In CellChunkMap, cell 2127 // must be associated 
with chunk.. We were looking for a cell at index 0. 2128 // The cause for this exception is because the data chunk Pool is disabled,when the data chunks 2129 // are recycled after the second in-memory compaction finished,the 2130 // {@link ChunkCreator.putbackChunks} method does not put the chunks back to the data chunk 2131 // pool,it just removes them from {@link ChunkCreator#chunkIdMap},so in 2132 // {@link CellChunkMap#getCell} we could not get the data chunk by chunkId. 2133 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1); 2134 segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2135 Cell newResultCell1 = segmentScanner.next(); 2136 assertTrue(newResultCell1 != resultCell1); 2137 assertTrue(CellUtil.equals(newResultCell1, originalCell1)); 2138 2139 Cell resultCell2 = segmentScanner.next(); 2140 assertTrue(CellUtil.equals(resultCell2, originalCell2)); 2141 assertNull(segmentScanner.next()); 2142 segmentScanner.close(); 2143 storeScanner.close(); 2144 2145 segment = segmentScanner.segment; 2146 assertTrue(segment instanceof CellChunkImmutableSegment); 2147 MemStoreLABImpl memStoreLAB2 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2148 assertTrue(!memStoreLAB2.isClosed()); 2149 assertTrue(!memStoreLAB2.chunks.isEmpty()); 2150 assertTrue(!memStoreLAB2.isReclaimed()); 2151 assertTrue(memStoreLAB1.isClosed()); 2152 assertTrue(memStoreLAB1.chunks.isEmpty()); 2153 assertTrue(memStoreLAB1.isReclaimed()); 2154 } 2155 2156 // This test is for HBASE-26210 also, test write large cell and small cell concurrently when 2157 // InmemoryFlushSize is smaller,equal with and larger than cell size. 
2158 @Test 2159 public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently() 2160 throws IOException, InterruptedException { 2161 doWriteTestLargeCellAndSmallCellConcurrently( 2162 (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1); 2163 doWriteTestLargeCellAndSmallCellConcurrently( 2164 (smallCellByteSize, largeCellByteSize) -> largeCellByteSize); 2165 doWriteTestLargeCellAndSmallCellConcurrently( 2166 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1); 2167 doWriteTestLargeCellAndSmallCellConcurrently( 2168 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize); 2169 doWriteTestLargeCellAndSmallCellConcurrently( 2170 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1); 2171 } 2172 2173 private void doWriteTestLargeCellAndSmallCellConcurrently(IntBinaryOperator getFlushByteSize) 2174 throws IOException, InterruptedException { 2175 2176 Configuration conf = HBaseConfiguration.create(); 2177 2178 byte[] smallValue = new byte[3]; 2179 byte[] largeValue = new byte[100]; 2180 final long timestamp = EnvironmentEdgeManager.currentTime(); 2181 final long seqId = 100; 2182 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2183 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2184 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2185 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2186 int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize); 2187 boolean flushByteSizeLessThanSmallAndLargeCellSize = 2188 flushByteSize < (smallCellByteSize + largeCellByteSize); 2189 2190 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName()); 2191 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2192 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2193 2194 
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2195 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2196 2197 MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore); 2198 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2199 myCompactingMemStore.disableCompaction(); 2200 if (flushByteSizeLessThanSmallAndLargeCellSize) { 2201 myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true; 2202 } else { 2203 myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false; 2204 } 2205 2206 final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing(); 2207 final AtomicLong totalCellByteSize = new AtomicLong(0); 2208 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2209 Thread smallCellThread = new Thread(() -> { 2210 try { 2211 for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { 2212 long currentTimestamp = timestamp + i; 2213 Cell cell = createCell(qf1, currentTimestamp, seqId, smallValue); 2214 totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell)); 2215 store.add(cell, memStoreSizing); 2216 } 2217 } catch (Throwable exception) { 2218 exceptionRef.set(exception); 2219 2220 } 2221 }); 2222 smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME); 2223 smallCellThread.start(); 2224 2225 String oldThreadName = Thread.currentThread().getName(); 2226 try { 2227 /** 2228 * When flushByteSizeLessThanSmallAndLargeCellSize is true: 2229 * </p> 2230 * 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then 2231 * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then 2232 * largeCellThread invokes flushInMemory. 2233 * <p/> 2234 * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread 2235 * can run into MyCompactingMemStore3.checkAndAddToActiveSize again. 
2236 * <p/> 2237 * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and 2238 * largeCellThread concurrently write one cell and wait each other, and then write another 2239 * cell etc. 2240 */ 2241 Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME); 2242 for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { 2243 long currentTimestamp = timestamp + i; 2244 Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue); 2245 totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell)); 2246 store.add(cell, memStoreSizing); 2247 } 2248 smallCellThread.join(); 2249 2250 assertTrue(exceptionRef.get() == null); 2251 assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2)); 2252 assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get()); 2253 if (flushByteSizeLessThanSmallAndLargeCellSize) { 2254 assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT); 2255 } else { 2256 assertTrue( 2257 myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1)); 2258 } 2259 } finally { 2260 Thread.currentThread().setName(oldThreadName); 2261 } 2262 } 2263 2264 /** 2265 * <pre> 2266 * This test is for HBASE-26384, 2267 * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()} 2268 * execute concurrently. 2269 * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs 2270 * for both branch-2 and master): 2271 * 1. The {@link CompactingMemStore} size exceeds 2272 * {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new 2273 * {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a 2274 * in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}. 2275 * 2. The in memory compact thread starts and then stopping before 2276 * {@link CompactingMemStore#flattenOneSegment}. 2277 * 3. 
The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the 2278 * snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory 2279 * compact thread continues. 2280 * Assuming {@link VersionedSegmentsList#version} returned from 2281 * {@link CompactingMemStore#getImmutableSegments} is v. 2282 * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}. 2283 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment}, 2284 * {@link CompactionPipeline#version} is still v. 2285 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2286 * {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull} 2287 * thinks it is successful and continue flushing,but the {@link ImmutableSegment} in 2288 * {@link CompactionPipeline} has changed because 2289 * {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not 2290 * removed in fact and still remaining in {@link CompactionPipeline}. 2291 * 2292 * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior: 2293 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment}, 2294 * {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to 2295 * v+1. 2296 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2297 * {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull} 2298 * failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once 2299 * again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now, 2300 * {@link CompactingMemStore#swapPipelineWithNull} succeeds. 
2301 * </pre> 2302 */ 2303 @Test 2304 public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception { 2305 Configuration conf = HBaseConfiguration.create(); 2306 2307 byte[] smallValue = new byte[3]; 2308 byte[] largeValue = new byte[9]; 2309 final long timestamp = EnvironmentEdgeManager.currentTime(); 2310 final long seqId = 100; 2311 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2312 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2313 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2314 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2315 int totalCellByteSize = (smallCellByteSize + largeCellByteSize); 2316 int flushByteSize = totalCellByteSize - 2; 2317 2318 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2319 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName()); 2320 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2321 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2322 2323 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2324 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2325 2326 MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore); 2327 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2328 2329 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2330 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2331 2332 String oldThreadName = Thread.currentThread().getName(); 2333 try { 2334 Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME); 2335 /** 2336 * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters 2337 * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot} 2338 * would invoke {@link CompactingMemStore#stopCompaction}. 
2339 */ 2340 myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await(); 2341 2342 MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot(); 2343 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2344 2345 assertTrue(memStoreSnapshot.getCellsCount() == 2); 2346 assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize); 2347 VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments(); 2348 assertTrue(segments.getNumOfSegments() == 0); 2349 assertTrue(segments.getNumOfCells() == 0); 2350 assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1); 2351 assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2); 2352 } finally { 2353 Thread.currentThread().setName(oldThreadName); 2354 } 2355 } 2356 2357 /** 2358 * <pre> 2359 * This test is for HBASE-26384, 2360 * test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()} 2361 * and writeMemStore execute concurrently. 2362 * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs 2363 * for both branch-2 and master): 2364 * 1. The {@link CompactingMemStore} size exceeds 2365 * {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new 2366 * {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a 2367 * in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}. 2368 * 2. The in memory compact thread starts and then stopping before 2369 * {@link CompactingMemStore#flattenOneSegment}. 2370 * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the 2371 * snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory 2372 * compact thread continues. 2373 * Assuming {@link VersionedSegmentsList#version} returned from 2374 * {@link CompactingMemStore#getImmutableSegments} is v. 2375 * 4. 
   * The snapshot thread stops before {@link CompactingMemStore#swapPipelineWithNull}.
   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
   * {@link CompactionPipeline#version} is still v.
   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
   * {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull}
   * thinks it is successful and continues flushing, but the {@link ImmutableSegment} in
   * {@link CompactionPipeline} has changed because of
   * {@link CompactingMemStore#flattenOneSegment}, so the {@link ImmutableSegment} is in fact
   * not removed and still remains in {@link CompactionPipeline}.
   *
   * After HBASE-26384, steps 5-6 are changed to the following, which is the expected behavior,
   * and steps 7-8 are added to test that a new segment is added before the retry.
   * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment},
   * {@link CompactingMemStore#flattenOneSegment} changes {@link CompactionPipeline#version} to
   * v+1.
   * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because
   * {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull}
   * fails and retries; the {@link VersionedSegmentsList#version} returned from
   * {@link CompactingMemStore#getImmutableSegments} is v+1.
   * 7. The write thread continues writing to {@link CompactingMemStore} and the
   * {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()},
   * {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new
   * {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline},
   * {@link CompactionPipeline#version} is still v+1.
   * 8.
The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2400 * {@link CompactionPipeline#version} is still v+1, 2401 * {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment} 2402 * remained at the head of {@link CompactingMemStore#pipeline},the old is removed by 2403 * {@link CompactingMemStore#swapPipelineWithNull}. 2404 * </pre> 2405 */ 2406 @Test 2407 public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception { 2408 Configuration conf = HBaseConfiguration.create(); 2409 2410 byte[] smallValue = new byte[3]; 2411 byte[] largeValue = new byte[9]; 2412 final long timestamp = EnvironmentEdgeManager.currentTime(); 2413 final long seqId = 100; 2414 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2415 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2416 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2417 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2418 int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize); 2419 int flushByteSize = firstWriteCellByteSize - 2; 2420 2421 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 
2422 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName()); 2423 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2424 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2425 2426 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2427 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2428 2429 final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore); 2430 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2431 2432 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2433 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2434 2435 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2436 final Cell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue); 2437 final Cell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue); 2438 final int writeAgainCellByteSize = 2439 MutableSegment.getCellLength(writeAgainCell1) + MutableSegment.getCellLength(writeAgainCell2); 2440 final Thread writeAgainThread = new Thread(() -> { 2441 try { 2442 myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await(); 2443 2444 store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing()); 2445 store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing()); 2446 2447 myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await(); 2448 } catch (Throwable exception) { 2449 exceptionRef.set(exception); 2450 } 2451 }); 2452 writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME); 2453 writeAgainThread.start(); 2454 2455 String oldThreadName = Thread.currentThread().getName(); 2456 try { 2457 Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME); 2458 /** 2459 * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters 2460 * {@link 
CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot} 2461 * would invoke {@link CompactingMemStore#stopCompaction}. 2462 */ 2463 myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await(); 2464 MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot(); 2465 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2466 writeAgainThread.join(); 2467 2468 assertTrue(memStoreSnapshot.getCellsCount() == 2); 2469 assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize); 2470 VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments(); 2471 assertTrue(segments.getNumOfSegments() == 1); 2472 assertTrue( 2473 ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize); 2474 assertTrue(segments.getNumOfCells() == 2); 2475 assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2); 2476 assertTrue(exceptionRef.get() == null); 2477 assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2); 2478 } finally { 2479 Thread.currentThread().setName(oldThreadName); 2480 } 2481 } 2482 2483 /** 2484 * <pre> 2485 * This test is for HBASE-26465, 2486 * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute 2487 * concurrently. The threads sequence before HBASE-26465 is: 2488 * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have be added to 2489 * {@link DefaultMemStore}. 2490 * 2.The flush thread stopping before {@link DefaultMemStore#clearSnapshot} in 2491 * {@link HStore#updateStorefiles} after completed flushing memStore to hfile. 2492 * 3.The scan thread starts and stopping after {@link DefaultMemStore#getSnapshotSegments} in 2493 * {@link DefaultMemStore#getScanners},here the scan thread gets the 2494 * {@link DefaultMemStore#snapshot} which is created by the flush thread. 
2495 * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and close 2496 * {@link DefaultMemStore#snapshot},because the reference count of the corresponding 2497 * {@link MemStoreLABImpl} is 0, the {@link Chunk}s in corresponding {@link MemStoreLABImpl} 2498 * are recycled. 2499 * 5.The scan thread continues {@link DefaultMemStore#getScanners},and create a 2500 * {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increase the 2501 * reference count of the corresponding {@link MemStoreLABImpl}, but {@link Chunk}s in 2502 * corresponding {@link MemStoreLABImpl} are recycled by step 4, and these {@link Chunk}s may 2503 * be overwritten by other write threads,which may cause serious problem. 2504 * After HBASE-26465,{@link DefaultMemStore#getScanners} and 2505 * {@link DefaultMemStore#clearSnapshot} could not execute concurrently. 2506 * </pre> 2507 */ 2508 @Test 2509 public void testClearSnapshotGetScannerConcurrently() throws Exception { 2510 Configuration conf = HBaseConfiguration.create(); 2511 2512 byte[] smallValue = new byte[3]; 2513 byte[] largeValue = new byte[9]; 2514 final long timestamp = EnvironmentEdgeManager.currentTime(); 2515 final long seqId = 100; 2516 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2517 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2518 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2519 quals.add(qf1); 2520 quals.add(qf2); 2521 2522 conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName()); 2523 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2524 2525 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2526 MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore); 2527 myDefaultMemStore.store = store; 2528 2529 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2530 store.add(smallCell, memStoreSizing); 2531 store.add(largeCell, memStoreSizing); 
2532 2533 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2534 final Thread flushThread = new Thread(() -> { 2535 try { 2536 flushStore(store, id++); 2537 } catch (Throwable exception) { 2538 exceptionRef.set(exception); 2539 } 2540 }); 2541 flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME); 2542 flushThread.start(); 2543 2544 String oldThreadName = Thread.currentThread().getName(); 2545 StoreScanner storeScanner = null; 2546 try { 2547 Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME); 2548 2549 /** 2550 * Wait flush thread stopping before {@link DefaultMemStore#doClearSnapshot} 2551 */ 2552 myDefaultMemStore.getScannerCyclicBarrier.await(); 2553 2554 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 2555 flushThread.join(); 2556 2557 if (myDefaultMemStore.shouldWait) { 2558 SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2559 MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2560 assertTrue(memStoreLAB.isClosed()); 2561 assertTrue(!memStoreLAB.chunks.isEmpty()); 2562 assertTrue(!memStoreLAB.isReclaimed()); 2563 2564 Cell cell1 = segmentScanner.next(); 2565 CellUtil.equals(smallCell, cell1); 2566 Cell cell2 = segmentScanner.next(); 2567 CellUtil.equals(largeCell, cell2); 2568 assertNull(segmentScanner.next()); 2569 } else { 2570 List<Cell> results = new ArrayList<>(); 2571 storeScanner.next(results); 2572 assertEquals(2, results.size()); 2573 CellUtil.equals(smallCell, results.get(0)); 2574 CellUtil.equals(largeCell, results.get(1)); 2575 } 2576 assertTrue(exceptionRef.get() == null); 2577 } finally { 2578 if (storeScanner != null) { 2579 storeScanner.close(); 2580 } 2581 Thread.currentThread().setName(oldThreadName); 2582 } 2583 } 2584 2585 @SuppressWarnings("unchecked") 2586 private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) { 2587 
List<T> resultScanners = new ArrayList<T>(); 2588 for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) { 2589 if (keyValueScannerClass.isInstance(keyValueScanner)) { 2590 resultScanners.add((T) keyValueScanner); 2591 } 2592 } 2593 assertTrue(resultScanners.size() == 1); 2594 return resultScanners.get(0); 2595 } 2596 2597 @Test 2598 public void testOnConfigurationChange() throws IOException { 2599 final int COMMON_MAX_FILES_TO_COMPACT = 10; 2600 final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8; 2601 final int STORE_MAX_FILES_TO_COMPACT = 6; 2602 2603 // Build a table that its maxFileToCompact different from common configuration. 2604 Configuration conf = HBaseConfiguration.create(); 2605 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2606 COMMON_MAX_FILES_TO_COMPACT); 2607 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 2608 .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2609 String.valueOf(STORE_MAX_FILES_TO_COMPACT)) 2610 .build(); 2611 init(this.name.getMethodName(), conf, hcd); 2612 2613 // After updating common configuration, the conf in HStore itself must not be changed. 
2614 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2615 NEW_COMMON_MAX_FILES_TO_COMPACT); 2616 this.store.onConfigurationChange(conf); 2617 assertEquals(STORE_MAX_FILES_TO_COMPACT, 2618 store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact()); 2619 } 2620 2621 /** 2622 * This test is for HBASE-26476 2623 */ 2624 @Test 2625 public void testExtendsDefaultMemStore() throws Exception { 2626 Configuration conf = HBaseConfiguration.create(); 2627 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2628 2629 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2630 assertTrue(this.store.memstore.getClass() == DefaultMemStore.class); 2631 tearDown(); 2632 2633 conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName()); 2634 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2635 assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class); 2636 } 2637 2638 static class CustomDefaultMemStore extends DefaultMemStore { 2639 2640 public CustomDefaultMemStore(Configuration conf, CellComparator c, 2641 RegionServicesForStores regionServices) { 2642 super(conf, c, regionServices); 2643 } 2644 2645 } 2646 2647 /** 2648 * This test is for HBASE-26488 2649 */ 2650 @Test 2651 public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception { 2652 2653 Configuration conf = HBaseConfiguration.create(); 2654 2655 byte[] smallValue = new byte[3]; 2656 byte[] largeValue = new byte[9]; 2657 final long timestamp = EnvironmentEdgeManager.currentTime(); 2658 final long seqId = 100; 2659 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2660 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2661 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2662 quals.add(qf1); 2663 quals.add(qf2); 2664 2665 conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName()); 2666 
conf.setBoolean(WALFactory.WAL_ENABLED, false); 2667 conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, 2668 MyDefaultStoreFlusher.class.getName()); 2669 2670 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2671 MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore); 2672 assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher); 2673 2674 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2675 store.add(smallCell, memStoreSizing); 2676 store.add(largeCell, memStoreSizing); 2677 flushStore(store, id++); 2678 2679 MemStoreLABImpl memStoreLAB = 2680 (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB()); 2681 assertTrue(memStoreLAB.isClosed()); 2682 assertTrue(memStoreLAB.getRefCntValue() == 0); 2683 assertTrue(memStoreLAB.isReclaimed()); 2684 assertTrue(memStoreLAB.chunks.isEmpty()); 2685 StoreScanner storeScanner = null; 2686 try { 2687 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 2688 assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1); 2689 assertTrue(store.memstore.size().getCellsCount() == 0); 2690 assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0); 2691 assertTrue(storeScanner.currentScanners.size() == 1); 2692 assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner); 2693 2694 List<Cell> results = new ArrayList<>(); 2695 storeScanner.next(results); 2696 assertEquals(2, results.size()); 2697 CellUtil.equals(smallCell, results.get(0)); 2698 CellUtil.equals(largeCell, results.get(1)); 2699 } finally { 2700 if (storeScanner != null) { 2701 storeScanner.close(); 2702 } 2703 } 2704 } 2705 2706 static class MyDefaultMemStore1 extends DefaultMemStore { 2707 2708 private ImmutableSegment snapshotImmutableSegment; 2709 2710 public MyDefaultMemStore1(Configuration conf, CellComparator c, 2711 RegionServicesForStores 
regionServices) { 2712 super(conf, c, regionServices); 2713 } 2714 2715 @Override 2716 public MemStoreSnapshot snapshot() { 2717 MemStoreSnapshot result = super.snapshot(); 2718 this.snapshotImmutableSegment = snapshot; 2719 return result; 2720 } 2721 2722 } 2723 2724 public static class MyDefaultStoreFlusher extends DefaultStoreFlusher { 2725 private static final AtomicInteger failCounter = new AtomicInteger(1); 2726 private static final AtomicInteger counter = new AtomicInteger(0); 2727 2728 public MyDefaultStoreFlusher(Configuration conf, HStore store) { 2729 super(conf, store); 2730 } 2731 2732 @Override 2733 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, 2734 MonitoredTask status, ThroughputController throughputController, 2735 FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException { 2736 counter.incrementAndGet(); 2737 return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker, 2738 writerCreationTracker); 2739 } 2740 2741 @Override 2742 protected void performFlush(InternalScanner scanner, final CellSink sink, 2743 ThroughputController throughputController) throws IOException { 2744 2745 final int currentCount = counter.get(); 2746 CellSink newCellSink = (cell) -> { 2747 if (currentCount <= failCounter.get()) { 2748 throw new IOException("Simulated exception by tests"); 2749 } 2750 sink.append(cell); 2751 }; 2752 super.performFlush(scanner, newCellSink, throughputController); 2753 } 2754 } 2755 2756 /** 2757 * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB} 2758 */ 2759 @Test 2760 public void testImmutableMemStoreLABRefCnt() throws Exception { 2761 Configuration conf = HBaseConfiguration.create(); 2762 2763 byte[] smallValue = new byte[3]; 2764 byte[] largeValue = new byte[9]; 2765 final long timestamp = EnvironmentEdgeManager.currentTime(); 2766 final long seqId = 100; 2767 final Cell smallCell1 = createCell(qf1, 
timestamp, seqId, smallValue); 2768 final Cell largeCell1 = createCell(qf2, timestamp, seqId, largeValue); 2769 final Cell smallCell2 = createCell(qf3, timestamp, seqId + 1, smallValue); 2770 final Cell largeCell2 = createCell(qf4, timestamp, seqId + 1, largeValue); 2771 final Cell smallCell3 = createCell(qf5, timestamp, seqId + 2, smallValue); 2772 final Cell largeCell3 = createCell(qf6, timestamp, seqId + 2, largeValue); 2773 2774 int smallCellByteSize = MutableSegment.getCellLength(smallCell1); 2775 int largeCellByteSize = MutableSegment.getCellLength(largeCell1); 2776 int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize); 2777 int flushByteSize = firstWriteCellByteSize - 2; 2778 2779 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2780 conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName()); 2781 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2782 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2783 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2784 2785 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2786 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2787 2788 final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore); 2789 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2790 myCompactingMemStore.allowCompaction.set(false); 2791 2792 NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2793 store.add(smallCell1, memStoreSizing); 2794 store.add(largeCell1, memStoreSizing); 2795 store.add(smallCell2, memStoreSizing); 2796 store.add(largeCell2, memStoreSizing); 2797 store.add(smallCell3, memStoreSizing); 2798 store.add(largeCell3, memStoreSizing); 2799 VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments(); 2800 assertTrue(versionedSegmentsList.getNumOfSegments() == 3); 2801 
List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments(); 2802 List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size()); 2803 for (ImmutableSegment segment : segments) { 2804 memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB()); 2805 } 2806 List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE); 2807 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2808 assertTrue(memStoreLAB.getRefCntValue() == 2); 2809 } 2810 2811 myCompactingMemStore.allowCompaction.set(true); 2812 myCompactingMemStore.flushInMemory(); 2813 2814 versionedSegmentsList = myCompactingMemStore.getImmutableSegments(); 2815 assertTrue(versionedSegmentsList.getNumOfSegments() == 1); 2816 ImmutableMemStoreLAB immutableMemStoreLAB = 2817 (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB()); 2818 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2819 assertTrue(memStoreLAB.getRefCntValue() == 2); 2820 } 2821 2822 List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE); 2823 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2824 assertTrue(memStoreLAB.getRefCntValue() == 2); 2825 } 2826 assertTrue(immutableMemStoreLAB.getRefCntValue() == 2); 2827 for (KeyValueScanner scanner : scanners1) { 2828 scanner.close(); 2829 } 2830 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2831 assertTrue(memStoreLAB.getRefCntValue() == 1); 2832 } 2833 for (KeyValueScanner scanner : scanners2) { 2834 scanner.close(); 2835 } 2836 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2837 assertTrue(memStoreLAB.getRefCntValue() == 1); 2838 } 2839 assertTrue(immutableMemStoreLAB.getRefCntValue() == 1); 2840 flushStore(store, id++); 2841 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2842 assertTrue(memStoreLAB.getRefCntValue() == 0); 2843 } 2844 assertTrue(immutableMemStoreLAB.getRefCntValue() == 0); 2845 assertTrue(immutableMemStoreLAB.isClosed()); 2846 for (MemStoreLABImpl 
memStoreLAB : memStoreLABs) { 2847 assertTrue(memStoreLAB.isClosed()); 2848 assertTrue(memStoreLAB.isReclaimed()); 2849 assertTrue(memStoreLAB.chunks.isEmpty()); 2850 } 2851 } 2852 2853 private HStoreFile mockStoreFileWithLength(long length) { 2854 HStoreFile sf = mock(HStoreFile.class); 2855 StoreFileReader sfr = mock(StoreFileReader.class); 2856 when(sf.isHFile()).thenReturn(true); 2857 when(sf.getReader()).thenReturn(sfr); 2858 when(sfr.length()).thenReturn(length); 2859 return sf; 2860 } 2861 2862 private static class MyThread extends Thread { 2863 private StoreScanner scanner; 2864 private KeyValueHeap heap; 2865 2866 public MyThread(StoreScanner scanner) { 2867 this.scanner = scanner; 2868 } 2869 2870 public KeyValueHeap getHeap() { 2871 return this.heap; 2872 } 2873 2874 @Override 2875 public void run() { 2876 scanner.trySwitchToStreamRead(); 2877 heap = scanner.heap; 2878 } 2879 } 2880 2881 private static class MyMemStoreCompactor extends MemStoreCompactor { 2882 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 2883 private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1); 2884 2885 public MyMemStoreCompactor(CompactingMemStore compactingMemStore, 2886 MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException { 2887 super(compactingMemStore, compactionPolicy); 2888 } 2889 2890 @Override 2891 public boolean start() throws IOException { 2892 boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0; 2893 if (isFirst) { 2894 try { 2895 START_COMPACTOR_LATCH.await(); 2896 return super.start(); 2897 } catch (InterruptedException ex) { 2898 throw new RuntimeException(ex); 2899 } 2900 } 2901 return super.start(); 2902 } 2903 } 2904 2905 public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore { 2906 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 2907 2908 public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c, 2909 
      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
      throws IOException {
      super(conf, c, store, regionServices, compactionPolicy);
    }

    // Substitutes the test compactor for the default one.
    @Override
    protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
      throws IllegalArgumentIOException {
      return new MyMemStoreCompactor(this, compactionPolicy);
    }

    // Counts (and optionally logs) every time the flag was actually set by the superclass.
    @Override
    protected boolean setInMemoryCompactionFlag() {
      boolean rval = super.setInMemoryCompactionFlag();
      if (rval) {
        RUNNER_COUNT.incrementAndGet();
        if (LOG.isDebugEnabled()) {
          LOG.debug("runner count: " + RUNNER_COUNT.get());
        }
      }
      return rval;
    }
  }

  /**
   * {@link CompactingMemStore} that, once {@code START_TEST} is set, interleaves
   * {@link #createList(int)} and {@link #pushActiveToPipeline(MutableSegment, boolean)} with two
   * latches: {@code pushActiveToPipeline} does not proceed before {@code createList} has been
   * entered, and {@code createList} does not return before {@code pushActiveToPipeline} has
   * completed. While {@code START_TEST} is false both methods behave like the superclass
   * apart from {@code createList} always returning a plain {@link ArrayList}.
   */
  public static class MyCompactingMemStore extends CompactingMemStore {
    // Static switch: the latches are only used after a test flips this to true.
    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
    // Released by createList(), awaited by pushActiveToPipeline().
    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
    // Released by pushActiveToPipeline(), awaited by createList().
    private final CountDownLatch snapshotLatch = new CountDownLatch(1);

    public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, HStore store,
      RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
      throws IOException {
      super(conf, c, store, regionServices, compactionPolicy);
    }

    // NOTE(review): presumably invoked from the scanner-creation path (the latch name suggests
    // getScanners) - confirm against CompactingMemStore.
    @Override
    protected List<KeyValueScanner> createList(int capacity) {
      if (START_TEST.get()) {
        try {
          // Signal that this method was reached, then wait for the push to the pipeline.
          getScannerLatch.countDown();
          snapshotLatch.await();
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
      return new ArrayList<>(capacity);
    }

    @Override
    protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) {
      if (START_TEST.get()) {
        try {
          // Do not push before createList() has been entered.
          getScannerLatch.await();
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }

      super.pushActiveToPipeline(active, checkEmpty);
      if (START_TEST.get()) {
        // Let the thread blocked in createList() continue.
        snapshotLatch.countDown();
      }
    }
  }

  /** Callback invoked by {@link MyList#add(Object)} with the list size before the addition. */
  interface MyListHook {
    void hook(int currentSize);
  }

  /**
   * {@link List} that delegates every operation to an internal {@link ArrayList}. The only
   * extra behavior is in {@link #add(Object)}, which calls the hook with the current size
   * before adding; the other mutators do not call the hook.
   */
  private static class MyList<T> implements List<T> {
    private final List<T> delegatee = new ArrayList<>();
    private final MyListHook hookAtAdd;

    MyList(final MyListHook hookAtAdd) {
      this.hookAtAdd = hookAtAdd;
    }

    @Override
    public int size() {
      return delegatee.size();
    }

    @Override
    public boolean isEmpty() {
      return delegatee.isEmpty();
    }

    @Override
    public boolean contains(Object o) {
      return delegatee.contains(o);
    }

    @Override
    public Iterator<T> iterator() {
      return delegatee.iterator();
    }

    @Override
    public Object[] toArray() {
      return delegatee.toArray();
    }

    @Override
    public <R> R[] toArray(R[] a) {
      return delegatee.toArray(a);
    }

    @Override
    public boolean add(T e) {
      // The hook sees the size before the element is appended.
      hookAtAdd.hook(size());
      return delegatee.add(e);
    }

    @Override
    public boolean remove(Object o) {
      return delegatee.remove(o);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
      return delegatee.containsAll(c);
    }

    @Override
    public boolean addAll(Collection<? extends T> c) {
      return delegatee.addAll(c);
    }

    @Override
    public boolean addAll(int index, Collection<?
      extends T> c) {
      return delegatee.addAll(index, c);
    }

    @Override
    public boolean removeAll(Collection<?> c) {
      return delegatee.removeAll(c);
    }

    @Override
    public boolean retainAll(Collection<?> c) {
      return delegatee.retainAll(c);
    }

    @Override
    public void clear() {
      delegatee.clear();
    }

    @Override
    public T get(int index) {
      return delegatee.get(index);
    }

    @Override
    public T set(int index, T element) {
      return delegatee.set(index, element);
    }

    @Override
    public void add(int index, T element) {
      delegatee.add(index, element);
    }

    @Override
    public T remove(int index) {
      return delegatee.remove(index);
    }

    @Override
    public int indexOf(Object o) {
      return delegatee.indexOf(o);
    }

    @Override
    public int lastIndexOf(Object o) {
      return delegatee.lastIndexOf(o);
    }

    @Override
    public ListIterator<T> listIterator() {
      return delegatee.listIterator();
    }

    @Override
    public ListIterator<T> listIterator(int index) {
      return delegatee.listIterator(index);
    }

    @Override
    public List<T> subList(int fromIndex, int toIndex) {
      return delegatee.subList(fromIndex, toIndex);
    }
  }

  /**
   * {@link CompactingMemStore} that forces a fixed interleaving between two writer threads,
   * which it recognizes by thread name ("largeCellThread" and "smallCellThread"). Two
   * two-party {@link CyclicBarrier}s are used: {@code preCyclicBarrier} orders the size checks
   * and {@code postCyclicBarrier} orders the large thread's in-memory flush before the small
   * thread's {@code doAdd}. Threads with any other name are not affected.
   */
  public static class MyCompactingMemStore2 extends CompactingMemStore {
    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
    // Number of checkAndAddToActiveSize calls made by the large cell thread.
    private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
    // Not touched anywhere in this class.
    private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);

    public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
      throws IOException {
      super(conf, cellComparator, store, regionServices, compactionPolicy);
    }

    @Override
    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
      MemStoreSizing memstoreSizing) {
      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
        int currentCount = largeCellPreUpdateCounter.incrementAndGet();
        // Only the first call of the large cell thread takes part in the rendezvous.
        if (currentCount <= 1) {
          try {
            /**
             * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
             * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
             * largeCellThread invokes flushInMemory.
             */
            preCyclicBarrier.await();
          } catch (Throwable e) {
            throw new RuntimeException(e);
          }
        }
      }

      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
        try {
          // The small cell thread has finished its size check; release the large cell thread.
          preCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
      return returnValue;
    }

    @Override
    protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
        try {
          /**
           * After largeCellThread finished flushInMemory method, smallCellThread can add cell to
           * currentActive . That is to say when largeCellThread called flushInMemory method,
           * currentActive has no cell.
           */
          postCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
      super.doAdd(currentActive, cell, memstoreSizing);
    }

    @Override
    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
      super.flushInMemory(currentActiveMutableSegment);
      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
        // Matches the single rendezvous in checkAndAddToActiveSize: release the small cell
        // thread waiting in doAdd once the flush is done.
        if (largeCellPreUpdateCounter.get() <= 1) {
          try {
            postCyclicBarrier.await();
          } catch (Throwable e) {
            throw new RuntimeException(e);
          }
        }
      }
    }

  }

  /**
   * {@link CompactingMemStore} that synchronizes a "largeCellThread" and a "smallCellThread"
   * (recognized by thread name) through two two-party {@link CyclicBarrier}s, and counts
   * in-memory flushes. The behavior depends on
   * {@code flushByteSizeLessThanSmallAndLargeCellSize}: when it is true, the barriers are
   * applied per thread name as described on each method; when it is false, only
   * {@link #postUpdate(MutableSegment)} synchronizes, and it does so for every calling thread.
   */
  public static class MyCompactingMemStore3 extends CompactingMemStore {
    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";

    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
    // Incremented on every flushInMemory(MutableSegment) call.
    private final AtomicInteger flushCounter = new AtomicInteger(0);
    // Not used inside this class; presumably read by the tests - TODO confirm.
    private static final int CELL_COUNT = 5;
    private boolean flushByteSizeLessThanSmallAndLargeCellSize = true;

    public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator,
      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
      throws IOException {
      super(conf, cellComparator, store, regionServices, compactionPolicy);
    }

    @Override
    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
      MemStoreSizing memstoreSizing) {
      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
        return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
      }
      // The large cell thread waits before its size check ...
      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
        try {
          preCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }

      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
      // ... until the small cell thread has completed its own size check.
      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
        try {
          preCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
      return returnValue;
    }

    @Override
    protected void postUpdate(MutableSegment currentActiveMutableSegment) {
      super.postUpdate(currentActiveMutableSegment);
      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
        // In this mode every caller meets at the barrier after its update.
        try {
          postCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
        return;
      }

      // Otherwise only the small cell thread waits here; its partner is the large cell thread
      // in flushInMemory.
      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
        try {
          postCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
    }

    @Override
    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
      super.flushInMemory(currentActiveMutableSegment);
      flushCounter.incrementAndGet();
      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
        return;
      }

      // In this mode only the large cell thread is expected to trigger the in-memory flush.
      assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME));
      try {
        postCyclicBarrier.await();
      } catch (Throwable e) {
        throw new RuntimeException(e);
      }

    }

    void disableCompaction() {
      allowCompaction.set(false);
    }

    void enableCompaction() {
      allowCompaction.set(true);
    }

  }

  public static class MyCompactingMemStore4 extends CompactingMemStore {
    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
    /**
     * {@link CompactingMemStore#flattenOneSegment} must execute after
     * {@link CompactingMemStore#getImmutableSegments}
     */
    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
    /**
     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
     *
{@link CompactingMemStore#swapPipelineWithNull} could execute. 3279 */ 3280 private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2); 3281 /** 3282 * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the 3283 * snapshot thread starts {@link CompactingMemStore#snapshot},because 3284 * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}. 3285 */ 3286 private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2); 3287 /** 3288 * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping. 3289 */ 3290 private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); 3291 private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0); 3292 private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0); 3293 private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0); 3294 private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0); 3295 3296 public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator, 3297 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3298 throws IOException { 3299 super(conf, cellComparator, store, regionServices, compactionPolicy); 3300 } 3301 3302 @Override 3303 public VersionedSegmentsList getImmutableSegments() { 3304 VersionedSegmentsList result = super.getImmutableSegments(); 3305 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3306 int currentCount = getImmutableSegmentsListCounter.incrementAndGet(); 3307 if (currentCount <= 1) { 3308 try { 3309 flattenOneSegmentPreCyclicBarrier.await(); 3310 } catch (Throwable e) { 3311 throw new RuntimeException(e); 3312 } 3313 } 3314 } 3315 return result; 3316 } 3317 3318 @Override 3319 protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { 3320 if 
(Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3321 int currentCount = swapPipelineWithNullCounter.incrementAndGet(); 3322 if (currentCount <= 1) { 3323 try { 3324 flattenOneSegmentPostCyclicBarrier.await(); 3325 } catch (Throwable e) { 3326 throw new RuntimeException(e); 3327 } 3328 } 3329 } 3330 boolean result = super.swapPipelineWithNull(segments); 3331 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3332 int currentCount = swapPipelineWithNullCounter.get(); 3333 if (currentCount <= 1) { 3334 assertTrue(!result); 3335 } 3336 if (currentCount == 2) { 3337 assertTrue(result); 3338 } 3339 } 3340 return result; 3341 3342 } 3343 3344 @Override 3345 public void flattenOneSegment(long requesterVersion, Action action) { 3346 int currentCount = flattenOneSegmentCounter.incrementAndGet(); 3347 if (currentCount <= 1) { 3348 try { 3349 /** 3350 * {@link CompactingMemStore#snapshot} could start. 3351 */ 3352 snapShotStartCyclicCyclicBarrier.await(); 3353 flattenOneSegmentPreCyclicBarrier.await(); 3354 } catch (Throwable e) { 3355 throw new RuntimeException(e); 3356 } 3357 } 3358 super.flattenOneSegment(requesterVersion, action); 3359 if (currentCount <= 1) { 3360 try { 3361 flattenOneSegmentPostCyclicBarrier.await(); 3362 } catch (Throwable e) { 3363 throw new RuntimeException(e); 3364 } 3365 } 3366 } 3367 3368 @Override 3369 protected boolean setInMemoryCompactionFlag() { 3370 boolean result = super.setInMemoryCompactionFlag(); 3371 assertTrue(result); 3372 setInMemoryCompactionFlagCounter.incrementAndGet(); 3373 return result; 3374 } 3375 3376 @Override 3377 void inMemoryCompaction() { 3378 try { 3379 super.inMemoryCompaction(); 3380 } finally { 3381 try { 3382 inMemoryCompactionEndCyclicBarrier.await(); 3383 } catch (Throwable e) { 3384 throw new RuntimeException(e); 3385 } 3386 3387 } 3388 } 3389 3390 } 3391 3392 public static class MyCompactingMemStore5 extends CompactingMemStore { 3393 private static final String 
TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread"; 3394 private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread"; 3395 /** 3396 * {@link CompactingMemStore#flattenOneSegment} must execute after 3397 * {@link CompactingMemStore#getImmutableSegments} 3398 */ 3399 private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2); 3400 /** 3401 * Only after {@link CompactingMemStore#flattenOneSegment} completed, 3402 * {@link CompactingMemStore#swapPipelineWithNull} could execute. 3403 */ 3404 private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2); 3405 /** 3406 * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the 3407 * snapshot thread starts {@link CompactingMemStore#snapshot},because 3408 * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}. 3409 */ 3410 private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2); 3411 /** 3412 * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping. 3413 */ 3414 private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); 3415 private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0); 3416 private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0); 3417 private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0); 3418 private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0); 3419 /** 3420 * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain 3421 * thread could start. 3422 */ 3423 private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2); 3424 /** 3425 * This is used for snapshot thread,writeAgain thread and in memory compact thread. 
Only the 3426 * writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would 3427 * execute,and in memory compact thread would exit,because we expect that in memory compact 3428 * executing only once. 3429 */ 3430 private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3); 3431 3432 public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator, 3433 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3434 throws IOException { 3435 super(conf, cellComparator, store, regionServices, compactionPolicy); 3436 } 3437 3438 @Override 3439 public VersionedSegmentsList getImmutableSegments() { 3440 VersionedSegmentsList result = super.getImmutableSegments(); 3441 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3442 int currentCount = getImmutableSegmentsListCounter.incrementAndGet(); 3443 if (currentCount <= 1) { 3444 try { 3445 flattenOneSegmentPreCyclicBarrier.await(); 3446 } catch (Throwable e) { 3447 throw new RuntimeException(e); 3448 } 3449 } 3450 3451 } 3452 3453 return result; 3454 } 3455 3456 @Override 3457 protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { 3458 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3459 int currentCount = swapPipelineWithNullCounter.incrementAndGet(); 3460 if (currentCount <= 1) { 3461 try { 3462 flattenOneSegmentPostCyclicBarrier.await(); 3463 } catch (Throwable e) { 3464 throw new RuntimeException(e); 3465 } 3466 } 3467 3468 if (currentCount == 2) { 3469 try { 3470 /** 3471 * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, 3472 * writeAgain thread could start. 3473 */ 3474 writeMemStoreAgainStartCyclicBarrier.await(); 3475 /** 3476 * Only the writeAgain thread completes, retry 3477 * {@link CompactingMemStore#swapPipelineWithNull} would execute. 
3478 */ 3479 writeMemStoreAgainEndCyclicBarrier.await(); 3480 } catch (Throwable e) { 3481 throw new RuntimeException(e); 3482 } 3483 } 3484 3485 } 3486 boolean result = super.swapPipelineWithNull(segments); 3487 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3488 int currentCount = swapPipelineWithNullCounter.get(); 3489 if (currentCount <= 1) { 3490 assertTrue(!result); 3491 } 3492 if (currentCount == 2) { 3493 assertTrue(result); 3494 } 3495 } 3496 return result; 3497 3498 } 3499 3500 @Override 3501 public void flattenOneSegment(long requesterVersion, Action action) { 3502 int currentCount = flattenOneSegmentCounter.incrementAndGet(); 3503 if (currentCount <= 1) { 3504 try { 3505 /** 3506 * {@link CompactingMemStore#snapshot} could start. 3507 */ 3508 snapShotStartCyclicCyclicBarrier.await(); 3509 flattenOneSegmentPreCyclicBarrier.await(); 3510 } catch (Throwable e) { 3511 throw new RuntimeException(e); 3512 } 3513 } 3514 super.flattenOneSegment(requesterVersion, action); 3515 if (currentCount <= 1) { 3516 try { 3517 flattenOneSegmentPostCyclicBarrier.await(); 3518 /** 3519 * Only the writeAgain thread completes, in memory compact thread would exit,because we 3520 * expect that in memory compact executing only once. 
3521 */ 3522 writeMemStoreAgainEndCyclicBarrier.await(); 3523 } catch (Throwable e) { 3524 throw new RuntimeException(e); 3525 } 3526 3527 } 3528 } 3529 3530 @Override 3531 protected boolean setInMemoryCompactionFlag() { 3532 boolean result = super.setInMemoryCompactionFlag(); 3533 int count = setInMemoryCompactionFlagCounter.incrementAndGet(); 3534 if (count <= 1) { 3535 assertTrue(result); 3536 } 3537 if (count == 2) { 3538 assertTrue(!result); 3539 } 3540 return result; 3541 } 3542 3543 @Override 3544 void inMemoryCompaction() { 3545 try { 3546 super.inMemoryCompaction(); 3547 } finally { 3548 try { 3549 inMemoryCompactionEndCyclicBarrier.await(); 3550 } catch (Throwable e) { 3551 throw new RuntimeException(e); 3552 } 3553 3554 } 3555 } 3556 } 3557 3558 public static class MyCompactingMemStore6 extends CompactingMemStore { 3559 private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); 3560 3561 public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator, 3562 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3563 throws IOException { 3564 super(conf, cellComparator, store, regionServices, compactionPolicy); 3565 } 3566 3567 @Override 3568 void inMemoryCompaction() { 3569 try { 3570 super.inMemoryCompaction(); 3571 } finally { 3572 try { 3573 inMemoryCompactionEndCyclicBarrier.await(); 3574 } catch (Throwable e) { 3575 throw new RuntimeException(e); 3576 } 3577 3578 } 3579 } 3580 } 3581 3582 public static class MyDefaultMemStore extends DefaultMemStore { 3583 private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread"; 3584 private static final String FLUSH_THREAD_NAME = "flushMyThread"; 3585 /** 3586 * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner thread 3587 * could start. 
3588 */ 3589 private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2); 3590 /** 3591 * Used by getScanner thread notifies flush thread {@link DefaultMemStore#getSnapshotSegments} 3592 * completed, {@link DefaultMemStore#doClearSnapShot} could continue. 3593 */ 3594 private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2); 3595 /** 3596 * Used by flush thread notifies getScanner thread {@link DefaultMemStore#doClearSnapShot} 3597 * completed, {@link DefaultMemStore#getScanners} could continue. 3598 */ 3599 private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2); 3600 private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0); 3601 private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0); 3602 private volatile boolean shouldWait = true; 3603 private volatile HStore store = null; 3604 3605 public MyDefaultMemStore(Configuration conf, CellComparator cellComparator, 3606 RegionServicesForStores regionServices) throws IOException { 3607 super(conf, cellComparator, regionServices); 3608 } 3609 3610 @Override 3611 protected List<Segment> getSnapshotSegments() { 3612 3613 List<Segment> result = super.getSnapshotSegments(); 3614 3615 if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) { 3616 int currentCount = getSnapshotSegmentsCounter.incrementAndGet(); 3617 if (currentCount == 1) { 3618 if (this.shouldWait) { 3619 try { 3620 /** 3621 * Notify flush thread {@link DefaultMemStore#getSnapshotSegments} completed, 3622 * {@link DefaultMemStore#doClearSnapShot} could continue. 3623 */ 3624 preClearSnapShotCyclicBarrier.await(); 3625 /** 3626 * Wait for {@link DefaultMemStore#doClearSnapShot} completed. 
3627 */ 3628 postClearSnapShotCyclicBarrier.await(); 3629 3630 } catch (Throwable e) { 3631 throw new RuntimeException(e); 3632 } 3633 } 3634 } 3635 } 3636 return result; 3637 } 3638 3639 @Override 3640 protected void doClearSnapShot() { 3641 if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) { 3642 int currentCount = clearSnapshotCounter.incrementAndGet(); 3643 if (currentCount == 1) { 3644 try { 3645 if ( 3646 ((ReentrantReadWriteLock) store.getStoreEngine().getLock()) 3647 .isWriteLockedByCurrentThread() 3648 ) { 3649 shouldWait = false; 3650 } 3651 /** 3652 * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner 3653 * thread could start. 3654 */ 3655 getScannerCyclicBarrier.await(); 3656 3657 if (shouldWait) { 3658 /** 3659 * Wait for {@link DefaultMemStore#getSnapshotSegments} completed. 3660 */ 3661 preClearSnapShotCyclicBarrier.await(); 3662 } 3663 } catch (Throwable e) { 3664 throw new RuntimeException(e); 3665 } 3666 } 3667 } 3668 super.doClearSnapShot(); 3669 3670 if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) { 3671 int currentCount = clearSnapshotCounter.get(); 3672 if (currentCount == 1) { 3673 if (shouldWait) { 3674 try { 3675 /** 3676 * Notify getScanner thread {@link DefaultMemStore#doClearSnapShot} completed, 3677 * {@link DefaultMemStore#getScanners} could continue. 3678 */ 3679 postClearSnapShotCyclicBarrier.await(); 3680 } catch (Throwable e) { 3681 throw new RuntimeException(e); 3682 } 3683 } 3684 } 3685 } 3686 } 3687 } 3688}