001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.util.Arrays;
027import java.util.List;
028import java.util.Random;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseConfiguration;
033import org.apache.hadoop.hbase.HBaseTestingUtility;
034import org.apache.hadoop.hbase.HColumnDescriptor;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.HRegionInfo;
037import org.apache.hadoop.hbase.HTableDescriptor;
038import org.apache.hadoop.hbase.MiniHBaseCluster;
039import org.apache.hadoop.hbase.NamespaceDescriptor;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.Waiter;
042import org.apache.hadoop.hbase.client.Admin;
043import org.apache.hadoop.hbase.client.Connection;
044import org.apache.hadoop.hbase.client.ConnectionFactory;
045import org.apache.hadoop.hbase.client.Get;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.Table;
049import org.apache.hadoop.hbase.testclassification.LargeTests;
050import org.apache.hadoop.hbase.testclassification.RegionServerTests;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.JVMClusterUtil;
053import org.apache.hadoop.hbase.util.Pair;
054import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
055import org.apache.hadoop.hbase.wal.WAL;
056import org.junit.ClassRule;
057import org.junit.Test;
058import org.junit.experimental.categories.Category;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hbase.thirdparty.com.google.common.hash.Hashing;
063
064/**
065 * This test verifies the correctness of the Per Column Family flushing strategy
066 */
067@Category({ RegionServerTests.class, LargeTests.class })
068public class TestPerColumnFamilyFlush {
069
  // JUnit class rule obtained from HBaseClassTestRule for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestPerColumnFamilyFlush.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestPerColumnFamilyFlush.class);

  // Shared testing utility; the mini-cluster based tests start and stop their cluster through it.
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  // Parent directory of the standalone regions created by initHRegion().
  private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");

  // Table "t1" in namespace "TestPerColumnFamilyFlush"; the mini-cluster tests create that
  // namespace before creating the table.
  public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1");

  // Column families "f1".."f5". createPut()/verifyEdit() address them with a 1-based family number.
  public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
      Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };

  // Shortcut for family "f1".
  public static final byte[] FAMILY1 = FAMILIES[0];

  // Shortcut for family "f2".
  public static final byte[] FAMILY2 = FAMILIES[1];

  // Shortcut for family "f3".
  public static final byte[] FAMILY3 = FAMILIES[2];
