001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.lang.reflect.InvocationTargetException;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.ServerName;
024import org.apache.hadoop.hbase.client.Connection;
025import org.apache.hadoop.hbase.client.SharedConnection;
026import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
027import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
028import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
029import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
030import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
031import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
032import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
033import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
034import org.apache.hadoop.hbase.metrics.MetricRegistry;
035import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
036import org.apache.hadoop.hbase.security.User;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.com.google.protobuf.Service;
042
043@InterfaceAudience.Private
044public class RegionServerCoprocessorHost
045  extends CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> {
046
047  private static final Logger LOG = LoggerFactory.getLogger(RegionServerCoprocessorHost.class);
048
049  private RegionServerServices rsServices;
050
051  public RegionServerCoprocessorHost(RegionServerServices rsServices, Configuration conf) {
052    super(rsServices);
053    this.rsServices = rsServices;
054    this.conf = conf;
055    // Log the state of coprocessor loading here; should appear only once or
056    // twice in the daemon log, depending on HBase version, because there is
057    // only one RegionServerCoprocessorHost instance in the RS process
058    boolean coprocessorsEnabled =
059      conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED);
060    boolean tableCoprocessorsEnabled =
061      conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED);
062    LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled"));
063    LOG.info("Table coprocessor loading is "
064      + ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled"));
065    loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
066  }
067
068  @Override
069  public RegionServerEnvironment createEnvironment(RegionServerCoprocessor instance, int priority,
070    int sequence, Configuration conf) {
071    // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
072    return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)
073      ? new RegionServerEnvironmentForCoreCoprocessors(instance, priority, sequence, conf,
074        this.rsServices)
075      : new RegionServerEnvironment(instance, priority, sequence, conf, this.rsServices);
076  }
077
078  @Override
079  public RegionServerCoprocessor checkAndGetInstance(Class<?> implClass)
080    throws InstantiationException, IllegalAccessException {
081    try {
082      if (RegionServerCoprocessor.class.isAssignableFrom(implClass)) {
083        return implClass.asSubclass(RegionServerCoprocessor.class).getDeclaredConstructor()
084          .newInstance();
085      } else {
086        LOG.error("{} is not of type RegionServerCoprocessor. Check the configuration of {}",
087          implClass.getName(), CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY);
088        return null;
089      }
090    } catch (NoSuchMethodException | InvocationTargetException e) {
091      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
092    }
093  }
094
095  private ObserverGetter<RegionServerCoprocessor, RegionServerObserver> rsObserverGetter =
096    RegionServerCoprocessor::getRegionServerObserver;
097
098  abstract class RegionServerObserverOperation
099    extends ObserverOperationWithoutResult<RegionServerObserver> {
100    public RegionServerObserverOperation() {
101      super(rsObserverGetter);
102    }
103
104    public RegionServerObserverOperation(User user) {
105      super(rsObserverGetter, user);
106    }
107  }
108
109  //////////////////////////////////////////////////////////////////////////////////////////////////
110  // RegionServerObserver operations
111  //////////////////////////////////////////////////////////////////////////////////////////////////
112
113  public void preStop(String message, User user) throws IOException {
114    // While stopping the region server all coprocessors method should be executed first then the
115    // coprocessor should be cleaned up.
116    if (coprocEnvironments.isEmpty()) {
117      return;
118    }
119    execShutdown(new RegionServerObserverOperation(user) {
120      @Override
121      public void call(RegionServerObserver observer) throws IOException {
122        observer.preStopRegionServer(this);
123      }
124
125      @Override
126      public void postEnvCall() {
127        // invoke coprocessor stop method
128        shutdown(this.getEnvironment());
129      }
130    });
131  }
132
133  public void preRollWALWriterRequest() throws IOException {
134    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
135      @Override
136      public void call(RegionServerObserver observer) throws IOException {
137        observer.preRollWALWriterRequest(this);
138      }
139    });
140  }
141
142  public void postRollWALWriterRequest() throws IOException {
143    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
144      @Override
145      public void call(RegionServerObserver observer) throws IOException {
146        observer.postRollWALWriterRequest(this);
147      }
148    });
149  }
150
151  public void preReplicateLogEntries() throws IOException {
152    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
153      @Override
154      public void call(RegionServerObserver observer) throws IOException {
155        observer.preReplicateLogEntries(this);
156      }
157    });
158  }
159
160  public void postReplicateLogEntries() throws IOException {
161    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
162      @Override
163      public void call(RegionServerObserver observer) throws IOException {
164        observer.postReplicateLogEntries(this);
165      }
166    });
167  }
168
169  public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
170    throws IOException {
171    if (this.coprocEnvironments.isEmpty()) {
172      return endpoint;
173    }
174    return execOperationWithResult(
175      new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(rsObserverGetter,
176        endpoint) {
177        @Override
178        public ReplicationEndpoint call(RegionServerObserver observer) throws IOException {
179          return observer.postCreateReplicationEndPoint(this, getResult());
180        }
181      });
182  }
183
184  public void preClearCompactionQueues() throws IOException {
185    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
186      @Override
187      public void call(RegionServerObserver observer) throws IOException {
188        observer.preClearCompactionQueues(this);
189      }
190    });
191  }
192
193  public void postClearCompactionQueues() throws IOException {
194    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
195      @Override
196      public void call(RegionServerObserver observer) throws IOException {
197        observer.postClearCompactionQueues(this);
198      }
199    });
200  }
201
202  public void preExecuteProcedures() throws IOException {
203    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
204      @Override
205      public void call(RegionServerObserver observer) throws IOException {
206        observer.preExecuteProcedures(this);
207      }
208    });
209  }
210
211  public void postExecuteProcedures() throws IOException {
212    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
213      @Override
214      public void call(RegionServerObserver observer) throws IOException {
215        observer.postExecuteProcedures(this);
216      }
217    });
218  }
219
220  /**
221   * Coprocessor environment extension providing access to region server related services.
222   */
223  private static class RegionServerEnvironment extends BaseEnvironment<RegionServerCoprocessor>
224    implements RegionServerCoprocessorEnvironment {
225    private final MetricRegistry metricRegistry;
226    private final RegionServerServices services;
227
228    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BC_UNCONFIRMED_CAST",
229        justification = "Intentional; FB has trouble detecting isAssignableFrom")
230    public RegionServerEnvironment(final RegionServerCoprocessor impl, final int priority,
231      final int seq, final Configuration conf, final RegionServerServices services) {
232      super(impl, priority, seq, conf);
233      // If coprocessor exposes any services, register them.
234      for (Service service : impl.getServices()) {
235        services.registerService(service);
236      }
237      this.services = services;
238      this.metricRegistry =
239        MetricsCoprocessor.createRegistryForRSCoprocessor(impl.getClass().getName());
240    }
241
242    @Override
243    public OnlineRegions getOnlineRegions() {
244      return this.services;
245    }
246
247    @Override
248    public ServerName getServerName() {
249      return this.services.getServerName();
250    }
251
252    @Override
253    public Connection getConnection() {
254      return new SharedConnection(this.services.getConnection());
255    }
256
257    @Override
258    public Connection createConnection(Configuration conf) throws IOException {
259      return this.services.createConnection(conf);
260    }
261
262    @Override
263    public MetricRegistry getMetricRegistryForRegionServer() {
264      return metricRegistry;
265    }
266
267    @Override
268    public void shutdown() {
269      super.shutdown();
270      MetricsCoprocessor.removeRegistry(metricRegistry);
271    }
272  }
273
274  /**
275   * Special version of RegionServerEnvironment that exposes RegionServerServices for Core
276   * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core.
277   */
278  private static class RegionServerEnvironmentForCoreCoprocessors extends RegionServerEnvironment
279    implements HasRegionServerServices {
280    final RegionServerServices regionServerServices;
281
282    public RegionServerEnvironmentForCoreCoprocessors(final RegionServerCoprocessor impl,
283      final int priority, final int seq, final Configuration conf,
284      final RegionServerServices services) {
285      super(impl, priority, seq, conf, services);
286      this.regionServerServices = services;
287    }
288
289    /**
290     * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
291     *         consumption.
292     */
293    @Override
294    public RegionServerServices getRegionServerServices() {
295      return this.regionServerServices;
296    }
297  }
298}