View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.Comparator;
23  import java.util.List;
24  
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.classification.InterfaceStability;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Coprocessor;
29  import org.apache.hadoop.hbase.CoprocessorEnvironment;
30  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
31  import org.apache.hadoop.hbase.MetaMutationAnnotation;
32  import org.apache.hadoop.hbase.client.Mutation;
33  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
34  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
35  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
36  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
37  import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService;
38  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
39  
40  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
41  @InterfaceStability.Evolving
42  public class RegionServerCoprocessorHost extends
43      CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
44  
45    private RegionServerServices rsServices;
46  
47    public RegionServerCoprocessorHost(RegionServerServices rsServices,
48        Configuration conf) {
49      super(rsServices);
50      this.rsServices = rsServices;
51      this.conf = conf;
52      // load system default cp's from configuration.
53      loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
54    }
55  
56    @Override
57    public RegionServerEnvironment createEnvironment(Class<?> implClass,
58        Coprocessor instance, int priority, int sequence, Configuration conf) {
59      return new RegionServerEnvironment(implClass, instance, priority,
60        sequence, conf, this.rsServices);
61    }
62  
63    public void preStop(String message) throws IOException {
64      execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
65        @Override
66        public void call(RegionServerObserver oserver,
67            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
68          oserver.preStopRegionServer(ctx);
69        }
70        @Override
71        public void postEnvCall(RegionServerEnvironment env) {
72          // invoke coprocessor stop method
73          shutdown(env);
74        }
75      });
76    }
77  
78    public boolean preMerge(final HRegion regionA, final HRegion regionB) throws IOException {
79      return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
80        @Override
81        public void call(RegionServerObserver oserver,
82            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
83          oserver.preMerge(ctx, regionA, regionB);
84        }
85      });
86    }
87  
88    public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion)
89        throws IOException {
90      execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
91        @Override
92        public void call(RegionServerObserver oserver,
93            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
94          oserver.postMerge(ctx, regionA, regionB, mergedRegion);
95        }
96      });
97    }
98  
99    public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
100       final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException {
101     return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
102       @Override
103       public void call(RegionServerObserver oserver,
104           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
105         oserver.preMergeCommit(ctx, regionA, regionB, metaEntries);
106       }
107     });
108   }
109 
110   public void postMergeCommit(final HRegion regionA, final HRegion regionB,
111       final HRegion mergedRegion) throws IOException {
112     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
113       @Override
114       public void call(RegionServerObserver oserver,
115           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
116         oserver.postMergeCommit(ctx, regionA, regionB, mergedRegion);
117       }
118     });
119   }
120 
121   public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
122     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
123       @Override
124       public void call(RegionServerObserver oserver,
125           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
126         oserver.preRollBackMerge(ctx, regionA, regionB);
127       }
128     });
129   }
130 
131   public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
132     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
133       @Override
134       public void call(RegionServerObserver oserver,
135           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
136         oserver.postRollBackMerge(ctx, regionA, regionB);
137       }
138     });
139   }
140 
141   public void preRollWALWriterRequest() throws IOException {
142     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
143       @Override
144       public void call(RegionServerObserver oserver,
145           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
146         oserver.preRollWALWriterRequest(ctx);
147       }
148     });
149   }
150 
151   public void postRollWALWriterRequest() throws IOException {
152     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
153       @Override
154       public void call(RegionServerObserver oserver,
155           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
156         oserver.postRollWALWriterRequest(ctx);
157       }
158     });
159   }
160 
161   public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
162       throws IOException {
163     return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
164         : new CoprocessOperationWithResult<ReplicationEndpoint>() {
165           @Override
166           public void call(RegionServerObserver oserver,
167               ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
168             setResult(oserver.postCreateReplicationEndPoint(ctx, getResult()));
169           }
170         });
171   }
172 
173   private <T> T execOperationWithResult(final T defaultValue,
174       final CoprocessOperationWithResult<T> ctx) throws IOException {
175     if (ctx == null)
176       return defaultValue;
177     ctx.setResult(defaultValue);
178     execOperation(ctx);
179     return ctx.getResult();
180   }
181 
182   private static abstract class CoprocessorOperation
183       extends ObserverContext<RegionServerCoprocessorEnvironment> {
184     public CoprocessorOperation() {
185     }
186 
187     public abstract void call(RegionServerObserver oserver,
188         ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException;
189 
190     public void postEnvCall(RegionServerEnvironment env) {
191     }
192   }
193 
194   private static abstract class CoprocessOperationWithResult<T> extends CoprocessorOperation {
195     private T result = null;
196 
197     public void setResult(final T result) {
198       this.result = result;
199     }
200 
201     public T getResult() {
202       return this.result;
203     }
204   }
205 
206   private boolean execOperation(final CoprocessorOperation ctx) throws IOException {
207     if (ctx == null) return false;
208 
209     boolean bypass = false;
210     for (RegionServerEnvironment env: coprocessors) {
211       if (env.getInstance() instanceof RegionServerObserver) {
212         ctx.prepare(env);
213         Thread currentThread = Thread.currentThread();
214         ClassLoader cl = currentThread.getContextClassLoader();
215         try {
216           currentThread.setContextClassLoader(env.getClassLoader());
217           ctx.call((RegionServerObserver)env.getInstance(), ctx);
218         } catch (Throwable e) {
219           handleCoprocessorThrowable(env, e);
220         } finally {
221           currentThread.setContextClassLoader(cl);
222         }
223         bypass |= ctx.shouldBypass();
224         if (ctx.shouldComplete()) {
225           break;
226         }
227       }
228       ctx.postEnvCall(env);
229     }
230     return bypass;
231   }
232 
233   /**
234    * Coprocessor environment extension providing access to region server
235    * related services.
236    */
237   static class RegionServerEnvironment extends CoprocessorHost.Environment
238       implements RegionServerCoprocessorEnvironment {
239 
240     private RegionServerServices regionServerServices;
241 
242     public RegionServerEnvironment(final Class<?> implClass,
243         final Coprocessor impl, final int priority, final int seq,
244         final Configuration conf, final RegionServerServices services) {
245       super(impl, priority, seq, conf);
246       this.regionServerServices = services;
247       for (Class c : implClass.getInterfaces()) {
248         if (SingletonCoprocessorService.class.isAssignableFrom(c)) {
249           this.regionServerServices.registerService(((SingletonCoprocessorService) impl).getService());
250           break;
251         }
252       }
253     }
254 
255     @Override
256     public RegionServerServices getRegionServerServices() {
257       return regionServerServices;
258     }
259   }
260 
261   /**
262    * Environment priority comparator. Coprocessors are chained in sorted
263    * order.
264    */
265   static class EnvironmentPriorityComparator implements
266       Comparator<CoprocessorEnvironment> {
267     public int compare(final CoprocessorEnvironment env1,
268         final CoprocessorEnvironment env2) {
269       if (env1.getPriority() < env2.getPriority()) {
270         return -1;
271       } else if (env1.getPriority() > env2.getPriority()) {
272         return 1;
273       }
274       if (env1.getLoadSequence() < env2.getLoadSequence()) {
275         return -1;
276       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
277         return 1;
278       }
279       return 0;
280     }
281   }
282 }