View Javadoc

1   
2   /*
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   * http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.regionserver.wal;
22  
23  import java.io.IOException;
24  import java.util.List;
25  
26  import org.apache.hadoop.fs.Path;
27  import org.apache.hadoop.hbase.Coprocessor;
28  import org.apache.hadoop.hbase.HRegionInfo;
29  import org.apache.hadoop.hbase.coprocessor.*;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.wal.WAL;
33  import org.apache.hadoop.hbase.wal.WALKey;
34  
35  /**
36   * Implements the coprocessor environment and runtime support for coprocessors
37   * loaded within a {@link WAL}.
38   */
39  @InterfaceAudience.Private
40  public class WALCoprocessorHost
41      extends CoprocessorHost<WALCoprocessorHost.WALEnvironment> {
42  
43    /**
44     * Encapsulation of the environment of each coprocessor
45     */
46    static class WALEnvironment extends CoprocessorHost.Environment
47      implements WALCoprocessorEnvironment {
48  
49      private final WAL wal;
50  
51      final boolean useLegacyPre;
52      final boolean useLegacyPost;
53  
54      @Override
55      public WAL getWAL() {
56        return wal;
57      }
58  
59      /**
60       * Constructor
61       * @param implClass - not used
62       * @param impl the coprocessor instance
63       * @param priority chaining priority
64       * @param seq load sequence
65       * @param conf configuration
66       * @param wal WAL
67       */
68      public WALEnvironment(Class<?> implClass, final Coprocessor impl,
69          final int priority, final int seq, final Configuration conf,
70          final WAL wal) {
71        super(impl, priority, seq, conf);
72        this.wal = wal;
73        // Pick which version of the API we'll call.
74        // This way we avoid calling the new version on older WALObservers so
75        // we can maintain binary compatibility.
76        // See notes in javadoc for WALObserver
77        useLegacyPre = useLegacyMethod(impl.getClass(), "preWALWrite", ObserverContext.class,
78            HRegionInfo.class, WALKey.class, WALEdit.class);
79        useLegacyPost = useLegacyMethod(impl.getClass(), "postWALWrite", ObserverContext.class,
80            HRegionInfo.class, WALKey.class, WALEdit.class);
81      }
82    }
83  
84    private final WAL wal;
85  
86    /**
87     * Constructor
88     * @param log the write ahead log
89     * @param conf the configuration
90     */
91    public WALCoprocessorHost(final WAL log, final Configuration conf) {
92      // We don't want to require an Abortable passed down through (FS)HLog, so
93      // this means that a failure to load of a WAL coprocessor won't abort the
94      // server. This isn't ideal, and means that security components that
95      // utilize a WALObserver will have to check the observer initialization
96      // state manually. However, WALObservers will eventually go away so it
97      // should be an acceptable state of affairs.
98      super(null);
99      this.wal = log;
100     // load system default cp's from configuration.
101     loadSystemCoprocessors(conf, WAL_COPROCESSOR_CONF_KEY);
102   }
103 
104   @Override
105   public WALEnvironment createEnvironment(final Class<?> implClass,
106       final Coprocessor instance, final int priority, final int seq,
107       final Configuration conf) {
108     return new WALEnvironment(implClass, instance, priority, seq, conf,
109         this.wal);
110   }
111 
112   /**
113    * @param info
114    * @param logKey
115    * @param logEdit
116    * @return true if default behavior should be bypassed, false otherwise
117    * @throws IOException
118    */
119   public boolean preWALWrite(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
120       throws IOException {
121     boolean bypass = false;
122     if (this.coprocessors == null || this.coprocessors.isEmpty()) return bypass;
123     ObserverContext<WALCoprocessorEnvironment> ctx = null;
124     List<WALEnvironment> envs = coprocessors.get();
125     for (int i = 0; i < envs.size(); i++) {
126       WALEnvironment env = envs.get(i);
127       if (env.getInstance() instanceof WALObserver) {
128         final WALObserver observer = (WALObserver)env.getInstance();
129         ctx = ObserverContext.createAndPrepare(env, ctx);
130         Thread currentThread = Thread.currentThread();
131         ClassLoader cl = currentThread.getContextClassLoader();
132         try {
133           currentThread.setContextClassLoader(env.getClassLoader());
134           if (env.useLegacyPre) {
135             if (logKey instanceof HLogKey) {
136               observer.preWALWrite(ctx, info, (HLogKey)logKey, logEdit);
137             } else {
138               legacyWarning(observer.getClass(),
139                   "There are wal keys present that are not HLogKey.");
140             }
141           } else {
142             observer.preWALWrite(ctx, info, logKey, logEdit);
143           }
144         } catch (Throwable e) {
145           handleCoprocessorThrowable(env, e);
146         } finally {
147           currentThread.setContextClassLoader(cl);
148         }
149         bypass |= ctx.shouldBypass();
150         if (ctx.shouldComplete()) {
151           break;
152         }
153       }
154     }
155     return bypass;
156   }
157 
158   /**
159    * @param info
160    * @param logKey
161    * @param logEdit
162    * @throws IOException
163    */
164   public void postWALWrite(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
165       throws IOException {
166     if (this.coprocessors == null || this.coprocessors.isEmpty()) return;
167     ObserverContext<WALCoprocessorEnvironment> ctx = null;
168     List<WALEnvironment> envs = coprocessors.get();
169     for (int i = 0; i < envs.size(); i++) {
170       WALEnvironment env = envs.get(i);
171       if (env.getInstance() instanceof WALObserver) {
172         final WALObserver observer = (WALObserver)env.getInstance();
173         ctx = ObserverContext.createAndPrepare(env, ctx);
174         Thread currentThread = Thread.currentThread();
175         ClassLoader cl = currentThread.getContextClassLoader();
176         try {
177           currentThread.setContextClassLoader(env.getClassLoader());
178           if (env.useLegacyPost) {
179             if (logKey instanceof HLogKey) {
180               observer.postWALWrite(ctx, info, (HLogKey)logKey, logEdit);
181             } else {
182               legacyWarning(observer.getClass(),
183                   "There are wal keys present that are not HLogKey.");
184             }
185           } else {
186             observer.postWALWrite(ctx, info, logKey, logEdit);
187           }
188         } catch (Throwable e) {
189           handleCoprocessorThrowable(env, e);
190         } finally {
191           currentThread.setContextClassLoader(cl);
192         }
193         if (ctx.shouldComplete()) {
194           break;
195         }
196       }
197     }
198   }
199 
200   /**
201    * Called before rolling the current WAL
202    * @param oldPath the path of the current wal that we are replacing
203    * @param newPath the path of the wal we are going to create
204    */
205   public void preWALRoll(Path oldPath, Path newPath) throws IOException {
206     if (this.coprocessors == null || this.coprocessors.isEmpty()) return;
207     ObserverContext<WALCoprocessorEnvironment> ctx = null;
208     List<WALEnvironment> envs = coprocessors.get();
209     for (int i = 0; i < envs.size(); i++) {
210       WALEnvironment env = envs.get(i);
211       if (env.getInstance() instanceof WALObserver) {
212         final WALObserver observer = (WALObserver)env.getInstance();
213         ctx = ObserverContext.createAndPrepare(env, ctx);
214         Thread currentThread = Thread.currentThread();
215         ClassLoader cl = currentThread.getContextClassLoader();
216         try {
217           currentThread.setContextClassLoader(env.getClassLoader());
218           observer.preWALRoll(ctx, oldPath, newPath);
219         } catch (Throwable e) {
220           handleCoprocessorThrowable(env, e);
221         } finally {
222           currentThread.setContextClassLoader(cl);
223         }
224         if (ctx.shouldComplete()) {
225           break;
226         }
227       }
228     }
229   }
230 
231   /**
232    * Called after rolling the current WAL
233    * @param oldPath the path of the wal that we replaced
234    * @param newPath the path of the wal we have created and now is the current
235    */
236   public void postWALRoll(Path oldPath, Path newPath) throws IOException {
237     if (this.coprocessors == null || this.coprocessors.isEmpty()) return;
238     ObserverContext<WALCoprocessorEnvironment> ctx = null;
239     List<WALEnvironment> envs = coprocessors.get();
240     for (int i = 0; i < envs.size(); i++) {
241       WALEnvironment env = envs.get(i);
242       if (env.getInstance() instanceof WALObserver) {
243         final WALObserver observer = (WALObserver)env.getInstance();
244         ctx = ObserverContext.createAndPrepare(env, ctx);
245         Thread currentThread = Thread.currentThread();
246         ClassLoader cl = currentThread.getContextClassLoader();
247         try {
248           currentThread.setContextClassLoader(env.getClassLoader());
249           observer.postWALRoll(ctx, oldPath, newPath);
250         } catch (Throwable e) {
251           handleCoprocessorThrowable(env, e);
252         } finally {
253           currentThread.setContextClassLoader(cl);
254         }
255         if (ctx.shouldComplete()) {
256           break;
257         }
258       }
259     }
260   }
261 }