001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026import static org.mockito.ArgumentMatchers.any; 027import static org.mockito.Mockito.mock; 028import static org.mockito.Mockito.spy; 029import static org.mockito.Mockito.times; 030import static org.mockito.Mockito.verify; 031import static org.mockito.Mockito.when; 032 033import java.io.IOException; 034import java.lang.ref.SoftReference; 035import java.security.PrivilegedExceptionAction; 036import java.util.ArrayList; 037import java.util.Arrays; 038import java.util.Collection; 039import java.util.Collections; 040import java.util.Iterator; 041import java.util.List; 042import java.util.ListIterator; 043import java.util.NavigableSet; 044import java.util.TreeSet; 045import java.util.concurrent.ConcurrentSkipListSet; 046import java.util.concurrent.CountDownLatch; 047import java.util.concurrent.CyclicBarrier; 
048import java.util.concurrent.ExecutorService; 049import java.util.concurrent.Executors; 050import java.util.concurrent.ThreadPoolExecutor; 051import java.util.concurrent.TimeUnit; 052import java.util.concurrent.atomic.AtomicBoolean; 053import java.util.concurrent.atomic.AtomicInteger; 054import java.util.concurrent.atomic.AtomicLong; 055import java.util.concurrent.atomic.AtomicReference; 056import java.util.concurrent.locks.ReentrantReadWriteLock; 057import java.util.function.Consumer; 058import java.util.function.IntBinaryOperator; 059import org.apache.hadoop.conf.Configuration; 060import org.apache.hadoop.fs.FSDataOutputStream; 061import org.apache.hadoop.fs.FileStatus; 062import org.apache.hadoop.fs.FileSystem; 063import org.apache.hadoop.fs.FilterFileSystem; 064import org.apache.hadoop.fs.LocalFileSystem; 065import org.apache.hadoop.fs.Path; 066import org.apache.hadoop.fs.permission.FsPermission; 067import org.apache.hadoop.hbase.Cell; 068import org.apache.hadoop.hbase.CellBuilderFactory; 069import org.apache.hadoop.hbase.CellBuilderType; 070import org.apache.hadoop.hbase.CellComparator; 071import org.apache.hadoop.hbase.CellComparatorImpl; 072import org.apache.hadoop.hbase.CellUtil; 073import org.apache.hadoop.hbase.HBaseClassTestRule; 074import org.apache.hadoop.hbase.HBaseConfiguration; 075import org.apache.hadoop.hbase.HBaseTestingUtility; 076import org.apache.hadoop.hbase.HConstants; 077import org.apache.hadoop.hbase.KeyValue; 078import org.apache.hadoop.hbase.MemoryCompactionPolicy; 079import org.apache.hadoop.hbase.NamespaceDescriptor; 080import org.apache.hadoop.hbase.PrivateCellUtil; 081import org.apache.hadoop.hbase.TableName; 082import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 083import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 084import org.apache.hadoop.hbase.client.Get; 085import org.apache.hadoop.hbase.client.RegionInfo; 086import org.apache.hadoop.hbase.client.RegionInfoBuilder; 087import 
org.apache.hadoop.hbase.client.Scan; 088import org.apache.hadoop.hbase.client.Scan.ReadType; 089import org.apache.hadoop.hbase.client.TableDescriptor; 090import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 091import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 092import org.apache.hadoop.hbase.filter.Filter; 093import org.apache.hadoop.hbase.filter.FilterBase; 094import org.apache.hadoop.hbase.io.compress.Compression; 095import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 096import org.apache.hadoop.hbase.io.hfile.CacheConfig; 097import org.apache.hadoop.hbase.io.hfile.HFile; 098import org.apache.hadoop.hbase.io.hfile.HFileContext; 099import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 100import org.apache.hadoop.hbase.monitoring.MonitoredTask; 101import org.apache.hadoop.hbase.nio.RefCnt; 102import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl; 103import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action; 104import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 105import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 106import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 107import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 108import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 109import org.apache.hadoop.hbase.security.User; 110import org.apache.hadoop.hbase.testclassification.MediumTests; 111import org.apache.hadoop.hbase.testclassification.RegionServerTests; 112import org.apache.hadoop.hbase.util.BloomFilterUtil; 113import org.apache.hadoop.hbase.util.Bytes; 114import org.apache.hadoop.hbase.util.CommonFSUtils; 115import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 116import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 117import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 118import 
org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 119import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 120import org.apache.hadoop.hbase.wal.WALFactory; 121import org.apache.hadoop.util.Progressable; 122import org.junit.After; 123import org.junit.AfterClass; 124import org.junit.Before; 125import org.junit.ClassRule; 126import org.junit.Rule; 127import org.junit.Test; 128import org.junit.experimental.categories.Category; 129import org.junit.rules.TestName; 130import org.mockito.Mockito; 131import org.slf4j.Logger; 132import org.slf4j.LoggerFactory; 133 134import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 135 136/** 137 * Test class for the HStore 138 */ 139@Category({ RegionServerTests.class, MediumTests.class }) 140public class TestHStore { 141 142 @ClassRule 143 public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHStore.class); 144 145 private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class); 146 @Rule 147 public TestName name = new TestName(); 148 149 HRegion region; 150 HStore store; 151 byte[] table = Bytes.toBytes("table"); 152 byte[] family = Bytes.toBytes("family"); 153 154 byte[] row = Bytes.toBytes("row"); 155 byte[] row2 = Bytes.toBytes("row2"); 156 byte[] qf1 = Bytes.toBytes("qf1"); 157 byte[] qf2 = Bytes.toBytes("qf2"); 158 byte[] qf3 = Bytes.toBytes("qf3"); 159 byte[] qf4 = Bytes.toBytes("qf4"); 160 byte[] qf5 = Bytes.toBytes("qf5"); 161 byte[] qf6 = Bytes.toBytes("qf6"); 162 163 NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR); 164 165 List<Cell> expected = new ArrayList<>(); 166 List<Cell> result = new ArrayList<>(); 167 168 long id = EnvironmentEdgeManager.currentTime(); 169 Get get = new Get(row); 170 171 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 172 private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString(); 173 174 @Before 175 public void setUp() throws IOException { 176 
qualifiers.clear(); 177 qualifiers.add(qf1); 178 qualifiers.add(qf3); 179 qualifiers.add(qf5); 180 181 Iterator<byte[]> iter = qualifiers.iterator(); 182 while (iter.hasNext()) { 183 byte[] next = iter.next(); 184 expected.add(new KeyValue(row, family, next, 1, (byte[]) null)); 185 get.addColumn(family, next); 186 } 187 } 188 189 private void init(String methodName) throws IOException { 190 init(methodName, TEST_UTIL.getConfiguration()); 191 } 192 193 private HStore init(String methodName, Configuration conf) throws IOException { 194 // some of the tests write 4 versions and then flush 195 // (with HBASE-4241, lower versions are collected on flush) 196 return init(methodName, conf, 197 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build()); 198 } 199 200 private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd) 201 throws IOException { 202 return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd); 203 } 204 205 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 206 ColumnFamilyDescriptor hcd) throws IOException { 207 return init(methodName, conf, builder, hcd, null); 208 } 209 210 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 211 ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException { 212 return init(methodName, conf, builder, hcd, hook, false); 213 } 214 215 private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder, 216 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 217 TableDescriptor htd = builder.setColumnFamily(hcd).build(); 218 Path basedir = new Path(DIR + methodName); 219 Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName()); 220 final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName)); 221 222 FileSystem fs = FileSystem.get(conf); 223 224 
fs.delete(logdir, true); 225 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 226 MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null, 227 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 228 RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 229 Configuration walConf = new Configuration(conf); 230 CommonFSUtils.setRootDir(walConf, basedir); 231 WALFactory wals = new WALFactory(walConf, methodName); 232 region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf, 233 htd, null); 234 region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); 235 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 236 Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 237 } 238 239 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 240 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 241 initHRegion(methodName, conf, builder, hcd, hook, switchToPread); 242 if (hook == null) { 243 store = new HStore(region, hcd, conf, false); 244 } else { 245 store = new MyStore(region, hcd, conf, hook, switchToPread); 246 } 247 region.stores.put(store.getColumnFamilyDescriptor().getName(), store); 248 return store; 249 } 250 251 /** 252 * Test we do not lose data if we fail a flush and then close. Part of HBase-10466 253 */ 254 @Test 255 public void testFlushSizeSizing() throws Exception { 256 LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName()); 257 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 258 // Only retry once. 
259 conf.setInt("hbase.hstore.flush.retries.number", 1); 260 User user = User.createUserForTesting(conf, this.name.getMethodName(), new String[] { "foo" }); 261 // Inject our faulty LocalFileSystem 262 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 263 user.runAs(new PrivilegedExceptionAction<Object>() { 264 @Override 265 public Object run() throws Exception { 266 // Make sure it worked (above is sensitive to caching details in hadoop core) 267 FileSystem fs = FileSystem.get(conf); 268 assertEquals(FaultyFileSystem.class, fs.getClass()); 269 FaultyFileSystem ffs = (FaultyFileSystem) fs; 270 271 // Initialize region 272 init(name.getMethodName(), conf); 273 274 MemStoreSize mss = store.memstore.getFlushableSize(); 275 assertEquals(0, mss.getDataSize()); 276 LOG.info("Adding some data"); 277 MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing(); 278 store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize); 279 // add the heap size of active (mutable) segment 280 kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 281 mss = store.memstore.getFlushableSize(); 282 assertEquals(kvSize.getMemStoreSize(), mss); 283 // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. 
284 try { 285 LOG.info("Flushing"); 286 flushStore(store, id++); 287 fail("Didn't bubble up IOE!"); 288 } catch (IOException ioe) { 289 assertTrue(ioe.getMessage().contains("Fault injected")); 290 } 291 // due to snapshot, change mutable to immutable segment 292 kvSize.incMemStoreSize(0, 293 CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0); 294 mss = store.memstore.getFlushableSize(); 295 assertEquals(kvSize.getMemStoreSize(), mss); 296 MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing(); 297 store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2); 298 kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 299 // Even though we add a new kv, we expect the flushable size to be 'same' since we have 300 // not yet cleared the snapshot -- the above flush failed. 301 assertEquals(kvSize.getMemStoreSize(), mss); 302 ffs.fault.set(false); 303 flushStore(store, id++); 304 mss = store.memstore.getFlushableSize(); 305 // Size should be the foreground kv size. 
306 assertEquals(kvSize2.getMemStoreSize(), mss); 307 flushStore(store, id++); 308 mss = store.memstore.getFlushableSize(); 309 assertEquals(0, mss.getDataSize()); 310 assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize()); 311 return null; 312 } 313 }); 314 } 315 316 @Test 317 public void testStoreBloomFilterMetricsWithBloomRowCol() throws IOException { 318 int numStoreFiles = 5; 319 writeAndRead(BloomType.ROWCOL, numStoreFiles); 320 321 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 322 // hard to know exactly the numbers here, we are just trying to 323 // prove that they are incrementing 324 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 325 assertTrue(store.getBloomFilterNegativeResultsCount() > 0); 326 } 327 328 @Test 329 public void testStoreBloomFilterMetricsWithBloomRow() throws IOException { 330 int numStoreFiles = 5; 331 writeAndRead(BloomType.ROWCOL, numStoreFiles); 332 333 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 334 // hard to know exactly the numbers here, we are just trying to 335 // prove that they are incrementing 336 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 337 assertTrue(store.getBloomFilterNegativeResultsCount() > 0); 338 } 339 340 @Test 341 public void testStoreBloomFilterMetricsWithBloomRowPrefix() throws IOException { 342 int numStoreFiles = 5; 343 writeAndRead(BloomType.ROWPREFIX_FIXED_LENGTH, numStoreFiles); 344 345 assertEquals(0, store.getBloomFilterEligibleRequestsCount()); 346 // hard to know exactly the numbers here, we are just trying to 347 // prove that they are incrementing 348 assertTrue(store.getBloomFilterRequestsCount() >= numStoreFiles); 349 } 350 351 @Test 352 public void testStoreBloomFilterMetricsWithBloomNone() throws IOException { 353 int numStoreFiles = 5; 354 writeAndRead(BloomType.NONE, numStoreFiles); 355 356 assertEquals(0, store.getBloomFilterRequestsCount()); 357 assertEquals(0, store.getBloomFilterNegativeResultsCount()); 358 
359 // hard to know exactly the numbers here, we are just trying to 360 // prove that they are incrementing 361 assertTrue(store.getBloomFilterEligibleRequestsCount() >= numStoreFiles); 362 } 363 364 private void writeAndRead(BloomType bloomType, int numStoreFiles) throws IOException { 365 Configuration conf = HBaseConfiguration.create(); 366 FileSystem fs = FileSystem.get(conf); 367 368 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 369 .setCompressionType(Compression.Algorithm.GZ).setBloomFilterType(bloomType) 370 .setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "3").build(); 371 init(name.getMethodName(), conf, hcd); 372 373 for (int i = 1; i <= numStoreFiles; i++) { 374 byte[] row = Bytes.toBytes("row" + i); 375 LOG.info("Adding some data for the store file #" + i); 376 long timeStamp = EnvironmentEdgeManager.currentTime(); 377 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null); 378 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null); 379 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null); 380 flush(i); 381 } 382 383 // Verify the total number of store files 384 assertEquals(numStoreFiles, this.store.getStorefiles().size()); 385 386 TreeSet<byte[]> columns = new TreeSet<>(Bytes.BYTES_COMPARATOR); 387 columns.add(qf1); 388 389 for (int i = 1; i <= numStoreFiles; i++) { 390 KeyValueScanner scanner = 391 store.getScanner(new Scan(new Get(Bytes.toBytes("row" + i))), columns, 0); 392 scanner.peek(); 393 } 394 } 395 396 /** 397 * Verify that compression and data block encoding are respected by the createWriter method, used 398 * on store flush. 
399 */ 400 @Test 401 public void testCreateWriter() throws Exception { 402 Configuration conf = HBaseConfiguration.create(); 403 FileSystem fs = FileSystem.get(conf); 404 405 ColumnFamilyDescriptor hcd = 406 ColumnFamilyDescriptorBuilder.newBuilder(family).setCompressionType(Compression.Algorithm.GZ) 407 .setDataBlockEncoding(DataBlockEncoding.DIFF).build(); 408 init(name.getMethodName(), conf, hcd); 409 410 // Test createWriter 411 StoreFileWriter writer = store.getStoreEngine() 412 .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4) 413 .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true) 414 .includesTag(false).shouldDropBehind(false)); 415 Path path = writer.getPath(); 416 writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1))); 417 writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2))); 418 writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3))); 419 writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4))); 420 writer.close(); 421 422 // Verify that compression and encoding settings are respected 423 HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf); 424 assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec()); 425 assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding()); 426 reader.close(); 427 } 428 429 @Test 430 public void testDeleteExpiredStoreFiles() throws Exception { 431 testDeleteExpiredStoreFiles(0); 432 testDeleteExpiredStoreFiles(1); 433 } 434 435 /** 436 * @param minVersions the MIN_VERSIONS for the column family 437 */ 438 public void testDeleteExpiredStoreFiles(int minVersions) throws Exception { 439 int storeFileNum = 4; 440 int ttl = 4; 441 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 442 EnvironmentEdgeManagerTestHelper.injectEdge(edge); 443 444 Configuration conf = HBaseConfiguration.create(); 445 // Enable the expired store file deletion 446 
conf.setBoolean("hbase.store.delete.expired.storefile", true); 447 // Set the compaction threshold higher to avoid normal compactions. 448 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5); 449 450 init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder 451 .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build()); 452 453 long storeTtl = this.store.getScanInfo().getTtl(); 454 long sleepTime = storeTtl / storeFileNum; 455 long timeStamp; 456 // There are 4 store files and the max time stamp difference among these 457 // store files will be (this.store.ttl / storeFileNum) 458 for (int i = 1; i <= storeFileNum; i++) { 459 LOG.info("Adding some data for the store file #" + i); 460 timeStamp = EnvironmentEdgeManager.currentTime(); 461 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null); 462 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null); 463 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null); 464 flush(i); 465 edge.incrementTime(sleepTime); 466 } 467 468 // Verify the total number of store files 469 assertEquals(storeFileNum, this.store.getStorefiles().size()); 470 471 // Each call will find one expired store file and delete it before compaction happens. 472 // There will be no compaction due to threshold above. Last file will not be replaced. 473 for (int i = 1; i <= storeFileNum - 1; i++) { 474 // verify the expired store file. 475 assertFalse(this.store.requestCompaction().isPresent()); 476 Collection<HStoreFile> sfs = this.store.getStorefiles(); 477 // Ensure i files are gone. 478 if (minVersions == 0) { 479 assertEquals(storeFileNum - i, sfs.size()); 480 // Ensure only non-expired files remain. 
481 for (HStoreFile sf : sfs) { 482 assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl)); 483 } 484 } else { 485 assertEquals(storeFileNum, sfs.size()); 486 } 487 // Let the next store file expired. 488 edge.incrementTime(sleepTime); 489 } 490 assertFalse(this.store.requestCompaction().isPresent()); 491 492 Collection<HStoreFile> sfs = this.store.getStorefiles(); 493 // Assert the last expired file is not removed. 494 if (minVersions == 0) { 495 assertEquals(1, sfs.size()); 496 } 497 long ts = sfs.iterator().next().getReader().getMaxTimestamp(); 498 assertTrue(ts < (edge.currentTime() - storeTtl)); 499 500 for (HStoreFile sf : sfs) { 501 sf.closeStoreFile(true); 502 } 503 } 504 505 @Test 506 public void testLowestModificationTime() throws Exception { 507 Configuration conf = HBaseConfiguration.create(); 508 FileSystem fs = FileSystem.get(conf); 509 // Initialize region 510 init(name.getMethodName(), conf); 511 512 int storeFileNum = 4; 513 for (int i = 1; i <= storeFileNum; i++) { 514 LOG.info("Adding some data for the store file #" + i); 515 this.store.add(new KeyValue(row, family, qf1, i, (byte[]) null), null); 516 this.store.add(new KeyValue(row, family, qf2, i, (byte[]) null), null); 517 this.store.add(new KeyValue(row, family, qf3, i, (byte[]) null), null); 518 flush(i); 519 } 520 // after flush; check the lowest time stamp 521 long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 522 long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 523 assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); 524 525 // after compact; check the lowest time stamp 526 store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null); 527 lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 528 lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 529 assertEquals(lowestTimeStampFromManager, 
lowestTimeStampFromFS); 530 } 531 532 private static long getLowestTimeStampFromFS(FileSystem fs, 533 final Collection<HStoreFile> candidates) throws IOException { 534 long minTs = Long.MAX_VALUE; 535 if (candidates.isEmpty()) { 536 return minTs; 537 } 538 Path[] p = new Path[candidates.size()]; 539 int i = 0; 540 for (HStoreFile sf : candidates) { 541 p[i] = sf.getPath(); 542 ++i; 543 } 544 545 FileStatus[] stats = fs.listStatus(p); 546 if (stats == null || stats.length == 0) { 547 return minTs; 548 } 549 for (FileStatus s : stats) { 550 minTs = Math.min(minTs, s.getModificationTime()); 551 } 552 return minTs; 553 } 554 555 ////////////////////////////////////////////////////////////////////////////// 556 // Get tests 557 ////////////////////////////////////////////////////////////////////////////// 558 559 private static final int BLOCKSIZE_SMALL = 8192; 560 561 /** 562 * Test for hbase-1686. 563 */ 564 @Test 565 public void testEmptyStoreFile() throws IOException { 566 init(this.name.getMethodName()); 567 // Write a store file. 568 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 569 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 570 flush(1); 571 // Now put in place an empty store file. Its a little tricky. Have to 572 // do manually with hacked in sequence id. 573 HStoreFile f = this.store.getStorefiles().iterator().next(); 574 Path storedir = f.getPath().getParent(); 575 long seqid = f.getMaxSequenceId(); 576 Configuration c = HBaseConfiguration.create(); 577 FileSystem fs = FileSystem.get(c); 578 HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 579 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs) 580 .withOutputDir(storedir).withFileContext(meta).build(); 581 w.appendMetadata(seqid + 1, false); 582 w.close(); 583 this.store.close(); 584 // Reopen it... 
should pick up two files 585 this.store = 586 new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false); 587 assertEquals(2, this.store.getStorefilesCount()); 588 589 result = HBaseTestingUtility.getFromStoreFile(store, get.getRow(), qualifiers); 590 assertEquals(1, result.size()); 591 } 592 593 /** 594 * Getting data from memstore only 595 */ 596 @Test 597 public void testGet_FromMemStoreOnly() throws IOException { 598 init(this.name.getMethodName()); 599 600 // Put data in memstore 601 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 602 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 603 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 604 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 605 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 606 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 607 608 // Get 609 result = HBaseTestingUtility.getFromStoreFile(store, get.getRow(), qualifiers); 610 611 // Compare 612 assertCheck(); 613 } 614 615 @Test 616 public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException { 617 testTimeRangeIfSomeCellsAreDroppedInFlush(1); 618 testTimeRangeIfSomeCellsAreDroppedInFlush(3); 619 testTimeRangeIfSomeCellsAreDroppedInFlush(5); 620 } 621 622 private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException { 623 init(this.name.getMethodName(), TEST_UTIL.getConfiguration(), 624 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build()); 625 long currentTs = 100; 626 long minTs = currentTs; 627 // the extra cell won't be flushed to disk, 628 // so the min of timerange will be different between memStore and hfile. 
629 for (int i = 0; i != (maxVersion + 1); ++i) { 630 this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null); 631 if (i == 1) { 632 minTs = currentTs; 633 } 634 } 635 flushStore(store, id++); 636 637 Collection<HStoreFile> files = store.getStorefiles(); 638 assertEquals(1, files.size()); 639 HStoreFile f = files.iterator().next(); 640 f.initReader(); 641 StoreFileReader reader = f.getReader(); 642 assertEquals(minTs, reader.timeRange.getMin()); 643 assertEquals(currentTs, reader.timeRange.getMax()); 644 } 645 646 /** 647 * Getting data from files only 648 */ 649 @Test 650 public void testGet_FromFilesOnly() throws IOException { 651 init(this.name.getMethodName()); 652 653 // Put data in memstore 654 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 655 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 656 // flush 657 flush(1); 658 659 // Add more data 660 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 661 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 662 // flush 663 flush(2); 664 665 // Add more data 666 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 667 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 668 // flush 669 flush(3); 670 671 // Get 672 result = HBaseTestingUtility.getFromStoreFile(store, get.getRow(), qualifiers); 673 // this.store.get(get, qualifiers, result); 674 675 // Need to sort the result since multiple files 676 Collections.sort(result, CellComparatorImpl.COMPARATOR); 677 678 // Compare 679 assertCheck(); 680 } 681 682 /** 683 * Getting data from memstore and files 684 */ 685 @Test 686 public void testGet_FromMemStoreAndFiles() throws IOException { 687 init(this.name.getMethodName()); 688 689 // Put data in memstore 690 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 691 this.store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 692 // flush 
693 flush(1); 694 695 // Add more data 696 this.store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 697 this.store.add(new KeyValue(row, family, qf4, 1, (byte[]) null), null); 698 // flush 699 flush(2); 700 701 // Add more data 702 this.store.add(new KeyValue(row, family, qf5, 1, (byte[]) null), null); 703 this.store.add(new KeyValue(row, family, qf6, 1, (byte[]) null), null); 704 705 // Get 706 result = HBaseTestingUtility.getFromStoreFile(store, get.getRow(), qualifiers); 707 708 // Need to sort the result since multiple files 709 Collections.sort(result, CellComparatorImpl.COMPARATOR); 710 711 // Compare 712 assertCheck(); 713 } 714 715 private void flush(int storeFilessize) throws IOException { 716 flushStore(store, id++); 717 assertEquals(storeFilessize, this.store.getStorefiles().size()); 718 assertEquals(0, ((AbstractMemStore) this.store.memstore).getActive().getCellsCount()); 719 } 720 721 private void assertCheck() { 722 assertEquals(expected.size(), result.size()); 723 for (int i = 0; i < expected.size(); i++) { 724 assertEquals(expected.get(i), result.get(i)); 725 } 726 } 727 728 @After 729 public void tearDown() throws Exception { 730 EnvironmentEdgeManagerTestHelper.reset(); 731 if (store != null) { 732 try { 733 store.close(); 734 } catch (IOException e) { 735 } 736 store = null; 737 } 738 if (region != null) { 739 region.close(); 740 region = null; 741 } 742 } 743 744 @AfterClass 745 public static void tearDownAfterClass() throws IOException { 746 TEST_UTIL.cleanupTestDir(); 747 } 748 749 @Test 750 public void testHandleErrorsInFlush() throws Exception { 751 LOG.info("Setting up a faulty file system that cannot write"); 752 753 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 754 User user = User.createUserForTesting(conf, "testhandleerrorsinflush", new String[] { "foo" }); 755 // Inject our faulty LocalFileSystem 756 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 757 
user.runAs(new PrivilegedExceptionAction<Object>() { 758 @Override 759 public Object run() throws Exception { 760 // Make sure it worked (above is sensitive to caching details in hadoop core) 761 FileSystem fs = FileSystem.get(conf); 762 assertEquals(FaultyFileSystem.class, fs.getClass()); 763 764 // Initialize region 765 init(name.getMethodName(), conf); 766 767 LOG.info("Adding some data"); 768 store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 769 store.add(new KeyValue(row, family, qf2, 1, (byte[]) null), null); 770 store.add(new KeyValue(row, family, qf3, 1, (byte[]) null), null); 771 772 LOG.info("Before flush, we should have no files"); 773 774 Collection<StoreFileInfo> files = 775 store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); 776 assertEquals(0, files != null ? files.size() : 0); 777 778 // flush 779 try { 780 LOG.info("Flushing"); 781 flush(1); 782 fail("Didn't bubble up IOE!"); 783 } catch (IOException ioe) { 784 assertTrue(ioe.getMessage().contains("Fault injected")); 785 } 786 787 LOG.info("After failed flush, we should still have no files!"); 788 files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); 789 assertEquals(0, files != null ? files.size() : 0); 790 store.getHRegion().getWAL().close(); 791 return null; 792 } 793 }); 794 FileSystem.closeAllForUGI(user.getUGI()); 795 } 796 797 /** 798 * Faulty file system that will fail if you write past its fault position the FIRST TIME only; 799 * thereafter it will succeed. Used by {@link TestHRegion} too. 
800 */ 801 static class FaultyFileSystem extends FilterFileSystem { 802 List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>(); 803 private long faultPos = 200; 804 AtomicBoolean fault = new AtomicBoolean(true); 805 806 public FaultyFileSystem() { 807 super(new LocalFileSystem()); 808 LOG.info("Creating faulty!"); 809 } 810 811 @Override 812 public FSDataOutputStream create(Path p) throws IOException { 813 return new FaultyOutputStream(super.create(p), faultPos, fault); 814 } 815 816 @Override 817 public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, 818 int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { 819 return new FaultyOutputStream( 820 super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress), 821 faultPos, fault); 822 } 823 824 @Override 825 public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize, 826 short replication, long blockSize, Progressable progress) throws IOException { 827 // Fake it. Call create instead. The default implementation throws an IOE 828 // that this is not supported. 
829 return create(f, overwrite, bufferSize, replication, blockSize, progress); 830 } 831 } 832 833 static class FaultyOutputStream extends FSDataOutputStream { 834 volatile long faultPos = Long.MAX_VALUE; 835 private final AtomicBoolean fault; 836 837 public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault) 838 throws IOException { 839 super(out, null); 840 this.faultPos = faultPos; 841 this.fault = fault; 842 } 843 844 @Override 845 public synchronized void write(byte[] buf, int offset, int length) throws IOException { 846 LOG.info("faulty stream write at pos " + getPos()); 847 injectFault(); 848 super.write(buf, offset, length); 849 } 850 851 private void injectFault() throws IOException { 852 if (this.fault.get() && getPos() >= faultPos) { 853 throw new IOException("Fault injected"); 854 } 855 } 856 } 857 858 private static StoreFlushContext flushStore(HStore store, long id) throws IOException { 859 StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY); 860 storeFlushCtx.prepare(); 861 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 862 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 863 return storeFlushCtx; 864 } 865 866 /** 867 * Generate a list of KeyValues for testing based on given parameters 868 * @return the rows key-value list 869 */ 870 private List<Cell> getKeyValueSet(long[] timestamps, int numRows, byte[] qualifier, 871 byte[] family) { 872 List<Cell> kvList = new ArrayList<>(); 873 for (int i = 1; i <= numRows; i++) { 874 byte[] b = Bytes.toBytes(i); 875 for (long timestamp : timestamps) { 876 kvList.add(new KeyValue(b, family, qualifier, timestamp, b)); 877 } 878 } 879 return kvList; 880 } 881 882 /** 883 * Test to ensure correctness when using Stores with multiple timestamps 884 */ 885 @Test 886 public void testMultipleTimestamps() throws IOException { 887 int numRows = 1; 888 long[] timestamps1 = new long[] { 1, 5, 10, 20 }; 889 long[] timestamps2 = 
new long[] { 30, 80 }; 890 891 init(this.name.getMethodName()); 892 893 List<Cell> kvList1 = getKeyValueSet(timestamps1, numRows, qf1, family); 894 for (Cell kv : kvList1) { 895 this.store.add(kv, null); 896 } 897 898 flushStore(store, id++); 899 900 List<Cell> kvList2 = getKeyValueSet(timestamps2, numRows, qf1, family); 901 for (Cell kv : kvList2) { 902 this.store.add(kv, null); 903 } 904 905 List<Cell> result; 906 Get get = new Get(Bytes.toBytes(1)); 907 get.addColumn(family, qf1); 908 909 get.setTimeRange(0, 15); 910 result = HBaseTestingUtility.getFromStoreFile(store, get); 911 assertTrue(result.size() > 0); 912 913 get.setTimeRange(40, 90); 914 result = HBaseTestingUtility.getFromStoreFile(store, get); 915 assertTrue(result.size() > 0); 916 917 get.setTimeRange(10, 45); 918 result = HBaseTestingUtility.getFromStoreFile(store, get); 919 assertTrue(result.size() > 0); 920 921 get.setTimeRange(80, 145); 922 result = HBaseTestingUtility.getFromStoreFile(store, get); 923 assertTrue(result.size() > 0); 924 925 get.setTimeRange(1, 2); 926 result = HBaseTestingUtility.getFromStoreFile(store, get); 927 assertTrue(result.size() > 0); 928 929 get.setTimeRange(90, 200); 930 result = HBaseTestingUtility.getFromStoreFile(store, get); 931 assertTrue(result.size() == 0); 932 } 933 934 /** 935 * Test for HBASE-3492 - Test split on empty colfam (no store files). 936 * @throws IOException When the IO operations fail. 
937 */ 938 @Test 939 public void testSplitWithEmptyColFam() throws IOException { 940 init(this.name.getMethodName()); 941 assertFalse(store.getSplitPoint().isPresent()); 942 } 943 944 @Test 945 public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception { 946 final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle"; 947 long anyValue = 10; 948 949 // We'll check that it uses correct config and propagates it appropriately by going thru 950 // the simplest "real" path I can find - "throttleCompaction", which just checks whether 951 // a number we pass in is higher than some config value, inside compactionPolicy. 952 Configuration conf = HBaseConfiguration.create(); 953 conf.setLong(CONFIG_KEY, anyValue); 954 init(name.getMethodName() + "-xml", conf); 955 assertTrue(store.throttleCompaction(anyValue + 1)); 956 assertFalse(store.throttleCompaction(anyValue)); 957 958 // HTD overrides XML. 959 --anyValue; 960 init( 961 name.getMethodName() + "-htd", conf, TableDescriptorBuilder 962 .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)), 963 ColumnFamilyDescriptorBuilder.of(family)); 964 assertTrue(store.throttleCompaction(anyValue + 1)); 965 assertFalse(store.throttleCompaction(anyValue)); 966 967 // HCD overrides them both. 
968 --anyValue; 969 init(name.getMethodName() + "-hcd", conf, 970 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, 971 Long.toString(anyValue)), 972 ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue)) 973 .build()); 974 assertTrue(store.throttleCompaction(anyValue + 1)); 975 assertFalse(store.throttleCompaction(anyValue)); 976 } 977 978 public static class DummyStoreEngine extends DefaultStoreEngine { 979 public static DefaultCompactor lastCreatedCompactor = null; 980 981 @Override 982 protected void createComponents(Configuration conf, HStore store, CellComparator comparator) 983 throws IOException { 984 super.createComponents(conf, store, comparator); 985 lastCreatedCompactor = this.compactor; 986 } 987 } 988 989 @Test 990 public void testStoreUsesSearchEngineOverride() throws Exception { 991 Configuration conf = HBaseConfiguration.create(); 992 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName()); 993 init(this.name.getMethodName(), conf); 994 assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor()); 995 } 996 997 private void addStoreFile() throws IOException { 998 HStoreFile f = this.store.getStorefiles().iterator().next(); 999 Path storedir = f.getPath().getParent(); 1000 long seqid = this.store.getMaxSequenceId().orElse(0L); 1001 Configuration c = TEST_UTIL.getConfiguration(); 1002 FileSystem fs = FileSystem.get(c); 1003 HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 1004 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), fs) 1005 .withOutputDir(storedir).withFileContext(fileContext).build(); 1006 w.appendMetadata(seqid + 1, false); 1007 w.close(); 1008 LOG.info("Added store file:" + w.getPath()); 1009 } 1010 1011 private void archiveStoreFile(int index) throws IOException { 1012 Collection<HStoreFile> files = this.store.getStorefiles(); 1013 HStoreFile sf = 
null; 1014 Iterator<HStoreFile> it = files.iterator(); 1015 for (int i = 0; i <= index; i++) { 1016 sf = it.next(); 1017 } 1018 store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), 1019 Lists.newArrayList(sf)); 1020 } 1021 1022 private void closeCompactedFile(int index) throws IOException { 1023 Collection<HStoreFile> files = 1024 this.store.getStoreEngine().getStoreFileManager().getCompactedfiles(); 1025 if (files.size() > 0) { 1026 HStoreFile sf = null; 1027 Iterator<HStoreFile> it = files.iterator(); 1028 for (int i = 0; i <= index; i++) { 1029 sf = it.next(); 1030 } 1031 sf.closeStoreFile(true); 1032 store.getStoreEngine().getStoreFileManager() 1033 .removeCompactedFiles(Collections.singletonList(sf)); 1034 } 1035 } 1036 1037 @Test 1038 public void testRefreshStoreFiles() throws Exception { 1039 init(name.getMethodName()); 1040 1041 assertEquals(0, this.store.getStorefilesCount()); 1042 1043 // Test refreshing store files when no store files are there 1044 store.refreshStoreFiles(); 1045 assertEquals(0, this.store.getStorefilesCount()); 1046 1047 // add some data, flush 1048 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 1049 flush(1); 1050 assertEquals(1, this.store.getStorefilesCount()); 1051 1052 // add one more file 1053 addStoreFile(); 1054 1055 assertEquals(1, this.store.getStorefilesCount()); 1056 store.refreshStoreFiles(); 1057 assertEquals(2, this.store.getStorefilesCount()); 1058 1059 // add three more files 1060 addStoreFile(); 1061 addStoreFile(); 1062 addStoreFile(); 1063 1064 assertEquals(2, this.store.getStorefilesCount()); 1065 store.refreshStoreFiles(); 1066 assertEquals(5, this.store.getStorefilesCount()); 1067 1068 closeCompactedFile(0); 1069 archiveStoreFile(0); 1070 1071 assertEquals(5, this.store.getStorefilesCount()); 1072 store.refreshStoreFiles(); 1073 assertEquals(4, this.store.getStorefilesCount()); 1074 1075 archiveStoreFile(0); 1076 archiveStoreFile(1); 1077 archiveStoreFile(2); 1078 
1079 assertEquals(4, this.store.getStorefilesCount()); 1080 store.refreshStoreFiles(); 1081 assertEquals(1, this.store.getStorefilesCount()); 1082 1083 archiveStoreFile(0); 1084 store.refreshStoreFiles(); 1085 assertEquals(0, this.store.getStorefilesCount()); 1086 } 1087 1088 @Test 1089 public void testRefreshStoreFilesNotChanged() throws IOException { 1090 init(name.getMethodName()); 1091 1092 assertEquals(0, this.store.getStorefilesCount()); 1093 1094 // add some data, flush 1095 this.store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), null); 1096 flush(1); 1097 // add one more file 1098 addStoreFile(); 1099 1100 StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine()); 1101 1102 // call first time after files changed 1103 spiedStoreEngine.refreshStoreFiles(); 1104 assertEquals(2, this.store.getStorefilesCount()); 1105 verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any()); 1106 1107 // call second time 1108 spiedStoreEngine.refreshStoreFiles(); 1109 1110 // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not 1111 // refreshed, 1112 verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any()); 1113 } 1114 1115 private long countMemStoreScanner(StoreScanner scanner) { 1116 if (scanner.currentScanners == null) { 1117 return 0; 1118 } 1119 return scanner.currentScanners.stream().filter(s -> !s.isFileScanner()).count(); 1120 } 1121 1122 @Test 1123 public void testNumberOfMemStoreScannersAfterFlush() throws IOException { 1124 long seqId = 100; 1125 long timestamp = EnvironmentEdgeManager.currentTime(); 1126 Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1127 .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build(); 1128 PrivateCellUtil.setSequenceId(cell0, seqId); 1129 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList()); 1130 1131 Cell cell1 = 
CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1132 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build(); 1133 PrivateCellUtil.setSequenceId(cell1, seqId); 1134 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1)); 1135 1136 seqId = 101; 1137 timestamp = EnvironmentEdgeManager.currentTime(); 1138 Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family) 1139 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put).setValue(qf1).build(); 1140 PrivateCellUtil.setSequenceId(cell2, seqId); 1141 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2)); 1142 } 1143 1144 private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot, 1145 List<Cell> inputCellsAfterSnapshot) throws IOException { 1146 init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size()); 1147 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1148 long seqId = Long.MIN_VALUE; 1149 for (Cell c : inputCellsBeforeSnapshot) { 1150 quals.add(CellUtil.cloneQualifier(c)); 1151 seqId = Math.max(seqId, c.getSequenceId()); 1152 } 1153 for (Cell c : inputCellsAfterSnapshot) { 1154 quals.add(CellUtil.cloneQualifier(c)); 1155 seqId = Math.max(seqId, c.getSequenceId()); 1156 } 1157 inputCellsBeforeSnapshot.forEach(c -> store.add(c, null)); 1158 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1159 storeFlushCtx.prepare(); 1160 inputCellsAfterSnapshot.forEach(c -> store.add(c, null)); 1161 int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 
1 : 2; 1162 try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) { 1163 // snapshot + active (if inputCellsAfterSnapshot isn't empty) 1164 assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s)); 1165 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1166 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1167 // snapshot has no data after flush 1168 int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1; 1169 boolean more; 1170 int cellCount = 0; 1171 do { 1172 List<Cell> cells = new ArrayList<>(); 1173 more = s.next(cells); 1174 cellCount += cells.size(); 1175 assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s)); 1176 } while (more); 1177 assertEquals( 1178 "The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size() 1179 + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(), 1180 inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount); 1181 // the current scanners is cleared 1182 assertEquals(0, countMemStoreScanner(s)); 1183 } 1184 } 1185 1186 private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) 1187 throws IOException { 1188 return createCell(row, qualifier, ts, sequenceId, value); 1189 } 1190 1191 private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value) 1192 throws IOException { 1193 Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1194 .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put).setValue(value).build(); 1195 PrivateCellUtil.setSequenceId(c, sequenceId); 1196 return c; 1197 } 1198 1199 @Test 1200 public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException { 1201 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1202 final int expectedSize = 3; 1203 testFlushBeforeCompletingScan(new MyListHook() { 1204 
@Override 1205 public void hook(int currentSize) { 1206 if (currentSize == expectedSize - 1) { 1207 try { 1208 flushStore(store, id++); 1209 timeToGoNextRow.set(true); 1210 } catch (IOException e) { 1211 throw new RuntimeException(e); 1212 } 1213 } 1214 } 1215 }, new FilterBase() { 1216 @Override 1217 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1218 return ReturnCode.INCLUDE; 1219 } 1220 }, expectedSize); 1221 } 1222 1223 @Test 1224 public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException { 1225 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1226 final int expectedSize = 2; 1227 testFlushBeforeCompletingScan(new MyListHook() { 1228 @Override 1229 public void hook(int currentSize) { 1230 if (currentSize == expectedSize - 1) { 1231 try { 1232 flushStore(store, id++); 1233 timeToGoNextRow.set(true); 1234 } catch (IOException e) { 1235 throw new RuntimeException(e); 1236 } 1237 } 1238 } 1239 }, new FilterBase() { 1240 @Override 1241 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1242 if (timeToGoNextRow.get()) { 1243 timeToGoNextRow.set(false); 1244 return ReturnCode.NEXT_ROW; 1245 } else { 1246 return ReturnCode.INCLUDE; 1247 } 1248 } 1249 }, expectedSize); 1250 } 1251 1252 @Test 1253 public void testFlushBeforeCompletingScanWithFilterHint() 1254 throws IOException, InterruptedException { 1255 final AtomicBoolean timeToGetHint = new AtomicBoolean(false); 1256 final int expectedSize = 2; 1257 testFlushBeforeCompletingScan(new MyListHook() { 1258 @Override 1259 public void hook(int currentSize) { 1260 if (currentSize == expectedSize - 1) { 1261 try { 1262 flushStore(store, id++); 1263 timeToGetHint.set(true); 1264 } catch (IOException e) { 1265 throw new RuntimeException(e); 1266 } 1267 } 1268 } 1269 }, new FilterBase() { 1270 @Override 1271 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1272 if (timeToGetHint.get()) { 1273 
timeToGetHint.set(false); 1274 return Filter.ReturnCode.SEEK_NEXT_USING_HINT; 1275 } else { 1276 return Filter.ReturnCode.INCLUDE; 1277 } 1278 } 1279 1280 @Override 1281 public Cell getNextCellHint(Cell currentCell) throws IOException { 1282 return currentCell; 1283 } 1284 }, expectedSize); 1285 } 1286 1287 private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize) 1288 throws IOException, InterruptedException { 1289 Configuration conf = HBaseConfiguration.create(); 1290 byte[] r0 = Bytes.toBytes("row0"); 1291 byte[] r1 = Bytes.toBytes("row1"); 1292 byte[] r2 = Bytes.toBytes("row2"); 1293 byte[] value0 = Bytes.toBytes("value0"); 1294 byte[] value1 = Bytes.toBytes("value1"); 1295 byte[] value2 = Bytes.toBytes("value2"); 1296 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1297 long ts = EnvironmentEdgeManager.currentTime(); 1298 long seqId = 100; 1299 init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1300 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(), 1301 new MyStoreHook() { 1302 @Override 1303 public long getSmallestReadPoint(HStore store) { 1304 return seqId + 3; 1305 } 1306 }); 1307 // The cells having the value0 won't be flushed to disk because the value of max version is 1 1308 store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing); 1309 store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing); 1310 store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing); 1311 store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing); 1312 store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing); 1313 store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing); 1314 store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing); 1315 store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing); 1316 store.add(createCell(r2, qf3, ts + 2, seqId + 2, 
value2), memStoreSizing); 1317 store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing); 1318 store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing); 1319 store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing); 1320 List<Cell> myList = new MyList<>(hook); 1321 Scan scan = new Scan().withStartRow(r1).setFilter(filter); 1322 try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) { 1323 // r1 1324 scanner.next(myList); 1325 assertEquals(expectedSize, myList.size()); 1326 for (Cell c : myList) { 1327 byte[] actualValue = CellUtil.cloneValue(c); 1328 assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:" 1329 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1)); 1330 } 1331 List<Cell> normalList = new ArrayList<>(3); 1332 // r2 1333 scanner.next(normalList); 1334 assertEquals(3, normalList.size()); 1335 for (Cell c : normalList) { 1336 byte[] actualValue = CellUtil.cloneValue(c); 1337 assertTrue("expected:" + Bytes.toStringBinary(value2) + ", actual:" 1338 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value2)); 1339 } 1340 } 1341 } 1342 1343 @Test 1344 public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException { 1345 Configuration conf = HBaseConfiguration.create(); 1346 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName()); 1347 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1348 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1349 byte[] value = Bytes.toBytes("value"); 1350 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1351 long ts = EnvironmentEdgeManager.currentTime(); 1352 long seqId = 100; 1353 // older data whihc shouldn't be "seen" by client 1354 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1355 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1356 
store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1357 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1358 quals.add(qf1); 1359 quals.add(qf2); 1360 quals.add(qf3); 1361 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1362 MyCompactingMemStore.START_TEST.set(true); 1363 Runnable flush = () -> { 1364 // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5) 1365 // recreate the active memstore -- phase (4/5) 1366 storeFlushCtx.prepare(); 1367 }; 1368 ExecutorService service = Executors.newSingleThreadExecutor(); 1369 service.execute(flush); 1370 // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5) 1371 // this is blocked until we recreate the active memstore -- phase (3/5) 1372 // we get scanner from active memstore but it is empty -- phase (5/5) 1373 InternalScanner scanner = 1374 (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 1375 service.shutdown(); 1376 service.awaitTermination(20, TimeUnit.SECONDS); 1377 try { 1378 try { 1379 List<Cell> results = new ArrayList<>(); 1380 scanner.next(results); 1381 assertEquals(3, results.size()); 1382 for (Cell c : results) { 1383 byte[] actualValue = CellUtil.cloneValue(c); 1384 assertTrue("expected:" + Bytes.toStringBinary(value) + ", actual:" 1385 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value)); 1386 } 1387 } finally { 1388 scanner.close(); 1389 } 1390 } finally { 1391 MyCompactingMemStore.START_TEST.set(false); 1392 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1393 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1394 } 1395 } 1396 1397 @Test 1398 public void testScanWithDoubleFlush() throws IOException { 1399 Configuration conf = HBaseConfiguration.create(); 1400 // Initialize region 1401 MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() { 1402 @Override 1403 public void 
getScanners(MyStore store) throws IOException { 1404 final long tmpId = id++; 1405 ExecutorService s = Executors.newSingleThreadExecutor(); 1406 s.execute(() -> { 1407 try { 1408 // flush the store before storescanner updates the scanners from store. 1409 // The current data will be flushed into files, and the memstore will 1410 // be clear. 1411 // -- phase (4/4) 1412 flushStore(store, tmpId); 1413 } catch (IOException ex) { 1414 throw new RuntimeException(ex); 1415 } 1416 }); 1417 s.shutdown(); 1418 try { 1419 // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers. 1420 s.awaitTermination(3, TimeUnit.SECONDS); 1421 } catch (InterruptedException ex) { 1422 } 1423 } 1424 }); 1425 byte[] oldValue = Bytes.toBytes("oldValue"); 1426 byte[] currentValue = Bytes.toBytes("currentValue"); 1427 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1428 long ts = EnvironmentEdgeManager.currentTime(); 1429 long seqId = 100; 1430 // older data whihc shouldn't be "seen" by client 1431 myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing); 1432 myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing); 1433 myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing); 1434 long snapshotId = id++; 1435 // push older data into snapshot -- phase (1/4) 1436 StoreFlushContext storeFlushCtx = 1437 store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY); 1438 storeFlushCtx.prepare(); 1439 1440 // insert current data into active -- phase (2/4) 1441 myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing); 1442 myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing); 1443 myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing); 1444 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1445 quals.add(qf1); 1446 quals.add(qf2); 1447 quals.add(qf3); 1448 try (InternalScanner scanner = 1449 (InternalScanner) myStore.getScanner(new 
Scan(new Get(row)), quals, seqId + 1)) { 1450 // complete the flush -- phase (3/4) 1451 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1452 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1453 1454 List<Cell> results = new ArrayList<>(); 1455 scanner.next(results); 1456 assertEquals(3, results.size()); 1457 for (Cell c : results) { 1458 byte[] actualValue = CellUtil.cloneValue(c); 1459 assertTrue("expected:" + Bytes.toStringBinary(currentValue) + ", actual:" 1460 + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, currentValue)); 1461 } 1462 } 1463 } 1464 1465 @Test 1466 public void testReclaimChunkWhenScaning() throws IOException { 1467 init("testReclaimChunkWhenScaning"); 1468 long ts = EnvironmentEdgeManager.currentTime(); 1469 long seqId = 100; 1470 byte[] value = Bytes.toBytes("value"); 1471 // older data whihc shouldn't be "seen" by client 1472 store.add(createCell(qf1, ts, seqId, value), null); 1473 store.add(createCell(qf2, ts, seqId, value), null); 1474 store.add(createCell(qf3, ts, seqId, value), null); 1475 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1476 quals.add(qf1); 1477 quals.add(qf2); 1478 quals.add(qf3); 1479 try (InternalScanner scanner = 1480 (InternalScanner) store.getScanner(new Scan(new Get(row)), quals, seqId)) { 1481 List<Cell> results = new MyList<>(size -> { 1482 switch (size) { 1483 // 1) we get the first cell (qf1) 1484 // 2) flush the data to have StoreScanner update inner scanners 1485 // 3) the chunk will be reclaimed after updaing 1486 case 1: 1487 try { 1488 flushStore(store, id++); 1489 } catch (IOException e) { 1490 throw new RuntimeException(e); 1491 } 1492 break; 1493 // 1) we get the second cell (qf2) 1494 // 2) add some cell to fill some byte into the chunk (we have only one chunk) 1495 case 2: 1496 try { 1497 byte[] newValue = Bytes.toBytes("newValue"); 1498 // older data whihc shouldn't be "seen" by client 1499 store.add(createCell(qf1, ts + 1, seqId + 1, newValue), 
  /**
   * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the
   * versionedList. And the first InMemoryFlushRunnable will use the changed versionedList to remove
   * the corresponding segments. In short, some segments which aren't part of the merge would be
   * removed.
   * <p>
   * The test asserts, through {@code MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT}, that
   * no second in-memory flush is counted before the first compactor has been released, and that
   * one more is counted afterwards.
   */
  @Test
  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
    int flushSize = 500;
    Configuration conf = HBaseConfiguration.create();
    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
    // Start counting from zero for this test.
    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
    // Set the lower threshold to invoke the "MERGE" policy
    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
    byte[] value = Bytes.toBytes("thisisavarylargevalue");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    // older data which shouldn't be "seen" by client
    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
    // The first three cells are expected to have started exactly one runner.
    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    storeFlushCtx.prepare();
    // This shouldn't invoke another in-memory flush because the first compactor thread
    // hasn't accomplished the in-memory compaction.
    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
    // okay. Let the compaction be completed
    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
    CompactingMemStore mem = (CompactingMemStore) ((HStore) store).memstore;
    // Poll once per second until the memstore no longer reports an in-memory flush in progress.
    // NOTE(review): this loop has no upper bound of its own — presumably a test-level timeout
    // applies; confirm.
    while (mem.isMemStoreFlushingInMemory()) {
      TimeUnit.SECONDS.sleep(1);
    }
    // This should invoke another in-memory flush.
    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
    // Put the default flush size back into the configuration before completing the flush.
    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
  }
store.getAvgStoreFileAge().getAsDouble(), 1E-4); 1600 } 1601 1602 private HStoreFile mockStoreFile(long createdTime) { 1603 StoreFileInfo info = mock(StoreFileInfo.class); 1604 when(info.getCreatedTimestamp()).thenReturn(createdTime); 1605 HStoreFile sf = mock(HStoreFile.class); 1606 when(sf.getReader()).thenReturn(mock(StoreFileReader.class)); 1607 when(sf.isHFile()).thenReturn(true); 1608 when(sf.getFileInfo()).thenReturn(info); 1609 return sf; 1610 } 1611 1612 private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) 1613 throws IOException { 1614 return (MyStore) init(methodName, conf, 1615 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1616 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook); 1617 } 1618 1619 private static class MyStore extends HStore { 1620 private final MyStoreHook hook; 1621 1622 MyStore(final HRegion region, final ColumnFamilyDescriptor family, 1623 final Configuration confParam, MyStoreHook hook, boolean switchToPread) throws IOException { 1624 super(region, family, confParam, false); 1625 this.hook = hook; 1626 } 1627 1628 @Override 1629 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1630 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, 1631 boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, 1632 boolean includeMemstoreScanner) throws IOException { 1633 hook.getScanners(this); 1634 return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, 1635 stopRow, false, readPt, includeMemstoreScanner); 1636 } 1637 1638 @Override 1639 public long getSmallestReadPoint() { 1640 return hook.getSmallestReadPoint(this); 1641 } 1642 } 1643 1644 private abstract static class MyStoreHook { 1645 1646 void getScanners(MyStore store) throws IOException { 1647 } 1648 1649 long getSmallestReadPoint(HStore store) { 1650 return 
store.getHRegion().getSmallestReadPoint(); 1651 } 1652 } 1653 1654 @Test 1655 public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception { 1656 Configuration conf = HBaseConfiguration.create(); 1657 conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); 1658 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0); 1659 // Set the lower threshold to invoke the "MERGE" policy 1660 MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() { 1661 }); 1662 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1663 long ts = EnvironmentEdgeManager.currentTime(); 1664 long seqID = 1L; 1665 // Add some data to the region and do some flushes 1666 for (int i = 1; i < 10; i++) { 1667 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1668 memStoreSizing); 1669 } 1670 // flush them 1671 flushStore(store, seqID); 1672 for (int i = 11; i < 20; i++) { 1673 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1674 memStoreSizing); 1675 } 1676 // flush them 1677 flushStore(store, seqID); 1678 for (int i = 21; i < 30; i++) { 1679 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1680 memStoreSizing); 1681 } 1682 // flush them 1683 flushStore(store, seqID); 1684 1685 assertEquals(3, store.getStorefilesCount()); 1686 Scan scan = new Scan(); 1687 scan.addFamily(family); 1688 Collection<HStoreFile> storefiles2 = store.getStorefiles(); 1689 ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2); 1690 StoreScanner storeScanner = 1691 (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); 1692 // get the current heap 1693 KeyValueHeap heap = storeScanner.heap; 1694 // create more store files 1695 for (int i = 31; i < 40; i++) { 1696 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1697 memStoreSizing); 1698 } 1699 // flush 
them 1700 flushStore(store, seqID); 1701 1702 for (int i = 41; i < 50; i++) { 1703 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1704 memStoreSizing); 1705 } 1706 // flush them 1707 flushStore(store, seqID); 1708 storefiles2 = store.getStorefiles(); 1709 ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2); 1710 actualStorefiles1.removeAll(actualStorefiles); 1711 // Do compaction 1712 MyThread thread = new MyThread(storeScanner); 1713 thread.start(); 1714 store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false); 1715 thread.join(); 1716 KeyValueHeap heap2 = thread.getHeap(); 1717 assertFalse(heap.equals(heap2)); 1718 } 1719 1720 @Test 1721 public void testMaxPreadBytesConfiguredToBeLessThanZero() throws Exception { 1722 Configuration conf = HBaseConfiguration.create(); 1723 conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); 1724 // Set 'hbase.storescanner.pread.max.bytes' < 0, so that StoreScanner will be a STREAM type. 1725 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, -1); 1726 MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() { 1727 }); 1728 Scan scan = new Scan(); 1729 scan.addFamily(family); 1730 // ReadType on Scan is still DEFAULT only. 
1731 assertEquals(ReadType.DEFAULT, scan.getReadType()); 1732 StoreScanner storeScanner = 1733 (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); 1734 assertFalse(storeScanner.isScanUsePread()); 1735 } 1736 1737 @Test 1738 public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException { 1739 Configuration conf = HBaseConfiguration.create(); 1740 conf.set("hbase.systemtables.compacting.memstore.type", "eager"); 1741 init(name.getMethodName(), conf, 1742 TableDescriptorBuilder.newBuilder( 1743 TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, "meta".getBytes())), 1744 ColumnFamilyDescriptorBuilder.newBuilder(family) 1745 .setInMemoryCompaction(MemoryCompactionPolicy.NONE).build()); 1746 assertTrue(((MemStoreCompactor) ((CompactingMemStore) store.memstore).compactor).toString() 1747 .startsWith("eager".toUpperCase())); 1748 } 1749 1750 @Test 1751 public void testSpaceQuotaChangeAfterReplacement() throws IOException { 1752 final TableName tn = TableName.valueOf(name.getMethodName()); 1753 init(name.getMethodName()); 1754 1755 RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl(); 1756 1757 HStoreFile sf1 = mockStoreFileWithLength(1024L); 1758 HStoreFile sf2 = mockStoreFileWithLength(2048L); 1759 HStoreFile sf3 = mockStoreFileWithLength(4096L); 1760 HStoreFile sf4 = mockStoreFileWithLength(8192L); 1761 1762 RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a")) 1763 .setEndKey(Bytes.toBytes("b")).build(); 1764 1765 // Compacting two files down to one, reducing size 1766 sizeStore.put(regionInfo, 1024L + 4096L); 1767 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf1, sf3), 1768 Arrays.asList(sf2)); 1769 1770 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 1771 1772 // The same file length in and out should have no change 1773 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, 
Arrays.asList(sf2), 1774 Arrays.asList(sf2)); 1775 1776 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 1777 1778 // Increase the total size used 1779 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo, Arrays.asList(sf2), 1780 Arrays.asList(sf3)); 1781 1782 assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize()); 1783 1784 RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b")) 1785 .setEndKey(Bytes.toBytes("c")).build(); 1786 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4)); 1787 1788 assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize()); 1789 } 1790 1791 @Test 1792 public void testHFileContextSetWithCFAndTable() throws Exception { 1793 init(this.name.getMethodName()); 1794 StoreFileWriter writer = store.getStoreEngine() 1795 .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L) 1796 .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true) 1797 .includesTag(false).shouldDropBehind(true)); 1798 HFileContext hFileContext = writer.getHFileWriter().getFileContext(); 1799 assertArrayEquals(family, hFileContext.getColumnFamily()); 1800 assertArrayEquals(table, hFileContext.getTableName()); 1801 } 1802 1803 // This test is for HBASE-26026, HBase Write be stuck when active segment has no cell 1804 // but its dataSize exceeds inmemoryFlushSize 1805 @Test 1806 public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize() 1807 throws IOException, InterruptedException { 1808 Configuration conf = HBaseConfiguration.create(); 1809 1810 byte[] smallValue = new byte[3]; 1811 byte[] largeValue = new byte[9]; 1812 final long timestamp = EnvironmentEdgeManager.currentTime(); 1813 final long seqId = 100; 1814 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 1815 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 1816 int smallCellByteSize = 
MutableSegment.getCellLength(smallCell); 1817 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 1818 int flushByteSize = smallCellByteSize + largeCellByteSize - 2; 1819 1820 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 1821 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore2.class.getName()); 1822 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 1823 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 1824 1825 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1826 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1827 1828 MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore); 1829 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 1830 myCompactingMemStore.smallCellPreUpdateCounter.set(0); 1831 myCompactingMemStore.largeCellPreUpdateCounter.set(0); 1832 1833 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 1834 Thread smallCellThread = new Thread(() -> { 1835 try { 1836 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 1837 } catch (Throwable exception) { 1838 exceptionRef.set(exception); 1839 } 1840 }); 1841 smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME); 1842 smallCellThread.start(); 1843 1844 String oldThreadName = Thread.currentThread().getName(); 1845 try { 1846 /** 1847 * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then 1848 * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread 1849 * invokes flushInMemory. 1850 * <p/> 1851 * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread 1852 * can add cell to currentActive . That is to say when largeCellThread called flushInMemory 1853 * method, CompactingMemStore.active has no cell. 
1854 */ 1855 Thread.currentThread().setName(MyCompactingMemStore2.LARGE_CELL_THREAD_NAME); 1856 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 1857 smallCellThread.join(); 1858 1859 for (int i = 0; i < 100; i++) { 1860 long currentTimestamp = timestamp + 100 + i; 1861 Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue); 1862 store.add(cell, new NonThreadSafeMemStoreSizing()); 1863 } 1864 } finally { 1865 Thread.currentThread().setName(oldThreadName); 1866 } 1867 1868 assertTrue(exceptionRef.get() == null); 1869 1870 } 1871 1872 // This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds 1873 // InmemoryFlushSize 1874 @Test(timeout = 60000) 1875 public void testCompactingMemStoreCellExceedInmemoryFlushSize() throws Exception { 1876 Configuration conf = HBaseConfiguration.create(); 1877 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName()); 1878 1879 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1880 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1881 1882 MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore); 1883 1884 int size = (int) (myCompactingMemStore.getInmemoryFlushSize()); 1885 byte[] value = new byte[size + 1]; 1886 1887 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1888 long timestamp = EnvironmentEdgeManager.currentTime(); 1889 long seqId = 100; 1890 Cell cell = createCell(qf1, timestamp, seqId, value); 1891 int cellByteSize = MutableSegment.getCellLength(cell); 1892 store.add(cell, memStoreSizing); 1893 assertTrue(memStoreSizing.getCellsCount() == 1); 1894 assertTrue(memStoreSizing.getDataSize() == cellByteSize); 1895 // Waiting the in memory compaction completed, see HBASE-26438 1896 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 1897 } 1898 1899 // This test is for HBASE-26210 also, test write large cell and small cell concurrently when 1900 // 
InmemoryFlushSize is smaller,equal with and larger than cell size. 1901 @Test 1902 public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently() 1903 throws IOException, InterruptedException { 1904 doWriteTestLargeCellAndSmallCellConcurrently( 1905 (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1); 1906 doWriteTestLargeCellAndSmallCellConcurrently( 1907 (smallCellByteSize, largeCellByteSize) -> largeCellByteSize); 1908 doWriteTestLargeCellAndSmallCellConcurrently( 1909 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1); 1910 doWriteTestLargeCellAndSmallCellConcurrently( 1911 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize); 1912 doWriteTestLargeCellAndSmallCellConcurrently( 1913 (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1); 1914 } 1915 1916 private void doWriteTestLargeCellAndSmallCellConcurrently(IntBinaryOperator getFlushByteSize) 1917 throws IOException, InterruptedException { 1918 1919 Configuration conf = HBaseConfiguration.create(); 1920 1921 byte[] smallValue = new byte[3]; 1922 byte[] largeValue = new byte[100]; 1923 final long timestamp = EnvironmentEdgeManager.currentTime(); 1924 final long seqId = 100; 1925 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 1926 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 1927 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 1928 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 1929 int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize); 1930 boolean flushByteSizeLessThanSmallAndLargeCellSize = 1931 flushByteSize < (smallCellByteSize + largeCellByteSize); 1932 1933 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName()); 1934 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 1935 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 
String.valueOf(flushByteSize * 200)); 1936 1937 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1938 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1939 1940 MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore); 1941 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 1942 myCompactingMemStore.disableCompaction(); 1943 if (flushByteSizeLessThanSmallAndLargeCellSize) { 1944 myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true; 1945 } else { 1946 myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false; 1947 } 1948 1949 final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing(); 1950 final AtomicLong totalCellByteSize = new AtomicLong(0); 1951 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 1952 Thread smallCellThread = new Thread(() -> { 1953 try { 1954 for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { 1955 long currentTimestamp = timestamp + i; 1956 Cell cell = createCell(qf1, currentTimestamp, seqId, smallValue); 1957 totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell)); 1958 store.add(cell, memStoreSizing); 1959 } 1960 } catch (Throwable exception) { 1961 exceptionRef.set(exception); 1962 1963 } 1964 }); 1965 smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME); 1966 smallCellThread.start(); 1967 1968 String oldThreadName = Thread.currentThread().getName(); 1969 try { 1970 /** 1971 * When flushByteSizeLessThanSmallAndLargeCellSize is true: 1972 * </p> 1973 * 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then 1974 * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then 1975 * largeCellThread invokes flushInMemory. 1976 * <p/> 1977 * 2. 
After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread 1978 * can run into MyCompactingMemStore3.checkAndAddToActiveSize again. 1979 * <p/> 1980 * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and 1981 * largeCellThread concurrently write one cell and wait each other, and then write another 1982 * cell etc. 1983 */ 1984 Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME); 1985 for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { 1986 long currentTimestamp = timestamp + i; 1987 Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue); 1988 totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell)); 1989 store.add(cell, memStoreSizing); 1990 } 1991 smallCellThread.join(); 1992 1993 assertTrue(exceptionRef.get() == null); 1994 assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2)); 1995 assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get()); 1996 if (flushByteSizeLessThanSmallAndLargeCellSize) { 1997 assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT); 1998 } else { 1999 assertTrue( 2000 myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1)); 2001 } 2002 } finally { 2003 Thread.currentThread().setName(oldThreadName); 2004 } 2005 } 2006 2007 /** 2008 * <pre> 2009 * This test is for HBASE-26384, 2010 * test {@link CompactingMemStore#flattenOneSegment} and {@link CompactingMemStore#snapshot()} 2011 * execute concurrently. 2012 * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs 2013 * for both branch-2 and master): 2014 * 1. 
The {@link CompactingMemStore} size exceeds 2015 * {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new 2016 * {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a 2017 * in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}. 2018 * 2. The in memory compact thread starts and then stopping before 2019 * {@link CompactingMemStore#flattenOneSegment}. 2020 * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the 2021 * snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory 2022 * compact thread continues. 2023 * Assuming {@link VersionedSegmentsList#version} returned from 2024 * {@link CompactingMemStore#getImmutableSegments} is v. 2025 * 4. The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}. 2026 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment}, 2027 * {@link CompactionPipeline#version} is still v. 2028 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2029 * {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull} 2030 * thinks it is successful and continue flushing,but the {@link ImmutableSegment} in 2031 * {@link CompactionPipeline} has changed because 2032 * {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not 2033 * removed in fact and still remaining in {@link CompactionPipeline}. 2034 * 2035 * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior: 2036 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment}, 2037 * {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to 2038 * v+1. 2039 * 6. 
The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2040 * {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull} 2041 * failed and retry the while loop in {@link CompactingMemStore#pushPipelineToSnapshot} once 2042 * again, because there is no concurrent {@link CompactingMemStore#inMemoryCompaction} now, 2043 * {@link CompactingMemStore#swapPipelineWithNull} succeeds. 2044 * </pre> 2045 */ 2046 @Test 2047 public void testFlattenAndSnapshotCompactingMemStoreConcurrently() throws Exception { 2048 Configuration conf = HBaseConfiguration.create(); 2049 2050 byte[] smallValue = new byte[3]; 2051 byte[] largeValue = new byte[9]; 2052 final long timestamp = EnvironmentEdgeManager.currentTime(); 2053 final long seqId = 100; 2054 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2055 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2056 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2057 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2058 int totalCellByteSize = (smallCellByteSize + largeCellByteSize); 2059 int flushByteSize = totalCellByteSize - 2; 2060 2061 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 
2062 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore4.class.getName()); 2063 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2064 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2065 2066 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2067 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2068 2069 MyCompactingMemStore4 myCompactingMemStore = ((MyCompactingMemStore4) store.memstore); 2070 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2071 2072 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2073 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2074 2075 String oldThreadName = Thread.currentThread().getName(); 2076 try { 2077 Thread.currentThread().setName(MyCompactingMemStore4.TAKE_SNAPSHOT_THREAD_NAME); 2078 /** 2079 * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters 2080 * {@link CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot} 2081 * would invoke {@link CompactingMemStore#stopCompaction}. 
2082 */ 2083 myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await(); 2084 2085 MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot(); 2086 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2087 2088 assertTrue(memStoreSnapshot.getCellsCount() == 2); 2089 assertTrue(((int) (memStoreSnapshot.getDataSize())) == totalCellByteSize); 2090 VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments(); 2091 assertTrue(segments.getNumOfSegments() == 0); 2092 assertTrue(segments.getNumOfCells() == 0); 2093 assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 1); 2094 assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2); 2095 } finally { 2096 Thread.currentThread().setName(oldThreadName); 2097 } 2098 } 2099 2100 /** 2101 * <pre> 2102 * This test is for HBASE-26384, 2103 * test {@link CompactingMemStore#flattenOneSegment}{@link CompactingMemStore#snapshot()} 2104 * and writeMemStore execute concurrently. 2105 * The threads sequence before HBASE-26384 is(The bug only exists for branch-2,and I add UTs 2106 * for both branch-2 and master): 2107 * 1. The {@link CompactingMemStore} size exceeds 2108 * {@link CompactingMemStore#getInmemoryFlushSize()},the write thread adds a new 2109 * {@link ImmutableSegment} to the head of {@link CompactingMemStore#pipeline},and start a 2110 * in memory compact thread to execute {@link CompactingMemStore#inMemoryCompaction}. 2111 * 2. The in memory compact thread starts and then stopping before 2112 * {@link CompactingMemStore#flattenOneSegment}. 2113 * 3. The snapshot thread starts {@link CompactingMemStore#snapshot} concurrently,after the 2114 * snapshot thread executing {@link CompactingMemStore#getImmutableSegments},the in memory 2115 * compact thread continues. 2116 * Assuming {@link VersionedSegmentsList#version} returned from 2117 * {@link CompactingMemStore#getImmutableSegments} is v. 2118 * 4. 
The snapshot thread stopping before {@link CompactingMemStore#swapPipelineWithNull}. 2119 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment}, 2120 * {@link CompactionPipeline#version} is still v. 2121 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2122 * {@link CompactionPipeline#version} is v, {@link CompactingMemStore#swapPipelineWithNull} 2123 * thinks it is successful and continue flushing,but the {@link ImmutableSegment} in 2124 * {@link CompactionPipeline} has changed because 2125 * {@link CompactingMemStore#flattenOneSegment},so the {@link ImmutableSegment} is not 2126 * removed in fact and still remaining in {@link CompactionPipeline}. 2127 * 2128 * After HBASE-26384, the 5-6 step is changed to following, which is expected behavior, 2129 * and I add step 7-8 to test there is new segment added before retry. 2130 * 5. The in memory compact thread completes {@link CompactingMemStore#flattenOneSegment}, 2131 * {@link CompactingMemStore#flattenOneSegment} change {@link CompactionPipeline#version} to 2132 * v+1. 2133 * 6. The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2134 * {@link CompactionPipeline#version} is v+1, {@link CompactingMemStore#swapPipelineWithNull} 2135 * failed and retry,{@link VersionedSegmentsList#version} returned from 2136 * {@link CompactingMemStore#getImmutableSegments} is v+1. 2137 * 7. The write thread continues writing to {@link CompactingMemStore} and 2138 * {@link CompactingMemStore} size exceeds {@link CompactingMemStore#getInmemoryFlushSize()}, 2139 * {@link CompactingMemStore#flushInMemory(MutableSegment)} is called and a new 2140 * {@link ImmutableSegment} is added to the head of {@link CompactingMemStore#pipeline}, 2141 * {@link CompactionPipeline#version} is still v+1. 2142 * 8. 
The snapshot thread continues {@link CompactingMemStore#swapPipelineWithNull}, and because 2143 * {@link CompactionPipeline#version} is still v+1, 2144 * {@link CompactingMemStore#swapPipelineWithNull} succeeds.The new {@link ImmutableSegment} 2145 * remained at the head of {@link CompactingMemStore#pipeline},the old is removed by 2146 * {@link CompactingMemStore#swapPipelineWithNull}. 2147 * </pre> 2148 */ 2149 @Test 2150 public void testFlattenSnapshotWriteCompactingMemeStoreConcurrently() throws Exception { 2151 Configuration conf = HBaseConfiguration.create(); 2152 2153 byte[] smallValue = new byte[3]; 2154 byte[] largeValue = new byte[9]; 2155 final long timestamp = EnvironmentEdgeManager.currentTime(); 2156 final long seqId = 100; 2157 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2158 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2159 int smallCellByteSize = MutableSegment.getCellLength(smallCell); 2160 int largeCellByteSize = MutableSegment.getCellLength(largeCell); 2161 int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize); 2162 int flushByteSize = firstWriteCellByteSize - 2; 2163 2164 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 
2165 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore5.class.getName()); 2166 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2167 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2168 2169 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2170 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2171 2172 final MyCompactingMemStore5 myCompactingMemStore = ((MyCompactingMemStore5) store.memstore); 2173 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2174 2175 store.add(smallCell, new NonThreadSafeMemStoreSizing()); 2176 store.add(largeCell, new NonThreadSafeMemStoreSizing()); 2177 2178 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2179 final Cell writeAgainCell1 = createCell(qf3, timestamp, seqId + 1, largeValue); 2180 final Cell writeAgainCell2 = createCell(qf4, timestamp, seqId + 1, largeValue); 2181 final int writeAgainCellByteSize = 2182 MutableSegment.getCellLength(writeAgainCell1) + MutableSegment.getCellLength(writeAgainCell2); 2183 final Thread writeAgainThread = new Thread(() -> { 2184 try { 2185 myCompactingMemStore.writeMemStoreAgainStartCyclicBarrier.await(); 2186 2187 store.add(writeAgainCell1, new NonThreadSafeMemStoreSizing()); 2188 store.add(writeAgainCell2, new NonThreadSafeMemStoreSizing()); 2189 2190 myCompactingMemStore.writeMemStoreAgainEndCyclicBarrier.await(); 2191 } catch (Throwable exception) { 2192 exceptionRef.set(exception); 2193 } 2194 }); 2195 writeAgainThread.setName(MyCompactingMemStore5.WRITE_AGAIN_THREAD_NAME); 2196 writeAgainThread.start(); 2197 2198 String oldThreadName = Thread.currentThread().getName(); 2199 try { 2200 Thread.currentThread().setName(MyCompactingMemStore5.TAKE_SNAPSHOT_THREAD_NAME); 2201 /** 2202 * {@link CompactingMemStore#snapshot} must wait the in memory compact thread enters 2203 * {@link 
CompactingMemStore#flattenOneSegment},because {@link CompactingMemStore#snapshot} 2204 * would invoke {@link CompactingMemStore#stopCompaction}. 2205 */ 2206 myCompactingMemStore.snapShotStartCyclicCyclicBarrier.await(); 2207 MemStoreSnapshot memStoreSnapshot = myCompactingMemStore.snapshot(); 2208 myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await(); 2209 writeAgainThread.join(); 2210 2211 assertTrue(memStoreSnapshot.getCellsCount() == 2); 2212 assertTrue(((int) (memStoreSnapshot.getDataSize())) == firstWriteCellByteSize); 2213 VersionedSegmentsList segments = myCompactingMemStore.getImmutableSegments(); 2214 assertTrue(segments.getNumOfSegments() == 1); 2215 assertTrue( 2216 ((int) (segments.getStoreSegments().get(0).getDataSize())) == writeAgainCellByteSize); 2217 assertTrue(segments.getNumOfCells() == 2); 2218 assertTrue(myCompactingMemStore.setInMemoryCompactionFlagCounter.get() == 2); 2219 assertTrue(exceptionRef.get() == null); 2220 assertTrue(myCompactingMemStore.swapPipelineWithNullCounter.get() == 2); 2221 } finally { 2222 Thread.currentThread().setName(oldThreadName); 2223 } 2224 } 2225 2226 /** 2227 * <pre> 2228 * This test is for HBASE-26465, 2229 * test {@link DefaultMemStore#clearSnapshot} and {@link DefaultMemStore#getScanners} execute 2230 * concurrently. The threads sequence before HBASE-26465 is: 2231 * 1.The flush thread starts {@link DefaultMemStore} flushing after some cells have be added to 2232 * {@link DefaultMemStore}. 2233 * 2.The flush thread stopping before {@link DefaultMemStore#clearSnapshot} in 2234 * {@link HStore#updateStorefiles} after completed flushing memStore to hfile. 2235 * 3.The scan thread starts and stopping after {@link DefaultMemStore#getSnapshotSegments} in 2236 * {@link DefaultMemStore#getScanners},here the scan thread gets the 2237 * {@link DefaultMemStore#snapshot} which is created by the flush thread. 
2238 * 4.The flush thread continues {@link DefaultMemStore#clearSnapshot} and close 2239 * {@link DefaultMemStore#snapshot},because the reference count of the corresponding 2240 * {@link MemStoreLABImpl} is 0, the {@link Chunk}s in corresponding {@link MemStoreLABImpl} 2241 * are recycled. 2242 * 5.The scan thread continues {@link DefaultMemStore#getScanners},and create a 2243 * {@link SegmentScanner} for this {@link DefaultMemStore#snapshot}, and increase the 2244 * reference count of the corresponding {@link MemStoreLABImpl}, but {@link Chunk}s in 2245 * corresponding {@link MemStoreLABImpl} are recycled by step 4, and these {@link Chunk}s may 2246 * be overwritten by other write threads,which may cause serious problem. 2247 * After HBASE-26465,{@link DefaultMemStore#getScanners} and 2248 * {@link DefaultMemStore#clearSnapshot} could not execute concurrently. 2249 * </pre> 2250 */ 2251 @Test 2252 public void testClearSnapshotGetScannerConcurrently() throws Exception { 2253 Configuration conf = HBaseConfiguration.create(); 2254 2255 byte[] smallValue = new byte[3]; 2256 byte[] largeValue = new byte[9]; 2257 final long timestamp = EnvironmentEdgeManager.currentTime(); 2258 final long seqId = 100; 2259 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2260 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2261 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2262 quals.add(qf1); 2263 quals.add(qf2); 2264 2265 conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore.class.getName()); 2266 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2267 2268 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2269 MyDefaultMemStore myDefaultMemStore = (MyDefaultMemStore) (store.memstore); 2270 myDefaultMemStore.store = store; 2271 2272 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2273 store.add(smallCell, memStoreSizing); 2274 store.add(largeCell, memStoreSizing); 
2275 2276 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 2277 final Thread flushThread = new Thread(() -> { 2278 try { 2279 flushStore(store, id++); 2280 } catch (Throwable exception) { 2281 exceptionRef.set(exception); 2282 } 2283 }); 2284 flushThread.setName(MyDefaultMemStore.FLUSH_THREAD_NAME); 2285 flushThread.start(); 2286 2287 String oldThreadName = Thread.currentThread().getName(); 2288 StoreScanner storeScanner = null; 2289 try { 2290 Thread.currentThread().setName(MyDefaultMemStore.GET_SCANNER_THREAD_NAME); 2291 2292 /** 2293 * Wait flush thread stopping before {@link DefaultMemStore#doClearSnapshot} 2294 */ 2295 myDefaultMemStore.getScannerCyclicBarrier.await(); 2296 2297 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 2298 flushThread.join(); 2299 2300 if (myDefaultMemStore.shouldWait) { 2301 SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); 2302 MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); 2303 assertTrue(memStoreLAB.isClosed()); 2304 assertTrue(!memStoreLAB.chunks.isEmpty()); 2305 assertTrue(!memStoreLAB.isReclaimed()); 2306 2307 Cell cell1 = segmentScanner.next(); 2308 CellUtil.equals(smallCell, cell1); 2309 Cell cell2 = segmentScanner.next(); 2310 CellUtil.equals(largeCell, cell2); 2311 assertNull(segmentScanner.next()); 2312 } else { 2313 List<Cell> results = new ArrayList<>(); 2314 storeScanner.next(results); 2315 assertEquals(2, results.size()); 2316 CellUtil.equals(smallCell, results.get(0)); 2317 CellUtil.equals(largeCell, results.get(1)); 2318 } 2319 assertTrue(exceptionRef.get() == null); 2320 } finally { 2321 if (storeScanner != null) { 2322 storeScanner.close(); 2323 } 2324 Thread.currentThread().setName(oldThreadName); 2325 } 2326 } 2327 2328 @SuppressWarnings("unchecked") 2329 private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) { 2330 
List<T> resultScanners = new ArrayList<T>(); 2331 for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) { 2332 if (keyValueScannerClass.isInstance(keyValueScanner)) { 2333 resultScanners.add((T) keyValueScanner); 2334 } 2335 } 2336 assertTrue(resultScanners.size() == 1); 2337 return resultScanners.get(0); 2338 } 2339 2340 @Test 2341 public void testOnConfigurationChange() throws IOException { 2342 final int COMMON_MAX_FILES_TO_COMPACT = 10; 2343 final int NEW_COMMON_MAX_FILES_TO_COMPACT = 8; 2344 final int STORE_MAX_FILES_TO_COMPACT = 6; 2345 2346 // Build a table that its maxFileToCompact different from common configuration. 2347 Configuration conf = HBaseConfiguration.create(); 2348 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2349 COMMON_MAX_FILES_TO_COMPACT); 2350 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 2351 .setConfiguration(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2352 String.valueOf(STORE_MAX_FILES_TO_COMPACT)) 2353 .build(); 2354 init(this.name.getMethodName(), conf, hcd); 2355 2356 // After updating common configuration, the conf in HStore itself must not be changed. 
2357 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 2358 NEW_COMMON_MAX_FILES_TO_COMPACT); 2359 this.store.onConfigurationChange(conf); 2360 assertEquals(STORE_MAX_FILES_TO_COMPACT, 2361 store.getStoreEngine().getCompactionPolicy().getConf().getMaxFilesToCompact()); 2362 } 2363 2364 /** 2365 * This test is for HBASE-26476 2366 */ 2367 @Test 2368 public void testExtendsDefaultMemStore() throws Exception { 2369 Configuration conf = HBaseConfiguration.create(); 2370 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2371 2372 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2373 assertTrue(this.store.memstore.getClass() == DefaultMemStore.class); 2374 tearDown(); 2375 2376 conf.set(HStore.MEMSTORE_CLASS_NAME, CustomDefaultMemStore.class.getName()); 2377 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2378 assertTrue(this.store.memstore.getClass() == CustomDefaultMemStore.class); 2379 } 2380 2381 static class CustomDefaultMemStore extends DefaultMemStore { 2382 2383 public CustomDefaultMemStore(Configuration conf, CellComparator c, 2384 RegionServicesForStores regionServices) { 2385 super(conf, c, regionServices); 2386 } 2387 2388 } 2389 2390 /** 2391 * This test is for HBASE-26488 2392 */ 2393 @Test 2394 public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception { 2395 2396 Configuration conf = HBaseConfiguration.create(); 2397 2398 byte[] smallValue = new byte[3]; 2399 byte[] largeValue = new byte[9]; 2400 final long timestamp = EnvironmentEdgeManager.currentTime(); 2401 final long seqId = 100; 2402 final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); 2403 final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); 2404 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 2405 quals.add(qf1); 2406 quals.add(qf2); 2407 2408 conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName()); 2409 
conf.setBoolean(WALFactory.WAL_ENABLED, false); 2410 conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, 2411 MyDefaultStoreFlusher.class.getName()); 2412 2413 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); 2414 MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore); 2415 assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher); 2416 2417 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2418 store.add(smallCell, memStoreSizing); 2419 store.add(largeCell, memStoreSizing); 2420 flushStore(store, id++); 2421 2422 MemStoreLABImpl memStoreLAB = 2423 (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB()); 2424 assertTrue(memStoreLAB.isClosed()); 2425 assertTrue(memStoreLAB.getRefCntValue() == 0); 2426 assertTrue(memStoreLAB.isReclaimed()); 2427 assertTrue(memStoreLAB.chunks.isEmpty()); 2428 StoreScanner storeScanner = null; 2429 try { 2430 storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); 2431 assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1); 2432 assertTrue(store.memstore.size().getCellsCount() == 0); 2433 assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0); 2434 assertTrue(storeScanner.currentScanners.size() == 1); 2435 assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner); 2436 2437 List<Cell> results = new ArrayList<>(); 2438 storeScanner.next(results); 2439 assertEquals(2, results.size()); 2440 CellUtil.equals(smallCell, results.get(0)); 2441 CellUtil.equals(largeCell, results.get(1)); 2442 } finally { 2443 if (storeScanner != null) { 2444 storeScanner.close(); 2445 } 2446 } 2447 } 2448 2449 static class MyDefaultMemStore1 extends DefaultMemStore { 2450 2451 private ImmutableSegment snapshotImmutableSegment; 2452 2453 public MyDefaultMemStore1(Configuration conf, CellComparator c, 2454 RegionServicesForStores 
regionServices) { 2455 super(conf, c, regionServices); 2456 } 2457 2458 @Override 2459 public MemStoreSnapshot snapshot() { 2460 MemStoreSnapshot result = super.snapshot(); 2461 this.snapshotImmutableSegment = snapshot; 2462 return result; 2463 } 2464 2465 } 2466 2467 public static class MyDefaultStoreFlusher extends DefaultStoreFlusher { 2468 private static final AtomicInteger failCounter = new AtomicInteger(1); 2469 private static final AtomicInteger counter = new AtomicInteger(0); 2470 2471 public MyDefaultStoreFlusher(Configuration conf, HStore store) { 2472 super(conf, store); 2473 } 2474 2475 @Override 2476 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, 2477 MonitoredTask status, ThroughputController throughputController, 2478 FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException { 2479 counter.incrementAndGet(); 2480 return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker, 2481 writerCreationTracker); 2482 } 2483 2484 @Override 2485 protected void performFlush(InternalScanner scanner, final CellSink sink, 2486 ThroughputController throughputController) throws IOException { 2487 2488 final int currentCount = counter.get(); 2489 CellSink newCellSink = (cell) -> { 2490 if (currentCount <= failCounter.get()) { 2491 throw new IOException("Simulated exception by tests"); 2492 } 2493 sink.append(cell); 2494 }; 2495 super.performFlush(scanner, newCellSink, throughputController); 2496 } 2497 } 2498 2499 /** 2500 * This test is for HBASE-26494, test the {@link RefCnt} behaviors in {@link ImmutableMemStoreLAB} 2501 */ 2502 @Test 2503 public void testImmutableMemStoreLABRefCnt() throws Exception { 2504 Configuration conf = HBaseConfiguration.create(); 2505 2506 byte[] smallValue = new byte[3]; 2507 byte[] largeValue = new byte[9]; 2508 final long timestamp = EnvironmentEdgeManager.currentTime(); 2509 final long seqId = 100; 2510 final Cell smallCell1 = createCell(qf1, 
timestamp, seqId, smallValue); 2511 final Cell largeCell1 = createCell(qf2, timestamp, seqId, largeValue); 2512 final Cell smallCell2 = createCell(qf3, timestamp, seqId + 1, smallValue); 2513 final Cell largeCell2 = createCell(qf4, timestamp, seqId + 1, largeValue); 2514 final Cell smallCell3 = createCell(qf5, timestamp, seqId + 2, smallValue); 2515 final Cell largeCell3 = createCell(qf6, timestamp, seqId + 2, largeValue); 2516 2517 int smallCellByteSize = MutableSegment.getCellLength(smallCell1); 2518 int largeCellByteSize = MutableSegment.getCellLength(largeCell1); 2519 int firstWriteCellByteSize = (smallCellByteSize + largeCellByteSize); 2520 int flushByteSize = firstWriteCellByteSize - 2; 2521 2522 // set CompactingMemStore.inmemoryFlushSize to flushByteSize. 2523 conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName()); 2524 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); 2525 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); 2526 conf.setBoolean(WALFactory.WAL_ENABLED, false); 2527 2528 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 2529 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 2530 2531 final CompactingMemStore myCompactingMemStore = ((CompactingMemStore) store.memstore); 2532 assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); 2533 myCompactingMemStore.allowCompaction.set(false); 2534 2535 NonThreadSafeMemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 2536 store.add(smallCell1, memStoreSizing); 2537 store.add(largeCell1, memStoreSizing); 2538 store.add(smallCell2, memStoreSizing); 2539 store.add(largeCell2, memStoreSizing); 2540 store.add(smallCell3, memStoreSizing); 2541 store.add(largeCell3, memStoreSizing); 2542 VersionedSegmentsList versionedSegmentsList = myCompactingMemStore.getImmutableSegments(); 2543 assertTrue(versionedSegmentsList.getNumOfSegments() == 3); 2544 
List<ImmutableSegment> segments = versionedSegmentsList.getStoreSegments(); 2545 List<MemStoreLABImpl> memStoreLABs = new ArrayList<MemStoreLABImpl>(segments.size()); 2546 for (ImmutableSegment segment : segments) { 2547 memStoreLABs.add((MemStoreLABImpl) segment.getMemStoreLAB()); 2548 } 2549 List<KeyValueScanner> scanners1 = myCompactingMemStore.getScanners(Long.MAX_VALUE); 2550 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2551 assertTrue(memStoreLAB.getRefCntValue() == 2); 2552 } 2553 2554 myCompactingMemStore.allowCompaction.set(true); 2555 myCompactingMemStore.flushInMemory(); 2556 2557 versionedSegmentsList = myCompactingMemStore.getImmutableSegments(); 2558 assertTrue(versionedSegmentsList.getNumOfSegments() == 1); 2559 ImmutableMemStoreLAB immutableMemStoreLAB = 2560 (ImmutableMemStoreLAB) (versionedSegmentsList.getStoreSegments().get(0).getMemStoreLAB()); 2561 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2562 assertTrue(memStoreLAB.getRefCntValue() == 2); 2563 } 2564 2565 List<KeyValueScanner> scanners2 = myCompactingMemStore.getScanners(Long.MAX_VALUE); 2566 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2567 assertTrue(memStoreLAB.getRefCntValue() == 2); 2568 } 2569 assertTrue(immutableMemStoreLAB.getRefCntValue() == 2); 2570 for (KeyValueScanner scanner : scanners1) { 2571 scanner.close(); 2572 } 2573 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2574 assertTrue(memStoreLAB.getRefCntValue() == 1); 2575 } 2576 for (KeyValueScanner scanner : scanners2) { 2577 scanner.close(); 2578 } 2579 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2580 assertTrue(memStoreLAB.getRefCntValue() == 1); 2581 } 2582 assertTrue(immutableMemStoreLAB.getRefCntValue() == 1); 2583 flushStore(store, id++); 2584 for (MemStoreLABImpl memStoreLAB : memStoreLABs) { 2585 assertTrue(memStoreLAB.getRefCntValue() == 0); 2586 } 2587 assertTrue(immutableMemStoreLAB.getRefCntValue() == 0); 2588 assertTrue(immutableMemStoreLAB.isClosed()); 2589 for (MemStoreLABImpl 
memStoreLAB : memStoreLABs) { 2590 assertTrue(memStoreLAB.isClosed()); 2591 assertTrue(memStoreLAB.isReclaimed()); 2592 assertTrue(memStoreLAB.chunks.isEmpty()); 2593 } 2594 } 2595 2596 private HStoreFile mockStoreFileWithLength(long length) { 2597 HStoreFile sf = mock(HStoreFile.class); 2598 StoreFileReader sfr = mock(StoreFileReader.class); 2599 when(sf.isHFile()).thenReturn(true); 2600 when(sf.getReader()).thenReturn(sfr); 2601 when(sfr.length()).thenReturn(length); 2602 return sf; 2603 } 2604 2605 private static class MyThread extends Thread { 2606 private StoreScanner scanner; 2607 private KeyValueHeap heap; 2608 2609 public MyThread(StoreScanner scanner) { 2610 this.scanner = scanner; 2611 } 2612 2613 public KeyValueHeap getHeap() { 2614 return this.heap; 2615 } 2616 2617 @Override 2618 public void run() { 2619 scanner.trySwitchToStreamRead(); 2620 heap = scanner.heap; 2621 } 2622 } 2623 2624 private static class MyMemStoreCompactor extends MemStoreCompactor { 2625 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 2626 private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1); 2627 2628 public MyMemStoreCompactor(CompactingMemStore compactingMemStore, 2629 MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException { 2630 super(compactingMemStore, compactionPolicy); 2631 } 2632 2633 @Override 2634 public boolean start() throws IOException { 2635 boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0; 2636 if (isFirst) { 2637 try { 2638 START_COMPACTOR_LATCH.await(); 2639 return super.start(); 2640 } catch (InterruptedException ex) { 2641 throw new RuntimeException(ex); 2642 } 2643 } 2644 return super.start(); 2645 } 2646 } 2647 2648 public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore { 2649 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 2650 2651 public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c, 2652 
HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 2653 throws IOException { 2654 super(conf, c, store, regionServices, compactionPolicy); 2655 } 2656 2657 @Override 2658 protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) 2659 throws IllegalArgumentIOException { 2660 return new MyMemStoreCompactor(this, compactionPolicy); 2661 } 2662 2663 @Override 2664 protected boolean setInMemoryCompactionFlag() { 2665 boolean rval = super.setInMemoryCompactionFlag(); 2666 if (rval) { 2667 RUNNER_COUNT.incrementAndGet(); 2668 if (LOG.isDebugEnabled()) { 2669 LOG.debug("runner count: " + RUNNER_COUNT.get()); 2670 } 2671 } 2672 return rval; 2673 } 2674 } 2675 2676 public static class MyCompactingMemStore extends CompactingMemStore { 2677 private static final AtomicBoolean START_TEST = new AtomicBoolean(false); 2678 private final CountDownLatch getScannerLatch = new CountDownLatch(1); 2679 private final CountDownLatch snapshotLatch = new CountDownLatch(1); 2680 2681 public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, HStore store, 2682 RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 2683 throws IOException { 2684 super(conf, c, store, regionServices, compactionPolicy); 2685 } 2686 2687 @Override 2688 protected List<KeyValueScanner> createList(int capacity) { 2689 if (START_TEST.get()) { 2690 try { 2691 getScannerLatch.countDown(); 2692 snapshotLatch.await(); 2693 } catch (InterruptedException e) { 2694 throw new RuntimeException(e); 2695 } 2696 } 2697 return new ArrayList<>(capacity); 2698 } 2699 2700 @Override 2701 protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) { 2702 if (START_TEST.get()) { 2703 try { 2704 getScannerLatch.await(); 2705 } catch (InterruptedException e) { 2706 throw new RuntimeException(e); 2707 } 2708 } 2709 2710 super.pushActiveToPipeline(active, checkEmpty); 2711 if (START_TEST.get()) { 2712 
snapshotLatch.countDown(); 2713 } 2714 } 2715 } 2716 2717 interface MyListHook { 2718 void hook(int currentSize); 2719 } 2720 2721 private static class MyList<T> implements List<T> { 2722 private final List<T> delegatee = new ArrayList<>(); 2723 private final MyListHook hookAtAdd; 2724 2725 MyList(final MyListHook hookAtAdd) { 2726 this.hookAtAdd = hookAtAdd; 2727 } 2728 2729 @Override 2730 public int size() { 2731 return delegatee.size(); 2732 } 2733 2734 @Override 2735 public boolean isEmpty() { 2736 return delegatee.isEmpty(); 2737 } 2738 2739 @Override 2740 public boolean contains(Object o) { 2741 return delegatee.contains(o); 2742 } 2743 2744 @Override 2745 public Iterator<T> iterator() { 2746 return delegatee.iterator(); 2747 } 2748 2749 @Override 2750 public Object[] toArray() { 2751 return delegatee.toArray(); 2752 } 2753 2754 @Override 2755 public <R> R[] toArray(R[] a) { 2756 return delegatee.toArray(a); 2757 } 2758 2759 @Override 2760 public boolean add(T e) { 2761 hookAtAdd.hook(size()); 2762 return delegatee.add(e); 2763 } 2764 2765 @Override 2766 public boolean remove(Object o) { 2767 return delegatee.remove(o); 2768 } 2769 2770 @Override 2771 public boolean containsAll(Collection<?> c) { 2772 return delegatee.containsAll(c); 2773 } 2774 2775 @Override 2776 public boolean addAll(Collection<? extends T> c) { 2777 return delegatee.addAll(c); 2778 } 2779 2780 @Override 2781 public boolean addAll(int index, Collection<? 
extends T> c) { 2782 return delegatee.addAll(index, c); 2783 } 2784 2785 @Override 2786 public boolean removeAll(Collection<?> c) { 2787 return delegatee.removeAll(c); 2788 } 2789 2790 @Override 2791 public boolean retainAll(Collection<?> c) { 2792 return delegatee.retainAll(c); 2793 } 2794 2795 @Override 2796 public void clear() { 2797 delegatee.clear(); 2798 } 2799 2800 @Override 2801 public T get(int index) { 2802 return delegatee.get(index); 2803 } 2804 2805 @Override 2806 public T set(int index, T element) { 2807 return delegatee.set(index, element); 2808 } 2809 2810 @Override 2811 public void add(int index, T element) { 2812 delegatee.add(index, element); 2813 } 2814 2815 @Override 2816 public T remove(int index) { 2817 return delegatee.remove(index); 2818 } 2819 2820 @Override 2821 public int indexOf(Object o) { 2822 return delegatee.indexOf(o); 2823 } 2824 2825 @Override 2826 public int lastIndexOf(Object o) { 2827 return delegatee.lastIndexOf(o); 2828 } 2829 2830 @Override 2831 public ListIterator<T> listIterator() { 2832 return delegatee.listIterator(); 2833 } 2834 2835 @Override 2836 public ListIterator<T> listIterator(int index) { 2837 return delegatee.listIterator(index); 2838 } 2839 2840 @Override 2841 public List<T> subList(int fromIndex, int toIndex) { 2842 return delegatee.subList(fromIndex, toIndex); 2843 } 2844 } 2845 2846 public static class MyCompactingMemStore2 extends CompactingMemStore { 2847 private static final String LARGE_CELL_THREAD_NAME = "largeCellThread"; 2848 private static final String SMALL_CELL_THREAD_NAME = "smallCellThread"; 2849 private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2); 2850 private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2); 2851 private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0); 2852 private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0); 2853 2854 public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator, 
HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
      throws IOException {
      super(conf, cellComparator, store, regionServices, compactionPolicy);
    }

    /**
     * Delegates to the super implementation, synchronizing the two named test threads on
     * {@code preCyclicBarrier}: the large-cell thread waits on the barrier before its first
     * delegated call, while the small-cell thread waits on it after its delegated call returns.
     */
    @Override
    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
      MemStoreSizing memstoreSizing) {
      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
        int currentCount = largeCellPreUpdateCounter.incrementAndGet();
        // Only the first call made by the large-cell thread takes part in the rendezvous.
        if (currentCount <= 1) {
          try {
            /**
             * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
             * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
             * largeCellThread invokes flushInMemory.
             */
            preCyclicBarrier.await();
          } catch (Throwable e) {
            throw new RuntimeException(e);
          }
        }
      }

      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
        try {
          preCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
      return returnValue;
    }

    /**
     * Delegates to the super implementation; the small-cell thread first waits on
     * {@code postCyclicBarrier}, which the large-cell thread reaches in {@link #flushInMemory}.
     */
    @Override
    protected void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
        try {
          /**
           * After largeCellThread finished flushInMemory method, smallCellThread can add cell to
           * currentActive . That is to say when largeCellThread called flushInMemory method,
           * currentActive has no cell.
           */
          postCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
      super.doAdd(currentActive, cell, memstoreSizing);
    }

    /**
     * Runs the super implementation and then, on the large-cell thread while its pre-update
     * counter is still at most 1, waits on {@code postCyclicBarrier} to release the small-cell
     * thread blocked in {@link #doAdd}.
     */
    @Override
    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
      super.flushInMemory(currentActiveMutableSegment);
      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
        if (largeCellPreUpdateCounter.get() <= 1) {
          try {
            postCyclicBarrier.await();
          } catch (Throwable e) {
            throw new RuntimeException(e);
          }
        }
      }
    }

  }

  public static class MyCompactingMemStore3 extends CompactingMemStore {
    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";

    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
    private final AtomicInteger flushCounter = new AtomicInteger(0);
    private static final int CELL_COUNT = 5;
    private boolean flushByteSizeLessThanSmallAndLargeCellSize = true;

    public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator,
      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
      throws IOException {
      super(conf, cellComparator, store, regionServices, compactionPolicy);
    }

    @Override
    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
      MemStoreSizing memstoreSizing) {
      // When the flag is cleared no thread coordination happens in this method.
      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
        return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
      }
      // The large-cell thread waits here until the small-cell thread has finished its own
      // delegated call and reached the same barrier.
      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
        try {
          preCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }

      boolean
returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
        try {
          preCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
      return returnValue;
    }

    /**
     * Runs the super implementation and then waits on {@code postCyclicBarrier}: every caller
     * waits when the flag is cleared, otherwise only the small-cell thread does.
     */
    @Override
    protected void postUpdate(MutableSegment currentActiveMutableSegment) {
      super.postUpdate(currentActiveMutableSegment);
      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
        try {
          postCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
        return;
      }

      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
        try {
          postCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
    }

    /**
     * Runs the super implementation and counts the call in {@code flushCounter}. While the flag
     * is set, the caller must be the large-cell thread, which then waits on
     * {@code postCyclicBarrier}.
     */
    @Override
    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
      super.flushInMemory(currentActiveMutableSegment);
      flushCounter.incrementAndGet();
      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
        return;
      }

      assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME));
      try {
        postCyclicBarrier.await();
      } catch (Throwable e) {
        throw new RuntimeException(e);
      }

    }

    /** Clears the inherited {@code allowCompaction} flag. */
    void disableCompaction() {
      allowCompaction.set(false);
    }

    /** Sets the inherited {@code allowCompaction} flag. */
    void enableCompaction() {
      allowCompaction.set(true);
    }

  }

  /**
   * A {@link CompactingMemStore} that uses barriers to force a fixed interleaving between the
   * thread named {@link #TAKE_SNAPSHOT_THREAD_NAME} and the thread that runs
   * {@link #flattenOneSegment}, and that counts how often the instrumented methods are called.
   */
  public static class MyCompactingMemStore4 extends CompactingMemStore {
    private static final String TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread";
    /**
     * {@link CompactingMemStore#flattenOneSegment} must execute after
     * {@link CompactingMemStore#getImmutableSegments}
     */
    private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2);
    /**
     * Only after {@link CompactingMemStore#flattenOneSegment} completed,
     * {@link CompactingMemStore#swapPipelineWithNull} could execute.
     */
    private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2);
    /**
     * Only after the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment}
     * does the snapshot thread start {@link CompactingMemStore#snapshot}, because
     * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}.
     */
    private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2);
    /**
     * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping.
     */
    private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2);
    private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0);
    private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0);
    private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0);
    private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0);

    public MyCompactingMemStore4(Configuration conf, CellComparatorImpl cellComparator,
      HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
      throws IOException {
      super(conf, cellComparator, store, regionServices, compactionPolicy);
    }

    /**
     * Returns the super result; on the snapshot thread's first call it additionally waits on
     * {@code flattenOneSegmentPreCyclicBarrier} after the segments have been obtained.
     */
    @Override
    public VersionedSegmentsList getImmutableSegments() {
      VersionedSegmentsList result = super.getImmutableSegments();
      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = getImmutableSegmentsListCounter.incrementAndGet();
        if (currentCount <= 1) {
          try {
            flattenOneSegmentPreCyclicBarrier.await();
          } catch (Throwable e) {
            throw new RuntimeException(e);
          }
        }
      }
      return result;
    }

    /**
     * On the snapshot thread, the first call waits on {@code flattenOneSegmentPostCyclicBarrier}
     * before delegating and asserts that the delegated swap returned false; the second call
     * asserts that it returned true.
     */
    @Override
    protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = swapPipelineWithNullCounter.incrementAndGet();
        if (currentCount <= 1) {
          try {
            flattenOneSegmentPostCyclicBarrier.await();
          } catch (Throwable e) {
            throw new RuntimeException(e);
          }
        }
      }
      boolean result = super.swapPipelineWithNull(segments);
      if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) {
        int currentCount = swapPipelineWithNullCounter.get();
        if (currentCount <= 1) {
          assertTrue(!result);
        }
        if (currentCount == 2) {
          assertTrue(result);
        }
      }
      return result;

    }

    /**
     * Wraps the first call of the super implementation in barrier waits: before delegating it
     * waits on {@code snapShotStartCyclicCyclicBarrier} and then on
     * {@code flattenOneSegmentPreCyclicBarrier}; after delegating it waits on
     * {@code flattenOneSegmentPostCyclicBarrier}. Later calls delegate directly.
     */
    @Override
    public void flattenOneSegment(long requesterVersion, Action action) {
      int currentCount = flattenOneSegmentCounter.incrementAndGet();
      if (currentCount <= 1) {
        try {
          /**
           * {@link CompactingMemStore#snapshot} could start.
           */
          snapShotStartCyclicCyclicBarrier.await();
          flattenOneSegmentPreCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
      super.flattenOneSegment(requesterVersion, action);
      if (currentCount <= 1) {
        try {
          flattenOneSegmentPostCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
    }

    /**
     * Delegates to the super implementation, asserts that it returned true, and counts the call.
     */
    @Override
    protected boolean setInMemoryCompactionFlag() {
      boolean result = super.setInMemoryCompactionFlag();
      assertTrue(result);
      setInMemoryCompactionFlagCounter.incrementAndGet();
      return result;
    }

    /**
     * Runs the super implementation and, whether or not it throws, waits on
     * {@code inMemoryCompactionEndCyclicBarrier} afterwards.
     */
    @Override
    void inMemoryCompaction() {
      try {
        super.inMemoryCompaction();
      } finally {
        try {
          inMemoryCompactionEndCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }

      }
    }

  }

  public static class MyCompactingMemStore5 extends CompactingMemStore {
    private static final String
TAKE_SNAPSHOT_THREAD_NAME = "takeSnapShotThread"; 3137 private static final String WRITE_AGAIN_THREAD_NAME = "writeAgainThread"; 3138 /** 3139 * {@link CompactingMemStore#flattenOneSegment} must execute after 3140 * {@link CompactingMemStore#getImmutableSegments} 3141 */ 3142 private final CyclicBarrier flattenOneSegmentPreCyclicBarrier = new CyclicBarrier(2); 3143 /** 3144 * Only after {@link CompactingMemStore#flattenOneSegment} completed, 3145 * {@link CompactingMemStore#swapPipelineWithNull} could execute. 3146 */ 3147 private final CyclicBarrier flattenOneSegmentPostCyclicBarrier = new CyclicBarrier(2); 3148 /** 3149 * Only the in memory compact thread enters {@link CompactingMemStore#flattenOneSegment},the 3150 * snapshot thread starts {@link CompactingMemStore#snapshot},because 3151 * {@link CompactingMemStore#snapshot} would invoke {@link CompactingMemStore#stopCompaction}. 3152 */ 3153 private final CyclicBarrier snapShotStartCyclicCyclicBarrier = new CyclicBarrier(2); 3154 /** 3155 * To wait for {@link CompactingMemStore.InMemoryCompactionRunnable} stopping. 3156 */ 3157 private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); 3158 private final AtomicInteger getImmutableSegmentsListCounter = new AtomicInteger(0); 3159 private final AtomicInteger swapPipelineWithNullCounter = new AtomicInteger(0); 3160 private final AtomicInteger flattenOneSegmentCounter = new AtomicInteger(0); 3161 private final AtomicInteger setInMemoryCompactionFlagCounter = new AtomicInteger(0); 3162 /** 3163 * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, writeAgain 3164 * thread could start. 3165 */ 3166 private final CyclicBarrier writeMemStoreAgainStartCyclicBarrier = new CyclicBarrier(2); 3167 /** 3168 * This is used for snapshot thread,writeAgain thread and in memory compact thread. 
Only the 3169 * writeAgain thread completes, {@link CompactingMemStore#swapPipelineWithNull} would 3170 * execute,and in memory compact thread would exit,because we expect that in memory compact 3171 * executing only once. 3172 */ 3173 private final CyclicBarrier writeMemStoreAgainEndCyclicBarrier = new CyclicBarrier(3); 3174 3175 public MyCompactingMemStore5(Configuration conf, CellComparatorImpl cellComparator, 3176 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3177 throws IOException { 3178 super(conf, cellComparator, store, regionServices, compactionPolicy); 3179 } 3180 3181 @Override 3182 public VersionedSegmentsList getImmutableSegments() { 3183 VersionedSegmentsList result = super.getImmutableSegments(); 3184 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3185 int currentCount = getImmutableSegmentsListCounter.incrementAndGet(); 3186 if (currentCount <= 1) { 3187 try { 3188 flattenOneSegmentPreCyclicBarrier.await(); 3189 } catch (Throwable e) { 3190 throw new RuntimeException(e); 3191 } 3192 } 3193 3194 } 3195 3196 return result; 3197 } 3198 3199 @Override 3200 protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { 3201 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3202 int currentCount = swapPipelineWithNullCounter.incrementAndGet(); 3203 if (currentCount <= 1) { 3204 try { 3205 flattenOneSegmentPostCyclicBarrier.await(); 3206 } catch (Throwable e) { 3207 throw new RuntimeException(e); 3208 } 3209 } 3210 3211 if (currentCount == 2) { 3212 try { 3213 /** 3214 * Only the snapshot thread retry {@link CompactingMemStore#swapPipelineWithNull}, 3215 * writeAgain thread could start. 3216 */ 3217 writeMemStoreAgainStartCyclicBarrier.await(); 3218 /** 3219 * Only the writeAgain thread completes, retry 3220 * {@link CompactingMemStore#swapPipelineWithNull} would execute. 
3221 */ 3222 writeMemStoreAgainEndCyclicBarrier.await(); 3223 } catch (Throwable e) { 3224 throw new RuntimeException(e); 3225 } 3226 } 3227 3228 } 3229 boolean result = super.swapPipelineWithNull(segments); 3230 if (Thread.currentThread().getName().equals(TAKE_SNAPSHOT_THREAD_NAME)) { 3231 int currentCount = swapPipelineWithNullCounter.get(); 3232 if (currentCount <= 1) { 3233 assertTrue(!result); 3234 } 3235 if (currentCount == 2) { 3236 assertTrue(result); 3237 } 3238 } 3239 return result; 3240 3241 } 3242 3243 @Override 3244 public void flattenOneSegment(long requesterVersion, Action action) { 3245 int currentCount = flattenOneSegmentCounter.incrementAndGet(); 3246 if (currentCount <= 1) { 3247 try { 3248 /** 3249 * {@link CompactingMemStore#snapshot} could start. 3250 */ 3251 snapShotStartCyclicCyclicBarrier.await(); 3252 flattenOneSegmentPreCyclicBarrier.await(); 3253 } catch (Throwable e) { 3254 throw new RuntimeException(e); 3255 } 3256 } 3257 super.flattenOneSegment(requesterVersion, action); 3258 if (currentCount <= 1) { 3259 try { 3260 flattenOneSegmentPostCyclicBarrier.await(); 3261 /** 3262 * Only the writeAgain thread completes, in memory compact thread would exit,because we 3263 * expect that in memory compact executing only once. 
3264 */ 3265 writeMemStoreAgainEndCyclicBarrier.await(); 3266 } catch (Throwable e) { 3267 throw new RuntimeException(e); 3268 } 3269 3270 } 3271 } 3272 3273 @Override 3274 protected boolean setInMemoryCompactionFlag() { 3275 boolean result = super.setInMemoryCompactionFlag(); 3276 int count = setInMemoryCompactionFlagCounter.incrementAndGet(); 3277 if (count <= 1) { 3278 assertTrue(result); 3279 } 3280 if (count == 2) { 3281 assertTrue(!result); 3282 } 3283 return result; 3284 } 3285 3286 @Override 3287 void inMemoryCompaction() { 3288 try { 3289 super.inMemoryCompaction(); 3290 } finally { 3291 try { 3292 inMemoryCompactionEndCyclicBarrier.await(); 3293 } catch (Throwable e) { 3294 throw new RuntimeException(e); 3295 } 3296 3297 } 3298 } 3299 } 3300 3301 public static class MyCompactingMemStore6 extends CompactingMemStore { 3302 private final CyclicBarrier inMemoryCompactionEndCyclicBarrier = new CyclicBarrier(2); 3303 3304 public MyCompactingMemStore6(Configuration conf, CellComparatorImpl cellComparator, 3305 HStore store, RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 3306 throws IOException { 3307 super(conf, cellComparator, store, regionServices, compactionPolicy); 3308 } 3309 3310 @Override 3311 void inMemoryCompaction() { 3312 try { 3313 super.inMemoryCompaction(); 3314 } finally { 3315 try { 3316 inMemoryCompactionEndCyclicBarrier.await(); 3317 } catch (Throwable e) { 3318 throw new RuntimeException(e); 3319 } 3320 3321 } 3322 } 3323 } 3324 3325 public static class MyDefaultMemStore extends DefaultMemStore { 3326 private static final String GET_SCANNER_THREAD_NAME = "getScannerMyThread"; 3327 private static final String FLUSH_THREAD_NAME = "flushMyThread"; 3328 /** 3329 * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner thread 3330 * could start. 
3331 */ 3332 private final CyclicBarrier getScannerCyclicBarrier = new CyclicBarrier(2); 3333 /** 3334 * Used by getScanner thread notifies flush thread {@link DefaultMemStore#getSnapshotSegments} 3335 * completed, {@link DefaultMemStore#doClearSnapShot} could continue. 3336 */ 3337 private final CyclicBarrier preClearSnapShotCyclicBarrier = new CyclicBarrier(2); 3338 /** 3339 * Used by flush thread notifies getScanner thread {@link DefaultMemStore#doClearSnapShot} 3340 * completed, {@link DefaultMemStore#getScanners} could continue. 3341 */ 3342 private final CyclicBarrier postClearSnapShotCyclicBarrier = new CyclicBarrier(2); 3343 private final AtomicInteger getSnapshotSegmentsCounter = new AtomicInteger(0); 3344 private final AtomicInteger clearSnapshotCounter = new AtomicInteger(0); 3345 private volatile boolean shouldWait = true; 3346 private volatile HStore store = null; 3347 3348 public MyDefaultMemStore(Configuration conf, CellComparator cellComparator, 3349 RegionServicesForStores regionServices) throws IOException { 3350 super(conf, cellComparator, regionServices); 3351 } 3352 3353 @Override 3354 protected List<Segment> getSnapshotSegments() { 3355 3356 List<Segment> result = super.getSnapshotSegments(); 3357 3358 if (Thread.currentThread().getName().equals(GET_SCANNER_THREAD_NAME)) { 3359 int currentCount = getSnapshotSegmentsCounter.incrementAndGet(); 3360 if (currentCount == 1) { 3361 if (this.shouldWait) { 3362 try { 3363 /** 3364 * Notify flush thread {@link DefaultMemStore#getSnapshotSegments} completed, 3365 * {@link DefaultMemStore#doClearSnapShot} could continue. 3366 */ 3367 preClearSnapShotCyclicBarrier.await(); 3368 /** 3369 * Wait for {@link DefaultMemStore#doClearSnapShot} completed. 
3370 */ 3371 postClearSnapShotCyclicBarrier.await(); 3372 3373 } catch (Throwable e) { 3374 throw new RuntimeException(e); 3375 } 3376 } 3377 } 3378 } 3379 return result; 3380 } 3381 3382 @Override 3383 protected void doClearSnapShot() { 3384 if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) { 3385 int currentCount = clearSnapshotCounter.incrementAndGet(); 3386 if (currentCount == 1) { 3387 try { 3388 if ( 3389 ((ReentrantReadWriteLock) store.getStoreEngine().getLock()) 3390 .isWriteLockedByCurrentThread() 3391 ) { 3392 shouldWait = false; 3393 } 3394 /** 3395 * Only when flush thread enters {@link DefaultMemStore#doClearSnapShot}, getScanner 3396 * thread could start. 3397 */ 3398 getScannerCyclicBarrier.await(); 3399 3400 if (shouldWait) { 3401 /** 3402 * Wait for {@link DefaultMemStore#getSnapshotSegments} completed. 3403 */ 3404 preClearSnapShotCyclicBarrier.await(); 3405 } 3406 } catch (Throwable e) { 3407 throw new RuntimeException(e); 3408 } 3409 } 3410 } 3411 super.doClearSnapShot(); 3412 3413 if (Thread.currentThread().getName().equals(FLUSH_THREAD_NAME)) { 3414 int currentCount = clearSnapshotCounter.get(); 3415 if (currentCount == 1) { 3416 if (shouldWait) { 3417 try { 3418 /** 3419 * Notify getScanner thread {@link DefaultMemStore#doClearSnapShot} completed, 3420 * {@link DefaultMemStore#getScanners} could continue. 3421 */ 3422 postClearSnapShotCyclicBarrier.await(); 3423 } catch (Throwable e) { 3424 throw new RuntimeException(e); 3425 } 3426 } 3427 } 3428 } 3429 } 3430 } 3431}