/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.ClientMetaCoprocessorHost;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.NoopAccessChecker;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.OOMEChecker;
import org.apache.hadoop.hbase.util.ReservoirSample;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;

/**
 * Base class for Master and RegionServer RpcServices.
 */
@InterfaceAudience.Private
public abstract class HBaseRpcServicesBase<S extends HBaseServerBase<?>>
  implements ClientMetaService.BlockingInterface, AdminService.BlockingInterface,
  HBaseRPCErrorHandler, PriorityFunction, ConfigurationObserver {

  private static final Logger LOG = LoggerFactory.getLogger(HBaseRpcServicesBase.class);

  public static final String CLIENT_BOOTSTRAP_NODE_LIMIT = "hbase.client.bootstrap.node.limit";

  public static final int DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT = 10;

  protected final S server;

  // Server to handle client requests.
  protected final RpcServer rpcServer;

  private final InetSocketAddress isa;

  protected final PriorityFunction priority;

  private ClientMetaCoprocessorHost clientMetaCoprocessorHost;

  private AccessChecker accessChecker;

  private ZKPermissionWatcher zkPermissionWatcher;

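  /**
   * Resolves the hostname and port to bind, instantiates the configured
   * {@link RpcSchedulerFactory} and creates the underlying {@link RpcServer} for this process.
   */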
  protected HBaseRpcServicesBase(S server, String processName) throws IOException {
    this.server = server;
    Configuration conf = server.getConfiguration();
    final RpcSchedulerFactory rpcSchedulerFactory;
    try {
      rpcSchedulerFactory = getRpcSchedulerFactoryClass(conf).asSubclass(RpcSchedulerFactory.class)
        .getDeclaredConstructor().newInstance();
    } catch (NoSuchMethodException | InvocationTargetException | InstantiationException
      | IllegalAccessException e) {
      throw new IllegalArgumentException(e);
    }
    String hostname = DNS.getHostname(conf, getDNSServerType());
    int port = conf.getInt(getPortConfigName(), getDefaultPort());
    // Creating an InetSocketAddress will force a resolve.
    final InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
    final InetSocketAddress bindAddress = new InetSocketAddress(getHostname(conf, hostname), port);
    if (initialIsa.getAddress() == null) {
      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
    }
    priority = createPriority();
    // Using Address means we don't get the IP too. Shorten it even more to just the host name
    // w/o the domain.
    final String name = processName + "/"
      + Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain();
    server.setName(name);
    // Set how many times to retry talking to another server over Connection.
    ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG);
    boolean reservoirEnabled =
      conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, defaultReservoirEnabled());
    try {
      // Use the final bindAddress for this server.
      rpcServer = RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, conf,
        rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
    } catch (BindException be) {
      throw new IOException(be.getMessage() + ". To switch ports use the '" + getPortConfigName()
        + "' configuration property.", be.getCause() != null ? be.getCause() : be);
    }
    final InetSocketAddress address = rpcServer.getListenerAddress();
    if (address == null) {
      throw new IOException("Listener channel is closed");
    }
    // Set our address, but use the final port that rpcServer was actually bound to.
    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
    rpcServer.setErrorHandler(this);

    clientMetaCoprocessorHost = new ClientMetaCoprocessorHost(conf);
  }

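  // The following template methods are implemented by the Master and RegionServer subclasses to
  // supply process-specific settings: DNS lookup type, bind port, priority function, RPC
  // scheduler factory and the set of services to export.
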
  protected abstract boolean defaultReservoirEnabled();

  protected abstract DNS.ServerType getDNSServerType();

  protected abstract String getHostname(Configuration conf, String defaultHostname);

  protected abstract String getPortConfigName();

  protected abstract int getDefaultPort();

  protected abstract PriorityFunction createPriority();

  protected abstract Class<?> getRpcSchedulerFactoryClass(Configuration conf);

  protected abstract List<BlockingServiceAndInterface> getServices();

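  /**
   * Sets up authorization (an {@link AccessChecker} or a no-op checker depending on
   * configuration), starts the ZooKeeper permission watcher and then starts the RPC server.
   */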
  protected final void internalStart(ZKWatcher zkWatcher) {
    if (AccessChecker.isAuthorizationSupported(getConfiguration())) {
      accessChecker = new AccessChecker(getConfiguration());
    } else {
      accessChecker = new NoopAccessChecker(getConfiguration());
    }
    zkPermissionWatcher =
      new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration());
    try {
      zkPermissionWatcher.start();
    } catch (KeeperException e) {
      LOG.error("ZooKeeper permission watcher initialization failed", e);
    }
    rpcServer.start();
  }

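  /**
   * Requires that the current RPC caller holds the given global permission. A no-op until the
   * access checker has been initialized by {@link #internalStart(ZKWatcher)}.
   */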
  protected final void requirePermission(String request, Permission.Action perm)
    throws IOException {
    if (accessChecker != null) {
      accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, null, perm);
    }
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public ClientMetaCoprocessorHost getClientMetaCoprocessorHost() {
    return clientMetaCoprocessorHost;
  }

  public AccessChecker getAccessChecker() {
    return accessChecker;
  }

  public ZKPermissionWatcher getZkPermissionWatcher() {
    return zkPermissionWatcher;
  }

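  /**
   * Stops the ZooKeeper permission watcher (if started) and shuts down the RPC server.
   */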
  protected final void internalStop() {
    if (zkPermissionWatcher != null) {
      zkPermissionWatcher.close();
    }
    rpcServer.stop();
  }

  public Configuration getConfiguration() {
    return server.getConfiguration();
  }

  public S getServer() {
    return server;
  }

  public InetSocketAddress getSocketAddress() {
    return isa;
  }

  public RpcServerInterface getRpcServer() {
    return rpcServer;
  }

  public RpcScheduler getRpcScheduler() {
    return rpcServer.getScheduler();
  }

  @Override
  public int getPriority(RequestHeader header, Message param, User user) {
    return priority.getPriority(header, param, user);
  }

  @Override
  public long getDeadline(RequestHeader header, Message param) {
    return priority.getDeadline(header, param);
  }

  /**
   * Check if the throwable is an OOME and, if so, abort immediately to avoid creating more
   * objects.
   * @return True if we OOME'd and are aborting.
   */
  @Override
  public boolean checkOOME(Throwable e) {
    return OOMEChecker.exitIfOOME(e, getClass().getSimpleName());
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
    rpcServer.onConfigurationChange(conf);
  }

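  /**
   * Returns the cluster id, wrapping the lookup in the ClientMetaService coprocessor pre/post
   * hooks.
   */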
  @Override
  public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdRequest request)
    throws ServiceException {
    try {
      clientMetaCoprocessorHost.preGetClusterId();

      String clusterId = server.getClusterId();
      String clusterIdReply = clientMetaCoprocessorHost.postGetClusterId(clusterId);

      return GetClusterIdResponse.newBuilder().setClusterId(clusterIdReply).build();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

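  /**
   * Returns the currently known active master. The server name field is left unset in the
   * response when no active master is known.
   */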
  @Override
  public GetActiveMasterResponse getActiveMaster(RpcController controller,
    GetActiveMasterRequest request) throws ServiceException {
    GetActiveMasterResponse.Builder builder = GetActiveMasterResponse.newBuilder();

    try {
      clientMetaCoprocessorHost.preGetActiveMaster();

      ServerName serverName = server.getActiveMaster().orElse(null);
      ServerName serverNameReply = clientMetaCoprocessorHost.postGetActiveMaster(serverName);

      if (serverNameReply != null) {
        builder.setServerName(ProtobufUtil.toServerName(serverNameReply));
      }
    } catch (IOException e) {
      throw new ServiceException(e);
    }

    return builder.build();
  }

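  /**
   * Returns the active master (if known) followed by the backup masters, each entry flagged with
   * whether it is the active one.
   */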
  @Override
  public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request)
    throws ServiceException {
    GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();

    try {
      clientMetaCoprocessorHost.preGetMasters();

      Map<ServerName, Boolean> serverNames = new LinkedHashMap<>();

      server.getActiveMaster().ifPresent(serverName -> serverNames.put(serverName, Boolean.TRUE));
      server.getBackupMasters().forEach(serverName -> serverNames.put(serverName, Boolean.FALSE));

      Map<ServerName, Boolean> serverNamesReply =
        clientMetaCoprocessorHost.postGetMasters(serverNames);

      serverNamesReply
        .forEach((serverName, active) -> builder.addMasterServers(GetMastersResponseEntry
          .newBuilder().setServerName(ProtobufUtil.toServerName(serverName)).setIsActive(active)));
    } catch (IOException e) {
      throw new ServiceException(e);
    }

    return builder.build();
  }

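  /**
   * Returns the locations of the meta regions as currently known to this server.
   */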
  @Override
  public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController controller,
    GetMetaRegionLocationsRequest request) throws ServiceException {
    GetMetaRegionLocationsResponse.Builder builder = GetMetaRegionLocationsResponse.newBuilder();

    try {
      clientMetaCoprocessorHost.preGetMetaLocations();

      List<HRegionLocation> metaLocations = server.getMetaLocations();
      List<HRegionLocation> metaLocationsReply =
        clientMetaCoprocessorHost.postGetMetaLocations(metaLocations);

      metaLocationsReply
        .forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location)));
    } catch (IOException e) {
      throw new ServiceException(e);
    }

    return builder.build();
  }

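  /**
   * Returns a random (reservoir-sampled) subset of this server's bootstrap nodes, capped by the
   * {@code hbase.client.bootstrap.node.limit} setting.
   */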
  @Override
  public final GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
    GetBootstrapNodesRequest request) throws ServiceException {
    GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();

    try {
      clientMetaCoprocessorHost.preGetBootstrapNodes();

      int maxNodeCount = server.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT,
        DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
      ReservoirSample<ServerName> sample = new ReservoirSample<>(maxNodeCount);
      sample.add(server.getBootstrapNodes());

      List<ServerName> bootstrapNodes = sample.getSamplingResult();
      List<ServerName> bootstrapNodesReply =
        clientMetaCoprocessorHost.postGetBootstrapNodes(bootstrapNodes);

      bootstrapNodesReply
        .forEach(serverName -> builder.addServerName(ProtobufUtil.toServerName(serverName)));
    } catch (IOException e) {
      throw new ServiceException(e);
    }

    return builder.build();
  }

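  /**
   * Reloads this server's configuration and rebuilds the ClientMetaService coprocessor host from
   * the refreshed configuration. Requires ADMIN permission.
   */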
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public UpdateConfigurationResponse updateConfiguration(RpcController controller,
    UpdateConfigurationRequest request) throws ServiceException {
    try {
      requirePermission("updateConfiguration", Permission.Action.ADMIN);
      this.server.updateConfiguration();

      clientMetaCoprocessorHost = new ClientMetaCoprocessorHost(getConfiguration());
    } catch (Exception e) {
      throw new ServiceException(e);
    }
    return UpdateConfigurationResponse.getDefaultInstance();
  }

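  /**
   * Clears the in-memory slow log named queue, if a {@link NamedQueueRecorder} is configured.
   * Requires ADMIN permission.
   */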
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller,
    final ClearSlowLogResponseRequest request) throws ServiceException {
    try {
      requirePermission("clearSlowLogsResponses", Permission.Action.ADMIN);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    final NamedQueueRecorder namedQueueRecorder = this.server.getNamedQueueRecorder();
    boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder)
      .map(
        queueRecorder -> queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG))
      .orElse(false);
    ClearSlowLogResponses clearSlowLogResponses =
      ClearSlowLogResponses.newBuilder().setIsCleaned(slowLogsCleaned).build();
    return clearSlowLogResponses;
  }

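  /**
   * Fetches the recorded slow log payloads matching the given request from the
   * {@link NamedQueueRecorder}, or an empty list if no recorder is available.
   */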
  private List<SlowLogPayload> getSlowLogPayloads(SlowLogResponseRequest request,
    NamedQueueRecorder namedQueueRecorder) {
    if (namedQueueRecorder == null) {
      return Collections.emptyList();
    }
    List<SlowLogPayload> slowLogPayloads;
    NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
    namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
    namedQueueGetRequest.setSlowLogResponseRequest(request);
    NamedQueueGetResponse namedQueueGetResponse =
      namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
    slowLogPayloads = namedQueueGetResponse != null
      ? namedQueueGetResponse.getSlowLogPayloads()
      : Collections.emptyList();
    return slowLogPayloads;
  }

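  /**
   * Generic log-entry RPC. Only {@code SlowLogResponseRequest} payloads are currently understood;
   * any other log class results in a ServiceException.
   */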
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public HBaseProtos.LogEntry getLogEntries(RpcController controller,
    HBaseProtos.LogRequest request) throws ServiceException {
    try {
      final String logClassName = request.getLogClassName();
      Class<?> logClass = Class.forName(logClassName).asSubclass(Message.class);
      Method method = logClass.getMethod("parseFrom", ByteString.class);
      if (logClassName.contains("SlowLogResponseRequest")) {
        SlowLogResponseRequest slowLogResponseRequest =
          (SlowLogResponseRequest) method.invoke(null, request.getLogMessage());
        final NamedQueueRecorder namedQueueRecorder = this.server.getNamedQueueRecorder();
        final List<SlowLogPayload> slowLogPayloads =
          getSlowLogPayloads(slowLogResponseRequest, namedQueueRecorder);
        SlowLogResponses slowLogResponses =
          SlowLogResponses.newBuilder().addAllSlowLogPayloads(slowLogPayloads).build();
        return HBaseProtos.LogEntry.newBuilder()
          .setLogClassName(slowLogResponses.getClass().getName())
          .setLogMessage(slowLogResponses.toByteString()).build();
      }
    } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
      | InvocationTargetException e) {
      LOG.error("Error while retrieving log entries.", e);
      throw new ServiceException(e);
    }
    throw new ServiceException("Invalid request params");
  }
}