/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.Comparator;
23  import java.util.List;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.classification.InterfaceStability;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.CellScanner;
31  import org.apache.hadoop.hbase.Coprocessor;
32  import org.apache.hadoop.hbase.CoprocessorEnvironment;
33  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
34  import org.apache.hadoop.hbase.MetaMutationAnnotation;
35  import org.apache.hadoop.hbase.client.Mutation;
36  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
37  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
38  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
39  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
40  import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService;
41  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
42  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
43  
44  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
45  @InterfaceStability.Evolving
46  public class RegionServerCoprocessorHost extends
47      CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
48  
49    private static final Log LOG = LogFactory.getLog(RegionServerCoprocessorHost.class);
50  
51    private RegionServerServices rsServices;
52  
53    public RegionServerCoprocessorHost(RegionServerServices rsServices,
54        Configuration conf) {
55      super(rsServices);
56      this.rsServices = rsServices;
57      this.conf = conf;
58      // Log the state of coprocessor loading here; should appear only once or
59      // twice in the daemon log, depending on HBase version, because there is
60      // only one RegionServerCoprocessorHost instance in the RS process
61      boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
62        DEFAULT_COPROCESSORS_ENABLED);
63      boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
64        DEFAULT_USER_COPROCESSORS_ENABLED);
65      LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled"));
66      LOG.info("Table coprocessor loading is " +
67        ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled"));
68      loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
69    }
70  
71    @Override
72    public RegionServerEnvironment createEnvironment(Class<?> implClass,
73        Coprocessor instance, int priority, int sequence, Configuration conf) {
74      return new RegionServerEnvironment(implClass, instance, priority,
75        sequence, conf, this.rsServices);
76    }
77  
78    public void preStop(String message) throws IOException {
79      execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
80        @Override
81        public void call(RegionServerObserver oserver,
82            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
83          oserver.preStopRegionServer(ctx);
84        }
85        @Override
86        public void postEnvCall(RegionServerEnvironment env) {
87          // invoke coprocessor stop method
88          shutdown(env);
89        }
90      });
91    }
92  
93    public boolean preMerge(final HRegion regionA, final HRegion regionB) throws IOException {
94      return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
95        @Override
96        public void call(RegionServerObserver oserver,
97            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
98          oserver.preMerge(ctx, regionA, regionB);
99        }
100     });
101   }
102 
103   public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion)
104       throws IOException {
105     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
106       @Override
107       public void call(RegionServerObserver oserver,
108           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
109         oserver.postMerge(ctx, regionA, regionB, mergedRegion);
110       }
111     });
112   }
113 
114   public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
115       final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException {
116     return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
117       @Override
118       public void call(RegionServerObserver oserver,
119           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
120         oserver.preMergeCommit(ctx, regionA, regionB, metaEntries);
121       }
122     });
123   }
124 
125   public void postMergeCommit(final HRegion regionA, final HRegion regionB,
126       final HRegion mergedRegion) throws IOException {
127     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
128       @Override
129       public void call(RegionServerObserver oserver,
130           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
131         oserver.postMergeCommit(ctx, regionA, regionB, mergedRegion);
132       }
133     });
134   }
135 
136   public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
137     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
138       @Override
139       public void call(RegionServerObserver oserver,
140           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
141         oserver.preRollBackMerge(ctx, regionA, regionB);
142       }
143     });
144   }
145 
146   public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
147     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
148       @Override
149       public void call(RegionServerObserver oserver,
150           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
151         oserver.postRollBackMerge(ctx, regionA, regionB);
152       }
153     });
154   }
155 
156   public void preRollWALWriterRequest() throws IOException {
157     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
158       @Override
159       public void call(RegionServerObserver oserver,
160           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
161         oserver.preRollWALWriterRequest(ctx);
162       }
163     });
164   }
165 
166   public void postRollWALWriterRequest() throws IOException {
167     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
168       @Override
169       public void call(RegionServerObserver oserver,
170           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
171         oserver.postRollWALWriterRequest(ctx);
172       }
173     });
174   }
175 
176   public void preReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
177       throws IOException {
178     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
179       @Override
180       public void call(RegionServerObserver oserver,
181           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
182         oserver.preReplicateLogEntries(ctx, entries, cells);
183       }
184     });
185   }
186 
187   public void postReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
188       throws IOException {
189     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
190       @Override
191       public void call(RegionServerObserver oserver,
192           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
193         oserver.postReplicateLogEntries(ctx, entries, cells);
194       }
195     });
196   }
197 
198   public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
199       throws IOException {
200     return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
201         : new CoprocessOperationWithResult<ReplicationEndpoint>() {
202           @Override
203           public void call(RegionServerObserver oserver,
204               ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
205             setResult(oserver.postCreateReplicationEndPoint(ctx, getResult()));
206           }
207         });
208   }
209 
210   private <T> T execOperationWithResult(final T defaultValue,
211       final CoprocessOperationWithResult<T> ctx) throws IOException {
212     if (ctx == null)
213       return defaultValue;
214     ctx.setResult(defaultValue);
215     execOperation(ctx);
216     return ctx.getResult();
217   }
218 
219   private static abstract class CoprocessorOperation
220       extends ObserverContext<RegionServerCoprocessorEnvironment> {
221     public CoprocessorOperation() {
222     }
223 
224     public abstract void call(RegionServerObserver oserver,
225         ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException;
226 
227     public void postEnvCall(RegionServerEnvironment env) {
228     }
229   }
230 
231   private static abstract class CoprocessOperationWithResult<T> extends CoprocessorOperation {
232     private T result = null;
233 
234     public void setResult(final T result) {
235       this.result = result;
236     }
237 
238     public T getResult() {
239       return this.result;
240     }
241   }
242 
243   private boolean execOperation(final CoprocessorOperation ctx) throws IOException {
244     if (ctx == null) return false;
245 
246     boolean bypass = false;
247     for (RegionServerEnvironment env: coprocessors) {
248       if (env.getInstance() instanceof RegionServerObserver) {
249         ctx.prepare(env);
250         Thread currentThread = Thread.currentThread();
251         ClassLoader cl = currentThread.getContextClassLoader();
252         try {
253           currentThread.setContextClassLoader(env.getClassLoader());
254           ctx.call((RegionServerObserver)env.getInstance(), ctx);
255         } catch (Throwable e) {
256           handleCoprocessorThrowable(env, e);
257         } finally {
258           currentThread.setContextClassLoader(cl);
259         }
260         bypass |= ctx.shouldBypass();
261         if (ctx.shouldComplete()) {
262           break;
263         }
264       }
265       ctx.postEnvCall(env);
266     }
267     return bypass;
268   }
269 
270   /**
271    * Coprocessor environment extension providing access to region server
272    * related services.
273    */
274   static class RegionServerEnvironment extends CoprocessorHost.Environment
275       implements RegionServerCoprocessorEnvironment {
276 
277     private RegionServerServices regionServerServices;
278 
279     public RegionServerEnvironment(final Class<?> implClass,
280         final Coprocessor impl, final int priority, final int seq,
281         final Configuration conf, final RegionServerServices services) {
282       super(impl, priority, seq, conf);
283       this.regionServerServices = services;
284       for (Class c : implClass.getInterfaces()) {
285         if (SingletonCoprocessorService.class.isAssignableFrom(c)) {
286           this.regionServerServices.registerService(((SingletonCoprocessorService) impl).getService());
287           break;
288         }
289       }
290     }
291 
292     @Override
293     public RegionServerServices getRegionServerServices() {
294       return regionServerServices;
295     }
296   }
297 
298   /**
299    * Environment priority comparator. Coprocessors are chained in sorted
300    * order.
301    */
302   static class EnvironmentPriorityComparator implements
303       Comparator<CoprocessorEnvironment> {
304     public int compare(final CoprocessorEnvironment env1,
305         final CoprocessorEnvironment env2) {
306       if (env1.getPriority() < env2.getPriority()) {
307         return -1;
308       } else if (env1.getPriority() > env2.getPriority()) {
309         return 1;
310       }
311       if (env1.getLoadSequence() < env2.getLoadSequence()) {
312         return -1;
313       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
314         return 1;
315       }
316       return 0;
317     }
318   }
319 }