1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.wal;
20
21 import java.io.IOException;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.UUID;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32
33
34 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 @InterfaceAudience.Private
53 class RegionGroupingProvider implements WALProvider {
54 private static final Log LOG = LogFactory.getLog(RegionGroupingProvider.class);
55
56
57
58
59 public static interface RegionGroupingStrategy {
60
61
62
63
64
65 byte[] group(final byte[] identifier);
66 void init(Configuration config);
67 }
68
69
70
71
72 static enum Strategies {
73 defaultStrategy(IdentityGroupingStrategy.class),
74 identity(IdentityGroupingStrategy.class);
75
76 final Class<? extends RegionGroupingStrategy> clazz;
77 Strategies(Class<? extends RegionGroupingStrategy> clazz) {
78 this.clazz = clazz;
79 }
80 }
81
82
83
84
85
86 RegionGroupingStrategy getStrategy(final Configuration conf, final String key,
87 final String defaultValue) throws IOException {
88 Class<? extends RegionGroupingStrategy> clazz;
89 try {
90 clazz = Strategies.valueOf(conf.get(key, defaultValue)).clazz;
91 } catch (IllegalArgumentException exception) {
92
93
94
95 clazz = conf.getClass(key, IdentityGroupingStrategy.class, RegionGroupingStrategy.class);
96 }
97 LOG.info("Instantiating RegionGroupingStrategy of type " + clazz);
98 try {
99 final RegionGroupingStrategy result = clazz.newInstance();
100 result.init(conf);
101 return result;
102 } catch (InstantiationException exception) {
103 LOG.error("couldn't set up region grouping strategy, check config key " +
104 REGION_GROUPING_STRATEGY);
105 LOG.debug("Exception details for failure to load region grouping strategy.", exception);
106 throw new IOException("couldn't set up region grouping strategy", exception);
107 } catch (IllegalAccessException exception) {
108 LOG.error("couldn't set up region grouping strategy, check config key " +
109 REGION_GROUPING_STRATEGY);
110 LOG.debug("Exception details for failure to load region grouping strategy.", exception);
111 throw new IOException("couldn't set up region grouping strategy", exception);
112 }
113 }
114
115 private static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
116 private static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
117
118 static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate";
119 static final String DEFAULT_DELEGATE_PROVIDER = WALFactory.Providers.defaultProvider.name();
120
121 protected final ConcurrentMap<byte[], WALProvider> cached =
122 new ConcurrentHashMap<byte[], WALProvider>();
123
124
125 protected RegionGroupingStrategy strategy = null;
126 private WALFactory factory = null;
127 private List<WALActionsListener> listeners = null;
128 private String providerId = null;
129
130 @Override
131 public void init(final WALFactory factory, final Configuration conf,
132 final List<WALActionsListener> listeners, final String providerId) throws IOException {
133 if (null != strategy) {
134 throw new IllegalStateException("WALProvider.init should only be called once.");
135 }
136 this.factory = factory;
137 this.listeners = null == listeners ? null : Collections.unmodifiableList(listeners);
138 this.providerId = providerId;
139 this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY);
140 }
141
142
143
144
145 WALProvider populateCache(final byte[] group) throws IOException {
146 final WALProvider temp = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER,
147 listeners, providerId + "-" + UUID.randomUUID());
148 final WALProvider extant = cached.putIfAbsent(group, temp);
149 if (null != extant) {
150
151 temp.close();
152 return extant;
153 }
154 return temp;
155 }
156
157 @Override
158 public WAL getWAL(final byte[] identifier) throws IOException {
159 final byte[] group = strategy.group(identifier);
160 WALProvider provider = cached.get(group);
161 if (null == provider) {
162 provider = populateCache(group);
163 }
164 return provider.getWAL(identifier);
165 }
166
167 @Override
168 public void shutdown() throws IOException {
169
170 IOException failure = null;
171 for (WALProvider provider : cached.values()) {
172 try {
173 provider.shutdown();
174 } catch (IOException exception) {
175 LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage());
176 LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
177 failure = exception;
178 }
179 }
180 if (failure != null) {
181 throw failure;
182 }
183 }
184
185 @Override
186 public void close() throws IOException {
187
188 IOException failure = null;
189 for (WALProvider provider : cached.values()) {
190 try {
191 provider.close();
192 } catch (IOException exception) {
193 LOG.error("Problem closing provider '" + provider + "': " + exception.getMessage());
194 LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
195 failure = exception;
196 }
197 }
198 if (failure != null) {
199 throw failure;
200 }
201 }
202
203 static class IdentityGroupingStrategy implements RegionGroupingStrategy {
204 @Override
205 public void init(Configuration config) {}
206 @Override
207 public byte[] group(final byte[] identifier) {
208 return identifier;
209 }
210 }
211
212 }