/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.ClientMetaCoprocessorHost;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.NoopAccessChecker;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.OOMEChecker;
import org.apache.hadoop.hbase.util.ReservoirSample;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;

/**
 * Base class for Master and RegionServer RpcServices.
 * <p>
 * The constructor builds the {@link RpcServer} for the hosting server. The
 * {@link ClientMetaService} calls implemented here fetch their data from the hosting server and
 * wrap each call in the matching pre/post hooks of a {@link ClientMetaCoprocessorHost}. The
 * class also implements the configuration-update and slow-log calls of {@link AdminService}
 * that are defined below.
 * @param <S> the type of the server that hosts these RPC services
 */
@InterfaceAudience.Private
public abstract class HBaseRpcServicesBase<S extends HBaseServerBase<?>>
  implements ClientMetaService.BlockingInterface, AdminService.BlockingInterface,
  HBaseRPCErrorHandler, PriorityFunction, ConfigurationObserver {

  private static final Logger LOG = LoggerFactory.getLogger(HBaseRpcServicesBase.class);

  /** Configuration key for the size of the sample used by {@link #getBootstrapNodes}. */
  public static final String CLIENT_BOOTSTRAP_NODE_LIMIT = "hbase.client.bootstrap.node.limit";

  /** Value used when {@link #CLIENT_BOOTSTRAP_NODE_LIMIT} is not configured. */
  public static final int DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT = 10;

  /** The server hosting these RPC services. */
  protected final S server;

  // Server to handle client requests.
  protected final RpcServer rpcServer;

  /** Resolved host name combined with the port the RPC server actually listens on. */
  private final InetSocketAddress isa;

  /** Delegate for {@link #getPriority} and {@link #getDeadline}. */
  protected final PriorityFunction priority;

  /**
   * Host for the client meta coprocessor hooks. {@link #updateConfiguration} replaces the
   * instance wholesale, so the field is volatile to make a replacement visible to RPC calls
   * running on other threads. Each RPC method reads the field once into a local so that the pre
   * and post hooks of one call always go to the same host instance.
   */
  private volatile ClientMetaCoprocessorHost clientMetaCoprocessorHost;

  /** Set by {@link #internalStart(ZKWatcher)}; {@code null} before that. */
  private AccessChecker accessChecker;

  /** Set by {@link #internalStart(ZKWatcher)}; {@code null} before that. */
  private ZKPermissionWatcher zkPermissionWatcher;

  /**
   * Resolves the address to serve on, names the server, and creates (but does not start) the
   * {@link RpcServer}.
   * <p>
   * Note that the abstract hooks of this class are invoked from this constructor, i.e. before
   * any subclass constructor body or subclass field initializer has run. Implementations must
   * not rely on subclass state.
   * @param server      the server hosting these RPC services
   * @param processName prefix used when building the server name
   * @throws IOException              if the RPC server cannot bind, or if its listener address
   *                                  is not available after creation
   * @throws IllegalArgumentException if the scheduler factory cannot be instantiated or the
   *                                  host name does not resolve
   */
  protected HBaseRpcServicesBase(S server, String processName) throws IOException {
    this.server = server;
    Configuration conf = server.getConfiguration();
    final RpcSchedulerFactory rpcSchedulerFactory;
    try {
      rpcSchedulerFactory = getRpcSchedulerFactoryClass(conf).asSubclass(RpcSchedulerFactory.class)
        .getDeclaredConstructor().newInstance();
    } catch (NoSuchMethodException | InvocationTargetException | InstantiationException
      | IllegalAccessException e) {
      throw new IllegalArgumentException(e);
    }
    String hostname = DNS.getHostname(conf, getDNSServerType());
    int port = conf.getInt(getPortConfigName(), getDefaultPort());
    // Creation of a HSA will force a resolve.
    final InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
    final InetSocketAddress bindAddress = new InetSocketAddress(getHostname(conf, hostname), port);
    if (initialIsa.getAddress() == null) {
      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
    }
    priority = createPriority();
    // Using Address means we don't get the IP too. Shorten it more even to just the host name
    // w/o the domain.
    final String name = processName + "/"
      + Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain();
    server.setName(name);
    // Set how many times to retry talking to another server over Connection.
    ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG);
    boolean reservoirEnabled =
      conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, defaultReservoirEnabled());
    try {
      // use final bindAddress for this server.
      rpcServer = RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, conf,
        rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
    } catch (BindException be) {
      // Keep the original cause when there is one, and tell the operator which property to
      // change.
      throw new IOException(be.getMessage() + ". To switch ports use the '" + getPortConfigName()
        + "' configuration property.", be.getCause() != null ? be.getCause() : be);
    }
    final InetSocketAddress address = rpcServer.getListenerAddress();
    if (address == null) {
      throw new IOException("Listener channel is closed");
    }
    // Set our address, however we need the final port that was given to rpcServer
    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
    rpcServer.setErrorHandler(this);

    clientMetaCoprocessorHost = new ClientMetaCoprocessorHost(conf);
  }

  /**
   * Returns the default for {@link ByteBuffAllocator#ALLOCATOR_POOL_ENABLED_KEY} when it is not
   * set in the configuration.
   */
  protected abstract boolean defaultReservoirEnabled();

  /** Returns the server type passed to {@link DNS#getHostname} when resolving the host name. */
  protected abstract DNS.ServerType getDNSServerType();

  /**
   * Returns the host name the RPC server should bind to.
   * @param conf            the server configuration
   * @param defaultHostname the host name obtained from {@link DNS#getHostname}
   */
  protected abstract String getHostname(Configuration conf, String defaultHostname);

  /** Returns the name of the configuration property that holds the RPC port. */
  protected abstract String getPortConfigName();

  /** Returns the RPC port to use when {@link #getPortConfigName()} is not configured. */
  protected abstract int getDefaultPort();

  /** Creates the {@link PriorityFunction} that this class delegates to. */
  protected abstract PriorityFunction createPriority();

  /**
   * Returns the scheduler factory class to instantiate. The constructor requires it to be a
   * subclass of {@link RpcSchedulerFactory} with a no-argument constructor.
   */
  protected abstract Class<?> getRpcSchedulerFactoryClass(Configuration conf);

  /** Returns the services to register with the RPC server. */
  protected abstract List<BlockingServiceAndInterface> getServices();

  /**
   * Creates the access checker and the ZooKeeper permission watcher, then starts the RPC server.
   * A {@link KeeperException} from starting the watcher is logged and start-up continues.
   * @param zkWatcher the ZooKeeper watcher handed to the {@link ZKPermissionWatcher}
   */
  protected final void internalStart(ZKWatcher zkWatcher) {
    if (AccessChecker.isAuthorizationSupported(getConfiguration())) {
      accessChecker = new AccessChecker(getConfiguration());
    } else {
      accessChecker = new NoopAccessChecker(getConfiguration());
    }
    zkPermissionWatcher =
      new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration());
    try {
      zkPermissionWatcher.start();
    } catch (KeeperException e) {
      LOG.error("ZooKeeper permission watcher initialization failed", e);
    }
    rpcServer.start();
  }

  /**
   * Checks that the user of the current RPC request holds the given permission. Does nothing
   * when no access checker has been created yet, i.e. before {@link #internalStart(ZKWatcher)}.
   * @param request name of the request, passed on to the access checker
   * @param perm    the required action
   * @throws IOException if thrown by the access checker
   */
  protected final void requirePermission(String request, Permission.Action perm)
    throws IOException {
    if (accessChecker != null) {
      accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, null, perm);
    }
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public ClientMetaCoprocessorHost getClientMetaCoprocessorHost() {
    return clientMetaCoprocessorHost;
  }

  /** Returns the access checker, or {@code null} if {@link #internalStart} has not run. */
  public AccessChecker getAccessChecker() {
    return accessChecker;
  }

  /** Returns the permission watcher, or {@code null} if {@link #internalStart} has not run. */
  public ZKPermissionWatcher getZkPermissionWatcher() {
    return zkPermissionWatcher;
  }

  /** Closes the permission watcher, if one was created, and stops the RPC server. */
  protected final void internalStop() {
    if (zkPermissionWatcher != null) {
      zkPermissionWatcher.close();
    }
    rpcServer.stop();
  }

  /** Returns the configuration of the hosting server. */
  public Configuration getConfiguration() {
    return server.getConfiguration();
  }

  public S getServer() {
    return server;
  }

  /** Returns the resolved host name together with the port the RPC server listens on. */
  public InetSocketAddress getSocketAddress() {
    return isa;
  }

  public RpcServerInterface getRpcServer() {
    return rpcServer;
  }

  public RpcScheduler getRpcScheduler() {
    return rpcServer.getScheduler();
  }

  @Override
  public int getPriority(RequestHeader header, Message param, User user) {
    return priority.getPriority(header, param, user);
  }

  @Override
  public long getDeadline(RequestHeader header, Message param) {
    return priority.getDeadline(header, param);
  }

  /**
   * Check if an OOME and, if so, abort immediately to avoid creating more objects.
   * @return True if we OOME'd and are aborting.
   */
  @Override
  public boolean checkOOME(Throwable e) {
    return OOMEChecker.exitIfOOME(e, getClass().getSimpleName());
  }

  /** Forwards the configuration change to the RPC server. */
  @Override
  public void onConfigurationChange(Configuration conf) {
    rpcServer.onConfigurationChange(conf);
  }

  /**
   * Returns the cluster id reported by the hosting server, after passing it through the
   * coprocessor post hook.
   * @throws ServiceException wrapping any {@link IOException} raised by the hooks or the server
   */
  @Override
  public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdRequest request)
    throws ServiceException {
    // Read the volatile field once so pre and post hooks hit the same host instance.
    final ClientMetaCoprocessorHost cpHost = clientMetaCoprocessorHost;
    try {
      cpHost.preGetClusterId();

      String clusterId = server.getClusterId();
      String clusterIdReply = cpHost.postGetClusterId(clusterId);

      return GetClusterIdResponse.newBuilder().setClusterId(clusterIdReply).build();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

  /**
   * Returns the active master as seen by the hosting server. The server name is left unset in
   * the response when the post hook yields {@code null}.
   * @throws ServiceException wrapping any {@link IOException} raised by the hooks or the server
   */
  @Override
  public GetActiveMasterResponse getActiveMaster(RpcController controller,
    GetActiveMasterRequest request) throws ServiceException {
    GetActiveMasterResponse.Builder builder = GetActiveMasterResponse.newBuilder();

    // Read the volatile field once so pre and post hooks hit the same host instance.
    final ClientMetaCoprocessorHost cpHost = clientMetaCoprocessorHost;
    try {
      cpHost.preGetActiveMaster();

      ServerName serverName = server.getActiveMaster().orElse(null);
      ServerName serverNameReply = cpHost.postGetActiveMaster(serverName);

      if (serverNameReply != null) {
        builder.setServerName(ProtobufUtil.toServerName(serverNameReply));
      }
    } catch (IOException e) {
      throw new ServiceException(e);
    }

    return builder.build();
  }

  /**
   * Returns the active master (flagged active) followed by the backup masters (flagged
   * inactive), after passing the map through the coprocessor post hook.
   * @throws ServiceException wrapping any {@link IOException} raised by the hooks or the server
   */
  @Override
  public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request)
    throws ServiceException {
    GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();

    // Read the volatile field once so pre and post hooks hit the same host instance.
    final ClientMetaCoprocessorHost cpHost = clientMetaCoprocessorHost;
    try {
      cpHost.preGetMasters();

      // LinkedHashMap keeps insertion order: the active master first, then the backups.
      Map<ServerName, Boolean> serverNames = new LinkedHashMap<>();

      server.getActiveMaster().ifPresent(serverName -> serverNames.put(serverName, Boolean.TRUE));
      server.getBackupMasters().forEach(serverName -> serverNames.put(serverName, Boolean.FALSE));

      Map<ServerName, Boolean> serverNamesReply = cpHost.postGetMasters(serverNames);

      serverNamesReply
        .forEach((serverName, active) -> builder.addMasterServers(GetMastersResponseEntry
          .newBuilder().setServerName(ProtobufUtil.toServerName(serverName)).setIsActive(active)));
    } catch (IOException e) {
      throw new ServiceException(e);
    }

    return builder.build();
  }

  /**
   * Returns the meta region locations known to the hosting server, after passing them through
   * the coprocessor post hook.
   * @throws ServiceException wrapping any {@link IOException} raised by the hooks or the server
   */
  @Override
  public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController controller,
    GetMetaRegionLocationsRequest request) throws ServiceException {
    GetMetaRegionLocationsResponse.Builder builder = GetMetaRegionLocationsResponse.newBuilder();

    // Read the volatile field once so pre and post hooks hit the same host instance.
    final ClientMetaCoprocessorHost cpHost = clientMetaCoprocessorHost;
    try {
      cpHost.preGetMetaLocations();

      List<HRegionLocation> metaLocations = server.getMetaLocations();
      List<HRegionLocation> metaLocationsReply = cpHost.postGetMetaLocations(metaLocations);

      metaLocationsReply
        .forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location)));
    } catch (IOException e) {
      throw new ServiceException(e);
    }

    return builder.build();
  }

  /**
   * Feeds the hosting server's bootstrap nodes through a {@link ReservoirSample} sized by
   * {@link #CLIENT_BOOTSTRAP_NODE_LIMIT} and returns the sampling result, after passing it
   * through the coprocessor post hook.
   * @throws ServiceException wrapping any {@link IOException} raised by the hooks or the server
   */
  @Override
  public final GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
    GetBootstrapNodesRequest request) throws ServiceException {
    GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();

    // Read the volatile field once so pre and post hooks hit the same host instance.
    final ClientMetaCoprocessorHost cpHost = clientMetaCoprocessorHost;
    try {
      cpHost.preGetBootstrapNodes();

      int maxNodeCount = server.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT,
        DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
      ReservoirSample<ServerName> sample = new ReservoirSample<>(maxNodeCount);
      sample.add(server.getBootstrapNodes());

      List<ServerName> bootstrapNodes = sample.getSamplingResult();
      List<ServerName> bootstrapNodesReply = cpHost.postGetBootstrapNodes(bootstrapNodes);

      bootstrapNodesReply
        .forEach(serverName -> builder.addServerName(ProtobufUtil.toServerName(serverName)));
    } catch (IOException e) {
      throw new ServiceException(e);
    }

    return builder.build();
  }

  /**
   * Asks the hosting server to update its configuration, then rebuilds the client meta
   * coprocessor host from the current configuration. Requires ADMIN permission.
   * @throws ServiceException wrapping any exception raised while checking permission or updating
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public UpdateConfigurationResponse updateConfiguration(RpcController controller,
    UpdateConfigurationRequest request) throws ServiceException {
    try {
      requirePermission("updateConfiguration", Permission.Action.ADMIN);
      this.server.updateConfiguration();

      // Volatile write: calls that start after this point see the new host.
      clientMetaCoprocessorHost = new ClientMetaCoprocessorHost(getConfiguration());
    } catch (Exception e) {
      throw new ServiceException(e);
    }
    return UpdateConfigurationResponse.getDefaultInstance();
  }

  /**
   * Clears the slow-log named queue of the hosting server. Requires ADMIN permission. Reports
   * {@code false} in the response when the server has no {@link NamedQueueRecorder}.
   * @throws ServiceException wrapping an {@link IOException} from the permission check
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller,
    final ClearSlowLogResponseRequest request) throws ServiceException {
    try {
      requirePermission("clearSlowLogsResponses", Permission.Action.ADMIN);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    final NamedQueueRecorder namedQueueRecorder = this.server.getNamedQueueRecorder();
    boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder)
      .map(
        queueRecorder -> queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG))
      .orElse(false);
    return ClearSlowLogResponses.newBuilder().setIsCleaned(slowLogsCleaned).build();
  }

  /**
   * Fetches slow-log payloads matching the request from the recorder.
   * @return the payloads, or an empty list when there is no recorder or the recorder returns no
   *         response
   */
  private List<SlowLogPayload> getSlowLogPayloads(SlowLogResponseRequest request,
    NamedQueueRecorder namedQueueRecorder) {
    if (namedQueueRecorder == null) {
      return Collections.emptyList();
    }
    NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
    namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
    namedQueueGetRequest.setSlowLogResponseRequest(request);
    NamedQueueGetResponse namedQueueGetResponse =
      namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
    return namedQueueGetResponse != null
      ? namedQueueGetResponse.getSlowLogPayloads()
      : Collections.emptyList();
  }

  /**
   * Returns log entries for the request. Only requests whose log class name contains
   * {@code SlowLogResponseRequest} are served here; the embedded message is parsed reflectively
   * via the class's static {@code parseFrom(ByteString)} method.
   * @throws ServiceException if the named class cannot be loaded, is not a {@link Message}
   *                          type, cannot be parsed into a {@link SlowLogResponseRequest}, or if
   *                          the request names any other log class
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public HBaseProtos.LogEntry getLogEntries(RpcController controller,
    HBaseProtos.LogRequest request) throws ServiceException {
    try {
      final String logClassName = request.getLogClassName();
      // NOTE(review): the class name comes from the request and is loaded before the name check
      // below; consider validating the name before calling Class.forName.
      Class<?> logClass = Class.forName(logClassName).asSubclass(Message.class);
      Method method = logClass.getMethod("parseFrom", ByteString.class);
      if (logClassName.contains("SlowLogResponseRequest")) {
        SlowLogResponseRequest slowLogResponseRequest =
          (SlowLogResponseRequest) method.invoke(null, request.getLogMessage());
        final NamedQueueRecorder namedQueueRecorder = this.server.getNamedQueueRecorder();
        final List<SlowLogPayload> slowLogPayloads =
          getSlowLogPayloads(slowLogResponseRequest, namedQueueRecorder);
        SlowLogResponses slowLogResponses =
          SlowLogResponses.newBuilder().addAllSlowLogPayloads(slowLogPayloads).build();
        return HBaseProtos.LogEntry.newBuilder()
          .setLogClassName(slowLogResponses.getClass().getName())
          .setLogMessage(slowLogResponses.toByteString()).build();
      }
    } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
      | InvocationTargetException | ClassCastException e) {
      // ClassCastException covers a request naming a class that is not a Message (asSubclass)
      // or whose parse result is not a SlowLogResponseRequest (the cast above); report it as a
      // ServiceException like every other bad-request failure instead of letting it escape.
      LOG.error("Error while retrieving log entries.", e);
      throw new ServiceException(e);
    }
    throw new ServiceException("Invalid request params");
  }
}