001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.lang.management.ManagementFactory; 028import java.util.ArrayList; 029import java.util.List; 030import java.util.concurrent.Executors; 031import java.util.concurrent.ThreadPoolExecutor; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.CellComparator; 035import org.apache.hadoop.hbase.CellUtil; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseConfiguration; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.HColumnDescriptor; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.HRegionInfo; 042import org.apache.hadoop.hbase.HTableDescriptor; 043import org.apache.hadoop.hbase.KeepDeletedCells; 044import org.apache.hadoop.hbase.KeyValue; 045import org.apache.hadoop.hbase.KeyValueTestUtil; 046import org.apache.hadoop.hbase.MemoryCompactionPolicy; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.client.Scan; 049import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 050import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 051import org.apache.hadoop.hbase.testclassification.MediumTests; 052import org.apache.hadoop.hbase.testclassification.RegionServerTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.EnvironmentEdge; 055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 056import org.apache.hadoop.hbase.util.Threads; 057import org.apache.hadoop.hbase.wal.WAL; 058import org.junit.After; 059import org.junit.Before; 060import org.junit.ClassRule; 061import org.junit.Test; 062import org.junit.experimental.categories.Category; 063import org.mockito.Mockito; 064import org.slf4j.Logger; 065import org.slf4j.LoggerFactory; 066 067/** 068 * compacted memstore test case 069 */ 070@Category({ RegionServerTests.class, MediumTests.class }) 071public class TestCompactingMemStore extends TestDefaultMemStore { 072 073 @ClassRule 074 public static final HBaseClassTestRule CLASS_RULE = 075 HBaseClassTestRule.forClass(TestCompactingMemStore.class); 076 077 private static final Logger LOG = LoggerFactory.getLogger(TestCompactingMemStore.class); 078 protected static ChunkCreator chunkCreator; 079 protected HRegion region; 080 protected RegionServicesForStores regionServicesForStores; 081 protected HStore store; 082 083 ////////////////////////////////////////////////////////////////////////////// 084 // Helpers 085 ////////////////////////////////////////////////////////////////////////////// 086 protected static byte[] makeQualifier(final int i1, final int i2) { 087 return Bytes.toBytes(Integer.toString(i1) + ";" + Integer.toString(i2)); 088 } 089 090 @After 091 public void tearDown() throws Exception { 092 chunkCreator.clearChunksInPool(); 093 } 094 095 @Override 096 @Before 097 public void setUp() throws Exception { 098 compactingSetUp(); 099 this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(), 100 CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER); 101 ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 102 } 103 104 protected void compactingSetUp() throws Exception { 105 super.internalSetUp(); 106 Configuration conf = new Configuration(); 107 conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); 108 conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); 109 conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000); 110 HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf); 111 HColumnDescriptor hcd = new HColumnDescriptor(FAMILY); 112 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("foobar")); 113 htd.addFamily(hcd); 114 HRegionInfo info = new HRegionInfo(TableName.valueOf("foobar"), null, null, false); 115 WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info); 116 this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true); 117 this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores()); 118 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 119 Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 120 this.store = new HStore(region, hcd, conf, false); 121 122 long globalMemStoreLimit = 123 (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() 124 * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); 125 chunkCreator = ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 126 globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, 127 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 128 assertNotNull(chunkCreator); 129 } 130 131 /** 132 * A simple test which flush in memory affect timeOfOldestEdit 133 */ 134 @Test 135 public void testTimeOfOldestEdit() { 136 assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit()); 137 final byte[] r = Bytes.toBytes("r"); 138 final byte[] f = Bytes.toBytes("f"); 139 final byte[] q = Bytes.toBytes("q"); 140 final byte[] v = Bytes.toBytes("v"); 141 final KeyValue kv = new KeyValue(r, f, q, v); 142 memstore.add(kv, null); 143 long timeOfOldestEdit = memstore.timeOfOldestEdit(); 144 assertNotEquals(Long.MAX_VALUE, timeOfOldestEdit); 145 146 ((CompactingMemStore) memstore).flushInMemory(); 147 assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit()); 148 memstore.add(kv, null); 149 assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit()); 150 memstore.snapshot(); 151 assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit()); 152 } 153 154 /** 155 * A simple test which verifies the 3 possible states when scanning across snapshot. nn 156 */ 157 @Override 158 @Test 159 public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException { 160 // we are going to the scanning across snapshot with two kvs 161 // kv1 should always be returned before kv2 162 final byte[] one = Bytes.toBytes(1); 163 final byte[] two = Bytes.toBytes(2); 164 final byte[] f = Bytes.toBytes("f"); 165 final byte[] q = Bytes.toBytes("q"); 166 final byte[] v = Bytes.toBytes(3); 167 168 final KeyValue kv1 = new KeyValue(one, f, q, 10, v); 169 final KeyValue kv2 = new KeyValue(two, f, q, 10, v); 170 171 // use case 1: both kvs in kvset 172 this.memstore.add(kv1.clone(), null); 173 this.memstore.add(kv2.clone(), null); 174 // snapshot is empty,active segment is not empty, 175 // empty segment is skipped. 176 verifyOneScanAcrossSnapshot2(kv1, kv2); 177 178 // use case 2: both kvs in snapshot 179 this.memstore.snapshot(); 180 // active segment is empty,snapshot is not empty, 181 // empty segment is skipped. 182 verifyOneScanAcrossSnapshot2(kv1, kv2); 183 184 // use case 3: first in snapshot second in kvset 185 this.memstore = new CompactingMemStore(HBaseConfiguration.create(), 186 CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER); 187 this.memstore.add(kv1.clone(), null); 188 // As compaction is starting in the background the repetition 189 // of the k1 might be removed BUT the scanners created earlier 190 // should look on the OLD MutableCellSetSegment, so this should be OK... 191 this.memstore.snapshot(); 192 this.memstore.add(kv2.clone(), null); 193 verifyScanAcrossSnapshot2(kv1, kv2); 194 } 195 196 /** 197 * Test memstore snapshots n 198 */ 199 @Override 200 @Test 201 public void testSnapshotting() throws IOException { 202 final int snapshotCount = 5; 203 // Add some rows, run a snapshot. Do it a few times. 204 for (int i = 0; i < snapshotCount; i++) { 205 addRows(this.memstore); 206 runSnapshot(this.memstore, true); 207 assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount()); 208 } 209 } 210 211 ////////////////////////////////////////////////////////////////////////////// 212 // Get tests 213 ////////////////////////////////////////////////////////////////////////////// 214 215 /** 216 * Test getNextRow from memstore n 217 */ 218 @Override 219 @Test 220 public void testGetNextRow() throws Exception { 221 addRows(this.memstore); 222 // Add more versions to make it a little more interesting. 223 Thread.sleep(1); 224 addRows(this.memstore); 225 Cell closestToEmpty = ((CompactingMemStore) this.memstore).getNextRow(KeyValue.LOWESTKEY); 226 assertTrue(CellComparator.getInstance().compareRows(closestToEmpty, 227 new KeyValue(Bytes.toBytes(0), EnvironmentEdgeManager.currentTime())) == 0); 228 for (int i = 0; i < ROW_COUNT; i++) { 229 Cell nr = ((CompactingMemStore) this.memstore) 230 .getNextRow(new KeyValue(Bytes.toBytes(i), EnvironmentEdgeManager.currentTime())); 231 if (i + 1 == ROW_COUNT) { 232 assertNull(nr); 233 } else { 234 assertTrue(CellComparator.getInstance().compareRows(nr, 235 new KeyValue(Bytes.toBytes(i + 1), EnvironmentEdgeManager.currentTime())) == 0); 236 } 237 } 238 // starting from each row, validate results should contain the starting row 239 Configuration conf = HBaseConfiguration.create(); 240 for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) { 241 ScanInfo scanInfo = 242 new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, KeepDeletedCells.FALSE, 243 HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false); 244 try (InternalScanner scanner = 245 new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null, 246 memstore.getScanners(0))) { 247 List<Cell> results = new ArrayList<>(); 248 for (int i = 0; scanner.next(results); i++) { 249 int rowId = startRowId + i; 250 Cell left = results.get(0); 251 byte[] row1 = Bytes.toBytes(rowId); 252 assertTrue("Row name", 253 CellComparator.getInstance().compareRows(left, row1, 0, row1.length) == 0); 254 assertEquals("Count of columns", QUALIFIER_COUNT, results.size()); 255 List<Cell> row = new ArrayList<>(); 256 for (Cell kv : results) { 257 row.add(kv); 258 } 259 isExpectedRowWithoutTimestamps(rowId, row); 260 // Clear out set. Otherwise row results accumulate. 261 results.clear(); 262 } 263 } 264 } 265 } 266 267 @Override 268 @Test 269 public void testGet_memstoreAndSnapShot() throws IOException { 270 byte[] row = Bytes.toBytes("testrow"); 271 byte[] fam = Bytes.toBytes("testfamily"); 272 byte[] qf1 = Bytes.toBytes("testqualifier1"); 273 byte[] qf2 = Bytes.toBytes("testqualifier2"); 274 byte[] qf3 = Bytes.toBytes("testqualifier3"); 275 byte[] qf4 = Bytes.toBytes("testqualifier4"); 276 byte[] qf5 = Bytes.toBytes("testqualifier5"); 277 byte[] val = Bytes.toBytes("testval"); 278 279 // Setting up memstore 280 memstore.add(new KeyValue(row, fam, qf1, val), null); 281 memstore.add(new KeyValue(row, fam, qf2, val), null); 282 memstore.add(new KeyValue(row, fam, qf3, val), null); 283 // Pushing to pipeline 284 ((CompactingMemStore) memstore).flushInMemory(); 285 assertEquals(0, memstore.getSnapshot().getCellsCount()); 286 // Creating a snapshot 287 memstore.snapshot(); 288 assertEquals(3, memstore.getSnapshot().getCellsCount()); 289 // Adding value to "new" memstore 290 assertEquals(0, memstore.getActive().getCellsCount()); 291 memstore.add(new KeyValue(row, fam, qf4, val), null); 292 memstore.add(new KeyValue(row, fam, qf5, val), null); 293 assertEquals(2, memstore.getActive().getCellsCount()); 294 } 295 296 //////////////////////////////////// 297 // Test for periodic memstore flushes 298 // based on time of oldest edit 299 //////////////////////////////////// 300 301 /** 302 * Add keyvalues with a fixed memstoreTs, and checks that memstore size is decreased as older 303 * keyvalues are deleted from the memstore. n 304 */ 305 @Override 306 @Test 307 public void testUpsertMemstoreSize() throws Exception { 308 MemStoreSize oldSize = memstore.size(); 309 310 List<Cell> l = new ArrayList<>(); 311 KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); 312 KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v"); 313 KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v"); 314 315 kv1.setSequenceId(1); 316 kv2.setSequenceId(1); 317 kv3.setSequenceId(1); 318 l.add(kv1); 319 l.add(kv2); 320 l.add(kv3); 321 322 this.memstore.upsert(l, 2, null);// readpoint is 2 323 MemStoreSize newSize = this.memstore.size(); 324 assert (newSize.getDataSize() > oldSize.getDataSize()); 325 // The kv1 should be removed. 326 assert (memstore.getActive().getCellsCount() == 2); 327 328 KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v"); 329 kv4.setSequenceId(1); 330 l.clear(); 331 l.add(kv4); 332 this.memstore.upsert(l, 3, null); 333 assertEquals(newSize, this.memstore.size()); 334 // The kv2 should be removed. 335 assert (memstore.getActive().getCellsCount() == 2); 336 // this.memstore = null; 337 } 338 339 /** 340 * Tests that the timeOfOldestEdit is updated correctly for the various edit operations in 341 * memstore. n 342 */ 343 @Override 344 @Test 345 public void testUpdateToTimeOfOldestEdit() throws Exception { 346 try { 347 EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); 348 EnvironmentEdgeManager.injectEdge(edge); 349 long t = memstore.timeOfOldestEdit(); 350 assertEquals(Long.MAX_VALUE, t); 351 352 // test the case that the timeOfOldestEdit is updated after a KV add 353 memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null); 354 t = memstore.timeOfOldestEdit(); 355 assertTrue(t == 1234); 356 // The method will also assert 357 // the value is reset to Long.MAX_VALUE 358 t = runSnapshot(memstore, true); 359 360 // test the case that the timeOfOldestEdit is updated after a KV delete 361 memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null); 362 t = memstore.timeOfOldestEdit(); 363 assertTrue(t == 1234); 364 t = runSnapshot(memstore, true); 365 366 // test the case that the timeOfOldestEdit is updated after a KV upsert 367 List<Cell> l = new ArrayList<>(); 368 KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); 369 kv1.setSequenceId(100); 370 l.add(kv1); 371 memstore.upsert(l, 1000, null); 372 t = memstore.timeOfOldestEdit(); 373 assertTrue(t == 1234); 374 } finally { 375 EnvironmentEdgeManager.reset(); 376 } 377 } 378 379 private long runSnapshot(final AbstractMemStore hmc, boolean useForce) throws IOException { 380 // Save off old state. 381 long oldHistorySize = hmc.getSnapshot().getDataSize(); 382 long prevTimeStamp = hmc.timeOfOldestEdit(); 383 384 hmc.snapshot(); 385 MemStoreSnapshot snapshot = hmc.snapshot(); 386 if (useForce) { 387 // Make some assertions about what just happened. 388 assertTrue("History size has not increased", oldHistorySize < snapshot.getDataSize()); 389 long t = hmc.timeOfOldestEdit(); 390 assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE); 391 hmc.clearSnapshot(snapshot.getId()); 392 } else { 393 long t = hmc.timeOfOldestEdit(); 394 assertTrue("Time of oldest edit didn't remain the same", t == prevTimeStamp); 395 } 396 return prevTimeStamp; 397 } 398 399 private void isExpectedRowWithoutTimestamps(final int rowIndex, List<Cell> kvs) { 400 int i = 0; 401 for (Cell kv : kvs) { 402 byte[] expectedColname = makeQualifier(rowIndex, i++); 403 assertTrue("Column name", CellUtil.matchingQualifier(kv, expectedColname)); 404 // Value is column name as bytes. Usually result is 405 // 100 bytes in size at least. This is the default size 406 // for BytesWriteable. For comparison, convert bytes to 407 // String and trim to remove trailing null bytes. 408 assertTrue("Content", CellUtil.matchingValue(kv, expectedColname)); 409 } 410 } 411 412 @Test 413 public void testPuttingBackChunksAfterFlushing() throws IOException { 414 byte[] row = Bytes.toBytes("testrow"); 415 byte[] fam = Bytes.toBytes("testfamily"); 416 byte[] qf1 = Bytes.toBytes("testqualifier1"); 417 byte[] qf2 = Bytes.toBytes("testqualifier2"); 418 byte[] qf3 = Bytes.toBytes("testqualifier3"); 419 byte[] qf4 = Bytes.toBytes("testqualifier4"); 420 byte[] qf5 = Bytes.toBytes("testqualifier5"); 421 byte[] val = Bytes.toBytes("testval"); 422 423 // Setting up memstore 424 memstore.add(new KeyValue(row, fam, qf1, val), null); 425 memstore.add(new KeyValue(row, fam, qf2, val), null); 426 memstore.add(new KeyValue(row, fam, qf3, val), null); 427 428 // Creating a snapshot 429 MemStoreSnapshot snapshot = memstore.snapshot(); 430 assertEquals(3, memstore.getSnapshot().getCellsCount()); 431 432 // Adding value to "new" memstore 433 assertEquals(0, memstore.getActive().getCellsCount()); 434 memstore.add(new KeyValue(row, fam, qf4, val), null); 435 memstore.add(new KeyValue(row, fam, qf5, val), null); 436 assertEquals(2, memstore.getActive().getCellsCount()); 437 // close the scanners 438 for (KeyValueScanner scanner : snapshot.getScanners()) { 439 scanner.close(); 440 } 441 memstore.clearSnapshot(snapshot.getId()); 442 443 int chunkCount = chunkCreator.getPoolSize(); 444 assertTrue(chunkCount > 0); 445 446 } 447 448 @Test 449 public void testPuttingBackChunksWithOpeningScanner() throws IOException { 450 byte[] row = Bytes.toBytes("testrow"); 451 byte[] fam = Bytes.toBytes("testfamily"); 452 byte[] qf1 = Bytes.toBytes("testqualifier1"); 453 byte[] qf2 = Bytes.toBytes("testqualifier2"); 454 byte[] qf3 = Bytes.toBytes("testqualifier3"); 455 byte[] qf4 = Bytes.toBytes("testqualifier4"); 456 byte[] qf5 = Bytes.toBytes("testqualifier5"); 457 byte[] qf6 = Bytes.toBytes("testqualifier6"); 458 byte[] qf7 = Bytes.toBytes("testqualifier7"); 459 byte[] val = Bytes.toBytes("testval"); 460 461 // Setting up memstore 462 memstore.add(new KeyValue(row, fam, qf1, val), null); 463 memstore.add(new KeyValue(row, fam, qf2, val), null); 464 memstore.add(new KeyValue(row, fam, qf3, val), null); 465 466 // Creating a snapshot 467 MemStoreSnapshot snapshot = memstore.snapshot(); 468 assertEquals(3, memstore.getSnapshot().getCellsCount()); 469 470 // Adding value to "new" memstore 471 assertEquals(0, memstore.getActive().getCellsCount()); 472 memstore.add(new KeyValue(row, fam, qf4, val), null); 473 memstore.add(new KeyValue(row, fam, qf5, val), null); 474 assertEquals(2, memstore.getActive().getCellsCount()); 475 476 // opening scanner before clear the snapshot 477 List<KeyValueScanner> scanners = memstore.getScanners(0); 478 // Shouldn't putting back the chunks to pool,since some scanners are opening 479 // based on their data 480 // close the scanners 481 for (KeyValueScanner scanner : snapshot.getScanners()) { 482 scanner.close(); 483 } 484 memstore.clearSnapshot(snapshot.getId()); 485 486 assertTrue(chunkCreator.getPoolSize() == 0); 487 488 // Chunks will be put back to pool after close scanners; 489 for (KeyValueScanner scanner : scanners) { 490 scanner.close(); 491 } 492 assertTrue(chunkCreator.getPoolSize() > 0); 493 494 // clear chunks 495 chunkCreator.clearChunksInPool(); 496 497 // Creating another snapshot 498 499 snapshot = memstore.snapshot(); 500 // Adding more value 501 memstore.add(new KeyValue(row, fam, qf6, val), null); 502 memstore.add(new KeyValue(row, fam, qf7, val), null); 503 // opening scanners 504 scanners = memstore.getScanners(0); 505 // close scanners before clear the snapshot 506 for (KeyValueScanner scanner : scanners) { 507 scanner.close(); 508 } 509 // Since no opening scanner, the chunks of snapshot should be put back to 510 // pool 511 // close the scanners 512 for (KeyValueScanner scanner : snapshot.getScanners()) { 513 scanner.close(); 514 } 515 memstore.clearSnapshot(snapshot.getId()); 516 assertTrue(chunkCreator.getPoolSize() > 0); 517 } 518 519 @Test 520 public void testPuttingBackChunksWithOpeningPipelineScanner() throws IOException { 521 522 // set memstore to do data compaction and not to use the speculative scan 523 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; 524 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 525 String.valueOf(compactionType)); 526 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 527 528 byte[] row = Bytes.toBytes("testrow"); 529 byte[] fam = Bytes.toBytes("testfamily"); 530 byte[] qf1 = Bytes.toBytes("testqualifier1"); 531 byte[] qf2 = Bytes.toBytes("testqualifier2"); 532 byte[] qf3 = Bytes.toBytes("testqualifier3"); 533 byte[] val = Bytes.toBytes("testval"); 534 535 // Setting up memstore 536 memstore.add(new KeyValue(row, fam, qf1, 1, val), null); 537 memstore.add(new KeyValue(row, fam, qf2, 1, val), null); 538 memstore.add(new KeyValue(row, fam, qf3, 1, val), null); 539 540 // Creating a pipeline 541 ((MyCompactingMemStore) memstore).disableCompaction(); 542 ((CompactingMemStore) memstore).flushInMemory(); 543 544 // Adding value to "new" memstore 545 assertEquals(0, memstore.getActive().getCellsCount()); 546 memstore.add(new KeyValue(row, fam, qf1, 2, val), null); 547 memstore.add(new KeyValue(row, fam, qf2, 2, val), null); 548 assertEquals(2, memstore.getActive().getCellsCount()); 549 550 // pipeline bucket 2 551 ((CompactingMemStore) memstore).flushInMemory(); 552 // opening scanner before force flushing 553 List<KeyValueScanner> scanners = memstore.getScanners(0); 554 // Shouldn't putting back the chunks to pool,since some scanners are opening 555 // based on their data 556 ((MyCompactingMemStore) memstore).enableCompaction(); 557 // trigger compaction 558 ((CompactingMemStore) memstore).flushInMemory(); 559 560 // Adding value to "new" memstore 561 assertEquals(0, memstore.getActive().getCellsCount()); 562 memstore.add(new KeyValue(row, fam, qf3, 3, val), null); 563 memstore.add(new KeyValue(row, fam, qf2, 3, val), null); 564 memstore.add(new KeyValue(row, fam, qf1, 3, val), null); 565 assertEquals(3, memstore.getActive().getCellsCount()); 566 567 assertTrue(chunkCreator.getPoolSize() == 0); 568 569 // Chunks will be put back to pool after close scanners; 570 for (KeyValueScanner scanner : scanners) { 571 scanner.close(); 572 } 573 assertTrue(chunkCreator.getPoolSize() > 0); 574 575 // clear chunks 576 chunkCreator.clearChunksInPool(); 577 578 // Creating another snapshot 579 580 MemStoreSnapshot snapshot = memstore.snapshot(); 581 // close the scanners 582 for (KeyValueScanner scanner : snapshot.getScanners()) { 583 scanner.close(); 584 } 585 memstore.clearSnapshot(snapshot.getId()); 586 587 snapshot = memstore.snapshot(); 588 // Adding more value 589 memstore.add(new KeyValue(row, fam, qf2, 4, val), null); 590 memstore.add(new KeyValue(row, fam, qf3, 4, val), null); 591 // opening scanners 592 scanners = memstore.getScanners(0); 593 // close scanners before clear the snapshot 594 for (KeyValueScanner scanner : scanners) { 595 scanner.close(); 596 } 597 // Since no opening scanner, the chunks of snapshot should be put back to 598 // pool 599 // close the scanners 600 for (KeyValueScanner scanner : snapshot.getScanners()) { 601 scanner.close(); 602 } 603 memstore.clearSnapshot(snapshot.getId()); 604 assertTrue(chunkCreator.getPoolSize() > 0); 605 } 606 607 ////////////////////////////////////////////////////////////////////////////// 608 // Compaction tests 609 ////////////////////////////////////////////////////////////////////////////// 610 @Test 611 public void testCompaction1Bucket() throws IOException { 612 613 // set memstore to do basic structure flattening, the "eager" option is tested in 614 // TestCompactingToCellFlatMapMemStore 615 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 616 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 617 String.valueOf(compactionType)); 618 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 619 620 String[] keys1 = { "A", "A", "B", "C" }; // A1, A2, B3, C4 621 622 // test 1 bucket 623 int totalCellsLen = addRowsByKeys(memstore, keys1); 624 int oneCellOnCSLMHeapSize = 120; 625 int oneCellOnCAHeapSize = 88; 626 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 627 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 628 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 629 630 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 631 assertEquals(0, memstore.getSnapshot().getCellsCount()); 632 // There is no compaction, as the compacting memstore type is basic. 633 // totalCellsLen remains the same 634 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 635 + 4 * oneCellOnCAHeapSize; 636 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 637 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 638 639 MemStoreSize mss = memstore.getFlushableSize(); 640 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 641 // simulate flusher 642 region.decrMemStoreSize(mss); 643 ImmutableSegment s = memstore.getSnapshot(); 644 assertEquals(4, s.getCellsCount()); 645 assertEquals(0, regionServicesForStores.getMemStoreSize()); 646 647 memstore.clearSnapshot(snapshot.getId()); 648 } 649 650 @Test 651 public void testCompaction2Buckets() throws IOException { 652 653 // set memstore to do basic structure flattening, the "eager" option is tested in 654 // TestCompactingToCellFlatMapMemStore 655 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 656 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 657 String.valueOf(compactionType)); 658 memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 659 String.valueOf(1)); 660 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 661 String[] keys1 = { "A", "A", "B", "C" }; 662 String[] keys2 = { "A", "B", "D" }; 663 664 int totalCellsLen1 = addRowsByKeys(memstore, keys1); 665 int oneCellOnCSLMHeapSize = 120; 666 int oneCellOnCAHeapSize = 88; 667 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 668 669 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 670 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 671 672 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 673 int counter = 0; 674 for (Segment s : memstore.getSegments()) { 675 counter += s.getCellsCount(); 676 } 677 assertEquals(4, counter); 678 assertEquals(0, memstore.getSnapshot().getCellsCount()); 679 // There is no compaction, as the compacting memstore type is basic. 680 // totalCellsLen remains the same 681 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 682 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 683 + 4 * oneCellOnCAHeapSize; 684 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 685 686 int totalCellsLen2 = addRowsByKeys(memstore, keys2); 687 totalHeapSize += 3 * oneCellOnCSLMHeapSize; 688 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 689 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 690 691 MemStoreSize mss = memstore.getFlushableSize(); 692 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 693 assertEquals(0, memstore.getSnapshot().getCellsCount()); 694 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 695 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 696 + 7 * oneCellOnCAHeapSize; 697 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 698 699 mss = memstore.getFlushableSize(); 700 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 701 // simulate flusher 702 region.decrMemStoreSize(mss); 703 ImmutableSegment s = memstore.getSnapshot(); 704 assertEquals(7, s.getCellsCount()); 705 assertEquals(0, regionServicesForStores.getMemStoreSize()); 706 707 memstore.clearSnapshot(snapshot.getId()); 708 } 709 710 @Test 711 public void testCompaction3Buckets() throws IOException { 712 713 // set memstore to do data compaction and not to use the speculative scan 714 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; 715 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 716 String.valueOf(compactionType)); 717 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 718 String[] keys1 = { "A", "A", "B", "C" }; 719 String[] keys2 = { "A", "B", "D" }; 720 String[] keys3 = { "D", "B", "B" }; 721 722 int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 4 cells. 723 int oneCellOnCSLMHeapSize = 120; 724 int oneCellOnCAHeapSize = 88; 725 assertEquals(totalCellsLen1, region.getMemStoreDataSize()); 726 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 727 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 728 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 729 730 assertEquals(0, memstore.getSnapshot().getCellsCount()); 731 // One cell is duplicated and the compaction will remove it. All cells of same time so adjusting 732 // totalCellsLen 733 totalCellsLen1 = (totalCellsLen1 * 3) / 4; 734 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 735 // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff. 736 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 737 + 3 * oneCellOnCAHeapSize; 738 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 739 740 int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells. 741 long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize; 742 743 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 744 assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); 745 746 ((MyCompactingMemStore) memstore).disableCompaction(); 747 MemStoreSize mss = memstore.getFlushableSize(); 748 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction 749 assertEquals(0, memstore.getSnapshot().getCellsCount()); 750 // No change in the cells data size. ie. memstore size. as there is no compaction. 751 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 752 assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, 753 ((CompactingMemStore) memstore).heapSize()); 754 755 int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added 756 assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, 757 regionServicesForStores.getMemStoreSize()); 758 long totalHeapSize3 = 759 totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM + 3 * oneCellOnCSLMHeapSize; 760 assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize()); 761 762 ((MyCompactingMemStore) memstore).enableCompaction(); 763 mss = memstore.getFlushableSize(); 764 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 765 assertEquals(0, memstore.getSnapshot().getCellsCount()); 766 // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells. 767 // Out of total 10, only 4 cells are unique 768 totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated 769 totalCellsLen3 = 0;// All duplicated cells. 770 assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, 771 regionServicesForStores.getMemStoreSize()); 772 // Only 4 unique cells left 773 assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD 774 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize()); 775 776 mss = memstore.getFlushableSize(); 777 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 778 // simulate flusher 779 region.decrMemStoreSize(mss); 780 ImmutableSegment s = memstore.getSnapshot(); 781 assertEquals(4, s.getCellsCount()); 782 assertEquals(0, regionServicesForStores.getMemStoreSize()); 783 784 memstore.clearSnapshot(snapshot.getId()); 785 } 786 787 @Test 788 public void testMagicCompaction3Buckets() throws IOException { 789 790 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE; 791 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 792 String.valueOf(compactionType)); 793 memstore.getConfiguration() 794 .setDouble(AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45); 795 memstore.getConfiguration() 796 .setInt(AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2); 797 memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1); 798 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 799 800 String[] keys1 = { "A", "B", "D" }; 801 String[] keys2 = { "A" }; 802 String[] keys3 = { "A", "A", "B", "C" }; 803 String[] keys4 = { "D", "B", "B" }; 804 805 int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 3 cells. 806 int oneCellOnCSLMHeapSize = 120; 807 assertEquals(totalCellsLen1, region.getMemStoreDataSize()); 808 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize; 809 assertEquals(totalHeapSize, memstore.heapSize()); 810 811 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline - flatten 812 assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 813 assertEquals(1.0, 814 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 815 assertEquals(0, memstore.getSnapshot().getCellsCount()); 816 817 addRowsByKeys(memstore, keys2);// Adding 1 more cell - flatten. 818 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction 819 assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 820 assertEquals(1.0, 821 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 822 assertEquals(0, memstore.getSnapshot().getCellsCount()); 823 824 addRowsByKeys(memstore, keys3);// Adding 4 more cells - merge. 825 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction 826 assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 827 assertEquals((4.0 / 8.0), 828 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 829 assertEquals(0, memstore.getSnapshot().getCellsCount()); 830 831 addRowsByKeys(memstore, keys4);// 3 more cells added - compact (or not) 832 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 833 int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells(); 834 assertTrue(4 == numCells || 11 == numCells); 835 assertEquals(0, memstore.getSnapshot().getCellsCount()); 836 837 MemStoreSize mss = memstore.getFlushableSize(); 838 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 839 // simulate flusher 840 region.decrMemStoreSize(mss); 841 ImmutableSegment s = memstore.getSnapshot(); 842 numCells = s.getCellsCount(); 843 assertTrue(4 == numCells || 11 == numCells); 844 assertEquals(0, regionServicesForStores.getMemStoreSize()); 845 846 memstore.clearSnapshot(snapshot.getId()); 847 } 848 849 protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) { 850 byte[] fam = Bytes.toBytes("testfamily"); 851 byte[] qf = Bytes.toBytes("testqualifier"); 852 long size = hmc.getActive().getDataSize(); 853 long heapOverhead = hmc.getActive().getHeapSize(); 854 int cellsCount = hmc.getActive().getCellsCount(); 855 int totalLen = 0; 856 for (int i = 0; i < keys.length; i++) { 857 long timestamp = EnvironmentEdgeManager.currentTime(); 858 Threads.sleep(1); // to make sure each kv gets a different ts 859 byte[] row = Bytes.toBytes(keys[i]); 860 byte[] val = Bytes.toBytes(keys[i] + i); 861 KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); 862 totalLen += Segment.getCellLength(kv); 863 hmc.add(kv, null); 864 LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); 865 } 866 regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size, 867 hmc.getActive().getHeapSize() - heapOverhead, 0, 868 hmc.getActive().getCellsCount() - cellsCount); 869 return totalLen; 870 } 871 872 // for controlling the val size when adding a new cell 873 protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) { 874 byte[] fam = Bytes.toBytes("testfamily"); 875 byte[] qf = Bytes.toBytes("testqualifier"); 876 long size = hmc.getActive().getDataSize(); 877 long heapOverhead = hmc.getActive().getHeapSize(); 878 int cellsCount = hmc.getActive().getCellsCount(); 879 int totalLen = 0; 880 for (int i = 0; i < keys.length; i++) { 881 long timestamp = EnvironmentEdgeManager.currentTime(); 882 Threads.sleep(1); // to make sure each kv gets a different ts 883 byte[] row = Bytes.toBytes(keys[i]); 884 KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); 885 totalLen += Segment.getCellLength(kv); 886 hmc.add(kv, null); 887 LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); 888 } 889 regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size, 890 hmc.getActive().getHeapSize() - heapOverhead, 0, cellsCount); 891 return totalLen; 892 } 893 894 private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { 895 long t = 1234; 896 897 @Override 898 public long currentTime() { 899 return t; 900 } 901 902 public void setCurrentTimeMillis(long t) { 903 this.t = t; 904 } 905 } 906 907 static protected class MyCompactingMemStore extends CompactingMemStore { 908 909 public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store, 910 RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 911 throws IOException { 912 super(conf, c, store, regionServices, compactionPolicy); 913 } 914 915 void disableCompaction() { 916 allowCompaction.set(false); 917 } 918 919 void enableCompaction() { 920 allowCompaction.set(true); 921 } 922 923 void initiateType(MemoryCompactionPolicy compactionType, Configuration conf) 924 throws IllegalArgumentIOException { 925 compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST"); 926 } 927 928 } 929}