001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import java.io.IOException; 021import java.lang.reflect.InvocationTargetException; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.fs.Path; 024import org.apache.hadoop.hbase.client.RegionInfo; 025import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 026import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 027import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; 028import org.apache.hadoop.hbase.coprocessor.WALCoprocessor; 029import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment; 030import org.apache.hadoop.hbase.coprocessor.WALObserver; 031import org.apache.hadoop.hbase.metrics.MetricRegistry; 032import org.apache.hadoop.hbase.wal.WAL; 033import org.apache.hadoop.hbase.wal.WALEdit; 034import org.apache.hadoop.hbase.wal.WALKey; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** 040 * Implements the coprocessor environment and runtime support for coprocessors loaded within a 041 * {@link WAL}. 042 */ 043@InterfaceAudience.Private 044public class WALCoprocessorHost extends CoprocessorHost<WALCoprocessor, WALCoprocessorEnvironment> { 045 private static final Logger LOG = LoggerFactory.getLogger(WALCoprocessorHost.class); 046 047 /** 048 * Encapsulation of the environment of each coprocessor 049 */ 050 static class WALEnvironment extends BaseEnvironment<WALCoprocessor> 051 implements WALCoprocessorEnvironment { 052 053 private final WAL wal; 054 055 private final MetricRegistry metricRegistry; 056 057 @Override 058 public WAL getWAL() { 059 return wal; 060 } 061 062 /** 063 * Constructor 064 * @param impl the coprocessor instance 065 * @param priority chaining priority 066 * @param seq load sequence 067 * @param conf configuration 068 * @param wal WAL 069 */ 070 private WALEnvironment(final WALCoprocessor impl, final int priority, final int seq, 071 final Configuration conf, final WAL wal) { 072 super(impl, priority, seq, conf); 073 this.wal = wal; 074 this.metricRegistry = 075 MetricsCoprocessor.createRegistryForWALCoprocessor(impl.getClass().getName()); 076 } 077 078 @Override 079 public MetricRegistry getMetricRegistryForRegionServer() { 080 return metricRegistry; 081 } 082 083 @Override 084 public void shutdown() { 085 super.shutdown(); 086 MetricsCoprocessor.removeRegistry(this.metricRegistry); 087 } 088 } 089 090 private final WAL wal; 091 092 /** 093 * Constructor 094 * @param log the write ahead log 095 * @param conf the configuration 096 */ 097 public WALCoprocessorHost(final WAL log, final Configuration conf) { 098 // We don't want to require an Abortable passed down through (FS)HLog, so 099 // this means that a failure to load of a WAL coprocessor won't abort the 100 // server. This isn't ideal, and means that security components that 101 // utilize a WALObserver will have to check the observer initialization 102 // state manually. However, WALObservers will eventually go away so it 103 // should be an acceptable state of affairs. 104 super(null); 105 this.wal = log; 106 // load system default cp's from configuration. 107 loadSystemCoprocessors(conf, WAL_COPROCESSOR_CONF_KEY); 108 } 109 110 @Override 111 public WALEnvironment createEnvironment(final WALCoprocessor instance, final int priority, 112 final int seq, final Configuration conf) { 113 return new WALEnvironment(instance, priority, seq, conf, this.wal); 114 } 115 116 @Override 117 public WALCoprocessor checkAndGetInstance(Class<?> implClass) 118 throws IllegalAccessException, InstantiationException { 119 if (WALCoprocessor.class.isAssignableFrom(implClass)) { 120 try { 121 return implClass.asSubclass(WALCoprocessor.class).getDeclaredConstructor().newInstance(); 122 } catch (NoSuchMethodException | InvocationTargetException e) { 123 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 124 } 125 } else { 126 LOG.error(implClass.getName() + " is not of type WALCoprocessor. Check the " 127 + "configuration " + CoprocessorHost.WAL_COPROCESSOR_CONF_KEY); 128 return null; 129 } 130 } 131 132 private ObserverGetter<WALCoprocessor, WALObserver> walObserverGetter = 133 WALCoprocessor::getWALObserver; 134 135 abstract class WALObserverOperation extends ObserverOperationWithoutResult<WALObserver> { 136 public WALObserverOperation() { 137 super(walObserverGetter); 138 } 139 } 140 141 public void preWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 142 throws IOException { 143 // Not bypassable. 144 if (this.coprocEnvironments.isEmpty()) { 145 return; 146 } 147 execOperation(new WALObserverOperation() { 148 @Override 149 public void call(WALObserver oserver) throws IOException { 150 oserver.preWALWrite(this, info, logKey, logEdit); 151 } 152 }); 153 } 154 155 public void postWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 156 throws IOException { 157 execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() { 158 @Override 159 protected void call(WALObserver observer) throws IOException { 160 observer.postWALWrite(this, info, logKey, logEdit); 161 } 162 }); 163 } 164 165 /** 166 * Called before rolling the current WAL 167 * @param oldPath the path of the current wal that we are replacing 168 * @param newPath the path of the wal we are going to create 169 */ 170 public void preWALRoll(Path oldPath, Path newPath) throws IOException { 171 execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() { 172 @Override 173 protected void call(WALObserver observer) throws IOException { 174 observer.preWALRoll(this, oldPath, newPath); 175 } 176 }); 177 } 178 179 /** 180 * Called after rolling the current WAL 181 * @param oldPath the path of the wal that we replaced 182 * @param newPath the path of the wal we have created and now is the current 183 */ 184 public void postWALRoll(Path oldPath, Path newPath) throws IOException { 185 execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() { 186 @Override 187 protected void call(WALObserver observer) throws IOException { 188 observer.postWALRoll(this, oldPath, newPath); 189 } 190 }); 191 } 192}