View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.Comparator;
23  import java.util.List;
24  
25  import org.apache.commons.lang.ClassUtils;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.classification.InterfaceStability;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.CellScanner;
32  import org.apache.hadoop.hbase.Coprocessor;
33  import org.apache.hadoop.hbase.CoprocessorEnvironment;
34  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
35  import org.apache.hadoop.hbase.MetaMutationAnnotation;
36  import org.apache.hadoop.hbase.client.Mutation;
37  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
38  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
39  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
40  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
41  import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService;
42  import org.apache.hadoop.hbase.ipc.RpcServer;
43  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
44  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
45  import org.apache.hadoop.hbase.security.User;
46
47  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
48  @InterfaceStability.Evolving
49  public class RegionServerCoprocessorHost extends
50      CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
51  
52    private static final Log LOG = LogFactory.getLog(RegionServerCoprocessorHost.class);
53  
54    private RegionServerServices rsServices;
55
56    public RegionServerCoprocessorHost(RegionServerServices rsServices,
57        Configuration conf) {
58      super(rsServices);
59      this.rsServices = rsServices;
60      this.conf = conf;
61      // Log the state of coprocessor loading here; should appear only once or
62      // twice in the daemon log, depending on HBase version, because there is
63      // only one RegionServerCoprocessorHost instance in the RS process
64      boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
65        DEFAULT_COPROCESSORS_ENABLED);
66      boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
67        DEFAULT_USER_COPROCESSORS_ENABLED);
68      LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled"));
69      LOG.info("Table coprocessor loading is " +
70        ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled"));
71      loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
72    }
73
74    @Override
75    public RegionServerEnvironment createEnvironment(Class<?> implClass,
76        Coprocessor instance, int priority, int sequence, Configuration conf) {
77      return new RegionServerEnvironment(implClass, instance, priority,
78        sequence, conf, this.rsServices);
79    }
80
81    public void preStop(String message) throws IOException {
82      execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
83        @Override
84        public void call(RegionServerObserver oserver,
85            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
86          oserver.preStopRegionServer(ctx);
87        }
88        @Override
89        public void postEnvCall(RegionServerEnvironment env) {
90          // invoke coprocessor stop method
91          shutdown(env);
92        }
93      });
94    }
95
96    public boolean preMerge(final HRegion regionA, final HRegion regionB, final User user) throws IOException {
97      return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
98        @Override
99        public void call(RegionServerObserver oserver,
100           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
101         oserver.preMerge(ctx, regionA, regionB);
102       }
103     });
104   }
105
106   public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion,
107                         final User user)
108       throws IOException {
109     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
110       @Override
111       public void call(RegionServerObserver oserver,
112           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
113         oserver.postMerge(ctx, regionA, regionB, mergedRegion);
114       }
115     });
116   }
117
118   public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
119       final @MetaMutationAnnotation List<Mutation> metaEntries, final User user)
120       throws IOException {
121     return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
122       @Override
123       public void call(RegionServerObserver oserver,
124           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
125         oserver.preMergeCommit(ctx, regionA, regionB, metaEntries);
126       }
127     });
128   }
129
130   public void postMergeCommit(final HRegion regionA, final HRegion regionB,
131       final HRegion mergedRegion, final User user) throws IOException {
132     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
133       @Override
134       public void call(RegionServerObserver oserver,
135           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
136         oserver.postMergeCommit(ctx, regionA, regionB, mergedRegion);
137       }
138     });
139   }
140
141   public void preRollBackMerge(final HRegion regionA, final HRegion regionB, final User user)
142       throws IOException {
143     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
144       @Override
145       public void call(RegionServerObserver oserver,
146           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
147         oserver.preRollBackMerge(ctx, regionA, regionB);
148       }
149     });
150   }
151
152   public void postRollBackMerge(final HRegion regionA, final HRegion regionB, final User user)
153       throws IOException {
154     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
155       @Override
156       public void call(RegionServerObserver oserver,
157           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
158         oserver.postRollBackMerge(ctx, regionA, regionB);
159       }
160     });
161   }
162
163   public void preRollWALWriterRequest() throws IOException {
164     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
165       @Override
166       public void call(RegionServerObserver oserver,
167           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
168         oserver.preRollWALWriterRequest(ctx);
169       }
170     });
171   }
172
173   public void postRollWALWriterRequest() throws IOException {
174     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
175       @Override
176       public void call(RegionServerObserver oserver,
177           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
178         oserver.postRollWALWriterRequest(ctx);
179       }
180     });
181   }
182
183   public void preReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
184       throws IOException {
185     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
186       @Override
187       public void call(RegionServerObserver oserver,
188           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
189         oserver.preReplicateLogEntries(ctx, entries, cells);
190       }
191     });
192   }
193
194   public void postReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
195       throws IOException {
196     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
197       @Override
198       public void call(RegionServerObserver oserver,
199           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
200         oserver.postReplicateLogEntries(ctx, entries, cells);
201       }
202     });
203   }
204
205   public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
206       throws IOException {
207     return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
208         : new CoprocessOperationWithResult<ReplicationEndpoint>() {
209           @Override
210           public void call(RegionServerObserver oserver,
211               ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
212             setResult(oserver.postCreateReplicationEndPoint(ctx, getResult()));
213           }
214         });
215   }
216
217   private <T> T execOperationWithResult(final T defaultValue,
218       final CoprocessOperationWithResult<T> ctx) throws IOException {
219     if (ctx == null)
220       return defaultValue;
221     ctx.setResult(defaultValue);
222     execOperation(ctx);
223     return ctx.getResult();
224   }
225
226   private static abstract class CoprocessorOperation
227       extends ObserverContext<RegionServerCoprocessorEnvironment> {
228     public CoprocessorOperation() {
229       this(RpcServer.getRequestUser());
230     }
231 
232     public CoprocessorOperation(User user) {
233       super(user);
234     }
235
236     public abstract void call(RegionServerObserver oserver,
237         ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException;
238 
239     public void postEnvCall(RegionServerEnvironment env) {
240     }
241   }
242
243   private static abstract class CoprocessOperationWithResult<T> extends CoprocessorOperation {
244     private T result = null;
245
246     public void setResult(final T result) {
247       this.result = result;
248     }
249
250     public T getResult() {
251       return this.result;
252     }
253   }
254
255   private boolean execOperation(final CoprocessorOperation ctx) throws IOException {
256     if (ctx == null) return false;
257     boolean bypass = false;
258     List<RegionServerEnvironment> envs = coprocessors.get();
259     for (int i = 0; i < envs.size(); i++) {
260       RegionServerEnvironment env = envs.get(i);
261       if (env.getInstance() instanceof RegionServerObserver) {
262         ctx.prepare(env);
263         Thread currentThread = Thread.currentThread();
264         ClassLoader cl = currentThread.getContextClassLoader();
265         try {
266           currentThread.setContextClassLoader(env.getClassLoader());
267           ctx.call((RegionServerObserver)env.getInstance(), ctx);
268         } catch (Throwable e) {
269           handleCoprocessorThrowable(env, e);
270         } finally {
271           currentThread.setContextClassLoader(cl);
272         }
273         bypass |= ctx.shouldBypass();
274         if (ctx.shouldComplete()) {
275           break;
276         }
277       }
278       ctx.postEnvCall(env);
279     }
280     return bypass;
281   }
282
283   /**
284    * Coprocessor environment extension providing access to region server
285    * related services.
286    */
287   static class RegionServerEnvironment extends CoprocessorHost.Environment
288       implements RegionServerCoprocessorEnvironment {
289
290     private RegionServerServices regionServerServices;
291
292     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BC_UNCONFIRMED_CAST",
293         justification="Intentional; FB has trouble detecting isAssignableFrom")
294     public RegionServerEnvironment(final Class<?> implClass,
295         final Coprocessor impl, final int priority, final int seq,
296         final Configuration conf, final RegionServerServices services) {
297       super(impl, priority, seq, conf);
298       this.regionServerServices = services;
299       for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
300         Class<?> c = (Class<?>) itf;
301         if (SingletonCoprocessorService.class.isAssignableFrom(c)) {// FindBugs: BC_UNCONFIRMED_CAST
302           this.regionServerServices.registerService(
303             ((SingletonCoprocessorService) impl).getService());
304           break;
305         }
306       }
307     }
308
309     @Override
310     public RegionServerServices getRegionServerServices() {
311       return regionServerServices;
312     }
313   }
314
315   /**
316    * Environment priority comparator. Coprocessors are chained in sorted
317    * order.
318    */
319   static class EnvironmentPriorityComparator implements
320       Comparator<CoprocessorEnvironment> {
321     public int compare(final CoprocessorEnvironment env1,
322         final CoprocessorEnvironment env2) {
323       if (env1.getPriority() < env2.getPriority()) {
324         return -1;
325       } else if (env1.getPriority() > env2.getPriority()) {
326         return 1;
327       }
328       if (env1.getLoadSequence() < env2.getLoadSequence()) {
329         return -1;
330       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
331         return 1;
332       }
333       return 0;
334     }
335   }
336 }