View Javadoc

/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.Comparator;
23  import java.util.List;
24  
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.Coprocessor;
27  import org.apache.hadoop.hbase.CoprocessorEnvironment;
28  import org.apache.hadoop.hbase.MetaMutationAnnotation;
29  import org.apache.hadoop.hbase.client.Mutation;
30  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
31  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
32  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
33  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
34  
35  public class RegionServerCoprocessorHost extends
36      CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
37  
38    private RegionServerServices rsServices;
39  
40    public RegionServerCoprocessorHost(RegionServerServices rsServices,
41        Configuration conf) {
42      super(rsServices);
43      this.rsServices = rsServices;
44      this.conf = conf;
45      // load system default cp's from configuration.
46      loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
47    }
48  
49    @Override
50    public RegionServerEnvironment createEnvironment(Class<?> implClass,
51        Coprocessor instance, int priority, int sequence, Configuration conf) {
52      return new RegionServerEnvironment(implClass, instance, priority,
53        sequence, conf, this.rsServices);
54    }
55  
56    public void preStop(String message) throws IOException {
57      execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
58        @Override
59        public void call(RegionServerObserver oserver,
60            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
61          oserver.preStopRegionServer(ctx);
62        }
63        @Override
64        public void postEnvCall(RegionServerEnvironment env) {
65          // invoke coprocessor stop method
66          shutdown(env);
67        }
68      });
69    }
70  
71    public boolean preMerge(final HRegion regionA, final HRegion regionB) throws IOException {
72      return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
73        @Override
74        public void call(RegionServerObserver oserver,
75            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
76          oserver.preMerge(ctx, regionA, regionB);
77        }
78      });
79    }
80  
81    public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion)
82        throws IOException {
83      execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
84        @Override
85        public void call(RegionServerObserver oserver,
86            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
87          oserver.postMerge(ctx, regionA, regionB, mergedRegion);
88        }
89      });
90    }
91  
92    public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
93        final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException {
94      return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
95        @Override
96        public void call(RegionServerObserver oserver,
97            ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
98          oserver.preMergeCommit(ctx, regionA, regionB, metaEntries);
99        }
100     });
101   }
102 
103   public void postMergeCommit(final HRegion regionA, final HRegion regionB,
104       final HRegion mergedRegion) throws IOException {
105     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
106       @Override
107       public void call(RegionServerObserver oserver,
108           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
109         oserver.postMergeCommit(ctx, regionA, regionB, mergedRegion);
110       }
111     });
112   }
113 
114   public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
115     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
116       @Override
117       public void call(RegionServerObserver oserver,
118           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
119         oserver.preRollBackMerge(ctx, regionA, regionB);
120       }
121     });
122   }
123 
124   public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException {
125     execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
126       @Override
127       public void call(RegionServerObserver oserver,
128           ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
129         oserver.postRollBackMerge(ctx, regionA, regionB);
130       }
131     });
132   }
133 
134   private static abstract class CoprocessorOperation
135       extends ObserverContext<RegionServerCoprocessorEnvironment> {
136     public CoprocessorOperation() {
137     }
138 
139     public abstract void call(RegionServerObserver oserver,
140         ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException;
141 
142     public void postEnvCall(RegionServerEnvironment env) {
143     }
144   }
145 
146   private boolean execOperation(final CoprocessorOperation ctx) throws IOException {
147     if (ctx == null) return false;
148 
149     boolean bypass = false;
150     for (RegionServerEnvironment env: coprocessors) {
151       if (env.getInstance() instanceof RegionServerObserver) {
152         ctx.prepare(env);
153         Thread currentThread = Thread.currentThread();
154         ClassLoader cl = currentThread.getContextClassLoader();
155         try {
156           currentThread.setContextClassLoader(env.getClassLoader());
157           ctx.call((RegionServerObserver)env.getInstance(), ctx);
158         } catch (Throwable e) {
159           handleCoprocessorThrowable(env, e);
160         } finally {
161           currentThread.setContextClassLoader(cl);
162         }
163         bypass |= ctx.shouldBypass();
164         if (ctx.shouldComplete()) {
165           break;
166         }
167       }
168       ctx.postEnvCall(env);
169     }
170     return bypass;
171   }
172 
173   /**
174    * Coprocessor environment extension providing access to region server
175    * related services.
176    */
177   static class RegionServerEnvironment extends CoprocessorHost.Environment
178       implements RegionServerCoprocessorEnvironment {
179 
180     private RegionServerServices regionServerServices;
181 
182     public RegionServerEnvironment(final Class<?> implClass,
183         final Coprocessor impl, final int priority, final int seq,
184         final Configuration conf, final RegionServerServices services) {
185       super(impl, priority, seq, conf);
186       this.regionServerServices = services;
187     }
188 
189     @Override
190     public RegionServerServices getRegionServerServices() {
191       return regionServerServices;
192     }
193   }
194 
195   /**
196    * Environment priority comparator. Coprocessors are chained in sorted
197    * order.
198    */
199   static class EnvironmentPriorityComparator implements
200       Comparator<CoprocessorEnvironment> {
201     public int compare(final CoprocessorEnvironment env1,
202         final CoprocessorEnvironment env2) {
203       if (env1.getPriority() < env2.getPriority()) {
204         return -1;
205       } else if (env1.getPriority() > env2.getPriority()) {
206         return 1;
207       }
208       if (env1.getLoadSequence() < env2.getLoadSequence()) {
209         return -1;
210       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
211         return 1;
212       }
213       return 0;
214     }
215   }
216 }