001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.lang.reflect.InvocationTargetException;
023
024import com.google.protobuf.Service;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.client.Connection;
029import org.apache.hadoop.hbase.client.SharedConnection;
030import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
031import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
032import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
033import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
034import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
035import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
036import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
037import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
038import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
039import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService;
040import org.apache.hadoop.hbase.metrics.MetricRegistry;
041import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
042import org.apache.hadoop.hbase.security.User;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047@InterfaceAudience.Private
048public class RegionServerCoprocessorHost extends
049    CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> {
050
051  private static final Logger LOG = LoggerFactory.getLogger(RegionServerCoprocessorHost.class);
052
053  private RegionServerServices rsServices;
054
055  public RegionServerCoprocessorHost(RegionServerServices rsServices,
056      Configuration conf) {
057    super(rsServices);
058    this.rsServices = rsServices;
059    this.conf = conf;
060    // Log the state of coprocessor loading here; should appear only once or
061    // twice in the daemon log, depending on HBase version, because there is
062    // only one RegionServerCoprocessorHost instance in the RS process
063    boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
064      DEFAULT_COPROCESSORS_ENABLED);
065    boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
066      DEFAULT_USER_COPROCESSORS_ENABLED);
067    LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled"));
068    LOG.info("Table coprocessor loading is " +
069      ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled"));
070    loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
071  }
072
073  @Override
074  public RegionServerEnvironment createEnvironment(
075      RegionServerCoprocessor instance, int priority, int sequence, Configuration conf) {
076    // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
077    return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)?
078      new RegionServerEnvironmentForCoreCoprocessors(instance, priority, sequence, conf,
079            this.rsServices):
080      new RegionServerEnvironment(instance, priority, sequence, conf, this.rsServices);
081  }
082
083  @Override
084  public RegionServerCoprocessor checkAndGetInstance(Class<?> implClass)
085      throws InstantiationException, IllegalAccessException {
086    try {
087      if (RegionServerCoprocessor.class.isAssignableFrom(implClass)) {
088        return implClass.asSubclass(RegionServerCoprocessor.class).getDeclaredConstructor()
089            .newInstance();
090      } else if (SingletonCoprocessorService.class.isAssignableFrom(implClass)) {
091        // For backward compatibility with old CoprocessorService impl which don't extend
092        // RegionCoprocessor.
093        SingletonCoprocessorService tmp = implClass.asSubclass(SingletonCoprocessorService.class)
094            .getDeclaredConstructor().newInstance();
095        return new CoprocessorServiceBackwardCompatiblity.RegionServerCoprocessorService(tmp);
096      } else {
097        LOG.error("{} is not of type RegionServerCoprocessor. Check the configuration of {}",
098            implClass.getName(), CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY);
099        return null;
100      }
101    } catch (NoSuchMethodException | InvocationTargetException e) {
102      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
103    }
104  }
105
106  private ObserverGetter<RegionServerCoprocessor, RegionServerObserver> rsObserverGetter =
107      RegionServerCoprocessor::getRegionServerObserver;
108
109  abstract class RegionServerObserverOperation extends
110      ObserverOperationWithoutResult<RegionServerObserver> {
111    public RegionServerObserverOperation() {
112      super(rsObserverGetter);
113    }
114
115    public RegionServerObserverOperation(User user) {
116      super(rsObserverGetter, user);
117    }
118  }
119
120  //////////////////////////////////////////////////////////////////////////////////////////////////
121  // RegionServerObserver operations
122  //////////////////////////////////////////////////////////////////////////////////////////////////
123
124  public void preStop(String message, User user) throws IOException {
125    // While stopping the region server all coprocessors method should be executed first then the
126    // coprocessor should be cleaned up.
127    if (coprocEnvironments.isEmpty()) {
128      return;
129    }
130    execShutdown(new RegionServerObserverOperation(user) {
131      @Override
132      public void call(RegionServerObserver observer) throws IOException {
133        observer.preStopRegionServer(this);
134      }
135
136      @Override
137      public void postEnvCall() {
138        // invoke coprocessor stop method
139        shutdown(this.getEnvironment());
140      }
141    });
142  }
143
144  public void preRollWALWriterRequest() throws IOException {
145    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
146      @Override
147      public void call(RegionServerObserver observer) throws IOException {
148        observer.preRollWALWriterRequest(this);
149      }
150    });
151  }
152
153  public void postRollWALWriterRequest() throws IOException {
154    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
155      @Override
156      public void call(RegionServerObserver observer) throws IOException {
157        observer.postRollWALWriterRequest(this);
158      }
159    });
160  }
161
162  public void preReplicateLogEntries()
163      throws IOException {
164    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
165      @Override
166      public void call(RegionServerObserver observer) throws IOException {
167        observer.preReplicateLogEntries(this);
168      }
169    });
170  }
171
172  public void postReplicateLogEntries()
173      throws IOException {
174    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
175      @Override
176      public void call(RegionServerObserver observer) throws IOException {
177        observer.postReplicateLogEntries(this);
178      }
179    });
180  }
181
182  public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
183      throws IOException {
184    if (this.coprocEnvironments.isEmpty()) {
185      return endpoint;
186    }
187    return execOperationWithResult(
188        new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(
189            rsObserverGetter, endpoint) {
190      @Override
191      public ReplicationEndpoint call(RegionServerObserver observer) throws IOException {
192        return observer.postCreateReplicationEndPoint(this, getResult());
193      }
194    });
195  }
196
197  public void preClearCompactionQueues() throws IOException {
198    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
199      @Override
200      public void call(RegionServerObserver observer) throws IOException {
201        observer.preClearCompactionQueues(this);
202      }
203    });
204  }
205
206  public void postClearCompactionQueues() throws IOException {
207    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
208      @Override
209      public void call(RegionServerObserver observer) throws IOException {
210        observer.postClearCompactionQueues(this);
211      }
212    });
213  }
214
215  public void preExecuteProcedures() throws IOException {
216    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
217      @Override
218      public void call(RegionServerObserver observer) throws IOException {
219        observer.preExecuteProcedures(this);
220      }
221    });
222  }
223
224  public void postExecuteProcedures() throws IOException {
225    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
226      @Override
227      public void call(RegionServerObserver observer) throws IOException {
228        observer.postExecuteProcedures(this);
229      }
230    });
231  }
232
233  /**
234   * Coprocessor environment extension providing access to region server
235   * related services.
236   */
237  private static class RegionServerEnvironment extends BaseEnvironment<RegionServerCoprocessor>
238      implements RegionServerCoprocessorEnvironment {
239    private final MetricRegistry metricRegistry;
240    private final RegionServerServices services;
241
242    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BC_UNCONFIRMED_CAST",
243        justification="Intentional; FB has trouble detecting isAssignableFrom")
244    public RegionServerEnvironment(final RegionServerCoprocessor impl, final int priority,
245        final int seq, final Configuration conf, final RegionServerServices services) {
246      super(impl, priority, seq, conf);
247      // If coprocessor exposes any services, register them.
248      for (Service service : impl.getServices()) {
249        services.registerService(service);
250      }
251      this.services = services;
252      this.metricRegistry =
253          MetricsCoprocessor.createRegistryForRSCoprocessor(impl.getClass().getName());
254    }
255
256    @Override
257    public OnlineRegions getOnlineRegions() {
258      return this.services;
259    }
260
261    @Override
262    public ServerName getServerName() {
263      return this.services.getServerName();
264    }
265
266    @Override
267    public Connection getConnection() {
268      return new SharedConnection(this.services.getConnection());
269    }
270
271    @Override
272    public Connection createConnection(Configuration conf) throws IOException {
273      return this.services.createConnection(conf);
274    }
275
276    @Override
277    public MetricRegistry getMetricRegistryForRegionServer() {
278      return metricRegistry;
279    }
280
281    @Override
282    public void shutdown() {
283      super.shutdown();
284      MetricsCoprocessor.removeRegistry(metricRegistry);
285    }
286  }
287
288  /**
289   * Special version of RegionServerEnvironment that exposes RegionServerServices for Core
290   * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core.
291   */
292  private static class RegionServerEnvironmentForCoreCoprocessors extends RegionServerEnvironment
293      implements HasRegionServerServices {
294    final RegionServerServices regionServerServices;
295
296    public RegionServerEnvironmentForCoreCoprocessors(final RegionServerCoprocessor impl,
297          final int priority, final int seq, final Configuration conf,
298          final RegionServerServices services) {
299       super(impl, priority, seq, conf, services);
300       this.regionServerServices = services;
301    }
302
303    /**
304     * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
305     * consumption.
306     */
307    @Override
308    public RegionServerServices getRegionServerServices() {
309      return this.regionServerServices;
310    }
311  }
312}