/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.hash.Hashing;

/**
 * This test verifies the correctness of the Per Column Family flushing strategy.
 * <p>
 * The first two tests drive a stand-alone {@link HRegion} directly; the remaining tests start a
 * mini cluster through the shared {@link #TEST_UTIL} and shut it down again in a {@code finally}
 * block.
 */
@Category({ RegionServerTests.class, LargeTests.class })
public class TestPerColumnFamilyFlush {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestPerColumnFamilyFlush.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestPerColumnFamilyFlush.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");

  public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1");

  public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
      Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };

  public static final byte[] FAMILY1 = FAMILIES[0];

  public static final byte[] FAMILY2 = FAMILIES[1];

  public static final byte[] FAMILY3 = FAMILIES[2];

  /**
   * Creates a region (and its WAL) for {@link #TABLENAME} containing every family in
   * {@link #FAMILIES}.
   * @param callingMethod name of the sub-directory of {@link #DIR} the region is created under
   * @param conf configuration handed to the region
   * @return the newly created region; the caller must close it with
   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
   */
  private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
    for (byte[] family : FAMILIES) {
      htd.addFamily(new HColumnDescriptor(family));
    }
    HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
    Path path = new Path(DIR, callingMethod);
    return HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);
  }

  /**
   * A helper function to create puts.
   * @param familyNum 1-based index into {@link #FAMILIES}
   * @param putNum sequence number used to build the row key and the value
   */
  private Put createPut(int familyNum, int putNum) {
    byte[] qf = Bytes.toBytes("q" + familyNum);
    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
    Put p = new Put(row);
    p.addColumn(FAMILIES[familyNum - 1], qf, val);
    return p;
  }

  /**
   * A helper function to create the get that reads back the row written by
   * {@link #createPut(int, int)} for the same arguments.
   */
  private Get createGet(int familyNum, int putNum) {
    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
    return new Get(row);
  }

  /**
   * A helper function to verify that the edit written by {@link #createPut(int, int)} for the
   * same arguments can be read back from {@code table} with the expected value.
   */
  void verifyEdit(int familyNum, int putNum, Table table) throws IOException {
    Result r = table.get(createGet(familyNum, putNum));
    byte[] family = FAMILIES[familyNum - 1];
    byte[] qf = Bytes.toBytes("q" + familyNum);
    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
      r.getFamilyMap(family).get(qf));
    assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
      Arrays.equals(r.getFamilyMap(family).get(qf), val));
  }

  @Test
  public void testSelectiveFlushWhenEnabled() throws IOException {
    // Set up the configuration, use new one to not conflict with minicluster in other tests
    Configuration conf = new HBaseTestingUtility().getConfiguration();
    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
      40 * 1024);
    // Initialize the region
    HRegion region = initHRegion("testSelectiveFlushWhenEnabled", conf);
    // Close the region in a finally block so a failed assertion does not leak it and its WAL.
    try {
      // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
      for (int i = 1; i <= 1200; i++) {
        region.put(createPut(1, i));

        if (i <= 100) {
          region.put(createPut(2, i));
          if (i <= 50) {
            region.put(createPut(3, i));
          }
        }
      }

      long totalMemstoreSize = region.getMemStoreDataSize();

      // Find the smallest LSNs for edits wrt to each CF.
      long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1);
      long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2);
      long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3);

      // Find the sizes of the memstores of each CF.
      MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
      MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
      MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();

      // Get the overall smallest LSN in the region's memstores.
      long smallestSeqInRegionCurrentMemstore = getWAL(region)
          .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());

      // The overall smallest LSN in the region's memstores should be the same as
      // the LSN of the smallest edit in CF1
      assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);

      // Some other sanity checks.
      assertTrue(smallestSeqCF1 < smallestSeqCF2);
      assertTrue(smallestSeqCF2 < smallestSeqCF3);
      assertTrue(cf1MemstoreSize.getDataSize() > 0);
      assertTrue(cf2MemstoreSize.getDataSize() > 0);
      assertTrue(cf3MemstoreSize.getDataSize() > 0);

      // The total memstore size should be the same as the sum of the sizes of
      // memstores of CF1, CF2 and CF3.
      assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize()
          + cf2MemstoreSize.getDataSize() + cf3MemstoreSize.getDataSize());

      // Flush!
      region.flush(false);

      // Will use these to check if anything changed.
      MemStoreSize oldCF2MemstoreSize = cf2MemstoreSize;
      MemStoreSize oldCF3MemstoreSize = cf3MemstoreSize;

      // Recalculate everything
      cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
      cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
      cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
      totalMemstoreSize = region.getMemStoreDataSize();
      smallestSeqInRegionCurrentMemstore = getWAL(region)
          .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());

      // We should have cleared out only CF1, since we chose the flush thresholds
      // and number of puts accordingly.
      assertEquals(0, cf1MemstoreSize.getDataSize());
      assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
      // Nothing should have happened to CF2, ...
      assertEquals(oldCF2MemstoreSize, cf2MemstoreSize);
      // ... or CF3
      assertEquals(oldCF3MemstoreSize, cf3MemstoreSize);
      // Now the smallest LSN in the region should be the same as the smallest
      // LSN in the memstore of CF2.
      assertEquals(smallestSeqCF2, smallestSeqInRegionCurrentMemstore);
      // Of course, this should hold too.
      assertEquals(totalMemstoreSize,
        cf2MemstoreSize.getDataSize() + cf3MemstoreSize.getDataSize());

      // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
      for (int i = 1200; i < 2400; i++) {
        region.put(createPut(2, i));

        // Add only 100 puts for CF3
        if (i - 1200 < 100) {
          region.put(createPut(3, i));
        }
      }

      // How much does the CF3 memstore occupy? Will be used later.
      oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();

      // Flush again
      region.flush(false);

      // Recalculate everything
      cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
      cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
      cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
      totalMemstoreSize = region.getMemStoreDataSize();
      smallestSeqInRegionCurrentMemstore = getWAL(region)
          .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());

      // CF1 and CF2, both should be absent.
      assertEquals(0, cf1MemstoreSize.getDataSize());
      assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
      assertEquals(0, cf2MemstoreSize.getDataSize());
      assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
      // CF3 shouldn't have been touched.
      assertEquals(oldCF3MemstoreSize, cf3MemstoreSize);
      assertEquals(totalMemstoreSize, cf3MemstoreSize.getDataSize());

      // What happens when we hit the memstore limit, but we are not able to find
      // any Column Family above the threshold?
      // In that case, we should flush all the CFs.

      // Clearing the existing memstores.
      region.flush(true);

      // The memstore limit is 200*1024 and the column family flush threshold is
      // around 50*1024. We try to just hit the memstore limit with each CF's
      // memstore being below the CF flush threshold.
      for (int i = 1; i <= 300; i++) {
        region.put(createPut(1, i));
        region.put(createPut(2, i));
        region.put(createPut(3, i));
        region.put(createPut(4, i));
        region.put(createPut(5, i));
      }

      region.flush(false);

      // Since we won't find any CF above the threshold, and hence no specific
      // store to flush, we should flush all the memstores.
      assertEquals(0, region.getMemStoreDataSize());
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(region);
    }
  }

  @Test
  public void testSelectiveFlushWhenNotEnabled() throws IOException {
    // Set up the configuration, use new one to not conflict with minicluster in other tests
    Configuration conf = new HBaseTestingUtility().getConfiguration();
    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());

    // Initialize the HRegion
    HRegion region = initHRegion("testSelectiveFlushWhenNotEnabled", conf);
    // Close the region in a finally block so a failed assertion does not leak it and its WAL.
    try {
      // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
      for (int i = 1; i <= 1200; i++) {
        region.put(createPut(1, i));
        if (i <= 100) {
          region.put(createPut(2, i));
          if (i <= 50) {
            region.put(createPut(3, i));
          }
        }
      }

      long totalMemstoreSize = region.getMemStoreDataSize();

      // Find the sizes of the memstores of each CF.
      MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
      MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
      MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();

      // Some other sanity checks.
      assertTrue(cf1MemstoreSize.getDataSize() > 0);
      assertTrue(cf2MemstoreSize.getDataSize() > 0);
      assertTrue(cf3MemstoreSize.getDataSize() > 0);

      // The total memstore size should be the same as the sum of the sizes of
      // memstores of CF1, CF2 and CF3.
      assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize()
          + cf2MemstoreSize.getDataSize() + cf3MemstoreSize.getDataSize());

      // Flush!
      region.flush(false);

      cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
      cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
      cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
      totalMemstoreSize = region.getMemStoreDataSize();
      long smallestSeqInRegionCurrentMemstore = region.getWAL()
          .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());

      // Everything should have been cleared
      assertEquals(0, cf1MemstoreSize.getDataSize());
      assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
      assertEquals(0, cf2MemstoreSize.getDataSize());
      assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
      assertEquals(0, cf3MemstoreSize.getDataSize());
      assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSize.getHeapSize());
      assertEquals(0, totalMemstoreSize);
      assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(region);
    }
  }

  /**
   * Finds the (first) region of the given table hosted by any region server of the mini cluster.
   * @return the region together with its hosting region server, or {@code null} when no region
   *         server of the mini cluster currently hosts a region of {@code tableName}
   */
  private static Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) {
    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
    for (JVMClusterUtil.RegionServerThread rst : rsts) {
      HRegionServer hrs = rst.getRegionServer();
      List<HRegion> regions = hrs.getRegions(tableName);
      if (!regions.isEmpty()) {
        return Pair.newPair(regions.get(0), hrs);
      }
    }
    return null;
  }

  /**
   * Writes interleaved edits to three families, selectively flushes the region, aborts the
   * hosting region server and then verifies that every edit can still be read back.
   */
  private void doTestLogReplay() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 10000);
    // Carefully chosen limits so that the memstore just flushes when we're done
    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 2500);
    final int numRegionServers = 4;
    try {
      TEST_UTIL.startMiniCluster(numRegionServers);
      TEST_UTIL.getAdmin().createNamespace(
        NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
      try (Table table = TEST_UTIL.createTable(TABLENAME, FAMILIES)) {
        // NOTE(review): this descriptor is only touched locally and is never sent back to the
        // cluster; since the table was created with all of FAMILIES the loop looks like a
        // no-op -- confirm before removing.
        HTableDescriptor htd = table.getTableDescriptor();

        for (byte[] family : FAMILIES) {
          if (!htd.hasFamily(family)) {
            htd.addFamily(new HColumnDescriptor(family));
          }
        }

        // Add 80 edits for CF1, 10 for CF2, 10 for CF3.
        // These will all be interleaved in the log.
        for (int i = 1; i <= 80; i++) {
          table.put(createPut(1, i));
          if (i <= 10) {
            table.put(createPut(2, i));
            table.put(createPut(3, i));
          }
        }
        Thread.sleep(1000);

        // Check the pair itself: it is null when no region was found, and dereferencing it
        // first would fail with a NullPointerException instead of this message.
        Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME);
        assertNotNull("Could not find a region which hosts the new region.",
          desiredRegionAndServer);
        HRegion desiredRegion = desiredRegionAndServer.getFirst();

        // Flush the region selectively.
        desiredRegion.flush(false);

        long totalMemstoreSize;
        long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
        totalMemstoreSize = desiredRegion.getMemStoreDataSize();

        // Find the sizes of the memstores of each CF.
        cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize().getDataSize();
        cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize().getDataSize();
        cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize().getDataSize();

        // CF1 Should have been flushed
        assertEquals(0, cf1MemstoreSize);
        // CF2 and CF3 shouldn't have been flushed.
        // TODO: This test doesn't allow for this case:
        // " Since none of the CFs were above the size, flushing all."
        // i.e. a flush happens before we get to here and its a flush-all.
        assertTrue(cf2MemstoreSize >= 0);
        assertTrue(cf3MemstoreSize >= 0);
        assertEquals(totalMemstoreSize, cf2MemstoreSize + cf3MemstoreSize);

        // Wait for the RS report to go across to the master, so that the master
        // is aware of which sequence ids have been flushed, before we kill the RS.
        // If in production, the RS dies before the report goes across, we will
        // safely replay all the edits.
        Thread.sleep(2000);

        // Abort the region server where we have the region hosted.
        HRegionServer rs = desiredRegionAndServer.getSecond();
        rs.abort("testing");

        // The aborted region server's regions will be eventually assigned to some
        // other region server, and the get RPC call (inside verifyEdit()) will
        // retry for some time till the regions come back up.

        // Verify that all the edits are safe.
        for (int i = 1; i <= 80; i++) {
          verifyEdit(1, i, table);
          if (i <= 10) {
            verifyEdit(2, i, table);
            verifyEdit(3, i, table);
          }
        }
      }
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  // Test Log Replay with Distributed log split on.
  @Test
  public void testLogReplayWithDistributedLogSplit() throws Exception {
    doTestLogReplay();
  }

  /** Returns the WAL of the given region, which must be an {@link HRegion}. */
  private WAL getWAL(Region region) {
    return ((HRegion) region).getWAL();
  }

  /** Returns the number of rolled log files reported for the WAL of the given region. */
  private int getNumRolledLogFiles(Region region) {
    return AbstractFSWALProvider.getNumRolledLogFiles(getWAL(region));
  }

  /**
   * When a log roll is about to happen, we do a flush of the regions who will be affected by the
   * log roll. These flushes cannot be a selective flushes, otherwise we cannot roll the logs. This
   * test ensures that we do a full-flush in that scenario.
   */
  @Test
  public void testFlushingWhenLogRolling() throws Exception {
    TableName tableName = TableName.valueOf("testFlushingWhenLogRolling");
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);
    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
    long cfFlushSizeLowerBound = 2048;
    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
      cfFlushSizeLowerBound);

    // One hour, prevent periodic rolling
    conf.setLong("hbase.regionserver.logroll.period", 60L * 60 * 1000);
    // prevent rolling by size
    conf.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024);
    // Make it 10 as max logs before a flush comes on.
    final int maxLogs = 10;
    conf.setInt("hbase.regionserver.maxlogs", maxLogs);

    final int numRegionServers = 1;
    try {
      // Start the cluster inside the try so that a partially started cluster is shut down too.
      TEST_UTIL.startMiniCluster(numRegionServers);
      try (Table table = TEST_UTIL.createTable(tableName, FAMILIES)) {
        // Force flush the namespace table so edits to it are not hanging around as oldest
        // edits. Otherwise, below, when we make maximum number of WAL files, then it will be
        // the namespace region that is flushed and not the below 'desiredRegion'.
        try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
          admin.flush(TableName.NAMESPACE_TABLE_NAME);
        }
        // Check the pair itself: it is null when no region was found, and dereferencing it
        // first would fail with a NullPointerException instead of this message.
        Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(tableName);
        assertNotNull("Could not find a region which hosts the new region.",
          desiredRegionAndServer);
        final HRegion desiredRegion = desiredRegionAndServer.getFirst();
        LOG.info("Writing to region={}", desiredRegion);

        // Add one row for each of CF1, CF2 and CF3.
        for (int i = 1; i <= 3; i++) {
          table.put(createPut(i, 0));
        }
        // Now only add row to CF1, make sure when we force a flush, CF1 is larger than the lower
        // bound and CF2 and CF3 are smaller than the lower bound.
        for (int i = 0; i < maxLogs; i++) {
          for (int j = 0; j < 100; j++) {
            table.put(createPut(1, i * 100 + j));
          }
          // Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
          int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion);
          assertNull(getWAL(desiredRegion).rollWriter());
          while (getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) {
            Thread.sleep(100);
          }
        }
        assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
        assertTrue(desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize()
            > cfFlushSizeLowerBound);
        assertTrue(desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize()
            < cfFlushSizeLowerBound);
        assertTrue(desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize()
            < cfFlushSizeLowerBound);
        table.put(createPut(1, 12345678));
        // Make numRolledLogFiles greater than maxLogs
        desiredRegionAndServer.getSecond().getWalRoller().requestRollAll();
        // Wait for some time till the flush caused by log rolling happens.
        TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() {

          @Override
          public boolean evaluate() throws Exception {
            return desiredRegion.getMemStoreDataSize() == 0;
          }

          @Override
          public String explainFailure() throws Exception {
            long memstoreSize = desiredRegion.getMemStoreDataSize();
            if (memstoreSize > 0) {
              return "Still have unflushed entries in memstore, memstore size is " + memstoreSize;
            }
            return "Unknown";
          }
        });
        LOG.info("Finished waiting on flush after too many WALs...");
        // Individual families should have been flushed.
        assertEquals(MutableSegment.DEEP_OVERHEAD,
          desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize());
        assertEquals(MutableSegment.DEEP_OVERHEAD,
          desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize());
        assertEquals(MutableSegment.DEEP_OVERHEAD,
          desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize());
        // let WAL cleanOldLogs
        assertNull(getWAL(desiredRegion).rollWriter(true));
        assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);
      }
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Writes 10000 rows to {@code table}, each with one cell in CF1, CF2 and CF3, pausing whenever
   * the region's memstore heap size exceeds {@code memstoreFlushSize}.
   */
  private void doPut(Table table, long memstoreFlushSize) throws IOException, InterruptedException {
    Pair<HRegion, HRegionServer> regionAndServer = getRegionWithName(table.getName());
    assertNotNull("Could not find a region for table " + table.getName(), regionAndServer);
    Region region = regionAndServer.getFirst();
    // cf1 100B per row, cf2 200B per row and cf3 400B per row
    byte[] qf = Bytes.toBytes("qf");
    Random rand = new Random();
    byte[] value1 = new byte[100];
    byte[] value2 = new byte[200];
    byte[] value3 = new byte[400];
    for (int i = 0; i < 10000; i++) {
      Put put = new Put(Bytes.toBytes("row-" + i));
      // Re-seed per row so the generated values depend only on the row number.
      rand.setSeed(i);
      rand.nextBytes(value1);
      rand.nextBytes(value2);
      rand.nextBytes(value3);
      put.addColumn(FAMILY1, qf, value1);
      put.addColumn(FAMILY2, qf, value2);
      put.addColumn(FAMILY3, qf, value3);
      table.put(put);
      // slow down to let regionserver flush region.
      while (region.getMemStoreHeapSize() > memstoreFlushSize) {
        Thread.sleep(100);
      }
    }
  }

  /**
   * Starts a single-server mini cluster, creates the table described by {@code htd}, applies the
   * {@link #doPut(Table, long)} write load and shuts the cluster down again.
   * @return the store file counts of CF1, CF2 and CF3 (in that order) after the write load
   */
  private int[] writeLoadAndCountStoreFiles(Configuration conf, HTableDescriptor htd,
      long memstoreFlushSize) throws Exception {
    try {
      TEST_UTIL.startMiniCluster(1);
      TEST_UTIL.getAdmin().createNamespace(
        NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
      TEST_UTIL.getAdmin().createTable(htd);
      TEST_UTIL.waitTableAvailable(TABLENAME);
      // The table is closed before the connection, also when doPut fails.
      try (Connection conn = ConnectionFactory.createConnection(conf);
          Table table = conn.getTable(TABLENAME)) {
        doPut(table, memstoreFlushSize);
      }

      Region region = getRegionWithName(TABLENAME).getFirst();
      return new int[] { region.getStore(FAMILY1).getStorefilesCount(),
          region.getStore(FAMILY2).getStorefilesCount(),
          region.getStore(FAMILY3).getStorefilesCount() };
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  // Under the same write load, small stores should have less store files when
  // percolumnfamilyflush enabled.
  @Test
  public void testCompareStoreFileCount() throws Exception {
    long memstoreFlushSize = 1024L * 1024;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, memstoreFlushSize);
    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
      ConstantSizeRegionSplitPolicy.class.getName());

    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
    htd.setCompactionEnabled(false);
    htd.addFamily(new HColumnDescriptor(FAMILY1));
    htd.addFamily(new HColumnDescriptor(FAMILY2));
    htd.addFamily(new HColumnDescriptor(FAMILY3));

    LOG.info("==============Test with selective flush disabled===============");
    int[] countsDisabled = writeLoadAndCountStoreFiles(conf, htd, memstoreFlushSize);
    int cf1StoreFileCount = countsDisabled[0];
    int cf2StoreFileCount = countsDisabled[1];
    int cf3StoreFileCount = countsDisabled[2];

    LOG.info("==============Test with selective flush enabled===============");
    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
    // default value of per-cf flush lower bound is too big, set to a small enough value
    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 0);
    int[] countsEnabled = writeLoadAndCountStoreFiles(conf, htd, memstoreFlushSize);
    int cf1StoreFileCount1 = countsEnabled[0];
    int cf2StoreFileCount1 = countsEnabled[1];
    int cf3StoreFileCount1 = countsEnabled[2];

    LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount
        + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", "
        + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount);
    LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1
        + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount1 + ", "
        + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount1);
    // small CF will have less store files.
    assertTrue(cf1StoreFileCount1 < cf1StoreFileCount);
    assertTrue(cf2StoreFileCount1 < cf2StoreFileCount);
  }

  /**
   * Stand-alone load generator: (re)creates {@link #TABLENAME} on the cluster described by the
   * default configuration and writes {@code numRows} rows with one cell in each of CF1, CF2 and
   * CF3.
   * @param args {@code args[0]} is the number of regions, {@code args[1]} the number of rows
   */
  public static void main(String[] args) throws Exception {
    if (args.length < 2) {
      throw new IllegalArgumentException(
          "Usage: TestPerColumnFamilyFlush <numRegions> <numRows>");
    }
    int numRegions = Integer.parseInt(args[0]);
    long numRows = Long.parseLong(args[1]);

    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
    htd.setMaxFileSize(10L * 1024 * 1024 * 1024);
    htd.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
    htd.addFamily(new HColumnDescriptor(FAMILY1));
    htd.addFamily(new HColumnDescriptor(FAMILY2));
    htd.addFamily(new HColumnDescriptor(FAMILY3));

    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      try (Admin admin = conn.getAdmin()) {
        if (admin.tableExists(TABLENAME)) {
          admin.disableTable(TABLENAME);
          admin.deleteTable(TABLENAME);
        }
        if (numRegions >= 3) {
          byte[] startKey = new byte[16];
          byte[] endKey = new byte[16];
          Arrays.fill(endKey, (byte) 0xFF);
          admin.createTable(htd, startKey, endKey, numRegions);
        } else {
          admin.createTable(htd);
        }
      }

      try (Table table = conn.getTable(TABLENAME)) {
        byte[] qf = Bytes.toBytes("qf");
        Random rand = new Random();
        byte[] value1 = new byte[16];
        byte[] value2 = new byte[256];
        byte[] value3 = new byte[4096];
        for (long i = 0; i < numRows; i++) {
          Put put = new Put(Hashing.md5().hashLong(i).asBytes());
          // Re-seed per row so the generated values depend only on the row number.
          rand.setSeed(i);
          rand.nextBytes(value1);
          rand.nextBytes(value2);
          rand.nextBytes(value3);
          put.addColumn(FAMILY1, qf, value1);
          put.addColumn(FAMILY2, qf, value2);
          put.addColumn(FAMILY3, qf, value3);
          table.put(put);
          if (i % 10000 == 0) {
            LOG.info("{} rows put", i);
          }
        }
      }
    }
  }
}