001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.lang.reflect.InvocationTargetException;
022import java.lang.reflect.Method;
023import java.net.BindException;
024import java.net.InetSocketAddress;
025import java.util.Collections;
026import java.util.List;
027import java.util.Optional;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.client.ConnectionUtils;
030import org.apache.hadoop.hbase.conf.ConfigurationObserver;
031import org.apache.hadoop.hbase.io.ByteBuffAllocator;
032import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
033import org.apache.hadoop.hbase.ipc.PriorityFunction;
034import org.apache.hadoop.hbase.ipc.QosPriority;
035import org.apache.hadoop.hbase.ipc.RpcScheduler;
036import org.apache.hadoop.hbase.ipc.RpcServer;
037import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
038import org.apache.hadoop.hbase.ipc.RpcServerFactory;
039import org.apache.hadoop.hbase.ipc.RpcServerInterface;
040import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
041import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
042import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
043import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
044import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
045import org.apache.hadoop.hbase.net.Address;
046import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
047import org.apache.hadoop.hbase.security.User;
048import org.apache.hadoop.hbase.security.access.AccessChecker;
049import org.apache.hadoop.hbase.security.access.NoopAccessChecker;
050import org.apache.hadoop.hbase.security.access.Permission;
051import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
052import org.apache.hadoop.hbase.util.DNS;
053import org.apache.hadoop.hbase.util.OOMEChecker;
054import org.apache.hadoop.hbase.util.ReservoirSample;
055import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
056import org.apache.yetus.audience.InterfaceAudience;
057import org.apache.zookeeper.KeeperException;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
062import org.apache.hbase.thirdparty.com.google.protobuf.Message;
063import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
064import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
065
066import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
069import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
070import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry;
086import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
087import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
089
090/**
091 * Base class for Master and RegionServer RpcServices.
092 */
093@InterfaceAudience.Private
094public abstract class HBaseRpcServicesBase<S extends HBaseServerBase<?>>
095  implements ClientMetaService.BlockingInterface, AdminService.BlockingInterface,
096  HBaseRPCErrorHandler, PriorityFunction, ConfigurationObserver {
097
098  private static final Logger LOG = LoggerFactory.getLogger(HBaseRpcServicesBase.class);
099
100  public static final String CLIENT_BOOTSTRAP_NODE_LIMIT = "hbase.client.bootstrap.node.limit";
101
102  public static final int DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT = 10;
103
104  protected final S server;
105
106  // Server to handle client requests.
107  protected final RpcServer rpcServer;
108
109  private final InetSocketAddress isa;
110
111  protected final PriorityFunction priority;
112
113  private AccessChecker accessChecker;
114
115  private ZKPermissionWatcher zkPermissionWatcher;
116
117  protected HBaseRpcServicesBase(S server, String processName) throws IOException {
118    this.server = server;
119    Configuration conf = server.getConfiguration();
120    final RpcSchedulerFactory rpcSchedulerFactory;
121    try {
122      rpcSchedulerFactory = getRpcSchedulerFactoryClass(conf).asSubclass(RpcSchedulerFactory.class)
123        .getDeclaredConstructor().newInstance();
124    } catch (NoSuchMethodException | InvocationTargetException | InstantiationException
125      | IllegalAccessException e) {
126      throw new IllegalArgumentException(e);
127    }
128    String hostname = DNS.getHostname(conf, getDNSServerType());
129    int port = conf.getInt(getPortConfigName(), getDefaultPort());
130    // Creation of a HSA will force a resolve.
131    final InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
132    final InetSocketAddress bindAddress = new InetSocketAddress(getHostname(conf, hostname), port);
133    if (initialIsa.getAddress() == null) {
134      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
135    }
136    priority = createPriority();
137    // Using Address means we don't get the IP too. Shorten it more even to just the host name
138    // w/o the domain.
139    final String name = processName + "/"
140      + Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain();
141    server.setName(name);
142    // Set how many times to retry talking to another server over Connection.
143    ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG);
144    boolean reservoirEnabled =
145      conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, defaultReservoirEnabled());
146    try {
147      // use final bindAddress for this server.
148      rpcServer = RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, conf,
149        rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
150    } catch (BindException be) {
151      throw new IOException(be.getMessage() + ". To switch ports use the '" + getPortConfigName()
152        + "' configuration property.", be.getCause() != null ? be.getCause() : be);
153    }
154    final InetSocketAddress address = rpcServer.getListenerAddress();
155    if (address == null) {
156      throw new IOException("Listener channel is closed");
157    }
158    // Set our address, however we need the final port that was given to rpcServer
159    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
160    rpcServer.setErrorHandler(this);
161  }
162
163  protected abstract boolean defaultReservoirEnabled();
164
165  protected abstract DNS.ServerType getDNSServerType();
166
167  protected abstract String getHostname(Configuration conf, String defaultHostname);
168
169  protected abstract String getPortConfigName();
170
171  protected abstract int getDefaultPort();
172
173  protected abstract PriorityFunction createPriority();
174
175  protected abstract Class<?> getRpcSchedulerFactoryClass(Configuration conf);
176
177  protected abstract List<BlockingServiceAndInterface> getServices();
178
179  protected final void internalStart(ZKWatcher zkWatcher) {
180    if (AccessChecker.isAuthorizationSupported(getConfiguration())) {
181      accessChecker = new AccessChecker(getConfiguration());
182    } else {
183      accessChecker = new NoopAccessChecker(getConfiguration());
184    }
185    zkPermissionWatcher =
186      new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration());
187    try {
188      zkPermissionWatcher.start();
189    } catch (KeeperException e) {
190      LOG.error("ZooKeeper permission watcher initialization failed", e);
191    }
192    rpcServer.start();
193  }
194
195  protected final void requirePermission(String request, Permission.Action perm)
196    throws IOException {
197    if (accessChecker != null) {
198      accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, null, perm);
199    }
200  }
201
202  public AccessChecker getAccessChecker() {
203    return accessChecker;
204  }
205
206  public ZKPermissionWatcher getZkPermissionWatcher() {
207    return zkPermissionWatcher;
208  }
209
210  protected final void internalStop() {
211    if (zkPermissionWatcher != null) {
212      zkPermissionWatcher.close();
213    }
214    rpcServer.stop();
215  }
216
217  public Configuration getConfiguration() {
218    return server.getConfiguration();
219  }
220
221  public S getServer() {
222    return server;
223  }
224
225  public InetSocketAddress getSocketAddress() {
226    return isa;
227  }
228
229  public RpcServerInterface getRpcServer() {
230    return rpcServer;
231  }
232
233  public RpcScheduler getRpcScheduler() {
234    return rpcServer.getScheduler();
235  }
236
237  @Override
238  public int getPriority(RequestHeader header, Message param, User user) {
239    return priority.getPriority(header, param, user);
240  }
241
242  @Override
243  public long getDeadline(RequestHeader header, Message param) {
244    return priority.getDeadline(header, param);
245  }
246
247  /**
248   * Check if an OOME and, if so, abort immediately to avoid creating more objects.
249   * @return True if we OOME'd and are aborting.
250   */
251  @Override
252  public boolean checkOOME(Throwable e) {
253    return OOMEChecker.exitIfOOME(e, getClass().getSimpleName());
254  }
255
256  @Override
257  public void onConfigurationChange(Configuration conf) {
258    rpcServer.onConfigurationChange(conf);
259  }
260
261  @Override
262  public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdRequest request)
263    throws ServiceException {
264    return GetClusterIdResponse.newBuilder().setClusterId(server.getClusterId()).build();
265  }
266
267  @Override
268  public GetActiveMasterResponse getActiveMaster(RpcController controller,
269    GetActiveMasterRequest request) throws ServiceException {
270    GetActiveMasterResponse.Builder builder = GetActiveMasterResponse.newBuilder();
271    server.getActiveMaster()
272      .ifPresent(name -> builder.setServerName(ProtobufUtil.toServerName(name)));
273    return builder.build();
274  }
275
276  @Override
277  public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request)
278    throws ServiceException {
279    GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();
280    server.getActiveMaster()
281      .ifPresent(activeMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder()
282        .setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true)));
283    server.getBackupMasters()
284      .forEach(backupMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder()
285        .setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false)));
286    return builder.build();
287  }
288
289  @Override
290  public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController controller,
291    GetMetaRegionLocationsRequest request) throws ServiceException {
292    GetMetaRegionLocationsResponse.Builder builder = GetMetaRegionLocationsResponse.newBuilder();
293    server.getMetaLocations()
294      .forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location)));
295    return builder.build();
296  }
297
298  @Override
299  public final GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
300    GetBootstrapNodesRequest request) throws ServiceException {
301    int maxNodeCount = server.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT,
302      DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
303    ReservoirSample<ServerName> sample = new ReservoirSample<>(maxNodeCount);
304    sample.add(server.getBootstrapNodes());
305
306    GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
307    sample.getSamplingResult().stream().map(ProtobufUtil::toServerName)
308      .forEach(builder::addServerName);
309    return builder.build();
310  }
311
312  @Override
313  @QosPriority(priority = HConstants.ADMIN_QOS)
314  public UpdateConfigurationResponse updateConfiguration(RpcController controller,
315    UpdateConfigurationRequest request) throws ServiceException {
316    try {
317      requirePermission("updateConfiguration", Permission.Action.ADMIN);
318      this.server.updateConfiguration();
319    } catch (Exception e) {
320      throw new ServiceException(e);
321    }
322    return UpdateConfigurationResponse.getDefaultInstance();
323  }
324
325  @Override
326  @QosPriority(priority = HConstants.ADMIN_QOS)
327  public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller,
328    final ClearSlowLogResponseRequest request) throws ServiceException {
329    try {
330      requirePermission("clearSlowLogsResponses", Permission.Action.ADMIN);
331    } catch (IOException e) {
332      throw new ServiceException(e);
333    }
334    final NamedQueueRecorder namedQueueRecorder = this.server.getNamedQueueRecorder();
335    boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder)
336      .map(
337        queueRecorder -> queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG))
338      .orElse(false);
339    ClearSlowLogResponses clearSlowLogResponses =
340      ClearSlowLogResponses.newBuilder().setIsCleaned(slowLogsCleaned).build();
341    return clearSlowLogResponses;
342  }
343
344  private List<SlowLogPayload> getSlowLogPayloads(SlowLogResponseRequest request,
345    NamedQueueRecorder namedQueueRecorder) {
346    if (namedQueueRecorder == null) {
347      return Collections.emptyList();
348    }
349    List<SlowLogPayload> slowLogPayloads;
350    NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
351    namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
352    namedQueueGetRequest.setSlowLogResponseRequest(request);
353    NamedQueueGetResponse namedQueueGetResponse =
354      namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
355    slowLogPayloads = namedQueueGetResponse != null
356      ? namedQueueGetResponse.getSlowLogPayloads()
357      : Collections.emptyList();
358    return slowLogPayloads;
359  }
360
361  @Override
362  @QosPriority(priority = HConstants.ADMIN_QOS)
363  public HBaseProtos.LogEntry getLogEntries(RpcController controller,
364    HBaseProtos.LogRequest request) throws ServiceException {
365    try {
366      final String logClassName = request.getLogClassName();
367      Class<?> logClass = Class.forName(logClassName).asSubclass(Message.class);
368      Method method = logClass.getMethod("parseFrom", ByteString.class);
369      if (logClassName.contains("SlowLogResponseRequest")) {
370        SlowLogResponseRequest slowLogResponseRequest =
371          (SlowLogResponseRequest) method.invoke(null, request.getLogMessage());
372        final NamedQueueRecorder namedQueueRecorder = this.server.getNamedQueueRecorder();
373        final List<SlowLogPayload> slowLogPayloads =
374          getSlowLogPayloads(slowLogResponseRequest, namedQueueRecorder);
375        SlowLogResponses slowLogResponses =
376          SlowLogResponses.newBuilder().addAllSlowLogPayloads(slowLogPayloads).build();
377        return HBaseProtos.LogEntry.newBuilder()
378          .setLogClassName(slowLogResponses.getClass().getName())
379          .setLogMessage(slowLogResponses.toByteString()).build();
380      }
381    } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
382      | InvocationTargetException e) {
383      LOG.error("Error while retrieving log entries.", e);
384      throw new ServiceException(e);
385    }
386    throw new ServiceException("Invalid request params");
387  }
388}