001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.lang.management.ManagementFactory; 028import java.util.ArrayList; 029import java.util.List; 030import java.util.concurrent.Executors; 031import java.util.concurrent.ThreadPoolExecutor; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.CellComparator; 035import org.apache.hadoop.hbase.CellUtil; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseConfiguration; 038import org.apache.hadoop.hbase.HBaseTestingUtil; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.KeepDeletedCells; 041import org.apache.hadoop.hbase.KeyValue; 042import org.apache.hadoop.hbase.KeyValueTestUtil; 043import org.apache.hadoop.hbase.MemoryCompactionPolicy; 044import org.apache.hadoop.hbase.TableName; 045import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.client.RegionInfoBuilder; 049import org.apache.hadoop.hbase.client.Scan; 050import org.apache.hadoop.hbase.client.TableDescriptor; 051import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 052import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 053import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 054import org.apache.hadoop.hbase.testclassification.MediumTests; 055import org.apache.hadoop.hbase.testclassification.RegionServerTests; 056import org.apache.hadoop.hbase.util.Bytes; 057import org.apache.hadoop.hbase.util.EnvironmentEdge; 058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 059import org.apache.hadoop.hbase.util.Threads; 060import org.apache.hadoop.hbase.wal.WAL; 061import org.junit.After; 062import org.junit.Before; 063import org.junit.ClassRule; 064import org.junit.Test; 065import org.junit.experimental.categories.Category; 066import org.mockito.Mockito; 067import org.slf4j.Logger; 068import org.slf4j.LoggerFactory; 069 070/** 071 * compacted memstore test case 072 */ 073@Category({ RegionServerTests.class, MediumTests.class }) 074public class TestCompactingMemStore extends TestDefaultMemStore { 075 076 @ClassRule 077 public static final HBaseClassTestRule CLASS_RULE = 078 HBaseClassTestRule.forClass(TestCompactingMemStore.class); 079 080 private static final Logger LOG = LoggerFactory.getLogger(TestCompactingMemStore.class); 081 protected static ChunkCreator chunkCreator; 082 protected HRegion region; 083 protected RegionServicesForStores regionServicesForStores; 084 protected HStore store; 085 086 ////////////////////////////////////////////////////////////////////////////// 087 // Helpers 088 ////////////////////////////////////////////////////////////////////////////// 089 protected static byte[] makeQualifier(final int i1, final int i2) { 090 return Bytes.toBytes(Integer.toString(i1) + ";" + Integer.toString(i2)); 091 } 092 093 @After 094 public void tearDown() throws Exception { 095 chunkCreator.clearChunksInPool(); 096 } 097 098 @Override 099 @Before 100 public void setUp() throws Exception { 101 compactingSetUp(); 102 this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(), 103 CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER); 104 ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 105 } 106 107 protected void compactingSetUp() throws Exception { 108 super.internalSetUp(); 109 Configuration conf = new Configuration(); 110 conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); 111 conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); 112 conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000); 113 HBaseTestingUtil hbaseUtility = new HBaseTestingUtil(conf); 114 ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.of(FAMILY); 115 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf("foobar")) 116 .setColumnFamily(familyDescriptor).build(); 117 RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf("foobar")).build(); 118 WAL wal = HBaseTestingUtil.createWal(conf, hbaseUtility.getDataTestDir(), info); 119 this.region = 120 HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, tableDescriptor, wal, true); 121 this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores()); 122 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 123 Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 124 this.store = new HStore(region, familyDescriptor, conf, false); 125 126 long globalMemStoreLimit = 127 (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() 128 * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); 129 chunkCreator = ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 130 globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, 131 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 132 assertNotNull(chunkCreator); 133 } 134 135 /** 136 * A simple test which flush in memory affect timeOfOldestEdit 137 */ 138 @Test 139 public void testTimeOfOldestEdit() { 140 assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit()); 141 final byte[] r = Bytes.toBytes("r"); 142 final byte[] f = Bytes.toBytes("f"); 143 final byte[] q = Bytes.toBytes("q"); 144 final byte[] v = Bytes.toBytes("v"); 145 final KeyValue kv = new KeyValue(r, f, q, v); 146 memstore.add(kv, null); 147 long timeOfOldestEdit = memstore.timeOfOldestEdit(); 148 assertNotEquals(Long.MAX_VALUE, timeOfOldestEdit); 149 150 ((CompactingMemStore) memstore).flushInMemory(); 151 assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit()); 152 memstore.add(kv, null); 153 assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit()); 154 memstore.snapshot(); 155 assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit()); 156 } 157 158 /** 159 * A simple test which verifies the 3 possible states when scanning across snapshot. 160 */ 161 @Override 162 @Test 163 public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException { 164 // we are going to the scanning across snapshot with two kvs 165 // kv1 should always be returned before kv2 166 final byte[] one = Bytes.toBytes(1); 167 final byte[] two = Bytes.toBytes(2); 168 final byte[] f = Bytes.toBytes("f"); 169 final byte[] q = Bytes.toBytes("q"); 170 final byte[] v = Bytes.toBytes(3); 171 172 final KeyValue kv1 = new KeyValue(one, f, q, 10, v); 173 final KeyValue kv2 = new KeyValue(two, f, q, 10, v); 174 175 // use case 1: both kvs in kvset 176 this.memstore.add(kv1.clone(), null); 177 this.memstore.add(kv2.clone(), null); 178 // snapshot is empty,active segment is not empty, 179 // empty segment is skipped. 180 verifyOneScanAcrossSnapshot2(kv1, kv2); 181 182 // use case 2: both kvs in snapshot 183 this.memstore.snapshot(); 184 // active segment is empty,snapshot is not empty, 185 // empty segment is skipped. 186 verifyOneScanAcrossSnapshot2(kv1, kv2); 187 188 // use case 3: first in snapshot second in kvset 189 this.memstore = new CompactingMemStore(HBaseConfiguration.create(), 190 CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER); 191 this.memstore.add(kv1.clone(), null); 192 // As compaction is starting in the background the repetition 193 // of the k1 might be removed BUT the scanners created earlier 194 // should look on the OLD MutableCellSetSegment, so this should be OK... 195 this.memstore.snapshot(); 196 this.memstore.add(kv2.clone(), null); 197 verifyScanAcrossSnapshot2(kv1, kv2); 198 } 199 200 /** 201 * Test memstore snapshots 202 */ 203 @Override 204 @Test 205 public void testSnapshotting() throws IOException { 206 final int snapshotCount = 5; 207 // Add some rows, run a snapshot. Do it a few times. 208 for (int i = 0; i < snapshotCount; i++) { 209 addRows(this.memstore); 210 runSnapshot(this.memstore, true); 211 assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount()); 212 } 213 } 214 215 ////////////////////////////////////////////////////////////////////////////// 216 // Get tests 217 ////////////////////////////////////////////////////////////////////////////// 218 219 /** 220 * Test getNextRow from memstore 221 */ 222 @Override 223 @Test 224 public void testGetNextRow() throws Exception { 225 addRows(this.memstore); 226 // Add more versions to make it a little more interesting. 227 Thread.sleep(1); 228 addRows(this.memstore); 229 Cell closestToEmpty = ((CompactingMemStore) this.memstore).getNextRow(KeyValue.LOWESTKEY); 230 assertTrue(CellComparator.getInstance().compareRows(closestToEmpty, 231 new KeyValue(Bytes.toBytes(0), EnvironmentEdgeManager.currentTime())) == 0); 232 for (int i = 0; i < ROW_COUNT; i++) { 233 Cell nr = ((CompactingMemStore) this.memstore) 234 .getNextRow(new KeyValue(Bytes.toBytes(i), EnvironmentEdgeManager.currentTime())); 235 if (i + 1 == ROW_COUNT) { 236 assertNull(nr); 237 } else { 238 assertTrue(CellComparator.getInstance().compareRows(nr, 239 new KeyValue(Bytes.toBytes(i + 1), EnvironmentEdgeManager.currentTime())) == 0); 240 } 241 } 242 // starting from each row, validate results should contain the starting row 243 Configuration conf = HBaseConfiguration.create(); 244 for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) { 245 ScanInfo scanInfo = 246 new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, KeepDeletedCells.FALSE, 247 HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false); 248 try (InternalScanner scanner = 249 new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null, 250 memstore.getScanners(0))) { 251 List<Cell> results = new ArrayList<>(); 252 for (int i = 0; scanner.next(results); i++) { 253 int rowId = startRowId + i; 254 Cell left = results.get(0); 255 byte[] row1 = Bytes.toBytes(rowId); 256 assertTrue("Row name", 257 CellComparator.getInstance().compareRows(left, row1, 0, row1.length) == 0); 258 assertEquals("Count of columns", QUALIFIER_COUNT, results.size()); 259 List<Cell> row = new ArrayList<>(); 260 for (Cell kv : results) { 261 row.add(kv); 262 } 263 isExpectedRowWithoutTimestamps(rowId, row); 264 // Clear out set. Otherwise row results accumulate. 265 results.clear(); 266 } 267 } 268 } 269 } 270 271 @Override 272 @Test 273 public void testGet_memstoreAndSnapShot() throws IOException { 274 byte[] row = Bytes.toBytes("testrow"); 275 byte[] fam = Bytes.toBytes("testfamily"); 276 byte[] qf1 = Bytes.toBytes("testqualifier1"); 277 byte[] qf2 = Bytes.toBytes("testqualifier2"); 278 byte[] qf3 = Bytes.toBytes("testqualifier3"); 279 byte[] qf4 = Bytes.toBytes("testqualifier4"); 280 byte[] qf5 = Bytes.toBytes("testqualifier5"); 281 byte[] val = Bytes.toBytes("testval"); 282 283 // Setting up memstore 284 memstore.add(new KeyValue(row, fam, qf1, val), null); 285 memstore.add(new KeyValue(row, fam, qf2, val), null); 286 memstore.add(new KeyValue(row, fam, qf3, val), null); 287 // Pushing to pipeline 288 ((CompactingMemStore) memstore).flushInMemory(); 289 assertEquals(0, memstore.getSnapshot().getCellsCount()); 290 // Creating a snapshot 291 memstore.snapshot(); 292 assertEquals(3, memstore.getSnapshot().getCellsCount()); 293 // Adding value to "new" memstore 294 assertEquals(0, memstore.getActive().getCellsCount()); 295 memstore.add(new KeyValue(row, fam, qf4, val), null); 296 memstore.add(new KeyValue(row, fam, qf5, val), null); 297 assertEquals(2, memstore.getActive().getCellsCount()); 298 } 299 300 //////////////////////////////////// 301 // Test for periodic memstore flushes 302 // based on time of oldest edit 303 //////////////////////////////////// 304 305 /** 306 * Add keyvalues with a fixed memstoreTs, and checks that memstore size is decreased as older 307 * keyvalues are deleted from the memstore. 308 */ 309 @Override 310 @Test 311 public void testUpsertMemstoreSize() throws Exception { 312 MemStoreSize oldSize = memstore.size(); 313 314 List<Cell> l = new ArrayList<>(); 315 KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); 316 KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v"); 317 KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v"); 318 319 kv1.setSequenceId(1); 320 kv2.setSequenceId(1); 321 kv3.setSequenceId(1); 322 l.add(kv1); 323 l.add(kv2); 324 l.add(kv3); 325 326 this.memstore.upsert(l, 2, null);// readpoint is 2 327 MemStoreSize newSize = this.memstore.size(); 328 assert (newSize.getDataSize() > oldSize.getDataSize()); 329 // The kv1 should be removed. 330 assert (memstore.getActive().getCellsCount() == 2); 331 332 KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v"); 333 kv4.setSequenceId(1); 334 l.clear(); 335 l.add(kv4); 336 this.memstore.upsert(l, 3, null); 337 assertEquals(newSize, this.memstore.size()); 338 // The kv2 should be removed. 339 assert (memstore.getActive().getCellsCount() == 2); 340 // this.memstore = null; 341 } 342 343 /** 344 * Tests that the timeOfOldestEdit is updated correctly for the various edit operations in 345 * memstore. 346 */ 347 @Override 348 @Test 349 public void testUpdateToTimeOfOldestEdit() throws Exception { 350 try { 351 EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); 352 EnvironmentEdgeManager.injectEdge(edge); 353 long t = memstore.timeOfOldestEdit(); 354 assertEquals(Long.MAX_VALUE, t); 355 356 // test the case that the timeOfOldestEdit is updated after a KV add 357 memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null); 358 t = memstore.timeOfOldestEdit(); 359 assertTrue(t == 1234); 360 // The method will also assert 361 // the value is reset to Long.MAX_VALUE 362 t = runSnapshot(memstore, true); 363 364 // test the case that the timeOfOldestEdit is updated after a KV delete 365 memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null); 366 t = memstore.timeOfOldestEdit(); 367 assertTrue(t == 1234); 368 t = runSnapshot(memstore, true); 369 370 // test the case that the timeOfOldestEdit is updated after a KV upsert 371 List<Cell> l = new ArrayList<>(); 372 KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); 373 kv1.setSequenceId(100); 374 l.add(kv1); 375 memstore.upsert(l, 1000, null); 376 t = memstore.timeOfOldestEdit(); 377 assertTrue(t == 1234); 378 } finally { 379 EnvironmentEdgeManager.reset(); 380 } 381 } 382 383 private long runSnapshot(final AbstractMemStore hmc, boolean useForce) throws IOException { 384 // Save off old state. 385 long oldHistorySize = hmc.getSnapshot().getDataSize(); 386 long prevTimeStamp = hmc.timeOfOldestEdit(); 387 388 hmc.snapshot(); 389 MemStoreSnapshot snapshot = hmc.snapshot(); 390 if (useForce) { 391 // Make some assertions about what just happened. 392 assertTrue("History size has not increased", oldHistorySize < snapshot.getDataSize()); 393 long t = hmc.timeOfOldestEdit(); 394 assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE); 395 hmc.clearSnapshot(snapshot.getId()); 396 } else { 397 long t = hmc.timeOfOldestEdit(); 398 assertTrue("Time of oldest edit didn't remain the same", t == prevTimeStamp); 399 } 400 return prevTimeStamp; 401 } 402 403 private void isExpectedRowWithoutTimestamps(final int rowIndex, List<Cell> kvs) { 404 int i = 0; 405 for (Cell kv : kvs) { 406 byte[] expectedColname = makeQualifier(rowIndex, i++); 407 assertTrue("Column name", CellUtil.matchingQualifier(kv, expectedColname)); 408 // Value is column name as bytes. Usually result is 409 // 100 bytes in size at least. This is the default size 410 // for BytesWriteable. For comparison, convert bytes to 411 // String and trim to remove trailing null bytes. 412 assertTrue("Content", CellUtil.matchingValue(kv, expectedColname)); 413 } 414 } 415 416 @Test 417 public void testPuttingBackChunksAfterFlushing() throws IOException { 418 byte[] row = Bytes.toBytes("testrow"); 419 byte[] fam = Bytes.toBytes("testfamily"); 420 byte[] qf1 = Bytes.toBytes("testqualifier1"); 421 byte[] qf2 = Bytes.toBytes("testqualifier2"); 422 byte[] qf3 = Bytes.toBytes("testqualifier3"); 423 byte[] qf4 = Bytes.toBytes("testqualifier4"); 424 byte[] qf5 = Bytes.toBytes("testqualifier5"); 425 byte[] val = Bytes.toBytes("testval"); 426 427 // Setting up memstore 428 memstore.add(new KeyValue(row, fam, qf1, val), null); 429 memstore.add(new KeyValue(row, fam, qf2, val), null); 430 memstore.add(new KeyValue(row, fam, qf3, val), null); 431 432 // Creating a snapshot 433 MemStoreSnapshot snapshot = memstore.snapshot(); 434 assertEquals(3, memstore.getSnapshot().getCellsCount()); 435 436 // Adding value to "new" memstore 437 assertEquals(0, memstore.getActive().getCellsCount()); 438 memstore.add(new KeyValue(row, fam, qf4, val), null); 439 memstore.add(new KeyValue(row, fam, qf5, val), null); 440 assertEquals(2, memstore.getActive().getCellsCount()); 441 // close the scanners 442 for (KeyValueScanner scanner : snapshot.getScanners()) { 443 scanner.close(); 444 } 445 memstore.clearSnapshot(snapshot.getId()); 446 447 int chunkCount = chunkCreator.getPoolSize(); 448 assertTrue(chunkCount > 0); 449 450 } 451 452 @Test 453 public void testPuttingBackChunksWithOpeningScanner() throws IOException { 454 byte[] row = Bytes.toBytes("testrow"); 455 byte[] fam = Bytes.toBytes("testfamily"); 456 byte[] qf1 = Bytes.toBytes("testqualifier1"); 457 byte[] qf2 = Bytes.toBytes("testqualifier2"); 458 byte[] qf3 = Bytes.toBytes("testqualifier3"); 459 byte[] qf4 = Bytes.toBytes("testqualifier4"); 460 byte[] qf5 = Bytes.toBytes("testqualifier5"); 461 byte[] qf6 = Bytes.toBytes("testqualifier6"); 462 byte[] qf7 = Bytes.toBytes("testqualifier7"); 463 byte[] val = Bytes.toBytes("testval"); 464 465 // Setting up memstore 466 memstore.add(new KeyValue(row, fam, qf1, val), null); 467 memstore.add(new KeyValue(row, fam, qf2, val), null); 468 memstore.add(new KeyValue(row, fam, qf3, val), null); 469 470 // Creating a snapshot 471 MemStoreSnapshot snapshot = memstore.snapshot(); 472 assertEquals(3, memstore.getSnapshot().getCellsCount()); 473 474 // Adding value to "new" memstore 475 assertEquals(0, memstore.getActive().getCellsCount()); 476 memstore.add(new KeyValue(row, fam, qf4, val), null); 477 memstore.add(new KeyValue(row, fam, qf5, val), null); 478 assertEquals(2, memstore.getActive().getCellsCount()); 479 480 // opening scanner before clear the snapshot 481 List<KeyValueScanner> scanners = memstore.getScanners(0); 482 // Shouldn't putting back the chunks to pool,since some scanners are opening 483 // based on their data 484 // close the scanners 485 for (KeyValueScanner scanner : snapshot.getScanners()) { 486 scanner.close(); 487 } 488 memstore.clearSnapshot(snapshot.getId()); 489 490 assertTrue(chunkCreator.getPoolSize() == 0); 491 492 // Chunks will be put back to pool after close scanners; 493 for (KeyValueScanner scanner : scanners) { 494 scanner.close(); 495 } 496 assertTrue(chunkCreator.getPoolSize() > 0); 497 498 // clear chunks 499 chunkCreator.clearChunksInPool(); 500 501 // Creating another snapshot 502 503 snapshot = memstore.snapshot(); 504 // Adding more value 505 memstore.add(new KeyValue(row, fam, qf6, val), null); 506 memstore.add(new KeyValue(row, fam, qf7, val), null); 507 // opening scanners 508 scanners = memstore.getScanners(0); 509 // close scanners before clear the snapshot 510 for (KeyValueScanner scanner : scanners) { 511 scanner.close(); 512 } 513 // Since no opening scanner, the chunks of snapshot should be put back to 514 // pool 515 // close the scanners 516 for (KeyValueScanner scanner : snapshot.getScanners()) { 517 scanner.close(); 518 } 519 memstore.clearSnapshot(snapshot.getId()); 520 assertTrue(chunkCreator.getPoolSize() > 0); 521 } 522 523 @Test 524 public void testPuttingBackChunksWithOpeningPipelineScanner() throws IOException { 525 526 // set memstore to do data compaction and not to use the speculative scan 527 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; 528 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 529 String.valueOf(compactionType)); 530 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 531 532 byte[] row = Bytes.toBytes("testrow"); 533 byte[] fam = Bytes.toBytes("testfamily"); 534 byte[] qf1 = Bytes.toBytes("testqualifier1"); 535 byte[] qf2 = Bytes.toBytes("testqualifier2"); 536 byte[] qf3 = Bytes.toBytes("testqualifier3"); 537 byte[] val = Bytes.toBytes("testval"); 538 539 // Setting up memstore 540 memstore.add(new KeyValue(row, fam, qf1, 1, val), null); 541 memstore.add(new KeyValue(row, fam, qf2, 1, val), null); 542 memstore.add(new KeyValue(row, fam, qf3, 1, val), null); 543 544 // Creating a pipeline 545 ((MyCompactingMemStore) memstore).disableCompaction(); 546 ((CompactingMemStore) memstore).flushInMemory(); 547 548 // Adding value to "new" memstore 549 assertEquals(0, memstore.getActive().getCellsCount()); 550 memstore.add(new KeyValue(row, fam, qf1, 2, val), null); 551 memstore.add(new KeyValue(row, fam, qf2, 2, val), null); 552 assertEquals(2, memstore.getActive().getCellsCount()); 553 554 // pipeline bucket 2 555 ((CompactingMemStore) memstore).flushInMemory(); 556 // opening scanner before force flushing 557 List<KeyValueScanner> scanners = memstore.getScanners(0); 558 // Shouldn't putting back the chunks to pool,since some scanners are opening 559 // based on their data 560 ((MyCompactingMemStore) memstore).enableCompaction(); 561 // trigger compaction 562 ((CompactingMemStore) memstore).flushInMemory(); 563 564 // Adding value to "new" memstore 565 assertEquals(0, memstore.getActive().getCellsCount()); 566 memstore.add(new KeyValue(row, fam, qf3, 3, val), null); 567 memstore.add(new KeyValue(row, fam, qf2, 3, val), null); 568 memstore.add(new KeyValue(row, fam, qf1, 3, val), null); 569 assertEquals(3, memstore.getActive().getCellsCount()); 570 571 assertTrue(chunkCreator.getPoolSize() == 0); 572 573 // Chunks will be put back to pool after close scanners; 574 for (KeyValueScanner scanner : scanners) { 575 scanner.close(); 576 } 577 assertTrue(chunkCreator.getPoolSize() > 0); 578 579 // clear chunks 580 chunkCreator.clearChunksInPool(); 581 582 // Creating another snapshot 583 584 MemStoreSnapshot snapshot = memstore.snapshot(); 585 // close the scanners 586 for (KeyValueScanner scanner : snapshot.getScanners()) { 587 scanner.close(); 588 } 589 memstore.clearSnapshot(snapshot.getId()); 590 591 snapshot = memstore.snapshot(); 592 // Adding more value 593 memstore.add(new KeyValue(row, fam, qf2, 4, val), null); 594 memstore.add(new KeyValue(row, fam, qf3, 4, val), null); 595 // opening scanners 596 scanners = memstore.getScanners(0); 597 // close scanners before clear the snapshot 598 for (KeyValueScanner scanner : scanners) { 599 scanner.close(); 600 } 601 // Since no opening scanner, the chunks of snapshot should be put back to 602 // pool 603 // close the scanners 604 for (KeyValueScanner scanner : snapshot.getScanners()) { 605 scanner.close(); 606 } 607 memstore.clearSnapshot(snapshot.getId()); 608 assertTrue(chunkCreator.getPoolSize() > 0); 609 } 610 611 ////////////////////////////////////////////////////////////////////////////// 612 // Compaction tests 613 ////////////////////////////////////////////////////////////////////////////// 614 @Test 615 public void testCompaction1Bucket() throws IOException { 616 617 // set memstore to do basic structure flattening, the "eager" option is tested in 618 // TestCompactingToCellFlatMapMemStore 619 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 620 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 621 String.valueOf(compactionType)); 622 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 623 624 String[] keys1 = { "A", "A", "B", "C" }; // A1, A2, B3, C4 625 626 // test 1 bucket 627 int totalCellsLen = addRowsByKeys(memstore, keys1); 628 int oneCellOnCSLMHeapSize = 120; 629 int oneCellOnCAHeapSize = 88; 630 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 631 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 632 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 633 634 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 635 assertEquals(0, memstore.getSnapshot().getCellsCount()); 636 // There is no compaction, as the compacting memstore type is basic. 637 // totalCellsLen remains the same 638 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 639 + 4 * oneCellOnCAHeapSize; 640 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 641 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 642 643 MemStoreSize mss = memstore.getFlushableSize(); 644 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 645 // simulate flusher 646 region.decrMemStoreSize(mss); 647 ImmutableSegment s = memstore.getSnapshot(); 648 assertEquals(4, s.getCellsCount()); 649 assertEquals(0, regionServicesForStores.getMemStoreSize()); 650 651 memstore.clearSnapshot(snapshot.getId()); 652 } 653 654 @Test 655 public void testCompaction2Buckets() throws IOException { 656 657 // set memstore to do basic structure flattening, the "eager" option is tested in 658 // TestCompactingToCellFlatMapMemStore 659 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 660 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 661 String.valueOf(compactionType)); 662 memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 663 String.valueOf(1)); 664 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 665 String[] keys1 = { "A", "A", "B", "C" }; 666 String[] keys2 = { "A", "B", "D" }; 667 668 int totalCellsLen1 = addRowsByKeys(memstore, keys1); 669 int oneCellOnCSLMHeapSize = 120; 670 int oneCellOnCAHeapSize = 88; 671 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 672 673 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 674 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 675 676 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 677 int counter = 0; 678 for (Segment s : memstore.getSegments()) { 679 counter += s.getCellsCount(); 680 } 681 assertEquals(4, counter); 682 assertEquals(0, memstore.getSnapshot().getCellsCount()); 683 // There is no compaction, as the compacting memstore type is basic. 684 // totalCellsLen remains the same 685 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 686 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 687 + 4 * oneCellOnCAHeapSize; 688 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 689 690 int totalCellsLen2 = addRowsByKeys(memstore, keys2); 691 totalHeapSize += 3 * oneCellOnCSLMHeapSize; 692 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 693 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 694 695 MemStoreSize mss = memstore.getFlushableSize(); 696 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 697 assertEquals(0, memstore.getSnapshot().getCellsCount()); 698 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 699 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 700 + 7 * oneCellOnCAHeapSize; 701 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 702 703 mss = memstore.getFlushableSize(); 704 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 705 // simulate flusher 706 region.decrMemStoreSize(mss); 707 ImmutableSegment s = memstore.getSnapshot(); 708 assertEquals(7, s.getCellsCount()); 709 assertEquals(0, regionServicesForStores.getMemStoreSize()); 710 711 memstore.clearSnapshot(snapshot.getId()); 712 } 713 714 @Test 715 public void testCompaction3Buckets() throws IOException { 716 717 // set memstore to do data compaction and not to use the speculative scan 718 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER; 719 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 720 String.valueOf(compactionType)); 721 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 722 String[] keys1 = { "A", "A", "B", "C" }; 723 String[] keys2 = { "A", "B", "D" }; 724 String[] keys3 = { "D", "B", "B" }; 725 726 int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 4 cells. 727 int oneCellOnCSLMHeapSize = 120; 728 int oneCellOnCAHeapSize = 88; 729 assertEquals(totalCellsLen1, region.getMemStoreDataSize()); 730 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize; 731 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 732 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 733 734 assertEquals(0, memstore.getSnapshot().getCellsCount()); 735 // One cell is duplicated and the compaction will remove it. All cells of same time so adjusting 736 // totalCellsLen 737 totalCellsLen1 = (totalCellsLen1 * 3) / 4; 738 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 739 // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff. 740 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM 741 + 3 * oneCellOnCAHeapSize; 742 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 743 744 int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells. 745 long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize; 746 747 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 748 assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); 749 750 ((MyCompactingMemStore) memstore).disableCompaction(); 751 MemStoreSize mss = memstore.getFlushableSize(); 752 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction 753 assertEquals(0, memstore.getSnapshot().getCellsCount()); 754 // No change in the cells data size. ie. memstore size. as there is no compaction. 755 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 756 assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, 757 ((CompactingMemStore) memstore).heapSize()); 758 759 int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added 760 assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, 761 regionServicesForStores.getMemStoreSize()); 762 long totalHeapSize3 = 763 totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM + 3 * oneCellOnCSLMHeapSize; 764 assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize()); 765 766 ((MyCompactingMemStore) memstore).enableCompaction(); 767 mss = memstore.getFlushableSize(); 768 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 769 assertEquals(0, memstore.getSnapshot().getCellsCount()); 770 // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells. 771 // Out of total 10, only 4 cells are unique 772 totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated 773 totalCellsLen3 = 0;// All duplicated cells. 774 assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, 775 regionServicesForStores.getMemStoreSize()); 776 // Only 4 unique cells left 777 assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD 778 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize()); 779 780 mss = memstore.getFlushableSize(); 781 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 782 // simulate flusher 783 region.decrMemStoreSize(mss); 784 ImmutableSegment s = memstore.getSnapshot(); 785 assertEquals(4, s.getCellsCount()); 786 assertEquals(0, regionServicesForStores.getMemStoreSize()); 787 788 memstore.clearSnapshot(snapshot.getId()); 789 } 790 791 @Test 792 public void testMagicCompaction3Buckets() throws IOException { 793 794 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE; 795 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 796 String.valueOf(compactionType)); 797 memstore.getConfiguration() 798 .setDouble(AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45); 799 memstore.getConfiguration() 800 .setInt(AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2); 801 memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1); 802 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 803 804 String[] keys1 = { "A", "B", "D" }; 805 String[] keys2 = { "A" }; 806 String[] keys3 = { "A", "A", "B", "C" }; 807 String[] keys4 = { "D", "B", "B" }; 808 809 int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 3 cells. 810 int oneCellOnCSLMHeapSize = 120; 811 assertEquals(totalCellsLen1, region.getMemStoreDataSize()); 812 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize; 813 assertEquals(totalHeapSize, memstore.heapSize()); 814 815 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline - flatten 816 assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 817 assertEquals(1.0, 818 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 819 assertEquals(0, memstore.getSnapshot().getCellsCount()); 820 821 addRowsByKeys(memstore, keys2);// Adding 1 more cell - flatten. 822 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction 823 assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 824 assertEquals(1.0, 825 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 826 assertEquals(0, memstore.getSnapshot().getCellsCount()); 827 828 addRowsByKeys(memstore, keys3);// Adding 4 more cells - merge. 829 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction 830 assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells()); 831 assertEquals((4.0 / 8.0), 832 ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0); 833 assertEquals(0, memstore.getSnapshot().getCellsCount()); 834 835 addRowsByKeys(memstore, keys4);// 3 more cells added - compact (or not) 836 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 837 int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells(); 838 assertTrue(4 == numCells || 11 == numCells); 839 assertEquals(0, memstore.getSnapshot().getCellsCount()); 840 841 MemStoreSize mss = memstore.getFlushableSize(); 842 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 843 // simulate flusher 844 region.decrMemStoreSize(mss); 845 ImmutableSegment s = memstore.getSnapshot(); 846 numCells = s.getCellsCount(); 847 assertTrue(4 == numCells || 11 == numCells); 848 assertEquals(0, regionServicesForStores.getMemStoreSize()); 849 850 memstore.clearSnapshot(snapshot.getId()); 851 } 852 853 protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) { 854 byte[] fam = Bytes.toBytes("testfamily"); 855 byte[] qf = Bytes.toBytes("testqualifier"); 856 long size = hmc.getActive().getDataSize(); 857 long heapOverhead = hmc.getActive().getHeapSize(); 858 int cellsCount = hmc.getActive().getCellsCount(); 859 int totalLen = 0; 860 for (int i = 0; i < keys.length; i++) { 861 long timestamp = EnvironmentEdgeManager.currentTime(); 862 Threads.sleep(1); // to make sure each kv gets a different ts 863 byte[] row = Bytes.toBytes(keys[i]); 864 byte[] val = Bytes.toBytes(keys[i] + i); 865 KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); 866 totalLen += Segment.getCellLength(kv); 867 hmc.add(kv, null); 868 LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); 869 } 870 regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size, 871 hmc.getActive().getHeapSize() - heapOverhead, 0, 872 hmc.getActive().getCellsCount() - cellsCount); 873 return totalLen; 874 } 875 876 // for controlling the val size when adding a new cell 877 protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) { 878 byte[] fam = Bytes.toBytes("testfamily"); 879 byte[] qf = Bytes.toBytes("testqualifier"); 880 long size = hmc.getActive().getDataSize(); 881 long heapOverhead = hmc.getActive().getHeapSize(); 882 int cellsCount = hmc.getActive().getCellsCount(); 883 int totalLen = 0; 884 for (int i = 0; i < keys.length; i++) { 885 long timestamp = EnvironmentEdgeManager.currentTime(); 886 Threads.sleep(1); // to make sure each kv gets a different ts 887 byte[] row = Bytes.toBytes(keys[i]); 888 KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); 889 totalLen += Segment.getCellLength(kv); 890 hmc.add(kv, null); 891 LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); 892 } 893 regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size, 894 hmc.getActive().getHeapSize() - heapOverhead, 0, cellsCount); 895 return totalLen; 896 } 897 898 private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { 899 long t = 1234; 900 901 @Override 902 public long currentTime() { 903 return t; 904 } 905 } 906 907 static protected class MyCompactingMemStore extends CompactingMemStore { 908 909 public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store, 910 RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 911 throws IOException { 912 super(conf, c, store, regionServices, compactionPolicy); 913 } 914 915 void disableCompaction() { 916 allowCompaction.set(false); 917 } 918 919 void enableCompaction() { 920 allowCompaction.set(true); 921 } 922 923 void initiateType(MemoryCompactionPolicy compactionType, Configuration conf) 924 throws IllegalArgumentIOException { 925 compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST"); 926 } 927 928 } 929}