View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import java.util.Collection;
21  import java.util.HashSet;
22  import java.util.Set;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  
29  /**
30   * A {@link FlushPolicy} that only flushes store larger a given threshold. If no store is large
31   * enough, then all stores will be flushed.
32   */
33  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
34  public class FlushLargeStoresPolicy extends FlushPolicy {
35  
36    private static final Log LOG = LogFactory.getLog(FlushLargeStoresPolicy.class);
37  
38    public static final String HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND =
39        "hbase.hregion.percolumnfamilyflush.size.lower.bound";
40  
41    private static final long DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND = 1024 * 1024 * 16L;
42  
43    private long flushSizeLowerBound;
44  
45    @Override
46    protected void configureForRegion(HRegion region) {
47      super.configureForRegion(region);
48      long flushSizeLowerBound;
49      String flushedSizeLowerBoundString =
50          region.getTableDesc().getValue(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
51      if (flushedSizeLowerBoundString == null) {
52        flushSizeLowerBound =
53            getConf().getLong(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
54              DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
55        if (LOG.isDebugEnabled()) {
56          LOG.debug(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND
57              + " is not specified, use global config(" + flushSizeLowerBound + ") instead");
58        }
59      } else {
60        try {
61          flushSizeLowerBound = Long.parseLong(flushedSizeLowerBoundString);
62        } catch (NumberFormatException nfe) {
63          flushSizeLowerBound =
64              getConf().getLong(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
65                DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
66          LOG.warn("Number format exception when parsing "
67              + HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND + " for table "
68              + region.getTableDesc().getTableName() + ":" + flushedSizeLowerBoundString + ". " + nfe
69              + ", use global config(" + flushSizeLowerBound + ") instead");
70  
71        }
72      }
73      this.flushSizeLowerBound = flushSizeLowerBound;
74    }
75  
76    private boolean shouldFlush(Store store) {
77      if (store.getMemStoreSize() > this.flushSizeLowerBound) {
78        if (LOG.isDebugEnabled()) {
79          LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + region
80              + " will be flushed because of memstoreSize(" + store.getMemStoreSize()
81              + ") is larger than lower bound(" + this.flushSizeLowerBound + ")");
82        }
83        return true;
84      }
85      return region.shouldFlushStore(store);
86    }
87  
88    @Override
89    public Collection<Store> selectStoresToFlush() {
90      Collection<Store> stores = region.stores.values();
91      Set<Store> specificStoresToFlush = new HashSet<Store>();
92      for (Store store : stores) {
93        if (shouldFlush(store)) {
94          specificStoresToFlush.add(store);
95        }
96      }
97      // Didn't find any CFs which were above the threshold for selection.
98      if (specificStoresToFlush.isEmpty()) {
99        if (LOG.isDebugEnabled()) {
100         LOG.debug("Since none of the CFs were above the size, flushing all.");
101       }
102       return stores;
103     } else {
104       return specificStoresToFlush;
105     }
106   }
107 
108 }