001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.Arrays; 027import java.util.List; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.HBaseTestingUtil; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.NamespaceDescriptor; 035import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.Waiter; 038import org.apache.hadoop.hbase.client.Admin; 039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 040import org.apache.hadoop.hbase.client.Connection; 041import org.apache.hadoop.hbase.client.ConnectionFactory; 042import org.apache.hadoop.hbase.client.Get; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import org.apache.hadoop.hbase.client.RegionInfoBuilder; 046import org.apache.hadoop.hbase.client.Result; 047import org.apache.hadoop.hbase.client.Table; 048import org.apache.hadoop.hbase.client.TableDescriptor; 049import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 050import org.apache.hadoop.hbase.testclassification.LargeTests; 051import org.apache.hadoop.hbase.testclassification.RegionServerTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.util.JVMClusterUtil; 054import org.apache.hadoop.hbase.util.Pair; 055import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 056import org.apache.hadoop.hbase.wal.WAL; 057import org.junit.ClassRule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063import org.apache.hbase.thirdparty.com.google.common.hash.Hashing; 064 065/** 066 * This test verifies the correctness of the Per Column Family flushing strategy 067 */ 068@Category({ RegionServerTests.class, LargeTests.class }) 069public class TestPerColumnFamilyFlush { 070 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestPerColumnFamilyFlush.class); 074 075 private static final Logger LOG = LoggerFactory.getLogger(TestPerColumnFamilyFlush.class); 076 077 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 078 079 private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion"); 080 081 public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1"); 082 083 public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), 084 Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") }; 085 086 public static final byte[] FAMILY1 = FAMILIES[0]; 087 088 public static final byte[] FAMILY2 = FAMILIES[1]; 089 090 public static final byte[] FAMILY3 = FAMILIES[2]; 091 092 private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException { 093 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLENAME); 094 for (byte[] family : FAMILIES) { 095 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); 096 } 097 RegionInfo info = RegionInfoBuilder.newBuilder(TABLENAME).build(); 098 Path path = new Path(DIR, callingMethod); 099 return HBaseTestingUtil.createRegionAndWAL(info, path, conf, builder.build()); 100 } 101 102 // A helper function to create puts. 103 private Put createPut(int familyNum, int putNum) { 104 byte[] qf = Bytes.toBytes("q" + familyNum); 105 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); 106 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); 107 Put p = new Put(row); 108 p.addColumn(FAMILIES[familyNum - 1], qf, val); 109 return p; 110 } 111 112 // A helper function to create puts. 113 private Get createGet(int familyNum, int putNum) { 114 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); 115 return new Get(row); 116 } 117 118 // A helper function to verify edits. 119 void verifyEdit(int familyNum, int putNum, Table table) throws IOException { 120 Result r = table.get(createGet(familyNum, putNum)); 121 byte[] family = FAMILIES[familyNum - 1]; 122 byte[] qf = Bytes.toBytes("q" + familyNum); 123 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); 124 assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family)); 125 assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), 126 r.getFamilyMap(family).get(qf)); 127 assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum), 128 Arrays.equals(r.getFamilyMap(family).get(qf), val)); 129 } 130 131 @Test 132 public void testSelectiveFlushWhenEnabled() throws IOException { 133 // Set up the configuration, use new one to not conflict with minicluster in other tests 134 Configuration conf = new HBaseTestingUtil().getConfiguration(); 135 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024); 136 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName()); 137 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 40 * 1024); 138 // Intialize the region 139 HRegion region = initHRegion("testSelectiveFlushWithDataCompaction", conf); 140 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 141 for (int i = 1; i <= 1200; i++) { 142 region.put(createPut(1, i)); 143 144 if (i <= 100) { 145 region.put(createPut(2, i)); 146 if (i <= 50) { 147 region.put(createPut(3, i)); 148 } 149 } 150 } 151 152 long totalMemstoreSize = region.getMemStoreDataSize(); 153 154 // Find the smallest LSNs for edits wrt to each CF. 155 long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1); 156 long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2); 157 long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3); 158 159 // Find the sizes of the memstores of each CF. 160 MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); 161 MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); 162 MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); 163 164 // Get the overall smallest LSN in the region's memstores. 165 long smallestSeqInRegionCurrentMemstore = 166 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 167 168 // The overall smallest LSN in the region's memstores should be the same as 169 // the LSN of the smallest edit in CF1 170 assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore); 171 172 // Some other sanity checks. 173 assertTrue(smallestSeqCF1 < smallestSeqCF2); 174 assertTrue(smallestSeqCF2 < smallestSeqCF3); 175 assertTrue(cf1MemstoreSize.getDataSize() > 0); 176 assertTrue(cf2MemstoreSize.getDataSize() > 0); 177 assertTrue(cf3MemstoreSize.getDataSize() > 0); 178 179 // The total memstore size should be the same as the sum of the sizes of 180 // memstores of CF1, CF2 and CF3. 181 assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize() 182 + cf3MemstoreSize.getDataSize()); 183 184 // Flush! 185 region.flush(false); 186 187 // Will use these to check if anything changed. 188 MemStoreSize oldCF2MemstoreSize = cf2MemstoreSize; 189 MemStoreSize oldCF3MemstoreSize = cf3MemstoreSize; 190 191 // Recalculate everything 192 cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); 193 cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); 194 cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); 195 totalMemstoreSize = region.getMemStoreDataSize(); 196 smallestSeqInRegionCurrentMemstore = 197 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 198 199 // We should have cleared out only CF1, since we chose the flush thresholds 200 // and number of puts accordingly. 201 assertEquals(0, cf1MemstoreSize.getDataSize()); 202 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize()); 203 // Nothing should have happened to CF2, ... 204 assertEquals(cf2MemstoreSize, oldCF2MemstoreSize); 205 // ... or CF3 206 assertEquals(cf3MemstoreSize, oldCF3MemstoreSize); 207 // Now the smallest LSN in the region should be the same as the smallest 208 // LSN in the memstore of CF2. 209 assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2); 210 // Of course, this should hold too. 211 assertEquals(totalMemstoreSize, cf2MemstoreSize.getDataSize() + cf3MemstoreSize.getDataSize()); 212 213 // Now add more puts (mostly for CF2), so that we only flush CF2 this time. 214 for (int i = 1200; i < 2400; i++) { 215 region.put(createPut(2, i)); 216 217 // Add only 100 puts for CF3 218 if (i - 1200 < 100) { 219 region.put(createPut(3, i)); 220 } 221 } 222 223 // How much does the CF3 memstore occupy? Will be used later. 224 oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); 225 226 // Flush again 227 region.flush(false); 228 229 // Recalculate everything 230 cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); 231 cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); 232 cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); 233 totalMemstoreSize = region.getMemStoreDataSize(); 234 smallestSeqInRegionCurrentMemstore = 235 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 236 237 // CF1 and CF2, both should be absent. 238 assertEquals(0, cf1MemstoreSize.getDataSize()); 239 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize()); 240 assertEquals(0, cf2MemstoreSize.getDataSize()); 241 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize()); 242 // CF3 shouldn't have been touched. 243 assertEquals(cf3MemstoreSize, oldCF3MemstoreSize); 244 assertEquals(totalMemstoreSize, cf3MemstoreSize.getDataSize()); 245 246 // What happens when we hit the memstore limit, but we are not able to find 247 // any Column Family above the threshold? 248 // In that case, we should flush all the CFs. 249 250 // Clearing the existing memstores. 251 region.flush(true); 252 253 // The memstore limit is 200*1024 and the column family flush threshold is 254 // around 50*1024. We try to just hit the memstore limit with each CF's 255 // memstore being below the CF flush threshold. 256 for (int i = 1; i <= 300; i++) { 257 region.put(createPut(1, i)); 258 region.put(createPut(2, i)); 259 region.put(createPut(3, i)); 260 region.put(createPut(4, i)); 261 region.put(createPut(5, i)); 262 } 263 264 region.flush(false); 265 266 // Since we won't find any CF above the threshold, and hence no specific 267 // store to flush, we should flush all the memstores. 268 assertEquals(0, region.getMemStoreDataSize()); 269 HBaseTestingUtil.closeRegionAndWAL(region); 270 } 271 272 @Test 273 public void testSelectiveFlushWhenNotEnabled() throws IOException { 274 // Set up the configuration, use new one to not conflict with minicluster in other tests 275 Configuration conf = new HBaseTestingUtil().getConfiguration(); 276 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024); 277 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName()); 278 279 // Intialize the HRegion 280 HRegion region = initHRegion("testSelectiveFlushWhenNotEnabled", conf); 281 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 282 for (int i = 1; i <= 1200; i++) { 283 region.put(createPut(1, i)); 284 if (i <= 100) { 285 region.put(createPut(2, i)); 286 if (i <= 50) { 287 region.put(createPut(3, i)); 288 } 289 } 290 } 291 292 long totalMemstoreSize = region.getMemStoreDataSize(); 293 294 // Find the sizes of the memstores of each CF. 295 MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); 296 MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); 297 MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); 298 299 // Some other sanity checks. 300 assertTrue(cf1MemstoreSize.getDataSize() > 0); 301 assertTrue(cf2MemstoreSize.getDataSize() > 0); 302 assertTrue(cf3MemstoreSize.getDataSize() > 0); 303 304 // The total memstore size should be the same as the sum of the sizes of 305 // memstores of CF1, CF2 and CF3. 306 assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize() 307 + cf3MemstoreSize.getDataSize()); 308 309 // Flush! 310 region.flush(false); 311 312 cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); 313 cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); 314 cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); 315 totalMemstoreSize = region.getMemStoreDataSize(); 316 long smallestSeqInRegionCurrentMemstore = 317 region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 318 319 // Everything should have been cleared 320 assertEquals(0, cf1MemstoreSize.getDataSize()); 321 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize()); 322 assertEquals(0, cf2MemstoreSize.getDataSize()); 323 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize()); 324 assertEquals(0, cf3MemstoreSize.getDataSize()); 325 assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSize.getHeapSize()); 326 assertEquals(0, totalMemstoreSize); 327 assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore); 328 HBaseTestingUtil.closeRegionAndWAL(region); 329 } 330 331 // Find the (first) region which has the specified name. 332 private static Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) { 333 SingleProcessHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); 334 List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads(); 335 for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { 336 HRegionServer hrs = rsts.get(i).getRegionServer(); 337 for (HRegion region : hrs.getRegions(tableName)) { 338 return Pair.newPair(region, hrs); 339 } 340 } 341 return null; 342 } 343 344 private void doTestLogReplay() throws Exception { 345 Configuration conf = TEST_UTIL.getConfiguration(); 346 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 10000); 347 // Carefully chosen limits so that the memstore just flushes when we're done 348 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName()); 349 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 2500); 350 final int numRegionServers = 4; 351 try { 352 TEST_UTIL.startMiniCluster(numRegionServers); 353 TEST_UTIL.getAdmin() 354 .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); 355 Table table = TEST_UTIL.createTable(TABLENAME, FAMILIES); 356 357 // Add 100 edits for CF1, 20 for CF2, 20 for CF3. 358 // These will all be interleaved in the log. 359 for (int i = 1; i <= 80; i++) { 360 table.put(createPut(1, i)); 361 if (i <= 10) { 362 table.put(createPut(2, i)); 363 table.put(createPut(3, i)); 364 } 365 } 366 Thread.sleep(1000); 367 368 Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME); 369 HRegion desiredRegion = desiredRegionAndServer.getFirst(); 370 assertTrue("Could not find a region which hosts the new region.", desiredRegion != null); 371 372 // Flush the region selectively. 373 desiredRegion.flush(false); 374 375 long totalMemstoreSize; 376 long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize; 377 totalMemstoreSize = desiredRegion.getMemStoreDataSize(); 378 379 // Find the sizes of the memstores of each CF. 380 cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize().getDataSize(); 381 cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize().getDataSize(); 382 cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize().getDataSize(); 383 384 // CF1 Should have been flushed 385 assertEquals(0, cf1MemstoreSize); 386 // CF2 and CF3 shouldn't have been flushed. 387 // TODO: This test doesn't allow for this case: 388 // " Since none of the CFs were above the size, flushing all." 389 // i.e. a flush happens before we get to here and its a flush-all. 390 assertTrue(cf2MemstoreSize >= 0); 391 assertTrue(cf3MemstoreSize >= 0); 392 assertEquals(totalMemstoreSize, cf2MemstoreSize + cf3MemstoreSize); 393 394 // Wait for the RS report to go across to the master, so that the master 395 // is aware of which sequence ids have been flushed, before we kill the RS. 396 // If in production, the RS dies before the report goes across, we will 397 // safely replay all the edits. 398 Thread.sleep(2000); 399 400 // Abort the region server where we have the region hosted. 401 HRegionServer rs = desiredRegionAndServer.getSecond(); 402 rs.abort("testing"); 403 404 // The aborted region server's regions will be eventually assigned to some 405 // other region server, and the get RPC call (inside verifyEdit()) will 406 // retry for some time till the regions come back up. 407 408 // Verify that all the edits are safe. 409 for (int i = 1; i <= 80; i++) { 410 verifyEdit(1, i, table); 411 if (i <= 10) { 412 verifyEdit(2, i, table); 413 verifyEdit(3, i, table); 414 } 415 } 416 } finally { 417 TEST_UTIL.shutdownMiniCluster(); 418 } 419 } 420 421 // Test Log Replay with Distributed log split on. 422 @Test 423 public void testLogReplayWithDistributedLogSplit() throws Exception { 424 doTestLogReplay(); 425 } 426 427 private WAL getWAL(Region region) { 428 return ((HRegion) region).getWAL(); 429 } 430 431 private int getNumRolledLogFiles(Region region) { 432 return AbstractFSWALProvider.getNumRolledLogFiles(getWAL(region)); 433 } 434 435 /** 436 * When a log roll is about to happen, we do a flush of the regions who will be affected by the 437 * log roll. These flushes cannot be a selective flushes, otherwise we cannot roll the logs. This 438 * test ensures that we do a full-flush in that scenario. 439 */ 440 @Test 441 public void testFlushingWhenLogRolling() throws Exception { 442 TableName tableName = TableName.valueOf("testFlushingWhenLogRolling"); 443 Configuration conf = TEST_UTIL.getConfiguration(); 444 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024); 445 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName()); 446 long cfFlushSizeLowerBound = 2048; 447 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 448 cfFlushSizeLowerBound); 449 450 // One hour, prevent periodic rolling 451 conf.setLong("hbase.regionserver.logroll.period", 60L * 60 * 1000); 452 // prevent rolling by size 453 conf.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024); 454 // Make it 10 as max logs before a flush comes on. 455 final int maxLogs = 10; 456 conf.setInt("hbase.regionserver.maxlogs", maxLogs); 457 458 final int numRegionServers = 1; 459 TEST_UTIL.startMiniCluster(numRegionServers); 460 try { 461 Table table = TEST_UTIL.createTable(tableName, FAMILIES); 462 Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(tableName); 463 final HRegion desiredRegion = desiredRegionAndServer.getFirst(); 464 assertTrue("Could not find a region which hosts the new region.", desiredRegion != null); 465 LOG.info("Writing to region=" + desiredRegion); 466 467 // Add one row for both CFs. 468 for (int i = 1; i <= 3; i++) { 469 table.put(createPut(i, 0)); 470 } 471 // Now only add row to CF1, make sure when we force a flush, CF1 is larger than the lower 472 // bound and CF2 and CF3 are smaller than the lower bound. 473 for (int i = 0; i < maxLogs; i++) { 474 for (int j = 0; j < 100; j++) { 475 table.put(createPut(1, i * 100 + j)); 476 } 477 // Roll the WAL. The log file count is less than maxLogs so no flush is triggered. 478 int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion); 479 assertNull(getWAL(desiredRegion).rollWriter()); 480 TEST_UTIL.waitFor(60000, 481 () -> getNumRolledLogFiles(desiredRegion) > currentNumRolledLogFiles); 482 } 483 assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion)); 484 assertTrue( 485 desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize() > cfFlushSizeLowerBound); 486 assertTrue( 487 desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound); 488 assertTrue( 489 desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound); 490 table.put(createPut(1, 12345678)); 491 // Make numRolledLogFiles greater than maxLogs 492 desiredRegionAndServer.getSecond().getWalRoller().requestRollAll(); 493 // Wait for some time till the flush caused by log rolling happens. 494 TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() { 495 496 @Override 497 public boolean evaluate() throws Exception { 498 return desiredRegion.getMemStoreDataSize() == 0; 499 } 500 501 @Override 502 public String explainFailure() throws Exception { 503 long memstoreSize = desiredRegion.getMemStoreDataSize(); 504 if (memstoreSize > 0) { 505 return "Still have unflushed entries in memstore, memstore size is " + memstoreSize; 506 } 507 return "Unknown"; 508 } 509 }); 510 LOG.info("Finished waiting on flush after too many WALs..."); 511 // Individual families should have been flushed. 512 assertEquals(MutableSegment.DEEP_OVERHEAD, 513 desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize()); 514 assertEquals(MutableSegment.DEEP_OVERHEAD, 515 desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize()); 516 assertEquals(MutableSegment.DEEP_OVERHEAD, 517 desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize()); 518 // let WAL cleanOldLogs 519 assertNull(getWAL(desiredRegion).rollWriter(true)); 520 TEST_UTIL.waitFor(60000, () -> getNumRolledLogFiles(desiredRegion) < maxLogs); 521 } finally { 522 TEST_UTIL.shutdownMiniCluster(); 523 } 524 } 525 526 private void doPut(Table table, long memstoreFlushSize) throws IOException, InterruptedException { 527 Region region = getRegionWithName(table.getName()).getFirst(); 528 // cf1 4B per row, cf2 40B per row and cf3 400B per row 529 byte[] qf = Bytes.toBytes("qf"); 530 for (int i = 0; i < 10000; i++) { 531 Put put = new Put(Bytes.toBytes("row-" + i)); 532 byte[] value1 = new byte[100]; 533 Bytes.random(value1); 534 put.addColumn(FAMILY1, qf, value1); 535 byte[] value2 = new byte[200]; 536 Bytes.random(value2); 537 put.addColumn(FAMILY2, qf, value2); 538 byte[] value3 = new byte[400]; 539 Bytes.random(value3); 540 put.addColumn(FAMILY3, qf, value3); 541 table.put(put); 542 // slow down to let regionserver flush region. 543 while (region.getMemStoreHeapSize() > memstoreFlushSize) { 544 Thread.sleep(100); 545 } 546 } 547 } 548 549 // Under the same write load, small stores should have less store files when 550 // percolumnfamilyflush enabled. 551 @Test 552 public void testCompareStoreFileCount() throws Exception { 553 long memstoreFlushSize = 1024L * 1024; 554 Configuration conf = TEST_UTIL.getConfiguration(); 555 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, memstoreFlushSize); 556 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName()); 557 conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); 558 conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, 559 ConstantSizeRegionSplitPolicy.class.getName()); 560 561 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME) 562 .setCompactionEnabled(false).setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY1)) 563 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY2)) 564 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY3)).build(); 565 566 LOG.info("==============Test with selective flush disabled==============="); 567 int cf1StoreFileCount = -1; 568 int cf2StoreFileCount = -1; 569 int cf3StoreFileCount = -1; 570 int cf1StoreFileCount1 = -1; 571 int cf2StoreFileCount1 = -1; 572 int cf3StoreFileCount1 = -1; 573 try { 574 TEST_UTIL.startMiniCluster(1); 575 TEST_UTIL.getAdmin() 576 .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); 577 TEST_UTIL.getAdmin().createTable(tableDescriptor); 578 TEST_UTIL.waitTableAvailable(TABLENAME); 579 Connection conn = ConnectionFactory.createConnection(conf); 580 Table table = conn.getTable(TABLENAME); 581 doPut(table, memstoreFlushSize); 582 table.close(); 583 conn.close(); 584 585 Region region = getRegionWithName(TABLENAME).getFirst(); 586 cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount(); 587 cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount(); 588 cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount(); 589 } finally { 590 TEST_UTIL.shutdownMiniCluster(); 591 } 592 593 LOG.info("==============Test with selective flush enabled==============="); 594 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName()); 595 // default value of per-cf flush lower bound is too big, set to a small enough value 596 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 0); 597 try { 598 TEST_UTIL.startMiniCluster(1); 599 TEST_UTIL.getAdmin() 600 .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); 601 TEST_UTIL.getAdmin().createTable(tableDescriptor); 602 Connection conn = ConnectionFactory.createConnection(conf); 603 Table table = conn.getTable(TABLENAME); 604 doPut(table, memstoreFlushSize); 605 table.close(); 606 conn.close(); 607 608 Region region = getRegionWithName(TABLENAME).getFirst(); 609 cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount(); 610 cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount(); 611 cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount(); 612 } finally { 613 TEST_UTIL.shutdownMiniCluster(); 614 } 615 616 LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount + ", " 617 + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", " + Bytes.toString(FAMILY3) + "=>" 618 + cf3StoreFileCount); 619 LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1 + ", " 620 + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount1 + ", " + Bytes.toString(FAMILY3) + "=>" 621 + cf3StoreFileCount1); 622 // small CF will have less store files. 623 assertTrue(cf1StoreFileCount1 < cf1StoreFileCount); 624 assertTrue(cf2StoreFileCount1 < cf2StoreFileCount); 625 } 626 627 public static void main(String[] args) throws Exception { 628 int numRegions = Integer.parseInt(args[0]); 629 long numRows = Long.parseLong(args[1]); 630 631 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME) 632 .setMaxFileSize(10L * 1024 * 1024 * 1024) 633 .setValue(TableDescriptorBuilder.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName()) 634 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY1)) 635 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY2)) 636 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY3)).build(); 637 638 Configuration conf = HBaseConfiguration.create(); 639 Connection conn = ConnectionFactory.createConnection(conf); 640 Admin admin = conn.getAdmin(); 641 if (admin.tableExists(TABLENAME)) { 642 admin.disableTable(TABLENAME); 643 admin.deleteTable(TABLENAME); 644 } 645 if (numRegions >= 3) { 646 byte[] startKey = new byte[16]; 647 byte[] endKey = new byte[16]; 648 Arrays.fill(endKey, (byte) 0xFF); 649 admin.createTable(tableDescriptor, startKey, endKey, numRegions); 650 } else { 651 admin.createTable(tableDescriptor); 652 } 653 admin.close(); 654 655 Table table = conn.getTable(TABLENAME); 656 byte[] qf = Bytes.toBytes("qf"); 657 byte[] value1 = new byte[16]; 658 byte[] value2 = new byte[256]; 659 byte[] value3 = new byte[4096]; 660 for (long i = 0; i < numRows; i++) { 661 Put put = new Put(Hashing.md5().hashLong(i).asBytes()); 662 Bytes.random(value1); 663 Bytes.random(value2); 664 Bytes.random(value3); 665 put.addColumn(FAMILY1, qf, value1); 666 put.addColumn(FAMILY2, qf, value2); 667 put.addColumn(FAMILY3, qf, value3); 668 table.put(put); 669 if (i % 10000 == 0) { 670 LOG.info(i + " rows put"); 671 } 672 } 673 table.close(); 674 conn.close(); 675 } 676}