001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024import static org.mockito.ArgumentMatchers.any; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.spy; 027import static org.mockito.Mockito.times; 028import static org.mockito.Mockito.verify; 029import static org.mockito.Mockito.when; 030 031import java.io.IOException; 032import java.lang.ref.SoftReference; 033import java.security.PrivilegedExceptionAction; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Collection; 037import java.util.Collections; 038import java.util.Iterator; 039import java.util.List; 040import java.util.ListIterator; 041import java.util.NavigableSet; 042import java.util.TreeSet; 043import java.util.concurrent.ConcurrentSkipListSet; 044import java.util.concurrent.CountDownLatch; 045import java.util.concurrent.ExecutorService; 046import java.util.concurrent.Executors; 047import java.util.concurrent.ThreadPoolExecutor; 
048import java.util.concurrent.TimeUnit; 049import java.util.concurrent.atomic.AtomicBoolean; 050import java.util.concurrent.atomic.AtomicInteger; 051import org.apache.hadoop.conf.Configuration; 052import org.apache.hadoop.fs.FSDataOutputStream; 053import org.apache.hadoop.fs.FileStatus; 054import org.apache.hadoop.fs.FileSystem; 055import org.apache.hadoop.fs.FilterFileSystem; 056import org.apache.hadoop.fs.LocalFileSystem; 057import org.apache.hadoop.fs.Path; 058import org.apache.hadoop.fs.permission.FsPermission; 059import org.apache.hadoop.hbase.Cell; 060import org.apache.hadoop.hbase.CellBuilderFactory; 061import org.apache.hadoop.hbase.CellBuilderType; 062import org.apache.hadoop.hbase.CellComparator; 063import org.apache.hadoop.hbase.CellComparatorImpl; 064import org.apache.hadoop.hbase.CellUtil; 065import org.apache.hadoop.hbase.HBaseClassTestRule; 066import org.apache.hadoop.hbase.HBaseConfiguration; 067import org.apache.hadoop.hbase.HBaseTestingUtility; 068import org.apache.hadoop.hbase.HConstants; 069import org.apache.hadoop.hbase.KeyValue; 070import org.apache.hadoop.hbase.MemoryCompactionPolicy; 071import org.apache.hadoop.hbase.NamespaceDescriptor; 072import org.apache.hadoop.hbase.PrivateCellUtil; 073import org.apache.hadoop.hbase.TableName; 074import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 075import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 076import org.apache.hadoop.hbase.client.Get; 077import org.apache.hadoop.hbase.client.RegionInfo; 078import org.apache.hadoop.hbase.client.RegionInfoBuilder; 079import org.apache.hadoop.hbase.client.Scan; 080import org.apache.hadoop.hbase.client.TableDescriptor; 081import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 082import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 083import org.apache.hadoop.hbase.filter.Filter; 084import org.apache.hadoop.hbase.filter.FilterBase; 085import org.apache.hadoop.hbase.io.compress.Compression; 086import 
org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 087import org.apache.hadoop.hbase.io.hfile.CacheConfig; 088import org.apache.hadoop.hbase.io.hfile.HFile; 089import org.apache.hadoop.hbase.io.hfile.HFileContext; 090import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 091import org.apache.hadoop.hbase.monitoring.MonitoredTask; 092import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl; 093import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 094import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 095import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 096import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 097import org.apache.hadoop.hbase.security.User; 098import org.apache.hadoop.hbase.testclassification.MediumTests; 099import org.apache.hadoop.hbase.testclassification.RegionServerTests; 100import org.apache.hadoop.hbase.util.Bytes; 101import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 102import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 103import org.apache.hadoop.hbase.util.FSUtils; 104import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 105import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 106import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 107import org.apache.hadoop.hbase.wal.WALFactory; 108import org.apache.hadoop.util.Progressable; 109import org.junit.After; 110import org.junit.AfterClass; 111import org.junit.Before; 112import org.junit.ClassRule; 113import org.junit.Rule; 114import org.junit.Test; 115import org.junit.experimental.categories.Category; 116import org.junit.rules.TestName; 117import org.mockito.Mockito; 118import org.slf4j.Logger; 119import org.slf4j.LoggerFactory; 120 121import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 122 123/** 124 * Test class for the HStore 125 */ 126@Category({ RegionServerTests.class, MediumTests.class }) 127public class TestHStore 
{

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHStore.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class);
  @Rule
  public TestName name = new TestName();

  // Region/store under test; rebuilt per test by init(...) and torn down in tearDown().
  HRegion region;
  HStore store;
  byte [] table = Bytes.toBytes("table");
  byte [] family = Bytes.toBytes("family");

  byte [] row = Bytes.toBytes("row");
  byte [] row2 = Bytes.toBytes("row2");
  byte [] qf1 = Bytes.toBytes("qf1");
  byte [] qf2 = Bytes.toBytes("qf2");
  byte [] qf3 = Bytes.toBytes("qf3");
  byte [] qf4 = Bytes.toBytes("qf4");
  byte [] qf5 = Bytes.toBytes("qf5");
  byte [] qf6 = Bytes.toBytes("qf6");

  // Qualifiers requested by the shared Get; populated with qf1/qf3/qf5 in setUp().
  NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);

  // expected/result are compared by assertCheck() in the Get tests below.
  List<Cell> expected = new ArrayList<>();
  List<Cell> result = new ArrayList<>();

  // Monotonically increasing id used as the flush sequence id (see flushStore()).
  long id = System.currentTimeMillis();
  Get get = new Get(row);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();


  /**
   * Setup: seeds {@link #qualifiers} with qf1/qf3/qf5 and builds the matching
   * {@link #expected} cell list and the shared {@link #get}.
   * @throws IOException
   */
  @Before
  public void setUp() throws IOException {
    qualifiers.add(qf1);
    qualifiers.add(qf3);
    qualifiers.add(qf5);

    Iterator<byte[]> iter = qualifiers.iterator();
    while(iter.hasNext()){
      byte [] next = iter.next();
      expected.add(new KeyValue(row, family, next, 1, (byte[])null));
      get.addColumn(family, next);
    }
  }

  // Convenience overload: default test configuration.
  private void init(String methodName) throws IOException {
    init(methodName, TEST_UTIL.getConfiguration());
  }

  private HStore init(String methodName, Configuration conf) throws IOException {
    // some of the tests write 4 versions and then flush
    // (with HBASE-4241, lower versions are collected on flush)
    return init(methodName, conf,
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build());
  }

  private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd)
      throws IOException {
    return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd);
  }

  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
      ColumnFamilyDescriptor hcd) throws IOException {
    return init(methodName, conf, builder, hcd, null);
  }

  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
      ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException {
    return init(methodName, conf, builder, hcd, hook, false);
  }

  /**
   * Builds a fresh HRegion (stored in {@link #region}) backed by a local-FS WAL under
   * {@link #DIR}/methodName, with a spied regionServicesForStores whose in-memory
   * compaction pool is a single-thread executor.
   */
  private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder,
      ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
    TableDescriptor htd = builder.setColumnFamily(hcd).build();
    Path basedir = new Path(DIR + methodName);
    Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));

    FileSystem fs = FileSystem.get(conf);

    // Start from a clean WAL dir so earlier runs of the same method do not interfere.
    fs.delete(logdir, true);
    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
    Configuration walConf = new Configuration(conf);
    FSUtils.setRootDir(walConf, basedir);
    WALFactory wals = new WALFactory(walConf, methodName);
    region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
      htd, null);
    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
  }

  // Full init: builds the region, then the store (a MyStore when a hook is supplied).
  private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
      ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
    initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
    if (hook == null) {
      store = new HStore(region, hcd, conf);
    } else {
      store = new MyStore(region, hcd, conf, hook, switchToPread);
    }
    return store;
  }

  /**
   * Test we do not lose data if we fail a flush and then close.
   * Part of HBase-10466
   * @throws Exception
   */
  @Test
  public void testFlushSizeSizing() throws Exception {
    LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName());
    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    // Only retry once.
    conf.setInt("hbase.hstore.flush.retries.number", 1);
    User user = User.createUserForTesting(conf, this.name.getMethodName(),
      new String[]{"foo"});
    // Inject our faulty LocalFileSystem
    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        // Make sure it worked (above is sensitive to caching details in hadoop core)
        FileSystem fs = FileSystem.get(conf);
        assertEquals(FaultyFileSystem.class, fs.getClass());
        FaultyFileSystem ffs = (FaultyFileSystem)fs;

        // Initialize region
        init(name.getMethodName(), conf);

        MemStoreSize mss = store.memstore.getFlushableSize();
        assertEquals(0, mss.getDataSize());
        LOG.info("Adding some data");
        MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing();
        store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
        // add the heap size of active (mutable) segment
        kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
        mss = store.memstore.getFlushableSize();
        assertEquals(kvSize.getMemStoreSize(), mss);
        // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
        try {
          LOG.info("Flushing");
          flushStore(store, id++);
          fail("Didn't bubble up IOE!");
        } catch (IOException ioe) {
          assertTrue(ioe.getMessage().contains("Fault injected"));
        }
        // due to snapshot, change mutable to immutable segment
        kvSize.incMemStoreSize(0,
          CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0);
        mss = store.memstore.getFlushableSize();
        assertEquals(kvSize.getMemStoreSize(), mss);
        MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing();
        store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2);
        kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0);
        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
        // not yet cleared the snapshot -- the above flush failed.
        assertEquals(kvSize.getMemStoreSize(), mss);
        // Clear the fault: the retried flush of the snapshot should now succeed.
        ffs.fault.set(false);
        flushStore(store, id++);
        mss = store.memstore.getFlushableSize();
        // Size should be the foreground kv size.
        assertEquals(kvSize2.getMemStoreSize(), mss);
        flushStore(store, id++);
        mss = store.memstore.getFlushableSize();
        assertEquals(0, mss.getDataSize());
        assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
        return null;
      }
    });
  }

  /**
   * Verify that compression and data block encoding are respected by the
   * Store.createWriterInTmp() method, used on store flush.
   */
  @Test
  public void testCreateWriter() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);

    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setCompressionType(Compression.Algorithm.GZ).setDataBlockEncoding(DataBlockEncoding.DIFF)
        .build();
    init(name.getMethodName(), conf, hcd);

    // Test createWriterInTmp()
    StoreFileWriter writer =
        store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false, false);
    Path path = writer.getPath();
    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
    writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
    writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
    writer.close();

    // Verify that compression and encoding settings are respected
    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf);
    assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm());
    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
    reader.close();
  }

  @Test
  public void testDeleteExpiredStoreFiles() throws Exception {
    testDeleteExpiredStoreFiles(0);
    testDeleteExpiredStoreFiles(1);
  }

  /*
   * @param minVersions the MIN_VERSIONS for the column family
   */
  public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
    int storeFileNum = 4;
    int ttl = 4;
    // Manual clock so TTL expiry can be driven deterministically.
    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
    EnvironmentEdgeManagerTestHelper.injectEdge(edge);

    Configuration conf = HBaseConfiguration.create();
    // Enable the expired store file deletion
    conf.setBoolean("hbase.store.delete.expired.storefile", true);
    // Set the compaction threshold higher to avoid normal compactions.
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);

    init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder
        .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build());

    long storeTtl = this.store.getScanInfo().getTtl();
    long sleepTime = storeTtl / storeFileNum;
    long timeStamp;
    // There are 4 store files and the max time stamp difference among these
    // store files will be (this.store.ttl / storeFileNum)
    for (int i = 1; i <= storeFileNum; i++) {
      LOG.info("Adding some data for the store file #" + i);
      timeStamp = EnvironmentEdgeManager.currentTime();
      this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null);
      this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null);
      flush(i);
      // Advance the injected clock so each file carries an older timestamp window.
      edge.incrementTime(sleepTime);
    }

    // Verify the total number of store files
    assertEquals(storeFileNum, this.store.getStorefiles().size());

    // Each call will find one expired store file and delete it before compaction happens.
    // There will be no compaction due to threshold above. Last file will not be replaced.
    for (int i = 1; i <= storeFileNum - 1; i++) {
      // verify the expired store file.
      assertFalse(this.store.requestCompaction().isPresent());
      Collection<HStoreFile> sfs = this.store.getStorefiles();
      // Ensure i files are gone.
      if (minVersions == 0) {
        assertEquals(storeFileNum - i, sfs.size());
        // Ensure only non-expired files remain.
        for (HStoreFile sf : sfs) {
          assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
        }
      } else {
        // With MIN_VERSIONS > 0, TTL-expired files must be retained.
        assertEquals(storeFileNum, sfs.size());
      }
      // Let the next store file expired.
      edge.incrementTime(sleepTime);
    }
    assertFalse(this.store.requestCompaction().isPresent());

    Collection<HStoreFile> sfs = this.store.getStorefiles();
    // Assert the last expired file is not removed.
    if (minVersions == 0) {
      assertEquals(1, sfs.size());
    }
    long ts = sfs.iterator().next().getReader().getMaxTimestamp();
    assertTrue(ts < (edge.currentTime() - storeTtl));

    for (HStoreFile sf : sfs) {
      sf.closeStoreFile(true);
    }
  }

  @Test
  public void testLowestModificationTime() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    // Initialize region
    init(name.getMethodName(), conf);

    int storeFileNum = 4;
    for (int i = 1; i <= storeFileNum; i++) {
      LOG.info("Adding some data for the store file #"+i);
      this.store.add(new KeyValue(row, family, qf1, i, (byte[])null), null);
      this.store.add(new KeyValue(row, family, qf2, i, (byte[])null), null);
      this.store.add(new KeyValue(row, family, qf3, i, (byte[])null), null);
      flush(i);
    }
    // after flush; check the lowest time stamp
    long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
    long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
    assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);

    // after compact; check the lowest time stamp
    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
    lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
    lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
  }

  /**
   * Returns the smallest file-system modification time across the given store files,
   * or Long.MAX_VALUE when there are no candidates or no FileStatus results.
   */
  private static long getLowestTimeStampFromFS(FileSystem fs,
      final Collection<HStoreFile> candidates) throws IOException {
    long minTs = Long.MAX_VALUE;
    if (candidates.isEmpty()) {
      return minTs;
    }
    Path[] p = new Path[candidates.size()];
    int i = 0;
    for (HStoreFile sf : candidates) {
      p[i] = sf.getPath();
      ++i;
    }

    FileStatus[] stats = fs.listStatus(p);
    if (stats == null || stats.length == 0) {
      return minTs;
    }
    for (FileStatus s : stats) {
      minTs = Math.min(minTs, s.getModificationTime());
    }
    return minTs;
  }

  //////////////////////////////////////////////////////////////////////////////
  // Get tests
  //////////////////////////////////////////////////////////////////////////////

  private static final int BLOCKSIZE_SMALL = 8192;
  /**
   * Test for hbase-1686.
   * @throws IOException
   */
  @Test
  public void testEmptyStoreFile() throws IOException {
    init(this.name.getMethodName());
    // Write a store file.
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
    flush(1);
    // Now put in place an empty store file. Its a little tricky. Have to
    // do manually with hacked in sequence id.
    HStoreFile f = this.store.getStorefiles().iterator().next();
    Path storedir = f.getPath().getParent();
    long seqid = f.getMaxSequenceId();
    Configuration c = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(c);
    HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
        fs)
            .withOutputDir(storedir)
            .withFileContext(meta)
            .build();
    // Only metadata is written -- an empty store file with a bumped sequence id.
    w.appendMetadata(seqid + 1, false);
    w.close();
    this.store.close();
    // Reopen it... should pick up two files
    this.store = new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c);
    assertEquals(2, this.store.getStorefilesCount());

    result = HBaseTestingUtility.getFromStoreFile(store,
        get.getRow(),
        qualifiers);
    assertEquals(1, result.size());
  }

  /**
   * Getting data from memstore only
   * @throws IOException
   */
  @Test
  public void testGet_FromMemStoreOnly() throws IOException {
    init(this.name.getMethodName());

    //Put data in memstore
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);
    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null);
    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null);
    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null);

    //Get
    result = HBaseTestingUtility.getFromStoreFile(store,
        get.getRow(), qualifiers);

    //Compare
    assertCheck();
  }

  @Test
  public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
    testTimeRangeIfSomeCellsAreDroppedInFlush(1);
    testTimeRangeIfSomeCellsAreDroppedInFlush(3);
    testTimeRangeIfSomeCellsAreDroppedInFlush(5);
  }

  private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
    init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
    long currentTs = 100;
    long minTs = currentTs;
    // the extra cell won't be flushed to disk,
    // so the min of timerange will be different between memStore and hfile.
