001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.List; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.CellComparatorImpl; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.KeyValue; 031import org.apache.hadoop.hbase.KeyValueUtil; 032import org.apache.hadoop.hbase.MemoryCompactionPolicy; 033import org.apache.hadoop.hbase.testclassification.MediumTests; 034import org.apache.hadoop.hbase.testclassification.RegionServerTests; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.hbase.util.ClassSize; 037import org.apache.hadoop.hbase.util.Threads; 038import org.junit.ClassRule; 039import org.junit.Test; 040import org.junit.experimental.categories.Category; 041import org.junit.runner.RunWith; 042import org.junit.runners.Parameterized; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * compacted memstore test case 048 */ 049@Category({RegionServerTests.class, MediumTests.class}) 050@RunWith(Parameterized.class) 051public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore { 052 053 @ClassRule 054 public static final HBaseClassTestRule CLASS_RULE = 055 HBaseClassTestRule.forClass(TestCompactingToCellFlatMapMemStore.class); 056 057 @Parameterized.Parameters 058 public static Object[] data() { 059 return new Object[] { "CHUNK_MAP", "ARRAY_MAP" }; // test different immutable indexes 060 } 061 private static final Logger LOG = 062 LoggerFactory.getLogger(TestCompactingToCellFlatMapMemStore.class); 063 public final boolean toCellChunkMap; 064 Configuration conf; 065 ////////////////////////////////////////////////////////////////////////////// 066 // Helpers 067 ////////////////////////////////////////////////////////////////////////////// 068 public TestCompactingToCellFlatMapMemStore(String type){ 069 if (type == "CHUNK_MAP") { 070 toCellChunkMap = true; 071 } else { 072 toCellChunkMap = false; 073 } 074 } 075 076 @Override public void tearDown() throws Exception { 077 chunkCreator.clearChunksInPool(); 078 } 079 080 @Override public void setUp() throws Exception { 081 082 compactingSetUp(); 083 this.conf = HBaseConfiguration.create(); 084 085 // set memstore to do data compaction 086 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 087 String.valueOf(MemoryCompactionPolicy.EAGER)); 088 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.02); 089 this.memstore = 090 new MyCompactingMemStore(conf, CellComparatorImpl.COMPARATOR, store, 091 regionServicesForStores, MemoryCompactionPolicy.EAGER); 092 } 093 094 ////////////////////////////////////////////////////////////////////////////// 095 // Compaction tests 096 ////////////////////////////////////////////////////////////////////////////// 097 @Override 098 public void testCompaction1Bucket() throws IOException { 099 int counter = 0; 100 String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4 101 if (toCellChunkMap) { 102 // set memstore to flat into CellChunkMap 103 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 104 } else { 105 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 106 } 107 108 // test 1 bucket 109 long totalCellsLen = addRowsByKeysDataSize(memstore, keys1); 110 long cellBeforeFlushSize = cellBeforeFlushSize(); 111 long cellAfterFlushSize = cellAfterFlushSize(); 112 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize; 113 114 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 115 assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); 116 117 assertEquals(4, memstore.getActive().getCellsCount()); 118 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 119 assertEquals(0, memstore.getSnapshot().getCellsCount()); 120 // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting 121 // totalCellsLen 122 totalCellsLen = (totalCellsLen * 3) / 4; 123 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 124 125 totalHeapSize = 126 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD 127 + (toCellChunkMap ? 128 CellChunkImmutableSegment.DEEP_OVERHEAD_CCM : 129 CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); 130 assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); 131 for ( Segment s : memstore.getSegments()) { 132 counter += s.getCellsCount(); 133 } 134 assertEquals(3, counter); 135 MemStoreSize mss = memstore.getFlushableSize(); 136 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 137 region.decrMemStoreSize(mss); // simulate flusher 138 ImmutableSegment s = memstore.getSnapshot(); 139 assertEquals(3, s.getCellsCount()); 140 assertEquals(0, regionServicesForStores.getMemStoreSize()); 141 142 memstore.clearSnapshot(snapshot.getId()); 143 } 144 145 @Override 146 public void testCompaction2Buckets() throws IOException { 147 if (toCellChunkMap) { 148 // set memstore to flat into CellChunkMap 149 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 150 } else { 151 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 152 } 153 String[] keys1 = { "A", "A", "B", "C" }; 154 String[] keys2 = { "A", "B", "D" }; 155 156 long totalCellsLen1 = addRowsByKeysDataSize(memstore, keys1); // INSERT 4 157 long cellBeforeFlushSize = cellBeforeFlushSize(); 158 long cellAfterFlushSize = cellAfterFlushSize(); 159 long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize; 160 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 161 assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize()); 162 163 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 164 int counter = 0; // COMPACT 4->3 165 for ( Segment s : memstore.getSegments()) { 166 counter += s.getCellsCount(); 167 } 168 assertEquals(3,counter); 169 assertEquals(0, memstore.getSnapshot().getCellsCount()); 170 // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting 171 // totalCellsLen 172 totalCellsLen1 = (totalCellsLen1 * 3) / 4; 173 totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD 174 + (toCellChunkMap ? 175 CellChunkImmutableSegment.DEEP_OVERHEAD_CCM : 176 CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); 177 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 178 assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize()); 179 180 long totalCellsLen2 = addRowsByKeysDataSize(memstore, keys2); // INSERT 3 (3+3=6) 181 long totalHeapSize2 = 3 * cellBeforeFlushSize; 182 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 183 assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); 184 185 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 186 assertEquals(0, memstore.getSnapshot().getCellsCount());// COMPACT 6->4 187 counter = 0; 188 for ( Segment s : memstore.getSegments()) { 189 counter += s.getCellsCount(); 190 } 191 assertEquals(4,counter); 192 totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2 193 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 194 totalHeapSize2 = 1 * cellAfterFlushSize; 195 assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); 196 197 MemStoreSize mss = memstore.getFlushableSize(); 198 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 199 // simulate flusher 200 region.decrMemStoreSize(mss); 201 ImmutableSegment s = memstore.getSnapshot(); 202 assertEquals(4, s.getCellsCount()); 203 assertEquals(0, regionServicesForStores.getMemStoreSize()); 204 205 memstore.clearSnapshot(snapshot.getId()); 206 } 207 208 @Override 209 public void testCompaction3Buckets() throws IOException { 210 if (toCellChunkMap) { 211 // set memstore to flat into CellChunkMap 212 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 213 } else { 214 // set to CellArrayMap as CCM is configured by default due to MSLAB usage 215 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 216 } 217 String[] keys1 = { "A", "A", "B", "C" }; 218 String[] keys2 = { "A", "B", "D" }; 219 String[] keys3 = { "D", "B", "B" }; 220 221 long totalCellsLen1 = addRowsByKeysDataSize(memstore, keys1); 222 long cellBeforeFlushSize = cellBeforeFlushSize(); 223 long cellAfterFlushSize = cellAfterFlushSize(); 224 long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize; 225 assertEquals(totalCellsLen1, region.getMemStoreDataSize()); 226 assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize()); 227 228 MemStoreSize mss = memstore.getFlushableSize(); 229 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 230 231 assertEquals(0, memstore.getSnapshot().getCellsCount()); 232 // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting 233 // totalCellsLen 234 totalCellsLen1 = (totalCellsLen1 * 3) / 4; 235 totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD 236 + (toCellChunkMap ? 237 CellChunkImmutableSegment.DEEP_OVERHEAD_CCM : 238 CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); 239 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 240 assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize()); 241 242 long totalCellsLen2 = addRowsByKeysDataSize(memstore, keys2); 243 long totalHeapSize2 = 3 * cellBeforeFlushSize; 244 245 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 246 assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); 247 248 ((MyCompactingMemStore) memstore).disableCompaction(); 249 mss = memstore.getFlushableSize(); 250 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction 251 totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM; 252 assertEquals(0, memstore.getSnapshot().getCellsCount()); 253 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 254 assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); 255 256 long totalCellsLen3 = addRowsByKeysDataSize(memstore, keys3); 257 long totalHeapSize3 = 3 * cellBeforeFlushSize; 258 assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, 259 regionServicesForStores.getMemStoreSize()); 260 assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3, 261 ((CompactingMemStore) memstore).heapSize()); 262 263 ((MyCompactingMemStore) memstore).enableCompaction(); 264 mss = memstore.getFlushableSize(); 265 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 266 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 267 Threads.sleep(10); 268 } 269 assertEquals(0, memstore.getSnapshot().getCellsCount()); 270 // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells. 271 // Out of total 10, only 4 cells are unique 272 totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated 273 totalCellsLen3 = 0;// All duplicated cells. 274 assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, 275 regionServicesForStores.getMemStoreSize()); 276 // Only 4 unique cells left 277 long totalHeapSize4 = 4 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD 278 + (toCellChunkMap ? 279 CellChunkImmutableSegment.DEEP_OVERHEAD_CCM : 280 CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); 281 assertEquals(totalHeapSize4, ((CompactingMemStore) memstore).heapSize()); 282 283 mss = memstore.getFlushableSize(); 284 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 285 // simulate flusher 286 region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(), 287 mss.getCellsCount()); 288 ImmutableSegment s = memstore.getSnapshot(); 289 assertEquals(4, s.getCellsCount()); 290 assertEquals(0, regionServicesForStores.getMemStoreSize()); 291 292 memstore.clearSnapshot(snapshot.getId()); 293 } 294 295 ////////////////////////////////////////////////////////////////////////////// 296 // Merging tests 297 ////////////////////////////////////////////////////////////////////////////// 298 @Test 299 public void testMerging() throws IOException { 300 if (toCellChunkMap) { 301 // set memstore to flat into CellChunkMap 302 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 303 } 304 String[] keys1 = { "A", "A", "B", "C", "F", "H"}; 305 String[] keys2 = { "A", "B", "D", "G", "I", "J"}; 306 String[] keys3 = { "D", "B", "B", "E" }; 307 308 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 309 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 310 String.valueOf(compactionType)); 311 ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); 312 addRowsByKeysDataSize(memstore, keys1); 313 314 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact 315 316 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 317 Threads.sleep(10); 318 } 319 assertEquals(0, memstore.getSnapshot().getCellsCount()); 320 321 addRowsByKeysDataSize(memstore, keys2); // also should only flatten 322 323 int counter2 = 0; 324 for ( Segment s : memstore.getSegments()) { 325 counter2 += s.getCellsCount(); 326 } 327 assertEquals(12, counter2); 328 329 ((MyCompactingMemStore) memstore).disableCompaction(); 330 331 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening 332 assertEquals(0, memstore.getSnapshot().getCellsCount()); 333 334 int counter3 = 0; 335 for ( Segment s : memstore.getSegments()) { 336 counter3 += s.getCellsCount(); 337 } 338 assertEquals(12, counter3); 339 340 addRowsByKeysDataSize(memstore, keys3); 341 342 int counter4 = 0; 343 for ( Segment s : memstore.getSegments()) { 344 counter4 += s.getCellsCount(); 345 } 346 assertEquals(16, counter4); 347 348 ((MyCompactingMemStore) memstore).enableCompaction(); 349 350 351 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 352 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 353 Threads.sleep(10); 354 } 355 assertEquals(0, memstore.getSnapshot().getCellsCount()); 356 357 int counter = 0; 358 for ( Segment s : memstore.getSegments()) { 359 counter += s.getCellsCount(); 360 } 361 assertEquals(16,counter); 362 363 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 364 ImmutableSegment s = memstore.getSnapshot(); 365 memstore.clearSnapshot(snapshot.getId()); 366 } 367 368 @Test 369 public void testTimeRangeAfterCompaction() throws IOException { 370 if (toCellChunkMap) { 371 // set memstore to flat into CellChunkMap 372 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 373 } 374 testTimeRange(true); 375 } 376 377 @Test 378 public void testTimeRangeAfterMerge() throws IOException { 379 if (toCellChunkMap) { 380 // set memstore to flat into CellChunkMap 381 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 382 } 383 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 384 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 385 String.valueOf(compactionType)); 386 ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); 387 testTimeRange(false); 388 } 389 390 private void testTimeRange(boolean isCompaction) throws IOException { 391 final long initTs = 100; 392 long currentTs = initTs; 393 byte[] row = Bytes.toBytes("row"); 394 byte[] family = Bytes.toBytes("family"); 395 byte[] qf1 = Bytes.toBytes("qf1"); 396 397 // first segment in pipeline 398 this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null); 399 long minTs = currentTs; 400 this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null); 401 402 long numberOfCell = 2; 403 assertEquals(numberOfCell, memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum()); 404 assertEquals(minTs, memstore.getSegments().stream().mapToLong( 405 m -> m.getTimeRangeTracker().getMin()).min().getAsLong()); 406 assertEquals(currentTs, memstore.getSegments().stream().mapToLong( 407 m -> m.getTimeRangeTracker().getMax()).max().getAsLong()); 408 409 ((CompactingMemStore) memstore).flushInMemory(); 410 411 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 412 Threads.sleep(10); 413 } 414 if (isCompaction) { 415 // max version = 1, so one cell will be dropped. 416 numberOfCell = 1; 417 minTs = currentTs; 418 } 419 // second segment in pipeline 420 this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null); 421 this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null); 422 numberOfCell += 2; 423 assertEquals(numberOfCell, memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum()); 424 assertEquals(minTs, memstore.getSegments().stream().mapToLong( 425 m -> m.getTimeRangeTracker().getMin()).min().getAsLong()); 426 assertEquals(currentTs, memstore.getSegments().stream().mapToLong( 427 m -> m.getTimeRangeTracker().getMax()).max().getAsLong()); 428 429 ((CompactingMemStore) memstore).flushInMemory(); // trigger the merge 430 431 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 432 Threads.sleep(10); 433 } 434 if (isCompaction) { 435 // max version = 1, so one cell will be dropped. 436 numberOfCell = 1; 437 minTs = currentTs; 438 } 439 440 assertEquals(numberOfCell, memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum()); 441 assertEquals(minTs, memstore.getSegments().stream().mapToLong( 442 m -> m.getTimeRangeTracker().getMin()).min().getAsLong()); 443 assertEquals(currentTs, memstore.getSegments().stream().mapToLong( 444 m -> m.getTimeRangeTracker().getMax()).max().getAsLong()); 445 } 446 447 @Test 448 public void testCountOfCellsAfterFlatteningByScan() throws IOException { 449 String[] keys1 = { "A", "B", "C" }; // A, B, C 450 addRowsByKeysWith50Cols(memstore, keys1); 451 // this should only flatten as there are no duplicates 452 ((CompactingMemStore) memstore).flushInMemory(); 453 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 454 Threads.sleep(10); 455 } 456 List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE); 457 // seek 458 int count = 0; 459 for(int i = 0; i < scanners.size(); i++) { 460 scanners.get(i).seek(KeyValue.LOWESTKEY); 461 while (scanners.get(i).next() != null) { 462 count++; 463 } 464 } 465 assertEquals("the count should be ", 150, count); 466 for(int i = 0; i < scanners.size(); i++) { 467 scanners.get(i).close(); 468 } 469 } 470 471 @Test 472 public void testCountOfCellsAfterFlatteningByIterator() throws IOException { 473 String[] keys1 = { "A", "B", "C" }; // A, B, C 474 addRowsByKeysWith50Cols(memstore, keys1); 475 // this should only flatten as there are no duplicates 476 ((CompactingMemStore) memstore).flushInMemory(); 477 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 478 Threads.sleep(10); 479 } 480 // Just doing the cnt operation here 481 MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator( 482 ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(), 483 CellComparatorImpl.COMPARATOR, 10); 484 int cnt = 0; 485 try { 486 while (itr.next() != null) { 487 cnt++; 488 } 489 } finally { 490 itr.close(); 491 } 492 assertEquals("the count should be ", 150, cnt); 493 } 494 495 private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) { 496 byte[] fam = Bytes.toBytes("testfamily"); 497 for (int i = 0; i < keys.length; i++) { 498 long timestamp = System.currentTimeMillis(); 499 Threads.sleep(1); // to make sure each kv gets a different ts 500 byte[] row = Bytes.toBytes(keys[i]); 501 for(int j =0 ;j < 50; j++) { 502 byte[] qf = Bytes.toBytes("testqualifier"+j); 503 byte[] val = Bytes.toBytes(keys[i] + j); 504 KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); 505 hmc.add(kv, null); 506 } 507 } 508 } 509 510 @Override 511 @Test 512 public void testPuttingBackChunksWithOpeningScanner() throws IOException { 513 byte[] row = Bytes.toBytes("testrow"); 514 byte[] fam = Bytes.toBytes("testfamily"); 515 byte[] qf1 = Bytes.toBytes("testqualifier1"); 516 byte[] qf2 = Bytes.toBytes("testqualifier2"); 517 byte[] qf3 = Bytes.toBytes("testqualifier3"); 518 byte[] qf4 = Bytes.toBytes("testqualifier4"); 519 byte[] qf5 = Bytes.toBytes("testqualifier5"); 520 byte[] qf6 = Bytes.toBytes("testqualifier6"); 521 byte[] qf7 = Bytes.toBytes("testqualifier7"); 522 byte[] val = Bytes.toBytes("testval"); 523 524 // Setting up memstore 525 memstore.add(new KeyValue(row, fam, qf1, val), null); 526 memstore.add(new KeyValue(row, fam, qf2, val), null); 527 memstore.add(new KeyValue(row, fam, qf3, val), null); 528 529 // Creating a snapshot 530 MemStoreSnapshot snapshot = memstore.snapshot(); 531 assertEquals(3, memstore.getSnapshot().getCellsCount()); 532 533 // Adding value to "new" memstore 534 assertEquals(0, memstore.getActive().getCellsCount()); 535 memstore.add(new KeyValue(row, fam, qf4, val), null); 536 memstore.add(new KeyValue(row, fam, qf5, val), null); 537 assertEquals(2, memstore.getActive().getCellsCount()); 538 539 // opening scanner before clear the snapshot 540 List<KeyValueScanner> scanners = memstore.getScanners(0); 541 // Shouldn't putting back the chunks to pool,since some scanners are opening 542 // based on their data 543 // close the scanners 544 for(KeyValueScanner scanner : snapshot.getScanners()) { 545 scanner.close(); 546 } 547 memstore.clearSnapshot(snapshot.getId()); 548 549 assertTrue(chunkCreator.getPoolSize() == 0); 550 551 // Chunks will be put back to pool after close scanners; 552 for (KeyValueScanner scanner : scanners) { 553 scanner.close(); 554 } 555 assertTrue(chunkCreator.getPoolSize() > 0); 556 557 // clear chunks 558 chunkCreator.clearChunksInPool(); 559 560 // Creating another snapshot 561 562 snapshot = memstore.snapshot(); 563 // Adding more value 564 memstore.add(new KeyValue(row, fam, qf6, val), null); 565 memstore.add(new KeyValue(row, fam, qf7, val), null); 566 // opening scanners 567 scanners = memstore.getScanners(0); 568 // close scanners before clear the snapshot 569 for (KeyValueScanner scanner : scanners) { 570 scanner.close(); 571 } 572 // Since no opening scanner, the chunks of snapshot should be put back to 573 // pool 574 // close the scanners 575 for(KeyValueScanner scanner : snapshot.getScanners()) { 576 scanner.close(); 577 } 578 memstore.clearSnapshot(snapshot.getId()); 579 assertTrue(chunkCreator.getPoolSize() > 0); 580 } 581 582 @Override 583 @Test 584 public void testPuttingBackChunksAfterFlushing() throws IOException { 585 byte[] row = Bytes.toBytes("testrow"); 586 byte[] fam = Bytes.toBytes("testfamily"); 587 byte[] qf1 = Bytes.toBytes("testqualifier1"); 588 byte[] qf2 = Bytes.toBytes("testqualifier2"); 589 byte[] qf3 = Bytes.toBytes("testqualifier3"); 590 byte[] qf4 = Bytes.toBytes("testqualifier4"); 591 byte[] qf5 = Bytes.toBytes("testqualifier5"); 592 byte[] val = Bytes.toBytes("testval"); 593 594 // Setting up memstore 595 memstore.add(new KeyValue(row, fam, qf1, val), null); 596 memstore.add(new KeyValue(row, fam, qf2, val), null); 597 memstore.add(new KeyValue(row, fam, qf3, val), null); 598 599 // Creating a snapshot 600 MemStoreSnapshot snapshot = memstore.snapshot(); 601 assertEquals(3, memstore.getSnapshot().getCellsCount()); 602 603 // Adding value to "new" memstore 604 assertEquals(0, memstore.getActive().getCellsCount()); 605 memstore.add(new KeyValue(row, fam, qf4, val), null); 606 memstore.add(new KeyValue(row, fam, qf5, val), null); 607 assertEquals(2, memstore.getActive().getCellsCount()); 608 // close the scanners 609 for(KeyValueScanner scanner : snapshot.getScanners()) { 610 scanner.close(); 611 } 612 memstore.clearSnapshot(snapshot.getId()); 613 614 int chunkCount = chunkCreator.getPoolSize(); 615 assertTrue(chunkCount > 0); 616 } 617 618 @Test 619 public void testFlatteningToCellChunkMap() throws IOException { 620 if(!toCellChunkMap) { 621 return; 622 } 623 // set memstore to flat into CellChunkMap 624 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 625 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 626 String.valueOf(compactionType)); 627 ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); 628 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 629 int numOfCells = 8; 630 String[] keys1 = { "A", "A", "B", "C", "D", "D", "E", "F" }; //A1, A2, B3, C4, D5, D6, E7, F8 631 632 // make one cell 633 byte[] row = Bytes.toBytes(keys1[0]); 634 byte[] val = Bytes.toBytes(keys1[0] + 0); 635 KeyValue kv = 636 new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"), 637 System.currentTimeMillis(), val); 638 639 // test 1 bucket 640 int totalCellsLen = addRowsByKeys(memstore, keys1); 641 long oneCellOnCSLMHeapSize = 642 ClassSize.align( 643 ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil 644 .length(kv)); 645 646 long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD; 647 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 648 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 649 650 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten 651 assertEquals(0, memstore.getSnapshot().getCellsCount()); 652 long oneCellOnCCMHeapSize = 653 ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv)); 654 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM 655 + numOfCells * oneCellOnCCMHeapSize; 656 657 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 658 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 659 660 MemStoreSize mss = memstore.getFlushableSize(); 661 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 662 // simulate flusher 663 region.decrMemStoreSize(mss); 664 ImmutableSegment s = memstore.getSnapshot(); 665 assertEquals(numOfCells, s.getCellsCount()); 666 assertEquals(0, regionServicesForStores.getMemStoreSize()); 667 668 memstore.clearSnapshot(snapshot.getId()); 669 } 670 671 /** 672 * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks. 673 * Even though MSLAB is enabled, cells bigger than maxAlloc 674 * (even if smaller than the size of a chunk) are not written in the MSLAB Chunks. 675 * If such cells are found in the process of flattening into CellChunkMap 676 * (in-memory-flush) they need to be copied into MSLAB. 677 * testFlatteningToBigCellChunkMap checks that the process of flattening into 678 * CellChunkMap succeeds, even when such big cells are allocated. 679 */ 680 @Test 681 public void testFlatteningToBigCellChunkMap() throws IOException { 682 683 if (toCellChunkMap == false) { 684 return; 685 } 686 // set memstore to flat into CellChunkMap 687 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 688 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 689 String.valueOf(compactionType)); 690 ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); 691 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 692 int numOfCells = 4; 693 char[] chars = new char[MemStoreLAB.MAX_ALLOC_DEFAULT]; 694 for (int i = 0; i < chars.length; i++) { 695 chars[i] = 'A'; 696 } 697 String bigVal = new String(chars); 698 String[] keys1 = { "A", "B", "C", "D"}; 699 700 // make one cell 701 byte[] row = Bytes.toBytes(keys1[0]); 702 byte[] val = Bytes.toBytes(bigVal); 703 KeyValue kv = 704 new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"), 705 System.currentTimeMillis(), val); 706 707 // test 1 bucket 708 int totalCellsLen = addRowsByKeys(memstore, keys1, val); 709 710 long oneCellOnCSLMHeapSize = 711 ClassSize.align( 712 ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()); 713 714 long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD; 715 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 716 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 717 718 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten 719 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 720 Threads.sleep(10); 721 } 722 assertEquals(0, memstore.getSnapshot().getCellsCount()); 723 // One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode. 724 // totalCellsLen should remain the same 725 long oneCellOnCCMHeapSize = 726 ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv)); 727 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM 728 + numOfCells * oneCellOnCCMHeapSize; 729 730 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 731 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 732 733 MemStoreSize mss = memstore.getFlushableSize(); 734 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 735 // simulate flusher 736 region.decrMemStoreSize(mss); 737 ImmutableSegment s = memstore.getSnapshot(); 738 assertEquals(numOfCells, s.getCellsCount()); 739 assertEquals(0, regionServicesForStores.getMemStoreSize()); 740 741 memstore.clearSnapshot(snapshot.getId()); 742 } 743 744 /** 745 * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks. 746 * Even though MSLAB is enabled, cells bigger than the size of a chunk are not 747 * written in the MSLAB Chunks. 748 * If such cells are found in the process of flattening into CellChunkMap 749 * (in-memory-flush) they need to be copied into MSLAB. 750 * testFlatteningToJumboCellChunkMap checks that the process of flattening 751 * into CellChunkMap succeeds, even when such big cells are allocated. 752 */ 753 @Test 754 public void testFlatteningToJumboCellChunkMap() throws IOException { 755 756 if (toCellChunkMap == false) { 757 return; 758 } 759 // set memstore to flat into CellChunkMap 760 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 761 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 762 String.valueOf(compactionType)); 763 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 764 ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 765 766 int numOfCells = 1; 767 char[] chars = new char[MemStoreLAB.CHUNK_SIZE_DEFAULT]; 768 for (int i = 0; i < chars.length; i++) { 769 chars[i] = 'A'; 770 } 771 String bigVal = new String(chars); 772 String[] keys1 = {"A"}; 773 774 // make one cell 775 byte[] row = Bytes.toBytes(keys1[0]); 776 byte[] val = Bytes.toBytes(bigVal); 777 KeyValue kv = 778 new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"), 779 System.currentTimeMillis(), val); 780 781 // test 1 bucket 782 int totalCellsLen = addRowsByKeys(memstore, keys1, val); 783 784 long oneCellOnCSLMHeapSize = 785 ClassSize.align( 786 ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()); 787 788 long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD; 789 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 790 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 791 792 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and flatten 793 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 794 Threads.sleep(10); 795 } 796 assertEquals(0, memstore.getSnapshot().getCellsCount()); 797 798 // One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode. 799 // totalCellsLen should remain the same 800 long oneCellOnCCMHeapSize = 801 ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv)); 802 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM 803 + numOfCells * oneCellOnCCMHeapSize; 804 805 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 806 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 807 808 MemStoreSize mss = memstore.getFlushableSize(); 809 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 810 // simulate flusher 811 region.decrMemStoreSize(mss); 812 ImmutableSegment s = memstore.getSnapshot(); 813 assertEquals(numOfCells, s.getCellsCount()); 814 assertEquals(0, regionServicesForStores.getMemStoreSize()); 815 816 memstore.clearSnapshot(snapshot.getId()); 817 818 // Allocating two big cells (too big for being copied into a regular chunk). 819 String[] keys2 = {"C", "D"}; 820 addRowsByKeys(memstore, keys2, val); 821 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 822 Threads.sleep(10); 823 } 824 825 // The in-memory flush size is bigger than the size of a single cell, 826 // but smaller than the size of two cells. 827 // Therefore, the two created cells are flattened together. 828 totalHeapSize = MutableSegment.DEEP_OVERHEAD 829 + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM 830 + 2 * oneCellOnCCMHeapSize; 831 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 832 } 833 834 /** 835 * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks. 836 * Even though MSLAB is enabled, cells bigger than the size of a chunk are not 837 * written in the MSLAB Chunks. 838 * If such cells are found in the process of a merge they need to be copied into MSLAB. 839 * testForceCopyOfBigCellIntoImmutableSegment checks that the 840 * ImmutableMemStoreLAB's forceCopyOfBigCellInto does what it's supposed to do. 841 */ 842 @Test 843 public void testForceCopyOfBigCellIntoImmutableSegment() throws IOException { 844 845 if (toCellChunkMap == false) { 846 return; 847 } 848 849 // set memstore to flat into CellChunkMap 850 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 851 memstore.getConfiguration().setInt(MemStoreCompactionStrategy 852 .COMPACTING_MEMSTORE_THRESHOLD_KEY, 4); 853 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 854 String.valueOf(compactionType)); 855 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 856 ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 857 858 char[] chars = new char[MemStoreLAB.CHUNK_SIZE_DEFAULT]; 859 for (int i = 0; i < chars.length; i++) { 860 chars[i] = 'A'; 861 } 862 String bigVal = new String(chars); 863 byte[] val = Bytes.toBytes(bigVal); 864 865 // We need to add two cells, five times, in order to guarantee a merge 866 List<String[]> keysList = new ArrayList<>(); 867 keysList.add(new String[]{"A", "B"}); 868 keysList.add(new String[]{"C", "D"}); 869 keysList.add(new String[]{"E", "F"}); 870 keysList.add(new String[]{"G", "H"}); 871 keysList.add(new String[]{"I", "J"}); 872 873 // Measuring the size of a single kv 874 KeyValue kv = new KeyValue(Bytes.toBytes("A"), Bytes.toBytes("testfamily"), 875 Bytes.toBytes("testqualifier"), System.currentTimeMillis(), val); 876 long oneCellOnCCMHeapSize = 877 ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv)); 878 879 long totalHeapSize = MutableSegment.DEEP_OVERHEAD; 880 for (int i = 0; i < 5; i++) { 881 addRowsByKeys(memstore, keysList.get(i), val); 882 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 883 Threads.sleep(10); 884 } 885 886 // The in-memory flush size is bigger than the size of a single cell, 887 // but smaller than the size of two cells. 888 // Therefore, the two created cells are flattened together. 889 totalHeapSize += CellChunkImmutableSegment.DEEP_OVERHEAD_CCM 890 + 2 * oneCellOnCCMHeapSize; 891 if (i == 4) { 892 // Four out of the five are merged into one, 893 // and the segment becomes immutable 894 totalHeapSize -= (3 * CellChunkImmutableSegment.DEEP_OVERHEAD_CCM 895 + MutableSegment.DEEP_OVERHEAD); 896 } 897 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 898 } 899 } 900 901 902 private long addRowsByKeysDataSize(final AbstractMemStore hmc, String[] keys) { 903 byte[] fam = Bytes.toBytes("testfamily"); 904 byte[] qf = Bytes.toBytes("testqualifier"); 905 MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing(); 906 for (int i = 0; i < keys.length; i++) { 907 long timestamp = System.currentTimeMillis(); 908 Threads.sleep(1); // to make sure each kv gets a different ts 909 byte[] row = Bytes.toBytes(keys[i]); 910 byte[] val = Bytes.toBytes(keys[i] + i); 911 KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); 912 hmc.add(kv, memstoreSizing); 913 LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp()); 914 } 915 MemStoreSize mss = memstoreSizing.getMemStoreSize(); 916 regionServicesForStores.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), 917 mss.getOffHeapSize(), mss.getCellsCount()); 918 return mss.getDataSize(); 919 } 920 921 private long cellBeforeFlushSize() { 922 // make one cell 923 byte[] row = Bytes.toBytes("A"); 924 byte[] val = Bytes.toBytes("A" + 0); 925 KeyValue kv = 926 new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"), 927 System.currentTimeMillis(), val); 928 return ClassSize.align( 929 ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil.length(kv)); 930 } 931 932 private long cellAfterFlushSize() { 933 // make one cell 934 byte[] row = Bytes.toBytes("A"); 935 byte[] val = Bytes.toBytes("A" + 0); 936 KeyValue kv = 937 new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"), 938 System.currentTimeMillis(), val); 939 940 return toCellChunkMap ? 941 ClassSize.align( 942 ClassSize.CELL_CHUNK_MAP_ENTRY + KeyValueUtil.length(kv)) : 943 ClassSize.align( 944 ClassSize.CELL_ARRAY_MAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil.length(kv)); 945 } 946}