001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import java.io.IOException;
021import java.lang.reflect.InvocationTargetException;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.fs.Path;
024import org.apache.hadoop.hbase.client.RegionInfo;
025import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
026import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
027import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
028import org.apache.hadoop.hbase.coprocessor.WALCoprocessor;
029import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
030import org.apache.hadoop.hbase.coprocessor.WALObserver;
031import org.apache.hadoop.hbase.metrics.MetricRegistry;
032import org.apache.hadoop.hbase.wal.WAL;
033import org.apache.hadoop.hbase.wal.WALEdit;
034import org.apache.hadoop.hbase.wal.WALKey;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 * Implements the coprocessor environment and runtime support for coprocessors loaded within a
041 * {@link WAL}.
042 */
043@InterfaceAudience.Private
044public class WALCoprocessorHost extends CoprocessorHost<WALCoprocessor, WALCoprocessorEnvironment> {
045  private static final Logger LOG = LoggerFactory.getLogger(WALCoprocessorHost.class);
046
047  /**
048   * Encapsulation of the environment of each coprocessor
049   */
050  static class WALEnvironment extends BaseEnvironment<WALCoprocessor>
051    implements WALCoprocessorEnvironment {
052
053    private final WAL wal;
054
055    private final MetricRegistry metricRegistry;
056
057    @Override
058    public WAL getWAL() {
059      return wal;
060    }
061
062    /**
063     * Constructor
064     * @param impl     the coprocessor instance
065     * @param priority chaining priority
066     * @param seq      load sequence
067     * @param conf     configuration
068     * @param wal      WAL
069     */
070    private WALEnvironment(final WALCoprocessor impl, final int priority, final int seq,
071      final Configuration conf, final WAL wal) {
072      super(impl, priority, seq, conf);
073      this.wal = wal;
074      this.metricRegistry =
075        MetricsCoprocessor.createRegistryForWALCoprocessor(impl.getClass().getName());
076    }
077
078    @Override
079    public MetricRegistry getMetricRegistryForRegionServer() {
080      return metricRegistry;
081    }
082
083    @Override
084    public void shutdown() {
085      super.shutdown();
086      MetricsCoprocessor.removeRegistry(this.metricRegistry);
087    }
088  }
089
090  private final WAL wal;
091
092  /**
093   * Constructor
094   * @param log  the write ahead log
095   * @param conf the configuration
096   */
097  public WALCoprocessorHost(final WAL log, final Configuration conf) {
098    // We don't want to require an Abortable passed down through (FS)HLog, so
099    // this means that a failure to load of a WAL coprocessor won't abort the
100    // server. This isn't ideal, and means that security components that
101    // utilize a WALObserver will have to check the observer initialization
102    // state manually. However, WALObservers will eventually go away so it
103    // should be an acceptable state of affairs.
104    super(null);
105    this.wal = log;
106    // load system default cp's from configuration.
107    loadSystemCoprocessors(conf, WAL_COPROCESSOR_CONF_KEY);
108  }
109
110  @Override
111  public WALEnvironment createEnvironment(final WALCoprocessor instance, final int priority,
112    final int seq, final Configuration conf) {
113    return new WALEnvironment(instance, priority, seq, conf, this.wal);
114  }
115
116  @Override
117  public WALCoprocessor checkAndGetInstance(Class<?> implClass)
118    throws IllegalAccessException, InstantiationException {
119    if (WALCoprocessor.class.isAssignableFrom(implClass)) {
120      try {
121        return implClass.asSubclass(WALCoprocessor.class).getDeclaredConstructor().newInstance();
122      } catch (NoSuchMethodException | InvocationTargetException e) {
123        throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
124      }
125    } else {
126      LOG.error(implClass.getName() + " is not of type WALCoprocessor. Check the "
127        + "configuration " + CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);
128      return null;
129    }
130  }
131
132  private ObserverGetter<WALCoprocessor, WALObserver> walObserverGetter =
133    WALCoprocessor::getWALObserver;
134
135  abstract class WALObserverOperation extends ObserverOperationWithoutResult<WALObserver> {
136    public WALObserverOperation() {
137      super(walObserverGetter);
138    }
139  }
140
141  public void preWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
142    throws IOException {
143    // Not bypassable.
144    if (this.coprocEnvironments.isEmpty()) {
145      return;
146    }
147    execOperation(new WALObserverOperation() {
148      @Override
149      public void call(WALObserver oserver) throws IOException {
150        oserver.preWALWrite(this, info, logKey, logEdit);
151      }
152    });
153  }
154
155  public void postWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
156    throws IOException {
157    execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {
158      @Override
159      protected void call(WALObserver observer) throws IOException {
160        observer.postWALWrite(this, info, logKey, logEdit);
161      }
162    });
163  }
164
165  /**
166   * Called before rolling the current WAL
167   * @param oldPath the path of the current wal that we are replacing
168   * @param newPath the path of the wal we are going to create
169   */
170  public void preWALRoll(Path oldPath, Path newPath) throws IOException {
171    execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {
172      @Override
173      protected void call(WALObserver observer) throws IOException {
174        observer.preWALRoll(this, oldPath, newPath);
175      }
176    });
177  }
178
179  /**
180   * Called after rolling the current WAL
181   * @param oldPath the path of the wal that we replaced
182   * @param newPath the path of the wal we have created and now is the current
183   */
184  public void postWALRoll(Path oldPath, Path newPath) throws IOException {
185    execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {
186      @Override
187      protected void call(WALObserver observer) throws IOException {
188        observer.postWALRoll(this, oldPath, newPath);
189      }
190    });
191  }
192}