001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.lang.management.ManagementFactory; 028import java.util.ArrayList; 029import java.util.List; 030import java.util.concurrent.Executors; 031import java.util.concurrent.ThreadPoolExecutor; 032 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.CellComparator; 036import org.apache.hadoop.hbase.CellUtil; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HBaseTestingUtility; 040import org.apache.hadoop.hbase.HColumnDescriptor; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.HRegionInfo; 043import org.apache.hadoop.hbase.HTableDescriptor; 044import org.apache.hadoop.hbase.KeepDeletedCells; 045import org.apache.hadoop.hbase.KeyValue; 046import org.apache.hadoop.hbase.KeyValueTestUtil; 047import org.apache.hadoop.hbase.MemoryCompactionPolicy; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.client.Scan; 050import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 051import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 052import org.apache.hadoop.hbase.testclassification.MediumTests; 053import org.apache.hadoop.hbase.testclassification.RegionServerTests; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.apache.hadoop.hbase.util.EnvironmentEdge; 056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 057import org.apache.hadoop.hbase.util.Threads; 058import org.apache.hadoop.hbase.wal.WAL; 059import org.junit.After; 060import org.junit.Before; 061import org.junit.ClassRule; 062import org.junit.Test; 063import org.junit.experimental.categories.Category; 064import org.mockito.Mockito; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068/** 069 * compacted memstore test case 070 */ 071@Category({RegionServerTests.class, MediumTests.class}) 072public class TestCompactingMemStore extends TestDefaultMemStore { 073 074 @ClassRule 075 public static final HBaseClassTestRule CLASS_RULE = 076 HBaseClassTestRule.forClass(TestCompactingMemStore.class); 077 078 private static final Logger LOG = LoggerFactory.getLogger(TestCompactingMemStore.class); 079 protected static ChunkCreator chunkCreator; 080 protected HRegion region; 081 protected RegionServicesForStores regionServicesForStores; 082 protected HStore store; 083 084 ////////////////////////////////////////////////////////////////////////////// 085 // Helpers 086 ////////////////////////////////////////////////////////////////////////////// 087 protected static byte[] makeQualifier(final int i1, final int i2) { 088 return Bytes.toBytes(Integer.toString(i1) + ";" + 089 Integer.toString(i2)); 090 } 091 092 @After 093 public void tearDown() throws Exception { 094 chunkCreator.clearChunksInPool(); 095 } 096 097 @Override 098 @Before 099 public void setUp() throws Exception { 100 compactingSetUp(); 101 this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(), CellComparator.getInstance(), 102 store, regionServicesForStores, MemoryCompactionPolicy.EAGER); 103 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 104 } 105 106 protected void compactingSetUp() throws Exception { 107 super.internalSetUp(); 108 Configuration conf = new Configuration(); 109 conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); 110 conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); 111 conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000); 112 HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf); 113 HColumnDescriptor hcd = new HColumnDescriptor(FAMILY); 114 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("foobar")); 115 htd.addFamily(hcd); 116 HRegionInfo info = 117 new HRegionInfo(TableName.valueOf("foobar"), null, null, false); 118 WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info); 119 this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true); 120 this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores()); 121 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 122 Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 123 this.store = new HStore(region, hcd, conf, false); 124 125 long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() 126 .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); 127 chunkCreator = ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 128 globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, 129 null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 130 assertNotNull(chunkCreator); 131 } 132 133 /** 134 * A simple test which flush in memory affect timeOfOldestEdit 135 */ 136 @Test 137 public void testTimeOfOldestEdit() { 138 assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit()); 139 final byte[] r = Bytes.toBytes("r"); 140 final byte[] f = Bytes.toBytes("f"); 141 final byte[] q = Bytes.toBytes("q"); 142 final byte[] v = Bytes.toBytes("v"); 143 final KeyValue kv = new KeyValue(r, f, q, v); 144 memstore.add(kv, null); 145 long timeOfOldestEdit = memstore.timeOfOldestEdit(); 146 assertNotEquals(Long.MAX_VALUE, timeOfOldestEdit); 147 148 ((CompactingMemStore)memstore).flushInMemory(); 149 assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit()); 150 memstore.add(kv, null); 151 assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit()); 152 memstore.snapshot(); 153 assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit()); 154 } 155 156 /** 157 * A simple test which verifies the 3 possible states when scanning across snapshot. 158 * 159 * @throws IOException 160 * @throws CloneNotSupportedException 161 */ 162 @Override 163 @Test 164 public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException { 165 // we are going to the scanning across snapshot with two kvs 166 // kv1 should always be returned before kv2 167 final byte[] one = Bytes.toBytes(1); 168 final byte[] two = Bytes.toBytes(2); 169 final byte[] f = Bytes.toBytes("f"); 170 final byte[] q = Bytes.toBytes("q"); 171 final byte[] v = Bytes.toBytes(3); 172 173 final KeyValue kv1 = new KeyValue(one, f, q, 10, v); 174 final KeyValue kv2 = new KeyValue(two, f, q, 10, v); 175 176 // use case 1: both kvs in kvset 177 this.memstore.add(kv1.clone(), null); 178 this.memstore.add(kv2.clone(), null); 179 verifyScanAcrossSnapshot2(kv1, kv2); 180 181 // use case 2: both kvs in snapshot 182 this.memstore.snapshot(); 183 verifyScanAcrossSnapshot2(kv1, kv2); 184 185 // use case 3: first in snapshot second in kvset 186 this.memstore = new CompactingMemStore(HBaseConfiguration.create(), 187 CellComparator.getInstance(), store, regionServicesForStores, 188 MemoryCompactionPolicy.EAGER); 189 this.memstore.add(kv1.clone(), null); 190 // As compaction is starting in the background the repetition 191 // of the k1 might be removed BUT the scanners created earlier 192 // should look on the OLD MutableCellSetSegment, so this should be OK... 193 this.memstore.snapshot(); 194 this.memstore.add(kv2.clone(), null); 195 verifyScanAcrossSnapshot2(kv1,kv2); 196 } 197 198 /** 199 * Test memstore snapshots 200 * @throws IOException 201 */ 202 @Override 203 @Test 204 public void testSnapshotting() throws IOException { 205 final int snapshotCount = 5; 206 // Add some rows, run a snapshot. Do it a few times. 207 for (int i = 0; i < snapshotCount; i++) { 208 addRows(this.memstore); 209 runSnapshot(this.memstore, true); 210 assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount()); 211 } 212 } 213 214 215 ////////////////////////////////////////////////////////////////////////////// 216 // Get tests 217 ////////////////////////////////////////////////////////////////////////////// 218 219 /** Test getNextRow from memstore 220 * @throws InterruptedException 221 */ 222 @Override 223 @Test 224 public void testGetNextRow() throws Exception { 225 addRows(this.memstore); 226 // Add more versions to make it a little more interesting. 227 Thread.sleep(1); 228 addRows(this.memstore); 229 Cell closestToEmpty = ((CompactingMemStore)this.memstore).getNextRow(KeyValue.LOWESTKEY); 230 assertTrue(CellComparator.getInstance().compareRows(closestToEmpty, 231 new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0); 232 for (int i = 0; i < ROW_COUNT; i++) { 233 Cell nr = ((CompactingMemStore)this.memstore).getNextRow(new KeyValue(Bytes.toBytes(i), 234 System.currentTimeMillis())); 235 if (i + 1 == ROW_COUNT) { 236 assertNull(nr); 237 } else { 238 assertTrue(CellComparator.getInstance().compareRows(nr, 239 new KeyValue(Bytes.toBytes(i + 1), System.currentTimeMillis())) == 0); 240 } 241 } 242 //starting from each row, validate results should contain the starting row 243 Configuration conf = HBaseConfiguration.create(); 244 for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) { 245 ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, 246 KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false); 247 try (InternalScanner scanner = 248 new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null, 249 memstore.getScanners(0))) { 250 List<Cell> results = new ArrayList<>(); 251 for (int i = 0; scanner.next(results); i++) { 252 int rowId = startRowId + i; 253 Cell left = results.get(0); 254 byte[] row1 = Bytes.toBytes(rowId); 255 assertTrue("Row name", 256 CellComparator.getInstance().compareRows(left, row1, 0, row1.length) == 0); 257 assertEquals("Count of columns", QUALIFIER_COUNT, results.size()); 258 List<Cell> row = new ArrayList<>(); 259 for (Cell kv : results) { 260 row.add(kv); 261 } 262 isExpectedRowWithoutTimestamps(rowId, row); 263 // Clear out set. Otherwise row results accumulate. 264 results.clear(); 265 } 266 } 267 } 268 } 269 270 @Override 271 @Test 272 public void testGet_memstoreAndSnapShot() throws IOException { 273 byte[] row = Bytes.toBytes("testrow"); 274 byte[] fam = Bytes.toBytes("testfamily"); 275 byte[] qf1 = Bytes.toBytes("testqualifier1"); 276 byte[] qf2 = Bytes.toBytes("testqualifier2"); 277 byte[] qf3 = Bytes.toBytes("testqualifier3"); 278 byte[] qf4 = Bytes.toBytes("testqualifier4"); 279 byte[] qf5 = Bytes.toBytes("testqualifier5"); 280 byte[] val = Bytes.toBytes("testval"); 281 282 //Setting up memstore 283 memstore.add(new KeyValue(row, fam, qf1, val), null); 284 memstore.add(new KeyValue(row, fam, qf2, val), null); 285 memstore.add(new KeyValue(row, fam, qf3, val), null); 286 //Pushing to pipeline 287 ((CompactingMemStore)memstore).flushInMemory(); 288 assertEquals(0, memstore.getSnapshot().getCellsCount()); 289 //Creating a snapshot 290 memstore.snapshot(); 291 assertEquals(3, memstore.getSnapshot().getCellsCount()); 292 //Adding value to "new" memstore 293 assertEquals(0, memstore.getActive().getCellsCount()); 294 memstore.add(new KeyValue(row, fam, qf4, val), null); 295 memstore.add(new KeyValue(row, fam, qf5, val), null); 296 assertEquals(2, memstore.getActive().getCellsCount()); 297 } 298 299 //////////////////////////////////// 300 // Test for periodic memstore flushes 301 // based on time of oldest edit 302 //////////////////////////////////// 303 304 /** 305 * Add keyvalues with a fixed memstoreTs, and checks that memstore size is decreased 306 * as older keyvalues are deleted from the memstore. 307 * 308 * @throws Exception 309 */ 310 @Override 311 @Test 312 public void testUpsertMemstoreSize() throws Exception { 313 MemStoreSize oldSize = memstore.size(); 314 315 List<Cell> l = new ArrayList<>(); 316 KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); 317 KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v"); 318 KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v"); 319 320 kv1.setSequenceId(1); 321 kv2.setSequenceId(1); 322 kv3.setSequenceId(1); 323 l.add(kv1); 324 l.add(kv2); 325 l.add(kv3); 326 327 this.memstore.upsert(l, 2, null);// readpoint is 2 328 MemStoreSize newSize = this.memstore.size(); 329 assert (newSize.getDataSize() > oldSize.getDataSize()); 330 //The kv1 should be removed. 331 assert (memstore.getActive().getCellsCount() == 2); 332 333 KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v"); 334 kv4.setSequenceId(1); 335 l.clear(); 336 l.add(kv4); 337 this.memstore.upsert(l, 3, null); 338 assertEquals(newSize, this.memstore.size()); 339 //The kv2 should be removed. 340 assert (memstore.getActive().getCellsCount() == 2); 341 //this.memstore = null; 342 } 343 344 /** 345 * Tests that the timeOfOldestEdit is updated correctly for the 346 * various edit operations in memstore. 347 * @throws Exception 348 */ 349 @Override 350 @Test 351 public void testUpdateToTimeOfOldestEdit() throws Exception { 352 try { 353 EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); 354 EnvironmentEdgeManager.injectEdge(edge); 355 long t = memstore.timeOfOldestEdit(); 356 assertEquals(Long.MAX_VALUE, t); 357 358 // test the case that the timeOfOldestEdit is updated after a KV add 359 memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null); 360 t = memstore.timeOfOldestEdit(); 361 assertTrue(t == 1234); 362 // The method will also assert 363 // the value is reset to Long.MAX_VALUE 364 t = runSnapshot(memstore, true); 365 366 // test the case that the timeOfOldestEdit is updated after a KV delete 367 memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null); 368 t = memstore.timeOfOldestEdit(); 369 assertTrue(t == 1234); 370 t = runSnapshot(memstore, true); 371 372 // test the case that the timeOfOldestEdit is updated after a KV upsert 373 List<Cell> l = new ArrayList<>(); 374 KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); 375 kv1.setSequenceId(100); 376 l.add(kv1); 377 memstore.upsert(l, 1000, null); 378 t = memstore.timeOfOldestEdit(); 379 assertTrue(t == 1234); 380 } finally { 381 EnvironmentEdgeManager.reset(); 382 } 383 } 384 385 private long runSnapshot(final AbstractMemStore hmc, boolean useForce) 386 throws IOException { 387 // Save off old state. 388 long oldHistorySize = hmc.getSnapshot().getDataSize(); 389 long prevTimeStamp = hmc.timeOfOldestEdit(); 390 391 hmc.snapshot(); 392 MemStoreSnapshot snapshot = hmc.snapshot(); 393 if (useForce) { 394 // Make some assertions about what just happened. 395 assertTrue("History size has not increased", oldHistorySize < snapshot.getDataSize()); 396 long t = hmc.timeOfOldestEdit(); 397 assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE); 398 hmc.clearSnapshot(snapshot.getId()); 399 } else { 400 long t = hmc.timeOfOldestEdit(); 401 assertTrue("Time of oldest edit didn't remain the same", t == prevTimeStamp); 402 } 403 return prevTimeStamp; 404 } 405 406 private void isExpectedRowWithoutTimestamps(final int rowIndex, 407 List<Cell> kvs) { 408 int i = 0; 409 for (Cell kv : kvs) { 410 byte[] expectedColname = makeQualifier(rowIndex, i++); 411 assertTrue("Column name", CellUtil.matchingQualifier(kv, expectedColname)); 412 // Value is column name as bytes. Usually result is 413 // 100 bytes in size at least. This is the default size 414 // for BytesWriteable. For comparison, convert bytes to 415 // String and trim to remove trailing null bytes. 416 assertTrue("Content", CellUtil.matchingValue(kv, expectedColname)); 417 } 418 } 419 420 @Test 421 public void testPuttingBackChunksAfterFlushing() throws IOException { 422 byte[] row = Bytes.toBytes("testrow"); 423 byte[] fam = Bytes.toBytes("testfamily"); 424 byte[] qf1 = Bytes.toBytes("testqualifier1"); 425 byte[] qf2 = Bytes.toBytes("testqualifier2"); 426 byte[] qf3 = Bytes.toBytes("testqualifier3"); 427 byte[] qf4 = Bytes.toBytes("testqualifier4"); 428 byte[] qf5 = Bytes.toBytes("testqualifier5"); 429 byte[] val = Bytes.toBytes("testval"); 430 431 // Setting up memstore 432 memstore.add(new KeyValue(row, fam, qf1, val), null); 433 memstore.add(new KeyValue(row, fam, qf2, val), null); 434 memstore.add(new KeyValue(row, fam, qf3, val), null); 435 436 // Creating a snapshot 437 MemStoreSnapshot snapshot = memstore.snapshot(); 438 assertEquals(3, memstore.getSnapshot().getCellsCount()); 439 440 // Adding value to "new" memstore 441 assertEquals(0, memstore.getActive().getCellsCount()); 442 memstore.add(new KeyValue(row, fam, qf4, val), null); 443 memstore.add(new KeyValue(row, fam, qf5, val), null); 444 assertEquals(2, memstore.getActive().getCellsCount()); 445 // close the scanners 446 for(KeyValueScanner scanner : snapshot.getScanners()) { 447 scanner.close(); 448 } 449 memstore.clearSnapshot(snapshot.getId()); 450 451 int chunkCount = chunkCreator.getPoolSize(); 452 assertTrue(chunkCount > 0); 453 454 } 455 456 @Test 457 public void testPuttingBackChunksWithOpeningScanner() 458 throws IOException { 459 byte[] row = Bytes.toBytes("testrow"); 460 byte[] fam = Bytes.toBytes("testfamily"); 461 byte[] qf1 = Bytes.toBytes("testqualifier1"); 462 byte[] qf2 = Bytes.toBytes("testqualifier2"); 463 byte[] qf3 = Bytes.toBytes("testqualifier3"); 464 byte[] qf4 = Bytes.toBytes("testqualifier4"); 465 byte[] qf5 = Bytes.toBytes("testqualifier5"); 466 byte[] qf6 = Bytes.toBytes("testqualifier6"); 467 byte[] qf7 = Bytes.toBytes("testqualifier7"); 468 byte[] val = Bytes.toBytes("testval"); 469 470 // Setting up memstore 471 memstore.add(new KeyValue(row, fam, qf1, val), null); 472 memstore.add(new KeyValue(row, fam, qf2, val), null); 473 memstore.add(new KeyValue(row, fam, qf3, val), null); 474 475 // Creating a snapshot 476 MemStoreSnapshot snapshot = memstore.snapshot(); 477 assertEquals(3, memstore.getSnapshot().getCellsCount()); 478 479 // Adding value to "new" memstore 480 assertEquals(0, memstore.getActive().getCellsCount()); 481 memstore.add(new KeyValue(row, fam, qf4, val), null); 482 memstore.add(new KeyValue(row, fam, qf5, val), null); 483 assertEquals(2, memstore.getActive().getCellsCount()); 484 485 // opening scanner before clear the snapshot 486 List<KeyValueScanner> scanners = memstore.getScanners(0); 487 // Shouldn't putting back the chunks to pool,since some scanners are opening 488 // based on their data 489 // close the scanners 490 for(KeyValueScanner scanner : snapshot.getScanners()) { 491 scanner.close(); 492 } 493 memstore.clearSnapshot(snapshot.getId()); 494 495 assertTrue(chunkCreator.getPoolSize() == 0); 496 497 // Chunks will be put back to pool after close scanners; 498 for (KeyValueScanner scanner : scanners) { 499 scanner.close(); 500 } 501 assertTrue(chunkCreator.getPoolSize() > 0); 502 503 // clear chunks 504 chunkCreator.clearChunksInPool(); 505 506 // Creating another snapshot 507 508 snapshot = memstore.snapshot(); 509 // Adding more value 510 memstore.add(new KeyValue(row, fam, qf6, val), null); 511 memstore.add(new KeyValue(row, fam, qf7, val), null); 512 // opening scanners 513 scanners = memstore.getScanners(0); 514 // close scanners before clear the snapshot 515 for (KeyValueScanner scanner : scanners) { 516 scanner.close(); 517 } 518 // Since no opening scanner, the chunks of snapshot should be put back to 519 // pool 520 // close the scanners 521 for(KeyValueScanner scanner : snapshot.getScanners()) { 522 scanner.close(); 523 } 524 memstore.clearSnapshot(snapshot.getId()); 525 assertTrue(chunkCreator.getPoolSize() > 0); 526 } 527 528 @Test 529 public void testPuttingBackChunksWithOpeningPipelineScanner() 530 throws IOException { 531 532 // set memstore to do data compaction and not to use the speculative scan 533 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; 534 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 535 String.valueOf(compactionType)); 536 ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); 537 538 byte[] row = Bytes.toBytes("testrow"); 539 byte[] fam = Bytes.toBytes("testfamily"); 540 byte[] qf1 = Bytes.toBytes("testqualifier1"); 541 byte[] qf2 = Bytes.toBytes("testqualifier2"); 542 byte[] qf3 = Bytes.toBytes("testqualifier3"); 543 byte[] val = Bytes.toBytes("testval"); 544 545 // Setting up memstore 546 memstore.add(new KeyValue(row, fam, qf1, 1, val), null); 547 memstore.add(new KeyValue(row, fam, qf2, 1, val), null); 548 memstore.add(new KeyValue(row, fam, qf3, 1, val), null); 549 550 // Creating a pipeline 551 ((MyCompactingMemStore)memstore).disableCompaction(); 552 ((CompactingMemStore)memstore).flushInMemory(); 553 554 // Adding value to "new" memstore 555 assertEquals(0, memstore.getActive().getCellsCount()); 556 memstore.add(new KeyValue(row, fam, qf1, 2, val), null); 557 memstore.add(new KeyValue(row, fam, qf2, 2, val), null); 558 assertEquals(2, memstore.getActive().getCellsCount()); 559 560 // pipeline bucket 2 561 ((CompactingMemStore)memstore).flushInMemory(); 562 // opening scanner before force flushing 563 List<KeyValueScanner> scanners = memstore.getScanners(0); 564 // Shouldn't putting back the chunks to pool,since some scanners are opening 565 // based on their data 566 ((MyCompactingMemStore)memstore).enableCompaction(); 567 // trigger compaction 568 ((CompactingMemStore)memstore).flushInMemory(); 569 570 // Adding value to "new" memstore 571 assertEquals(0, memstore.getActive().getCellsCount()); 572 memstore.add(new KeyValue(row, fam, qf3, 3, val), null); 573 memstore.add(new KeyValue(row, fam, qf2, 3, val), null); 574 memstore.add(new KeyValue(row, fam, qf1, 3, val), null); 575 assertEquals(3, memstore.getActive().getCellsCount()); 576 577 assertTrue(chunkCreator.getPoolSize() == 0); 578 579 // Chunks will be put back to pool after close scanners; 580 for (KeyValueScanner scanner : scanners) { 581 scanner.close(); 582 } 583 assertTrue(chunkCreator.getPoolSize() > 0); 584 585 // clear chunks 586 chunkCreator.clearChunksInPool(); 587 588 // Creating another snapshot 589 590 MemStoreSnapshot snapshot = memstore.snapshot(); 591 // close the scanners 592 for(KeyValueScanner scanner : snapshot.getScanners()) { 593 scanner.close(); 594 } 595 memstore.clearSnapshot(snapshot.getId()); 596 597 snapshot = memstore.snapshot(); 598 // Adding more value 599 memstore.add(new KeyValue(row, fam, qf2, 4, val), null); 600 memstore.add(new KeyValue(row, fam, qf3, 4, val), null); 601 // opening scanners 602 scanners = memstore.getScanners(0); 603 // close scanners before clear the snapshot 604 for (KeyValueScanner scanner : scanners) { 605 scanner.close(); 606 } 607 // Since no opening scanner, the chunks of snapshot should be put back to 608 // pool 609 // close the scanners 610 for(KeyValueScanner scanner : snapshot.getScanners()) { 611 scanner.close(); 612 } 613 memstore.clearSnapshot(snapshot.getId()); 614 assertTrue(chunkCreator.getPoolSize() > 0); 615 } 616 617 ////////////////////////////////////////////////////////////////////////////// 618 // Compaction tests 619 ////////////////////////////////////////////////////////////////////////////// 620 @Test 621 public void testCompaction1Bucket() throws IOException { 622 623 // set memstore to do basic structure flattening, the "eager" option is tested in 624 // TestCompactingToCellFlatMapMemStore 625 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 626 memstore.getConfiguration() 627 .set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(compactionType)); 628 ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); 629 630 String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4 631 632 // test 1 bucket 633 int totalCellsLen = addRowsByKeys(memstore, keys1); 634 int oneCellOnCSLMHeapSize = 120; 635 int oneCellOnCAHeapSize = 88; 636 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 637 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 638 assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); 639 640 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact 641 assertEquals(0, memstore.getSnapshot().getCellsCount()); 642 // There is no compaction, as the compacting memstore type is basic. 643 // totalCellsLen remains the same 644 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 645 + 4 * oneCellOnCAHeapSize; 646 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 647 assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); 648 649 MemStoreSize mss = memstore.getFlushableSize(); 650 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 651 // simulate flusher 652 region.decrMemStoreSize(mss); 653 ImmutableSegment s = memstore.getSnapshot(); 654 assertEquals(4, s.getCellsCount()); 655 assertEquals(0, regionServicesForStores.getMemStoreSize()); 656 657 memstore.clearSnapshot(snapshot.getId()); 658 } 659 660 @Test 661 public void testCompaction2Buckets() throws IOException { 662 663 // set memstore to do basic structure flattening, the "eager" option is tested in 664 // TestCompactingToCellFlatMapMemStore 665 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 666 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 667 String.valueOf(compactionType)); 668 memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 669 String.valueOf(1)); 670 ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); 671 String[] keys1 = { "A", "A", "B", "C" }; 672 String[] keys2 = { "A", "B", "D" }; 673 674 int totalCellsLen1 = addRowsByKeys(memstore, keys1); 675 int oneCellOnCSLMHeapSize = 120; 676 int oneCellOnCAHeapSize = 88; 677 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 678 679 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 680 assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); 681 682 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact 683 int counter = 0; 684 for ( Segment s : memstore.getSegments()) { 685 counter += s.getCellsCount(); 686 } 687 assertEquals(4, counter); 688 assertEquals(0, memstore.getSnapshot().getCellsCount()); 689 // There is no compaction, as the compacting memstore type is basic. 690 // totalCellsLen remains the same 691 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 692 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 693 + 4 * oneCellOnCAHeapSize; 694 assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); 695 696 int totalCellsLen2 = addRowsByKeys(memstore, keys2); 697 totalHeapSize += 3 * oneCellOnCSLMHeapSize; 698 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 699 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 700 701 MemStoreSize mss = memstore.getFlushableSize(); 702 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact 703 assertEquals(0, memstore.getSnapshot().getCellsCount()); 704 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 705 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 706 + 7 * oneCellOnCAHeapSize; 707 assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); 708 709 mss = memstore.getFlushableSize(); 710 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 711 // simulate flusher 712 region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(), 713 mss.getCellsCount()); 714 ImmutableSegment s = memstore.getSnapshot(); 715 assertEquals(7, s.getCellsCount()); 716 assertEquals(0, regionServicesForStores.getMemStoreSize()); 717 718 memstore.clearSnapshot(snapshot.getId()); 719 } 720 721 @Test 722 public void testCompaction3Buckets() throws IOException { 723 724 // set memstore to do data compaction and not to use the speculative scan 725 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; 726 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 727 String.valueOf(compactionType)); 728 ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); 729 String[] keys1 = { "A", "A", "B", "C" }; 730 String[] keys2 = { "A", "B", "D" }; 731 String[] keys3 = { "D", "B", "B" }; 732 733 int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 4 cells. 734 int oneCellOnCSLMHeapSize = 120; 735 int oneCellOnCAHeapSize = 88; 736 assertEquals(totalCellsLen1, region.getMemStoreDataSize()); 737 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 738 assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); 739 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact 740 741 assertEquals(0, memstore.getSnapshot().getCellsCount()); 742 // One cell is duplicated and the compaction will remove it. All cells of same time so adjusting 743 // totalCellsLen 744 totalCellsLen1 = (totalCellsLen1 * 3) / 4; 745 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 746 // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff. 747 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 748 + 3 * oneCellOnCAHeapSize; 749 assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); 750 751 int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells. 752 long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize; 753 754 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 755 assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); 756 757 ((MyCompactingMemStore) memstore).disableCompaction(); 758 MemStoreSize mss = memstore.getFlushableSize(); 759 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction 760 assertEquals(0, memstore.getSnapshot().getCellsCount()); 761 // No change in the cells data size. ie. memstore size. as there is no compaction. 762 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 763 assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, 764 ((CompactingMemStore) memstore).heapSize()); 765 766 int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added 767 assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, 768 regionServicesForStores.getMemStoreSize()); 769 long totalHeapSize3 = totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 770 + 3 * oneCellOnCSLMHeapSize; 771 assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize()); 772 773 ((MyCompactingMemStore)memstore).enableCompaction(); 774 mss = memstore.getFlushableSize(); 775 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact 776 assertEquals(0, memstore.getSnapshot().getCellsCount()); 777 // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells. 778 // Out of total 10, only 4 cells are unique 779 totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated 780 totalCellsLen3 = 0;// All duplicated cells. 781 assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, 782 regionServicesForStores.getMemStoreSize()); 783 // Only 4 unique cells left 784 assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD 785 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize()); 786 787 mss = memstore.getFlushableSize(); 788 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 789 // simulate flusher 790 region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(), 791 mss.getCellsCount()); 792 ImmutableSegment s = memstore.getSnapshot(); 793 assertEquals(4, s.getCellsCount()); 794 assertEquals(0, regionServicesForStores.getMemStoreSize()); 795 796 memstore.clearSnapshot(snapshot.getId()); 797 } 798 799 @Test 800 public void testMagicCompaction3Buckets() throws IOException { 801 802 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE; 803 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 804 String.valueOf(compactionType)); 805 memstore.getConfiguration().setDouble( 806 AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45); 807 memstore.getConfiguration().setInt( 808 AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2); 809 memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1); 810 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 811 812 String[] keys1 = { "A", "B", "D" }; 813 String[] keys2 = { "A" }; 814 String[] keys3 = { "A", "A", "B", "C" }; 815 String[] keys4 = { "D", "B", "B" }; 816 817 int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 3 cells. 818 int oneCellOnCSLMHeapSize = 120; 819 assertEquals(totalCellsLen1, region.getMemStoreDataSize()); 820 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize; 821 assertEquals(totalHeapSize, memstore.heapSize()); 822 823 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline - flatten 824 assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 825 assertEquals(1.0, 826 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 827 assertEquals(0, memstore.getSnapshot().getCellsCount()); 828 829 addRowsByKeys(memstore, keys2);// Adding 1 more cell - flatten. 830 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction 831 assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 832 assertEquals(1.0, 833 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 834 assertEquals(0, memstore.getSnapshot().getCellsCount()); 835 836 addRowsByKeys(memstore, keys3);// Adding 4 more cells - merge. 837 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction 838 assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 839 assertEquals((4.0 / 8.0), 840 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 841 assertEquals(0, memstore.getSnapshot().getCellsCount()); 842 843 addRowsByKeys(memstore, keys4);// 3 more cells added - compact (or not) 844 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact 845 int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells(); 846 assertTrue(4 == numCells || 11 == numCells); 847 assertEquals(0, memstore.getSnapshot().getCellsCount()); 848 849 MemStoreSize mss = memstore.getFlushableSize(); 850 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 851 // simulate flusher 852 region.decrMemStoreSize(mss); 853 ImmutableSegment s = memstore.getSnapshot(); 854 numCells = s.getCellsCount(); 855 assertTrue(4 == numCells || 11 == numCells); 856 assertEquals(0, regionServicesForStores.getMemStoreSize()); 857 858 memstore.clearSnapshot(snapshot.getId()); 859 } 860 861 protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) { 862 byte[] fam = Bytes.toBytes("testfamily"); 863 byte[] qf = Bytes.toBytes("testqualifier"); 864 long size = hmc.getActive().getDataSize(); 865 long heapOverhead = hmc.getActive().getHeapSize(); 866 int cellsCount = hmc.getActive().getCellsCount(); 867 int totalLen = 0; 868 for (int i = 0; i < keys.length; i++) { 869 long timestamp = System.currentTimeMillis(); 870 Threads.sleep(1); // to make sure each kv gets a different ts 871 byte[] row = Bytes.toBytes(keys[i]); 872 byte[] val = Bytes.toBytes(keys[i] + i); 873 KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); 874 totalLen += Segment.getCellLength(kv); 875 hmc.add(kv, null); 876 LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); 877 } 878 regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size, 879 hmc.getActive().getHeapSize() - heapOverhead, 0, 880 hmc.getActive().getCellsCount() - cellsCount); 881 return totalLen; 882 } 883 884 // for controlling the val size when adding a new cell 885 protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) { 886 byte[] fam = Bytes.toBytes("testfamily"); 887 byte[] qf = Bytes.toBytes("testqualifier"); 888 long size = hmc.getActive().getDataSize(); 889 long heapOverhead = hmc.getActive().getHeapSize(); 890 int cellsCount = hmc.getActive().getCellsCount(); 891 int totalLen = 0; 892 for (int i = 0; i < keys.length; i++) { 893 long timestamp = System.currentTimeMillis(); 894 Threads.sleep(1); // to make sure each kv gets a different ts 895 byte[] row = Bytes.toBytes(keys[i]); 896 KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); 897 totalLen += Segment.getCellLength(kv); 898 hmc.add(kv, null); 899 LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); 900 } 901 regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size, 902 hmc.getActive().getHeapSize() - heapOverhead, 0, cellsCount); 903 return totalLen; 904 } 905 906 private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { 907 long t = 1234; 908 909 @Override 910 public long currentTime() { 911 return t; 912 } 913 public void setCurrentTimeMillis(long t) { 914 this.t = t; 915 } 916 } 917 918 static protected class MyCompactingMemStore extends CompactingMemStore { 919 920 public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store, 921 RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 922 throws IOException { 923 super(conf, c, store, regionServices, compactionPolicy); 924 } 925 926 void disableCompaction() { 927 allowCompaction.set(false); 928 } 929 930 void enableCompaction() { 931 allowCompaction.set(true); 932 } 933 void initiateType(MemoryCompactionPolicy compactionType, Configuration conf) 934 throws IllegalArgumentIOException { 935 compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST"); 936 } 937 938 } 939}