View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.Comparator;
23  import java.util.List;
24  
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.Coprocessor;
27  import org.apache.hadoop.hbase.CoprocessorEnvironment;
28  import org.apache.hadoop.hbase.MetaMutationAnnotation;
29  import org.apache.hadoop.hbase.client.Mutation;
30  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
31  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
32  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
33  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
34  
35  public class RegionServerCoprocessorHost extends
36      CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
37  
38    private RegionServerServices rsServices;
39  
40    public RegionServerCoprocessorHost(RegionServerServices rsServices,
41        Configuration conf) {
42      super(rsServices);
43      this.rsServices = rsServices;
44      this.conf = conf;
45      // load system default cp's from configuration.
46      loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
47    }
48  
49    @Override
50    public RegionServerEnvironment createEnvironment(Class<?> implClass,
51        Coprocessor instance, int priority, int sequence, Configuration conf) {
52      return new RegionServerEnvironment(implClass, instance, priority,
53        sequence, conf, this.rsServices);
54    }
55  
56    public void preStop(String message) throws IOException {
57      ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
58      for (RegionServerEnvironment env : coprocessors) {
59        if (env.getInstance() instanceof RegionServerObserver) {
60          ctx = ObserverContext.createAndPrepare(env, ctx);
61          Thread currentThread = Thread.currentThread();
62          ClassLoader cl = currentThread.getContextClassLoader();
63          try {
64            currentThread.setContextClassLoader(env.getClassLoader());
65            ((RegionServerObserver) env.getInstance()).preStopRegionServer(ctx);
66          } catch (Throwable e) {
67            handleCoprocessorThrowable(env, e);
68          } finally {
69            currentThread.setContextClassLoader(cl);
70          }
71          if (ctx.shouldComplete()) {
72            break;
73          }
74        }
75      }
76    }
77  
78    public boolean preMerge(final HRegion regionA, final HRegion regionB) throws IOException {
79      boolean bypass = false;
80      ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
81      for (RegionServerEnvironment env : coprocessors) {
82        if (env.getInstance() instanceof RegionServerObserver) {
83          ctx = ObserverContext.createAndPrepare(env, ctx);
84          Thread currentThread = Thread.currentThread();
85          ClassLoader cl = currentThread.getContextClassLoader();
86          try {
87            currentThread.setContextClassLoader(env.getClassLoader());
88            ((RegionServerObserver) env.getInstance()).preMerge(ctx, regionA, regionB);
89          } catch (Throwable e) {
90            handleCoprocessorThrowable(env, e);
91          } finally {
92            currentThread.setContextClassLoader(cl);
93          }
94          bypass |= ctx.shouldBypass();
95          if (ctx.shouldComplete()) {
96            break;
97          }
98        }
99      }
100     return bypass;
101   }
102 
103   public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion)
104       throws IOException {
105     ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
106     for (RegionServerEnvironment env : coprocessors) {
107       if (env.getInstance() instanceof RegionServerObserver) {
108         ctx = ObserverContext.createAndPrepare(env, ctx);
109         Thread currentThread = Thread.currentThread();
110         ClassLoader cl = currentThread.getContextClassLoader();
111         try {
112           currentThread.setContextClassLoader(env.getClassLoader());
113           ((RegionServerObserver) env.getInstance()).postMerge(ctx, regionA, regionB, mergedRegion);
114         } catch (Throwable e) {
115           handleCoprocessorThrowable(env, e);
116         } finally {
117           currentThread.setContextClassLoader(cl);
118         }
119         if (ctx.shouldComplete()) {
120           break;
121         }
122       }
123     }
124   }
125 
126   public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
127       final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException {
128     boolean bypass = false;
129     ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
130     for (RegionServerEnvironment env : coprocessors) {
131       if (env.getInstance() instanceof RegionServerObserver) {
132         ctx = ObserverContext.createAndPrepare(env, ctx);
133         Thread currentThread = Thread.currentThread();
134         ClassLoader cl = currentThread.getContextClassLoader();
135         try {
136           currentThread.setContextClassLoader(env.getClassLoader());
137           ((RegionServerObserver) env.getInstance()).preMergeCommit(ctx, regionA, regionB,
138             metaEntries);
139         } catch (Throwable e) {
140           handleCoprocessorThrowable(env, e);
141         } finally {
142           currentThread.setContextClassLoader(cl);
143         }
144         bypass |= ctx.shouldBypass();
145         if (ctx.shouldComplete()) {
146           break;
147         }
148       }
149     }
150     return bypass;
151   }
152 
153   public void postMergeCommit(final HRegion regionA, final HRegion regionB,
154       final HRegion mergedRegion) throws IOException {
155     ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
156     for (RegionServerEnvironment env : coprocessors) {
157       if (env.getInstance() instanceof RegionServerObserver) {
158         ctx = ObserverContext.createAndPrepare(env, ctx);
159         Thread currentThread = Thread.currentThread();
160         ClassLoader cl = currentThread.getContextClassLoader();
161         try {
162           currentThread.setContextClassLoader(env.getClassLoader());
163           ((RegionServerObserver) env.getInstance()).postMergeCommit(ctx, regionA, regionB,
164             mergedRegion);
165         } catch (Throwable e) {
166           handleCoprocessorThrowable(env, e);
167         } finally {
168           currentThread.setContextClassLoader(cl);
169         }
170         if (ctx.shouldComplete()) {
171           break;
172         }
173       }
174     }
175   }
176 
177   public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
178     ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
179     for (RegionServerEnvironment env : coprocessors) {
180       if (env.getInstance() instanceof RegionServerObserver) {
181         ctx = ObserverContext.createAndPrepare(env, ctx);
182         Thread currentThread = Thread.currentThread();
183         ClassLoader cl = currentThread.getContextClassLoader();
184         try {
185           currentThread.setContextClassLoader(env.getClassLoader());
186           ((RegionServerObserver) env.getInstance()).preRollBackMerge(ctx, regionA, regionB);
187         } catch (Throwable e) {
188           handleCoprocessorThrowable(env, e);
189         } finally {
190           currentThread.setContextClassLoader(cl);
191         }
192         if (ctx.shouldComplete()) {
193           break;
194         }
195       }
196     }
197   }
198 
199   public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
200     ObserverContext<RegionServerCoprocessorEnvironment> ctx = null;
201     for (RegionServerEnvironment env : coprocessors) {
202       if (env.getInstance() instanceof RegionServerObserver) {
203         ctx = ObserverContext.createAndPrepare(env, ctx);
204         Thread currentThread = Thread.currentThread();
205         ClassLoader cl = currentThread.getContextClassLoader();
206         try {
207           currentThread.setContextClassLoader(env.getClassLoader());
208           ((RegionServerObserver) env.getInstance()).postRollBackMerge(ctx, regionA, regionB);
209         } catch (Throwable e) {
210           handleCoprocessorThrowable(env, e);
211         } finally {
212           currentThread.setContextClassLoader(cl);
213         }
214         if (ctx.shouldComplete()) {
215           break;
216         }
217       }
218     }
219   }
220 
221   /**
222    * Coprocessor environment extension providing access to region server
223    * related services.
224    */
225   static class RegionServerEnvironment extends CoprocessorHost.Environment
226       implements RegionServerCoprocessorEnvironment {
227 
228     private RegionServerServices regionServerServices;
229 
230     public RegionServerEnvironment(final Class<?> implClass,
231         final Coprocessor impl, final int priority, final int seq,
232         final Configuration conf, final RegionServerServices services) {
233       super(impl, priority, seq, conf);
234       this.regionServerServices = services;
235     }
236 
237     @Override
238     public RegionServerServices getRegionServerServices() {
239       return regionServerServices;
240     }
241   }
242 
243   /**
244    * Environment priority comparator. Coprocessors are chained in sorted
245    * order.
246    */
247   static class EnvironmentPriorityComparator implements
248       Comparator<CoprocessorEnvironment> {
249     public int compare(final CoprocessorEnvironment env1,
250         final CoprocessorEnvironment env2) {
251       if (env1.getPriority() < env2.getPriority()) {
252         return -1;
253       } else if (env1.getPriority() > env2.getPriority()) {
254         return 1;
255       }
256       if (env1.getLoadSequence() < env2.getLoadSequence()) {
257         return -1;
258       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
259         return 1;
260       }
261       return 0;
262     }
263   }
264 }