001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.CopyOnWriteArrayList; 031import java.util.concurrent.Future; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicBoolean; 034import java.util.concurrent.locks.Condition; 035import java.util.concurrent.locks.ReentrantLock; 036import java.util.stream.Collectors; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.HBaseIOException; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.PleaseHoldException; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.YouAreDeadException; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import org.apache.hadoop.hbase.client.RegionInfoBuilder; 046import 
org.apache.hadoop.hbase.client.RegionStatesCount; 047import org.apache.hadoop.hbase.client.Result; 048import org.apache.hadoop.hbase.client.TableState; 049import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; 050import org.apache.hadoop.hbase.favored.FavoredNodesManager; 051import org.apache.hadoop.hbase.favored.FavoredNodesPromoter; 052import org.apache.hadoop.hbase.master.AssignmentListener; 053import org.apache.hadoop.hbase.master.LoadBalancer; 054import org.apache.hadoop.hbase.master.MasterServices; 055import org.apache.hadoop.hbase.master.MetricsAssignmentManager; 056import org.apache.hadoop.hbase.master.RegionPlan; 057import org.apache.hadoop.hbase.master.RegionState; 058import org.apache.hadoop.hbase.master.RegionState.State; 059import org.apache.hadoop.hbase.master.ServerListener; 060import org.apache.hadoop.hbase.master.TableStateManager; 061import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode; 062import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerState; 063import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerStateNode; 064import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer; 065import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 066import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; 067import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; 068import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; 069import org.apache.hadoop.hbase.procedure2.Procedure; 070import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 071import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 072import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore; 073import org.apache.hadoop.hbase.procedure2.util.StringUtils; 074import org.apache.hadoop.hbase.regionserver.SequenceId; 075import org.apache.hadoop.hbase.util.Bytes; 076import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 077import 
org.apache.hadoop.hbase.util.HasThread; 078import org.apache.hadoop.hbase.util.Pair; 079import org.apache.hadoop.hbase.util.Threads; 080import org.apache.hadoop.hbase.util.VersionInfo; 081import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 082import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 083import org.apache.yetus.audience.InterfaceAudience; 084import org.apache.zookeeper.KeeperException; 085import org.slf4j.Logger; 086import org.slf4j.LoggerFactory; 087 088import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 089 090import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; 092import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 093import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 094import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; 095 096/** 097 * The AssignmentManager is the coordinator for region assign/unassign operations. 098 * <ul> 099 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li> 100 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li> 101 * </ul> 102 * Regions are created by CreateTable, Split, Merge. 103 * Regions are deleted by DeleteTable, Split, Merge. 104 * Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. 105 * Unassigns are triggered by DisableTable, Split, Merge 106 */ 107@InterfaceAudience.Private 108public class AssignmentManager implements ServerListener { 109 private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class); 110 111 // TODO: AMv2 112 // - handle region migration from hbase1 to hbase2. 113 // - handle sys table assignment first (e.g. 
acl, namespace) 114 // - handle table priorities 115 // - If ServerBusyException trying to update hbase:meta, we abort the Master 116 // See updateRegionLocation in RegionStateStore. 117 // 118 // See also 119 // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5 120 // for other TODOs. 121 122 public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY = 123 "hbase.assignment.bootstrap.thread.pool.size"; 124 125 public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY = 126 "hbase.assignment.dispatch.wait.msec"; 127 private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150; 128 129 public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY = 130 "hbase.assignment.dispatch.wait.queue.max.size"; 131 private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100; 132 133 public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY = 134 "hbase.assignment.rit.chore.interval.msec"; 135 private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000; 136 137 public static final String ASSIGN_MAX_ATTEMPTS = 138 "hbase.assignment.maximum.attempts"; 139 private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE; 140 141 /** Region in Transition metrics threshold time */ 142 public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD = 143 "hbase.metrics.rit.stuck.warning.threshold"; 144 private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000; 145 146 private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign"); 147 private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load"); 148 149 /** Listeners that are called on assignment events. 
*/ 150 private final CopyOnWriteArrayList<AssignmentListener> listeners = 151 new CopyOnWriteArrayList<AssignmentListener>(); 152 153 private final MetricsAssignmentManager metrics; 154 private final RegionInTransitionChore ritChore; 155 private final MasterServices master; 156 157 private final AtomicBoolean running = new AtomicBoolean(false); 158 private final RegionStates regionStates = new RegionStates(); 159 private final RegionStateStore regionStateStore; 160 161 private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>(); 162 163 private final boolean shouldAssignRegionsWithFavoredNodes; 164 private final int assignDispatchWaitQueueMaxSize; 165 private final int assignDispatchWaitMillis; 166 private final int assignMaxAttempts; 167 168 private final Object checkIfShouldMoveSystemRegionLock = new Object(); 169 170 private Thread assignThread; 171 172 public AssignmentManager(final MasterServices master) { 173 this(master, new RegionStateStore(master)); 174 } 175 176 public AssignmentManager(final MasterServices master, final RegionStateStore stateStore) { 177 this.master = master; 178 this.regionStateStore = stateStore; 179 this.metrics = new MetricsAssignmentManager(); 180 181 final Configuration conf = master.getConfiguration(); 182 183 // Only read favored nodes if using the favored nodes load balancer. 
184 this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class.isAssignableFrom( 185 conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class)); 186 187 this.assignDispatchWaitMillis = conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, 188 DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC); 189 this.assignDispatchWaitQueueMaxSize = conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, 190 DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX); 191 192 this.assignMaxAttempts = Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, 193 DEFAULT_ASSIGN_MAX_ATTEMPTS)); 194 195 int ritChoreInterval = conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, 196 DEFAULT_RIT_CHORE_INTERVAL_MSEC); 197 this.ritChore = new RegionInTransitionChore(ritChoreInterval); 198 } 199 200 public void start() throws IOException, KeeperException { 201 if (!running.compareAndSet(false, true)) { 202 return; 203 } 204 205 LOG.trace("Starting assignment manager"); 206 207 // Register Server Listener 208 master.getServerManager().registerListener(this); 209 210 // Start the Assignment Thread 211 startAssignmentThread(); 212 213 // load meta region state 214 ZKWatcher zkw = master.getZooKeeper(); 215 // it could be null in some tests 216 if (zkw != null) { 217 RegionState regionState = MetaTableLocator.getMetaRegionState(zkw); 218 RegionStateNode regionStateNode = 219 regionStates.getOrCreateRegionStateNode(RegionInfoBuilder.FIRST_META_REGIONINFO); 220 synchronized (regionStateNode) { 221 regionStateNode.setRegionLocation(regionState.getServerName()); 222 regionStateNode.setState(regionState.getState()); 223 setMetaAssigned(regionState.getRegion(), regionState.getState() == State.OPEN); 224 } 225 } 226 } 227 228 public void stop() { 229 if (!running.compareAndSet(true, false)) { 230 return; 231 } 232 233 LOG.info("Stopping assignment manager"); 234 235 // The AM is started before the procedure executor, 236 // but the actual work will be loaded/submitted only once we have the executor 237 final boolean hasProcExecutor = 
master.getMasterProcedureExecutor() != null; 238 239 // Remove the RIT chore 240 if (hasProcExecutor) { 241 master.getMasterProcedureExecutor().removeChore(this.ritChore); 242 } 243 244 // Stop the Assignment Thread 245 stopAssignmentThread(); 246 247 // Stop the RegionStateStore 248 regionStates.clear(); 249 250 // Unregister Server Listener 251 master.getServerManager().unregisterListener(this); 252 253 // Update meta events (for testing) 254 if (hasProcExecutor) { 255 metaLoadEvent.suspend(); 256 for (RegionInfo hri: getMetaRegionSet()) { 257 setMetaAssigned(hri, false); 258 } 259 } 260 } 261 262 public boolean isRunning() { 263 return running.get(); 264 } 265 266 public Configuration getConfiguration() { 267 return master.getConfiguration(); 268 } 269 270 public MetricsAssignmentManager getAssignmentManagerMetrics() { 271 return metrics; 272 } 273 274 private LoadBalancer getBalancer() { 275 return master.getLoadBalancer(); 276 } 277 278 private MasterProcedureEnv getProcedureEnvironment() { 279 return master.getMasterProcedureExecutor().getEnvironment(); 280 } 281 282 private MasterProcedureScheduler getProcedureScheduler() { 283 return getProcedureEnvironment().getProcedureScheduler(); 284 } 285 286 int getAssignMaxAttempts() { 287 return assignMaxAttempts; 288 } 289 290 /** 291 * Add the listener to the notification list. 292 * @param listener The AssignmentListener to register 293 */ 294 public void registerListener(final AssignmentListener listener) { 295 this.listeners.add(listener); 296 } 297 298 /** 299 * Remove the listener from the notification list. 
300 * @param listener The AssignmentListener to unregister 301 */ 302 public boolean unregisterListener(final AssignmentListener listener) { 303 return this.listeners.remove(listener); 304 } 305 306 public RegionStates getRegionStates() { 307 return regionStates; 308 } 309 310 public RegionStateStore getRegionStateStore() { 311 return regionStateStore; 312 } 313 314 public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) { 315 return this.shouldAssignRegionsWithFavoredNodes? 316 ((FavoredStochasticBalancer)getBalancer()).getFavoredNodes(regionInfo): 317 ServerName.EMPTY_SERVER_LIST; 318 } 319 320 // ============================================================================================ 321 // Table State Manager helpers 322 // ============================================================================================ 323 TableStateManager getTableStateManager() { 324 return master.getTableStateManager(); 325 } 326 327 public boolean isTableEnabled(final TableName tableName) { 328 return getTableStateManager().isTableState(tableName, TableState.State.ENABLED); 329 } 330 331 public boolean isTableDisabled(final TableName tableName) { 332 return getTableStateManager().isTableState(tableName, 333 TableState.State.DISABLED, TableState.State.DISABLING); 334 } 335 336 // ============================================================================================ 337 // META Helpers 338 // ============================================================================================ 339 private boolean isMetaRegion(final RegionInfo regionInfo) { 340 return regionInfo.isMetaRegion(); 341 } 342 343 public boolean isMetaRegion(final byte[] regionName) { 344 return getMetaRegionFromName(regionName) != null; 345 } 346 347 public RegionInfo getMetaRegionFromName(final byte[] regionName) { 348 for (RegionInfo hri: getMetaRegionSet()) { 349 if (Bytes.equals(hri.getRegionName(), regionName)) { 350 return hri; 351 } 352 } 353 return null; 354 } 355 356 public 
boolean isCarryingMeta(final ServerName serverName) { 357 // TODO: handle multiple meta 358 return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO); 359 } 360 361 private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) { 362 // TODO: check for state? 363 final RegionStateNode node = regionStates.getRegionStateNode(regionInfo); 364 return(node != null && serverName.equals(node.getRegionLocation())); 365 } 366 367 private RegionInfo getMetaForRegion(final RegionInfo regionInfo) { 368 //if (regionInfo.isMetaRegion()) return regionInfo; 369 // TODO: handle multiple meta. if the region provided is not meta lookup 370 // which meta the region belongs to. 371 return RegionInfoBuilder.FIRST_META_REGIONINFO; 372 } 373 374 // TODO: handle multiple meta. 375 private static final Set<RegionInfo> META_REGION_SET = 376 Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO); 377 public Set<RegionInfo> getMetaRegionSet() { 378 return META_REGION_SET; 379 } 380 381 // ============================================================================================ 382 // META Event(s) helpers 383 // ============================================================================================ 384 /** 385 * Notice that, this only means the meta region is available on a RS, but the AM may still be 386 * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first 387 * before checking this method, unless you can make sure that your piece of code can only be 388 * executed after AM builds the region states. 389 * @see #isMetaLoaded() 390 */ 391 public boolean isMetaAssigned() { 392 return metaAssignEvent.isReady(); 393 } 394 395 public boolean isMetaRegionInTransition() { 396 return !isMetaAssigned(); 397 } 398 399 /** 400 * Notice that this event does not mean the AM has already finished region state rebuilding. See 401 * the comment of {@link #isMetaAssigned()} for more details. 
402 * @see #isMetaAssigned() 403 */ 404 public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) { 405 return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc); 406 } 407 408 private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) { 409 assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; 410 ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo); 411 if (assigned) { 412 metaAssignEvent.wake(getProcedureScheduler()); 413 } else { 414 metaAssignEvent.suspend(); 415 } 416 } 417 418 private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) { 419 assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; 420 // TODO: handle multiple meta. 421 return metaAssignEvent; 422 } 423 424 /** 425 * Wait until AM finishes the meta loading, i.e, the region states rebuilding. 426 * @see #isMetaLoaded() 427 * @see #waitMetaAssigned(Procedure, RegionInfo) 428 */ 429 public boolean waitMetaLoaded(Procedure<?> proc) { 430 return metaLoadEvent.suspendIfNotReady(proc); 431 } 432 433 @VisibleForTesting 434 void wakeMetaLoadedEvent() { 435 metaLoadEvent.wake(getProcedureScheduler()); 436 assert isMetaLoaded() : "expected meta to be loaded"; 437 } 438 439 /** 440 * Return whether AM finishes the meta loading, i.e, the region states rebuilding. 441 * @see #isMetaAssigned() 442 * @see #waitMetaLoaded(Procedure) 443 */ 444 public boolean isMetaLoaded() { 445 return metaLoadEvent.isReady(); 446 } 447 448 /** 449 * Start a new thread to check if there are region servers whose versions are higher than others. 450 * If so, move all system table regions to RS with the highest version to keep compatibility. 451 * The reason is, RS in new version may not be able to access RS in old version when there are 452 * some incompatible changes. 
453 * <p>This method is called when a new RegionServer is added to cluster only.</p> 454 */ 455 public void checkIfShouldMoveSystemRegionAsync() { 456 // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that 457 // it should 'move' the system tables from the old server to the new server but 458 // ServerCrashProcedure is on it; and it will take care of the assign without dataloss. 459 if (this.master.getServerManager().countOfRegionServers() <= 1) { 460 return; 461 } 462 // This thread used to run whenever there was a change in the cluster. The ZooKeeper 463 // childrenChanged notification came in before the nodeDeleted message and so this method 464 // cold run before a ServerCrashProcedure could run. That meant that this thread could see 465 // a Crashed Server before ServerCrashProcedure and it could find system regions on the 466 // crashed server and go move them before ServerCrashProcedure had a chance; could be 467 // dataloss too if WALs were not recovered. 468 new Thread(() -> { 469 try { 470 synchronized (checkIfShouldMoveSystemRegionLock) { 471 List<RegionPlan> plans = new ArrayList<>(); 472 // TODO: I don't think this code does a good job if all servers in cluster have same 473 // version. It looks like it will schedule unnecessary moves. 474 for (ServerName server : getExcludedServersForSystemTable()) { 475 if (master.getServerManager().isServerDead(server)) { 476 // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable() 477 // considers only online servers, the server could be queued for dead server 478 // processing. As region assignments for crashed server is handled by 479 // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through 480 // regular flow of LoadBalancer as a favored node and not to have this special 481 // handling. 
482 continue; 483 } 484 List<RegionInfo> regionsShouldMove = getSystemTables(server); 485 if (!regionsShouldMove.isEmpty()) { 486 for (RegionInfo regionInfo : regionsShouldMove) { 487 // null value for dest forces destination server to be selected by balancer 488 RegionPlan plan = new RegionPlan(regionInfo, server, null); 489 if (regionInfo.isMetaRegion()) { 490 // Must move meta region first. 491 LOG.info("Async MOVE of {} to newer Server={}", 492 regionInfo.getEncodedName(), server); 493 moveAsync(plan); 494 } else { 495 plans.add(plan); 496 } 497 } 498 } 499 for (RegionPlan plan : plans) { 500 LOG.info("Async MOVE of {} to newer Server={}", 501 plan.getRegionInfo().getEncodedName(), server); 502 moveAsync(plan); 503 } 504 } 505 } 506 } catch (Throwable t) { 507 LOG.error(t.toString(), t); 508 } 509 }).start(); 510 } 511 512 private List<RegionInfo> getSystemTables(ServerName serverName) { 513 Set<RegionStateNode> regions = this.getRegionStates().getServerNode(serverName).getRegions(); 514 if (regions == null) { 515 return new ArrayList<>(); 516 } 517 return regions.stream() 518 .map(RegionStateNode::getRegionInfo) 519 .filter(r -> r.getTable().isSystemTable()) 520 .collect(Collectors.toList()); 521 } 522 523 public void assign(final RegionInfo regionInfo, ServerName sn) throws IOException { 524 AssignProcedure proc = createAssignProcedure(regionInfo, sn); 525 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 526 } 527 528 public void assign(final RegionInfo regionInfo) throws IOException { 529 AssignProcedure proc = createAssignProcedure(regionInfo); 530 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 531 } 532 533 public void unassign(final RegionInfo regionInfo) throws IOException { 534 unassign(regionInfo, false); 535 } 536 537 public void unassign(final RegionInfo regionInfo, final boolean forceNewPlan) 538 throws IOException { 539 // TODO: rename this reassign 540 RegionStateNode node = 
this.regionStates.getRegionStateNode(regionInfo); 541 ServerName destinationServer = node.getRegionLocation(); 542 if (destinationServer == null) { 543 throw new UnexpectedStateException("DestinationServer is null; Assigned? " + node.toString()); 544 } 545 assert destinationServer != null; node.toString(); 546 UnassignProcedure proc = createUnassignProcedure(regionInfo, destinationServer, forceNewPlan); 547 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 548 } 549 550 public void move(final RegionInfo regionInfo) throws IOException { 551 RegionStateNode node = this.regionStates.getRegionStateNode(regionInfo); 552 ServerName sourceServer = node.getRegionLocation(); 553 RegionPlan plan = new RegionPlan(regionInfo, sourceServer, null); 554 MoveRegionProcedure proc = createMoveRegionProcedure(plan); 555 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 556 } 557 558 public Future<byte[]> moveAsync(final RegionPlan regionPlan) throws HBaseIOException { 559 MoveRegionProcedure proc = createMoveRegionProcedure(regionPlan); 560 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); 561 } 562 563 /** 564 * Create round-robin assigns. Use on table creation to distribute out regions across cluster. 565 * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer 566 * to populate the assigns with targets chosen using round-robin (default balancer 567 * scheme). If at assign-time, the target chosen is no longer up, thats fine, the 568 * AssignProcedure will ask the balancer for a new target, and so on. 
569 */ 570 public AssignProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) { 571 return createRoundRobinAssignProcedures(hris, null); 572 } 573 574 // ============================================================================================ 575 // RegionTransition procedures helpers 576 // ============================================================================================ 577 578 /** 579 * Create round-robin assigns. Use on table creation to distribute out regions across cluster. 580 * @return AssignProcedures made out of the passed in <code>hris</code> and a call 581 * to the balancer to populate the assigns with targets chosen using round-robin (default 582 * balancer scheme). If at assign-time, the target chosen is no longer up, thats fine, 583 * the AssignProcedure will ask the balancer for a new target, and so on. 584 */ 585 public AssignProcedure[] createRoundRobinAssignProcedures(final List<RegionInfo> hris, 586 List<ServerName> serversToExclude) { 587 if (hris.isEmpty()) { 588 return null; 589 } 590 if (serversToExclude != null 591 && this.master.getServerManager().getOnlineServersList().size() == 1) { 592 LOG.debug("Only one region server found and hence going ahead with the assignment"); 593 serversToExclude = null; 594 } 595 try { 596 // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do 597 // a better job if it has all the assignments in the one lump. 598 Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris, 599 this.master.getServerManager().createDestinationServersList(serversToExclude)); 600 // Return mid-method! 601 return createAssignProcedures(assignments, hris.size()); 602 } catch (HBaseIOException hioe) { 603 LOG.warn("Failed roundRobinAssignment", hioe); 604 } 605 // If an error above, fall-through to this simpler assign. Last resort. 
606 return createAssignProcedures(hris); 607 } 608 609 /** 610 * Create an array of AssignProcedures w/o specifying a target server. 611 * If no target server, at assign time, we will try to use the former location of the region 612 * if one exists. This is how we 'retain' the old location across a server restart. 613 * Used by {@link ServerCrashProcedure} assigning regions on a server that has crashed (SCP is 614 * also used across a cluster-restart just-in-case to ensure we do cleanup of any old WALs or 615 * server processes). 616 */ 617 public AssignProcedure[] createAssignProcedures(final List<RegionInfo> hris) { 618 if (hris.isEmpty()) { 619 return null; 620 } 621 int index = 0; 622 AssignProcedure [] procedures = new AssignProcedure[hris.size()]; 623 for (RegionInfo hri : hris) { 624 // Sort the procedures so meta and system regions are first in the returned array. 625 procedures[index++] = createAssignProcedure(hri); 626 } 627 if (procedures.length > 1) { 628 // Sort the procedures so meta and system regions are first in the returned array. 629 Arrays.sort(procedures, AssignProcedure.COMPARATOR); 630 } 631 return procedures; 632 } 633 634 // Make this static for the method below where we use it typing the AssignProcedure array we 635 // return as result. 636 private static final AssignProcedure [] ASSIGN_PROCEDURE_ARRAY_TYPE = new AssignProcedure[] {}; 637 638 /** 639 * @param assignments Map of assignments from which we produce an array of AssignProcedures. 640 * @param size Count of assignments to make (the caller may know the total count) 641 * @return Assignments made from the passed in <code>assignments</code> 642 */ 643 private AssignProcedure[] createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments, 644 int size) { 645 List<AssignProcedure> procedures = new ArrayList<>(size > 0? 
size: 8/*Arbitrary*/); 646 for (Map.Entry<ServerName, List<RegionInfo>> e: assignments.entrySet()) { 647 for (RegionInfo ri: e.getValue()) { 648 AssignProcedure ap = createAssignProcedure(ri, e.getKey()); 649 ap.setOwner(getProcedureEnvironment().getRequestUser().getShortName()); 650 procedures.add(ap); 651 } 652 } 653 if (procedures.size() > 1) { 654 // Sort the procedures so meta and system regions are first in the returned array. 655 procedures.sort(AssignProcedure.COMPARATOR); 656 } 657 return procedures.toArray(ASSIGN_PROCEDURE_ARRAY_TYPE); 658 } 659 660 // Needed for the following method so it can type the created Array we retur n 661 private static final UnassignProcedure [] UNASSIGN_PROCEDURE_ARRAY_TYPE = 662 new UnassignProcedure[0]; 663 664 UnassignProcedure[] createUnassignProcedures(final Collection<RegionStateNode> nodes) { 665 if (nodes.isEmpty()) return null; 666 final List<UnassignProcedure> procs = new ArrayList<UnassignProcedure>(nodes.size()); 667 for (RegionStateNode node: nodes) { 668 if (!this.regionStates.include(node, false)) continue; 669 // Look for regions that are offline/closed; i.e. already unassigned. 670 if (this.regionStates.isRegionOffline(node.getRegionInfo())) continue; 671 assert node.getRegionLocation() != null: node.toString(); 672 procs.add(createUnassignProcedure(node.getRegionInfo(), node.getRegionLocation(), false)); 673 } 674 return procs.toArray(UNASSIGN_PROCEDURE_ARRAY_TYPE); 675 } 676 677 /** 678 * Called by things like DisableTableProcedure to get a list of UnassignProcedure 679 * to unassign the regions of the table. 
*/
public AssignProcedure createAssignProcedure(final RegionInfo regionInfo) {
  // Delegate with no target server and override disabled.
  return createAssignProcedure(regionInfo, null, false);
}

/**
 * Creates an {@link AssignProcedure} for {@code regionInfo} with no target server.
 */
public AssignProcedure createAssignProcedure(final RegionInfo regionInfo, boolean override) {
  final ServerName noTarget = null;
  return createAssignProcedure(regionInfo, noTarget, override);
}

/**
 * Creates an {@link AssignProcedure} for {@code regionInfo} aimed at {@code targetServer},
 * with override disabled.
 */
public AssignProcedure createAssignProcedure(final RegionInfo regionInfo,
    ServerName targetServer) {
  final boolean noOverride = false;
  return createAssignProcedure(regionInfo, targetServer, noOverride);
}

/**
 * Builds the {@link AssignProcedure} and stamps the short name of the current request user
 * as its owner.
 */
public AssignProcedure createAssignProcedure(final RegionInfo regionInfo,
    final ServerName targetServer, boolean override) {
  final AssignProcedure assign = new AssignProcedure(regionInfo, targetServer, override);
  final String owner = getProcedureEnvironment().getRequestUser().getShortName();
  assign.setOwner(owner);
  return assign;
}

/**
 * Creates an {@link UnassignProcedure} for {@code regionInfo}; the hosting server is looked up
 * from the region states and force is disabled.
 */
public UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo) {
  final ServerName lookUpHost = null;
  return createUnassignProcedure(regionInfo, lookUpHost, false);
}

/**
 * Creates an {@link UnassignProcedure} for {@code regionInfo}; the hosting server is looked up
 * from the region states and {@code override} is passed as the force flag.
 */
public UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo,
    boolean override) {
  final ServerName lookUpHost = null;
  return createUnassignProcedure(regionInfo, lookUpHost, override);
}