546 for (int i = 0; i != (maxVersion + 1); ++i) { 547 this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null); 548 if (i == 1) { 549 minTs = currentTs; 550 } 551 } 552 flushStore(store, id++); 553 554 Collection<HStoreFile> files = store.getStorefiles(); 555 assertEquals(1, files.size()); 556 HStoreFile f = files.iterator().next(); 557 f.initReader(); 558 StoreFileReader reader = f.getReader(); 559 assertEquals(minTs, reader.timeRange.getMin()); 560 assertEquals(currentTs, reader.timeRange.getMax()); 561 } 562 563 /** 564 * Getting data from files only 565 * @throws IOException 566 */ 567 @Test 568 public void testGet_FromFilesOnly() throws IOException { 569 init(this.name.getMethodName()); 570 571 //Put data in memstore 572 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 573 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 574 //flush 575 flush(1); 576 577 //Add more data 578 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); 579 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null); 580 //flush 581 flush(2); 582 583 //Add more data 584 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null); 585 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null); 586 //flush 587 flush(3); 588 589 //Get 590 result = HBaseTestingUtility.getFromStoreFile(store, 591 get.getRow(), 592 qualifiers); 593 //this.store.get(get, qualifiers, result); 594 595 //Need to sort the result since multiple files 596 Collections.sort(result, CellComparatorImpl.COMPARATOR); 597 598 //Compare 599 assertCheck(); 600 } 601 602 /** 603 * Getting data from memstore and files 604 * @throws IOException 605 */ 606 @Test 607 public void testGet_FromMemStoreAndFiles() throws IOException { 608 init(this.name.getMethodName()); 609 610 //Put data in memstore 611 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 612 this.store.add(new KeyValue(row, family, 
qf2, 1, (byte[])null), null); 613 //flush 614 flush(1); 615 616 //Add more data 617 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); 618 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null); 619 //flush 620 flush(2); 621 622 //Add more data 623 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null); 624 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null); 625 626 //Get 627 result = HBaseTestingUtility.getFromStoreFile(store, 628 get.getRow(), qualifiers); 629 630 //Need to sort the result since multiple files 631 Collections.sort(result, CellComparatorImpl.COMPARATOR); 632 633 //Compare 634 assertCheck(); 635 } 636 637 private void flush(int storeFilessize) throws IOException{ 638 this.store.snapshot(); 639 flushStore(store, id++); 640 assertEquals(storeFilessize, this.store.getStorefiles().size()); 641 assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount()); 642 } 643 644 private void assertCheck() { 645 assertEquals(expected.size(), result.size()); 646 for(int i=0; i<expected.size(); i++) { 647 assertEquals(expected.get(i), result.get(i)); 648 } 649 } 650 651 @After 652 public void tearDown() throws Exception { 653 EnvironmentEdgeManagerTestHelper.reset(); 654 if (store != null) { 655 try { 656 store.close(); 657 } catch (IOException e) { 658 } 659 store = null; 660 } 661 if (region != null) { 662 region.close(); 663 region = null; 664 } 665 } 666 667 @AfterClass 668 public static void tearDownAfterClass() throws IOException { 669 TEST_UTIL.cleanupTestDir(); 670 } 671 672 @Test 673 public void testHandleErrorsInFlush() throws Exception { 674 LOG.info("Setting up a faulty file system that cannot write"); 675 676 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 677 User user = User.createUserForTesting(conf, 678 "testhandleerrorsinflush", new String[]{"foo"}); 679 // Inject our faulty LocalFileSystem 680 
    conf.setClass("fs.file.impl", FaultyFileSystem.class,
        FileSystem.class);
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        // Make sure it worked (above is sensitive to caching details in hadoop core)
        FileSystem fs = FileSystem.get(conf);
        assertEquals(FaultyFileSystem.class, fs.getClass());

        // Initialize region
        init(name.getMethodName(), conf);

        LOG.info("Adding some data");
        store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
        store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null);
        store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null);

        LOG.info("Before flush, we should have no files");

        Collection<StoreFileInfo> files =
          store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
        assertEquals(0, files != null ? files.size() : 0);

        //flush
        try {
          LOG.info("Flushing");
          flush(1);
          fail("Didn't bubble up IOE!");
        } catch (IOException ioe) {
          assertTrue(ioe.getMessage().contains("Fault injected"));
        }

        // A failed flush must not leave partial store files behind.
        LOG.info("After failed flush, we should still have no files!");
        files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
        assertEquals(0, files != null ? files.size() : 0);
        store.getHRegion().getWAL().close();
        return null;
      }
    });
    FileSystem.closeAllForUGI(user.getUGI());
  }

  /**
   * Faulty file system that will fail if you write past its fault position the FIRST TIME
   * only; thereafter it will succeed. Used by {@link TestHRegion} too.
   */
  static class FaultyFileSystem extends FilterFileSystem {
    List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>();
    // Byte offset past which writes fail while 'fault' is set.
    private long faultPos = 200;
    AtomicBoolean fault = new AtomicBoolean(true);

    public FaultyFileSystem() {
      super(new LocalFileSystem());
      System.err.println("Creating faulty!");
    }

    @Override
    public FSDataOutputStream create(Path p) throws IOException {
      return new FaultyOutputStream(super.create(p), faultPos, fault);
    }

    @Override
    public FSDataOutputStream create(Path f, FsPermission permission,
        boolean overwrite, int bufferSize, short replication, long blockSize,
        Progressable progress) throws IOException {
      return new FaultyOutputStream(super.create(f, permission,
          overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
    }

    @Override
    public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
        int bufferSize, short replication, long blockSize, Progressable progress)
        throws IOException {
      // Fake it. Call create instead. The default implementation throws an IOE
      // that this is not supported.
755 return create(f, overwrite, bufferSize, replication, blockSize, progress); 756 } 757 } 758 759 static class FaultyOutputStream extends FSDataOutputStream { 760 volatile long faultPos = Long.MAX_VALUE; 761 private final AtomicBoolean fault; 762 763 public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault) 764 throws IOException { 765 super(out, null); 766 this.faultPos = faultPos; 767 this.fault = fault; 768 } 769 770 @Override 771 public synchronized void write(byte[] buf, int offset, int length) throws IOException { 772 System.err.println("faulty stream write at pos " + getPos()); 773 injectFault(); 774 super.write(buf, offset, length); 775 } 776 777 private void injectFault() throws IOException { 778 if (this.fault.get() && getPos() >= faultPos) { 779 throw new IOException("Fault injected"); 780 } 781 } 782 } 783 784 private static void flushStore(HStore store, long id) throws IOException { 785 StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY); 786 storeFlushCtx.prepare(); 787 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 788 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 789 } 790 791 /** 792 * Generate a list of KeyValues for testing based on given parameters 793 * @param timestamps 794 * @param numRows 795 * @param qualifier 796 * @param family 797 * @return the rows key-value list 798 */ 799 List<Cell> getKeyValueSet(long[] timestamps, int numRows, 800 byte[] qualifier, byte[] family) { 801 List<Cell> kvList = new ArrayList<>(); 802 for (int i=1;i<=numRows;i++) { 803 byte[] b = Bytes.toBytes(i); 804 for (long timestamp: timestamps) { 805 kvList.add(new KeyValue(b, family, qualifier, timestamp, b)); 806 } 807 } 808 return kvList; 809 } 810 811 /** 812 * Test to ensure correctness when using Stores with multiple timestamps 813 * @throws IOException 814 */ 815 @Test 816 public void testMultipleTimestamps() throws IOException { 817 int numRows = 1; 818 long[] 
timestamps1 = new long[] {1,5,10,20}; 819 long[] timestamps2 = new long[] {30,80}; 820 821 init(this.name.getMethodName()); 822 823 List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family); 824 for (Cell kv : kvList1) { 825 this.store.add(kv, null); 826 } 827 828 this.store.snapshot(); 829 flushStore(store, id++); 830 831 List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family); 832 for(Cell kv : kvList2) { 833 this.store.add(kv, null); 834 } 835 836 List<Cell> result; 837 Get get = new Get(Bytes.toBytes(1)); 838 get.addColumn(family,qf1); 839 840 get.setTimeRange(0,15); 841 result = HBaseTestingUtility.getFromStoreFile(store, get); 842 assertTrue(result.size()>0); 843 844 get.setTimeRange(40,90); 845 result = HBaseTestingUtility.getFromStoreFile(store, get); 846 assertTrue(result.size()>0); 847 848 get.setTimeRange(10,45); 849 result = HBaseTestingUtility.getFromStoreFile(store, get); 850 assertTrue(result.size()>0); 851 852 get.setTimeRange(80,145); 853 result = HBaseTestingUtility.getFromStoreFile(store, get); 854 assertTrue(result.size()>0); 855 856 get.setTimeRange(1,2); 857 result = HBaseTestingUtility.getFromStoreFile(store, get); 858 assertTrue(result.size()>0); 859 860 get.setTimeRange(90,200); 861 result = HBaseTestingUtility.getFromStoreFile(store, get); 862 assertTrue(result.size()==0); 863 } 864 865 /** 866 * Test for HBASE-3492 - Test split on empty colfam (no store files). 867 * 868 * @throws IOException When the IO operations fail. 
   */
  @Test
  public void testSplitWithEmptyColFam() throws IOException {
    init(this.name.getMethodName());
    // With no store files there is no split point, before or during a forced split.
    assertFalse(store.getSplitPoint().isPresent());
    store.getHRegion().forceSplit(null);
    assertFalse(store.getSplitPoint().isPresent());
    store.getHRegion().clearSplit();
  }

  @Test
  public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
    final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
    long anyValue = 10;

    // We'll check that it uses correct config and propagates it appropriately by going thru
    // the simplest "real" path I can find - "throttleCompaction", which just checks whether
    // a number we pass in is higher than some config value, inside compactionPolicy.
    Configuration conf = HBaseConfiguration.create();
    conf.setLong(CONFIG_KEY, anyValue);
    init(name.getMethodName() + "-xml", conf);
    assertTrue(store.throttleCompaction(anyValue + 1));
    assertFalse(store.throttleCompaction(anyValue));

    // HTD overrides XML.
    --anyValue;
    init(name.getMethodName() + "-htd", conf, TableDescriptorBuilder
        .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)),
        ColumnFamilyDescriptorBuilder.of(family));
    assertTrue(store.throttleCompaction(anyValue + 1));
    assertFalse(store.throttleCompaction(anyValue));

    // HCD overrides them both.
    --anyValue;
    init(name.getMethodName() + "-hcd", conf,
        TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY,
            Long.toString(anyValue)),
        ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue))
            .build());
    assertTrue(store.throttleCompaction(anyValue + 1));
    assertFalse(store.throttleCompaction(anyValue));
  }

  // Store engine that records the most recently created compactor so tests can assert on it.
  public static class DummyStoreEngine extends DefaultStoreEngine {
    public static DefaultCompactor lastCreatedCompactor = null;

    @Override
    protected void createComponents(Configuration conf, HStore store, CellComparator comparator)
        throws IOException {
      super.createComponents(conf, store, comparator);
      lastCreatedCompactor = this.compactor;
    }
  }

  @Test
  public void testStoreUsesSearchEngineOverride() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
    init(this.name.getMethodName(), conf);
    assertEquals(DummyStoreEngine.lastCreatedCompactor,
      this.store.storeEngine.getCompactor());
  }

  // Writes an extra (data-less) store file directly into the store's directory, bypassing the
  // store itself, so that refreshStoreFiles() has something new to discover on disk.
  private void addStoreFile() throws IOException {
    HStoreFile f = this.store.getStorefiles().iterator().next();
    Path storedir = f.getPath().getParent();
    long seqid = this.store.getMaxSequenceId().orElse(0L);
    Configuration c = TEST_UTIL.getConfiguration();
    FileSystem fs = FileSystem.get(c);
    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
    StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c),
        fs)
            .withOutputDir(storedir)
            .withFileContext(fileContext)
            .build();
    w.appendMetadata(seqid + 1, false);
    w.close();
    LOG.info("Added store file:" + w.getPath());
  }

  // Moves the index-th store file to the archive, behind the store's back.
  private void archiveStoreFile(int index) throws IOException {
    Collection<HStoreFile> files = this.store.getStorefiles();
    HStoreFile sf
    = null;
    Iterator<HStoreFile> it = files.iterator();
    // Advance to the index-th file; the collection exposes no positional access.
    for (int i = 0; i <= index; i++) {
      sf = it.next();
    }
    store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
  }

  // Closes the index-th compacted-away file and removes it from the store file manager.
  private void closeCompactedFile(int index) throws IOException {
    Collection<HStoreFile> files =
        this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    HStoreFile sf = null;
    Iterator<HStoreFile> it = files.iterator();
    for (int i = 0; i <= index; i++) {
      sf = it.next();
    }
    sf.closeStoreFile(true);
    store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf));
  }

  @Test
  public void testRefreshStoreFiles() throws Exception {
    init(name.getMethodName());

    assertEquals(0, this.store.getStorefilesCount());

    // Test refreshing store files when no store files are there
    store.refreshStoreFiles();
    assertEquals(0, this.store.getStorefilesCount());

    // add some data, flush
    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
    flush(1);
    assertEquals(1, this.store.getStorefilesCount());

    // add one more file (behind the store's back; only visible after a refresh)
    addStoreFile();

    assertEquals(1, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(2, this.store.getStorefilesCount());

    // add three more files
    addStoreFile();
    addStoreFile();
    addStoreFile();

    assertEquals(2, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(5, this.store.getStorefilesCount());

    // Removing files on disk is likewise only noticed after a refresh.
    closeCompactedFile(0);
    archiveStoreFile(0);

    assertEquals(5, this.store.getStorefilesCount());
    store.refreshStoreFiles();
    assertEquals(4, this.store.getStorefilesCount());

    archiveStoreFile(0);
    archiveStoreFile(1);
    archiveStoreFile(2);

    assertEquals(4, this.store.getStorefilesCount());
    store.refreshStoreFiles();
assertEquals(1, this.store.getStorefilesCount()); 1016 1017 archiveStoreFile(0); 1018 store.refreshStoreFiles(); 1019 assertEquals(0, this.store.getStorefilesCount()); 1020 } 1021 1022 @Test 1023 public void testRefreshStoreFilesNotChanged() throws IOException { 1024 init(name.getMethodName()); 1025 1026 assertEquals(0, this.store.getStorefilesCount()); 1027 1028 // add some data, flush 1029 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 1030 flush(1); 1031 // add one more file 1032 addStoreFile(); 1033 1034 HStore spiedStore = spy(store); 1035 1036 // call first time after files changed 1037 spiedStore.refreshStoreFiles(); 1038 assertEquals(2, this.store.getStorefilesCount()); 1039 verify(spiedStore, times(1)).replaceStoreFiles(any(), any()); 1040 1041 // call second time 1042 spiedStore.refreshStoreFiles(); 1043 1044 //ensure that replaceStoreFiles is not called if files are not refreshed 1045 verify(spiedStore, times(0)).replaceStoreFiles(null, null); 1046 } 1047 1048 private long countMemStoreScanner(StoreScanner scanner) { 1049 if (scanner.currentScanners == null) { 1050 return 0; 1051 } 1052 return scanner.currentScanners.stream() 1053 .filter(s -> !s.isFileScanner()) 1054 .count(); 1055 } 1056 1057 @Test 1058 public void testNumberOfMemStoreScannersAfterFlush() throws IOException { 1059 long seqId = 100; 1060 long timestamp = System.currentTimeMillis(); 1061 Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1062 .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put) 1063 .setValue(qf1).build(); 1064 PrivateCellUtil.setSequenceId(cell0, seqId); 1065 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList()); 1066 1067 Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1068 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put) 1069 .setValue(qf1).build(); 1070 PrivateCellUtil.setSequenceId(cell1, 
    seqId);
    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));

    // Case 3: two cells (two rows) added after the snapshot.
    seqId = 101;
    timestamp = System.currentTimeMillis();
    Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
        .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
        .setValue(qf1).build();
    PrivateCellUtil.setSequenceId(cell2, seqId);
    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
  }

  // Drives a scan across a flush and checks how many memstore scanners the StoreScanner
  // holds before, during, and after the flush completes.
  private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
      List<Cell> inputCellsAfterSnapshot) throws IOException {
    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    long seqId = Long.MIN_VALUE;
    // Collect all qualifiers and the highest sequence id as the scan read point.
    for (Cell c : inputCellsBeforeSnapshot) {
      quals.add(CellUtil.cloneQualifier(c));
      seqId = Math.max(seqId, c.getSequenceId());
    }
    for (Cell c : inputCellsAfterSnapshot) {
      quals.add(CellUtil.cloneQualifier(c));
      seqId = Math.max(seqId, c.getSequenceId());
    }
    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    storeFlushCtx.prepare();
    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
    int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
      // snapshot + active (if inputCellsAfterSnapshot isn't empty)
      assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
      // snapshot has no data after flush
      int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
      boolean more;
      int cellCount = 0;
      do {
        List<Cell> cells = new ArrayList<>();
        more = s.next(cells);
        cellCount += cells.size();
        assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
      } while (more);
      // Every cell added (before or after the snapshot) must be returned exactly once.
      assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
          inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
      // the current scanners is cleared
      assertEquals(0, countMemStoreScanner(s));
    }
  }

  // Convenience overload that targets the default test row.
  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value)
      throws IOException {
    return createCell(row, qualifier, ts, sequenceId, value);
  }

  // Builds a Put cell for the test family with the given coordinates and sequence id.
  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
      throws IOException {
    Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
        .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put)
        .setValue(value).build();
    PrivateCellUtil.setSequenceId(c, sequenceId);
    return c;
  }

  @Test
  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
    final int expectedSize = 3;
    testFlushBeforeCompletingScan(new MyListHook() {
      @Override
      public void hook(int currentSize) {
        // Flush mid-row, just before the last cell of the row is collected.
        if (currentSize == expectedSize - 1) {
          try {
            flushStore(store, id++);
            timeToGoNextRow.set(true);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    }, new FilterBase() {
      @Override
      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
        return ReturnCode.INCLUDE;
      }
    }, expectedSize);
  }

  @Test
  public void
  testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
    final int expectedSize = 2;
    testFlushBeforeCompletingScan(new MyListHook() {
      @Override
      public void hook(int currentSize) {
        // Flush mid-row; the filter then skips to the next row on the following cell.
        if (currentSize == expectedSize - 1) {
          try {
            flushStore(store, id++);
            timeToGoNextRow.set(true);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    }, new FilterBase() {
      @Override
      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
        if (timeToGoNextRow.get()) {
          timeToGoNextRow.set(false);
          return ReturnCode.NEXT_ROW;
        } else {
          return ReturnCode.INCLUDE;
        }
      }
    }, expectedSize);
  }

  @Test
  public void testFlushBeforeCompletingScanWithFilterHint() throws IOException,
      InterruptedException {
    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
    final int expectedSize = 2;
    testFlushBeforeCompletingScan(new MyListHook() {
      @Override
      public void hook(int currentSize) {
        // Flush mid-row; the filter then issues a seek hint on the following cell.
        if (currentSize == expectedSize - 1) {
          try {
            flushStore(store, id++);
            timeToGetHint.set(true);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    }, new FilterBase() {
      @Override
      public Filter.ReturnCode filterCell(final Cell c) throws IOException {
        if (timeToGetHint.get()) {
          timeToGetHint.set(false);
          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
        } else {
          return Filter.ReturnCode.INCLUDE;
        }
      }
      @Override
      public Cell getNextCellHint(Cell currentCell) throws IOException {
        // Hint to the current cell, i.e. "seek to where we already are".
        return currentCell;
      }
    }, expectedSize);
  }

  // Shared driver: seeds rows r0..r2, then scans starting at r1 while the hook flushes
  // mid-row, verifying the scan still returns consistent row contents across the flush.
  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
      throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    byte[] r0 = Bytes.toBytes("row0");
    byte[] r1 = Bytes.toBytes("row1");
    byte[] r2 = Bytes.toBytes("row2");
    byte[] value0 = Bytes.toBytes("value0");
    byte[] value1 = Bytes.toBytes("value1");
    byte[] value2 = Bytes.toBytes("value2");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
      new MyStoreHook() {
        @Override
        public long getSmallestReadPoint(HStore store) {
          return seqId + 3;
        }
      });
    // The cells having the value0 won't be flushed to disk because the value of max version is 1
    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
    // MyList invokes the hook on every add, which is where the mid-scan flush happens.
    List<Cell> myList = new MyList<>(hook);
    Scan scan = new Scan()
        .withStartRow(r1)
        .setFilter(filter);
    try (InternalScanner scanner = (InternalScanner) store.getScanner(
        scan, null, seqId + 3)){
      // r1
      scanner.next(myList);
      assertEquals(expectedSize, myList.size());
      for (Cell c : myList) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(value1)
            + ", actual:" + Bytes.toStringBinary(actualValue)
            , Bytes.equals(actualValue, value1));
      }
      List<Cell> normalList = new ArrayList<>(3);
      // r2
      scanner.next(normalList);
      assertEquals(3, normalList.size());
      for (Cell c : normalList) {
        byte[] actualValue = CellUtil.cloneValue(c);
        assertTrue("expected:" + Bytes.toStringBinary(value2)
            + ", actual:" + Bytes.toStringBinary(actualValue)
            , Bytes.equals(actualValue, value2));
      }
    }
  }

  @Test
  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
    byte[] value = Bytes.toBytes("value");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    // older data which shouldn't be "seen" by client
    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    quals.add(qf1);
    quals.add(qf2);
    quals.add(qf3);
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    MyCompactingMemStore.START_TEST.set(true);
    Runnable flush = () -> {
      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
      // recreate the active memstore -- phase (4/5)
      // recreate the active memstore -- phase (4/5)
      storeFlushCtx.prepare();
    };
    ExecutorService service = Executors.newSingleThreadExecutor();
    service.submit(flush);
    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
    // this is blocked until we recreate the active memstore -- phase (3/5)
    // we get scanner from active memstore but it is empty -- phase (5/5)
    InternalScanner scanner = (InternalScanner) store.getScanner(
        new Scan(new Get(row)), quals, seqId + 1);
    service.shutdown();
    service.awaitTermination(20, TimeUnit.SECONDS);
    try {
      try {
        List<Cell> results = new ArrayList<>();
        scanner.next(results);
        // Despite the concurrent snapshot, all three cells must be visible.
        assertEquals(3, results.size());
        for (Cell c : results) {
          byte[] actualValue = CellUtil.cloneValue(c);
          assertTrue("expected:" + Bytes.toStringBinary(value)
              + ", actual:" + Bytes.toStringBinary(actualValue)
              , Bytes.equals(actualValue, value));
        }
      } finally {
        scanner.close();
      }
    } finally {
      MyCompactingMemStore.START_TEST.set(false);
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
    }
  }

  @Test
  public void testScanWithDoubleFlush() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Initialize region
    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook(){
      @Override
      public void getScanners(MyStore store) throws IOException {
        final long tmpId = id++;
        ExecutorService s = Executors.newSingleThreadExecutor();
        s.submit(() -> {
          try {
            // flush the store before storescanner updates the scanners from store.
            // The current data will be flushed into files, and the memstore will
            // be clear.
            // -- phase (4/4)
            flushStore(store, tmpId);
          }catch (IOException ex) {
            throw new RuntimeException(ex);
          }
        });
        s.shutdown();
        try {
          // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
          s.awaitTermination(3, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
        }
      }
    });
    byte[] oldValue = Bytes.toBytes("oldValue");
    byte[] currentValue = Bytes.toBytes("currentValue");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    // older data which shouldn't be "seen" by client
    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
    long snapshotId = id++;
    // push older data into snapshot -- phase (1/4)
    StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker
        .DUMMY);
    storeFlushCtx.prepare();

    // insert current data into active -- phase (2/4)
    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    quals.add(qf1);
    quals.add(qf2);
    quals.add(qf3);
    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(
        new Scan(new Get(row)), quals, seqId + 1)) {
      // complete the flush -- phase (3/4)
      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));

      List<Cell> results = new ArrayList<>();
      scanner.next(results);
      assertEquals(3, results.size());
      for (Cell c : results) {
        byte[] actualValue = CellUtil.cloneValue(c);
        // Only the newest versions should be returned, never the flushed-away old ones.
        assertTrue("expected:" + Bytes.toStringBinary(currentValue)
            + ", actual:" + Bytes.toStringBinary(actualValue)
            , Bytes.equals(actualValue, currentValue));
      }
    }
  }

  @Test
  public void testReclaimChunkWhenScaning() throws IOException {
    init("testReclaimChunkWhenScaning");
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    byte[] value = Bytes.toBytes("value");
    // older data which shouldn't be "seen" by client
    store.add(createCell(qf1, ts, seqId, value), null);
    store.add(createCell(qf2, ts, seqId, value), null);
    store.add(createCell(qf3, ts, seqId, value), null);
    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    quals.add(qf1);
    quals.add(qf2);
    quals.add(qf3);
    try (InternalScanner scanner = (InternalScanner) store.getScanner(
        new Scan(new Get(row)), quals, seqId)) {
      List<Cell> results = new MyList<>(size -> {
        switch (size) {
          // 1) we get the first cell (qf1)
          // 2) flush the data to have StoreScanner update inner scanners
          // 3) the chunk will be reclaimed after updating
          case 1:
            try {
              flushStore(store, id++);
            } catch (IOException e) {
              throw new RuntimeException(e);
            }
            break;
          // 1) we get the second cell (qf2)
          // 2) add some cell to fill some byte into the chunk (we have only one chunk)
          case 2:
            try {
              byte[] newValue = Bytes.toBytes("newValue");
              // newer data which shouldn't be "seen" by this scanner's read point
              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
            } catch (IOException e) {
              throw new RuntimeException(e);
            }
            break;
          default:
            break;
        }
      });
      scanner.next(results);
      assertEquals(3, results.size());
      for (Cell c : results) {
        byte[] actualValue = CellUtil.cloneValue(c);
        // The reclaimed chunk must not have corrupted the values we already read.
        assertTrue("expected:" + Bytes.toStringBinary(value)
            + ", actual:" + Bytes.toStringBinary(actualValue)
            , Bytes.equals(actualValue, value));
      }
    }
  }

  /**
   * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable
   * may change the versionedList. And the first InMemoryFlushRunnable will use the changed
   * versionedList to remove the corresponding segments.
   * In short, segments that are not part of the merge could be removed.
   * @throws IOException
   * @throws InterruptedException
   */
  @Test
  public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
    int flushSize = 500;
    Configuration conf = HBaseConfiguration.create();
    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25);
    MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0);
    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
    // Set the lower threshold to invoke the "MERGE" policy
    conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
    byte[] value = Bytes.toBytes("thisisavarylargevalue");
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = EnvironmentEdgeManager.currentTime();
    long seqId = 100;
    // Large values push the memstore over the in-memory flush threshold immediately.
    store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
    store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
    StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
    storeFlushCtx.prepare();
    // This shouldn't invoke another in-memory flush because the first compactor thread
    // hasn't accomplished the in-memory compaction.
    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
    store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
    assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
    //okay. Let the compaction be completed
    MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
    CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore;
    while (mem.isMemStoreFlushingInMemory()) {
      TimeUnit.SECONDS.sleep(1);
    }
    // This should invoke another in-memory flush.
    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
    store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
    assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
    // Restore the default flush size before completing the flush.
    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
      String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
  }

  @Test
  public void testAge() throws IOException {
    long currentTime = System.currentTimeMillis();
    // Pin "now" so the computed file ages are deterministic.
    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
    edge.setValue(currentTime);
    EnvironmentEdgeManager.injectEdge(edge);
    Configuration conf = TEST_UTIL.getConfiguration();
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
    initHRegion(name.getMethodName(), conf,
        TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
    HStore store = new HStore(region, hcd, conf) {

      @Override
      protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
          CellComparator kvComparator) throws IOException {
        // Back the store with four mock files of known creation times instead of real HFiles.
        List<HStoreFile> storefiles =
            Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100),
              mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000));
        StoreFileManager sfm = mock(StoreFileManager.class);
        when(sfm.getStorefiles()).thenReturn(storefiles);
        StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class);
        when(storeEngine.getStoreFileManager()).thenReturn(sfm);
        return storeEngine;
      }
    };
    assertEquals(10L, store.getMinStoreFileAge().getAsLong());
    assertEquals(10000L, store.getMaxStoreFileAge().getAsLong());
    assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4);
  }

  // Creates a mock HFile-backed store file reporting the given creation timestamp.
  private HStoreFile mockStoreFile(long createdTime) {
    StoreFileInfo info = mock(StoreFileInfo.class);
    when(info.getCreatedTimestamp()).thenReturn(createdTime);
    HStoreFile sf = mock(HStoreFile.class);
    when(sf.getReader()).thenReturn(mock(StoreFileReader.class));
    when(sf.isHFile()).thenReturn(true);
    when(sf.getFileInfo()).thenReturn(info);
    return sf;
  }

  // Initializes a MyStore (max 5 versions) wired with the given hook.
  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook)
      throws IOException {
    return (MyStore) init(methodName, conf,
      TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook);
  }

  // HStore subclass that lets a hook observe scanner creation and override the smallest
  // read point, so tests can interleave flushes with scanner setup.
  private static class MyStore extends HStore {
    private final MyStoreHook hook;

    MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration
        confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
      super(region, family, confParam);
      this.hook = hook;
    }

    @Override
    public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,
        boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
        boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
        boolean includeMemstoreScanner) throws IOException {
      // Give the hook a chance to act (e.g. trigger a flush) before scanners are built.
      hook.getScanners(this);
      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
        stopRow, false, readPt, includeMemstoreScanner);
    }

    @Override
    public long getSmallestReadPoint() {
      return hook.getSmallestReadPoint(this);
    }
  }

  // Test hook injected into MyStore; defaults are no-op / delegate-to-region.
  private abstract static class MyStoreHook {

    void getScanners(MyStore store) throws IOException {
    }

    long getSmallestReadPoint(HStore store) {
      return store.getHRegion().getSmallestReadPoint();
    }
  }

  @Test
  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
    // Force an immediate pread -> stream switch on any scan.
    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
    // Set the lower threshold to invoke the "MERGE" policy
    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    long ts = System.currentTimeMillis();
    long seqID = 1L;
    // Add some data to the region and do some flushes
    for (int i = 1; i < 10; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    // flush them
    flushStore(store, seqID);
    for (int i = 11; i < 20; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);
    for (int i = 21; i < 30; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);

    assertEquals(3, store.getStorefilesCount());
    Scan scan = new Scan();
    scan.addFamily(family);
    Collection<HStoreFile> storefiles2 = store.getStorefiles();
    ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
    StoreScanner storeScanner =
        (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
    // get the current heap
    KeyValueHeap heap = storeScanner.heap;
    // create more store files
    for (int i = 31; i < 40; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);

    for (int i = 41; i < 50; i++) {
      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
        memStoreSizing);
    }
    // flush them
    flushStore(store, seqID);
    storefiles2 = store.getStorefiles();
    ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
    actualStorefiles1.removeAll(actualStorefiles);
    // Do compaction: swap the old files for the new ones while the scanner switches modes.
    MyThread thread = new MyThread(storeScanner);
    thread.start();
    store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
    thread.join();
    KeyValueHeap heap2 = thread.getHeap();
    // The scanner must have rebuilt its heap after the concurrent file replacement.
    assertFalse(heap.equals(heap2));
  }

  @Test
  public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    // A lower-case policy name must still be honored for system tables.
    conf.set("hbase.systemtables.compacting.memstore.type", "eager");
    init(name.getMethodName(), conf,
      TableDescriptorBuilder.newBuilder(
        TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, "meta".getBytes())),
      ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setInMemoryCompaction(MemoryCompactionPolicy.NONE).build());
    assertTrue(((MemStoreCompactor) ((CompactingMemStore) store.memstore).compactor).toString()
        .startsWith("eager".toUpperCase()));
  }

  @Test
  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
    final TableName tn = TableName.valueOf(name.getMethodName());
    init(name.getMethodName());

    RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl();

    HStoreFile sf1 = mockStoreFileWithLength(1024L);
    HStoreFile sf2 = mockStoreFileWithLength(2048L);
    HStoreFile sf3 = mockStoreFileWithLength(4096L);
    HStoreFile sf4 = mockStoreFileWithLength(8192L);

    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a"))
        .setEndKey(Bytes.toBytes("b")).build();

    // Compacting two files down to one, reducing size
    sizeStore.put(regionInfo, 1024L + 4096L);
    store.updateSpaceQuotaAfterFileReplacement(
        sizeStore, regionInfo, Arrays.asList(sf1, sf3), Arrays.asList(sf2));

    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());

    // The same file length in and out should have no change
    store.updateSpaceQuotaAfterFileReplacement(
        sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf2));

    assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize());

    // Increase the total size used
    store.updateSpaceQuotaAfterFileReplacement(
        sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf3));

    assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize());

    // A region not yet tracked (null "removed" list) should simply be added.
    RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b"))
        .setEndKey(Bytes.toBytes("c")).build();
    store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4));

    assertEquals(8192L,
sizeStore.getRegionSize(regionInfo2).getSize()); 1718 } 1719 1720 private HStoreFile mockStoreFileWithLength(long length) { 1721 HStoreFile sf = mock(HStoreFile.class); 1722 StoreFileReader sfr = mock(StoreFileReader.class); 1723 when(sf.isHFile()).thenReturn(true); 1724 when(sf.getReader()).thenReturn(sfr); 1725 when(sfr.length()).thenReturn(length); 1726 return sf; 1727 } 1728 1729 private static class MyThread extends Thread { 1730 private StoreScanner scanner; 1731 private KeyValueHeap heap; 1732 1733 public MyThread(StoreScanner scanner) { 1734 this.scanner = scanner; 1735 } 1736 1737 public KeyValueHeap getHeap() { 1738 return this.heap; 1739 } 1740 1741 @Override 1742 public void run() { 1743 scanner.trySwitchToStreamRead(); 1744 heap = scanner.heap; 1745 } 1746 } 1747 1748 private static class MyMemStoreCompactor extends MemStoreCompactor { 1749 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 1750 private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1); 1751 public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy 1752 compactionPolicy) throws IllegalArgumentIOException { 1753 super(compactingMemStore, compactionPolicy); 1754 } 1755 1756 @Override 1757 public boolean start() throws IOException { 1758 boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0; 1759 boolean rval = super.start(); 1760 if (isFirst) { 1761 try { 1762 START_COMPACTOR_LATCH.await(); 1763 } catch (InterruptedException ex) { 1764 throw new RuntimeException(ex); 1765 } 1766 } 1767 return rval; 1768 } 1769 } 1770 1771 public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore { 1772 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 1773 public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c, 1774 HStore store, RegionServicesForStores regionServices, 1775 MemoryCompactionPolicy compactionPolicy) throws IOException { 1776 
super(conf, c, store, regionServices, compactionPolicy); 1777 } 1778 1779 @Override 1780 protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) 1781 throws IllegalArgumentIOException { 1782 return new MyMemStoreCompactor(this, compactionPolicy); 1783 } 1784 1785 @Override 1786 protected boolean shouldFlushInMemory() { 1787 boolean rval = super.shouldFlushInMemory(); 1788 if (rval) { 1789 RUNNER_COUNT.incrementAndGet(); 1790 if (LOG.isDebugEnabled()) { 1791 LOG.debug("runner count: " + RUNNER_COUNT.get()); 1792 } 1793 } 1794 return rval; 1795 } 1796 } 1797 1798 public static class MyCompactingMemStore extends CompactingMemStore { 1799 private static final AtomicBoolean START_TEST = new AtomicBoolean(false); 1800 private final CountDownLatch getScannerLatch = new CountDownLatch(1); 1801 private final CountDownLatch snapshotLatch = new CountDownLatch(1); 1802 public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, 1803 HStore store, RegionServicesForStores regionServices, 1804 MemoryCompactionPolicy compactionPolicy) throws IOException { 1805 super(conf, c, store, regionServices, compactionPolicy); 1806 } 1807 1808 @Override 1809 protected List<KeyValueScanner> createList(int capacity) { 1810 if (START_TEST.get()) { 1811 try { 1812 getScannerLatch.countDown(); 1813 snapshotLatch.await(); 1814 } catch (InterruptedException e) { 1815 throw new RuntimeException(e); 1816 } 1817 } 1818 return new ArrayList<>(capacity); 1819 } 1820 @Override 1821 protected void pushActiveToPipeline(MutableSegment active) { 1822 if (START_TEST.get()) { 1823 try { 1824 getScannerLatch.await(); 1825 } catch (InterruptedException e) { 1826 throw new RuntimeException(e); 1827 } 1828 } 1829 1830 super.pushActiveToPipeline(active); 1831 if (START_TEST.get()) { 1832 snapshotLatch.countDown(); 1833 } 1834 } 1835 } 1836 1837 interface MyListHook { 1838 void hook(int currentSize); 1839 } 1840 1841 private static class MyList<T> implements List<T> { 
1842 private final List<T> delegatee = new ArrayList<>(); 1843 private final MyListHook hookAtAdd; 1844 MyList(final MyListHook hookAtAdd) { 1845 this.hookAtAdd = hookAtAdd; 1846 } 1847 @Override 1848 public int size() {return delegatee.size();} 1849 1850 @Override 1851 public boolean isEmpty() {return delegatee.isEmpty();} 1852 1853 @Override 1854 public boolean contains(Object o) {return delegatee.contains(o);} 1855 1856 @Override 1857 public Iterator<T> iterator() {return delegatee.iterator();} 1858 1859 @Override 1860 public Object[] toArray() {return delegatee.toArray();} 1861 1862 @Override 1863 public <R> R[] toArray(R[] a) {return delegatee.toArray(a);} 1864 1865 @Override 1866 public boolean add(T e) { 1867 hookAtAdd.hook(size()); 1868 return delegatee.add(e); 1869 } 1870 1871 @Override 1872 public boolean remove(Object o) {return delegatee.remove(o);} 1873 1874 @Override 1875 public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);} 1876 1877 @Override 1878 public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);} 1879 1880 @Override 1881 public boolean addAll(int index, Collection<? 
extends T> c) {return delegatee.addAll(index, c);} 1882 1883 @Override 1884 public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);} 1885 1886 @Override 1887 public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);} 1888 1889 @Override 1890 public void clear() {delegatee.clear();} 1891 1892 @Override 1893 public T get(int index) {return delegatee.get(index);} 1894 1895 @Override 1896 public T set(int index, T element) {return delegatee.set(index, element);} 1897 1898 @Override 1899 public void add(int index, T element) {delegatee.add(index, element);} 1900 1901 @Override 1902 public T remove(int index) {return delegatee.remove(index);} 1903 1904 @Override 1905 public int indexOf(Object o) {return delegatee.indexOf(o);} 1906 1907 @Override 1908 public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);} 1909 1910 @Override 1911 public ListIterator<T> listIterator() {return delegatee.listIterator();} 1912 1913 @Override 1914 public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);} 1915 1916 @Override 1917 public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);} 1918 } 1919}