001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNull; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.lang.management.ManagementFactory; 026import java.util.ArrayList; 027import java.util.List; 028import java.util.concurrent.Executors; 029import java.util.concurrent.ThreadPoolExecutor; 030 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.CellComparator; 034import org.apache.hadoop.hbase.CellUtil; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.HBaseTestingUtility; 038import org.apache.hadoop.hbase.HColumnDescriptor; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.HRegionInfo; 041import org.apache.hadoop.hbase.HTableDescriptor; 042import org.apache.hadoop.hbase.KeepDeletedCells; 043import org.apache.hadoop.hbase.KeyValue; 044import org.apache.hadoop.hbase.KeyValueTestUtil; 045import org.apache.hadoop.hbase.MemoryCompactionPolicy; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.client.Scan; 048import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 049import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 050import org.apache.hadoop.hbase.testclassification.MediumTests; 051import org.apache.hadoop.hbase.testclassification.RegionServerTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.util.EnvironmentEdge; 054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 055import org.apache.hadoop.hbase.util.Threads; 056import org.apache.hadoop.hbase.wal.WAL; 057import org.junit.After; 058import org.junit.Before; 059import org.junit.ClassRule; 060import org.junit.Test; 061import org.junit.experimental.categories.Category; 062import org.mockito.Mockito; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066/** 067 * compacted memstore test case 068 */ 069@Category({RegionServerTests.class, MediumTests.class}) 070public class TestCompactingMemStore extends TestDefaultMemStore { 071 072 @ClassRule 073 public static final HBaseClassTestRule CLASS_RULE = 074 HBaseClassTestRule.forClass(TestCompactingMemStore.class); 075 076 private static final Logger LOG = LoggerFactory.getLogger(TestCompactingMemStore.class); 077 protected static ChunkCreator chunkCreator; 078 protected HRegion region; 079 protected RegionServicesForStores regionServicesForStores; 080 protected HStore store; 081 082 ////////////////////////////////////////////////////////////////////////////// 083 // Helpers 084 ////////////////////////////////////////////////////////////////////////////// 085 protected static byte[] makeQualifier(final int i1, final int i2) { 086 return Bytes.toBytes(Integer.toString(i1) + ";" + 087 Integer.toString(i2)); 088 } 089 090 @After 091 public void tearDown() throws Exception { 092 chunkCreator.clearChunksInPool(); 093 } 094 095 @Override 096 @Before 097 public void setUp() throws Exception { 098 compactingSetUp(); 099 this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(), CellComparator.getInstance(), 100 store, regionServicesForStores, MemoryCompactionPolicy.EAGER); 101 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 102 } 103 104 protected void compactingSetUp() throws Exception { 105 super.internalSetUp(); 106 Configuration conf = new Configuration(); 107 conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); 108 conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); 109 conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000); 110 HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf); 111 HColumnDescriptor hcd = new HColumnDescriptor(FAMILY); 112 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("foobar")); 113 htd.addFamily(hcd); 114 HRegionInfo info = 115 new HRegionInfo(TableName.valueOf("foobar"), null, null, false); 116 WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info); 117 this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true); 118 this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores()); 119 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 120 Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 121 this.store = new HStore(region, hcd, conf, false); 122 123 long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() 124 .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); 125 chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 126 globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, 127 null); 128 assertTrue(chunkCreator != null); 129 } 130 131 /** 132 * A simple test which verifies the 3 possible states when scanning across snapshot. 133 * 134 * @throws IOException 135 * @throws CloneNotSupportedException 136 */ 137 @Override 138 @Test 139 public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException { 140 // we are going to the scanning across snapshot with two kvs 141 // kv1 should always be returned before kv2 142 final byte[] one = Bytes.toBytes(1); 143 final byte[] two = Bytes.toBytes(2); 144 final byte[] f = Bytes.toBytes("f"); 145 final byte[] q = Bytes.toBytes("q"); 146 final byte[] v = Bytes.toBytes(3); 147 148 final KeyValue kv1 = new KeyValue(one, f, q, 10, v); 149 final KeyValue kv2 = new KeyValue(two, f, q, 10, v); 150 151 // use case 1: both kvs in kvset 152 this.memstore.add(kv1.clone(), null); 153 this.memstore.add(kv2.clone(), null); 154 verifyScanAcrossSnapshot2(kv1, kv2); 155 156 // use case 2: both kvs in snapshot 157 this.memstore.snapshot(); 158 verifyScanAcrossSnapshot2(kv1, kv2); 159 160 // use case 3: first in snapshot second in kvset 161 this.memstore = new CompactingMemStore(HBaseConfiguration.create(), 162 CellComparator.getInstance(), store, regionServicesForStores, 163 MemoryCompactionPolicy.EAGER); 164 this.memstore.add(kv1.clone(), null); 165 // As compaction is starting in the background the repetition 166 // of the k1 might be removed BUT the scanners created earlier 167 // should look on the OLD MutableCellSetSegment, so this should be OK... 168 this.memstore.snapshot(); 169 this.memstore.add(kv2.clone(), null); 170 verifyScanAcrossSnapshot2(kv1,kv2); 171 } 172 173 /** 174 * Test memstore snapshots 175 * @throws IOException 176 */ 177 @Override 178 @Test 179 public void testSnapshotting() throws IOException { 180 final int snapshotCount = 5; 181 // Add some rows, run a snapshot. Do it a few times. 182 for (int i = 0; i < snapshotCount; i++) { 183 addRows(this.memstore); 184 runSnapshot(this.memstore, true); 185 assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount()); 186 } 187 } 188 189 190 ////////////////////////////////////////////////////////////////////////////// 191 // Get tests 192 ////////////////////////////////////////////////////////////////////////////// 193 194 /** Test getNextRow from memstore 195 * @throws InterruptedException 196 */ 197 @Override 198 @Test 199 public void testGetNextRow() throws Exception { 200 addRows(this.memstore); 201 // Add more versions to make it a little more interesting. 202 Thread.sleep(1); 203 addRows(this.memstore); 204 Cell closestToEmpty = ((CompactingMemStore)this.memstore).getNextRow(KeyValue.LOWESTKEY); 205 assertTrue(CellComparator.getInstance().compareRows(closestToEmpty, 206 new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0); 207 for (int i = 0; i < ROW_COUNT; i++) { 208 Cell nr = ((CompactingMemStore)this.memstore).getNextRow(new KeyValue(Bytes.toBytes(i), 209 System.currentTimeMillis())); 210 if (i + 1 == ROW_COUNT) { 211 assertNull(nr); 212 } else { 213 assertTrue(CellComparator.getInstance().compareRows(nr, 214 new KeyValue(Bytes.toBytes(i + 1), System.currentTimeMillis())) == 0); 215 } 216 } 217 //starting from each row, validate results should contain the starting row 218 Configuration conf = HBaseConfiguration.create(); 219 for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) { 220 ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, 221 KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false); 222 try (InternalScanner scanner = 223 new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null, 224 memstore.getScanners(0))) { 225 List<Cell> results = new ArrayList<>(); 226 for (int i = 0; scanner.next(results); i++) { 227 int rowId = startRowId + i; 228 Cell left = results.get(0); 229 byte[] row1 = Bytes.toBytes(rowId); 230 assertTrue("Row name", 231 CellComparator.getInstance().compareRows(left, row1, 0, row1.length) == 0); 232 assertEquals("Count of columns", QUALIFIER_COUNT, results.size()); 233 List<Cell> row = new ArrayList<>(); 234 for (Cell kv : results) { 235 row.add(kv); 236 } 237 isExpectedRowWithoutTimestamps(rowId, row); 238 // Clear out set. Otherwise row results accumulate. 239 results.clear(); 240 } 241 } 242 } 243 } 244 245 @Override 246 @Test 247 public void testGet_memstoreAndSnapShot() throws IOException { 248 byte[] row = Bytes.toBytes("testrow"); 249 byte[] fam = Bytes.toBytes("testfamily"); 250 byte[] qf1 = Bytes.toBytes("testqualifier1"); 251 byte[] qf2 = Bytes.toBytes("testqualifier2"); 252 byte[] qf3 = Bytes.toBytes("testqualifier3"); 253 byte[] qf4 = Bytes.toBytes("testqualifier4"); 254 byte[] qf5 = Bytes.toBytes("testqualifier5"); 255 byte[] val = Bytes.toBytes("testval"); 256 257 //Setting up memstore 258 memstore.add(new KeyValue(row, fam, qf1, val), null); 259 memstore.add(new KeyValue(row, fam, qf2, val), null); 260 memstore.add(new KeyValue(row, fam, qf3, val), null); 261 //Pushing to pipeline 262 ((CompactingMemStore)memstore).flushInMemory(); 263 assertEquals(0, memstore.getSnapshot().getCellsCount()); 264 //Creating a snapshot 265 memstore.snapshot(); 266 assertEquals(3, memstore.getSnapshot().getCellsCount()); 267 //Adding value to "new" memstore 268 assertEquals(0, memstore.getActive().getCellsCount()); 269 memstore.add(new KeyValue(row, fam, qf4, val), null); 270 memstore.add(new KeyValue(row, fam, qf5, val), null); 271 assertEquals(2, memstore.getActive().getCellsCount()); 272 } 273 274 //////////////////////////////////// 275 // Test for periodic memstore flushes 276 // based on time of oldest edit 277 //////////////////////////////////// 278 279 /** 280 * Add keyvalues with a fixed memstoreTs, and checks that memstore size is decreased 281 * as older keyvalues are deleted from the memstore. 282 * 283 * @throws Exception 284 */ 285 @Override 286 @Test 287 public void testUpsertMemstoreSize() throws Exception { 288 MemStoreSize oldSize = memstore.size(); 289 290 List<Cell> l = new ArrayList<>(); 291 KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); 292 KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v"); 293 KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v"); 294 295 kv1.setSequenceId(1); 296 kv2.setSequenceId(1); 297 kv3.setSequenceId(1); 298 l.add(kv1); 299 l.add(kv2); 300 l.add(kv3); 301 302 this.memstore.upsert(l, 2, null);// readpoint is 2 303 MemStoreSize newSize = this.memstore.size(); 304 assert (newSize.getDataSize() > oldSize.getDataSize()); 305 //The kv1 should be removed. 306 assert (memstore.getActive().getCellsCount() == 2); 307 308 KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v"); 309 kv4.setSequenceId(1); 310 l.clear(); 311 l.add(kv4); 312 this.memstore.upsert(l, 3, null); 313 assertEquals(newSize, this.memstore.size()); 314 //The kv2 should be removed. 315 assert (memstore.getActive().getCellsCount() == 2); 316 //this.memstore = null; 317 } 318 319 /** 320 * Tests that the timeOfOldestEdit is updated correctly for the 321 * various edit operations in memstore. 322 * @throws Exception 323 */ 324 @Override 325 @Test 326 public void testUpdateToTimeOfOldestEdit() throws Exception { 327 try { 328 EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); 329 EnvironmentEdgeManager.injectEdge(edge); 330 long t = memstore.timeOfOldestEdit(); 331 assertEquals(Long.MAX_VALUE, t); 332 333 // test the case that the timeOfOldestEdit is updated after a KV add 334 memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null); 335 t = memstore.timeOfOldestEdit(); 336 assertTrue(t == 1234); 337 // The method will also assert 338 // the value is reset to Long.MAX_VALUE 339 t = runSnapshot(memstore, true); 340 341 // test the case that the timeOfOldestEdit is updated after a KV delete 342 memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null); 343 t = memstore.timeOfOldestEdit(); 344 assertTrue(t == 1234); 345 t = runSnapshot(memstore, true); 346 347 // test the case that the timeOfOldestEdit is updated after a KV upsert 348 List<Cell> l = new ArrayList<>(); 349 KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); 350 kv1.setSequenceId(100); 351 l.add(kv1); 352 memstore.upsert(l, 1000, null); 353 t = memstore.timeOfOldestEdit(); 354 assertTrue(t == 1234); 355 } finally { 356 EnvironmentEdgeManager.reset(); 357 } 358 } 359 360 private long runSnapshot(final AbstractMemStore hmc, boolean useForce) 361 throws IOException { 362 // Save off old state. 363 long oldHistorySize = hmc.getSnapshot().getDataSize(); 364 long prevTimeStamp = hmc.timeOfOldestEdit(); 365 366 hmc.snapshot(); 367 MemStoreSnapshot snapshot = hmc.snapshot(); 368 if (useForce) { 369 // Make some assertions about what just happened. 370 assertTrue("History size has not increased", oldHistorySize < snapshot.getDataSize()); 371 long t = hmc.timeOfOldestEdit(); 372 assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE); 373 hmc.clearSnapshot(snapshot.getId()); 374 } else { 375 long t = hmc.timeOfOldestEdit(); 376 assertTrue("Time of oldest edit didn't remain the same", t == prevTimeStamp); 377 } 378 return prevTimeStamp; 379 } 380 381 private void isExpectedRowWithoutTimestamps(final int rowIndex, 382 List<Cell> kvs) { 383 int i = 0; 384 for (Cell kv : kvs) { 385 byte[] expectedColname = makeQualifier(rowIndex, i++); 386 assertTrue("Column name", CellUtil.matchingQualifier(kv, expectedColname)); 387 // Value is column name as bytes. Usually result is 388 // 100 bytes in size at least. This is the default size 389 // for BytesWriteable. For comparison, convert bytes to 390 // String and trim to remove trailing null bytes. 391 assertTrue("Content", CellUtil.matchingValue(kv, expectedColname)); 392 } 393 } 394 395 @Test 396 public void testPuttingBackChunksAfterFlushing() throws IOException { 397 byte[] row = Bytes.toBytes("testrow"); 398 byte[] fam = Bytes.toBytes("testfamily"); 399 byte[] qf1 = Bytes.toBytes("testqualifier1"); 400 byte[] qf2 = Bytes.toBytes("testqualifier2"); 401 byte[] qf3 = Bytes.toBytes("testqualifier3"); 402 byte[] qf4 = Bytes.toBytes("testqualifier4"); 403 byte[] qf5 = Bytes.toBytes("testqualifier5"); 404 byte[] val = Bytes.toBytes("testval"); 405 406 // Setting up memstore 407 memstore.add(new KeyValue(row, fam, qf1, val), null); 408 memstore.add(new KeyValue(row, fam, qf2, val), null); 409 memstore.add(new KeyValue(row, fam, qf3, val), null); 410 411 // Creating a snapshot 412 MemStoreSnapshot snapshot = memstore.snapshot(); 413 assertEquals(3, memstore.getSnapshot().getCellsCount()); 414 415 // Adding value to "new" memstore 416 assertEquals(0, memstore.getActive().getCellsCount()); 417 memstore.add(new KeyValue(row, fam, qf4, val), null); 418 memstore.add(new KeyValue(row, fam, qf5, val), null); 419 assertEquals(2, memstore.getActive().getCellsCount()); 420 // close the scanners 421 for(KeyValueScanner scanner : snapshot.getScanners()) { 422 scanner.close(); 423 } 424 memstore.clearSnapshot(snapshot.getId()); 425 426 int chunkCount = chunkCreator.getPoolSize(); 427 assertTrue(chunkCount > 0); 428 429 } 430 431 @Test 432 public void testPuttingBackChunksWithOpeningScanner() 433 throws IOException { 434 byte[] row = Bytes.toBytes("testrow"); 435 byte[] fam = Bytes.toBytes("testfamily"); 436 byte[] qf1 = Bytes.toBytes("testqualifier1"); 437 byte[] qf2 = Bytes.toBytes("testqualifier2"); 438 byte[] qf3 = Bytes.toBytes("testqualifier3"); 439 byte[] qf4 = Bytes.toBytes("testqualifier4"); 440 byte[] qf5 = Bytes.toBytes("testqualifier5"); 441 byte[] qf6 = Bytes.toBytes("testqualifier6"); 442 byte[] qf7 = Bytes.toBytes("testqualifier7"); 443 byte[] val = Bytes.toBytes("testval"); 444 445 // Setting up memstore 446 memstore.add(new KeyValue(row, fam, qf1, val), null); 447 memstore.add(new KeyValue(row, fam, qf2, val), null); 448 memstore.add(new KeyValue(row, fam, qf3, val), null); 449 450 // Creating a snapshot 451 MemStoreSnapshot snapshot = memstore.snapshot(); 452 assertEquals(3, memstore.getSnapshot().getCellsCount()); 453 454 // Adding value to "new" memstore 455 assertEquals(0, memstore.getActive().getCellsCount()); 456 memstore.add(new KeyValue(row, fam, qf4, val), null); 457 memstore.add(new KeyValue(row, fam, qf5, val), null); 458 assertEquals(2, memstore.getActive().getCellsCount()); 459 460 // opening scanner before clear the snapshot 461 List<KeyValueScanner> scanners = memstore.getScanners(0); 462 // Shouldn't putting back the chunks to pool,since some scanners are opening 463 // based on their data 464 // close the scanners 465 for(KeyValueScanner scanner : snapshot.getScanners()) { 466 scanner.close(); 467 } 468 memstore.clearSnapshot(snapshot.getId()); 469 470 assertTrue(chunkCreator.getPoolSize() == 0); 471 472 // Chunks will be put back to pool after close scanners; 473 for (KeyValueScanner scanner : scanners) { 474 scanner.close(); 475 } 476 assertTrue(chunkCreator.getPoolSize() > 0); 477 478 // clear chunks 479 chunkCreator.clearChunksInPool(); 480 481 // Creating another snapshot 482 483 snapshot = memstore.snapshot(); 484 // Adding more value 485 memstore.add(new KeyValue(row, fam, qf6, val), null); 486 memstore.add(new KeyValue(row, fam, qf7, val), null); 487 // opening scanners 488 scanners = memstore.getScanners(0); 489 // close scanners before clear the snapshot 490 for (KeyValueScanner scanner : scanners) { 491 scanner.close(); 492 } 493 // Since no opening scanner, the chunks of snapshot should be put back to 494 // pool 495 // close the scanners 496 for(KeyValueScanner scanner : snapshot.getScanners()) { 497 scanner.close(); 498 } 499 memstore.clearSnapshot(snapshot.getId()); 500 assertTrue(chunkCreator.getPoolSize() > 0); 501 } 502 503 @Test 504 public void testPuttingBackChunksWithOpeningPipelineScanner() 505 throws IOException { 506 507 // set memstore to do data compaction and not to use the speculative scan 508 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; 509 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 510 String.valueOf(compactionType)); 511 ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); 512 513 byte[] row = Bytes.toBytes("testrow"); 514 byte[] fam = Bytes.toBytes("testfamily"); 515 byte[] qf1 = Bytes.toBytes("testqualifier1"); 516 byte[] qf2 = Bytes.toBytes("testqualifier2"); 517 byte[] qf3 = Bytes.toBytes("testqualifier3"); 518 byte[] val = Bytes.toBytes("testval"); 519 520 // Setting up memstore 521 memstore.add(new KeyValue(row, fam, qf1, 1, val), null); 522 memstore.add(new KeyValue(row, fam, qf2, 1, val), null); 523 memstore.add(new KeyValue(row, fam, qf3, 1, val), null); 524 525 // Creating a pipeline 526 ((MyCompactingMemStore)memstore).disableCompaction(); 527 ((CompactingMemStore)memstore).flushInMemory(); 528 529 // Adding value to "new" memstore 530 assertEquals(0, memstore.getActive().getCellsCount()); 531 memstore.add(new KeyValue(row, fam, qf1, 2, val), null); 532 memstore.add(new KeyValue(row, fam, qf2, 2, val), null); 533 assertEquals(2, memstore.getActive().getCellsCount()); 534 535 // pipeline bucket 2 536 ((CompactingMemStore)memstore).flushInMemory(); 537 // opening scanner before force flushing 538 List<KeyValueScanner> scanners = memstore.getScanners(0); 539 // Shouldn't putting back the chunks to pool,since some scanners are opening 540 // based on their data 541 ((MyCompactingMemStore)memstore).enableCompaction(); 542 // trigger compaction 543 ((CompactingMemStore)memstore).flushInMemory(); 544 545 // Adding value to "new" memstore 546 assertEquals(0, memstore.getActive().getCellsCount()); 547 memstore.add(new KeyValue(row, fam, qf3, 3, val), null); 548 memstore.add(new KeyValue(row, fam, qf2, 3, val), null); 549 memstore.add(new KeyValue(row, fam, qf1, 3, val), null); 550 assertEquals(3, memstore.getActive().getCellsCount()); 551 552 assertTrue(chunkCreator.getPoolSize() == 0); 553 554 // Chunks will be put back to pool after close scanners; 555 for (KeyValueScanner scanner : scanners) { 556 scanner.close(); 557 } 558 assertTrue(chunkCreator.getPoolSize() > 0); 559 560 // clear chunks 561 chunkCreator.clearChunksInPool(); 562 563 // Creating another snapshot 564 565 MemStoreSnapshot snapshot = memstore.snapshot(); 566 // close the scanners 567 for(KeyValueScanner scanner : snapshot.getScanners()) { 568 scanner.close(); 569 } 570 memstore.clearSnapshot(snapshot.getId()); 571 572 snapshot = memstore.snapshot(); 573 // Adding more value 574 memstore.add(new KeyValue(row, fam, qf2, 4, val), null); 575 memstore.add(new KeyValue(row, fam, qf3, 4, val), null); 576 // opening scanners 577 scanners = memstore.getScanners(0); 578 // close scanners before clear the snapshot 579 for (KeyValueScanner scanner : scanners) { 580 scanner.close(); 581 } 582 // Since no opening scanner, the chunks of snapshot should be put back to 583 // pool 584 // close the scanners 585 for(KeyValueScanner scanner : snapshot.getScanners()) { 586 scanner.close(); 587 } 588 memstore.clearSnapshot(snapshot.getId()); 589 assertTrue(chunkCreator.getPoolSize() > 0); 590 } 591 592 ////////////////////////////////////////////////////////////////////////////// 593 // Compaction tests 594 ////////////////////////////////////////////////////////////////////////////// 595 @Test 596 public void testCompaction1Bucket() throws IOException { 597 598 // set memstore to do basic structure flattening, the "eager" option is tested in 599 // TestCompactingToCellFlatMapMemStore 600 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 601 memstore.getConfiguration() 602 .set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(compactionType)); 603 ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); 604 605 String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4 606 607 // test 1 bucket 608 int totalCellsLen = addRowsByKeys(memstore, keys1); 609 int oneCellOnCSLMHeapSize = 120; 610 int oneCellOnCAHeapSize = 88; 611 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 612 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 613 assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); 614 615 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact 616 assertEquals(0, memstore.getSnapshot().getCellsCount()); 617 // There is no compaction, as the compacting memstore type is basic. 618 // totalCellsLen remains the same 619 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 620 + 4 * oneCellOnCAHeapSize; 621 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 622 assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); 623 624 MemStoreSize mss = memstore.getFlushableSize(); 625 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 626 // simulate flusher 627 region.decrMemStoreSize(mss); 628 ImmutableSegment s = memstore.getSnapshot(); 629 assertEquals(4, s.getCellsCount()); 630 assertEquals(0, regionServicesForStores.getMemStoreSize()); 631 632 memstore.clearSnapshot(snapshot.getId()); 633 } 634 635 @Test 636 public void testCompaction2Buckets() throws IOException { 637 638 // set memstore to do basic structure flattening, the "eager" option is tested in 639 // TestCompactingToCellFlatMapMemStore 640 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 641 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 642 String.valueOf(compactionType)); 643 memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 644 String.valueOf(1)); 645 ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); 646 String[] keys1 = { "A", "A", "B", "C" }; 647 String[] keys2 = { "A", "B", "D" }; 648 649 int totalCellsLen1 = addRowsByKeys(memstore, keys1); 650 int oneCellOnCSLMHeapSize = 120; 651 int oneCellOnCAHeapSize = 88; 652 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 653 654 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 655 assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); 656 657 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact 658 int counter = 0; 659 for ( Segment s : memstore.getSegments()) { 660 counter += s.getCellsCount(); 661 } 662 assertEquals(4, counter); 663 assertEquals(0, memstore.getSnapshot().getCellsCount()); 664 // There is no compaction, as the compacting memstore type is basic. 665 // totalCellsLen remains the same 666 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 667 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 668 + 4 * oneCellOnCAHeapSize; 669 assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); 670 671 int totalCellsLen2 = addRowsByKeys(memstore, keys2); 672 totalHeapSize += 3 * oneCellOnCSLMHeapSize; 673 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 674 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 675 676 MemStoreSize mss = memstore.getFlushableSize(); 677 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact 678 assertEquals(0, memstore.getSnapshot().getCellsCount()); 679 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 680 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 681 + 7 * oneCellOnCAHeapSize; 682 assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); 683 684 mss = memstore.getFlushableSize(); 685 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 686 // simulate flusher 687 region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(), 688 mss.getCellsCount()); 689 ImmutableSegment s = memstore.getSnapshot(); 690 assertEquals(7, s.getCellsCount()); 691 assertEquals(0, regionServicesForStores.getMemStoreSize()); 692 693 memstore.clearSnapshot(snapshot.getId()); 694 } 695 696 @Test 697 public void testCompaction3Buckets() throws IOException { 698 699 // set memstore to do data compaction and not to use the speculative scan 700 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; 701 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 702 String.valueOf(compactionType)); 703 ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); 704 String[] keys1 = { "A", "A", "B", "C" }; 705 String[] keys2 = { "A", "B", "D" }; 706 String[] keys3 = { "D", "B", "B" }; 707 708 int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 4 cells. 709 int oneCellOnCSLMHeapSize = 120; 710 int oneCellOnCAHeapSize = 88; 711 assertEquals(totalCellsLen1, region.getMemStoreDataSize()); 712 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 713 assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); 714 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact 715 716 assertEquals(0, memstore.getSnapshot().getCellsCount()); 717 // One cell is duplicated and the compaction will remove it. All cells of same time so adjusting 718 // totalCellsLen 719 totalCellsLen1 = (totalCellsLen1 * 3) / 4; 720 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 721 // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff. 722 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 723 + 3 * oneCellOnCAHeapSize; 724 assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); 725 726 int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells. 727 long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize; 728 729 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 730 assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); 731 732 ((MyCompactingMemStore) memstore).disableCompaction(); 733 MemStoreSize mss = memstore.getFlushableSize(); 734 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction 735 assertEquals(0, memstore.getSnapshot().getCellsCount()); 736 // No change in the cells data size. ie. memstore size. as there is no compaction. 737 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 738 assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, 739 ((CompactingMemStore) memstore).heapSize()); 740 741 int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added 742 assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, 743 regionServicesForStores.getMemStoreSize()); 744 long totalHeapSize3 = totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 745 + 3 * oneCellOnCSLMHeapSize; 746 assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize()); 747 748 ((MyCompactingMemStore)memstore).enableCompaction(); 749 mss = memstore.getFlushableSize(); 750 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact 751 assertEquals(0, memstore.getSnapshot().getCellsCount()); 752 // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells. 753 // Out of total 10, only 4 cells are unique 754 totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated 755 totalCellsLen3 = 0;// All duplicated cells. 756 assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, 757 regionServicesForStores.getMemStoreSize()); 758 // Only 4 unique cells left 759 assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD 760 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize()); 761 762 mss = memstore.getFlushableSize(); 763 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 764 // simulate flusher 765 region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(), 766 mss.getCellsCount()); 767 ImmutableSegment s = memstore.getSnapshot(); 768 assertEquals(4, s.getCellsCount()); 769 assertEquals(0, regionServicesForStores.getMemStoreSize()); 770 771 memstore.clearSnapshot(snapshot.getId()); 772 } 773 774 @Test 775 public void testMagicCompaction3Buckets() throws IOException { 776 777 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE; 778 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 779 String.valueOf(compactionType)); 780 memstore.getConfiguration().setDouble( 781 AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45); 782 memstore.getConfiguration().setInt( 783 AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2); 784 memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1); 785 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 786 787 String[] keys1 = { "A", "B", "D" }; 788 String[] keys2 = { "A" }; 789 String[] keys3 = { "A", "A", "B", "C" }; 790 String[] keys4 = { "D", "B", "B" }; 791 792 int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 3 cells. 793 int oneCellOnCSLMHeapSize = 120; 794 assertEquals(totalCellsLen1, region.getMemStoreDataSize()); 795 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize; 796 assertEquals(totalHeapSize, memstore.heapSize()); 797 798 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline - flatten 799 assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 800 assertEquals(1.0, 801 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 802 assertEquals(0, memstore.getSnapshot().getCellsCount()); 803 804 addRowsByKeys(memstore, keys2);// Adding 1 more cell - flatten. 805 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction 806 assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 807 assertEquals(1.0, 808 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 809 assertEquals(0, memstore.getSnapshot().getCellsCount()); 810 811 addRowsByKeys(memstore, keys3);// Adding 4 more cells - merge. 812 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction 813 assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 814 assertEquals((4.0 / 8.0), 815 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 816 assertEquals(0, memstore.getSnapshot().getCellsCount()); 817 818 addRowsByKeys(memstore, keys4);// 3 more cells added - compact (or not) 819 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact 820 int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells(); 821 assertTrue(4 == numCells || 11 == numCells); 822 assertEquals(0, memstore.getSnapshot().getCellsCount()); 823 824 MemStoreSize mss = memstore.getFlushableSize(); 825 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 826 // simulate flusher 827 region.decrMemStoreSize(mss); 828 ImmutableSegment s = memstore.getSnapshot(); 829 numCells = s.getCellsCount(); 830 assertTrue(4 == numCells || 11 == numCells); 831 assertEquals(0, regionServicesForStores.getMemStoreSize()); 832 833 memstore.clearSnapshot(snapshot.getId()); 834 } 835 836 protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) { 837 byte[] fam = Bytes.toBytes("testfamily"); 838 byte[] qf = Bytes.toBytes("testqualifier"); 839 long size = hmc.getActive().getDataSize(); 840 long heapOverhead = hmc.getActive().getHeapSize(); 841 int cellsCount = hmc.getActive().getCellsCount(); 842 int totalLen = 0; 843 for (int i = 0; i < keys.length; i++) { 844 long timestamp = System.currentTimeMillis(); 845 Threads.sleep(1); // to make sure each kv gets a different ts 846 byte[] row = Bytes.toBytes(keys[i]); 847 byte[] val = Bytes.toBytes(keys[i] + i); 848 KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); 849 totalLen += Segment.getCellLength(kv); 850 hmc.add(kv, null); 851 LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); 852 } 853 regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size, 854 hmc.getActive().getHeapSize() - heapOverhead, 0, 855 hmc.getActive().getCellsCount() - cellsCount); 856 return totalLen; 857 } 858 859 // for controlling the val size when adding a new cell 860 protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) { 861 byte[] fam = Bytes.toBytes("testfamily"); 862 byte[] qf = Bytes.toBytes("testqualifier"); 863 long size = hmc.getActive().getDataSize(); 864 long heapOverhead = hmc.getActive().getHeapSize(); 865 int cellsCount = hmc.getActive().getCellsCount(); 866 int totalLen = 0; 867 for (int i = 0; i < keys.length; i++) { 868 long timestamp = System.currentTimeMillis(); 869 Threads.sleep(1); // to make sure each kv gets a different ts 870 byte[] row = Bytes.toBytes(keys[i]); 871 KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); 872 totalLen += Segment.getCellLength(kv); 873 hmc.add(kv, null); 874 LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); 875 } 876 regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size, 877 hmc.getActive().getHeapSize() - heapOverhead, 0, cellsCount); 878 return totalLen; 879 } 880 881 private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { 882 long t = 1234; 883 884 @Override 885 public long currentTime() { 886 return t; 887 } 888 public void setCurrentTimeMillis(long t) { 889 this.t = t; 890 } 891 } 892 893 static protected class MyCompactingMemStore extends CompactingMemStore { 894 895 public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store, 896 RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 897 throws IOException { 898 super(conf, c, store, regionServices, compactionPolicy); 899 } 900 901 void disableCompaction() { 902 allowCompaction.set(false); 903 } 904 905 void enableCompaction() { 906 allowCompaction.set(true); 907 } 908 void initiateType(MemoryCompactionPolicy compactionType, Configuration conf) 909 throws IllegalArgumentIOException { 910 compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST"); 911 } 912 913 } 914}