/**
 * Creates unassign procedures for every region state node of {@code tableName}.
 */
public UnassignProcedure[] createUnassignProcedures(final TableName tableName) {
  return createUnassignProcedures(regionStates.getTableRegionStateNodes(tableName));
}

UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo,
    final ServerName destinationServer, final boolean force) {
  final boolean keepAfterUnassigning = false;
  return createUnassignProcedure(regionInfo, destinationServer, force, keepAfterUnassigning);
}

/**
 * Builds the {@link UnassignProcedure}. When {@code destinationServer} is null, the server
 * currently recorded for the region in the region states is used instead. The short name of
 * the current request user is stamped as the procedure owner.
 */
UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo,
    final ServerName destinationServer, final boolean force,
    final boolean removeAfterUnassigning) {
  ServerName hostingServer = destinationServer;
  if (hostingServer == null) {
    // Caller did not name a server: figure it from the in-memory region state.
    hostingServer = getRegionStates().getRegionState(regionInfo).getServerName();
  }
  assert hostingServer != null;
  final UnassignProcedure unassign =
      new UnassignProcedure(regionInfo, hostingServer, force, removeAfterUnassigning);
  unassign.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
  return unassign;
}

/**
 * Wraps {@code plan} in a {@link MoveRegionProcedure}. For system-table regions whose requested
 * destination is in the excluded-server list, a random replacement destination is picked
 * from the non-excluded servers; if that pick fails the failure is only logged.
 */