091  private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
092    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
093    for (byte[] family : FAMILIES) {
094      htd.addFamily(new HColumnDescriptor(family));
095    }
096    HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
097    Path path = new Path(DIR, callingMethod);
098    return HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);
099  }
100
101  // A helper function to create puts.
102  private Put createPut(int familyNum, int putNum) {
103    byte[] qf = Bytes.toBytes("q" + familyNum);
104    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
105    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
106    Put p = new Put(row);
107    p.addColumn(FAMILIES[familyNum - 1], qf, val);
108    return p;
109  }
110
111  // A helper function to create puts.
112  private Get createGet(int familyNum, int putNum) {
113    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
114    return new Get(row);
115  }
116
117  // A helper function to verify edits.
118  void verifyEdit(int familyNum, int putNum, Table table) throws IOException {
119    Result r = table.get(createGet(familyNum, putNum));
120    byte[] family = FAMILIES[familyNum - 1];
121    byte[] qf = Bytes.toBytes("q" + familyNum);
122    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
123    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
124    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
125      r.getFamilyMap(family).get(qf));
126    assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
127      Arrays.equals(r.getFamilyMap(family).get(qf), val));
128  }
129
130  @Test
131  public void testSelectiveFlushWhenEnabled() throws IOException {
132    // Set up the configuration, use new one to not conflict with minicluster in other tests
133    Configuration conf = new HBaseTestingUtility().getConfiguration();
134    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
135    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
136    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
137        40 * 1024);
138    // Intialize the region
139    HRegion region = initHRegion("testSelectiveFlushWithDataCompaction", conf);
140    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
141    for (int i = 1; i <= 1200; i++) {
142      region.put(createPut(1, i));
143
144      if (i <= 100) {
145        region.put(createPut(2, i));
146        if (i <= 50) {
147          region.put(createPut(3, i));
148        }
149      }
150    }
151
152    long totalMemstoreSize = region.getMemStoreDataSize();
153
154    // Find the smallest LSNs for edits wrt to each CF.
155    long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1);
156    long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2);
157    long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3);
158
159    // Find the sizes of the memstores of each CF.
160    MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
161    MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
162    MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
163
164    // Get the overall smallest LSN in the region's memstores.
165    long smallestSeqInRegionCurrentMemstore = getWAL(region)
166        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
167
168    // The overall smallest LSN in the region's memstores should be the same as
169    // the LSN of the smallest edit in CF1
170    assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
171
172    // Some other sanity checks.
173    assertTrue(smallestSeqCF1 < smallestSeqCF2);
174    assertTrue(smallestSeqCF2 < smallestSeqCF3);
175    assertTrue(cf1MemstoreSize.getDataSize() > 0);
176    assertTrue(cf2MemstoreSize.getDataSize() > 0);
177    assertTrue(cf3MemstoreSize.getDataSize() > 0);
178
179    // The total memstore size should be the same as the sum of the sizes of
180    // memstores of CF1, CF2 and CF3.
181    assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize()
182        + cf3MemstoreSize.getDataSize());
183
184    // Flush!
185    region.flush(false);
186
187    // Will use these to check if anything changed.
188    MemStoreSize oldCF2MemstoreSize = cf2MemstoreSize;
189    MemStoreSize oldCF3MemstoreSize = cf3MemstoreSize;
190
191    // Recalculate everything
192    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
193    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
194    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
195    totalMemstoreSize = region.getMemStoreDataSize();
196    smallestSeqInRegionCurrentMemstore = getWAL(region)
197        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
198
199    // We should have cleared out only CF1, since we chose the flush thresholds
200    // and number of puts accordingly.
201    assertEquals(0, cf1MemstoreSize.getDataSize());
202    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
203    // Nothing should have happened to CF2, ...
204    assertEquals(cf2MemstoreSize, oldCF2MemstoreSize);
205    // ... or CF3
206    assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
207    // Now the smallest LSN in the region should be the same as the smallest
208    // LSN in the memstore of CF2.
209    assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2);
210    // Of course, this should hold too.
211    assertEquals(totalMemstoreSize, cf2MemstoreSize.getDataSize() + cf3MemstoreSize.getDataSize());
212
213    // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
214    for (int i = 1200; i < 2400; i++) {
215      region.put(createPut(2, i));
216
217      // Add only 100 puts for CF3
218      if (i - 1200 < 100) {
219        region.put(createPut(3, i));
220      }
221    }
222
223    // How much does the CF3 memstore occupy? Will be used later.
224    oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
225
226    // Flush again
227    region.flush(false);
228
229    // Recalculate everything
230    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
231    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
232    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
233    totalMemstoreSize = region.getMemStoreDataSize();
234    smallestSeqInRegionCurrentMemstore = getWAL(region)
235        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
236
237    // CF1 and CF2, both should be absent.
238    assertEquals(0, cf1MemstoreSize.getDataSize());
239    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
240    assertEquals(0, cf2MemstoreSize.getDataSize());
241    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
242    // CF3 shouldn't have been touched.
243    assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
244    assertEquals(totalMemstoreSize, cf3MemstoreSize.getDataSize());
245
246    // What happens when we hit the memstore limit, but we are not able to find
247    // any Column Family above the threshold?
248    // In that case, we should flush all the CFs.
249
250    // Clearing the existing memstores.
251    region.flush(true);
252
253    // The memstore limit is 200*1024 and the column family flush threshold is
254    // around 50*1024. We try to just hit the memstore limit with each CF's
255    // memstore being below the CF flush threshold.
256    for (int i = 1; i <= 300; i++) {
257      region.put(createPut(1, i));
258      region.put(createPut(2, i));
259      region.put(createPut(3, i));
260      region.put(createPut(4, i));
261      region.put(createPut(5, i));
262    }
263
264    region.flush(false);
265
266    // Since we won't find any CF above the threshold, and hence no specific
267    // store to flush, we should flush all the memstores.
268    assertEquals(0, region.getMemStoreDataSize());
269    HBaseTestingUtility.closeRegionAndWAL(region);
270  }
271
272  @Test
273  public void testSelectiveFlushWhenNotEnabled() throws IOException {
274    // Set up the configuration, use new one to not conflict with minicluster in other tests
275    Configuration conf = new HBaseTestingUtility().getConfiguration();
276    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
277    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
278
279    // Intialize the HRegion
280    HRegion region = initHRegion("testSelectiveFlushWhenNotEnabled", conf);
281    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
282    for (int i = 1; i <= 1200; i++) {
283      region.put(createPut(1, i));
284      if (i <= 100) {
285        region.put(createPut(2, i));
286        if (i <= 50) {
287          region.put(createPut(3, i));
288        }
289      }
290    }
291
292    long totalMemstoreSize = region.getMemStoreDataSize();
293
294    // Find the sizes of the memstores of each CF.
295    MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
296    MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
297    MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
298
299    // Some other sanity checks.
300    assertTrue(cf1MemstoreSize.getDataSize() > 0);
301    assertTrue(cf2MemstoreSize.getDataSize() > 0);
302    assertTrue(cf3MemstoreSize.getDataSize() > 0);
303
304    // The total memstore size should be the same as the sum of the sizes of
305    // memstores of CF1, CF2 and CF3.
306    assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize()
307        + cf3MemstoreSize.getDataSize());
308
309    // Flush!
310    region.flush(false);
311
312    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
313    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
314    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
315    totalMemstoreSize = region.getMemStoreDataSize();
316    long smallestSeqInRegionCurrentMemstore =
317        region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
318
319    // Everything should have been cleared
320    assertEquals(0, cf1MemstoreSize.getDataSize());
321    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
322    assertEquals(0, cf2MemstoreSize.getDataSize());
323    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
324    assertEquals(0, cf3MemstoreSize.getDataSize());
325    assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSize.getHeapSize());
326    assertEquals(0, totalMemstoreSize);
327    assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
328    HBaseTestingUtility.closeRegionAndWAL(region);
329  }
330
331  // Find the (first) region which has the specified name.
332  private static Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) {
333    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
334    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
335    for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
336      HRegionServer hrs = rsts.get(i).getRegionServer();
337      for (HRegion region : hrs.getRegions(tableName)) {
338        return Pair.newPair(region, hrs);
339      }
340    }
341    return null;
342  }
343
344  private void doTestLogReplay() throws Exception {
345    Configuration conf = TEST_UTIL.getConfiguration();
346    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 10000);
347    // Carefully chosen limits so that the memstore just flushes when we're done
348    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
349    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 2500);
350    final int numRegionServers = 4;
351    try {
352      TEST_UTIL.startMiniCluster(numRegionServers);
353      TEST_UTIL.getAdmin().createNamespace(
354        NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
355      Table table = TEST_UTIL.createTable(TABLENAME, FAMILIES);
356      HTableDescriptor htd = table.getTableDescriptor();
357
358      for (byte[] family : FAMILIES) {
359        if (!htd.hasFamily(family)) {
360          htd.addFamily(new HColumnDescriptor(family));
361        }
362      }
363
364      // Add 100 edits for CF1, 20 for CF2, 20 for CF3.
365      // These will all be interleaved in the log.
366      for (int i = 1; i <= 80; i++) {
367        table.put(createPut(1, i));
368        if (i <= 10) {
369          table.put(createPut(2, i));
370          table.put(createPut(3, i));
371        }
372      }
373      Thread.sleep(1000);
374
375      Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME);
376      HRegion desiredRegion = desiredRegionAndServer.getFirst();
377      assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
378
379      // Flush the region selectively.
380      desiredRegion.flush(false);
381
382      long totalMemstoreSize;
383      long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
384      totalMemstoreSize = desiredRegion.getMemStoreDataSize();
385
386      // Find the sizes of the memstores of each CF.
387      cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize().getDataSize();
388      cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize().getDataSize();
389      cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize().getDataSize();
390
391      // CF1 Should have been flushed
392      assertEquals(0, cf1MemstoreSize);
393      // CF2 and CF3 shouldn't have been flushed.
394      // TODO: This test doesn't allow for this case:
395      // " Since none of the CFs were above the size, flushing all."
396      // i.e. a flush happens before we get to here and its a flush-all.
397      assertTrue(cf2MemstoreSize >= 0);
398      assertTrue(cf3MemstoreSize >= 0);
399      assertEquals(totalMemstoreSize, cf2MemstoreSize + cf3MemstoreSize);
400
401      // Wait for the RS report to go across to the master, so that the master
402      // is aware of which sequence ids have been flushed, before we kill the RS.
403      // If in production, the RS dies before the report goes across, we will
404      // safely replay all the edits.
405      Thread.sleep(2000);
406
407      // Abort the region server where we have the region hosted.
408      HRegionServer rs = desiredRegionAndServer.getSecond();
409      rs.abort("testing");
410
411      // The aborted region server's regions will be eventually assigned to some
412      // other region server, and the get RPC call (inside verifyEdit()) will
413      // retry for some time till the regions come back up.
414
415      // Verify that all the edits are safe.
416      for (int i = 1; i <= 80; i++) {
417        verifyEdit(1, i, table);
418        if (i <= 10) {
419          verifyEdit(2, i, table);
420          verifyEdit(3, i, table);
421        }
422      }
423    } finally {
424      TEST_UTIL.shutdownMiniCluster();
425    }
426  }
427
428  // Test Log Replay with Distributed log split on.
429  @Test
430  public void testLogReplayWithDistributedLogSplit() throws Exception {
431    doTestLogReplay();
432  }
433
434  private WAL getWAL(Region region) {
435    return ((HRegion)region).getWAL();
436  }
437
438  private int getNumRolledLogFiles(Region region) {
439    return AbstractFSWALProvider.getNumRolledLogFiles(getWAL(region));
440  }
441
442  /**
443   * When a log roll is about to happen, we do a flush of the regions who will be affected by the
444   * log roll. These flushes cannot be a selective flushes, otherwise we cannot roll the logs. This
445   * test ensures that we do a full-flush in that scenario.
446   * @throws IOException
447   */
448  @Test
449  public void testFlushingWhenLogRolling() throws Exception {
450    TableName tableName = TableName.valueOf("testFlushingWhenLogRolling");
451    Configuration conf = TEST_UTIL.getConfiguration();
452    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);
453    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
454    long cfFlushSizeLowerBound = 2048;
455    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
456      cfFlushSizeLowerBound);
457
458    // One hour, prevent periodic rolling
459    conf.setLong("hbase.regionserver.logroll.period", 60L * 60 * 1000);
460    // prevent rolling by size
461    conf.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024);
462    // Make it 10 as max logs before a flush comes on.
463    final int maxLogs = 10;
464    conf.setInt("hbase.regionserver.maxlogs", maxLogs);
465
466    final int numRegionServers = 1;
467    TEST_UTIL.startMiniCluster(numRegionServers);
468    try {
469      Table table = TEST_UTIL.createTable(tableName, FAMILIES);
470      // Force flush the namespace table so edits to it are not hanging around as oldest
471      // edits. Otherwise, below, when we make maximum number of WAL files, then it will be
472      // the namespace region that is flushed and not the below 'desiredRegion'.
473      try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
474        admin.flush(TableName.NAMESPACE_TABLE_NAME);
475      }
476      Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(tableName);
477      final HRegion desiredRegion = desiredRegionAndServer.getFirst();
478      assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
479      LOG.info("Writing to region=" + desiredRegion);
480
481      // Add one row for both CFs.
482      for (int i = 1; i <= 3; i++) {
483        table.put(createPut(i, 0));
484      }
485      // Now only add row to CF1, make sure when we force a flush, CF1 is larger than the lower
486      // bound and CF2 and CF3 are smaller than the lower bound.
487      for (int i = 0; i < maxLogs; i++) {
488        for (int j = 0; j < 100; j++) {
489          table.put(createPut(1, i * 100 + j));
490        }
491        // Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
492        int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion);
493        assertNull(getWAL(desiredRegion).rollWriter());
494        while (getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) {
495          Thread.sleep(100);
496        }
497      }
498      assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
499      assertTrue(desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize() > cfFlushSizeLowerBound);
500      assertTrue(desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound);
501      assertTrue(desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound);
502      table.put(createPut(1, 12345678));
503      // Make numRolledLogFiles greater than maxLogs
504      desiredRegionAndServer.getSecond().getWalRoller().requestRollAll();
505      // Wait for some time till the flush caused by log rolling happens.
506      TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() {
507
508        @Override
509        public boolean evaluate() throws Exception {
510          return desiredRegion.getMemStoreDataSize() == 0;
511        }
512
513        @Override
514        public String explainFailure() throws Exception {
515          long memstoreSize = desiredRegion.getMemStoreDataSize();
516          if (memstoreSize > 0) {
517            return "Still have unflushed entries in memstore, memstore size is " + memstoreSize;
518          }
519          return "Unknown";
520        }
521      });
522      LOG.info("Finished waiting on flush after too many WALs...");
523      // Individual families should have been flushed.
524      assertEquals(MutableSegment.DEEP_OVERHEAD,
525        desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize());
526      assertEquals(MutableSegment.DEEP_OVERHEAD,
527        desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize());
528      assertEquals(MutableSegment.DEEP_OVERHEAD,
529        desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize());
530      // let WAL cleanOldLogs
531      assertNull(getWAL(desiredRegion).rollWriter(true));
532      assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);
533    } finally {
534      TEST_UTIL.shutdownMiniCluster();
535    }
536  }
537
538  private void doPut(Table table, long memstoreFlushSize) throws IOException, InterruptedException {
539    Region region = getRegionWithName(table.getName()).getFirst();
540    // cf1 4B per row, cf2 40B per row and cf3 400B per row
541    byte[] qf = Bytes.toBytes("qf");
542    Random rand = new Random();
543    byte[] value1 = new byte[100];
544    byte[] value2 = new byte[200];
545    byte[] value3 = new byte[400];
546    for (int i = 0; i < 10000; i++) {
547      Put put = new Put(Bytes.toBytes("row-" + i));
548      rand.setSeed(i);
549      rand.nextBytes(value1);
550      rand.nextBytes(value2);
551      rand.nextBytes(value3);
552      put.addColumn(FAMILY1, qf, value1);
553      put.addColumn(FAMILY2, qf, value2);
554      put.addColumn(FAMILY3, qf, value3);
555      table.put(put);
556      // slow down to let regionserver flush region.
557      while (region.getMemStoreHeapSize() > memstoreFlushSize) {
558        Thread.sleep(100);
559      }
560    }
561  }
562
563  // Under the same write load, small stores should have less store files when
564  // percolumnfamilyflush enabled.
565  @Test
566  public void testCompareStoreFileCount() throws Exception {
567    long memstoreFlushSize = 1024L * 1024;
568    Configuration conf = TEST_UTIL.getConfiguration();
569    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, memstoreFlushSize);
570    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
571    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
572    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
573      ConstantSizeRegionSplitPolicy.class.getName());
574
575    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
576    htd.setCompactionEnabled(false);
577    htd.addFamily(new HColumnDescriptor(FAMILY1));
578    htd.addFamily(new HColumnDescriptor(FAMILY2));
579    htd.addFamily(new HColumnDescriptor(FAMILY3));
580
581    LOG.info("==============Test with selective flush disabled===============");
582    int cf1StoreFileCount = -1;
583    int cf2StoreFileCount = -1;
584    int cf3StoreFileCount = -1;
585    int cf1StoreFileCount1 = -1;
586    int cf2StoreFileCount1 = -1;
587    int cf3StoreFileCount1 = -1;
588    try {
589      TEST_UTIL.startMiniCluster(1);
590      TEST_UTIL.getAdmin().createNamespace(
591        NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
592      TEST_UTIL.getAdmin().createTable(htd);
593      TEST_UTIL.waitTableAvailable(TABLENAME);
594      Connection conn = ConnectionFactory.createConnection(conf);
595      Table table = conn.getTable(TABLENAME);
596      doPut(table, memstoreFlushSize);
597      table.close();
598      conn.close();
599
600      Region region = getRegionWithName(TABLENAME).getFirst();
601      cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount();
602      cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount();
603      cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount();
604    } finally {
605      TEST_UTIL.shutdownMiniCluster();
606    }
607
608    LOG.info("==============Test with selective flush enabled===============");
609    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
610    // default value of per-cf flush lower bound is too big, set to a small enough value
611    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 0);
612    try {
613      TEST_UTIL.startMiniCluster(1);
614      TEST_UTIL.getAdmin().createNamespace(
615        NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
616      TEST_UTIL.getAdmin().createTable(htd);
617      Connection conn = ConnectionFactory.createConnection(conf);
618      Table table = conn.getTable(TABLENAME);
619      doPut(table, memstoreFlushSize);
620      table.close();
621      conn.close();
622
623      Region region = getRegionWithName(TABLENAME).getFirst();
624      cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount();
625      cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount();
626      cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount();
627    } finally {
628      TEST_UTIL.shutdownMiniCluster();
629    }
630
631    LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount
632        + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", "
633        + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount);
634    LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1
635        + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount1 + ", "
636        + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount1);
637    // small CF will have less store files.
638    assertTrue(cf1StoreFileCount1 < cf1StoreFileCount);
639    assertTrue(cf2StoreFileCount1 < cf2StoreFileCount);
640  }
641
642  public static void main(String[] args) throws Exception {
643    int numRegions = Integer.parseInt(args[0]);
644    long numRows = Long.parseLong(args[1]);
645
646    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
647    htd.setMaxFileSize(10L * 1024 * 1024 * 1024);
648    htd.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
649    htd.addFamily(new HColumnDescriptor(FAMILY1));
650    htd.addFamily(new HColumnDescriptor(FAMILY2));
651    htd.addFamily(new HColumnDescriptor(FAMILY3));
652
653    Configuration conf = HBaseConfiguration.create();
654    Connection conn = ConnectionFactory.createConnection(conf);
655    Admin admin = conn.getAdmin();
656    if (admin.tableExists(TABLENAME)) {
657      admin.disableTable(TABLENAME);
658      admin.deleteTable(TABLENAME);
659    }
660    if (numRegions >= 3) {
661      byte[] startKey = new byte[16];
662      byte[] endKey = new byte[16];
663      Arrays.fill(endKey, (byte) 0xFF);
664      admin.createTable(htd, startKey, endKey, numRegions);
665    } else {
666      admin.createTable(htd);
667    }
668    admin.close();
669
670    Table table = conn.getTable(TABLENAME);
671    byte[] qf = Bytes.toBytes("qf");
672    Random rand = new Random();
673    byte[] value1 = new byte[16];
674    byte[] value2 = new byte[256];
675    byte[] value3 = new byte[4096];
676    for (long i = 0; i < numRows; i++) {
677      Put put = new Put(Hashing.md5().hashLong(i).asBytes());
678      rand.setSeed(i);
679      rand.nextBytes(value1);
680      rand.nextBytes(value2);
681      rand.nextBytes(value3);
682      put.addColumn(FAMILY1, qf, value1);
683      put.addColumn(FAMILY2, qf, value2);
684      put.addColumn(FAMILY3, qf, value3);
685      table.put(put);
686      if (i % 10000 == 0) {
687        LOG.info(i + " rows put");
688      }
689    }
690    table.close();
691    conn.close();
692  }
693}