View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.Comparator;
23  import java.util.List;
24  
25  import org.apache.hadoop.classification.InterfaceAudience;
26  import org.apache.hadoop.classification.InterfaceStability;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Coprocessor;
29  import org.apache.hadoop.hbase.CoprocessorEnvironment;
30  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
31  import org.apache.hadoop.hbase.MetaMutationAnnotation;
32  import org.apache.hadoop.hbase.client.Mutation;
33  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
34  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
35  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
36  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
37  
38  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
39  @InterfaceStability.Evolving
40  public class RegionServerCoprocessorHost extends
41      CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
42  
43    private RegionServerServices rsServices;
44  
45    public RegionServerCoprocessorHost(RegionServerServices rsServices,
46        Configuration conf) {
47      super(rsServices);
48      this.rsServices = rsServices;
49      this.conf = conf;
50      // load system default cp's from configuration.
51      loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
52    }
53  
54    @Override
55    public RegionServerEnvironment createEnvironment(Class<?> implClass,
56        Coprocessor instance, int priority, int sequence, Configuration conf) {
57      return new RegionServerEnvironment(implClass, instance, priority,
58        sequence, conf, this.rsServices);
59    }
60  
61    public void preStop(String message) throws IOException {
62      execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
63        @Override
64        public void call(RegionServerObserver oserver,
65            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
66          oserver.preStopRegionServer(ctx);
67        }
68        @Override
69        public void postEnvCall(RegionServerEnvironment env) {
70          // invoke coprocessor stop method
71          shutdown(env);
72        }
73      });
74    }
75  
76    public boolean preMerge(final HRegion regionA, final HRegion regionB) throws IOException {
77      return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
78        @Override
79        public void call(RegionServerObserver oserver,
80            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
81          oserver.preMerge(ctx, regionA, regionB);
82        }
83      });
84    }
85  
86    public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion)
87        throws IOException {
88      execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
89        @Override
90        public void call(RegionServerObserver oserver,
91            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
92          oserver.postMerge(ctx, regionA, regionB, mergedRegion);
93        }
94      });
95    }
96  
97    public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
98        final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException {
99      return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
100       @Override
101       public void call(RegionServerObserver oserver,
102           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
103         oserver.preMergeCommit(ctx, regionA, regionB, metaEntries);
104       }
105     });
106   }
107 
108   public void postMergeCommit(final HRegion regionA, final HRegion regionB,
109       final HRegion mergedRegion) throws IOException {
110     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
111       @Override
112       public void call(RegionServerObserver oserver,
113           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
114         oserver.postMergeCommit(ctx, regionA, regionB, mergedRegion);
115       }
116     });
117   }
118 
119   public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
120     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
121       @Override
122       public void call(RegionServerObserver oserver,
123           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
124         oserver.preRollBackMerge(ctx, regionA, regionB);
125       }
126     });
127   }
128 
129   public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
130     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
131       @Override
132       public void call(RegionServerObserver oserver,
133           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
134         oserver.postRollBackMerge(ctx, regionA, regionB);
135       }
136     });
137   }
138 
139   public void preRollWALWriterRequest() throws IOException {
140     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
141       @Override
142       public void call(RegionServerObserver oserver,
143           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
144         oserver.preRollWALWriterRequest(ctx);
145       }
146     });
147   }
148 
149   public void postRollWALWriterRequest() throws IOException {
150     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
151       @Override
152       public void call(RegionServerObserver oserver,
153           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
154         oserver.postRollWALWriterRequest(ctx);
155       }
156     });
157   }
158 
159   private static abstract class CoprocessorOperation
160       extends ObserverContext<RegionServerCoprocessorEnvironment> {
161     public CoprocessorOperation() {
162     }
163 
164     public abstract void call(RegionServerObserver oserver,
165         ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException;
166 
167     public void postEnvCall(RegionServerEnvironment env) {
168     }
169   }
170 
171   private boolean execOperation(final CoprocessorOperation ctx) throws IOException {
172     if (ctx == null) return false;
173 
174     boolean bypass = false;
175     for (RegionServerEnvironment env: coprocessors) {
176       if (env.getInstance() instanceof RegionServerObserver) {
177         ctx.prepare(env);
178         Thread currentThread = Thread.currentThread();
179         ClassLoader cl = currentThread.getContextClassLoader();
180         try {
181           currentThread.setContextClassLoader(env.getClassLoader());
182           ctx.call((RegionServerObserver)env.getInstance(), ctx);
183         } catch (Throwable e) {
184           handleCoprocessorThrowable(env, e);
185         } finally {
186           currentThread.setContextClassLoader(cl);
187         }
188         bypass |= ctx.shouldBypass();
189         if (ctx.shouldComplete()) {
190           break;
191         }
192       }
193       ctx.postEnvCall(env);
194     }
195     return bypass;
196   }
197 
198   /**
199    * Coprocessor environment extension providing access to region server
200    * related services.
201    */
202   static class RegionServerEnvironment extends CoprocessorHost.Environment
203       implements RegionServerCoprocessorEnvironment {
204 
205     private RegionServerServices regionServerServices;
206 
207     public RegionServerEnvironment(final Class<?> implClass,
208         final Coprocessor impl, final int priority, final int seq,
209         final Configuration conf, final RegionServerServices services) {
210       super(impl, priority, seq, conf);
211       this.regionServerServices = services;
212     }
213 
214     @Override
215     public RegionServerServices getRegionServerServices() {
216       return regionServerServices;
217     }
218   }
219 
220   /**
221    * Environment priority comparator. Coprocessors are chained in sorted
222    * order.
223    */
224   static class EnvironmentPriorityComparator implements
225       Comparator<CoprocessorEnvironment> {
226     public int compare(final CoprocessorEnvironment env1,
227         final CoprocessorEnvironment env2) {
228       if (env1.getPriority() < env2.getPriority()) {
229         return -1;
230       } else if (env1.getPriority() > env2.getPriority()) {
231         return 1;
232       }
233       if (env1.getLoadSequence() < env2.getLoadSequence()) {
234         return -1;
235       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
236         return 1;
237       }
238       return 0;
239     }
240   }
241 }