001 002/* 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package org.apache.hadoop.hbase.regionserver.wal; 022 023import java.io.IOException; 024import java.lang.reflect.InvocationTargetException; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.client.RegionInfo; 029import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 030import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 031import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; 032import org.apache.hadoop.hbase.coprocessor.WALCoprocessor; 033import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment; 034import org.apache.hadoop.hbase.coprocessor.WALObserver; 035import org.apache.hadoop.hbase.metrics.MetricRegistry; 036import org.apache.hadoop.hbase.wal.WAL; 037import org.apache.hadoop.hbase.wal.WALEdit; 038import org.apache.hadoop.hbase.wal.WALKey; 039import org.apache.yetus.audience.InterfaceAudience; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/** 044 * Implements the coprocessor environment and runtime support for coprocessors 045 * loaded within a {@link WAL}. 046 */ 047@InterfaceAudience.Private 048public class WALCoprocessorHost 049 extends CoprocessorHost<WALCoprocessor, WALCoprocessorEnvironment> { 050 private static final Logger LOG = LoggerFactory.getLogger(WALCoprocessorHost.class); 051 052 /** 053 * Encapsulation of the environment of each coprocessor 054 */ 055 static class WALEnvironment extends BaseEnvironment<WALCoprocessor> 056 implements WALCoprocessorEnvironment { 057 058 private final WAL wal; 059 060 private final MetricRegistry metricRegistry; 061 062 @Override 063 public WAL getWAL() { 064 return wal; 065 } 066 067 /** 068 * Constructor 069 * @param impl the coprocessor instance 070 * @param priority chaining priority 071 * @param seq load sequence 072 * @param conf configuration 073 * @param wal WAL 074 */ 075 private WALEnvironment(final WALCoprocessor impl, final int priority, final int seq, 076 final Configuration conf, final WAL wal) { 077 super(impl, priority, seq, conf); 078 this.wal = wal; 079 this.metricRegistry = MetricsCoprocessor.createRegistryForWALCoprocessor( 080 impl.getClass().getName()); 081 } 082 083 @Override 084 public MetricRegistry getMetricRegistryForRegionServer() { 085 return metricRegistry; 086 } 087 088 @Override 089 public void shutdown() { 090 super.shutdown(); 091 MetricsCoprocessor.removeRegistry(this.metricRegistry); 092 } 093 } 094 095 private final WAL wal; 096 097 /** 098 * Constructor 099 * @param log the write ahead log 100 * @param conf the configuration 101 */ 102 public WALCoprocessorHost(final WAL log, final Configuration conf) { 103 // We don't want to require an Abortable passed down through (FS)HLog, so 104 // this means that a failure to load of a WAL coprocessor won't abort the 105 // server. This isn't ideal, and means that security components that 106 // utilize a WALObserver will have to check the observer initialization 107 // state manually. However, WALObservers will eventually go away so it 108 // should be an acceptable state of affairs. 109 super(null); 110 this.wal = log; 111 // load system default cp's from configuration. 112 loadSystemCoprocessors(conf, WAL_COPROCESSOR_CONF_KEY); 113 } 114 115 @Override 116 public WALEnvironment createEnvironment(final WALCoprocessor instance, final int priority, 117 final int seq, final Configuration conf) { 118 return new WALEnvironment(instance, priority, seq, conf, this.wal); 119 } 120 121 @Override 122 public WALCoprocessor checkAndGetInstance(Class<?> implClass) throws IllegalAccessException, 123 InstantiationException { 124 if (WALCoprocessor.class.isAssignableFrom(implClass)) { 125 try { 126 return implClass.asSubclass(WALCoprocessor.class).getDeclaredConstructor().newInstance(); 127 } catch (NoSuchMethodException | InvocationTargetException e) { 128 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 129 } 130 } else { 131 LOG.error(implClass.getName() + " is not of type WALCoprocessor. Check the " 132 + "configuration " + CoprocessorHost.WAL_COPROCESSOR_CONF_KEY); 133 return null; 134 } 135 } 136 137 private ObserverGetter<WALCoprocessor, WALObserver> walObserverGetter = 138 WALCoprocessor::getWALObserver; 139 140 abstract class WALObserverOperation extends 141 ObserverOperationWithoutResult<WALObserver> { 142 public WALObserverOperation() { 143 super(walObserverGetter); 144 } 145 } 146 147 public void preWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 148 throws IOException { 149 // Not bypassable. 150 if (this.coprocEnvironments.isEmpty()) { 151 return; 152 } 153 execOperation(new WALObserverOperation() { 154 @Override 155 public void call(WALObserver oserver) throws IOException { 156 oserver.preWALWrite(this, info, logKey, logEdit); 157 } 158 }); 159 } 160 161 public void postWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 162 throws IOException { 163 execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() { 164 @Override 165 protected void call(WALObserver observer) throws IOException { 166 observer.postWALWrite(this, info, logKey, logEdit); 167 } 168 }); 169 } 170 171 /** 172 * Called before rolling the current WAL 173 * @param oldPath the path of the current wal that we are replacing 174 * @param newPath the path of the wal we are going to create 175 */ 176 public void preWALRoll(Path oldPath, Path newPath) throws IOException { 177 execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() { 178 @Override 179 protected void call(WALObserver observer) throws IOException { 180 observer.preWALRoll(this, oldPath, newPath); 181 } 182 }); 183 } 184 185 /** 186 * Called after rolling the current WAL 187 * @param oldPath the path of the wal that we replaced 188 * @param newPath the path of the wal we have created and now is the current 189 */ 190 public void postWALRoll(Path oldPath, Path newPath) throws IOException { 191 execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() { 192 @Override 193 protected void call(WALObserver observer) throws IOException { 194 observer.postWALRoll(this, oldPath, newPath); 195 } 196 }); 197 } 198}