001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.protobuf.Service;
021import java.io.IOException;
022import java.lang.reflect.InvocationTargetException;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.ServerName;
025import org.apache.hadoop.hbase.SharedConnection;
026import org.apache.hadoop.hbase.client.Connection;
027import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
028import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
029import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
030import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
031import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
032import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
033import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
034import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
035import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
036import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService;
037import org.apache.hadoop.hbase.metrics.MetricRegistry;
038import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
039import org.apache.hadoop.hbase.security.User;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044@InterfaceAudience.Private
045public class RegionServerCoprocessorHost
046  extends CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> {
047
048  private static final Logger LOG = LoggerFactory.getLogger(RegionServerCoprocessorHost.class);
049
050  private RegionServerServices rsServices;
051
052  public RegionServerCoprocessorHost(RegionServerServices rsServices, Configuration conf) {
053    super(rsServices);
054    this.rsServices = rsServices;
055    this.conf = conf;
056    // Log the state of coprocessor loading here; should appear only once or
057    // twice in the daemon log, depending on HBase version, because there is
058    // only one RegionServerCoprocessorHost instance in the RS process
059    boolean coprocessorsEnabled =
060      conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED);
061    boolean tableCoprocessorsEnabled =
062      conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED);
063    LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled"));
064    LOG.info("Table coprocessor loading is "
065      + ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled"));
066    loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
067  }
068
069  @Override
070  public RegionServerEnvironment createEnvironment(RegionServerCoprocessor instance, int priority,
071    int sequence, Configuration conf) {
072    // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
073    return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)
074      ? new RegionServerEnvironmentForCoreCoprocessors(instance, priority, sequence, conf,
075        this.rsServices)
076      : new RegionServerEnvironment(instance, priority, sequence, conf, this.rsServices);
077  }
078
079  @Override
080  public RegionServerCoprocessor checkAndGetInstance(Class<?> implClass)
081    throws InstantiationException, IllegalAccessException {
082    try {
083      if (RegionServerCoprocessor.class.isAssignableFrom(implClass)) {
084        return implClass.asSubclass(RegionServerCoprocessor.class).getDeclaredConstructor()
085          .newInstance();
086      } else if (SingletonCoprocessorService.class.isAssignableFrom(implClass)) {
087        // For backward compatibility with old CoprocessorService impl which don't extend
088        // RegionCoprocessor.
089        SingletonCoprocessorService tmp = implClass.asSubclass(SingletonCoprocessorService.class)
090          .getDeclaredConstructor().newInstance();
091        return new CoprocessorServiceBackwardCompatiblity.RegionServerCoprocessorService(tmp);
092      } else {
093        LOG.error("{} is not of type RegionServerCoprocessor. Check the configuration of {}",
094          implClass.getName(), CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY);
095        return null;
096      }
097    } catch (NoSuchMethodException | InvocationTargetException e) {
098      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
099    }
100  }
101
102  private ObserverGetter<RegionServerCoprocessor, RegionServerObserver> rsObserverGetter =
103    RegionServerCoprocessor::getRegionServerObserver;
104
105  abstract class RegionServerObserverOperation
106    extends ObserverOperationWithoutResult<RegionServerObserver> {
107    public RegionServerObserverOperation() {
108      super(rsObserverGetter);
109    }
110
111    public RegionServerObserverOperation(User user) {
112      super(rsObserverGetter, user);
113    }
114  }
115
116  //////////////////////////////////////////////////////////////////////////////////////////////////
117  // RegionServerObserver operations
118  //////////////////////////////////////////////////////////////////////////////////////////////////
119
120  public void preStop(String message, User user) throws IOException {
121    // While stopping the region server all coprocessors method should be executed first then the
122    // coprocessor should be cleaned up.
123    if (coprocEnvironments.isEmpty()) {
124      return;
125    }
126    execShutdown(new RegionServerObserverOperation(user) {
127      @Override
128      public void call(RegionServerObserver observer) throws IOException {
129        observer.preStopRegionServer(this);
130      }
131
132      @Override
133      public void postEnvCall() {
134        // invoke coprocessor stop method
135        shutdown(this.getEnvironment());
136      }
137    });
138  }
139
140  public void preRollWALWriterRequest() throws IOException {
141    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
142      @Override
143      public void call(RegionServerObserver observer) throws IOException {
144        observer.preRollWALWriterRequest(this);
145      }
146    });
147  }
148
149  public void postRollWALWriterRequest() throws IOException {
150    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
151      @Override
152      public void call(RegionServerObserver observer) throws IOException {
153        observer.postRollWALWriterRequest(this);
154      }
155    });
156  }
157
158  public void preReplicateLogEntries() throws IOException {
159    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
160      @Override
161      public void call(RegionServerObserver observer) throws IOException {
162        observer.preReplicateLogEntries(this);
163      }
164    });
165  }
166
167  public void postReplicateLogEntries() throws IOException {
168    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
169      @Override
170      public void call(RegionServerObserver observer) throws IOException {
171        observer.postReplicateLogEntries(this);
172      }
173    });
174  }
175
176  public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
177    throws IOException {
178    if (this.coprocEnvironments.isEmpty()) {
179      return endpoint;
180    }
181    return execOperationWithResult(
182      new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(rsObserverGetter,
183        endpoint) {
184        @Override
185        public ReplicationEndpoint call(RegionServerObserver observer) throws IOException {
186          return observer.postCreateReplicationEndPoint(this, getResult());
187        }
188      });
189  }
190
191  public void preClearCompactionQueues() throws IOException {
192    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
193      @Override
194      public void call(RegionServerObserver observer) throws IOException {
195        observer.preClearCompactionQueues(this);
196      }
197    });
198  }
199
200  public void postClearCompactionQueues() throws IOException {
201    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
202      @Override
203      public void call(RegionServerObserver observer) throws IOException {
204        observer.postClearCompactionQueues(this);
205      }
206    });
207  }
208
209  public void preExecuteProcedures() throws IOException {
210    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
211      @Override
212      public void call(RegionServerObserver observer) throws IOException {
213        observer.preExecuteProcedures(this);
214      }
215    });
216  }
217
218  public void postExecuteProcedures() throws IOException {
219    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
220      @Override
221      public void call(RegionServerObserver observer) throws IOException {
222        observer.postExecuteProcedures(this);
223      }
224    });
225  }
226
227  /**
228   * Coprocessor environment extension providing access to region server related services.
229   */
230  private static class RegionServerEnvironment extends BaseEnvironment<RegionServerCoprocessor>
231    implements RegionServerCoprocessorEnvironment {
232    private final MetricRegistry metricRegistry;
233    private final RegionServerServices services;
234
235    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BC_UNCONFIRMED_CAST",
236        justification = "Intentional; FB has trouble detecting isAssignableFrom")
237    public RegionServerEnvironment(final RegionServerCoprocessor impl, final int priority,
238      final int seq, final Configuration conf, final RegionServerServices services) {
239      super(impl, priority, seq, conf);
240      // If coprocessor exposes any services, register them.
241      for (Service service : impl.getServices()) {
242        services.registerService(service);
243      }
244      this.services = services;
245      this.metricRegistry =
246        MetricsCoprocessor.createRegistryForRSCoprocessor(impl.getClass().getName());
247    }
248
249    @Override
250    public OnlineRegions getOnlineRegions() {
251      return this.services;
252    }
253
254    @Override
255    public ServerName getServerName() {
256      return this.services.getServerName();
257    }
258
259    @Override
260    public Connection getConnection() {
261      return new SharedConnection(this.services.getConnection());
262    }
263
264    @Override
265    public Connection createConnection(Configuration conf) throws IOException {
266      return this.services.createConnection(conf);
267    }
268
269    @Override
270    public MetricRegistry getMetricRegistryForRegionServer() {
271      return metricRegistry;
272    }
273
274    @Override
275    public void shutdown() {
276      super.shutdown();
277      MetricsCoprocessor.removeRegistry(metricRegistry);
278    }
279  }
280
281  /**
282   * Special version of RegionServerEnvironment that exposes RegionServerServices for Core
283   * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core.
284   */
285  private static class RegionServerEnvironmentForCoreCoprocessors extends RegionServerEnvironment
286    implements HasRegionServerServices {
287    final RegionServerServices regionServerServices;
288
289    public RegionServerEnvironmentForCoreCoprocessors(final RegionServerCoprocessor impl,
290      final int priority, final int seq, final Configuration conf,
291      final RegionServerServices services) {
292      super(impl, priority, seq, conf, services);
293      this.regionServerServices = services;
294    }
295
296    /**
297     * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
298     *         consumption.
299     */
300    @Override
301    public RegionServerServices getRegionServerServices() {
302      return this.regionServerServices;
303    }
304  }
305}