private MoveRegionProcedure createMoveRegionProcedure(RegionPlan plan) throws HBaseIOException {
  if (!plan.getRegionInfo().getTable().isSystemTable()) {
    return new MoveRegionProcedure(getProcedureEnvironment(), plan, true);
  }
  final List<ServerName> excluded = getExcludedServersForSystemTable();
  final ServerName requested = plan.getDestination();
  if (requested != null && excluded.contains(requested)) {
    try {
      LOG.info("Can not move " + plan.getRegionInfo() + " to " + plan.getDestination() +
          " because the server is not with highest version");
      final List<ServerName> candidates =
          this.master.getServerManager().createDestinationServersList(excluded);
      plan.setDestination(getBalancer().randomAssignment(plan.getRegionInfo(), candidates));
    } catch (HBaseIOException e) {
      LOG.warn(e.toString(), e);
    }
  }
  return new MoveRegionProcedure(getProcedureEnvironment(), plan, true);
}

/**
 * Creates a procedure that splits {@code regionToSplit} at {@code splitKey}.
 */
public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
    final byte[] splitKey) throws IOException {
  final MasterProcedureEnv env = getProcedureEnvironment();
  return new SplitTableRegionProcedure(env, regionToSplit, splitKey);
}

/**
 * Creates a procedure that merges the given regions (forcible flag passed as {@code false}).
 */
