View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.Comparator;
23  import java.util.List;
24  
25  import org.apache.commons.lang.ClassUtils;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.classification.InterfaceStability;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.CellScanner;
32  import org.apache.hadoop.hbase.Coprocessor;
33  import org.apache.hadoop.hbase.CoprocessorEnvironment;
34  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
35  import org.apache.hadoop.hbase.MetaMutationAnnotation;
36  import org.apache.hadoop.hbase.client.Mutation;
37  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
38  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
39  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
40  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
41  import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService;
42  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
43  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
44  
45  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
46  @InterfaceStability.Evolving
47  public class RegionServerCoprocessorHost extends
48      CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
49  
50    private static final Log LOG = LogFactory.getLog(RegionServerCoprocessorHost.class);
51  
52    private RegionServerServices rsServices;
53  
54    public RegionServerCoprocessorHost(RegionServerServices rsServices,
55        Configuration conf) {
56      super(rsServices);
57      this.rsServices = rsServices;
58      this.conf = conf;
59      // Log the state of coprocessor loading here; should appear only once or
60      // twice in the daemon log, depending on HBase version, because there is
61      // only one RegionServerCoprocessorHost instance in the RS process
62      boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
63        DEFAULT_COPROCESSORS_ENABLED);
64      boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
65        DEFAULT_USER_COPROCESSORS_ENABLED);
66      LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled"));
67      LOG.info("Table coprocessor loading is " +
68        ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled"));
69      loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
70    }
71  
72    @Override
73    public RegionServerEnvironment createEnvironment(Class<?> implClass,
74        Coprocessor instance, int priority, int sequence, Configuration conf) {
75      return new RegionServerEnvironment(implClass, instance, priority,
76        sequence, conf, this.rsServices);
77    }
78  
79    public void preStop(String message) throws IOException {
80      execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
81        @Override
82        public void call(RegionServerObserver oserver,
83            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
84          oserver.preStopRegionServer(ctx);
85        }
86        @Override
87        public void postEnvCall(RegionServerEnvironment env) {
88          // invoke coprocessor stop method
89          shutdown(env);
90        }
91      });
92    }
93  
94    public boolean preMerge(final HRegion regionA, final HRegion regionB) throws IOException {
95      return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
96        @Override
97        public void call(RegionServerObserver oserver,
98            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
99          oserver.preMerge(ctx, regionA, regionB);
100       }
101     });
102   }
103 
104   public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion)
105       throws IOException {
106     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
107       @Override
108       public void call(RegionServerObserver oserver,
109           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
110         oserver.postMerge(ctx, regionA, regionB, mergedRegion);
111       }
112     });
113   }
114 
115   public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
116       final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException {
117     return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
118       @Override
119       public void call(RegionServerObserver oserver,
120           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
121         oserver.preMergeCommit(ctx, regionA, regionB, metaEntries);
122       }
123     });
124   }
125 
126   public void postMergeCommit(final HRegion regionA, final HRegion regionB,
127       final HRegion mergedRegion) throws IOException {
128     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
129       @Override
130       public void call(RegionServerObserver oserver,
131           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
132         oserver.postMergeCommit(ctx, regionA, regionB, mergedRegion);
133       }
134     });
135   }
136 
137   public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
138     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
139       @Override
140       public void call(RegionServerObserver oserver,
141           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
142         oserver.preRollBackMerge(ctx, regionA, regionB);
143       }
144     });
145   }
146 
147   public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
148     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
149       @Override
150       public void call(RegionServerObserver oserver,
151           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
152         oserver.postRollBackMerge(ctx, regionA, regionB);
153       }
154     });
155   }
156 
157   public void preRollWALWriterRequest() throws IOException {
158     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
159       @Override
160       public void call(RegionServerObserver oserver,
161           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
162         oserver.preRollWALWriterRequest(ctx);
163       }
164     });
165   }
166 
167   public void postRollWALWriterRequest() throws IOException {
168     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
169       @Override
170       public void call(RegionServerObserver oserver,
171           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
172         oserver.postRollWALWriterRequest(ctx);
173       }
174     });
175   }
176 
177   public void preReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
178       throws IOException {
179     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
180       @Override
181       public void call(RegionServerObserver oserver,
182           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
183         oserver.preReplicateLogEntries(ctx, entries, cells);
184       }
185     });
186   }
187 
188   public void postReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
189       throws IOException {
190     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
191       @Override
192       public void call(RegionServerObserver oserver,
193           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
194         oserver.postReplicateLogEntries(ctx, entries, cells);
195       }
196     });
197   }
198 
199   public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
200       throws IOException {
201     return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
202         : new CoprocessOperationWithResult<ReplicationEndpoint>() {
203           @Override
204           public void call(RegionServerObserver oserver,
205               ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
206             setResult(oserver.postCreateReplicationEndPoint(ctx, getResult()));
207           }
208         });
209   }
210 
211   private <T> T execOperationWithResult(final T defaultValue,
212       final CoprocessOperationWithResult<T> ctx) throws IOException {
213     if (ctx == null)
214       return defaultValue;
215     ctx.setResult(defaultValue);
216     execOperation(ctx);
217     return ctx.getResult();
218   }
219 
220   private static abstract class CoprocessorOperation
221       extends ObserverContext<RegionServerCoprocessorEnvironment> {
222     public CoprocessorOperation() {
223     }
224 
225     public abstract void call(RegionServerObserver oserver,
226         ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException;
227 
228     public void postEnvCall(RegionServerEnvironment env) {
229     }
230   }
231 
232   private static abstract class CoprocessOperationWithResult<T> extends CoprocessorOperation {
233     private T result = null;
234 
235     public void setResult(final T result) {
236       this.result = result;
237     }
238 
239     public T getResult() {
240       return this.result;
241     }
242   }
243 
244   private boolean execOperation(final CoprocessorOperation ctx) throws IOException {
245     if (ctx == null) return false;
246 
247     boolean bypass = false;
248     for (RegionServerEnvironment env: coprocessors) {
249       if (env.getInstance() instanceof RegionServerObserver) {
250         ctx.prepare(env);
251         Thread currentThread = Thread.currentThread();
252         ClassLoader cl = currentThread.getContextClassLoader();
253         try {
254           currentThread.setContextClassLoader(env.getClassLoader());
255           ctx.call((RegionServerObserver)env.getInstance(), ctx);
256         } catch (Throwable e) {
257           handleCoprocessorThrowable(env, e);
258         } finally {
259           currentThread.setContextClassLoader(cl);
260         }
261         bypass |= ctx.shouldBypass();
262         if (ctx.shouldComplete()) {
263           break;
264         }
265       }
266       ctx.postEnvCall(env);
267     }
268     return bypass;
269   }
270 
271   /**
272    * Coprocessor environment extension providing access to region server
273    * related services.
274    */
275   static class RegionServerEnvironment extends CoprocessorHost.Environment
276       implements RegionServerCoprocessorEnvironment {
277 
278     private RegionServerServices regionServerServices;
279 
280     public RegionServerEnvironment(final Class<?> implClass,
281         final Coprocessor impl, final int priority, final int seq,
282         final Configuration conf, final RegionServerServices services) {
283       super(impl, priority, seq, conf);
284       this.regionServerServices = services;
285       for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
286         Class<?> c = (Class<?>) itf;
287         if (SingletonCoprocessorService.class.isAssignableFrom(c)) {
288           this.regionServerServices.registerService(
289             ((SingletonCoprocessorService) impl).getService());
290           break;
291         }
292       }
293     }
294 
295     @Override
296     public RegionServerServices getRegionServerServices() {
297       return regionServerServices;
298     }
299   }
300 
301   /**
302    * Environment priority comparator. Coprocessors are chained in sorted
303    * order.
304    */
305   static class EnvironmentPriorityComparator implements
306       Comparator<CoprocessorEnvironment> {
307     public int compare(final CoprocessorEnvironment env1,
308         final CoprocessorEnvironment env2) {
309       if (env1.getPriority() < env2.getPriority()) {
310         return -1;
311       } else if (env1.getPriority() > env2.getPriority()) {
312         return 1;
313       }
314       if (env1.getLoadSequence() < env2.getLoadSequence()) {
315         return -1;
316       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
317         return 1;
318       }
319       return 0;
320     }
321   }
322 }