001
002/*
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 * http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package org.apache.hadoop.hbase.regionserver.wal;
022
023import java.io.IOException;
024import java.lang.reflect.InvocationTargetException;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
030import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
031import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
032import org.apache.hadoop.hbase.coprocessor.WALCoprocessor;
033import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
034import org.apache.hadoop.hbase.coprocessor.WALObserver;
035import org.apache.hadoop.hbase.metrics.MetricRegistry;
036import org.apache.hadoop.hbase.wal.WAL;
037import org.apache.hadoop.hbase.wal.WALEdit;
038import org.apache.hadoop.hbase.wal.WALKey;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * Implements the coprocessor environment and runtime support for coprocessors
045 * loaded within a {@link WAL}.
046 */
047@InterfaceAudience.Private
048public class WALCoprocessorHost
049    extends CoprocessorHost<WALCoprocessor, WALCoprocessorEnvironment> {
050  private static final Logger LOG = LoggerFactory.getLogger(WALCoprocessorHost.class);
051
052  /**
053   * Encapsulation of the environment of each coprocessor
054   */
055  static class WALEnvironment extends BaseEnvironment<WALCoprocessor>
056    implements WALCoprocessorEnvironment {
057
058    private final WAL wal;
059
060    private final MetricRegistry metricRegistry;
061
062    @Override
063    public WAL getWAL() {
064      return wal;
065    }
066
067    /**
068     * Constructor
069     * @param impl the coprocessor instance
070     * @param priority chaining priority
071     * @param seq load sequence
072     * @param conf configuration
073     * @param wal WAL
074     */
075    private WALEnvironment(final WALCoprocessor impl, final int priority, final int seq,
076        final Configuration conf, final WAL wal) {
077      super(impl, priority, seq, conf);
078      this.wal = wal;
079      this.metricRegistry = MetricsCoprocessor.createRegistryForWALCoprocessor(
080          impl.getClass().getName());
081    }
082
083    @Override
084    public MetricRegistry getMetricRegistryForRegionServer() {
085      return metricRegistry;
086    }
087
088    @Override
089    public void shutdown() {
090      super.shutdown();
091      MetricsCoprocessor.removeRegistry(this.metricRegistry);
092    }
093  }
094
095  private final WAL wal;
096
097  /**
098   * Constructor
099   * @param log the write ahead log
100   * @param conf the configuration
101   */
102  public WALCoprocessorHost(final WAL log, final Configuration conf) {
103    // We don't want to require an Abortable passed down through (FS)HLog, so
104    // this means that a failure to load of a WAL coprocessor won't abort the
105    // server. This isn't ideal, and means that security components that
106    // utilize a WALObserver will have to check the observer initialization
107    // state manually. However, WALObservers will eventually go away so it
108    // should be an acceptable state of affairs.
109    super(null);
110    this.wal = log;
111    // load system default cp's from configuration.
112    loadSystemCoprocessors(conf, WAL_COPROCESSOR_CONF_KEY);
113  }
114
115  @Override
116  public WALEnvironment createEnvironment(final WALCoprocessor instance, final int priority,
117      final int seq, final Configuration conf) {
118    return new WALEnvironment(instance, priority, seq, conf, this.wal);
119  }
120
121  @Override
122  public WALCoprocessor checkAndGetInstance(Class<?> implClass) throws IllegalAccessException,
123      InstantiationException {
124    if (WALCoprocessor.class.isAssignableFrom(implClass)) {
125      try {
126        return implClass.asSubclass(WALCoprocessor.class).getDeclaredConstructor().newInstance();
127      } catch (NoSuchMethodException | InvocationTargetException e) {
128        throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
129      }
130    } else {
131      LOG.error(implClass.getName() + " is not of type WALCoprocessor. Check the "
132          + "configuration " + CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);
133      return null;
134    }
135  }
136
137  private ObserverGetter<WALCoprocessor, WALObserver> walObserverGetter =
138      WALCoprocessor::getWALObserver;
139
140  abstract class WALObserverOperation extends
141      ObserverOperationWithoutResult<WALObserver> {
142    public WALObserverOperation() {
143      super(walObserverGetter);
144    }
145  }
146
147  public void preWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
148      throws IOException {
149    // Not bypassable.
150    if (this.coprocEnvironments.isEmpty()) {
151      return;
152    }
153    execOperation(new WALObserverOperation() {
154      @Override
155      public void call(WALObserver oserver) throws IOException {
156        oserver.preWALWrite(this, info, logKey, logEdit);
157      }
158    });
159  }
160
161  public void postWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
162      throws IOException {
163    execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {
164      @Override
165      protected void call(WALObserver observer) throws IOException {
166        observer.postWALWrite(this, info, logKey, logEdit);
167      }
168    });
169  }
170
171  /**
172   * Called before rolling the current WAL
173   * @param oldPath the path of the current wal that we are replacing
174   * @param newPath the path of the wal we are going to create
175   */
176  public void preWALRoll(Path oldPath, Path newPath) throws IOException {
177    execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {
178      @Override
179      protected void call(WALObserver observer) throws IOException {
180        observer.preWALRoll(this, oldPath, newPath);
181      }
182    });
183  }
184
185  /**
186   * Called after rolling the current WAL
187   * @param oldPath the path of the wal that we replaced
188   * @param newPath the path of the wal we have created and now is the current
189   */
190  public void postWALRoll(Path oldPath, Path newPath) throws IOException {
191    execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {
192      @Override
193      protected void call(WALObserver observer) throws IOException {
194        observer.postWALRoll(this, oldPath, newPath);
195      }
196    });
197  }
198}