001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.concurrent.Executors; 025import java.util.concurrent.ThreadPoolExecutor; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.MemoryCompactionPolicy; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 035import org.apache.hadoop.hbase.client.Put; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.client.RegionInfoBuilder; 038import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 039import org.apache.hadoop.hbase.testclassification.LargeTests; 040import org.apache.hadoop.hbase.testclassification.RegionServerTests; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.apache.hadoop.hbase.util.Threads; 043import org.apache.hadoop.hbase.wal.WAL; 044import org.junit.Before; 045import org.junit.ClassRule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048import org.mockito.Mockito; 049 050/** 051 * This test verifies the correctness of the Per Column Family flushing strategy when part of the 052 * memstores are compacted memstores 053 */ 054@Category({ RegionServerTests.class, LargeTests.class }) 055public class TestWalAndCompactingMemStoreFlush { 056 057 @ClassRule 058 public static final HBaseClassTestRule CLASS_RULE = 059 HBaseClassTestRule.forClass(TestWalAndCompactingMemStoreFlush.class); 060 061 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 062 private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion"); 063 public static final TableName TABLENAME = 064 TableName.valueOf("TestWalAndCompactingMemStoreFlush", "t1"); 065 066 public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), 067 Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") }; 068 069 public static final byte[] FAMILY1 = FAMILIES[0]; 070 public static final byte[] FAMILY2 = FAMILIES[1]; 071 public static final byte[] FAMILY3 = FAMILIES[2]; 072 073 private Configuration conf; 074 075 private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException { 076 int i = 0; 077 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLENAME); 078 for (byte[] family : FAMILIES) { 079 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 080 // even column families are going to have compacted memstore 081 if (i % 2 == 0) { 082 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy 083 .valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY))); 084 } else { 085 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE); 086 } 087 builder.setColumnFamily(cfBuilder.build()); 088 i++; 089 } 090 091 RegionInfo info = RegionInfoBuilder.newBuilder(TABLENAME).build(); 092 Path path = new Path(DIR, callingMethod); 093 HRegion region = HBaseTestingUtil.createRegionAndWAL(info, path, conf, builder.build(), false); 094 region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); 095 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 096 Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 097 region.initialize(null); 098 return region; 099 } 100 101 // A helper function to create puts. 102 private Put createPut(int familyNum, int putNum) { 103 byte[] qf = Bytes.toBytes("q" + familyNum); 104 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); 105 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); 106 Put p = new Put(row); 107 p.addColumn(FAMILIES[familyNum - 1], qf, val); 108 return p; 109 } 110 111 // A helper function to create double puts, so something can be compacted later. 112 private Put createDoublePut(int familyNum, int putNum) { 113 byte[] qf = Bytes.toBytes("q" + familyNum); 114 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); 115 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); 116 Put p = new Put(row); 117 // add twice with different timestamps 118 p.addColumn(FAMILIES[familyNum - 1], qf, 10, val); 119 p.addColumn(FAMILIES[familyNum - 1], qf, 20, val); 120 return p; 121 } 122 123 private void verifyInMemoryFlushSize(Region region) { 124 assertEquals( 125 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).getInmemoryFlushSize(), 126 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).getInmemoryFlushSize()); 127 } 128 129 @Before 130 public void setup() { 131 conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 132 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, 133 FlushNonSloppyStoresFirstPolicy.class.getName()); 134 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); 135 } 136 137 @Test 138 public void testSelectiveFlushWithEager() throws IOException { 139 // Set up the configuration 140 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 141 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 142 // set memstore to do data compaction 143 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 144 String.valueOf(MemoryCompactionPolicy.EAGER)); 145 146 // Intialize the region 147 HRegion region = initHRegion("testSelectiveFlushWithEager", conf); 148 verifyInMemoryFlushSize(region); 149 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 150 for (int i = 1; i <= 1200; i++) { 151 region.put(createPut(1, i)); // compacted memstore, all the keys are unique 152 153 if (i <= 100) { 154 region.put(createPut(2, i)); 155 if (i <= 50) { 156 // compacted memstore, subject for compaction due to duplications 157 region.put(createDoublePut(3, i)); 158 } 159 } 160 } 161 162 // Now add more puts for CF2, so that we only flush CF2 (DefaultMemStore) to disk 163 for (int i = 100; i < 2000; i++) { 164 region.put(createPut(2, i)); 165 } 166 167 long totalMemstoreSize = region.getMemStoreDataSize(); 168 169 // Find the smallest LSNs for edits wrt to each CF. 170 long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1); 171 long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2); 172 long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3); 173 174 // Find the sizes of the memstores of each CF. 175 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 176 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 177 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 178 179 // Get the overall smallest LSN in the region's memstores. 180 long smallestSeqInRegionCurrentMemstorePhaseI = 181 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 182 183 String s = "\n\n----------------------------------\n" 184 + "Upon initial insert and before any flush, size of CF1 is:" + cf1MemstoreSizePhaseI 185 + ", is CF1 compacted memstore?:" + region.getStore(FAMILY1).isSloppyMemStore() 186 + ". Size of CF2 is:" + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:" 187 + region.getStore(FAMILY2).isSloppyMemStore() + ". Size of CF3 is:" + cf3MemstoreSizePhaseI 188 + ", is CF3 compacted memstore?:" + region.getStore(FAMILY3).isSloppyMemStore() + "\n"; 189 190 // The overall smallest LSN in the region's memstores should be the same as 191 // the LSN of the smallest edit in CF1 192 assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI); 193 194 // Some other sanity checks. 195 assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI); 196 assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI); 197 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 198 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 199 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 200 201 // The total memstore size should be the same as the sum of the sizes of 202 // memstores of CF1, CF2 and CF3. 203 String msg = "totalMemstoreSize=" + totalMemstoreSize + " cf1MemstoreSizePhaseI=" 204 + cf1MemstoreSizePhaseI + " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI 205 + " cf3MemstoreSizePhaseI=" + cf3MemstoreSizePhaseI; 206 assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() 207 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); 208 209 // Flush!!!!!!!!!!!!!!!!!!!!!! 210 // We have big compacting memstore CF1 and two small memstores: 211 // CF2 (not compacted) and CF3 (compacting) 212 // All together they are above the flush size lower bound. 213 // Since CF1 and CF3 should be flushed to memory (not to disk), 214 // CF2 is going to be flushed to disk. 215 // CF1 - nothing to compact (but flattening), CF3 - should be twice compacted 216 CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 217 CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 218 cms1.flushInMemory(); 219 cms3.flushInMemory(); 220 region.flush(false); 221 222 // Recalculate everything 223 MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); 224 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 225 MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); 226 227 long smallestSeqInRegionCurrentMemstorePhaseII = 228 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 229 // Find the smallest LSNs for edits wrt to each CF. 230 long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); 231 long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); 232 long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); 233 234 s = s + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n" 235 + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII 236 + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n"; 237 238 // CF1 was flushed to memory, but there is nothing to compact, and CF1 was flattened 239 assertTrue(cf1MemstoreSizePhaseII.getDataSize() == cf1MemstoreSizePhaseI.getDataSize()); 240 assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize()); 241 242 // CF2 should become empty 243 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 244 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize()); 245 246 // verify that CF3 was flushed to memory and was compacted (this is approximation check) 247 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > cf3MemstoreSizePhaseII.getDataSize()); 248 assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 > cf3MemstoreSizePhaseII.getHeapSize()); 249 250 // Now the smallest LSN in the region should be the same as the smallest 251 // LSN in the memstore of CF1. 252 assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI); 253 254 // Now add more puts for CF1, so that we also flush CF1 to disk instead of 255 // memory in next flush 256 for (int i = 1200; i < 3000; i++) { 257 region.put(createPut(1, i)); 258 } 259 260 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII 261 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", " 262 + "the smallest sequence in CF2:" + smallestSeqCF2PhaseII + ", the smallest sequence in CF3:" 263 + smallestSeqCF3PhaseII + "\n"; 264 265 // How much does the CF1 memstore occupy? Will be used later. 266 MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); 267 long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); 268 269 s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII 270 + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n"; 271 272 // Flush!!!!!!!!!!!!!!!!!!!!!! 273 // Flush again, CF1 is flushed to disk 274 // CF2 is flushed to disk, because it is not in-memory compacted memstore 275 // CF3 is flushed empty to memory (actually nothing happens to CF3) 276 region.flush(false); 277 278 // Recalculate everything 279 MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); 280 MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); 281 MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize(); 282 283 long smallestSeqInRegionCurrentMemstorePhaseIV = 284 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 285 long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1); 286 long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2); 287 long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); 288 289 s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:" 290 + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV + "\n"; 291 292 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV 293 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " 294 + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:" 295 + smallestSeqCF3PhaseIV + "\n"; 296 297 // CF1's pipeline component (inserted before first flush) should be flushed to disk 298 // CF2 should be flushed to disk 299 assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize()); 300 assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize()); 301 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize()); 302 303 // CF3 shouldn't have been touched. 304 assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); 305 306 // the smallest LSN of CF3 shouldn't change 307 assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV); 308 309 // CF3 should be bottleneck for WAL 310 assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); 311 312 // Flush!!!!!!!!!!!!!!!!!!!!!! 313 // Trying to clean the existing memstores, CF2 all flushed to disk. The single 314 // memstore segment in the compaction pipeline of CF1 and CF3 should be flushed to disk. 315 region.flush(true); 316 317 // Recalculate everything 318 MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize(); 319 MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize(); 320 MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize(); 321 long smallestSeqInRegionCurrentMemstorePhaseV = 322 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 323 324 assertEquals(0, cf1MemstoreSizePhaseV.getDataSize()); 325 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize()); 326 assertEquals(0, cf2MemstoreSizePhaseV.getDataSize()); 327 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize()); 328 assertEquals(0, cf3MemstoreSizePhaseV.getDataSize()); 329 assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize()); 330 331 // What happens when we hit the memstore limit, but we are not able to find 332 // any Column Family above the threshold? 333 // In that case, we should flush all the CFs. 334 335 // The memstore limit is 100*1024 and the column family flush threshold is 336 // around 25*1024. We try to just hit the memstore limit with each CF's 337 // memstore being below the CF flush threshold. 338 for (int i = 1; i <= 300; i++) { 339 region.put(createPut(1, i)); 340 region.put(createPut(2, i)); 341 region.put(createPut(3, i)); 342 region.put(createPut(4, i)); 343 region.put(createPut(5, i)); 344 } 345 346 region.flush(false); 347 348 s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: " 349 + smallestSeqInRegionCurrentMemstorePhaseV 350 + ". After additional inserts and last flush, the entire region size is:" 351 + region.getMemStoreDataSize() + "\n----------------------------------\n"; 352 353 // Since we won't find any CF above the threshold, and hence no specific 354 // store to flush, we should flush all the memstores 355 // Also compacted memstores are flushed to disk. 356 assertEquals(0, region.getMemStoreDataSize()); 357 System.out.println(s); 358 HBaseTestingUtil.closeRegionAndWAL(region); 359 } 360 361 /*------------------------------------------------------------------------------*/ 362 /* Check the same as above but for index-compaction type of compacting memstore */ 363 @Test 364 public void testSelectiveFlushWithIndexCompaction() throws IOException { 365 /*------------------------------------------------------------------------------*/ 366 /* SETUP */ 367 // Set up the configuration 368 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 369 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 370 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); 371 // set memstore to index-compaction 372 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 373 String.valueOf(MemoryCompactionPolicy.BASIC)); 374 375 // Initialize the region 376 HRegion region = initHRegion("testSelectiveFlushWithIndexCompaction", conf); 377 verifyInMemoryFlushSize(region); 378 /*------------------------------------------------------------------------------*/ 379 /* PHASE I - insertions */ 380 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 381 for (int i = 1; i <= 1200; i++) { 382 region.put(createPut(1, i)); // compacted memstore 383 if (i <= 100) { 384 region.put(createPut(2, i)); 385 if (i <= 50) { 386 region.put(createDoublePut(3, i)); // subject for in-memory compaction 387 } 388 } 389 } 390 // Now add more puts for CF2, so that we only flush CF2 to disk 391 for (int i = 100; i < 2000; i++) { 392 region.put(createPut(2, i)); 393 } 394 395 /*------------------------------------------------------------------------------*/ 396 /*------------------------------------------------------------------------------*/ 397 /* PHASE I - collect sizes */ 398 long totalMemstoreSizePhaseI = region.getMemStoreDataSize(); 399 // Find the smallest LSNs for edits wrt to each CF. 400 long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1); 401 long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2); 402 long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3); 403 // Find the sizes of the memstores of each CF. 404 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 405 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 406 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 407 // Get the overall smallest LSN in the region's memstores. 408 long smallestSeqInRegionCurrentMemstorePhaseI = 409 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 410 411 /*------------------------------------------------------------------------------*/ 412 /* PHASE I - validation */ 413 // The overall smallest LSN in the region's memstores should be the same as 414 // the LSN of the smallest edit in CF1 415 assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI); 416 // Some other sanity checks. 417 assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI); 418 assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI); 419 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 420 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 421 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 422 423 // The total memstore size should be the same as the sum of the sizes of 424 // memstores of CF1, CF2 and CF3. 425 assertEquals(totalMemstoreSizePhaseI, cf1MemstoreSizePhaseI.getDataSize() 426 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); 427 428 /*------------------------------------------------------------------------------*/ 429 /* PHASE I - Flush */ 430 // First Flush in Test!!!!!!!!!!!!!!!!!!!!!! 431 // CF1, CF2, CF3, all together they are above the flush size lower bound. 432 // Since CF1 and CF3 are compacting, CF2 is going to be flushed to disk. 433 // CF1 and CF3 - flushed to memory and flatten explicitly 434 region.flush(false); 435 CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 436 CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 437 cms1.flushInMemory(); 438 cms3.flushInMemory(); 439 440 // CF3/CF1 should be merged so wait here to be sure the compaction is done 441 while ( 442 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 443 .isMemStoreFlushingInMemory() 444 ) { 445 Threads.sleep(10); 446 } 447 while ( 448 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 449 .isMemStoreFlushingInMemory() 450 ) { 451 Threads.sleep(10); 452 } 453 454 /*------------------------------------------------------------------------------*/ 455 /*------------------------------------------------------------------------------*/ 456 /* PHASE II - collect sizes */ 457 // Recalculate everything 458 MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); 459 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 460 MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); 461 long smallestSeqInRegionCurrentMemstorePhaseII = 462 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 463 // Find the smallest LSNs for edits wrt to each CF. 464 long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); 465 long totalMemstoreSizePhaseII = region.getMemStoreDataSize(); 466 467 /*------------------------------------------------------------------------------*/ 468 /* PHASE II - validation */ 469 // CF1 was flushed to memory, should be flattened and take less space 470 assertEquals(cf1MemstoreSizePhaseII.getDataSize(), cf1MemstoreSizePhaseI.getDataSize()); 471 assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize()); 472 // CF2 should become empty 473 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 474 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize()); 475 // verify that CF3 was flushed to memory and was not compacted (this is an approximation check) 476 // if compacted CF# should be at least twice less because its every key was duplicated 477 assertEquals(cf3MemstoreSizePhaseII.getDataSize(), cf3MemstoreSizePhaseI.getDataSize()); 478 assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 < cf3MemstoreSizePhaseII.getHeapSize()); 479 480 // Now the smallest LSN in the region should be the same as the smallest 481 // LSN in the memstore of CF1. 482 assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI); 483 // The total memstore size should be the same as the sum of the sizes of 484 // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline 485 // items in CF1/2 486 assertEquals(totalMemstoreSizePhaseII, cf1MemstoreSizePhaseII.getDataSize() 487 + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize()); 488 489 /*------------------------------------------------------------------------------*/ 490 /*------------------------------------------------------------------------------*/ 491 /* PHASE III - insertions */ 492 // Now add more puts for CF1, so that we also flush CF1 to disk instead of 493 // memory in next flush. This is causing the CF! to be flushed to memory twice. 494 for (int i = 1200; i < 8000; i++) { 495 region.put(createPut(1, i)); 496 } 497 498 // CF1 should be flatten and merged so wait here to be sure the compaction is done 499 while ( 500 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 501 .isMemStoreFlushingInMemory() 502 ) { 503 Threads.sleep(10); 504 } 505 506 /*------------------------------------------------------------------------------*/ 507 /* PHASE III - collect sizes */ 508 // How much does the CF1 memstore occupy now? Will be used later. 509 MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); 510 long totalMemstoreSizePhaseIII = region.getMemStoreDataSize(); 511 512 /*------------------------------------------------------------------------------*/ 513 /* PHASE III - validation */ 514 // The total memstore size should be the same as the sum of the sizes of 515 // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline 516 // items in CF1/2 517 assertEquals(totalMemstoreSizePhaseIII, cf1MemstoreSizePhaseIII.getDataSize() 518 + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize()); 519 520 /*------------------------------------------------------------------------------*/ 521 /* PHASE III - Flush */ 522 // Second Flush in Test!!!!!!!!!!!!!!!!!!!!!! 523 // CF1 is flushed to disk, but not entirely emptied. 524 // CF2 was and remained empty, same way nothing happens to CF3 525 region.flush(false); 526 527 /*------------------------------------------------------------------------------*/ 528 /*------------------------------------------------------------------------------*/ 529 /* PHASE IV - collect sizes */ 530 // Recalculate everything 531 MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); 532 MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); 533 MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize(); 534 long smallestSeqInRegionCurrentMemstorePhaseIV = 535 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 536 long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); 537 538 /*------------------------------------------------------------------------------*/ 539 /* PHASE IV - validation */ 540 // CF1's biggest pipeline component (inserted before first flush) should be flushed to disk 541 // CF2 should remain empty 542 assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize()); 543 assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize()); 544 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize()); 545 // CF3 shouldn't have been touched. 546 assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); 547 // the smallest LSN of CF3 shouldn't change 548 assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV); 549 // CF3 should be bottleneck for WAL 550 assertEquals(smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); 551 552 /*------------------------------------------------------------------------------*/ 553 /* PHASE IV - Flush */ 554 // Third Flush in Test!!!!!!!!!!!!!!!!!!!!!! 555 // Force flush to disk on all memstores (flush parameter true). 556 // CF1/CF3 all flushed to disk. Note that active sets of CF1 and CF3 are empty 557 region.flush(true); 558 559 /*------------------------------------------------------------------------------*/ 560 /*------------------------------------------------------------------------------*/ 561 /* PHASE V - collect sizes */ 562 // Recalculate everything 563 MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize(); 564 MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize(); 565 MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize(); 566 long smallestSeqInRegionCurrentMemstorePhaseV = 567 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 568 long totalMemstoreSizePhaseV = region.getMemStoreDataSize(); 569 570 /*------------------------------------------------------------------------------*/ 571 /* PHASE V - validation */ 572 assertEquals(0, cf1MemstoreSizePhaseV.getDataSize()); 573 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize()); 574 assertEquals(0, cf2MemstoreSizePhaseV.getDataSize()); 575 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize()); 576 assertEquals(0, cf3MemstoreSizePhaseV.getDataSize()); 577 assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize()); 578 // The total memstores size should be empty 579 assertEquals(0, totalMemstoreSizePhaseV); 580 // Because there is nothing in any memstore the WAL's LSN should be -1 581 assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstorePhaseV); 582 583 // What happens when we hit the memstore limit, but we are not able to find 584 // any Column Family above the threshold? 585 // In that case, we should flush all the CFs. 586 587 /*------------------------------------------------------------------------------*/ 588 /*------------------------------------------------------------------------------*/ 589 /* PHASE VI - insertions */ 590 // The memstore limit is 200*1024 and the column family flush threshold is 591 // around 50*1024. We try to just hit the memstore limit with each CF's 592 // memstore being below the CF flush threshold. 593 for (int i = 1; i <= 300; i++) { 594 region.put(createPut(1, i)); 595 region.put(createPut(2, i)); 596 region.put(createPut(3, i)); 597 region.put(createPut(4, i)); 598 region.put(createPut(5, i)); 599 } 600 601 MemStoreSize cf1ActiveSizePhaseVI = region.getStore(FAMILY1).getMemStoreSize(); 602 MemStoreSize cf3ActiveSizePhaseVI = region.getStore(FAMILY3).getMemStoreSize(); 603 MemStoreSize cf5ActiveSizePhaseVI = region.getStore(FAMILIES[4]).getMemStoreSize(); 604 605 /*------------------------------------------------------------------------------*/ 606 /* PHASE VI - Flush */ 607 // Fourth Flush in Test!!!!!!!!!!!!!!!!!!!!!! 608 // None among compacting memstores was flushed to memory due to previous puts. 609 // But is going to be moved to pipeline and flatten due to the flush. 610 region.flush(false); 611 // Since we won't find any CF above the threshold, and hence no specific 612 // store to flush, we should flush all the memstores 613 // Also compacted memstores are flushed to disk, but not entirely emptied 614 MemStoreSize cf1ActiveSizePhaseVII = region.getStore(FAMILY1).getMemStoreSize(); 615 MemStoreSize cf3ActiveSizePhaseVII = region.getStore(FAMILY3).getMemStoreSize(); 616 MemStoreSize cf5ActiveSizePhaseVII = region.getStore(FAMILIES[4]).getMemStoreSize(); 617 618 assertTrue(cf1ActiveSizePhaseVII.getDataSize() < cf1ActiveSizePhaseVI.getDataSize()); 619 assertTrue(cf3ActiveSizePhaseVII.getDataSize() < cf3ActiveSizePhaseVI.getDataSize()); 620 assertTrue(cf5ActiveSizePhaseVII.getDataSize() < cf5ActiveSizePhaseVI.getDataSize()); 621 622 HBaseTestingUtil.closeRegionAndWAL(region); 623 } 624 625 @Test 626 public void testSelectiveFlushAndWALinDataCompaction() throws IOException { 627 // Set up the configuration 628 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 629 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 630 // set memstore to do data compaction and not to use the speculative scan 631 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 632 String.valueOf(MemoryCompactionPolicy.EAGER)); 633 634 // Intialize the HRegion 635 HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); 636 verifyInMemoryFlushSize(region); 637 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 638 for (int i = 1; i <= 1200; i++) { 639 region.put(createPut(1, i)); 640 if (i <= 100) { 641 region.put(createPut(2, i)); 642 if (i <= 50) { 643 region.put(createPut(3, i)); 644 } 645 } 646 } 647 // Now add more puts for CF2, so that we only flush CF2 to disk 648 for (int i = 100; i < 2000; i++) { 649 region.put(createPut(2, i)); 650 } 651 652 // in this test check the non-composite snapshot - flashing only tail of the pipeline 653 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).setCompositeSnapshot(false); 654 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).setCompositeSnapshot(false); 655 656 long totalMemstoreSize = region.getMemStoreDataSize(); 657 658 // Find the sizes of the memstores of each CF. 659 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 660 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 661 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 662 663 // Some other sanity checks. 664 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 665 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 666 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 667 668 // The total memstore size should be the same as the sum of the sizes of 669 // memstores of CF1, CF2 and CF3. 670 String msg = "totalMemstoreSize=" + totalMemstoreSize + " DefaultMemStore.DEEP_OVERHEAD=" 671 + DefaultMemStore.DEEP_OVERHEAD + " cf1MemstoreSizePhaseI=" + cf1MemstoreSizePhaseI 672 + " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI + " cf3MemstoreSizePhaseI=" 673 + cf3MemstoreSizePhaseI; 674 assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() 675 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); 676 677 // Flush! 678 CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 679 CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 680 cms1.flushInMemory(); 681 cms3.flushInMemory(); 682 region.flush(false); 683 684 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 685 686 long smallestSeqInRegionCurrentMemstorePhaseII = 687 region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 688 long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); 689 long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); 690 long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); 691 692 // CF2 should have been cleared 693 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 694 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize()); 695 696 String s = "\n\n----------------------------------\n" 697 + "Upon initial insert and flush, LSN of CF1 is:" + smallestSeqCF1PhaseII + ". LSN of CF2 is:" 698 + smallestSeqCF2PhaseII + ". LSN of CF3 is:" + smallestSeqCF3PhaseII 699 + ", smallestSeqInRegionCurrentMemstore:" + smallestSeqInRegionCurrentMemstorePhaseII + "\n"; 700 701 // Add same entries to compact them later 702 for (int i = 1; i <= 1200; i++) { 703 region.put(createPut(1, i)); 704 if (i <= 100) { 705 region.put(createPut(2, i)); 706 if (i <= 50) { 707 region.put(createPut(3, i)); 708 } 709 } 710 } 711 // Now add more puts for CF2, so that we only flush CF2 to disk 712 for (int i = 100; i < 2000; i++) { 713 region.put(createPut(2, i)); 714 } 715 716 long smallestSeqInRegionCurrentMemstorePhaseIII = 717 region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 718 long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); 719 long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2); 720 long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3); 721 722 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIII 723 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIII + ", " 724 + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIII + ", the smallest sequence in CF3:" 725 + smallestSeqCF3PhaseIII + "\n"; 726 727 // Flush! 728 cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 729 cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 730 cms1.flushInMemory(); 731 cms3.flushInMemory(); 732 region.flush(false); 733 734 long smallestSeqInRegionCurrentMemstorePhaseIV = 735 region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 736 long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1); 737 long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2); 738 long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); 739 740 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV 741 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " 742 + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:" 743 + smallestSeqCF3PhaseIV + "\n"; 744 745 // now check that the LSN of the entire WAL, of CF1 and of CF3 has progressed due to compaction 746 assertTrue(s, 747 smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII); 748 assertTrue(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII); 749 assertTrue(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII); 750 751 HBaseTestingUtil.closeRegionAndWAL(region); 752 } 753 754 @Test 755 public void testSelectiveFlushWithBasicAndMerge() throws IOException { 756 // Set up the configuration 757 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 758 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 759 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.8); 760 // set memstore to do index compaction with merge 761 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 762 String.valueOf(MemoryCompactionPolicy.BASIC)); 763 // length of pipeline that requires merge 764 conf.setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1); 765 766 // Intialize the HRegion 767 HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf); 768 verifyInMemoryFlushSize(region); 769 // Add 1200 entries for CF1 (CompactingMemStore), 100 for CF2 (DefaultMemStore) and 50 for CF3 770 for (int i = 1; i <= 1200; i++) { 771 region.put(createPut(1, i)); 772 if (i <= 100) { 773 region.put(createPut(2, i)); 774 if (i <= 50) { 775 region.put(createPut(3, i)); 776 } 777 } 778 } 779 // Now put more entries to CF2 780 for (int i = 100; i < 2000; i++) { 781 region.put(createPut(2, i)); 782 } 783 784 long totalMemstoreSize = region.getMemStoreDataSize(); 785 786 // test in-memory flashing into CAM here 787 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 788 .setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 789 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 790 .setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 791 792 // Find the sizes of the memstores of each CF. 793 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 794 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 795 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 796 797 // Some other sanity checks. 798 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 799 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 800 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 801 802 // The total memstore size should be the same as the sum of the sizes of 803 // memstores of CF1, CF2 and CF3. 804 assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() 805 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); 806 807 // Initiate in-memory Flush! 808 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory(); 809 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory(); 810 // CF1 and CF3 should be flatten and merged so wait here to be sure the merge is done 811 while ( 812 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 813 .isMemStoreFlushingInMemory() 814 ) { 815 Threads.sleep(10); 816 } 817 while ( 818 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 819 .isMemStoreFlushingInMemory() 820 ) { 821 Threads.sleep(10); 822 } 823 824 // Flush-to-disk! CF2 only should be flushed 825 region.flush(false); 826 827 MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); 828 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 829 MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); 830 831 // CF1 should be flushed in memory and just flattened, so CF1 heap overhead should be smaller 832 assertTrue(cf1MemstoreSizePhaseI.getHeapSize() > cf1MemstoreSizePhaseII.getHeapSize()); 833 // CF1 should be flushed in memory and just flattened, so CF1 data size should remain the same 834 assertEquals(cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseII.getDataSize()); 835 // CF2 should have been cleared 836 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 837 838 // Add the same amount of entries to see the merging 839 for (int i = 1; i <= 1200; i++) { 840 region.put(createPut(1, i)); 841 if (i <= 100) { 842 region.put(createPut(2, i)); 843 if (i <= 50) { 844 region.put(createPut(3, i)); 845 } 846 } 847 } 848 // Now add more puts for CF2, so that we only flush CF2 to disk 849 for (int i = 100; i < 2000; i++) { 850 region.put(createPut(2, i)); 851 } 852 853 MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); 854 855 // Flush in memory! 856 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory(); 857 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory(); 858 // CF1 and CF3 should be merged so wait here to be sure the merge is done 859 while ( 860 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 861 .isMemStoreFlushingInMemory() 862 ) { 863 Threads.sleep(10); 864 } 865 while ( 866 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 867 .isMemStoreFlushingInMemory() 868 ) { 869 Threads.sleep(10); 870 } 871 region.flush(false); 872 873 MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); 874 MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); 875 876 assertEquals(2 * cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize()); 877 // the decrease in the heap size due to usage of CellArrayMap instead of CSLM 878 // should be the same in flattening and in merge (first and second in-memory-flush) 879 // but in phase 1 we do not yet have immutable segment 880 assertEquals(cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(), 881 cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize() 882 - CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); 883 assertEquals(3, // active, one in pipeline, snapshot 884 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).getSegments().size()); 885 // CF2 should have been cleared 886 assertEquals("\n<<< DEBUG: The data--heap sizes of stores before/after first flushes," 887 + " CF1: " + cf1MemstoreSizePhaseI.getDataSize() + "/" + cf1MemstoreSizePhaseII.getDataSize() 888 + "--" + cf1MemstoreSizePhaseI.getHeapSize() + "/" + cf1MemstoreSizePhaseII.getHeapSize() 889 + ", CF2: " + cf2MemstoreSizePhaseI.getDataSize() + "/" + cf2MemstoreSizePhaseII.getDataSize() 890 + "--" + cf2MemstoreSizePhaseI.getHeapSize() + "/" + cf2MemstoreSizePhaseII.getHeapSize() 891 + ", CF3: " + cf3MemstoreSizePhaseI.getDataSize() + "/" + cf3MemstoreSizePhaseII.getDataSize() 892 + "--" + cf3MemstoreSizePhaseI.getHeapSize() + "/" + cf3MemstoreSizePhaseII.getHeapSize() 893 + "\n<<< AND before/after second flushes " + " CF1: " + cf1MemstoreSizePhaseIII.getDataSize() 894 + "/" + cf1MemstoreSizePhaseIV.getDataSize() + "--" + cf1MemstoreSizePhaseIII.getHeapSize() 895 + "/" + cf1MemstoreSizePhaseIV.getHeapSize() + "\n", 0, cf2MemstoreSizePhaseIV.getDataSize()); 896 897 HBaseTestingUtil.closeRegionAndWAL(region); 898 } 899 900 // should end in 300 seconds (5 minutes) 901 @Test 902 public void testStressFlushAndWALinIndexCompaction() throws IOException { 903 // Set up the configuration 904 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); 905 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 906 200 * 1024); 907 // set memstore to do data compaction and not to use the speculative scan 908 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 909 String.valueOf(MemoryCompactionPolicy.BASIC)); 910 911 // Successfully initialize the HRegion 912 HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); 913 verifyInMemoryFlushSize(region); 914 Thread[] threads = new Thread[25]; 915 for (int i = 0; i < threads.length; i++) { 916 int id = i * 10000; 917 ConcurrentPutRunnable runnable = new ConcurrentPutRunnable(region, id); 918 threads[i] = new Thread(runnable); 919 threads[i].start(); 920 } 921 Threads.sleep(10000); // let other threads start 922 region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts 923 Threads.sleep(10000); // let other threads continue 924 region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts 925 926 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory(); 927 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory(); 928 while ( 929 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 930 .isMemStoreFlushingInMemory() 931 ) { 932 Threads.sleep(10); 933 } 934 while ( 935 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 936 .isMemStoreFlushingInMemory() 937 ) { 938 Threads.sleep(10); 939 } 940 941 for (int i = 0; i < threads.length; i++) { 942 try { 943 threads[i].join(); 944 } catch (InterruptedException e) { 945 e.printStackTrace(); 946 } 947 } 948 } 949 950 /** 951 * The in-memory-flusher thread performs the flush asynchronously. There is at most one thread per 952 * memstore instance. It takes the updatesLock exclusively, pushes active into the pipeline, 953 * releases updatesLock and compacts the pipeline. 954 */ 955 private class ConcurrentPutRunnable implements Runnable { 956 private final HRegion stressedRegion; 957 private final int startNumber; 958 959 ConcurrentPutRunnable(HRegion r, int i) { 960 this.stressedRegion = r; 961 this.startNumber = i; 962 } 963 964 @Override 965 public void run() { 966 967 try { 968 int dummy = startNumber / 10000; 969 System.out.print("Thread " + dummy + " with start number " + startNumber + " starts\n"); 970 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 971 for (int i = startNumber; i <= startNumber + 3000; i++) { 972 stressedRegion.put(createPut(1, i)); 973 if (i <= startNumber + 2000) { 974 stressedRegion.put(createPut(2, i)); 975 if (i <= startNumber + 1000) { 976 stressedRegion.put(createPut(3, i)); 977 } 978 } 979 } 980 System.out.print("Thread with start number " + startNumber + " continues to more puts\n"); 981 // Now add more puts for CF2, so that we only flush CF2 to disk 982 for (int i = startNumber + 3000; i < startNumber + 5000; i++) { 983 stressedRegion.put(createPut(2, i)); 984 } 985 // And add more puts for CF1 986 for (int i = startNumber + 5000; i < startNumber + 7000; i++) { 987 stressedRegion.put(createPut(1, i)); 988 } 989 System.out.print("Thread with start number " + startNumber + " flushes\n"); 990 // flush (IN MEMORY) one of the stores (each thread flushes different store) 991 // and wait till the flush and the following action are done 992 if (startNumber == 0) { 993 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore) 994 .flushInMemory(); 995 while ( 996 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore) 997 .isMemStoreFlushingInMemory() 998 ) { 999 Threads.sleep(10); 1000 } 1001 } 1002 if (startNumber == 10000) { 1003 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore) 1004 .flushInMemory(); 1005 while ( 1006 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore) 1007 .isMemStoreFlushingInMemory() 1008 ) { 1009 Threads.sleep(10); 1010 } 1011 } 1012 if (startNumber == 20000) { 1013 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore) 1014 .flushInMemory(); 1015 while ( 1016 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore) 1017 .isMemStoreFlushingInMemory() 1018 ) { 1019 Threads.sleep(10); 1020 } 1021 } 1022 System.out.print("Thread with start number " + startNumber + " finishes\n"); 1023 } catch (IOException e) { 1024 assert false; 1025 } 1026 } 1027 } 1028 1029 private WAL getWAL(Region region) { 1030 return ((HRegion) region).getWAL(); 1031 } 1032}