001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotNull;
022import static org.junit.jupiter.api.Assertions.assertNull;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024
025import java.io.IOException;
026import java.util.Arrays;
027import java.util.List;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.NamespaceDescriptor;
034import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.Waiter;
037import org.apache.hadoop.hbase.client.Admin;
038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
039import org.apache.hadoop.hbase.client.Connection;
040import org.apache.hadoop.hbase.client.ConnectionFactory;
041import org.apache.hadoop.hbase.client.Get;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.client.RegionInfoBuilder;
045import org.apache.hadoop.hbase.client.Result;
046import org.apache.hadoop.hbase.client.Table;
047import org.apache.hadoop.hbase.client.TableDescriptor;
048import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
049import org.apache.hadoop.hbase.regionserver.wal.AbstractTestFSWAL;
050import org.apache.hadoop.hbase.testclassification.LargeTests;
051import org.apache.hadoop.hbase.testclassification.RegionServerTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.JVMClusterUtil;
054import org.apache.hadoop.hbase.util.Pair;
055import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
056import org.apache.hadoop.hbase.wal.WAL;
057import org.junit.jupiter.api.Tag;
058import org.junit.jupiter.api.Test;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hbase.thirdparty.com.google.common.hash.Hashing;
063
064/**
065 * This test verifies the correctness of the Per Column Family flushing strategy
066 */
067@Tag(RegionServerTests.TAG)
068@Tag(LargeTests.TAG)
069public class TestPerColumnFamilyFlush {
070
071  private static final Logger LOG = LoggerFactory.getLogger(TestPerColumnFamilyFlush.class);
072
073  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
074
075  private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
076
077  public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1");
078
079  public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
080    Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
081
082  public static final byte[] FAMILY1 = FAMILIES[0];
083
084  public static final byte[] FAMILY2 = FAMILIES[1];
085
086  public static final byte[] FAMILY3 = FAMILIES[2];
087
088  private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
089    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLENAME);
090    for (byte[] family : FAMILIES) {
091      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
092    }
093    RegionInfo info = RegionInfoBuilder.newBuilder(TABLENAME).build();
094    Path path = new Path(DIR, callingMethod);
095    return HBaseTestingUtil.createRegionAndWAL(info, path, conf, builder.build());
096  }
097
098  // A helper function to create puts.
099  private Put createPut(int familyNum, int putNum) {
100    byte[] qf = Bytes.toBytes("q" + familyNum);
101    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
102    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
103    Put p = new Put(row);
104    p.addColumn(FAMILIES[familyNum - 1], qf, val);
105    return p;
106  }
107
108  // A helper function to create puts.
109  private Get createGet(int familyNum, int putNum) {
110    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
111    return new Get(row);
112  }
113
114  // A helper function to verify edits.
115  void verifyEdit(int familyNum, int putNum, Table table) throws IOException {
116    Result r = table.get(createGet(familyNum, putNum));
117    byte[] family = FAMILIES[familyNum - 1];
118    byte[] qf = Bytes.toBytes("q" + familyNum);
119    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
120    assertNotNull(r.getFamilyMap(family), "Missing Put#" + putNum + " for CF# " + familyNum);
121    assertNotNull(r.getFamilyMap(family).get(qf),
122      "Missing Put#" + putNum + " for CF# " + familyNum);
123    assertTrue(Arrays.equals(r.getFamilyMap(family).get(qf), val),
124      "Incorrect value for Put#" + putNum + " for CF# " + familyNum);
125  }
126
127  @Test
128  public void testSelectiveFlushWhenEnabled() throws IOException {
129    // Set up the configuration, use new one to not conflict with minicluster in other tests
130    Configuration conf = new HBaseTestingUtil().getConfiguration();
131    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
132    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
133    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 40 * 1024);
134    // Intialize the region
135    HRegion region = initHRegion("testSelectiveFlushWithDataCompaction", conf);
136    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
137    for (int i = 1; i <= 1200; i++) {
138      region.put(createPut(1, i));
139
140      if (i <= 100) {
141        region.put(createPut(2, i));
142        if (i <= 50) {
143          region.put(createPut(3, i));
144        }
145      }
146    }
147
148    long totalMemstoreSize = region.getMemStoreDataSize();
149
150    // Find the smallest LSNs for edits wrt to each CF.
151    long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1);
152    long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2);
153    long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3);
154
155    // Find the sizes of the memstores of each CF.
156    MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
157    MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
158    MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
159
160    // Get the overall smallest LSN in the region's memstores.
161    long smallestSeqInRegionCurrentMemstore = AbstractTestFSWAL
162      .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
163
164    // The overall smallest LSN in the region's memstores should be the same as
165    // the LSN of the smallest edit in CF1
166    assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
167
168    // Some other sanity checks.
169    assertTrue(smallestSeqCF1 < smallestSeqCF2);
170    assertTrue(smallestSeqCF2 < smallestSeqCF3);
171    assertTrue(cf1MemstoreSize.getDataSize() > 0);
172    assertTrue(cf2MemstoreSize.getDataSize() > 0);
173    assertTrue(cf3MemstoreSize.getDataSize() > 0);
174
175    // The total memstore size should be the same as the sum of the sizes of
176    // memstores of CF1, CF2 and CF3.
177    assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize()
178      + cf3MemstoreSize.getDataSize());
179
180    // Flush!
181    region.flush(false);
182
183    // Will use these to check if anything changed.
184    MemStoreSize oldCF2MemstoreSize = cf2MemstoreSize;
185    MemStoreSize oldCF3MemstoreSize = cf3MemstoreSize;
186
187    // Recalculate everything
188    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
189    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
190    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
191    totalMemstoreSize = region.getMemStoreDataSize();
192    smallestSeqInRegionCurrentMemstore = AbstractTestFSWAL.getEarliestMemStoreSeqNum(getWAL(region),
193      region.getRegionInfo().getEncodedNameAsBytes());
194
195    // We should have cleared out only CF1, since we chose the flush thresholds
196    // and number of puts accordingly.
197    assertEquals(0, cf1MemstoreSize.getDataSize());
198    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
199    // Nothing should have happened to CF2, ...
200    assertEquals(cf2MemstoreSize, oldCF2MemstoreSize);
201    // ... or CF3
202    assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
203    // Now the smallest LSN in the region should be the same as the smallest
204    // LSN in the memstore of CF2.
205    assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2);
206    // Of course, this should hold too.
207    assertEquals(totalMemstoreSize, cf2MemstoreSize.getDataSize() + cf3MemstoreSize.getDataSize());
208
209    // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
210    for (int i = 1200; i < 2400; i++) {
211      region.put(createPut(2, i));
212
213      // Add only 100 puts for CF3
214      if (i - 1200 < 100) {
215        region.put(createPut(3, i));
216      }
217    }
218
219    // How much does the CF3 memstore occupy? Will be used later.
220    oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
221
222    // Flush again
223    region.flush(false);
224
225    // Recalculate everything
226    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
227    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
228    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
229    totalMemstoreSize = region.getMemStoreDataSize();
230    smallestSeqInRegionCurrentMemstore = AbstractTestFSWAL.getEarliestMemStoreSeqNum(getWAL(region),
231      region.getRegionInfo().getEncodedNameAsBytes());
232
233    // CF1 and CF2, both should be absent.
234    assertEquals(0, cf1MemstoreSize.getDataSize());
235    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
236    assertEquals(0, cf2MemstoreSize.getDataSize());
237    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
238    // CF3 shouldn't have been touched.
239    assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
240    assertEquals(totalMemstoreSize, cf3MemstoreSize.getDataSize());
241    assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF3);
242
243    // What happens when we hit the memstore limit, but we are not able to find
244    // any Column Family above the threshold?
245    // In that case, we should flush all the CFs.
246
247    // Clearing the existing memstores.
248    region.flush(true);
249
250    // The memstore limit is 200*1024 and the column family flush threshold is
251    // around 50*1024. We try to just hit the memstore limit with each CF's
252    // memstore being below the CF flush threshold.
253    for (int i = 1; i <= 300; i++) {
254      region.put(createPut(1, i));
255      region.put(createPut(2, i));
256      region.put(createPut(3, i));
257      region.put(createPut(4, i));
258      region.put(createPut(5, i));
259    }
260
261    region.flush(false);
262
263    // Since we won't find any CF above the threshold, and hence no specific
264    // store to flush, we should flush all the memstores.
265    assertEquals(0, region.getMemStoreDataSize());
266    HBaseTestingUtil.closeRegionAndWAL(region);
267  }
268
269  @Test
270  public void testSelectiveFlushWhenNotEnabled() throws IOException {
271    // Set up the configuration, use new one to not conflict with minicluster in other tests
272    Configuration conf = new HBaseTestingUtil().getConfiguration();
273    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
274    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
275
276    // Intialize the HRegion
277    HRegion region = initHRegion("testSelectiveFlushWhenNotEnabled", conf);
278    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
279    for (int i = 1; i <= 1200; i++) {
280      region.put(createPut(1, i));
281      if (i <= 100) {
282        region.put(createPut(2, i));
283        if (i <= 50) {
284          region.put(createPut(3, i));
285        }
286      }
287    }
288
289    long totalMemstoreSize = region.getMemStoreDataSize();
290
291    // Find the sizes of the memstores of each CF.
292    MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
293    MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
294    MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
295
296    // Some other sanity checks.
297    assertTrue(cf1MemstoreSize.getDataSize() > 0);
298    assertTrue(cf2MemstoreSize.getDataSize() > 0);
299    assertTrue(cf3MemstoreSize.getDataSize() > 0);
300
301    // The total memstore size should be the same as the sum of the sizes of
302    // memstores of CF1, CF2 and CF3.
303    assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize()
304      + cf3MemstoreSize.getDataSize());
305
306    // Flush!
307    region.flush(false);
308
309    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
310    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
311    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
312    totalMemstoreSize = region.getMemStoreDataSize();
313    long smallestSeqInRegionCurrentMemstore = AbstractTestFSWAL
314      .getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes());
315
316    // Everything should have been cleared
317    assertEquals(0, cf1MemstoreSize.getDataSize());
318    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
319    assertEquals(0, cf2MemstoreSize.getDataSize());
320    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
321    assertEquals(0, cf3MemstoreSize.getDataSize());
322    assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSize.getHeapSize());
323    assertEquals(0, totalMemstoreSize);
324    assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
325    HBaseTestingUtil.closeRegionAndWAL(region);
326  }
327
328  // Find the (first) region which has the specified name.
329  private static Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) {
330    SingleProcessHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
331    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
332    for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
333      HRegionServer hrs = rsts.get(i).getRegionServer();
334      for (HRegion region : hrs.getRegions(tableName)) {
335        return Pair.newPair(region, hrs);
336      }
337    }
338    return null;
339  }
340
341  private void doTestLogReplay() throws Exception {
342    Configuration conf = TEST_UTIL.getConfiguration();
343    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 10000);
344    // Carefully chosen limits so that the memstore just flushes when we're done
345    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
346    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 2500);
347    final int numRegionServers = 4;
348    try {
349      TEST_UTIL.startMiniCluster(numRegionServers);
350      TEST_UTIL.getAdmin()
351        .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
352      Table table = TEST_UTIL.createTable(TABLENAME, FAMILIES);
353
354      // Add 100 edits for CF1, 20 for CF2, 20 for CF3.
355      // These will all be interleaved in the log.
356      for (int i = 1; i <= 80; i++) {
357        table.put(createPut(1, i));
358        if (i <= 10) {
359          table.put(createPut(2, i));
360          table.put(createPut(3, i));
361        }
362      }
363      Thread.sleep(1000);
364
365      Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME);
366      HRegion desiredRegion = desiredRegionAndServer.getFirst();
367      assertNotNull(desiredRegion, "Could not find a region which hosts the new region.");
368
369      // Flush the region selectively.
370      desiredRegion.flush(false);
371
372      long totalMemstoreSize;
373      long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
374      totalMemstoreSize = desiredRegion.getMemStoreDataSize();
375
376      // Find the sizes of the memstores of each CF.
377      cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize().getDataSize();
378      cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize().getDataSize();
379      cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize().getDataSize();
380
381      // CF1 Should have been flushed
382      assertEquals(0, cf1MemstoreSize);
383      // CF2 and CF3 shouldn't have been flushed.
384      // TODO: This test doesn't allow for this case:
385      // " Since none of the CFs were above the size, flushing all."
386      // i.e. a flush happens before we get to here and its a flush-all.
387      assertTrue(cf2MemstoreSize >= 0);
388      assertTrue(cf3MemstoreSize >= 0);
389      assertEquals(totalMemstoreSize, cf2MemstoreSize + cf3MemstoreSize);
390
391      // Wait for the RS report to go across to the master, so that the master
392      // is aware of which sequence ids have been flushed, before we kill the RS.
393      // If in production, the RS dies before the report goes across, we will
394      // safely replay all the edits.
395      Thread.sleep(2000);
396
397      // Abort the region server where we have the region hosted.
398      HRegionServer rs = desiredRegionAndServer.getSecond();
399      rs.abort("testing");
400
401      // The aborted region server's regions will be eventually assigned to some
402      // other region server, and the get RPC call (inside verifyEdit()) will
403      // retry for some time till the regions come back up.
404
405      // Verify that all the edits are safe.
406      for (int i = 1; i <= 80; i++) {
407        verifyEdit(1, i, table);
408        if (i <= 10) {
409          verifyEdit(2, i, table);
410          verifyEdit(3, i, table);
411        }
412      }
413    } finally {
414      TEST_UTIL.shutdownMiniCluster();
415    }
416  }
417
418  // Test Log Replay with Distributed log split on.
419  @Test
420  public void testLogReplayWithDistributedLogSplit() throws Exception {
421    doTestLogReplay();
422  }
423
424  private WAL getWAL(Region region) {
425    return ((HRegion) region).getWAL();
426  }
427
428  private int getNumRolledLogFiles(Region region) {
429    return AbstractFSWALProvider.getNumRolledLogFiles(getWAL(region));
430  }
431
432  /**
433   * When a log roll is about to happen, we do a flush of the regions who will be affected by the
434   * log roll. These flushes cannot be a selective flushes, otherwise we cannot roll the logs. This
435   * test ensures that we do a full-flush in that scenario.
436   */
437  @Test
438  public void testFlushingWhenLogRolling() throws Exception {
439    TableName tableName = TableName.valueOf("testFlushingWhenLogRolling");
440    Configuration conf = TEST_UTIL.getConfiguration();
441    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);
442    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
443    long cfFlushSizeLowerBound = 2048;
444    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
445      cfFlushSizeLowerBound);
446
447    // One hour, prevent periodic rolling
448    conf.setLong("hbase.regionserver.logroll.period", 60L * 60 * 1000);
449    // prevent rolling by size
450    conf.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024);
451    // Make it 10 as max logs before a flush comes on.
452    final int maxLogs = 10;
453    conf.setInt("hbase.regionserver.maxlogs", maxLogs);
454
455    final int numRegionServers = 1;
456    TEST_UTIL.startMiniCluster(numRegionServers);
457    try {
458      Table table = TEST_UTIL.createTable(tableName, FAMILIES);
459      Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(tableName);
460      final HRegion desiredRegion = desiredRegionAndServer.getFirst();
461      assertNotNull(desiredRegion, "Could not find a region which hosts the new region.");
462      LOG.info("Writing to region=" + desiredRegion);
463
464      // Add one row for both CFs.
465      for (int i = 1; i <= 3; i++) {
466        table.put(createPut(i, 0));
467      }
468      // Now only add row to CF1, make sure when we force a flush, CF1 is larger than the lower
469      // bound and CF2 and CF3 are smaller than the lower bound.
470      for (int i = 0; i < maxLogs; i++) {
471        for (int j = 0; j < 100; j++) {
472          table.put(createPut(1, i * 100 + j));
473        }
474        // Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
475        int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion);
476        assertNull(getWAL(desiredRegion).rollWriter());
477        TEST_UTIL.waitFor(60000,
478          () -> getNumRolledLogFiles(desiredRegion) > currentNumRolledLogFiles);
479      }
480      assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
481      assertTrue(
482        desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize() > cfFlushSizeLowerBound);
483      assertTrue(
484        desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound);
485      assertTrue(
486        desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound);
487      table.put(createPut(1, 12345678));
488      // Make numRolledLogFiles greater than maxLogs
489      desiredRegionAndServer.getSecond().getWalRoller().requestRollAll();
490      // Wait for some time till the flush caused by log rolling happens.
491      TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() {
492
493        @Override
494        public boolean evaluate() throws Exception {
495          return desiredRegion.getMemStoreDataSize() == 0;
496        }
497
498        @Override
499        public String explainFailure() throws Exception {
500          long memstoreSize = desiredRegion.getMemStoreDataSize();
501          if (memstoreSize > 0) {
502            return "Still have unflushed entries in memstore, memstore size is " + memstoreSize;
503          }
504          return "Unknown";
505        }
506      });
507      LOG.info("Finished waiting on flush after too many WALs...");
508      // Individual families should have been flushed.
509      assertEquals(MutableSegment.DEEP_OVERHEAD,
510        desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize());
511      assertEquals(MutableSegment.DEEP_OVERHEAD,
512        desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize());
513      assertEquals(MutableSegment.DEEP_OVERHEAD,
514        desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize());
515      // let WAL cleanOldLogs
516      assertNull(getWAL(desiredRegion).rollWriter(true));
517      TEST_UTIL.waitFor(60000, () -> getNumRolledLogFiles(desiredRegion) < maxLogs);
518    } finally {
519      TEST_UTIL.shutdownMiniCluster();
520    }
521  }
522
523  private void doPut(Table table, long memstoreFlushSize) throws IOException, InterruptedException {
524    Region region = getRegionWithName(table.getName()).getFirst();
525    // cf1 4B per row, cf2 40B per row and cf3 400B per row
526    byte[] qf = Bytes.toBytes("qf");
527    for (int i = 0; i < 10000; i++) {
528      Put put = new Put(Bytes.toBytes("row-" + i));
529      byte[] value1 = new byte[100];
530      Bytes.random(value1);
531      put.addColumn(FAMILY1, qf, value1);
532      byte[] value2 = new byte[200];
533      Bytes.random(value2);
534      put.addColumn(FAMILY2, qf, value2);
535      byte[] value3 = new byte[400];
536      Bytes.random(value3);
537      put.addColumn(FAMILY3, qf, value3);
538      table.put(put);
539      // slow down to let regionserver flush region.
540      while (region.getMemStoreHeapSize() > memstoreFlushSize) {
541        Thread.sleep(100);
542      }
543    }
544  }
545
546  // Under the same write load, small stores should have less store files when
547  // percolumnfamilyflush enabled.
548  @Test
549  public void testCompareStoreFileCount() throws Exception {
550    long memstoreFlushSize = 1024L * 1024;
551    Configuration conf = TEST_UTIL.getConfiguration();
552    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, memstoreFlushSize);
553    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
554    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
555    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
556      ConstantSizeRegionSplitPolicy.class.getName());
557
558    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
559      .setCompactionEnabled(false).setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY1))
560      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY2))
561      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY3)).build();
562
563    LOG.info("==============Test with selective flush disabled===============");
564    int cf1StoreFileCount = -1;
565    int cf2StoreFileCount = -1;
566    int cf3StoreFileCount = -1;
567    int cf1StoreFileCount1 = -1;
568    int cf2StoreFileCount1 = -1;
569    int cf3StoreFileCount1 = -1;
570    try {
571      TEST_UTIL.startMiniCluster(1);
572      TEST_UTIL.getAdmin()
573        .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
574      TEST_UTIL.getAdmin().createTable(tableDescriptor);
575      TEST_UTIL.waitTableAvailable(TABLENAME);
576      Connection conn = ConnectionFactory.createConnection(conf);
577      Table table = conn.getTable(TABLENAME);
578      doPut(table, memstoreFlushSize);
579      table.close();
580      conn.close();
581
582      Region region = getRegionWithName(TABLENAME).getFirst();
583      cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount();
584      cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount();
585      cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount();
586    } finally {
587      TEST_UTIL.shutdownMiniCluster();
588    }
589
590    LOG.info("==============Test with selective flush enabled===============");
591    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
592    // default value of per-cf flush lower bound is too big, set to a small enough value
593    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 0);
594    try {
595      TEST_UTIL.startMiniCluster(1);
596      TEST_UTIL.getAdmin()
597        .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
598      TEST_UTIL.getAdmin().createTable(tableDescriptor);
599      Connection conn = ConnectionFactory.createConnection(conf);
600      Table table = conn.getTable(TABLENAME);
601      doPut(table, memstoreFlushSize);
602      table.close();
603      conn.close();
604
605      Region region = getRegionWithName(TABLENAME).getFirst();
606      cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount();
607      cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount();
608      cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount();
609    } finally {
610      TEST_UTIL.shutdownMiniCluster();
611    }
612
613    LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount + ", "
614      + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", " + Bytes.toString(FAMILY3) + "=>"
615      + cf3StoreFileCount);
616    LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1 + ", "
617      + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount1 + ", " + Bytes.toString(FAMILY3) + "=>"
618      + cf3StoreFileCount1);
619    // small CF will have less store files.
620    assertTrue(cf1StoreFileCount1 < cf1StoreFileCount);
621    assertTrue(cf2StoreFileCount1 < cf2StoreFileCount);
622  }
623
624  public static void main(String[] args) throws Exception {
625    int numRegions = Integer.parseInt(args[0]);
626    long numRows = Long.parseLong(args[1]);
627
628    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
629      .setMaxFileSize(10L * 1024 * 1024 * 1024)
630      .setValue(TableDescriptorBuilder.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName())
631      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY1))
632      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY2))
633      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY3)).build();
634
635    Configuration conf = HBaseConfiguration.create();
636    Connection conn = ConnectionFactory.createConnection(conf);
637    Admin admin = conn.getAdmin();
638    if (admin.tableExists(TABLENAME)) {
639      admin.disableTable(TABLENAME);
640      admin.deleteTable(TABLENAME);
641    }
642    if (numRegions >= 3) {
643      byte[] startKey = new byte[16];
644      byte[] endKey = new byte[16];
645      Arrays.fill(endKey, (byte) 0xFF);
646      admin.createTable(tableDescriptor, startKey, endKey, numRegions);
647    } else {
648      admin.createTable(tableDescriptor);
649    }
650    admin.close();
651
652    Table table = conn.getTable(TABLENAME);
653    byte[] qf = Bytes.toBytes("qf");
654    byte[] value1 = new byte[16];
655    byte[] value2 = new byte[256];
656    byte[] value3 = new byte[4096];
657    for (long i = 0; i < numRows; i++) {
658      Put put = new Put(Hashing.md5().hashLong(i).asBytes());
659      Bytes.random(value1);
660      Bytes.random(value2);
661      Bytes.random(value3);
662      put.addColumn(FAMILY1, qf, value1);
663      put.addColumn(FAMILY2, qf, value2);
664      put.addColumn(FAMILY3, qf, value3);
665      table.put(put);
666      if (i % 10000 == 0) {
667        LOG.info(i + " rows put");
668      }
669    }
670    table.close();
671    conn.close();
672  }
673}