001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.lang.reflect.InvocationTargetException;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.CacheEvictionStats;
024import org.apache.hadoop.hbase.ServerName;
025import org.apache.hadoop.hbase.client.Connection;
026import org.apache.hadoop.hbase.client.Mutation;
027import org.apache.hadoop.hbase.client.SharedConnection;
028import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
029import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
030import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
031import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
032import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
033import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
034import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
035import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
036import org.apache.hadoop.hbase.metrics.MetricRegistry;
037import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
038import org.apache.hadoop.hbase.security.User;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.protobuf.Service;
044
045import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
046
047@InterfaceAudience.Private
048public class RegionServerCoprocessorHost
049  extends CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> {
050
051  private static final Logger LOG = LoggerFactory.getLogger(RegionServerCoprocessorHost.class);
052
053  private RegionServerServices rsServices;
054
055  public RegionServerCoprocessorHost(RegionServerServices rsServices, Configuration conf) {
056    super(rsServices);
057    this.rsServices = rsServices;
058    this.conf = conf;
059    // Log the state of coprocessor loading here; should appear only once or
060    // twice in the daemon log, depending on HBase version, because there is
061    // only one RegionServerCoprocessorHost instance in the RS process
062    boolean coprocessorsEnabled =
063      conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED);
064    boolean tableCoprocessorsEnabled =
065      conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED);
066    LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled"));
067    LOG.info("Table coprocessor loading is "
068      + ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled"));
069    loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
070  }
071
072  @Override
073  public RegionServerEnvironment createEnvironment(RegionServerCoprocessor instance, int priority,
074    int sequence, Configuration conf) {
075    // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
076    return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)
077      ? new RegionServerEnvironmentForCoreCoprocessors(instance, priority, sequence, conf,
078        this.rsServices)
079      : new RegionServerEnvironment(instance, priority, sequence, conf, this.rsServices);
080  }
081
082  @Override
083  public RegionServerCoprocessor checkAndGetInstance(Class<?> implClass)
084    throws InstantiationException, IllegalAccessException {
085    try {
086      if (RegionServerCoprocessor.class.isAssignableFrom(implClass)) {
087        return implClass.asSubclass(RegionServerCoprocessor.class).getDeclaredConstructor()
088          .newInstance();
089      } else {
090        LOG.error("{} is not of type RegionServerCoprocessor. Check the configuration of {}",
091          implClass.getName(), CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY);
092        return null;
093      }
094    } catch (NoSuchMethodException | InvocationTargetException e) {
095      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
096    }
097  }
098
099  private ObserverGetter<RegionServerCoprocessor, RegionServerObserver> rsObserverGetter =
100    RegionServerCoprocessor::getRegionServerObserver;
101
102  abstract class RegionServerObserverOperation
103    extends ObserverOperationWithoutResult<RegionServerObserver> {
104    public RegionServerObserverOperation() {
105      super(rsObserverGetter);
106    }
107
108    public RegionServerObserverOperation(User user) {
109      super(rsObserverGetter, user);
110    }
111  }
112
113  //////////////////////////////////////////////////////////////////////////////////////////////////
114  // RegionServerObserver operations
115  //////////////////////////////////////////////////////////////////////////////////////////////////
116
117  public void preStop(String message, User user) throws IOException {
118    // While stopping the region server all coprocessors method should be executed first then the
119    // coprocessor should be cleaned up.
120    if (coprocEnvironments.isEmpty()) {
121      return;
122    }
123    execShutdown(new RegionServerObserverOperation(user) {
124      @Override
125      public void call(RegionServerObserver observer) throws IOException {
126        observer.preStopRegionServer(this);
127      }
128
129      @Override
130      public void postEnvCall() {
131        // invoke coprocessor stop method
132        shutdown(this.getEnvironment());
133      }
134    });
135  }
136
137  public void preRollWALWriterRequest() throws IOException {
138    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
139      @Override
140      public void call(RegionServerObserver observer) throws IOException {
141        observer.preRollWALWriterRequest(this);
142      }
143    });
144  }
145
146  public void postRollWALWriterRequest() throws IOException {
147    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
148      @Override
149      public void call(RegionServerObserver observer) throws IOException {
150        observer.postRollWALWriterRequest(this);
151      }
152    });
153  }
154
155  public void preReplicateLogEntries() throws IOException {
156    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
157      @Override
158      public void call(RegionServerObserver observer) throws IOException {
159        observer.preReplicateLogEntries(this);
160      }
161    });
162  }
163
164  public void postReplicateLogEntries() throws IOException {
165    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
166      @Override
167      public void call(RegionServerObserver observer) throws IOException {
168        observer.postReplicateLogEntries(this);
169      }
170    });
171  }
172
173  public void preReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
174    throws IOException {
175    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
176      @Override
177      public void call(RegionServerObserver observer) throws IOException {
178        observer.preReplicationSinkBatchMutate(this, walEntry, mutation);
179      }
180    });
181  }
182
183  public void postReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
184    throws IOException {
185    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
186      @Override
187      public void call(RegionServerObserver observer) throws IOException {
188        observer.postReplicationSinkBatchMutate(this, walEntry, mutation);
189      }
190    });
191  }
192
193  public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
194    throws IOException {
195    if (this.coprocEnvironments.isEmpty()) {
196      return endpoint;
197    }
198    return execOperationWithResult(
199      new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(rsObserverGetter,
200        endpoint) {
201        @Override
202        public ReplicationEndpoint call(RegionServerObserver observer) throws IOException {
203          return observer.postCreateReplicationEndPoint(this, getResult());
204        }
205      });
206  }
207
208  public void preClearCompactionQueues() throws IOException {
209    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
210      @Override
211      public void call(RegionServerObserver observer) throws IOException {
212        observer.preClearCompactionQueues(this);
213      }
214    });
215  }
216
217  public void postClearCompactionQueues() throws IOException {
218    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
219      @Override
220      public void call(RegionServerObserver observer) throws IOException {
221        observer.postClearCompactionQueues(this);
222      }
223    });
224  }
225
226  public void preExecuteProcedures() throws IOException {
227    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
228      @Override
229      public void call(RegionServerObserver observer) throws IOException {
230        observer.preExecuteProcedures(this);
231      }
232    });
233  }
234
235  public void postExecuteProcedures() throws IOException {
236    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
237      @Override
238      public void call(RegionServerObserver observer) throws IOException {
239        observer.postExecuteProcedures(this);
240      }
241    });
242  }
243
244  public void preUpdateConfiguration(Configuration preReloadConf) throws IOException {
245    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
246      @Override
247      public void call(RegionServerObserver observer) throws IOException {
248        observer.preUpdateRegionServerConfiguration(this, preReloadConf);
249      }
250    });
251  }
252
253  public void postUpdateConfiguration(Configuration postReloadConf) throws IOException {
254    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
255      @Override
256      public void call(RegionServerObserver observer) throws IOException {
257        observer.postUpdateRegionServerConfiguration(this, postReloadConf);
258      }
259    });
260  }
261
262  public void preClearRegionBlockCache() throws IOException {
263    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
264      @Override
265      public void call(RegionServerObserver observer) throws IOException {
266        observer.preClearRegionBlockCache(this);
267      }
268    });
269  }
270
271  public void postClearRegionBlockCache(CacheEvictionStats stats) throws IOException {
272    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
273      @Override
274      public void call(RegionServerObserver observer) throws IOException {
275        observer.postClearRegionBlockCache(this, stats);
276      }
277    });
278  }
279
280  /**
281   * Coprocessor environment extension providing access to region server related services.
282   */
283  private static class RegionServerEnvironment extends BaseEnvironment<RegionServerCoprocessor>
284    implements RegionServerCoprocessorEnvironment {
285    private final MetricRegistry metricRegistry;
286    private final RegionServerServices services;
287
288    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BC_UNCONFIRMED_CAST",
289        justification = "Intentional; FB has trouble detecting isAssignableFrom")
290    public RegionServerEnvironment(final RegionServerCoprocessor impl, final int priority,
291      final int seq, final Configuration conf, final RegionServerServices services) {
292      super(impl, priority, seq, conf);
293      // If coprocessor exposes any services, register them.
294      for (Service service : impl.getServices()) {
295        services.registerService(service);
296      }
297      this.services = services;
298      this.metricRegistry =
299        MetricsCoprocessor.createRegistryForRSCoprocessor(impl.getClass().getName());
300    }
301
302    @Override
303    public OnlineRegions getOnlineRegions() {
304      return this.services;
305    }
306
307    @Override
308    public ServerName getServerName() {
309      return this.services.getServerName();
310    }
311
312    @Override
313    public Connection getConnection() {
314      return new SharedConnection(this.services.getConnection());
315    }
316
317    @Override
318    public Connection createConnection(Configuration conf) throws IOException {
319      return this.services.createConnection(conf);
320    }
321
322    @Override
323    public MetricRegistry getMetricRegistryForRegionServer() {
324      return metricRegistry;
325    }
326
327    @Override
328    public void shutdown() {
329      super.shutdown();
330      MetricsCoprocessor.removeRegistry(metricRegistry);
331    }
332  }
333
334  /**
335   * Special version of RegionServerEnvironment that exposes RegionServerServices for Core
336   * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core.
337   */
338  private static class RegionServerEnvironmentForCoreCoprocessors extends RegionServerEnvironment
339    implements HasRegionServerServices {
340    final RegionServerServices regionServerServices;
341
342    public RegionServerEnvironmentForCoreCoprocessors(final RegionServerCoprocessor impl,
343      final int priority, final int seq, final Configuration conf,
344      final RegionServerServices services) {
345      super(impl, priority, seq, conf, services);
346      this.regionServerServices = services;
347    }
348
349    /**
350     * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
351     *         consumption.
352     */
353    @Override
354    public RegionServerServices getRegionServerServices() {
355      return this.regionServerServices;
356    }
357  }
358}