001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.lang.reflect.InvocationTargetException; 022import java.lang.reflect.Method; 023import java.net.BindException; 024import java.net.InetSocketAddress; 025import java.util.Collections; 026import java.util.List; 027import java.util.Optional; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.client.ConnectionUtils; 030import org.apache.hadoop.hbase.conf.ConfigurationObserver; 031import org.apache.hadoop.hbase.io.ByteBuffAllocator; 032import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; 033import org.apache.hadoop.hbase.ipc.PriorityFunction; 034import org.apache.hadoop.hbase.ipc.QosPriority; 035import org.apache.hadoop.hbase.ipc.RpcScheduler; 036import org.apache.hadoop.hbase.ipc.RpcServer; 037import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 038import org.apache.hadoop.hbase.ipc.RpcServerFactory; 039import org.apache.hadoop.hbase.ipc.RpcServerInterface; 040import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; 041import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; 042import org.apache.hadoop.hbase.namequeues.RpcLogDetails; 043import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 044import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 045import org.apache.hadoop.hbase.net.Address; 046import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; 047import org.apache.hadoop.hbase.security.User; 048import org.apache.hadoop.hbase.security.access.AccessChecker; 049import org.apache.hadoop.hbase.security.access.NoopAccessChecker; 050import org.apache.hadoop.hbase.security.access.Permission; 051import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; 052import org.apache.hadoop.hbase.util.DNS; 053import org.apache.hadoop.hbase.util.OOMEChecker; 054import org.apache.hadoop.hbase.util.ReservoirSample; 055import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 056import org.apache.yetus.audience.InterfaceAudience; 057import org.apache.zookeeper.KeeperException; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 062import org.apache.hbase.thirdparty.com.google.protobuf.Message; 063import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 064import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 065 066import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 067import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 068import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest; 069import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses; 070import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry; 086import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest; 087import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; 089 090/** 091 * Base class for Master and RegionServer RpcServices. 092 */ 093@InterfaceAudience.Private 094public abstract class HBaseRpcServicesBase<S extends HBaseServerBase<?>> 095 implements ClientMetaService.BlockingInterface, AdminService.BlockingInterface, 096 HBaseRPCErrorHandler, PriorityFunction, ConfigurationObserver { 097 098 private static final Logger LOG = LoggerFactory.getLogger(HBaseRpcServicesBase.class); 099 100 public static final String CLIENT_BOOTSTRAP_NODE_LIMIT = "hbase.client.bootstrap.node.limit"; 101 102 public static final int DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT = 10; 103 104 protected final S server; 105 106 // Server to handle client requests. 107 protected final RpcServer rpcServer; 108 109 private final InetSocketAddress isa; 110 111 protected final PriorityFunction priority; 112 113 private AccessChecker accessChecker; 114 115 private ZKPermissionWatcher zkPermissionWatcher; 116 117 protected HBaseRpcServicesBase(S server, String processName) throws IOException { 118 this.server = server; 119 Configuration conf = server.getConfiguration(); 120 final RpcSchedulerFactory rpcSchedulerFactory; 121 try { 122 rpcSchedulerFactory = getRpcSchedulerFactoryClass(conf).asSubclass(RpcSchedulerFactory.class) 123 .getDeclaredConstructor().newInstance(); 124 } catch (NoSuchMethodException | InvocationTargetException | InstantiationException 125 | IllegalAccessException e) { 126 throw new IllegalArgumentException(e); 127 } 128 String hostname = DNS.getHostname(conf, getDNSServerType()); 129 int port = conf.getInt(getPortConfigName(), getDefaultPort()); 130 // Creation of a HSA will force a resolve. 131 final InetSocketAddress initialIsa = new InetSocketAddress(hostname, port); 132 final InetSocketAddress bindAddress = new InetSocketAddress(getHostname(conf, hostname), port); 133 if (initialIsa.getAddress() == null) { 134 throw new IllegalArgumentException("Failed resolve of " + initialIsa); 135 } 136 priority = createPriority(); 137 // Using Address means we don't get the IP too. Shorten it more even to just the host name 138 // w/o the domain. 139 final String name = processName + "/" 140 + Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain(); 141 server.setName(name); 142 // Set how many times to retry talking to another server over Connection. 143 ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG); 144 boolean reservoirEnabled = 145 conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, defaultReservoirEnabled()); 146 try { 147 // use final bindAddress for this server. 148 rpcServer = RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, conf, 149 rpcSchedulerFactory.create(conf, this, server), reservoirEnabled); 150 } catch (BindException be) { 151 throw new IOException(be.getMessage() + ". To switch ports use the '" + getPortConfigName() 152 + "' configuration property.", be.getCause() != null ? be.getCause() : be); 153 } 154 final InetSocketAddress address = rpcServer.getListenerAddress(); 155 if (address == null) { 156 throw new IOException("Listener channel is closed"); 157 } 158 // Set our address, however we need the final port that was given to rpcServer 159 isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort()); 160 rpcServer.setErrorHandler(this); 161 } 162 163 protected abstract boolean defaultReservoirEnabled(); 164 165 protected abstract DNS.ServerType getDNSServerType(); 166 167 protected abstract String getHostname(Configuration conf, String defaultHostname); 168 169 protected abstract String getPortConfigName(); 170 171 protected abstract int getDefaultPort(); 172 173 protected abstract PriorityFunction createPriority(); 174 175 protected abstract Class<?> getRpcSchedulerFactoryClass(Configuration conf); 176 177 protected abstract List<BlockingServiceAndInterface> getServices(); 178 179 protected final void internalStart(ZKWatcher zkWatcher) { 180 if (AccessChecker.isAuthorizationSupported(getConfiguration())) { 181 accessChecker = new AccessChecker(getConfiguration()); 182 } else { 183 accessChecker = new NoopAccessChecker(getConfiguration()); 184 } 185 zkPermissionWatcher = 186 new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration()); 187 try { 188 zkPermissionWatcher.start(); 189 } catch (KeeperException e) { 190 LOG.error("ZooKeeper permission watcher initialization failed", e); 191 } 192 rpcServer.start(); 193 } 194 195 protected final void requirePermission(String request, Permission.Action perm) 196 throws IOException { 197 if (accessChecker != null) { 198 accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, null, perm); 199 } 200 } 201 202 public AccessChecker getAccessChecker() { 203 return accessChecker; 204 } 205 206 public ZKPermissionWatcher getZkPermissionWatcher() { 207 return zkPermissionWatcher; 208 } 209 210 protected final void internalStop() { 211 if (zkPermissionWatcher != null) { 212 zkPermissionWatcher.close(); 213 } 214 rpcServer.stop(); 215 } 216 217 public Configuration getConfiguration() { 218 return server.getConfiguration(); 219 } 220 221 public S getServer() { 222 return server; 223 } 224 225 public InetSocketAddress getSocketAddress() { 226 return isa; 227 } 228 229 public RpcServerInterface getRpcServer() { 230 return rpcServer; 231 } 232 233 public RpcScheduler getRpcScheduler() { 234 return rpcServer.getScheduler(); 235 } 236 237 @Override 238 public int getPriority(RequestHeader header, Message param, User user) { 239 return priority.getPriority(header, param, user); 240 } 241 242 @Override 243 public long getDeadline(RequestHeader header, Message param) { 244 return priority.getDeadline(header, param); 245 } 246 247 /** 248 * Check if an OOME and, if so, abort immediately to avoid creating more objects. 249 * @return True if we OOME'd and are aborting. 250 */ 251 @Override 252 public boolean checkOOME(Throwable e) { 253 return OOMEChecker.exitIfOOME(e, getClass().getSimpleName()); 254 } 255 256 @Override 257 public void onConfigurationChange(Configuration conf) { 258 rpcServer.onConfigurationChange(conf); 259 } 260 261 @Override 262 public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdRequest request) 263 throws ServiceException { 264 return GetClusterIdResponse.newBuilder().setClusterId(server.getClusterId()).build(); 265 } 266 267 @Override 268 public GetActiveMasterResponse getActiveMaster(RpcController controller, 269 GetActiveMasterRequest request) throws ServiceException { 270 GetActiveMasterResponse.Builder builder = GetActiveMasterResponse.newBuilder(); 271 server.getActiveMaster() 272 .ifPresent(name -> builder.setServerName(ProtobufUtil.toServerName(name))); 273 return builder.build(); 274 } 275 276 @Override 277 public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request) 278 throws ServiceException { 279 GetMastersResponse.Builder builder = GetMastersResponse.newBuilder(); 280 server.getActiveMaster() 281 .ifPresent(activeMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder() 282 .setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true))); 283 server.getBackupMasters() 284 .forEach(backupMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder() 285 .setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false))); 286 return builder.build(); 287 } 288 289 @Override 290 public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController controller, 291 GetMetaRegionLocationsRequest request) throws ServiceException { 292 GetMetaRegionLocationsResponse.Builder builder = GetMetaRegionLocationsResponse.newBuilder(); 293 server.getMetaLocations() 294 .forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location))); 295 return builder.build(); 296 } 297 298 @Override 299 public final GetBootstrapNodesResponse getBootstrapNodes(RpcController controller, 300 GetBootstrapNodesRequest request) throws ServiceException { 301 int maxNodeCount = server.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT, 302 DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT); 303 ReservoirSample<ServerName> sample = new ReservoirSample<>(maxNodeCount); 304 sample.add(server.getBootstrapNodes()); 305 306 GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder(); 307 sample.getSamplingResult().stream().map(ProtobufUtil::toServerName) 308 .forEach(builder::addServerName); 309 return builder.build(); 310 } 311 312 @Override 313 @QosPriority(priority = HConstants.ADMIN_QOS) 314 public UpdateConfigurationResponse updateConfiguration(RpcController controller, 315 UpdateConfigurationRequest request) throws ServiceException { 316 try { 317 requirePermission("updateConfiguration", Permission.Action.ADMIN); 318 this.server.updateConfiguration(); 319 } catch (Exception e) { 320 throw new ServiceException(e); 321 } 322 return UpdateConfigurationResponse.getDefaultInstance(); 323 } 324 325 @Override 326 @QosPriority(priority = HConstants.ADMIN_QOS) 327 public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller, 328 final ClearSlowLogResponseRequest request) throws ServiceException { 329 try { 330 requirePermission("clearSlowLogsResponses", Permission.Action.ADMIN); 331 } catch (IOException e) { 332 throw new ServiceException(e); 333 } 334 final NamedQueueRecorder namedQueueRecorder = this.server.getNamedQueueRecorder(); 335 boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder) 336 .map( 337 queueRecorder -> queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG)) 338 .orElse(false); 339 ClearSlowLogResponses clearSlowLogResponses = 340 ClearSlowLogResponses.newBuilder().setIsCleaned(slowLogsCleaned).build(); 341 return clearSlowLogResponses; 342 } 343 344 private List<SlowLogPayload> getSlowLogPayloads(SlowLogResponseRequest request, 345 NamedQueueRecorder namedQueueRecorder) { 346 if (namedQueueRecorder == null) { 347 return Collections.emptyList(); 348 } 349 List<SlowLogPayload> slowLogPayloads; 350 NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest(); 351 namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT); 352 namedQueueGetRequest.setSlowLogResponseRequest(request); 353 NamedQueueGetResponse namedQueueGetResponse = 354 namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); 355 slowLogPayloads = namedQueueGetResponse != null 356 ? namedQueueGetResponse.getSlowLogPayloads() 357 : Collections.emptyList(); 358 return slowLogPayloads; 359 } 360 361 @Override 362 @QosPriority(priority = HConstants.ADMIN_QOS) 363 public HBaseProtos.LogEntry getLogEntries(RpcController controller, 364 HBaseProtos.LogRequest request) throws ServiceException { 365 try { 366 final String logClassName = request.getLogClassName(); 367 Class<?> logClass = Class.forName(logClassName).asSubclass(Message.class); 368 Method method = logClass.getMethod("parseFrom", ByteString.class); 369 if (logClassName.contains("SlowLogResponseRequest")) { 370 SlowLogResponseRequest slowLogResponseRequest = 371 (SlowLogResponseRequest) method.invoke(null, request.getLogMessage()); 372 final NamedQueueRecorder namedQueueRecorder = this.server.getNamedQueueRecorder(); 373 final List<SlowLogPayload> slowLogPayloads = 374 getSlowLogPayloads(slowLogResponseRequest, namedQueueRecorder); 375 SlowLogResponses slowLogResponses = 376 SlowLogResponses.newBuilder().addAllSlowLogPayloads(slowLogPayloads).build(); 377 return HBaseProtos.LogEntry.newBuilder() 378 .setLogClassName(slowLogResponses.getClass().getName()) 379 .setLogMessage(slowLogResponses.toByteString()).build(); 380 } 381 } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException 382 | InvocationTargetException e) { 383 LOG.error("Error while retrieving log entries.", e); 384 throw new ServiceException(e); 385 } 386 throw new ServiceException("Invalid request params"); 387 } 388}