1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.Comparator;
23 import java.util.List;
24
25 import org.apache.commons.lang.ClassUtils;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.hbase.classification.InterfaceStability;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.CellScanner;
32 import org.apache.hadoop.hbase.Coprocessor;
33 import org.apache.hadoop.hbase.CoprocessorEnvironment;
34 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
35 import org.apache.hadoop.hbase.MetaMutationAnnotation;
36 import org.apache.hadoop.hbase.client.Mutation;
37 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
38 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
39 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
40 import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
41 import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService;
42 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
43 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
44
45 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
46 @InterfaceStability.Evolving
47 public class RegionServerCoprocessorHost extends
48 CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
49
50 private static final Log LOG = LogFactory.getLog(RegionServerCoprocessorHost.class);
51
52 private RegionServerServices rsServices;
53
54 public RegionServerCoprocessorHost(RegionServerServices rsServices,
55 Configuration conf) {
56 super(rsServices);
57 this.rsServices = rsServices;
58 this.conf = conf;
59
60
61
62 boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
63 DEFAULT_COPROCESSORS_ENABLED);
64 boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
65 DEFAULT_USER_COPROCESSORS_ENABLED);
66 LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled"));
67 LOG.info("Table coprocessor loading is " +
68 ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled"));
69 loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
70 }
71
72 @Override
73 public RegionServerEnvironment createEnvironment(Class<?> implClass,
74 Coprocessor instance, int priority, int sequence, Configuration conf) {
75 return new RegionServerEnvironment(implClass, instance, priority,
76 sequence, conf, this.rsServices);
77 }
78
79 public void preStop(String message) throws IOException {
80
81
82 execShutdown(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
83 @Override
84 public void call(RegionServerObserver oserver,
85 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
86 oserver.preStopRegionServer(ctx);
87 }
88 @Override
89 public void postEnvCall(RegionServerEnvironment env) {
90
91 shutdown(env);
92 }
93 });
94 }
95
96 public boolean preMerge(final HRegion regionA, final HRegion regionB) throws IOException {
97 return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
98 @Override
99 public void call(RegionServerObserver oserver,
100 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
101 oserver.preMerge(ctx, regionA, regionB);
102 }
103 });
104 }
105
106 public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion)
107 throws IOException {
108 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
109 @Override
110 public void call(RegionServerObserver oserver,
111 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
112 oserver.postMerge(ctx, regionA, regionB, mergedRegion);
113 }
114 });
115 }
116
117 public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
118 final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException {
119 return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
120 @Override
121 public void call(RegionServerObserver oserver,
122 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
123 oserver.preMergeCommit(ctx, regionA, regionB, metaEntries);
124 }
125 });
126 }
127
128 public void postMergeCommit(final HRegion regionA, final HRegion regionB,
129 final HRegion mergedRegion) throws IOException {
130 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
131 @Override
132 public void call(RegionServerObserver oserver,
133 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
134 oserver.postMergeCommit(ctx, regionA, regionB, mergedRegion);
135 }
136 });
137 }
138
139 public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
140 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
141 @Override
142 public void call(RegionServerObserver oserver,
143 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
144 oserver.preRollBackMerge(ctx, regionA, regionB);
145 }
146 });
147 }
148
149 public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
150 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
151 @Override
152 public void call(RegionServerObserver oserver,
153 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
154 oserver.postRollBackMerge(ctx, regionA, regionB);
155 }
156 });
157 }
158
159 public void preRollWALWriterRequest() throws IOException {
160 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
161 @Override
162 public void call(RegionServerObserver oserver,
163 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
164 oserver.preRollWALWriterRequest(ctx);
165 }
166 });
167 }
168
169 public void postRollWALWriterRequest() throws IOException {
170 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
171 @Override
172 public void call(RegionServerObserver oserver,
173 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
174 oserver.postRollWALWriterRequest(ctx);
175 }
176 });
177 }
178
179 public void preReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
180 throws IOException {
181 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
182 @Override
183 public void call(RegionServerObserver oserver,
184 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
185 oserver.preReplicateLogEntries(ctx, entries, cells);
186 }
187 });
188 }
189
190 public void postReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
191 throws IOException {
192 execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
193 @Override
194 public void call(RegionServerObserver oserver,
195 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
196 oserver.postReplicateLogEntries(ctx, entries, cells);
197 }
198 });
199 }
200
201 public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
202 throws IOException {
203 return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
204 : new CoprocessOperationWithResult<ReplicationEndpoint>() {
205 @Override
206 public void call(RegionServerObserver oserver,
207 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
208 setResult(oserver.postCreateReplicationEndPoint(ctx, getResult()));
209 }
210 });
211 }
212
213 private <T> T execOperationWithResult(final T defaultValue,
214 final CoprocessOperationWithResult<T> ctx) throws IOException {
215 if (ctx == null)
216 return defaultValue;
217 ctx.setResult(defaultValue);
218 execOperation(ctx);
219 return ctx.getResult();
220 }
221
222 private static abstract class CoprocessorOperation
223 extends ObserverContext<RegionServerCoprocessorEnvironment> {
224 public CoprocessorOperation() {
225 }
226
227 public abstract void call(RegionServerObserver oserver,
228 ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException;
229
230 public void postEnvCall(RegionServerEnvironment env) {
231 }
232 }
233
234 private static abstract class CoprocessOperationWithResult<T> extends CoprocessorOperation {
235 private T result = null;
236
237 public void setResult(final T result) {
238 this.result = result;
239 }
240
241 public T getResult() {
242 return this.result;
243 }
244 }
245
246 private boolean execOperation(final CoprocessorOperation ctx) throws IOException {
247 if (ctx == null) return false;
248 boolean bypass = false;
249 List<RegionServerEnvironment> envs = coprocessors.get();
250 for (int i = 0; i < envs.size(); i++) {
251 RegionServerEnvironment env = envs.get(i);
252 if (env.getInstance() instanceof RegionServerObserver) {
253 ctx.prepare(env);
254 Thread currentThread = Thread.currentThread();
255 ClassLoader cl = currentThread.getContextClassLoader();
256 try {
257 currentThread.setContextClassLoader(env.getClassLoader());
258 ctx.call((RegionServerObserver)env.getInstance(), ctx);
259 } catch (Throwable e) {
260 handleCoprocessorThrowable(env, e);
261 } finally {
262 currentThread.setContextClassLoader(cl);
263 }
264 bypass |= ctx.shouldBypass();
265 if (ctx.shouldComplete()) {
266 break;
267 }
268 }
269 ctx.postEnvCall(env);
270 }
271 return bypass;
272 }
273
274
275
276
277
278
279
280
281
282
283
284
285
286 private boolean execShutdown(final CoprocessorOperation ctx) throws IOException {
287 if (ctx == null) return false;
288 boolean bypass = false;
289 List<RegionServerEnvironment> envs = coprocessors.get();
290 int envsSize = envs.size();
291
292 for (int i = 0; i < envsSize; i++) {
293 RegionServerEnvironment env = envs.get(i);
294 if (env.getInstance() instanceof RegionServerObserver) {
295 ctx.prepare(env);
296 Thread currentThread = Thread.currentThread();
297 ClassLoader cl = currentThread.getContextClassLoader();
298 try {
299 currentThread.setContextClassLoader(env.getClassLoader());
300 ctx.call((RegionServerObserver) env.getInstance(), ctx);
301 } catch (Throwable e) {
302 handleCoprocessorThrowable(env, e);
303 } finally {
304 currentThread.setContextClassLoader(cl);
305 }
306 bypass |= ctx.shouldBypass();
307 if (ctx.shouldComplete()) {
308 break;
309 }
310 }
311 }
312
313
314 for (int i = 0; i < envsSize; i++) {
315 RegionServerEnvironment env = envs.get(i);
316 ctx.postEnvCall(env);
317 }
318 return bypass;
319 }
320
321
322
323
324
325 static class RegionServerEnvironment extends CoprocessorHost.Environment
326 implements RegionServerCoprocessorEnvironment {
327
328 private RegionServerServices regionServerServices;
329
330 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BC_UNCONFIRMED_CAST",
331 justification="Intentional; FB has trouble detecting isAssignableFrom")
332 public RegionServerEnvironment(final Class<?> implClass,
333 final Coprocessor impl, final int priority, final int seq,
334 final Configuration conf, final RegionServerServices services) {
335 super(impl, priority, seq, conf);
336 this.regionServerServices = services;
337 for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
338 Class<?> c = (Class<?>) itf;
339 if (SingletonCoprocessorService.class.isAssignableFrom(c)) {
340 this.regionServerServices.registerService(
341 ((SingletonCoprocessorService) impl).getService());
342 break;
343 }
344 }
345 }
346
347 @Override
348 public RegionServerServices getRegionServerServices() {
349 return regionServerServices;
350 }
351 }
352
353
354
355
356
357 static class EnvironmentPriorityComparator implements
358 Comparator<CoprocessorEnvironment> {
359 public int compare(final CoprocessorEnvironment env1,
360 final CoprocessorEnvironment env2) {
361 if (env1.getPriority() < env2.getPriority()) {
362 return -1;
363 } else if (env1.getPriority() > env2.getPriority()) {
364 return 1;
365 }
366 if (env1.getLoadSequence() < env2.getLoadSequence()) {
367 return -1;
368 } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
369 return 1;
370 }
371 return 0;
372 }
373 }
374 }