001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.lang.reflect.InvocationTargetException;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.ServerName;
025import org.apache.hadoop.hbase.client.Connection;
026import org.apache.hadoop.hbase.client.SharedConnection;
027import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
028import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
029import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
030import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
031import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
032import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
033import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
034import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
035import org.apache.hadoop.hbase.metrics.MetricRegistry;
036import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
037import org.apache.hadoop.hbase.security.User;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.protobuf.Service;
043
044@InterfaceAudience.Private
045public class RegionServerCoprocessorHost extends
046    CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> {
047
048  private static final Logger LOG = LoggerFactory.getLogger(RegionServerCoprocessorHost.class);
049
050  private RegionServerServices rsServices;
051
052  public RegionServerCoprocessorHost(RegionServerServices rsServices,
053      Configuration conf) {
054    super(rsServices);
055    this.rsServices = rsServices;
056    this.conf = conf;
057    // Log the state of coprocessor loading here; should appear only once or
058    // twice in the daemon log, depending on HBase version, because there is
059    // only one RegionServerCoprocessorHost instance in the RS process
060    boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
061      DEFAULT_COPROCESSORS_ENABLED);
062    boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
063      DEFAULT_USER_COPROCESSORS_ENABLED);
064    LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled"));
065    LOG.info("Table coprocessor loading is " +
066      ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled"));
067    loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
068  }
069
070  @Override
071  public RegionServerEnvironment createEnvironment(
072      RegionServerCoprocessor instance, int priority, int sequence, Configuration conf) {
073    // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
074    return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)?
075      new RegionServerEnvironmentForCoreCoprocessors(instance, priority, sequence, conf,
076            this.rsServices):
077      new RegionServerEnvironment(instance, priority, sequence, conf, this.rsServices);
078  }
079
080  @Override
081  public RegionServerCoprocessor checkAndGetInstance(Class<?> implClass)
082      throws InstantiationException, IllegalAccessException {
083    try {
084      if (RegionServerCoprocessor.class.isAssignableFrom(implClass)) {
085        return implClass.asSubclass(RegionServerCoprocessor.class).getDeclaredConstructor()
086            .newInstance();
087      } else {
088        LOG.error("{} is not of type RegionServerCoprocessor. Check the configuration of {}",
089            implClass.getName(), CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY);
090        return null;
091      }
092    } catch (NoSuchMethodException | InvocationTargetException e) {
093      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
094    }
095  }
096
097  private ObserverGetter<RegionServerCoprocessor, RegionServerObserver> rsObserverGetter =
098      RegionServerCoprocessor::getRegionServerObserver;
099
100  abstract class RegionServerObserverOperation extends
101      ObserverOperationWithoutResult<RegionServerObserver> {
102    public RegionServerObserverOperation() {
103      super(rsObserverGetter);
104    }
105
106    public RegionServerObserverOperation(User user) {
107      super(rsObserverGetter, user);
108    }
109  }
110
111  //////////////////////////////////////////////////////////////////////////////////////////////////
112  // RegionServerObserver operations
113  //////////////////////////////////////////////////////////////////////////////////////////////////
114
115  public void preStop(String message, User user) throws IOException {
116    // While stopping the region server all coprocessors method should be executed first then the
117    // coprocessor should be cleaned up.
118    if (coprocEnvironments.isEmpty()) {
119      return;
120    }
121    execShutdown(new RegionServerObserverOperation(user) {
122      @Override
123      public void call(RegionServerObserver observer) throws IOException {
124        observer.preStopRegionServer(this);
125      }
126
127      @Override
128      public void postEnvCall() {
129        // invoke coprocessor stop method
130        shutdown(this.getEnvironment());
131      }
132    });
133  }
134
135  public void preRollWALWriterRequest() throws IOException {
136    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
137      @Override
138      public void call(RegionServerObserver observer) throws IOException {
139        observer.preRollWALWriterRequest(this);
140      }
141    });
142  }
143
144  public void postRollWALWriterRequest() throws IOException {
145    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
146      @Override
147      public void call(RegionServerObserver observer) throws IOException {
148        observer.postRollWALWriterRequest(this);
149      }
150    });
151  }
152
153  public void preReplicateLogEntries()
154      throws IOException {
155    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
156      @Override
157      public void call(RegionServerObserver observer) throws IOException {
158        observer.preReplicateLogEntries(this);
159      }
160    });
161  }
162
163  public void postReplicateLogEntries()
164      throws IOException {
165    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
166      @Override
167      public void call(RegionServerObserver observer) throws IOException {
168        observer.postReplicateLogEntries(this);
169      }
170    });
171  }
172
173  public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
174      throws IOException {
175    if (this.coprocEnvironments.isEmpty()) {
176      return endpoint;
177    }
178    return execOperationWithResult(
179        new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(
180            rsObserverGetter, endpoint) {
181      @Override
182      public ReplicationEndpoint call(RegionServerObserver observer) throws IOException {
183        return observer.postCreateReplicationEndPoint(this, getResult());
184      }
185    });
186  }
187
188  public void preClearCompactionQueues() throws IOException {
189    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
190      @Override
191      public void call(RegionServerObserver observer) throws IOException {
192        observer.preClearCompactionQueues(this);
193      }
194    });
195  }
196
197  public void postClearCompactionQueues() throws IOException {
198    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
199      @Override
200      public void call(RegionServerObserver observer) throws IOException {
201        observer.postClearCompactionQueues(this);
202      }
203    });
204  }
205
206  public void preExecuteProcedures() throws IOException {
207    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
208      @Override
209      public void call(RegionServerObserver observer) throws IOException {
210        observer.preExecuteProcedures(this);
211      }
212    });
213  }
214
215  public void postExecuteProcedures() throws IOException {
216    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
217      @Override
218      public void call(RegionServerObserver observer) throws IOException {
219        observer.postExecuteProcedures(this);
220      }
221    });
222  }
223
224  /**
225   * Coprocessor environment extension providing access to region server
226   * related services.
227   */
228  private static class RegionServerEnvironment extends BaseEnvironment<RegionServerCoprocessor>
229      implements RegionServerCoprocessorEnvironment {
230    private final MetricRegistry metricRegistry;
231    private final RegionServerServices services;
232
233    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BC_UNCONFIRMED_CAST",
234        justification="Intentional; FB has trouble detecting isAssignableFrom")
235    public RegionServerEnvironment(final RegionServerCoprocessor impl, final int priority,
236        final int seq, final Configuration conf, final RegionServerServices services) {
237      super(impl, priority, seq, conf);
238      // If coprocessor exposes any services, register them.
239      for (Service service : impl.getServices()) {
240        services.registerService(service);
241      }
242      this.services = services;
243      this.metricRegistry =
244          MetricsCoprocessor.createRegistryForRSCoprocessor(impl.getClass().getName());
245    }
246
247    @Override
248    public OnlineRegions getOnlineRegions() {
249      return this.services;
250    }
251
252    @Override
253    public ServerName getServerName() {
254      return this.services.getServerName();
255    }
256
257    @Override
258    public Connection getConnection() {
259      return new SharedConnection(this.services.getConnection());
260    }
261
262    @Override
263    public Connection createConnection(Configuration conf) throws IOException {
264      return this.services.createConnection(conf);
265    }
266
267    @Override
268    public MetricRegistry getMetricRegistryForRegionServer() {
269      return metricRegistry;
270    }
271
272    @Override
273    public void shutdown() {
274      super.shutdown();
275      MetricsCoprocessor.removeRegistry(metricRegistry);
276    }
277  }
278
279  /**
280   * Special version of RegionServerEnvironment that exposes RegionServerServices for Core
281   * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core.
282   */
283  private static class RegionServerEnvironmentForCoreCoprocessors extends RegionServerEnvironment
284      implements HasRegionServerServices {
285    final RegionServerServices regionServerServices;
286
287    public RegionServerEnvironmentForCoreCoprocessors(final RegionServerCoprocessor impl,
288          final int priority, final int seq, final Configuration conf,
289          final RegionServerServices services) {
290       super(impl, priority, seq, conf, services);
291       this.regionServerServices = services;
292    }
293
294    /**
295     * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
296     * consumption.
297     */
298    @Override
299    public RegionServerServices getRegionServerServices() {
300      return this.regionServerServices;
301    }
302  }
303}