public MergeTableRegionsProcedure createMergeProcedure(RegionInfo ... ris) throws IOException {
  final MasterProcedureEnv env = getProcedureEnvironment();
  return new MergeTableRegionsProcedure(env, ris, false);
}

/**
 * Delete the region states. This is called by "DeleteTable".
 * <p/>
 * Rows are removed from the region state store first, then each region is dropped from the
 * offline set and from the in-memory region states.
 */
public void deleteTable(final TableName tableName) throws IOException {
  final ArrayList<RegionInfo> tableRegions = regionStates.getTableRegionsInfo(tableName);
  regionStateStore.deleteRegions(tableRegions);
  for (final RegionInfo region : tableRegions) {
    // we expect the region to be offline
    regionStates.removeFromOfflineRegions(region);
    regionStates.deleteRegion(region);
  }
}

// ============================================================================================
//  RS Region Transition Report helpers
// ============================================================================================
// TODO: Move this code in MasterRpcServices and call on specific event?
/**
 * Dispatches each transition in {@code req} to the matching handler: single-region
 * open/close transitions, split transitions (parent + two daughters) and merge transitions
 * (merged + two parents). A {@link PleaseHoldException} is rethrown to the caller; any
 * {@link UnsupportedOperationException} or {@link IOException} is turned into an error message
 * on the response instead.
 */
public ReportRegionStateTransitionResponse reportRegionStateTransition(
    final ReportRegionStateTransitionRequest req)
    throws PleaseHoldException {
  final ReportRegionStateTransitionResponse.Builder responseBuilder =
      ReportRegionStateTransitionResponse.newBuilder();
  final ServerName reportingServer = ProtobufUtil.toServerName(req.getServer());
  try {
    for (RegionStateTransition transition : req.getTransitionList()) {
      final TransitionCode code = transition.getTransitionCode();
      switch (code) {
        case OPENED:
        case FAILED_OPEN:
        case CLOSED: {
          assert transition.getRegionInfoCount() == 1 : transition;
          final RegionInfo region = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
          final long openSeqNum =
              transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM;
          updateRegionTransition(reportingServer, code, region, openSeqNum);
          break;
        }
        case READY_TO_SPLIT:
        case SPLIT:
        case SPLIT_REVERTED: {
          assert transition.getRegionInfoCount() == 3 : transition;
          final RegionInfo splitParent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
          final RegionInfo daughterA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
          final RegionInfo daughterB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
          updateRegionSplitTransition(reportingServer, code, splitParent, daughterA, daughterB);
          break;
        }
        case READY_TO_MERGE:
        case MERGED:
        case MERGE_REVERTED: {
          assert transition.getRegionInfoCount() == 3 : transition;
          final RegionInfo mergedRegion = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
          final RegionInfo parentA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
          final RegionInfo parentB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
          updateRegionMergeTransition(reportingServer, code, mergedRegion, parentA, parentB);
          break;
        }
      }
    }
  } catch (PleaseHoldException e) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Failed transition " + e.getMessage());
    }
    throw e;
  } catch (UnsupportedOperationException | IOException e) {
    // TODO: at the moment we have a single error message and the RS will abort
    // if the master says that one of the region transitions failed.
    LOG.warn("Failed transition", e);
    responseBuilder.setErrorMessage("Failed transition " + e.getMessage());
  }
  return responseBuilder.build();
}

/**
 * Called when the RegionServer wants to report a Procedure transition.
 * Ends up calling {@link #reportTransition(RegionStateNode, ServerName, TransitionCode, long)}
 * <p/>
 * If no procedure is attached to the region, the report is only logged: at INFO for a CLOSED
 * report during cluster shutdown, at WARN otherwise.
 * @throws PleaseHoldException if hbase:meta is not loaded yet (see checkMetaLoaded)
 * @throws UnexpectedStateException if the region is no longer known to the region states
 */
private void updateRegionTransition(final ServerName serverName, final TransitionCode state,
    final RegionInfo regionInfo, final long seqId)
    throws PleaseHoldException, UnexpectedStateException {
  checkMetaLoaded(regionInfo);

  final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
  if (node == null) {
    // the table/region is gone. maybe a delete, split, merge
    throw new UnexpectedStateException(String.format(
        "Server %s was trying to transition region %s to %s. but the region was removed.",
        serverName, regionInfo, state));
  }

  if (LOG.isTraceEnabled()) {
    LOG.trace(String.format("Update region transition serverName=%s region=%s regionState=%s",
        serverName, node, state));
  }

  if (reportTransition(node, serverName, state, seqId)) {
    return;
  }
  // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
  // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
  //   rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
  //   table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
  //   to CLOSED
  // These happen because on cluster shutdown, we currently let the RegionServers close
  // regions. This is the only time that region close is not run by the Master (so cluster
  // goes down fast). Consider changing it so Master runs all shutdowns.
  final boolean closeDuringShutdown =
      this.master.getServerManager().isClusterShutdown() && state.equals(TransitionCode.CLOSED);
  if (closeDuringShutdown) {
    LOG.info("RegionServer {} {}", state, node.getRegionInfo().getEncodedName());
  } else {
    LOG.warn("No matching procedure found for {} transition to {}", node, state);
  }
}

