001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029import java.util.concurrent.Future; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.locks.Condition; 033import java.util.concurrent.locks.ReentrantLock; 034import java.util.stream.Collectors; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.DoNotRetryIOException; 037import org.apache.hadoop.hbase.HBaseIOException; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.PleaseHoldException; 040import org.apache.hadoop.hbase.ServerName; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.UnknownRegionException; 043import org.apache.hadoop.hbase.client.DoNotRetryRegionException; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import org.apache.hadoop.hbase.client.RegionInfoBuilder; 046import org.apache.hadoop.hbase.client.RegionStatesCount; 047import org.apache.hadoop.hbase.client.Result; 048import org.apache.hadoop.hbase.client.TableState; 049import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; 050import org.apache.hadoop.hbase.favored.FavoredNodesManager; 051import org.apache.hadoop.hbase.favored.FavoredNodesPromoter; 052import org.apache.hadoop.hbase.master.LoadBalancer; 053import org.apache.hadoop.hbase.master.MasterServices; 054import org.apache.hadoop.hbase.master.MetricsAssignmentManager; 055import org.apache.hadoop.hbase.master.RegionPlan; 056import org.apache.hadoop.hbase.master.RegionState; 057import org.apache.hadoop.hbase.master.RegionState.State; 058import org.apache.hadoop.hbase.master.ServerManager; 059import org.apache.hadoop.hbase.master.TableStateManager; 060import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer; 061import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure; 062import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 063import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; 064import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; 065import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; 066import org.apache.hadoop.hbase.procedure2.Procedure; 067import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 068import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 069import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore; 070import org.apache.hadoop.hbase.procedure2.util.StringUtils; 071import org.apache.hadoop.hbase.regionserver.SequenceId; 072import org.apache.hadoop.hbase.util.Bytes; 073import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 074import org.apache.hadoop.hbase.util.HasThread; 075import org.apache.hadoop.hbase.util.Pair; 076import org.apache.hadoop.hbase.util.Threads; 077import org.apache.hadoop.hbase.util.VersionInfo; 078import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 079import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 080import org.apache.yetus.audience.InterfaceAudience; 081import org.apache.zookeeper.KeeperException; 082import org.slf4j.Logger; 083import org.slf4j.LoggerFactory; 084 085import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 086 087import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 090import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; 092 093/** 094 * The AssignmentManager is the coordinator for region assign/unassign operations. 095 * <ul> 096 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li> 097 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li> 098 * </ul> 099 * Regions are created by CreateTable, Split, Merge. 100 * Regions are deleted by DeleteTable, Split, Merge. 101 * Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. 102 * Unassigns are triggered by DisableTable, Split, Merge 103 */ 104@InterfaceAudience.Private 105public class AssignmentManager { 106 private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class); 107 108 // TODO: AMv2 109 // - handle region migration from hbase1 to hbase2. 110 // - handle sys table assignment first (e.g. acl, namespace) 111 // - handle table priorities 112 // - If ServerBusyException trying to update hbase:meta, we abort the Master 113 // See updateRegionLocation in RegionStateStore. 114 // 115 // See also 116 // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5 117 // for other TODOs. 118 119 public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY = 120 "hbase.assignment.bootstrap.thread.pool.size"; 121 122 public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY = 123 "hbase.assignment.dispatch.wait.msec"; 124 private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150; 125 126 public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY = 127 "hbase.assignment.dispatch.wait.queue.max.size"; 128 private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100; 129 130 public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY = 131 "hbase.assignment.rit.chore.interval.msec"; 132 private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000; 133 134 public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY = 135 "hbase.assignment.dead.region.metric.chore.interval.msec"; 136 private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000; 137 138 public static final String ASSIGN_MAX_ATTEMPTS = 139 "hbase.assignment.maximum.attempts"; 140 private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE; 141 142 public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 143 "hbase.assignment.retry.immediately.maximum.attempts"; 144 private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3; 145 146 /** Region in Transition metrics threshold time */ 147 public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD = 148 "hbase.metrics.rit.stuck.warning.threshold"; 149 private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000; 150 151 private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign"); 152 private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load"); 153 154 private final MetricsAssignmentManager metrics; 155 private final RegionInTransitionChore ritChore; 156 private final DeadServerMetricRegionChore deadMetricChore; 157 private final MasterServices master; 158 159 private final AtomicBoolean running = new AtomicBoolean(false); 160 private final RegionStates regionStates = new RegionStates(); 161 private final RegionStateStore regionStateStore; 162 163 private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>(); 164 165 private final boolean shouldAssignRegionsWithFavoredNodes; 166 private final int assignDispatchWaitQueueMaxSize; 167 private final int assignDispatchWaitMillis; 168 private final int assignMaxAttempts; 169 private final int assignRetryImmediatelyMaxAttempts; 170 171 private final Object checkIfShouldMoveSystemRegionLock = new Object(); 172 173 private Thread assignThread; 174 175 public AssignmentManager(final MasterServices master) { 176 this(master, new RegionStateStore(master)); 177 } 178 179 @VisibleForTesting 180 AssignmentManager(final MasterServices master, final RegionStateStore stateStore) { 181 this.master = master; 182 this.regionStateStore = stateStore; 183 this.metrics = new MetricsAssignmentManager(); 184 185 final Configuration conf = master.getConfiguration(); 186 187 // Only read favored nodes if using the favored nodes load balancer. 188 this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class.isAssignableFrom( 189 conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class)); 190 191 this.assignDispatchWaitMillis = conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, 192 DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC); 193 this.assignDispatchWaitQueueMaxSize = conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, 194 DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX); 195 196 this.assignMaxAttempts = Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, 197 DEFAULT_ASSIGN_MAX_ATTEMPTS)); 198 this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS, 199 DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS); 200 201 int ritChoreInterval = conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, 202 DEFAULT_RIT_CHORE_INTERVAL_MSEC); 203 this.ritChore = new RegionInTransitionChore(ritChoreInterval); 204 205 int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY, 206 DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC); 207 if (deadRegionChoreInterval > 0) { 208 this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval); 209 } else { 210 this.deadMetricChore = null; 211 } 212 } 213 214 public void start() throws IOException, KeeperException { 215 if (!running.compareAndSet(false, true)) { 216 return; 217 } 218 219 LOG.trace("Starting assignment manager"); 220 221 // Start the Assignment Thread 222 startAssignmentThread(); 223 224 // load meta region state 225 ZKWatcher zkw = master.getZooKeeper(); 226 // it could be null in some tests 227 if (zkw != null) { 228 RegionState regionState = MetaTableLocator.getMetaRegionState(zkw); 229 RegionStateNode regionNode = 230 regionStates.getOrCreateRegionStateNode(RegionInfoBuilder.FIRST_META_REGIONINFO); 231 regionNode.lock(); 232 try { 233 regionNode.setRegionLocation(regionState.getServerName()); 234 regionNode.setState(regionState.getState()); 235 if (regionNode.getProcedure() != null) { 236 regionNode.getProcedure().stateLoaded(this, regionNode); 237 } 238 setMetaAssigned(regionState.getRegion(), regionState.getState() == State.OPEN); 239 } finally { 240 regionNode.unlock(); 241 } 242 } 243 } 244 245 /** 246 * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode. 247 * <p> 248 * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta 249 * method below. And it is also very important as now before submitting a TRSP, we need to attach 250 * it to the RegionStateNode, which acts like a guard, so we need to restore this information at 251 * the very beginning, before we start processing any procedures. 252 */ 253 public void setupRIT(List<TransitRegionStateProcedure> procs) { 254 procs.forEach(proc -> { 255 RegionInfo regionInfo = proc.getRegion(); 256 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 257 TransitRegionStateProcedure existingProc = regionNode.getProcedure(); 258 if (existingProc != null) { 259 // This is possible, as we will detach the procedure from the RSN before we 260 // actually finish the procedure. This is because that, we will detach the TRSP from the RSN 261 // during execution, at that time, the procedure has not been marked as done in the pv2 262 // framework yet, so it is possible that we schedule a new TRSP immediately and when 263 // arriving here, we will find out that there are multiple TRSPs for the region. But we can 264 // make sure that, only the last one can take the charge, the previous ones should have all 265 // been finished already. So here we will compare the proc id, the greater one will win. 266 if (existingProc.getProcId() < proc.getProcId()) { 267 // the new one wins, unset and set it to the new one below 268 regionNode.unsetProcedure(existingProc); 269 } else { 270 // the old one wins, skip 271 return; 272 } 273 } 274 LOG.info("Attach {} to {} to restore RIT", proc, regionNode); 275 regionNode.setProcedure(proc); 276 }); 277 } 278 279 public void stop() { 280 if (!running.compareAndSet(true, false)) { 281 return; 282 } 283 284 LOG.info("Stopping assignment manager"); 285 286 // The AM is started before the procedure executor, 287 // but the actual work will be loaded/submitted only once we have the executor 288 final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null; 289 290 // Remove the RIT chore 291 if (hasProcExecutor) { 292 master.getMasterProcedureExecutor().removeChore(this.ritChore); 293 if (this.deadMetricChore != null) { 294 master.getMasterProcedureExecutor().removeChore(this.deadMetricChore); 295 } 296 } 297 298 // Stop the Assignment Thread 299 stopAssignmentThread(); 300 301 // Stop the RegionStateStore 302 regionStates.clear(); 303 304 // Update meta events (for testing) 305 if (hasProcExecutor) { 306 metaLoadEvent.suspend(); 307 for (RegionInfo hri: getMetaRegionSet()) { 308 setMetaAssigned(hri, false); 309 } 310 } 311 } 312 313 public boolean isRunning() { 314 return running.get(); 315 } 316 317 public Configuration getConfiguration() { 318 return master.getConfiguration(); 319 } 320 321 public MetricsAssignmentManager getAssignmentManagerMetrics() { 322 return metrics; 323 } 324 325 private LoadBalancer getBalancer() { 326 return master.getLoadBalancer(); 327 } 328 329 private MasterProcedureEnv getProcedureEnvironment() { 330 return master.getMasterProcedureExecutor().getEnvironment(); 331 } 332 333 private MasterProcedureScheduler getProcedureScheduler() { 334 return getProcedureEnvironment().getProcedureScheduler(); 335 } 336 337 int getAssignMaxAttempts() { 338 return assignMaxAttempts; 339 } 340 341 int getAssignRetryImmediatelyMaxAttempts() { 342 return assignRetryImmediatelyMaxAttempts; 343 } 344 345 public RegionStates getRegionStates() { 346 return regionStates; 347 } 348 349 /** 350 * Returns the regions hosted by the specified server. 351 * <p/> 352 * Notice that, for SCP, after we submit the SCP, no one can change the region list for the 353 * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a 354 * snapshot of the current region list for this server, which means, right after you get the 355 * region list, new regions may be moved to this server or some regions may be moved out from this 356 * server, so you should not use it critically if you need strong consistency. 357 */ 358 public List<RegionInfo> getRegionsOnServer(ServerName serverName) { 359 ServerStateNode serverInfo = regionStates.getServerNode(serverName); 360 if (serverInfo == null) { 361 return Collections.emptyList(); 362 } 363 return serverInfo.getRegionInfoList(); 364 } 365 366 public RegionStateStore getRegionStateStore() { 367 return regionStateStore; 368 } 369 370 public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) { 371 return this.shouldAssignRegionsWithFavoredNodes 372 ? ((FavoredStochasticBalancer) getBalancer()).getFavoredNodes(regionInfo) 373 : ServerName.EMPTY_SERVER_LIST; 374 } 375 376 // ============================================================================================ 377 // Table State Manager helpers 378 // ============================================================================================ 379 TableStateManager getTableStateManager() { 380 return master.getTableStateManager(); 381 } 382 383 public boolean isTableEnabled(final TableName tableName) { 384 return getTableStateManager().isTableState(tableName, TableState.State.ENABLED); 385 } 386 387 public boolean isTableDisabled(final TableName tableName) { 388 return getTableStateManager().isTableState(tableName, 389 TableState.State.DISABLED, TableState.State.DISABLING); 390 } 391 392 // ============================================================================================ 393 // META Helpers 394 // ============================================================================================ 395 private boolean isMetaRegion(final RegionInfo regionInfo) { 396 return regionInfo.isMetaRegion(); 397 } 398 399 public boolean isMetaRegion(final byte[] regionName) { 400 return getMetaRegionFromName(regionName) != null; 401 } 402 403 public RegionInfo getMetaRegionFromName(final byte[] regionName) { 404 for (RegionInfo hri: getMetaRegionSet()) { 405 if (Bytes.equals(hri.getRegionName(), regionName)) { 406 return hri; 407 } 408 } 409 return null; 410 } 411 412 public boolean isCarryingMeta(final ServerName serverName) { 413 // TODO: handle multiple meta 414 return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO); 415 } 416 417 private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) { 418 // TODO: check for state? 419 final RegionStateNode node = regionStates.getRegionStateNode(regionInfo); 420 return(node != null && serverName.equals(node.getRegionLocation())); 421 } 422 423 private RegionInfo getMetaForRegion(final RegionInfo regionInfo) { 424 //if (regionInfo.isMetaRegion()) return regionInfo; 425 // TODO: handle multiple meta. if the region provided is not meta lookup 426 // which meta the region belongs to. 427 return RegionInfoBuilder.FIRST_META_REGIONINFO; 428 } 429 430 // TODO: handle multiple meta. 431 private static final Set<RegionInfo> META_REGION_SET = 432 Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO); 433 public Set<RegionInfo> getMetaRegionSet() { 434 return META_REGION_SET; 435 } 436 437 // ============================================================================================ 438 // META Event(s) helpers 439 // ============================================================================================ 440 /** 441 * Notice that, this only means the meta region is available on a RS, but the AM may still be 442 * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first 443 * before checking this method, unless you can make sure that your piece of code can only be 444 * executed after AM builds the region states. 445 * @see #isMetaLoaded() 446 */ 447 public boolean isMetaAssigned() { 448 return metaAssignEvent.isReady(); 449 } 450 451 public boolean isMetaRegionInTransition() { 452 return !isMetaAssigned(); 453 } 454 455 /** 456 * Notice that this event does not mean the AM has already finished region state rebuilding. See 457 * the comment of {@link #isMetaAssigned()} for more details. 458 * @see #isMetaAssigned() 459 */ 460 public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) { 461 return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc); 462 } 463 464 private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) { 465 assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; 466 ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo); 467 if (assigned) { 468 metaAssignEvent.wake(getProcedureScheduler()); 469 } else { 470 metaAssignEvent.suspend(); 471 } 472 } 473 474 private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) { 475 assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; 476 // TODO: handle multiple meta. 477 return metaAssignEvent; 478 } 479 480 /** 481 * Wait until AM finishes the meta loading, i.e, the region states rebuilding. 482 * @see #isMetaLoaded() 483 * @see #waitMetaAssigned(Procedure, RegionInfo) 484 */ 485 public boolean waitMetaLoaded(Procedure<?> proc) { 486 return metaLoadEvent.suspendIfNotReady(proc); 487 } 488 489 @VisibleForTesting 490 void wakeMetaLoadedEvent() { 491 metaLoadEvent.wake(getProcedureScheduler()); 492 assert isMetaLoaded() : "expected meta to be loaded"; 493 } 494 495 /** 496 * Return whether AM finishes the meta loading, i.e, the region states rebuilding. 497 * @see #isMetaAssigned() 498 * @see #waitMetaLoaded(Procedure) 499 */ 500 public boolean isMetaLoaded() { 501 return metaLoadEvent.isReady(); 502 } 503 504 /** 505 * Start a new thread to check if there are region servers whose versions are higher than others. 506 * If so, move all system table regions to RS with the highest version to keep compatibility. 507 * The reason is, RS in new version may not be able to access RS in old version when there are 508 * some incompatible changes. 509 * <p>This method is called when a new RegionServer is added to cluster only.</p> 510 */ 511 public void checkIfShouldMoveSystemRegionAsync() { 512 // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that 513 // it should 'move' the system tables from the old server to the new server but 514 // ServerCrashProcedure is on it; and it will take care of the assign without dataloss. 515 if (this.master.getServerManager().countOfRegionServers() <= 1) { 516 return; 517 } 518 // This thread used to run whenever there was a change in the cluster. The ZooKeeper 519 // childrenChanged notification came in before the nodeDeleted message and so this method 520 // cold run before a ServerCrashProcedure could run. That meant that this thread could see 521 // a Crashed Server before ServerCrashProcedure and it could find system regions on the 522 // crashed server and go move them before ServerCrashProcedure had a chance; could be 523 // dataloss too if WALs were not recovered. 524 new Thread(() -> { 525 try { 526 synchronized (checkIfShouldMoveSystemRegionLock) { 527 List<RegionPlan> plans = new ArrayList<>(); 528 // TODO: I don't think this code does a good job if all servers in cluster have same 529 // version. It looks like it will schedule unnecessary moves. 530 for (ServerName server : getExcludedServersForSystemTable()) { 531 if (master.getServerManager().isServerDead(server)) { 532 // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable() 533 // considers only online servers, the server could be queued for dead server 534 // processing. As region assignments for crashed server is handled by 535 // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through 536 // regular flow of LoadBalancer as a favored node and not to have this special 537 // handling. 538 continue; 539 } 540 List<RegionInfo> regionsShouldMove = getSystemTables(server); 541 if (!regionsShouldMove.isEmpty()) { 542 for (RegionInfo regionInfo : regionsShouldMove) { 543 // null value for dest forces destination server to be selected by balancer 544 RegionPlan plan = new RegionPlan(regionInfo, server, null); 545 if (regionInfo.isMetaRegion()) { 546 // Must move meta region first. 547 LOG.info("Async MOVE of {} to newer Server={}", 548 regionInfo.getEncodedName(), server); 549 moveAsync(plan); 550 } else { 551 plans.add(plan); 552 } 553 } 554 } 555 for (RegionPlan plan : plans) { 556 LOG.info("Async MOVE of {} to newer Server={}", 557 plan.getRegionInfo().getEncodedName(), server); 558 moveAsync(plan); 559 } 560 } 561 } 562 } catch (Throwable t) { 563 LOG.error(t.toString(), t); 564 } 565 }).start(); 566 } 567 568 private List<RegionInfo> getSystemTables(ServerName serverName) { 569 ServerStateNode serverNode = regionStates.getServerNode(serverName); 570 if (serverNode == null) { 571 return Collections.emptyList(); 572 } 573 return serverNode.getSystemRegionInfoList(); 574 } 575 576 private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates) 577 throws HBaseIOException { 578 if (regionNode.getProcedure() != null) { 579 throw new HBaseIOException(regionNode + " is currently in transition"); 580 } 581 if (!regionNode.isInState(expectedStates)) { 582 throw new DoNotRetryRegionException("Unexpected state for " + regionNode); 583 } 584 if (isTableDisabled(regionNode.getTable())) { 585 throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode); 586 } 587 } 588 589 // TODO: Need an async version of this for hbck2. 590 public long assign(RegionInfo regionInfo, ServerName sn) throws IOException { 591 // TODO: should we use getRegionStateNode? 592 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 593 TransitRegionStateProcedure proc; 594 regionNode.lock(); 595 try { 596 preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN); 597 proc = TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn); 598 regionNode.setProcedure(proc); 599 } finally { 600 regionNode.unlock(); 601 } 602 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 603 return proc.getProcId(); 604 } 605 606 public long assign(RegionInfo regionInfo) throws IOException { 607 return assign(regionInfo, null); 608 } 609 610 public long unassign(RegionInfo regionInfo) throws IOException { 611 RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 612 if (regionNode == null) { 613 throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName()); 614 } 615 TransitRegionStateProcedure proc; 616 regionNode.lock(); 617 try { 618 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 619 proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo); 620 regionNode.setProcedure(proc); 621 } finally { 622 regionNode.unlock(); 623 } 624 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 625 return proc.getProcId(); 626 } 627 628 public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo, 629 ServerName targetServer) throws HBaseIOException { 630 RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo); 631 if (regionNode == null) { 632 throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName()); 633 } 634 TransitRegionStateProcedure proc; 635 regionNode.lock(); 636 try { 637 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 638 regionNode.checkOnline(); 639 proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer); 640 regionNode.setProcedure(proc); 641 } finally { 642 regionNode.unlock(); 643 } 644 return proc; 645 } 646 647 public void move(RegionInfo regionInfo) throws IOException { 648 TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null); 649 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 650 } 651 652 public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException { 653 TransitRegionStateProcedure proc = 654 createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination()); 655 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); 656 } 657 658 // ============================================================================================ 659 // RegionTransition procedures helpers 660 // ============================================================================================ 661 662 /** 663 * Create round-robin assigns. Use on table creation to distribute out regions across cluster. 664 * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer 665 * to populate the assigns with targets chosen using round-robin (default balancer 666 * scheme). If at assign-time, the target chosen is no longer up, thats fine, the 667 * AssignProcedure will ask the balancer for a new target, and so on. 668 */ 669 public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris, 670 List<ServerName> serversToExclude) { 671 if (hris.isEmpty()) { 672 return new TransitRegionStateProcedure[0]; 673 } 674 675 if (serversToExclude != null 676 && this.master.getServerManager().getOnlineServersList().size() == 1) { 677 LOG.debug("Only one region server found and hence going ahead with the assignment"); 678 serversToExclude = null; 679 } 680 try { 681 // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do 682 // a better job if it has all the assignments in the one lump. 683 Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris, 684 this.master.getServerManager().createDestinationServersList(serversToExclude)); 685 // Return mid-method! 686 return createAssignProcedures(assignments); 687 } catch (HBaseIOException hioe) { 688 LOG.warn("Failed roundRobinAssignment", hioe); 689 } 690 // If an error above, fall-through to this simpler assign. Last resort. 691 return createAssignProcedures(hris); 692 } 693 694 /** 695 * Create round-robin assigns. Use on table creation to distribute out regions across cluster. 696 * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer 697 * to populate the assigns with targets chosen using round-robin (default balancer 698 * scheme). If at assign-time, the target chosen is no longer up, thats fine, the 699 * AssignProcedure will ask the balancer for a new target, and so on. 700 */ 701 public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) { 702 return createRoundRobinAssignProcedures(hris, null); 703 } 704 705 @VisibleForTesting 706 static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) { 707 if (left.getRegion().isMetaRegion()) { 708 if (right.getRegion().isMetaRegion()) { 709 return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion()); 710 } 711 return -1; 712 } else if (right.getRegion().isMetaRegion()) { 713 return +1; 714 } 715 if (left.getRegion().getTable().isSystemTable()) { 716 if (right.getRegion().getTable().isSystemTable()) { 717 return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion()); 718 } 719 return -1; 720 } else if (right.getRegion().getTable().isSystemTable()) { 721 return +1; 722 } 723 return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion()); 724 } 725 726 private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode, 727 ServerName targetServer, boolean override) { 728 TransitRegionStateProcedure proc; 729 regionNode.lock(); 730 try { 731 if(override && regionNode.getProcedure() != null) { 732 regionNode.unsetProcedure(regionNode.getProcedure()); 733 } 734 assert regionNode.getProcedure() == null; 735 proc = TransitRegionStateProcedure.assign(getProcedureEnvironment(), 736 regionNode.getRegionInfo(), targetServer); 737 regionNode.setProcedure(proc); 738 } finally { 739 regionNode.unlock(); 740 } 741 return proc; 742 } 743 744 private TransitRegionStateProcedure createUnassignProcedure(RegionStateNode regionNode, 745 boolean override) { 746 TransitRegionStateProcedure proc; 747 regionNode.lock(); 748 try { 749 if(override && regionNode.getProcedure() != null) { 750 regionNode.unsetProcedure(regionNode.getProcedure()); 751 } 752 assert regionNode.getProcedure() == null; 753 proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), 754 regionNode.getRegionInfo()); 755 regionNode.setProcedure(proc); 756 } finally { 757 regionNode.unlock(); 758 } 759 return proc; 760 } 761 762 /** 763 * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server. 764 * This method is specified for HBCK2 765 */ 766 public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo hri, boolean override) { 767 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri); 768 return createAssignProcedure(regionNode, null, override); 769 } 770 771 /** 772 * Create one TransitRegionStateProcedure to unassign a region. 773 * This method is specified for HBCK2 774 */ 775 public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo hri, boolean override) { 776 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri); 777 return createUnassignProcedure(regionNode, override); 778 } 779 780 /** 781 * Create an array of TransitRegionStateProcedure w/o specifying a target server. 782 * <p/> 783 * If no target server, at assign time, we will try to use the former location of the region if 784 * one exists. This is how we 'retain' the old location across a server restart. 785 * <p/> 786 * Should only be called when you can make sure that no one can touch these regions other than 787 * you. For example, when you are creating table. 788 */ 789 public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) { 790 return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri)) 791 .map(regionNode -> createAssignProcedure(regionNode, null, false)) 792 .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new); 793 } 794 795 /** 796 * @param assignments Map of assignments from which we produce an array of AssignProcedures. 797 * @return Assignments made from the passed in <code>assignments</code> 798 */ 799 private TransitRegionStateProcedure[] createAssignProcedures( 800 Map<ServerName, List<RegionInfo>> assignments) { 801 return assignments.entrySet().stream() 802 .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri)) 803 .map(regionNode -> createAssignProcedure(regionNode, e.getKey(), false))) 804 .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new); 805 } 806 807 /** 808 * Called by DisableTableProcedure to unassign all the regions for a table. 809 */ 810 public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) { 811 return regionStates.getTableRegionStateNodes(tableName).stream().map(regionNode -> { 812 regionNode.lock(); 813 try { 814 if (!regionStates.include(regionNode, false) || 815 regionStates.isRegionOffline(regionNode.getRegionInfo())) { 816 return null; 817 } 818 // As in DisableTableProcedure, we will hold the xlock for table, so we can make sure that 819 // this procedure has not been executed yet, as TRSP will hold the shared lock for table all 820 // the time. So here we will unset it and when it is actually executed, it will find that 821 // the attach procedure is not itself and quit immediately. 822 if (regionNode.getProcedure() != null) { 823 regionNode.unsetProcedure(regionNode.getProcedure()); 824 } 825 TransitRegionStateProcedure proc = TransitRegionStateProcedure 826 .unassign(getProcedureEnvironment(), regionNode.getRegionInfo()); 827 regionNode.setProcedure(proc); 828 return proc; 829 } finally { 830 regionNode.unlock(); 831 } 832 }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new); 833 } 834 835 public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit, 836 final byte[] splitKey) throws IOException { 837 return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey); 838 } 839 840 public MergeTableRegionsProcedure createMergeProcedure(RegionInfo ... ris) throws IOException { 841 return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false); 842 } 843 844 /** 845 * Delete the region states. This is called by "DeleteTable" 846 */ 847 public void deleteTable(final TableName tableName) throws IOException { 848 final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName); 849 regionStateStore.deleteRegions(regions); 850 for (int i = 0; i < regions.size(); ++i) { 851 final RegionInfo regionInfo = regions.get(i); 852 // we expect the region to be offline 853 regionStates.removeFromOfflineRegions(regionInfo); 854 regionStates.deleteRegion(regionInfo); 855 } 856 } 857 858 // ============================================================================================ 859 // RS Region Transition Report helpers 860 // ============================================================================================ 861 private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder, 862 ServerName serverName, List<RegionStateTransition> transitionList) throws IOException { 863 for (RegionStateTransition transition : transitionList) { 864 switch (transition.getTransitionCode()) { 865 case OPENED: 866 case FAILED_OPEN: 867 case CLOSED: 868 assert transition.getRegionInfoCount() == 1 : transition; 869 final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 870 long procId = 871 transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID; 872 updateRegionTransition(serverName, transition.getTransitionCode(), hri, 873 transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId); 874 break; 875 case READY_TO_SPLIT: 876 case SPLIT: 877 case SPLIT_REVERTED: 878 assert transition.getRegionInfoCount() == 3 : transition; 879 final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 880 final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1)); 881 final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2)); 882 updateRegionSplitTransition(serverName, transition.getTransitionCode(), parent, splitA, 883 splitB); 884 break; 885 case READY_TO_MERGE: 886 case MERGED: 887 case MERGE_REVERTED: 888 assert transition.getRegionInfoCount() == 3 : transition; 889 final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 890 final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1)); 891 final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2)); 892 updateRegionMergeTransition(serverName, transition.getTransitionCode(), merged, mergeA, 893 mergeB); 894 break; 895 } 896 } 897 } 898 899 public ReportRegionStateTransitionResponse reportRegionStateTransition( 900 final ReportRegionStateTransitionRequest req) throws PleaseHoldException { 901 ReportRegionStateTransitionResponse.Builder builder = 902 ReportRegionStateTransitionResponse.newBuilder(); 903 ServerName serverName = ProtobufUtil.toServerName(req.getServer()); 904 ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); 905 // here we have to acquire a read lock instead of a simple exclusive lock. This is because that 906 // we should not block other reportRegionStateTransition call from the same region server. This 907 // is not only about performance, but also to prevent dead lock. Think of the meta region is 908 // also on the same region server and you hold the lock which blocks the 909 // reportRegionStateTransition for meta, and since meta is not online, you will block inside the 910 // lock protection to wait for meta online... 911 serverNode.readLock().lock(); 912 try { 913 // we only accept reportRegionStateTransition if the region server is online, see the comment 914 // above in submitServerCrash method and HBASE-21508 for more details. 915 if (serverNode.isInState(ServerState.ONLINE)) { 916 try { 917 reportRegionStateTransition(builder, serverName, req.getTransitionList()); 918 } catch (PleaseHoldException e) { 919 LOG.trace("Failed transition ", e); 920 throw e; 921 } catch (UnsupportedOperationException | IOException e) { 922 // TODO: at the moment we have a single error message and the RS will abort 923 // if the master says that one of the region transitions failed. 924 LOG.warn("Failed transition", e); 925 builder.setErrorMessage("Failed transition " + e.getMessage()); 926 } 927 } else { 928 LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call", 929 serverName); 930 builder.setErrorMessage("You are dead"); 931 } 932 } finally { 933 serverNode.readLock().unlock(); 934 } 935 936 return builder.build(); 937 } 938 939 private void updateRegionTransition(ServerName serverName, TransitionCode state, 940 RegionInfo regionInfo, long seqId, long procId) throws IOException { 941 checkMetaLoaded(regionInfo); 942 943 RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 944 if (regionNode == null) { 945 // the table/region is gone. maybe a delete, split, merge 946 throw new UnexpectedStateException(String.format( 947 "Server %s was trying to transition region %s to %s. but the region was removed.", 948 serverName, regionInfo, state)); 949 } 950 LOG.trace("Update region transition serverName={} region={} regionState={}", serverName, 951 regionNode, state); 952 953 ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); 954 regionNode.lock(); 955 try { 956 if (!reportTransition(regionNode, serverNode, state, seqId, procId)) { 957 // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages: 958 // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for 959 // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958, 960 // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition 961 // to CLOSED 962 // These happen because on cluster shutdown, we currently let the RegionServers close 963 // regions. This is the only time that region close is not run by the Master (so cluster 964 // goes down fast). Consider changing it so Master runs all shutdowns. 965 if (this.master.getServerManager().isClusterShutdown() && 966 state.equals(TransitionCode.CLOSED)) { 967 LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName()); 968 } else { 969 LOG.warn("No matching procedure found for {} transition to {}", regionNode, state); 970 } 971 } 972 } finally { 973 regionNode.unlock(); 974 } 975 } 976 977 private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode, 978 TransitionCode state, long seqId, long procId) throws IOException { 979 ServerName serverName = serverNode.getServerName(); 980 TransitRegionStateProcedure proc = regionNode.getProcedure(); 981 if (proc == null) { 982 return false; 983 } 984 proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode, 985 serverName, state, seqId, procId); 986 return true; 987 } 988 989 private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state, 990 final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB) 991 throws IOException { 992 checkMetaLoaded(parent); 993 994 if (state != TransitionCode.READY_TO_SPLIT) { 995 throw new UnexpectedStateException("unsupported split regionState=" + state + 996 " for parent region " + parent + 997 " maybe an old RS (< 2.0) had the operation in progress"); 998 } 999 1000 // sanity check on the request 1001 if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) { 1002 throw new UnsupportedOperationException( 1003 "unsupported split request with bad keys: parent=" + parent + 1004 " hriA=" + hriA + " hriB=" + hriB); 1005 } 1006 1007 // Submit the Split procedure 1008 final byte[] splitKey = hriB.getStartKey(); 1009 if (LOG.isDebugEnabled()) { 1010 LOG.debug("Split request from " + serverName + 1011 ", parent=" + parent + " splitKey=" + Bytes.toStringBinary(splitKey)); 1012 } 1013 master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey)); 1014 1015 // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split 1016 if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) { 1017 throw new UnsupportedOperationException(String.format( 1018 "Split handled by the master: parent=%s hriA=%s hriB=%s", parent.getShortNameToLog(), hriA, hriB)); 1019 } 1020 } 1021 1022 private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state, 1023 final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException { 1024 checkMetaLoaded(merged); 1025 1026 if (state != TransitionCode.READY_TO_MERGE) { 1027 throw new UnexpectedStateException("Unsupported merge regionState=" + state + 1028 " for regionA=" + hriA + " regionB=" + hriB + " merged=" + merged + 1029 " maybe an old RS (< 2.0) had the operation in progress"); 1030 } 1031 1032 // Submit the Merge procedure 1033 if (LOG.isDebugEnabled()) { 1034 LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged); 1035 } 1036 master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB)); 1037 1038 // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge 1039 if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) { 1040 throw new UnsupportedOperationException(String.format( 1041 "Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, merged, hriA, 1042 hriB)); 1043 } 1044 } 1045 1046 // ============================================================================================ 1047 // RS Status update (report online regions) helpers 1048 // ============================================================================================ 1049 /** 1050 * The master will call this method when the RS send the regionServerReport(). The report will 1051 * contains the "online regions". This method will check the the online regions against the 1052 * in-memory state of the AM, and we will log a warn message if there is a mismatch. This is 1053 * because that there is no fencing between the reportRegionStateTransition method and 1054 * regionServerReport method, so there could be race and introduce inconsistency here, but 1055 * actually there is no problem. 1056 * <p/> 1057 * Please see HBASE-21421 and HBASE-21463 for more details. 1058 */ 1059 public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) { 1060 if (!isRunning()) { 1061 return; 1062 } 1063 if (LOG.isTraceEnabled()) { 1064 LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName, 1065 regionNames.size(), isMetaLoaded(), 1066 regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList())); 1067 } 1068 1069 ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); 1070 synchronized (serverNode) { 1071 if (!serverNode.isInState(ServerState.ONLINE)) { 1072 LOG.warn("Got a report from a server result in state " + serverNode.getState()); 1073 return; 1074 } 1075 } 1076 1077 // Track the regionserver reported online regions in memory. 1078 synchronized (rsReports) { 1079 rsReports.put(serverName, regionNames); 1080 } 1081 1082 if (regionNames.isEmpty()) { 1083 // nothing to do if we don't have regions 1084 LOG.trace("no online region found on {}", serverName); 1085 return; 1086 } 1087 if (!isMetaLoaded()) { 1088 // we are still on startup, skip checking 1089 return; 1090 } 1091 // The Heartbeat tells us of what regions are on the region serve, check the state. 1092 checkOnlineRegionsReport(serverNode, regionNames); 1093 } 1094 1095 // just check and output possible inconsistency, without actually doing anything 1096 private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) { 1097 ServerName serverName = serverNode.getServerName(); 1098 for (byte[] regionName : regionNames) { 1099 if (!isRunning()) { 1100 return; 1101 } 1102 RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName); 1103 if (regionNode == null) { 1104 LOG.warn("No region state node for {}, it should already be on {}", 1105 Bytes.toStringBinary(regionName), serverName); 1106 continue; 1107 } 1108 regionNode.lock(); 1109 try { 1110 long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate(); 1111 if (regionNode.isInState(State.OPENING, State.OPEN)) { 1112 // This is possible as a region server has just closed a region but the region server 1113 // report is generated before the closing, but arrive after the closing. Make sure there 1114 // is some elapsed time so less false alarms. 1115 if (!regionNode.getRegionLocation().equals(serverName) && diff > 1000) { 1116 LOG.warn("{} reported OPEN on server={} but state has otherwise", regionNode, 1117 serverName); 1118 } 1119 } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) { 1120 // So, we can get report that a region is CLOSED or SPLIT because a heartbeat 1121 // came in at about same time as a region transition. Make sure there is some 1122 // elapsed time so less false alarms. 1123 if (diff > 1000) { 1124 LOG.warn("{} reported an unexpected OPEN on {}; time since last update={}ms", 1125 regionNode, serverName, diff); 1126 } 1127 } 1128 } finally { 1129 regionNode.unlock(); 1130 } 1131 } 1132 } 1133 1134 // ============================================================================================ 1135 // RIT chore 1136 // ============================================================================================ 1137 private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> { 1138 public RegionInTransitionChore(final int timeoutMsec) { 1139 super(timeoutMsec); 1140 } 1141 1142 @Override 1143 protected void periodicExecute(final MasterProcedureEnv env) { 1144 final AssignmentManager am = env.getAssignmentManager(); 1145 1146 final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat(); 1147 if (ritStat.hasRegionsOverThreshold()) { 1148 for (RegionState hri: ritStat.getRegionOverThreshold()) { 1149 am.handleRegionOverStuckWarningThreshold(hri.getRegion()); 1150 } 1151 } 1152 1153 // update metrics 1154 am.updateRegionsInTransitionMetrics(ritStat); 1155 } 1156 } 1157 1158 private static class DeadServerMetricRegionChore 1159 extends ProcedureInMemoryChore<MasterProcedureEnv> { 1160 public DeadServerMetricRegionChore(final int timeoutMsec) { 1161 super(timeoutMsec); 1162 } 1163 1164 @Override 1165 protected void periodicExecute(final MasterProcedureEnv env) { 1166 final ServerManager sm = env.getMasterServices().getServerManager(); 1167 final AssignmentManager am = env.getAssignmentManager(); 1168 // To minimize inconsistencies we are not going to snapshot live servers in advance in case 1169 // new servers are added; OTOH we don't want to add heavy sync for a consistent view since 1170 // this is for metrics. Instead, we're going to check each regions as we go; to avoid making 1171 // too many checks, we maintain a local lists of server, limiting us to false negatives. If 1172 // we miss some recently-dead server, we'll just see it next time. 1173 Set<ServerName> recentlyLiveServers = new HashSet<>(); 1174 int deadRegions = 0, unknownRegions = 0; 1175 for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) { 1176 if (rsn.getState() != State.OPEN) { 1177 continue; // Opportunistic check, should quickly skip RITs, offline tables, etc. 1178 } 1179 // Do not need to acquire region state lock as this is only for showing metrics. 1180 ServerName sn = rsn.getRegionLocation(); 1181 State state = rsn.getState(); 1182 if (state != State.OPEN) { 1183 continue; // Mostly skipping RITs that are already being take care of. 1184 } 1185 if (sn == null) { 1186 ++unknownRegions; // Opened on null? 1187 continue; 1188 } 1189 if (recentlyLiveServers.contains(sn)) { 1190 continue; 1191 } 1192 ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn); 1193 switch (sls) { 1194 case LIVE: 1195 recentlyLiveServers.add(sn); 1196 break; 1197 case DEAD: 1198 ++deadRegions; 1199 break; 1200 case UNKNOWN: 1201 ++unknownRegions; 1202 break; 1203 default: throw new AssertionError("Unexpected " + sls); 1204 } 1205 } 1206 if (deadRegions > 0 || unknownRegions > 0) { 1207 LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers", 1208 deadRegions, unknownRegions); 1209 } 1210 1211 am.updateDeadServerRegionMetrics(deadRegions, unknownRegions); 1212 } 1213 } 1214 1215 public RegionInTransitionStat computeRegionInTransitionStat() { 1216 final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration()); 1217 rit.update(this); 1218 return rit; 1219 } 1220 1221 public static class RegionInTransitionStat { 1222 private final int ritThreshold; 1223 1224 private HashMap<String, RegionState> ritsOverThreshold = null; 1225 private long statTimestamp; 1226 private long oldestRITTime = 0; 1227 private int totalRITsTwiceThreshold = 0; 1228 private int totalRITs = 0; 1229 1230 @VisibleForTesting 1231 public RegionInTransitionStat(final Configuration conf) { 1232 this.ritThreshold = 1233 conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD); 1234 } 1235 1236 public int getRITThreshold() { 1237 return ritThreshold; 1238 } 1239 1240 public long getTimestamp() { 1241 return statTimestamp; 1242 } 1243 1244 public int getTotalRITs() { 1245 return totalRITs; 1246 } 1247 1248 public long getOldestRITTime() { 1249 return oldestRITTime; 1250 } 1251 1252 public int getTotalRITsOverThreshold() { 1253 Map<String, RegionState> m = this.ritsOverThreshold; 1254 return m != null ? m.size() : 0; 1255 } 1256 1257 public boolean hasRegionsTwiceOverThreshold() { 1258 return totalRITsTwiceThreshold > 0; 1259 } 1260 1261 public boolean hasRegionsOverThreshold() { 1262 Map<String, RegionState> m = this.ritsOverThreshold; 1263 return m != null && !m.isEmpty(); 1264 } 1265 1266 public Collection<RegionState> getRegionOverThreshold() { 1267 Map<String, RegionState> m = this.ritsOverThreshold; 1268 return m != null? m.values(): Collections.emptySet(); 1269 } 1270 1271 public boolean isRegionOverThreshold(final RegionInfo regionInfo) { 1272 Map<String, RegionState> m = this.ritsOverThreshold; 1273 return m != null && m.containsKey(regionInfo.getEncodedName()); 1274 } 1275 1276 public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) { 1277 Map<String, RegionState> m = this.ritsOverThreshold; 1278 if (m == null) return false; 1279 final RegionState state = m.get(regionInfo.getEncodedName()); 1280 if (state == null) return false; 1281 return (statTimestamp - state.getStamp()) > (ritThreshold * 2); 1282 } 1283 1284 protected void update(final AssignmentManager am) { 1285 final RegionStates regionStates = am.getRegionStates(); 1286 this.statTimestamp = EnvironmentEdgeManager.currentTime(); 1287 update(regionStates.getRegionsStateInTransition(), statTimestamp); 1288 update(regionStates.getRegionFailedOpen(), statTimestamp); 1289 } 1290 1291 private void update(final Collection<RegionState> regions, final long currentTime) { 1292 for (RegionState state: regions) { 1293 totalRITs++; 1294 final long ritStartedMs = state.getStamp(); 1295 if (ritStartedMs == 0) { 1296 // Don't output bogus values to metrics if they accidentally make it here. 1297 LOG.warn("The RIT {} has no start time", state.getRegion()); 1298 continue; 1299 } 1300 final long ritTime = currentTime - ritStartedMs; 1301 if (ritTime > ritThreshold) { 1302 if (ritsOverThreshold == null) { 1303 ritsOverThreshold = new HashMap<String, RegionState>(); 1304 } 1305 ritsOverThreshold.put(state.getRegion().getEncodedName(), state); 1306 totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0; 1307 } 1308 if (oldestRITTime < ritTime) { 1309 oldestRITTime = ritTime; 1310 } 1311 } 1312 } 1313 } 1314 1315 private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) { 1316 metrics.updateRITOldestAge(ritStat.getOldestRITTime()); 1317 metrics.updateRITCount(ritStat.getTotalRITs()); 1318 metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold()); 1319 } 1320 1321 private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) { 1322 metrics.updateDeadServerOpenRegions(deadRegions); 1323 metrics.updateUnknownServerOpenRegions(unknownRegions); 1324 } 1325 1326 private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) { 1327 final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 1328 //if (regionNode.isStuck()) { 1329 LOG.warn("STUCK Region-In-Transition {}", regionNode); 1330 } 1331 1332 // ============================================================================================ 1333 // TODO: Master load/bootstrap 1334 // ============================================================================================ 1335 public void joinCluster() throws IOException { 1336 long startTime = System.nanoTime(); 1337 LOG.debug("Joining cluster..."); 1338 1339 // Scan hbase:meta to build list of existing regions, servers, and assignment. 1340 // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress 1341 // w/o meta. 1342 loadMeta(); 1343 1344 while (master.getServerManager().countOfRegionServers() < 1) { 1345 LOG.info("Waiting for RegionServers to join; current count={}", 1346 master.getServerManager().countOfRegionServers()); 1347 Threads.sleep(250); 1348 } 1349 LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers()); 1350 1351 // Start the chores 1352 master.getMasterProcedureExecutor().addChore(this.ritChore); 1353 master.getMasterProcedureExecutor().addChore(this.deadMetricChore); 1354 1355 long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); 1356 LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs)); 1357 } 1358 1359 /** 1360 * Create assign procedure for offline regions. 1361 * Just follow the old processofflineServersWithOnlineRegions method. Since now we do not need to 1362 * deal with dead server any more, we only deal with the regions in OFFLINE state in this method. 1363 * And this is a bit strange, that for new regions, we will add it in CLOSED state instead of 1364 * OFFLINE state, and usually there will be a procedure to track them. The 1365 * processofflineServersWithOnlineRegions is a legacy from long ago, as things are going really 1366 * different now, maybe we do not need this method any more. Need to revisit later. 1367 */ 1368 // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online. 1369 // Needs to be done after the table state manager has been started. 1370 public void processOfflineRegions() { 1371 List<RegionInfo> offlineRegions = regionStates.getRegionStates().stream() 1372 .filter(RegionState::isOffline).filter(s -> isTableEnabled(s.getRegion().getTable())) 1373 .map(RegionState::getRegion).collect(Collectors.toList()); 1374 if (!offlineRegions.isEmpty()) { 1375 master.getMasterProcedureExecutor().submitProcedures( 1376 master.getAssignmentManager().createRoundRobinAssignProcedures(offlineRegions)); 1377 } 1378 } 1379 1380 private void loadMeta() throws IOException { 1381 // TODO: use a thread pool 1382 regionStateStore.visitMeta(new RegionStateStore.RegionStateVisitor() { 1383 @Override 1384 public void visitRegionState(Result result, final RegionInfo regionInfo, final State state, 1385 final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) { 1386 if (state == null && regionLocation == null && lastHost == null && 1387 openSeqNum == SequenceId.NO_SEQUENCE_ID) { 1388 // This is a row with nothing in it. 1389 LOG.warn("Skipping empty row={}", result); 1390 return; 1391 } 1392 State localState = state; 1393 if (localState == null) { 1394 // No region state column data in hbase:meta table! Are I doing a rolling upgrade from 1395 // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta? 1396 // In any of these cases, state is empty. For now, presume OFFLINE but there are probably 1397 // cases where we need to probe more to be sure this correct; TODO informed by experience. 1398 LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE); 1399 localState = State.OFFLINE; 1400 } 1401 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 1402 // Do not need to lock on regionNode, as we can make sure that before we finish loading 1403 // meta, all the related procedures can not be executed. The only exception is for meta 1404 // region related operations, but here we do not load the informations for meta region. 1405 regionNode.setState(localState); 1406 regionNode.setLastHost(lastHost); 1407 regionNode.setRegionLocation(regionLocation); 1408 regionNode.setOpenSeqNum(openSeqNum); 1409 1410 // Note: keep consistent with other methods, see region(Opening|Opened|Closing) 1411 // RIT/ServerCrash handling should take care of the transiting regions. 1412 if (localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING, 1413 State.MERGING)) { 1414 assert regionLocation != null : "found null region location for " + regionNode; 1415 regionStates.addRegionToServer(regionNode); 1416 } else if (localState == State.OFFLINE || regionInfo.isOffline()) { 1417 regionStates.addToOfflineRegions(regionNode); 1418 } 1419 if (regionNode.getProcedure() != null) { 1420 regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode); 1421 } 1422 } 1423 }); 1424 1425 // every assignment is blocked until meta is loaded. 1426 wakeMetaLoadedEvent(); 1427 } 1428 1429 /** 1430 * Used to check if the meta loading is done. 1431 * <p/> 1432 * if not we throw PleaseHoldException since we are rebuilding the RegionStates 1433 * @param hri region to check if it is already rebuild 1434 * @throws PleaseHoldException if meta has not been loaded yet 1435 */ 1436 private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException { 1437 if (!isRunning()) { 1438 throw new PleaseHoldException("AssignmentManager not running"); 1439 } 1440 boolean meta = isMetaRegion(hri); 1441 boolean metaLoaded = isMetaLoaded(); 1442 if (!meta && !metaLoaded) { 1443 throw new PleaseHoldException( 1444 "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded); 1445 } 1446 } 1447 1448 // ============================================================================================ 1449 // TODO: Metrics 1450 // ============================================================================================ 1451 public int getNumRegionsOpened() { 1452 // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value 1453 return 0; 1454 } 1455 1456 /** 1457 * Usually run by the Master in reaction to server crash during normal processing. 1458 * Can also be invoked via external RPC to effect repair; in the latter case, 1459 * the 'force' flag is set so we push through the SCP though context may indicate 1460 * already-running-SCP (An old SCP may have exited abnormally, or damaged cluster 1461 * may still have references in hbase:meta to 'Unknown Servers' -- servers that 1462 * are not online or in dead servers list, etc.) 1463 * @param force Set if the request came in externally over RPC (via hbck2). Force means 1464 * run the SCP even if it seems as though there might be an outstanding 1465 * SCP running. 1466 * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled. 1467 */ 1468 public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) { 1469 // May be an 'Unknown Server' so handle case where serverNode is null. 1470 ServerStateNode serverNode = regionStates.getServerNode(serverName); 1471 // Remove the in-memory rsReports result 1472 synchronized (rsReports) { 1473 rsReports.remove(serverName); 1474 } 1475 1476 // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the 1477 // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from 1478 // this server. This is used to simplify the implementation for TRSP and SCP, where we can make 1479 // sure that, the region list fetched by SCP will not be changed any more. 1480 if (serverNode != null) { 1481 serverNode.writeLock().lock(); 1482 } 1483 boolean carryingMeta; 1484 long pid; 1485 try { 1486 ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor(); 1487 carryingMeta = isCarryingMeta(serverName); 1488 if (!force && serverNode != null && !serverNode.isInState(ServerState.ONLINE)) { 1489 LOG.info("Skip adding SCP for {} (meta={}) -- running?", serverNode, carryingMeta); 1490 return Procedure.NO_PROC_ID; 1491 } else { 1492 MasterProcedureEnv mpe = procExec.getEnvironment(); 1493 // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead. 1494 // HBCKSCP scours Master in-memory state AND hbase;meta for references to 1495 // serverName just-in-case. An SCP that is scheduled when the server is 1496 // 'Unknown' probably originated externally with HBCK2 fix-it tool. 1497 ServerState oldState = null; 1498 if (serverNode != null) { 1499 oldState = serverNode.getState(); 1500 serverNode.setState(ServerState.CRASHED); 1501 } 1502 1503 if (force) { 1504 pid = procExec.submitProcedure( 1505 new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta)); 1506 } else { 1507 pid = procExec.submitProcedure( 1508 new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta)); 1509 } 1510 LOG.info("Scheduled SCP pid={} for {} (carryingMeta={}){}.", pid, serverName, carryingMeta, 1511 serverNode == null? "": " " + serverNode.toString() + ", oldState=" + oldState); 1512 } 1513 } finally { 1514 if (serverNode != null) { 1515 serverNode.writeLock().unlock(); 1516 } 1517 } 1518 return pid; 1519 } 1520 1521 public void offlineRegion(final RegionInfo regionInfo) { 1522 // TODO used by MasterRpcServices 1523 RegionStateNode node = regionStates.getRegionStateNode(regionInfo); 1524 if (node != null) { 1525 node.offline(); 1526 } 1527 } 1528 1529 public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) { 1530 // TODO used by TestSplitTransactionOnCluster.java 1531 } 1532 1533 public Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment( 1534 final Collection<RegionInfo> regions) { 1535 return regionStates.getSnapShotOfAssignment(regions); 1536 } 1537 1538 // ============================================================================================ 1539 // TODO: UTILS/HELPERS? 1540 // ============================================================================================ 1541 /** 1542 * Used by the client (via master) to identify if all regions have the schema updates 1543 * 1544 * @param tableName 1545 * @return Pair indicating the status of the alter command (pending/total) 1546 * @throws IOException 1547 */ 1548 public Pair<Integer, Integer> getReopenStatus(TableName tableName) { 1549 if (isTableDisabled(tableName)) return new Pair<Integer, Integer>(0, 0); 1550 1551 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 1552 int ritCount = 0; 1553 for (RegionState regionState: states) { 1554 if (!regionState.isOpened() && !regionState.isSplit()) { 1555 ritCount++; 1556 } 1557 } 1558 return new Pair<Integer, Integer>(ritCount, states.size()); 1559 } 1560 1561 // ============================================================================================ 1562 // TODO: Region State In Transition 1563 // ============================================================================================ 1564 public boolean hasRegionsInTransition() { 1565 return regionStates.hasRegionsInTransition(); 1566 } 1567 1568 public List<RegionStateNode> getRegionsInTransition() { 1569 return regionStates.getRegionsInTransition(); 1570 } 1571 1572 public List<RegionInfo> getAssignedRegions() { 1573 return regionStates.getAssignedRegions(); 1574 } 1575 1576 public RegionInfo getRegionInfo(final byte[] regionName) { 1577 final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName); 1578 return regionState != null ? regionState.getRegionInfo() : null; 1579 } 1580 1581 // ============================================================================================ 1582 // Expected states on region state transition. 1583 // Notice that there is expected states for transiting to OPENING state, this is because SCP. 1584 // See the comments in regionOpening method for more details. 1585 // ============================================================================================ 1586 private static final State[] STATES_EXPECTED_ON_OPEN = { 1587 State.OPENING, // Normal case 1588 State.OPEN // Retrying 1589 }; 1590 1591 private static final State[] STATES_EXPECTED_ON_CLOSING = { 1592 State.OPEN, // Normal case 1593 State.CLOSING, // Retrying 1594 State.SPLITTING, // Offline the split parent 1595 State.MERGING // Offline the merge parents 1596 }; 1597 1598 private static final State[] STATES_EXPECTED_ON_CLOSED = { 1599 State.CLOSING, // Normal case 1600 State.CLOSED // Retrying 1601 }; 1602 1603 // This is for manually scheduled region assign, can add other states later if we find out other 1604 // usages 1605 private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE }; 1606 1607 // We only allow unassign or move a region which is in OPEN state. 1608 private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN }; 1609 1610 // ============================================================================================ 1611 // Region Status update 1612 // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking 1613 // and pre-assumptions are very tricky. 1614 // ============================================================================================ 1615 private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState, 1616 RegionState.State... expectedStates) throws IOException { 1617 RegionState.State state = regionNode.getState(); 1618 regionNode.transitionState(newState, expectedStates); 1619 boolean succ = false; 1620 try { 1621 regionStateStore.updateRegionLocation(regionNode); 1622 succ = true; 1623 } finally { 1624 if (!succ) { 1625 // revert 1626 regionNode.setState(state); 1627 } 1628 } 1629 } 1630 1631 // should be called within the synchronized block of RegionStateNode 1632 void regionOpening(RegionStateNode regionNode) throws IOException { 1633 // As in SCP, for performance reason, there is no TRSP attached with this region, we will not 1634 // update the region state, which means that the region could be in any state when we want to 1635 // assign it after a RS crash. So here we do not pass the expectedStates parameter. 1636 transitStateAndUpdate(regionNode, State.OPENING); 1637 regionStates.addRegionToServer(regionNode); 1638 // update the operation count metrics 1639 metrics.incrementOperationCounter(); 1640 } 1641 1642 // should be called under the RegionStateNode lock 1643 // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then 1644 // we will persist the FAILED_OPEN state into hbase:meta. 1645 void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException { 1646 RegionState.State state = regionNode.getState(); 1647 ServerName regionLocation = regionNode.getRegionLocation(); 1648 if (giveUp) { 1649 regionNode.setState(State.FAILED_OPEN); 1650 regionNode.setRegionLocation(null); 1651 boolean succ = false; 1652 try { 1653 regionStateStore.updateRegionLocation(regionNode); 1654 succ = true; 1655 } finally { 1656 if (!succ) { 1657 // revert 1658 regionNode.setState(state); 1659 regionNode.setRegionLocation(regionLocation); 1660 } 1661 } 1662 } 1663 if (regionLocation != null) { 1664 regionStates.removeRegionFromServer(regionLocation, regionNode); 1665 } 1666 } 1667 1668 // should be called under the RegionStateNode lock 1669 void regionClosing(RegionStateNode regionNode) throws IOException { 1670 transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING); 1671 1672 RegionInfo hri = regionNode.getRegionInfo(); 1673 // Set meta has not initialized early. so people trying to create/edit tables will wait 1674 if (isMetaRegion(hri)) { 1675 setMetaAssigned(hri, false); 1676 } 1677 regionStates.addRegionToServer(regionNode); 1678 // update the operation count metrics 1679 metrics.incrementOperationCounter(); 1680 } 1681 1682 // for open and close, they will first be persist to the procedure store in 1683 // RegionRemoteProcedureBase. So here we will first change the in memory state as it is considered 1684 // as succeeded if the persistence to procedure store is succeeded, and then when the 1685 // RegionRemoteProcedureBase is woken up, we will persist the RegionStateNode to hbase:meta. 1686 1687 // should be called under the RegionStateNode lock 1688 void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException { 1689 regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN); 1690 RegionInfo regionInfo = regionNode.getRegionInfo(); 1691 regionStates.addRegionToServer(regionNode); 1692 regionStates.removeFromFailedOpen(regionInfo); 1693 } 1694 1695 // should be called under the RegionStateNode lock 1696 void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException { 1697 ServerName regionLocation = regionNode.getRegionLocation(); 1698 regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED); 1699 regionNode.setRegionLocation(null); 1700 if (regionLocation != null) { 1701 regionNode.setLastHost(regionLocation); 1702 regionStates.removeRegionFromServer(regionLocation, regionNode); 1703 } 1704 } 1705 1706 // should be called under the RegionStateNode lock 1707 // for SCP 1708 public void regionClosedAbnormally(RegionStateNode regionNode) throws IOException { 1709 RegionState.State state = regionNode.getState(); 1710 ServerName regionLocation = regionNode.getRegionLocation(); 1711 regionNode.transitionState(State.ABNORMALLY_CLOSED); 1712 regionNode.setRegionLocation(null); 1713 boolean succ = false; 1714 try { 1715 regionStateStore.updateRegionLocation(regionNode); 1716 succ = true; 1717 } finally { 1718 if (!succ) { 1719 // revert 1720 regionNode.setState(state); 1721 regionNode.setRegionLocation(regionLocation); 1722 } 1723 } 1724 if (regionLocation != null) { 1725 regionNode.setLastHost(regionLocation); 1726 regionStates.removeRegionFromServer(regionLocation, regionNode); 1727 } 1728 } 1729 1730 void persistToMeta(RegionStateNode regionNode) throws IOException { 1731 regionStateStore.updateRegionLocation(regionNode); 1732 RegionInfo regionInfo = regionNode.getRegionInfo(); 1733 if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) { 1734 // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it 1735 // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager 1736 // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state 1737 // on table that contains state. 1738 setMetaAssigned(regionInfo, true); 1739 } 1740 } 1741 1742 // ============================================================================================ 1743 // The above methods can only be called in TransitRegionStateProcedure(and related procedures) 1744 // ============================================================================================ 1745 1746 public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName, 1747 final RegionInfo daughterA, final RegionInfo daughterB) throws IOException { 1748 // Update hbase:meta. Parent will be marked offline and split up in hbase:meta. 1749 // The parent stays in regionStates until cleared when removed by CatalogJanitor. 1750 // Update its state in regionStates to it shows as offline and split when read 1751 // later figuring what regions are in a table and what are not: see 1752 // regionStates#getRegionsOfTable 1753 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent); 1754 node.setState(State.SPLIT); 1755 final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA); 1756 nodeA.setState(State.SPLITTING_NEW); 1757 final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB); 1758 nodeB.setState(State.SPLITTING_NEW); 1759 1760 regionStateStore.splitRegion(parent, daughterA, daughterB, serverName); 1761 if (shouldAssignFavoredNodes(parent)) { 1762 List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList(); 1763 ((FavoredNodesPromoter)getBalancer()). 1764 generateFavoredNodesForDaughter(onlineServers, parent, daughterA, daughterB); 1765 } 1766 } 1767 1768 /** 1769 * When called here, the merge has happened. The merged regions have been 1770 * unassigned and the above markRegionClosed has been called on each so they have been 1771 * disassociated from a hosting Server. The merged region will be open after this call. The 1772 * merged regions are removed from hbase:meta below. Later they are deleted from the filesystem 1773 * by the catalog janitor running against hbase:meta. It notices when the merged region no 1774 * longer holds references to the old regions (References are deleted after a compaction 1775 * rewrites what the Reference points at but not until the archiver chore runs, are the 1776 * References removed). 1777 */ 1778 public void markRegionAsMerged(final RegionInfo child, final ServerName serverName, 1779 RegionInfo [] mergeParents) 1780 throws IOException { 1781 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child); 1782 node.setState(State.MERGED); 1783 for (RegionInfo ri: mergeParents) { 1784 regionStates.deleteRegion(ri); 1785 1786 } 1787 regionStateStore.mergeRegions(child, mergeParents, serverName); 1788 if (shouldAssignFavoredNodes(child)) { 1789 ((FavoredNodesPromoter)getBalancer()). 1790 generateFavoredNodesForMergedRegion(child, mergeParents); 1791 } 1792 } 1793 1794 /* 1795 * Favored nodes should be applied only when FavoredNodes balancer is configured and the region 1796 * belongs to a non-system table. 1797 */ 1798 private boolean shouldAssignFavoredNodes(RegionInfo region) { 1799 return this.shouldAssignRegionsWithFavoredNodes && 1800 FavoredNodesManager.isFavoredNodeApplicable(region); 1801 } 1802 1803 // ============================================================================================ 1804 // Assign Queue (Assign/Balance) 1805 // ============================================================================================ 1806 private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>(); 1807 private final ReentrantLock assignQueueLock = new ReentrantLock(); 1808 private final Condition assignQueueFullCond = assignQueueLock.newCondition(); 1809 1810 /** 1811 * Add the assign operation to the assignment queue. 1812 * The pending assignment operation will be processed, 1813 * and each region will be assigned by a server using the balancer. 1814 */ 1815 protected void queueAssign(final RegionStateNode regionNode) { 1816 regionNode.getProcedureEvent().suspend(); 1817 1818 // TODO: quick-start for meta and the other sys-tables? 1819 assignQueueLock.lock(); 1820 try { 1821 pendingAssignQueue.add(regionNode); 1822 if (regionNode.isSystemTable() || 1823 pendingAssignQueue.size() == 1 || 1824 pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize) { 1825 assignQueueFullCond.signal(); 1826 } 1827 } finally { 1828 assignQueueLock.unlock(); 1829 } 1830 } 1831 1832 private void startAssignmentThread() { 1833 // Get Server Thread name. Sometimes the Server is mocked so may not implement HasThread. 1834 // For example, in tests. 1835 String name = master instanceof HasThread? ((HasThread)master).getName(): 1836 master.getServerName().toShortString(); 1837 assignThread = new Thread(name) { 1838 @Override 1839 public void run() { 1840 while (isRunning()) { 1841 processAssignQueue(); 1842 } 1843 pendingAssignQueue.clear(); 1844 } 1845 }; 1846 assignThread.setDaemon(true); 1847 assignThread.start(); 1848 } 1849 1850 private void stopAssignmentThread() { 1851 assignQueueSignal(); 1852 try { 1853 while (assignThread.isAlive()) { 1854 assignQueueSignal(); 1855 assignThread.join(250); 1856 } 1857 } catch (InterruptedException e) { 1858 LOG.warn("join interrupted", e); 1859 Thread.currentThread().interrupt(); 1860 } 1861 } 1862 1863 private void assignQueueSignal() { 1864 assignQueueLock.lock(); 1865 try { 1866 assignQueueFullCond.signal(); 1867 } finally { 1868 assignQueueLock.unlock(); 1869 } 1870 } 1871 1872 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") 1873 private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() { 1874 HashMap<RegionInfo, RegionStateNode> regions = null; 1875 1876 assignQueueLock.lock(); 1877 try { 1878 if (pendingAssignQueue.isEmpty() && isRunning()) { 1879 assignQueueFullCond.await(); 1880 } 1881 1882 if (!isRunning()) return null; 1883 assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS); 1884 regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size()); 1885 for (RegionStateNode regionNode: pendingAssignQueue) { 1886 regions.put(regionNode.getRegionInfo(), regionNode); 1887 } 1888 pendingAssignQueue.clear(); 1889 } catch (InterruptedException e) { 1890 LOG.warn("got interrupted ", e); 1891 Thread.currentThread().interrupt(); 1892 } finally { 1893 assignQueueLock.unlock(); 1894 } 1895 return regions; 1896 } 1897 1898 private void processAssignQueue() { 1899 final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue(); 1900 if (regions == null || regions.size() == 0 || !isRunning()) { 1901 return; 1902 } 1903 1904 if (LOG.isTraceEnabled()) { 1905 LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size()); 1906 } 1907 1908 // TODO: Optimize balancer. pass a RegionPlan? 1909 final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>(); 1910 final List<RegionInfo> userHRIs = new ArrayList<>(regions.size()); 1911 // Regions for system tables requiring reassignment 1912 final List<RegionInfo> systemHRIs = new ArrayList<>(); 1913 for (RegionStateNode regionStateNode: regions.values()) { 1914 boolean sysTable = regionStateNode.isSystemTable(); 1915 final List<RegionInfo> hris = sysTable? systemHRIs: userHRIs; 1916 if (regionStateNode.getRegionLocation() != null) { 1917 retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation()); 1918 } else { 1919 hris.add(regionStateNode.getRegionInfo()); 1920 } 1921 } 1922 1923 // TODO: connect with the listener to invalidate the cache 1924 1925 // TODO use events 1926 List<ServerName> servers = master.getServerManager().createDestinationServersList(); 1927 for (int i = 0; servers.size() < 1; ++i) { 1928 // Report every fourth time around this loop; try not to flood log. 1929 if (i % 4 == 0) { 1930 LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions."); 1931 } 1932 1933 if (!isRunning()) { 1934 LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions."); 1935 return; 1936 } 1937 Threads.sleep(250); 1938 servers = master.getServerManager().createDestinationServersList(); 1939 } 1940 1941 if (!systemHRIs.isEmpty()) { 1942 // System table regions requiring reassignment are present, get region servers 1943 // not available for system table regions 1944 final List<ServerName> excludeServers = getExcludedServersForSystemTable(); 1945 List<ServerName> serversForSysTables = servers.stream() 1946 .filter(s -> !excludeServers.contains(s)).collect(Collectors.toList()); 1947 if (serversForSysTables.isEmpty()) { 1948 LOG.warn("Filtering old server versions and the excluded produced an empty set; " + 1949 "instead considering all candidate servers!"); 1950 } 1951 LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() + 1952 ", allServersCount=" + servers.size()); 1953 processAssignmentPlans(regions, null, systemHRIs, 1954 serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs) ? 1955 servers: serversForSysTables); 1956 } 1957 1958 processAssignmentPlans(regions, retainMap, userHRIs, servers); 1959 } 1960 1961 private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions, 1962 List<RegionInfo> hirs) { 1963 for (RegionInfo ri : hirs) { 1964 if (regions.get(ri).getRegionLocation() != null && 1965 regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME)){ 1966 return true; 1967 } 1968 } 1969 return false; 1970 } 1971 1972 private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions, 1973 final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris, 1974 final List<ServerName> servers) { 1975 boolean isTraceEnabled = LOG.isTraceEnabled(); 1976 if (isTraceEnabled) { 1977 LOG.trace("Available servers count=" + servers.size() + ": " + servers); 1978 } 1979 1980 final LoadBalancer balancer = getBalancer(); 1981 // ask the balancer where to place regions 1982 if (retainMap != null && !retainMap.isEmpty()) { 1983 if (isTraceEnabled) { 1984 LOG.trace("retain assign regions=" + retainMap); 1985 } 1986 try { 1987 acceptPlan(regions, balancer.retainAssignment(retainMap, servers)); 1988 } catch (HBaseIOException e) { 1989 LOG.warn("unable to retain assignment", e); 1990 addToPendingAssignment(regions, retainMap.keySet()); 1991 } 1992 } 1993 1994 // TODO: Do we need to split retain and round-robin? 1995 // the retain seems to fallback to round-robin/random if the region is not in the map. 1996 if (!hris.isEmpty()) { 1997 Collections.sort(hris, RegionInfo.COMPARATOR); 1998 if (isTraceEnabled) { 1999 LOG.trace("round robin regions=" + hris); 2000 } 2001 try { 2002 acceptPlan(regions, balancer.roundRobinAssignment(hris, servers)); 2003 } catch (HBaseIOException e) { 2004 LOG.warn("unable to round-robin assignment", e); 2005 addToPendingAssignment(regions, hris); 2006 } 2007 } 2008 } 2009 2010 private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions, 2011 final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException { 2012 final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()]; 2013 final long st = System.currentTimeMillis(); 2014 2015 if (plan == null) { 2016 throw new HBaseIOException("unable to compute plans for regions=" + regions.size()); 2017 } 2018 2019 if (plan.isEmpty()) return; 2020 2021 int evcount = 0; 2022 for (Map.Entry<ServerName, List<RegionInfo>> entry: plan.entrySet()) { 2023 final ServerName server = entry.getKey(); 2024 for (RegionInfo hri: entry.getValue()) { 2025 final RegionStateNode regionNode = regions.get(hri); 2026 regionNode.setRegionLocation(server); 2027 if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) { 2028 assignQueueLock.lock(); 2029 try { 2030 pendingAssignQueue.add(regionNode); 2031 } finally { 2032 assignQueueLock.unlock(); 2033 } 2034 }else { 2035 events[evcount++] = regionNode.getProcedureEvent(); 2036 } 2037 } 2038 } 2039 ProcedureEvent.wakeEvents(getProcedureScheduler(), events); 2040 2041 final long et = System.currentTimeMillis(); 2042 if (LOG.isTraceEnabled()) { 2043 LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + 2044 StringUtils.humanTimeDiff(et - st)); 2045 } 2046 } 2047 2048 private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions, 2049 final Collection<RegionInfo> pendingRegions) { 2050 assignQueueLock.lock(); 2051 try { 2052 for (RegionInfo hri: pendingRegions) { 2053 pendingAssignQueue.add(regions.get(hri)); 2054 } 2055 } finally { 2056 assignQueueLock.unlock(); 2057 } 2058 } 2059 2060 /** 2061 * Get a list of servers that this region cannot be assigned to. 2062 * For system tables, we must assign them to a server with highest version. 2063 */ 2064 public List<ServerName> getExcludedServersForSystemTable() { 2065 // TODO: This should be a cached list kept by the ServerManager rather than calculated on each 2066 // move or system region assign. The RegionServerTracker keeps list of online Servers with 2067 // RegionServerInfo that includes Version. 2068 List<Pair<ServerName, String>> serverList = master.getServerManager().getOnlineServersList() 2069 .stream() 2070 .map((s)->new Pair<>(s, master.getRegionServerVersion(s))) 2071 .collect(Collectors.toList()); 2072 if (serverList.isEmpty()) { 2073 return Collections.emptyList(); 2074 } 2075 String highestVersion = Collections.max(serverList, 2076 (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond(); 2077 return serverList.stream() 2078 .filter((p)->!p.getSecond().equals(highestVersion)) 2079 .map(Pair::getFirst) 2080 .collect(Collectors.toList()); 2081 } 2082 2083 @VisibleForTesting 2084 MasterServices getMaster() { 2085 return master; 2086 } 2087 2088 /** 2089 * @return a snapshot of rsReports 2090 */ 2091 public Map<ServerName, Set<byte[]>> getRSReports() { 2092 Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>(); 2093 synchronized (rsReports) { 2094 rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue())); 2095 } 2096 return rsReportsSnapshot; 2097 } 2098 2099 /** 2100 * Provide regions state count for given table. 2101 * e.g howmany regions of give table are opened/closed/rit etc 2102 * 2103 * @param tableName TableName 2104 * @return region states count 2105 */ 2106 public RegionStatesCount getRegionStatesCount(TableName tableName) { 2107 int openRegionsCount = 0; 2108 int closedRegionCount = 0; 2109 int ritCount = 0; 2110 int splitRegionCount = 0; 2111 int totalRegionCount = 0; 2112 if (!isTableDisabled(tableName)) { 2113 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 2114 for (RegionState regionState : states) { 2115 if (regionState.isOpened()) { 2116 openRegionsCount++; 2117 } else if (regionState.isClosed()) { 2118 closedRegionCount++; 2119 } else if (regionState.isSplit()) { 2120 splitRegionCount++; 2121 } 2122 } 2123 totalRegionCount = states.size(); 2124 ritCount = totalRegionCount - openRegionsCount - splitRegionCount; 2125 } 2126 return new RegionStatesCount.RegionStatesCountBuilder() 2127 .setOpenRegions(openRegionsCount) 2128 .setClosedRegions(closedRegionCount) 2129 .setSplitRegions(splitRegionCount) 2130 .setRegionsInTransition(ritCount) 2131 .setTotalRegions(totalRegionCount) 2132 .build(); 2133 } 2134 2135}