001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025import static org.mockito.ArgumentMatchers.any; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.spy; 028import static org.mockito.Mockito.times; 029import static org.mockito.Mockito.verify; 030import static org.mockito.Mockito.when; 031 032import java.io.IOException; 033import java.lang.ref.SoftReference; 034import java.security.PrivilegedExceptionAction; 035import java.util.ArrayList; 036import java.util.Arrays; 037import java.util.Collection; 038import java.util.Collections; 039import java.util.Iterator; 040import java.util.List; 041import java.util.ListIterator; 042import java.util.NavigableSet; 043import java.util.TreeSet; 044import java.util.concurrent.ConcurrentSkipListSet; 045import java.util.concurrent.CountDownLatch; 046import java.util.concurrent.ExecutorService; 047import java.util.concurrent.Executors; 048import java.util.concurrent.ThreadPoolExecutor; 049import java.util.concurrent.TimeUnit; 050import java.util.concurrent.atomic.AtomicBoolean; 051import java.util.concurrent.atomic.AtomicInteger; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.fs.FSDataOutputStream; 054import org.apache.hadoop.fs.FileStatus; 055import org.apache.hadoop.fs.FileSystem; 056import org.apache.hadoop.fs.FilterFileSystem; 057import org.apache.hadoop.fs.LocalFileSystem; 058import org.apache.hadoop.fs.Path; 059import org.apache.hadoop.fs.permission.FsPermission; 060import org.apache.hadoop.hbase.Cell; 061import org.apache.hadoop.hbase.CellBuilderFactory; 062import org.apache.hadoop.hbase.CellBuilderType; 063import org.apache.hadoop.hbase.CellComparator; 064import org.apache.hadoop.hbase.CellComparatorImpl; 065import org.apache.hadoop.hbase.CellUtil; 066import org.apache.hadoop.hbase.HBaseClassTestRule; 067import org.apache.hadoop.hbase.HBaseConfiguration; 068import org.apache.hadoop.hbase.HBaseTestingUtility; 069import org.apache.hadoop.hbase.HConstants; 070import org.apache.hadoop.hbase.KeyValue; 071import org.apache.hadoop.hbase.MemoryCompactionPolicy; 072import org.apache.hadoop.hbase.NamespaceDescriptor; 073import org.apache.hadoop.hbase.PrivateCellUtil; 074import org.apache.hadoop.hbase.TableName; 075import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 076import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 077import org.apache.hadoop.hbase.client.Get; 078import org.apache.hadoop.hbase.client.RegionInfo; 079import org.apache.hadoop.hbase.client.RegionInfoBuilder; 080import org.apache.hadoop.hbase.client.Scan; 081import org.apache.hadoop.hbase.client.TableDescriptor; 082import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 083import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 084import org.apache.hadoop.hbase.filter.Filter; 085import org.apache.hadoop.hbase.filter.FilterBase; 086import org.apache.hadoop.hbase.io.compress.Compression; 087import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 088import org.apache.hadoop.hbase.io.hfile.CacheConfig; 089import org.apache.hadoop.hbase.io.hfile.HFile; 090import org.apache.hadoop.hbase.io.hfile.HFileContext; 091import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 092import org.apache.hadoop.hbase.monitoring.MonitoredTask; 093import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl; 094import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 095import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 096import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 097import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 098import org.apache.hadoop.hbase.security.User; 099import org.apache.hadoop.hbase.testclassification.MediumTests; 100import org.apache.hadoop.hbase.testclassification.RegionServerTests; 101import org.apache.hadoop.hbase.util.Bytes; 102import org.apache.hadoop.hbase.util.CommonFSUtils; 103import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 104import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 105import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 106import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 107import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 108import org.apache.hadoop.hbase.wal.WALFactory; 109import org.apache.hadoop.util.Progressable; 110import org.junit.After; 111import org.junit.AfterClass; 112import org.junit.Before; 113import org.junit.ClassRule; 114import org.junit.Rule; 115import org.junit.Test; 116import org.junit.experimental.categories.Category; 117import org.junit.rules.TestName; 118import org.mockito.Mockito; 119import org.slf4j.Logger; 120import org.slf4j.LoggerFactory; 121 122import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 123 124/** 125 * Test class for the HStore 126 */ 127@Category({ RegionServerTests.class, MediumTests.class }) 128public class TestHStore { 129 130 @ClassRule 131 public static final HBaseClassTestRule CLASS_RULE = 132 HBaseClassTestRule.forClass(TestHStore.class); 133 134 private static final Logger LOG = LoggerFactory.getLogger(TestHStore.class); 135 @Rule 136 public TestName name = new TestName(); 137 138 HRegion region; 139 HStore store; 140 byte [] table = Bytes.toBytes("table"); 141 byte [] family = Bytes.toBytes("family"); 142 143 byte [] row = Bytes.toBytes("row"); 144 byte [] row2 = Bytes.toBytes("row2"); 145 byte [] qf1 = Bytes.toBytes("qf1"); 146 byte [] qf2 = Bytes.toBytes("qf2"); 147 byte [] qf3 = Bytes.toBytes("qf3"); 148 byte [] qf4 = Bytes.toBytes("qf4"); 149 byte [] qf5 = Bytes.toBytes("qf5"); 150 byte [] qf6 = Bytes.toBytes("qf6"); 151 152 NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR); 153 154 List<Cell> expected = new ArrayList<>(); 155 List<Cell> result = new ArrayList<>(); 156 157 long id = System.currentTimeMillis(); 158 Get get = new Get(row); 159 160 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 161 private static final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString(); 162 163 164 /** 165 * Setup 166 * @throws IOException 167 */ 168 @Before 169 public void setUp() throws IOException { 170 qualifiers.clear(); 171 qualifiers.add(qf1); 172 qualifiers.add(qf3); 173 qualifiers.add(qf5); 174 175 Iterator<byte[]> iter = qualifiers.iterator(); 176 while(iter.hasNext()){ 177 byte [] next = iter.next(); 178 expected.add(new KeyValue(row, family, next, 1, (byte[])null)); 179 get.addColumn(family, next); 180 } 181 } 182 183 private void init(String methodName) throws IOException { 184 init(methodName, TEST_UTIL.getConfiguration()); 185 } 186 187 private HStore init(String methodName, Configuration conf) throws IOException { 188 // some of the tests write 4 versions and then flush 189 // (with HBASE-4241, lower versions are collected on flush) 190 return init(methodName, conf, 191 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build()); 192 } 193 194 private HStore init(String methodName, Configuration conf, ColumnFamilyDescriptor hcd) 195 throws IOException { 196 return init(methodName, conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd); 197 } 198 199 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 200 ColumnFamilyDescriptor hcd) throws IOException { 201 return init(methodName, conf, builder, hcd, null); 202 } 203 204 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 205 ColumnFamilyDescriptor hcd, MyStoreHook hook) throws IOException { 206 return init(methodName, conf, builder, hcd, hook, false); 207 } 208 209 private void initHRegion(String methodName, Configuration conf, TableDescriptorBuilder builder, 210 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 211 TableDescriptor htd = builder.setColumnFamily(hcd).build(); 212 Path basedir = new Path(DIR + methodName); 213 Path tableDir = CommonFSUtils.getTableDir(basedir, htd.getTableName()); 214 final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName)); 215 216 FileSystem fs = FileSystem.get(conf); 217 218 fs.delete(logdir, true); 219 ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 220 MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null); 221 RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 222 Configuration walConf = new Configuration(conf); 223 CommonFSUtils.setRootDir(walConf, basedir); 224 WALFactory wals = new WALFactory(walConf, methodName); 225 region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf, 226 htd, null); 227 region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); 228 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 229 Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 230 } 231 232 private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, 233 ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { 234 initHRegion(methodName, conf, builder, hcd, hook, switchToPread); 235 if (hook == null) { 236 store = new HStore(region, hcd, conf, false); 237 } else { 238 store = new MyStore(region, hcd, conf, hook, switchToPread); 239 } 240 return store; 241 } 242 243 /** 244 * Test we do not lose data if we fail a flush and then close. 245 * Part of HBase-10466 246 * @throws Exception 247 */ 248 @Test 249 public void testFlushSizeSizing() throws Exception { 250 LOG.info("Setting up a faulty file system that cannot write in " + this.name.getMethodName()); 251 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 252 // Only retry once. 253 conf.setInt("hbase.hstore.flush.retries.number", 1); 254 User user = User.createUserForTesting(conf, this.name.getMethodName(), 255 new String[]{"foo"}); 256 // Inject our faulty LocalFileSystem 257 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 258 user.runAs(new PrivilegedExceptionAction<Object>() { 259 @Override 260 public Object run() throws Exception { 261 // Make sure it worked (above is sensitive to caching details in hadoop core) 262 FileSystem fs = FileSystem.get(conf); 263 assertEquals(FaultyFileSystem.class, fs.getClass()); 264 FaultyFileSystem ffs = (FaultyFileSystem)fs; 265 266 // Initialize region 267 init(name.getMethodName(), conf); 268 269 MemStoreSize mss = store.memstore.getFlushableSize(); 270 assertEquals(0, mss.getDataSize()); 271 LOG.info("Adding some data"); 272 MemStoreSizing kvSize = new NonThreadSafeMemStoreSizing(); 273 store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize); 274 // add the heap size of active (mutable) segment 275 kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 276 mss = store.memstore.getFlushableSize(); 277 assertEquals(kvSize.getMemStoreSize(), mss); 278 // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. 279 try { 280 LOG.info("Flushing"); 281 flushStore(store, id++); 282 fail("Didn't bubble up IOE!"); 283 } catch (IOException ioe) { 284 assertTrue(ioe.getMessage().contains("Fault injected")); 285 } 286 // due to snapshot, change mutable to immutable segment 287 kvSize.incMemStoreSize(0, 288 CSLMImmutableSegment.DEEP_OVERHEAD_CSLM - MutableSegment.DEEP_OVERHEAD, 0, 0); 289 mss = store.memstore.getFlushableSize(); 290 assertEquals(kvSize.getMemStoreSize(), mss); 291 MemStoreSizing kvSize2 = new NonThreadSafeMemStoreSizing(); 292 store.add(new KeyValue(row, family, qf2, 2, (byte[]) null), kvSize2); 293 kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD, 0, 0); 294 // Even though we add a new kv, we expect the flushable size to be 'same' since we have 295 // not yet cleared the snapshot -- the above flush failed. 296 assertEquals(kvSize.getMemStoreSize(), mss); 297 ffs.fault.set(false); 298 flushStore(store, id++); 299 mss = store.memstore.getFlushableSize(); 300 // Size should be the foreground kv size. 301 assertEquals(kvSize2.getMemStoreSize(), mss); 302 flushStore(store, id++); 303 mss = store.memstore.getFlushableSize(); 304 assertEquals(0, mss.getDataSize()); 305 assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize()); 306 return null; 307 } 308 }); 309 } 310 311 /** 312 * Verify that compression and data block encoding are respected by the 313 * Store.createWriterInTmp() method, used on store flush. 314 */ 315 @Test 316 public void testCreateWriter() throws Exception { 317 Configuration conf = HBaseConfiguration.create(); 318 FileSystem fs = FileSystem.get(conf); 319 320 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.newBuilder(family) 321 .setCompressionType(Compression.Algorithm.GZ).setDataBlockEncoding(DataBlockEncoding.DIFF) 322 .build(); 323 init(name.getMethodName(), conf, hcd); 324 325 // Test createWriterInTmp() 326 StoreFileWriter writer = 327 store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false, false); 328 Path path = writer.getPath(); 329 writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1))); 330 writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2))); 331 writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3))); 332 writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4))); 333 writer.close(); 334 335 // Verify that compression and encoding settings are respected 336 HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf); 337 assertEquals(hcd.getCompressionType(), reader.getTrailer().getCompressionCodec()); 338 assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding()); 339 reader.close(); 340 } 341 342 @Test 343 public void testDeleteExpiredStoreFiles() throws Exception { 344 testDeleteExpiredStoreFiles(0); 345 testDeleteExpiredStoreFiles(1); 346 } 347 348 /* 349 * @param minVersions the MIN_VERSIONS for the column family 350 */ 351 public void testDeleteExpiredStoreFiles(int minVersions) throws Exception { 352 int storeFileNum = 4; 353 int ttl = 4; 354 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 355 EnvironmentEdgeManagerTestHelper.injectEdge(edge); 356 357 Configuration conf = HBaseConfiguration.create(); 358 // Enable the expired store file deletion 359 conf.setBoolean("hbase.store.delete.expired.storefile", true); 360 // Set the compaction threshold higher to avoid normal compactions. 361 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5); 362 363 init(name.getMethodName() + "-" + minVersions, conf, ColumnFamilyDescriptorBuilder 364 .newBuilder(family).setMinVersions(minVersions).setTimeToLive(ttl).build()); 365 366 long storeTtl = this.store.getScanInfo().getTtl(); 367 long sleepTime = storeTtl / storeFileNum; 368 long timeStamp; 369 // There are 4 store files and the max time stamp difference among these 370 // store files will be (this.store.ttl / storeFileNum) 371 for (int i = 1; i <= storeFileNum; i++) { 372 LOG.info("Adding some data for the store file #" + i); 373 timeStamp = EnvironmentEdgeManager.currentTime(); 374 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null); 375 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null); 376 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null); 377 flush(i); 378 edge.incrementTime(sleepTime); 379 } 380 381 // Verify the total number of store files 382 assertEquals(storeFileNum, this.store.getStorefiles().size()); 383 384 // Each call will find one expired store file and delete it before compaction happens. 385 // There will be no compaction due to threshold above. Last file will not be replaced. 386 for (int i = 1; i <= storeFileNum - 1; i++) { 387 // verify the expired store file. 388 assertFalse(this.store.requestCompaction().isPresent()); 389 Collection<HStoreFile> sfs = this.store.getStorefiles(); 390 // Ensure i files are gone. 391 if (minVersions == 0) { 392 assertEquals(storeFileNum - i, sfs.size()); 393 // Ensure only non-expired files remain. 394 for (HStoreFile sf : sfs) { 395 assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl)); 396 } 397 } else { 398 assertEquals(storeFileNum, sfs.size()); 399 } 400 // Let the next store file expired. 401 edge.incrementTime(sleepTime); 402 } 403 assertFalse(this.store.requestCompaction().isPresent()); 404 405 Collection<HStoreFile> sfs = this.store.getStorefiles(); 406 // Assert the last expired file is not removed. 407 if (minVersions == 0) { 408 assertEquals(1, sfs.size()); 409 } 410 long ts = sfs.iterator().next().getReader().getMaxTimestamp(); 411 assertTrue(ts < (edge.currentTime() - storeTtl)); 412 413 for (HStoreFile sf : sfs) { 414 sf.closeStoreFile(true); 415 } 416 } 417 418 @Test 419 public void testLowestModificationTime() throws Exception { 420 Configuration conf = HBaseConfiguration.create(); 421 FileSystem fs = FileSystem.get(conf); 422 // Initialize region 423 init(name.getMethodName(), conf); 424 425 int storeFileNum = 4; 426 for (int i = 1; i <= storeFileNum; i++) { 427 LOG.info("Adding some data for the store file #"+i); 428 this.store.add(new KeyValue(row, family, qf1, i, (byte[])null), null); 429 this.store.add(new KeyValue(row, family, qf2, i, (byte[])null), null); 430 this.store.add(new KeyValue(row, family, qf3, i, (byte[])null), null); 431 flush(i); 432 } 433 // after flush; check the lowest time stamp 434 long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 435 long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 436 assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS); 437 438 // after compact; check the lowest time stamp 439 store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null); 440 lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); 441 lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); 442 assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); 443 } 444 445 private static long getLowestTimeStampFromFS(FileSystem fs, 446 final Collection<HStoreFile> candidates) throws IOException { 447 long minTs = Long.MAX_VALUE; 448 if (candidates.isEmpty()) { 449 return minTs; 450 } 451 Path[] p = new Path[candidates.size()]; 452 int i = 0; 453 for (HStoreFile sf : candidates) { 454 p[i] = sf.getPath(); 455 ++i; 456 } 457 458 FileStatus[] stats = fs.listStatus(p); 459 if (stats == null || stats.length == 0) { 460 return minTs; 461 } 462 for (FileStatus s : stats) { 463 minTs = Math.min(minTs, s.getModificationTime()); 464 } 465 return minTs; 466 } 467 468 ////////////////////////////////////////////////////////////////////////////// 469 // Get tests 470 ////////////////////////////////////////////////////////////////////////////// 471 472 private static final int BLOCKSIZE_SMALL = 8192; 473 /** 474 * Test for hbase-1686. 475 * @throws IOException 476 */ 477 @Test 478 public void testEmptyStoreFile() throws IOException { 479 init(this.name.getMethodName()); 480 // Write a store file. 481 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 482 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 483 flush(1); 484 // Now put in place an empty store file. Its a little tricky. Have to 485 // do manually with hacked in sequence id. 486 HStoreFile f = this.store.getStorefiles().iterator().next(); 487 Path storedir = f.getPath().getParent(); 488 long seqid = f.getMaxSequenceId(); 489 Configuration c = HBaseConfiguration.create(); 490 FileSystem fs = FileSystem.get(c); 491 HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 492 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), 493 fs) 494 .withOutputDir(storedir) 495 .withFileContext(meta) 496 .build(); 497 w.appendMetadata(seqid + 1, false); 498 w.close(); 499 this.store.close(); 500 // Reopen it... should pick up two files 501 this.store = 502 new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false); 503 assertEquals(2, this.store.getStorefilesCount()); 504 505 result = HBaseTestingUtility.getFromStoreFile(store, 506 get.getRow(), 507 qualifiers); 508 assertEquals(1, result.size()); 509 } 510 511 /** 512 * Getting data from memstore only 513 * @throws IOException 514 */ 515 @Test 516 public void testGet_FromMemStoreOnly() throws IOException { 517 init(this.name.getMethodName()); 518 519 //Put data in memstore 520 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 521 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 522 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); 523 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null); 524 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null); 525 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null); 526 527 //Get 528 result = HBaseTestingUtility.getFromStoreFile(store, 529 get.getRow(), qualifiers); 530 531 //Compare 532 assertCheck(); 533 } 534 535 @Test 536 public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException { 537 testTimeRangeIfSomeCellsAreDroppedInFlush(1); 538 testTimeRangeIfSomeCellsAreDroppedInFlush(3); 539 testTimeRangeIfSomeCellsAreDroppedInFlush(5); 540 } 541 542 private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException { 543 init(this.name.getMethodName(), TEST_UTIL.getConfiguration(), 544 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build()); 545 long currentTs = 100; 546 long minTs = currentTs; 547 // the extra cell won't be flushed to disk, 548 // so the min of timerange will be different between memStore and hfile. 549 for (int i = 0; i != (maxVersion + 1); ++i) { 550 this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null); 551 if (i == 1) { 552 minTs = currentTs; 553 } 554 } 555 flushStore(store, id++); 556 557 Collection<HStoreFile> files = store.getStorefiles(); 558 assertEquals(1, files.size()); 559 HStoreFile f = files.iterator().next(); 560 f.initReader(); 561 StoreFileReader reader = f.getReader(); 562 assertEquals(minTs, reader.timeRange.getMin()); 563 assertEquals(currentTs, reader.timeRange.getMax()); 564 } 565 566 /** 567 * Getting data from files only 568 * @throws IOException 569 */ 570 @Test 571 public void testGet_FromFilesOnly() throws IOException { 572 init(this.name.getMethodName()); 573 574 //Put data in memstore 575 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 576 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 577 //flush 578 flush(1); 579 580 //Add more data 581 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); 582 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null); 583 //flush 584 flush(2); 585 586 //Add more data 587 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null); 588 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null); 589 //flush 590 flush(3); 591 592 //Get 593 result = HBaseTestingUtility.getFromStoreFile(store, 594 get.getRow(), 595 qualifiers); 596 //this.store.get(get, qualifiers, result); 597 598 //Need to sort the result since multiple files 599 Collections.sort(result, CellComparatorImpl.COMPARATOR); 600 601 //Compare 602 assertCheck(); 603 } 604 605 /** 606 * Getting data from memstore and files 607 * @throws IOException 608 */ 609 @Test 610 public void testGet_FromMemStoreAndFiles() throws IOException { 611 init(this.name.getMethodName()); 612 613 //Put data in memstore 614 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 615 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 616 //flush 617 flush(1); 618 619 //Add more data 620 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); 621 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null); 622 //flush 623 flush(2); 624 625 //Add more data 626 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null); 627 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null); 628 629 //Get 630 result = HBaseTestingUtility.getFromStoreFile(store, 631 get.getRow(), qualifiers); 632 633 //Need to sort the result since multiple files 634 Collections.sort(result, CellComparatorImpl.COMPARATOR); 635 636 //Compare 637 assertCheck(); 638 } 639 640 private void flush(int storeFilessize) throws IOException{ 641 this.store.snapshot(); 642 flushStore(store, id++); 643 assertEquals(storeFilessize, this.store.getStorefiles().size()); 644 assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount()); 645 } 646 647 private void assertCheck() { 648 assertEquals(expected.size(), result.size()); 649 for(int i=0; i<expected.size(); i++) { 650 assertEquals(expected.get(i), result.get(i)); 651 } 652 } 653 654 @After 655 public void tearDown() throws Exception { 656 EnvironmentEdgeManagerTestHelper.reset(); 657 if (store != null) { 658 try { 659 store.close(); 660 } catch (IOException e) { 661 } 662 store = null; 663 } 664 if (region != null) { 665 region.close(); 666 region = null; 667 } 668 } 669 670 @AfterClass 671 public static void tearDownAfterClass() throws IOException { 672 TEST_UTIL.cleanupTestDir(); 673 } 674 675 @Test 676 public void testHandleErrorsInFlush() throws Exception { 677 LOG.info("Setting up a faulty file system that cannot write"); 678 679 final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 680 User user = User.createUserForTesting(conf, 681 "testhandleerrorsinflush", new String[]{"foo"}); 682 // Inject our faulty LocalFileSystem 683 conf.setClass("fs.file.impl", FaultyFileSystem.class, 684 FileSystem.class); 685 user.runAs(new PrivilegedExceptionAction<Object>() { 686 @Override 687 public Object run() throws Exception { 688 // Make sure it worked (above is sensitive to caching details in hadoop core) 689 FileSystem fs = FileSystem.get(conf); 690 assertEquals(FaultyFileSystem.class, fs.getClass()); 691 692 // Initialize region 693 init(name.getMethodName(), conf); 694 695 LOG.info("Adding some data"); 696 store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 697 store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); 698 store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); 699 700 LOG.info("Before flush, we should have no files"); 701 702 Collection<StoreFileInfo> files = 703 store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); 704 assertEquals(0, files != null ? files.size() : 0); 705 706 //flush 707 try { 708 LOG.info("Flushing"); 709 flush(1); 710 fail("Didn't bubble up IOE!"); 711 } catch (IOException ioe) { 712 assertTrue(ioe.getMessage().contains("Fault injected")); 713 } 714 715 LOG.info("After failed flush, we should still have no files!"); 716 files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); 717 assertEquals(0, files != null ? files.size() : 0); 718 store.getHRegion().getWAL().close(); 719 return null; 720 } 721 }); 722 FileSystem.closeAllForUGI(user.getUGI()); 723 } 724 725 /** 726 * Faulty file system that will fail if you write past its fault position the FIRST TIME 727 * only; thereafter it will succeed. Used by {@link TestHRegion} too. 728 */ 729 static class FaultyFileSystem extends FilterFileSystem { 730 List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>(); 731 private long faultPos = 200; 732 AtomicBoolean fault = new AtomicBoolean(true); 733 734 public FaultyFileSystem() { 735 super(new LocalFileSystem()); 736 System.err.println("Creating faulty!"); 737 } 738 739 @Override 740 public FSDataOutputStream create(Path p) throws IOException { 741 return new FaultyOutputStream(super.create(p), faultPos, fault); 742 } 743 744 @Override 745 public FSDataOutputStream create(Path f, FsPermission permission, 746 boolean overwrite, int bufferSize, short replication, long blockSize, 747 Progressable progress) throws IOException { 748 return new FaultyOutputStream(super.create(f, permission, 749 overwrite, bufferSize, replication, blockSize, progress), faultPos, fault); 750 } 751 752 @Override 753 public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, 754 int bufferSize, short replication, long blockSize, Progressable progress) 755 throws IOException { 756 // Fake it. Call create instead. The default implementation throws an IOE 757 // that this is not supported. 758 return create(f, overwrite, bufferSize, replication, blockSize, progress); 759 } 760 } 761 762 static class FaultyOutputStream extends FSDataOutputStream { 763 volatile long faultPos = Long.MAX_VALUE; 764 private final AtomicBoolean fault; 765 766 public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault) 767 throws IOException { 768 super(out, null); 769 this.faultPos = faultPos; 770 this.fault = fault; 771 } 772 773 @Override 774 public synchronized void write(byte[] buf, int offset, int length) throws IOException { 775 System.err.println("faulty stream write at pos " + getPos()); 776 injectFault(); 777 super.write(buf, offset, length); 778 } 779 780 private void injectFault() throws IOException { 781 if (this.fault.get() && getPos() >= faultPos) { 782 throw new IOException("Fault injected"); 783 } 784 } 785 } 786 787 private static void flushStore(HStore store, long id) throws IOException { 788 StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY); 789 storeFlushCtx.prepare(); 790 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 791 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 792 } 793 794 /** 795 * Generate a list of KeyValues for testing based on given parameters 796 * @param timestamps 797 * @param numRows 798 * @param qualifier 799 * @param family 800 * @return the rows key-value list 801 */ 802 List<Cell> getKeyValueSet(long[] timestamps, int numRows, 803 byte[] qualifier, byte[] family) { 804 List<Cell> kvList = new ArrayList<>(); 805 for (int i=1;i<=numRows;i++) { 806 byte[] b = Bytes.toBytes(i); 807 for (long timestamp: timestamps) { 808 kvList.add(new KeyValue(b, family, qualifier, timestamp, b)); 809 } 810 } 811 return kvList; 812 } 813 814 /** 815 * Test to ensure correctness when using Stores with multiple timestamps 816 * @throws IOException 817 */ 818 @Test 819 public void testMultipleTimestamps() throws IOException { 820 int numRows = 1; 821 long[] timestamps1 = new long[] {1,5,10,20}; 822 long[] timestamps2 = new long[] {30,80}; 823 824 init(this.name.getMethodName()); 825 826 List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family); 827 for (Cell kv : kvList1) { 828 this.store.add(kv, null); 829 } 830 831 this.store.snapshot(); 832 flushStore(store, id++); 833 834 List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family); 835 for(Cell kv : kvList2) { 836 this.store.add(kv, null); 837 } 838 839 List<Cell> result; 840 Get get = new Get(Bytes.toBytes(1)); 841 get.addColumn(family,qf1); 842 843 get.setTimeRange(0,15); 844 result = HBaseTestingUtility.getFromStoreFile(store, get); 845 assertTrue(result.size()>0); 846 847 get.setTimeRange(40,90); 848 result = HBaseTestingUtility.getFromStoreFile(store, get); 849 assertTrue(result.size()>0); 850 851 get.setTimeRange(10,45); 852 result = HBaseTestingUtility.getFromStoreFile(store, get); 853 assertTrue(result.size()>0); 854 855 get.setTimeRange(80,145); 856 result = HBaseTestingUtility.getFromStoreFile(store, get); 857 assertTrue(result.size()>0); 858 859 get.setTimeRange(1,2); 860 result = HBaseTestingUtility.getFromStoreFile(store, get); 861 assertTrue(result.size()>0); 862 863 get.setTimeRange(90,200); 864 result = HBaseTestingUtility.getFromStoreFile(store, get); 865 assertTrue(result.size()==0); 866 } 867 868 /** 869 * Test for HBASE-3492 - Test split on empty colfam (no store files). 870 * 871 * @throws IOException When the IO operations fail. 872 */ 873 @Test 874 public void testSplitWithEmptyColFam() throws IOException { 875 init(this.name.getMethodName()); 876 assertFalse(store.getSplitPoint().isPresent()); 877 store.getHRegion().forceSplit(null); 878 assertFalse(store.getSplitPoint().isPresent()); 879 store.getHRegion().clearSplit(); 880 } 881 882 @Test 883 public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception { 884 final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle"; 885 long anyValue = 10; 886 887 // We'll check that it uses correct config and propagates it appropriately by going thru 888 // the simplest "real" path I can find - "throttleCompaction", which just checks whether 889 // a number we pass in is higher than some config value, inside compactionPolicy. 890 Configuration conf = HBaseConfiguration.create(); 891 conf.setLong(CONFIG_KEY, anyValue); 892 init(name.getMethodName() + "-xml", conf); 893 assertTrue(store.throttleCompaction(anyValue + 1)); 894 assertFalse(store.throttleCompaction(anyValue)); 895 896 // HTD overrides XML. 897 --anyValue; 898 init(name.getMethodName() + "-htd", conf, TableDescriptorBuilder 899 .newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, Long.toString(anyValue)), 900 ColumnFamilyDescriptorBuilder.of(family)); 901 assertTrue(store.throttleCompaction(anyValue + 1)); 902 assertFalse(store.throttleCompaction(anyValue)); 903 904 // HCD overrides them both. 905 --anyValue; 906 init(name.getMethodName() + "-hcd", conf, 907 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)).setValue(CONFIG_KEY, 908 Long.toString(anyValue)), 909 ColumnFamilyDescriptorBuilder.newBuilder(family).setValue(CONFIG_KEY, Long.toString(anyValue)) 910 .build()); 911 assertTrue(store.throttleCompaction(anyValue + 1)); 912 assertFalse(store.throttleCompaction(anyValue)); 913 } 914 915 public static class DummyStoreEngine extends DefaultStoreEngine { 916 public static DefaultCompactor lastCreatedCompactor = null; 917 918 @Override 919 protected void createComponents(Configuration conf, HStore store, CellComparator comparator) 920 throws IOException { 921 super.createComponents(conf, store, comparator); 922 lastCreatedCompactor = this.compactor; 923 } 924 } 925 926 @Test 927 public void testStoreUsesSearchEngineOverride() throws Exception { 928 Configuration conf = HBaseConfiguration.create(); 929 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName()); 930 init(this.name.getMethodName(), conf); 931 assertEquals(DummyStoreEngine.lastCreatedCompactor, 932 this.store.storeEngine.getCompactor()); 933 } 934 935 private void addStoreFile() throws IOException { 936 HStoreFile f = this.store.getStorefiles().iterator().next(); 937 Path storedir = f.getPath().getParent(); 938 long seqid = this.store.getMaxSequenceId().orElse(0L); 939 Configuration c = TEST_UTIL.getConfiguration(); 940 FileSystem fs = FileSystem.get(c); 941 HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); 942 StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), 943 fs) 944 .withOutputDir(storedir) 945 .withFileContext(fileContext) 946 .build(); 947 w.appendMetadata(seqid + 1, false); 948 w.close(); 949 LOG.info("Added store file:" + w.getPath()); 950 } 951 952 private void archiveStoreFile(int index) throws IOException { 953 Collection<HStoreFile> files = this.store.getStorefiles(); 954 HStoreFile sf = null; 955 Iterator<HStoreFile> it = files.iterator(); 956 for (int i = 0; i <= index; i++) { 957 sf = it.next(); 958 } 959 store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf)); 960 } 961 962 private void closeCompactedFile(int index) throws IOException { 963 Collection<HStoreFile> files = 964 this.store.getStoreEngine().getStoreFileManager().getCompactedfiles(); 965 HStoreFile sf = null; 966 Iterator<HStoreFile> it = files.iterator(); 967 for (int i = 0; i <= index; i++) { 968 sf = it.next(); 969 } 970 sf.closeStoreFile(true); 971 store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf)); 972 } 973 974 @Test 975 public void testRefreshStoreFiles() throws Exception { 976 init(name.getMethodName()); 977 978 assertEquals(0, this.store.getStorefilesCount()); 979 980 // Test refreshing store files when no store files are there 981 store.refreshStoreFiles(); 982 assertEquals(0, this.store.getStorefilesCount()); 983 984 // add some data, flush 985 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 986 flush(1); 987 assertEquals(1, this.store.getStorefilesCount()); 988 989 // add one more file 990 addStoreFile(); 991 992 assertEquals(1, this.store.getStorefilesCount()); 993 store.refreshStoreFiles(); 994 assertEquals(2, this.store.getStorefilesCount()); 995 996 // add three more files 997 addStoreFile(); 998 addStoreFile(); 999 addStoreFile(); 1000 1001 assertEquals(2, this.store.getStorefilesCount()); 1002 store.refreshStoreFiles(); 1003 assertEquals(5, this.store.getStorefilesCount()); 1004 1005 closeCompactedFile(0); 1006 archiveStoreFile(0); 1007 1008 assertEquals(5, this.store.getStorefilesCount()); 1009 store.refreshStoreFiles(); 1010 assertEquals(4, this.store.getStorefilesCount()); 1011 1012 archiveStoreFile(0); 1013 archiveStoreFile(1); 1014 archiveStoreFile(2); 1015 1016 assertEquals(4, this.store.getStorefilesCount()); 1017 store.refreshStoreFiles(); 1018 assertEquals(1, this.store.getStorefilesCount()); 1019 1020 archiveStoreFile(0); 1021 store.refreshStoreFiles(); 1022 assertEquals(0, this.store.getStorefilesCount()); 1023 } 1024 1025 @Test 1026 public void testRefreshStoreFilesNotChanged() throws IOException { 1027 init(name.getMethodName()); 1028 1029 assertEquals(0, this.store.getStorefilesCount()); 1030 1031 // add some data, flush 1032 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); 1033 flush(1); 1034 // add one more file 1035 addStoreFile(); 1036 1037 HStore spiedStore = spy(store); 1038 1039 // call first time after files changed 1040 spiedStore.refreshStoreFiles(); 1041 assertEquals(2, this.store.getStorefilesCount()); 1042 verify(spiedStore, times(1)).replaceStoreFiles(any(), any()); 1043 1044 // call second time 1045 spiedStore.refreshStoreFiles(); 1046 1047 //ensure that replaceStoreFiles is not called if files are not refreshed 1048 verify(spiedStore, times(0)).replaceStoreFiles(null, null); 1049 } 1050 1051 private long countMemStoreScanner(StoreScanner scanner) { 1052 if (scanner.currentScanners == null) { 1053 return 0; 1054 } 1055 return scanner.currentScanners.stream() 1056 .filter(s -> !s.isFileScanner()) 1057 .count(); 1058 } 1059 1060 @Test 1061 public void testNumberOfMemStoreScannersAfterFlush() throws IOException { 1062 long seqId = 100; 1063 long timestamp = System.currentTimeMillis(); 1064 Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1065 .setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put) 1066 .setValue(qf1).build(); 1067 PrivateCellUtil.setSequenceId(cell0, seqId); 1068 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList()); 1069 1070 Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1071 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put) 1072 .setValue(qf1).build(); 1073 PrivateCellUtil.setSequenceId(cell1, seqId); 1074 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1)); 1075 1076 seqId = 101; 1077 timestamp = System.currentTimeMillis(); 1078 Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family) 1079 .setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put) 1080 .setValue(qf1).build(); 1081 PrivateCellUtil.setSequenceId(cell2, seqId); 1082 testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2)); 1083 } 1084 1085 private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot, 1086 List<Cell> inputCellsAfterSnapshot) throws IOException { 1087 init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size()); 1088 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1089 long seqId = Long.MIN_VALUE; 1090 for (Cell c : inputCellsBeforeSnapshot) { 1091 quals.add(CellUtil.cloneQualifier(c)); 1092 seqId = Math.max(seqId, c.getSequenceId()); 1093 } 1094 for (Cell c : inputCellsAfterSnapshot) { 1095 quals.add(CellUtil.cloneQualifier(c)); 1096 seqId = Math.max(seqId, c.getSequenceId()); 1097 } 1098 inputCellsBeforeSnapshot.forEach(c -> store.add(c, null)); 1099 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1100 storeFlushCtx.prepare(); 1101 inputCellsAfterSnapshot.forEach(c -> store.add(c, null)); 1102 int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2; 1103 try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) { 1104 // snapshot + active (if inputCellsAfterSnapshot isn't empty) 1105 assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s)); 1106 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1107 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1108 // snapshot has no data after flush 1109 int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1; 1110 boolean more; 1111 int cellCount = 0; 1112 do { 1113 List<Cell> cells = new ArrayList<>(); 1114 more = s.next(cells); 1115 cellCount += cells.size(); 1116 assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s)); 1117 } while (more); 1118 assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size() 1119 + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(), 1120 inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount); 1121 // the current scanners is cleared 1122 assertEquals(0, countMemStoreScanner(s)); 1123 } 1124 } 1125 1126 private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) 1127 throws IOException { 1128 return createCell(row, qualifier, ts, sequenceId, value); 1129 } 1130 1131 private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value) 1132 throws IOException { 1133 Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family) 1134 .setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put) 1135 .setValue(value).build(); 1136 PrivateCellUtil.setSequenceId(c, sequenceId); 1137 return c; 1138 } 1139 1140 @Test 1141 public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException { 1142 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1143 final int expectedSize = 3; 1144 testFlushBeforeCompletingScan(new MyListHook() { 1145 @Override 1146 public void hook(int currentSize) { 1147 if (currentSize == expectedSize - 1) { 1148 try { 1149 flushStore(store, id++); 1150 timeToGoNextRow.set(true); 1151 } catch (IOException e) { 1152 throw new RuntimeException(e); 1153 } 1154 } 1155 } 1156 }, new FilterBase() { 1157 @Override 1158 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1159 return ReturnCode.INCLUDE; 1160 } 1161 }, expectedSize); 1162 } 1163 1164 @Test 1165 public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException { 1166 final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); 1167 final int expectedSize = 2; 1168 testFlushBeforeCompletingScan(new MyListHook() { 1169 @Override 1170 public void hook(int currentSize) { 1171 if (currentSize == expectedSize - 1) { 1172 try { 1173 flushStore(store, id++); 1174 timeToGoNextRow.set(true); 1175 } catch (IOException e) { 1176 throw new RuntimeException(e); 1177 } 1178 } 1179 } 1180 }, new FilterBase() { 1181 @Override 1182 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1183 if (timeToGoNextRow.get()) { 1184 timeToGoNextRow.set(false); 1185 return ReturnCode.NEXT_ROW; 1186 } else { 1187 return ReturnCode.INCLUDE; 1188 } 1189 } 1190 }, expectedSize); 1191 } 1192 1193 @Test 1194 public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, 1195 InterruptedException { 1196 final AtomicBoolean timeToGetHint = new AtomicBoolean(false); 1197 final int expectedSize = 2; 1198 testFlushBeforeCompletingScan(new MyListHook() { 1199 @Override 1200 public void hook(int currentSize) { 1201 if (currentSize == expectedSize - 1) { 1202 try { 1203 flushStore(store, id++); 1204 timeToGetHint.set(true); 1205 } catch (IOException e) { 1206 throw new RuntimeException(e); 1207 } 1208 } 1209 } 1210 }, new FilterBase() { 1211 @Override 1212 public Filter.ReturnCode filterCell(final Cell c) throws IOException { 1213 if (timeToGetHint.get()) { 1214 timeToGetHint.set(false); 1215 return Filter.ReturnCode.SEEK_NEXT_USING_HINT; 1216 } else { 1217 return Filter.ReturnCode.INCLUDE; 1218 } 1219 } 1220 @Override 1221 public Cell getNextCellHint(Cell currentCell) throws IOException { 1222 return currentCell; 1223 } 1224 }, expectedSize); 1225 } 1226 1227 private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize) 1228 throws IOException, InterruptedException { 1229 Configuration conf = HBaseConfiguration.create(); 1230 byte[] r0 = Bytes.toBytes("row0"); 1231 byte[] r1 = Bytes.toBytes("row1"); 1232 byte[] r2 = Bytes.toBytes("row2"); 1233 byte[] value0 = Bytes.toBytes("value0"); 1234 byte[] value1 = Bytes.toBytes("value1"); 1235 byte[] value2 = Bytes.toBytes("value2"); 1236 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1237 long ts = EnvironmentEdgeManager.currentTime(); 1238 long seqId = 100; 1239 init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1240 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(), 1241 new MyStoreHook() { 1242 @Override 1243 public long getSmallestReadPoint(HStore store) { 1244 return seqId + 3; 1245 } 1246 }); 1247 // The cells having the value0 won't be flushed to disk because the value of max version is 1 1248 store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing); 1249 store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing); 1250 store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing); 1251 store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing); 1252 store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing); 1253 store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing); 1254 store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing); 1255 store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing); 1256 store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing); 1257 store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing); 1258 store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing); 1259 store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing); 1260 List<Cell> myList = new MyList<>(hook); 1261 Scan scan = new Scan() 1262 .withStartRow(r1) 1263 .setFilter(filter); 1264 try (InternalScanner scanner = (InternalScanner) store.getScanner( 1265 scan, null, seqId + 3)){ 1266 // r1 1267 scanner.next(myList); 1268 assertEquals(expectedSize, myList.size()); 1269 for (Cell c : myList) { 1270 byte[] actualValue = CellUtil.cloneValue(c); 1271 assertTrue("expected:" + Bytes.toStringBinary(value1) 1272 + ", actual:" + Bytes.toStringBinary(actualValue) 1273 , Bytes.equals(actualValue, value1)); 1274 } 1275 List<Cell> normalList = new ArrayList<>(3); 1276 // r2 1277 scanner.next(normalList); 1278 assertEquals(3, normalList.size()); 1279 for (Cell c : normalList) { 1280 byte[] actualValue = CellUtil.cloneValue(c); 1281 assertTrue("expected:" + Bytes.toStringBinary(value2) 1282 + ", actual:" + Bytes.toStringBinary(actualValue) 1283 , Bytes.equals(actualValue, value2)); 1284 } 1285 } 1286 } 1287 1288 @Test 1289 public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException { 1290 Configuration conf = HBaseConfiguration.create(); 1291 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName()); 1292 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1293 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1294 byte[] value = Bytes.toBytes("value"); 1295 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1296 long ts = EnvironmentEdgeManager.currentTime(); 1297 long seqId = 100; 1298 // older data whihc shouldn't be "seen" by client 1299 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1300 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1301 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1302 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1303 quals.add(qf1); 1304 quals.add(qf2); 1305 quals.add(qf3); 1306 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1307 MyCompactingMemStore.START_TEST.set(true); 1308 Runnable flush = () -> { 1309 // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5) 1310 // recreate the active memstore -- phase (4/5) 1311 storeFlushCtx.prepare(); 1312 }; 1313 ExecutorService service = Executors.newSingleThreadExecutor(); 1314 service.submit(flush); 1315 // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5) 1316 // this is blocked until we recreate the active memstore -- phase (3/5) 1317 // we get scanner from active memstore but it is empty -- phase (5/5) 1318 InternalScanner scanner = (InternalScanner) store.getScanner( 1319 new Scan(new Get(row)), quals, seqId + 1); 1320 service.shutdown(); 1321 service.awaitTermination(20, TimeUnit.SECONDS); 1322 try { 1323 try { 1324 List<Cell> results = new ArrayList<>(); 1325 scanner.next(results); 1326 assertEquals(3, results.size()); 1327 for (Cell c : results) { 1328 byte[] actualValue = CellUtil.cloneValue(c); 1329 assertTrue("expected:" + Bytes.toStringBinary(value) 1330 + ", actual:" + Bytes.toStringBinary(actualValue) 1331 , Bytes.equals(actualValue, value)); 1332 } 1333 } finally { 1334 scanner.close(); 1335 } 1336 } finally { 1337 MyCompactingMemStore.START_TEST.set(false); 1338 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1339 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1340 } 1341 } 1342 1343 @Test 1344 public void testScanWithDoubleFlush() throws IOException { 1345 Configuration conf = HBaseConfiguration.create(); 1346 // Initialize region 1347 MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook(){ 1348 @Override 1349 public void getScanners(MyStore store) throws IOException { 1350 final long tmpId = id++; 1351 ExecutorService s = Executors.newSingleThreadExecutor(); 1352 s.submit(() -> { 1353 try { 1354 // flush the store before storescanner updates the scanners from store. 1355 // The current data will be flushed into files, and the memstore will 1356 // be clear. 1357 // -- phase (4/4) 1358 flushStore(store, tmpId); 1359 }catch (IOException ex) { 1360 throw new RuntimeException(ex); 1361 } 1362 }); 1363 s.shutdown(); 1364 try { 1365 // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers. 1366 s.awaitTermination(3, TimeUnit.SECONDS); 1367 } catch (InterruptedException ex) { 1368 } 1369 } 1370 }); 1371 byte[] oldValue = Bytes.toBytes("oldValue"); 1372 byte[] currentValue = Bytes.toBytes("currentValue"); 1373 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1374 long ts = EnvironmentEdgeManager.currentTime(); 1375 long seqId = 100; 1376 // older data whihc shouldn't be "seen" by client 1377 myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing); 1378 myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing); 1379 myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing); 1380 long snapshotId = id++; 1381 // push older data into snapshot -- phase (1/4) 1382 StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker 1383 .DUMMY); 1384 storeFlushCtx.prepare(); 1385 1386 // insert current data into active -- phase (2/4) 1387 myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing); 1388 myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing); 1389 myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing); 1390 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1391 quals.add(qf1); 1392 quals.add(qf2); 1393 quals.add(qf3); 1394 try (InternalScanner scanner = (InternalScanner) myStore.getScanner( 1395 new Scan(new Get(row)), quals, seqId + 1)) { 1396 // complete the flush -- phase (3/4) 1397 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1398 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1399 1400 List<Cell> results = new ArrayList<>(); 1401 scanner.next(results); 1402 assertEquals(3, results.size()); 1403 for (Cell c : results) { 1404 byte[] actualValue = CellUtil.cloneValue(c); 1405 assertTrue("expected:" + Bytes.toStringBinary(currentValue) 1406 + ", actual:" + Bytes.toStringBinary(actualValue) 1407 , Bytes.equals(actualValue, currentValue)); 1408 } 1409 } 1410 } 1411 1412 @Test 1413 public void testReclaimChunkWhenScaning() throws IOException { 1414 init("testReclaimChunkWhenScaning"); 1415 long ts = EnvironmentEdgeManager.currentTime(); 1416 long seqId = 100; 1417 byte[] value = Bytes.toBytes("value"); 1418 // older data whihc shouldn't be "seen" by client 1419 store.add(createCell(qf1, ts, seqId, value), null); 1420 store.add(createCell(qf2, ts, seqId, value), null); 1421 store.add(createCell(qf3, ts, seqId, value), null); 1422 TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1423 quals.add(qf1); 1424 quals.add(qf2); 1425 quals.add(qf3); 1426 try (InternalScanner scanner = (InternalScanner) store.getScanner( 1427 new Scan(new Get(row)), quals, seqId)) { 1428 List<Cell> results = new MyList<>(size -> { 1429 switch (size) { 1430 // 1) we get the first cell (qf1) 1431 // 2) flush the data to have StoreScanner update inner scanners 1432 // 3) the chunk will be reclaimed after updaing 1433 case 1: 1434 try { 1435 flushStore(store, id++); 1436 } catch (IOException e) { 1437 throw new RuntimeException(e); 1438 } 1439 break; 1440 // 1) we get the second cell (qf2) 1441 // 2) add some cell to fill some byte into the chunk (we have only one chunk) 1442 case 2: 1443 try { 1444 byte[] newValue = Bytes.toBytes("newValue"); 1445 // older data whihc shouldn't be "seen" by client 1446 store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null); 1447 store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null); 1448 store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null); 1449 } catch (IOException e) { 1450 throw new RuntimeException(e); 1451 } 1452 break; 1453 default: 1454 break; 1455 } 1456 }); 1457 scanner.next(results); 1458 assertEquals(3, results.size()); 1459 for (Cell c : results) { 1460 byte[] actualValue = CellUtil.cloneValue(c); 1461 assertTrue("expected:" + Bytes.toStringBinary(value) 1462 + ", actual:" + Bytes.toStringBinary(actualValue) 1463 , Bytes.equals(actualValue, value)); 1464 } 1465 } 1466 } 1467 1468 /** 1469 * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable 1470 * may change the versionedList. And the first InMemoryFlushRunnable will use the chagned 1471 * versionedList to remove the corresponding segments. 1472 * In short, there will be some segements which isn't in merge are removed. 1473 * @throws IOException 1474 * @throws InterruptedException 1475 */ 1476 @Test 1477 public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException { 1478 int flushSize = 500; 1479 Configuration conf = HBaseConfiguration.create(); 1480 conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName()); 1481 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25); 1482 MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.set(0); 1483 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize)); 1484 // Set the lower threshold to invoke the "MERGE" policy 1485 conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0)); 1486 init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) 1487 .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); 1488 byte[] value = Bytes.toBytes("thisisavarylargevalue"); 1489 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1490 long ts = EnvironmentEdgeManager.currentTime(); 1491 long seqId = 100; 1492 // older data whihc shouldn't be "seen" by client 1493 store.add(createCell(qf1, ts, seqId, value), memStoreSizing); 1494 store.add(createCell(qf2, ts, seqId, value), memStoreSizing); 1495 store.add(createCell(qf3, ts, seqId, value), memStoreSizing); 1496 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1497 StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); 1498 storeFlushCtx.prepare(); 1499 // This shouldn't invoke another in-memory flush because the first compactor thread 1500 // hasn't accomplished the in-memory compaction. 1501 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1502 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1503 store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing); 1504 assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1505 //okay. Let the compaction be completed 1506 MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown(); 1507 CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore; 1508 while (mem.isMemStoreFlushingInMemory()) { 1509 TimeUnit.SECONDS.sleep(1); 1510 } 1511 // This should invoke another in-memory flush. 1512 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1513 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1514 store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing); 1515 assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); 1516 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1517 String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE)); 1518 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); 1519 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); 1520 } 1521 1522 @Test 1523 public void testAge() throws IOException { 1524 long currentTime = System.currentTimeMillis(); 1525 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 1526 edge.setValue(currentTime); 1527 EnvironmentEdgeManager.injectEdge(edge); 1528 Configuration conf = TEST_UTIL.getConfiguration(); 1529 ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family); 1530 initHRegion(name.getMethodName(), conf, 1531 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false); 1532 HStore store = new HStore(region, hcd, conf, false) { 1533 1534 @Override 1535 protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf, 1536 CellComparator kvComparator) throws IOException { 1537 List<HStoreFile> storefiles = 1538 Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100), 1539 mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000)); 1540 StoreFileManager sfm = mock(StoreFileManager.class); 1541 when(sfm.getStorefiles()).thenReturn(storefiles); 1542 StoreEngine<?, ?, ?, ?> storeEngine = mock(StoreEngine.class); 1543 when(storeEngine.getStoreFileManager()).thenReturn(sfm); 1544 return storeEngine; 1545 } 1546 }; 1547 assertEquals(10L, store.getMinStoreFileAge().getAsLong()); 1548 assertEquals(10000L, store.getMaxStoreFileAge().getAsLong()); 1549 assertEquals((10 + 100 + 1000 + 10000) / 4.0, store.getAvgStoreFileAge().getAsDouble(), 1E-4); 1550 } 1551 1552 private HStoreFile mockStoreFile(long createdTime) { 1553 StoreFileInfo info = mock(StoreFileInfo.class); 1554 when(info.getCreatedTimestamp()).thenReturn(createdTime); 1555 HStoreFile sf = mock(HStoreFile.class); 1556 when(sf.getReader()).thenReturn(mock(StoreFileReader.class)); 1557 when(sf.isHFile()).thenReturn(true); 1558 when(sf.getFileInfo()).thenReturn(info); 1559 return sf; 1560 } 1561 1562 private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) 1563 throws IOException { 1564 return (MyStore) init(methodName, conf, 1565 TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), 1566 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(5).build(), hook); 1567 } 1568 1569 private static class MyStore extends HStore { 1570 private final MyStoreHook hook; 1571 1572 MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration 1573 confParam, MyStoreHook hook, boolean switchToPread) throws IOException { 1574 super(region, family, confParam, false); 1575 this.hook = hook; 1576 } 1577 1578 @Override 1579 public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, 1580 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, 1581 boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, 1582 boolean includeMemstoreScanner) throws IOException { 1583 hook.getScanners(this); 1584 return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, 1585 stopRow, false, readPt, includeMemstoreScanner); 1586 } 1587 1588 @Override 1589 public long getSmallestReadPoint() { 1590 return hook.getSmallestReadPoint(this); 1591 } 1592 } 1593 1594 private abstract static class MyStoreHook { 1595 1596 void getScanners(MyStore store) throws IOException { 1597 } 1598 1599 long getSmallestReadPoint(HStore store) { 1600 return store.getHRegion().getSmallestReadPoint(); 1601 } 1602 } 1603 1604 @Test 1605 public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception { 1606 Configuration conf = HBaseConfiguration.create(); 1607 conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); 1608 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0); 1609 // Set the lower threshold to invoke the "MERGE" policy 1610 MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {}); 1611 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); 1612 long ts = System.currentTimeMillis(); 1613 long seqID = 1L; 1614 // Add some data to the region and do some flushes 1615 for (int i = 1; i < 10; i++) { 1616 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1617 memStoreSizing); 1618 } 1619 // flush them 1620 flushStore(store, seqID); 1621 for (int i = 11; i < 20; i++) { 1622 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1623 memStoreSizing); 1624 } 1625 // flush them 1626 flushStore(store, seqID); 1627 for (int i = 21; i < 30; i++) { 1628 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1629 memStoreSizing); 1630 } 1631 // flush them 1632 flushStore(store, seqID); 1633 1634 assertEquals(3, store.getStorefilesCount()); 1635 Scan scan = new Scan(); 1636 scan.addFamily(family); 1637 Collection<HStoreFile> storefiles2 = store.getStorefiles(); 1638 ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2); 1639 StoreScanner storeScanner = 1640 (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); 1641 // get the current heap 1642 KeyValueHeap heap = storeScanner.heap; 1643 // create more store files 1644 for (int i = 31; i < 40; i++) { 1645 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1646 memStoreSizing); 1647 } 1648 // flush them 1649 flushStore(store, seqID); 1650 1651 for (int i = 41; i < 50; i++) { 1652 store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), 1653 memStoreSizing); 1654 } 1655 // flush them 1656 flushStore(store, seqID); 1657 storefiles2 = store.getStorefiles(); 1658 ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2); 1659 actualStorefiles1.removeAll(actualStorefiles); 1660 // Do compaction 1661 MyThread thread = new MyThread(storeScanner); 1662 thread.start(); 1663 store.replaceStoreFiles(actualStorefiles, actualStorefiles1); 1664 thread.join(); 1665 KeyValueHeap heap2 = thread.getHeap(); 1666 assertFalse(heap.equals(heap2)); 1667 } 1668 1669 @Test 1670 public void testInMemoryCompactionTypeWithLowerCase() throws IOException, InterruptedException { 1671 Configuration conf = HBaseConfiguration.create(); 1672 conf.set("hbase.systemtables.compacting.memstore.type", "eager"); 1673 init(name.getMethodName(), conf, 1674 TableDescriptorBuilder.newBuilder( 1675 TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, "meta".getBytes())), 1676 ColumnFamilyDescriptorBuilder.newBuilder(family) 1677 .setInMemoryCompaction(MemoryCompactionPolicy.NONE).build()); 1678 assertTrue(((MemStoreCompactor) ((CompactingMemStore) store.memstore).compactor).toString() 1679 .startsWith("eager".toUpperCase())); 1680 } 1681 1682 @Test 1683 public void testSpaceQuotaChangeAfterReplacement() throws IOException { 1684 final TableName tn = TableName.valueOf(name.getMethodName()); 1685 init(name.getMethodName()); 1686 1687 RegionSizeStoreImpl sizeStore = new RegionSizeStoreImpl(); 1688 1689 HStoreFile sf1 = mockStoreFileWithLength(1024L); 1690 HStoreFile sf2 = mockStoreFileWithLength(2048L); 1691 HStoreFile sf3 = mockStoreFileWithLength(4096L); 1692 HStoreFile sf4 = mockStoreFileWithLength(8192L); 1693 1694 RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("a")) 1695 .setEndKey(Bytes.toBytes("b")).build(); 1696 1697 // Compacting two files down to one, reducing size 1698 sizeStore.put(regionInfo, 1024L + 4096L); 1699 store.updateSpaceQuotaAfterFileReplacement( 1700 sizeStore, regionInfo, Arrays.asList(sf1, sf3), Arrays.asList(sf2)); 1701 1702 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 1703 1704 // The same file length in and out should have no change 1705 store.updateSpaceQuotaAfterFileReplacement( 1706 sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf2)); 1707 1708 assertEquals(2048L, sizeStore.getRegionSize(regionInfo).getSize()); 1709 1710 // Increase the total size used 1711 store.updateSpaceQuotaAfterFileReplacement( 1712 sizeStore, regionInfo, Arrays.asList(sf2), Arrays.asList(sf3)); 1713 1714 assertEquals(4096L, sizeStore.getRegionSize(regionInfo).getSize()); 1715 1716 RegionInfo regionInfo2 = RegionInfoBuilder.newBuilder(tn).setStartKey(Bytes.toBytes("b")) 1717 .setEndKey(Bytes.toBytes("c")).build(); 1718 store.updateSpaceQuotaAfterFileReplacement(sizeStore, regionInfo2, null, Arrays.asList(sf4)); 1719 1720 assertEquals(8192L, sizeStore.getRegionSize(regionInfo2).getSize()); 1721 } 1722 1723 @Test 1724 public void testHFileContextSetWithCFAndTable() throws Exception { 1725 init(this.name.getMethodName()); 1726 StoreFileWriter writer = store.createWriterInTmp(10000L, 1727 Compression.Algorithm.NONE, false, true, false, true); 1728 HFileContext hFileContext = writer.getHFileWriter().getFileContext(); 1729 assertArrayEquals(family, hFileContext.getColumnFamily()); 1730 assertArrayEquals(table, hFileContext.getTableName()); 1731 } 1732 1733 private HStoreFile mockStoreFileWithLength(long length) { 1734 HStoreFile sf = mock(HStoreFile.class); 1735 StoreFileReader sfr = mock(StoreFileReader.class); 1736 when(sf.isHFile()).thenReturn(true); 1737 when(sf.getReader()).thenReturn(sfr); 1738 when(sfr.length()).thenReturn(length); 1739 return sf; 1740 } 1741 1742 private static class MyThread extends Thread { 1743 private StoreScanner scanner; 1744 private KeyValueHeap heap; 1745 1746 public MyThread(StoreScanner scanner) { 1747 this.scanner = scanner; 1748 } 1749 1750 public KeyValueHeap getHeap() { 1751 return this.heap; 1752 } 1753 1754 @Override 1755 public void run() { 1756 scanner.trySwitchToStreamRead(); 1757 heap = scanner.heap; 1758 } 1759 } 1760 1761 private static class MyMemStoreCompactor extends MemStoreCompactor { 1762 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 1763 private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1); 1764 public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy 1765 compactionPolicy) throws IllegalArgumentIOException { 1766 super(compactingMemStore, compactionPolicy); 1767 } 1768 1769 @Override 1770 public boolean start() throws IOException { 1771 boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0; 1772 if (isFirst) { 1773 try { 1774 START_COMPACTOR_LATCH.await(); 1775 return super.start(); 1776 } catch (InterruptedException ex) { 1777 throw new RuntimeException(ex); 1778 } 1779 } 1780 return super.start(); 1781 } 1782 } 1783 1784 public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore { 1785 private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); 1786 public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparatorImpl c, 1787 HStore store, RegionServicesForStores regionServices, 1788 MemoryCompactionPolicy compactionPolicy) throws IOException { 1789 super(conf, c, store, regionServices, compactionPolicy); 1790 } 1791 1792 @Override 1793 protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) 1794 throws IllegalArgumentIOException { 1795 return new MyMemStoreCompactor(this, compactionPolicy); 1796 } 1797 1798 @Override 1799 protected boolean setInMemoryCompactionFlag() { 1800 boolean rval = super.setInMemoryCompactionFlag(); 1801 if (rval) { 1802 RUNNER_COUNT.incrementAndGet(); 1803 if (LOG.isDebugEnabled()) { 1804 LOG.debug("runner count: " + RUNNER_COUNT.get()); 1805 } 1806 } 1807 return rval; 1808 } 1809 } 1810 1811 public static class MyCompactingMemStore extends CompactingMemStore { 1812 private static final AtomicBoolean START_TEST = new AtomicBoolean(false); 1813 private final CountDownLatch getScannerLatch = new CountDownLatch(1); 1814 private final CountDownLatch snapshotLatch = new CountDownLatch(1); 1815 public MyCompactingMemStore(Configuration conf, CellComparatorImpl c, 1816 HStore store, RegionServicesForStores regionServices, 1817 MemoryCompactionPolicy compactionPolicy) throws IOException { 1818 super(conf, c, store, regionServices, compactionPolicy); 1819 } 1820 1821 @Override 1822 protected List<KeyValueScanner> createList(int capacity) { 1823 if (START_TEST.get()) { 1824 try { 1825 getScannerLatch.countDown(); 1826 snapshotLatch.await(); 1827 } catch (InterruptedException e) { 1828 throw new RuntimeException(e); 1829 } 1830 } 1831 return new ArrayList<>(capacity); 1832 } 1833 @Override 1834 protected void pushActiveToPipeline(MutableSegment active) { 1835 if (START_TEST.get()) { 1836 try { 1837 getScannerLatch.await(); 1838 } catch (InterruptedException e) { 1839 throw new RuntimeException(e); 1840 } 1841 } 1842 1843 super.pushActiveToPipeline(active); 1844 if (START_TEST.get()) { 1845 snapshotLatch.countDown(); 1846 } 1847 } 1848 } 1849 1850 interface MyListHook { 1851 void hook(int currentSize); 1852 } 1853 1854 private static class MyList<T> implements List<T> { 1855 private final List<T> delegatee = new ArrayList<>(); 1856 private final MyListHook hookAtAdd; 1857 MyList(final MyListHook hookAtAdd) { 1858 this.hookAtAdd = hookAtAdd; 1859 } 1860 @Override 1861 public int size() {return delegatee.size();} 1862 1863 @Override 1864 public boolean isEmpty() {return delegatee.isEmpty();} 1865 1866 @Override 1867 public boolean contains(Object o) {return delegatee.contains(o);} 1868 1869 @Override 1870 public Iterator<T> iterator() {return delegatee.iterator();} 1871 1872 @Override 1873 public Object[] toArray() {return delegatee.toArray();} 1874 1875 @Override 1876 public <R> R[] toArray(R[] a) {return delegatee.toArray(a);} 1877 1878 @Override 1879 public boolean add(T e) { 1880 hookAtAdd.hook(size()); 1881 return delegatee.add(e); 1882 } 1883 1884 @Override 1885 public boolean remove(Object o) {return delegatee.remove(o);} 1886 1887 @Override 1888 public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);} 1889 1890 @Override 1891 public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);} 1892 1893 @Override 1894 public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);} 1895 1896 @Override 1897 public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);} 1898 1899 @Override 1900 public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);} 1901 1902 @Override 1903 public void clear() {delegatee.clear();} 1904 1905 @Override 1906 public T get(int index) {return delegatee.get(index);} 1907 1908 @Override 1909 public T set(int index, T element) {return delegatee.set(index, element);} 1910 1911 @Override 1912 public void add(int index, T element) {delegatee.add(index, element);} 1913 1914 @Override 1915 public T remove(int index) {return delegatee.remove(index);} 1916 1917 @Override 1918 public int indexOf(Object o) {return delegatee.indexOf(o);} 1919 1920 @Override 1921 public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);} 1922 1923 @Override 1924 public ListIterator<T> listIterator() {return delegatee.listIterator();} 1925 1926 @Override 1927 public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);} 1928 1929 @Override 1930 public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);} 1931 } 1932}