// FYI: regionNode is sometimes synchronized by the caller but not always.
868 private boolean reportTransition(final RegionStateNode regionNode, ServerName serverName, 869 final TransitionCode state, final long seqId) 870 throws UnexpectedStateException { 871 synchronized (regionNode) { 872 final RegionTransitionProcedure proc = regionNode.getProcedure(); 873 if (proc == null) return false; 874 875 // serverNode.getReportEvent().removeProcedure(proc); 876 proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), 877 serverName, state, seqId); 878 } 879 return true; 880 } 881 882 private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state, 883 final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB) 884 throws IOException { 885 checkMetaLoaded(parent); 886 887 if (state != TransitionCode.READY_TO_SPLIT) { 888 throw new UnexpectedStateException("unsupported split regionState=" + state + 889 " for parent region " + parent + 890 " maybe an old RS (< 2.0) had the operation in progress"); 891 } 892 893 // sanity check on the request 894 if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) { 895 throw new UnsupportedOperationException( 896 "unsupported split request with bad keys: parent=" + parent + 897 " hriA=" + hriA + " hriB=" + hriB); 898 } 899 900 // Submit the Split procedure 901 final byte[] splitKey = hriB.getStartKey(); 902 if (LOG.isDebugEnabled()) { 903 LOG.debug("Split request from " + serverName + 904 ", parent=" + parent + " splitKey=" + Bytes.toStringBinary(splitKey)); 905 } 906 master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey)); 907 908 // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split 909 if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) { 910 throw new UnsupportedOperationException(String.format( 911 "Split handled by the master: parent=%s hriA=%s hriB=%s", parent.getShortNameToLog(), hriA, hriB)); 912 } 913 } 914 915 private void 
updateRegionMergeTransition(final ServerName serverName, final TransitionCode state, 916 final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException { 917 checkMetaLoaded(merged); 918 919 if (state != TransitionCode.READY_TO_MERGE) { 920 throw new UnexpectedStateException("Unsupported merge regionState=" + state + 921 " for regionA=" + hriA + " regionB=" + hriB + " merged=" + merged + 922 " maybe an old RS (< 2.0) had the operation in progress"); 923 } 924 925 // Submit the Merge procedure 926 if (LOG.isDebugEnabled()) { 927 LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged); 928 } 929 master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB)); 930 931 // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge 932 if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) { 933 throw new UnsupportedOperationException(String.format( 934 "Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, merged, hriA, 935 hriB)); 936 } 937 } 938 939 // ============================================================================================ 940 // RS Status update (report online regions) helpers 941 // ============================================================================================ 942 /** 943 * the master will call this method when the RS send the regionServerReport(). 944 * the report will contains the "online regions". 945 * this method will check the the online regions against the in-memory state of the AM, 946 * if there is a mismatch we will try to fence out the RS with the assumption 947 * that something went wrong on the RS side. 
948 */ 949 public void reportOnlineRegions(final ServerName serverName, final Set<byte[]> regionNames) 950 throws YouAreDeadException { 951 if (!isRunning()) return; 952 if (LOG.isTraceEnabled()) { 953 LOG.trace("ReportOnlineRegions " + serverName + " regionCount=" + regionNames.size() + 954 ", metaLoaded=" + isMetaLoaded() + " " + 955 regionNames.stream().map(element -> Bytes.toStringBinary(element)). 956 collect(Collectors.toList())); 957 } 958 959 // Make sure there is a ServerStateNode for this server that just checked in. 960 ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); 961 synchronized (serverNode) { 962 if (!serverNode.isInState(ServerState.ONLINE)) { 963 LOG.warn("Got a report from a server result in state " + serverNode.getState()); 964 return; 965 } 966 } 967 968 // Track the regionserver reported online regions in memory. 969 synchronized (rsReports) { 970 rsReports.put(serverName, regionNames); 971 } 972 973 if (regionNames.isEmpty()) { 974 // nothing to do if we don't have regions 975 LOG.trace("no online region found on " + serverName); 976 } else if (!isMetaLoaded()) { 977 // if we are still on startup, discard the report unless is from someone holding meta 978 checkOnlineRegionsReportForMeta(serverName, regionNames); 979 } else { 980 // The Heartbeat updates us of what regions are only. check and verify the state. 
981 checkOnlineRegionsReport(serverNode, regionNames); 982 } 983 984 // wake report event 985 wakeServerReportEvent(serverNode); 986 } 987 988 void checkOnlineRegionsReportForMeta(final ServerName serverName, final Set<byte[]> regionNames) { 989 try { 990 for (byte[] regionName: regionNames) { 991 final RegionInfo hri = getMetaRegionFromName(regionName); 992 if (hri == null) { 993 if (LOG.isTraceEnabled()) { 994 LOG.trace("Skip online report for region={} while meta is loading from server={}", 995 Bytes.toStringBinary(regionName), serverName); 996 } 997 continue; 998 } 999 1000 final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri); 1001 LOG.info("META REPORTED: " + regionNode); 1002 if (!reportTransition(regionNode, serverName, TransitionCode.OPENED, 0)) { 1003 LOG.warn("META REPORTED but no procedure found (complete?); set location={}", serverName); 1004 regionNode.setRegionLocation(serverName); 1005 } else if (LOG.isTraceEnabled()) { 1006 LOG.trace("META REPORTED: " + regionNode); 1007 } 1008 } 1009 } catch (UnexpectedStateException e) { 1010 LOG.warn("KILLING " + serverName + ": " + e.getMessage()); 1011 killRegionServer(serverName); 1012 } 1013 } 1014 1015 void checkOnlineRegionsReport(final ServerStateNode serverNode, final Set<byte[]> regionNames) { 1016 final ServerName serverName = serverNode.getServerName(); 1017 try { 1018 for (byte[] regionName: regionNames) { 1019 if (!isRunning()) return; 1020 final RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName); 1021 if (regionNode == null) { 1022 throw new UnexpectedStateException("Not online: " + Bytes.toStringBinary(regionName)); 1023 } 1024 synchronized (regionNode) { 1025 if (regionNode.isInState(State.OPENING, State.OPEN)) { 1026 if (!regionNode.getRegionLocation().equals(serverName)) { 1027 throw new UnexpectedStateException(regionNode.toString() + 1028 " reported OPEN on server=" + serverName + 1029 " but state has otherwise."); 1030 } else if 
(regionNode.isInState(State.OPENING)) { 1031 try { 1032 if (!reportTransition(regionNode, serverNode.getServerName(), 1033 TransitionCode.OPENED, 0)) { 1034 LOG.warn(regionNode.toString() + " reported OPEN on server=" + serverName + 1035 " but state has otherwise AND NO procedure is running"); 1036 } 1037 } catch (UnexpectedStateException e) { 1038 LOG.warn("{} reported unexpteced OPEN: {} sever={}", regionNode.toString(), 1039 e.getMessage(), serverName, e); 1040 } 1041 } 1042 } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) { 1043 long diff = regionNode.getLastUpdate() - EnvironmentEdgeManager.currentTime(); 1044 if (diff > 1000/*One Second... make configurable if an issue*/) { 1045 // So, we can get report that a region is CLOSED or SPLIT because a heartbeat 1046 // came in at about same time as a region transition. Make sure there is some 1047 // elapsed time between killing remote server. 1048 throw new UnexpectedStateException(regionNode.toString() + 1049 " reported an unexpected OPEN; time since last update=" + diff); 1050 } 1051 } 1052 } 1053 } 1054 } catch (UnexpectedStateException e) { 1055 //See HBASE-21421, we can count on reportRegionStateTransition calls 1056 //We only log a warming here. It could be a network lag. 1057 LOG.warn("Failed to checkOnlineRegionsReport, maybe due to network lag, " 1058 + "if this message continues, be careful of double assign. report from server={}", 1059 serverName, e); 1060 } 1061 } 1062 1063 protected boolean waitServerReportEvent(ServerName serverName, Procedure<?> proc) { 1064 ServerStateNode ssn = this.regionStates.getServerNode(serverName); 1065 if (ssn == null) { 1066 LOG.warn("Why is ServerStateNode for {} empty at this point? 
Creating...", serverName); 1067 ssn = this.regionStates.getOrCreateServer(serverName); 1068 } 1069 return ssn.getReportEvent().suspendIfNotReady(proc); 1070 } 1071 1072 protected void wakeServerReportEvent(final ServerStateNode serverNode) { 1073 serverNode.getReportEvent().wake(getProcedureScheduler()); 1074 } 1075 1076 // ============================================================================================ 1077 // RIT chore 1078 // ============================================================================================ 1079 private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> { 1080 public RegionInTransitionChore(final int timeoutMsec) { 1081 super(timeoutMsec); 1082 } 1083 1084 @Override 1085 protected void periodicExecute(final MasterProcedureEnv env) { 1086 final AssignmentManager am = env.getAssignmentManager(); 1087 1088 final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat(); 1089 if (ritStat.hasRegionsOverThreshold()) { 1090 for (RegionState hri: ritStat.getRegionOverThreshold()) { 1091 am.handleRegionOverStuckWarningThreshold(hri.getRegion()); 1092 } 1093 } 1094 1095 // update metrics 1096 am.updateRegionsInTransitionMetrics(ritStat); 1097 } 1098 } 1099 1100 public RegionInTransitionStat computeRegionInTransitionStat() { 1101 final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration()); 1102 rit.update(this); 1103 return rit; 1104 } 1105 1106 public static class RegionInTransitionStat { 1107 private final int ritThreshold; 1108 1109 private HashMap<String, RegionState> ritsOverThreshold = null; 1110 private long statTimestamp; 1111 private long oldestRITTime = 0; 1112 private int totalRITsTwiceThreshold = 0; 1113 private int totalRITs = 0; 1114 1115 @VisibleForTesting 1116 public RegionInTransitionStat(final Configuration conf) { 1117 this.ritThreshold = 1118 conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD); 1119 } 1120 1121 
public int getRITThreshold() { 1122 return ritThreshold; 1123 } 1124 1125 public long getTimestamp() { 1126 return statTimestamp; 1127 } 1128 1129 public int getTotalRITs() { 1130 return totalRITs; 1131 } 1132 1133 public long getOldestRITTime() { 1134 return oldestRITTime; 1135 } 1136 1137 public int getTotalRITsOverThreshold() { 1138 Map<String, RegionState> m = this.ritsOverThreshold; 1139 return m != null ? m.size() : 0; 1140 } 1141 1142 public boolean hasRegionsTwiceOverThreshold() { 1143 return totalRITsTwiceThreshold > 0; 1144 } 1145 1146 public boolean hasRegionsOverThreshold() { 1147 Map<String, RegionState> m = this.ritsOverThreshold; 1148 return m != null && !m.isEmpty(); 1149 } 1150 1151 public Collection<RegionState> getRegionOverThreshold() { 1152 Map<String, RegionState> m = this.ritsOverThreshold; 1153 return m != null? m.values(): Collections.emptySet(); 1154 } 1155 1156 public boolean isRegionOverThreshold(final RegionInfo regionInfo) { 1157 Map<String, RegionState> m = this.ritsOverThreshold; 1158 return m != null && m.containsKey(regionInfo.getEncodedName()); 1159 } 1160 1161 public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) { 1162 Map<String, RegionState> m = this.ritsOverThreshold; 1163 if (m == null) return false; 1164 final RegionState state = m.get(regionInfo.getEncodedName()); 1165 if (state == null) return false; 1166 return (statTimestamp - state.getStamp()) > (ritThreshold * 2); 1167 } 1168 1169 protected void update(final AssignmentManager am) { 1170 final RegionStates regionStates = am.getRegionStates(); 1171 this.statTimestamp = EnvironmentEdgeManager.currentTime(); 1172 update(regionStates.getRegionsStateInTransition(), statTimestamp); 1173 update(regionStates.getRegionFailedOpen(), statTimestamp); 1174 } 1175 1176 private void update(final Collection<RegionState> regions, final long currentTime) { 1177 for (RegionState state: regions) { 1178 totalRITs++; 1179 final long ritTime = currentTime - state.getStamp(); 
1180 if (ritTime > ritThreshold) { 1181 if (ritsOverThreshold == null) { 1182 ritsOverThreshold = new HashMap<String, RegionState>(); 1183 } 1184 ritsOverThreshold.put(state.getRegion().getEncodedName(), state); 1185 totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0; 1186 } 1187 if (oldestRITTime < ritTime) { 1188 oldestRITTime = ritTime; 1189 } 1190 } 1191 } 1192 } 1193 1194 private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) { 1195 metrics.updateRITOldestAge(ritStat.getOldestRITTime()); 1196 metrics.updateRITCount(ritStat.getTotalRITs()); 1197 metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold()); 1198 } 1199 1200 private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) { 1201 final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 1202 //if (regionNode.isStuck()) { 1203 LOG.warn("STUCK Region-In-Transition {}", regionNode); 1204 } 1205 1206 // ============================================================================================ 1207 // TODO: Master load/bootstrap 1208 // ============================================================================================ 1209 public void joinCluster() throws IOException { 1210 long startTime = System.nanoTime(); 1211 LOG.debug("Joining cluster..."); 1212 1213 // Scan hbase:meta to build list of existing regions, servers, and assignment. 1214 // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress 1215 // w/o meta. 
1216 loadMeta(); 1217 1218 while (master.getServerManager().countOfRegionServers() < 1) { 1219 LOG.info("Waiting for RegionServers to join; current count={}", 1220 master.getServerManager().countOfRegionServers()); 1221 Threads.sleep(250); 1222 } 1223 LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers()); 1224 1225 // Start the RIT chore 1226 master.getMasterProcedureExecutor().addChore(this.ritChore); 1227 1228 long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); 1229 LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs)); 1230 } 1231 1232 /** 1233 * Create assign procedure for offline regions. 1234 * Just follow the old processofflineServersWithOnlineRegions method. Since now we do not need to 1235 * deal with dead server any more, we only deal with the regions in OFFLINE state in this method. 1236 * And this is a bit strange, that for new regions, we will add it in CLOSED state instead of 1237 * OFFLINE state, and usually there will be a procedure to track them. The 1238 * processofflineServersWithOnlineRegions is a legacy from long ago, as things are going really 1239 * different now, maybe we do not need this method any more. Need to revisit later. 1240 */ 1241 // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online. 1242 // Needs to be done after the table state manager has been started. 
1243 public void processOfflineRegions() { 1244 List<RegionInfo> offlineRegions = regionStates.getRegionStates().stream() 1245 .filter(RegionState::isOffline).filter(s -> isTableEnabled(s.getRegion().getTable())) 1246 .map(RegionState::getRegion).collect(Collectors.toList()); 1247 if (!offlineRegions.isEmpty()) { 1248 master.getMasterProcedureExecutor().submitProcedures( 1249 master.getAssignmentManager().createRoundRobinAssignProcedures(offlineRegions)); 1250 } 1251 } 1252 1253 private void loadMeta() throws IOException { 1254 // TODO: use a thread pool 1255 regionStateStore.visitMeta(new RegionStateStore.RegionStateVisitor() { 1256 @Override 1257 public void visitRegionState(Result result, final RegionInfo regionInfo, final State state, 1258 final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) { 1259 if (state == null && regionLocation == null && lastHost == null && 1260 openSeqNum == SequenceId.NO_SEQUENCE_ID) { 1261 // This is a row with nothing in it. 1262 LOG.warn("Skipping empty row={}", result); 1263 return; 1264 } 1265 State localState = state; 1266 if (localState == null) { 1267 // No region state column data in hbase:meta table! Are I doing a rolling upgrade from 1268 // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta? 1269 // In any of these cases, state is empty. For now, presume OFFLINE but there are probably 1270 // cases where we need to probe more to be sure this correct; TODO informed by experience. 
1271 LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE); 1272 1273 localState = State.OFFLINE; 1274 } 1275 final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 1276 synchronized (regionNode) { 1277 if (!regionNode.isInTransition()) { 1278 regionNode.setState(localState); 1279 regionNode.setLastHost(lastHost); 1280 regionNode.setRegionLocation(regionLocation); 1281 regionNode.setOpenSeqNum(openSeqNum); 1282 if (localState == State.OPEN) { 1283 assert regionLocation != null : "found null region location for " + regionNode; 1284 regionStates.getOrCreateServer(regionNode.getRegionLocation()); 1285 regionStates.addRegionToServer(regionNode); 1286 } else if (localState == State.OFFLINE || regionInfo.isOffline()) { 1287 regionStates.addToOfflineRegions(regionNode); 1288 } else if (localState == State.CLOSED && getTableStateManager(). 1289 isTableState(regionNode.getTable(), TableState.State.DISABLED, 1290 TableState.State.DISABLING)) { 1291 // The region is CLOSED and the table is DISABLED/ DISABLING, there is nothing to 1292 // schedule; the region is inert. 1293 } else { 1294 // This is region in CLOSING or OPENING state. 1295 // These regions should have a procedure in replay. 1296 // If they don't, then they will show as STUCK after a while because of the below 1297 // registration and will need intervention by operator to fix. Add them to a server 1298 // even if the server is not around so a SCP cleans them up. 1299 if (regionLocation != null) { 1300 regionStates.getOrCreateServer(regionLocation); 1301 } 1302 regionStates.addRegionInTransition(regionNode, null); 1303 } 1304 } else { 1305 LOG.info("RIT {}", regionNode); 1306 } 1307 } 1308 } 1309 }); 1310 1311 // every assignment is blocked until meta is loaded. 1312 wakeMetaLoadedEvent(); 1313 } 1314 1315 /** 1316 * Used to check if the meta loading is done. 
1317 * <p/> 1318 * if not we throw PleaseHoldException since we are rebuilding the RegionStates 1319 * @param hri region to check if it is already rebuild 1320 * @throws PleaseHoldException if meta has not been loaded yet 1321 */ 1322 private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException { 1323 if (!isRunning()) { 1324 throw new PleaseHoldException("AssignmentManager not running"); 1325 } 1326 boolean meta = isMetaRegion(hri); 1327 boolean metaLoaded = isMetaLoaded(); 1328 if (!meta && !metaLoaded) { 1329 throw new PleaseHoldException( 1330 "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded); 1331 } 1332 } 1333 1334 // ============================================================================================ 1335 // TODO: Metrics 1336 // ============================================================================================ 1337 public int getNumRegionsOpened() { 1338 // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value 1339 return 0; 1340 } 1341 1342 public long submitServerCrash(final ServerName serverName, final boolean shouldSplitWal) { 1343 boolean carryingMeta = isCarryingMeta(serverName); 1344 1345 // Remove the in-memory rsReports result 1346 synchronized (rsReports) { 1347 rsReports.remove(serverName); 1348 } 1349 1350 ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor(); 1351 long pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), 1352 serverName, shouldSplitWal, carryingMeta)); 1353 LOG.debug("Added=" + serverName 1354 + " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta); 1355 return pid; 1356 } 1357 1358 public void offlineRegion(final RegionInfo regionInfo) { 1359 // TODO used by MasterRpcServices ServerCrashProcedure 1360 final RegionStateNode node = regionStates.getRegionStateNode(regionInfo); 1361 if (node != null) node.offline(); 1362 } 1363 1364 public void 
onlineRegion(final RegionInfo regionInfo, final ServerName serverName) { 1365 // TODO used by TestSplitTransactionOnCluster.java 1366 } 1367 1368 public Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment( 1369 final Collection<RegionInfo> regions) { 1370 return regionStates.getSnapShotOfAssignment(regions); 1371 } 1372 1373 // ============================================================================================ 1374 // TODO: UTILS/HELPERS? 1375 // ============================================================================================ 1376 /** 1377 * Used by the client (via master) to identify if all regions have the schema updates 1378 * 1379 * @param tableName 1380 * @return Pair indicating the status of the alter command (pending/total) 1381 * @throws IOException 1382 */ 1383 public Pair<Integer, Integer> getReopenStatus(TableName tableName) { 1384 if (isTableDisabled(tableName)) return new Pair<Integer, Integer>(0, 0); 1385 1386 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 1387 int ritCount = 0; 1388 for (RegionState regionState: states) { 1389 if (!regionState.isOpened() && !regionState.isSplit()) { 1390 ritCount++; 1391 } 1392 } 1393 return new Pair<Integer, Integer>(ritCount, states.size()); 1394 } 1395 1396 // ============================================================================================ 1397 // TODO: Region State In Transition 1398 // ============================================================================================ 1399 protected boolean addRegionInTransition(final RegionStateNode regionNode, 1400 final RegionTransitionProcedure procedure) { 1401 return regionStates.addRegionInTransition(regionNode, procedure); 1402 } 1403 1404 protected void removeRegionInTransition(final RegionStateNode regionNode, 1405 final RegionTransitionProcedure procedure) { 1406 regionStates.removeRegionInTransition(regionNode, procedure); 1407 } 1408 1409 public boolean hasRegionsInTransition() { 1410 
return regionStates.hasRegionsInTransition(); 1411 } 1412 1413 public List<RegionStateNode> getRegionsInTransition() { 1414 return regionStates.getRegionsInTransition(); 1415 } 1416 1417 public List<RegionInfo> getAssignedRegions() { 1418 return regionStates.getAssignedRegions(); 1419 } 1420 1421 public RegionInfo getRegionInfo(final byte[] regionName) { 1422 final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName); 1423 return regionState != null ? regionState.getRegionInfo() : null; 1424 } 1425 1426 // ============================================================================================ 1427 // TODO: Region Status update 1428 // ============================================================================================ 1429 private void sendRegionOpenedNotification(final RegionInfo regionInfo, 1430 final ServerName serverName) { 1431 getBalancer().regionOnline(regionInfo, serverName); 1432 if (!this.listeners.isEmpty()) { 1433 for (AssignmentListener listener : this.listeners) { 1434 listener.regionOpened(regionInfo, serverName); 1435 } 1436 } 1437 } 1438 1439 private void sendRegionClosedNotification(final RegionInfo regionInfo) { 1440 getBalancer().regionOffline(regionInfo); 1441 if (!this.listeners.isEmpty()) { 1442 for (AssignmentListener listener : this.listeners) { 1443 listener.regionClosed(regionInfo); 1444 } 1445 } 1446 } 1447 1448 public void markRegionAsOpening(final RegionStateNode regionNode) throws IOException { 1449 synchronized (regionNode) { 1450 regionNode.transitionState(State.OPENING, RegionStates.STATES_EXPECTED_ON_OPEN); 1451 regionStates.addRegionToServer(regionNode); 1452 regionStateStore.updateRegionLocation(regionNode); 1453 } 1454 1455 // update the operation count metrics 1456 metrics.incrementOperationCounter(); 1457 } 1458 1459 public void undoRegionAsOpening(final RegionStateNode regionNode) { 1460 boolean opening = false; 1461 synchronized (regionNode) { 1462 if 
(regionNode.isInState(State.OPENING)) { 1463 opening = true; 1464 regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode); 1465 } 1466 // Should we update hbase:meta? 1467 } 1468 if (opening) { 1469 // TODO: Metrics. Do opposite of metrics.incrementOperationCounter(); 1470 } 1471 } 1472 1473 public void markRegionAsOpened(final RegionStateNode regionNode) throws IOException { 1474 final RegionInfo hri = regionNode.getRegionInfo(); 1475 synchronized (regionNode) { 1476 regionNode.transitionState(State.OPEN, RegionStates.STATES_EXPECTED_ON_OPEN); 1477 if (isMetaRegion(hri)) { 1478 // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it 1479 // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager 1480 // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state 1481 // on table that contains state. 1482 setMetaAssigned(hri, true); 1483 } 1484 regionStates.addRegionToServer(regionNode); 1485 // TODO: OPENING Updates hbase:meta too... we need to do both here and there? 1486 // That is a lot of hbase:meta writing. 1487 regionStateStore.updateRegionLocation(regionNode); 1488 sendRegionOpenedNotification(hri, regionNode.getRegionLocation()); 1489 } 1490 } 1491 1492 public void markRegionAsClosing(final RegionStateNode regionNode) throws IOException { 1493 final RegionInfo hri = regionNode.getRegionInfo(); 1494 synchronized (regionNode) { 1495 regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE); 1496 // Set meta has not initialized early. 
so people trying to create/edit tables will wait 1497 if (isMetaRegion(hri)) { 1498 setMetaAssigned(hri, false); 1499 } 1500 regionStates.addRegionToServer(regionNode); 1501 regionStateStore.updateRegionLocation(regionNode); 1502 } 1503 1504 // update the operation count metrics 1505 metrics.incrementOperationCounter(); 1506 } 1507 1508 public void undoRegionAsClosing(final RegionStateNode regionNode) { 1509 // TODO: Metrics. Do opposite of metrics.incrementOperationCounter(); 1510 // There is nothing to undo? 1511 } 1512 1513 public void markRegionAsClosed(final RegionStateNode regionNode) throws IOException { 1514 final RegionInfo hri = regionNode.getRegionInfo(); 1515 synchronized (regionNode) { 1516 regionNode.transitionState(State.CLOSED, RegionStates.STATES_EXPECTED_ON_CLOSE); 1517 regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode); 1518 regionNode.setLastHost(regionNode.getRegionLocation()); 1519 regionNode.setRegionLocation(null); 1520 regionStateStore.updateRegionLocation(regionNode); 1521 sendRegionClosedNotification(hri); 1522 } 1523 } 1524 1525 public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName, 1526 final RegionInfo daughterA, final RegionInfo daughterB) throws IOException { 1527 // Update hbase:meta. Parent will be marked offline and split up in hbase:meta. 1528 // The parent stays in regionStates until cleared when removed by CatalogJanitor. 
1529 // Update its state in regionStates to it shows as offline and split when read 1530 // later figuring what regions are in a table and what are not: see 1531 // regionStates#getRegionsOfTable 1532 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent); 1533 node.setState(State.SPLIT); 1534 final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA); 1535 nodeA.setState(State.SPLITTING_NEW); 1536 final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB); 1537 nodeB.setState(State.SPLITTING_NEW); 1538 1539 regionStateStore.splitRegion(parent, daughterA, daughterB, serverName); 1540 if (shouldAssignFavoredNodes(parent)) { 1541 List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList(); 1542 ((FavoredNodesPromoter)getBalancer()). 1543 generateFavoredNodesForDaughter(onlineServers, parent, daughterA, daughterB); 1544 } 1545 } 1546 1547 /** 1548 * When called here, the merge has happened. The merged regions have been 1549 * unassigned and the above markRegionClosed has been called on each so they have been 1550 * disassociated from a hosting Server. The merged region will be open after this call. The 1551 * merged regions are removed from hbase:meta below. Later they are deleted from the filesystem 1552 * by the catalog janitor running against hbase:meta. It notices when the merged region no 1553 * longer holds references to the old regions (References are deleted after a compaction 1554 * rewrites what the Reference points at but not until the archiver chore runs, are the 1555 * References removed). 
1556 */ 1557 public void markRegionAsMerged(final RegionInfo child, final ServerName serverName, 1558 RegionInfo [] mergeParents) 1559 throws IOException { 1560 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child); 1561 node.setState(State.MERGED); 1562 for (RegionInfo ri: mergeParents) { 1563 regionStates.deleteRegion(ri); 1564 1565 } 1566 regionStateStore.mergeRegions(child, mergeParents, serverName); 1567 if (shouldAssignFavoredNodes(child)) { 1568 ((FavoredNodesPromoter)getBalancer()). 1569 generateFavoredNodesForMergedRegion(child, mergeParents); 1570 } 1571 } 1572 1573 /* 1574 * Favored nodes should be applied only when FavoredNodes balancer is configured and the region 1575 * belongs to a non-system table. 1576 */ 1577 private boolean shouldAssignFavoredNodes(RegionInfo region) { 1578 return this.shouldAssignRegionsWithFavoredNodes && 1579 FavoredNodesManager.isFavoredNodeApplicable(region); 1580 } 1581 1582 // ============================================================================================ 1583 // Assign Queue (Assign/Balance) 1584 // ============================================================================================ 1585 private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>(); 1586 private final ReentrantLock assignQueueLock = new ReentrantLock(); 1587 private final Condition assignQueueFullCond = assignQueueLock.newCondition(); 1588 1589 /** 1590 * Add the assign operation to the assignment queue. 1591 * The pending assignment operation will be processed, 1592 * and each region will be assigned by a server using the balancer. 1593 */ 1594 protected void queueAssign(final RegionStateNode regionNode) { 1595 regionNode.getProcedureEvent().suspend(); 1596 1597 // TODO: quick-start for meta and the other sys-tables? 
1598 assignQueueLock.lock(); 1599 try { 1600 pendingAssignQueue.add(regionNode); 1601 if (regionNode.isSystemTable() || 1602 pendingAssignQueue.size() == 1 || 1603 pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize) { 1604 assignQueueFullCond.signal(); 1605 } 1606 } finally { 1607 assignQueueLock.unlock(); 1608 } 1609 } 1610 1611 private void startAssignmentThread() { 1612 // Get Server Thread name. Sometimes the Server is mocked so may not implement HasThread. 1613 // For example, in tests. 1614 String name = master instanceof HasThread? ((HasThread)master).getName(): 1615 master.getServerName().toShortString(); 1616 assignThread = new Thread(name) { 1617 @Override 1618 public void run() { 1619 while (isRunning()) { 1620 processAssignQueue(); 1621 } 1622 pendingAssignQueue.clear(); 1623 } 1624 }; 1625 assignThread.setDaemon(true); 1626 assignThread.start(); 1627 } 1628 1629 private void stopAssignmentThread() { 1630 assignQueueSignal(); 1631 try { 1632 while (assignThread.isAlive()) { 1633 assignQueueSignal(); 1634 assignThread.join(250); 1635 } 1636 } catch (InterruptedException e) { 1637 LOG.warn("join interrupted", e); 1638 Thread.currentThread().interrupt(); 1639 } 1640 } 1641 1642 private void assignQueueSignal() { 1643 assignQueueLock.lock(); 1644 try { 1645 assignQueueFullCond.signal(); 1646 } finally { 1647 assignQueueLock.unlock(); 1648 } 1649 } 1650 1651 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") 1652 private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() { 1653 HashMap<RegionInfo, RegionStateNode> regions = null; 1654 1655 assignQueueLock.lock(); 1656 try { 1657 if (pendingAssignQueue.isEmpty() && isRunning()) { 1658 assignQueueFullCond.await(); 1659 } 1660 1661 if (!isRunning()) return null; 1662 assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS); 1663 regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size()); 1664 for (RegionStateNode regionNode: 
pendingAssignQueue) { 1665 regions.put(regionNode.getRegionInfo(), regionNode); 1666 } 1667 pendingAssignQueue.clear(); 1668 } catch (InterruptedException e) { 1669 LOG.warn("got interrupted ", e); 1670 Thread.currentThread().interrupt(); 1671 } finally { 1672 assignQueueLock.unlock(); 1673 } 1674 return regions; 1675 } 1676 1677 private void processAssignQueue() { 1678 final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue(); 1679 if (regions == null || regions.size() == 0 || !isRunning()) { 1680 return; 1681 } 1682 1683 if (LOG.isTraceEnabled()) { 1684 LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size()); 1685 } 1686 1687 // TODO: Optimize balancer. pass a RegionPlan? 1688 final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>(); 1689 final List<RegionInfo> userHRIs = new ArrayList<>(regions.size()); 1690 // Regions for system tables requiring reassignment 1691 final List<RegionInfo> systemHRIs = new ArrayList<>(); 1692 for (RegionStateNode regionStateNode: regions.values()) { 1693 boolean sysTable = regionStateNode.isSystemTable(); 1694 final List<RegionInfo> hris = sysTable? systemHRIs: userHRIs; 1695 if (regionStateNode.getRegionLocation() != null) { 1696 retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation()); 1697 } else { 1698 hris.add(regionStateNode.getRegionInfo()); 1699 } 1700 } 1701 1702 // TODO: connect with the listener to invalidate the cache 1703 1704 // TODO use events 1705 List<ServerName> servers = master.getServerManager().createDestinationServersList(); 1706 for (int i = 0; servers.size() < 1; ++i) { 1707 // Report every fourth time around this loop; try not to flood log. 1708 if (i % 4 == 0) { 1709 LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions."); 1710 } 1711 1712 if (!isRunning()) { 1713 LOG.debug("Stopped! 
Dropping assign of " + regions.size() + " queued regions."); 1714 return; 1715 } 1716 Threads.sleep(250); 1717 servers = master.getServerManager().createDestinationServersList(); 1718 } 1719 1720 if (!systemHRIs.isEmpty()) { 1721 // System table regions requiring reassignment are present, get region servers 1722 // not available for system table regions 1723 final List<ServerName> excludeServers = getExcludedServersForSystemTable(); 1724 List<ServerName> serversForSysTables = servers.stream() 1725 .filter(s -> !excludeServers.contains(s)).collect(Collectors.toList()); 1726 if (serversForSysTables.isEmpty()) { 1727 LOG.warn("Filtering old server versions and the excluded produced an empty set; " + 1728 "instead considering all candidate servers!"); 1729 } 1730 LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() + 1731 ", allServersCount=" + servers.size()); 1732 processAssignmentPlans(regions, null, systemHRIs, 1733 serversForSysTables.isEmpty()? servers: serversForSysTables); 1734 } 1735 1736 processAssignmentPlans(regions, retainMap, userHRIs, servers); 1737 } 1738 1739 private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions, 1740 final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris, 1741 final List<ServerName> servers) { 1742 boolean isTraceEnabled = LOG.isTraceEnabled(); 1743 if (isTraceEnabled) { 1744 LOG.trace("Available servers count=" + servers.size() + ": " + servers); 1745 } 1746 1747 final LoadBalancer balancer = getBalancer(); 1748 // ask the balancer where to place regions 1749 if (retainMap != null && !retainMap.isEmpty()) { 1750 if (isTraceEnabled) { 1751 LOG.trace("retain assign regions=" + retainMap); 1752 } 1753 try { 1754 acceptPlan(regions, balancer.retainAssignment(retainMap, servers)); 1755 } catch (HBaseIOException e) { 1756 LOG.warn("unable to retain assignment", e); 1757 addToPendingAssignment(regions, retainMap.keySet()); 1758 } 1759 } 1760 1761 // TODO: 
Do we need to split retain and round-robin? 1762 // the retain seems to fallback to round-robin/random if the region is not in the map. 1763 if (!hris.isEmpty()) { 1764 Collections.sort(hris, RegionInfo.COMPARATOR); 1765 if (isTraceEnabled) { 1766 LOG.trace("round robin regions=" + hris); 1767 } 1768 try { 1769 acceptPlan(regions, balancer.roundRobinAssignment(hris, servers)); 1770 } catch (HBaseIOException e) { 1771 LOG.warn("unable to round-robin assignment", e); 1772 addToPendingAssignment(regions, hris); 1773 } 1774 } 1775 } 1776 1777 private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions, 1778 final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException { 1779 final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()]; 1780 final long st = System.currentTimeMillis(); 1781 1782 if (plan == null) { 1783 throw new HBaseIOException("unable to compute plans for regions=" + regions.size()); 1784 } 1785 1786 if (plan.isEmpty()) return; 1787 1788 List<RegionInfo> bogusRegions = plan.remove(LoadBalancer.BOGUS_SERVER_NAME); 1789 if (bogusRegions != null && !bogusRegions.isEmpty()) { 1790 addToPendingAssignment(regions, bogusRegions); 1791 } 1792 1793 int evcount = 0; 1794 for (Map.Entry<ServerName, List<RegionInfo>> entry: plan.entrySet()) { 1795 final ServerName server = entry.getKey(); 1796 for (RegionInfo hri: entry.getValue()) { 1797 final RegionStateNode regionNode = regions.get(hri); 1798 regionNode.setRegionLocation(server); 1799 events[evcount++] = regionNode.getProcedureEvent(); 1800 } 1801 } 1802 ProcedureEvent.wakeEvents(getProcedureScheduler(), events); 1803 1804 final long et = System.currentTimeMillis(); 1805 if (LOG.isTraceEnabled()) { 1806 LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + 1807 StringUtils.humanTimeDiff(et - st)); 1808 } 1809 } 1810 1811 private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions, 1812 final Collection<RegionInfo> pendingRegions) { 1813 
assignQueueLock.lock(); 1814 try { 1815 for (RegionInfo hri: pendingRegions) { 1816 pendingAssignQueue.add(regions.get(hri)); 1817 } 1818 } finally { 1819 assignQueueLock.unlock(); 1820 } 1821 } 1822 1823 /** 1824 * Get a list of servers that this region cannot be assigned to. 1825 * For system tables, we must assign them to a server with highest version. 1826 */ 1827 public List<ServerName> getExcludedServersForSystemTable() { 1828 // TODO: This should be a cached list kept by the ServerManager rather than calculated on each 1829 // move or system region assign. The RegionServerTracker keeps list of online Servers with 1830 // RegionServerInfo that includes Version. 1831 List<Pair<ServerName, String>> serverList = master.getServerManager().getOnlineServersList() 1832 .stream() 1833 .map((s)->new Pair<>(s, master.getRegionServerVersion(s))) 1834 .collect(Collectors.toList()); 1835 if (serverList.isEmpty()) { 1836 return Collections.emptyList(); 1837 } 1838 String highestVersion = Collections.max(serverList, 1839 (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond(); 1840 return serverList.stream() 1841 .filter((p)->!p.getSecond().equals(highestVersion)) 1842 .map(Pair::getFirst) 1843 .collect(Collectors.toList()); 1844 } 1845 1846 // ============================================================================================ 1847 // Server Helpers 1848 // ============================================================================================ 1849 @Override 1850 public void serverAdded(final ServerName serverName) { 1851 } 1852 1853 @Override 1854 public void serverRemoved(final ServerName serverName) { 1855 final ServerStateNode serverNode = regionStates.getServerNode(serverName); 1856 if (serverNode == null) return; 1857 1858 // just in case, wake procedures waiting for this server report 1859 wakeServerReportEvent(serverNode); 1860 } 1861 1862 private void killRegionServer(final ServerName serverName) { 1863 
master.getServerManager().expireServer(serverName); 1864 } 1865 1866 /** 1867 * <p> 1868 * This is a very particular check. The {@link org.apache.hadoop.hbase.master.ServerManager} is 1869 * where you go to check on state of 'Servers', what Servers are online, etc. 1870 * </p> 1871 * <p> 1872 * Here we are checking the state of a server that is post expiration, a ServerManager function 1873 * that moves a server from online to dead. Here we are seeing if the server has moved beyond a 1874 * particular point in the recovery process such that it is safe to move on with assigns; etc. 1875 * </p> 1876 * <p> 1877 * For now it is only used in 1878 * {@link UnassignProcedure#remoteCallFailed(MasterProcedureEnv, RegionStateNode, IOException)} to 1879 * see whether we can safely quit without losing data. 1880 * </p> 1881 * @param meta whether to check for meta log splitting 1882 * @return {@code true} if the server does not exist or the log splitting is done, i.e, the server 1883 * is in OFFLINE state, or for meta log, is in SPLITTING_META_DONE state. If null, 1884 * presumes the ServerStateNode was cleaned up by SCP. 1885 * @see UnassignProcedure#remoteCallFailed(MasterProcedureEnv, RegionStateNode, IOException) 1886 */ 1887 boolean isLogSplittingDone(ServerName serverName, boolean meta) { 1888 ServerStateNode ssn = this.regionStates.getServerNode(serverName); 1889 if (ssn == null) { 1890 return true; 1891 } 1892 ServerState[] inState = 1893 meta 1894 ? 
new ServerState[] { ServerState.SPLITTING_META_DONE, ServerState.SPLITTING, 1895 ServerState.OFFLINE } 1896 : new ServerState[] { ServerState.OFFLINE }; 1897 synchronized (ssn) { 1898 return ssn.isInState(inState); 1899 } 1900 } 1901 1902 @VisibleForTesting 1903 MasterServices getMaster() { 1904 return master; 1905 } 1906 1907 /** 1908 * @return a snapshot of rsReports 1909 */ 1910 public Map<ServerName, Set<byte[]>> getRSReports() { 1911 Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>(); 1912 synchronized (rsReports) { 1913 rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue())); 1914 } 1915 return rsReportsSnapshot; 1916 } 1917 1918 /** 1919 * Provide regions state count for given table. 1920 * e.g howmany regions of give table are opened/closed/rit etc 1921 * 1922 * @param tableName TableName 1923 * @return region states count 1924 */ 1925 public RegionStatesCount getRegionStatesCount(TableName tableName) { 1926 int openRegionsCount = 0; 1927 int closedRegionCount = 0; 1928 int ritCount = 0; 1929 int splitRegionCount = 0; 1930 int totalRegionCount = 0; 1931 if (!isTableDisabled(tableName)) { 1932 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 1933 for (RegionState regionState : states) { 1934 if (regionState.isOpened()) { 1935 openRegionsCount++; 1936 } else if (regionState.isClosed()) { 1937 closedRegionCount++; 1938 } else if (regionState.isSplit()) { 1939 splitRegionCount++; 1940 } 1941 } 1942 totalRegionCount = states.size(); 1943 ritCount = totalRegionCount - openRegionsCount - splitRegionCount; 1944 } 1945 return new RegionStatesCount.RegionStatesCountBuilder() 1946 .setOpenRegions(openRegionsCount) 1947 .setClosedRegions(closedRegionCount) 1948 .setSplitRegions(splitRegionCount) 1949 .setRegionsInTransition(ritCount) 1950 .setTotalRegions(totalRegionCount) 1951 .build(); 1952 } 1953 1954}