1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import java.util.Collection;
21 import java.util.HashSet;
22 import java.util.Set;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28
29
30
31
32
33 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
34 public class FlushLargeStoresPolicy extends FlushPolicy {
35
36 private static final Log LOG = LogFactory.getLog(FlushLargeStoresPolicy.class);
37
38 public static final String HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND =
39 "hbase.hregion.percolumnfamilyflush.size.lower.bound";
40
41 private static final long DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND = 1024 * 1024 * 16L;
42
43 private long flushSizeLowerBound;
44
45 @Override
46 protected void configureForRegion(HRegion region) {
47 super.configureForRegion(region);
48 long flushSizeLowerBound;
49 String flushedSizeLowerBoundString =
50 region.getTableDesc().getValue(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
51 if (flushedSizeLowerBoundString == null) {
52 flushSizeLowerBound =
53 getConf().getLong(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
54 DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
55 if (LOG.isDebugEnabled()) {
56 LOG.debug(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND
57 + " is not specified, use global config(" + flushSizeLowerBound + ") instead");
58 }
59 } else {
60 try {
61 flushSizeLowerBound = Long.parseLong(flushedSizeLowerBoundString);
62 } catch (NumberFormatException nfe) {
63 flushSizeLowerBound =
64 getConf().getLong(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
65 DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
66 LOG.warn("Number format exception when parsing "
67 + HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND + " for table "
68 + region.getTableDesc().getTableName() + ":" + flushedSizeLowerBoundString + ". " + nfe
69 + ", use global config(" + flushSizeLowerBound + ") instead");
70
71 }
72 }
73 this.flushSizeLowerBound = flushSizeLowerBound;
74 }
75
76 private boolean shouldFlush(Store store) {
77 if (store.getMemStoreSize() > this.flushSizeLowerBound) {
78 if (LOG.isDebugEnabled()) {
79 LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + region
80 + " will be flushed because of memstoreSize(" + store.getMemStoreSize()
81 + ") is larger than lower bound(" + this.flushSizeLowerBound + ")");
82 }
83 return true;
84 }
85 return region.shouldFlushStore(store);
86 }
87
88 @Override
89 public Collection<Store> selectStoresToFlush() {
90 Collection<Store> stores = region.stores.values();
91 Set<Store> specificStoresToFlush = new HashSet<Store>();
92 for (Store store : stores) {
93 if (shouldFlush(store)) {
94 specificStoresToFlush.add(store);
95 }
96 }
97
98 if (specificStoresToFlush.isEmpty()) {
99 if (LOG.isDebugEnabled()) {
100 LOG.debug("Since none of the CFs were above the size, flushing all.");
101 }
102 return stores;
103 } else {
104 return specificStoresToFlush;
105 }
106 }
107
108 }