001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.concurrent.Executors; 025import java.util.concurrent.ThreadPoolExecutor; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.HBaseTestingUtility; 031import org.apache.hadoop.hbase.HColumnDescriptor; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.HRegionInfo; 034import org.apache.hadoop.hbase.HTableDescriptor; 035import org.apache.hadoop.hbase.MemoryCompactionPolicy; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.Put; 038import org.apache.hadoop.hbase.testclassification.LargeTests; 039import org.apache.hadoop.hbase.testclassification.RegionServerTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.Threads; 042import org.apache.hadoop.hbase.wal.WAL; 043import org.junit.Before; 044import org.junit.ClassRule; 045import org.junit.Test; 046import org.junit.experimental.categories.Category; 047import org.mockito.Mockito; 048 049/** 050 * This test verifies the correctness of the Per Column Family flushing strategy when part of the 051 * memstores are compacted memstores 052 */ 053@Category({ RegionServerTests.class, LargeTests.class }) 054public class TestWalAndCompactingMemStoreFlush { 055 056 @ClassRule 057 public static final HBaseClassTestRule CLASS_RULE = 058 HBaseClassTestRule.forClass(TestWalAndCompactingMemStoreFlush.class); 059 060 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 061 private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion"); 062 public static final TableName TABLENAME = 063 TableName.valueOf("TestWalAndCompactingMemStoreFlush", "t1"); 064 065 public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), 066 Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") }; 067 068 public static final byte[] FAMILY1 = FAMILIES[0]; 069 public static final byte[] FAMILY2 = FAMILIES[1]; 070 public static final byte[] FAMILY3 = FAMILIES[2]; 071 072 private Configuration conf; 073 074 private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException { 075 int i = 0; 076 HTableDescriptor htd = new HTableDescriptor(TABLENAME); 077 for (byte[] family : FAMILIES) { 078 HColumnDescriptor hcd = new HColumnDescriptor(family); 079 // even column families are going to have compacted memstore 080 if (i % 2 == 0) { 081 hcd.setInMemoryCompaction(MemoryCompactionPolicy 082 .valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY))); 083 } else { 084 hcd.setInMemoryCompaction(MemoryCompactionPolicy.NONE); 085 } 086 htd.addFamily(hcd); 087 i++; 088 } 089 090 HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false); 091 Path path = new Path(DIR, callingMethod); 092 HRegion region = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd, false); 093 region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); 094 ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 095 Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); 096 region.initialize(null); 097 return region; 098 } 099 100 // A helper function to create puts. 101 private Put createPut(int familyNum, int putNum) { 102 byte[] qf = Bytes.toBytes("q" + familyNum); 103 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); 104 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); 105 Put p = new Put(row); 106 p.addColumn(FAMILIES[familyNum - 1], qf, val); 107 return p; 108 } 109 110 // A helper function to create double puts, so something can be compacted later. 111 private Put createDoublePut(int familyNum, int putNum) { 112 byte[] qf = Bytes.toBytes("q" + familyNum); 113 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); 114 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); 115 Put p = new Put(row); 116 // add twice with different timestamps 117 p.addColumn(FAMILIES[familyNum - 1], qf, 10, val); 118 p.addColumn(FAMILIES[familyNum - 1], qf, 20, val); 119 return p; 120 } 121 122 private void verifyInMemoryFlushSize(Region region) { 123 assertEquals( 124 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).getInmemoryFlushSize(), 125 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).getInmemoryFlushSize()); 126 } 127 128 @Before 129 public void setup() { 130 conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 131 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, 132 FlushNonSloppyStoresFirstPolicy.class.getName()); 133 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); 134 } 135 136 @Test 137 public void testSelectiveFlushWithEager() throws IOException { 138 // Set up the configuration 139 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 140 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 141 // set memstore to do data compaction 142 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 143 String.valueOf(MemoryCompactionPolicy.EAGER)); 144 145 // Intialize the region 146 HRegion region = initHRegion("testSelectiveFlushWithEager", conf); 147 verifyInMemoryFlushSize(region); 148 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 149 for (int i = 1; i <= 1200; i++) { 150 region.put(createPut(1, i)); // compacted memstore, all the keys are unique 151 152 if (i <= 100) { 153 region.put(createPut(2, i)); 154 if (i <= 50) { 155 // compacted memstore, subject for compaction due to duplications 156 region.put(createDoublePut(3, i)); 157 } 158 } 159 } 160 161 // Now add more puts for CF2, so that we only flush CF2 (DefaultMemStore) to disk 162 for (int i = 100; i < 2000; i++) { 163 region.put(createPut(2, i)); 164 } 165 166 long totalMemstoreSize = region.getMemStoreDataSize(); 167 168 // Find the smallest LSNs for edits wrt to each CF. 169 long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1); 170 long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2); 171 long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3); 172 173 // Find the sizes of the memstores of each CF. 174 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 175 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 176 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 177 178 // Get the overall smallest LSN in the region's memstores. 179 long smallestSeqInRegionCurrentMemstorePhaseI = 180 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 181 182 String s = "\n\n----------------------------------\n" 183 + "Upon initial insert and before any flush, size of CF1 is:" + cf1MemstoreSizePhaseI 184 + ", is CF1 compacted memstore?:" + region.getStore(FAMILY1).isSloppyMemStore() 185 + ". Size of CF2 is:" + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:" 186 + region.getStore(FAMILY2).isSloppyMemStore() + ". Size of CF3 is:" + cf3MemstoreSizePhaseI 187 + ", is CF3 compacted memstore?:" + region.getStore(FAMILY3).isSloppyMemStore() + "\n"; 188 189 // The overall smallest LSN in the region's memstores should be the same as 190 // the LSN of the smallest edit in CF1 191 assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI); 192 193 // Some other sanity checks. 194 assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI); 195 assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI); 196 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 197 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 198 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 199 200 // The total memstore size should be the same as the sum of the sizes of 201 // memstores of CF1, CF2 and CF3. 202 String msg = "totalMemstoreSize=" + totalMemstoreSize + " cf1MemstoreSizePhaseI=" 203 + cf1MemstoreSizePhaseI + " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI 204 + " cf3MemstoreSizePhaseI=" + cf3MemstoreSizePhaseI; 205 assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() 206 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); 207 208 // Flush!!!!!!!!!!!!!!!!!!!!!! 209 // We have big compacting memstore CF1 and two small memstores: 210 // CF2 (not compacted) and CF3 (compacting) 211 // All together they are above the flush size lower bound. 212 // Since CF1 and CF3 should be flushed to memory (not to disk), 213 // CF2 is going to be flushed to disk. 214 // CF1 - nothing to compact (but flattening), CF3 - should be twice compacted 215 CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 216 CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 217 cms1.flushInMemory(); 218 cms3.flushInMemory(); 219 region.flush(false); 220 221 // Recalculate everything 222 MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); 223 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 224 MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); 225 226 long smallestSeqInRegionCurrentMemstorePhaseII = 227 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 228 // Find the smallest LSNs for edits wrt to each CF. 229 long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); 230 long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); 231 long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); 232 233 s = s + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n" 234 + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII 235 + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n"; 236 237 // CF1 was flushed to memory, but there is nothing to compact, and CF1 was flattened 238 assertTrue(cf1MemstoreSizePhaseII.getDataSize() == cf1MemstoreSizePhaseI.getDataSize()); 239 assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize()); 240 241 // CF2 should become empty 242 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 243 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize()); 244 245 // verify that CF3 was flushed to memory and was compacted (this is approximation check) 246 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > cf3MemstoreSizePhaseII.getDataSize()); 247 assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 > cf3MemstoreSizePhaseII.getHeapSize()); 248 249 // Now the smallest LSN in the region should be the same as the smallest 250 // LSN in the memstore of CF1. 251 assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI); 252 253 // Now add more puts for CF1, so that we also flush CF1 to disk instead of 254 // memory in next flush 255 for (int i = 1200; i < 3000; i++) { 256 region.put(createPut(1, i)); 257 } 258 259 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII 260 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", " 261 + "the smallest sequence in CF2:" + smallestSeqCF2PhaseII + ", the smallest sequence in CF3:" 262 + smallestSeqCF3PhaseII + "\n"; 263 264 // How much does the CF1 memstore occupy? Will be used later. 265 MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); 266 long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); 267 268 s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII 269 + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n"; 270 271 // Flush!!!!!!!!!!!!!!!!!!!!!! 272 // Flush again, CF1 is flushed to disk 273 // CF2 is flushed to disk, because it is not in-memory compacted memstore 274 // CF3 is flushed empty to memory (actually nothing happens to CF3) 275 region.flush(false); 276 277 // Recalculate everything 278 MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); 279 MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); 280 MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize(); 281 282 long smallestSeqInRegionCurrentMemstorePhaseIV = 283 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 284 long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1); 285 long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2); 286 long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); 287 288 s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:" 289 + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV + "\n"; 290 291 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV 292 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " 293 + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:" 294 + smallestSeqCF3PhaseIV + "\n"; 295 296 // CF1's pipeline component (inserted before first flush) should be flushed to disk 297 // CF2 should be flushed to disk 298 assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize()); 299 assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize()); 300 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize()); 301 302 // CF3 shouldn't have been touched. 303 assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); 304 305 // the smallest LSN of CF3 shouldn't change 306 assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV); 307 308 // CF3 should be bottleneck for WAL 309 assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); 310 311 // Flush!!!!!!!!!!!!!!!!!!!!!! 312 // Trying to clean the existing memstores, CF2 all flushed to disk. The single 313 // memstore segment in the compaction pipeline of CF1 and CF3 should be flushed to disk. 314 region.flush(true); 315 316 // Recalculate everything 317 MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize(); 318 MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize(); 319 MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize(); 320 long smallestSeqInRegionCurrentMemstorePhaseV = 321 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 322 323 assertEquals(0, cf1MemstoreSizePhaseV.getDataSize()); 324 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize()); 325 assertEquals(0, cf2MemstoreSizePhaseV.getDataSize()); 326 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize()); 327 assertEquals(0, cf3MemstoreSizePhaseV.getDataSize()); 328 assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize()); 329 330 // What happens when we hit the memstore limit, but we are not able to find 331 // any Column Family above the threshold? 332 // In that case, we should flush all the CFs. 333 334 // The memstore limit is 100*1024 and the column family flush threshold is 335 // around 25*1024. We try to just hit the memstore limit with each CF's 336 // memstore being below the CF flush threshold. 337 for (int i = 1; i <= 300; i++) { 338 region.put(createPut(1, i)); 339 region.put(createPut(2, i)); 340 region.put(createPut(3, i)); 341 region.put(createPut(4, i)); 342 region.put(createPut(5, i)); 343 } 344 345 region.flush(false); 346 347 s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: " 348 + smallestSeqInRegionCurrentMemstorePhaseV 349 + ". After additional inserts and last flush, the entire region size is:" 350 + region.getMemStoreDataSize() + "\n----------------------------------\n"; 351 352 // Since we won't find any CF above the threshold, and hence no specific 353 // store to flush, we should flush all the memstores 354 // Also compacted memstores are flushed to disk. 355 assertEquals(0, region.getMemStoreDataSize()); 356 System.out.println(s); 357 HBaseTestingUtility.closeRegionAndWAL(region); 358 } 359 360 /*------------------------------------------------------------------------------*/ 361 /* Check the same as above but for index-compaction type of compacting memstore */ 362 @Test 363 public void testSelectiveFlushWithIndexCompaction() throws IOException { 364 /*------------------------------------------------------------------------------*/ 365 /* SETUP */ 366 // Set up the configuration 367 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 368 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 369 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); 370 // set memstore to index-compaction 371 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 372 String.valueOf(MemoryCompactionPolicy.BASIC)); 373 374 // Initialize the region 375 HRegion region = initHRegion("testSelectiveFlushWithIndexCompaction", conf); 376 verifyInMemoryFlushSize(region); 377 /*------------------------------------------------------------------------------*/ 378 /* PHASE I - insertions */ 379 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 380 for (int i = 1; i <= 1200; i++) { 381 region.put(createPut(1, i)); // compacted memstore 382 if (i <= 100) { 383 region.put(createPut(2, i)); 384 if (i <= 50) { 385 region.put(createDoublePut(3, i)); // subject for in-memory compaction 386 } 387 } 388 } 389 // Now add more puts for CF2, so that we only flush CF2 to disk 390 for (int i = 100; i < 2000; i++) { 391 region.put(createPut(2, i)); 392 } 393 394 /*------------------------------------------------------------------------------*/ 395 /*------------------------------------------------------------------------------*/ 396 /* PHASE I - collect sizes */ 397 long totalMemstoreSizePhaseI = region.getMemStoreDataSize(); 398 // Find the smallest LSNs for edits wrt to each CF. 399 long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1); 400 long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2); 401 long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3); 402 // Find the sizes of the memstores of each CF. 403 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 404 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 405 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 406 // Get the overall smallest LSN in the region's memstores. 407 long smallestSeqInRegionCurrentMemstorePhaseI = 408 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 409 410 /*------------------------------------------------------------------------------*/ 411 /* PHASE I - validation */ 412 // The overall smallest LSN in the region's memstores should be the same as 413 // the LSN of the smallest edit in CF1 414 assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI); 415 // Some other sanity checks. 416 assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI); 417 assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI); 418 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 419 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 420 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 421 422 // The total memstore size should be the same as the sum of the sizes of 423 // memstores of CF1, CF2 and CF3. 424 assertEquals(totalMemstoreSizePhaseI, cf1MemstoreSizePhaseI.getDataSize() 425 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); 426 427 /*------------------------------------------------------------------------------*/ 428 /* PHASE I - Flush */ 429 // First Flush in Test!!!!!!!!!!!!!!!!!!!!!! 430 // CF1, CF2, CF3, all together they are above the flush size lower bound. 431 // Since CF1 and CF3 are compacting, CF2 is going to be flushed to disk. 432 // CF1 and CF3 - flushed to memory and flatten explicitly 433 region.flush(false); 434 CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 435 CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 436 cms1.flushInMemory(); 437 cms3.flushInMemory(); 438 439 // CF3/CF1 should be merged so wait here to be sure the compaction is done 440 while ( 441 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 442 .isMemStoreFlushingInMemory() 443 ) { 444 Threads.sleep(10); 445 } 446 while ( 447 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 448 .isMemStoreFlushingInMemory() 449 ) { 450 Threads.sleep(10); 451 } 452 453 /*------------------------------------------------------------------------------*/ 454 /*------------------------------------------------------------------------------*/ 455 /* PHASE II - collect sizes */ 456 // Recalculate everything 457 MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); 458 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 459 MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); 460 long smallestSeqInRegionCurrentMemstorePhaseII = 461 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 462 // Find the smallest LSNs for edits wrt to each CF. 463 long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); 464 long totalMemstoreSizePhaseII = region.getMemStoreDataSize(); 465 466 /*------------------------------------------------------------------------------*/ 467 /* PHASE II - validation */ 468 // CF1 was flushed to memory, should be flattened and take less space 469 assertEquals(cf1MemstoreSizePhaseII.getDataSize(), cf1MemstoreSizePhaseI.getDataSize()); 470 assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize()); 471 // CF2 should become empty 472 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 473 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize()); 474 // verify that CF3 was flushed to memory and was not compacted (this is an approximation check) 475 // if compacted CF# should be at least twice less because its every key was duplicated 476 assertEquals(cf3MemstoreSizePhaseII.getDataSize(), cf3MemstoreSizePhaseI.getDataSize()); 477 assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 < cf3MemstoreSizePhaseII.getHeapSize()); 478 479 // Now the smallest LSN in the region should be the same as the smallest 480 // LSN in the memstore of CF1. 481 assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI); 482 // The total memstore size should be the same as the sum of the sizes of 483 // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline 484 // items in CF1/2 485 assertEquals(totalMemstoreSizePhaseII, cf1MemstoreSizePhaseII.getDataSize() 486 + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize()); 487 488 /*------------------------------------------------------------------------------*/ 489 /*------------------------------------------------------------------------------*/ 490 /* PHASE III - insertions */ 491 // Now add more puts for CF1, so that we also flush CF1 to disk instead of 492 // memory in next flush. This is causing the CF! to be flushed to memory twice. 493 for (int i = 1200; i < 8000; i++) { 494 region.put(createPut(1, i)); 495 } 496 497 // CF1 should be flatten and merged so wait here to be sure the compaction is done 498 while ( 499 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 500 .isMemStoreFlushingInMemory() 501 ) { 502 Threads.sleep(10); 503 } 504 505 /*------------------------------------------------------------------------------*/ 506 /* PHASE III - collect sizes */ 507 // How much does the CF1 memstore occupy now? Will be used later. 508 MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); 509 long totalMemstoreSizePhaseIII = region.getMemStoreDataSize(); 510 511 /*------------------------------------------------------------------------------*/ 512 /* PHASE III - validation */ 513 // The total memstore size should be the same as the sum of the sizes of 514 // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline 515 // items in CF1/2 516 assertEquals(totalMemstoreSizePhaseIII, cf1MemstoreSizePhaseIII.getDataSize() 517 + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize()); 518 519 /*------------------------------------------------------------------------------*/ 520 /* PHASE III - Flush */ 521 // Second Flush in Test!!!!!!!!!!!!!!!!!!!!!! 522 // CF1 is flushed to disk, but not entirely emptied. 523 // CF2 was and remained empty, same way nothing happens to CF3 524 region.flush(false); 525 526 /*------------------------------------------------------------------------------*/ 527 /*------------------------------------------------------------------------------*/ 528 /* PHASE IV - collect sizes */ 529 // Recalculate everything 530 MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); 531 MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); 532 MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize(); 533 long smallestSeqInRegionCurrentMemstorePhaseIV = 534 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 535 long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); 536 537 /*------------------------------------------------------------------------------*/ 538 /* PHASE IV - validation */ 539 // CF1's biggest pipeline component (inserted before first flush) should be flushed to disk 540 // CF2 should remain empty 541 assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize()); 542 assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize()); 543 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize()); 544 // CF3 shouldn't have been touched. 545 assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); 546 // the smallest LSN of CF3 shouldn't change 547 assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV); 548 // CF3 should be bottleneck for WAL 549 assertEquals(smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); 550 551 /*------------------------------------------------------------------------------*/ 552 /* PHASE IV - Flush */ 553 // Third Flush in Test!!!!!!!!!!!!!!!!!!!!!! 554 // Force flush to disk on all memstores (flush parameter true). 555 // CF1/CF3 all flushed to disk. Note that active sets of CF1 and CF3 are empty 556 region.flush(true); 557 558 /*------------------------------------------------------------------------------*/ 559 /*------------------------------------------------------------------------------*/ 560 /* PHASE V - collect sizes */ 561 // Recalculate everything 562 MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize(); 563 MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize(); 564 MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize(); 565 long smallestSeqInRegionCurrentMemstorePhaseV = 566 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 567 long totalMemstoreSizePhaseV = region.getMemStoreDataSize(); 568 569 /*------------------------------------------------------------------------------*/ 570 /* PHASE V - validation */ 571 assertEquals(0, cf1MemstoreSizePhaseV.getDataSize()); 572 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize()); 573 assertEquals(0, cf2MemstoreSizePhaseV.getDataSize()); 574 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize()); 575 assertEquals(0, cf3MemstoreSizePhaseV.getDataSize()); 576 assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize()); 577 // The total memstores size should be empty 578 assertEquals(0, totalMemstoreSizePhaseV); 579 // Because there is nothing in any memstore the WAL's LSN should be -1 580 assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstorePhaseV); 581 582 // What happens when we hit the memstore limit, but we are not able to find 583 // any Column Family above the threshold? 584 // In that case, we should flush all the CFs. 585 586 /*------------------------------------------------------------------------------*/ 587 /*------------------------------------------------------------------------------*/ 588 /* PHASE VI - insertions */ 589 // The memstore limit is 200*1024 and the column family flush threshold is 590 // around 50*1024. We try to just hit the memstore limit with each CF's 591 // memstore being below the CF flush threshold. 592 for (int i = 1; i <= 300; i++) { 593 region.put(createPut(1, i)); 594 region.put(createPut(2, i)); 595 region.put(createPut(3, i)); 596 region.put(createPut(4, i)); 597 region.put(createPut(5, i)); 598 } 599 600 MemStoreSize cf1ActiveSizePhaseVI = region.getStore(FAMILY1).getMemStoreSize(); 601 MemStoreSize cf3ActiveSizePhaseVI = region.getStore(FAMILY3).getMemStoreSize(); 602 MemStoreSize cf5ActiveSizePhaseVI = region.getStore(FAMILIES[4]).getMemStoreSize(); 603 604 /*------------------------------------------------------------------------------*/ 605 /* PHASE VI - Flush */ 606 // Fourth Flush in Test!!!!!!!!!!!!!!!!!!!!!! 607 // None among compacting memstores was flushed to memory due to previous puts. 608 // But is going to be moved to pipeline and flatten due to the flush. 609 region.flush(false); 610 // Since we won't find any CF above the threshold, and hence no specific 611 // store to flush, we should flush all the memstores 612 // Also compacted memstores are flushed to disk, but not entirely emptied 613 MemStoreSize cf1ActiveSizePhaseVII = region.getStore(FAMILY1).getMemStoreSize(); 614 MemStoreSize cf3ActiveSizePhaseVII = region.getStore(FAMILY3).getMemStoreSize(); 615 MemStoreSize cf5ActiveSizePhaseVII = region.getStore(FAMILIES[4]).getMemStoreSize(); 616 617 assertTrue(cf1ActiveSizePhaseVII.getDataSize() < cf1ActiveSizePhaseVI.getDataSize()); 618 assertTrue(cf3ActiveSizePhaseVII.getDataSize() < cf3ActiveSizePhaseVI.getDataSize()); 619 assertTrue(cf5ActiveSizePhaseVII.getDataSize() < cf5ActiveSizePhaseVI.getDataSize()); 620 621 HBaseTestingUtility.closeRegionAndWAL(region); 622 } 623 624 @Test 625 public void testSelectiveFlushAndWALinDataCompaction() throws IOException { 626 // Set up the configuration 627 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 628 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 629 // set memstore to do data compaction and not to use the speculative scan 630 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 631 String.valueOf(MemoryCompactionPolicy.EAGER)); 632 633 // Intialize the HRegion 634 HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); 635 verifyInMemoryFlushSize(region); 636 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 637 for (int i = 1; i <= 1200; i++) { 638 region.put(createPut(1, i)); 639 if (i <= 100) { 640 region.put(createPut(2, i)); 641 if (i <= 50) { 642 region.put(createPut(3, i)); 643 } 644 } 645 } 646 // Now add more puts for CF2, so that we only flush CF2 to disk 647 for (int i = 100; i < 2000; i++) { 648 region.put(createPut(2, i)); 649 } 650 651 // in this test check the non-composite snapshot - flashing only tail of the pipeline 652 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).setCompositeSnapshot(false); 653 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).setCompositeSnapshot(false); 654 655 long totalMemstoreSize = region.getMemStoreDataSize(); 656 657 // Find the sizes of the memstores of each CF. 658 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 659 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 660 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 661 662 // Some other sanity checks. 663 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 664 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 665 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 666 667 // The total memstore size should be the same as the sum of the sizes of 668 // memstores of CF1, CF2 and CF3. 669 String msg = "totalMemstoreSize=" + totalMemstoreSize + " DefaultMemStore.DEEP_OVERHEAD=" 670 + DefaultMemStore.DEEP_OVERHEAD + " cf1MemstoreSizePhaseI=" + cf1MemstoreSizePhaseI 671 + " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI + " cf3MemstoreSizePhaseI=" 672 + cf3MemstoreSizePhaseI; 673 assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() 674 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); 675 676 // Flush! 677 CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 678 CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 679 cms1.flushInMemory(); 680 cms3.flushInMemory(); 681 region.flush(false); 682 683 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 684 685 long smallestSeqInRegionCurrentMemstorePhaseII = 686 region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 687 long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); 688 long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); 689 long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); 690 691 // CF2 should have been cleared 692 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 693 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize()); 694 695 String s = "\n\n----------------------------------\n" 696 + "Upon initial insert and flush, LSN of CF1 is:" + smallestSeqCF1PhaseII + ". LSN of CF2 is:" 697 + smallestSeqCF2PhaseII + ". LSN of CF3 is:" + smallestSeqCF3PhaseII 698 + ", smallestSeqInRegionCurrentMemstore:" + smallestSeqInRegionCurrentMemstorePhaseII + "\n"; 699 700 // Add same entries to compact them later 701 for (int i = 1; i <= 1200; i++) { 702 region.put(createPut(1, i)); 703 if (i <= 100) { 704 region.put(createPut(2, i)); 705 if (i <= 50) { 706 region.put(createPut(3, i)); 707 } 708 } 709 } 710 // Now add more puts for CF2, so that we only flush CF2 to disk 711 for (int i = 100; i < 2000; i++) { 712 region.put(createPut(2, i)); 713 } 714 715 long smallestSeqInRegionCurrentMemstorePhaseIII = 716 region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 717 long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); 718 long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2); 719 long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3); 720 721 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIII 722 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIII + ", " 723 + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIII + ", the smallest sequence in CF3:" 724 + smallestSeqCF3PhaseIII + "\n"; 725 726 // Flush! 727 cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore; 728 cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore; 729 cms1.flushInMemory(); 730 cms3.flushInMemory(); 731 region.flush(false); 732 733 long smallestSeqInRegionCurrentMemstorePhaseIV = 734 region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 735 long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1); 736 long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2); 737 long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); 738 739 s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV 740 + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " 741 + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:" 742 + smallestSeqCF3PhaseIV + "\n"; 743 744 // now check that the LSN of the entire WAL, of CF1 and of CF3 has progressed due to compaction 745 assertTrue(s, 746 smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII); 747 assertTrue(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII); 748 assertTrue(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII); 749 750 HBaseTestingUtility.closeRegionAndWAL(region); 751 } 752 753 @Test 754 public void testSelectiveFlushWithBasicAndMerge() throws IOException { 755 // Set up the configuration 756 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024); 757 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024); 758 conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.8); 759 // set memstore to do index compaction with merge 760 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 761 String.valueOf(MemoryCompactionPolicy.BASIC)); 762 // length of pipeline that requires merge 763 conf.setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1); 764 765 // Intialize the HRegion 766 HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf); 767 verifyInMemoryFlushSize(region); 768 // Add 1200 entries for CF1 (CompactingMemStore), 100 for CF2 (DefaultMemStore) and 50 for CF3 769 for (int i = 1; i <= 1200; i++) { 770 region.put(createPut(1, i)); 771 if (i <= 100) { 772 region.put(createPut(2, i)); 773 if (i <= 50) { 774 region.put(createPut(3, i)); 775 } 776 } 777 } 778 // Now put more entries to CF2 779 for (int i = 100; i < 2000; i++) { 780 region.put(createPut(2, i)); 781 } 782 783 long totalMemstoreSize = region.getMemStoreDataSize(); 784 785 // test in-memory flashing into CAM here 786 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 787 .setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 788 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 789 .setIndexType(CompactingMemStore.IndexType.ARRAY_MAP); 790 791 // Find the sizes of the memstores of each CF. 792 MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); 793 MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); 794 MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); 795 796 // Some other sanity checks. 797 assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0); 798 assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0); 799 assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0); 800 801 // The total memstore size should be the same as the sum of the sizes of 802 // memstores of CF1, CF2 and CF3. 803 assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize() 804 + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize()); 805 806 // Initiate in-memory Flush! 807 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory(); 808 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory(); 809 // CF1 and CF3 should be flatten and merged so wait here to be sure the merge is done 810 while ( 811 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 812 .isMemStoreFlushingInMemory() 813 ) { 814 Threads.sleep(10); 815 } 816 while ( 817 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 818 .isMemStoreFlushingInMemory() 819 ) { 820 Threads.sleep(10); 821 } 822 823 // Flush-to-disk! CF2 only should be flushed 824 region.flush(false); 825 826 MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); 827 MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); 828 MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); 829 830 // CF1 should be flushed in memory and just flattened, so CF1 heap overhead should be smaller 831 assertTrue(cf1MemstoreSizePhaseI.getHeapSize() > cf1MemstoreSizePhaseII.getHeapSize()); 832 // CF1 should be flushed in memory and just flattened, so CF1 data size should remain the same 833 assertEquals(cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseII.getDataSize()); 834 // CF2 should have been cleared 835 assertEquals(0, cf2MemstoreSizePhaseII.getDataSize()); 836 837 // Add the same amount of entries to see the merging 838 for (int i = 1; i <= 1200; i++) { 839 region.put(createPut(1, i)); 840 if (i <= 100) { 841 region.put(createPut(2, i)); 842 if (i <= 50) { 843 region.put(createPut(3, i)); 844 } 845 } 846 } 847 // Now add more puts for CF2, so that we only flush CF2 to disk 848 for (int i = 100; i < 2000; i++) { 849 region.put(createPut(2, i)); 850 } 851 852 MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); 853 854 // Flush in memory! 855 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory(); 856 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory(); 857 // CF1 and CF3 should be merged so wait here to be sure the merge is done 858 while ( 859 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 860 .isMemStoreFlushingInMemory() 861 ) { 862 Threads.sleep(10); 863 } 864 while ( 865 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 866 .isMemStoreFlushingInMemory() 867 ) { 868 Threads.sleep(10); 869 } 870 region.flush(false); 871 872 MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); 873 MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); 874 875 assertEquals(2 * cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize()); 876 // the decrease in the heap size due to usage of CellArrayMap instead of CSLM 877 // should be the same in flattening and in merge (first and second in-memory-flush) 878 // but in phase 1 we do not yet have immutable segment 879 assertEquals(cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(), 880 cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize() 881 - CellArrayImmutableSegment.DEEP_OVERHEAD_CAM); 882 assertEquals(3, // active, one in pipeline, snapshot 883 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).getSegments().size()); 884 // CF2 should have been cleared 885 assertEquals("\n<<< DEBUG: The data--heap sizes of stores before/after first flushes," 886 + " CF1: " + cf1MemstoreSizePhaseI.getDataSize() + "/" + cf1MemstoreSizePhaseII.getDataSize() 887 + "--" + cf1MemstoreSizePhaseI.getHeapSize() + "/" + cf1MemstoreSizePhaseII.getHeapSize() 888 + ", CF2: " + cf2MemstoreSizePhaseI.getDataSize() + "/" + cf2MemstoreSizePhaseII.getDataSize() 889 + "--" + cf2MemstoreSizePhaseI.getHeapSize() + "/" + cf2MemstoreSizePhaseII.getHeapSize() 890 + ", CF3: " + cf3MemstoreSizePhaseI.getDataSize() + "/" + cf3MemstoreSizePhaseII.getDataSize() 891 + "--" + cf3MemstoreSizePhaseI.getHeapSize() + "/" + cf3MemstoreSizePhaseII.getHeapSize() 892 + "\n<<< AND before/after second flushes " + " CF1: " + cf1MemstoreSizePhaseIII.getDataSize() 893 + "/" + cf1MemstoreSizePhaseIV.getDataSize() + "--" + cf1MemstoreSizePhaseIII.getHeapSize() 894 + "/" + cf1MemstoreSizePhaseIV.getHeapSize() + "\n", 0, cf2MemstoreSizePhaseIV.getDataSize()); 895 896 HBaseTestingUtility.closeRegionAndWAL(region); 897 } 898 899 // should end in 300 seconds (5 minutes) 900 @Test 901 public void testStressFlushAndWALinIndexCompaction() throws IOException { 902 // Set up the configuration 903 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); 904 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 905 200 * 1024); 906 // set memstore to do data compaction and not to use the speculative scan 907 conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 908 String.valueOf(MemoryCompactionPolicy.BASIC)); 909 910 // Successfully initialize the HRegion 911 HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); 912 verifyInMemoryFlushSize(region); 913 Thread[] threads = new Thread[25]; 914 for (int i = 0; i < threads.length; i++) { 915 int id = i * 10000; 916 ConcurrentPutRunnable runnable = new ConcurrentPutRunnable(region, id); 917 threads[i] = new Thread(runnable); 918 threads[i].start(); 919 } 920 Threads.sleep(10000); // let other threads start 921 region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts 922 Threads.sleep(10000); // let other threads continue 923 region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts 924 925 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory(); 926 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory(); 927 while ( 928 ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore) 929 .isMemStoreFlushingInMemory() 930 ) { 931 Threads.sleep(10); 932 } 933 while ( 934 ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore) 935 .isMemStoreFlushingInMemory() 936 ) { 937 Threads.sleep(10); 938 } 939 940 for (int i = 0; i < threads.length; i++) { 941 try { 942 threads[i].join(); 943 } catch (InterruptedException e) { 944 e.printStackTrace(); 945 } 946 } 947 } 948 949 /** 950 * The in-memory-flusher thread performs the flush asynchronously. There is at most one thread per 951 * memstore instance. It takes the updatesLock exclusively, pushes active into the pipeline, 952 * releases updatesLock and compacts the pipeline. 953 */ 954 private class ConcurrentPutRunnable implements Runnable { 955 private final HRegion stressedRegion; 956 private final int startNumber; 957 958 ConcurrentPutRunnable(HRegion r, int i) { 959 this.stressedRegion = r; 960 this.startNumber = i; 961 } 962 963 @Override 964 public void run() { 965 966 try { 967 int dummy = startNumber / 10000; 968 System.out.print("Thread " + dummy + " with start number " + startNumber + " starts\n"); 969 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 970 for (int i = startNumber; i <= startNumber + 3000; i++) { 971 stressedRegion.put(createPut(1, i)); 972 if (i <= startNumber + 2000) { 973 stressedRegion.put(createPut(2, i)); 974 if (i <= startNumber + 1000) { 975 stressedRegion.put(createPut(3, i)); 976 } 977 } 978 } 979 System.out.print("Thread with start number " + startNumber + " continues to more puts\n"); 980 // Now add more puts for CF2, so that we only flush CF2 to disk 981 for (int i = startNumber + 3000; i < startNumber + 5000; i++) { 982 stressedRegion.put(createPut(2, i)); 983 } 984 // And add more puts for CF1 985 for (int i = startNumber + 5000; i < startNumber + 7000; i++) { 986 stressedRegion.put(createPut(1, i)); 987 } 988 System.out.print("Thread with start number " + startNumber + " flushes\n"); 989 // flush (IN MEMORY) one of the stores (each thread flushes different store) 990 // and wait till the flush and the following action are done 991 if (startNumber == 0) { 992 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore) 993 .flushInMemory(); 994 while ( 995 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore) 996 .isMemStoreFlushingInMemory() 997 ) { 998 Threads.sleep(10); 999 } 1000 } 1001 if (startNumber == 10000) { 1002 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore) 1003 .flushInMemory(); 1004 while ( 1005 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore) 1006 .isMemStoreFlushingInMemory() 1007 ) { 1008 Threads.sleep(10); 1009 } 1010 } 1011 if (startNumber == 20000) { 1012 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore) 1013 .flushInMemory(); 1014 while ( 1015 ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore) 1016 .isMemStoreFlushingInMemory() 1017 ) { 1018 Threads.sleep(10); 1019 } 1020 } 1021 System.out.print("Thread with start number " + startNumber + " finishes\n"); 1022 } catch (IOException e) { 1023 assert false; 1024 } 1025 } 1026 } 1027 1028 private WAL getWAL(Region region) { 1029 return ((HRegion) region).getWAL(); 1030 } 1031}