001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021import java.lang.Thread.UncaughtExceptionHandler; 022import java.util.List; 023import java.util.Set; 024import java.util.concurrent.TimeUnit; 025import org.apache.hadoop.hbase.CallQueueTooBigException; 026import org.apache.hadoop.hbase.DoNotRetryIOException; 027import org.apache.hadoop.hbase.ServerName; 028import org.apache.hadoop.hbase.client.RegionInfo; 029import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 030import org.apache.hadoop.hbase.master.MasterServices; 031import org.apache.hadoop.hbase.master.ServerListener; 032import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; 033import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; 034import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 036import org.apache.hadoop.ipc.RemoteException; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 042import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; 043import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 044import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 045 046import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 047import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 048import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 049import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 050import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 054 055/** 056 * A remote procecdure dispatcher for regionservers. 057 */ 058@InterfaceAudience.Private 059public class RSProcedureDispatcher 060 extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName> 061 implements ServerListener { 062 private static final Logger LOG = LoggerFactory.getLogger(RSProcedureDispatcher.class); 063 064 public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY = 065 "hbase.regionserver.rpc.startup.waittime"; 066 private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000; 067 068 protected final MasterServices master; 069 private final long rsStartupWaitTime; 070 private MasterProcedureEnv procedureEnv; 071 072 public RSProcedureDispatcher(final MasterServices master) { 073 super(master.getConfiguration()); 074 075 this.master = master; 076 this.rsStartupWaitTime = master.getConfiguration().getLong( 077 RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME); 078 } 079 080 @Override 081 protected UncaughtExceptionHandler getUncaughtExceptionHandler() { 082 return new UncaughtExceptionHandler() { 083 084 @Override 085 public void uncaughtException(Thread t, Throwable e) { 086 LOG.error("Unexpected error caught, this may cause the procedure to hang forever", e); 087 } 088 }; 089 } 090 091 @Override 092 public boolean start() { 093 if (!super.start()) { 094 return false; 095 } 096 097 master.getServerManager().registerListener(this); 098 procedureEnv = master.getMasterProcedureExecutor().getEnvironment(); 099 for (ServerName serverName: master.getServerManager().getOnlineServersList()) { 100 addNode(serverName); 101 } 102 return true; 103 } 104 105 @Override 106 public boolean stop() { 107 if (!super.stop()) { 108 return false; 109 } 110 111 master.getServerManager().unregisterListener(this); 112 return true; 113 } 114 115 @Override 116 protected void remoteDispatch(final ServerName serverName, 117 final Set<RemoteProcedure> remoteProcedures) { 118 if (!master.getServerManager().isServerOnline(serverName)) { 119 // fail fast 120 submitTask(new DeadRSRemoteCall(serverName, remoteProcedures)); 121 } else { 122 submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures)); 123 } 124 } 125 126 @Override 127 protected void abortPendingOperations(final ServerName serverName, 128 final Set<RemoteProcedure> operations) { 129 // TODO: Replace with a ServerNotOnlineException() 130 final IOException e = new DoNotRetryIOException("server not online " + serverName); 131 for (RemoteProcedure proc: operations) { 132 proc.remoteCallFailed(procedureEnv, serverName, e); 133 } 134 } 135 136 @Override 137 public void serverAdded(final ServerName serverName) { 138 addNode(serverName); 139 } 140 141 @Override 142 public void serverRemoved(final ServerName serverName) { 143 removeNode(serverName); 144 } 145 146 private interface RemoteProcedureResolver { 147 void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations); 148 149 void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations); 150 151 void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations); 152 } 153 154 /** 155 * Fetches {@link org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation}s 156 * from the given {@code remoteProcedures} and groups them by class of the returned operation. 157 * Then {@code resolver} is used to dispatch {@link RegionOpenOperation}s and 158 * {@link RegionCloseOperation}s. 159 * @param serverName RegionServer to which the remote operations are sent 160 * @param operations Remote procedures which are dispatched to the given server 161 * @param resolver Used to dispatch remote procedures to given server. 162 */ 163 public void splitAndResolveOperation(ServerName serverName, Set<RemoteProcedure> operations, 164 RemoteProcedureResolver resolver) { 165 MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment(); 166 ArrayListMultimap<Class<?>, RemoteOperation> reqsByType = 167 buildAndGroupRequestByType(env, serverName, operations); 168 169 List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class); 170 if (!openOps.isEmpty()) { 171 resolver.dispatchOpenRequests(env, openOps); 172 } 173 174 List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class); 175 if (!closeOps.isEmpty()) { 176 resolver.dispatchCloseRequests(env, closeOps); 177 } 178 179 List<ServerOperation> refreshOps = fetchType(reqsByType, ServerOperation.class); 180 if (!refreshOps.isEmpty()) { 181 resolver.dispatchServerOperations(env, refreshOps); 182 } 183 184 if (!reqsByType.isEmpty()) { 185 LOG.warn("unknown request type in the queue: " + reqsByType); 186 } 187 } 188 189 private class DeadRSRemoteCall extends ExecuteProceduresRemoteCall { 190 191 public DeadRSRemoteCall(ServerName serverName, Set<RemoteProcedure> remoteProcedures) { 192 super(serverName, remoteProcedures); 193 } 194 195 @Override 196 public void run() { 197 remoteCallFailed(procedureEnv, 198 new RegionServerStoppedException("Server " + getServerName() + " is not online")); 199 } 200 } 201 202 // ========================================================================== 203 // Compatibility calls 204 // ========================================================================== 205 protected class ExecuteProceduresRemoteCall implements RemoteProcedureResolver, Runnable { 206 207 private final ServerName serverName; 208 209 private final Set<RemoteProcedure> remoteProcedures; 210 211 private int numberOfAttemptsSoFar = 0; 212 private long maxWaitTime = -1; 213 214 private ExecuteProceduresRequest.Builder request = null; 215 216 public ExecuteProceduresRemoteCall(final ServerName serverName, 217 final Set<RemoteProcedure> remoteProcedures) { 218 this.serverName = serverName; 219 this.remoteProcedures = remoteProcedures; 220 } 221 222 private AdminService.BlockingInterface getRsAdmin() throws IOException { 223 final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName); 224 if (admin == null) { 225 throw new IOException("Attempting to send OPEN RPC to server " + getServerName() + 226 " failed because no RPC connection found to this server"); 227 } 228 return admin; 229 } 230 231 protected final ServerName getServerName() { 232 return serverName; 233 } 234 235 private boolean scheduleForRetry(IOException e) { 236 LOG.debug("request to {} failed, try={}", serverName, numberOfAttemptsSoFar, e); 237 // Should we wait a little before retrying? If the server is starting it's yes. 238 if (e instanceof ServerNotRunningYetException) { 239 long remainingTime = getMaxWaitTime() - EnvironmentEdgeManager.currentTime(); 240 if (remainingTime > 0) { 241 LOG.warn("waiting a little before trying on the same server={}," + 242 " try={}, can wait up to {}ms", serverName, numberOfAttemptsSoFar, remainingTime); 243 numberOfAttemptsSoFar++; 244 submitTask(this, 100, TimeUnit.MILLISECONDS); 245 return true; 246 } 247 LOG.warn("server {} is not up for a while; try a new one", serverName); 248 return false; 249 } 250 if (e instanceof DoNotRetryIOException) { 251 LOG.warn("server {} tells us do not retry due to {}, try={}, give up", serverName, 252 e.toString(), numberOfAttemptsSoFar); 253 return false; 254 } 255 // this exception is thrown in the rpc framework, where we can make sure that the call has not 256 // been executed yet, so it is safe to mark it as fail. Especially for open a region, we'd 257 // better choose another region server 258 // notice that, it is safe to quit only if this is the first time we send request to region 259 // server. Maybe the region server has accept our request the first time, and then there is a 260 // network error which prevents we receive the response, and the second time we hit a 261 // CallQueueTooBigException, obviously it is not safe to quit here, otherwise it may lead to a 262 // double assign... 263 if (e instanceof CallQueueTooBigException && numberOfAttemptsSoFar == 0) { 264 LOG.warn("request to {} failed due to {}, try={}, this usually because" + 265 " server is overloaded, give up", serverName, e.toString(), numberOfAttemptsSoFar); 266 return false; 267 } 268 // Always retry for other exception types if the region server is not dead yet. 269 if (!master.getServerManager().isServerOnline(serverName)) { 270 LOG.warn("request to {} failed due to {}, try={}, and the server is dead, give up", 271 serverName, e.toString(), numberOfAttemptsSoFar); 272 return false; 273 } 274 if (e instanceof RegionServerAbortedException || e instanceof RegionServerStoppedException) { 275 // A better way is to return true here to let the upper layer quit, and then schedule a 276 // background task to check whether the region server is dead. And if it is dead, call 277 // remoteCallFailed to tell the upper layer. Keep retrying here does not lead to incorrect 278 // result, but waste some resources. 279 LOG.warn("server {} is aborted or stopped, for safety we still need to" + 280 " wait until it is fully dead, try={}", serverName, numberOfAttemptsSoFar); 281 } else { 282 LOG.warn("request to server {} failed due to {}, try={}, retrying...", serverName, 283 e.toString(), numberOfAttemptsSoFar); 284 } 285 numberOfAttemptsSoFar++; 286 submitTask(this, 100, TimeUnit.MILLISECONDS); 287 return true; 288 } 289 290 private long getMaxWaitTime() { 291 if (this.maxWaitTime < 0) { 292 // This is the max attempts, not retries, so it should be at least 1. 293 this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime; 294 } 295 return this.maxWaitTime; 296 } 297 298 private IOException unwrapException(IOException e) { 299 if (e instanceof RemoteException) { 300 e = ((RemoteException)e).unwrapRemoteException(); 301 } 302 return e; 303 } 304 305 @Override 306 public void run() { 307 request = ExecuteProceduresRequest.newBuilder(); 308 if (LOG.isTraceEnabled()) { 309 LOG.trace("Building request with operations count=" + remoteProcedures.size()); 310 } 311 splitAndResolveOperation(getServerName(), remoteProcedures, this); 312 313 try { 314 sendRequest(getServerName(), request.build()); 315 } catch (IOException e) { 316 e = unwrapException(e); 317 // TODO: In the future some operation may want to bail out early. 318 // TODO: How many times should we retry (use numberOfAttemptsSoFar) 319 if (!scheduleForRetry(e)) { 320 remoteCallFailed(procedureEnv, e); 321 } 322 } 323 } 324 325 @Override 326 public void dispatchOpenRequests(final MasterProcedureEnv env, 327 final List<RegionOpenOperation> operations) { 328 request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations)); 329 } 330 331 @Override 332 public void dispatchCloseRequests(final MasterProcedureEnv env, 333 final List<RegionCloseOperation> operations) { 334 for (RegionCloseOperation op: operations) { 335 request.addCloseRegion(op.buildCloseRegionRequest(getServerName())); 336 } 337 } 338 339 @Override 340 public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) { 341 operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc); 342 } 343 344 // will be overridden in test. 345 @VisibleForTesting 346 protected ExecuteProceduresResponse sendRequest(final ServerName serverName, 347 final ExecuteProceduresRequest request) throws IOException { 348 try { 349 return getRsAdmin().executeProcedures(null, request); 350 } catch (ServiceException se) { 351 throw ProtobufUtil.getRemoteException(se); 352 } 353 } 354 355 protected final void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { 356 for (RemoteProcedure proc : remoteProcedures) { 357 proc.remoteCallFailed(env, getServerName(), e); 358 } 359 } 360 } 361 362 private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env, 363 final ServerName serverName, final List<RegionOpenOperation> operations) { 364 final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder(); 365 builder.setServerStartCode(serverName.getStartcode()); 366 builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime()); 367 for (RegionOpenOperation op: operations) { 368 builder.addOpenInfo(op.buildRegionOpenInfoRequest(env)); 369 } 370 return builder.build(); 371 } 372 373 // ========================================================================== 374 // RPC Messages 375 // - ServerOperation: refreshConfig, grant, revoke, ... (TODO) 376 // - RegionOperation: open, close, flush, snapshot, ... 377 // ========================================================================== 378 379 public static final class ServerOperation extends RemoteOperation { 380 381 private final long procId; 382 383 private final Class<?> rsProcClass; 384 385 private final byte[] rsProcData; 386 387 public ServerOperation(RemoteProcedure remoteProcedure, long procId, Class<?> rsProcClass, 388 byte[] rsProcData) { 389 super(remoteProcedure); 390 this.procId = procId; 391 this.rsProcClass = rsProcClass; 392 this.rsProcData = rsProcData; 393 } 394 395 public RemoteProcedureRequest buildRequest() { 396 return RemoteProcedureRequest.newBuilder().setProcId(procId) 397 .setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData)).build(); 398 } 399 } 400 401 public static abstract class RegionOperation extends RemoteOperation { 402 protected final RegionInfo regionInfo; 403 protected final long procId; 404 405 protected RegionOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId) { 406 super(remoteProcedure); 407 this.regionInfo = regionInfo; 408 this.procId = procId; 409 } 410 } 411 412 public static class RegionOpenOperation extends RegionOperation { 413 414 public RegionOpenOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, 415 long procId) { 416 super(remoteProcedure, regionInfo, procId); 417 } 418 419 public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest( 420 final MasterProcedureEnv env) { 421 return RequestConverter.buildRegionOpenInfo(regionInfo, 422 env.getAssignmentManager().getFavoredNodes(regionInfo), procId); 423 } 424 } 425 426 public static class RegionCloseOperation extends RegionOperation { 427 private final ServerName destinationServer; 428 429 public RegionCloseOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId, 430 ServerName destinationServer) { 431 super(remoteProcedure, regionInfo, procId); 432 this.destinationServer = destinationServer; 433 } 434 435 public ServerName getDestinationServer() { 436 return destinationServer; 437 } 438 439 public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) { 440 return ProtobufUtil.buildCloseRegionRequest(serverName, regionInfo.getRegionName(), 441 getDestinationServer(), procId); 442 } 443 } 444}