/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.wal.AbstractTestFSWAL;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.hash.Hashing;

/**
 * This test verifies the correctness of the Per Column Family flushing strategy
 */
@Tag(RegionServerTests.TAG)
@Tag(LargeTests.TAG)
public class TestPerColumnFamilyFlush {

  private static final Logger LOG = LoggerFactory.getLogger(TestPerColumnFamilyFlush.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");

  public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1");

  public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
    Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };

  public static final byte[] FAMILY1 = FAMILIES[0];

  public static final byte[] FAMILY2 = FAMILIES[1];

  public static final byte[] FAMILY3 = FAMILIES[2];

  /**
   * Creates a standalone region (together with its WAL) for {@link #TABLENAME} that contains every
   * family in {@link #FAMILIES}. The region data lives under {@code DIR/callingMethod}.
   * @param callingMethod sub-directory name used for the region files
   * @param conf          configuration the region is created with
   * @return the newly created region; the caller is responsible for closing it
   */
  private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLENAME);
    for (byte[] family : FAMILIES) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    }
    RegionInfo info = RegionInfoBuilder.newBuilder(TABLENAME).build();
    Path path = new Path(DIR, callingMethod);
    return HBaseTestingUtil.createRegionAndWAL(info, path, conf, builder.build());
  }

  /**
   * A helper function to create puts. The put targets row {@code row<familyNum>-<putNum>} and
   * writes {@code val<familyNum>-<putNum>} to qualifier {@code q<familyNum>}.
   * @param familyNum 1-based index into {@link #FAMILIES}
   * @param putNum    sequence number used to make the row and value unique
   */
  private Put createPut(int familyNum, int putNum) {
    byte[] qf = Bytes.toBytes("q" + familyNum);
    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
    Put p = new Put(row);
    p.addColumn(FAMILIES[familyNum - 1], qf, val);
    return p;
  }

  /**
   * A helper function to create gets for the row written by {@link #createPut(int, int)} with the
   * same arguments.
   */
  private Get createGet(int familyNum, int putNum) {
    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
    return new Get(row);
  }

  /**
   * A helper function to verify edits: reads back the row written by
   * {@link #createPut(int, int)} and checks that the expected value is present.
   */
  void verifyEdit(int familyNum, int putNum, Table table) throws IOException {
    Result r = table.get(createGet(familyNum, putNum));
    byte[] family = FAMILIES[familyNum - 1];
    byte[] qf = Bytes.toBytes("q" + familyNum);
    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
    assertNotNull(r.getFamilyMap(family), "Missing Put#" + putNum + " for CF# " + familyNum);
    assertNotNull(r.getFamilyMap(family).get(qf),
      "Missing Put#" + putNum + " for CF# " + familyNum);
    assertArrayEquals(val, r.getFamilyMap(family).get(qf),
      "Incorrect value for Put#" + putNum + " for CF# " + familyNum);
  }

  /**
   * Returns the earliest sequence number of the given region's memstores, as tracked by the
   * region's WAL.
   */
  private long getEarliestMemStoreSeqNum(HRegion region) {
    return AbstractTestFSWAL.getEarliestMemStoreSeqNum(getWAL(region),
      region.getRegionInfo().getEncodedNameAsBytes());
  }

  @Test
  public void testSelectiveFlushWhenEnabled() throws IOException {
    // Set up the configuration, use new one to not conflict with minicluster in other tests
    Configuration conf = new HBaseTestingUtil().getConfiguration();
    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 40 * 1024);
    // Initialize the region
    HRegion region = initHRegion("testSelectiveFlushWithDataCompaction", conf);
    // Close the region in a finally block so a failed assertion does not leak the region and WAL.
    try {
      // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
      for (int i = 1; i <= 1200; i++) {
        region.put(createPut(1, i));

        if (i <= 100) {
          region.put(createPut(2, i));
          if (i <= 50) {
            region.put(createPut(3, i));
          }
        }
      }

      long totalMemstoreSize = region.getMemStoreDataSize();

      // Find the smallest LSNs for edits wrt to each CF.
      long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1);
      long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2);
      long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3);

      // Find the sizes of the memstores of each CF.
      MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
      MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
      MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();

      // Get the overall smallest LSN in the region's memstores.
      long smallestSeqInRegionCurrentMemstore = getEarliestMemStoreSeqNum(region);

      // The overall smallest LSN in the region's memstores should be the same as
      // the LSN of the smallest edit in CF1
      assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);

      // Some other sanity checks.
      assertTrue(smallestSeqCF1 < smallestSeqCF2);
      assertTrue(smallestSeqCF2 < smallestSeqCF3);
      assertTrue(cf1MemstoreSize.getDataSize() > 0);
      assertTrue(cf2MemstoreSize.getDataSize() > 0);
      assertTrue(cf3MemstoreSize.getDataSize() > 0);

      // The total memstore size should be the same as the sum of the sizes of
      // memstores of CF1, CF2 and CF3.
      assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize()
        + cf2MemstoreSize.getDataSize() + cf3MemstoreSize.getDataSize());

      // Flush!
      region.flush(false);

      // Will use these to check if anything changed.
      MemStoreSize oldCF2MemstoreSize = cf2MemstoreSize;
      MemStoreSize oldCF3MemstoreSize = cf3MemstoreSize;

      // Recalculate everything
      cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
      cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
      cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
      totalMemstoreSize = region.getMemStoreDataSize();
      smallestSeqInRegionCurrentMemstore = getEarliestMemStoreSeqNum(region);

      // We should have cleared out only CF1, since we chose the flush thresholds
      // and number of puts accordingly.
      assertEquals(0, cf1MemstoreSize.getDataSize());
      assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
      // Nothing should have happened to CF2, ...
      assertEquals(oldCF2MemstoreSize, cf2MemstoreSize);
      // ... or CF3
      assertEquals(oldCF3MemstoreSize, cf3MemstoreSize);
      // Now the smallest LSN in the region should be the same as the smallest
      // LSN in the memstore of CF2.
      assertEquals(smallestSeqCF2, smallestSeqInRegionCurrentMemstore);
      // Of course, this should hold too.
      assertEquals(totalMemstoreSize,
        cf2MemstoreSize.getDataSize() + cf3MemstoreSize.getDataSize());

      // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
      for (int i = 1200; i < 2400; i++) {
        region.put(createPut(2, i));

        // Add only 100 puts for CF3
        if (i - 1200 < 100) {
          region.put(createPut(3, i));
        }
      }

      // How much does the CF3 memstore occupy? Will be used later.
      oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();

      // Flush again
      region.flush(false);

      // Recalculate everything
      cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
      cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
      cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
      totalMemstoreSize = region.getMemStoreDataSize();
      smallestSeqInRegionCurrentMemstore = getEarliestMemStoreSeqNum(region);

      // CF1 and CF2, both should be absent.
      assertEquals(0, cf1MemstoreSize.getDataSize());
      assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
      assertEquals(0, cf2MemstoreSize.getDataSize());
      assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
      // CF3 shouldn't have been touched.
      assertEquals(oldCF3MemstoreSize, cf3MemstoreSize);
      assertEquals(totalMemstoreSize, cf3MemstoreSize.getDataSize());
      assertEquals(smallestSeqCF3, smallestSeqInRegionCurrentMemstore);

      // What happens when we hit the memstore limit, but we are not able to find
      // any Column Family above the threshold?
      // In that case, we should flush all the CFs.

      // Clearing the existing memstores.
      region.flush(true);

      // The memstore limit is 200*1024 and the column family flush threshold is
      // around 50*1024. We try to just hit the memstore limit with each CF's
      // memstore being below the CF flush threshold.
      for (int i = 1; i <= 300; i++) {
        region.put(createPut(1, i));
        region.put(createPut(2, i));
        region.put(createPut(3, i));
        region.put(createPut(4, i));
        region.put(createPut(5, i));
      }

      region.flush(false);

      // Since we won't find any CF above the threshold, and hence no specific
      // store to flush, we should flush all the memstores.
      assertEquals(0, region.getMemStoreDataSize());
    } finally {
      HBaseTestingUtil.closeRegionAndWAL(region);
    }
  }

  @Test
  public void testSelectiveFlushWhenNotEnabled() throws IOException {
    // Set up the configuration, use new one to not conflict with minicluster in other tests
    Configuration conf = new HBaseTestingUtil().getConfiguration();
    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());

    // Initialize the HRegion
    HRegion region = initHRegion("testSelectiveFlushWhenNotEnabled", conf);
    // Close the region in a finally block so a failed assertion does not leak the region and WAL.
    try {
      // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
      for (int i = 1; i <= 1200; i++) {
        region.put(createPut(1, i));
        if (i <= 100) {
          region.put(createPut(2, i));
          if (i <= 50) {
            region.put(createPut(3, i));
          }
        }
      }

      long totalMemstoreSize = region.getMemStoreDataSize();

      // Find the sizes of the memstores of each CF.
      MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
      MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
      MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();

      // Some other sanity checks.
      assertTrue(cf1MemstoreSize.getDataSize() > 0);
      assertTrue(cf2MemstoreSize.getDataSize() > 0);
      assertTrue(cf3MemstoreSize.getDataSize() > 0);

      // The total memstore size should be the same as the sum of the sizes of
      // memstores of CF1, CF2 and CF3.
      assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize()
        + cf2MemstoreSize.getDataSize() + cf3MemstoreSize.getDataSize());

      // Flush!
      region.flush(false);

      cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
      cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
      cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
      totalMemstoreSize = region.getMemStoreDataSize();
      long smallestSeqInRegionCurrentMemstore = getEarliestMemStoreSeqNum(region);

      // Everything should have been cleared
      assertEquals(0, cf1MemstoreSize.getDataSize());
      assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
      assertEquals(0, cf2MemstoreSize.getDataSize());
      assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
      assertEquals(0, cf3MemstoreSize.getDataSize());
      assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSize.getHeapSize());
      assertEquals(0, totalMemstoreSize);
      assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
    } finally {
      HBaseTestingUtil.closeRegionAndWAL(region);
    }
  }

  /**
   * Find the (first) region which belongs to the specified table, scanning the mini cluster's
   * region servers in order.
   * @return the region and the region server hosting it, or {@code null} if no region server
   *         currently hosts a region of the table
   */
  private static Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) {
    SingleProcessHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
    for (JVMClusterUtil.RegionServerThread rst : cluster.getRegionServerThreads()) {
      HRegionServer hrs = rst.getRegionServer();
      List<HRegion> regions = hrs.getRegions(tableName);
      if (!regions.isEmpty()) {
        return Pair.newPair(regions.get(0), hrs);
      }
    }
    return null;
  }

  /**
   * Same as {@link #getRegionWithName(TableName)} but fails the test with a descriptive message,
   * instead of letting the caller hit a {@link NullPointerException}, when no region is found.
   */
  private static Pair<HRegion, HRegionServer> getRequiredRegionWithName(TableName tableName) {
    Pair<HRegion, HRegionServer> regionAndServer = getRegionWithName(tableName);
    assertNotNull(regionAndServer,
      "Could not find a region server hosting a region of table " + tableName);
    return regionAndServer;
  }

  /**
   * Writes edits to three families, selectively flushes the region, aborts the hosting region
   * server and then verifies that every edit can still be read back.
   */
  private void doTestLogReplay() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 10000);
    // Carefully chosen limits so that the memstore just flushes when we're done
    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 2500);
    final int numRegionServers = 4;
    try {
      TEST_UTIL.startMiniCluster(numRegionServers);
      TEST_UTIL.getAdmin()
        .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
      try (Table table = TEST_UTIL.createTable(TABLENAME, FAMILIES)) {
        // Add 80 edits for CF1, 10 for CF2, 10 for CF3.
        // These will all be interleaved in the log.
        for (int i = 1; i <= 80; i++) {
          table.put(createPut(1, i));
          if (i <= 10) {
            table.put(createPut(2, i));
            table.put(createPut(3, i));
          }
        }
        Thread.sleep(1000);

        Pair<HRegion, HRegionServer> desiredRegionAndServer =
          getRequiredRegionWithName(TABLENAME);
        HRegion desiredRegion = desiredRegionAndServer.getFirst();
        assertNotNull(desiredRegion, "Could not find a region which hosts the new region.");

        // Flush the region selectively.
        desiredRegion.flush(false);

        long totalMemstoreSize;
        long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
        totalMemstoreSize = desiredRegion.getMemStoreDataSize();

        // Find the sizes of the memstores of each CF.
        cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize().getDataSize();
        cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize().getDataSize();
        cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize().getDataSize();

        // CF1 Should have been flushed
        assertEquals(0, cf1MemstoreSize);
        // CF2 and CF3 shouldn't have been flushed.
        // TODO: This test doesn't allow for this case:
        // " Since none of the CFs were above the size, flushing all."
        // i.e. a flush happens before we get to here and its a flush-all.
        assertTrue(cf2MemstoreSize >= 0);
        assertTrue(cf3MemstoreSize >= 0);
        assertEquals(totalMemstoreSize, cf2MemstoreSize + cf3MemstoreSize);

        // Wait for the RS report to go across to the master, so that the master
        // is aware of which sequence ids have been flushed, before we kill the RS.
        // If in production, the RS dies before the report goes across, we will
        // safely replay all the edits.
        Thread.sleep(2000);

        // Abort the region server where we have the region hosted.
        HRegionServer rs = desiredRegionAndServer.getSecond();
        rs.abort("testing");

        // The aborted region server's regions will be eventually assigned to some
        // other region server, and the get RPC call (inside verifyEdit()) will
        // retry for some time till the regions come back up.

        // Verify that all the edits are safe.
        for (int i = 1; i <= 80; i++) {
          verifyEdit(1, i, table);
          if (i <= 10) {
            verifyEdit(2, i, table);
            verifyEdit(3, i, table);
          }
        }
      }
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  // Test Log Replay with Distributed log split on.
  @Test
  public void testLogReplayWithDistributedLogSplit() throws Exception {
    doTestLogReplay();
  }

  /** Returns the WAL of the given region, which must be an {@link HRegion}. */
  private WAL getWAL(Region region) {
    return ((HRegion) region).getWAL();
  }

  /** Returns the number of rolled log files reported for the given region's WAL. */
  private int getNumRolledLogFiles(Region region) {
    return AbstractFSWALProvider.getNumRolledLogFiles(getWAL(region));
  }

  /**
   * When a log roll is about to happen, we do a flush of the regions who will be affected by the
   * log roll. These flushes cannot be a selective flushes, otherwise we cannot roll the logs. This
   * test ensures that we do a full-flush in that scenario.
   */
  @Test
  public void testFlushingWhenLogRolling() throws Exception {
    TableName tableName = TableName.valueOf("testFlushingWhenLogRolling");
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);
    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
    long cfFlushSizeLowerBound = 2048;
    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
      cfFlushSizeLowerBound);

    // One hour, prevent periodic rolling
    conf.setLong("hbase.regionserver.logroll.period", 60L * 60 * 1000);
    // prevent rolling by size
    conf.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024);
    // Make it 10 as max logs before a flush comes on.
    final int maxLogs = 10;
    conf.setInt("hbase.regionserver.maxlogs", maxLogs);

    final int numRegionServers = 1;
    TEST_UTIL.startMiniCluster(numRegionServers);
    try {
      try (Table table = TEST_UTIL.createTable(tableName, FAMILIES)) {
        Pair<HRegion, HRegionServer> desiredRegionAndServer =
          getRequiredRegionWithName(tableName);
        final HRegion desiredRegion = desiredRegionAndServer.getFirst();
        assertNotNull(desiredRegion, "Could not find a region which hosts the new region.");
        LOG.info("Writing to region={}", desiredRegion);

        // Add one row for each of the first three CFs.
        for (int i = 1; i <= 3; i++) {
          table.put(createPut(i, 0));
        }
        // Now only add row to CF1, make sure when we force a flush, CF1 is larger than the lower
        // bound and CF2 and CF3 are smaller than the lower bound.
        for (int i = 0; i < maxLogs; i++) {
          for (int j = 0; j < 100; j++) {
            table.put(createPut(1, i * 100 + j));
          }
          // Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
          int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion);
          assertNull(getWAL(desiredRegion).rollWriter());
          TEST_UTIL.waitFor(60000,
            () -> getNumRolledLogFiles(desiredRegion) > currentNumRolledLogFiles);
        }
        assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
        assertTrue(
          desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize() > cfFlushSizeLowerBound);
        assertTrue(
          desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound);
        assertTrue(
          desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound);
        table.put(createPut(1, 12345678));
        // Make numRolledLogFiles greater than maxLogs
        desiredRegionAndServer.getSecond().getWalRoller().requestRollAll();
        // Wait for some time till the flush caused by log rolling happens.
        TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() {

          @Override
          public boolean evaluate() throws Exception {
            return desiredRegion.getMemStoreDataSize() == 0;
          }

          @Override
          public String explainFailure() throws Exception {
            long memstoreSize = desiredRegion.getMemStoreDataSize();
            if (memstoreSize > 0) {
              return "Still have unflushed entries in memstore, memstore size is " + memstoreSize;
            }
            return "Unknown";
          }
        });
        LOG.info("Finished waiting on flush after too many WALs...");
        // Individual families should have been flushed.
        assertEquals(MutableSegment.DEEP_OVERHEAD,
          desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize());
        assertEquals(MutableSegment.DEEP_OVERHEAD,
          desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize());
        assertEquals(MutableSegment.DEEP_OVERHEAD,
          desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize());
        // let WAL cleanOldLogs
        assertNull(getWAL(desiredRegion).rollWriter(true));
        TEST_UTIL.waitFor(60000, () -> getNumRolledLogFiles(desiredRegion) < maxLogs);
      }
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Writes 10000 rows to the table, each with a random value in CF1, CF2 and CF3, pausing whenever
   * the region's memstore heap size is above {@code memstoreFlushSize}.
   */
  private void doPut(Table table, long memstoreFlushSize) throws IOException, InterruptedException {
    Region region = getRequiredRegionWithName(table.getName()).getFirst();
    // cf1 100B per row, cf2 200B per row and cf3 400B per row
    byte[] qf = Bytes.toBytes("qf");
    for (int i = 0; i < 10000; i++) {
      Put put = new Put(Bytes.toBytes("row-" + i));
      byte[] value1 = new byte[100];
      Bytes.random(value1);
      put.addColumn(FAMILY1, qf, value1);
      byte[] value2 = new byte[200];
      Bytes.random(value2);
      put.addColumn(FAMILY2, qf, value2);
      byte[] value3 = new byte[400];
      Bytes.random(value3);
      put.addColumn(FAMILY3, qf, value3);
      table.put(put);
      // slow down to let regionserver flush region.
      while (region.getMemStoreHeapSize() > memstoreFlushSize) {
        Thread.sleep(100);
      }
    }
  }

  /**
   * Starts a one-node mini cluster with the current {@code TEST_UTIL} configuration, creates the
   * table, applies the {@link #doPut(Table, long)} write load and shuts the cluster down again.
   * @return the store file counts of CF1, CF2 and CF3 (in that order) after the write load
   */
  private int[] countStoreFilesUnderWriteLoad(Configuration conf, TableDescriptor tableDescriptor,
    long memstoreFlushSize) throws Exception {
    try {
      TEST_UTIL.startMiniCluster(1);
      TEST_UTIL.getAdmin()
        .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
      TEST_UTIL.getAdmin().createTable(tableDescriptor);
      TEST_UTIL.waitTableAvailable(TABLENAME);
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TABLENAME)) {
        doPut(table, memstoreFlushSize);
      }

      Region region = getRequiredRegionWithName(TABLENAME).getFirst();
      return new int[] { region.getStore(FAMILY1).getStorefilesCount(),
        region.getStore(FAMILY2).getStorefilesCount(),
        region.getStore(FAMILY3).getStorefilesCount() };
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  // Under the same write load, small stores should have less store files when
  // percolumnfamilyflush enabled.
  @Test
  public void testCompareStoreFileCount() throws Exception {
    long memstoreFlushSize = 1024L * 1024;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, memstoreFlushSize);
    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
      ConstantSizeRegionSplitPolicy.class.getName());

    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
      .setCompactionEnabled(false).setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY1))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY2))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY3)).build();

    LOG.info("==============Test with selective flush disabled===============");
    int[] countsDisabled = countStoreFilesUnderWriteLoad(conf, tableDescriptor, memstoreFlushSize);
    int cf1StoreFileCount = countsDisabled[0];
    int cf2StoreFileCount = countsDisabled[1];
    int cf3StoreFileCount = countsDisabled[2];

    LOG.info("==============Test with selective flush enabled===============");
    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
    // default value of per-cf flush lower bound is too big, set to a small enough value
    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 0);
    int[] countsEnabled = countStoreFilesUnderWriteLoad(conf, tableDescriptor, memstoreFlushSize);
    int cf1StoreFileCount1 = countsEnabled[0];
    int cf2StoreFileCount1 = countsEnabled[1];
    int cf3StoreFileCount1 = countsEnabled[2];

    LOG.info("disable selective flush: {}=>{}, {}=>{}, {}=>{}", Bytes.toString(FAMILY1),
      cf1StoreFileCount, Bytes.toString(FAMILY2), cf2StoreFileCount, Bytes.toString(FAMILY3),
      cf3StoreFileCount);
    LOG.info("enable selective flush: {}=>{}, {}=>{}, {}=>{}", Bytes.toString(FAMILY1),
      cf1StoreFileCount1, Bytes.toString(FAMILY2), cf2StoreFileCount1, Bytes.toString(FAMILY3),
      cf3StoreFileCount1);
    // small CF will have less store files.
    assertTrue(cf1StoreFileCount1 < cf1StoreFileCount);
    assertTrue(cf2StoreFileCount1 < cf2StoreFileCount);
  }

  /**
   * Stand-alone load generator: (re)creates {@link #TABLENAME} on the cluster described by the
   * default {@link HBaseConfiguration} and writes {@code numRows} rows to CF1, CF2 and CF3.
   * @param args {@code args[0]} is the number of regions, {@code args[1]} the number of rows
   */
  public static void main(String[] args) throws Exception {
    if (args.length < 2) {
      throw new IllegalArgumentException(
        "Usage: " + TestPerColumnFamilyFlush.class.getSimpleName() + " <numRegions> <numRows>");
    }
    int numRegions = Integer.parseInt(args[0]);
    long numRows = Long.parseLong(args[1]);

    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
      .setMaxFileSize(10L * 1024 * 1024 * 1024)
      .setValue(TableDescriptorBuilder.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY1))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY2))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY3)).build();

    Configuration conf = HBaseConfiguration.create();
    // try-with-resources so the connection, admin and table are released even when a call fails.
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      // NOTE(review): unlike the tests above, the table's namespace is not created here;
      // presumably it has to exist on the target cluster already - confirm.
      try (Admin admin = conn.getAdmin()) {
        if (admin.tableExists(TABLENAME)) {
          admin.disableTable(TABLENAME);
          admin.deleteTable(TABLENAME);
        }
        if (numRegions >= 3) {
          byte[] startKey = new byte[16];
          byte[] endKey = new byte[16];
          Arrays.fill(endKey, (byte) 0xFF);
          admin.createTable(tableDescriptor, startKey, endKey, numRegions);
        } else {
          admin.createTable(tableDescriptor);
        }
      }

      try (Table table = conn.getTable(TABLENAME)) {
        byte[] qf = Bytes.toBytes("qf");
        byte[] value1 = new byte[16];
        byte[] value2 = new byte[256];
        byte[] value3 = new byte[4096];
        for (long i = 0; i < numRows; i++) {
          Put put = new Put(Hashing.md5().hashLong(i).asBytes());
          Bytes.random(value1);
          Bytes.random(value2);
          Bytes.random(value3);
          put.addColumn(FAMILY1, qf, value1);
          put.addColumn(FAMILY2, qf, value2);
          put.addColumn(FAMILY3, qf, value3);
          table.put(put);
          if (i % 10000 == 0) {
            LOG.info("{} rows put", i);
          }
        }
      }
    }
  }
}