001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022 023import java.io.IOException; 024import java.util.concurrent.Executors; 025import java.util.concurrent.ThreadPoolExecutor; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseConfiguration; 029import org.apache.hadoop.hbase.HBaseTestingUtil; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.MemoryCompactionPolicy; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 034import org.apache.hadoop.hbase.client.Put; 035import org.apache.hadoop.hbase.client.RegionInfo; 036import org.apache.hadoop.hbase.client.RegionInfoBuilder; 037import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 038import org.apache.hadoop.hbase.regionserver.wal.AbstractTestFSWAL; 039import org.apache.hadoop.hbase.testclassification.LargeTests; 040import org.apache.hadoop.hbase.testclassification.RegionServerTests; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.apache.hadoop.hbase.util.Threads; 043import org.apache.hadoop.hbase.wal.WAL; 044import org.junit.jupiter.api.BeforeEach; 045import org.junit.jupiter.api.Tag; 046import org.junit.jupiter.api.Test; 047import org.mockito.Mockito; 048 049/** 050 * This test verifies the correctness of the Per Column Family flushing strategy when part of the 051 * memstores are compacted memstores 052 */ 053@Tag(RegionServerTests.TAG) 054@Tag(LargeTests.TAG) 055public class TestWalAndCompactingMemStoreFlush { 056 057 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 058 private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion"); 059 public static final TableName TABLENAME = 060 TableName.valueOf("TestWalAndCompactingMemStoreFlush", "t1"); 061 062 public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), 063 Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") }; 064 065 public static final byte[] FAMILY1 = FAMILIES[0]; 066 public static final byte[] FAMILY2 = FAMILIES[1]; 067 public static final byte[] FAMILY3 = FAMILIES[2]; 068 069 private Configuration conf; 070 071 private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException { 072 int i = 0; 073 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLENAME); 074 for (byte[] family : FAMILIES) { 075 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 076 // even column families are going to have compacted memstore 077 if (i % 2 == 0) { 078 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy 079 .valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY))); 080 } else { 081 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE); 082 } 083 builder.setColumnFamily(cfBuilder.build()); 084 i++; 085 } 086 087 RegionInfo info = RegionInfoBuilder.newBuilder(TABLENAME).build(); 088 Path path = new Path(DIR, callingMethod); 089 HRegion region = HBaseTestingUtil.createRegionAndWAL(info, path, conf, builder.build(), false); 090 region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); 091 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 092 Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 093 region.initialize(null); 094 return region; 095 } 096 097 // A helper function to create puts. 098 private Put createPut(int familyNum, int putNum) { 099 byte[] qf = Bytes.toBytes("q" + familyNum); 100 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); 101 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); 102 Put p = new Put(row); 103 p.addColumn(FAMILIES[familyNum - 1], qf, val); 104 return p; 105 } 106 107 // A helper function to create double puts, so something can be compacted later. 108 private Put createDoublePut(int familyNum, int putNum) { 109 byte[] qf = Bytes.toBytes("q" + familyNum); 110 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); 111 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); 112 Put p = new Put(row); 113 // add twice with different timestamps 114 p.addColumn(FAMILIES[familyNum - 1], qf, 10, val); 115 p.addColumn(FAMILIES[familyNum - 1], qf, 20, val); 116 return p; 117 } 118 119 private void verifyInMemoryFlushSize(Region region) { 120 assertEquals( 121 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).getInmemoryFlushSize(), 122 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).getInmemoryFlushSize()); 123 } 124 125 @BeforeEach 126 public void setup() { 127 conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 128 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, 129 FlushNonSloppyStoresFirstPolicy.class.getName()); 130 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); 131 } 132 133 @Test 134 public void testSelectiveFlushWithEager() throws IOException { 135 // Set up the configuration 136 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 137 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 138 // set memstore to do data compaction 139 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 140 String.valueOf(MemoryCompactionPolicy.EAGER)); 141 142 // Intialize the region 143 HRegion region = initHRegion("testSelectiveFlushWithEager", conf); 144 verifyInMemoryFlushSize(region); 145 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 146 for (int i = 1; i <= 1200; i++) { 147 region.put(createPut(1, i)); // compacted memstore, all the keys are unique 148 149 if (i <= 100) { 150 region.put(createPut(2, i)); 151 if (i <= 50) { 152 // compacted memstore, subject for compaction due to duplications 153 region.put(createDoublePut(3, i)); 154 } 155 } 156 } 157 158 // Now add more puts for CF2, so that we only flush CF2 (DefaultMemStore) to disk 159 for (int i = 100; i < 2000; i++) { 160 region.put(createPut(2, i)); 161 } 162 163 long totalMemstoreSize = region.getMemStoreDataSize(); 164 165 // Find the smallest LSNs for edits wrt to each CF. 166 long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1); 167 long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2); 168 long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3); 169 170 // Find the sizes of the memstores of each CF. 171 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 172 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 173 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 174 175 // Get the overall smallest LSN in the region's memstores. 176 long smallestSeqInRegionCurrentMemstorePhaseI = AbstractTestFSWAL 177 .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes()); 178 179 String s = "\n\n----------------------------------\n" 180 + "Upon initial insert and before any flush, size of CF1 is:" + cf1MemstoreSizePhaseI 181 + ", is CF1 compacted memstore?:" + region.getStore(FAMILY1).isSloppyMemStore() 182 + ". Size of CF2 is:" + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:" 183 + region.getStore(FAMILY2).isSloppyMemStore() + ". Size of CF3 is:" + cf3MemstoreSizePhaseI 184 + ", is CF3 compacted memstore?:" + region.getStore(FAMILY3).isSloppyMemStore() + "\n"; 185 186 // The overall smallest LSN in the region's memstores should be the same as 187 // the LSN of the smallest edit in CF1 188 assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI); 189 190 // Some other sanity checks. 191 assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI); 192 assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI); 193 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 194 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 195 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 196 197 // The total memstore size should be the same as the sum of the sizes of 198 // memstores of CF1, CF2 and CF3. 199 String msg = "totalMemstoreSize=" + totalMemstoreSize + " cf1MemstoreSizePhaseI=" 200 + cf1MemstoreSizePhaseI + " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI 201 + " cf3MemstoreSizePhaseI=" + cf3MemstoreSizePhaseI; 202 assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() 203 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize(), msg); 204 205 // Flush!!!!!!!!!!!!!!!!!!!!!! 206 // We have big compacting memstore CF1 and two small memstores: 207 // CF2 (not compacted) and CF3 (compacting) 208 // All together they are above the flush size lower bound. 209 // Since CF1 and CF3 should be flushed to memory (not to disk), 210 // CF2 is going to be flushed to disk. 211 // CF1 - nothing to compact (but flattening), CF3 - should be twice compacted 212 CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 213 CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 214 cms1.flushInMemory(); 215 cms3.flushInMemory(); 216 region.flush(false); 217 218 // Recalculate everything 219 MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); 220 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 221 MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); 222 223 long smallestSeqInRegionCurrentMemstorePhaseII = AbstractTestFSWAL 224 .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes()); 225 // Find the smallest LSNs for edits wrt to each CF. 226 long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); 227 long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); 228 long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); 229 230 s = s + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n" 231 + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII 232 + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n"; 233 234 // CF1 was flushed to memory, but there is nothing to compact, and CF1 was flattened 235 assertTrue(cf1MemstoreSizePhaseII.getDataSize() == cf1MemstoreSizePhaseI.getDataSize()); 236 assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize()); 237 238 // CF2 should become empty 239 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 240 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize()); 241 242 // verify that CF3 was flushed to memory and was compacted (this is approximation check) 243 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > cf3MemstoreSizePhaseII.getDataSize()); 244 assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 > cf3MemstoreSizePhaseII.getHeapSize()); 245 246 // Now the smallest LSN in the region should be the same as the smallest 247 // LSN in the memstore of CF1. 248 assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI); 249 250 // Now add more puts for CF1, so that we also flush CF1 to disk instead of 251 // memory in next flush 252 for (int i = 1200; i < 3000; i++) { 253 region.put(createPut(1, i)); 254 } 255 256 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII 257 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", " 258 + "the smallest sequence in CF2:" + smallestSeqCF2PhaseII + ", the smallest sequence in CF3:" 259 + smallestSeqCF3PhaseII + "\n"; 260 261 // How much does the CF1 memstore occupy? Will be used later. 262 MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); 263 long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); 264 265 s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII 266 + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n"; 267 268 // Flush!!!!!!!!!!!!!!!!!!!!!! 269 // Flush again, CF1 is flushed to disk 270 // CF2 is flushed to disk, because it is not in-memory compacted memstore 271 // CF3 is flushed empty to memory (actually nothing happens to CF3) 272 region.flush(false); 273 274 // Recalculate everything 275 MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); 276 MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); 277 MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize(); 278 279 long smallestSeqInRegionCurrentMemstorePhaseIV = AbstractTestFSWAL 280 .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes()); 281 long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1); 282 long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2); 283 long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); 284 285 s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:" 286 + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV + "\n"; 287 288 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV 289 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " 290 + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:" 291 + smallestSeqCF3PhaseIV + "\n"; 292 293 // CF1's pipeline component (inserted before first flush) should be flushed to disk 294 // CF2 should be flushed to disk 295 assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize()); 296 assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize()); 297 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize()); 298 299 // CF3 shouldn't have been touched. 300 assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); 301 302 // the smallest LSN of CF3 shouldn't change 303 assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV); 304 305 // CF3 should be bottleneck for WAL 306 assertEquals(smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV, s); 307 308 // Flush!!!!!!!!!!!!!!!!!!!!!! 309 // Trying to clean the existing memstores, CF2 all flushed to disk. The single 310 // memstore segment in the compaction pipeline of CF1 and CF3 should be flushed to disk. 311 region.flush(true); 312 313 // Recalculate everything 314 MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize(); 315 MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize(); 316 MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize(); 317 long smallestSeqInRegionCurrentMemstorePhaseV = AbstractTestFSWAL 318 .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes()); 319 320 assertEquals(0, cf1MemstoreSizePhaseV.getDataSize()); 321 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize()); 322 assertEquals(0, cf2MemstoreSizePhaseV.getDataSize()); 323 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize()); 324 assertEquals(0, cf3MemstoreSizePhaseV.getDataSize()); 325 assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize()); 326 327 // What happens when we hit the memstore limit, but we are not able to find 328 // any Column Family above the threshold? 329 // In that case, we should flush all the CFs. 330 331 // The memstore limit is 100*1024 and the column family flush threshold is 332 // around 25*1024. We try to just hit the memstore limit with each CF's 333 // memstore being below the CF flush threshold. 334 for (int i = 1; i <= 300; i++) { 335 region.put(createPut(1, i)); 336 region.put(createPut(2, i)); 337 region.put(createPut(3, i)); 338 region.put(createPut(4, i)); 339 region.put(createPut(5, i)); 340 } 341 342 region.flush(false); 343 344 s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: " 345 + smallestSeqInRegionCurrentMemstorePhaseV 346 + ". After additional inserts and last flush, the entire region size is:" 347 + region.getMemStoreDataSize() + "\n----------------------------------\n"; 348 349 // Since we won't find any CF above the threshold, and hence no specific 350 // store to flush, we should flush all the memstores 351 // Also compacted memstores are flushed to disk. 352 assertEquals(0, region.getMemStoreDataSize()); 353 System.out.println(s); 354 HBaseTestingUtil.closeRegionAndWAL(region); 355 } 356 357 /*------------------------------------------------------------------------------*/ 358 /* Check the same as above but for index-compaction type of compacting memstore */ 359 @Test 360 public void testSelectiveFlushWithIndexCompaction() throws IOException { 361 /*------------------------------------------------------------------------------*/ 362 /* SETUP */ 363 // Set up the configuration 364 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 365 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 366 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); 367 // set memstore to index-compaction 368 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 369 String.valueOf(MemoryCompactionPolicy.BASIC)); 370 371 // Initialize the region 372 HRegion region = initHRegion("testSelectiveFlushWithIndexCompaction", conf); 373 verifyInMemoryFlushSize(region); 374 /*------------------------------------------------------------------------------*/ 375 /* PHASE I - insertions */ 376 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 377 for (int i = 1; i <= 1200; i++) { 378 region.put(createPut(1, i)); // compacted memstore 379 if (i <= 100) { 380 region.put(createPut(2, i)); 381 if (i <= 50) { 382 region.put(createDoublePut(3, i)); // subject for in-memory compaction 383 } 384 } 385 } 386 // Now add more puts for CF2, so that we only flush CF2 to disk 387 for (int i = 100; i < 2000; i++) { 388 region.put(createPut(2, i)); 389 } 390 391 /*------------------------------------------------------------------------------*/ 392 /*------------------------------------------------------------------------------*/ 393 /* PHASE I - collect sizes */ 394 long totalMemstoreSizePhaseI = region.getMemStoreDataSize(); 395 // Find the smallest LSNs for edits wrt to each CF. 396 long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1); 397 long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2); 398 long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3); 399 // Find the sizes of the memstores of each CF. 400 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 401 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 402 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 403 // Get the overall smallest LSN in the region's memstores. 404 long smallestSeqInRegionCurrentMemstorePhaseI = AbstractTestFSWAL 405 .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes()); 406 407 /*------------------------------------------------------------------------------*/ 408 /* PHASE I - validation */ 409 // The overall smallest LSN in the region's memstores should be the same as 410 // the LSN of the smallest edit in CF1 411 assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI); 412 // Some other sanity checks. 413 assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI); 414 assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI); 415 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 416 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 417 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 418 419 // The total memstore size should be the same as the sum of the sizes of 420 // memstores of CF1, CF2 and CF3. 421 assertEquals(totalMemstoreSizePhaseI, cf1MemstoreSizePhaseI.getDataSize() 422 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); 423 424 /*------------------------------------------------------------------------------*/ 425 /* PHASE I - Flush */ 426 // First Flush in Test!!!!!!!!!!!!!!!!!!!!!! 427 // CF1, CF2, CF3, all together they are above the flush size lower bound. 428 // Since CF1 and CF3 are compacting, CF2 is going to be flushed to disk. 429 // CF1 and CF3 - flushed to memory and flatten explicitly 430 region.flush(false); 431 CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 432 CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 433 cms1.flushInMemory(); 434 cms3.flushInMemory(); 435 436 // CF3/CF1 should be merged so wait here to be sure the compaction is done 437 while ( 438 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 439 .isMemStoreFlushingInMemory() 440 ) { 441 Threads.sleep(10); 442 } 443 while ( 444 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 445 .isMemStoreFlushingInMemory() 446 ) { 447 Threads.sleep(10); 448 } 449 450 /*------------------------------------------------------------------------------*/ 451 /*------------------------------------------------------------------------------*/ 452 /* PHASE II - collect sizes */ 453 // Recalculate everything 454 MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); 455 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 456 MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); 457 long smallestSeqInRegionCurrentMemstorePhaseII = AbstractTestFSWAL 458 .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes()); 459 // Find the smallest LSNs for edits wrt to each CF. 460 long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); 461 long totalMemstoreSizePhaseII = region.getMemStoreDataSize(); 462 463 /*------------------------------------------------------------------------------*/ 464 /* PHASE II - validation */ 465 // CF1 was flushed to memory, should be flattened and take less space 466 assertEquals(cf1MemstoreSizePhaseII.getDataSize(), cf1MemstoreSizePhaseI.getDataSize()); 467 assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize()); 468 // CF2 should become empty 469 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 470 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize()); 471 // verify that CF3 was flushed to memory and was not compacted (this is an approximation check) 472 // if compacted CF# should be at least twice less because its every key was duplicated 473 assertEquals(cf3MemstoreSizePhaseII.getDataSize(), cf3MemstoreSizePhaseI.getDataSize()); 474 assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 < cf3MemstoreSizePhaseII.getHeapSize()); 475 476 // Now the smallest LSN in the region should be the same as the smallest 477 // LSN in the memstore of CF1. 478 assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI); 479 // The total memstore size should be the same as the sum of the sizes of 480 // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline 481 // items in CF1/2 482 assertEquals(totalMemstoreSizePhaseII, cf1MemstoreSizePhaseII.getDataSize() 483 + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize()); 484 485 /*------------------------------------------------------------------------------*/ 486 /*------------------------------------------------------------------------------*/ 487 /* PHASE III - insertions */ 488 // Now add more puts for CF1, so that we also flush CF1 to disk instead of 489 // memory in next flush. This is causing the CF! to be flushed to memory twice. 490 for (int i = 1200; i < 8000; i++) { 491 region.put(createPut(1, i)); 492 } 493 494 // CF1 should be flatten and merged so wait here to be sure the compaction is done 495 while ( 496 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 497 .isMemStoreFlushingInMemory() 498 ) { 499 Threads.sleep(10); 500 } 501 502 /*------------------------------------------------------------------------------*/ 503 /* PHASE III - collect sizes */ 504 // How much does the CF1 memstore occupy now? Will be used later. 505 MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); 506 long totalMemstoreSizePhaseIII = region.getMemStoreDataSize(); 507 508 /*------------------------------------------------------------------------------*/ 509 /* PHASE III - validation */ 510 // The total memstore size should be the same as the sum of the sizes of 511 // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline 512 // items in CF1/2 513 assertEquals(totalMemstoreSizePhaseIII, cf1MemstoreSizePhaseIII.getDataSize() 514 + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize()); 515 516 /*------------------------------------------------------------------------------*/ 517 /* PHASE III - Flush */ 518 // Second Flush in Test!!!!!!!!!!!!!!!!!!!!!! 519 // CF1 is flushed to disk, but not entirely emptied. 520 // CF2 was and remained empty, same way nothing happens to CF3 521 region.flush(false); 522 523 /*------------------------------------------------------------------------------*/ 524 /*------------------------------------------------------------------------------*/ 525 /* PHASE IV - collect sizes */ 526 // Recalculate everything 527 MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); 528 MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); 529 MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize(); 530 long smallestSeqInRegionCurrentMemstorePhaseIV = AbstractTestFSWAL 531 .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes()); 532 long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); 533 534 /*------------------------------------------------------------------------------*/ 535 /* PHASE IV - validation */ 536 // CF1's biggest pipeline component (inserted before first flush) should be flushed to disk 537 // CF2 should remain empty 538 assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize()); 539 assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize()); 540 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize()); 541 // CF3 shouldn't have been touched. 542 assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); 543 // the smallest LSN of CF3 shouldn't change 544 assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV); 545 // CF3 should be bottleneck for WAL 546 assertEquals(smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); 547 548 /*------------------------------------------------------------------------------*/ 549 /* PHASE IV - Flush */ 550 // Third Flush in Test!!!!!!!!!!!!!!!!!!!!!! 551 // Force flush to disk on all memstores (flush parameter true). 552 // CF1/CF3 all flushed to disk. Note that active sets of CF1 and CF3 are empty 553 region.flush(true); 554 555 /*------------------------------------------------------------------------------*/ 556 /*------------------------------------------------------------------------------*/ 557 /* PHASE V - collect sizes */ 558 // Recalculate everything 559 MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize(); 560 MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize(); 561 MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize(); 562 long smallestSeqInRegionCurrentMemstorePhaseV = AbstractTestFSWAL 563 .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes()); 564 long totalMemstoreSizePhaseV = region.getMemStoreDataSize(); 565 566 /*------------------------------------------------------------------------------*/ 567 /* PHASE V - validation */ 568 assertEquals(0, cf1MemstoreSizePhaseV.getDataSize()); 569 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize()); 570 assertEquals(0, cf2MemstoreSizePhaseV.getDataSize()); 571 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize()); 572 assertEquals(0, cf3MemstoreSizePhaseV.getDataSize()); 573 assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize()); 574 // The total memstores size should be empty 575 assertEquals(0, totalMemstoreSizePhaseV); 576 // Because there is nothing in any memstore the WAL's LSN should be -1 577 assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstorePhaseV); 578 579 // What happens when we hit the memstore limit, but we are not able to find 580 // any Column Family above the threshold? 581 // In that case, we should flush all the CFs. 582 583 /*------------------------------------------------------------------------------*/ 584 /*------------------------------------------------------------------------------*/ 585 /* PHASE VI - insertions */ 586 // The memstore limit is 200*1024 and the column family flush threshold is 587 // around 50*1024. We try to just hit the memstore limit with each CF's 588 // memstore being below the CF flush threshold. 589 for (int i = 1; i <= 300; i++) { 590 region.put(createPut(1, i)); 591 region.put(createPut(2, i)); 592 region.put(createPut(3, i)); 593 region.put(createPut(4, i)); 594 region.put(createPut(5, i)); 595 } 596 597 MemStoreSize cf1ActiveSizePhaseVI = region.getStore(FAMILY1).getMemStoreSize(); 598 MemStoreSize cf3ActiveSizePhaseVI = region.getStore(FAMILY3).getMemStoreSize(); 599 MemStoreSize cf5ActiveSizePhaseVI = region.getStore(FAMILIES[4]).getMemStoreSize(); 600 601 /*------------------------------------------------------------------------------*/ 602 /* PHASE VI - Flush */ 603 // Fourth Flush in Test!!!!!!!!!!!!!!!!!!!!!! 604 // None among compacting memstores was flushed to memory due to previous puts. 605 // But is going to be moved to pipeline and flatten due to the flush. 606 region.flush(false); 607 // Since we won't find any CF above the threshold, and hence no specific 608 // store to flush, we should flush all the memstores 609 // Also compacted memstores are flushed to disk, but not entirely emptied 610 MemStoreSize cf1ActiveSizePhaseVII = region.getStore(FAMILY1).getMemStoreSize(); 611 MemStoreSize cf3ActiveSizePhaseVII = region.getStore(FAMILY3).getMemStoreSize(); 612 MemStoreSize cf5ActiveSizePhaseVII = region.getStore(FAMILIES[4]).getMemStoreSize(); 613 614 assertTrue(cf1ActiveSizePhaseVII.getDataSize() < cf1ActiveSizePhaseVI.getDataSize()); 615 assertTrue(cf3ActiveSizePhaseVII.getDataSize() < cf3ActiveSizePhaseVI.getDataSize()); 616 assertTrue(cf5ActiveSizePhaseVII.getDataSize() < cf5ActiveSizePhaseVI.getDataSize()); 617 618 HBaseTestingUtil.closeRegionAndWAL(region); 619 } 620 621 @Test 622 public void testSelectiveFlushAndWALinDataCompaction() throws IOException { 623 // Set up the configuration 624 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 625 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 626 // set memstore to do data compaction and not to use the speculative scan 627 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 628 String.valueOf(MemoryCompactionPolicy.EAGER)); 629 630 // Intialize the HRegion 631 HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); 632 verifyInMemoryFlushSize(region); 633 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 634 for (int i = 1; i <= 1200; i++) { 635 region.put(createPut(1, i)); 636 if (i <= 100) { 637 region.put(createPut(2, i)); 638 if (i <= 50) { 639 region.put(createPut(3, i)); 640 } 641 } 642 } 643 // Now add more puts for CF2, so that we only flush CF2 to disk 644 for (int i = 100; i < 2000; i++) { 645 region.put(createPut(2, i)); 646 } 647 648 // in this test check the non-composite snapshot - flashing only tail of the pipeline 649 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).setCompositeSnapshot(false); 650 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).setCompositeSnapshot(false); 651 652 long totalMemstoreSize = region.getMemStoreDataSize(); 653 654 // Find the sizes of the memstores of each CF. 655 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 656 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 657 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 658 659 // Some other sanity checks. 660 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 661 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 662 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 663 664 // The total memstore size should be the same as the sum of the sizes of 665 // memstores of CF1, CF2 and CF3. 666 String msg = "totalMemstoreSize=" + totalMemstoreSize + " DefaultMemStore.DEEP_OVERHEAD=" 667 + DefaultMemStore.DEEP_OVERHEAD + " cf1MemstoreSizePhaseI=" + cf1MemstoreSizePhaseI 668 + " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI + " cf3MemstoreSizePhaseI=" 669 + cf3MemstoreSizePhaseI; 670 assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() 671 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize(), msg); 672 673 // Flush! 674 CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 675 CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 676 cms1.flushInMemory(); 677 cms3.flushInMemory(); 678 region.flush(false); 679 680 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 681 682 long smallestSeqInRegionCurrentMemstorePhaseII = AbstractTestFSWAL 683 .getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes()); 684 long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); 685 long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); 686 long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); 687 688 // CF2 should have been cleared 689 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 690 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize()); 691 692 String s = "\n\n----------------------------------\n" 693 + "Upon initial insert and flush, LSN of CF1 is:" + smallestSeqCF1PhaseII + ". LSN of CF2 is:" 694 + smallestSeqCF2PhaseII + ". LSN of CF3 is:" + smallestSeqCF3PhaseII 695 + ", smallestSeqInRegionCurrentMemstore:" + smallestSeqInRegionCurrentMemstorePhaseII + "\n"; 696 697 // Add same entries to compact them later 698 for (int i = 1; i <= 1200; i++) { 699 region.put(createPut(1, i)); 700 if (i <= 100) { 701 region.put(createPut(2, i)); 702 if (i <= 50) { 703 region.put(createPut(3, i)); 704 } 705 } 706 } 707 // Now add more puts for CF2, so that we only flush CF2 to disk 708 for (int i = 100; i < 2000; i++) { 709 region.put(createPut(2, i)); 710 } 711 712 long smallestSeqInRegionCurrentMemstorePhaseIII = AbstractTestFSWAL 713 .getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes()); 714 long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); 715 long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2); 716 long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3); 717 718 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIII 719 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIII + ", " 720 + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIII + ", the smallest sequence in CF3:" 721 + smallestSeqCF3PhaseIII + "\n"; 722 723 // Flush! 724 cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 725 cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 726 cms1.flushInMemory(); 727 cms3.flushInMemory(); 728 region.flush(false); 729 730 long smallestSeqInRegionCurrentMemstorePhaseIV = AbstractTestFSWAL 731 .getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes()); 732 long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1); 733 long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2); 734 long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); 735 736 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV 737 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " 738 + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:" 739 + smallestSeqCF3PhaseIV + "\n"; 740 741 // now check that the LSN of the entire WAL, of CF1 and of CF3 has progressed due to compaction 742 assertTrue( 743 smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII, s); 744 assertTrue(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII); 745 assertTrue(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII); 746 747 HBaseTestingUtil.closeRegionAndWAL(region); 748 } 749 750 @Test 751 public void testSelectiveFlushWithBasicAndMerge() throws IOException { 752 // Set up the configuration 753 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 754 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 755 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.8); 756 // set memstore to do index compaction with merge 757 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 758 String.valueOf(MemoryCompactionPolicy.BASIC)); 759 // length of pipeline that requires merge 760 conf.setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1); 761 762 // Intialize the HRegion 763 HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf); 764 verifyInMemoryFlushSize(region); 765 // Add 1200 entries for CF1 (CompactingMemStore), 100 for CF2 (DefaultMemStore) and 50 for CF3 766 for (int i = 1; i <= 1200; i++) { 767 region.put(createPut(1, i)); 768 if (i <= 100) { 769 region.put(createPut(2, i)); 770 if (i <= 50) { 771 region.put(createPut(3, i)); 772 } 773 } 774 } 775 // Now put more entries to CF2 776 for (int i = 100; i < 2000; i++) { 777 region.put(createPut(2, i)); 778 } 779 780 long totalMemstoreSize = region.getMemStoreDataSize(); 781 782 // test in-memory flashing into CAM here 783 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 784 .setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 785 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 786 .setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 787 788 // Find the sizes of the memstores of each CF. 789 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 790 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 791 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 792 793 // Some other sanity checks. 794 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 795 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 796 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 797 798 // The total memstore size should be the same as the sum of the sizes of 799 // memstores of CF1, CF2 and CF3. 800 assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() 801 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); 802 803 // Initiate in-memory Flush! 804 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory(); 805 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory(); 806 // CF1 and CF3 should be flatten and merged so wait here to be sure the merge is done 807 while ( 808 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 809 .isMemStoreFlushingInMemory() 810 ) { 811 Threads.sleep(10); 812 } 813 while ( 814 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 815 .isMemStoreFlushingInMemory() 816 ) { 817 Threads.sleep(10); 818 } 819 820 // Flush-to-disk! CF2 only should be flushed 821 region.flush(false); 822 823 MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); 824 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 825 MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); 826 827 // CF1 should be flushed in memory and just flattened, so CF1 heap overhead should be smaller 828 assertTrue(cf1MemstoreSizePhaseI.getHeapSize() > cf1MemstoreSizePhaseII.getHeapSize()); 829 // CF1 should be flushed in memory and just flattened, so CF1 data size should remain the same 830 assertEquals(cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseII.getDataSize()); 831 // CF2 should have been cleared 832 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 833 834 // Add the same amount of entries to see the merging 835 for (int i = 1; i <= 1200; i++) { 836 region.put(createPut(1, i)); 837 if (i <= 100) { 838 region.put(createPut(2, i)); 839 if (i <= 50) { 840 region.put(createPut(3, i)); 841 } 842 } 843 } 844 // Now add more puts for CF2, so that we only flush CF2 to disk 845 for (int i = 100; i < 2000; i++) { 846 region.put(createPut(2, i)); 847 } 848 849 MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); 850 851 // Flush in memory! 852 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory(); 853 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory(); 854 // CF1 and CF3 should be merged so wait here to be sure the merge is done 855 while ( 856 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 857 .isMemStoreFlushingInMemory() 858 ) { 859 Threads.sleep(10); 860 } 861 while ( 862 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 863 .isMemStoreFlushingInMemory() 864 ) { 865 Threads.sleep(10); 866 } 867 region.flush(false); 868 869 MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); 870 MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); 871 872 assertEquals(2 * cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize()); 873 // the decrease in the heap size due to usage of CellArrayMap instead of CSLM 874 // should be the same in flattening and in merge (first and second in-memory-flush) 875 // but in phase 1 we do not yet have immutable segment 876 assertEquals(cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(), 877 cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize() 878 - CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); 879 assertEquals(3, // active, one in pipeline, snapshot 880 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).getSegments().size()); 881 // CF2 should have been cleared 882 assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize(), 883 "\n<<< DEBUG: The data--heap sizes of stores before/after first flushes," + " CF1: " 884 + cf1MemstoreSizePhaseI.getDataSize() + "/" + cf1MemstoreSizePhaseII.getDataSize() + "--" 885 + cf1MemstoreSizePhaseI.getHeapSize() + "/" + cf1MemstoreSizePhaseII.getHeapSize() 886 + ", CF2: " + cf2MemstoreSizePhaseI.getDataSize() + "/" 887 + cf2MemstoreSizePhaseII.getDataSize() + "--" + cf2MemstoreSizePhaseI.getHeapSize() + "/" 888 + cf2MemstoreSizePhaseII.getHeapSize() + ", CF3: " + cf3MemstoreSizePhaseI.getDataSize() 889 + "/" + cf3MemstoreSizePhaseII.getDataSize() + "--" + cf3MemstoreSizePhaseI.getHeapSize() 890 + "/" + cf3MemstoreSizePhaseII.getHeapSize() + "\n<<< AND before/after second flushes " 891 + " CF1: " + cf1MemstoreSizePhaseIII.getDataSize() + "/" 892 + cf1MemstoreSizePhaseIV.getDataSize() + "--" + cf1MemstoreSizePhaseIII.getHeapSize() + "/" 893 + cf1MemstoreSizePhaseIV.getHeapSize() + "\n"); 894 895 HBaseTestingUtil.closeRegionAndWAL(region); 896 } 897 898 // should end in 300 seconds (5 minutes) 899 @Test 900 public void testStressFlushAndWALinIndexCompaction() throws IOException { 901 // Set up the configuration 902 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); 903 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 904 200 * 1024); 905 // set memstore to do data compaction and not to use the speculative scan 906 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 907 String.valueOf(MemoryCompactionPolicy.BASIC)); 908 909 // Successfully initialize the HRegion 910 HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); 911 verifyInMemoryFlushSize(region); 912 Thread[] threads = new Thread[25]; 913 for (int i = 0; i < threads.length; i++) { 914 int id = i * 10000; 915 ConcurrentPutRunnable runnable = new ConcurrentPutRunnable(region, id); 916 threads[i] = new Thread(runnable); 917 threads[i].start(); 918 } 919 Threads.sleep(10000); // let other threads start 920 region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts 921 Threads.sleep(10000); // let other threads continue 922 region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts 923 924 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory(); 925 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory(); 926 while ( 927 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 928 .isMemStoreFlushingInMemory() 929 ) { 930 Threads.sleep(10); 931 } 932 while ( 933 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 934 .isMemStoreFlushingInMemory() 935 ) { 936 Threads.sleep(10); 937 } 938 939 for (int i = 0; i < threads.length; i++) { 940 try { 941 threads[i].join(); 942 } catch (InterruptedException e) { 943 e.printStackTrace(); 944 } 945 } 946 } 947 948 /** 949 * The in-memory-flusher thread performs the flush asynchronously. There is at most one thread per 950 * memstore instance. It takes the updatesLock exclusively, pushes active into the pipeline, 951 * releases updatesLock and compacts the pipeline. 952 */ 953 private class ConcurrentPutRunnable implements Runnable { 954 private final HRegion stressedRegion; 955 private final int startNumber; 956 957 ConcurrentPutRunnable(HRegion r, int i) { 958 this.stressedRegion = r; 959 this.startNumber = i; 960 } 961 962 @Override 963 public void run() { 964 965 try { 966 int dummy = startNumber / 10000; 967 System.out.print("Thread " + dummy + " with start number " + startNumber + " starts\n"); 968 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 969 for (int i = startNumber; i <= startNumber + 3000; i++) { 970 stressedRegion.put(createPut(1, i)); 971 if (i <= startNumber + 2000) { 972 stressedRegion.put(createPut(2, i)); 973 if (i <= startNumber + 1000) { 974 stressedRegion.put(createPut(3, i)); 975 } 976 } 977 } 978 System.out.print("Thread with start number " + startNumber + " continues to more puts\n"); 979 // Now add more puts for CF2, so that we only flush CF2 to disk 980 for (int i = startNumber + 3000; i < startNumber + 5000; i++) { 981 stressedRegion.put(createPut(2, i)); 982 } 983 // And add more puts for CF1 984 for (int i = startNumber + 5000; i < startNumber + 7000; i++) { 985 stressedRegion.put(createPut(1, i)); 986 } 987 System.out.print("Thread with start number " + startNumber + " flushes\n"); 988 // flush (IN MEMORY) one of the stores (each thread flushes different store) 989 // and wait till the flush and the following action are done 990 if (startNumber == 0) { 991 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore) 992 .flushInMemory(); 993 while ( 994 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore) 995 .isMemStoreFlushingInMemory() 996 ) { 997 Threads.sleep(10); 998 } 999 } 1000 if (startNumber == 10000) { 1001 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore) 1002 .flushInMemory(); 1003 while ( 1004 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore) 1005 .isMemStoreFlushingInMemory() 1006 ) { 1007 Threads.sleep(10); 1008 } 1009 } 1010 if (startNumber == 20000) { 1011 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore) 1012 .flushInMemory(); 1013 while ( 1014 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore) 1015 .isMemStoreFlushingInMemory() 1016 ) { 1017 Threads.sleep(10); 1018 } 1019 } 1020 System.out.print("Thread with start number " + startNumber + " finishes\n"); 1021 } catch (IOException e) { 1022 assert false; 1023 } 1024 } 1025 } 1026 1027 private WAL getWAL(Region region) { 1028 return ((HRegion) region).getWAL(); 1029 } 1030}