001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.Arrays; 027import java.util.List; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.HColumnDescriptor; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.HRegionInfo; 036import org.apache.hadoop.hbase.HTableDescriptor; 037import org.apache.hadoop.hbase.MiniHBaseCluster; 038import org.apache.hadoop.hbase.NamespaceDescriptor; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.Waiter; 041import org.apache.hadoop.hbase.client.Admin; 042import org.apache.hadoop.hbase.client.Connection; 043import org.apache.hadoop.hbase.client.ConnectionFactory; 044import org.apache.hadoop.hbase.client.Get; 045import org.apache.hadoop.hbase.client.Put; 046import org.apache.hadoop.hbase.client.Result; 047import org.apache.hadoop.hbase.client.Table; 048import org.apache.hadoop.hbase.testclassification.LargeTests; 049import org.apache.hadoop.hbase.testclassification.RegionServerTests; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.JVMClusterUtil; 052import org.apache.hadoop.hbase.util.Pair; 053import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 054import org.apache.hadoop.hbase.wal.WAL; 055import org.junit.ClassRule; 056import org.junit.Test; 057import org.junit.experimental.categories.Category; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hbase.thirdparty.com.google.common.hash.Hashing; 062 063/** 064 * This test verifies the correctness of the Per Column Family flushing strategy 065 */ 066@Category({ RegionServerTests.class, LargeTests.class }) 067public class TestPerColumnFamilyFlush { 068 069 @ClassRule 070 public static final HBaseClassTestRule CLASS_RULE = 071 HBaseClassTestRule.forClass(TestPerColumnFamilyFlush.class); 072 073 private static final Logger LOG = LoggerFactory.getLogger(TestPerColumnFamilyFlush.class); 074 075 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 076 077 private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion"); 078 079 public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1"); 080 081 public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), 082 Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") }; 083 084 public static final byte[] FAMILY1 = FAMILIES[0]; 085 086 public static final byte[] FAMILY2 = FAMILIES[1]; 087 088 public static final byte[] FAMILY3 = FAMILIES[2]; 089 090 private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException { 091 HTableDescriptor htd = new HTableDescriptor(TABLENAME); 092 for (byte[] family : FAMILIES) { 093 htd.addFamily(new HColumnDescriptor(family)); 094 } 095 HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false); 096 Path path = new Path(DIR, callingMethod); 097 return HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd); 098 } 099 100 // A helper function to create puts. 101 private Put createPut(int familyNum, int putNum) { 102 byte[] qf = Bytes.toBytes("q" + familyNum); 103 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); 104 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); 105 Put p = new Put(row); 106 p.addColumn(FAMILIES[familyNum - 1], qf, val); 107 return p; 108 } 109 110 // A helper function to create puts. 111 private Get createGet(int familyNum, int putNum) { 112 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); 113 return new Get(row); 114 } 115 116 // A helper function to verify edits. 117 void verifyEdit(int familyNum, int putNum, Table table) throws IOException { 118 Result r = table.get(createGet(familyNum, putNum)); 119 byte[] family = FAMILIES[familyNum - 1]; 120 byte[] qf = Bytes.toBytes("q" + familyNum); 121 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); 122 assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family)); 123 assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), 124 r.getFamilyMap(family).get(qf)); 125 assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum), 126 Arrays.equals(r.getFamilyMap(family).get(qf), val)); 127 } 128 129 @Test 130 public void testSelectiveFlushWhenEnabled() throws IOException { 131 // Set up the configuration, use new one to not conflict with minicluster in other tests 132 Configuration conf = new HBaseTestingUtility().getConfiguration(); 133 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024); 134 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName()); 135 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 40 * 1024); 136 // Intialize the region 137 HRegion region = initHRegion("testSelectiveFlushWithDataCompaction", conf); 138 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 139 for (int i = 1; i <= 1200; i++) { 140 region.put(createPut(1, i)); 141 142 if (i <= 100) { 143 region.put(createPut(2, i)); 144 if (i <= 50) { 145 region.put(createPut(3, i)); 146 } 147 } 148 } 149 150 long totalMemstoreSize = region.getMemStoreDataSize(); 151 152 // Find the smallest LSNs for edits wrt to each CF. 153 long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1); 154 long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2); 155 long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3); 156 157 // Find the sizes of the memstores of each CF. 158 MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); 159 MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); 160 MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); 161 162 // Get the overall smallest LSN in the region's memstores. 163 long smallestSeqInRegionCurrentMemstore = 164 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 165 166 // The overall smallest LSN in the region's memstores should be the same as 167 // the LSN of the smallest edit in CF1 168 assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore); 169 170 // Some other sanity checks. 171 assertTrue(smallestSeqCF1 < smallestSeqCF2); 172 assertTrue(smallestSeqCF2 < smallestSeqCF3); 173 assertTrue(cf1MemstoreSize.getDataSize() > 0); 174 assertTrue(cf2MemstoreSize.getDataSize() > 0); 175 assertTrue(cf3MemstoreSize.getDataSize() > 0); 176 177 // The total memstore size should be the same as the sum of the sizes of 178 // memstores of CF1, CF2 and CF3. 179 assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize() 180 + cf3MemstoreSize.getDataSize()); 181 182 // Flush! 183 region.flush(false); 184 185 // Will use these to check if anything changed. 186 MemStoreSize oldCF2MemstoreSize = cf2MemstoreSize; 187 MemStoreSize oldCF3MemstoreSize = cf3MemstoreSize; 188 189 // Recalculate everything 190 cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); 191 cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); 192 cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); 193 totalMemstoreSize = region.getMemStoreDataSize(); 194 smallestSeqInRegionCurrentMemstore = 195 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 196 197 // We should have cleared out only CF1, since we chose the flush thresholds 198 // and number of puts accordingly. 199 assertEquals(0, cf1MemstoreSize.getDataSize()); 200 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize()); 201 // Nothing should have happened to CF2, ... 202 assertEquals(cf2MemstoreSize, oldCF2MemstoreSize); 203 // ... or CF3 204 assertEquals(cf3MemstoreSize, oldCF3MemstoreSize); 205 // Now the smallest LSN in the region should be the same as the smallest 206 // LSN in the memstore of CF2. 207 assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2); 208 // Of course, this should hold too. 209 assertEquals(totalMemstoreSize, cf2MemstoreSize.getDataSize() + cf3MemstoreSize.getDataSize()); 210 211 // Now add more puts (mostly for CF2), so that we only flush CF2 this time. 212 for (int i = 1200; i < 2400; i++) { 213 region.put(createPut(2, i)); 214 215 // Add only 100 puts for CF3 216 if (i - 1200 < 100) { 217 region.put(createPut(3, i)); 218 } 219 } 220 221 // How much does the CF3 memstore occupy? Will be used later. 222 oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); 223 224 // Flush again 225 region.flush(false); 226 227 // Recalculate everything 228 cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); 229 cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); 230 cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); 231 totalMemstoreSize = region.getMemStoreDataSize(); 232 smallestSeqInRegionCurrentMemstore = 233 getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 234 235 // CF1 and CF2, both should be absent. 236 assertEquals(0, cf1MemstoreSize.getDataSize()); 237 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize()); 238 assertEquals(0, cf2MemstoreSize.getDataSize()); 239 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize()); 240 // CF3 shouldn't have been touched. 241 assertEquals(cf3MemstoreSize, oldCF3MemstoreSize); 242 assertEquals(totalMemstoreSize, cf3MemstoreSize.getDataSize()); 243 244 // What happens when we hit the memstore limit, but we are not able to find 245 // any Column Family above the threshold? 246 // In that case, we should flush all the CFs. 247 248 // Clearing the existing memstores. 249 region.flush(true); 250 251 // The memstore limit is 200*1024 and the column family flush threshold is 252 // around 50*1024. We try to just hit the memstore limit with each CF's 253 // memstore being below the CF flush threshold. 254 for (int i = 1; i <= 300; i++) { 255 region.put(createPut(1, i)); 256 region.put(createPut(2, i)); 257 region.put(createPut(3, i)); 258 region.put(createPut(4, i)); 259 region.put(createPut(5, i)); 260 } 261 262 region.flush(false); 263 264 // Since we won't find any CF above the threshold, and hence no specific 265 // store to flush, we should flush all the memstores. 266 assertEquals(0, region.getMemStoreDataSize()); 267 HBaseTestingUtility.closeRegionAndWAL(region); 268 } 269 270 @Test 271 public void testSelectiveFlushWhenNotEnabled() throws IOException { 272 // Set up the configuration, use new one to not conflict with minicluster in other tests 273 Configuration conf = new HBaseTestingUtility().getConfiguration(); 274 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024); 275 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName()); 276 277 // Intialize the HRegion 278 HRegion region = initHRegion("testSelectiveFlushWhenNotEnabled", conf); 279 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 280 for (int i = 1; i <= 1200; i++) { 281 region.put(createPut(1, i)); 282 if (i <= 100) { 283 region.put(createPut(2, i)); 284 if (i <= 50) { 285 region.put(createPut(3, i)); 286 } 287 } 288 } 289 290 long totalMemstoreSize = region.getMemStoreDataSize(); 291 292 // Find the sizes of the memstores of each CF. 293 MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); 294 MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); 295 MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); 296 297 // Some other sanity checks. 298 assertTrue(cf1MemstoreSize.getDataSize() > 0); 299 assertTrue(cf2MemstoreSize.getDataSize() > 0); 300 assertTrue(cf3MemstoreSize.getDataSize() > 0); 301 302 // The total memstore size should be the same as the sum of the sizes of 303 // memstores of CF1, CF2 and CF3. 304 assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize() 305 + cf3MemstoreSize.getDataSize()); 306 307 // Flush! 308 region.flush(false); 309 310 cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); 311 cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); 312 cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); 313 totalMemstoreSize = region.getMemStoreDataSize(); 314 long smallestSeqInRegionCurrentMemstore = 315 region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); 316 317 // Everything should have been cleared 318 assertEquals(0, cf1MemstoreSize.getDataSize()); 319 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize()); 320 assertEquals(0, cf2MemstoreSize.getDataSize()); 321 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize()); 322 assertEquals(0, cf3MemstoreSize.getDataSize()); 323 assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSize.getHeapSize()); 324 assertEquals(0, totalMemstoreSize); 325 assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore); 326 HBaseTestingUtility.closeRegionAndWAL(region); 327 } 328 329 // Find the (first) region which has the specified name. 330 private static Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) { 331 MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); 332 List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads(); 333 for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { 334 HRegionServer hrs = rsts.get(i).getRegionServer(); 335 for (HRegion region : hrs.getRegions(tableName)) { 336 return Pair.newPair(region, hrs); 337 } 338 } 339 return null; 340 } 341 342 private void doTestLogReplay() throws Exception { 343 Configuration conf = TEST_UTIL.getConfiguration(); 344 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 10000); 345 // Carefully chosen limits so that the memstore just flushes when we're done 346 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName()); 347 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 2500); 348 final int numRegionServers = 4; 349 try { 350 TEST_UTIL.startMiniCluster(numRegionServers); 351 TEST_UTIL.getAdmin() 352 .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); 353 Table table = TEST_UTIL.createTable(TABLENAME, FAMILIES); 354 HTableDescriptor htd = table.getTableDescriptor(); 355 356 for (byte[] family : FAMILIES) { 357 if (!htd.hasFamily(family)) { 358 htd.addFamily(new HColumnDescriptor(family)); 359 } 360 } 361 362 // Add 100 edits for CF1, 20 for CF2, 20 for CF3. 363 // These will all be interleaved in the log. 364 for (int i = 1; i <= 80; i++) { 365 table.put(createPut(1, i)); 366 if (i <= 10) { 367 table.put(createPut(2, i)); 368 table.put(createPut(3, i)); 369 } 370 } 371 Thread.sleep(1000); 372 373 Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME); 374 HRegion desiredRegion = desiredRegionAndServer.getFirst(); 375 assertTrue("Could not find a region which hosts the new region.", desiredRegion != null); 376 377 // Flush the region selectively. 378 desiredRegion.flush(false); 379 380 long totalMemstoreSize; 381 long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize; 382 totalMemstoreSize = desiredRegion.getMemStoreDataSize(); 383 384 // Find the sizes of the memstores of each CF. 385 cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize().getDataSize(); 386 cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize().getDataSize(); 387 cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize().getDataSize(); 388 389 // CF1 Should have been flushed 390 assertEquals(0, cf1MemstoreSize); 391 // CF2 and CF3 shouldn't have been flushed. 392 // TODO: This test doesn't allow for this case: 393 // " Since none of the CFs were above the size, flushing all." 394 // i.e. a flush happens before we get to here and its a flush-all. 395 assertTrue(cf2MemstoreSize >= 0); 396 assertTrue(cf3MemstoreSize >= 0); 397 assertEquals(totalMemstoreSize, cf2MemstoreSize + cf3MemstoreSize); 398 399 // Wait for the RS report to go across to the master, so that the master 400 // is aware of which sequence ids have been flushed, before we kill the RS. 401 // If in production, the RS dies before the report goes across, we will 402 // safely replay all the edits. 403 Thread.sleep(2000); 404 405 // Abort the region server where we have the region hosted. 406 HRegionServer rs = desiredRegionAndServer.getSecond(); 407 rs.abort("testing"); 408 409 // The aborted region server's regions will be eventually assigned to some 410 // other region server, and the get RPC call (inside verifyEdit()) will 411 // retry for some time till the regions come back up. 412 413 // Verify that all the edits are safe. 414 for (int i = 1; i <= 80; i++) { 415 verifyEdit(1, i, table); 416 if (i <= 10) { 417 verifyEdit(2, i, table); 418 verifyEdit(3, i, table); 419 } 420 } 421 } finally { 422 TEST_UTIL.shutdownMiniCluster(); 423 } 424 } 425 426 // Test Log Replay with Distributed log split on. 427 @Test 428 public void testLogReplayWithDistributedLogSplit() throws Exception { 429 doTestLogReplay(); 430 } 431 432 private WAL getWAL(Region region) { 433 return ((HRegion) region).getWAL(); 434 } 435 436 private int getNumRolledLogFiles(Region region) { 437 return AbstractFSWALProvider.getNumRolledLogFiles(getWAL(region)); 438 } 439 440 /** 441 * When a log roll is about to happen, we do a flush of the regions who will be affected by the 442 * log roll. These flushes cannot be a selective flushes, otherwise we cannot roll the logs. This 443 * test ensures that we do a full-flush in that scenario. n 444 */ 445 @Test 446 public void testFlushingWhenLogRolling() throws Exception { 447 TableName tableName = TableName.valueOf("testFlushingWhenLogRolling"); 448 Configuration conf = TEST_UTIL.getConfiguration(); 449 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024); 450 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName()); 451 long cfFlushSizeLowerBound = 2048; 452 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 453 cfFlushSizeLowerBound); 454 455 // One hour, prevent periodic rolling 456 conf.setLong("hbase.regionserver.logroll.period", 60L * 60 * 1000); 457 // prevent rolling by size 458 conf.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024); 459 // Make it 10 as max logs before a flush comes on. 460 final int maxLogs = 10; 461 conf.setInt("hbase.regionserver.maxlogs", maxLogs); 462 463 final int numRegionServers = 1; 464 TEST_UTIL.startMiniCluster(numRegionServers); 465 try { 466 Table table = TEST_UTIL.createTable(tableName, FAMILIES); 467 // Force flush the namespace table so edits to it are not hanging around as oldest 468 // edits. Otherwise, below, when we make maximum number of WAL files, then it will be 469 // the namespace region that is flushed and not the below 'desiredRegion'. 470 try (Admin admin = TEST_UTIL.getConnection().getAdmin()) { 471 admin.flush(TableName.NAMESPACE_TABLE_NAME); 472 } 473 Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(tableName); 474 final HRegion desiredRegion = desiredRegionAndServer.getFirst(); 475 assertTrue("Could not find a region which hosts the new region.", desiredRegion != null); 476 LOG.info("Writing to region=" + desiredRegion); 477 478 // Add one row for both CFs. 479 for (int i = 1; i <= 3; i++) { 480 table.put(createPut(i, 0)); 481 } 482 // Now only add row to CF1, make sure when we force a flush, CF1 is larger than the lower 483 // bound and CF2 and CF3 are smaller than the lower bound. 484 for (int i = 0; i < maxLogs; i++) { 485 for (int j = 0; j < 100; j++) { 486 table.put(createPut(1, i * 100 + j)); 487 } 488 // Roll the WAL. The log file count is less than maxLogs so no flush is triggered. 489 int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion); 490 assertNull(getWAL(desiredRegion).rollWriter()); 491 while (getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) { 492 Thread.sleep(100); 493 } 494 } 495 assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion)); 496 assertTrue( 497 desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize() > cfFlushSizeLowerBound); 498 assertTrue( 499 desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound); 500 assertTrue( 501 desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound); 502 table.put(createPut(1, 12345678)); 503 // Make numRolledLogFiles greater than maxLogs 504 desiredRegionAndServer.getSecond().getWalRoller().requestRollAll(); 505 // Wait for some time till the flush caused by log rolling happens. 506 TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() { 507 508 @Override 509 public boolean evaluate() throws Exception { 510 return desiredRegion.getMemStoreDataSize() == 0; 511 } 512 513 @Override 514 public String explainFailure() throws Exception { 515 long memstoreSize = desiredRegion.getMemStoreDataSize(); 516 if (memstoreSize > 0) { 517 return "Still have unflushed entries in memstore, memstore size is " + memstoreSize; 518 } 519 return "Unknown"; 520 } 521 }); 522 LOG.info("Finished waiting on flush after too many WALs..."); 523 // Individual families should have been flushed. 524 assertEquals(MutableSegment.DEEP_OVERHEAD, 525 desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize()); 526 assertEquals(MutableSegment.DEEP_OVERHEAD, 527 desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize()); 528 assertEquals(MutableSegment.DEEP_OVERHEAD, 529 desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize()); 530 // let WAL cleanOldLogs 531 assertNull(getWAL(desiredRegion).rollWriter(true)); 532 assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs); 533 } finally { 534 TEST_UTIL.shutdownMiniCluster(); 535 } 536 } 537 538 private void doPut(Table table, long memstoreFlushSize) throws IOException, InterruptedException { 539 Region region = getRegionWithName(table.getName()).getFirst(); 540 // cf1 4B per row, cf2 40B per row and cf3 400B per row 541 byte[] qf = Bytes.toBytes("qf"); 542 for (int i = 0; i < 10000; i++) { 543 Put put = new Put(Bytes.toBytes("row-" + i)); 544 byte[] value1 = new byte[100]; 545 Bytes.random(value1); 546 put.addColumn(FAMILY1, qf, value1); 547 byte[] value2 = new byte[200]; 548 Bytes.random(value2); 549 put.addColumn(FAMILY2, qf, value2); 550 byte[] value3 = new byte[400]; 551 Bytes.random(value3); 552 put.addColumn(FAMILY3, qf, value3); 553 table.put(put); 554 // slow down to let regionserver flush region. 555 while (region.getMemStoreHeapSize() > memstoreFlushSize) { 556 Thread.sleep(100); 557 } 558 } 559 } 560 561 // Under the same write load, small stores should have less store files when 562 // percolumnfamilyflush enabled. 563 @Test 564 public void testCompareStoreFileCount() throws Exception { 565 long memstoreFlushSize = 1024L * 1024; 566 Configuration conf = TEST_UTIL.getConfiguration(); 567 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, memstoreFlushSize); 568 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName()); 569 conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); 570 conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, 571 ConstantSizeRegionSplitPolicy.class.getName()); 572 573 HTableDescriptor htd = new HTableDescriptor(TABLENAME); 574 htd.setCompactionEnabled(false); 575 htd.addFamily(new HColumnDescriptor(FAMILY1)); 576 htd.addFamily(new HColumnDescriptor(FAMILY2)); 577 htd.addFamily(new HColumnDescriptor(FAMILY3)); 578 579 LOG.info("==============Test with selective flush disabled==============="); 580 int cf1StoreFileCount = -1; 581 int cf2StoreFileCount = -1; 582 int cf3StoreFileCount = -1; 583 int cf1StoreFileCount1 = -1; 584 int cf2StoreFileCount1 = -1; 585 int cf3StoreFileCount1 = -1; 586 try { 587 TEST_UTIL.startMiniCluster(1); 588 TEST_UTIL.getAdmin() 589 .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); 590 TEST_UTIL.getAdmin().createTable(htd); 591 TEST_UTIL.waitTableAvailable(TABLENAME); 592 Connection conn = ConnectionFactory.createConnection(conf); 593 Table table = conn.getTable(TABLENAME); 594 doPut(table, memstoreFlushSize); 595 table.close(); 596 conn.close(); 597 598 Region region = getRegionWithName(TABLENAME).getFirst(); 599 cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount(); 600 cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount(); 601 cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount(); 602 } finally { 603 TEST_UTIL.shutdownMiniCluster(); 604 } 605 606 LOG.info("==============Test with selective flush enabled==============="); 607 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName()); 608 // default value of per-cf flush lower bound is too big, set to a small enough value 609 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 0); 610 try { 611 TEST_UTIL.startMiniCluster(1); 612 TEST_UTIL.getAdmin() 613 .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); 614 TEST_UTIL.getAdmin().createTable(htd); 615 Connection conn = ConnectionFactory.createConnection(conf); 616 Table table = conn.getTable(TABLENAME); 617 doPut(table, memstoreFlushSize); 618 table.close(); 619 conn.close(); 620 621 Region region = getRegionWithName(TABLENAME).getFirst(); 622 cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount(); 623 cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount(); 624 cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount(); 625 } finally { 626 TEST_UTIL.shutdownMiniCluster(); 627 } 628 629 LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount + ", " 630 + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", " + Bytes.toString(FAMILY3) + "=>" 631 + cf3StoreFileCount); 632 LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1 + ", " 633 + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount1 + ", " + Bytes.toString(FAMILY3) + "=>" 634 + cf3StoreFileCount1); 635 // small CF will have less store files. 636 assertTrue(cf1StoreFileCount1 < cf1StoreFileCount); 637 assertTrue(cf2StoreFileCount1 < cf2StoreFileCount); 638 } 639 640 public static void main(String[] args) throws Exception { 641 int numRegions = Integer.parseInt(args[0]); 642 long numRows = Long.parseLong(args[1]); 643 644 HTableDescriptor htd = new HTableDescriptor(TABLENAME); 645 htd.setMaxFileSize(10L * 1024 * 1024 * 1024); 646 htd.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName()); 647 htd.addFamily(new HColumnDescriptor(FAMILY1)); 648 htd.addFamily(new HColumnDescriptor(FAMILY2)); 649 htd.addFamily(new HColumnDescriptor(FAMILY3)); 650 651 Configuration conf = HBaseConfiguration.create(); 652 Connection conn = ConnectionFactory.createConnection(conf); 653 Admin admin = conn.getAdmin(); 654 if (admin.tableExists(TABLENAME)) { 655 admin.disableTable(TABLENAME); 656 admin.deleteTable(TABLENAME); 657 } 658 if (numRegions >= 3) { 659 byte[] startKey = new byte[16]; 660 byte[] endKey = new byte[16]; 661 Arrays.fill(endKey, (byte) 0xFF); 662 admin.createTable(htd, startKey, endKey, numRegions); 663 } else { 664 admin.createTable(htd); 665 } 666 admin.close(); 667 668 Table table = conn.getTable(TABLENAME); 669 byte[] qf = Bytes.toBytes("qf"); 670 byte[] value1 = new byte[16]; 671 byte[] value2 = new byte[256]; 672 byte[] value3 = new byte[4096]; 673 for (long i = 0; i < numRows; i++) { 674 Put put = new Put(Hashing.md5().hashLong(i).asBytes()); 675 Bytes.random(value1); 676 Bytes.random(value2); 677 Bytes.random(value3); 678 put.addColumn(FAMILY1, qf, value1); 679 put.addColumn(FAMILY2, qf, value2); 680 put.addColumn(FAMILY3, qf, value3); 681 table.put(put); 682 if (i % 10000 == 0) { 683 LOG.info(i + " rows put"); 684 } 685 } 686 table.close(); 687 conn.close(); 688 } 689}