001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029import java.util.concurrent.Future; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.locks.Condition; 033import java.util.concurrent.locks.ReentrantLock; 034import java.util.stream.Collectors; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.DoNotRetryIOException; 037import org.apache.hadoop.hbase.HBaseIOException; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.MetaTableAccessor; 040import org.apache.hadoop.hbase.PleaseHoldException; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.UnknownRegionException; 044import org.apache.hadoop.hbase.client.DoNotRetryRegionException; 045import org.apache.hadoop.hbase.client.MasterSwitchType; 046import org.apache.hadoop.hbase.client.RegionInfo; 047import org.apache.hadoop.hbase.client.RegionInfoBuilder; 048import org.apache.hadoop.hbase.client.RegionReplicaUtil; 049import org.apache.hadoop.hbase.client.RegionStatesCount; 050import org.apache.hadoop.hbase.client.Result; 051import org.apache.hadoop.hbase.client.TableState; 052import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; 053import org.apache.hadoop.hbase.favored.FavoredNodesManager; 054import org.apache.hadoop.hbase.favored.FavoredNodesPromoter; 055import org.apache.hadoop.hbase.master.LoadBalancer; 056import org.apache.hadoop.hbase.master.MasterServices; 057import org.apache.hadoop.hbase.master.MetricsAssignmentManager; 058import org.apache.hadoop.hbase.master.RegionPlan; 059import org.apache.hadoop.hbase.master.RegionState; 060import org.apache.hadoop.hbase.master.RegionState.State; 061import org.apache.hadoop.hbase.master.ServerManager; 062import org.apache.hadoop.hbase.master.TableStateManager; 063import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer; 064import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure; 065import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 066import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; 067import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; 068import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; 069import org.apache.hadoop.hbase.procedure2.Procedure; 070import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 071import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 072import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore; 073import org.apache.hadoop.hbase.procedure2.util.StringUtils; 074import org.apache.hadoop.hbase.regionserver.SequenceId; 075import org.apache.hadoop.hbase.util.Bytes; 076import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 077import org.apache.hadoop.hbase.util.Pair; 078import org.apache.hadoop.hbase.util.Threads; 079import org.apache.hadoop.hbase.util.VersionInfo; 080import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 081import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 082import org.apache.yetus.audience.InterfaceAudience; 083import org.apache.zookeeper.KeeperException; 084import org.slf4j.Logger; 085import org.slf4j.LoggerFactory; 086 087import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 090import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; 092 093/** 094 * The AssignmentManager is the coordinator for region assign/unassign operations. 095 * <ul> 096 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li> 097 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li> 098 * </ul> 099 * Regions are created by CreateTable, Split, Merge. 100 * Regions are deleted by DeleteTable, Split, Merge. 101 * Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. 102 * Unassigns are triggered by DisableTable, Split, Merge 103 */ 104@InterfaceAudience.Private 105public class AssignmentManager { 106 private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class); 107 108 // TODO: AMv2 109 // - handle region migration from hbase1 to hbase2. 110 // - handle sys table assignment first (e.g. acl, namespace) 111 // - handle table priorities 112 // - If ServerBusyException trying to update hbase:meta, we abort the Master 113 // See updateRegionLocation in RegionStateStore. 114 // 115 // See also 116 // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5 117 // for other TODOs. 118 119 public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY = 120 "hbase.assignment.bootstrap.thread.pool.size"; 121 122 public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY = 123 "hbase.assignment.dispatch.wait.msec"; 124 private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150; 125 126 public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY = 127 "hbase.assignment.dispatch.wait.queue.max.size"; 128 private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100; 129 130 public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY = 131 "hbase.assignment.rit.chore.interval.msec"; 132 private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000; 133 134 public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY = 135 "hbase.assignment.dead.region.metric.chore.interval.msec"; 136 private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000; 137 138 public static final String ASSIGN_MAX_ATTEMPTS = 139 "hbase.assignment.maximum.attempts"; 140 private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE; 141 142 public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 143 "hbase.assignment.retry.immediately.maximum.attempts"; 144 private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3; 145 146 /** Region in Transition metrics threshold time */ 147 public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD = 148 "hbase.metrics.rit.stuck.warning.threshold"; 149 private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000; 150 public static final String UNEXPECTED_STATE_REGION = "Unexpected state for "; 151 152 private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign"); 153 private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load"); 154 155 private final MetricsAssignmentManager metrics; 156 private final RegionInTransitionChore ritChore; 157 private final DeadServerMetricRegionChore deadMetricChore; 158 private final MasterServices master; 159 160 private final AtomicBoolean running = new AtomicBoolean(false); 161 private final RegionStates regionStates = new RegionStates(); 162 private final RegionStateStore regionStateStore; 163 164 private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>(); 165 166 private final boolean shouldAssignRegionsWithFavoredNodes; 167 private final int assignDispatchWaitQueueMaxSize; 168 private final int assignDispatchWaitMillis; 169 private final int assignMaxAttempts; 170 private final int assignRetryImmediatelyMaxAttempts; 171 172 private final Object checkIfShouldMoveSystemRegionLock = new Object(); 173 174 private Thread assignThread; 175 176 public AssignmentManager(final MasterServices master) { 177 this(master, new RegionStateStore(master)); 178 } 179 180 AssignmentManager(final MasterServices master, final RegionStateStore stateStore) { 181 this.master = master; 182 this.regionStateStore = stateStore; 183 this.metrics = new MetricsAssignmentManager(); 184 185 final Configuration conf = master.getConfiguration(); 186 187 // Only read favored nodes if using the favored nodes load balancer. 188 this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class.isAssignableFrom( 189 conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class)); 190 191 this.assignDispatchWaitMillis = conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, 192 DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC); 193 this.assignDispatchWaitQueueMaxSize = conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, 194 DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX); 195 196 this.assignMaxAttempts = Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, 197 DEFAULT_ASSIGN_MAX_ATTEMPTS)); 198 this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS, 199 DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS); 200 201 int ritChoreInterval = conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, 202 DEFAULT_RIT_CHORE_INTERVAL_MSEC); 203 this.ritChore = new RegionInTransitionChore(ritChoreInterval); 204 205 int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY, 206 DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC); 207 if (deadRegionChoreInterval > 0) { 208 this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval); 209 } else { 210 this.deadMetricChore = null; 211 } 212 } 213 214 public void start() throws IOException, KeeperException { 215 if (!running.compareAndSet(false, true)) { 216 return; 217 } 218 219 LOG.trace("Starting assignment manager"); 220 221 // Start the Assignment Thread 222 startAssignmentThread(); 223 224 // load meta region state 225 ZKWatcher zkw = master.getZooKeeper(); 226 // it could be null in some tests 227 if (zkw == null) { 228 return; 229 } 230 List<String> metaZNodes = zkw.getMetaReplicaNodes(); 231 LOG.debug("hbase:meta replica znodes: {}", metaZNodes); 232 for (String metaZNode : metaZNodes) { 233 int replicaId = zkw.getZNodePaths().getMetaReplicaIdFromZNode(metaZNode); 234 // here we are still in the early steps of active master startup. There is only one thread(us) 235 // can access AssignmentManager and create region node, so here we do not need to lock the 236 // region node. 237 RegionState regionState = MetaTableLocator.getMetaRegionState(zkw, replicaId); 238 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionState.getRegion()); 239 regionNode.setRegionLocation(regionState.getServerName()); 240 regionNode.setState(regionState.getState()); 241 if (regionNode.getProcedure() != null) { 242 regionNode.getProcedure().stateLoaded(this, regionNode); 243 } 244 if (regionState.getServerName() != null) { 245 regionStates.addRegionToServer(regionNode); 246 } 247 if (RegionReplicaUtil.isDefaultReplica(replicaId)) { 248 setMetaAssigned(regionState.getRegion(), regionState.getState() == State.OPEN); 249 } 250 LOG.debug("Loaded hbase:meta {}", regionNode); 251 } 252 } 253 254 /** 255 * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode. 256 * <p> 257 * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta 258 * method below. And it is also very important as now before submitting a TRSP, we need to attach 259 * it to the RegionStateNode, which acts like a guard, so we need to restore this information at 260 * the very beginning, before we start processing any procedures. 261 */ 262 public void setupRIT(List<TransitRegionStateProcedure> procs) { 263 procs.forEach(proc -> { 264 RegionInfo regionInfo = proc.getRegion(); 265 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 266 TransitRegionStateProcedure existingProc = regionNode.getProcedure(); 267 if (existingProc != null) { 268 // This is possible, as we will detach the procedure from the RSN before we 269 // actually finish the procedure. This is because that, we will detach the TRSP from the RSN 270 // during execution, at that time, the procedure has not been marked as done in the pv2 271 // framework yet, so it is possible that we schedule a new TRSP immediately and when 272 // arriving here, we will find out that there are multiple TRSPs for the region. But we can 273 // make sure that, only the last one can take the charge, the previous ones should have all 274 // been finished already. So here we will compare the proc id, the greater one will win. 275 if (existingProc.getProcId() < proc.getProcId()) { 276 // the new one wins, unset and set it to the new one below 277 regionNode.unsetProcedure(existingProc); 278 } else { 279 // the old one wins, skip 280 return; 281 } 282 } 283 LOG.info("Attach {} to {} to restore RIT", proc, regionNode); 284 regionNode.setProcedure(proc); 285 }); 286 } 287 288 public void stop() { 289 if (!running.compareAndSet(true, false)) { 290 return; 291 } 292 293 LOG.info("Stopping assignment manager"); 294 295 // The AM is started before the procedure executor, 296 // but the actual work will be loaded/submitted only once we have the executor 297 final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null; 298 299 // Remove the RIT chore 300 if (hasProcExecutor) { 301 master.getMasterProcedureExecutor().removeChore(this.ritChore); 302 if (this.deadMetricChore != null) { 303 master.getMasterProcedureExecutor().removeChore(this.deadMetricChore); 304 } 305 } 306 307 // Stop the Assignment Thread 308 stopAssignmentThread(); 309 310 // Stop the RegionStateStore 311 regionStates.clear(); 312 313 // Update meta events (for testing) 314 if (hasProcExecutor) { 315 metaLoadEvent.suspend(); 316 for (RegionInfo hri: getMetaRegionSet()) { 317 setMetaAssigned(hri, false); 318 } 319 } 320 } 321 322 public boolean isRunning() { 323 return running.get(); 324 } 325 326 public Configuration getConfiguration() { 327 return master.getConfiguration(); 328 } 329 330 public MetricsAssignmentManager getAssignmentManagerMetrics() { 331 return metrics; 332 } 333 334 private LoadBalancer getBalancer() { 335 return master.getLoadBalancer(); 336 } 337 338 private MasterProcedureEnv getProcedureEnvironment() { 339 return master.getMasterProcedureExecutor().getEnvironment(); 340 } 341 342 private MasterProcedureScheduler getProcedureScheduler() { 343 return getProcedureEnvironment().getProcedureScheduler(); 344 } 345 346 int getAssignMaxAttempts() { 347 return assignMaxAttempts; 348 } 349 350 int getAssignRetryImmediatelyMaxAttempts() { 351 return assignRetryImmediatelyMaxAttempts; 352 } 353 354 public RegionStates getRegionStates() { 355 return regionStates; 356 } 357 358 /** 359 * Returns the regions hosted by the specified server. 360 * <p/> 361 * Notice that, for SCP, after we submit the SCP, no one can change the region list for the 362 * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a 363 * snapshot of the current region list for this server, which means, right after you get the 364 * region list, new regions may be moved to this server or some regions may be moved out from this 365 * server, so you should not use it critically if you need strong consistency. 366 */ 367 public List<RegionInfo> getRegionsOnServer(ServerName serverName) { 368 ServerStateNode serverInfo = regionStates.getServerNode(serverName); 369 if (serverInfo == null) { 370 return Collections.emptyList(); 371 } 372 return serverInfo.getRegionInfoList(); 373 } 374 375 public RegionStateStore getRegionStateStore() { 376 return regionStateStore; 377 } 378 379 public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) { 380 return this.shouldAssignRegionsWithFavoredNodes 381 ? ((FavoredStochasticBalancer) getBalancer()).getFavoredNodes(regionInfo) 382 : ServerName.EMPTY_SERVER_LIST; 383 } 384 385 // ============================================================================================ 386 // Table State Manager helpers 387 // ============================================================================================ 388 private TableStateManager getTableStateManager() { 389 return master.getTableStateManager(); 390 } 391 392 private boolean isTableEnabled(final TableName tableName) { 393 return getTableStateManager().isTableState(tableName, TableState.State.ENABLED); 394 } 395 396 private boolean isTableDisabled(final TableName tableName) { 397 return getTableStateManager().isTableState(tableName, 398 TableState.State.DISABLED, TableState.State.DISABLING); 399 } 400 401 // ============================================================================================ 402 // META Helpers 403 // ============================================================================================ 404 private boolean isMetaRegion(final RegionInfo regionInfo) { 405 return regionInfo.isMetaRegion(); 406 } 407 408 public boolean isMetaRegion(final byte[] regionName) { 409 return getMetaRegionFromName(regionName) != null; 410 } 411 412 public RegionInfo getMetaRegionFromName(final byte[] regionName) { 413 for (RegionInfo hri: getMetaRegionSet()) { 414 if (Bytes.equals(hri.getRegionName(), regionName)) { 415 return hri; 416 } 417 } 418 return null; 419 } 420 421 public boolean isCarryingMeta(final ServerName serverName) { 422 // TODO: handle multiple meta 423 return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO); 424 } 425 426 private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) { 427 // TODO: check for state? 428 final RegionStateNode node = regionStates.getRegionStateNode(regionInfo); 429 return(node != null && serverName.equals(node.getRegionLocation())); 430 } 431 432 private RegionInfo getMetaForRegion(final RegionInfo regionInfo) { 433 //if (regionInfo.isMetaRegion()) return regionInfo; 434 // TODO: handle multiple meta. if the region provided is not meta lookup 435 // which meta the region belongs to. 436 return RegionInfoBuilder.FIRST_META_REGIONINFO; 437 } 438 439 // TODO: handle multiple meta. 440 private static final Set<RegionInfo> META_REGION_SET = 441 Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO); 442 public Set<RegionInfo> getMetaRegionSet() { 443 return META_REGION_SET; 444 } 445 446 // ============================================================================================ 447 // META Event(s) helpers 448 // ============================================================================================ 449 /** 450 * Notice that, this only means the meta region is available on a RS, but the AM may still be 451 * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first 452 * before checking this method, unless you can make sure that your piece of code can only be 453 * executed after AM builds the region states. 454 * @see #isMetaLoaded() 455 */ 456 public boolean isMetaAssigned() { 457 return metaAssignEvent.isReady(); 458 } 459 460 public boolean isMetaRegionInTransition() { 461 return !isMetaAssigned(); 462 } 463 464 /** 465 * Notice that this event does not mean the AM has already finished region state rebuilding. See 466 * the comment of {@link #isMetaAssigned()} for more details. 467 * @see #isMetaAssigned() 468 */ 469 public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) { 470 return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc); 471 } 472 473 private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) { 474 assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; 475 ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo); 476 if (assigned) { 477 metaAssignEvent.wake(getProcedureScheduler()); 478 } else { 479 metaAssignEvent.suspend(); 480 } 481 } 482 483 private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) { 484 assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; 485 // TODO: handle multiple meta. 486 return metaAssignEvent; 487 } 488 489 /** 490 * Wait until AM finishes the meta loading, i.e, the region states rebuilding. 491 * @see #isMetaLoaded() 492 * @see #waitMetaAssigned(Procedure, RegionInfo) 493 */ 494 public boolean waitMetaLoaded(Procedure<?> proc) { 495 return metaLoadEvent.suspendIfNotReady(proc); 496 } 497 498 /** 499 * This method will be called in master initialization method after calling 500 * {@link #processOfflineRegions()}, as in processOfflineRegions we will generate assign 501 * procedures for offline regions, which may be conflict with creating table. 502 * <p/> 503 * This is a bit dirty, should be reconsidered after we decide whether to keep the 504 * {@link #processOfflineRegions()} method. 505 */ 506 public void wakeMetaLoadedEvent() { 507 metaLoadEvent.wake(getProcedureScheduler()); 508 assert isMetaLoaded() : "expected meta to be loaded"; 509 } 510 511 /** 512 * Return whether AM finishes the meta loading, i.e, the region states rebuilding. 513 * @see #isMetaAssigned() 514 * @see #waitMetaLoaded(Procedure) 515 */ 516 public boolean isMetaLoaded() { 517 return metaLoadEvent.isReady(); 518 } 519 520 /** 521 * Start a new thread to check if there are region servers whose versions are higher than others. 522 * If so, move all system table regions to RS with the highest version to keep compatibility. 523 * The reason is, RS in new version may not be able to access RS in old version when there are 524 * some incompatible changes. 525 * <p>This method is called when a new RegionServer is added to cluster only.</p> 526 */ 527 public void checkIfShouldMoveSystemRegionAsync() { 528 // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that 529 // it should 'move' the system tables from the old server to the new server but 530 // ServerCrashProcedure is on it; and it will take care of the assign without dataloss. 531 if (this.master.getServerManager().countOfRegionServers() <= 1) { 532 return; 533 } 534 // This thread used to run whenever there was a change in the cluster. The ZooKeeper 535 // childrenChanged notification came in before the nodeDeleted message and so this method 536 // cold run before a ServerCrashProcedure could run. That meant that this thread could see 537 // a Crashed Server before ServerCrashProcedure and it could find system regions on the 538 // crashed server and go move them before ServerCrashProcedure had a chance; could be 539 // dataloss too if WALs were not recovered. 540 new Thread(() -> { 541 try { 542 synchronized (checkIfShouldMoveSystemRegionLock) { 543 List<RegionPlan> plans = new ArrayList<>(); 544 // TODO: I don't think this code does a good job if all servers in cluster have same 545 // version. It looks like it will schedule unnecessary moves. 546 for (ServerName server : getExcludedServersForSystemTable()) { 547 if (master.getServerManager().isServerDead(server)) { 548 // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable() 549 // considers only online servers, the server could be queued for dead server 550 // processing. As region assignments for crashed server is handled by 551 // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through 552 // regular flow of LoadBalancer as a favored node and not to have this special 553 // handling. 554 continue; 555 } 556 List<RegionInfo> regionsShouldMove = getSystemTables(server); 557 if (!regionsShouldMove.isEmpty()) { 558 for (RegionInfo regionInfo : regionsShouldMove) { 559 // null value for dest forces destination server to be selected by balancer 560 RegionPlan plan = new RegionPlan(regionInfo, server, null); 561 if (regionInfo.isMetaRegion()) { 562 // Must move meta region first. 563 LOG.info("Async MOVE of {} to newer Server={}", 564 regionInfo.getEncodedName(), server); 565 moveAsync(plan); 566 } else { 567 plans.add(plan); 568 } 569 } 570 } 571 for (RegionPlan plan : plans) { 572 LOG.info("Async MOVE of {} to newer Server={}", 573 plan.getRegionInfo().getEncodedName(), server); 574 moveAsync(plan); 575 } 576 } 577 } 578 } catch (Throwable t) { 579 LOG.error(t.toString(), t); 580 } 581 }).start(); 582 } 583 584 private List<RegionInfo> getSystemTables(ServerName serverName) { 585 ServerStateNode serverNode = regionStates.getServerNode(serverName); 586 if (serverNode == null) { 587 return Collections.emptyList(); 588 } 589 return serverNode.getSystemRegionInfoList(); 590 } 591 592 private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates) 593 throws HBaseIOException { 594 if (regionNode.getProcedure() != null) { 595 throw new HBaseIOException(regionNode + " is currently in transition; pid=" + 596 regionNode.getProcedure().getProcId()); 597 } 598 if (!regionNode.isInState(expectedStates)) { 599 throw new DoNotRetryRegionException(UNEXPECTED_STATE_REGION + regionNode); 600 } 601 if (isTableDisabled(regionNode.getTable())) { 602 throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode); 603 } 604 } 605 606 /** 607 * Create an assign TransitRegionStateProcedure. Makes sure of RegionState. 608 * Throws exception if not appropriate UNLESS override is set. Used by hbck2 but also by 609 * straightline {@link #assign(RegionInfo, ServerName)} and 610 * {@link #assignAsync(RegionInfo, ServerName)}. 611 * @see #createAssignProcedure(RegionStateNode, ServerName) for a version that does NO checking 612 * used when only when no checking needed. 613 * @param override If false, check RegionState is appropriate for assign; if not throw exception. 614 */ 615 private TransitRegionStateProcedure createAssignProcedure(RegionInfo regionInfo, ServerName sn, 616 boolean override) throws IOException { 617 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 618 regionNode.lock(); 619 try { 620 if (override) { 621 if (regionNode.getProcedure() != null) { 622 regionNode.unsetProcedure(regionNode.getProcedure()); 623 } 624 } else { 625 preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN); 626 } 627 assert regionNode.getProcedure() == null; 628 return regionNode.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(), 629 regionInfo, sn)); 630 } finally { 631 regionNode.unlock(); 632 } 633 } 634 635 /** 636 * Create an assign TransitRegionStateProcedure. Does NO checking of RegionState. 637 * Presumes appriopriate state ripe for assign. 638 * @see #createAssignProcedure(RegionInfo, ServerName, boolean) 639 */ 640 private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode, 641 ServerName targetServer) { 642 regionNode.lock(); 643 try { 644 return regionNode.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(), 645 regionNode.getRegionInfo(), targetServer)); 646 } finally { 647 regionNode.unlock(); 648 } 649 } 650 651 public long assign(RegionInfo regionInfo, ServerName sn) throws IOException { 652 TransitRegionStateProcedure proc = createAssignProcedure(regionInfo, sn, false); 653 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 654 return proc.getProcId(); 655 } 656 657 public long assign(RegionInfo regionInfo) throws IOException { 658 return assign(regionInfo, null); 659 } 660 661 /** 662 * Submits a procedure that assigns a region to a target server without waiting for it to finish 663 * @param regionInfo the region we would like to assign 664 * @param sn target server name 665 */ 666 public Future<byte[]> assignAsync(RegionInfo regionInfo, ServerName sn) throws IOException { 667 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), 668 createAssignProcedure(regionInfo, sn, false)); 669 } 670 671 /** 672 * Submits a procedure that assigns a region without waiting for it to finish 673 * @param regionInfo the region we would like to assign 674 */ 675 public Future<byte[]> assignAsync(RegionInfo regionInfo) throws IOException { 676 return assignAsync(regionInfo, null); 677 } 678 679 public long unassign(RegionInfo regionInfo) throws IOException { 680 RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 681 if (regionNode == null) { 682 throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName()); 683 } 684 TransitRegionStateProcedure proc; 685 regionNode.lock(); 686 try { 687 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 688 proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo); 689 regionNode.setProcedure(proc); 690 } finally { 691 regionNode.unlock(); 692 } 693 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 694 return proc.getProcId(); 695 } 696 697 public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo, 698 ServerName targetServer) throws HBaseIOException { 699 RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo); 700 if (regionNode == null) { 701 throw new UnknownRegionException("No RegionStateNode found for " + 702 regionInfo.getEncodedName() + "(Closed/Deleted?)"); 703 } 704 TransitRegionStateProcedure proc; 705 regionNode.lock(); 706 try { 707 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 708 regionNode.checkOnline(); 709 proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer); 710 regionNode.setProcedure(proc); 711 } finally { 712 regionNode.unlock(); 713 } 714 return proc; 715 } 716 717 public void move(RegionInfo regionInfo) throws IOException { 718 TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null); 719 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 720 } 721 722 public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException { 723 TransitRegionStateProcedure proc = 724 createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination()); 725 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); 726 } 727 728 // ============================================================================================ 729 // RegionTransition procedures helpers 730 // ============================================================================================ 731 732 /** 733 * Create round-robin assigns. Use on table creation to distribute out regions across cluster. 734 * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer 735 * to populate the assigns with targets chosen using round-robin (default balancer 736 * scheme). If at assign-time, the target chosen is no longer up, thats fine, the 737 * AssignProcedure will ask the balancer for a new target, and so on. 738 */ 739 public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris, 740 List<ServerName> serversToExclude) { 741 if (hris.isEmpty()) { 742 return new TransitRegionStateProcedure[0]; 743 } 744 745 if (serversToExclude != null 746 && this.master.getServerManager().getOnlineServersList().size() == 1) { 747 LOG.debug("Only one region server found and hence going ahead with the assignment"); 748 serversToExclude = null; 749 } 750 try { 751 // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do 752 // a better job if it has all the assignments in the one lump. 753 Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris, 754 this.master.getServerManager().createDestinationServersList(serversToExclude)); 755 // Return mid-method! 756 return createAssignProcedures(assignments); 757 } catch (HBaseIOException hioe) { 758 LOG.warn("Failed roundRobinAssignment", hioe); 759 } 760 // If an error above, fall-through to this simpler assign. Last resort. 761 return createAssignProcedures(hris); 762 } 763 764 /** 765 * Create round-robin assigns. Use on table creation to distribute out regions across cluster. 766 * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer 767 * to populate the assigns with targets chosen using round-robin (default balancer 768 * scheme). If at assign-time, the target chosen is no longer up, thats fine, the 769 * AssignProcedure will ask the balancer for a new target, and so on. 770 */ 771 public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) { 772 return createRoundRobinAssignProcedures(hris, null); 773 } 774 775 static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) { 776 if (left.getRegion().isMetaRegion()) { 777 if (right.getRegion().isMetaRegion()) { 778 return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion()); 779 } 780 return -1; 781 } else if (right.getRegion().isMetaRegion()) { 782 return +1; 783 } 784 if (left.getRegion().getTable().isSystemTable()) { 785 if (right.getRegion().getTable().isSystemTable()) { 786 return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion()); 787 } 788 return -1; 789 } else if (right.getRegion().getTable().isSystemTable()) { 790 return +1; 791 } 792 return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion()); 793 } 794 795 /** 796 * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server. 797 * This method is called from HBCK2. 798 * @return an assign or null 799 */ 800 public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo ri, boolean override) { 801 TransitRegionStateProcedure trsp = null; 802 try { 803 trsp = createAssignProcedure(ri, null, override); 804 } catch (IOException ioe) { 805 LOG.info("Failed {} assign, override={}" + 806 (override? "": "; set override to by-pass state checks."), 807 ri.getEncodedName(), override, ioe); 808 } 809 return trsp; 810 } 811 812 /** 813 * Create one TransitRegionStateProcedure to unassign a region. 814 * This method is called from HBCK2. 815 * @return an unassign or null 816 */ 817 public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo ri, boolean override) { 818 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(ri); 819 TransitRegionStateProcedure trsp = null; 820 regionNode.lock(); 821 try { 822 if (override) { 823 if (regionNode.getProcedure() != null) { 824 regionNode.unsetProcedure(regionNode.getProcedure()); 825 } 826 } else { 827 // This is where we could throw an exception; i.e. override is false. 828 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 829 } 830 assert regionNode.getProcedure() == null; 831 trsp = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), 832 regionNode.getRegionInfo()); 833 regionNode.setProcedure(trsp); 834 } catch (IOException ioe) { 835 // 'override' must be false here. 836 LOG.info("Failed {} unassign, override=false; set override to by-pass state checks.", 837 ri.getEncodedName(), ioe); 838 } finally{ 839 regionNode.unlock(); 840 } 841 return trsp; 842 } 843 844 /** 845 * Create an array of TransitRegionStateProcedure w/o specifying a target server. 846 * Used as fallback of caller is unable to do {@link #createAssignProcedures(Map)}. 847 * <p/> 848 * If no target server, at assign time, we will try to use the former location of the region if 849 * one exists. This is how we 'retain' the old location across a server restart. 850 * <p/> 851 * Should only be called when you can make sure that no one can touch these regions other than 852 * you. For example, when you are creating or enabling table. Presumes all Regions are in 853 * appropriate state ripe for assign; no checking of Region state is done in here. 854 * @see #createAssignProcedures(Map) 855 */ 856 public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) { 857 return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri)) 858 .map(regionNode -> createAssignProcedure(regionNode, null)) 859 .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new); 860 } 861 862 /** 863 * Tied to {@link #createAssignProcedures(List)} in that it is called if caller is unable to run 864 * this method. Presumes all Regions are in appropriate state ripe for assign; no checking 865 * of Region state is done in here. 866 * @param assignments Map of assignments from which we produce an array of AssignProcedures. 867 * @return Assignments made from the passed in <code>assignments</code> 868 * @see #createAssignProcedures(List) 869 */ 870 private TransitRegionStateProcedure[] createAssignProcedures( 871 Map<ServerName, List<RegionInfo>> assignments) { 872 return assignments.entrySet().stream() 873 .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri)) 874 .map(regionNode -> createAssignProcedure(regionNode, e.getKey()))) 875 .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new); 876 } 877 878 // for creating unassign TRSP when disabling a table or closing excess region replicas 879 private TransitRegionStateProcedure forceCreateUnssignProcedure(RegionStateNode regionNode) { 880 regionNode.lock(); 881 try { 882 if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) { 883 return null; 884 } 885 // in general, a split parent should be in CLOSED or SPLIT state, but anyway, let's check it 886 // here for safety 887 if (regionNode.getRegionInfo().isSplit()) { 888 LOG.warn("{} is a split parent but not in CLOSED or SPLIT state", regionNode); 889 return null; 890 } 891 // As in DisableTableProcedure or ModifyTableProcedure, we will hold the xlock for table, so 892 // we can make sure that this procedure has not been executed yet, as TRSP will hold the 893 // shared lock for table all the time. So here we will unset it and when it is actually 894 // executed, it will find that the attach procedure is not itself and quit immediately. 895 if (regionNode.getProcedure() != null) { 896 regionNode.unsetProcedure(regionNode.getProcedure()); 897 } 898 return regionNode.setProcedure(TransitRegionStateProcedure.unassign(getProcedureEnvironment(), 899 regionNode.getRegionInfo())); 900 } finally { 901 regionNode.unlock(); 902 } 903 } 904 905 /** 906 * Called by DisableTableProcedure to unassign all the regions for a table. 907 */ 908 public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) { 909 return regionStates.getTableRegionStateNodes(tableName).stream() 910 .map(this::forceCreateUnssignProcedure).filter(p -> p != null) 911 .toArray(TransitRegionStateProcedure[]::new); 912 } 913 914 /** 915 * Called by ModifyTableProcedures to unassign all the excess region replicas 916 * for a table. 917 */ 918 public TransitRegionStateProcedure[] createUnassignProceduresForClosingExcessRegionReplicas( 919 TableName tableName, int newReplicaCount) { 920 return regionStates.getTableRegionStateNodes(tableName).stream() 921 .filter(regionNode -> regionNode.getRegionInfo().getReplicaId() >= newReplicaCount) 922 .map(this::forceCreateUnssignProcedure).filter(p -> p != null) 923 .toArray(TransitRegionStateProcedure[]::new); 924 } 925 926 public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit, 927 final byte[] splitKey) throws IOException { 928 return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey); 929 } 930 931 public MergeTableRegionsProcedure createMergeProcedure(RegionInfo ... ris) throws IOException { 932 return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false); 933 } 934 935 /** 936 * Delete the region states. This is called by "DeleteTable" 937 */ 938 public void deleteTable(final TableName tableName) throws IOException { 939 final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName); 940 regionStateStore.deleteRegions(regions); 941 for (int i = 0; i < regions.size(); ++i) { 942 final RegionInfo regionInfo = regions.get(i); 943 // we expect the region to be offline 944 regionStates.removeFromOfflineRegions(regionInfo); 945 regionStates.deleteRegion(regionInfo); 946 } 947 } 948 949 // ============================================================================================ 950 // RS Region Transition Report helpers 951 // ============================================================================================ 952 private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder, 953 ServerName serverName, List<RegionStateTransition> transitionList) throws IOException { 954 for (RegionStateTransition transition : transitionList) { 955 switch (transition.getTransitionCode()) { 956 case OPENED: 957 case FAILED_OPEN: 958 case CLOSED: 959 assert transition.getRegionInfoCount() == 1 : transition; 960 final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 961 long procId = 962 transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID; 963 updateRegionTransition(serverName, transition.getTransitionCode(), hri, 964 transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId); 965 break; 966 case READY_TO_SPLIT: 967 case SPLIT: 968 case SPLIT_REVERTED: 969 assert transition.getRegionInfoCount() == 3 : transition; 970 final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 971 final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1)); 972 final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2)); 973 updateRegionSplitTransition(serverName, transition.getTransitionCode(), parent, splitA, 974 splitB); 975 break; 976 case READY_TO_MERGE: 977 case MERGED: 978 case MERGE_REVERTED: 979 assert transition.getRegionInfoCount() == 3 : transition; 980 final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 981 final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1)); 982 final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2)); 983 updateRegionMergeTransition(serverName, transition.getTransitionCode(), merged, mergeA, 984 mergeB); 985 break; 986 } 987 } 988 } 989 990 public ReportRegionStateTransitionResponse reportRegionStateTransition( 991 final ReportRegionStateTransitionRequest req) throws PleaseHoldException { 992 ReportRegionStateTransitionResponse.Builder builder = 993 ReportRegionStateTransitionResponse.newBuilder(); 994 ServerName serverName = ProtobufUtil.toServerName(req.getServer()); 995 ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); 996 // here we have to acquire a read lock instead of a simple exclusive lock. This is because that 997 // we should not block other reportRegionStateTransition call from the same region server. This 998 // is not only about performance, but also to prevent dead lock. Think of the meta region is 999 // also on the same region server and you hold the lock which blocks the 1000 // reportRegionStateTransition for meta, and since meta is not online, you will block inside the 1001 // lock protection to wait for meta online... 1002 serverNode.readLock().lock(); 1003 try { 1004 // we only accept reportRegionStateTransition if the region server is online, see the comment 1005 // above in submitServerCrash method and HBASE-21508 for more details. 1006 if (serverNode.isInState(ServerState.ONLINE)) { 1007 try { 1008 reportRegionStateTransition(builder, serverName, req.getTransitionList()); 1009 } catch (PleaseHoldException e) { 1010 LOG.trace("Failed transition ", e); 1011 throw e; 1012 } catch (UnsupportedOperationException | IOException e) { 1013 // TODO: at the moment we have a single error message and the RS will abort 1014 // if the master says that one of the region transitions failed. 1015 LOG.warn("Failed transition", e); 1016 builder.setErrorMessage("Failed transition " + e.getMessage()); 1017 } 1018 } else { 1019 LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call", 1020 serverName); 1021 builder.setErrorMessage("You are dead"); 1022 } 1023 } finally { 1024 serverNode.readLock().unlock(); 1025 } 1026 1027 return builder.build(); 1028 } 1029 1030 private void updateRegionTransition(ServerName serverName, TransitionCode state, 1031 RegionInfo regionInfo, long seqId, long procId) throws IOException { 1032 checkMetaLoaded(regionInfo); 1033 1034 RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 1035 if (regionNode == null) { 1036 // the table/region is gone. maybe a delete, split, merge 1037 throw new UnexpectedStateException(String.format( 1038 "Server %s was trying to transition region %s to %s. but Region is not known.", 1039 serverName, regionInfo, state)); 1040 } 1041 LOG.trace("Update region transition serverName={} region={} regionState={}", serverName, 1042 regionNode, state); 1043 1044 ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); 1045 regionNode.lock(); 1046 try { 1047 if (!reportTransition(regionNode, serverNode, state, seqId, procId)) { 1048 // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages: 1049 // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for 1050 // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958, 1051 // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition 1052 // to CLOSED 1053 // These happen because on cluster shutdown, we currently let the RegionServers close 1054 // regions. This is the only time that region close is not run by the Master (so cluster 1055 // goes down fast). Consider changing it so Master runs all shutdowns. 1056 if (this.master.getServerManager().isClusterShutdown() && 1057 state.equals(TransitionCode.CLOSED)) { 1058 LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName()); 1059 } else { 1060 LOG.warn("No matching procedure found for {} transition on {} to {}", 1061 serverName, regionNode, state); 1062 } 1063 } 1064 } finally { 1065 regionNode.unlock(); 1066 } 1067 } 1068 1069 private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode, 1070 TransitionCode state, long seqId, long procId) throws IOException { 1071 ServerName serverName = serverNode.getServerName(); 1072 TransitRegionStateProcedure proc = regionNode.getProcedure(); 1073 if (proc == null) { 1074 return false; 1075 } 1076 proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode, 1077 serverName, state, seqId, procId); 1078 return true; 1079 } 1080 1081 private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state, 1082 final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB) 1083 throws IOException { 1084 checkMetaLoaded(parent); 1085 1086 if (state != TransitionCode.READY_TO_SPLIT) { 1087 throw new UnexpectedStateException("unsupported split regionState=" + state + 1088 " for parent region " + parent + 1089 " maybe an old RS (< 2.0) had the operation in progress"); 1090 } 1091 1092 // sanity check on the request 1093 if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) { 1094 throw new UnsupportedOperationException( 1095 "unsupported split request with bad keys: parent=" + parent + 1096 " hriA=" + hriA + " hriB=" + hriB); 1097 } 1098 1099 if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) { 1100 LOG.warn("Split switch is off! skip split of " + parent); 1101 throw new DoNotRetryIOException("Split region " + parent.getRegionNameAsString() + 1102 " failed due to split switch off"); 1103 } 1104 1105 // Submit the Split procedure 1106 final byte[] splitKey = hriB.getStartKey(); 1107 if (LOG.isDebugEnabled()) { 1108 LOG.debug("Split request from " + serverName + 1109 ", parent=" + parent + " splitKey=" + Bytes.toStringBinary(splitKey)); 1110 } 1111 master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey)); 1112 1113 // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split 1114 if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) { 1115 throw new UnsupportedOperationException(String.format("Split handled by the master: " + 1116 "parent=%s hriA=%s hriB=%s", parent.getShortNameToLog(), hriA, hriB)); 1117 } 1118 } 1119 1120 private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state, 1121 final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException { 1122 checkMetaLoaded(merged); 1123 1124 if (state != TransitionCode.READY_TO_MERGE) { 1125 throw new UnexpectedStateException("Unsupported merge regionState=" + state + 1126 " for regionA=" + hriA + " regionB=" + hriB + " merged=" + merged + 1127 " maybe an old RS (< 2.0) had the operation in progress"); 1128 } 1129 1130 if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) { 1131 LOG.warn("Merge switch is off! skip merge of regionA=" + hriA + " regionB=" + hriB); 1132 throw new DoNotRetryIOException("Merge of regionA=" + hriA + " regionB=" + hriB + 1133 " failed because merge switch is off"); 1134 } 1135 1136 // Submit the Merge procedure 1137 if (LOG.isDebugEnabled()) { 1138 LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged); 1139 } 1140 master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB)); 1141 1142 // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge 1143 if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) { 1144 throw new UnsupportedOperationException(String.format( 1145 "Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, merged, hriA, 1146 hriB)); 1147 } 1148 } 1149 1150 // ============================================================================================ 1151 // RS Status update (report online regions) helpers 1152 // ============================================================================================ 1153 /** 1154 * The master will call this method when the RS send the regionServerReport(). The report will 1155 * contains the "online regions". This method will check the the online regions against the 1156 * in-memory state of the AM, and we will log a warn message if there is a mismatch. This is 1157 * because that there is no fencing between the reportRegionStateTransition method and 1158 * regionServerReport method, so there could be race and introduce inconsistency here, but 1159 * actually there is no problem. 1160 * <p/> 1161 * Please see HBASE-21421 and HBASE-21463 for more details. 1162 */ 1163 public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) { 1164 if (!isRunning()) { 1165 return; 1166 } 1167 if (LOG.isTraceEnabled()) { 1168 LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName, 1169 regionNames.size(), isMetaLoaded(), 1170 regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList())); 1171 } 1172 1173 ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); 1174 synchronized (serverNode) { 1175 if (!serverNode.isInState(ServerState.ONLINE)) { 1176 LOG.warn("Got a report from a server result in state " + serverNode.getState()); 1177 return; 1178 } 1179 } 1180 1181 // Track the regionserver reported online regions in memory. 1182 synchronized (rsReports) { 1183 rsReports.put(serverName, regionNames); 1184 } 1185 1186 if (regionNames.isEmpty()) { 1187 // nothing to do if we don't have regions 1188 LOG.trace("no online region found on {}", serverName); 1189 return; 1190 } 1191 if (!isMetaLoaded()) { 1192 // we are still on startup, skip checking 1193 return; 1194 } 1195 // The Heartbeat tells us of what regions are on the region serve, check the state. 1196 checkOnlineRegionsReport(serverNode, regionNames); 1197 } 1198 1199 /** 1200 * Close <code>regionName</code> on <code>sn</code> silently and immediately without 1201 * using a Procedure or going via hbase:meta. For case where a RegionServer's hosting 1202 * of a Region is not aligned w/ the Master's accounting of Region state. This is for 1203 * cleaning up an error in accounting. 1204 */ 1205 private void closeRegionSilently(ServerName sn, byte [] regionName) { 1206 try { 1207 RegionInfo ri = MetaTableAccessor.parseRegionInfoFromRegionName(regionName); 1208 // Pass -1 for timeout. Means do not wait. 1209 ServerManager.closeRegionSilentlyAndWait(this.master.getClusterConnection(), sn, ri, -1); 1210 } catch (Exception e) { 1211 LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e); 1212 } 1213 } 1214 1215 /** 1216 * Check that what the RegionServer reports aligns with the Master's image. 1217 * If disagreement, we will tell the RegionServer to expediently close 1218 * a Region we do not think it should have. 1219 */ 1220 private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) { 1221 ServerName serverName = serverNode.getServerName(); 1222 for (byte[] regionName : regionNames) { 1223 if (!isRunning()) { 1224 return; 1225 } 1226 RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName); 1227 if (regionNode == null) { 1228 String regionNameAsStr = Bytes.toStringBinary(regionName); 1229 LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...", 1230 regionNameAsStr, serverName); 1231 closeRegionSilently(serverNode.getServerName(), regionName); 1232 continue; 1233 } 1234 final long lag = 1000; 1235 regionNode.lock(); 1236 try { 1237 long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate(); 1238 if (regionNode.isInState(State.OPENING, State.OPEN)) { 1239 // This is possible as a region server has just closed a region but the region server 1240 // report is generated before the closing, but arrive after the closing. Make sure there 1241 // is some elapsed time so less false alarms. 1242 if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) { 1243 LOG.warn("Reporting {} server does not match {} (time since last " + 1244 "update={}ms); closing...", 1245 serverName, regionNode, diff); 1246 closeRegionSilently(serverNode.getServerName(), regionName); 1247 } 1248 } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) { 1249 // So, we can get report that a region is CLOSED or SPLIT because a heartbeat 1250 // came in at about same time as a region transition. Make sure there is some 1251 // elapsed time so less false alarms. 1252 if (diff > lag) { 1253 LOG.warn("Reporting {} state does not match {} (time since last update={}ms)", 1254 serverName, regionNode, diff); 1255 } 1256 } 1257 } finally { 1258 regionNode.unlock(); 1259 } 1260 } 1261 } 1262 1263 // ============================================================================================ 1264 // RIT chore 1265 // ============================================================================================ 1266 private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> { 1267 public RegionInTransitionChore(final int timeoutMsec) { 1268 super(timeoutMsec); 1269 } 1270 1271 @Override 1272 protected void periodicExecute(final MasterProcedureEnv env) { 1273 final AssignmentManager am = env.getAssignmentManager(); 1274 1275 final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat(); 1276 if (ritStat.hasRegionsOverThreshold()) { 1277 for (RegionState hri: ritStat.getRegionOverThreshold()) { 1278 am.handleRegionOverStuckWarningThreshold(hri.getRegion()); 1279 } 1280 } 1281 1282 // update metrics 1283 am.updateRegionsInTransitionMetrics(ritStat); 1284 } 1285 } 1286 1287 private static class DeadServerMetricRegionChore 1288 extends ProcedureInMemoryChore<MasterProcedureEnv> { 1289 public DeadServerMetricRegionChore(final int timeoutMsec) { 1290 super(timeoutMsec); 1291 } 1292 1293 @Override 1294 protected void periodicExecute(final MasterProcedureEnv env) { 1295 final ServerManager sm = env.getMasterServices().getServerManager(); 1296 final AssignmentManager am = env.getAssignmentManager(); 1297 // To minimize inconsistencies we are not going to snapshot live servers in advance in case 1298 // new servers are added; OTOH we don't want to add heavy sync for a consistent view since 1299 // this is for metrics. Instead, we're going to check each regions as we go; to avoid making 1300 // too many checks, we maintain a local lists of server, limiting us to false negatives. If 1301 // we miss some recently-dead server, we'll just see it next time. 1302 Set<ServerName> recentlyLiveServers = new HashSet<>(); 1303 int deadRegions = 0, unknownRegions = 0; 1304 for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) { 1305 if (rsn.getState() != State.OPEN) { 1306 continue; // Opportunistic check, should quickly skip RITs, offline tables, etc. 1307 } 1308 // Do not need to acquire region state lock as this is only for showing metrics. 1309 ServerName sn = rsn.getRegionLocation(); 1310 State state = rsn.getState(); 1311 if (state != State.OPEN) { 1312 continue; // Mostly skipping RITs that are already being take care of. 1313 } 1314 if (sn == null) { 1315 ++unknownRegions; // Opened on null? 1316 continue; 1317 } 1318 if (recentlyLiveServers.contains(sn)) { 1319 continue; 1320 } 1321 ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn); 1322 switch (sls) { 1323 case LIVE: 1324 recentlyLiveServers.add(sn); 1325 break; 1326 case DEAD: 1327 ++deadRegions; 1328 break; 1329 case UNKNOWN: 1330 ++unknownRegions; 1331 break; 1332 default: throw new AssertionError("Unexpected " + sls); 1333 } 1334 } 1335 if (deadRegions > 0 || unknownRegions > 0) { 1336 LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers", 1337 deadRegions, unknownRegions); 1338 } 1339 1340 am.updateDeadServerRegionMetrics(deadRegions, unknownRegions); 1341 } 1342 } 1343 1344 public RegionInTransitionStat computeRegionInTransitionStat() { 1345 final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration()); 1346 rit.update(this); 1347 return rit; 1348 } 1349 1350 public static class RegionInTransitionStat { 1351 private final int ritThreshold; 1352 1353 private HashMap<String, RegionState> ritsOverThreshold = null; 1354 private long statTimestamp; 1355 private long oldestRITTime = 0; 1356 private int totalRITsTwiceThreshold = 0; 1357 private int totalRITs = 0; 1358 1359 public RegionInTransitionStat(final Configuration conf) { 1360 this.ritThreshold = 1361 conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD); 1362 } 1363 1364 public int getRITThreshold() { 1365 return ritThreshold; 1366 } 1367 1368 public long getTimestamp() { 1369 return statTimestamp; 1370 } 1371 1372 public int getTotalRITs() { 1373 return totalRITs; 1374 } 1375 1376 public long getOldestRITTime() { 1377 return oldestRITTime; 1378 } 1379 1380 public int getTotalRITsOverThreshold() { 1381 Map<String, RegionState> m = this.ritsOverThreshold; 1382 return m != null ? m.size() : 0; 1383 } 1384 1385 public boolean hasRegionsTwiceOverThreshold() { 1386 return totalRITsTwiceThreshold > 0; 1387 } 1388 1389 public boolean hasRegionsOverThreshold() { 1390 Map<String, RegionState> m = this.ritsOverThreshold; 1391 return m != null && !m.isEmpty(); 1392 } 1393 1394 public Collection<RegionState> getRegionOverThreshold() { 1395 Map<String, RegionState> m = this.ritsOverThreshold; 1396 return m != null? m.values(): Collections.emptySet(); 1397 } 1398 1399 public boolean isRegionOverThreshold(final RegionInfo regionInfo) { 1400 Map<String, RegionState> m = this.ritsOverThreshold; 1401 return m != null && m.containsKey(regionInfo.getEncodedName()); 1402 } 1403 1404 public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) { 1405 Map<String, RegionState> m = this.ritsOverThreshold; 1406 if (m == null) { 1407 return false; 1408 } 1409 final RegionState state = m.get(regionInfo.getEncodedName()); 1410 if (state == null) { 1411 return false; 1412 } 1413 return (statTimestamp - state.getStamp()) > (ritThreshold * 2); 1414 } 1415 1416 protected void update(final AssignmentManager am) { 1417 final RegionStates regionStates = am.getRegionStates(); 1418 this.statTimestamp = EnvironmentEdgeManager.currentTime(); 1419 update(regionStates.getRegionsStateInTransition(), statTimestamp); 1420 update(regionStates.getRegionFailedOpen(), statTimestamp); 1421 } 1422 1423 private void update(final Collection<RegionState> regions, final long currentTime) { 1424 for (RegionState state: regions) { 1425 totalRITs++; 1426 final long ritTime = currentTime - state.getStamp(); 1427 if (ritTime > ritThreshold) { 1428 if (ritsOverThreshold == null) { 1429 ritsOverThreshold = new HashMap<String, RegionState>(); 1430 } 1431 ritsOverThreshold.put(state.getRegion().getEncodedName(), state); 1432 totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0; 1433 } 1434 if (oldestRITTime < ritTime) { 1435 oldestRITTime = ritTime; 1436 } 1437 } 1438 } 1439 } 1440 1441 private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) { 1442 metrics.updateRITOldestAge(ritStat.getOldestRITTime()); 1443 metrics.updateRITCount(ritStat.getTotalRITs()); 1444 metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold()); 1445 } 1446 1447 private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) { 1448 metrics.updateDeadServerOpenRegions(deadRegions); 1449 metrics.updateUnknownServerOpenRegions(unknownRegions); 1450 } 1451 1452 private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) { 1453 final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 1454 //if (regionNode.isStuck()) { 1455 LOG.warn("STUCK Region-In-Transition {}", regionNode); 1456 } 1457 1458 // ============================================================================================ 1459 // TODO: Master load/bootstrap 1460 // ============================================================================================ 1461 public void joinCluster() throws IOException { 1462 long startTime = System.nanoTime(); 1463 LOG.debug("Joining cluster..."); 1464 1465 // Scan hbase:meta to build list of existing regions, servers, and assignment. 1466 // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress 1467 // w/o meta. 1468 loadMeta(); 1469 1470 while (master.getServerManager().countOfRegionServers() < 1) { 1471 LOG.info("Waiting for RegionServers to join; current count={}", 1472 master.getServerManager().countOfRegionServers()); 1473 Threads.sleep(250); 1474 } 1475 LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers()); 1476 1477 // Start the chores 1478 master.getMasterProcedureExecutor().addChore(this.ritChore); 1479 master.getMasterProcedureExecutor().addChore(this.deadMetricChore); 1480 1481 long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); 1482 LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs)); 1483 } 1484 1485 /** 1486 * Create assign procedure for offline regions. 1487 * Just follow the old processofflineServersWithOnlineRegions method. Since now we do not need to 1488 * deal with dead server any more, we only deal with the regions in OFFLINE state in this method. 1489 * And this is a bit strange, that for new regions, we will add it in CLOSED state instead of 1490 * OFFLINE state, and usually there will be a procedure to track them. The 1491 * processofflineServersWithOnlineRegions is a legacy from long ago, as things are going really 1492 * different now, maybe we do not need this method any more. Need to revisit later. 1493 */ 1494 // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online. 1495 // Needs to be done after the table state manager has been started. 1496 public void processOfflineRegions() { 1497 TransitRegionStateProcedure[] procs = 1498 regionStates.getRegionStateNodes().stream().filter(rsn -> rsn.isInState(State.OFFLINE)) 1499 .filter(rsn -> isTableEnabled(rsn.getRegionInfo().getTable())).map(rsn -> { 1500 rsn.lock(); 1501 try { 1502 if (rsn.getProcedure() != null) { 1503 return null; 1504 } else { 1505 return rsn.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(), 1506 rsn.getRegionInfo(), null)); 1507 } 1508 } finally { 1509 rsn.unlock(); 1510 } 1511 }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new); 1512 if (procs.length > 0) { 1513 master.getMasterProcedureExecutor().submitProcedures(procs); 1514 } 1515 } 1516 1517 /* AM internal RegionStateStore.RegionStateVisitor implementation. To be used when 1518 * scanning META table for region rows, using RegionStateStore utility methods. RegionStateStore 1519 * methods will convert Result into proper RegionInfo instances, but those would still need to be 1520 * added into AssignmentManager.regionStates in-memory cache. 1521 * RegionMetaLoadingVisitor.visitRegionState method provides the logic for adding RegionInfo 1522 * instances as loaded from latest META scan into AssignmentManager.regionStates. 1523 */ 1524 private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor { 1525 1526 @Override 1527 public void visitRegionState(Result result, final RegionInfo regionInfo, final State state, 1528 final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) { 1529 if (state == null && regionLocation == null && lastHost == null && 1530 openSeqNum == SequenceId.NO_SEQUENCE_ID) { 1531 // This is a row with nothing in it. 1532 LOG.warn("Skipping empty row={}", result); 1533 return; 1534 } 1535 State localState = state; 1536 if (localState == null) { 1537 // No region state column data in hbase:meta table! Are I doing a rolling upgrade from 1538 // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta? 1539 // In any of these cases, state is empty. For now, presume OFFLINE but there are probably 1540 // cases where we need to probe more to be sure this correct; TODO informed by experience. 1541 LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE); 1542 localState = State.OFFLINE; 1543 } 1544 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 1545 // Do not need to lock on regionNode, as we can make sure that before we finish loading 1546 // meta, all the related procedures can not be executed. The only exception is for meta 1547 // region related operations, but here we do not load the informations for meta region. 1548 regionNode.setState(localState); 1549 regionNode.setLastHost(lastHost); 1550 regionNode.setRegionLocation(regionLocation); 1551 regionNode.setOpenSeqNum(openSeqNum); 1552 1553 // Note: keep consistent with other methods, see region(Opening|Opened|Closing) 1554 // RIT/ServerCrash handling should take care of the transiting regions. 1555 if (localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING, 1556 State.MERGING)) { 1557 assert regionLocation != null : "found null region location for " + regionNode; 1558 regionStates.addRegionToServer(regionNode); 1559 } else if (localState == State.OFFLINE || regionInfo.isOffline()) { 1560 regionStates.addToOfflineRegions(regionNode); 1561 } 1562 if (regionNode.getProcedure() != null) { 1563 regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode); 1564 } 1565 } 1566 }; 1567 1568 /** 1569 * Query META if the given <code>RegionInfo</code> exists, adding to 1570 * <code>AssignmentManager.regionStateStore</code> cache if the region is found in META. 1571 * @param regionEncodedName encoded name for the region to be loaded from META into 1572 * <code>AssignmentManager.regionStateStore</code> cache 1573 * @return <code>RegionInfo</code> instance for the given region if it is present in META 1574 * and got successfully loaded into <code>AssignmentManager.regionStateStore</code> 1575 * cache, <b>null</b> otherwise. 1576 * @throws UnknownRegionException if any errors occur while querying meta. 1577 */ 1578 public RegionInfo loadRegionFromMeta(String regionEncodedName) throws UnknownRegionException { 1579 try { 1580 RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor(); 1581 regionStateStore.visitMetaForRegion(regionEncodedName, visitor); 1582 return regionStates.getRegionState(regionEncodedName) == null ? null : 1583 regionStates.getRegionState(regionEncodedName).getRegion(); 1584 } catch(IOException e) { 1585 LOG.error("Error trying to load region {} from META", regionEncodedName, e); 1586 throw new UnknownRegionException("Error while trying load region from meta"); 1587 } 1588 } 1589 1590 private void loadMeta() throws IOException { 1591 // TODO: use a thread pool 1592 regionStateStore.visitMeta(new RegionMetaLoadingVisitor()); 1593 } 1594 1595 /** 1596 * Used to check if the meta loading is done. 1597 * <p/> 1598 * if not we throw PleaseHoldException since we are rebuilding the RegionStates 1599 * @param hri region to check if it is already rebuild 1600 * @throws PleaseHoldException if meta has not been loaded yet 1601 */ 1602 private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException { 1603 if (!isRunning()) { 1604 throw new PleaseHoldException("AssignmentManager not running"); 1605 } 1606 boolean meta = isMetaRegion(hri); 1607 boolean metaLoaded = isMetaLoaded(); 1608 if (!meta && !metaLoaded) { 1609 throw new PleaseHoldException( 1610 "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded); 1611 } 1612 } 1613 1614 // ============================================================================================ 1615 // TODO: Metrics 1616 // ============================================================================================ 1617 public int getNumRegionsOpened() { 1618 // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value 1619 return 0; 1620 } 1621 1622 /** 1623 * Usually run by the Master in reaction to server crash during normal processing. 1624 * Can also be invoked via external RPC to effect repair; in the latter case, 1625 * the 'force' flag is set so we push through the SCP though context may indicate 1626 * already-running-SCP (An old SCP may have exited abnormally, or damaged cluster 1627 * may still have references in hbase:meta to 'Unknown Servers' -- servers that 1628 * are not online or in dead servers list, etc.) 1629 * @param force Set if the request came in externally over RPC (via hbck2). Force means 1630 * run the SCP even if it seems as though there might be an outstanding 1631 * SCP running. 1632 * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled. 1633 */ 1634 public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) { 1635 // May be an 'Unknown Server' so handle case where serverNode is null. 1636 ServerStateNode serverNode = regionStates.getServerNode(serverName); 1637 // Remove the in-memory rsReports result 1638 synchronized (rsReports) { 1639 rsReports.remove(serverName); 1640 } 1641 1642 // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the 1643 // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from 1644 // this server. This is used to simplify the implementation for TRSP and SCP, where we can make 1645 // sure that, the region list fetched by SCP will not be changed any more. 1646 if (serverNode != null) { 1647 serverNode.writeLock().lock(); 1648 } 1649 boolean carryingMeta; 1650 long pid; 1651 try { 1652 ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor(); 1653 carryingMeta = isCarryingMeta(serverName); 1654 if (!force && serverNode != null && !serverNode.isInState(ServerState.ONLINE)) { 1655 LOG.info("Skip adding ServerCrashProcedure for {} (meta={}) -- running?", 1656 serverNode, carryingMeta); 1657 return Procedure.NO_PROC_ID; 1658 } else { 1659 MasterProcedureEnv mpe = procExec.getEnvironment(); 1660 // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead. 1661 // HBCKSCP scours Master in-memory state AND hbase;meta for references to 1662 // serverName just-in-case. An SCP that is scheduled when the server is 1663 // 'Unknown' probably originated externally with HBCK2 fix-it tool. 1664 ServerState oldState = null; 1665 if (serverNode != null) { 1666 oldState = serverNode.getState(); 1667 serverNode.setState(ServerState.CRASHED); 1668 } 1669 1670 if (force) { 1671 pid = procExec.submitProcedure( 1672 new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta)); 1673 } else { 1674 pid = procExec.submitProcedure( 1675 new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta)); 1676 } 1677 LOG.info("Scheduled ServerCrashProcedure pid={} for {} (carryingMeta={}){}.", 1678 pid, serverName, carryingMeta, 1679 serverNode == null? "": " " + serverNode.toString() + ", oldState=" + oldState); 1680 } 1681 } finally { 1682 if (serverNode != null) { 1683 serverNode.writeLock().unlock(); 1684 } 1685 } 1686 return pid; 1687 } 1688 1689 public void offlineRegion(final RegionInfo regionInfo) { 1690 // TODO used by MasterRpcServices 1691 RegionStateNode node = regionStates.getRegionStateNode(regionInfo); 1692 if (node != null) { 1693 node.offline(); 1694 } 1695 } 1696 1697 public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) { 1698 // TODO used by TestSplitTransactionOnCluster.java 1699 } 1700 1701 public Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment( 1702 final Collection<RegionInfo> regions) { 1703 return regionStates.getSnapShotOfAssignment(regions); 1704 } 1705 1706 // ============================================================================================ 1707 // TODO: UTILS/HELPERS? 1708 // ============================================================================================ 1709 /** 1710 * Used by the client (via master) to identify if all regions have the schema updates 1711 * @return Pair indicating the status of the alter command (pending/total) 1712 */ 1713 public Pair<Integer, Integer> getReopenStatus(TableName tableName) { 1714 if (isTableDisabled(tableName)) { 1715 return new Pair<Integer, Integer>(0, 0); 1716 } 1717 1718 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 1719 int ritCount = 0; 1720 for (RegionState regionState: states) { 1721 if (!regionState.isOpened() && !regionState.isSplit()) { 1722 ritCount++; 1723 } 1724 } 1725 return new Pair<Integer, Integer>(ritCount, states.size()); 1726 } 1727 1728 // ============================================================================================ 1729 // TODO: Region State In Transition 1730 // ============================================================================================ 1731 public boolean hasRegionsInTransition() { 1732 return regionStates.hasRegionsInTransition(); 1733 } 1734 1735 public List<RegionStateNode> getRegionsInTransition() { 1736 return regionStates.getRegionsInTransition(); 1737 } 1738 1739 public List<RegionInfo> getAssignedRegions() { 1740 return regionStates.getAssignedRegions(); 1741 } 1742 1743 public RegionInfo getRegionInfo(final byte[] regionName) { 1744 final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName); 1745 return regionState != null ? regionState.getRegionInfo() : null; 1746 } 1747 1748 // ============================================================================================ 1749 // Expected states on region state transition. 1750 // Notice that there is expected states for transiting to OPENING state, this is because SCP. 1751 // See the comments in regionOpening method for more details. 1752 // ============================================================================================ 1753 private static final State[] STATES_EXPECTED_ON_OPEN = { 1754 State.OPENING, // Normal case 1755 State.OPEN // Retrying 1756 }; 1757 1758 private static final State[] STATES_EXPECTED_ON_CLOSING = { 1759 State.OPEN, // Normal case 1760 State.CLOSING, // Retrying 1761 State.SPLITTING, // Offline the split parent 1762 State.MERGING // Offline the merge parents 1763 }; 1764 1765 private static final State[] STATES_EXPECTED_ON_CLOSED = { 1766 State.CLOSING, // Normal case 1767 State.CLOSED // Retrying 1768 }; 1769 1770 // This is for manually scheduled region assign, can add other states later if we find out other 1771 // usages 1772 private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE }; 1773 1774 // We only allow unassign or move a region which is in OPEN state. 1775 private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN }; 1776 1777 // ============================================================================================ 1778 // Region Status update 1779 // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking 1780 // and pre-assumptions are very tricky. 1781 // ============================================================================================ 1782 private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState, 1783 RegionState.State... expectedStates) throws IOException { 1784 RegionState.State state = regionNode.getState(); 1785 regionNode.transitionState(newState, expectedStates); 1786 boolean succ = false; 1787 try { 1788 regionStateStore.updateRegionLocation(regionNode); 1789 succ = true; 1790 } finally { 1791 if (!succ) { 1792 // revert 1793 regionNode.setState(state); 1794 } 1795 } 1796 } 1797 1798 // should be called within the synchronized block of RegionStateNode 1799 void regionOpening(RegionStateNode regionNode) throws IOException { 1800 // As in SCP, for performance reason, there is no TRSP attached with this region, we will not 1801 // update the region state, which means that the region could be in any state when we want to 1802 // assign it after a RS crash. So here we do not pass the expectedStates parameter. 1803 transitStateAndUpdate(regionNode, State.OPENING); 1804 regionStates.addRegionToServer(regionNode); 1805 // update the operation count metrics 1806 metrics.incrementOperationCounter(); 1807 } 1808 1809 // should be called under the RegionStateNode lock 1810 // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then 1811 // we will persist the FAILED_OPEN state into hbase:meta. 1812 void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException { 1813 RegionState.State state = regionNode.getState(); 1814 ServerName regionLocation = regionNode.getRegionLocation(); 1815 if (giveUp) { 1816 regionNode.setState(State.FAILED_OPEN); 1817 regionNode.setRegionLocation(null); 1818 boolean succ = false; 1819 try { 1820 regionStateStore.updateRegionLocation(regionNode); 1821 succ = true; 1822 } finally { 1823 if (!succ) { 1824 // revert 1825 regionNode.setState(state); 1826 regionNode.setRegionLocation(regionLocation); 1827 } 1828 } 1829 } 1830 if (regionLocation != null) { 1831 regionStates.removeRegionFromServer(regionLocation, regionNode); 1832 } 1833 } 1834 1835 // should be called under the RegionStateNode lock 1836 void regionClosing(RegionStateNode regionNode) throws IOException { 1837 transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING); 1838 1839 RegionInfo hri = regionNode.getRegionInfo(); 1840 // Set meta has not initialized early. so people trying to create/edit tables will wait 1841 if (isMetaRegion(hri)) { 1842 setMetaAssigned(hri, false); 1843 } 1844 regionStates.addRegionToServer(regionNode); 1845 // update the operation count metrics 1846 metrics.incrementOperationCounter(); 1847 } 1848 1849 // for open and close, they will first be persist to the procedure store in 1850 // RegionRemoteProcedureBase. So here we will first change the in memory state as it is considered 1851 // as succeeded if the persistence to procedure store is succeeded, and then when the 1852 // RegionRemoteProcedureBase is woken up, we will persist the RegionStateNode to hbase:meta. 1853 1854 // should be called under the RegionStateNode lock 1855 void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException { 1856 regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN); 1857 RegionInfo regionInfo = regionNode.getRegionInfo(); 1858 regionStates.addRegionToServer(regionNode); 1859 regionStates.removeFromFailedOpen(regionInfo); 1860 } 1861 1862 // should be called under the RegionStateNode lock 1863 void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException { 1864 ServerName regionLocation = regionNode.getRegionLocation(); 1865 regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED); 1866 regionNode.setRegionLocation(null); 1867 if (regionLocation != null) { 1868 regionNode.setLastHost(regionLocation); 1869 regionStates.removeRegionFromServer(regionLocation, regionNode); 1870 } 1871 } 1872 1873 // should be called under the RegionStateNode lock 1874 // for SCP 1875 public void regionClosedAbnormally(RegionStateNode regionNode) throws IOException { 1876 RegionState.State state = regionNode.getState(); 1877 ServerName regionLocation = regionNode.getRegionLocation(); 1878 regionNode.transitionState(State.ABNORMALLY_CLOSED); 1879 regionNode.setRegionLocation(null); 1880 boolean succ = false; 1881 try { 1882 regionStateStore.updateRegionLocation(regionNode); 1883 succ = true; 1884 } finally { 1885 if (!succ) { 1886 // revert 1887 regionNode.setState(state); 1888 regionNode.setRegionLocation(regionLocation); 1889 } 1890 } 1891 if (regionLocation != null) { 1892 regionNode.setLastHost(regionLocation); 1893 regionStates.removeRegionFromServer(regionLocation, regionNode); 1894 } 1895 } 1896 1897 void persistToMeta(RegionStateNode regionNode) throws IOException { 1898 regionStateStore.updateRegionLocation(regionNode); 1899 RegionInfo regionInfo = regionNode.getRegionInfo(); 1900 if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) { 1901 // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it 1902 // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager 1903 // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state 1904 // on table that contains state. 1905 setMetaAssigned(regionInfo, true); 1906 } 1907 } 1908 1909 // ============================================================================================ 1910 // The above methods can only be called in TransitRegionStateProcedure(and related procedures) 1911 // ============================================================================================ 1912 1913 public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName, 1914 final RegionInfo daughterA, final RegionInfo daughterB) throws IOException { 1915 // Update hbase:meta. Parent will be marked offline and split up in hbase:meta. 1916 // The parent stays in regionStates until cleared when removed by CatalogJanitor. 1917 // Update its state in regionStates to it shows as offline and split when read 1918 // later figuring what regions are in a table and what are not: see 1919 // regionStates#getRegionsOfTable 1920 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent); 1921 node.setState(State.SPLIT); 1922 final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA); 1923 nodeA.setState(State.SPLITTING_NEW); 1924 final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB); 1925 nodeB.setState(State.SPLITTING_NEW); 1926 1927 // TODO: here we just update the parent region info in meta, to set split and offline to true, 1928 // without changing the one in the region node. This is a bit confusing but the region info 1929 // field in RegionStateNode is not expected to be changed in the current design. Need to find a 1930 // possible way to address this problem, or at least adding more comments about the trick to 1931 // deal with this problem, that when you want to filter out split parent, you need to check both 1932 // the RegionState on whether it is split, and also the region info. If one of them matches then 1933 // it is a split parent. And usually only one of them can match, as after restart, the region 1934 // state will be changed from SPLIT to CLOSED. 1935 regionStateStore.splitRegion(parent, daughterA, daughterB, serverName); 1936 if (shouldAssignFavoredNodes(parent)) { 1937 List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList(); 1938 ((FavoredNodesPromoter)getBalancer()). 1939 generateFavoredNodesForDaughter(onlineServers, parent, daughterA, daughterB); 1940 } 1941 } 1942 1943 /** 1944 * When called here, the merge has happened. The merged regions have been 1945 * unassigned and the above markRegionClosed has been called on each so they have been 1946 * disassociated from a hosting Server. The merged region will be open after this call. The 1947 * merged regions are removed from hbase:meta below. Later they are deleted from the filesystem 1948 * by the catalog janitor running against hbase:meta. It notices when the merged region no 1949 * longer holds references to the old regions (References are deleted after a compaction 1950 * rewrites what the Reference points at but not until the archiver chore runs, are the 1951 * References removed). 1952 */ 1953 public void markRegionAsMerged(final RegionInfo child, final ServerName serverName, 1954 RegionInfo [] mergeParents) 1955 throws IOException { 1956 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child); 1957 node.setState(State.MERGED); 1958 for (RegionInfo ri: mergeParents) { 1959 regionStates.deleteRegion(ri); 1960 1961 } 1962 regionStateStore.mergeRegions(child, mergeParents, serverName); 1963 if (shouldAssignFavoredNodes(child)) { 1964 ((FavoredNodesPromoter)getBalancer()). 1965 generateFavoredNodesForMergedRegion(child, mergeParents); 1966 } 1967 } 1968 1969 /* 1970 * Favored nodes should be applied only when FavoredNodes balancer is configured and the region 1971 * belongs to a non-system table. 1972 */ 1973 private boolean shouldAssignFavoredNodes(RegionInfo region) { 1974 return this.shouldAssignRegionsWithFavoredNodes && 1975 FavoredNodesManager.isFavoredNodeApplicable(region); 1976 } 1977 1978 // ============================================================================================ 1979 // Assign Queue (Assign/Balance) 1980 // ============================================================================================ 1981 private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>(); 1982 private final ReentrantLock assignQueueLock = new ReentrantLock(); 1983 private final Condition assignQueueFullCond = assignQueueLock.newCondition(); 1984 1985 /** 1986 * Add the assign operation to the assignment queue. 1987 * The pending assignment operation will be processed, 1988 * and each region will be assigned by a server using the balancer. 1989 */ 1990 protected void queueAssign(final RegionStateNode regionNode) { 1991 regionNode.getProcedureEvent().suspend(); 1992 1993 // TODO: quick-start for meta and the other sys-tables? 1994 assignQueueLock.lock(); 1995 try { 1996 pendingAssignQueue.add(regionNode); 1997 if (regionNode.isSystemTable() || 1998 pendingAssignQueue.size() == 1 || 1999 pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize) { 2000 assignQueueFullCond.signal(); 2001 } 2002 } finally { 2003 assignQueueLock.unlock(); 2004 } 2005 } 2006 2007 private void startAssignmentThread() { 2008 assignThread = new Thread(master.getServerName().toShortString()) { 2009 @Override 2010 public void run() { 2011 while (isRunning()) { 2012 processAssignQueue(); 2013 } 2014 pendingAssignQueue.clear(); 2015 } 2016 }; 2017 assignThread.setDaemon(true); 2018 assignThread.start(); 2019 } 2020 2021 private void stopAssignmentThread() { 2022 assignQueueSignal(); 2023 try { 2024 while (assignThread.isAlive()) { 2025 assignQueueSignal(); 2026 assignThread.join(250); 2027 } 2028 } catch (InterruptedException e) { 2029 LOG.warn("join interrupted", e); 2030 Thread.currentThread().interrupt(); 2031 } 2032 } 2033 2034 private void assignQueueSignal() { 2035 assignQueueLock.lock(); 2036 try { 2037 assignQueueFullCond.signal(); 2038 } finally { 2039 assignQueueLock.unlock(); 2040 } 2041 } 2042 2043 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") 2044 private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() { 2045 HashMap<RegionInfo, RegionStateNode> regions = null; 2046 2047 assignQueueLock.lock(); 2048 try { 2049 if (pendingAssignQueue.isEmpty() && isRunning()) { 2050 assignQueueFullCond.await(); 2051 } 2052 2053 if (!isRunning()) { 2054 return null; 2055 } 2056 assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS); 2057 regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size()); 2058 for (RegionStateNode regionNode: pendingAssignQueue) { 2059 regions.put(regionNode.getRegionInfo(), regionNode); 2060 } 2061 pendingAssignQueue.clear(); 2062 } catch (InterruptedException e) { 2063 LOG.warn("got interrupted ", e); 2064 Thread.currentThread().interrupt(); 2065 } finally { 2066 assignQueueLock.unlock(); 2067 } 2068 return regions; 2069 } 2070 2071 private void processAssignQueue() { 2072 final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue(); 2073 if (regions == null || regions.size() == 0 || !isRunning()) { 2074 return; 2075 } 2076 2077 if (LOG.isTraceEnabled()) { 2078 LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size()); 2079 } 2080 2081 // TODO: Optimize balancer. pass a RegionPlan? 2082 final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>(); 2083 final List<RegionInfo> userHRIs = new ArrayList<>(regions.size()); 2084 // Regions for system tables requiring reassignment 2085 final List<RegionInfo> systemHRIs = new ArrayList<>(); 2086 for (RegionStateNode regionStateNode: regions.values()) { 2087 boolean sysTable = regionStateNode.isSystemTable(); 2088 final List<RegionInfo> hris = sysTable? systemHRIs: userHRIs; 2089 if (regionStateNode.getRegionLocation() != null) { 2090 retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation()); 2091 } else { 2092 hris.add(regionStateNode.getRegionInfo()); 2093 } 2094 } 2095 2096 // TODO: connect with the listener to invalidate the cache 2097 2098 // TODO use events 2099 List<ServerName> servers = master.getServerManager().createDestinationServersList(); 2100 for (int i = 0; servers.size() < 1; ++i) { 2101 // Report every fourth time around this loop; try not to flood log. 2102 if (i % 4 == 0) { 2103 LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions."); 2104 } 2105 2106 if (!isRunning()) { 2107 LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions."); 2108 return; 2109 } 2110 Threads.sleep(250); 2111 servers = master.getServerManager().createDestinationServersList(); 2112 } 2113 2114 if (!systemHRIs.isEmpty()) { 2115 // System table regions requiring reassignment are present, get region servers 2116 // not available for system table regions 2117 final List<ServerName> excludeServers = getExcludedServersForSystemTable(); 2118 List<ServerName> serversForSysTables = servers.stream() 2119 .filter(s -> !excludeServers.contains(s)).collect(Collectors.toList()); 2120 if (serversForSysTables.isEmpty()) { 2121 LOG.warn("Filtering old server versions and the excluded produced an empty set; " + 2122 "instead considering all candidate servers!"); 2123 } 2124 LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() + 2125 ", allServersCount=" + servers.size()); 2126 processAssignmentPlans(regions, null, systemHRIs, 2127 serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs) ? 2128 servers: serversForSysTables); 2129 } 2130 2131 processAssignmentPlans(regions, retainMap, userHRIs, servers); 2132 } 2133 2134 private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions, 2135 List<RegionInfo> hirs) { 2136 for (RegionInfo ri : hirs) { 2137 if (regions.get(ri).getRegionLocation() != null && 2138 regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME)){ 2139 return true; 2140 } 2141 } 2142 return false; 2143 } 2144 2145 private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions, 2146 final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris, 2147 final List<ServerName> servers) { 2148 boolean isTraceEnabled = LOG.isTraceEnabled(); 2149 if (isTraceEnabled) { 2150 LOG.trace("Available servers count=" + servers.size() + ": " + servers); 2151 } 2152 2153 final LoadBalancer balancer = getBalancer(); 2154 // ask the balancer where to place regions 2155 if (retainMap != null && !retainMap.isEmpty()) { 2156 if (isTraceEnabled) { 2157 LOG.trace("retain assign regions=" + retainMap); 2158 } 2159 try { 2160 acceptPlan(regions, balancer.retainAssignment(retainMap, servers)); 2161 } catch (HBaseIOException e) { 2162 LOG.warn("unable to retain assignment", e); 2163 addToPendingAssignment(regions, retainMap.keySet()); 2164 } 2165 } 2166 2167 // TODO: Do we need to split retain and round-robin? 2168 // the retain seems to fallback to round-robin/random if the region is not in the map. 2169 if (!hris.isEmpty()) { 2170 Collections.sort(hris, RegionInfo.COMPARATOR); 2171 if (isTraceEnabled) { 2172 LOG.trace("round robin regions=" + hris); 2173 } 2174 try { 2175 acceptPlan(regions, balancer.roundRobinAssignment(hris, servers)); 2176 } catch (HBaseIOException e) { 2177 LOG.warn("unable to round-robin assignment", e); 2178 addToPendingAssignment(regions, hris); 2179 } 2180 } 2181 } 2182 2183 private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions, 2184 final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException { 2185 final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()]; 2186 final long st = System.currentTimeMillis(); 2187 2188 if (plan.isEmpty()) { 2189 throw new HBaseIOException("unable to compute plans for regions=" + regions.size()); 2190 } 2191 2192 int evcount = 0; 2193 for (Map.Entry<ServerName, List<RegionInfo>> entry: plan.entrySet()) { 2194 final ServerName server = entry.getKey(); 2195 for (RegionInfo hri: entry.getValue()) { 2196 final RegionStateNode regionNode = regions.get(hri); 2197 regionNode.setRegionLocation(server); 2198 if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) { 2199 assignQueueLock.lock(); 2200 try { 2201 pendingAssignQueue.add(regionNode); 2202 } finally { 2203 assignQueueLock.unlock(); 2204 } 2205 }else { 2206 events[evcount++] = regionNode.getProcedureEvent(); 2207 } 2208 } 2209 } 2210 ProcedureEvent.wakeEvents(getProcedureScheduler(), events); 2211 2212 final long et = System.currentTimeMillis(); 2213 if (LOG.isTraceEnabled()) { 2214 LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + 2215 StringUtils.humanTimeDiff(et - st)); 2216 } 2217 } 2218 2219 private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions, 2220 final Collection<RegionInfo> pendingRegions) { 2221 assignQueueLock.lock(); 2222 try { 2223 for (RegionInfo hri: pendingRegions) { 2224 pendingAssignQueue.add(regions.get(hri)); 2225 } 2226 } finally { 2227 assignQueueLock.unlock(); 2228 } 2229 } 2230 2231 /** 2232 * Get a list of servers that this region cannot be assigned to. 2233 * For system tables, we must assign them to a server with highest version. 2234 */ 2235 public List<ServerName> getExcludedServersForSystemTable() { 2236 // TODO: This should be a cached list kept by the ServerManager rather than calculated on each 2237 // move or system region assign. The RegionServerTracker keeps list of online Servers with 2238 // RegionServerInfo that includes Version. 2239 List<Pair<ServerName, String>> serverList = master.getServerManager().getOnlineServersList() 2240 .stream() 2241 .map((s)->new Pair<>(s, master.getRegionServerVersion(s))) 2242 .collect(Collectors.toList()); 2243 if (serverList.isEmpty()) { 2244 return Collections.emptyList(); 2245 } 2246 String highestVersion = Collections.max(serverList, 2247 (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond(); 2248 return serverList.stream() 2249 .filter((p)->!p.getSecond().equals(highestVersion)) 2250 .map(Pair::getFirst) 2251 .collect(Collectors.toList()); 2252 } 2253 2254 MasterServices getMaster() { 2255 return master; 2256 } 2257 2258 /** 2259 * @return a snapshot of rsReports 2260 */ 2261 public Map<ServerName, Set<byte[]>> getRSReports() { 2262 Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>(); 2263 synchronized (rsReports) { 2264 rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue())); 2265 } 2266 return rsReportsSnapshot; 2267 } 2268 2269 /** 2270 * Provide regions state count for given table. 2271 * e.g howmany regions of give table are opened/closed/rit etc 2272 * 2273 * @param tableName TableName 2274 * @return region states count 2275 */ 2276 public RegionStatesCount getRegionStatesCount(TableName tableName) { 2277 int openRegionsCount = 0; 2278 int closedRegionCount = 0; 2279 int ritCount = 0; 2280 int splitRegionCount = 0; 2281 int totalRegionCount = 0; 2282 if (!isTableDisabled(tableName)) { 2283 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 2284 for (RegionState regionState : states) { 2285 if (regionState.isOpened()) { 2286 openRegionsCount++; 2287 } else if (regionState.isClosed()) { 2288 closedRegionCount++; 2289 } else if (regionState.isSplit()) { 2290 splitRegionCount++; 2291 } 2292 } 2293 totalRegionCount = states.size(); 2294 ritCount = totalRegionCount - openRegionsCount - splitRegionCount; 2295 } 2296 return new RegionStatesCount.RegionStatesCountBuilder() 2297 .setOpenRegions(openRegionsCount) 2298 .setClosedRegions(closedRegionCount) 2299 .setSplitRegions(splitRegionCount) 2300 .setRegionsInTransition(ritCount) 2301 .setTotalRegions(totalRegionCount) 2302 .build(); 2303 } 2304 2305}