001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.util.Arrays;
027import java.util.List;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HColumnDescriptor;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.HRegionInfo;
036import org.apache.hadoop.hbase.HTableDescriptor;
037import org.apache.hadoop.hbase.MiniHBaseCluster;
038import org.apache.hadoop.hbase.NamespaceDescriptor;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.Waiter;
041import org.apache.hadoop.hbase.client.Admin;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.ConnectionFactory;
044import org.apache.hadoop.hbase.client.Get;
045import org.apache.hadoop.hbase.client.Put;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.testclassification.LargeTests;
049import org.apache.hadoop.hbase.testclassification.RegionServerTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.JVMClusterUtil;
052import org.apache.hadoop.hbase.util.Pair;
053import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
054import org.apache.hadoop.hbase.wal.WAL;
055import org.junit.ClassRule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.common.hash.Hashing;
062
063/**
064 * This test verifies the correctness of the Per Column Family flushing strategy
065 */
066@Category({ RegionServerTests.class, LargeTests.class })
067public class TestPerColumnFamilyFlush {
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071    HBaseClassTestRule.forClass(TestPerColumnFamilyFlush.class);
072
073  private static final Logger LOG = LoggerFactory.getLogger(TestPerColumnFamilyFlush.class);
074
075  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
076
077  private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
078
079  public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1");
080
081  public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
082    Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
083
084  public static final byte[] FAMILY1 = FAMILIES[0];
085
086  public static final byte[] FAMILY2 = FAMILIES[1];
087
088  public static final byte[] FAMILY3 = FAMILIES[2];
089
090  private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
091    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
092    for (byte[] family : FAMILIES) {
093      htd.addFamily(new HColumnDescriptor(family));
094    }
095    HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
096    Path path = new Path(DIR, callingMethod);
097    return HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);
098  }
099
100  // A helper function to create puts.
101  private Put createPut(int familyNum, int putNum) {
102    byte[] qf = Bytes.toBytes("q" + familyNum);
103    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
104    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
105    Put p = new Put(row);
106    p.addColumn(FAMILIES[familyNum - 1], qf, val);
107    return p;
108  }
109
110  // A helper function to create puts.
111  private Get createGet(int familyNum, int putNum) {
112    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
113    return new Get(row);
114  }
115
116  // A helper function to verify edits.
117  void verifyEdit(int familyNum, int putNum, Table table) throws IOException {
118    Result r = table.get(createGet(familyNum, putNum));
119    byte[] family = FAMILIES[familyNum - 1];
120    byte[] qf = Bytes.toBytes("q" + familyNum);
121    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
122    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
123    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
124      r.getFamilyMap(family).get(qf));
125    assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
126      Arrays.equals(r.getFamilyMap(family).get(qf), val));
127  }
128
129  @Test
130  public void testSelectiveFlushWhenEnabled() throws IOException {
131    // Set up the configuration, use new one to not conflict with minicluster in other tests
132    Configuration conf = new HBaseTestingUtility().getConfiguration();
133    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
134    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
135    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 40 * 1024);
136    // Intialize the region
137    HRegion region = initHRegion("testSelectiveFlushWithDataCompaction", conf);
138    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
139    for (int i = 1; i <= 1200; i++) {
140      region.put(createPut(1, i));
141
142      if (i <= 100) {
143        region.put(createPut(2, i));
144        if (i <= 50) {
145          region.put(createPut(3, i));
146        }
147      }
148    }
149
150    long totalMemstoreSize = region.getMemStoreDataSize();
151
152    // Find the smallest LSNs for edits wrt to each CF.
153    long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1);
154    long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2);
155    long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3);
156
157    // Find the sizes of the memstores of each CF.
158    MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
159    MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
160    MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
161
162    // Get the overall smallest LSN in the region's memstores.
163    long smallestSeqInRegionCurrentMemstore =
164      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
165
166    // The overall smallest LSN in the region's memstores should be the same as
167    // the LSN of the smallest edit in CF1
168    assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
169
170    // Some other sanity checks.
171    assertTrue(smallestSeqCF1 < smallestSeqCF2);
172    assertTrue(smallestSeqCF2 < smallestSeqCF3);
173    assertTrue(cf1MemstoreSize.getDataSize() > 0);
174    assertTrue(cf2MemstoreSize.getDataSize() > 0);
175    assertTrue(cf3MemstoreSize.getDataSize() > 0);
176
177    // The total memstore size should be the same as the sum of the sizes of
178    // memstores of CF1, CF2 and CF3.
179    assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize()
180      + cf3MemstoreSize.getDataSize());
181
182    // Flush!
183    region.flush(false);
184
185    // Will use these to check if anything changed.
186    MemStoreSize oldCF2MemstoreSize = cf2MemstoreSize;
187    MemStoreSize oldCF3MemstoreSize = cf3MemstoreSize;
188
189    // Recalculate everything
190    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
191    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
192    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
193    totalMemstoreSize = region.getMemStoreDataSize();
194    smallestSeqInRegionCurrentMemstore =
195      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
196
197    // We should have cleared out only CF1, since we chose the flush thresholds
198    // and number of puts accordingly.
199    assertEquals(0, cf1MemstoreSize.getDataSize());
200    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
201    // Nothing should have happened to CF2, ...
202    assertEquals(cf2MemstoreSize, oldCF2MemstoreSize);
203    // ... or CF3
204    assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
205    // Now the smallest LSN in the region should be the same as the smallest
206    // LSN in the memstore of CF2.
207    assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2);
208    // Of course, this should hold too.
209    assertEquals(totalMemstoreSize, cf2MemstoreSize.getDataSize() + cf3MemstoreSize.getDataSize());
210
211    // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
212    for (int i = 1200; i < 2400; i++) {
213      region.put(createPut(2, i));
214
215      // Add only 100 puts for CF3
216      if (i - 1200 < 100) {
217        region.put(createPut(3, i));
218      }
219    }
220
221    // How much does the CF3 memstore occupy? Will be used later.
222    oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
223
224    // Flush again
225    region.flush(false);
226
227    // Recalculate everything
228    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
229    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
230    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
231    totalMemstoreSize = region.getMemStoreDataSize();
232    smallestSeqInRegionCurrentMemstore =
233      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
234
235    // CF1 and CF2, both should be absent.
236    assertEquals(0, cf1MemstoreSize.getDataSize());
237    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
238    assertEquals(0, cf2MemstoreSize.getDataSize());
239    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
240    // CF3 shouldn't have been touched.
241    assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
242    assertEquals(totalMemstoreSize, cf3MemstoreSize.getDataSize());
243
244    // What happens when we hit the memstore limit, but we are not able to find
245    // any Column Family above the threshold?
246    // In that case, we should flush all the CFs.
247
248    // Clearing the existing memstores.
249    region.flush(true);
250
251    // The memstore limit is 200*1024 and the column family flush threshold is
252    // around 50*1024. We try to just hit the memstore limit with each CF's
253    // memstore being below the CF flush threshold.
254    for (int i = 1; i <= 300; i++) {
255      region.put(createPut(1, i));
256      region.put(createPut(2, i));
257      region.put(createPut(3, i));
258      region.put(createPut(4, i));
259      region.put(createPut(5, i));
260    }
261
262    region.flush(false);
263
264    // Since we won't find any CF above the threshold, and hence no specific
265    // store to flush, we should flush all the memstores.
266    assertEquals(0, region.getMemStoreDataSize());
267    HBaseTestingUtility.closeRegionAndWAL(region);
268  }
269
270  @Test
271  public void testSelectiveFlushWhenNotEnabled() throws IOException {
272    // Set up the configuration, use new one to not conflict with minicluster in other tests
273    Configuration conf = new HBaseTestingUtility().getConfiguration();
274    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
275    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
276
277    // Intialize the HRegion
278    HRegion region = initHRegion("testSelectiveFlushWhenNotEnabled", conf);
279    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
280    for (int i = 1; i <= 1200; i++) {
281      region.put(createPut(1, i));
282      if (i <= 100) {
283        region.put(createPut(2, i));
284        if (i <= 50) {
285          region.put(createPut(3, i));
286        }
287      }
288    }
289
290    long totalMemstoreSize = region.getMemStoreDataSize();
291
292    // Find the sizes of the memstores of each CF.
293    MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
294    MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
295    MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
296
297    // Some other sanity checks.
298    assertTrue(cf1MemstoreSize.getDataSize() > 0);
299    assertTrue(cf2MemstoreSize.getDataSize() > 0);
300    assertTrue(cf3MemstoreSize.getDataSize() > 0);
301
302    // The total memstore size should be the same as the sum of the sizes of
303    // memstores of CF1, CF2 and CF3.
304    assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize()
305      + cf3MemstoreSize.getDataSize());
306
307    // Flush!
308    region.flush(false);
309
310    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
311    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
312    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
313    totalMemstoreSize = region.getMemStoreDataSize();
314    long smallestSeqInRegionCurrentMemstore =
315      region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
316
317    // Everything should have been cleared
318    assertEquals(0, cf1MemstoreSize.getDataSize());
319    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
320    assertEquals(0, cf2MemstoreSize.getDataSize());
321    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
322    assertEquals(0, cf3MemstoreSize.getDataSize());
323    assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSize.getHeapSize());
324    assertEquals(0, totalMemstoreSize);
325    assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
326    HBaseTestingUtility.closeRegionAndWAL(region);
327  }
328
329  // Find the (first) region which has the specified name.
330  private static Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) {
331    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
332    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
333    for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
334      HRegionServer hrs = rsts.get(i).getRegionServer();
335      for (HRegion region : hrs.getRegions(tableName)) {
336        return Pair.newPair(region, hrs);
337      }
338    }
339    return null;
340  }
341
342  private void doTestLogReplay() throws Exception {
343    Configuration conf = TEST_UTIL.getConfiguration();
344    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 10000);
345    // Carefully chosen limits so that the memstore just flushes when we're done
346    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
347    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 2500);
348    final int numRegionServers = 4;
349    try {
350      TEST_UTIL.startMiniCluster(numRegionServers);
351      TEST_UTIL.getAdmin()
352        .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
353      Table table = TEST_UTIL.createTable(TABLENAME, FAMILIES);
354      HTableDescriptor htd = table.getTableDescriptor();
355
356      for (byte[] family : FAMILIES) {
357        if (!htd.hasFamily(family)) {
358          htd.addFamily(new HColumnDescriptor(family));
359        }
360      }
361
362      // Add 100 edits for CF1, 20 for CF2, 20 for CF3.
363      // These will all be interleaved in the log.
364      for (int i = 1; i <= 80; i++) {
365        table.put(createPut(1, i));
366        if (i <= 10) {
367          table.put(createPut(2, i));
368          table.put(createPut(3, i));
369        }
370      }
371      Thread.sleep(1000);
372
373      Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME);
374      HRegion desiredRegion = desiredRegionAndServer.getFirst();
375      assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
376
377      // Flush the region selectively.
378      desiredRegion.flush(false);
379
380      long totalMemstoreSize;
381      long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
382      totalMemstoreSize = desiredRegion.getMemStoreDataSize();
383
384      // Find the sizes of the memstores of each CF.
385      cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize().getDataSize();
386      cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize().getDataSize();
387      cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize().getDataSize();
388
389      // CF1 Should have been flushed
390      assertEquals(0, cf1MemstoreSize);
391      // CF2 and CF3 shouldn't have been flushed.
392      // TODO: This test doesn't allow for this case:
393      // " Since none of the CFs were above the size, flushing all."
394      // i.e. a flush happens before we get to here and its a flush-all.
395      assertTrue(cf2MemstoreSize >= 0);
396      assertTrue(cf3MemstoreSize >= 0);
397      assertEquals(totalMemstoreSize, cf2MemstoreSize + cf3MemstoreSize);
398
399      // Wait for the RS report to go across to the master, so that the master
400      // is aware of which sequence ids have been flushed, before we kill the RS.
401      // If in production, the RS dies before the report goes across, we will
402      // safely replay all the edits.
403      Thread.sleep(2000);
404
405      // Abort the region server where we have the region hosted.
406      HRegionServer rs = desiredRegionAndServer.getSecond();
407      rs.abort("testing");
408
409      // The aborted region server's regions will be eventually assigned to some
410      // other region server, and the get RPC call (inside verifyEdit()) will
411      // retry for some time till the regions come back up.
412
413      // Verify that all the edits are safe.
414      for (int i = 1; i <= 80; i++) {
415        verifyEdit(1, i, table);
416        if (i <= 10) {
417          verifyEdit(2, i, table);
418          verifyEdit(3, i, table);
419        }
420      }
421    } finally {
422      TEST_UTIL.shutdownMiniCluster();
423    }
424  }
425
426  // Test Log Replay with Distributed log split on.
427  @Test
428  public void testLogReplayWithDistributedLogSplit() throws Exception {
429    doTestLogReplay();
430  }
431
432  private WAL getWAL(Region region) {
433    return ((HRegion) region).getWAL();
434  }
435
436  private int getNumRolledLogFiles(Region region) {
437    return AbstractFSWALProvider.getNumRolledLogFiles(getWAL(region));
438  }
439
440  /**
441   * When a log roll is about to happen, we do a flush of the regions who will be affected by the
442   * log roll. These flushes cannot be a selective flushes, otherwise we cannot roll the logs. This
443   * test ensures that we do a full-flush in that scenario. n
444   */
445  @Test
446  public void testFlushingWhenLogRolling() throws Exception {
447    TableName tableName = TableName.valueOf("testFlushingWhenLogRolling");
448    Configuration conf = TEST_UTIL.getConfiguration();
449    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);
450    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
451    long cfFlushSizeLowerBound = 2048;
452    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
453      cfFlushSizeLowerBound);
454
455    // One hour, prevent periodic rolling
456    conf.setLong("hbase.regionserver.logroll.period", 60L * 60 * 1000);
457    // prevent rolling by size
458    conf.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024);
459    // Make it 10 as max logs before a flush comes on.
460    final int maxLogs = 10;
461    conf.setInt("hbase.regionserver.maxlogs", maxLogs);
462
463    final int numRegionServers = 1;
464    TEST_UTIL.startMiniCluster(numRegionServers);
465    try {
466      Table table = TEST_UTIL.createTable(tableName, FAMILIES);
467      // Force flush the namespace table so edits to it are not hanging around as oldest
468      // edits. Otherwise, below, when we make maximum number of WAL files, then it will be
469      // the namespace region that is flushed and not the below 'desiredRegion'.
470      try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
471        admin.flush(TableName.NAMESPACE_TABLE_NAME);
472      }
473      Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(tableName);
474      final HRegion desiredRegion = desiredRegionAndServer.getFirst();
475      assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
476      LOG.info("Writing to region=" + desiredRegion);
477
478      // Add one row for both CFs.
479      for (int i = 1; i <= 3; i++) {
480        table.put(createPut(i, 0));
481      }
482      // Now only add row to CF1, make sure when we force a flush, CF1 is larger than the lower
483      // bound and CF2 and CF3 are smaller than the lower bound.
484      for (int i = 0; i < maxLogs; i++) {
485        for (int j = 0; j < 100; j++) {
486          table.put(createPut(1, i * 100 + j));
487        }
488        // Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
489        int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion);
490        assertNull(getWAL(desiredRegion).rollWriter());
491        while (getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) {
492          Thread.sleep(100);
493        }
494      }
495      assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
496      assertTrue(
497        desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize() > cfFlushSizeLowerBound);
498      assertTrue(
499        desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound);
500      assertTrue(
501        desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound);
502      table.put(createPut(1, 12345678));
503      // Make numRolledLogFiles greater than maxLogs
504      desiredRegionAndServer.getSecond().getWalRoller().requestRollAll();
505      // Wait for some time till the flush caused by log rolling happens.
506      TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() {
507
508        @Override
509        public boolean evaluate() throws Exception {
510          return desiredRegion.getMemStoreDataSize() == 0;
511        }
512
513        @Override
514        public String explainFailure() throws Exception {
515          long memstoreSize = desiredRegion.getMemStoreDataSize();
516          if (memstoreSize > 0) {
517            return "Still have unflushed entries in memstore, memstore size is " + memstoreSize;
518          }
519          return "Unknown";
520        }
521      });
522      LOG.info("Finished waiting on flush after too many WALs...");
523      // Individual families should have been flushed.
524      assertEquals(MutableSegment.DEEP_OVERHEAD,
525        desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize());
526      assertEquals(MutableSegment.DEEP_OVERHEAD,
527        desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize());
528      assertEquals(MutableSegment.DEEP_OVERHEAD,
529        desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize());
530      // let WAL cleanOldLogs
531      assertNull(getWAL(desiredRegion).rollWriter(true));
532      assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);
533    } finally {
534      TEST_UTIL.shutdownMiniCluster();
535    }
536  }
537
538  private void doPut(Table table, long memstoreFlushSize) throws IOException, InterruptedException {
539    Region region = getRegionWithName(table.getName()).getFirst();
540    // cf1 4B per row, cf2 40B per row and cf3 400B per row
541    byte[] qf = Bytes.toBytes("qf");
542    for (int i = 0; i < 10000; i++) {
543      Put put = new Put(Bytes.toBytes("row-" + i));
544      byte[] value1 = new byte[100];
545      Bytes.random(value1);
546      put.addColumn(FAMILY1, qf, value1);
547      byte[] value2 = new byte[200];
548      Bytes.random(value2);
549      put.addColumn(FAMILY2, qf, value2);
550      byte[] value3 = new byte[400];
551      Bytes.random(value3);
552      put.addColumn(FAMILY3, qf, value3);
553      table.put(put);
554      // slow down to let regionserver flush region.
555      while (region.getMemStoreHeapSize() > memstoreFlushSize) {
556        Thread.sleep(100);
557      }
558    }
559  }
560
561  // Under the same write load, small stores should have less store files when
562  // percolumnfamilyflush enabled.
563  @Test
564  public void testCompareStoreFileCount() throws Exception {
565    long memstoreFlushSize = 1024L * 1024;
566    Configuration conf = TEST_UTIL.getConfiguration();
567    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, memstoreFlushSize);
568    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
569    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
570    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
571      ConstantSizeRegionSplitPolicy.class.getName());
572
573    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
574    htd.setCompactionEnabled(false);
575    htd.addFamily(new HColumnDescriptor(FAMILY1));
576    htd.addFamily(new HColumnDescriptor(FAMILY2));
577    htd.addFamily(new HColumnDescriptor(FAMILY3));
578
579    LOG.info("==============Test with selective flush disabled===============");
580    int cf1StoreFileCount = -1;
581    int cf2StoreFileCount = -1;
582    int cf3StoreFileCount = -1;
583    int cf1StoreFileCount1 = -1;
584    int cf2StoreFileCount1 = -1;
585    int cf3StoreFileCount1 = -1;
586    try {
587      TEST_UTIL.startMiniCluster(1);
588      TEST_UTIL.getAdmin()
589        .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
590      TEST_UTIL.getAdmin().createTable(htd);
591      TEST_UTIL.waitTableAvailable(TABLENAME);
592      Connection conn = ConnectionFactory.createConnection(conf);
593      Table table = conn.getTable(TABLENAME);
594      doPut(table, memstoreFlushSize);
595      table.close();
596      conn.close();
597
598      Region region = getRegionWithName(TABLENAME).getFirst();
599      cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount();
600      cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount();
601      cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount();
602    } finally {
603      TEST_UTIL.shutdownMiniCluster();
604    }
605
606    LOG.info("==============Test with selective flush enabled===============");
607    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
608    // default value of per-cf flush lower bound is too big, set to a small enough value
609    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 0);
610    try {
611      TEST_UTIL.startMiniCluster(1);
612      TEST_UTIL.getAdmin()
613        .createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
614      TEST_UTIL.getAdmin().createTable(htd);
615      Connection conn = ConnectionFactory.createConnection(conf);
616      Table table = conn.getTable(TABLENAME);
617      doPut(table, memstoreFlushSize);
618      table.close();
619      conn.close();
620
621      Region region = getRegionWithName(TABLENAME).getFirst();
622      cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount();
623      cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount();
624      cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount();
625    } finally {
626      TEST_UTIL.shutdownMiniCluster();
627    }
628
629    LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount + ", "
630      + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", " + Bytes.toString(FAMILY3) + "=>"
631      + cf3StoreFileCount);
632    LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1 + ", "
633      + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount1 + ", " + Bytes.toString(FAMILY3) + "=>"
634      + cf3StoreFileCount1);
635    // small CF will have less store files.
636    assertTrue(cf1StoreFileCount1 < cf1StoreFileCount);
637    assertTrue(cf2StoreFileCount1 < cf2StoreFileCount);
638  }
639
640  public static void main(String[] args) throws Exception {
641    int numRegions = Integer.parseInt(args[0]);
642    long numRows = Long.parseLong(args[1]);
643
644    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
645    htd.setMaxFileSize(10L * 1024 * 1024 * 1024);
646    htd.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
647    htd.addFamily(new HColumnDescriptor(FAMILY1));
648    htd.addFamily(new HColumnDescriptor(FAMILY2));
649    htd.addFamily(new HColumnDescriptor(FAMILY3));
650
651    Configuration conf = HBaseConfiguration.create();
652    Connection conn = ConnectionFactory.createConnection(conf);
653    Admin admin = conn.getAdmin();
654    if (admin.tableExists(TABLENAME)) {
655      admin.disableTable(TABLENAME);
656      admin.deleteTable(TABLENAME);
657    }
658    if (numRegions >= 3) {
659      byte[] startKey = new byte[16];
660      byte[] endKey = new byte[16];
661      Arrays.fill(endKey, (byte) 0xFF);
662      admin.createTable(htd, startKey, endKey, numRegions);
663    } else {
664      admin.createTable(htd);
665    }
666    admin.close();
667
668    Table table = conn.getTable(TABLENAME);
669    byte[] qf = Bytes.toBytes("qf");
670    byte[] value1 = new byte[16];
671    byte[] value2 = new byte[256];
672    byte[] value3 = new byte[4096];
673    for (long i = 0; i < numRows; i++) {
674      Put put = new Put(Hashing.md5().hashLong(i).asBytes());
675      Bytes.random(value1);
676      Bytes.random(value2);
677      Bytes.random(value3);
678      put.addColumn(FAMILY1, qf, value1);
679      put.addColumn(FAMILY2, qf, value2);
680      put.addColumn(FAMILY3, qf, value3);
681      table.put(put);
682      if (i % 10000 == 0) {
683        LOG.info(i + " rows put");
684      }
685    }
686    table.close();
687    conn.close();
688  }
689}