001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025import static org.mockito.ArgumentMatchers.any; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.spy; 028import static org.mockito.Mockito.times; 029import static org.mockito.Mockito.verify; 030import static org.mockito.Mockito.when; 031 032import java.io.IOException; 033import java.lang.ref.SoftReference; 034import java.security.PrivilegedExceptionAction; 035import java.util.ArrayList; 036import java.util.Arrays; 037import java.util.Collection; 038import java.util.Collections; 039import java.util.Iterator; 040import java.util.List; 041import java.util.ListIterator; 042import java.util.NavigableSet; 043import java.util.TreeSet; 044import java.util.concurrent.ConcurrentSkipListSet; 045import java.util.concurrent.CountDownLatch; 046import java.util.concurrent.ExecutorService; 047import java.util.concurrent.Executors; 048import java.util.concurrent.ThreadPoolExecutor; 049import java.util.concurrent.TimeUnit; 050import java.util.concurrent.atomic.AtomicBoolean; 051import java.util.concurrent.atomic.AtomicInteger; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.fs.FSDataOutputStream; 054import org.apache.hadoop.fs.FileStatus; 055import org.apache.hadoop.fs.FileSystem; 056import org.apache.hadoop.fs.FilterFileSystem; 057import org.apache.hadoop.fs.LocalFileSystem; 058import org.apache.hadoop.fs.Path; 059import org.apache.hadoop.fs.permission.FsPermission; 060import org.apache.hadoop.hbase.Cell; 061import org.apache.hadoop.hbase.CellBuilderFactory; 062import org.apache.hadoop.hbase.CellBuilderType; 063import org.apache.hadoop.hbase.CellComparator; 064import org.apache.hadoop.hbase.CellComparatorImpl; 065import org.apache.hadoop.hbase.CellUtil; 066import org.apache.hadoop.hbase.HBaseClassTestRule; 067import org.apache.hadoop.hbase.HBaseConfiguration; 068import org.apache.hadoop.hbase.HBaseTestingUtility; 069import org.apache.hadoop.hbase.HConstants; 070import org.apache.hadoop.hbase.KeyValue; 071import org.apache.hadoop.hbase.MemoryCompactionPolicy; 072import org.apache.hadoop.hbase.NamespaceDescriptor; 073import org.apache.hadoop.hbase.PrivateCellUtil; 074import org.apache.hadoop.hbase.TableName; 075import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 076import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 077import org.apache.hadoop.hbase.client.Get; 078import org.apache.hadoop.hbase.client.RegionInfo; 079import org.apache.hadoop.hbase.client.RegionInfoBuilder; 080import org.apache.hadoop.hbase.client.Scan; 081import org.apache.hadoop.hbase.client.TableDescriptor; 082import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 083import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 084import org.apache.hadoop.hbase.filter.Filter; 085import org.apache.hadoop.hbase.filter.FilterBase; 086import org.apache.hadoop.hbase.io.compress.Compression; 087import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 088import org.apache.hadoop.hbase.io.hfile.CacheConfig; 089import org.apache.hadoop.hbase.io.hfile.HFile; 090import org.apache.hadoop.hbase.io.hfile.HFileContext; 091import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 092import org.apache.hadoop.hbase.monitoring.MonitoredTask; 093import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl; 094import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 095import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 096import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 097import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 098import org.apache.hadoop.hbase.security.User; 099import org.apache.hadoop.hbase.testclassification.MediumTests; 100import org.apache.hadoop.hbase.testclassification.RegionServerTests; 101import org.apache.hadoop.hbase.util.Bytes; 102import org.apache.hadoop.hbase.util.CommonFSUtils; 103import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 104import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 105import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 106import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 107import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 108import org.apache.hadoop.hbase.wal.WALFactory; 109import org.apache.hadoop.util.Progressable; 110import org.junit.After; 111import org.junit.AfterClass; 112import org.junit.Before; 113import org.junit.ClassRule; 114import org.junit.Rule; 115import org.junit.Test; 116import org.junit.experimental.categories.Category; 117import org.junit.rules.TestName; 118import org.mockito.Mockito; 119import org.slf4j.Logger; 120import org.slf4j.LoggerFactory; 121 122import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 123 124/** 125 * Test class for the HStore 126 */ 127@Category({ RegionServerTests.class, MediumTests.class }) 128public class TestHStore { 129 130 @ClassRule 131 public static final HBaseClassTestRule CLASS_RULE = 132 HBaseClassTestRule.forClass(TestHStore.class); 133 134 private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class); 135 @Rule 136 public TestName name = new TestName(); 137 138 HRegion region; 139 HStore store; 140 byte [] table = Bytes.toBytes("table"); 141 byte [] family = Bytes.toBytes("family"); 142 143 byte [] row = Bytes.toBytes("row"); 144 byte [] row2 = Bytes.toBytes("row2"); 145 byte [] qf1 = Bytes.toBytes("qf1"); 146 byte [] qf2 = Bytes.toBytes("qf2"); 147 byte [] qf3 = Bytes.toBytes("qf3"); 148 byte [] qf4 = Bytes.toBytes("qf4"); 149 byte [] qf5 = Bytes.toBytes("qf5"); 150 byte [] qf6 = Bytes.toBytes("qf6"); 151 152 NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR); 153 154 List<Cell> expected = new ArrayList<>(); 155 List<Cell> result = new ArrayList<>(); 156 157 long id = System.currentTimeMillis(); 158 Get get = new Get(row); 159 160 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 161 private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString(); 162 163 164 /** 165 * Setup 166 * @throws IOException 167 */ 168 @Before 169 public void setUp() throws IOException { 170 qualifiers.clear(); 171 qualifiers.add(qf1); 172 qualifiers.add(qf3); 173 qualifiers.add(qf5); 174 175 Iterator<byte[]> iter = qualifiers.iterator(); 176 while(iter.hasNext()){ 177 byte [] next = iter.next(); 178 expected.add(new KeyValue(row, family, next, 1, (byte[])null)); 179 get.addColumn(family, next); 180 } 181 } 182 183 private void init(String methodName) throws IOException { 184 init(methodName, TEST_UTIL.getConfiguration()); 185 } 186 187 private HStore init(String methodName, Configuration conf) throws IOException { 188 // some of the tests write 4 versions and then flush 189 // (with HBASE-4241, lower versions are collected on flush) 190 return init(methodName, conf, 191 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build()); 192 } 193 194 private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd) 195 throws IOException { 196 return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd); 197 } 198 199 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 200 ColumnFamilyDescriptor hcd) throws IOException { 201 return init(methodName, conf, builder, hcd, null); 202 } 203 204 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 205 ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException { 206 return init(methodName, conf, builder, hcd, hook, false); 207 } 208 209 private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder, 210 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 211 TableDescriptor htd = builder.setColumnFamily(hcd).build(); 212 Path basedir = new Path(DIR + methodName); 213 Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName()); 214 final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName)); 215 216 FileSystem fs = FileSystem.get(conf); 217 218 fs.delete(logdir, true); 219 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 220 MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, 221 null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 222 RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 223 Configuration walConf = new Configuration(conf); 224 CommonFSUtils.setRootDir(walConf, basedir); 225 WALFactory wals = new WALFactory(walConf, methodName); 226 region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf, 227 htd, null); 228 region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); 229 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 230 Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 231 } 232 233 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 234 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 235 initHRegion(methodName, conf, builder, hcd, hook, switchToPread); 236 if (hook == null) { 237 store = new HStore(region, hcd, conf, false); 238 } else { 239 store = new MyStore(region, hcd, conf, hook, switchToPread); 240 } 241 return store; 242 } 243 244 /** 245 * Test we do not lose data if we fail a flush and then close. 246 * Part of HBase-10466 247 * @throws Exception 248 */ 249 @Test 250 public void testFlushSizeSizing() throws Exception { 251 LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName()); 252 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 253 // Only retry once. 254 conf.setInt("hbase.hstore.flush.retries.number", 1); 255 User user = User.createUserForTesting(conf, this.name.getMethodName(), 256 new String[]{"foo"}); 257 // Inject our faulty LocalFileSystem 258 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 259 user.runAs(new PrivilegedExceptionAction<Object>() { 260 @Override 261 public Object run() throws Exception { 262 // Make sure it worked (above is sensitive to caching details in hadoop core) 263 FileSystem fs = FileSystem.get(conf); 264 assertEquals(FaultyFileSystem.class, fs.getClass()); 265 FaultyFileSystem ffs = (FaultyFileSystem)fs; 266 267 // Initialize region 268 init(name.getMethodName(), conf); 269 270 MemStoreSize mss = store.memstore.getFlushableSize(); 271 assertEquals(0, mss.getDataSize()); 272 LOG.info("Adding some data"); 273 MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing(); 274 store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize); 275 // add the heap size of active (mutable) segment 276 kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 277 mss = store.memstore.getFlushableSize(); 278 assertEquals(kvSize.getMemStoreSize(), mss); 279 // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. 280 try { 281 LOG.info("Flushing"); 282 flushStore(store, id++); 283 fail("Didn't bubble up IOE!"); 284 } catch (IOException ioe) { 285 assertTrue(ioe.getMessage().contains("Fault injected")); 286 } 287 // due to snapshot, change mutable to immutable segment 288 kvSize.incMemStoreSize(0, 289 CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0); 290 mss = store.memstore.getFlushableSize(); 291 assertEquals(kvSize.getMemStoreSize(), mss); 292 MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing(); 293 store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2); 294 kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 295 // Even though we add a new kv, we expect the flushable size to be 'same' since we have 296 // not yet cleared the snapshot -- the above flush failed. 297 assertEquals(kvSize.getMemStoreSize(), mss); 298 ffs.fault.set(false); 299 flushStore(store, id++); 300 mss = store.memstore.getFlushableSize(); 301 // Size should be the foreground kv size. 302 assertEquals(kvSize2.getMemStoreSize(), mss); 303 flushStore(store, id++); 304 mss = store.memstore.getFlushableSize(); 305 assertEquals(0, mss.getDataSize()); 306 assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize()); 307 return null; 308 } 309 }); 310 } 311 312 /** 313 * Verify that compression and data block encoding are respected by the 314 * Store.createWriterInTmp() method, used on store flush. 315 */ 316 @Test 317 public void testCreateWriter() throws Exception { 318 Configuration conf = HBaseConfiguration.create(); 319 FileSystem fs = FileSystem.get(conf); 320 321 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 322 .setCompressionType(Compression.Algorithm.GZ).setDataBlockEncoding(DataBlockEncoding.DIFF) 323 .build(); 324 init(name.getMethodName(), conf, hcd); 325 326 // Test createWriterInTmp() 327 StoreFileWriter writer = 328 store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false, false); 329 Path path = writer.getPath(); 330 writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1))); 331 writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2))); 332 writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3))); 333 writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4))); 334 writer.close(); 335 336 // Verify that compression and encoding settings are respected 337 HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf); 338 assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec()); 339 assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding()); 340 reader.close(); 341 } 342 343 @Test 344 public void testDeleteExpiredStoreFiles() throws Exception { 345 testDeleteExpiredStoreFiles(0); 346 testDeleteExpiredStoreFiles(1); 347 } 348 349 /* 350 * @param minVersions the MIN_VERSIONS for the column family 351 */ 352 public void testDeleteExpiredStoreFiles(int minVersions) throws Exception { 353 int storeFileNum = 4; 354 int ttl = 4; 355 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 356 EnvironmentEdgeManagerTestHelper.injectEdge(edge); 357 358 Configuration conf = HBaseConfiguration.create(); 359 // Enable the expired store file deletion 360 conf.setBoolean("hbase.store.delete.expired.storefile", true); 361 // Set the compaction threshold higher to avoid normal compactions. 362 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5); 363 364 init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder 365 .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build()); 366 367 long storeTtl = this.store.getScanInfo().getTtl(); 368 long sleepTime = storeTtl / storeFileNum; 369 long timeStamp; 370 // There are 4 store files and the max time stamp difference among these 371 // store files will be (this.store.ttl / storeFileNum) 372 for (int i = 1; i <= storeFileNum; i++) { 373 LOG.info("Adding some data for the store file #" + i); 374 timeStamp = EnvironmentEdgeManager.currentTime(); 375 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null); 376 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null); 377 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null); 378 flush(i); 379 edge.incrementTime(sleepTime); 380 } 381 382 // Verify the total number of store files 383 assertEquals(storeFileNum, this.store.getStorefiles().size()); 384 385 // Each call will find one expired store file and delete it before compaction happens. 386 // There will be no compaction due to threshold above. Last file will not be replaced. 387 for (int i = 1; i <= storeFileNum - 1; i++) { 388 // verify the expired store file. 389 assertFalse(this.store.requestCompaction().isPresent()); 390 Collection<HStoreFile> sfs = this.store.getStorefiles(); 391 // Ensure i files are gone. 392 if (minVersions == 0) { 393 assertEquals(storeFileNum - i, sfs.size()); 394 // Ensure only non-expired files remain. 395 for (HStoreFile sf : sfs) { 396 assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl)); 397 } 398 } else { 399 assertEquals(storeFileNum, sfs.size()); 400 } 401 // Let the next store file expired. 402 edge.incrementTime(sleepTime); 403 } 404 assertFalse(this.store.requestCompaction().isPresent()); 405 406 Collection<HStoreFile> sfs = this.store.getStorefiles(); 407 // Assert the last expired file is not removed. 408 if (minVersions == 0) { 409 assertEquals(1, sfs.size()); 410 } 411 long ts = sfs.iterator().next().getReader().getMaxTimestamp(); 412 assertTrue(ts < (edge.currentTime() - storeTtl)); 413 414 for (HStoreFile sf : sfs) { 415 sf.closeStoreFile(true); 416 } 417 } 418 419 @Test 420 public void testLowestModificationTime() throws Exception { 421 Configuration conf = HBaseConfiguration.create(); 422 FileSystem fs = FileSystem.get(conf); 423 // Initialize region 424 init(name.getMethodName(), conf); 425 426 int storeFileNum = 4; 427 for (int i = 1; i <= storeFileNum; i++) { 428 LOG.info("Adding some data for the store file #"+i); 429 this.store.add(new KeyValue(row, family, qf1, i, (byte[])null), null); 430 this.store.add(new KeyValue(row, family, qf2, i, (byte[])null), null); 431 this.store.add(new KeyValue(row, family, qf3, i, (byte[])null), null); 432 flush(i); 433 } 434 // after flush; check the lowest time stamp 435 long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 436 long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 437 assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS); 438 439 // after compact; check the lowest time stamp 440 store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null); 441 lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 442 lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 443 assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); 444 } 445 446 private static long getLowestTimeStampFromFS(FileSystem fs, 447 final Collection<HStoreFile> candidates) throws IOException { 448 long minTs = Long.MAX_VALUE; 449 if (candidates.isEmpty()) { 450 return minTs; 451 } 452 Path[] p = new Path[candidates.size()]; 453 int i = 0; 454 for (HStoreFile sf : candidates) { 455 p[i] = sf.getPath(); 456 ++i; 457 } 458 459 FileStatus[] stats = fs.listStatus(p); 460 if (stats == null || stats.length == 0) { 461 return minTs; 462 } 463 for (FileStatus s : stats) { 464 minTs = Math.min(minTs, s.getModificationTime()); 465 } 466 return minTs; 467 } 468 469 ////////////////////////////////////////////////////////////////////////////// 470 // Get tests 471 ////////////////////////////////////////////////////////////////////////////// 472 473 private static final int BLOCKSIZE_SMALL = 8192; 474 /** 475 * Test for hbase-1686. 476 * @throws IOException 477 */ 478 @Test 479 public void testEmptyStoreFile() throws IOException { 480 init(this.name.getMethodName()); 481 // Write a store file. 482 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 483 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 484 flush(1); 485 // Now put in place an empty store file. Its a little tricky. Have to 486 // do manually with hacked in sequence id. 487 HStoreFile f = this.store.getStorefiles().iterator().next(); 488 Path storedir = f.getPath().getParent(); 489 long seqid = f.getMaxSequenceId(); 490 Configuration c = HBaseConfiguration.create(); 491 FileSystem fs = FileSystem.get(c); 492 HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 493 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), 494 fs) 495 .withOutputDir(storedir) 496 .withFileContext(meta) 497 .build(); 498 w.appendMetadata(seqid + 1, false); 499 w.close(); 500 this.store.close(); 501 // Reopen it... should pick up two files 502 this.store = 503 new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false); 504 assertEquals(2, this.store.getStorefilesCount()); 505 506 result = HBaseTestingUtility.getFromStoreFile(store, 507 get.getRow(), 508 qualifiers); 509 assertEquals(1, result.size()); 510 } 511 512 /** 513 * Getting data from memstore only 514 * @throws IOException 515 */ 516 @Test 517 public void testGet_FromMemStoreOnly() throws IOException { 518 init(this.name.getMethodName()); 519 520 //Put data in memstore 521 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 522 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 523 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); 524 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null); 525 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null); 526 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null); 527 528 //Get 529 result = HBaseTestingUtility.getFromStoreFile(store, 530 get.getRow(), qualifiers); 531 532 //Compare 533 assertCheck(); 534 } 535 536 @Test 537 public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException { 538 testTimeRangeIfSomeCellsAreDroppedInFlush(1); 539 testTimeRangeIfSomeCellsAreDroppedInFlush(3); 540 testTimeRangeIfSomeCellsAreDroppedInFlush(5); 541 } 542 543 private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException { 544 init(this.name.getMethodName(), TEST_UTIL.getConfiguration(), 545 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build()); 546 long currentTs = 100; 547 long minTs = currentTs; 548 // the extra cell won't be flushed to disk, 549 // so the min of timerange will be different between memStore and hfile. 550 for (int i = 0; i != (maxVersion + 1); ++i) { 551 this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null); 552 if (i == 1) { 553 minTs = currentTs; 554 } 555 } 556 flushStore(store, id++); 557 558 Collection<HStoreFile> files = store.getStorefiles(); 559 assertEquals(1, files.size()); 560 HStoreFile f = files.iterator().next(); 561 f.initReader(); 562 StoreFileReader reader = f.getReader(); 563 assertEquals(minTs, reader.timeRange.getMin()); 564 assertEquals(currentTs, reader.timeRange.getMax()); 565 } 566 567 /** 568 * Getting data from files only 569 * @throws IOException 570 */ 571 @Test 572 public void testGet_FromFilesOnly() throws IOException { 573 init(this.name.getMethodName()); 574 575 //Put data in memstore 576 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 577 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 578 //flush 579 flush(1); 580 581 //Add more data 582 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); 583 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null); 584 //flush 585 flush(2); 586 587 //Add more data 588 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null); 589 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null); 590 //flush 591 flush(3); 592 593 //Get 594 result = HBaseTestingUtility.getFromStoreFile(store, 595 get.getRow(), 596 qualifiers); 597 //this.store.get(get, qualifiers, result); 598 599 //Need to sort the result since multiple files 600 Collections.sort(result, CellComparatorImpl.COMPARATOR); 601 602 //Compare 603 assertCheck(); 604 } 605 606 /** 607 * Getting data from memstore and files 608 * @throws IOException 609 */ 610 @Test 611 public void testGet_FromMemStoreAndFiles() throws IOException { 612 init(this.name.getMethodName()); 613 614 //Put data in memstore 615 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 616 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 617 //flush 618 flush(1); 619 620 //Add more data 621 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); 622 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null); 623 //flush 624 flush(2); 625 626 //Add more data 627 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null); 628 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null); 629 630 //Get 631 result = HBaseTestingUtility.getFromStoreFile(store, 632 get.getRow(), qualifiers); 633 634 //Need to sort the result since multiple files 635 Collections.sort(result, CellComparatorImpl.COMPARATOR); 636 637 //Compare 638 assertCheck(); 639 } 640 641 private void flush(int storeFilessize) throws IOException{ 642 this.store.snapshot(); 643 flushStore(store, id++); 644 assertEquals(storeFilessize, this.store.getStorefiles().size()); 645 assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount()); 646 } 647 648 private void assertCheck() { 649 assertEquals(expected.size(), result.size()); 650 for(int i=0; i<expected.size(); i++) { 651 assertEquals(expected.get(i), result.get(i)); 652 } 653 } 654 655 @After 656 public void tearDown() throws Exception { 657 EnvironmentEdgeManagerTestHelper.reset(); 658 if (store != null) { 659 try { 660 store.close(); 661 } catch (IOException e) { 662 } 663 store = null; 664 } 665 if (region != null) { 666 region.close(); 667 region = null; 668 } 669 } 670 671 @AfterClass 672 public static void tearDownAfterClass() throws IOException { 673 TEST_UTIL.cleanupTestDir(); 674 } 675 676 @Test 677 public void testHandleErrorsInFlush() throws Exception { 678 LOG.info("Setting up a faulty file system that cannot write"); 679 680 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 681 User user = User.createUserForTesting(conf, 682 "testhandleerrorsinflush", new String[]{"foo"}); 683 // Inject our faulty LocalFileSystem 684 conf.setClass("fs.file.impl", FaultyFileSystem.class, 685 FileSystem.class); 686 user.runAs(new PrivilegedExceptionAction<Object>() { 687 @Override 688 public Object run() throws Exception { 689 // Make sure it worked (above is sensitive to caching details in hadoop core) 690 FileSystem fs = FileSystem.get(conf); 691 assertEquals(FaultyFileSystem.class, fs.getClass()); 692 693 // Initialize region 694 init(name.getMethodName(), conf); 695 696 LOG.info("Adding some data"); 697 store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 698 store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 699 store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); 700 701 LOG.info("Before flush, we should have no files"); 702 703 Collection<StoreFileInfo> files = 704 store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); 705 assertEquals(0, files != null ? files.size() : 0); 706 707 //flush 708 try { 709 LOG.info("Flushing"); 710 flush(1); 711 fail("Didn't bubble up IOE!"); 712 } catch (IOException ioe) { 713 assertTrue(ioe.getMessage().contains("Fault injected")); 714 } 715 716 LOG.info("After failed flush, we should still have no files!"); 717 files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); 718 assertEquals(0, files != null ? files.size() : 0); 719 store.getHRegion().getWAL().close(); 720 return null; 721 } 722 }); 723 FileSystem.closeAllForUGI(user.getUGI()); 724 } 725 726 /** 727 * Faulty file system that will fail if you write past its fault position the FIRST TIME 728 * only; thereafter it will succeed. Used by {@link TestHRegion} too. 729 */ 730 static class FaultyFileSystem extends FilterFileSystem { 731 List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>(); 732 private long faultPos = 200; 733 AtomicBoolean fault = new AtomicBoolean(true); 734 735 public FaultyFileSystem() { 736 super(new LocalFileSystem()); 737 System.err.println("Creating faulty!"); 738 } 739 740 @Override 741 public FSDataOutputStream create(Path p) throws IOException { 742 return new FaultyOutputStream(super.create(p), faultPos, fault); 743 } 744 745 @Override 746 public FSDataOutputStream create(Path f, FsPermission permission, 747 boolean overwrite, int bufferSize, short replication, long blockSize, 748 Progressable progress) throws IOException { 749 return new FaultyOutputStream(super.create(f, permission, 750 overwrite, bufferSize, replication, blockSize, progress), faultPos, fault); 751 } 752 753 @Override 754 public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, 755 int bufferSize, short replication, long blockSize, Progressable progress) 756 throws IOException { 757 // Fake it. Call create instead. The default implementation throws an IOE 758 // that this is not supported. 759 return create(f, overwrite, bufferSize, replication, blockSize, progress); 760 } 761 } 762 763 static class FaultyOutputStream extends FSDataOutputStream { 764 volatile long faultPos = Long.MAX_VALUE; 765 private final AtomicBoolean fault; 766 767 public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault) 768 throws IOException { 769 super(out, null); 770 this.faultPos = faultPos; 771 this.fault = fault; 772 } 773 774 @Override 775 public synchronized void write(byte[] buf, int offset, int length) throws IOException { 776 System.err.println("faulty stream write at pos " + getPos()); 777 injectFault(); 778 super.write(buf, offset, length); 779 } 780 781 private void injectFault() throws IOException { 782 if (this.fault.get() && getPos() >= faultPos) { 783 throw new IOException("Fault injected"); 784 } 785 } 786 } 787 788 private static void flushStore(HStore store, long id) throws IOException { 789 StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY); 790 storeFlushCtx.prepare(); 791 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 792 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 793 } 794 795 /** 796 * Generate a list of KeyValues for testing based on given parameters 797 * @param timestamps 798 * @param numRows 799 * @param qualifier 800 * @param family 801 * @return the rows key-value list 802 */ 803 List<Cell> getKeyValueSet(long[] timestamps, int numRows, 804 byte[] qualifier, byte[] family) { 805 List<Cell> kvList = new ArrayList<>(); 806 for (int i=1;i<=numRows;i++) { 807 byte[] b = Bytes.toBytes(i); 808 for (long timestamp: timestamps) { 809 kvList.add(new KeyValue(b, family, qualifier, timestamp, b)); 810 } 811 } 812 return kvList; 813 } 814 815 /** 816 * Test to ensure correctness when using Stores with multiple timestamps 817 * @throws IOException 818 */ 819 @Test 820 public void testMultipleTimestamps() throws IOException { 821 int numRows = 1; 822 long[] timestamps1 = new long[] {1,5,10,20}; 823 long[] timestamps2 = new long[] {30,80}; 824 825 init(this.name.getMethodName()); 826 827 List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family); 828 for (Cell kv : kvList1) { 829 this.store.add(kv, null); 830 } 831 832 this.store.snapshot(); 833 flushStore(store, id++); 834 835 List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family); 836 for(Cell kv : kvList2) { 837 this.store.add(kv, null); 838 } 839 840 List<Cell> result; 841 Get get = new Get(Bytes.toBytes(1)); 842 get.addColumn(family,qf1); 843 844 get.setTimeRange(0,15); 845 result = HBaseTestingUtility.getFromStoreFile(store, get); 846 assertTrue(result.size()>0); 847 848 get.setTimeRange(40,90); 849 result = HBaseTestingUtility.getFromStoreFile(store, get); 850 assertTrue(result.size()>0); 851 852 get.setTimeRange(10,45); 853 result = HBaseTestingUtility.getFromStoreFile(store, get); 854 assertTrue(result.size()>0); 855 856 get.setTimeRange(80,145); 857 result = HBaseTestingUtility.getFromStoreFile(store, get); 858 assertTrue(result.size()>0); 859 860 get.setTimeRange(1,2); 861 result = HBaseTestingUtility.getFromStoreFile(store, get); 862 assertTrue(result.size()>0); 863 864 get.setTimeRange(90,200); 865 result = HBaseTestingUtility.getFromStoreFile(store, get); 866 assertTrue(result.size()==0); 867 } 868 869 /** 870 * Test for HBASE-3492 - Test split on empty colfam (no store files). 871 * 872 * @throws IOException When the IO operations fail. 873 */ 874 @Test 875 public void testSplitWithEmptyColFam() throws IOException { 876 init(this.name.getMethodName()); 877 assertFalse(store.getSplitPoint().isPresent()); 878 } 879 880 @Test 881 public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception { 882 final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle"; 883 long anyValue = 10; 884 885 // We'll check that it uses correct config and propagates it appropriately by going thru 886 // the simplest "real" path I can find - "throttleCompaction", which just checks whether 887 // a number we pass in is higher than some config value, inside compactionPolicy. 888 Configuration conf = HBaseConfiguration.create(); 889 conf.setLong(CONFIG_KEY, anyValue); 890 init(name.getMethodName() + "-xml", conf); 891 assertTrue(store.throttleCompaction(anyValue + 1)); 892 assertFalse(store.throttleCompaction(anyValue)); 893 894 // HTD overrides XML. 895 --anyValue; 896 init(name.getMethodName() + "-htd", conf, TableDescriptorBuilder 897 .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)), 898 ColumnFamilyDescriptorBuilder.of(family)); 899 assertTrue(store.throttleCompaction(anyValue + 1)); 900 assertFalse(store.throttleCompaction(anyValue)); 901 902 // HCD overrides them both. 903 --anyValue; 904 init(name.getMethodName() + "-hcd", conf, 905 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, 906 Long.toString(anyValue)), 907 ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue)) 908 .build()); 909 assertTrue(store.throttleCompaction(anyValue + 1)); 910 assertFalse(store.throttleCompaction(anyValue)); 911 } 912 913 public static class DummyStoreEngine extends DefaultStoreEngine { 914 public static DefaultCompactor lastCreatedCompactor = null; 915 916 @Override 917 protected void createComponents(Configuration conf, HStore store, CellComparator comparator) 918 throws IOException { 919 super.createComponents(conf, store, comparator); 920 lastCreatedCompactor = this.compactor; 921 } 922 } 923 924 @Test 925 public void testStoreUsesSearchEngineOverride() throws Exception { 926 Configuration conf = HBaseConfiguration.create(); 927 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName()); 928 init(this.name.getMethodName(), conf); 929 assertEquals(DummyStoreEngine.lastCreatedCompactor, 930 this.store.storeEngine.getCompactor()); 931 } 932 933 private void addStoreFile() throws IOException { 934 HStoreFile f = this.store.getStorefiles().iterator().next(); 935 Path storedir = f.getPath().getParent(); 936 long seqid = this.store.getMaxSequenceId().orElse(0L); 937 Configuration c = TEST_UTIL.getConfiguration(); 938 FileSystem fs = FileSystem.get(c); 939 HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 940 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), 941 fs) 942 .withOutputDir(storedir) 943 .withFileContext(fileContext) 944 .build(); 945 w.appendMetadata(seqid + 1, false); 946 w.close(); 947 LOG.info("Added store file:" + w.getPath()); 948 } 949 950 private void archiveStoreFile(int index) throws IOException { 951 Collection<HStoreFile> files = this.store.getStorefiles(); 952 HStoreFile sf = null; 953 Iterator<HStoreFile> it = files.iterator(); 954 for (int i = 0; i <= index; i++) { 955 sf = it.next(); 956 } 957 store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf)); 958 } 959 960 private void closeCompactedFile(int index) throws IOException { 961 Collection<HStoreFile> files = 962 this.store.getStoreEngine().getStoreFileManager().getCompactedfiles(); 963 HStoreFile sf = null; 964 Iterator<HStoreFile> it = files.iterator(); 965 for (int i = 0; i <= index; i++) { 966 sf = it.next(); 967 } 968 sf.closeStoreFile(true); 969 store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf)); 970 } 971 972 @Test 973 public void testRefreshStoreFiles() throws Exception { 974 init(name.getMethodName()); 975 976 assertEquals(0, this.store.getStorefilesCount()); 977 978 // Test refreshing store files when no store files are there 979 store.refreshStoreFiles(); 980 assertEquals(0, this.store.getStorefilesCount()); 981 982 // add some data, flush 983 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 984 flush(1); 985 assertEquals(1, this.store.getStorefilesCount()); 986 987 // add one more file 988 addStoreFile(); 989 990 assertEquals(1, this.store.getStorefilesCount()); 991 store.refreshStoreFiles(); 992 assertEquals(2, this.store.getStorefilesCount()); 993 994 // add three more files 995 addStoreFile(); 996 addStoreFile(); 997 addStoreFile(); 998 999 assertEquals(2, this.store.getStorefilesCount()); 1000 store.refreshStoreFiles(); 1001 assertEquals(5, this.store.getStorefilesCount()); 1002 1003 closeCompactedFile(0); 1004 archiveStoreFile(0); 1005 1006 assertEquals(5, this.store.getStorefilesCount()); 1007 store.refreshStoreFiles(); 1008 assertEquals(4, this.store.getStorefilesCount()); 1009 1010 archiveStoreFile(0); 1011 archiveStoreFile(1); 1012 archiveStoreFile(2); 1013 1014 assertEquals(4, this.store.getStorefilesCount()); 1015 store.refreshStoreFiles(); 1016 assertEquals(1, this.store.getStorefilesCount()); 1017 1018 archiveStoreFile(0); 1019 store.refreshStoreFiles(); 1020 assertEquals(0, this.store.getStorefilesCount()); 1021 } 1022 1023 @Test 1024 public void testRefreshStoreFilesNotChanged() throws IOException { 1025 init(name.getMethodName()); 1026 1027 assertEquals(0, this.store.getStorefilesCount()); 1028 1029 // add some data, flush 1030 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 1031 flush(1); 1032 // add one more file 1033 addStoreFile(); 1034 1035 HStore spiedStore = spy(store); 1036 1037 // call first time after files changed 1038 spiedStore.refreshStoreFiles(); 1039 assertEquals(2, this.store.getStorefilesCount()); 1040 verify(spiedStore, times(1)).replaceStoreFiles(any(), any()); 1041 1042 // call second time 1043 spiedStore.refreshStoreFiles(); 1044 1045 //ensure that replaceStoreFiles is not called if files are not refreshed 1046 verify(spiedStore, times(0)).replaceStoreFiles(null, null); 1047 } 1048 1049 private long countMemStoreScanner(StoreScanner scanner) { 1050 if (scanner.currentScanners == null) { 1051 return 0; 1052 } 1053 return scanner.currentScanners.stream() 1054 .filter(s -> !s.isFileScanner()) 1055 .count(); 1056 } 1057 1058 @Test 1059 public void testNumberOfMemStoreScannersAfterFlush() throws IOException { 1060 long seqId = 100; 1061 long timestamp = System.currentTimeMillis(); 1062 Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1063 .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put) 1064 .setValue(qf1).build(); 1065 PrivateCellUtil.setSequenceId(cell0, seqId); 1066 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList()); 1067 1068 Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1069 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put) 1070 .setValue(qf1).build(); 1071 PrivateCellUtil.setSequenceId(cell1, seqId); 1072 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1)); 1073 1074 seqId = 101; 1075 timestamp = System.currentTimeMillis(); 1076 Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family) 1077 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put) 1078 .setValue(qf1).build(); 1079 PrivateCellUtil.setSequenceId(cell2, seqId); 1080 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2)); 1081 } 1082 1083 private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot, 1084 List<Cell> inputCellsAfterSnapshot) throws IOException { 1085 init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size()); 1086 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1087 long seqId = Long.MIN_VALUE; 1088 for (Cell c : inputCellsBeforeSnapshot) { 1089 quals.add(CellUtil.cloneQualifier(c)); 1090 seqId = Math.max(seqId, c.getSequenceId()); 1091 } 1092 for (Cell c : inputCellsAfterSnapshot) { 1093 quals.add(CellUtil.cloneQualifier(c)); 1094 seqId = Math.max(seqId, c.getSequenceId()); 1095 } 1096 inputCellsBeforeSnapshot.forEach(c -> store.add(c, null)); 1097 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1098 storeFlushCtx.prepare(); 1099 inputCellsAfterSnapshot.forEach(c -> store.add(c, null)); 1100 int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2; 1101 try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) { 1102 // snapshot + active (if inputCellsAfterSnapshot isn't empty) 1103 assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s)); 1104 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1105 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1106 // snapshot has no data after flush 1107 int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1; 1108 boolean more; 1109 int cellCount = 0; 1110 do { 1111 List<Cell> cells = new ArrayList<>(); 1112 more = s.next(cells); 1113 cellCount += cells.size(); 1114 assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s)); 1115 } while (more); 1116 assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size() 1117 + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(), 1118 inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount); 1119 // the current scanners is cleared 1120 assertEquals(0, countMemStoreScanner(s)); 1121 } 1122 } 1123 1124 private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) 1125 throws IOException { 1126 return createCell(row, qualifier, ts, sequenceId, value); 1127 } 1128 1129 private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value) 1130 throws IOException { 1131 Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1132 .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put) 1133 .setValue(value).build(); 1134 PrivateCellUtil.setSequenceId(c, sequenceId); 1135 return c; 1136 } 1137 1138 @Test 1139 public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException { 1140 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1141 final int expectedSize = 3; 1142 testFlushBeforeCompletingScan(new MyListHook() { 1143 @Override 1144 public void hook(int currentSize) { 1145 if (currentSize == expectedSize - 1) { 1146 try { 1147 flushStore(store, id++); 1148 timeToGoNextRow.set(true); 1149 } catch (IOException e) { 1150 throw new RuntimeException(e); 1151 } 1152 } 1153 } 1154 }, new FilterBase() { 1155 @Override 1156 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1157 return ReturnCode.INCLUDE; 1158 } 1159 }, expectedSize); 1160 } 1161 1162 @Test 1163 public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException { 1164 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1165 final int expectedSize = 2; 1166 testFlushBeforeCompletingScan(new MyListHook() { 1167 @Override 1168 public void hook(int currentSize) { 1169 if (currentSize == expectedSize - 1) { 1170 try { 1171 flushStore(store, id++); 1172 timeToGoNextRow.set(true); 1173 } catch (IOException e) { 1174 throw new RuntimeException(e); 1175 } 1176 } 1177 } 1178 }, new FilterBase() { 1179 @Override 1180 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1181 if (timeToGoNextRow.get()) { 1182 timeToGoNextRow.set(false); 1183 return ReturnCode.NEXT_ROW; 1184 } else { 1185 return ReturnCode.INCLUDE; 1186 } 1187 } 1188 }, expectedSize); 1189 } 1190 1191 @Test 1192 public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, 1193 InterruptedException { 1194 final AtomicBoolean timeToGetHint = new AtomicBoolean(false); 1195 final int expectedSize = 2; 1196 testFlushBeforeCompletingScan(new MyListHook() { 1197 @Override 1198 public void hook(int currentSize) { 1199 if (currentSize == expectedSize - 1) { 1200 try { 1201 flushStore(store, id++); 1202 timeToGetHint.set(true); 1203 } catch (IOException e) { 1204 throw new RuntimeException(e); 1205 } 1206 } 1207 } 1208 }, new FilterBase() { 1209 @Override 1210 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1211 if (timeToGetHint.get()) { 1212 timeToGetHint.set(false); 1213 return Filter.ReturnCode.SEEK_NEXT_USING_HINT; 1214 } else { 1215 return Filter.ReturnCode.INCLUDE; 1216 } 1217 } 1218 @Override 1219 public Cell getNextCellHint(Cell currentCell) throws IOException { 1220 return currentCell; 1221 } 1222 }, expectedSize); 1223 } 1224 1225 private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize) 1226 throws IOException, InterruptedException { 1227 Configuration conf = HBaseConfiguration.create(); 1228 byte[] r0 = Bytes.toBytes("row0"); 1229 byte[] r1 = Bytes.toBytes("row1"); 1230 byte[] r2 = Bytes.toBytes("row2"); 1231 byte[] value0 = Bytes.toBytes("value0"); 1232 byte[] value1 = Bytes.toBytes("value1"); 1233 byte[] value2 = Bytes.toBytes("value2"); 1234 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1235 long ts = EnvironmentEdgeManager.currentTime(); 1236 long seqId = 100; 1237 init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1238 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(), 1239 new MyStoreHook() { 1240 @Override 1241 public long getSmallestReadPoint(HStore store) { 1242 return seqId + 3; 1243 } 1244 }); 1245 // The cells having the value0 won't be flushed to disk because the value of max version is 1 1246 store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing); 1247 store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing); 1248 store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing); 1249 store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing); 1250 store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing); 1251 store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing); 1252 store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing); 1253 store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing); 1254 store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing); 1255 store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing); 1256 store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing); 1257 store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing); 1258 List<Cell> myList = new MyList<>(hook); 1259 Scan scan = new Scan() 1260 .withStartRow(r1) 1261 .setFilter(filter); 1262 try (InternalScanner scanner = (InternalScanner) store.getScanner( 1263 scan, null, seqId + 3)){ 1264 // r1 1265 scanner.next(myList); 1266 assertEquals(expectedSize, myList.size()); 1267 for (Cell c : myList) { 1268 byte[] actualValue = CellUtil.cloneValue(c); 1269 assertTrue("expected:" + Bytes.toStringBinary(value1) 1270 + ", actual:" + Bytes.toStringBinary(actualValue) 1271 , Bytes.equals(actualValue, value1)); 1272 } 1273 List<Cell> normalList = new ArrayList<>(3); 1274 // r2 1275 scanner.next(normalList); 1276 assertEquals(3, normalList.size()); 1277 for (Cell c : normalList) { 1278 byte[] actualValue = CellUtil.cloneValue(c); 1279 assertTrue("expected:" + Bytes.toStringBinary(value2) 1280 + ", actual:" + Bytes.toStringBinary(actualValue) 1281 , Bytes.equals(actualValue, value2)); 1282 } 1283 } 1284 } 1285 1286 @Test 1287 public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException { 1288 Configuration conf = HBaseConfiguration.create(); 1289 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName()); 1290 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1291 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1292 byte[] value = Bytes.toBytes("value"); 1293 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1294 long ts = EnvironmentEdgeManager.currentTime(); 1295 long seqId = 100; 1296 // older data whihc shouldn't be "seen" by client 1297 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1298 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1299 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1300 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1301 quals.add(qf1); 1302 quals.add(qf2); 1303 quals.add(qf3); 1304 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1305 MyCompactingMemStore.START_TEST.set(true); 1306 Runnable flush = () -> { 1307 // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5) 1308 // recreate the active memstore -- phase (4/5) 1309 storeFlushCtx.prepare(); 1310 }; 1311 ExecutorService service = Executors.newSingleThreadExecutor(); 1312 service.submit(flush); 1313 // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5) 1314 // this is blocked until we recreate the active memstore -- phase (3/5) 1315 // we get scanner from active memstore but it is empty -- phase (5/5) 1316 InternalScanner scanner = (InternalScanner) store.getScanner( 1317 new Scan(new Get(row)), quals, seqId + 1); 1318 service.shutdown(); 1319 service.awaitTermination(20, TimeUnit.SECONDS); 1320 try { 1321 try { 1322 List<Cell> results = new ArrayList<>(); 1323 scanner.next(results); 1324 assertEquals(3, results.size()); 1325 for (Cell c : results) { 1326 byte[] actualValue = CellUtil.cloneValue(c); 1327 assertTrue("expected:" + Bytes.toStringBinary(value) 1328 + ", actual:" + Bytes.toStringBinary(actualValue) 1329 , Bytes.equals(actualValue, value)); 1330 } 1331 } finally { 1332 scanner.close(); 1333 } 1334 } finally { 1335 MyCompactingMemStore.START_TEST.set(false); 1336 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1337 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1338 } 1339 } 1340 1341 @Test 1342 public void testScanWithDoubleFlush() throws IOException { 1343 Configuration conf = HBaseConfiguration.create(); 1344 // Initialize region 1345 MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook(){ 1346 @Override 1347 public void getScanners(MyStore store) throws IOException { 1348 final long tmpId = id++; 1349 ExecutorService s = Executors.newSingleThreadExecutor(); 1350 s.submit(() -> { 1351 try { 1352 // flush the store before storescanner updates the scanners from store. 1353 // The current data will be flushed into files, and the memstore will 1354 // be clear. 1355 // -- phase (4/4) 1356 flushStore(store, tmpId); 1357 }catch (IOException ex) { 1358 throw new RuntimeException(ex); 1359 } 1360 }); 1361 s.shutdown(); 1362 try { 1363 // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers. 1364 s.awaitTermination(3, TimeUnit.SECONDS); 1365 } catch (InterruptedException ex) { 1366 } 1367 } 1368 }); 1369 byte[] oldValue = Bytes.toBytes("oldValue"); 1370 byte[] currentValue = Bytes.toBytes("currentValue"); 1371 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1372 long ts = EnvironmentEdgeManager.currentTime(); 1373 long seqId = 100; 1374 // older data whihc shouldn't be "seen" by client 1375 myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing); 1376 myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing); 1377 myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing); 1378 long snapshotId = id++; 1379 // push older data into snapshot -- phase (1/4) 1380 StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker 1381 .DUMMY); 1382 storeFlushCtx.prepare(); 1383 1384 // insert current data into active -- phase (2/4) 1385 myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing); 1386 myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing); 1387 myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing); 1388 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1389 quals.add(qf1); 1390 quals.add(qf2); 1391 quals.add(qf3); 1392 try (InternalScanner scanner = (InternalScanner) myStore.getScanner( 1393 new Scan(new Get(row)), quals, seqId + 1)) { 1394 // complete the flush -- phase (3/4) 1395 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1396 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1397 1398 List<Cell> results = new ArrayList<>(); 1399 scanner.next(results); 1400 assertEquals(3, results.size()); 1401 for (Cell c : results) { 1402 byte[] actualValue = CellUtil.cloneValue(c); 1403 assertTrue("expected:" + Bytes.toStringBinary(currentValue) 1404 + ", actual:" + Bytes.toStringBinary(actualValue) 1405 , Bytes.equals(actualValue, currentValue)); 1406 } 1407 } 1408 } 1409 1410 @Test 1411 public void testReclaimChunkWhenScaning() throws IOException { 1412 init("testReclaimChunkWhenScaning"); 1413 long ts = EnvironmentEdgeManager.currentTime(); 1414 long seqId = 100; 1415 byte[] value = Bytes.toBytes("value"); 1416 // older data whihc shouldn't be "seen" by client 1417 store.add(createCell(qf1, ts, seqId, value), null); 1418 store.add(createCell(qf2, ts, seqId, value), null); 1419 store.add(createCell(qf3, ts, seqId, value), null); 1420 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1421 quals.add(qf1); 1422 quals.add(qf2); 1423 quals.add(qf3); 1424 try (InternalScanner scanner = (InternalScanner) store.getScanner( 1425 new Scan(new Get(row)), quals, seqId)) { 1426 List<Cell> results = new MyList<>(size -> { 1427 switch (size) { 1428 // 1) we get the first cell (qf1) 1429 // 2) flush the data to have StoreScanner update inner scanners 1430 // 3) the chunk will be reclaimed after updaing 1431 case 1: 1432 try { 1433 flushStore(store, id++); 1434 } catch (IOException e) { 1435 throw new RuntimeException(e); 1436 } 1437 break; 1438 // 1) we get the second cell (qf2) 1439 // 2) add some cell to fill some byte into the chunk (we have only one chunk) 1440 case 2: 1441 try { 1442 byte[] newValue = Bytes.toBytes("newValue"); 1443 // older data whihc shouldn't be "seen" by client 1444 store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null); 1445 store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null); 1446 store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null); 1447 } catch (IOException e) { 1448 throw new RuntimeException(e); 1449 } 1450 break; 1451 default: 1452 break; 1453 } 1454 }); 1455 scanner.next(results); 1456 assertEquals(3, results.size()); 1457 for (Cell c : results) { 1458 byte[] actualValue = CellUtil.cloneValue(c); 1459 assertTrue("expected:" + Bytes.toStringBinary(value) 1460 + ", actual:" + Bytes.toStringBinary(actualValue) 1461 , Bytes.equals(actualValue, value)); 1462 } 1463 } 1464 } 1465 1466 /** 1467 * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable 1468 * may change the versionedList. And the first InMemoryFlushRunnable will use the chagned 1469 * versionedList to remove the corresponding segments. 1470 * In short, there will be some segements which isn't in merge are removed. 1471 * @throws IOException 1472 * @throws InterruptedException 1473 */ 1474 @Test 1475 public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException { 1476 int flushSize = 500; 1477 Configuration conf = HBaseConfiguration.create(); 1478 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName()); 1479 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25); 1480 MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0); 1481 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize)); 1482 // Set the lower threshold to invoke the "MERGE" policy 1483 conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0)); 1484 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1485 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1486 byte[] value = Bytes.toBytes("thisisavarylargevalue"); 1487 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1488 long ts = EnvironmentEdgeManager.currentTime(); 1489 long seqId = 100; 1490 // older data whihc shouldn't be "seen" by client 1491 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1492 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1493 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1494 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1495 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1496 storeFlushCtx.prepare(); 1497 // This shouldn't invoke another in-memory flush because the first compactor thread 1498 // hasn't accomplished the in-memory compaction. 1499 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1500 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1501 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1502 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1503 //okay. Let the compaction be completed 1504 MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown(); 1505 CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore; 1506 while (mem.isMemStoreFlushingInMemory()) { 1507 TimeUnit.SECONDS.sleep(1); 1508 } 1509 // This should invoke another in-memory flush. 1510 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1511 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1512 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1513 assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1514 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1515 String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE)); 1516 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1517 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1518 } 1519 1520 @Test 1521 public void testAge() throws IOException { 1522 long currentTime = System.currentTimeMillis(); 1523 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 1524 edge.setValue(currentTime); 1525 EnvironmentEdgeManager.injectEdge(edge); 1526 Configuration conf = TEST_UTIL.getConfiguration(); 1527 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family); 1528 initHRegion(name.getMethodName(), conf, 1529 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false); 1530 HStore store = new HStore(region, hcd, conf, false) { 1531 1532 @Override 1533 protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf, 1534 CellComparator kvComparator) throws IOException { 1535 List<HStoreFile> storefiles = 1536 Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100), 1537 mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000)); 1538 StoreFileManager sfm = mock(StoreFileManager.class); 1539 when(sfm.getStorefiles()).thenReturn(storefiles); 1540 StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class); 1541 when(storeEngine.getStoreFileManager()).thenReturn(sfm); 1542 return storeEngine; 1543 } 1544 }; 1545 assertEquals(10L, store.getMinStoreFileAge().getAsLong()); 1546 assertEquals(10000L, store.getMaxStoreFileAge().getAsLong()); 1547 assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4); 1548 } 1549 1550 private HStoreFile mockStoreFile(long createdTime) { 1551 StoreFileInfo info = mock(StoreFileInfo.class); 1552 when(info.getCreatedTimestamp()).thenReturn(createdTime); 1553 HStoreFile sf = mock(HStoreFile.class); 1554 when(sf.getReader()).thenReturn(mock(StoreFileReader.class)); 1555 when(sf.isHFile()).thenReturn(true); 1556 when(sf.getFileInfo()).thenReturn(info); 1557 return sf; 1558 } 1559 1560 private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) 1561 throws IOException { 1562 return (MyStore) init(methodName, conf, 1563 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1564 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook); 1565 } 1566 1567 private static class MyStore extends HStore { 1568 private final MyStoreHook hook; 1569 1570 MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration 1571 confParam, MyStoreHook hook, boolean switchToPread) throws IOException { 1572 super(region, family, confParam, false); 1573 this.hook = hook; 1574 } 1575 1576 @Override 1577 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1578 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, 1579 boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, 1580 boolean includeMemstoreScanner) throws IOException { 1581 hook.getScanners(this); 1582 return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, 1583 stopRow, false, readPt, includeMemstoreScanner); 1584 } 1585 1586 @Override 1587 public long getSmallestReadPoint() { 1588 return hook.getSmallestReadPoint(this); 1589 } 1590 } 1591 1592 private abstract static class MyStoreHook { 1593 1594 void getScanners(MyStore store) throws IOException { 1595 } 1596 1597 long getSmallestReadPoint(HStore store) { 1598 return store.getHRegion().getSmallestReadPoint(); 1599 } 1600 } 1601 1602 @Test 1603 public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception { 1604 Configuration conf = HBaseConfiguration.create(); 1605 conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); 1606 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0); 1607 // Set the lower threshold to invoke the "MERGE" policy 1608 MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {}); 1609 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1610 long ts = System.currentTimeMillis(); 1611 long seqID = 1L; 1612 // Add some data to the region and do some flushes 1613 for (int i = 1; i < 10; i++) { 1614 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1615 memStoreSizing); 1616 } 1617 // flush them 1618 flushStore(store, seqID); 1619 for (int i = 11; i < 20; i++) { 1620 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1621 memStoreSizing); 1622 } 1623 // flush them 1624 flushStore(store, seqID); 1625 for (int i = 21; i < 30; i++) { 1626 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1627 memStoreSizing); 1628 } 1629 // flush them 1630 flushStore(store, seqID); 1631 1632 assertEquals(3, store.getStorefilesCount()); 1633 Scan scan = new Scan(); 1634 scan.addFamily(family); 1635 Collection<HStoreFile> storefiles2 = store.getStorefiles(); 1636 ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2); 1637 StoreScanner storeScanner = 1638 (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); 1639 // get the current heap 1640 KeyValueHeap heap = storeScanner.heap; 1641 // create more store files 1642 for (int i = 31; i < 40; i++) { 1643 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1644 memStoreSizing); 1645 } 1646 // flush them 1647 flushStore(store, seqID); 1648 1649 for (int i = 41; i < 50; i++) { 1650 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1651 memStoreSizing); 1652 } 1653 // flush them 1654 flushStore(store, seqID); 1655 storefiles2 = store.getStorefiles(); 1656 ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2); 1657 actualStorefiles1.removeAll(actualStorefiles); 1658 // Do compaction 1659 MyThread thread = new MyThread(storeScanner); 1660 thread.start(); 1661 store.replaceStoreFiles(actualStorefiles, actualStorefiles1); 1662 thread.join(); 1663 KeyValueHeap heap2 = thread.getHeap(); 1664 assertFalse(heap.equals(heap2)); 1665 } 1666 1667 @Test 1668 public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException { 1669 Configuration conf = HBaseConfiguration.create(); 1670 conf.set("hbase.systemtables.compacting.memstore.type", "eager"); 1671 init(name.getMethodName(), conf, 1672 TableDescriptorBuilder.newBuilder( 1673 TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, "meta".getBytes())), 1674 ColumnFamilyDescriptorBuilder.newBuilder(family) 1675 .setInMemoryCompaction(MemoryCompactionPolicy.NONE).build()); 1676 assertTrue(((MemStoreCompactor) ((CompactingMemStore) store.memstore).compactor).toString() 1677 .startsWith("eager".toUpperCase())); 1678 } 1679 1680 @Test 1681 public void testSpaceQuotaChangeAfterReplacement() throws IOException { 1682 final TableName tn = TableName.valueOf(name.getMethodName()); 1683 init(name.getMethodName()); 1684 1685 RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl(); 1686 1687 HStoreFile sf1 = mockStoreFileWithLength(1024L); 1688 HStoreFile sf2 = mockStoreFileWithLength(2048L); 1689 HStoreFile sf3 = mockStoreFileWithLength(4096L); 1690 HStoreFile sf4 = mockStoreFileWithLength(8192L); 1691 1692 RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a")) 1693 .setEndKey(Bytes.toBytes("b")).build(); 1694 1695 // Compacting two files down to one, reducing size 1696 sizeStore.put(regionInfo, 1024L + 4096L); 1697 store.updateSpaceQuotaAfterFileReplacement( 1698 sizeStore, regionInfo, Arrays.asList(sf1, sf3), Arrays.asList(sf2)); 1699 1700 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 1701 1702 // The same file length in and out should have no change 1703 store.updateSpaceQuotaAfterFileReplacement( 1704 sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf2)); 1705 1706 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 1707 1708 // Increase the total size used 1709 store.updateSpaceQuotaAfterFileReplacement( 1710 sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf3)); 1711 1712 assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize()); 1713 1714 RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b")) 1715 .setEndKey(Bytes.toBytes("c")).build(); 1716 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4)); 1717 1718 assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize()); 1719 } 1720 1721 @Test 1722 public void testHFileContextSetWithCFAndTable() throws Exception { 1723 init(this.name.getMethodName()); 1724 StoreFileWriter writer = store.createWriterInTmp(10000L, 1725 Compression.Algorithm.NONE, false, true, false, true); 1726 HFileContext hFileContext = writer.getHFileWriter().getFileContext(); 1727 assertArrayEquals(family, hFileContext.getColumnFamily()); 1728 assertArrayEquals(table, hFileContext.getTableName()); 1729 } 1730 1731 private HStoreFile mockStoreFileWithLength(long length) { 1732 HStoreFile sf = mock(HStoreFile.class); 1733 StoreFileReader sfr = mock(StoreFileReader.class); 1734 when(sf.isHFile()).thenReturn(true); 1735 when(sf.getReader()).thenReturn(sfr); 1736 when(sfr.length()).thenReturn(length); 1737 return sf; 1738 } 1739 1740 private static class MyThread extends Thread { 1741 private StoreScanner scanner; 1742 private KeyValueHeap heap; 1743 1744 public MyThread(StoreScanner scanner) { 1745 this.scanner = scanner; 1746 } 1747 1748 public KeyValueHeap getHeap() { 1749 return this.heap; 1750 } 1751 1752 @Override 1753 public void run() { 1754 scanner.trySwitchToStreamRead(); 1755 heap = scanner.heap; 1756 } 1757 } 1758 1759 private static class MyMemStoreCompactor extends MemStoreCompactor { 1760 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 1761 private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1); 1762 public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy 1763 compactionPolicy) throws IllegalArgumentIOException { 1764 super(compactingMemStore, compactionPolicy); 1765 } 1766 1767 @Override 1768 public boolean start() throws IOException { 1769 boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0; 1770 if (isFirst) { 1771 try { 1772 START_COMPACTOR_LATCH.await(); 1773 return super.start(); 1774 } catch (InterruptedException ex) { 1775 throw new RuntimeException(ex); 1776 } 1777 } 1778 return super.start(); 1779 } 1780 } 1781 1782 public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore { 1783 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 1784 public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c, 1785 HStore store, RegionServicesForStores regionServices, 1786 MemoryCompactionPolicy compactionPolicy) throws IOException { 1787 super(conf, c, store, regionServices, compactionPolicy); 1788 } 1789 1790 @Override 1791 protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) 1792 throws IllegalArgumentIOException { 1793 return new MyMemStoreCompactor(this, compactionPolicy); 1794 } 1795 1796 @Override 1797 protected boolean setInMemoryCompactionFlag() { 1798 boolean rval = super.setInMemoryCompactionFlag(); 1799 if (rval) { 1800 RUNNER_COUNT.incrementAndGet(); 1801 if (LOG.isDebugEnabled()) { 1802 LOG.debug("runner count: " + RUNNER_COUNT.get()); 1803 } 1804 } 1805 return rval; 1806 } 1807 } 1808 1809 public static class MyCompactingMemStore extends CompactingMemStore { 1810 private static final AtomicBoolean START_TEST = new AtomicBoolean(false); 1811 private final CountDownLatch getScannerLatch = new CountDownLatch(1); 1812 private final CountDownLatch snapshotLatch = new CountDownLatch(1); 1813 public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, 1814 HStore store, RegionServicesForStores regionServices, 1815 MemoryCompactionPolicy compactionPolicy) throws IOException { 1816 super(conf, c, store, regionServices, compactionPolicy); 1817 } 1818 1819 @Override 1820 protected List<KeyValueScanner> createList(int capacity) { 1821 if (START_TEST.get()) { 1822 try { 1823 getScannerLatch.countDown(); 1824 snapshotLatch.await(); 1825 } catch (InterruptedException e) { 1826 throw new RuntimeException(e); 1827 } 1828 } 1829 return new ArrayList<>(capacity); 1830 } 1831 @Override 1832 protected void pushActiveToPipeline(MutableSegment active) { 1833 if (START_TEST.get()) { 1834 try { 1835 getScannerLatch.await(); 1836 } catch (InterruptedException e) { 1837 throw new RuntimeException(e); 1838 } 1839 } 1840 1841 super.pushActiveToPipeline(active); 1842 if (START_TEST.get()) { 1843 snapshotLatch.countDown(); 1844 } 1845 } 1846 } 1847 1848 interface MyListHook { 1849 void hook(int currentSize); 1850 } 1851 1852 private static class MyList<T> implements List<T> { 1853 private final List<T> delegatee = new ArrayList<>(); 1854 private final MyListHook hookAtAdd; 1855 MyList(final MyListHook hookAtAdd) { 1856 this.hookAtAdd = hookAtAdd; 1857 } 1858 @Override 1859 public int size() {return delegatee.size();} 1860 1861 @Override 1862 public boolean isEmpty() {return delegatee.isEmpty();} 1863 1864 @Override 1865 public boolean contains(Object o) {return delegatee.contains(o);} 1866 1867 @Override 1868 public Iterator<T> iterator() {return delegatee.iterator();} 1869 1870 @Override 1871 public Object[] toArray() {return delegatee.toArray();} 1872 1873 @Override 1874 public <R> R[] toArray(R[] a) {return delegatee.toArray(a);} 1875 1876 @Override 1877 public boolean add(T e) { 1878 hookAtAdd.hook(size()); 1879 return delegatee.add(e); 1880 } 1881 1882 @Override 1883 public boolean remove(Object o) {return delegatee.remove(o);} 1884 1885 @Override 1886 public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);} 1887 1888 @Override 1889 public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);} 1890 1891 @Override 1892 public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);} 1893 1894 @Override 1895 public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);} 1896 1897 @Override 1898 public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);} 1899 1900 @Override 1901 public void clear() {delegatee.clear();} 1902 1903 @Override 1904 public T get(int index) {return delegatee.get(index);} 1905 1906 @Override 1907 public T set(int index, T element) {return delegatee.set(index, element);} 1908 1909 @Override 1910 public void add(int index, T element) {delegatee.add(index, element);} 1911 1912 @Override 1913 public T remove(int index) {return delegatee.remove(index);} 1914 1915 @Override 1916 public int indexOf(Object o) {return delegatee.indexOf(o);} 1917 1918 @Override 1919 public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);} 1920 1921 @Override 1922 public ListIterator<T> listIterator() {return delegatee.listIterator();} 1923 1924 @Override 1925 public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);} 1926 1927 @Override 1928 public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);} 1929 } 1930}