1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.Comparator;
23 import java.util.List;
24
25 import org.apache.commons.lang.ClassUtils;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.hbase.classification.InterfaceStability;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.CellScanner;
32 import org.apache.hadoop.hbase.Coprocessor;
33 import org.apache.hadoop.hbase.CoprocessorEnvironment;
34 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
35 import org.apache.hadoop.hbase.MetaMutationAnnotation;
36 import org.apache.hadoop.hbase.client.Mutation;
37 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
38 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
39 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
40 import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
41 import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService;
42 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
43 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
44
45 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
46 @InterfaceStability.Evolving
47 public class RegionServerCoprocessorHost extends
48 CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
49
50 private static final Log LOG = LogFactory.getLog(RegionServerCoprocessorHost.class);
51
52 private RegionServerServices rsServices;
53
54 public RegionServerCoprocessorHost(RegionServerServices rsServices,
55 Configuration conf) {
56 super(rsServices);
57 this.rsServices = rsServices;
58 this.conf = conf;
59
60
61
62 boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
63 DEFAULT_COPROCESSORS_ENABLED);
64 boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
65 DEFAULT_USER_COPROCESSORS_ENABLED);
66 LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled"));
67 LOG.info("Table coprocessor loading is " +
68 ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled"));
69 loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
70 }
71
72 @Override
73 public RegionServerEnvironment createEnvironment(Class<?> implClass,
74 Coprocessor instance, int priority, int sequence, Configuration conf) {
75 return new RegionServerEnvironment(implClass, instance, priority,
76 sequence, conf, this.rsServices);
77 }
78
79 public void preStop(String message) throws IOException {
80 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
81 @Override
82 public void call(RegionServerObserver oserver,
83 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
84 oserver.preStopRegionServer(ctx);
85 }
86 @Override
87 public void postEnvCall(RegionServerEnvironment env) {
88
89 shutdown(env);
90 }
91 });
92 }
93
94 public boolean preMerge(final HRegion regionA, final HRegion regionB) throws IOException {
95 return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
96 @Override
97 public void call(RegionServerObserver oserver,
98 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
99 oserver.preMerge(ctx, regionA, regionB);
100 }
101 });
102 }
103
104 public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion)
105 throws IOException {
106 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
107 @Override
108 public void call(RegionServerObserver oserver,
109 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
110 oserver.postMerge(ctx, regionA, regionB, mergedRegion);
111 }
112 });
113 }
114
115 public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
116 final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException {
117 return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
118 @Override
119 public void call(RegionServerObserver oserver,
120 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
121 oserver.preMergeCommit(ctx, regionA, regionB, metaEntries);
122 }
123 });
124 }
125
126 public void postMergeCommit(final HRegion regionA, final HRegion regionB,
127 final HRegion mergedRegion) throws IOException {
128 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
129 @Override
130 public void call(RegionServerObserver oserver,
131 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
132 oserver.postMergeCommit(ctx, regionA, regionB, mergedRegion);
133 }
134 });
135 }
136
137 public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
138 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
139 @Override
140 public void call(RegionServerObserver oserver,
141 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
142 oserver.preRollBackMerge(ctx, regionA, regionB);
143 }
144 });
145 }
146
147 public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
148 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
149 @Override
150 public void call(RegionServerObserver oserver,
151 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
152 oserver.postRollBackMerge(ctx, regionA, regionB);
153 }
154 });
155 }
156
157 public void preRollWALWriterRequest() throws IOException {
158 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
159 @Override
160 public void call(RegionServerObserver oserver,
161 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
162 oserver.preRollWALWriterRequest(ctx);
163 }
164 });
165 }
166
167 public void postRollWALWriterRequest() throws IOException {
168 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
169 @Override
170 public void call(RegionServerObserver oserver,
171 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
172 oserver.postRollWALWriterRequest(ctx);
173 }
174 });
175 }
176
177 public void preReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
178 throws IOException {
179 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
180 @Override
181 public void call(RegionServerObserver oserver,
182 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
183 oserver.preReplicateLogEntries(ctx, entries, cells);
184 }
185 });
186 }
187
188 public void postReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
189 throws IOException {
190 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
191 @Override
192 public void call(RegionServerObserver oserver,
193 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
194 oserver.postReplicateLogEntries(ctx, entries, cells);
195 }
196 });
197 }
198
199 public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
200 throws IOException {
201 return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
202 : new CoprocessOperationWithResult<ReplicationEndpoint>() {
203 @Override
204 public void call(RegionServerObserver oserver,
205 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
206 setResult(oserver.postCreateReplicationEndPoint(ctx, getResult()));
207 }
208 });
209 }
210
211 private <T> T execOperationWithResult(final T defaultValue,
212 final CoprocessOperationWithResult<T> ctx) throws IOException {
213 if (ctx == null)
214 return defaultValue;
215 ctx.setResult(defaultValue);
216 execOperation(ctx);
217 return ctx.getResult();
218 }
219
220 private static abstract class CoprocessorOperation
221 extends ObserverContext<RegionServerCoprocessorEnvironment> {
222 public CoprocessorOperation() {
223 }
224
225 public abstract void call(RegionServerObserver oserver,
226 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException;
227
228 public void postEnvCall(RegionServerEnvironment env) {
229 }
230 }
231
232 private static abstract class CoprocessOperationWithResult<T> extends CoprocessorOperation {
233 private T result = null;
234
235 public void setResult(final T result) {
236 this.result = result;
237 }
238
239 public T getResult() {
240 return this.result;
241 }
242 }
243
244 private boolean execOperation(final CoprocessorOperation ctx) throws IOException {
245 if (ctx == null) return false;
246
247 boolean bypass = false;
248 for (RegionServerEnvironment env: coprocessors) {
249 if (env.getInstance() instanceof RegionServerObserver) {
250 ctx.prepare(env);
251 Thread currentThread = Thread.currentThread();
252 ClassLoader cl = currentThread.getContextClassLoader();
253 try {
254 currentThread.setContextClassLoader(env.getClassLoader());
255 ctx.call((RegionServerObserver)env.getInstance(), ctx);
256 } catch (Throwable e) {
257 handleCoprocessorThrowable(env, e);
258 } finally {
259 currentThread.setContextClassLoader(cl);
260 }
261 bypass |= ctx.shouldBypass();
262 if (ctx.shouldComplete()) {
263 break;
264 }
265 }
266 ctx.postEnvCall(env);
267 }
268 return bypass;
269 }
270
271
272
273
274
275 static class RegionServerEnvironment extends CoprocessorHost.Environment
276 implements RegionServerCoprocessorEnvironment {
277
278 private RegionServerServices regionServerServices;
279
280 public RegionServerEnvironment(final Class<?> implClass,
281 final Coprocessor impl, final int priority, final int seq,
282 final Configuration conf, final RegionServerServices services) {
283 super(impl, priority, seq, conf);
284 this.regionServerServices = services;
285 for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
286 Class<?> c = (Class<?>) itf;
287 if (SingletonCoprocessorService.class.isAssignableFrom(c)) {
288 this.regionServerServices.registerService(
289 ((SingletonCoprocessorService) impl).getService());
290 break;
291 }
292 }
293 }
294
295 @Override
296 public RegionServerServices getRegionServerServices() {
297 return regionServerServices;
298 }
299 }
300
301
302
303
304
305 static class EnvironmentPriorityComparator implements
306 Comparator<CoprocessorEnvironment> {
307 public int compare(final CoprocessorEnvironment env1,
308 final CoprocessorEnvironment env2) {
309 if (env1.getPriority() < env2.getPriority()) {
310 return -1;
311 } else if (env1.getPriority() > env2.getPriority()) {
312 return 1;
313 }
314 if (env1.getLoadSequence() < env2.getLoadSequence()) {
315 return -1;
316 } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
317 return 1;
318 }
319 return 0;
320 }
321 }
322 }