001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024import static org.mockito.ArgumentMatchers.any; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.spy; 027import static org.mockito.Mockito.times; 028import static org.mockito.Mockito.verify; 029import static org.mockito.Mockito.when; 030 031import java.io.IOException; 032import java.lang.ref.SoftReference; 033import java.security.PrivilegedExceptionAction; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Collection; 037import java.util.Collections; 038import java.util.Iterator; 039import java.util.List; 040import java.util.ListIterator; 041import java.util.NavigableSet; 042import java.util.TreeSet; 043import java.util.concurrent.ConcurrentSkipListSet; 044import java.util.concurrent.CountDownLatch; 045import java.util.concurrent.ExecutorService; 046import java.util.concurrent.Executors; 047import java.util.concurrent.ThreadPoolExecutor; 048import java.util.concurrent.TimeUnit; 049import java.util.concurrent.atomic.AtomicBoolean; 050import java.util.concurrent.atomic.AtomicInteger; 051import org.apache.hadoop.conf.Configuration; 052import org.apache.hadoop.fs.FSDataOutputStream; 053import org.apache.hadoop.fs.FileStatus; 054import org.apache.hadoop.fs.FileSystem; 055import org.apache.hadoop.fs.FilterFileSystem; 056import org.apache.hadoop.fs.LocalFileSystem; 057import org.apache.hadoop.fs.Path; 058import org.apache.hadoop.fs.permission.FsPermission; 059import org.apache.hadoop.hbase.Cell; 060import org.apache.hadoop.hbase.CellBuilderFactory; 061import org.apache.hadoop.hbase.CellBuilderType; 062import org.apache.hadoop.hbase.CellComparator; 063import org.apache.hadoop.hbase.CellComparatorImpl; 064import org.apache.hadoop.hbase.CellUtil; 065import org.apache.hadoop.hbase.HBaseClassTestRule; 066import org.apache.hadoop.hbase.HBaseConfiguration; 067import org.apache.hadoop.hbase.HBaseTestingUtility; 068import org.apache.hadoop.hbase.HConstants; 069import org.apache.hadoop.hbase.KeyValue; 070import org.apache.hadoop.hbase.MemoryCompactionPolicy; 071import org.apache.hadoop.hbase.NamespaceDescriptor; 072import org.apache.hadoop.hbase.PrivateCellUtil; 073import org.apache.hadoop.hbase.TableName; 074import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 075import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 076import org.apache.hadoop.hbase.client.Get; 077import org.apache.hadoop.hbase.client.RegionInfo; 078import org.apache.hadoop.hbase.client.RegionInfoBuilder; 079import org.apache.hadoop.hbase.client.Scan; 080import org.apache.hadoop.hbase.client.TableDescriptor; 081import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 082import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 083import org.apache.hadoop.hbase.filter.Filter; 084import org.apache.hadoop.hbase.filter.FilterBase; 085import org.apache.hadoop.hbase.io.compress.Compression; 086import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 087import org.apache.hadoop.hbase.io.hfile.CacheConfig; 088import org.apache.hadoop.hbase.io.hfile.HFile; 089import org.apache.hadoop.hbase.io.hfile.HFileContext; 090import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 091import org.apache.hadoop.hbase.monitoring.MonitoredTask; 092import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl; 093import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 094import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 095import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 096import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 097import org.apache.hadoop.hbase.security.User; 098import org.apache.hadoop.hbase.testclassification.MediumTests; 099import org.apache.hadoop.hbase.testclassification.RegionServerTests; 100import org.apache.hadoop.hbase.util.Bytes; 101import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 102import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 103import org.apache.hadoop.hbase.util.FSUtils; 104import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 105import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 106import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 107import org.apache.hadoop.hbase.wal.WALFactory; 108import org.apache.hadoop.util.Progressable; 109import org.junit.After; 110import org.junit.AfterClass; 111import org.junit.Before; 112import org.junit.ClassRule; 113import org.junit.Rule; 114import org.junit.Test; 115import org.junit.experimental.categories.Category; 116import org.junit.rules.TestName; 117import org.mockito.Mockito; 118import org.slf4j.Logger; 119import org.slf4j.LoggerFactory; 120 121import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 122 123/** 124 * Test class for the HStore 125 */ 126@Category({ RegionServerTests.class, MediumTests.class }) 127public class TestHStore { 128 129 @ClassRule 130 public static final HBaseClassTestRule CLASS_RULE = 131 HBaseClassTestRule.forClass(TestHStore.class); 132 133 private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class); 134 @Rule 135 public TestName name = new TestName(); 136 137 HRegion region; 138 HStore store; 139 byte [] table = Bytes.toBytes("table"); 140 byte [] family = Bytes.toBytes("family"); 141 142 byte [] row = Bytes.toBytes("row"); 143 byte [] row2 = Bytes.toBytes("row2"); 144 byte [] qf1 = Bytes.toBytes("qf1"); 145 byte [] qf2 = Bytes.toBytes("qf2"); 146 byte [] qf3 = Bytes.toBytes("qf3"); 147 byte [] qf4 = Bytes.toBytes("qf4"); 148 byte [] qf5 = Bytes.toBytes("qf5"); 149 byte [] qf6 = Bytes.toBytes("qf6"); 150 151 NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR); 152 153 List<Cell> expected = new ArrayList<>(); 154 List<Cell> result = new ArrayList<>(); 155 156 long id = System.currentTimeMillis(); 157 Get get = new Get(row); 158 159 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 160 private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString(); 161 162 163 /** 164 * Setup 165 * @throws IOException 166 */ 167 @Before 168 public void setUp() throws IOException { 169 qualifiers.add(qf1); 170 qualifiers.add(qf3); 171 qualifiers.add(qf5); 172 173 Iterator<byte[]> iter = qualifiers.iterator(); 174 while(iter.hasNext()){ 175 byte [] next = iter.next(); 176 expected.add(new KeyValue(row, family, next, 1, (byte[])null)); 177 get.addColumn(family, next); 178 } 179 } 180 181 private void init(String methodName) throws IOException { 182 init(methodName, TEST_UTIL.getConfiguration()); 183 } 184 185 private HStore init(String methodName, Configuration conf) throws IOException { 186 // some of the tests write 4 versions and then flush 187 // (with HBASE-4241, lower versions are collected on flush) 188 return init(methodName, conf, 189 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build()); 190 } 191 192 private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd) 193 throws IOException { 194 return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd); 195 } 196 197 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 198 ColumnFamilyDescriptor hcd) throws IOException { 199 return init(methodName, conf, builder, hcd, null); 200 } 201 202 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 203 ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException { 204 return init(methodName, conf, builder, hcd, hook, false); 205 } 206 207 private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder, 208 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 209 TableDescriptor htd = builder.setColumnFamily(hcd).build(); 210 Path basedir = new Path(DIR + methodName); 211 Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); 212 final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName)); 213 214 FileSystem fs = FileSystem.get(conf); 215 216 fs.delete(logdir, true); 217 ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 218 MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null); 219 RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 220 Configuration walConf = new Configuration(conf); 221 FSUtils.setRootDir(walConf, basedir); 222 WALFactory wals = new WALFactory(walConf, methodName); 223 region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf, 224 htd, null); 225 region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); 226 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 227 Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 228 } 229 230 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 231 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 232 initHRegion(methodName, conf, builder, hcd, hook, switchToPread); 233 if (hook == null) { 234 store = new HStore(region, hcd, conf, false); 235 } else { 236 store = new MyStore(region, hcd, conf, hook, switchToPread); 237 } 238 return store; 239 } 240 241 /** 242 * Test we do not lose data if we fail a flush and then close. 243 * Part of HBase-10466 244 * @throws Exception 245 */ 246 @Test 247 public void testFlushSizeSizing() throws Exception { 248 LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName()); 249 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 250 // Only retry once. 251 conf.setInt("hbase.hstore.flush.retries.number", 1); 252 User user = User.createUserForTesting(conf, this.name.getMethodName(), 253 new String[]{"foo"}); 254 // Inject our faulty LocalFileSystem 255 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 256 user.runAs(new PrivilegedExceptionAction<Object>() { 257 @Override 258 public Object run() throws Exception { 259 // Make sure it worked (above is sensitive to caching details in hadoop core) 260 FileSystem fs = FileSystem.get(conf); 261 assertEquals(FaultyFileSystem.class, fs.getClass()); 262 FaultyFileSystem ffs = (FaultyFileSystem)fs; 263 264 // Initialize region 265 init(name.getMethodName(), conf); 266 267 MemStoreSize mss = store.memstore.getFlushableSize(); 268 assertEquals(0, mss.getDataSize()); 269 LOG.info("Adding some data"); 270 MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing(); 271 store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize); 272 // add the heap size of active (mutable) segment 273 kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 274 mss = store.memstore.getFlushableSize(); 275 assertEquals(kvSize.getMemStoreSize(), mss); 276 // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. 277 try { 278 LOG.info("Flushing"); 279 flushStore(store, id++); 280 fail("Didn't bubble up IOE!"); 281 } catch (IOException ioe) { 282 assertTrue(ioe.getMessage().contains("Fault injected")); 283 } 284 // due to snapshot, change mutable to immutable segment 285 kvSize.incMemStoreSize(0, 286 CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0); 287 mss = store.memstore.getFlushableSize(); 288 assertEquals(kvSize.getMemStoreSize(), mss); 289 MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing(); 290 store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2); 291 kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 292 // Even though we add a new kv, we expect the flushable size to be 'same' since we have 293 // not yet cleared the snapshot -- the above flush failed. 294 assertEquals(kvSize.getMemStoreSize(), mss); 295 ffs.fault.set(false); 296 flushStore(store, id++); 297 mss = store.memstore.getFlushableSize(); 298 // Size should be the foreground kv size. 299 assertEquals(kvSize2.getMemStoreSize(), mss); 300 flushStore(store, id++); 301 mss = store.memstore.getFlushableSize(); 302 assertEquals(0, mss.getDataSize()); 303 assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize()); 304 return null; 305 } 306 }); 307 } 308 309 /** 310 * Verify that compression and data block encoding are respected by the 311 * Store.createWriterInTmp() method, used on store flush. 312 */ 313 @Test 314 public void testCreateWriter() throws Exception { 315 Configuration conf = HBaseConfiguration.create(); 316 FileSystem fs = FileSystem.get(conf); 317 318 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 319 .setCompressionType(Compression.Algorithm.GZ).setDataBlockEncoding(DataBlockEncoding.DIFF) 320 .build(); 321 init(name.getMethodName(), conf, hcd); 322 323 // Test createWriterInTmp() 324 StoreFileWriter writer = 325 store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false, false); 326 Path path = writer.getPath(); 327 writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1))); 328 writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2))); 329 writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3))); 330 writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4))); 331 writer.close(); 332 333 // Verify that compression and encoding settings are respected 334 HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf); 335 assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm()); 336 assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding()); 337 reader.close(); 338 } 339 340 @Test 341 public void testDeleteExpiredStoreFiles() throws Exception { 342 testDeleteExpiredStoreFiles(0); 343 testDeleteExpiredStoreFiles(1); 344 } 345 346 /* 347 * @param minVersions the MIN_VERSIONS for the column family 348 */ 349 public void testDeleteExpiredStoreFiles(int minVersions) throws Exception { 350 int storeFileNum = 4; 351 int ttl = 4; 352 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 353 EnvironmentEdgeManagerTestHelper.injectEdge(edge); 354 355 Configuration conf = HBaseConfiguration.create(); 356 // Enable the expired store file deletion 357 conf.setBoolean("hbase.store.delete.expired.storefile", true); 358 // Set the compaction threshold higher to avoid normal compactions. 359 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5); 360 361 init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder 362 .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build()); 363 364 long storeTtl = this.store.getScanInfo().getTtl(); 365 long sleepTime = storeTtl / storeFileNum; 366 long timeStamp; 367 // There are 4 store files and the max time stamp difference among these 368 // store files will be (this.store.ttl / storeFileNum) 369 for (int i = 1; i <= storeFileNum; i++) { 370 LOG.info("Adding some data for the store file #" + i); 371 timeStamp = EnvironmentEdgeManager.currentTime(); 372 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null); 373 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null); 374 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null); 375 flush(i); 376 edge.incrementTime(sleepTime); 377 } 378 379 // Verify the total number of store files 380 assertEquals(storeFileNum, this.store.getStorefiles().size()); 381 382 // Each call will find one expired store file and delete it before compaction happens. 383 // There will be no compaction due to threshold above. Last file will not be replaced. 384 for (int i = 1; i <= storeFileNum - 1; i++) { 385 // verify the expired store file. 386 assertFalse(this.store.requestCompaction().isPresent()); 387 Collection<HStoreFile> sfs = this.store.getStorefiles(); 388 // Ensure i files are gone. 389 if (minVersions == 0) { 390 assertEquals(storeFileNum - i, sfs.size()); 391 // Ensure only non-expired files remain. 392 for (HStoreFile sf : sfs) { 393 assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl)); 394 } 395 } else { 396 assertEquals(storeFileNum, sfs.size()); 397 } 398 // Let the next store file expired. 399 edge.incrementTime(sleepTime); 400 } 401 assertFalse(this.store.requestCompaction().isPresent()); 402 403 Collection<HStoreFile> sfs = this.store.getStorefiles(); 404 // Assert the last expired file is not removed. 405 if (minVersions == 0) { 406 assertEquals(1, sfs.size()); 407 } 408 long ts = sfs.iterator().next().getReader().getMaxTimestamp(); 409 assertTrue(ts < (edge.currentTime() - storeTtl)); 410 411 for (HStoreFile sf : sfs) { 412 sf.closeStoreFile(true); 413 } 414 } 415 416 @Test 417 public void testLowestModificationTime() throws Exception { 418 Configuration conf = HBaseConfiguration.create(); 419 FileSystem fs = FileSystem.get(conf); 420 // Initialize region 421 init(name.getMethodName(), conf); 422 423 int storeFileNum = 4; 424 for (int i = 1; i <= storeFileNum; i++) { 425 LOG.info("Adding some data for the store file #"+i); 426 this.store.add(new KeyValue(row, family, qf1, i, (byte[])null), null); 427 this.store.add(new KeyValue(row, family, qf2, i, (byte[])null), null); 428 this.store.add(new KeyValue(row, family, qf3, i, (byte[])null), null); 429 flush(i); 430 } 431 // after flush; check the lowest time stamp 432 long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 433 long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 434 assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS); 435 436 // after compact; check the lowest time stamp 437 store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null); 438 lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 439 lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 440 assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); 441 } 442 443 private static long getLowestTimeStampFromFS(FileSystem fs, 444 final Collection<HStoreFile> candidates) throws IOException { 445 long minTs = Long.MAX_VALUE; 446 if (candidates.isEmpty()) { 447 return minTs; 448 } 449 Path[] p = new Path[candidates.size()]; 450 int i = 0; 451 for (HStoreFile sf : candidates) { 452 p[i] = sf.getPath(); 453 ++i; 454 } 455 456 FileStatus[] stats = fs.listStatus(p); 457 if (stats == null || stats.length == 0) { 458 return minTs; 459 } 460 for (FileStatus s : stats) { 461 minTs = Math.min(minTs, s.getModificationTime()); 462 } 463 return minTs; 464 } 465 466 ////////////////////////////////////////////////////////////////////////////// 467 // Get tests 468 ////////////////////////////////////////////////////////////////////////////// 469 470 private static final int BLOCKSIZE_SMALL = 8192; 471 /** 472 * Test for hbase-1686. 473 * @throws IOException 474 */ 475 @Test 476 public void testEmptyStoreFile() throws IOException { 477 init(this.name.getMethodName()); 478 // Write a store file. 479 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 480 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 481 flush(1); 482 // Now put in place an empty store file. Its a little tricky. Have to 483 // do manually with hacked in sequence id. 484 HStoreFile f = this.store.getStorefiles().iterator().next(); 485 Path storedir = f.getPath().getParent(); 486 long seqid = f.getMaxSequenceId(); 487 Configuration c = HBaseConfiguration.create(); 488 FileSystem fs = FileSystem.get(c); 489 HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 490 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), 491 fs) 492 .withOutputDir(storedir) 493 .withFileContext(meta) 494 .build(); 495 w.appendMetadata(seqid + 1, false); 496 w.close(); 497 this.store.close(); 498 // Reopen it... should pick up two files 499 this.store = 500 new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false); 501 assertEquals(2, this.store.getStorefilesCount()); 502 503 result = HBaseTestingUtility.getFromStoreFile(store, 504 get.getRow(), 505 qualifiers); 506 assertEquals(1, result.size()); 507 } 508 509 /** 510 * Getting data from memstore only 511 * @throws IOException 512 */ 513 @Test 514 public void testGet_FromMemStoreOnly() throws IOException { 515 init(this.name.getMethodName()); 516 517 //Put data in memstore 518 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 519 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 520 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); 521 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null); 522 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null); 523 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null); 524 525 //Get 526 result = HBaseTestingUtility.getFromStoreFile(store, 527 get.getRow(), qualifiers); 528 529 //Compare 530 assertCheck(); 531 } 532 533 @Test 534 public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException { 535 testTimeRangeIfSomeCellsAreDroppedInFlush(1); 536 testTimeRangeIfSomeCellsAreDroppedInFlush(3); 537 testTimeRangeIfSomeCellsAreDroppedInFlush(5); 538 } 539 540 private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException { 541 init(this.name.getMethodName(), TEST_UTIL.getConfiguration(), 542 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build()); 543 long currentTs = 100; 544 long minTs = currentTs; 545 // the extra cell won't be flushed to disk, 546 // so the min of timerange will be different between memStore and hfile. 547 for (int i = 0; i != (maxVersion + 1); ++i) { 548 this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null); 549 if (i == 1) { 550 minTs = currentTs; 551 } 552 } 553 flushStore(store, id++); 554 555 Collection<HStoreFile> files = store.getStorefiles(); 556 assertEquals(1, files.size()); 557 HStoreFile f = files.iterator().next(); 558 f.initReader(); 559 StoreFileReader reader = f.getReader(); 560 assertEquals(minTs, reader.timeRange.getMin()); 561 assertEquals(currentTs, reader.timeRange.getMax()); 562 } 563 564 /** 565 * Getting data from files only 566 * @throws IOException 567 */ 568 @Test 569 public void testGet_FromFilesOnly() throws IOException { 570 init(this.name.getMethodName()); 571 572 //Put data in memstore 573 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 574 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 575 //flush 576 flush(1); 577 578 //Add more data 579 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); 580 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null); 581 //flush 582 flush(2); 583 584 //Add more data 585 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null); 586 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null); 587 //flush 588 flush(3); 589 590 //Get 591 result = HBaseTestingUtility.getFromStoreFile(store, 592 get.getRow(), 593 qualifiers); 594 //this.store.get(get, qualifiers, result); 595 596 //Need to sort the result since multiple files 597 Collections.sort(result, CellComparatorImpl.COMPARATOR); 598 599 //Compare 600 assertCheck(); 601 } 602 603 /** 604 * Getting data from memstore and files 605 * @throws IOException 606 */ 607 @Test 608 public void testGet_FromMemStoreAndFiles() throws IOException { 609 init(this.name.getMethodName()); 610 611 //Put data in memstore 612 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 613 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 614 //flush 615 flush(1); 616 617 //Add more data 618 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); 619 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null); 620 //flush 621 flush(2); 622 623 //Add more data 624 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null); 625 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null); 626 627 //Get 628 result = HBaseTestingUtility.getFromStoreFile(store, 629 get.getRow(), qualifiers); 630 631 //Need to sort the result since multiple files 632 Collections.sort(result, CellComparatorImpl.COMPARATOR); 633 634 //Compare 635 assertCheck(); 636 } 637 638 private void flush(int storeFilessize) throws IOException{ 639 this.store.snapshot(); 640 flushStore(store, id++); 641 assertEquals(storeFilessize, this.store.getStorefiles().size()); 642 assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount()); 643 } 644 645 private void assertCheck() { 646 assertEquals(expected.size(), result.size()); 647 for(int i=0; i<expected.size(); i++) { 648 assertEquals(expected.get(i), result.get(i)); 649 } 650 } 651 652 @After 653 public void tearDown() throws Exception { 654 EnvironmentEdgeManagerTestHelper.reset(); 655 if (store != null) { 656 try { 657 store.close(); 658 } catch (IOException e) { 659 } 660 store = null; 661 } 662 if (region != null) { 663 region.close(); 664 region = null; 665 } 666 } 667 668 @AfterClass 669 public static void tearDownAfterClass() throws IOException { 670 TEST_UTIL.cleanupTestDir(); 671 } 672 673 @Test 674 public void testHandleErrorsInFlush() throws Exception { 675 LOG.info("Setting up a faulty file system that cannot write"); 676 677 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 678 User user = User.createUserForTesting(conf, 679 "testhandleerrorsinflush", new String[]{"foo"}); 680 // Inject our faulty LocalFileSystem 681 conf.setClass("fs.file.impl", FaultyFileSystem.class, 682 FileSystem.class); 683 user.runAs(new PrivilegedExceptionAction<Object>() { 684 @Override 685 public Object run() throws Exception { 686 // Make sure it worked (above is sensitive to caching details in hadoop core) 687 FileSystem fs = FileSystem.get(conf); 688 assertEquals(FaultyFileSystem.class, fs.getClass()); 689 690 // Initialize region 691 init(name.getMethodName(), conf); 692 693 LOG.info("Adding some data"); 694 store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 695 store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 696 store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); 697 698 LOG.info("Before flush, we should have no files"); 699 700 Collection<StoreFileInfo> files = 701 store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); 702 assertEquals(0, files != null ? files.size() : 0); 703 704 //flush 705 try { 706 LOG.info("Flushing"); 707 flush(1); 708 fail("Didn't bubble up IOE!"); 709 } catch (IOException ioe) { 710 assertTrue(ioe.getMessage().contains("Fault injected")); 711 } 712 713 LOG.info("After failed flush, we should still have no files!"); 714 files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); 715 assertEquals(0, files != null ? files.size() : 0); 716 store.getHRegion().getWAL().close(); 717 return null; 718 } 719 }); 720 FileSystem.closeAllForUGI(user.getUGI()); 721 } 722 723 /** 724 * Faulty file system that will fail if you write past its fault position the FIRST TIME 725 * only; thereafter it will succeed. Used by {@link TestHRegion} too. 726 */ 727 static class FaultyFileSystem extends FilterFileSystem { 728 List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>(); 729 private long faultPos = 200; 730 AtomicBoolean fault = new AtomicBoolean(true); 731 732 public FaultyFileSystem() { 733 super(new LocalFileSystem()); 734 System.err.println("Creating faulty!"); 735 } 736 737 @Override 738 public FSDataOutputStream create(Path p) throws IOException { 739 return new FaultyOutputStream(super.create(p), faultPos, fault); 740 } 741 742 @Override 743 public FSDataOutputStream create(Path f, FsPermission permission, 744 boolean overwrite, int bufferSize, short replication, long blockSize, 745 Progressable progress) throws IOException { 746 return new FaultyOutputStream(super.create(f, permission, 747 overwrite, bufferSize, replication, blockSize, progress), faultPos, fault); 748 } 749 750 @Override 751 public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, 752 int bufferSize, short replication, long blockSize, Progressable progress) 753 throws IOException { 754 // Fake it. Call create instead. The default implementation throws an IOE 755 // that this is not supported. 756 return create(f, overwrite, bufferSize, replication, blockSize, progress); 757 } 758 } 759 760 static class FaultyOutputStream extends FSDataOutputStream { 761 volatile long faultPos = Long.MAX_VALUE; 762 private final AtomicBoolean fault; 763 764 public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault) 765 throws IOException { 766 super(out, null); 767 this.faultPos = faultPos; 768 this.fault = fault; 769 } 770 771 @Override 772 public synchronized void write(byte[] buf, int offset, int length) throws IOException { 773 System.err.println("faulty stream write at pos " + getPos()); 774 injectFault(); 775 super.write(buf, offset, length); 776 } 777 778 private void injectFault() throws IOException { 779 if (this.fault.get() && getPos() >= faultPos) { 780 throw new IOException("Fault injected"); 781 } 782 } 783 } 784 785 private static void flushStore(HStore store, long id) throws IOException { 786 StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY); 787 storeFlushCtx.prepare(); 788 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 789 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 790 } 791 792 /** 793 * Generate a list of KeyValues for testing based on given parameters 794 * @param timestamps 795 * @param numRows 796 * @param qualifier 797 * @param family 798 * @return the rows key-value list 799 */ 800 List<Cell> getKeyValueSet(long[] timestamps, int numRows, 801 byte[] qualifier, byte[] family) { 802 List<Cell> kvList = new ArrayList<>(); 803 for (int i=1;i<=numRows;i++) { 804 byte[] b = Bytes.toBytes(i); 805 for (long timestamp: timestamps) { 806 kvList.add(new KeyValue(b, family, qualifier, timestamp, b)); 807 } 808 } 809 return kvList; 810 } 811 812 /** 813 * Test to ensure correctness when using Stores with multiple timestamps 814 * @throws IOException 815 */ 816 @Test 817 public void testMultipleTimestamps() throws IOException { 818 int numRows = 1; 819 long[] timestamps1 = new long[] {1,5,10,20}; 820 long[] timestamps2 = new long[] {30,80}; 821 822 init(this.name.getMethodName()); 823 824 List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family); 825 for (Cell kv : kvList1) { 826 this.store.add(kv, null); 827 } 828 829 this.store.snapshot(); 830 flushStore(store, id++); 831 832 List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family); 833 for(Cell kv : kvList2) { 834 this.store.add(kv, null); 835 } 836 837 List<Cell> result; 838 Get get = new Get(Bytes.toBytes(1)); 839 get.addColumn(family,qf1); 840 841 get.setTimeRange(0,15); 842 result = HBaseTestingUtility.getFromStoreFile(store, get); 843 assertTrue(result.size()>0); 844 845 get.setTimeRange(40,90); 846 result = HBaseTestingUtility.getFromStoreFile(store, get); 847 assertTrue(result.size()>0); 848 849 get.setTimeRange(10,45); 850 result = HBaseTestingUtility.getFromStoreFile(store, get); 851 assertTrue(result.size()>0); 852 853 get.setTimeRange(80,145); 854 result = HBaseTestingUtility.getFromStoreFile(store, get); 855 assertTrue(result.size()>0); 856 857 get.setTimeRange(1,2); 858 result = HBaseTestingUtility.getFromStoreFile(store, get); 859 assertTrue(result.size()>0); 860 861 get.setTimeRange(90,200); 862 result = HBaseTestingUtility.getFromStoreFile(store, get); 863 assertTrue(result.size()==0); 864 } 865 866 /** 867 * Test for HBASE-3492 - Test split on empty colfam (no store files). 868 * 869 * @throws IOException When the IO operations fail. 870 */ 871 @Test 872 public void testSplitWithEmptyColFam() throws IOException { 873 init(this.name.getMethodName()); 874 assertFalse(store.getSplitPoint().isPresent()); 875 store.getHRegion().forceSplit(null); 876 assertFalse(store.getSplitPoint().isPresent()); 877 store.getHRegion().clearSplit(); 878 } 879 880 @Test 881 public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception { 882 final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle"; 883 long anyValue = 10; 884 885 // We'll check that it uses correct config and propagates it appropriately by going thru 886 // the simplest "real" path I can find - "throttleCompaction", which just checks whether 887 // a number we pass in is higher than some config value, inside compactionPolicy. 888 Configuration conf = HBaseConfiguration.create(); 889 conf.setLong(CONFIG_KEY, anyValue); 890 init(name.getMethodName() + "-xml", conf); 891 assertTrue(store.throttleCompaction(anyValue + 1)); 892 assertFalse(store.throttleCompaction(anyValue)); 893 894 // HTD overrides XML. 895 --anyValue; 896 init(name.getMethodName() + "-htd", conf, TableDescriptorBuilder 897 .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)), 898 ColumnFamilyDescriptorBuilder.of(family)); 899 assertTrue(store.throttleCompaction(anyValue + 1)); 900 assertFalse(store.throttleCompaction(anyValue)); 901 902 // HCD overrides them both. 903 --anyValue; 904 init(name.getMethodName() + "-hcd", conf, 905 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, 906 Long.toString(anyValue)), 907 ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue)) 908 .build()); 909 assertTrue(store.throttleCompaction(anyValue + 1)); 910 assertFalse(store.throttleCompaction(anyValue)); 911 } 912 913 public static class DummyStoreEngine extends DefaultStoreEngine { 914 public static DefaultCompactor lastCreatedCompactor = null; 915 916 @Override 917 protected void createComponents(Configuration conf, HStore store, CellComparator comparator) 918 throws IOException { 919 super.createComponents(conf, store, comparator); 920 lastCreatedCompactor = this.compactor; 921 } 922 } 923 924 @Test 925 public void testStoreUsesSearchEngineOverride() throws Exception { 926 Configuration conf = HBaseConfiguration.create(); 927 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName()); 928 init(this.name.getMethodName(), conf); 929 assertEquals(DummyStoreEngine.lastCreatedCompactor, 930 this.store.storeEngine.getCompactor()); 931 } 932 933 private void addStoreFile() throws IOException { 934 HStoreFile f = this.store.getStorefiles().iterator().next(); 935 Path storedir = f.getPath().getParent(); 936 long seqid = this.store.getMaxSequenceId().orElse(0L); 937 Configuration c = TEST_UTIL.getConfiguration(); 938 FileSystem fs = FileSystem.get(c); 939 HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 940 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), 941 fs) 942 .withOutputDir(storedir) 943 .withFileContext(fileContext) 944 .build(); 945 w.appendMetadata(seqid + 1, false); 946 w.close(); 947 LOG.info("Added store file:" + w.getPath()); 948 } 949 950 private void archiveStoreFile(int index) throws IOException { 951 Collection<HStoreFile> files = this.store.getStorefiles(); 952 HStoreFile sf = null; 953 Iterator<HStoreFile> it = files.iterator(); 954 for (int i = 0; i <= index; i++) { 955 sf = it.next(); 956 } 957 store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf)); 958 } 959 960 private void closeCompactedFile(int index) throws IOException { 961 Collection<HStoreFile> files = 962 this.store.getStoreEngine().getStoreFileManager().getCompactedfiles(); 963 HStoreFile sf = null; 964 Iterator<HStoreFile> it = files.iterator(); 965 for (int i = 0; i <= index; i++) { 966 sf = it.next(); 967 } 968 sf.closeStoreFile(true); 969 store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf)); 970 } 971 972 @Test 973 public void testRefreshStoreFiles() throws Exception { 974 init(name.getMethodName()); 975 976 assertEquals(0, this.store.getStorefilesCount()); 977 978 // Test refreshing store files when no store files are there 979 store.refreshStoreFiles(); 980 assertEquals(0, this.store.getStorefilesCount()); 981 982 // add some data, flush 983 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 984 flush(1); 985 assertEquals(1, this.store.getStorefilesCount()); 986 987 // add one more file 988 addStoreFile(); 989 990 assertEquals(1, this.store.getStorefilesCount()); 991 store.refreshStoreFiles(); 992 assertEquals(2, this.store.getStorefilesCount()); 993 994 // add three more files 995 addStoreFile(); 996 addStoreFile(); 997 addStoreFile(); 998 999 assertEquals(2, this.store.getStorefilesCount()); 1000 store.refreshStoreFiles(); 1001 assertEquals(5, this.store.getStorefilesCount()); 1002 1003 closeCompactedFile(0); 1004 archiveStoreFile(0); 1005 1006 assertEquals(5, this.store.getStorefilesCount()); 1007 store.refreshStoreFiles(); 1008 assertEquals(4, this.store.getStorefilesCount()); 1009 1010 archiveStoreFile(0); 1011 archiveStoreFile(1); 1012 archiveStoreFile(2); 1013 1014 assertEquals(4, this.store.getStorefilesCount()); 1015 store.refreshStoreFiles(); 1016 assertEquals(1, this.store.getStorefilesCount()); 1017 1018 archiveStoreFile(0); 1019 store.refreshStoreFiles(); 1020 assertEquals(0, this.store.getStorefilesCount()); 1021 } 1022 1023 @Test 1024 public void testRefreshStoreFilesNotChanged() throws IOException { 1025 init(name.getMethodName()); 1026 1027 assertEquals(0, this.store.getStorefilesCount()); 1028 1029 // add some data, flush 1030 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 1031 flush(1); 1032 // add one more file 1033 addStoreFile(); 1034 1035 HStore spiedStore = spy(store); 1036 1037 // call first time after files changed 1038 spiedStore.refreshStoreFiles(); 1039 assertEquals(2, this.store.getStorefilesCount()); 1040 verify(spiedStore, times(1)).replaceStoreFiles(any(), any()); 1041 1042 // call second time 1043 spiedStore.refreshStoreFiles(); 1044 1045 //ensure that replaceStoreFiles is not called if files are not refreshed 1046 verify(spiedStore, times(0)).replaceStoreFiles(null, null); 1047 } 1048 1049 private long countMemStoreScanner(StoreScanner scanner) { 1050 if (scanner.currentScanners == null) { 1051 return 0; 1052 } 1053 return scanner.currentScanners.stream() 1054 .filter(s -> !s.isFileScanner()) 1055 .count(); 1056 } 1057 1058 @Test 1059 public void testNumberOfMemStoreScannersAfterFlush() throws IOException { 1060 long seqId = 100; 1061 long timestamp = System.currentTimeMillis(); 1062 Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1063 .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put) 1064 .setValue(qf1).build(); 1065 PrivateCellUtil.setSequenceId(cell0, seqId); 1066 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList()); 1067 1068 Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1069 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put) 1070 .setValue(qf1).build(); 1071 PrivateCellUtil.setSequenceId(cell1, seqId); 1072 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1)); 1073 1074 seqId = 101; 1075 timestamp = System.currentTimeMillis(); 1076 Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family) 1077 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put) 1078 .setValue(qf1).build(); 1079 PrivateCellUtil.setSequenceId(cell2, seqId); 1080 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2)); 1081 } 1082 1083 private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot, 1084 List<Cell> inputCellsAfterSnapshot) throws IOException { 1085 init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size()); 1086 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1087 long seqId = Long.MIN_VALUE; 1088 for (Cell c : inputCellsBeforeSnapshot) { 1089 quals.add(CellUtil.cloneQualifier(c)); 1090 seqId = Math.max(seqId, c.getSequenceId()); 1091 } 1092 for (Cell c : inputCellsAfterSnapshot) { 1093 quals.add(CellUtil.cloneQualifier(c)); 1094 seqId = Math.max(seqId, c.getSequenceId()); 1095 } 1096 inputCellsBeforeSnapshot.forEach(c -> store.add(c, null)); 1097 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1098 storeFlushCtx.prepare(); 1099 inputCellsAfterSnapshot.forEach(c -> store.add(c, null)); 1100 int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2; 1101 try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) { 1102 // snapshot + active (if inputCellsAfterSnapshot isn't empty) 1103 assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s)); 1104 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1105 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1106 // snapshot has no data after flush 1107 int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1; 1108 boolean more; 1109 int cellCount = 0; 1110 do { 1111 List<Cell> cells = new ArrayList<>(); 1112 more = s.next(cells); 1113 cellCount += cells.size(); 1114 assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s)); 1115 } while (more); 1116 assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size() 1117 + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(), 1118 inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount); 1119 // the current scanners is cleared 1120 assertEquals(0, countMemStoreScanner(s)); 1121 } 1122 } 1123 1124 private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) 1125 throws IOException { 1126 return createCell(row, qualifier, ts, sequenceId, value); 1127 } 1128 1129 private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value) 1130 throws IOException { 1131 Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1132 .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put) 1133 .setValue(value).build(); 1134 PrivateCellUtil.setSequenceId(c, sequenceId); 1135 return c; 1136 } 1137 1138 @Test 1139 public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException { 1140 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1141 final int expectedSize = 3; 1142 testFlushBeforeCompletingScan(new MyListHook() { 1143 @Override 1144 public void hook(int currentSize) { 1145 if (currentSize == expectedSize - 1) { 1146 try { 1147 flushStore(store, id++); 1148 timeToGoNextRow.set(true); 1149 } catch (IOException e) { 1150 throw new RuntimeException(e); 1151 } 1152 } 1153 } 1154 }, new FilterBase() { 1155 @Override 1156 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1157 return ReturnCode.INCLUDE; 1158 } 1159 }, expectedSize); 1160 } 1161 1162 @Test 1163 public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException { 1164 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1165 final int expectedSize = 2; 1166 testFlushBeforeCompletingScan(new MyListHook() { 1167 @Override 1168 public void hook(int currentSize) { 1169 if (currentSize == expectedSize - 1) { 1170 try { 1171 flushStore(store, id++); 1172 timeToGoNextRow.set(true); 1173 } catch (IOException e) { 1174 throw new RuntimeException(e); 1175 } 1176 } 1177 } 1178 }, new FilterBase() { 1179 @Override 1180 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1181 if (timeToGoNextRow.get()) { 1182 timeToGoNextRow.set(false); 1183 return ReturnCode.NEXT_ROW; 1184 } else { 1185 return ReturnCode.INCLUDE; 1186 } 1187 } 1188 }, expectedSize); 1189 } 1190 1191 @Test 1192 public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, 1193 InterruptedException { 1194 final AtomicBoolean timeToGetHint = new AtomicBoolean(false); 1195 final int expectedSize = 2; 1196 testFlushBeforeCompletingScan(new MyListHook() { 1197 @Override 1198 public void hook(int currentSize) { 1199 if (currentSize == expectedSize - 1) { 1200 try { 1201 flushStore(store, id++); 1202 timeToGetHint.set(true); 1203 } catch (IOException e) { 1204 throw new RuntimeException(e); 1205 } 1206 } 1207 } 1208 }, new FilterBase() { 1209 @Override 1210 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1211 if (timeToGetHint.get()) { 1212 timeToGetHint.set(false); 1213 return Filter.ReturnCode.SEEK_NEXT_USING_HINT; 1214 } else { 1215 return Filter.ReturnCode.INCLUDE; 1216 } 1217 } 1218 @Override 1219 public Cell getNextCellHint(Cell currentCell) throws IOException { 1220 return currentCell; 1221 } 1222 }, expectedSize); 1223 } 1224 1225 private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize) 1226 throws IOException, InterruptedException { 1227 Configuration conf = HBaseConfiguration.create(); 1228 byte[] r0 = Bytes.toBytes("row0"); 1229 byte[] r1 = Bytes.toBytes("row1"); 1230 byte[] r2 = Bytes.toBytes("row2"); 1231 byte[] value0 = Bytes.toBytes("value0"); 1232 byte[] value1 = Bytes.toBytes("value1"); 1233 byte[] value2 = Bytes.toBytes("value2"); 1234 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1235 long ts = EnvironmentEdgeManager.currentTime(); 1236 long seqId = 100; 1237 init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1238 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(), 1239 new MyStoreHook() { 1240 @Override 1241 public long getSmallestReadPoint(HStore store) { 1242 return seqId + 3; 1243 } 1244 }); 1245 // The cells having the value0 won't be flushed to disk because the value of max version is 1 1246 store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing); 1247 store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing); 1248 store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing); 1249 store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing); 1250 store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing); 1251 store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing); 1252 store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing); 1253 store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing); 1254 store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing); 1255 store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing); 1256 store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing); 1257 store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing); 1258 List<Cell> myList = new MyList<>(hook); 1259 Scan scan = new Scan() 1260 .withStartRow(r1) 1261 .setFilter(filter); 1262 try (InternalScanner scanner = (InternalScanner) store.getScanner( 1263 scan, null, seqId + 3)){ 1264 // r1 1265 scanner.next(myList); 1266 assertEquals(expectedSize, myList.size()); 1267 for (Cell c : myList) { 1268 byte[] actualValue = CellUtil.cloneValue(c); 1269 assertTrue("expected:" + Bytes.toStringBinary(value1) 1270 + ", actual:" + Bytes.toStringBinary(actualValue) 1271 , Bytes.equals(actualValue, value1)); 1272 } 1273 List<Cell> normalList = new ArrayList<>(3); 1274 // r2 1275 scanner.next(normalList); 1276 assertEquals(3, normalList.size()); 1277 for (Cell c : normalList) { 1278 byte[] actualValue = CellUtil.cloneValue(c); 1279 assertTrue("expected:" + Bytes.toStringBinary(value2) 1280 + ", actual:" + Bytes.toStringBinary(actualValue) 1281 , Bytes.equals(actualValue, value2)); 1282 } 1283 } 1284 } 1285 1286 @Test 1287 public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException { 1288 Configuration conf = HBaseConfiguration.create(); 1289 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName()); 1290 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1291 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1292 byte[] value = Bytes.toBytes("value"); 1293 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1294 long ts = EnvironmentEdgeManager.currentTime(); 1295 long seqId = 100; 1296 // older data whihc shouldn't be "seen" by client 1297 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1298 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1299 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1300 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1301 quals.add(qf1); 1302 quals.add(qf2); 1303 quals.add(qf3); 1304 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1305 MyCompactingMemStore.START_TEST.set(true); 1306 Runnable flush = () -> { 1307 // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5) 1308 // recreate the active memstore -- phase (4/5) 1309 storeFlushCtx.prepare(); 1310 }; 1311 ExecutorService service = Executors.newSingleThreadExecutor(); 1312 service.submit(flush); 1313 // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5) 1314 // this is blocked until we recreate the active memstore -- phase (3/5) 1315 // we get scanner from active memstore but it is empty -- phase (5/5) 1316 InternalScanner scanner = (InternalScanner) store.getScanner( 1317 new Scan(new Get(row)), quals, seqId + 1); 1318 service.shutdown(); 1319 service.awaitTermination(20, TimeUnit.SECONDS); 1320 try { 1321 try { 1322 List<Cell> results = new ArrayList<>(); 1323 scanner.next(results); 1324 assertEquals(3, results.size()); 1325 for (Cell c : results) { 1326 byte[] actualValue = CellUtil.cloneValue(c); 1327 assertTrue("expected:" + Bytes.toStringBinary(value) 1328 + ", actual:" + Bytes.toStringBinary(actualValue) 1329 , Bytes.equals(actualValue, value)); 1330 } 1331 } finally { 1332 scanner.close(); 1333 } 1334 } finally { 1335 MyCompactingMemStore.START_TEST.set(false); 1336 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1337 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1338 } 1339 } 1340 1341 @Test 1342 public void testScanWithDoubleFlush() throws IOException { 1343 Configuration conf = HBaseConfiguration.create(); 1344 // Initialize region 1345 MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook(){ 1346 @Override 1347 public void getScanners(MyStore store) throws IOException { 1348 final long tmpId = id++; 1349 ExecutorService s = Executors.newSingleThreadExecutor(); 1350 s.submit(() -> { 1351 try { 1352 // flush the store before storescanner updates the scanners from store. 1353 // The current data will be flushed into files, and the memstore will 1354 // be clear. 1355 // -- phase (4/4) 1356 flushStore(store, tmpId); 1357 }catch (IOException ex) { 1358 throw new RuntimeException(ex); 1359 } 1360 }); 1361 s.shutdown(); 1362 try { 1363 // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers. 1364 s.awaitTermination(3, TimeUnit.SECONDS); 1365 } catch (InterruptedException ex) { 1366 } 1367 } 1368 }); 1369 byte[] oldValue = Bytes.toBytes("oldValue"); 1370 byte[] currentValue = Bytes.toBytes("currentValue"); 1371 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1372 long ts = EnvironmentEdgeManager.currentTime(); 1373 long seqId = 100; 1374 // older data whihc shouldn't be "seen" by client 1375 myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing); 1376 myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing); 1377 myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing); 1378 long snapshotId = id++; 1379 // push older data into snapshot -- phase (1/4) 1380 StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker 1381 .DUMMY); 1382 storeFlushCtx.prepare(); 1383 1384 // insert current data into active -- phase (2/4) 1385 myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing); 1386 myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing); 1387 myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing); 1388 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1389 quals.add(qf1); 1390 quals.add(qf2); 1391 quals.add(qf3); 1392 try (InternalScanner scanner = (InternalScanner) myStore.getScanner( 1393 new Scan(new Get(row)), quals, seqId + 1)) { 1394 // complete the flush -- phase (3/4) 1395 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1396 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1397 1398 List<Cell> results = new ArrayList<>(); 1399 scanner.next(results); 1400 assertEquals(3, results.size()); 1401 for (Cell c : results) { 1402 byte[] actualValue = CellUtil.cloneValue(c); 1403 assertTrue("expected:" + Bytes.toStringBinary(currentValue) 1404 + ", actual:" + Bytes.toStringBinary(actualValue) 1405 , Bytes.equals(actualValue, currentValue)); 1406 } 1407 } 1408 } 1409 1410 @Test 1411 public void testReclaimChunkWhenScaning() throws IOException { 1412 init("testReclaimChunkWhenScaning"); 1413 long ts = EnvironmentEdgeManager.currentTime(); 1414 long seqId = 100; 1415 byte[] value = Bytes.toBytes("value"); 1416 // older data whihc shouldn't be "seen" by client 1417 store.add(createCell(qf1, ts, seqId, value), null); 1418 store.add(createCell(qf2, ts, seqId, value), null); 1419 store.add(createCell(qf3, ts, seqId, value), null); 1420 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1421 quals.add(qf1); 1422 quals.add(qf2); 1423 quals.add(qf3); 1424 try (InternalScanner scanner = (InternalScanner) store.getScanner( 1425 new Scan(new Get(row)), quals, seqId)) { 1426 List<Cell> results = new MyList<>(size -> { 1427 switch (size) { 1428 // 1) we get the first cell (qf1) 1429 // 2) flush the data to have StoreScanner update inner scanners 1430 // 3) the chunk will be reclaimed after updaing 1431 case 1: 1432 try { 1433 flushStore(store, id++); 1434 } catch (IOException e) { 1435 throw new RuntimeException(e); 1436 } 1437 break; 1438 // 1) we get the second cell (qf2) 1439 // 2) add some cell to fill some byte into the chunk (we have only one chunk) 1440 case 2: 1441 try { 1442 byte[] newValue = Bytes.toBytes("newValue"); 1443 // older data whihc shouldn't be "seen" by client 1444 store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null); 1445 store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null); 1446 store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null); 1447 } catch (IOException e) { 1448 throw new RuntimeException(e); 1449 } 1450 break; 1451 default: 1452 break; 1453 } 1454 }); 1455 scanner.next(results); 1456 assertEquals(3, results.size()); 1457 for (Cell c : results) { 1458 byte[] actualValue = CellUtil.cloneValue(c); 1459 assertTrue("expected:" + Bytes.toStringBinary(value) 1460 + ", actual:" + Bytes.toStringBinary(actualValue) 1461 , Bytes.equals(actualValue, value)); 1462 } 1463 } 1464 } 1465 1466 /** 1467 * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable 1468 * may change the versionedList. And the first InMemoryFlushRunnable will use the chagned 1469 * versionedList to remove the corresponding segments. 1470 * In short, there will be some segements which isn't in merge are removed. 1471 * @throws IOException 1472 * @throws InterruptedException 1473 */ 1474 @Test 1475 public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException { 1476 int flushSize = 500; 1477 Configuration conf = HBaseConfiguration.create(); 1478 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName()); 1479 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25); 1480 MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0); 1481 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize)); 1482 // Set the lower threshold to invoke the "MERGE" policy 1483 conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0)); 1484 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1485 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1486 byte[] value = Bytes.toBytes("thisisavarylargevalue"); 1487 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1488 long ts = EnvironmentEdgeManager.currentTime(); 1489 long seqId = 100; 1490 // older data whihc shouldn't be "seen" by client 1491 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1492 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1493 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1494 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1495 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1496 storeFlushCtx.prepare(); 1497 // This shouldn't invoke another in-memory flush because the first compactor thread 1498 // hasn't accomplished the in-memory compaction. 1499 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1500 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1501 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1502 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1503 //okay. Let the compaction be completed 1504 MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown(); 1505 CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore; 1506 while (mem.isMemStoreFlushingInMemory()) { 1507 TimeUnit.SECONDS.sleep(1); 1508 } 1509 // This should invoke another in-memory flush. 1510 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1511 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1512 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1513 assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1514 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1515 String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE)); 1516 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1517 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1518 } 1519 1520 @Test 1521 public void testAge() throws IOException { 1522 long currentTime = System.currentTimeMillis(); 1523 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 1524 edge.setValue(currentTime); 1525 EnvironmentEdgeManager.injectEdge(edge); 1526 Configuration conf = TEST_UTIL.getConfiguration(); 1527 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family); 1528 initHRegion(name.getMethodName(), conf, 1529 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false); 1530 HStore store = new HStore(region, hcd, conf, false) { 1531 1532 @Override 1533 protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf, 1534 CellComparator kvComparator) throws IOException { 1535 List<HStoreFile> storefiles = 1536 Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100), 1537 mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000)); 1538 StoreFileManager sfm = mock(StoreFileManager.class); 1539 when(sfm.getStorefiles()).thenReturn(storefiles); 1540 StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class); 1541 when(storeEngine.getStoreFileManager()).thenReturn(sfm); 1542 return storeEngine; 1543 } 1544 }; 1545 assertEquals(10L, store.getMinStoreFileAge().getAsLong()); 1546 assertEquals(10000L, store.getMaxStoreFileAge().getAsLong()); 1547 assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4); 1548 } 1549 1550 private HStoreFile mockStoreFile(long createdTime) { 1551 StoreFileInfo info = mock(StoreFileInfo.class); 1552 when(info.getCreatedTimestamp()).thenReturn(createdTime); 1553 HStoreFile sf = mock(HStoreFile.class); 1554 when(sf.getReader()).thenReturn(mock(StoreFileReader.class)); 1555 when(sf.isHFile()).thenReturn(true); 1556 when(sf.getFileInfo()).thenReturn(info); 1557 return sf; 1558 } 1559 1560 private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) 1561 throws IOException { 1562 return (MyStore) init(methodName, conf, 1563 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1564 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook); 1565 } 1566 1567 private static class MyStore extends HStore { 1568 private final MyStoreHook hook; 1569 1570 MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration 1571 confParam, MyStoreHook hook, boolean switchToPread) throws IOException { 1572 super(region, family, confParam, false); 1573 this.hook = hook; 1574 } 1575 1576 @Override 1577 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1578 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, 1579 boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, 1580 boolean includeMemstoreScanner) throws IOException { 1581 hook.getScanners(this); 1582 return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, 1583 stopRow, false, readPt, includeMemstoreScanner); 1584 } 1585 1586 @Override 1587 public long getSmallestReadPoint() { 1588 return hook.getSmallestReadPoint(this); 1589 } 1590 } 1591 1592 private abstract static class MyStoreHook { 1593 1594 void getScanners(MyStore store) throws IOException { 1595 } 1596 1597 long getSmallestReadPoint(HStore store) { 1598 return store.getHRegion().getSmallestReadPoint(); 1599 } 1600 } 1601 1602 @Test 1603 public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception { 1604 Configuration conf = HBaseConfiguration.create(); 1605 conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); 1606 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0); 1607 // Set the lower threshold to invoke the "MERGE" policy 1608 MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {}); 1609 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1610 long ts = System.currentTimeMillis(); 1611 long seqID = 1L; 1612 // Add some data to the region and do some flushes 1613 for (int i = 1; i < 10; i++) { 1614 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1615 memStoreSizing); 1616 } 1617 // flush them 1618 flushStore(store, seqID); 1619 for (int i = 11; i < 20; i++) { 1620 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1621 memStoreSizing); 1622 } 1623 // flush them 1624 flushStore(store, seqID); 1625 for (int i = 21; i < 30; i++) { 1626 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1627 memStoreSizing); 1628 } 1629 // flush them 1630 flushStore(store, seqID); 1631 1632 assertEquals(3, store.getStorefilesCount()); 1633 Scan scan = new Scan(); 1634 scan.addFamily(family); 1635 Collection<HStoreFile> storefiles2 = store.getStorefiles(); 1636 ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2); 1637 StoreScanner storeScanner = 1638 (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); 1639 // get the current heap 1640 KeyValueHeap heap = storeScanner.heap; 1641 // create more store files 1642 for (int i = 31; i < 40; i++) { 1643 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1644 memStoreSizing); 1645 } 1646 // flush them 1647 flushStore(store, seqID); 1648 1649 for (int i = 41; i < 50; i++) { 1650 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1651 memStoreSizing); 1652 } 1653 // flush them 1654 flushStore(store, seqID); 1655 storefiles2 = store.getStorefiles(); 1656 ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2); 1657 actualStorefiles1.removeAll(actualStorefiles); 1658 // Do compaction 1659 MyThread thread = new MyThread(storeScanner); 1660 thread.start(); 1661 store.replaceStoreFiles(actualStorefiles, actualStorefiles1); 1662 thread.join(); 1663 KeyValueHeap heap2 = thread.getHeap(); 1664 assertFalse(heap.equals(heap2)); 1665 } 1666 1667 @Test 1668 public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException { 1669 Configuration conf = HBaseConfiguration.create(); 1670 conf.set("hbase.systemtables.compacting.memstore.type", "eager"); 1671 init(name.getMethodName(), conf, 1672 TableDescriptorBuilder.newBuilder( 1673 TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, "meta".getBytes())), 1674 ColumnFamilyDescriptorBuilder.newBuilder(family) 1675 .setInMemoryCompaction(MemoryCompactionPolicy.NONE).build()); 1676 assertTrue(((MemStoreCompactor) ((CompactingMemStore) store.memstore).compactor).toString() 1677 .startsWith("eager".toUpperCase())); 1678 } 1679 1680 @Test 1681 public void testSpaceQuotaChangeAfterReplacement() throws IOException { 1682 final TableName tn = TableName.valueOf(name.getMethodName()); 1683 init(name.getMethodName()); 1684 1685 RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl(); 1686 1687 HStoreFile sf1 = mockStoreFileWithLength(1024L); 1688 HStoreFile sf2 = mockStoreFileWithLength(2048L); 1689 HStoreFile sf3 = mockStoreFileWithLength(4096L); 1690 HStoreFile sf4 = mockStoreFileWithLength(8192L); 1691 1692 RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a")) 1693 .setEndKey(Bytes.toBytes("b")).build(); 1694 1695 // Compacting two files down to one, reducing size 1696 sizeStore.put(regionInfo, 1024L + 4096L); 1697 store.updateSpaceQuotaAfterFileReplacement( 1698 sizeStore, regionInfo, Arrays.asList(sf1, sf3), Arrays.asList(sf2)); 1699 1700 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 1701 1702 // The same file length in and out should have no change 1703 store.updateSpaceQuotaAfterFileReplacement( 1704 sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf2)); 1705 1706 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 1707 1708 // Increase the total size used 1709 store.updateSpaceQuotaAfterFileReplacement( 1710 sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf3)); 1711 1712 assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize()); 1713 1714 RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b")) 1715 .setEndKey(Bytes.toBytes("c")).build(); 1716 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4)); 1717 1718 assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize()); 1719 } 1720 1721 private HStoreFile mockStoreFileWithLength(long length) { 1722 HStoreFile sf = mock(HStoreFile.class); 1723 StoreFileReader sfr = mock(StoreFileReader.class); 1724 when(sf.isHFile()).thenReturn(true); 1725 when(sf.getReader()).thenReturn(sfr); 1726 when(sfr.length()).thenReturn(length); 1727 return sf; 1728 } 1729 1730 private static class MyThread extends Thread { 1731 private StoreScanner scanner; 1732 private KeyValueHeap heap; 1733 1734 public MyThread(StoreScanner scanner) { 1735 this.scanner = scanner; 1736 } 1737 1738 public KeyValueHeap getHeap() { 1739 return this.heap; 1740 } 1741 1742 @Override 1743 public void run() { 1744 scanner.trySwitchToStreamRead(); 1745 heap = scanner.heap; 1746 } 1747 } 1748 1749 private static class MyMemStoreCompactor extends MemStoreCompactor { 1750 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 1751 private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1); 1752 public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy 1753 compactionPolicy) throws IllegalArgumentIOException { 1754 super(compactingMemStore, compactionPolicy); 1755 } 1756 1757 @Override 1758 public boolean start() throws IOException { 1759 boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0; 1760 if (isFirst) { 1761 try { 1762 START_COMPACTOR_LATCH.await(); 1763 return super.start(); 1764 } catch (InterruptedException ex) { 1765 throw new RuntimeException(ex); 1766 } 1767 } 1768 return super.start(); 1769 } 1770 } 1771 1772 public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore { 1773 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 1774 public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c, 1775 HStore store, RegionServicesForStores regionServices, 1776 MemoryCompactionPolicy compactionPolicy) throws IOException { 1777 super(conf, c, store, regionServices, compactionPolicy); 1778 } 1779 1780 @Override 1781 protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) 1782 throws IllegalArgumentIOException { 1783 return new MyMemStoreCompactor(this, compactionPolicy); 1784 } 1785 1786 @Override 1787 protected boolean setInMemoryCompactionFlag() { 1788 boolean rval = super.setInMemoryCompactionFlag(); 1789 if (rval) { 1790 RUNNER_COUNT.incrementAndGet(); 1791 if (LOG.isDebugEnabled()) { 1792 LOG.debug("runner count: " + RUNNER_COUNT.get()); 1793 } 1794 } 1795 return rval; 1796 } 1797 } 1798 1799 public static class MyCompactingMemStore extends CompactingMemStore { 1800 private static final AtomicBoolean START_TEST = new AtomicBoolean(false); 1801 private final CountDownLatch getScannerLatch = new CountDownLatch(1); 1802 private final CountDownLatch snapshotLatch = new CountDownLatch(1); 1803 public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, 1804 HStore store, RegionServicesForStores regionServices, 1805 MemoryCompactionPolicy compactionPolicy) throws IOException { 1806 super(conf, c, store, regionServices, compactionPolicy); 1807 } 1808 1809 @Override 1810 protected List<KeyValueScanner> createList(int capacity) { 1811 if (START_TEST.get()) { 1812 try { 1813 getScannerLatch.countDown(); 1814 snapshotLatch.await(); 1815 } catch (InterruptedException e) { 1816 throw new RuntimeException(e); 1817 } 1818 } 1819 return new ArrayList<>(capacity); 1820 } 1821 @Override 1822 protected void pushActiveToPipeline(MutableSegment active) { 1823 if (START_TEST.get()) { 1824 try { 1825 getScannerLatch.await(); 1826 } catch (InterruptedException e) { 1827 throw new RuntimeException(e); 1828 } 1829 } 1830 1831 super.pushActiveToPipeline(active); 1832 if (START_TEST.get()) { 1833 snapshotLatch.countDown(); 1834 } 1835 } 1836 } 1837 1838 interface MyListHook { 1839 void hook(int currentSize); 1840 } 1841 1842 private static class MyList<T> implements List<T> { 1843 private final List<T> delegatee = new ArrayList<>(); 1844 private final MyListHook hookAtAdd; 1845 MyList(final MyListHook hookAtAdd) { 1846 this.hookAtAdd = hookAtAdd; 1847 } 1848 @Override 1849 public int size() {return delegatee.size();} 1850 1851 @Override 1852 public boolean isEmpty() {return delegatee.isEmpty();} 1853 1854 @Override 1855 public boolean contains(Object o) {return delegatee.contains(o);} 1856 1857 @Override 1858 public Iterator<T> iterator() {return delegatee.iterator();} 1859 1860 @Override 1861 public Object[] toArray() {return delegatee.toArray();} 1862 1863 @Override 1864 public <R> R[] toArray(R[] a) {return delegatee.toArray(a);} 1865 1866 @Override 1867 public boolean add(T e) { 1868 hookAtAdd.hook(size()); 1869 return delegatee.add(e); 1870 } 1871 1872 @Override 1873 public boolean remove(Object o) {return delegatee.remove(o);} 1874 1875 @Override 1876 public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);} 1877 1878 @Override 1879 public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);} 1880 1881 @Override 1882 public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);} 1883 1884 @Override 1885 public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);} 1886 1887 @Override 1888 public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);} 1889 1890 @Override 1891 public void clear() {delegatee.clear();} 1892 1893 @Override 1894 public T get(int index) {return delegatee.get(index);} 1895 1896 @Override 1897 public T set(int index, T element) {return delegatee.set(index, element);} 1898 1899 @Override 1900 public void add(int index, T element) {delegatee.add(index, element);} 1901 1902 @Override 1903 public T remove(int index) {return delegatee.remove(index);} 1904 1905 @Override 1906 public int indexOf(Object o) {return delegatee.indexOf(o);} 1907 1908 @Override 1909 public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);} 1910 1911 @Override 1912 public ListIterator<T> listIterator() {return delegatee.listIterator();} 1913 1914 @Override 1915 public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);} 1916 1917 @Override 1918 public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);} 1919 } 1920}