001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.concurrent.Executors; 025import java.util.concurrent.ThreadPoolExecutor; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.HBaseTestingUtility; 032import org.apache.hadoop.hbase.HColumnDescriptor; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.HRegionInfo; 035import org.apache.hadoop.hbase.HTableDescriptor; 036import org.apache.hadoop.hbase.MemoryCompactionPolicy; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.Put; 039import org.apache.hadoop.hbase.testclassification.LargeTests; 040import org.apache.hadoop.hbase.testclassification.RegionServerTests; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.apache.hadoop.hbase.util.Threads; 043import org.apache.hadoop.hbase.wal.WAL; 044import org.junit.Before; 045import org.junit.ClassRule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048import org.mockito.Mockito; 049 050/** 051 * This test verifies the correctness of the Per Column Family flushing strategy 052 * when part of the memstores are compacted memstores 053 */ 054@Category({ RegionServerTests.class, LargeTests.class }) 055public class TestWalAndCompactingMemStoreFlush { 056 057 @ClassRule 058 public static final HBaseClassTestRule CLASS_RULE = 059 HBaseClassTestRule.forClass(TestWalAndCompactingMemStoreFlush.class); 060 061 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 062 private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion"); 063 public static final TableName TABLENAME = TableName.valueOf("TestWalAndCompactingMemStoreFlush", 064 "t1"); 065 066 public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), 067 Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") }; 068 069 public static final byte[] FAMILY1 = FAMILIES[0]; 070 public static final byte[] FAMILY2 = FAMILIES[1]; 071 public static final byte[] FAMILY3 = FAMILIES[2]; 072 073 private Configuration conf; 074 075 private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException { 076 int i = 0; 077 HTableDescriptor htd = new HTableDescriptor(TABLENAME); 078 for (byte[] family : FAMILIES) { 079 HColumnDescriptor hcd = new HColumnDescriptor(family); 080 // even column families are going to have compacted memstore 081 if (i % 2 == 0) { 082 hcd.setInMemoryCompaction(MemoryCompactionPolicy 083 .valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY))); 084 } else { 085 hcd.setInMemoryCompaction(MemoryCompactionPolicy.NONE); 086 } 087 htd.addFamily(hcd); 088 i++; 089 } 090 091 HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false); 092 Path path = new Path(DIR, callingMethod); 093 HRegion region = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd, false); 094 region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); 095 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 096 Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 097 region.initialize(null); 098 return region; 099 } 100 101 // A helper function to create puts. 102 private Put createPut(int familyNum, int putNum) { 103 byte[] qf = Bytes.toBytes("q" + familyNum); 104 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); 105 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); 106 Put p = new Put(row); 107 p.addColumn(FAMILIES[familyNum - 1], qf, val); 108 return p; 109 } 110 111 // A helper function to create double puts, so something can be compacted later. 112 private Put createDoublePut(int familyNum, int putNum) { 113 byte[] qf = Bytes.toBytes("q" + familyNum); 114 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); 115 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); 116 Put p = new Put(row); 117 // add twice with different timestamps 118 p.addColumn(FAMILIES[familyNum - 1], qf, 10, val); 119 p.addColumn(FAMILIES[familyNum - 1], qf, 20, val); 120 return p; 121 } 122 123 private void verifyInMemoryFlushSize(Region region) { 124 assertEquals( 125 ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getInmemoryFlushSize(), 126 ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).getInmemoryFlushSize()); 127 } 128 129 @Before 130 public void setup() { 131 conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 132 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, 133 FlushNonSloppyStoresFirstPolicy.class.getName()); 134 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); 135 } 136 137 @Test 138 public void testSelectiveFlushWithEager() throws IOException { 139 // Set up the configuration 140 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 141 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 142 // set memstore to do data compaction 143 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 144 String.valueOf(MemoryCompactionPolicy.EAGER)); 145 146 // Intialize the region 147 HRegion region = initHRegion("testSelectiveFlushWithEager", conf); 148 verifyInMemoryFlushSize(region); 149 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 150 for (int i = 1; i <= 1200; i++) { 151 region.put(createPut(1, i)); // compacted memstore, all the keys are unique 152 153 if (i <= 100) { 154 region.put(createPut(2, i)); 155 if (i <= 50) { 156 // compacted memstore, subject for compaction due to duplications 157 region.put(createDoublePut(3, i)); 158 } 159 } 160 } 161 162 // Now add more puts for CF2, so that we only flush CF2 (DefaultMemStore) to disk 163 for (int i = 100; i < 2000; i++) { 164 region.put(createPut(2, i)); 165 } 166 167 long totalMemstoreSize = region.getMemStoreDataSize(); 168 169 // Find the smallest LSNs for edits wrt to each CF. 170 long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1); 171 long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2); 172 long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3); 173 174 // Find the sizes of the memstores of each CF. 175 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 176 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 177 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 178 179 // Get the overall smallest LSN in the region's memstores. 180 long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region) 181 .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 182 183 String s = "\n\n----------------------------------\n" 184 + "Upon initial insert and before any flush, size of CF1 is:" 185 + cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:" 186 + region.getStore(FAMILY1).isSloppyMemStore() + ". Size of CF2 is:" 187 + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:" 188 + region.getStore(FAMILY2).isSloppyMemStore() + ". Size of CF3 is:" 189 + cf3MemstoreSizePhaseI + ", is CF3 compacted memstore?:" 190 + region.getStore(FAMILY3).isSloppyMemStore() + "\n"; 191 192 // The overall smallest LSN in the region's memstores should be the same as 193 // the LSN of the smallest edit in CF1 194 assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI); 195 196 // Some other sanity checks. 197 assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI); 198 assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI); 199 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 200 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 201 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 202 203 // The total memstore size should be the same as the sum of the sizes of 204 // memstores of CF1, CF2 and CF3. 205 String msg = "totalMemstoreSize="+totalMemstoreSize + 206 " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI + 207 " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI + 208 " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ; 209 assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() 210 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); 211 212 // Flush!!!!!!!!!!!!!!!!!!!!!! 213 // We have big compacting memstore CF1 and two small memstores: 214 // CF2 (not compacted) and CF3 (compacting) 215 // All together they are above the flush size lower bound. 216 // Since CF1 and CF3 should be flushed to memory (not to disk), 217 // CF2 is going to be flushed to disk. 218 // CF1 - nothing to compact (but flattening), CF3 - should be twice compacted 219 CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 220 CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 221 cms1.flushInMemory(); 222 cms3.flushInMemory(); 223 region.flush(false); 224 225 // Recalculate everything 226 MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); 227 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 228 MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); 229 230 long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region) 231 .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 232 // Find the smallest LSNs for edits wrt to each CF. 233 long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); 234 long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); 235 long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); 236 237 s = s + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n" 238 + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII 239 + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n"; 240 241 // CF1 was flushed to memory, but there is nothing to compact, and CF1 was flattened 242 assertTrue(cf1MemstoreSizePhaseII.getDataSize() == cf1MemstoreSizePhaseI.getDataSize()); 243 assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize()); 244 245 // CF2 should become empty 246 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 247 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize()); 248 249 // verify that CF3 was flushed to memory and was compacted (this is approximation check) 250 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > cf3MemstoreSizePhaseII.getDataSize()); 251 assertTrue( 252 cf3MemstoreSizePhaseI.getHeapSize() / 2 > cf3MemstoreSizePhaseII.getHeapSize()); 253 254 // Now the smallest LSN in the region should be the same as the smallest 255 // LSN in the memstore of CF1. 256 assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI); 257 258 // Now add more puts for CF1, so that we also flush CF1 to disk instead of 259 // memory in next flush 260 for (int i = 1200; i < 3000; i++) { 261 region.put(createPut(1, i)); 262 } 263 264 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII 265 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", " + 266 "the smallest sequence in CF2:" 267 + smallestSeqCF2PhaseII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseII + "\n"; 268 269 // How much does the CF1 memstore occupy? Will be used later. 270 MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); 271 long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); 272 273 s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII 274 + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n" ; 275 276 277 // Flush!!!!!!!!!!!!!!!!!!!!!! 278 // Flush again, CF1 is flushed to disk 279 // CF2 is flushed to disk, because it is not in-memory compacted memstore 280 // CF3 is flushed empty to memory (actually nothing happens to CF3) 281 region.flush(false); 282 283 // Recalculate everything 284 MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); 285 MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); 286 MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize(); 287 288 long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region) 289 .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 290 long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1); 291 long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2); 292 long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); 293 294 s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:" 295 + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV 296 + "\n"; 297 298 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV 299 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " + 300 "the smallest sequence in CF2:" 301 + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV 302 + "\n"; 303 304 // CF1's pipeline component (inserted before first flush) should be flushed to disk 305 // CF2 should be flushed to disk 306 assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize()); 307 assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize()); 308 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize()); 309 310 // CF3 shouldn't have been touched. 311 assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); 312 313 // the smallest LSN of CF3 shouldn't change 314 assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV); 315 316 // CF3 should be bottleneck for WAL 317 assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); 318 319 // Flush!!!!!!!!!!!!!!!!!!!!!! 320 // Trying to clean the existing memstores, CF2 all flushed to disk. The single 321 // memstore segment in the compaction pipeline of CF1 and CF3 should be flushed to disk. 322 region.flush(true); 323 324 // Recalculate everything 325 MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize(); 326 MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize(); 327 MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize(); 328 long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region) 329 .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 330 331 assertEquals(0, cf1MemstoreSizePhaseV.getDataSize()); 332 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize()); 333 assertEquals(0, cf2MemstoreSizePhaseV.getDataSize()); 334 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize()); 335 assertEquals(0, cf3MemstoreSizePhaseV.getDataSize()); 336 assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize()); 337 338 // What happens when we hit the memstore limit, but we are not able to find 339 // any Column Family above the threshold? 340 // In that case, we should flush all the CFs. 341 342 // The memstore limit is 100*1024 and the column family flush threshold is 343 // around 25*1024. We try to just hit the memstore limit with each CF's 344 // memstore being below the CF flush threshold. 345 for (int i = 1; i <= 300; i++) { 346 region.put(createPut(1, i)); 347 region.put(createPut(2, i)); 348 region.put(createPut(3, i)); 349 region.put(createPut(4, i)); 350 region.put(createPut(5, i)); 351 } 352 353 region.flush(false); 354 355 s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: " 356 + smallestSeqInRegionCurrentMemstorePhaseV 357 + ". After additional inserts and last flush, the entire region size is:" + region 358 .getMemStoreDataSize() 359 + "\n----------------------------------\n"; 360 361 // Since we won't find any CF above the threshold, and hence no specific 362 // store to flush, we should flush all the memstores 363 // Also compacted memstores are flushed to disk. 364 assertEquals(0, region.getMemStoreDataSize()); 365 System.out.println(s); 366 HBaseTestingUtility.closeRegionAndWAL(region); 367 } 368 369 /*------------------------------------------------------------------------------*/ 370 /* Check the same as above but for index-compaction type of compacting memstore */ 371 @Test 372 public void testSelectiveFlushWithIndexCompaction() throws IOException { 373 /*------------------------------------------------------------------------------*/ 374 /* SETUP */ 375 // Set up the configuration 376 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 377 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 378 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); 379 // set memstore to index-compaction 380 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 381 String.valueOf(MemoryCompactionPolicy.BASIC)); 382 383 // Initialize the region 384 HRegion region = initHRegion("testSelectiveFlushWithIndexCompaction", conf); 385 verifyInMemoryFlushSize(region); 386 /*------------------------------------------------------------------------------*/ 387 /* PHASE I - insertions */ 388 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 389 for (int i = 1; i <= 1200; i++) { 390 region.put(createPut(1, i)); // compacted memstore 391 if (i <= 100) { 392 region.put(createPut(2, i)); 393 if (i <= 50) { 394 region.put(createDoublePut(3, i)); // subject for in-memory compaction 395 } 396 } 397 } 398 // Now add more puts for CF2, so that we only flush CF2 to disk 399 for (int i = 100; i < 2000; i++) { 400 region.put(createPut(2, i)); 401 } 402 403 /*------------------------------------------------------------------------------*/ 404 /*------------------------------------------------------------------------------*/ 405 /* PHASE I - collect sizes */ 406 long totalMemstoreSizePhaseI = region.getMemStoreDataSize(); 407 // Find the smallest LSNs for edits wrt to each CF. 408 long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1); 409 long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2); 410 long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3); 411 // Find the sizes of the memstores of each CF. 412 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 413 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 414 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 415 // Get the overall smallest LSN in the region's memstores. 416 long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region) 417 .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 418 419 /*------------------------------------------------------------------------------*/ 420 /* PHASE I - validation */ 421 // The overall smallest LSN in the region's memstores should be the same as 422 // the LSN of the smallest edit in CF1 423 assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI); 424 // Some other sanity checks. 425 assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI); 426 assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI); 427 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 428 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 429 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 430 431 // The total memstore size should be the same as the sum of the sizes of 432 // memstores of CF1, CF2 and CF3. 433 assertEquals(totalMemstoreSizePhaseI, cf1MemstoreSizePhaseI.getDataSize() 434 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); 435 436 /*------------------------------------------------------------------------------*/ 437 /* PHASE I - Flush */ 438 // First Flush in Test!!!!!!!!!!!!!!!!!!!!!! 439 // CF1, CF2, CF3, all together they are above the flush size lower bound. 440 // Since CF1 and CF3 are compacting, CF2 is going to be flushed to disk. 441 // CF1 and CF3 - flushed to memory and flatten explicitly 442 region.flush(false); 443 CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 444 CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 445 cms1.flushInMemory(); 446 cms3.flushInMemory(); 447 448 // CF3/CF1 should be merged so wait here to be sure the compaction is done 449 while (((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 450 .isMemStoreFlushingInMemory()) { 451 Threads.sleep(10); 452 } 453 while (((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 454 .isMemStoreFlushingInMemory()) { 455 Threads.sleep(10); 456 } 457 458 /*------------------------------------------------------------------------------*/ 459 /*------------------------------------------------------------------------------*/ 460 /* PHASE II - collect sizes */ 461 // Recalculate everything 462 MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); 463 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 464 MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); 465 long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region) 466 .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 467 // Find the smallest LSNs for edits wrt to each CF. 468 long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); 469 long totalMemstoreSizePhaseII = region.getMemStoreDataSize(); 470 471 /*------------------------------------------------------------------------------*/ 472 /* PHASE II - validation */ 473 // CF1 was flushed to memory, should be flattened and take less space 474 assertEquals(cf1MemstoreSizePhaseII.getDataSize() , cf1MemstoreSizePhaseI.getDataSize()); 475 assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize()); 476 // CF2 should become empty 477 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 478 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize()); 479 // verify that CF3 was flushed to memory and was not compacted (this is an approximation check) 480 // if compacted CF# should be at least twice less because its every key was duplicated 481 assertEquals(cf3MemstoreSizePhaseII.getDataSize() , cf3MemstoreSizePhaseI.getDataSize()); 482 assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 < cf3MemstoreSizePhaseII.getHeapSize()); 483 484 // Now the smallest LSN in the region should be the same as the smallest 485 // LSN in the memstore of CF1. 486 assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI); 487 // The total memstore size should be the same as the sum of the sizes of 488 // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline 489 // items in CF1/2 490 assertEquals(totalMemstoreSizePhaseII, cf1MemstoreSizePhaseII.getDataSize() 491 + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize()); 492 493 /*------------------------------------------------------------------------------*/ 494 /*------------------------------------------------------------------------------*/ 495 /* PHASE III - insertions */ 496 // Now add more puts for CF1, so that we also flush CF1 to disk instead of 497 // memory in next flush. This is causing the CF! to be flushed to memory twice. 498 for (int i = 1200; i < 8000; i++) { 499 region.put(createPut(1, i)); 500 } 501 502 // CF1 should be flatten and merged so wait here to be sure the compaction is done 503 while (((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 504 .isMemStoreFlushingInMemory()) { 505 Threads.sleep(10); 506 } 507 508 /*------------------------------------------------------------------------------*/ 509 /* PHASE III - collect sizes */ 510 // How much does the CF1 memstore occupy now? Will be used later. 511 MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); 512 long totalMemstoreSizePhaseIII = region.getMemStoreDataSize(); 513 514 /*------------------------------------------------------------------------------*/ 515 /* PHASE III - validation */ 516 // The total memstore size should be the same as the sum of the sizes of 517 // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline 518 // items in CF1/2 519 assertEquals(totalMemstoreSizePhaseIII, cf1MemstoreSizePhaseIII.getDataSize() 520 + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize()); 521 522 /*------------------------------------------------------------------------------*/ 523 /* PHASE III - Flush */ 524 // Second Flush in Test!!!!!!!!!!!!!!!!!!!!!! 525 // CF1 is flushed to disk, but not entirely emptied. 526 // CF2 was and remained empty, same way nothing happens to CF3 527 region.flush(false); 528 529 /*------------------------------------------------------------------------------*/ 530 /*------------------------------------------------------------------------------*/ 531 /* PHASE IV - collect sizes */ 532 // Recalculate everything 533 MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); 534 MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); 535 MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize(); 536 long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region) 537 .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 538 long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); 539 540 /*------------------------------------------------------------------------------*/ 541 /* PHASE IV - validation */ 542 // CF1's biggest pipeline component (inserted before first flush) should be flushed to disk 543 // CF2 should remain empty 544 assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize()); 545 assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize()); 546 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize()); 547 // CF3 shouldn't have been touched. 548 assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); 549 // the smallest LSN of CF3 shouldn't change 550 assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV); 551 // CF3 should be bottleneck for WAL 552 assertEquals(smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); 553 554 /*------------------------------------------------------------------------------*/ 555 /* PHASE IV - Flush */ 556 // Third Flush in Test!!!!!!!!!!!!!!!!!!!!!! 557 // Force flush to disk on all memstores (flush parameter true). 558 // CF1/CF3 all flushed to disk. Note that active sets of CF1 and CF3 are empty 559 region.flush(true); 560 561 /*------------------------------------------------------------------------------*/ 562 /*------------------------------------------------------------------------------*/ 563 /* PHASE V - collect sizes */ 564 // Recalculate everything 565 MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize(); 566 MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize(); 567 MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize(); 568 long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region) 569 .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 570 long totalMemstoreSizePhaseV = region.getMemStoreDataSize(); 571 572 /*------------------------------------------------------------------------------*/ 573 /* PHASE V - validation */ 574 assertEquals(0, cf1MemstoreSizePhaseV.getDataSize()); 575 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize()); 576 assertEquals(0, cf2MemstoreSizePhaseV.getDataSize()); 577 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize()); 578 assertEquals(0, cf3MemstoreSizePhaseV.getDataSize()); 579 assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize()); 580 // The total memstores size should be empty 581 assertEquals(0, totalMemstoreSizePhaseV); 582 // Because there is nothing in any memstore the WAL's LSN should be -1 583 assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstorePhaseV); 584 585 // What happens when we hit the memstore limit, but we are not able to find 586 // any Column Family above the threshold? 587 // In that case, we should flush all the CFs. 588 589 /*------------------------------------------------------------------------------*/ 590 /*------------------------------------------------------------------------------*/ 591 /* PHASE VI - insertions */ 592 // The memstore limit is 200*1024 and the column family flush threshold is 593 // around 50*1024. We try to just hit the memstore limit with each CF's 594 // memstore being below the CF flush threshold. 595 for (int i = 1; i <= 300; i++) { 596 region.put(createPut(1, i)); 597 region.put(createPut(2, i)); 598 region.put(createPut(3, i)); 599 region.put(createPut(4, i)); 600 region.put(createPut(5, i)); 601 } 602 603 MemStoreSize cf1ActiveSizePhaseVI = region.getStore(FAMILY1).getMemStoreSize(); 604 MemStoreSize cf3ActiveSizePhaseVI = region.getStore(FAMILY3).getMemStoreSize(); 605 MemStoreSize cf5ActiveSizePhaseVI = region.getStore(FAMILIES[4]).getMemStoreSize(); 606 607 /*------------------------------------------------------------------------------*/ 608 /* PHASE VI - Flush */ 609 // Fourth Flush in Test!!!!!!!!!!!!!!!!!!!!!! 610 // None among compacting memstores was flushed to memory due to previous puts. 611 // But is going to be moved to pipeline and flatten due to the flush. 612 region.flush(false); 613 // Since we won't find any CF above the threshold, and hence no specific 614 // store to flush, we should flush all the memstores 615 // Also compacted memstores are flushed to disk, but not entirely emptied 616 MemStoreSize cf1ActiveSizePhaseVII = region.getStore(FAMILY1).getMemStoreSize(); 617 MemStoreSize cf3ActiveSizePhaseVII = region.getStore(FAMILY3).getMemStoreSize(); 618 MemStoreSize cf5ActiveSizePhaseVII = region.getStore(FAMILIES[4]).getMemStoreSize(); 619 620 assertTrue(cf1ActiveSizePhaseVII.getDataSize() < cf1ActiveSizePhaseVI.getDataSize()); 621 assertTrue(cf3ActiveSizePhaseVII.getDataSize() < cf3ActiveSizePhaseVI.getDataSize()); 622 assertTrue(cf5ActiveSizePhaseVII.getDataSize() < cf5ActiveSizePhaseVI.getDataSize()); 623 624 HBaseTestingUtility.closeRegionAndWAL(region); 625 } 626 627 @Test 628 public void testSelectiveFlushAndWALinDataCompaction() throws IOException { 629 // Set up the configuration 630 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 631 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 632 // set memstore to do data compaction and not to use the speculative scan 633 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 634 String.valueOf(MemoryCompactionPolicy.EAGER)); 635 636 // Intialize the HRegion 637 HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); 638 verifyInMemoryFlushSize(region); 639 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 640 for (int i = 1; i <= 1200; i++) { 641 region.put(createPut(1, i)); 642 if (i <= 100) { 643 region.put(createPut(2, i)); 644 if (i <= 50) { 645 region.put(createPut(3, i)); 646 } 647 } 648 } 649 // Now add more puts for CF2, so that we only flush CF2 to disk 650 for (int i = 100; i < 2000; i++) { 651 region.put(createPut(2, i)); 652 } 653 654 // in this test check the non-composite snapshot - flashing only tail of the pipeline 655 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).setCompositeSnapshot(false); 656 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).setCompositeSnapshot(false); 657 658 long totalMemstoreSize = region.getMemStoreDataSize(); 659 660 // Find the sizes of the memstores of each CF. 661 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 662 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 663 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 664 665 // Some other sanity checks. 666 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 667 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 668 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 669 670 // The total memstore size should be the same as the sum of the sizes of 671 // memstores of CF1, CF2 and CF3. 672 String msg = "totalMemstoreSize="+totalMemstoreSize + 673 " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD + 674 " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI + 675 " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI + 676 " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ; 677 assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() 678 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); 679 680 // Flush! 681 CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 682 CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 683 cms1.flushInMemory(); 684 cms3.flushInMemory(); 685 region.flush(false); 686 687 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 688 689 long smallestSeqInRegionCurrentMemstorePhaseII = 690 region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 691 long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); 692 long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); 693 long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); 694 695 // CF2 should have been cleared 696 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 697 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize()); 698 699 String s = "\n\n----------------------------------\n" 700 + "Upon initial insert and flush, LSN of CF1 is:" 701 + smallestSeqCF1PhaseII + ". LSN of CF2 is:" 702 + smallestSeqCF2PhaseII + ". LSN of CF3 is:" 703 + smallestSeqCF3PhaseII + ", smallestSeqInRegionCurrentMemstore:" 704 + smallestSeqInRegionCurrentMemstorePhaseII + "\n"; 705 706 // Add same entries to compact them later 707 for (int i = 1; i <= 1200; i++) { 708 region.put(createPut(1, i)); 709 if (i <= 100) { 710 region.put(createPut(2, i)); 711 if (i <= 50) { 712 region.put(createPut(3, i)); 713 } 714 } 715 } 716 // Now add more puts for CF2, so that we only flush CF2 to disk 717 for (int i = 100; i < 2000; i++) { 718 region.put(createPut(2, i)); 719 } 720 721 long smallestSeqInRegionCurrentMemstorePhaseIII = 722 region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 723 long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); 724 long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2); 725 long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3); 726 727 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIII 728 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIII + ", " + 729 "the smallest sequence in CF2:" 730 + smallestSeqCF2PhaseIII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIII + "\n"; 731 732 // Flush! 733 cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 734 cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 735 cms1.flushInMemory(); 736 cms3.flushInMemory(); 737 region.flush(false); 738 739 long smallestSeqInRegionCurrentMemstorePhaseIV = 740 region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 741 long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1); 742 long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2); 743 long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); 744 745 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV 746 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " + 747 "the smallest sequence in CF2:" 748 + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV + "\n"; 749 750 // now check that the LSN of the entire WAL, of CF1 and of CF3 has progressed due to compaction 751 assertTrue(s, smallestSeqInRegionCurrentMemstorePhaseIV > 752 smallestSeqInRegionCurrentMemstorePhaseIII); 753 assertTrue(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII); 754 assertTrue(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII); 755 756 HBaseTestingUtility.closeRegionAndWAL(region); 757 } 758 759 @Test 760 public void testSelectiveFlushWithBasicAndMerge() throws IOException { 761 // Set up the configuration 762 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 763 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 764 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.8); 765 // set memstore to do index compaction with merge 766 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 767 String.valueOf(MemoryCompactionPolicy.BASIC)); 768 // length of pipeline that requires merge 769 conf.setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1); 770 771 // Intialize the HRegion 772 HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf); 773 verifyInMemoryFlushSize(region); 774 // Add 1200 entries for CF1 (CompactingMemStore), 100 for CF2 (DefaultMemStore) and 50 for CF3 775 for (int i = 1; i <= 1200; i++) { 776 region.put(createPut(1, i)); 777 if (i <= 100) { 778 region.put(createPut(2, i)); 779 if (i <= 50) { 780 region.put(createPut(3, i)); 781 } 782 } 783 } 784 // Now put more entries to CF2 785 for (int i = 100; i < 2000; i++) { 786 region.put(createPut(2, i)); 787 } 788 789 long totalMemstoreSize = region.getMemStoreDataSize(); 790 791 // test in-memory flashing into CAM here 792 ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).setIndexType( 793 CompactingMemStore.IndexType.ARRAY_MAP); 794 ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).setIndexType( 795 CompactingMemStore.IndexType.ARRAY_MAP); 796 797 // Find the sizes of the memstores of each CF. 798 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 799 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 800 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 801 802 // Some other sanity checks. 803 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 804 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 805 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 806 807 // The total memstore size should be the same as the sum of the sizes of 808 // memstores of CF1, CF2 and CF3. 809 assertEquals(totalMemstoreSize, 810 cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize() 811 + cf3MemstoreSizePhaseI.getDataSize()); 812 813 // Initiate in-memory Flush! 814 ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory(); 815 ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory(); 816 // CF1 and CF3 should be flatten and merged so wait here to be sure the merge is done 817 while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore) 818 .isMemStoreFlushingInMemory()) { 819 Threads.sleep(10); 820 } 821 while (((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore) 822 .isMemStoreFlushingInMemory()) { 823 Threads.sleep(10); 824 } 825 826 // Flush-to-disk! CF2 only should be flushed 827 region.flush(false); 828 829 MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); 830 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 831 MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); 832 833 // CF1 should be flushed in memory and just flattened, so CF1 heap overhead should be smaller 834 assertTrue(cf1MemstoreSizePhaseI.getHeapSize() > cf1MemstoreSizePhaseII.getHeapSize()); 835 // CF1 should be flushed in memory and just flattened, so CF1 data size should remain the same 836 assertEquals(cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseII.getDataSize()); 837 // CF2 should have been cleared 838 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 839 840 // Add the same amount of entries to see the merging 841 for (int i = 1; i <= 1200; i++) { 842 region.put(createPut(1, i)); 843 if (i <= 100) { 844 region.put(createPut(2, i)); 845 if (i <= 50) { 846 region.put(createPut(3, i)); 847 } 848 } 849 } 850 // Now add more puts for CF2, so that we only flush CF2 to disk 851 for (int i = 100; i < 2000; i++) { 852 region.put(createPut(2, i)); 853 } 854 855 MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); 856 857 // Flush in memory! 858 ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory(); 859 ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory(); 860 // CF1 and CF3 should be merged so wait here to be sure the merge is done 861 while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore) 862 .isMemStoreFlushingInMemory()) { 863 Threads.sleep(10); 864 } 865 while (((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore) 866 .isMemStoreFlushingInMemory()) { 867 Threads.sleep(10); 868 } 869 region.flush(false); 870 871 MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); 872 MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); 873 874 assertEquals(2*cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize()); 875 // the decrease in the heap size due to usage of CellArrayMap instead of CSLM 876 // should be the same in flattening and in merge (first and second in-memory-flush) 877 // but in phase 1 we do not yet have immutable segment 878 assertEquals( 879 cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(), 880 cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize() 881 - CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); 882 assertEquals(3, // active, one in pipeline, snapshot 883 ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getSegments().size()); 884 // CF2 should have been cleared 885 assertEquals("\n<<< DEBUG: The data--heap sizes of stores before/after first flushes," 886 + " CF1: " + cf1MemstoreSizePhaseI.getDataSize() + "/" + cf1MemstoreSizePhaseII 887 .getDataSize() + "--" + cf1MemstoreSizePhaseI.getHeapSize() + "/" + cf1MemstoreSizePhaseII 888 .getHeapSize() + ", CF2: " + cf2MemstoreSizePhaseI.getDataSize() + "/" 889 + cf2MemstoreSizePhaseII.getDataSize() + "--" + cf2MemstoreSizePhaseI.getHeapSize() + "/" 890 + cf2MemstoreSizePhaseII.getHeapSize() + ", CF3: " + cf3MemstoreSizePhaseI.getDataSize() 891 + "/" + cf3MemstoreSizePhaseII.getDataSize() + "--" + cf3MemstoreSizePhaseI.getHeapSize() 892 + "/" + cf3MemstoreSizePhaseII.getHeapSize() + "\n<<< AND before/after second flushes " 893 + " CF1: " + cf1MemstoreSizePhaseIII.getDataSize() + "/" + cf1MemstoreSizePhaseIV 894 .getDataSize() + "--" + cf1MemstoreSizePhaseIII.getHeapSize() + "/" + cf1MemstoreSizePhaseIV 895 .getHeapSize() + "\n", 896 0, cf2MemstoreSizePhaseIV.getDataSize()); 897 898 HBaseTestingUtility.closeRegionAndWAL(region); 899 } 900 901 // should end in 300 seconds (5 minutes) 902 @Test 903 public void testStressFlushAndWALinIndexCompaction() throws IOException { 904 // Set up the configuration 905 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); 906 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 907 200 * 1024); 908 // set memstore to do data compaction and not to use the speculative scan 909 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 910 String.valueOf(MemoryCompactionPolicy.BASIC)); 911 912 // Successfully initialize the HRegion 913 HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); 914 verifyInMemoryFlushSize(region); 915 Thread[] threads = new Thread[25]; 916 for (int i = 0; i < threads.length; i++) { 917 int id = i * 10000; 918 ConcurrentPutRunnable runnable = new ConcurrentPutRunnable(region, id); 919 threads[i] = new Thread(runnable); 920 threads[i].start(); 921 } 922 Threads.sleep(10000); // let other threads start 923 region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts 924 Threads.sleep(10000); // let other threads continue 925 region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts 926 927 ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory(); 928 ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory(); 929 while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore) 930 .isMemStoreFlushingInMemory()) { 931 Threads.sleep(10); 932 } 933 while (((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore) 934 .isMemStoreFlushingInMemory()) { 935 Threads.sleep(10); 936 } 937 938 for (int i = 0; i < threads.length; i++) { 939 try { 940 threads[i].join(); 941 } catch (InterruptedException e) { 942 e.printStackTrace(); 943 } 944 } 945 } 946 947 /** 948 * The in-memory-flusher thread performs the flush asynchronously. There is at most one thread per 949 * memstore instance. It takes the updatesLock exclusively, pushes active into the pipeline, 950 * releases updatesLock and compacts the pipeline. 951 */ 952 private class ConcurrentPutRunnable implements Runnable { 953 private final HRegion stressedRegion; 954 private final int startNumber; 955 956 ConcurrentPutRunnable(HRegion r, int i) { 957 this.stressedRegion = r; 958 this.startNumber = i; 959 } 960 961 @Override 962 public void run() { 963 964 try { 965 int dummy = startNumber / 10000; 966 System.out.print("Thread " + dummy + " with start number " + startNumber + " starts\n"); 967 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 968 for (int i = startNumber; i <= startNumber + 3000; i++) { 969 stressedRegion.put(createPut(1, i)); 970 if (i <= startNumber + 2000) { 971 stressedRegion.put(createPut(2, i)); 972 if (i <= startNumber + 1000) { 973 stressedRegion.put(createPut(3, i)); 974 } 975 } 976 } 977 System.out.print("Thread with start number " + startNumber + " continues to more puts\n"); 978 // Now add more puts for CF2, so that we only flush CF2 to disk 979 for (int i = startNumber + 3000; i < startNumber + 5000; i++) { 980 stressedRegion.put(createPut(2, i)); 981 } 982 // And add more puts for CF1 983 for (int i = startNumber + 5000; i < startNumber + 7000; i++) { 984 stressedRegion.put(createPut(1, i)); 985 } 986 System.out.print("Thread with start number " + startNumber + " flushes\n"); 987 // flush (IN MEMORY) one of the stores (each thread flushes different store) 988 // and wait till the flush and the following action are done 989 if (startNumber == 0) { 990 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore) 991 .flushInMemory(); 992 while (((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore) 993 .isMemStoreFlushingInMemory()) { 994 Threads.sleep(10); 995 } 996 } 997 if (startNumber == 10000) { 998 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore).flushInMemory(); 999 while (((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore) 1000 .isMemStoreFlushingInMemory()) { 1001 Threads.sleep(10); 1002 } 1003 } 1004 if (startNumber == 20000) { 1005 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore).flushInMemory(); 1006 while (((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore) 1007 .isMemStoreFlushingInMemory()) { 1008 Threads.sleep(10); 1009 } 1010 } 1011 System.out.print("Thread with start number " + startNumber + " finishes\n"); 1012 } catch (IOException e) { 1013 assert false; 1014 } 1015 } 1016 } 1017 1018 private WAL getWAL(Region region) { 1019 return ((HRegion)region).getWAL(); 1020 } 1021}