001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.util.Arrays;
027import java.util.List;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.NamespaceDescriptor;
035import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.Waiter;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
040import org.apache.hadoop.hbase.client.Connection;
041import org.apache.hadoop.hbase.client.ConnectionFactory;
042import org.apache.hadoop.hbase.client.Get;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.RegionInfoBuilder;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.client.TableDescriptor;
049import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
050import org.apache.hadoop.hbase.testclassification.LargeTests;
051import org.apache.hadoop.hbase.testclassification.RegionServerTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.JVMClusterUtil;
054import org.apache.hadoop.hbase.util.Pair;
055import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
056import org.apache.hadoop.hbase.wal.WAL;
057import org.junit.ClassRule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063import org.apache.hbase.thirdparty.com.google.common.hash.Hashing;
064
065/**
066 * This test verifies the correctness of the Per Column Family flushing strategy
067 */
068@Category({ RegionServerTests.class, LargeTests.class })
069public class TestPerColumnFamilyFlush {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073    HBaseClassTestRule.forClass(TestPerColumnFamilyFlush.class);
074
075  private static final Logger LOG = LoggerFactory.getLogger(TestPerColumnFamilyFlush.class);
076
077  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
078
079  private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
080
081  public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1");
082
083  public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
084    Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
085
086  public static final byte[] FAMILY1 = FAMILIES[0];
087
088  public static final byte[] FAMILY2 = FAMILIES[1];
089
090  public static final byte[] FAMILY3 = FAMILIES[2];
091
092  private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
093    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLENAME);
094    for (byte[] family : FAMILIES) {
095      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
096    }
097    RegionInfo info = RegionInfoBuilder.newBuilder(TABLENAME).build();
098    Path path = new Path(DIR, callingMethod);
099    return HBaseTestingUtil.createRegionAndWAL(info, path, conf, builder.build());
100  }
101
102  // A helper function to create puts.
103  private Put createPut(int familyNum, int putNum) {
104    byte[] qf = Bytes.toBytes("q" + familyNum);
105    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
106    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
107    Put p = new Put(row);
108    p.addColumn(FAMILIES[familyNum - 1], qf, val);
109    return p;
110  }
111
112  // A helper function to create puts.
113  private Get createGet(int familyNum, int putNum) {
114    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
115    return new Get(row);
116  }
117
118  // A helper function to verify edits.
119  void verifyEdit(int familyNum, int putNum, Table table) throws IOException {
120    Result r = table.get(createGet(familyNum, putNum));
121    byte[] family = FAMILIES[familyNum - 1];
122    byte[] qf = Bytes.toBytes("q" + familyNum);
123    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
124    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
125    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
126      r.getFamilyMap(family).get(qf));
127    assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
128      Arrays.equals(r.getFamilyMap(family).get(qf), val));
129  }
130
131  @Test
132  public void testSelectiveFlushWhenEnabled() throws IOException {
133    // Set up the configuration, use new one to not conflict with minicluster in other tests
134    Configuration conf = new HBaseTestingUtil().getConfiguration();
135    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
136    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
137    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 40 * 1024);
138    // Intialize the region
139    HRegion region = initHRegion("testSelectiveFlushWithDataCompaction", conf);
140    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
141    for (int i = 1; i <= 1200; i++) {
142      region.put(createPut(1, i));
143
144      if (i <= 100) {
145        region.put(createPut(2, i));
146        if (i <= 50) {
147          region.put(createPut(3, i));
148        }
149      }
150    }
151
152    long totalMemstoreSize = region.getMemStoreDataSize();
153
154    // Find the smallest LSNs for edits wrt to each CF.
155    long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1);
156    long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2);
157    long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3);
158
159    // Find the sizes of the memstores of each CF.
160    MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
161    MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
162    MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
163
164    // Get the overall smallest LSN in the region's memstores.
165    long smallestSeqInRegionCurrentMemstore =
166      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
167
168    // The overall smallest LSN in the region's memstores should be the same as
169    // the LSN of the smallest edit in CF1
170    assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
171
172    // Some other sanity checks.
173    assertTrue(smallestSeqCF1 < smallestSeqCF2);
174    assertTrue(smallestSeqCF2 < smallestSeqCF3);
175    assertTrue(cf1MemstoreSize.getDataSize() > 0);
176    assertTrue(cf2MemstoreSize.getDataSize() > 0);
177    assertTrue(cf3MemstoreSize.getDataSize() > 0);
178
179    // The total memstore size should be the same as the sum of the sizes of
180    // memstores of CF1, CF2 and CF3.
181    assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize()
182      + cf3MemstoreSize.getDataSize());
183
184    // Flush!
185    region.flush(false);
186
187    // Will use these to check if anything changed.
188    MemStoreSize oldCF2MemstoreSize = cf2MemstoreSize;
189    MemStoreSize oldCF3MemstoreSize = cf3MemstoreSize;
190
191    // Recalculate everything
192    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
193    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
194    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
195    totalMemstoreSize = region.getMemStoreDataSize();
196    smallestSeqInRegionCurrentMemstore =
197      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
198
199    // We should have cleared out only CF1, since we chose the flush thresholds
200    // and number of puts accordingly.
201    assertEquals(0, cf1MemstoreSize.getDataSize());
202    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
203    // Nothing should have happened to CF2, ...
204    assertEquals(cf2MemstoreSize, oldCF2MemstoreSize);
205    // ... or CF3
206    assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
207    // Now the smallest LSN in the region should be the same as the smallest
208    // LSN in the memstore of CF2.
209    assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2);
210    // Of course, this should hold too.
211    assertEquals(totalMemstoreSize, cf2MemstoreSize.getDataSize() + cf3MemstoreSize.getDataSize());
212
213    // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
214    for (int i = 1200; i < 2400; i++) {
215      region.put(createPut(2, i));
216
217      // Add only 100 puts for CF3
218      if (i - 1200 < 100) {
219        region.put(createPut(3, i));
220      }
221    }
222
223    // How much does the CF3 memstore occupy? Will be used later.
224    oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
225
226    // Flush again
227    region.flush(false);
228
229    // Recalculate everything
230    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
231    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
232    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
233    totalMemstoreSize = region.getMemStoreDataSize();
234    smallestSeqInRegionCurrentMemstore =
235      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
236
237    // CF1 and CF2, both should be absent.
238    assertEquals(0, cf1MemstoreSize.getDataSize());
239    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
240    assertEquals(0, cf2MemstoreSize.getDataSize());
241    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
242    // CF3 shouldn't have been touched.
243    assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
244    assertEquals(totalMemstoreSize, cf3MemstoreSize.getDataSize());
245
246    // What happens when we hit the memstore limit, but we are not able to find
247    // any Column Family above the threshold?
248    // In that case, we should flush all the CFs.
249
250    // Clearing the existing memstores.
251    region.flush(true);
252
253    // The memstore limit is 200*1024 and the column family flush threshold is
254    // around 50*1024. We try to just hit the memstore limit with each CF's
255    // memstore being below the CF flush threshold.
256    for (int i = 1; i <= 300; i++) {
257      region.put(createPut(1, i));
258      region.put(createPut(2, i));
259      region.put(createPut(3, i));
260      region.put(createPut(4, i));
261      region.put(createPut(5, i));
262    }
263
264    region.flush(false);
265
266    // Since we won't find any CF above the threshold, and hence no specific
267    // store to flush, we should flush all the memstores.
268    assertEquals(0, region.getMemStoreDataSize());
269    HBaseTestingUtil.closeRegionAndWAL(region);
270  }
271
272  @Test
273  public void testSelectiveFlushWhenNotEnabled() throws IOException {
274    // Set up the configuration, use new one to not conflict with minicluster in other tests
275    Configuration conf = new HBaseTestingUtil().getConfiguration();
276    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
277    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
278
279    // Intialize the HRegion
280    HRegion region = initHRegion("testSelectiveFlushWhenNotEnabled", conf);
281    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
282    for (int i = 1; i <= 1200; i++) {
283      region.put(createPut(1, i));
284      if (i <= 100) {
285        region.put(createPut(2, i));
286        if (i <= 50) {
287          region.put(createPut(3, i));
288        }
289      }
290    }
291
292    long totalMemstoreSize = region.getMemStoreDataSize();
293
294    // Find the sizes of the memstores of each CF.
295    MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
296    MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
297    MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
298
299    // Some other sanity checks.
300    assertTrue(cf1MemstoreSize.getDataSize() > 0);
301    assertTrue(cf2MemstoreSize.getDataSize() > 0);
302    assertTrue(cf3MemstoreSize.getDataSize() > 0);
303
304    // The total memstore size should be the same as the sum of the sizes of
305    // memstores of CF1, CF2 and CF3.
306    assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize()
307      + cf3MemstoreSize.getDataSize());
308
309    // Flush!
310    region.flush(false);
311
312    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
313    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
314    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
315    totalMemstoreSize = region.getMemStoreDataSize();
316    long smallestSeqInRegionCurrentMemstore =
317      region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
318
319    // Everything should have been cleared
320    assertEquals(0, cf1MemstoreSize.getDataSize());
321    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
322    assertEquals(0, cf2MemstoreSize.getDataSize());
323    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
324    assertEquals(0, cf3MemstoreSize.getDataSize());
325    assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSize.getHeapSize());
326    assertEquals(0, totalMemstoreSize);
327    assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
328    HBaseTestingUtil.closeRegionAndWAL(region);
329  }
330
331  // Find the (first) region which has the specified name.
332  private static Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) {
333    SingleProcessHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
334    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
335    for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
336      HRegionServer hrs = rsts.get(i).getRegionServer();
337      for (HRegion region : hrs.getRegions(tableName)) {
338        return Pair.newPair(region, hrs);
339      }
340    }
341    return null;
342  }
343
344  private void doTestLogReplay() throws Exception {
345    Configuration conf = TEST_UTIL.getConfiguration();
346    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 10000);
347    // Carefully chosen limits so that the memstore just flushes when we're done
348    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
349    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 2500);
350    final int numRegionServers = 4;
351    try {
352      TEST_UTIL.startMiniCluster(numRegionServers);
353      TEST_UTIL.getAdmin()
354        .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
355      Table table = TEST_UTIL.createTable(TABLENAME, FAMILIES);
356
357      // Add 100 edits for CF1, 20 for CF2, 20 for CF3.
358      // These will all be interleaved in the log.
359      for (int i = 1; i <= 80; i++) {
360        table.put(createPut(1, i));
361        if (i <= 10) {
362          table.put(createPut(2, i));
363          table.put(createPut(3, i));
364        }
365      }
366      Thread.sleep(1000);
367
368      Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME);
369      HRegion desiredRegion = desiredRegionAndServer.getFirst();
370      assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
371
372      // Flush the region selectively.
373      desiredRegion.flush(false);
374
375      long totalMemstoreSize;
376      long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
377      totalMemstoreSize = desiredRegion.getMemStoreDataSize();
378
379      // Find the sizes of the memstores of each CF.
380      cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize().getDataSize();
381      cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize().getDataSize();
382      cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize().getDataSize();
383
384      // CF1 Should have been flushed
385      assertEquals(0, cf1MemstoreSize);
386      // CF2 and CF3 shouldn't have been flushed.
387      // TODO: This test doesn't allow for this case:
388      // " Since none of the CFs were above the size, flushing all."
389      // i.e. a flush happens before we get to here and its a flush-all.
390      assertTrue(cf2MemstoreSize >= 0);
391      assertTrue(cf3MemstoreSize >= 0);
392      assertEquals(totalMemstoreSize, cf2MemstoreSize + cf3MemstoreSize);
393
394      // Wait for the RS report to go across to the master, so that the master
395      // is aware of which sequence ids have been flushed, before we kill the RS.
396      // If in production, the RS dies before the report goes across, we will
397      // safely replay all the edits.
398      Thread.sleep(2000);
399
400      // Abort the region server where we have the region hosted.
401      HRegionServer rs = desiredRegionAndServer.getSecond();
402      rs.abort("testing");
403
404      // The aborted region server's regions will be eventually assigned to some
405      // other region server, and the get RPC call (inside verifyEdit()) will
406      // retry for some time till the regions come back up.
407
408      // Verify that all the edits are safe.
409      for (int i = 1; i <= 80; i++) {
410        verifyEdit(1, i, table);
411        if (i <= 10) {
412          verifyEdit(2, i, table);
413          verifyEdit(3, i, table);
414        }
415      }
416    } finally {
417      TEST_UTIL.shutdownMiniCluster();
418    }
419  }
420
421  // Test Log Replay with Distributed log split on.
422  @Test
423  public void testLogReplayWithDistributedLogSplit() throws Exception {
424    doTestLogReplay();
425  }
426
427  private WAL getWAL(Region region) {
428    return ((HRegion) region).getWAL();
429  }
430
431  private int getNumRolledLogFiles(Region region) {
432    return AbstractFSWALProvider.getNumRolledLogFiles(getWAL(region));
433  }
434
435  /**
436   * When a log roll is about to happen, we do a flush of the regions who will be affected by the
437   * log roll. These flushes cannot be a selective flushes, otherwise we cannot roll the logs. This
438   * test ensures that we do a full-flush in that scenario.
439   */
440  @Test
441  public void testFlushingWhenLogRolling() throws Exception {
442    TableName tableName = TableName.valueOf("testFlushingWhenLogRolling");
443    Configuration conf = TEST_UTIL.getConfiguration();
444    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);
445    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
446    long cfFlushSizeLowerBound = 2048;
447    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
448      cfFlushSizeLowerBound);
449
450    // One hour, prevent periodic rolling
451    conf.setLong("hbase.regionserver.logroll.period", 60L * 60 * 1000);
452    // prevent rolling by size
453    conf.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024);
454    // Make it 10 as max logs before a flush comes on.
455    final int maxLogs = 10;
456    conf.setInt("hbase.regionserver.maxlogs", maxLogs);
457
458    final int numRegionServers = 1;
459    TEST_UTIL.startMiniCluster(numRegionServers);
460    try {
461      Table table = TEST_UTIL.createTable(tableName, FAMILIES);
462      Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(tableName);
463      final HRegion desiredRegion = desiredRegionAndServer.getFirst();
464      assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
465      LOG.info("Writing to region=" + desiredRegion);
466
467      // Add one row for both CFs.
468      for (int i = 1; i <= 3; i++) {
469        table.put(createPut(i, 0));
470      }
471      // Now only add row to CF1, make sure when we force a flush, CF1 is larger than the lower
472      // bound and CF2 and CF3 are smaller than the lower bound.
473      for (int i = 0; i < maxLogs; i++) {
474        for (int j = 0; j < 100; j++) {
475          table.put(createPut(1, i * 100 + j));
476        }
477        // Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
478        int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion);
479        assertNull(getWAL(desiredRegion).rollWriter());
480        TEST_UTIL.waitFor(60000,
481          () -> getNumRolledLogFiles(desiredRegion) > currentNumRolledLogFiles);
482      }
483      assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
484      assertTrue(
485        desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize() > cfFlushSizeLowerBound);
486      assertTrue(
487        desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound);
488      assertTrue(
489        desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound);
490      table.put(createPut(1, 12345678));
491      // Make numRolledLogFiles greater than maxLogs
492      desiredRegionAndServer.getSecond().getWalRoller().requestRollAll();
493      // Wait for some time till the flush caused by log rolling happens.
494      TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() {
495
496        @Override
497        public boolean evaluate() throws Exception {
498          return desiredRegion.getMemStoreDataSize() == 0;
499        }
500
501        @Override
502        public String explainFailure() throws Exception {
503          long memstoreSize = desiredRegion.getMemStoreDataSize();
504          if (memstoreSize > 0) {
505            return "Still have unflushed entries in memstore, memstore size is " + memstoreSize;
506          }
507          return "Unknown";
508        }
509      });
510      LOG.info("Finished waiting on flush after too many WALs...");
511      // Individual families should have been flushed.
512      assertEquals(MutableSegment.DEEP_OVERHEAD,
513        desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize());
514      assertEquals(MutableSegment.DEEP_OVERHEAD,
515        desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize());
516      assertEquals(MutableSegment.DEEP_OVERHEAD,
517        desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize());
518      // let WAL cleanOldLogs
519      assertNull(getWAL(desiredRegion).rollWriter(true));
520      TEST_UTIL.waitFor(60000, () -> getNumRolledLogFiles(desiredRegion) < maxLogs);
521    } finally {
522      TEST_UTIL.shutdownMiniCluster();
523    }
524  }
525
526  private void doPut(Table table, long memstoreFlushSize) throws IOException, InterruptedException {
527    Region region = getRegionWithName(table.getName()).getFirst();
528    // cf1 4B per row, cf2 40B per row and cf3 400B per row
529    byte[] qf = Bytes.toBytes("qf");
530    for (int i = 0; i < 10000; i++) {
531      Put put = new Put(Bytes.toBytes("row-" + i));
532      byte[] value1 = new byte[100];
533      Bytes.random(value1);
534      put.addColumn(FAMILY1, qf, value1);
535      byte[] value2 = new byte[200];
536      Bytes.random(value2);
537      put.addColumn(FAMILY2, qf, value2);
538      byte[] value3 = new byte[400];
539      Bytes.random(value3);
540      put.addColumn(FAMILY3, qf, value3);
541      table.put(put);
542      // slow down to let regionserver flush region.
543      while (region.getMemStoreHeapSize() > memstoreFlushSize) {
544        Thread.sleep(100);
545      }
546    }
547  }
548
549  // Under the same write load, small stores should have less store files when
550  // percolumnfamilyflush enabled.
551  @Test
552  public void testCompareStoreFileCount() throws Exception {
553    long memstoreFlushSize = 1024L * 1024;
554    Configuration conf = TEST_UTIL.getConfiguration();
555    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, memstoreFlushSize);
556    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
557    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
558    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
559      ConstantSizeRegionSplitPolicy.class.getName());
560
561    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
562      .setCompactionEnabled(false).setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY1))
563      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY2))
564      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY3)).build();
565
566    LOG.info("==============Test with selective flush disabled===============");
567    int cf1StoreFileCount = -1;
568    int cf2StoreFileCount = -1;
569    int cf3StoreFileCount = -1;
570    int cf1StoreFileCount1 = -1;
571    int cf2StoreFileCount1 = -1;
572    int cf3StoreFileCount1 = -1;
573    try {
574      TEST_UTIL.startMiniCluster(1);
575      TEST_UTIL.getAdmin()
576        .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
577      TEST_UTIL.getAdmin().createTable(tableDescriptor);
578      TEST_UTIL.waitTableAvailable(TABLENAME);
579      Connection conn = ConnectionFactory.createConnection(conf);
580      Table table = conn.getTable(TABLENAME);
581      doPut(table, memstoreFlushSize);
582      table.close();
583      conn.close();
584
585      Region region = getRegionWithName(TABLENAME).getFirst();
586      cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount();
587      cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount();
588      cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount();
589    } finally {
590      TEST_UTIL.shutdownMiniCluster();
591    }
592
593    LOG.info("==============Test with selective flush enabled===============");
594    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
595    // default value of per-cf flush lower bound is too big, set to a small enough value
596    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 0);
597    try {
598      TEST_UTIL.startMiniCluster(1);
599      TEST_UTIL.getAdmin()
600        .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
601      TEST_UTIL.getAdmin().createTable(tableDescriptor);
602      Connection conn = ConnectionFactory.createConnection(conf);
603      Table table = conn.getTable(TABLENAME);
604      doPut(table, memstoreFlushSize);
605      table.close();
606      conn.close();
607
608      Region region = getRegionWithName(TABLENAME).getFirst();
609      cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount();
610      cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount();
611      cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount();
612    } finally {
613      TEST_UTIL.shutdownMiniCluster();
614    }
615
616    LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount + ", "
617      + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", " + Bytes.toString(FAMILY3) + "=>"
618      + cf3StoreFileCount);
619    LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1 + ", "
620      + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount1 + ", " + Bytes.toString(FAMILY3) + "=>"
621      + cf3StoreFileCount1);
622    // small CF will have less store files.
623    assertTrue(cf1StoreFileCount1 < cf1StoreFileCount);
624    assertTrue(cf2StoreFileCount1 < cf2StoreFileCount);
625  }
626
627  public static void main(String[] args) throws Exception {
628    int numRegions = Integer.parseInt(args[0]);
629    long numRows = Long.parseLong(args[1]);
630
631    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
632      .setMaxFileSize(10L * 1024 * 1024 * 1024)
633      .setValue(TableDescriptorBuilder.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName())
634      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY1))
635      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY2))
636      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY3)).build();
637
638    Configuration conf = HBaseConfiguration.create();
639    Connection conn = ConnectionFactory.createConnection(conf);
640    Admin admin = conn.getAdmin();
641    if (admin.tableExists(TABLENAME)) {
642      admin.disableTable(TABLENAME);
643      admin.deleteTable(TABLENAME);
644    }
645    if (numRegions >= 3) {
646      byte[] startKey = new byte[16];
647      byte[] endKey = new byte[16];
648      Arrays.fill(endKey, (byte) 0xFF);
649      admin.createTable(tableDescriptor, startKey, endKey, numRegions);
650    } else {
651      admin.createTable(tableDescriptor);
652    }
653    admin.close();
654
655    Table table = conn.getTable(TABLENAME);
656    byte[] qf = Bytes.toBytes("qf");
657    byte[] value1 = new byte[16];
658    byte[] value2 = new byte[256];
659    byte[] value3 = new byte[4096];
660    for (long i = 0; i < numRows; i++) {
661      Put put = new Put(Hashing.md5().hashLong(i).asBytes());
662      Bytes.random(value1);
663      Bytes.random(value2);
664      Bytes.random(value3);
665      put.addColumn(FAMILY1, qf, value1);
666      put.addColumn(FAMILY2, qf, value2);
667      put.addColumn(FAMILY3, qf, value3);
668      table.put(put);
669      if (i % 10000 == 0) {
670        LOG.info(i + " rows put");
671      }
672    }
673    table.close();
674    conn.close();
675  }
676}