001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.List; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.CellComparatorImpl; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.KeyValue; 031import org.apache.hadoop.hbase.MemoryCompactionPolicy; 032import org.apache.hadoop.hbase.testclassification.LargeTests; 033import org.apache.hadoop.hbase.testclassification.RegionServerTests; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.ClassSize; 036import org.apache.hadoop.hbase.util.Threads; 037import org.junit.ClassRule; 038import org.junit.Test; 039import org.junit.experimental.categories.Category; 040import org.junit.runner.RunWith; 041import org.junit.runners.Parameterized; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045/** 046 * compacted memstore test case 047 */ 048@Category({RegionServerTests.class, LargeTests.class}) 049@RunWith(Parameterized.class) 050public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore { 051 052 @ClassRule 053 public static final HBaseClassTestRule CLASS_RULE = 054 HBaseClassTestRule.forClass(TestCompactingToCellFlatMapMemStore.class); 055 056 @Parameterized.Parameters 057 public static Object[] data() { 058 return new Object[] { "CHUNK_MAP", "ARRAY_MAP" }; // test different immutable indexes 059 } 060 private static final Logger LOG = 061 LoggerFactory.getLogger(TestCompactingToCellFlatMapMemStore.class); 062 public final boolean toCellChunkMap; 063 Configuration conf; 064 ////////////////////////////////////////////////////////////////////////////// 065 // Helpers 066 ////////////////////////////////////////////////////////////////////////////// 067 public TestCompactingToCellFlatMapMemStore(String type){ 068 if (type == "CHUNK_MAP") { 069 toCellChunkMap = true; 070 } else { 071 toCellChunkMap = false; 072 } 073 } 074 075 @Override public void tearDown() throws Exception { 076 chunkCreator.clearChunksInPool(); 077 } 078 079 @Override public void setUp() throws Exception { 080 081 compactingSetUp(); 082 this.conf = HBaseConfiguration.create(); 083 084 // set memstore to do data compaction 085 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 086 String.valueOf(MemoryCompactionPolicy.EAGER)); 087 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.02); 088 this.memstore = 089 new MyCompactingMemStore(conf, CellComparatorImpl.COMPARATOR, store, 090 regionServicesForStores, MemoryCompactionPolicy.EAGER); 091 } 092 093 ////////////////////////////////////////////////////////////////////////////// 094 // Compaction tests 095 ////////////////////////////////////////////////////////////////////////////// 096 @Override 097 public void testCompaction1Bucket() throws IOException { 098 int counter = 0; 099 String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4 100 if (toCellChunkMap) { 101 // set memstore to flat into CellChunkMap 102 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 103 } else { 104 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 105 } 106 107 // test 1 bucket 108 long totalCellsLen = addRowsByKeysDataSize(memstore, keys1); 109 long cellBeforeFlushSize = cellBeforeFlushSize(); 110 long cellAfterFlushSize = cellAfterFlushSize(); 111 long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize; 112 113 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 114 assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); 115 116 assertEquals(4, memstore.getActive().getCellsCount()); 117 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 118 assertEquals(0, memstore.getSnapshot().getCellsCount()); 119 // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting 120 // totalCellsLen 121 totalCellsLen = (totalCellsLen * 3) / 4; 122 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 123 124 totalHeapSize = 125 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD 126 + (toCellChunkMap ? 127 CellChunkImmutableSegment.DEEP_OVERHEAD_CCM : 128 CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); 129 assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize()); 130 for ( Segment s : memstore.getSegments()) { 131 counter += s.getCellsCount(); 132 } 133 assertEquals(3, counter); 134 MemStoreSize mss = memstore.getFlushableSize(); 135 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 136 region.decrMemStoreSize(mss); // simulate flusher 137 ImmutableSegment s = memstore.getSnapshot(); 138 assertEquals(3, s.getCellsCount()); 139 assertEquals(0, regionServicesForStores.getMemStoreSize()); 140 141 memstore.clearSnapshot(snapshot.getId()); 142 } 143 144 @Override 145 public void testCompaction2Buckets() throws IOException { 146 if (toCellChunkMap) { 147 // set memstore to flat into CellChunkMap 148 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 149 } else { 150 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 151 } 152 String[] keys1 = { "A", "A", "B", "C" }; 153 String[] keys2 = { "A", "B", "D" }; 154 155 long totalCellsLen1 = addRowsByKeysDataSize(memstore, keys1); // INSERT 4 156 long cellBeforeFlushSize = cellBeforeFlushSize(); 157 long cellAfterFlushSize = cellAfterFlushSize(); 158 long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize; 159 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 160 assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize()); 161 162 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 163 int counter = 0; // COMPACT 4->3 164 for ( Segment s : memstore.getSegments()) { 165 counter += s.getCellsCount(); 166 } 167 assertEquals(3,counter); 168 assertEquals(0, memstore.getSnapshot().getCellsCount()); 169 // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting 170 // totalCellsLen 171 totalCellsLen1 = (totalCellsLen1 * 3) / 4; 172 totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD 173 + (toCellChunkMap ? 174 CellChunkImmutableSegment.DEEP_OVERHEAD_CCM : 175 CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); 176 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 177 assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize()); 178 179 long totalCellsLen2 = addRowsByKeysDataSize(memstore, keys2); // INSERT 3 (3+3=6) 180 long totalHeapSize2 = 3 * cellBeforeFlushSize; 181 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 182 assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); 183 184 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 185 assertEquals(0, memstore.getSnapshot().getCellsCount());// COMPACT 6->4 186 counter = 0; 187 for ( Segment s : memstore.getSegments()) { 188 counter += s.getCellsCount(); 189 } 190 assertEquals(4,counter); 191 totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2 192 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 193 totalHeapSize2 = 1 * cellAfterFlushSize; 194 assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); 195 196 MemStoreSize mss = memstore.getFlushableSize(); 197 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 198 // simulate flusher 199 region.decrMemStoreSize(mss); 200 ImmutableSegment s = memstore.getSnapshot(); 201 assertEquals(4, s.getCellsCount()); 202 assertEquals(0, regionServicesForStores.getMemStoreSize()); 203 204 memstore.clearSnapshot(snapshot.getId()); 205 } 206 207 @Override 208 public void testCompaction3Buckets() throws IOException { 209 if (toCellChunkMap) { 210 // set memstore to flat into CellChunkMap 211 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 212 } else { 213 // set to CellArrayMap as CCM is configured by default due to MSLAB usage 214 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 215 } 216 String[] keys1 = { "A", "A", "B", "C" }; 217 String[] keys2 = { "A", "B", "D" }; 218 String[] keys3 = { "D", "B", "B" }; 219 220 long totalCellsLen1 = addRowsByKeysDataSize(memstore, keys1); 221 long cellBeforeFlushSize = cellBeforeFlushSize(); 222 long cellAfterFlushSize = cellAfterFlushSize(); 223 long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize; 224 assertEquals(totalCellsLen1, region.getMemStoreDataSize()); 225 assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize()); 226 227 MemStoreSize mss = memstore.getFlushableSize(); 228 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 229 230 assertEquals(0, memstore.getSnapshot().getCellsCount()); 231 // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting 232 // totalCellsLen 233 totalCellsLen1 = (totalCellsLen1 * 3) / 4; 234 totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD 235 + (toCellChunkMap ? 236 CellChunkImmutableSegment.DEEP_OVERHEAD_CCM : 237 CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); 238 assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize()); 239 assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize()); 240 241 long totalCellsLen2 = addRowsByKeysDataSize(memstore, keys2); 242 long totalHeapSize2 = 3 * cellBeforeFlushSize; 243 244 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 245 assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); 246 247 ((MyCompactingMemStore) memstore).disableCompaction(); 248 mss = memstore.getFlushableSize(); 249 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction 250 totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM; 251 assertEquals(0, memstore.getSnapshot().getCellsCount()); 252 assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize()); 253 assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize()); 254 255 long totalCellsLen3 = addRowsByKeysDataSize(memstore, keys3); 256 long totalHeapSize3 = 3 * cellBeforeFlushSize; 257 assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, 258 regionServicesForStores.getMemStoreSize()); 259 assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3, 260 ((CompactingMemStore) memstore).heapSize()); 261 262 ((MyCompactingMemStore) memstore).enableCompaction(); 263 mss = memstore.getFlushableSize(); 264 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 265 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 266 Threads.sleep(10); 267 } 268 assertEquals(0, memstore.getSnapshot().getCellsCount()); 269 // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells. 270 // Out of total 10, only 4 cells are unique 271 totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated 272 totalCellsLen3 = 0;// All duplicated cells. 273 assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3, 274 regionServicesForStores.getMemStoreSize()); 275 // Only 4 unique cells left 276 long totalHeapSize4 = 4 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD 277 + (toCellChunkMap ? 278 CellChunkImmutableSegment.DEEP_OVERHEAD_CCM : 279 CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); 280 assertEquals(totalHeapSize4, ((CompactingMemStore) memstore).heapSize()); 281 282 mss = memstore.getFlushableSize(); 283 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 284 // simulate flusher 285 region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(), 286 mss.getCellsCount()); 287 ImmutableSegment s = memstore.getSnapshot(); 288 assertEquals(4, s.getCellsCount()); 289 assertEquals(0, regionServicesForStores.getMemStoreSize()); 290 291 memstore.clearSnapshot(snapshot.getId()); 292 } 293 294 ////////////////////////////////////////////////////////////////////////////// 295 // Merging tests 296 ////////////////////////////////////////////////////////////////////////////// 297 @Test 298 public void testMerging() throws IOException { 299 if (toCellChunkMap) { 300 // set memstore to flat into CellChunkMap 301 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 302 } 303 String[] keys1 = { "A", "A", "B", "C", "F", "H"}; 304 String[] keys2 = { "A", "B", "D", "G", "I", "J"}; 305 String[] keys3 = { "D", "B", "B", "E" }; 306 307 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 308 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 309 String.valueOf(compactionType)); 310 ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); 311 addRowsByKeysDataSize(memstore, keys1); 312 313 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact 314 315 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 316 Threads.sleep(10); 317 } 318 assertEquals(0, memstore.getSnapshot().getCellsCount()); 319 320 addRowsByKeysDataSize(memstore, keys2); // also should only flatten 321 322 int counter2 = 0; 323 for ( Segment s : memstore.getSegments()) { 324 counter2 += s.getCellsCount(); 325 } 326 assertEquals(12, counter2); 327 328 ((MyCompactingMemStore) memstore).disableCompaction(); 329 330 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening 331 assertEquals(0, memstore.getSnapshot().getCellsCount()); 332 333 int counter3 = 0; 334 for ( Segment s : memstore.getSegments()) { 335 counter3 += s.getCellsCount(); 336 } 337 assertEquals(12, counter3); 338 339 addRowsByKeysDataSize(memstore, keys3); 340 341 int counter4 = 0; 342 for ( Segment s : memstore.getSegments()) { 343 counter4 += s.getCellsCount(); 344 } 345 assertEquals(16, counter4); 346 347 ((MyCompactingMemStore) memstore).enableCompaction(); 348 349 350 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact 351 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 352 Threads.sleep(10); 353 } 354 assertEquals(0, memstore.getSnapshot().getCellsCount()); 355 356 int counter = 0; 357 for ( Segment s : memstore.getSegments()) { 358 counter += s.getCellsCount(); 359 } 360 assertEquals(16,counter); 361 362 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 363 ImmutableSegment s = memstore.getSnapshot(); 364 memstore.clearSnapshot(snapshot.getId()); 365 } 366 367 @Test 368 public void testTimeRangeAfterCompaction() throws IOException { 369 if (toCellChunkMap) { 370 // set memstore to flat into CellChunkMap 371 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 372 } 373 testTimeRange(true); 374 } 375 376 @Test 377 public void testTimeRangeAfterMerge() throws IOException { 378 if (toCellChunkMap) { 379 // set memstore to flat into CellChunkMap 380 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 381 } 382 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 383 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 384 String.valueOf(compactionType)); 385 ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); 386 testTimeRange(false); 387 } 388 389 private void testTimeRange(boolean isCompaction) throws IOException { 390 final long initTs = 100; 391 long currentTs = initTs; 392 byte[] row = Bytes.toBytes("row"); 393 byte[] family = Bytes.toBytes("family"); 394 byte[] qf1 = Bytes.toBytes("qf1"); 395 396 // first segment in pipeline 397 this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null); 398 long minTs = currentTs; 399 this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null); 400 401 long numberOfCell = 2; 402 assertEquals(numberOfCell, memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum()); 403 assertEquals(minTs, memstore.getSegments().stream().mapToLong( 404 m -> m.getTimeRangeTracker().getMin()).min().getAsLong()); 405 assertEquals(currentTs, memstore.getSegments().stream().mapToLong( 406 m -> m.getTimeRangeTracker().getMax()).max().getAsLong()); 407 408 ((CompactingMemStore) memstore).flushInMemory(); 409 410 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 411 Threads.sleep(10); 412 } 413 if (isCompaction) { 414 // max version = 1, so one cell will be dropped. 415 numberOfCell = 1; 416 minTs = currentTs; 417 } 418 // second segment in pipeline 419 this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null); 420 this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null); 421 numberOfCell += 2; 422 assertEquals(numberOfCell, memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum()); 423 assertEquals(minTs, memstore.getSegments().stream().mapToLong( 424 m -> m.getTimeRangeTracker().getMin()).min().getAsLong()); 425 assertEquals(currentTs, memstore.getSegments().stream().mapToLong( 426 m -> m.getTimeRangeTracker().getMax()).max().getAsLong()); 427 428 ((CompactingMemStore) memstore).flushInMemory(); // trigger the merge 429 430 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 431 Threads.sleep(10); 432 } 433 if (isCompaction) { 434 // max version = 1, so one cell will be dropped. 435 numberOfCell = 1; 436 minTs = currentTs; 437 } 438 439 assertEquals(numberOfCell, memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum()); 440 assertEquals(minTs, memstore.getSegments().stream().mapToLong( 441 m -> m.getTimeRangeTracker().getMin()).min().getAsLong()); 442 assertEquals(currentTs, memstore.getSegments().stream().mapToLong( 443 m -> m.getTimeRangeTracker().getMax()).max().getAsLong()); 444 } 445 446 @Test 447 public void testCountOfCellsAfterFlatteningByScan() throws IOException { 448 String[] keys1 = { "A", "B", "C" }; // A, B, C 449 addRowsByKeysWith50Cols(memstore, keys1); 450 // this should only flatten as there are no duplicates 451 ((CompactingMemStore) memstore).flushInMemory(); 452 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 453 Threads.sleep(10); 454 } 455 List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE); 456 // seek 457 int count = 0; 458 for(int i = 0; i < scanners.size(); i++) { 459 scanners.get(i).seek(KeyValue.LOWESTKEY); 460 while (scanners.get(i).next() != null) { 461 count++; 462 } 463 } 464 assertEquals("the count should be ", 150, count); 465 for(int i = 0; i < scanners.size(); i++) { 466 scanners.get(i).close(); 467 } 468 } 469 470 @Test 471 public void testCountOfCellsAfterFlatteningByIterator() throws IOException { 472 String[] keys1 = { "A", "B", "C" }; // A, B, C 473 addRowsByKeysWith50Cols(memstore, keys1); 474 // this should only flatten as there are no duplicates 475 ((CompactingMemStore) memstore).flushInMemory(); 476 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 477 Threads.sleep(10); 478 } 479 // Just doing the cnt operation here 480 MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator( 481 ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(), 482 CellComparatorImpl.COMPARATOR, 10); 483 int cnt = 0; 484 try { 485 while (itr.next() != null) { 486 cnt++; 487 } 488 } finally { 489 itr.close(); 490 } 491 assertEquals("the count should be ", 150, cnt); 492 } 493 494 private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) { 495 byte[] fam = Bytes.toBytes("testfamily"); 496 for (int i = 0; i < keys.length; i++) { 497 long timestamp = System.currentTimeMillis(); 498 Threads.sleep(1); // to make sure each kv gets a different ts 499 byte[] row = Bytes.toBytes(keys[i]); 500 for(int j =0 ;j < 50; j++) { 501 byte[] qf = Bytes.toBytes("testqualifier"+j); 502 byte[] val = Bytes.toBytes(keys[i] + j); 503 KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); 504 hmc.add(kv, null); 505 } 506 } 507 } 508 509 @Override 510 @Test 511 public void testPuttingBackChunksWithOpeningScanner() throws IOException { 512 byte[] row = Bytes.toBytes("testrow"); 513 byte[] fam = Bytes.toBytes("testfamily"); 514 byte[] qf1 = Bytes.toBytes("testqualifier1"); 515 byte[] qf2 = Bytes.toBytes("testqualifier2"); 516 byte[] qf3 = Bytes.toBytes("testqualifier3"); 517 byte[] qf4 = Bytes.toBytes("testqualifier4"); 518 byte[] qf5 = Bytes.toBytes("testqualifier5"); 519 byte[] qf6 = Bytes.toBytes("testqualifier6"); 520 byte[] qf7 = Bytes.toBytes("testqualifier7"); 521 byte[] val = Bytes.toBytes("testval"); 522 523 // Setting up memstore 524 memstore.add(new KeyValue(row, fam, qf1, val), null); 525 memstore.add(new KeyValue(row, fam, qf2, val), null); 526 memstore.add(new KeyValue(row, fam, qf3, val), null); 527 528 // Creating a snapshot 529 MemStoreSnapshot snapshot = memstore.snapshot(); 530 assertEquals(3, memstore.getSnapshot().getCellsCount()); 531 532 // Adding value to "new" memstore 533 assertEquals(0, memstore.getActive().getCellsCount()); 534 memstore.add(new KeyValue(row, fam, qf4, val), null); 535 memstore.add(new KeyValue(row, fam, qf5, val), null); 536 assertEquals(2, memstore.getActive().getCellsCount()); 537 538 // opening scanner before clear the snapshot 539 List<KeyValueScanner> scanners = memstore.getScanners(0); 540 // Shouldn't putting back the chunks to pool,since some scanners are opening 541 // based on their data 542 // close the scanners 543 for(KeyValueScanner scanner : snapshot.getScanners()) { 544 scanner.close(); 545 } 546 memstore.clearSnapshot(snapshot.getId()); 547 548 assertTrue(chunkCreator.getPoolSize() == 0); 549 550 // Chunks will be put back to pool after close scanners; 551 for (KeyValueScanner scanner : scanners) { 552 scanner.close(); 553 } 554 assertTrue(chunkCreator.getPoolSize() > 0); 555 556 // clear chunks 557 chunkCreator.clearChunksInPool(); 558 559 // Creating another snapshot 560 561 snapshot = memstore.snapshot(); 562 // Adding more value 563 memstore.add(new KeyValue(row, fam, qf6, val), null); 564 memstore.add(new KeyValue(row, fam, qf7, val), null); 565 // opening scanners 566 scanners = memstore.getScanners(0); 567 // close scanners before clear the snapshot 568 for (KeyValueScanner scanner : scanners) { 569 scanner.close(); 570 } 571 // Since no opening scanner, the chunks of snapshot should be put back to 572 // pool 573 // close the scanners 574 for(KeyValueScanner scanner : snapshot.getScanners()) { 575 scanner.close(); 576 } 577 memstore.clearSnapshot(snapshot.getId()); 578 assertTrue(chunkCreator.getPoolSize() > 0); 579 } 580 581 @Override 582 @Test 583 public void testPuttingBackChunksAfterFlushing() throws IOException { 584 byte[] row = Bytes.toBytes("testrow"); 585 byte[] fam = Bytes.toBytes("testfamily"); 586 byte[] qf1 = Bytes.toBytes("testqualifier1"); 587 byte[] qf2 = Bytes.toBytes("testqualifier2"); 588 byte[] qf3 = Bytes.toBytes("testqualifier3"); 589 byte[] qf4 = Bytes.toBytes("testqualifier4"); 590 byte[] qf5 = Bytes.toBytes("testqualifier5"); 591 byte[] val = Bytes.toBytes("testval"); 592 593 // Setting up memstore 594 memstore.add(new KeyValue(row, fam, qf1, val), null); 595 memstore.add(new KeyValue(row, fam, qf2, val), null); 596 memstore.add(new KeyValue(row, fam, qf3, val), null); 597 598 // Creating a snapshot 599 MemStoreSnapshot snapshot = memstore.snapshot(); 600 assertEquals(3, memstore.getSnapshot().getCellsCount()); 601 602 // Adding value to "new" memstore 603 assertEquals(0, memstore.getActive().getCellsCount()); 604 memstore.add(new KeyValue(row, fam, qf4, val), null); 605 memstore.add(new KeyValue(row, fam, qf5, val), null); 606 assertEquals(2, memstore.getActive().getCellsCount()); 607 // close the scanners 608 for(KeyValueScanner scanner : snapshot.getScanners()) { 609 scanner.close(); 610 } 611 memstore.clearSnapshot(snapshot.getId()); 612 613 int chunkCount = chunkCreator.getPoolSize(); 614 assertTrue(chunkCount > 0); 615 } 616 617 @Test 618 public void testFlatteningToCellChunkMap() throws IOException { 619 if(!toCellChunkMap) { 620 return; 621 } 622 // set memstore to flat into CellChunkMap 623 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 624 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 625 String.valueOf(compactionType)); 626 ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); 627 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 628 int numOfCells = 8; 629 String[] keys1 = { "A", "A", "B", "C", "D", "D", "E", "F" }; //A1, A2, B3, C4, D5, D6, E7, F8 630 631 // make one cell 632 byte[] row = Bytes.toBytes(keys1[0]); 633 byte[] val = Bytes.toBytes(keys1[0] + 0); 634 KeyValue kv = 635 new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"), 636 System.currentTimeMillis(), val); 637 638 // test 1 bucket 639 int totalCellsLen = addRowsByKeys(memstore, keys1); 640 long oneCellOnCSLMHeapSize = ClassSize.align( 641 ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + kv.getSerializedSize()); 642 643 long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD; 644 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 645 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 646 647 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten 648 assertEquals(0, memstore.getSnapshot().getCellsCount()); 649 long oneCellOnCCMHeapSize = 650 ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize()); 651 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM 652 + numOfCells * oneCellOnCCMHeapSize; 653 654 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 655 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 656 657 MemStoreSize mss = memstore.getFlushableSize(); 658 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 659 // simulate flusher 660 region.decrMemStoreSize(mss); 661 ImmutableSegment s = memstore.getSnapshot(); 662 assertEquals(numOfCells, s.getCellsCount()); 663 assertEquals(0, regionServicesForStores.getMemStoreSize()); 664 665 memstore.clearSnapshot(snapshot.getId()); 666 } 667 668 /** 669 * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks. 670 * Even though MSLAB is enabled, cells bigger than maxAlloc 671 * (even if smaller than the size of a chunk) are not written in the MSLAB Chunks. 672 * If such cells are found in the process of flattening into CellChunkMap 673 * (in-memory-flush) they need to be copied into MSLAB. 674 * testFlatteningToBigCellChunkMap checks that the process of flattening into 675 * CellChunkMap succeeds, even when such big cells are allocated. 676 */ 677 @Test 678 public void testFlatteningToBigCellChunkMap() throws IOException { 679 680 if (toCellChunkMap == false) { 681 return; 682 } 683 // set memstore to flat into CellChunkMap 684 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 685 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 686 String.valueOf(compactionType)); 687 ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration()); 688 ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 689 int numOfCells = 4; 690 char[] chars = new char[MemStoreLAB.MAX_ALLOC_DEFAULT]; 691 for (int i = 0; i < chars.length; i++) { 692 chars[i] = 'A'; 693 } 694 String bigVal = new String(chars); 695 String[] keys1 = { "A", "B", "C", "D"}; 696 697 // make one cell 698 byte[] row = Bytes.toBytes(keys1[0]); 699 byte[] val = Bytes.toBytes(bigVal); 700 KeyValue kv = 701 new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"), 702 System.currentTimeMillis(), val); 703 704 // test 1 bucket 705 int totalCellsLen = addRowsByKeys(memstore, keys1, val); 706 707 long oneCellOnCSLMHeapSize = 708 ClassSize.align( 709 ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()); 710 711 long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD; 712 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 713 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 714 715 ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten 716 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 717 Threads.sleep(10); 718 } 719 assertEquals(0, memstore.getSnapshot().getCellsCount()); 720 // One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode. 721 // totalCellsLen should remain the same 722 long oneCellOnCCMHeapSize = 723 ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize()); 724 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM 725 + numOfCells * oneCellOnCCMHeapSize; 726 727 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 728 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 729 730 MemStoreSize mss = memstore.getFlushableSize(); 731 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 732 // simulate flusher 733 region.decrMemStoreSize(mss); 734 ImmutableSegment s = memstore.getSnapshot(); 735 assertEquals(numOfCells, s.getCellsCount()); 736 assertEquals(0, regionServicesForStores.getMemStoreSize()); 737 738 memstore.clearSnapshot(snapshot.getId()); 739 } 740 741 /** 742 * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks. 743 * Even though MSLAB is enabled, cells bigger than the size of a chunk are not 744 * written in the MSLAB Chunks. 745 * If such cells are found in the process of flattening into CellChunkMap 746 * (in-memory-flush) they need to be copied into MSLAB. 747 * testFlatteningToJumboCellChunkMap checks that the process of flattening 748 * into CellChunkMap succeeds, even when such big cells are allocated. 749 */ 750 @Test 751 public void testFlatteningToJumboCellChunkMap() throws IOException { 752 753 if (toCellChunkMap == false) { 754 return; 755 } 756 // set memstore to flat into CellChunkMap 757 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 758 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 759 String.valueOf(compactionType)); 760 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 761 ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 762 763 int numOfCells = 1; 764 char[] chars = new char[MemStoreLAB.CHUNK_SIZE_DEFAULT]; 765 for (int i = 0; i < chars.length; i++) { 766 chars[i] = 'A'; 767 } 768 String bigVal = new String(chars); 769 String[] keys1 = {"A"}; 770 771 // make one cell 772 byte[] row = Bytes.toBytes(keys1[0]); 773 byte[] val = Bytes.toBytes(bigVal); 774 KeyValue kv = 775 new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"), 776 System.currentTimeMillis(), val); 777 778 // test 1 bucket 779 int totalCellsLen = addRowsByKeys(memstore, keys1, val); 780 781 long oneCellOnCSLMHeapSize = 782 ClassSize.align( 783 ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()); 784 785 long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD; 786 assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize()); 787 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 788 789 ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and flatten 790 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 791 Threads.sleep(10); 792 } 793 assertEquals(0, memstore.getSnapshot().getCellsCount()); 794 795 // One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode. 796 // totalCellsLen should remain the same 797 long oneCellOnCCMHeapSize = 798 (long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize()); 799 totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM 800 + numOfCells * oneCellOnCCMHeapSize; 801 802 assertEquals(totalCellsLen+ChunkCreator.SIZEOF_CHUNK_HEADER, regionServicesForStores 803 .getMemStoreSize()); 804 805 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 806 807 MemStoreSize mss = memstore.getFlushableSize(); 808 MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot 809 // simulate flusher 810 region.decrMemStoreSize(mss); 811 ImmutableSegment s = memstore.getSnapshot(); 812 assertEquals(numOfCells, s.getCellsCount()); 813 assertEquals(0, regionServicesForStores.getMemStoreSize()); 814 815 memstore.clearSnapshot(snapshot.getId()); 816 817 // Allocating two big cells (too big for being copied into a regular chunk). 818 String[] keys2 = {"C", "D"}; 819 addRowsByKeys(memstore, keys2, val); 820 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 821 Threads.sleep(10); 822 } 823 824 // The in-memory flush size is bigger than the size of a single cell, 825 // but smaller than the size of two cells. 826 // Therefore, the two created cells are flattened together. 827 totalHeapSize = MutableSegment.DEEP_OVERHEAD 828 + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM 829 + 1 * oneCellOnCSLMHeapSize 830 + 1 * oneCellOnCCMHeapSize; 831 assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 832 } 833 834 /** 835 * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks. 836 * Even though MSLAB is enabled, cells bigger than the size of a chunk are not 837 * written in the MSLAB Chunks. 838 * If such cells are found in the process of a merge they need to be copied into MSLAB. 839 * testForceCopyOfBigCellIntoImmutableSegment checks that the 840 * ImmutableMemStoreLAB's forceCopyOfBigCellInto does what it's supposed to do. 841 */ 842 @org.junit.Ignore @Test // Flakey. Disabled by HBASE-24128. HBASE-24129 is for reenable. 843 // TestCompactingToCellFlatMapMemStore.testForceCopyOfBigCellIntoImmutableSegment:902 i=1 844 // expected:<8389924> but was:<8389992> 845 public void testForceCopyOfBigCellIntoImmutableSegment() throws IOException { 846 847 if (toCellChunkMap == false) { 848 return; 849 } 850 851 // set memstore to flat into CellChunkMap 852 MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC; 853 memstore.getConfiguration().setInt(MemStoreCompactionStrategy 854 .COMPACTING_MEMSTORE_THRESHOLD_KEY, 4); 855 memstore.getConfiguration() 856 .setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.014); 857 memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 858 String.valueOf(compactionType)); 859 ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration()); 860 ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP); 861 862 char[] chars = new char[MemStoreLAB.CHUNK_SIZE_DEFAULT]; 863 for (int i = 0; i < chars.length; i++) { 864 chars[i] = 'A'; 865 } 866 String bigVal = new String(chars); 867 byte[] val = Bytes.toBytes(bigVal); 868 869 // We need to add two cells, three times, in order to guarantee a merge 870 List<String[]> keysList = new ArrayList<>(); 871 keysList.add(new String[]{"A", "B"}); 872 keysList.add(new String[]{"C", "D"}); 873 keysList.add(new String[]{"E", "F"}); 874 keysList.add(new String[]{"G", "H"}); 875 876 // Measuring the size of a single kv 877 KeyValue kv = new KeyValue(Bytes.toBytes("A"), Bytes.toBytes("testfamily"), 878 Bytes.toBytes("testqualifier"), System.currentTimeMillis(), val); 879 long oneCellOnCCMHeapSize = 880 (long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize()); 881 long oneCellOnCSLMHeapSize = 882 ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()); 883 long totalHeapSize = MutableSegment.DEEP_OVERHEAD; 884 for (int i = 0; i < keysList.size(); i++) { 885 addRowsByKeys(memstore, keysList.get(i), val); 886 while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { 887 Threads.sleep(10); 888 } 889 890 if(i==0) { 891 totalHeapSize += CellChunkImmutableSegment.DEEP_OVERHEAD_CCM 892 + oneCellOnCCMHeapSize + oneCellOnCSLMHeapSize; 893 } else { 894 // The in-memory flush size is bigger than the size of a single cell, 895 // but smaller than the size of two cells. 896 // Therefore, the two created cells are flattened in a seperate segment. 897 totalHeapSize += 2 * (CellChunkImmutableSegment.DEEP_OVERHEAD_CCM + oneCellOnCCMHeapSize); 898 } 899 if (i == 2) { 900 // Four out of the five segments are merged into one 901 totalHeapSize -= (4 * CellChunkImmutableSegment.DEEP_OVERHEAD_CCM); 902 totalHeapSize = ClassSize.align(totalHeapSize); 903 } 904 assertEquals("i="+i, totalHeapSize, ((CompactingMemStore) memstore).heapSize()); 905 } 906 } 907 908 909 private long addRowsByKeysDataSize(final AbstractMemStore hmc, String[] keys) { 910 byte[] fam = Bytes.toBytes("testfamily"); 911 byte[] qf = Bytes.toBytes("testqualifier"); 912 MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing(); 913 for (int i = 0; i < keys.length; i++) { 914 long timestamp = System.currentTimeMillis(); 915 Threads.sleep(1); // to make sure each kv gets a different ts 916 byte[] row = Bytes.toBytes(keys[i]); 917 byte[] val = Bytes.toBytes(keys[i] + i); 918 KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); 919 hmc.add(kv, memstoreSizing); 920 LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp()); 921 } 922 MemStoreSize mss = memstoreSizing.getMemStoreSize(); 923 regionServicesForStores.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), 924 mss.getOffHeapSize(), mss.getCellsCount()); 925 return mss.getDataSize(); 926 } 927 928 private long cellBeforeFlushSize() { 929 // make one cell 930 byte[] row = Bytes.toBytes("A"); 931 byte[] val = Bytes.toBytes("A" + 0); 932 KeyValue kv = 933 new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"), 934 System.currentTimeMillis(), val); 935 return ClassSize.align( 936 ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + kv.getSerializedSize()); 937 } 938 939 private long cellAfterFlushSize() { 940 // make one cell 941 byte[] row = Bytes.toBytes("A"); 942 byte[] val = Bytes.toBytes("A" + 0); 943 KeyValue kv = 944 new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"), 945 System.currentTimeMillis(), val); 946 947 return toCellChunkMap ? 948 ClassSize.align( 949 ClassSize.CELL_CHUNK_MAP_ENTRY + kv.getSerializedSize()) : 950 ClassSize.align( 951 ClassSize.CELL_ARRAY_MAP_ENTRY + KeyValue.FIXED_OVERHEAD + kv.getSerializedSize()); 952 } 953}