View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.Comparator;
23  import java.util.List;
24  
25  import org.apache.commons.lang.ClassUtils;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.classification.InterfaceStability;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.CellScanner;
32  import org.apache.hadoop.hbase.Coprocessor;
33  import org.apache.hadoop.hbase.CoprocessorEnvironment;
34  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
35  import org.apache.hadoop.hbase.MetaMutationAnnotation;
36  import org.apache.hadoop.hbase.client.Mutation;
37  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
38  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
39  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
40  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
41  import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService;
42  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
43  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
44  
45  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
46  @InterfaceStability.Evolving
47  public class RegionServerCoprocessorHost extends
48      CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
49  
50    private static final Log LOG = LogFactory.getLog(RegionServerCoprocessorHost.class);
51  
52    private RegionServerServices rsServices;
53  
54    public RegionServerCoprocessorHost(RegionServerServices rsServices,
55        Configuration conf) {
56      super(rsServices);
57      this.rsServices = rsServices;
58      this.conf = conf;
59      // Log the state of coprocessor loading here; should appear only once or
60      // twice in the daemon log, depending on HBase version, because there is
61      // only one RegionServerCoprocessorHost instance in the RS process
62      boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
63        DEFAULT_COPROCESSORS_ENABLED);
64      boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
65        DEFAULT_USER_COPROCESSORS_ENABLED);
66      LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled"));
67      LOG.info("Table coprocessor loading is " +
68        ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled"));
69      loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
70    }
71  
72    @Override
73    public RegionServerEnvironment createEnvironment(Class<?> implClass,
74        Coprocessor instance, int priority, int sequence, Configuration conf) {
75      return new RegionServerEnvironment(implClass, instance, priority,
76        sequence, conf, this.rsServices);
77    }
78  
79    public void preStop(String message) throws IOException {
80      // While stopping the region server all coprocessors method should be executed first then the
81      // coprocessor should be cleaned up.
82      execShutdown(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
83        @Override
84        public void call(RegionServerObserver oserver,
85            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
86          oserver.preStopRegionServer(ctx);
87        }
88        @Override
89        public void postEnvCall(RegionServerEnvironment env) {
90          // invoke coprocessor stop method
91          shutdown(env);
92        }
93      });
94    }
95  
96    public boolean preMerge(final HRegion regionA, final HRegion regionB) throws IOException {
97      return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
98        @Override
99        public void call(RegionServerObserver oserver,
100           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
101         oserver.preMerge(ctx, regionA, regionB);
102       }
103     });
104   }
105 
106   public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion)
107       throws IOException {
108     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
109       @Override
110       public void call(RegionServerObserver oserver,
111           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
112         oserver.postMerge(ctx, regionA, regionB, mergedRegion);
113       }
114     });
115   }
116 
117   public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
118       final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException {
119     return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
120       @Override
121       public void call(RegionServerObserver oserver,
122           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
123         oserver.preMergeCommit(ctx, regionA, regionB, metaEntries);
124       }
125     });
126   }
127 
128   public void postMergeCommit(final HRegion regionA, final HRegion regionB,
129       final HRegion mergedRegion) throws IOException {
130     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
131       @Override
132       public void call(RegionServerObserver oserver,
133           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
134         oserver.postMergeCommit(ctx, regionA, regionB, mergedRegion);
135       }
136     });
137   }
138 
139   public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
140     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
141       @Override
142       public void call(RegionServerObserver oserver,
143           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
144         oserver.preRollBackMerge(ctx, regionA, regionB);
145       }
146     });
147   }
148 
149   public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
150     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
151       @Override
152       public void call(RegionServerObserver oserver,
153           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
154         oserver.postRollBackMerge(ctx, regionA, regionB);
155       }
156     });
157   }
158 
159   public void preRollWALWriterRequest() throws IOException {
160     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
161       @Override
162       public void call(RegionServerObserver oserver,
163           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
164         oserver.preRollWALWriterRequest(ctx);
165       }
166     });
167   }
168 
169   public void postRollWALWriterRequest() throws IOException {
170     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
171       @Override
172       public void call(RegionServerObserver oserver,
173           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
174         oserver.postRollWALWriterRequest(ctx);
175       }
176     });
177   }
178 
179   public void preReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
180       throws IOException {
181     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
182       @Override
183       public void call(RegionServerObserver oserver,
184           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
185         oserver.preReplicateLogEntries(ctx, entries, cells);
186       }
187     });
188   }
189 
190   public void postReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
191       throws IOException {
192     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
193       @Override
194       public void call(RegionServerObserver oserver,
195           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
196         oserver.postReplicateLogEntries(ctx, entries, cells);
197       }
198     });
199   }
200 
201   public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
202       throws IOException {
203     return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
204         : new CoprocessOperationWithResult<ReplicationEndpoint>() {
205           @Override
206           public void call(RegionServerObserver oserver,
207               ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
208             setResult(oserver.postCreateReplicationEndPoint(ctx, getResult()));
209           }
210         });
211   }
212 
213   private <T> T execOperationWithResult(final T defaultValue,
214       final CoprocessOperationWithResult<T> ctx) throws IOException {
215     if (ctx == null)
216       return defaultValue;
217     ctx.setResult(defaultValue);
218     execOperation(ctx);
219     return ctx.getResult();
220   }
221 
222   private static abstract class CoprocessorOperation
223       extends ObserverContext<RegionServerCoprocessorEnvironment> {
224     public CoprocessorOperation() {
225     }
226 
227     public abstract void call(RegionServerObserver oserver,
228         ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException;
229 
230     public void postEnvCall(RegionServerEnvironment env) {
231     }
232   }
233 
234   private static abstract class CoprocessOperationWithResult<T> extends CoprocessorOperation {
235     private T result = null;
236 
237     public void setResult(final T result) {
238       this.result = result;
239     }
240 
241     public T getResult() {
242       return this.result;
243     }
244   }
245 
246   private boolean execOperation(final CoprocessorOperation ctx) throws IOException {
247     if (ctx == null) return false;
248     boolean bypass = false;
249     List<RegionServerEnvironment> envs = coprocessors.get();
250     for (int i = 0; i < envs.size(); i++) {
251       RegionServerEnvironment env = envs.get(i);
252       if (env.getInstance() instanceof RegionServerObserver) {
253         ctx.prepare(env);
254         Thread currentThread = Thread.currentThread();
255         ClassLoader cl = currentThread.getContextClassLoader();
256         try {
257           currentThread.setContextClassLoader(env.getClassLoader());
258           ctx.call((RegionServerObserver)env.getInstance(), ctx);
259         } catch (Throwable e) {
260           handleCoprocessorThrowable(env, e);
261         } finally {
262           currentThread.setContextClassLoader(cl);
263         }
264         bypass |= ctx.shouldBypass();
265         if (ctx.shouldComplete()) {
266           break;
267         }
268       }
269       ctx.postEnvCall(env);
270     }
271     return bypass;
272   }
273 
274   /**
275    * RegionServer coprocessor classes can be configured in any order, based on that priority is set
276    * and chained in a sorted order. For preStop(), coprocessor methods are invoked in call() and
277    * environment is shutdown in postEnvCall(). <br>
278    * Need to execute all coprocessor methods first then postEnvCall(), otherwise some coprocessors
279    * may remain shutdown if any exception occurs during next coprocessor execution which prevent
280    * RegionServer stop. (Refer:
281    * <a href="https://issues.apache.org/jira/browse/HBASE-16663">HBASE-16663</a>
282    * @param ctx CoprocessorOperation
283    * @return true if bypaas coprocessor execution, false if not.
284    * @throws IOException
285    */
286   private boolean execShutdown(final CoprocessorOperation ctx) throws IOException {
287     if (ctx == null) return false;
288     boolean bypass = false;
289     List<RegionServerEnvironment> envs = coprocessors.get();
290     int envsSize = envs.size();
291     // Iterate the coprocessors and execute CoprocessorOperation's call()
292     for (int i = 0; i < envsSize; i++) {
293       RegionServerEnvironment env = envs.get(i);
294       if (env.getInstance() instanceof RegionServerObserver) {
295         ctx.prepare(env);
296         Thread currentThread = Thread.currentThread();
297         ClassLoader cl = currentThread.getContextClassLoader();
298         try {
299           currentThread.setContextClassLoader(env.getClassLoader());
300           ctx.call((RegionServerObserver) env.getInstance(), ctx);
301         } catch (Throwable e) {
302           handleCoprocessorThrowable(env, e);
303         } finally {
304           currentThread.setContextClassLoader(cl);
305         }
306         bypass |= ctx.shouldBypass();
307         if (ctx.shouldComplete()) {
308           break;
309         }
310       }
311     }
312 
313     // Iterate the coprocessors and execute CoprocessorOperation's postEnvCall()
314     for (int i = 0; i < envsSize; i++) {
315       RegionServerEnvironment env = envs.get(i);
316       ctx.postEnvCall(env);
317     }
318     return bypass;
319   }
320 
321   /**
322    * Coprocessor environment extension providing access to region server
323    * related services.
324    */
325   static class RegionServerEnvironment extends CoprocessorHost.Environment
326       implements RegionServerCoprocessorEnvironment {
327 
328     private RegionServerServices regionServerServices;
329 
330     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BC_UNCONFIRMED_CAST",
331         justification="Intentional; FB has trouble detecting isAssignableFrom")
332     public RegionServerEnvironment(final Class<?> implClass,
333         final Coprocessor impl, final int priority, final int seq,
334         final Configuration conf, final RegionServerServices services) {
335       super(impl, priority, seq, conf);
336       this.regionServerServices = services;
337       for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
338         Class<?> c = (Class<?>) itf;
339         if (SingletonCoprocessorService.class.isAssignableFrom(c)) {// FindBugs: BC_UNCONFIRMED_CAST
340           this.regionServerServices.registerService(
341             ((SingletonCoprocessorService) impl).getService());
342           break;
343         }
344       }
345     }
346 
347     @Override
348     public RegionServerServices getRegionServerServices() {
349       return regionServerServices;
350     }
351   }
352 
353   /**
354    * Environment priority comparator. Coprocessors are chained in sorted
355    * order.
356    */
357   static class EnvironmentPriorityComparator implements
358       Comparator<CoprocessorEnvironment> {
359     public int compare(final CoprocessorEnvironment env1,
360         final CoprocessorEnvironment env2) {
361       if (env1.getPriority() < env2.getPriority()) {
362         return -1;
363       } else if (env1.getPriority() > env2.getPriority()) {
364         return 1;
365       }
366       if (env1.getLoadSequence() < env2.getLoadSequence()) {
367         return -1;
368       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
369         return 1;
370       }
371       return 0;
372     }
373   }
374 }