001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.lang.reflect.InvocationTargetException;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.ServerName;
024import org.apache.hadoop.hbase.client.Connection;
025import org.apache.hadoop.hbase.client.Mutation;
026import org.apache.hadoop.hbase.client.SharedConnection;
027import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
028import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
029import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
030import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
031import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
032import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
033import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
034import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
035import org.apache.hadoop.hbase.metrics.MetricRegistry;
036import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
037import org.apache.hadoop.hbase.security.User;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.protobuf.Service;
043
044import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
045
046@InterfaceAudience.Private
047public class RegionServerCoprocessorHost
048  extends CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> {
049
050  private static final Logger LOG = LoggerFactory.getLogger(RegionServerCoprocessorHost.class);
051
052  private RegionServerServices rsServices;
053
054  public RegionServerCoprocessorHost(RegionServerServices rsServices, Configuration conf) {
055    super(rsServices);
056    this.rsServices = rsServices;
057    this.conf = conf;
058    // Log the state of coprocessor loading here; should appear only once or
059    // twice in the daemon log, depending on HBase version, because there is
060    // only one RegionServerCoprocessorHost instance in the RS process
061    boolean coprocessorsEnabled =
062      conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED);
063    boolean tableCoprocessorsEnabled =
064      conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED);
065    LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled"));
066    LOG.info("Table coprocessor loading is "
067      + ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled"));
068    loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
069  }
070
071  @Override
072  public RegionServerEnvironment createEnvironment(RegionServerCoprocessor instance, int priority,
073    int sequence, Configuration conf) {
074    // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
075    return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)
076      ? new RegionServerEnvironmentForCoreCoprocessors(instance, priority, sequence, conf,
077        this.rsServices)
078      : new RegionServerEnvironment(instance, priority, sequence, conf, this.rsServices);
079  }
080
081  @Override
082  public RegionServerCoprocessor checkAndGetInstance(Class<?> implClass)
083    throws InstantiationException, IllegalAccessException {
084    try {
085      if (RegionServerCoprocessor.class.isAssignableFrom(implClass)) {
086        return implClass.asSubclass(RegionServerCoprocessor.class).getDeclaredConstructor()
087          .newInstance();
088      } else {
089        LOG.error("{} is not of type RegionServerCoprocessor. Check the configuration of {}",
090          implClass.getName(), CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY);
091        return null;
092      }
093    } catch (NoSuchMethodException | InvocationTargetException e) {
094      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
095    }
096  }
097
098  private ObserverGetter<RegionServerCoprocessor, RegionServerObserver> rsObserverGetter =
099    RegionServerCoprocessor::getRegionServerObserver;
100
101  abstract class RegionServerObserverOperation
102    extends ObserverOperationWithoutResult<RegionServerObserver> {
103    public RegionServerObserverOperation() {
104      super(rsObserverGetter);
105    }
106
107    public RegionServerObserverOperation(User user) {
108      super(rsObserverGetter, user);
109    }
110  }
111
112  //////////////////////////////////////////////////////////////////////////////////////////////////
113  // RegionServerObserver operations
114  //////////////////////////////////////////////////////////////////////////////////////////////////
115
116  public void preStop(String message, User user) throws IOException {
117    // While stopping the region server all coprocessors method should be executed first then the
118    // coprocessor should be cleaned up.
119    if (coprocEnvironments.isEmpty()) {
120      return;
121    }
122    execShutdown(new RegionServerObserverOperation(user) {
123      @Override
124      public void call(RegionServerObserver observer) throws IOException {
125        observer.preStopRegionServer(this);
126      }
127
128      @Override
129      public void postEnvCall() {
130        // invoke coprocessor stop method
131        shutdown(this.getEnvironment());
132      }
133    });
134  }
135
136  public void preRollWALWriterRequest() throws IOException {
137    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
138      @Override
139      public void call(RegionServerObserver observer) throws IOException {
140        observer.preRollWALWriterRequest(this);
141      }
142    });
143  }
144
145  public void postRollWALWriterRequest() throws IOException {
146    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
147      @Override
148      public void call(RegionServerObserver observer) throws IOException {
149        observer.postRollWALWriterRequest(this);
150      }
151    });
152  }
153
154  public void preReplicateLogEntries() throws IOException {
155    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
156      @Override
157      public void call(RegionServerObserver observer) throws IOException {
158        observer.preReplicateLogEntries(this);
159      }
160    });
161  }
162
163  public void postReplicateLogEntries() throws IOException {
164    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
165      @Override
166      public void call(RegionServerObserver observer) throws IOException {
167        observer.postReplicateLogEntries(this);
168      }
169    });
170  }
171
172  public void preReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
173    throws IOException {
174    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
175      @Override
176      public void call(RegionServerObserver observer) throws IOException {
177        observer.preReplicationSinkBatchMutate(this, walEntry, mutation);
178      }
179    });
180  }
181
182  public void postReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
183    throws IOException {
184    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
185      @Override
186      public void call(RegionServerObserver observer) throws IOException {
187        observer.postReplicationSinkBatchMutate(this, walEntry, mutation);
188      }
189    });
190  }
191
192  public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
193    throws IOException {
194    if (this.coprocEnvironments.isEmpty()) {
195      return endpoint;
196    }
197    return execOperationWithResult(
198      new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(rsObserverGetter,
199        endpoint) {
200        @Override
201        public ReplicationEndpoint call(RegionServerObserver observer) throws IOException {
202          return observer.postCreateReplicationEndPoint(this, getResult());
203        }
204      });
205  }
206
207  public void preClearCompactionQueues() throws IOException {
208    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
209      @Override
210      public void call(RegionServerObserver observer) throws IOException {
211        observer.preClearCompactionQueues(this);
212      }
213    });
214  }
215
216  public void postClearCompactionQueues() throws IOException {
217    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
218      @Override
219      public void call(RegionServerObserver observer) throws IOException {
220        observer.postClearCompactionQueues(this);
221      }
222    });
223  }
224
225  public void preExecuteProcedures() throws IOException {
226    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
227      @Override
228      public void call(RegionServerObserver observer) throws IOException {
229        observer.preExecuteProcedures(this);
230      }
231    });
232  }
233
234  public void postExecuteProcedures() throws IOException {
235    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
236      @Override
237      public void call(RegionServerObserver observer) throws IOException {
238        observer.postExecuteProcedures(this);
239      }
240    });
241  }
242
243  /**
244   * Coprocessor environment extension providing access to region server related services.
245   */
246  private static class RegionServerEnvironment extends BaseEnvironment<RegionServerCoprocessor>
247    implements RegionServerCoprocessorEnvironment {
248    private final MetricRegistry metricRegistry;
249    private final RegionServerServices services;
250
251    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BC_UNCONFIRMED_CAST",
252        justification = "Intentional; FB has trouble detecting isAssignableFrom")
253    public RegionServerEnvironment(final RegionServerCoprocessor impl, final int priority,
254      final int seq, final Configuration conf, final RegionServerServices services) {
255      super(impl, priority, seq, conf);
256      // If coprocessor exposes any services, register them.
257      for (Service service : impl.getServices()) {
258        services.registerService(service);
259      }
260      this.services = services;
261      this.metricRegistry =
262        MetricsCoprocessor.createRegistryForRSCoprocessor(impl.getClass().getName());
263    }
264
265    @Override
266    public OnlineRegions getOnlineRegions() {
267      return this.services;
268    }
269
270    @Override
271    public ServerName getServerName() {
272      return this.services.getServerName();
273    }
274
275    @Override
276    public Connection getConnection() {
277      return new SharedConnection(this.services.getConnection());
278    }
279
280    @Override
281    public Connection createConnection(Configuration conf) throws IOException {
282      return this.services.createConnection(conf);
283    }
284
285    @Override
286    public MetricRegistry getMetricRegistryForRegionServer() {
287      return metricRegistry;
288    }
289
290    @Override
291    public void shutdown() {
292      super.shutdown();
293      MetricsCoprocessor.removeRegistry(metricRegistry);
294    }
295  }
296
297  /**
298   * Special version of RegionServerEnvironment that exposes RegionServerServices for Core
299   * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core.
300   */
301  private static class RegionServerEnvironmentForCoreCoprocessors extends RegionServerEnvironment
302    implements HasRegionServerServices {
303    final RegionServerServices regionServerServices;
304
305    public RegionServerEnvironmentForCoreCoprocessors(final RegionServerCoprocessor impl,
306      final int priority, final int seq, final Configuration conf,
307      final RegionServerServices services) {
308      super(impl, priority, seq, conf, services);
309      this.regionServerServices = services;
310    }
311
312    /**
313     * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
314     *         consumption.
315     */
316    @Override
317    public RegionServerServices getRegionServerServices() {
318      return this.regionServerServices;
319    }
320  }
321}