View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.Comparator;
23  import java.util.List;
24  
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.classification.InterfaceStability;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.CellScanner;
29  import org.apache.hadoop.hbase.Coprocessor;
30  import org.apache.hadoop.hbase.CoprocessorEnvironment;
31  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
32  import org.apache.hadoop.hbase.MetaMutationAnnotation;
33  import org.apache.hadoop.hbase.client.Mutation;
34  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
35  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
36  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
37  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
38  import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService;
39  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
40  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
41  
42  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
43  @InterfaceStability.Evolving
44  public class RegionServerCoprocessorHost extends
45      CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
46  
47    private RegionServerServices rsServices;
48  
49    public RegionServerCoprocessorHost(RegionServerServices rsServices,
50        Configuration conf) {
51      super(rsServices);
52      this.rsServices = rsServices;
53      this.conf = conf;
54      // load system default cp's from configuration.
55      loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
56    }
57  
58    @Override
59    public RegionServerEnvironment createEnvironment(Class<?> implClass,
60        Coprocessor instance, int priority, int sequence, Configuration conf) {
61      return new RegionServerEnvironment(implClass, instance, priority,
62        sequence, conf, this.rsServices);
63    }
64  
65    public void preStop(String message) throws IOException {
66      execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
67        @Override
68        public void call(RegionServerObserver oserver,
69            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
70          oserver.preStopRegionServer(ctx);
71        }
72        @Override
73        public void postEnvCall(RegionServerEnvironment env) {
74          // invoke coprocessor stop method
75          shutdown(env);
76        }
77      });
78    }
79  
80    public boolean preMerge(final HRegion regionA, final HRegion regionB) throws IOException {
81      return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
82        @Override
83        public void call(RegionServerObserver oserver,
84            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
85          oserver.preMerge(ctx, regionA, regionB);
86        }
87      });
88    }
89  
90    public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion)
91        throws IOException {
92      execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
93        @Override
94        public void call(RegionServerObserver oserver,
95            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
96          oserver.postMerge(ctx, regionA, regionB, mergedRegion);
97        }
98      });
99    }
100 
101   public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
102       final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException {
103     return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
104       @Override
105       public void call(RegionServerObserver oserver,
106           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
107         oserver.preMergeCommit(ctx, regionA, regionB, metaEntries);
108       }
109     });
110   }
111 
112   public void postMergeCommit(final HRegion regionA, final HRegion regionB,
113       final HRegion mergedRegion) throws IOException {
114     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
115       @Override
116       public void call(RegionServerObserver oserver,
117           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
118         oserver.postMergeCommit(ctx, regionA, regionB, mergedRegion);
119       }
120     });
121   }
122 
123   public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
124     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
125       @Override
126       public void call(RegionServerObserver oserver,
127           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
128         oserver.preRollBackMerge(ctx, regionA, regionB);
129       }
130     });
131   }
132 
133   public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
134     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
135       @Override
136       public void call(RegionServerObserver oserver,
137           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
138         oserver.postRollBackMerge(ctx, regionA, regionB);
139       }
140     });
141   }
142 
143   public void preRollWALWriterRequest() throws IOException {
144     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
145       @Override
146       public void call(RegionServerObserver oserver,
147           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
148         oserver.preRollWALWriterRequest(ctx);
149       }
150     });
151   }
152 
153   public void postRollWALWriterRequest() throws IOException {
154     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
155       @Override
156       public void call(RegionServerObserver oserver,
157           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
158         oserver.postRollWALWriterRequest(ctx);
159       }
160     });
161   }
162 
163   public void preReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
164       throws IOException {
165     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
166       @Override
167       public void call(RegionServerObserver oserver,
168           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
169         oserver.preReplicateLogEntries(ctx, entries, cells);
170       }
171     });
172   }
173 
174   public void postReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells)
175       throws IOException {
176     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
177       @Override
178       public void call(RegionServerObserver oserver,
179           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
180         oserver.postReplicateLogEntries(ctx, entries, cells);
181       }
182     });
183   }
184 
185   public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
186       throws IOException {
187     return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
188         : new CoprocessOperationWithResult<ReplicationEndpoint>() {
189           @Override
190           public void call(RegionServerObserver oserver,
191               ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
192             setResult(oserver.postCreateReplicationEndPoint(ctx, getResult()));
193           }
194         });
195   }
196 
197   private <T> T execOperationWithResult(final T defaultValue,
198       final CoprocessOperationWithResult<T> ctx) throws IOException {
199     if (ctx == null)
200       return defaultValue;
201     ctx.setResult(defaultValue);
202     execOperation(ctx);
203     return ctx.getResult();
204   }
205 
206   private static abstract class CoprocessorOperation
207       extends ObserverContext<RegionServerCoprocessorEnvironment> {
208     public CoprocessorOperation() {
209     }
210 
211     public abstract void call(RegionServerObserver oserver,
212         ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException;
213 
214     public void postEnvCall(RegionServerEnvironment env) {
215     }
216   }
217 
218   private static abstract class CoprocessOperationWithResult<T> extends CoprocessorOperation {
219     private T result = null;
220 
221     public void setResult(final T result) {
222       this.result = result;
223     }
224 
225     public T getResult() {
226       return this.result;
227     }
228   }
229 
230   private boolean execOperation(final CoprocessorOperation ctx) throws IOException {
231     if (ctx == null) return false;
232 
233     boolean bypass = false;
234     for (RegionServerEnvironment env: coprocessors) {
235       if (env.getInstance() instanceof RegionServerObserver) {
236         ctx.prepare(env);
237         Thread currentThread = Thread.currentThread();
238         ClassLoader cl = currentThread.getContextClassLoader();
239         try {
240           currentThread.setContextClassLoader(env.getClassLoader());
241           ctx.call((RegionServerObserver)env.getInstance(), ctx);
242         } catch (Throwable e) {
243           handleCoprocessorThrowable(env, e);
244         } finally {
245           currentThread.setContextClassLoader(cl);
246         }
247         bypass |= ctx.shouldBypass();
248         if (ctx.shouldComplete()) {
249           break;
250         }
251       }
252       ctx.postEnvCall(env);
253     }
254     return bypass;
255   }
256 
257   /**
258    * Coprocessor environment extension providing access to region server
259    * related services.
260    */
261   static class RegionServerEnvironment extends CoprocessorHost.Environment
262       implements RegionServerCoprocessorEnvironment {
263 
264     private RegionServerServices regionServerServices;
265 
266     public RegionServerEnvironment(final Class<?> implClass,
267         final Coprocessor impl, final int priority, final int seq,
268         final Configuration conf, final RegionServerServices services) {
269       super(impl, priority, seq, conf);
270       this.regionServerServices = services;
271       for (Class c : implClass.getInterfaces()) {
272         if (SingletonCoprocessorService.class.isAssignableFrom(c)) {
273           this.regionServerServices.registerService(((SingletonCoprocessorService) impl).getService());
274           break;
275         }
276       }
277     }
278 
279     @Override
280     public RegionServerServices getRegionServerServices() {
281       return regionServerServices;
282     }
283   }
284 
285   /**
286    * Environment priority comparator. Coprocessors are chained in sorted
287    * order.
288    */
289   static class EnvironmentPriorityComparator implements
290       Comparator<CoprocessorEnvironment> {
291     public int compare(final CoprocessorEnvironment env1,
292         final CoprocessorEnvironment env2) {
293       if (env1.getPriority() < env2.getPriority()) {
294         return -1;
295       } else if (env1.getPriority() > env2.getPriority()) {
296         return 1;
297       }
298       if (env1.getLoadSequence() < env2.getLoadSequence()) {
299         return -1;
300       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
301         return 1;
302       }
303       return 0;
304     }
305   }
306 }