View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.Comparator;
23  import java.util.List;
24  
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.Coprocessor;
27  import org.apache.hadoop.hbase.CoprocessorEnvironment;
28  import org.apache.hadoop.hbase.MetaMutationAnnotation;
29  import org.apache.hadoop.hbase.client.Mutation;
30  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
31  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
32  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
33  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
34  
35  public class RegionServerCoprocessorHost extends
36      CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
37  
38    private RegionServerServices rsServices;
39  
40    public RegionServerCoprocessorHost(RegionServerServices rsServices,
41        Configuration conf) {
42      super(rsServices);
43      this.rsServices = rsServices;
44      this.conf = conf;
45      // load system default cp's from configuration.
46      loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
47    }
48  
49    @Override
50    public RegionServerEnvironment createEnvironment(Class<?> implClass,
51        Coprocessor instance, int priority, int sequence, Configuration conf) {
52      return new RegionServerEnvironment(implClass, instance, priority,
53        sequence, conf, this.rsServices);
54    }
55  
56    public void preStop(String message) throws IOException {
57      ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
58      for (RegionServerEnvironment env : coprocessors) {
59        if (env.getInstance() instanceof RegionServerObserver) {
60          ctx = ObserverContext.createAndPrepare(env, ctx);
61          Thread currentThread = Thread.currentThread();
62          ClassLoader cl = currentThread.getContextClassLoader();
63          try {
64            currentThread.setContextClassLoader(env.getClassLoader());
65            ((RegionServerObserver) env.getInstance()).preStopRegionServer(ctx);
66          } catch (Throwable e) {
67            handleCoprocessorThrowable(env, e);
68          } finally {
69            currentThread.setContextClassLoader(cl);
70          }
71          if (ctx.shouldComplete()) {
72            break;
73          }
74        }
75        // invoke coprocessor stop method
76        shutdown(env);
77      }
78    }
79  
80    public boolean preMerge(final HRegion regionA, final HRegion regionB) throws IOException {
81      boolean bypass = false;
82      ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
83      for (RegionServerEnvironment env : coprocessors) {
84        if (env.getInstance() instanceof RegionServerObserver) {
85          ctx = ObserverContext.createAndPrepare(env, ctx);
86          Thread currentThread = Thread.currentThread();
87          ClassLoader cl = currentThread.getContextClassLoader();
88          try {
89            currentThread.setContextClassLoader(env.getClassLoader());
90            ((RegionServerObserver) env.getInstance()).preMerge(ctx, regionA, regionB);
91          } catch (Throwable e) {
92            handleCoprocessorThrowable(env, e);
93          } finally {
94            currentThread.setContextClassLoader(cl);
95          }
96          bypass |= ctx.shouldBypass();
97          if (ctx.shouldComplete()) {
98            break;
99          }
100       }
101     }
102     return bypass;
103   }
104 
105   public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion)
106       throws IOException {
107     ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
108     for (RegionServerEnvironment env : coprocessors) {
109       if (env.getInstance() instanceof RegionServerObserver) {
110         ctx = ObserverContext.createAndPrepare(env, ctx);
111         Thread currentThread = Thread.currentThread();
112         ClassLoader cl = currentThread.getContextClassLoader();
113         try {
114           currentThread.setContextClassLoader(env.getClassLoader());
115           ((RegionServerObserver) env.getInstance()).postMerge(ctx, regionA, regionB, mergedRegion);
116         } catch (Throwable e) {
117           handleCoprocessorThrowable(env, e);
118         } finally {
119           currentThread.setContextClassLoader(cl);
120         }
121         if (ctx.shouldComplete()) {
122           break;
123         }
124       }
125     }
126   }
127 
128   public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
129       final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException {
130     boolean bypass = false;
131     ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
132     for (RegionServerEnvironment env : coprocessors) {
133       if (env.getInstance() instanceof RegionServerObserver) {
134         ctx = ObserverContext.createAndPrepare(env, ctx);
135         Thread currentThread = Thread.currentThread();
136         ClassLoader cl = currentThread.getContextClassLoader();
137         try {
138           currentThread.setContextClassLoader(env.getClassLoader());
139           ((RegionServerObserver) env.getInstance()).preMergeCommit(ctx, regionA, regionB,
140             metaEntries);
141         } catch (Throwable e) {
142           handleCoprocessorThrowable(env, e);
143         } finally {
144           currentThread.setContextClassLoader(cl);
145         }
146         bypass |= ctx.shouldBypass();
147         if (ctx.shouldComplete()) {
148           break;
149         }
150       }
151     }
152     return bypass;
153   }
154 
155   public void postMergeCommit(final HRegion regionA, final HRegion regionB,
156       final HRegion mergedRegion) throws IOException {
157     ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
158     for (RegionServerEnvironment env : coprocessors) {
159       if (env.getInstance() instanceof RegionServerObserver) {
160         ctx = ObserverContext.createAndPrepare(env, ctx);
161         Thread currentThread = Thread.currentThread();
162         ClassLoader cl = currentThread.getContextClassLoader();
163         try {
164           currentThread.setContextClassLoader(env.getClassLoader());
165           ((RegionServerObserver) env.getInstance()).postMergeCommit(ctx, regionA, regionB,
166             mergedRegion);
167         } catch (Throwable e) {
168           handleCoprocessorThrowable(env, e);
169         } finally {
170           currentThread.setContextClassLoader(cl);
171         }
172         if (ctx.shouldComplete()) {
173           break;
174         }
175       }
176     }
177   }
178 
179   public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
180     ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
181     for (RegionServerEnvironment env : coprocessors) {
182       if (env.getInstance() instanceof RegionServerObserver) {
183         ctx = ObserverContext.createAndPrepare(env, ctx);
184         Thread currentThread = Thread.currentThread();
185         ClassLoader cl = currentThread.getContextClassLoader();
186         try {
187           currentThread.setContextClassLoader(env.getClassLoader());
188           ((RegionServerObserver) env.getInstance()).preRollBackMerge(ctx, regionA, regionB);
189         } catch (Throwable e) {
190           handleCoprocessorThrowable(env, e);
191         } finally {
192           currentThread.setContextClassLoader(cl);
193         }
194         if (ctx.shouldComplete()) {
195           break;
196         }
197       }
198     }
199   }
200 
201   public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
202     ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
203     for (RegionServerEnvironment env : coprocessors) {
204       if (env.getInstance() instanceof RegionServerObserver) {
205         ctx = ObserverContext.createAndPrepare(env, ctx);
206         Thread currentThread = Thread.currentThread();
207         ClassLoader cl = currentThread.getContextClassLoader();
208         try {
209           currentThread.setContextClassLoader(env.getClassLoader());
210           ((RegionServerObserver) env.getInstance()).postRollBackMerge(ctx, regionA, regionB);
211         } catch (Throwable e) {
212           handleCoprocessorThrowable(env, e);
213         } finally {
214           currentThread.setContextClassLoader(cl);
215         }
216         if (ctx.shouldComplete()) {
217           break;
218         }
219       }
220     }
221   }
222 
223   /**
224    * Coprocessor environment extension providing access to region server
225    * related services.
226    */
227   static class RegionServerEnvironment extends CoprocessorHost.Environment
228       implements RegionServerCoprocessorEnvironment {
229 
230     private RegionServerServices regionServerServices;
231 
232     public RegionServerEnvironment(final Class<?> implClass,
233         final Coprocessor impl, final int priority, final int seq,
234         final Configuration conf, final RegionServerServices services) {
235       super(impl, priority, seq, conf);
236       this.regionServerServices = services;
237     }
238 
239     @Override
240     public RegionServerServices getRegionServerServices() {
241       return regionServerServices;
242     }
243   }
244 
245   /**
246    * Environment priority comparator. Coprocessors are chained in sorted
247    * order.
248    */
249   static class EnvironmentPriorityComparator implements
250       Comparator<CoprocessorEnvironment> {
251     public int compare(final CoprocessorEnvironment env1,
252         final CoprocessorEnvironment env2) {
253       if (env1.getPriority() < env2.getPriority()) {
254         return -1;
255       } else if (env1.getPriority() > env2.getPriority()) {
256         return 1;
257       }
258       if (env1.getLoadSequence() < env2.getLoadSequence()) {
259         return -1;
260       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
261         return 1;
262       }
263       return 0;
264     }
265   }
266 }