001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.CopyOnWriteArrayList; 031import java.util.concurrent.Future; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicBoolean; 034import java.util.concurrent.locks.Condition; 035import java.util.concurrent.locks.ReentrantLock; 036import java.util.stream.Collectors; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.HBaseIOException; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.PleaseHoldException; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.YouAreDeadException; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import org.apache.hadoop.hbase.client.RegionInfoBuilder; 046import org.apache.hadoop.hbase.client.Result; 047import org.apache.hadoop.hbase.client.TableState; 048import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; 049import org.apache.hadoop.hbase.favored.FavoredNodesManager; 050import org.apache.hadoop.hbase.favored.FavoredNodesPromoter; 051import org.apache.hadoop.hbase.master.AssignmentListener; 052import org.apache.hadoop.hbase.master.LoadBalancer; 053import org.apache.hadoop.hbase.master.MasterServices; 054import org.apache.hadoop.hbase.master.MetricsAssignmentManager; 055import org.apache.hadoop.hbase.master.RegionPlan; 056import org.apache.hadoop.hbase.master.RegionState; 057import org.apache.hadoop.hbase.master.RegionState.State; 058import org.apache.hadoop.hbase.master.ServerListener; 059import org.apache.hadoop.hbase.master.TableStateManager; 060import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode; 061import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerState; 062import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerStateNode; 063import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer; 064import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 065import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; 066import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; 067import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; 068import org.apache.hadoop.hbase.procedure2.Procedure; 069import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 070import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 071import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore; 072import org.apache.hadoop.hbase.procedure2.util.StringUtils; 073import org.apache.hadoop.hbase.regionserver.SequenceId; 074import org.apache.hadoop.hbase.util.Bytes; 075import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 076import org.apache.hadoop.hbase.util.HasThread; 077import org.apache.hadoop.hbase.util.Pair; 078import org.apache.hadoop.hbase.util.Threads; 079import org.apache.hadoop.hbase.util.VersionInfo; 080import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 081import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 082import org.apache.yetus.audience.InterfaceAudience; 083import org.apache.zookeeper.KeeperException; 084import org.slf4j.Logger; 085import org.slf4j.LoggerFactory; 086 087import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 088 089import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 090import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 092import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 093import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; 094 095/** 096 * The AssignmentManager is the coordinator for region assign/unassign operations. 097 * <ul> 098 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li> 099 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li> 100 * </ul> 101 * Regions are created by CreateTable, Split, Merge. 102 * Regions are deleted by DeleteTable, Split, Merge. 103 * Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. 104 * Unassigns are triggered by DisableTable, Split, Merge 105 */ 106@InterfaceAudience.Private 107public class AssignmentManager implements ServerListener { 108 private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class); 109 110 // TODO: AMv2 111 // - handle region migration from hbase1 to hbase2. 112 // - handle sys table assignment first (e.g. acl, namespace) 113 // - handle table priorities 114 // - If ServerBusyException trying to update hbase:meta, we abort the Master 115 // See updateRegionLocation in RegionStateStore. 116 // 117 // See also 118 // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5 119 // for other TODOs. 120 121 public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY = 122 "hbase.assignment.bootstrap.thread.pool.size"; 123 124 public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY = 125 "hbase.assignment.dispatch.wait.msec"; 126 private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150; 127 128 public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY = 129 "hbase.assignment.dispatch.wait.queue.max.size"; 130 private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100; 131 132 public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY = 133 "hbase.assignment.rit.chore.interval.msec"; 134 private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000; 135 136 public static final String ASSIGN_MAX_ATTEMPTS = 137 "hbase.assignment.maximum.attempts"; 138 private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE; 139 140 /** Region in Transition metrics threshold time */ 141 public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD = 142 "hbase.metrics.rit.stuck.warning.threshold"; 143 private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000; 144 145 private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign"); 146 private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load"); 147 148 /** Listeners that are called on assignment events. */ 149 private final CopyOnWriteArrayList<AssignmentListener> listeners = 150 new CopyOnWriteArrayList<AssignmentListener>(); 151 152 private final MetricsAssignmentManager metrics; 153 private final RegionInTransitionChore ritChore; 154 private final MasterServices master; 155 156 private final AtomicBoolean running = new AtomicBoolean(false); 157 private final RegionStates regionStates = new RegionStates(); 158 private final RegionStateStore regionStateStore; 159 160 private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>(); 161 162 private final boolean shouldAssignRegionsWithFavoredNodes; 163 private final int assignDispatchWaitQueueMaxSize; 164 private final int assignDispatchWaitMillis; 165 private final int assignMaxAttempts; 166 167 private final Object checkIfShouldMoveSystemRegionLock = new Object(); 168 169 private Thread assignThread; 170 171 public AssignmentManager(final MasterServices master) { 172 this(master, new RegionStateStore(master)); 173 } 174 175 public AssignmentManager(final MasterServices master, final RegionStateStore stateStore) { 176 this.master = master; 177 this.regionStateStore = stateStore; 178 this.metrics = new MetricsAssignmentManager(); 179 180 final Configuration conf = master.getConfiguration(); 181 182 // Only read favored nodes if using the favored nodes load balancer. 183 this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class.isAssignableFrom( 184 conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class)); 185 186 this.assignDispatchWaitMillis = conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, 187 DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC); 188 this.assignDispatchWaitQueueMaxSize = conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, 189 DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX); 190 191 this.assignMaxAttempts = Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, 192 DEFAULT_ASSIGN_MAX_ATTEMPTS)); 193 194 int ritChoreInterval = conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, 195 DEFAULT_RIT_CHORE_INTERVAL_MSEC); 196 this.ritChore = new RegionInTransitionChore(ritChoreInterval); 197 } 198 199 public void start() throws IOException, KeeperException { 200 if (!running.compareAndSet(false, true)) { 201 return; 202 } 203 204 LOG.trace("Starting assignment manager"); 205 206 // Register Server Listener 207 master.getServerManager().registerListener(this); 208 209 // Start the Assignment Thread 210 startAssignmentThread(); 211 212 // load meta region state 213 ZKWatcher zkw = master.getZooKeeper(); 214 // it could be null in some tests 215 if (zkw != null) { 216 RegionState regionState = MetaTableLocator.getMetaRegionState(zkw); 217 RegionStateNode regionStateNode = 218 regionStates.getOrCreateRegionStateNode(RegionInfoBuilder.FIRST_META_REGIONINFO); 219 synchronized (regionStateNode) { 220 regionStateNode.setRegionLocation(regionState.getServerName()); 221 regionStateNode.setState(regionState.getState()); 222 setMetaAssigned(regionState.getRegion(), regionState.getState() == State.OPEN); 223 } 224 } 225 } 226 227 public void stop() { 228 if (!running.compareAndSet(true, false)) { 229 return; 230 } 231 232 LOG.info("Stopping assignment manager"); 233 234 // The AM is started before the procedure executor, 235 // but the actual work will be loaded/submitted only once we have the executor 236 final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null; 237 238 // Remove the RIT chore 239 if (hasProcExecutor) { 240 master.getMasterProcedureExecutor().removeChore(this.ritChore); 241 } 242 243 // Stop the Assignment Thread 244 stopAssignmentThread(); 245 246 // Stop the RegionStateStore 247 regionStates.clear(); 248 249 // Unregister Server Listener 250 master.getServerManager().unregisterListener(this); 251 252 // Update meta events (for testing) 253 if (hasProcExecutor) { 254 metaLoadEvent.suspend(); 255 for (RegionInfo hri: getMetaRegionSet()) { 256 setMetaAssigned(hri, false); 257 } 258 } 259 } 260 261 public boolean isRunning() { 262 return running.get(); 263 } 264 265 public Configuration getConfiguration() { 266 return master.getConfiguration(); 267 } 268 269 public MetricsAssignmentManager getAssignmentManagerMetrics() { 270 return metrics; 271 } 272 273 private LoadBalancer getBalancer() { 274 return master.getLoadBalancer(); 275 } 276 277 private MasterProcedureEnv getProcedureEnvironment() { 278 return master.getMasterProcedureExecutor().getEnvironment(); 279 } 280 281 private MasterProcedureScheduler getProcedureScheduler() { 282 return getProcedureEnvironment().getProcedureScheduler(); 283 } 284 285 int getAssignMaxAttempts() { 286 return assignMaxAttempts; 287 } 288 289 /** 290 * Add the listener to the notification list. 291 * @param listener The AssignmentListener to register 292 */ 293 public void registerListener(final AssignmentListener listener) { 294 this.listeners.add(listener); 295 } 296 297 /** 298 * Remove the listener from the notification list. 299 * @param listener The AssignmentListener to unregister 300 */ 301 public boolean unregisterListener(final AssignmentListener listener) { 302 return this.listeners.remove(listener); 303 } 304 305 public RegionStates getRegionStates() { 306 return regionStates; 307 } 308 309 public RegionStateStore getRegionStateStore() { 310 return regionStateStore; 311 } 312 313 public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) { 314 return this.shouldAssignRegionsWithFavoredNodes? 315 ((FavoredStochasticBalancer)getBalancer()).getFavoredNodes(regionInfo): 316 ServerName.EMPTY_SERVER_LIST; 317 } 318 319 // ============================================================================================ 320 // Table State Manager helpers 321 // ============================================================================================ 322 TableStateManager getTableStateManager() { 323 return master.getTableStateManager(); 324 } 325 326 public boolean isTableEnabled(final TableName tableName) { 327 return getTableStateManager().isTableState(tableName, TableState.State.ENABLED); 328 } 329 330 public boolean isTableDisabled(final TableName tableName) { 331 return getTableStateManager().isTableState(tableName, 332 TableState.State.DISABLED, TableState.State.DISABLING); 333 } 334 335 // ============================================================================================ 336 // META Helpers 337 // ============================================================================================ 338 private boolean isMetaRegion(final RegionInfo regionInfo) { 339 return regionInfo.isMetaRegion(); 340 } 341 342 public boolean isMetaRegion(final byte[] regionName) { 343 return getMetaRegionFromName(regionName) != null; 344 } 345 346 public RegionInfo getMetaRegionFromName(final byte[] regionName) { 347 for (RegionInfo hri: getMetaRegionSet()) { 348 if (Bytes.equals(hri.getRegionName(), regionName)) { 349 return hri; 350 } 351 } 352 return null; 353 } 354 355 public boolean isCarryingMeta(final ServerName serverName) { 356 // TODO: handle multiple meta 357 return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO); 358 } 359 360 private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) { 361 // TODO: check for state? 362 final RegionStateNode node = regionStates.getRegionStateNode(regionInfo); 363 return(node != null && serverName.equals(node.getRegionLocation())); 364 } 365 366 private RegionInfo getMetaForRegion(final RegionInfo regionInfo) { 367 //if (regionInfo.isMetaRegion()) return regionInfo; 368 // TODO: handle multiple meta. if the region provided is not meta lookup 369 // which meta the region belongs to. 370 return RegionInfoBuilder.FIRST_META_REGIONINFO; 371 } 372 373 // TODO: handle multiple meta. 374 private static final Set<RegionInfo> META_REGION_SET = 375 Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO); 376 public Set<RegionInfo> getMetaRegionSet() { 377 return META_REGION_SET; 378 } 379 380 // ============================================================================================ 381 // META Event(s) helpers 382 // ============================================================================================ 383 /** 384 * Notice that, this only means the meta region is available on a RS, but the AM may still be 385 * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first 386 * before checking this method, unless you can make sure that your piece of code can only be 387 * executed after AM builds the region states. 388 * @see #isMetaLoaded() 389 */ 390 public boolean isMetaAssigned() { 391 return metaAssignEvent.isReady(); 392 } 393 394 public boolean isMetaRegionInTransition() { 395 return !isMetaAssigned(); 396 } 397 398 /** 399 * Notice that this event does not mean the AM has already finished region state rebuilding. See 400 * the comment of {@link #isMetaAssigned()} for more details. 401 * @see #isMetaAssigned() 402 */ 403 public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) { 404 return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc); 405 } 406 407 private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) { 408 assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; 409 ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo); 410 if (assigned) { 411 metaAssignEvent.wake(getProcedureScheduler()); 412 } else { 413 metaAssignEvent.suspend(); 414 } 415 } 416 417 private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) { 418 assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; 419 // TODO: handle multiple meta. 420 return metaAssignEvent; 421 } 422 423 /** 424 * Wait until AM finishes the meta loading, i.e, the region states rebuilding. 425 * @see #isMetaLoaded() 426 * @see #waitMetaAssigned(Procedure, RegionInfo) 427 */ 428 public boolean waitMetaLoaded(Procedure<?> proc) { 429 return metaLoadEvent.suspendIfNotReady(proc); 430 } 431 432 @VisibleForTesting 433 void wakeMetaLoadedEvent() { 434 metaLoadEvent.wake(getProcedureScheduler()); 435 assert isMetaLoaded() : "expected meta to be loaded"; 436 } 437 438 /** 439 * Return whether AM finishes the meta loading, i.e, the region states rebuilding. 440 * @see #isMetaAssigned() 441 * @see #waitMetaLoaded(Procedure) 442 */ 443 public boolean isMetaLoaded() { 444 return metaLoadEvent.isReady(); 445 } 446 447 /** 448 * Start a new thread to check if there are region servers whose versions are higher than others. 449 * If so, move all system table regions to RS with the highest version to keep compatibility. 450 * The reason is, RS in new version may not be able to access RS in old version when there are 451 * some incompatible changes. 452 * <p>This method is called when a new RegionServer is added to cluster only.</p> 453 */ 454 public void checkIfShouldMoveSystemRegionAsync() { 455 // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that 456 // it should 'move' the system tables from the old server to the new server but 457 // ServerCrashProcedure is on it; and it will take care of the assign without dataloss. 458 if (this.master.getServerManager().countOfRegionServers() <= 1) { 459 return; 460 } 461 // This thread used to run whenever there was a change in the cluster. The ZooKeeper 462 // childrenChanged notification came in before the nodeDeleted message and so this method 463 // cold run before a ServerCrashProcedure could run. That meant that this thread could see 464 // a Crashed Server before ServerCrashProcedure and it could find system regions on the 465 // crashed server and go move them before ServerCrashProcedure had a chance; could be 466 // dataloss too if WALs were not recovered. 467 new Thread(() -> { 468 try { 469 synchronized (checkIfShouldMoveSystemRegionLock) { 470 List<RegionPlan> plans = new ArrayList<>(); 471 // TODO: I don't think this code does a good job if all servers in cluster have same 472 // version. It looks like it will schedule unnecessary moves. 473 for (ServerName server : getExcludedServersForSystemTable()) { 474 if (master.getServerManager().isServerDead(server)) { 475 // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable() 476 // considers only online servers, the server could be queued for dead server 477 // processing. As region assignments for crashed server is handled by 478 // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through 479 // regular flow of LoadBalancer as a favored node and not to have this special 480 // handling. 481 continue; 482 } 483 List<RegionInfo> regionsShouldMove = getSystemTables(server); 484 if (!regionsShouldMove.isEmpty()) { 485 for (RegionInfo regionInfo : regionsShouldMove) { 486 // null value for dest forces destination server to be selected by balancer 487 RegionPlan plan = new RegionPlan(regionInfo, server, null); 488 if (regionInfo.isMetaRegion()) { 489 // Must move meta region first. 490 LOG.info("Async MOVE of {} to newer Server={}", 491 regionInfo.getEncodedName(), server); 492 moveAsync(plan); 493 } else { 494 plans.add(plan); 495 } 496 } 497 } 498 for (RegionPlan plan : plans) { 499 LOG.info("Async MOVE of {} to newer Server={}", 500 plan.getRegionInfo().getEncodedName(), server); 501 moveAsync(plan); 502 } 503 } 504 } 505 } catch (Throwable t) { 506 LOG.error(t.toString(), t); 507 } 508 }).start(); 509 } 510 511 private List<RegionInfo> getSystemTables(ServerName serverName) { 512 Set<RegionStateNode> regions = this.getRegionStates().getServerNode(serverName).getRegions(); 513 if (regions == null) { 514 return new ArrayList<>(); 515 } 516 return regions.stream() 517 .map(RegionStateNode::getRegionInfo) 518 .filter(r -> r.getTable().isSystemTable()) 519 .collect(Collectors.toList()); 520 } 521 522 public void assign(final RegionInfo regionInfo, ServerName sn) throws IOException { 523 AssignProcedure proc = createAssignProcedure(regionInfo, sn); 524 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 525 } 526 527 public void assign(final RegionInfo regionInfo) throws IOException { 528 AssignProcedure proc = createAssignProcedure(regionInfo); 529 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 530 } 531 532 public void unassign(final RegionInfo regionInfo) throws IOException { 533 unassign(regionInfo, false); 534 } 535 536 public void unassign(final RegionInfo regionInfo, final boolean forceNewPlan) 537 throws IOException { 538 // TODO: rename this reassign 539 RegionStateNode node = this.regionStates.getRegionStateNode(regionInfo); 540 ServerName destinationServer = node.getRegionLocation(); 541 if (destinationServer == null) { 542 throw new UnexpectedStateException("DestinationServer is null; Assigned? " + node.toString()); 543 } 544 assert destinationServer != null; node.toString(); 545 UnassignProcedure proc = createUnassignProcedure(regionInfo, destinationServer, forceNewPlan); 546 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 547 } 548 549 public void move(final RegionInfo regionInfo) throws IOException { 550 RegionStateNode node = this.regionStates.getRegionStateNode(regionInfo); 551 ServerName sourceServer = node.getRegionLocation(); 552 RegionPlan plan = new RegionPlan(regionInfo, sourceServer, null); 553 MoveRegionProcedure proc = createMoveRegionProcedure(plan); 554 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 555 } 556 557 public Future<byte[]> moveAsync(final RegionPlan regionPlan) throws HBaseIOException { 558 MoveRegionProcedure proc = createMoveRegionProcedure(regionPlan); 559 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); 560 } 561 562 // ============================================================================================ 563 // RegionTransition procedures helpers 564 // ============================================================================================ 565 566 /** 567 * Create round-robin assigns. Use on table creation to distribute out regions across cluster. 568 * @return AssignProcedures made out of the passed in <code>hris</code> and a call 569 * to the balancer to populate the assigns with targets chosen using round-robin (default 570 * balancer scheme). If at assign-time, the target chosen is no longer up, thats fine, 571 * the AssignProcedure will ask the balancer for a new target, and so on. 572 */ 573 public AssignProcedure[] createRoundRobinAssignProcedures(final List<RegionInfo> hris) { 574 if (hris.isEmpty()) { 575 return null; 576 } 577 try { 578 // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do 579 // a better job if it has all the assignments in the one lump. 580 Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris, 581 this.master.getServerManager().createDestinationServersList(null)); 582 // Return mid-method! 583 return createAssignProcedures(assignments, hris.size()); 584 } catch (HBaseIOException hioe) { 585 LOG.warn("Failed roundRobinAssignment", hioe); 586 } 587 // If an error above, fall-through to this simpler assign. Last resort. 588 return createAssignProcedures(hris); 589 } 590 591 /** 592 * Create an array of AssignProcedures w/o specifying a target server. 593 * If no target server, at assign time, we will try to use the former location of the region 594 * if one exists. This is how we 'retain' the old location across a server restart. 595 * Used by {@link ServerCrashProcedure} assigning regions on a server that has crashed (SCP is 596 * also used across a cluster-restart just-in-case to ensure we do cleanup of any old WALs or 597 * server processes). 598 */ 599 public AssignProcedure[] createAssignProcedures(final List<RegionInfo> hris) { 600 if (hris.isEmpty()) { 601 return null; 602 } 603 int index = 0; 604 AssignProcedure [] procedures = new AssignProcedure[hris.size()]; 605 for (RegionInfo hri : hris) { 606 // Sort the procedures so meta and system regions are first in the returned array. 607 procedures[index++] = createAssignProcedure(hri); 608 } 609 if (procedures.length > 1) { 610 // Sort the procedures so meta and system regions are first in the returned array. 611 Arrays.sort(procedures, AssignProcedure.COMPARATOR); 612 } 613 return procedures; 614 } 615 616 // Make this static for the method below where we use it typing the AssignProcedure array we 617 // return as result. 618 private static final AssignProcedure [] ASSIGN_PROCEDURE_ARRAY_TYPE = new AssignProcedure[] {}; 619 620 /** 621 * @param assignments Map of assignments from which we produce an array of AssignProcedures. 622 * @param size Count of assignments to make (the caller may know the total count) 623 * @return Assignments made from the passed in <code>assignments</code> 624 */ 625 private AssignProcedure[] createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments, 626 int size) { 627 List<AssignProcedure> procedures = new ArrayList<>(size > 0? size: 8/*Arbitrary*/); 628 for (Map.Entry<ServerName, List<RegionInfo>> e: assignments.entrySet()) { 629 for (RegionInfo ri: e.getValue()) { 630 AssignProcedure ap = createAssignProcedure(ri, e.getKey()); 631 ap.setOwner(getProcedureEnvironment().getRequestUser().getShortName()); 632 procedures.add(ap); 633 } 634 } 635 if (procedures.size() > 1) { 636 // Sort the procedures so meta and system regions are first in the returned array. 637 procedures.sort(AssignProcedure.COMPARATOR); 638 } 639 return procedures.toArray(ASSIGN_PROCEDURE_ARRAY_TYPE); 640 } 641 642 // Needed for the following method so it can type the created Array we retur n 643 private static final UnassignProcedure [] UNASSIGN_PROCEDURE_ARRAY_TYPE = 644 new UnassignProcedure[0]; 645 646 UnassignProcedure[] createUnassignProcedures(final Collection<RegionStateNode> nodes) { 647 if (nodes.isEmpty()) return null; 648 final List<UnassignProcedure> procs = new ArrayList<UnassignProcedure>(nodes.size()); 649 for (RegionStateNode node: nodes) { 650 if (!this.regionStates.include(node, false)) continue; 651 // Look for regions that are offline/closed; i.e. already unassigned. 652 if (this.regionStates.isRegionOffline(node.getRegionInfo())) continue; 653 assert node.getRegionLocation() != null: node.toString(); 654 procs.add(createUnassignProcedure(node.getRegionInfo(), node.getRegionLocation(), false)); 655 } 656 return procs.toArray(UNASSIGN_PROCEDURE_ARRAY_TYPE); 657 } 658 659 /** 660 * Called by things like DisableTableProcedure to get a list of UnassignProcedure 661 * to unassign the regions of the table. 662 */ 663 public AssignProcedure createAssignProcedure(final RegionInfo regionInfo) { 664 return createAssignProcedure(regionInfo, null, false); 665 } 666 667 public AssignProcedure createAssignProcedure(final RegionInfo regionInfo, boolean override) { 668 return createAssignProcedure(regionInfo, null, override); 669 } 670 671 public AssignProcedure createAssignProcedure(final RegionInfo regionInfo, 672 ServerName targetServer) { 673 return createAssignProcedure(regionInfo, targetServer, false); 674 } 675 676 public AssignProcedure createAssignProcedure(final RegionInfo regionInfo, 677 final ServerName targetServer, boolean override) { 678 AssignProcedure proc = new AssignProcedure(regionInfo, targetServer, override); 679 proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName()); 680 return proc; 681 } 682 683 public UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo) { 684 return createUnassignProcedure(regionInfo, null, false); 685 } 686 687 public UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo, 688 boolean override) { 689 return createUnassignProcedure(regionInfo, null, override); 690 } 691 692 public UnassignProcedure[] createUnassignProcedures(final TableName tableName) { 693 return createUnassignProcedures(regionStates.getTableRegionStateNodes(tableName)); 694 } 695 696 UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo, 697 final ServerName destinationServer, final boolean force) { 698 return createUnassignProcedure(regionInfo, destinationServer, force, false); 699 } 700 701 UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo, 702 final ServerName destinationServer, final boolean force, 703 final boolean removeAfterUnassigning) { 704 // If destinationServer is null, figure it. 705 ServerName sn = destinationServer != null? destinationServer: 706 getRegionStates().getRegionState(regionInfo).getServerName(); 707 assert sn != null; 708 UnassignProcedure proc = new UnassignProcedure(regionInfo, sn, force, removeAfterUnassigning); 709 proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName()); 710 return proc; 711 } 712 713 private MoveRegionProcedure createMoveRegionProcedure(RegionPlan plan) throws HBaseIOException { 714 if (plan.getRegionInfo().getTable().isSystemTable()) { 715 List<ServerName> exclude = getExcludedServersForSystemTable(); 716 if (plan.getDestination() != null && exclude.contains(plan.getDestination())) { 717 try { 718 LOG.info("Can not move " + plan.getRegionInfo() + " to " + plan.getDestination() + 719 " because the server is not with highest version"); 720 plan.setDestination(getBalancer().randomAssignment(plan.getRegionInfo(), 721 this.master.getServerManager().createDestinationServersList(exclude))); 722 } catch (HBaseIOException e) { 723 LOG.warn(e.toString(), e); 724 } 725 } 726 } 727 return new MoveRegionProcedure(getProcedureEnvironment(), plan, true); 728 } 729 730 731 public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit, 732 final byte[] splitKey) throws IOException { 733 return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey); 734 } 735 736 public MergeTableRegionsProcedure createMergeProcedure(RegionInfo ... ris) throws IOException { 737 return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false); 738 } 739 740 /** 741 * Delete the region states. This is called by "DeleteTable" 742 */ 743 public void deleteTable(final TableName tableName) throws IOException { 744 final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName); 745 regionStateStore.deleteRegions(regions); 746 for (int i = 0; i < regions.size(); ++i) { 747 final RegionInfo regionInfo = regions.get(i); 748 // we expect the region to be offline 749 regionStates.removeFromOfflineRegions(regionInfo); 750 regionStates.deleteRegion(regionInfo); 751 } 752 } 753 754 // ============================================================================================ 755 // RS Region Transition Report helpers 756 // ============================================================================================ 757 // TODO: Move this code in MasterRpcServices and call on specific event? 758 public ReportRegionStateTransitionResponse reportRegionStateTransition( 759 final ReportRegionStateTransitionRequest req) 760 throws PleaseHoldException { 761 final ReportRegionStateTransitionResponse.Builder builder = 762 ReportRegionStateTransitionResponse.newBuilder(); 763 final ServerName serverName = ProtobufUtil.toServerName(req.getServer()); 764 try { 765 for (RegionStateTransition transition: req.getTransitionList()) { 766 switch (transition.getTransitionCode()) { 767 case OPENED: 768 case FAILED_OPEN: 769 case CLOSED: 770 assert transition.getRegionInfoCount() == 1 : transition; 771 final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 772 updateRegionTransition(serverName, transition.getTransitionCode(), hri, 773 transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM); 774 break; 775 case READY_TO_SPLIT: 776 case SPLIT: 777 case SPLIT_REVERTED: 778 assert transition.getRegionInfoCount() == 3 : transition; 779 final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 780 final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1)); 781 final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2)); 782 updateRegionSplitTransition(serverName, transition.getTransitionCode(), 783 parent, splitA, splitB); 784 break; 785 case READY_TO_MERGE: 786 case MERGED: 787 case MERGE_REVERTED: 788 assert transition.getRegionInfoCount() == 3 : transition; 789 final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 790 final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1)); 791 final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2)); 792 updateRegionMergeTransition(serverName, transition.getTransitionCode(), 793 merged, mergeA, mergeB); 794 break; 795 } 796 } 797 } catch (PleaseHoldException e) { 798 if (LOG.isTraceEnabled()) LOG.trace("Failed transition " + e.getMessage()); 799 throw e; 800 } catch (UnsupportedOperationException|IOException e) { 801 // TODO: at the moment we have a single error message and the RS will abort 802 // if the master says that one of the region transitions failed. 803 LOG.warn("Failed transition", e); 804 builder.setErrorMessage("Failed transition " + e.getMessage()); 805 } 806 return builder.build(); 807 } 808 809 /** 810 * Called when the RegionServer wants to report a Procedure transition. 811 * Ends up calling {@link #reportTransition(RegionStateNode, ServerName, TransitionCode, long)} 812 */ 813 private void updateRegionTransition(final ServerName serverName, final TransitionCode state, 814 final RegionInfo regionInfo, final long seqId) 815 throws PleaseHoldException, UnexpectedStateException { 816 checkMetaLoaded(regionInfo); 817 818 final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 819 if (regionNode == null) { 820 // the table/region is gone. maybe a delete, split, merge 821 throw new UnexpectedStateException(String.format( 822 "Server %s was trying to transition region %s to %s. but the region was removed.", 823 serverName, regionInfo, state)); 824 } 825 826 if (LOG.isTraceEnabled()) { 827 LOG.trace(String.format("Update region transition serverName=%s region=%s regionState=%s", 828 serverName, regionNode, state)); 829 } 830 831 if (!reportTransition(regionNode, serverName, state, seqId)) { 832 // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages: 833 // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for 834 // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958, 835 // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition 836 // to CLOSED 837 // These happen because on cluster shutdown, we currently let the RegionServers close 838 // regions. This is the only time that region close is not run by the Master (so cluster 839 // goes down fast). Consider changing it so Master runs all shutdowns. 840 if (this.master.getServerManager().isClusterShutdown() && 841 state.equals(TransitionCode.CLOSED)) { 842 LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName()); 843 } else { 844 LOG.warn("No matching procedure found for {} transition to {}", regionNode, state); 845 } 846 } 847 } 848 849 // FYI: regionNode is sometimes synchronized by the caller but not always. 850 private boolean reportTransition(final RegionStateNode regionNode, ServerName serverName, 851 final TransitionCode state, final long seqId) 852 throws UnexpectedStateException { 853 synchronized (regionNode) { 854 final RegionTransitionProcedure proc = regionNode.getProcedure(); 855 if (proc == null) return false; 856 857 // serverNode.getReportEvent().removeProcedure(proc); 858 proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), 859 serverName, state, seqId); 860 } 861 return true; 862 } 863 864 private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state, 865 final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB) 866 throws IOException { 867 checkMetaLoaded(parent); 868 869 if (state != TransitionCode.READY_TO_SPLIT) { 870 throw new UnexpectedStateException("unsupported split regionState=" + state + 871 " for parent region " + parent + 872 " maybe an old RS (< 2.0) had the operation in progress"); 873 } 874 875 // sanity check on the request 876 if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) { 877 throw new UnsupportedOperationException( 878 "unsupported split request with bad keys: parent=" + parent + 879 " hriA=" + hriA + " hriB=" + hriB); 880 } 881 882 // Submit the Split procedure 883 final byte[] splitKey = hriB.getStartKey(); 884 if (LOG.isDebugEnabled()) { 885 LOG.debug("Split request from " + serverName + 886 ", parent=" + parent + " splitKey=" + Bytes.toStringBinary(splitKey)); 887 } 888 master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey)); 889 890 // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split 891 if (master.getServerManager().getServerVersion(serverName) < 0x0200000) { 892 throw new UnsupportedOperationException(String.format( 893 "Split handled by the master: parent=%s hriA=%s hriB=%s", parent.getShortNameToLog(), hriA, hriB)); 894 } 895 } 896 897 private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state, 898 final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException { 899 checkMetaLoaded(merged); 900 901 if (state != TransitionCode.READY_TO_MERGE) { 902 throw new UnexpectedStateException("Unsupported merge regionState=" + state + 903 " for regionA=" + hriA + " regionB=" + hriB + " merged=" + merged + 904 " maybe an old RS (< 2.0) had the operation in progress"); 905 } 906 907 // Submit the Merge procedure 908 if (LOG.isDebugEnabled()) { 909 LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged); 910 } 911 master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB)); 912 913 // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge 914 if (master.getServerManager().getServerVersion(serverName) < 0x0200000) { 915 throw new UnsupportedOperationException(String.format( 916 "Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, merged, hriA, 917 hriB)); 918 } 919 } 920 921 // ============================================================================================ 922 // RS Status update (report online regions) helpers 923 // ============================================================================================ 924 /** 925 * the master will call this method when the RS send the regionServerReport(). 926 * the report will contains the "online regions". 927 * this method will check the the online regions against the in-memory state of the AM, 928 * if there is a mismatch we will try to fence out the RS with the assumption 929 * that something went wrong on the RS side. 930 */ 931 public void reportOnlineRegions(final ServerName serverName, final Set<byte[]> regionNames) 932 throws YouAreDeadException { 933 if (!isRunning()) return; 934 if (LOG.isTraceEnabled()) { 935 LOG.trace("ReportOnlineRegions " + serverName + " regionCount=" + regionNames.size() + 936 ", metaLoaded=" + isMetaLoaded() + " " + 937 regionNames.stream().map(element -> Bytes.toStringBinary(element)). 938 collect(Collectors.toList())); 939 } 940 941 // Make sure there is a ServerStateNode for this server that just checked in. 942 ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); 943 synchronized (serverNode) { 944 if (!serverNode.isInState(ServerState.ONLINE)) { 945 LOG.warn("Got a report from a server result in state " + serverNode.getState()); 946 return; 947 } 948 } 949 950 // Track the regionserver reported online regions in memory. 951 synchronized (rsReports) { 952 rsReports.put(serverName, regionNames); 953 } 954 955 if (regionNames.isEmpty()) { 956 // nothing to do if we don't have regions 957 LOG.trace("no online region found on " + serverName); 958 } else if (!isMetaLoaded()) { 959 // if we are still on startup, discard the report unless is from someone holding meta 960 checkOnlineRegionsReportForMeta(serverName, regionNames); 961 } else { 962 // The Heartbeat updates us of what regions are only. check and verify the state. 963 checkOnlineRegionsReport(serverNode, regionNames); 964 } 965 966 // wake report event 967 wakeServerReportEvent(serverNode); 968 } 969 970 void checkOnlineRegionsReportForMeta(final ServerName serverName, final Set<byte[]> regionNames) { 971 try { 972 for (byte[] regionName: regionNames) { 973 final RegionInfo hri = getMetaRegionFromName(regionName); 974 if (hri == null) { 975 if (LOG.isTraceEnabled()) { 976 LOG.trace("Skip online report for region=" + Bytes.toStringBinary(regionName) + 977 " while meta is loading"); 978 } 979 continue; 980 } 981 982 final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri); 983 LOG.info("META REPORTED: " + regionNode); 984 if (!reportTransition(regionNode, serverName, TransitionCode.OPENED, 0)) { 985 LOG.warn("META REPORTED but no procedure found (complete?); set location={}", serverName); 986 regionNode.setRegionLocation(serverName); 987 } else if (LOG.isTraceEnabled()) { 988 LOG.trace("META REPORTED: " + regionNode); 989 } 990 } 991 } catch (UnexpectedStateException e) { 992 LOG.warn("KILLING " + serverName + ": " + e.getMessage()); 993 killRegionServer(serverName); 994 } 995 } 996 997 void checkOnlineRegionsReport(final ServerStateNode serverNode, final Set<byte[]> regionNames) { 998 final ServerName serverName = serverNode.getServerName(); 999 try { 1000 for (byte[] regionName: regionNames) { 1001 if (!isRunning()) return; 1002 final RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName); 1003 if (regionNode == null) { 1004 throw new UnexpectedStateException("Not online: " + Bytes.toStringBinary(regionName)); 1005 } 1006 synchronized (regionNode) { 1007 if (regionNode.isInState(State.OPENING, State.OPEN)) { 1008 if (!regionNode.getRegionLocation().equals(serverName)) { 1009 throw new UnexpectedStateException(regionNode.toString() + 1010 " reported OPEN on server=" + serverName + 1011 " but state has otherwise."); 1012 } else if (regionNode.isInState(State.OPENING)) { 1013 try { 1014 if (!reportTransition(regionNode, serverNode.getServerName(), 1015 TransitionCode.OPENED, 0)) { 1016 LOG.warn(regionNode.toString() + " reported OPEN on server=" + serverName + 1017 " but state has otherwise AND NO procedure is running"); 1018 } 1019 } catch (UnexpectedStateException e) { 1020 LOG.warn(regionNode.toString() + " reported unexpteced OPEN: " + e.getMessage(), e); 1021 } 1022 } 1023 } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) { 1024 long diff = regionNode.getLastUpdate() - EnvironmentEdgeManager.currentTime(); 1025 if (diff > 1000/*One Second... make configurable if an issue*/) { 1026 // So, we can get report that a region is CLOSED or SPLIT because a heartbeat 1027 // came in at about same time as a region transition. Make sure there is some 1028 // elapsed time between killing remote server. 1029 throw new UnexpectedStateException(regionNode.toString() + 1030 " reported an unexpected OPEN; time since last update=" + diff); 1031 } 1032 } 1033 } 1034 } 1035 } catch (UnexpectedStateException e) { 1036 //See HBASE-21421, we can count on reportRegionStateTransition calls 1037 //We only log a warming here. It could be a network lag. 1038 LOG.warn("Failed to checkOnlineRegionsReport, maybe due to network lag, " 1039 + "if this message continues, be careful of double assign", e); 1040 } 1041 } 1042 1043 protected boolean waitServerReportEvent(ServerName serverName, Procedure<?> proc) { 1044 ServerStateNode ssn = this.regionStates.getServerNode(serverName); 1045 if (ssn == null) { 1046 LOG.warn("Why is ServerStateNode for {} empty at this point? Creating...", serverName); 1047 ssn = this.regionStates.getOrCreateServer(serverName); 1048 } 1049 return ssn.getReportEvent().suspendIfNotReady(proc); 1050 } 1051 1052 protected void wakeServerReportEvent(final ServerStateNode serverNode) { 1053 serverNode.getReportEvent().wake(getProcedureScheduler()); 1054 } 1055 1056 // ============================================================================================ 1057 // RIT chore 1058 // ============================================================================================ 1059 private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> { 1060 public RegionInTransitionChore(final int timeoutMsec) { 1061 super(timeoutMsec); 1062 } 1063 1064 @Override 1065 protected void periodicExecute(final MasterProcedureEnv env) { 1066 final AssignmentManager am = env.getAssignmentManager(); 1067 1068 final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat(); 1069 if (ritStat.hasRegionsOverThreshold()) { 1070 for (RegionState hri: ritStat.getRegionOverThreshold()) { 1071 am.handleRegionOverStuckWarningThreshold(hri.getRegion()); 1072 } 1073 } 1074 1075 // update metrics 1076 am.updateRegionsInTransitionMetrics(ritStat); 1077 } 1078 } 1079 1080 public RegionInTransitionStat computeRegionInTransitionStat() { 1081 final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration()); 1082 rit.update(this); 1083 return rit; 1084 } 1085 1086 public static class RegionInTransitionStat { 1087 private final int ritThreshold; 1088 1089 private HashMap<String, RegionState> ritsOverThreshold = null; 1090 private long statTimestamp; 1091 private long oldestRITTime = 0; 1092 private int totalRITsTwiceThreshold = 0; 1093 private int totalRITs = 0; 1094 1095 @VisibleForTesting 1096 public RegionInTransitionStat(final Configuration conf) { 1097 this.ritThreshold = 1098 conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD); 1099 } 1100 1101 public int getRITThreshold() { 1102 return ritThreshold; 1103 } 1104 1105 public long getTimestamp() { 1106 return statTimestamp; 1107 } 1108 1109 public int getTotalRITs() { 1110 return totalRITs; 1111 } 1112 1113 public long getOldestRITTime() { 1114 return oldestRITTime; 1115 } 1116 1117 public int getTotalRITsOverThreshold() { 1118 Map<String, RegionState> m = this.ritsOverThreshold; 1119 return m != null ? m.size() : 0; 1120 } 1121 1122 public boolean hasRegionsTwiceOverThreshold() { 1123 return totalRITsTwiceThreshold > 0; 1124 } 1125 1126 public boolean hasRegionsOverThreshold() { 1127 Map<String, RegionState> m = this.ritsOverThreshold; 1128 return m != null && !m.isEmpty(); 1129 } 1130 1131 public Collection<RegionState> getRegionOverThreshold() { 1132 Map<String, RegionState> m = this.ritsOverThreshold; 1133 return m != null? m.values(): Collections.emptySet(); 1134 } 1135 1136 public boolean isRegionOverThreshold(final RegionInfo regionInfo) { 1137 Map<String, RegionState> m = this.ritsOverThreshold; 1138 return m != null && m.containsKey(regionInfo.getEncodedName()); 1139 } 1140 1141 public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) { 1142 Map<String, RegionState> m = this.ritsOverThreshold; 1143 if (m == null) return false; 1144 final RegionState state = m.get(regionInfo.getEncodedName()); 1145 if (state == null) return false; 1146 return (statTimestamp - state.getStamp()) > (ritThreshold * 2); 1147 } 1148 1149 protected void update(final AssignmentManager am) { 1150 final RegionStates regionStates = am.getRegionStates(); 1151 this.statTimestamp = EnvironmentEdgeManager.currentTime(); 1152 update(regionStates.getRegionsStateInTransition(), statTimestamp); 1153 update(regionStates.getRegionFailedOpen(), statTimestamp); 1154 } 1155 1156 private void update(final Collection<RegionState> regions, final long currentTime) { 1157 for (RegionState state: regions) { 1158 totalRITs++; 1159 final long ritTime = currentTime - state.getStamp(); 1160 if (ritTime > ritThreshold) { 1161 if (ritsOverThreshold == null) { 1162 ritsOverThreshold = new HashMap<String, RegionState>(); 1163 } 1164 ritsOverThreshold.put(state.getRegion().getEncodedName(), state); 1165 totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0; 1166 } 1167 if (oldestRITTime < ritTime) { 1168 oldestRITTime = ritTime; 1169 } 1170 } 1171 } 1172 } 1173 1174 private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) { 1175 metrics.updateRITOldestAge(ritStat.getOldestRITTime()); 1176 metrics.updateRITCount(ritStat.getTotalRITs()); 1177 metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold()); 1178 } 1179 1180 private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) { 1181 final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 1182 //if (regionNode.isStuck()) { 1183 LOG.warn("STUCK Region-In-Transition {}", regionNode); 1184 } 1185 1186 // ============================================================================================ 1187 // TODO: Master load/bootstrap 1188 // ============================================================================================ 1189 public void joinCluster() throws IOException { 1190 long startTime = System.nanoTime(); 1191 LOG.debug("Joining cluster..."); 1192 1193 // Scan hbase:meta to build list of existing regions, servers, and assignment. 1194 // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress 1195 // w/o meta. 1196 loadMeta(); 1197 1198 while (master.getServerManager().countOfRegionServers() < 1) { 1199 LOG.info("Waiting for RegionServers to join; current count={}", 1200 master.getServerManager().countOfRegionServers()); 1201 Threads.sleep(250); 1202 } 1203 LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers()); 1204 1205 // Start the RIT chore 1206 master.getMasterProcedureExecutor().addChore(this.ritChore); 1207 1208 long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); 1209 LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs)); 1210 } 1211 1212 /** 1213 * Create assign procedure for offline regions. 1214 * Just follow the old processofflineServersWithOnlineRegions method. Since now we do not need to 1215 * deal with dead server any more, we only deal with the regions in OFFLINE state in this method. 1216 * And this is a bit strange, that for new regions, we will add it in CLOSED state instead of 1217 * OFFLINE state, and usually there will be a procedure to track them. The 1218 * processofflineServersWithOnlineRegions is a legacy from long ago, as things are going really 1219 * different now, maybe we do not need this method any more. Need to revisit later. 1220 */ 1221 // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online. 1222 // Needs to be done after the table state manager has been started. 1223 public void processOfflineRegions() { 1224 List<RegionInfo> offlineRegions = regionStates.getRegionStates().stream() 1225 .filter(RegionState::isOffline).filter(s -> isTableEnabled(s.getRegion().getTable())) 1226 .map(RegionState::getRegion).collect(Collectors.toList()); 1227 if (!offlineRegions.isEmpty()) { 1228 master.getMasterProcedureExecutor().submitProcedures( 1229 master.getAssignmentManager().createRoundRobinAssignProcedures(offlineRegions)); 1230 } 1231 } 1232 1233 private void loadMeta() throws IOException { 1234 // TODO: use a thread pool 1235 regionStateStore.visitMeta(new RegionStateStore.RegionStateVisitor() { 1236 @Override 1237 public void visitRegionState(Result result, final RegionInfo regionInfo, final State state, 1238 final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) { 1239 if (state == null && regionLocation == null && lastHost == null && 1240 openSeqNum == SequenceId.NO_SEQUENCE_ID) { 1241 // This is a row with nothing in it. 1242 LOG.warn("Skipping empty row={}", result); 1243 return; 1244 } 1245 State localState = state; 1246 if (localState == null) { 1247 // No region state column data in hbase:meta table! Are I doing a rolling upgrade from 1248 // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta? 1249 // In any of these cases, state is empty. For now, presume OFFLINE but there are probably 1250 // cases where we need to probe more to be sure this correct; TODO informed by experience. 1251 LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE); 1252 1253 localState = State.OFFLINE; 1254 } 1255 final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 1256 synchronized (regionNode) { 1257 if (!regionNode.isInTransition()) { 1258 regionNode.setState(localState); 1259 regionNode.setLastHost(lastHost); 1260 regionNode.setRegionLocation(regionLocation); 1261 regionNode.setOpenSeqNum(openSeqNum); 1262 if (localState == State.OPEN) { 1263 assert regionLocation != null : "found null region location for " + regionNode; 1264 regionStates.getOrCreateServer(regionNode.getRegionLocation()); 1265 regionStates.addRegionToServer(regionNode); 1266 } else if (localState == State.OFFLINE || regionInfo.isOffline()) { 1267 regionStates.addToOfflineRegions(regionNode); 1268 } else if (localState == State.CLOSED && getTableStateManager(). 1269 isTableState(regionNode.getTable(), TableState.State.DISABLED, 1270 TableState.State.DISABLING)) { 1271 // The region is CLOSED and the table is DISABLED/ DISABLING, there is nothing to 1272 // schedule; the region is inert. 1273 } else { 1274 // This is region in CLOSING or OPENING state. 1275 // These regions should have a procedure in replay. 1276 // If they don't, then they will show as STUCK after a while because of the below 1277 // registration and will need intervention by operator to fix. Add them to a server 1278 // even if the server is not around so a SCP cleans them up. 1279 if (regionLocation != null) { 1280 regionStates.getOrCreateServer(regionLocation); 1281 } 1282 regionStates.addRegionInTransition(regionNode, null); 1283 } 1284 } else { 1285 LOG.info("RIT {}", regionNode); 1286 } 1287 } 1288 } 1289 }); 1290 1291 // every assignment is blocked until meta is loaded. 1292 wakeMetaLoadedEvent(); 1293 } 1294 1295 /** 1296 * Used to check if the meta loading is done. 1297 * <p/> 1298 * if not we throw PleaseHoldException since we are rebuilding the RegionStates 1299 * @param hri region to check if it is already rebuild 1300 * @throws PleaseHoldException if meta has not been loaded yet 1301 */ 1302 private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException { 1303 if (!isRunning()) { 1304 throw new PleaseHoldException("AssignmentManager not running"); 1305 } 1306 boolean meta = isMetaRegion(hri); 1307 boolean metaLoaded = isMetaLoaded(); 1308 if (!meta && !metaLoaded) { 1309 throw new PleaseHoldException( 1310 "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded); 1311 } 1312 } 1313 1314 // ============================================================================================ 1315 // TODO: Metrics 1316 // ============================================================================================ 1317 public int getNumRegionsOpened() { 1318 // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value 1319 return 0; 1320 } 1321 1322 public long submitServerCrash(final ServerName serverName, final boolean shouldSplitWal) { 1323 boolean carryingMeta = isCarryingMeta(serverName); 1324 1325 // Remove the in-memory rsReports result 1326 synchronized (rsReports) { 1327 rsReports.remove(serverName); 1328 } 1329 1330 ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor(); 1331 long pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), 1332 serverName, shouldSplitWal, carryingMeta)); 1333 LOG.debug("Added=" + serverName 1334 + " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta); 1335 return pid; 1336 } 1337 1338 public void offlineRegion(final RegionInfo regionInfo) { 1339 // TODO used by MasterRpcServices ServerCrashProcedure 1340 final RegionStateNode node = regionStates.getRegionStateNode(regionInfo); 1341 if (node != null) node.offline(); 1342 } 1343 1344 public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) { 1345 // TODO used by TestSplitTransactionOnCluster.java 1346 } 1347 1348 public Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment( 1349 final Collection<RegionInfo> regions) { 1350 return regionStates.getSnapShotOfAssignment(regions); 1351 } 1352 1353 // ============================================================================================ 1354 // TODO: UTILS/HELPERS? 1355 // ============================================================================================ 1356 /** 1357 * Used by the client (via master) to identify if all regions have the schema updates 1358 * 1359 * @param tableName 1360 * @return Pair indicating the status of the alter command (pending/total) 1361 * @throws IOException 1362 */ 1363 public Pair<Integer, Integer> getReopenStatus(TableName tableName) { 1364 if (isTableDisabled(tableName)) return new Pair<Integer, Integer>(0, 0); 1365 1366 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 1367 int ritCount = 0; 1368 for (RegionState regionState: states) { 1369 if (!regionState.isOpened() && !regionState.isSplit()) { 1370 ritCount++; 1371 } 1372 } 1373 return new Pair<Integer, Integer>(ritCount, states.size()); 1374 } 1375 1376 // ============================================================================================ 1377 // TODO: Region State In Transition 1378 // ============================================================================================ 1379 protected boolean addRegionInTransition(final RegionStateNode regionNode, 1380 final RegionTransitionProcedure procedure) { 1381 return regionStates.addRegionInTransition(regionNode, procedure); 1382 } 1383 1384 protected void removeRegionInTransition(final RegionStateNode regionNode, 1385 final RegionTransitionProcedure procedure) { 1386 regionStates.removeRegionInTransition(regionNode, procedure); 1387 } 1388 1389 public boolean hasRegionsInTransition() { 1390 return regionStates.hasRegionsInTransition(); 1391 } 1392 1393 public List<RegionStateNode> getRegionsInTransition() { 1394 return regionStates.getRegionsInTransition(); 1395 } 1396 1397 public List<RegionInfo> getAssignedRegions() { 1398 return regionStates.getAssignedRegions(); 1399 } 1400 1401 public RegionInfo getRegionInfo(final byte[] regionName) { 1402 final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName); 1403 return regionState != null ? regionState.getRegionInfo() : null; 1404 } 1405 1406 // ============================================================================================ 1407 // TODO: Region Status update 1408 // ============================================================================================ 1409 private void sendRegionOpenedNotification(final RegionInfo regionInfo, 1410 final ServerName serverName) { 1411 getBalancer().regionOnline(regionInfo, serverName); 1412 if (!this.listeners.isEmpty()) { 1413 for (AssignmentListener listener : this.listeners) { 1414 listener.regionOpened(regionInfo, serverName); 1415 } 1416 } 1417 } 1418 1419 private void sendRegionClosedNotification(final RegionInfo regionInfo) { 1420 getBalancer().regionOffline(regionInfo); 1421 if (!this.listeners.isEmpty()) { 1422 for (AssignmentListener listener : this.listeners) { 1423 listener.regionClosed(regionInfo); 1424 } 1425 } 1426 } 1427 1428 public void markRegionAsOpening(final RegionStateNode regionNode) throws IOException { 1429 synchronized (regionNode) { 1430 regionNode.transitionState(State.OPENING, RegionStates.STATES_EXPECTED_ON_OPEN); 1431 regionStates.addRegionToServer(regionNode); 1432 regionStateStore.updateRegionLocation(regionNode); 1433 } 1434 1435 // update the operation count metrics 1436 metrics.incrementOperationCounter(); 1437 } 1438 1439 public void undoRegionAsOpening(final RegionStateNode regionNode) { 1440 boolean opening = false; 1441 synchronized (regionNode) { 1442 if (regionNode.isInState(State.OPENING)) { 1443 opening = true; 1444 regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode); 1445 } 1446 // Should we update hbase:meta? 1447 } 1448 if (opening) { 1449 // TODO: Metrics. Do opposite of metrics.incrementOperationCounter(); 1450 } 1451 } 1452 1453 public void markRegionAsOpened(final RegionStateNode regionNode) throws IOException { 1454 final RegionInfo hri = regionNode.getRegionInfo(); 1455 synchronized (regionNode) { 1456 regionNode.transitionState(State.OPEN, RegionStates.STATES_EXPECTED_ON_OPEN); 1457 if (isMetaRegion(hri)) { 1458 // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it 1459 // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager 1460 // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state 1461 // on table that contains state. 1462 setMetaAssigned(hri, true); 1463 } 1464 regionStates.addRegionToServer(regionNode); 1465 // TODO: OPENING Updates hbase:meta too... we need to do both here and there? 1466 // That is a lot of hbase:meta writing. 1467 regionStateStore.updateRegionLocation(regionNode); 1468 sendRegionOpenedNotification(hri, regionNode.getRegionLocation()); 1469 } 1470 } 1471 1472 public void markRegionAsClosing(final RegionStateNode regionNode) throws IOException { 1473 final RegionInfo hri = regionNode.getRegionInfo(); 1474 synchronized (regionNode) { 1475 regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE); 1476 // Set meta has not initialized early. so people trying to create/edit tables will wait 1477 if (isMetaRegion(hri)) { 1478 setMetaAssigned(hri, false); 1479 } 1480 regionStates.addRegionToServer(regionNode); 1481 regionStateStore.updateRegionLocation(regionNode); 1482 } 1483 1484 // update the operation count metrics 1485 metrics.incrementOperationCounter(); 1486 } 1487 1488 public void undoRegionAsClosing(final RegionStateNode regionNode) { 1489 // TODO: Metrics. Do opposite of metrics.incrementOperationCounter(); 1490 // There is nothing to undo? 1491 } 1492 1493 public void markRegionAsClosed(final RegionStateNode regionNode) throws IOException { 1494 final RegionInfo hri = regionNode.getRegionInfo(); 1495 synchronized (regionNode) { 1496 regionNode.transitionState(State.CLOSED, RegionStates.STATES_EXPECTED_ON_CLOSE); 1497 regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode); 1498 regionNode.setLastHost(regionNode.getRegionLocation()); 1499 regionNode.setRegionLocation(null); 1500 regionStateStore.updateRegionLocation(regionNode); 1501 sendRegionClosedNotification(hri); 1502 } 1503 } 1504 1505 public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName, 1506 final RegionInfo daughterA, final RegionInfo daughterB) 1507 throws IOException { 1508 // Update hbase:meta. Parent will be marked offline and split up in hbase:meta. 1509 // The parent stays in regionStates until cleared when removed by CatalogJanitor. 1510 // Update its state in regionStates to it shows as offline and split when read 1511 // later figuring what regions are in a table and what are not: see 1512 // regionStates#getRegionsOfTable 1513 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent); 1514 node.setState(State.SPLIT); 1515 final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA); 1516 nodeA.setState(State.SPLITTING_NEW); 1517 final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB); 1518 nodeB.setState(State.SPLITTING_NEW); 1519 1520 regionStateStore.splitRegion(parent, daughterA, daughterB, serverName); 1521 if (shouldAssignFavoredNodes(parent)) { 1522 List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList(); 1523 ((FavoredNodesPromoter)getBalancer()). 1524 generateFavoredNodesForDaughter(onlineServers, parent, daughterA, daughterB); 1525 } 1526 } 1527 1528 /** 1529 * When called here, the merge has happened. The merged regions have been 1530 * unassigned and the above markRegionClosed has been called on each so they have been 1531 * disassociated from a hosting Server. The merged region will be open after this call. The 1532 * merged regions are removed from hbase:meta below. Later they are deleted from the filesystem 1533 * by the catalog janitor running against hbase:meta. It notices when the merged region no 1534 * longer holds references to the old regions (References are deleted after a compaction 1535 * rewrites what the Reference points at but not until the archiver chore runs, are the 1536 * References removed). 1537 */ 1538 public void markRegionAsMerged(final RegionInfo child, final ServerName serverName, 1539 RegionInfo [] mergeParents) 1540 throws IOException { 1541 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child); 1542 node.setState(State.MERGED); 1543 for (RegionInfo ri: mergeParents) { 1544 regionStates.deleteRegion(ri); 1545 1546 } 1547 regionStateStore.mergeRegions(child, mergeParents, serverName); 1548 if (shouldAssignFavoredNodes(child)) { 1549 ((FavoredNodesPromoter)getBalancer()). 1550 generateFavoredNodesForMergedRegion(child, mergeParents); 1551 } 1552 } 1553 1554 /* 1555 * Favored nodes should be applied only when FavoredNodes balancer is configured and the region 1556 * belongs to a non-system table. 1557 */ 1558 private boolean shouldAssignFavoredNodes(RegionInfo region) { 1559 return this.shouldAssignRegionsWithFavoredNodes && 1560 FavoredNodesManager.isFavoredNodeApplicable(region); 1561 } 1562 1563 // ============================================================================================ 1564 // Assign Queue (Assign/Balance) 1565 // ============================================================================================ 1566 private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>(); 1567 private final ReentrantLock assignQueueLock = new ReentrantLock(); 1568 private final Condition assignQueueFullCond = assignQueueLock.newCondition(); 1569 1570 /** 1571 * Add the assign operation to the assignment queue. 1572 * The pending assignment operation will be processed, 1573 * and each region will be assigned by a server using the balancer. 1574 */ 1575 protected void queueAssign(final RegionStateNode regionNode) { 1576 regionNode.getProcedureEvent().suspend(); 1577 1578 // TODO: quick-start for meta and the other sys-tables? 1579 assignQueueLock.lock(); 1580 try { 1581 pendingAssignQueue.add(regionNode); 1582 if (regionNode.isSystemTable() || 1583 pendingAssignQueue.size() == 1 || 1584 pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize) { 1585 assignQueueFullCond.signal(); 1586 } 1587 } finally { 1588 assignQueueLock.unlock(); 1589 } 1590 } 1591 1592 private void startAssignmentThread() { 1593 // Get Server Thread name. Sometimes the Server is mocked so may not implement HasThread. 1594 // For example, in tests. 1595 String name = master instanceof HasThread? ((HasThread)master).getName(): 1596 master.getServerName().toShortString(); 1597 assignThread = new Thread(name) { 1598 @Override 1599 public void run() { 1600 while (isRunning()) { 1601 processAssignQueue(); 1602 } 1603 pendingAssignQueue.clear(); 1604 } 1605 }; 1606 assignThread.setDaemon(true); 1607 assignThread.start(); 1608 } 1609 1610 private void stopAssignmentThread() { 1611 assignQueueSignal(); 1612 try { 1613 while (assignThread.isAlive()) { 1614 assignQueueSignal(); 1615 assignThread.join(250); 1616 } 1617 } catch (InterruptedException e) { 1618 LOG.warn("join interrupted", e); 1619 Thread.currentThread().interrupt(); 1620 } 1621 } 1622 1623 private void assignQueueSignal() { 1624 assignQueueLock.lock(); 1625 try { 1626 assignQueueFullCond.signal(); 1627 } finally { 1628 assignQueueLock.unlock(); 1629 } 1630 } 1631 1632 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") 1633 private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() { 1634 HashMap<RegionInfo, RegionStateNode> regions = null; 1635 1636 assignQueueLock.lock(); 1637 try { 1638 if (pendingAssignQueue.isEmpty() && isRunning()) { 1639 assignQueueFullCond.await(); 1640 } 1641 1642 if (!isRunning()) return null; 1643 assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS); 1644 regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size()); 1645 for (RegionStateNode regionNode: pendingAssignQueue) { 1646 regions.put(regionNode.getRegionInfo(), regionNode); 1647 } 1648 pendingAssignQueue.clear(); 1649 } catch (InterruptedException e) { 1650 LOG.warn("got interrupted ", e); 1651 Thread.currentThread().interrupt(); 1652 } finally { 1653 assignQueueLock.unlock(); 1654 } 1655 return regions; 1656 } 1657 1658 private void processAssignQueue() { 1659 final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue(); 1660 if (regions == null || regions.size() == 0 || !isRunning()) { 1661 return; 1662 } 1663 1664 if (LOG.isTraceEnabled()) { 1665 LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size()); 1666 } 1667 1668 // TODO: Optimize balancer. pass a RegionPlan? 1669 final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>(); 1670 final List<RegionInfo> userHRIs = new ArrayList<>(regions.size()); 1671 // Regions for system tables requiring reassignment 1672 final List<RegionInfo> systemHRIs = new ArrayList<>(); 1673 for (RegionStateNode regionStateNode: regions.values()) { 1674 boolean sysTable = regionStateNode.isSystemTable(); 1675 final List<RegionInfo> hris = sysTable? systemHRIs: userHRIs; 1676 if (regionStateNode.getRegionLocation() != null) { 1677 retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation()); 1678 } else { 1679 hris.add(regionStateNode.getRegionInfo()); 1680 } 1681 } 1682 1683 // TODO: connect with the listener to invalidate the cache 1684 1685 // TODO use events 1686 List<ServerName> servers = master.getServerManager().createDestinationServersList(); 1687 for (int i = 0; servers.size() < 1; ++i) { 1688 // Report every fourth time around this loop; try not to flood log. 1689 if (i % 4 == 0) { 1690 LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions."); 1691 } 1692 1693 if (!isRunning()) { 1694 LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions."); 1695 return; 1696 } 1697 Threads.sleep(250); 1698 servers = master.getServerManager().createDestinationServersList(); 1699 } 1700 1701 if (!systemHRIs.isEmpty()) { 1702 // System table regions requiring reassignment are present, get region servers 1703 // not available for system table regions 1704 final List<ServerName> excludeServers = getExcludedServersForSystemTable(); 1705 List<ServerName> serversForSysTables = servers.stream() 1706 .filter(s -> !excludeServers.contains(s)).collect(Collectors.toList()); 1707 if (serversForSysTables.isEmpty()) { 1708 LOG.warn("Filtering old server versions and the excluded produced an empty set; " + 1709 "instead considering all candidate servers!"); 1710 } 1711 LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() + 1712 ", allServersCount=" + servers.size()); 1713 processAssignmentPlans(regions, null, systemHRIs, 1714 serversForSysTables.isEmpty()? servers: serversForSysTables); 1715 } 1716 1717 processAssignmentPlans(regions, retainMap, userHRIs, servers); 1718 } 1719 1720 private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions, 1721 final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris, 1722 final List<ServerName> servers) { 1723 boolean isTraceEnabled = LOG.isTraceEnabled(); 1724 if (isTraceEnabled) { 1725 LOG.trace("Available servers count=" + servers.size() + ": " + servers); 1726 } 1727 1728 final LoadBalancer balancer = getBalancer(); 1729 // ask the balancer where to place regions 1730 if (retainMap != null && !retainMap.isEmpty()) { 1731 if (isTraceEnabled) { 1732 LOG.trace("retain assign regions=" + retainMap); 1733 } 1734 try { 1735 acceptPlan(regions, balancer.retainAssignment(retainMap, servers)); 1736 } catch (HBaseIOException e) { 1737 LOG.warn("unable to retain assignment", e); 1738 addToPendingAssignment(regions, retainMap.keySet()); 1739 } 1740 } 1741 1742 // TODO: Do we need to split retain and round-robin? 1743 // the retain seems to fallback to round-robin/random if the region is not in the map. 1744 if (!hris.isEmpty()) { 1745 Collections.sort(hris, RegionInfo.COMPARATOR); 1746 if (isTraceEnabled) { 1747 LOG.trace("round robin regions=" + hris); 1748 } 1749 try { 1750 acceptPlan(regions, balancer.roundRobinAssignment(hris, servers)); 1751 } catch (HBaseIOException e) { 1752 LOG.warn("unable to round-robin assignment", e); 1753 addToPendingAssignment(regions, hris); 1754 } 1755 } 1756 } 1757 1758 private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions, 1759 final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException { 1760 final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()]; 1761 final long st = System.currentTimeMillis(); 1762 1763 if (plan == null) { 1764 throw new HBaseIOException("unable to compute plans for regions=" + regions.size()); 1765 } 1766 1767 if (plan.isEmpty()) return; 1768 1769 List<RegionInfo> bogusRegions = plan.remove(LoadBalancer.BOGUS_SERVER_NAME); 1770 if (bogusRegions != null && !bogusRegions.isEmpty()) { 1771 addToPendingAssignment(regions, bogusRegions); 1772 } 1773 1774 int evcount = 0; 1775 for (Map.Entry<ServerName, List<RegionInfo>> entry: plan.entrySet()) { 1776 final ServerName server = entry.getKey(); 1777 for (RegionInfo hri: entry.getValue()) { 1778 final RegionStateNode regionNode = regions.get(hri); 1779 regionNode.setRegionLocation(server); 1780 events[evcount++] = regionNode.getProcedureEvent(); 1781 } 1782 } 1783 ProcedureEvent.wakeEvents(getProcedureScheduler(), events); 1784 1785 final long et = System.currentTimeMillis(); 1786 if (LOG.isTraceEnabled()) { 1787 LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + 1788 StringUtils.humanTimeDiff(et - st)); 1789 } 1790 } 1791 1792 private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions, 1793 final Collection<RegionInfo> pendingRegions) { 1794 assignQueueLock.lock(); 1795 try { 1796 for (RegionInfo hri: pendingRegions) { 1797 pendingAssignQueue.add(regions.get(hri)); 1798 } 1799 } finally { 1800 assignQueueLock.unlock(); 1801 } 1802 } 1803 1804 /** 1805 * Get a list of servers that this region cannot be assigned to. 1806 * For system tables, we must assign them to a server with highest version. 1807 */ 1808 public List<ServerName> getExcludedServersForSystemTable() { 1809 // TODO: This should be a cached list kept by the ServerManager rather than calculated on each 1810 // move or system region assign. The RegionServerTracker keeps list of online Servers with 1811 // RegionServerInfo that includes Version. 1812 List<Pair<ServerName, String>> serverList = master.getServerManager().getOnlineServersList() 1813 .stream() 1814 .map((s)->new Pair<>(s, master.getRegionServerVersion(s))) 1815 .collect(Collectors.toList()); 1816 if (serverList.isEmpty()) { 1817 return Collections.emptyList(); 1818 } 1819 String highestVersion = Collections.max(serverList, 1820 (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond(); 1821 return serverList.stream() 1822 .filter((p)->!p.getSecond().equals(highestVersion)) 1823 .map(Pair::getFirst) 1824 .collect(Collectors.toList()); 1825 } 1826 1827 // ============================================================================================ 1828 // Server Helpers 1829 // ============================================================================================ 1830 @Override 1831 public void serverAdded(final ServerName serverName) { 1832 } 1833 1834 @Override 1835 public void serverRemoved(final ServerName serverName) { 1836 final ServerStateNode serverNode = regionStates.getServerNode(serverName); 1837 if (serverNode == null) return; 1838 1839 // just in case, wake procedures waiting for this server report 1840 wakeServerReportEvent(serverNode); 1841 } 1842 1843 private void killRegionServer(final ServerName serverName) { 1844 master.getServerManager().expireServer(serverName); 1845 } 1846 1847 /** 1848 * <p> 1849 * This is a very particular check. The {@link org.apache.hadoop.hbase.master.ServerManager} is 1850 * where you go to check on state of 'Servers', what Servers are online, etc. 1851 * </p> 1852 * <p> 1853 * Here we are checking the state of a server that is post expiration, a ServerManager function 1854 * that moves a server from online to dead. Here we are seeing if the server has moved beyond a 1855 * particular point in the recovery process such that it is safe to move on with assigns; etc. 1856 * </p> 1857 * <p> 1858 * For now it is only used in 1859 * {@link UnassignProcedure#remoteCallFailed(MasterProcedureEnv, RegionStateNode, IOException)} to 1860 * see whether we can safely quit without losing data. 1861 * </p> 1862 * @param meta whether to check for meta log splitting 1863 * @return {@code true} if the server does not exist or the log splitting is done, i.e, the server 1864 * is in OFFLINE state, or for meta log, is in SPLITTING_META_DONE state. If null, 1865 * presumes the ServerStateNode was cleaned up by SCP. 1866 * @see UnassignProcedure#remoteCallFailed(MasterProcedureEnv, RegionStateNode, IOException) 1867 */ 1868 boolean isLogSplittingDone(ServerName serverName, boolean meta) { 1869 ServerStateNode ssn = this.regionStates.getServerNode(serverName); 1870 if (ssn == null) { 1871 return true; 1872 } 1873 ServerState[] inState = 1874 meta 1875 ? new ServerState[] { ServerState.SPLITTING_META_DONE, ServerState.SPLITTING, 1876 ServerState.OFFLINE } 1877 : new ServerState[] { ServerState.OFFLINE }; 1878 synchronized (ssn) { 1879 return ssn.isInState(inState); 1880 } 1881 } 1882 1883 @VisibleForTesting 1884 MasterServices getMaster() { 1885 return master; 1886 } 1887 1888 /** 1889 * @return a snapshot of rsReports 1890 */ 1891 public Map<ServerName, Set<byte[]>> getRSReports() { 1892 Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>(); 1893 synchronized (rsReports) { 1894 rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue())); 1895 } 1896 return rsReportsSnapshot; 1897 } 1898}