001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import edu.umd.cs.findbugs.annotations.NonNull; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.CompletableFuture; 031import java.util.concurrent.Future; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicBoolean; 034import java.util.concurrent.locks.Condition; 035import java.util.concurrent.locks.ReentrantLock; 036import java.util.function.Consumer; 037import java.util.stream.Collectors; 038import java.util.stream.Stream; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.CatalogFamilyFormat; 041import org.apache.hadoop.hbase.DoNotRetryIOException; 042import org.apache.hadoop.hbase.HBaseIOException; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.PleaseHoldException; 045import org.apache.hadoop.hbase.ServerName; 046import 
org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.UnknownRegionException; 048import org.apache.hadoop.hbase.client.DoNotRetryRegionException; 049import org.apache.hadoop.hbase.client.MasterSwitchType; 050import org.apache.hadoop.hbase.client.RegionInfo; 051import org.apache.hadoop.hbase.client.RegionInfoBuilder; 052import org.apache.hadoop.hbase.client.RegionReplicaUtil; 053import org.apache.hadoop.hbase.client.RegionStatesCount; 054import org.apache.hadoop.hbase.client.Result; 055import org.apache.hadoop.hbase.client.ResultScanner; 056import org.apache.hadoop.hbase.client.Scan; 057import org.apache.hadoop.hbase.client.TableDescriptor; 058import org.apache.hadoop.hbase.client.TableState; 059import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; 060import org.apache.hadoop.hbase.favored.FavoredNodesManager; 061import org.apache.hadoop.hbase.favored.FavoredNodesPromoter; 062import org.apache.hadoop.hbase.master.LoadBalancer; 063import org.apache.hadoop.hbase.master.MasterServices; 064import org.apache.hadoop.hbase.master.MetricsAssignmentManager; 065import org.apache.hadoop.hbase.master.RegionPlan; 066import org.apache.hadoop.hbase.master.RegionState; 067import org.apache.hadoop.hbase.master.RegionState.State; 068import org.apache.hadoop.hbase.master.ServerManager; 069import org.apache.hadoop.hbase.master.TableStateManager; 070import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer; 071import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure; 072import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 073import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; 074import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; 075import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; 076import org.apache.hadoop.hbase.master.procedure.TruncateRegionProcedure; 077import org.apache.hadoop.hbase.master.region.MasterRegion; 078import 
org.apache.hadoop.hbase.procedure2.Procedure; 079import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 080import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 081import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore; 082import org.apache.hadoop.hbase.procedure2.util.StringUtils; 083import org.apache.hadoop.hbase.regionserver.SequenceId; 084import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer; 085import org.apache.hadoop.hbase.util.Bytes; 086import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 087import org.apache.hadoop.hbase.util.FutureUtils; 088import org.apache.hadoop.hbase.util.Pair; 089import org.apache.hadoop.hbase.util.Threads; 090import org.apache.hadoop.hbase.util.VersionInfo; 091import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 092import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 093import org.apache.yetus.audience.InterfaceAudience; 094import org.apache.zookeeper.KeeperException; 095import org.slf4j.Logger; 096import org.slf4j.LoggerFactory; 097 098import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 099import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; 100import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 101import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 102import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; 103 104/** 105 * The AssignmentManager is the coordinator for region assign/unassign operations. 106 * <ul> 107 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li> 108 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li> 109 * </ul> 110 * Regions are created by CreateTable, Split, Merge. Regions are deleted by DeleteTable, Split, 111 * Merge. 
Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. Unassigns 112 * are triggered by DisableTable, Split, Merge 113 */ 114@InterfaceAudience.Private 115public class AssignmentManager { 116 private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class); 117 118 // TODO: AMv2 119 // - handle region migration from hbase1 to hbase2. 120 // - handle sys table assignment first (e.g. acl, namespace) 121 // - handle table priorities 122 // - If ServerBusyException trying to update hbase:meta, we abort the Master 123 // See updateRegionLocation in RegionStateStore. 124 // 125 // See also 126 // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5 127 // for other TODOs. 128 129 public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY = 130 "hbase.assignment.bootstrap.thread.pool.size"; 131 132 public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY = 133 "hbase.assignment.dispatch.wait.msec"; 134 private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150; 135 136 public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY = 137 "hbase.assignment.dispatch.wait.queue.max.size"; 138 private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100; 139 140 public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY = 141 "hbase.assignment.rit.chore.interval.msec"; 142 private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000; 143 144 public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY = 145 "hbase.assignment.dead.region.metric.chore.interval.msec"; 146 private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000; 147 148 public static final String ASSIGN_MAX_ATTEMPTS = "hbase.assignment.maximum.attempts"; 149 private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE; 150 151 public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 152 
"hbase.assignment.retry.immediately.maximum.attempts"; 153 private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3; 154 155 /** Region in Transition metrics threshold time */ 156 public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD = 157 "hbase.metrics.rit.stuck.warning.threshold"; 158 private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000; 159 public static final String UNEXPECTED_STATE_REGION = "Unexpected state for "; 160 161 public static final String FORCE_REGION_RETAINMENT = "hbase.master.scp.retain.assignment.force"; 162 163 public static final boolean DEFAULT_FORCE_REGION_RETAINMENT = false; 164 165 /** The wait time in millis before checking again if the region's previous RS is back online */ 166 public static final String FORCE_REGION_RETAINMENT_WAIT_INTERVAL = 167 "hbase.master.scp.retain.assignment.force.wait-interval"; 168 169 public static final long DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL = 50; 170 171 /** 172 * The number of times to check if the region's previous RS is back online, before giving up and 173 * proceeding with assignment on a new RS 174 */ 175 public static final String FORCE_REGION_RETAINMENT_RETRIES = 176 "hbase.master.scp.retain.assignment.force.retries"; 177 178 public static final int DEFAULT_FORCE_REGION_RETAINMENT_RETRIES = 600; 179 180 private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign"); 181 private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load"); 182 183 private final MetricsAssignmentManager metrics; 184 private final RegionInTransitionChore ritChore; 185 private final DeadServerMetricRegionChore deadMetricChore; 186 private final MasterServices master; 187 188 private final AtomicBoolean running = new AtomicBoolean(false); 189 private final RegionStates regionStates = new RegionStates(); 190 private final RegionStateStore regionStateStore; 191 192 /** 193 * When the operator uses this configuration option, any 
version between the current cluster version and the value of
   * "hbase.min.version.move.system.tables" does not trigger any auto-region movement. Auto-region
   * movement here refers to auto-migration of system table regions to newer server versions. It is
   * assumed that the configured range of versions does not require special handling of moving
   * system table regions to a higher-versioned RegionServer. This auto-migration is done by
   * {@link #checkIfShouldMoveSystemRegionAsync()}. Example: assume the cluster is on version 1.4.0
   * and "hbase.min.version.move.system.tables" is set to "2.0.0". If one RegionServer of the 1.4.0
   * cluster is upgraded to 1.6.0 (lower than 2.0.0), AssignmentManager will not move hbase:meta,
   * hbase:namespace and other system table regions to the newly started 1.6.0 RegionServer as part
   * of auto-migration. However, if one RegionServer of the 1.4.0 cluster is upgraded to 2.2.0
   * (higher than 2.0.0), AssignmentManager will move all system table regions to the newly started
   * 2.2.0 RegionServer as part of the auto-migration done by
   * {@link #checkIfShouldMoveSystemRegionAsync()}. "hbase.min.version.move.system.tables" was
   * introduced in HBASE-22923.
207 */ 208 private final String minVersionToMoveSysTables; 209 210 private static final String MIN_VERSION_MOVE_SYS_TABLES_CONFIG = 211 "hbase.min.version.move.system.tables"; 212 private static final String DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG = ""; 213 214 private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>(); 215 216 private final boolean shouldAssignRegionsWithFavoredNodes; 217 private final int assignDispatchWaitQueueMaxSize; 218 private final int assignDispatchWaitMillis; 219 private final int assignMaxAttempts; 220 private final int assignRetryImmediatelyMaxAttempts; 221 222 private final MasterRegion masterRegion; 223 224 private final Object checkIfShouldMoveSystemRegionLock = new Object(); 225 226 private Thread assignThread; 227 228 private final boolean forceRegionRetainment; 229 230 private final long forceRegionRetainmentWaitInterval; 231 232 private final int forceRegionRetainmentRetries; 233 234 public AssignmentManager(MasterServices master, MasterRegion masterRegion) { 235 this(master, masterRegion, new RegionStateStore(master, masterRegion)); 236 } 237 238 AssignmentManager(MasterServices master, MasterRegion masterRegion, RegionStateStore stateStore) { 239 this.master = master; 240 this.regionStateStore = stateStore; 241 this.metrics = new MetricsAssignmentManager(); 242 this.masterRegion = masterRegion; 243 244 final Configuration conf = master.getConfiguration(); 245 246 // Only read favored nodes if using the favored nodes load balancer. 
247 this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class 248 .isAssignableFrom(conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class)); 249 250 this.assignDispatchWaitMillis = 251 conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC); 252 this.assignDispatchWaitQueueMaxSize = 253 conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX); 254 255 this.assignMaxAttempts = 256 Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS)); 257 this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS, 258 DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS); 259 260 int ritChoreInterval = 261 conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC); 262 this.ritChore = new RegionInTransitionChore(ritChoreInterval); 263 264 int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY, 265 DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC); 266 if (deadRegionChoreInterval > 0) { 267 this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval); 268 } else { 269 this.deadMetricChore = null; 270 } 271 minVersionToMoveSysTables = 272 conf.get(MIN_VERSION_MOVE_SYS_TABLES_CONFIG, DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG); 273 274 forceRegionRetainment = 275 conf.getBoolean(FORCE_REGION_RETAINMENT, DEFAULT_FORCE_REGION_RETAINMENT); 276 forceRegionRetainmentWaitInterval = conf.getLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL, 277 DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL); 278 forceRegionRetainmentRetries = 279 conf.getInt(FORCE_REGION_RETAINMENT_RETRIES, DEFAULT_FORCE_REGION_RETAINMENT_RETRIES); 280 } 281 282 private void mirrorMetaLocations() throws IOException, KeeperException { 283 // For compatibility, mirror the meta region state to zookeeper 284 // And we still need to use zookeeper to publish the meta region locations to region 285 // server, so they can serve as 
ClientMetaService 286 ZKWatcher zk = master.getZooKeeper(); 287 if (zk == null || !zk.getRecoverableZooKeeper().getState().isAlive()) { 288 // this is possible in tests, we do not provide a zk watcher or the zk watcher has been closed 289 return; 290 } 291 Collection<RegionStateNode> metaStates = regionStates.getRegionStateNodes(); 292 for (RegionStateNode metaState : metaStates) { 293 MetaTableLocator.setMetaLocation(zk, metaState.getRegionLocation(), 294 metaState.getRegionInfo().getReplicaId(), metaState.getState()); 295 } 296 int replicaCount = metaStates.size(); 297 // remove extra mirror locations 298 for (String znode : zk.getMetaReplicaNodes()) { 299 int replicaId = zk.getZNodePaths().getMetaReplicaIdFromZNode(znode); 300 if (replicaId >= replicaCount) { 301 MetaTableLocator.deleteMetaLocation(zk, replicaId); 302 } 303 } 304 } 305 306 public void start() throws IOException, KeeperException { 307 if (!running.compareAndSet(false, true)) { 308 return; 309 } 310 311 LOG.trace("Starting assignment manager"); 312 313 // Start the Assignment Thread 314 startAssignmentThread(); 315 // load meta region states. 316 // here we are still in the early steps of active master startup. There is only one thread(us) 317 // can access AssignmentManager and create region node, so here we do not need to lock the 318 // region node. 
319 try (ResultScanner scanner = 320 masterRegion.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY))) { 321 for (;;) { 322 Result result = scanner.next(); 323 if (result == null) { 324 break; 325 } 326 RegionStateStore 327 .visitMetaEntry((r, regionInfo, state, regionLocation, lastHost, openSeqNum) -> { 328 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 329 regionNode.setState(state); 330 regionNode.setLastHost(lastHost); 331 regionNode.setRegionLocation(regionLocation); 332 regionNode.setOpenSeqNum(openSeqNum); 333 if (regionNode.getProcedure() != null) { 334 regionNode.getProcedure().stateLoaded(this, regionNode); 335 } 336 if (regionLocation != null) { 337 // TODO: this could lead to some orphan server state nodes, as it is possible that the 338 // region server is already dead and its SCP has already finished but we have 339 // persisted an opening state on this region server. Finally the TRSP will assign the 340 // region to another region server, so it will not cause critical problems, just waste 341 // some memory as no one will try to cleanup these orphan server state nodes. 342 regionStates.createServer(regionLocation); 343 regionStates.addRegionToServer(regionNode); 344 } 345 if (RegionReplicaUtil.isDefaultReplica(regionInfo.getReplicaId())) { 346 setMetaAssigned(regionInfo, state == State.OPEN); 347 } 348 LOG.debug("Loaded hbase:meta {}", regionNode); 349 }, result); 350 } 351 } 352 mirrorMetaLocations(); 353 } 354 355 /** 356 * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode. 357 * <p> 358 * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta 359 * method below. And it is also very important as now before submitting a TRSP, we need to attach 360 * it to the RegionStateNode, which acts like a guard, so we need to restore this information at 361 * the very beginning, before we start processing any procedures. 
362 */ 363 public void setupRIT(List<TransitRegionStateProcedure> procs) { 364 procs.forEach(proc -> { 365 RegionInfo regionInfo = proc.getRegion(); 366 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 367 TransitRegionStateProcedure existingProc = regionNode.getProcedure(); 368 if (existingProc != null) { 369 // This is possible, as we will detach the procedure from the RSN before we 370 // actually finish the procedure. This is because that, we will detach the TRSP from the RSN 371 // during execution, at that time, the procedure has not been marked as done in the pv2 372 // framework yet, so it is possible that we schedule a new TRSP immediately and when 373 // arriving here, we will find out that there are multiple TRSPs for the region. But we can 374 // make sure that, only the last one can take the charge, the previous ones should have all 375 // been finished already. So here we will compare the proc id, the greater one will win. 376 if (existingProc.getProcId() < proc.getProcId()) { 377 // the new one wins, unset and set it to the new one below 378 regionNode.unsetProcedure(existingProc); 379 } else { 380 // the old one wins, skip 381 return; 382 } 383 } 384 LOG.info("Attach {} to {} to restore RIT", proc, regionNode); 385 regionNode.setProcedure(proc); 386 }); 387 } 388 389 public void stop() { 390 if (!running.compareAndSet(true, false)) { 391 return; 392 } 393 394 LOG.info("Stopping assignment manager"); 395 396 // The AM is started before the procedure executor, 397 // but the actual work will be loaded/submitted only once we have the executor 398 final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null; 399 400 // Remove the RIT chore 401 if (hasProcExecutor) { 402 master.getMasterProcedureExecutor().removeChore(this.ritChore); 403 if (this.deadMetricChore != null) { 404 master.getMasterProcedureExecutor().removeChore(this.deadMetricChore); 405 } 406 } 407 408 // Stop the Assignment Thread 409 
stopAssignmentThread(); 410 411 // Stop the RegionStateStore 412 regionStates.clear(); 413 414 // Update meta events (for testing) 415 if (hasProcExecutor) { 416 metaLoadEvent.suspend(); 417 for (RegionInfo hri : getMetaRegionSet()) { 418 setMetaAssigned(hri, false); 419 } 420 } 421 } 422 423 public boolean isRunning() { 424 return running.get(); 425 } 426 427 public Configuration getConfiguration() { 428 return master.getConfiguration(); 429 } 430 431 public MetricsAssignmentManager getAssignmentManagerMetrics() { 432 return metrics; 433 } 434 435 private LoadBalancer getBalancer() { 436 return master.getLoadBalancer(); 437 } 438 439 private FavoredNodesPromoter getFavoredNodePromoter() { 440 return (FavoredNodesPromoter) ((RSGroupBasedLoadBalancer) master.getLoadBalancer()) 441 .getInternalBalancer(); 442 } 443 444 private MasterProcedureEnv getProcedureEnvironment() { 445 return master.getMasterProcedureExecutor().getEnvironment(); 446 } 447 448 private MasterProcedureScheduler getProcedureScheduler() { 449 return getProcedureEnvironment().getProcedureScheduler(); 450 } 451 452 int getAssignMaxAttempts() { 453 return assignMaxAttempts; 454 } 455 456 public boolean isForceRegionRetainment() { 457 return forceRegionRetainment; 458 } 459 460 public long getForceRegionRetainmentWaitInterval() { 461 return forceRegionRetainmentWaitInterval; 462 } 463 464 public int getForceRegionRetainmentRetries() { 465 return forceRegionRetainmentRetries; 466 } 467 468 int getAssignRetryImmediatelyMaxAttempts() { 469 return assignRetryImmediatelyMaxAttempts; 470 } 471 472 public RegionStates getRegionStates() { 473 return regionStates; 474 } 475 476 /** 477 * Returns the regions hosted by the specified server. 478 * <p/> 479 * Notice that, for SCP, after we submit the SCP, no one can change the region list for the 480 * ServerStateNode so we do not need any locks here. 
And for other usage, this can only give you a 481 * snapshot of the current region list for this server, which means, right after you get the 482 * region list, new regions may be moved to this server or some regions may be moved out from this 483 * server, so you should not use it critically if you need strong consistency. 484 */ 485 public List<RegionInfo> getRegionsOnServer(ServerName serverName) { 486 ServerStateNode serverInfo = regionStates.getServerNode(serverName); 487 if (serverInfo == null) { 488 return Collections.emptyList(); 489 } 490 return serverInfo.getRegionInfoList(); 491 } 492 493 private RegionInfo getRegionInfo(RegionStateNode rsn) { 494 if (rsn.isSplit() && !rsn.getRegionInfo().isSplit()) { 495 // see the comments in markRegionAsSplit on why we need to do this converting. 496 return RegionInfoBuilder.newBuilder(rsn.getRegionInfo()).setSplit(true).setOffline(true) 497 .build(); 498 } else { 499 return rsn.getRegionInfo(); 500 } 501 } 502 503 private Stream<RegionStateNode> getRegionStateNodes(TableName tableName, 504 boolean excludeOfflinedSplitParents) { 505 Stream<RegionStateNode> stream = regionStates.getTableRegionStateNodes(tableName).stream(); 506 if (excludeOfflinedSplitParents) { 507 return stream.filter(rsn -> !rsn.isSplit()); 508 } else { 509 return stream; 510 } 511 } 512 513 public List<RegionInfo> getTableRegions(TableName tableName, 514 boolean excludeOfflinedSplitParents) { 515 return getRegionStateNodes(tableName, excludeOfflinedSplitParents).map(this::getRegionInfo) 516 .collect(Collectors.toList()); 517 } 518 519 public List<Pair<RegionInfo, ServerName>> getTableRegionsAndLocations(TableName tableName, 520 boolean excludeOfflinedSplitParents) { 521 return getRegionStateNodes(tableName, excludeOfflinedSplitParents) 522 .map(rsn -> Pair.newPair(getRegionInfo(rsn), rsn.getRegionLocation())) 523 .collect(Collectors.toList()); 524 } 525 526 public RegionStateStore getRegionStateStore() { 527 return regionStateStore; 528 } 529 530 
public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) { 531 return this.shouldAssignRegionsWithFavoredNodes 532 ? getFavoredNodePromoter().getFavoredNodes(regionInfo) 533 : ServerName.EMPTY_SERVER_LIST; 534 } 535 536 // ============================================================================================ 537 // Table State Manager helpers 538 // ============================================================================================ 539 private TableStateManager getTableStateManager() { 540 return master.getTableStateManager(); 541 } 542 543 private boolean isTableEnabled(final TableName tableName) { 544 return getTableStateManager().isTableState(tableName, TableState.State.ENABLED); 545 } 546 547 private boolean isTableDisabled(final TableName tableName) { 548 return getTableStateManager().isTableState(tableName, TableState.State.DISABLED, 549 TableState.State.DISABLING); 550 } 551 552 // ============================================================================================ 553 // META Helpers 554 // ============================================================================================ 555 private boolean isMetaRegion(final RegionInfo regionInfo) { 556 return regionInfo.isMetaRegion(); 557 } 558 559 public boolean isMetaRegion(final byte[] regionName) { 560 return getMetaRegionFromName(regionName) != null; 561 } 562 563 public RegionInfo getMetaRegionFromName(final byte[] regionName) { 564 for (RegionInfo hri : getMetaRegionSet()) { 565 if (Bytes.equals(hri.getRegionName(), regionName)) { 566 return hri; 567 } 568 } 569 return null; 570 } 571 572 public boolean isCarryingMeta(final ServerName serverName) { 573 // TODO: handle multiple meta 574 return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO); 575 } 576 577 private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) { 578 // TODO: check for state? 
579 final RegionStateNode node = regionStates.getRegionStateNode(regionInfo); 580 return (node != null && serverName.equals(node.getRegionLocation())); 581 } 582 583 private RegionInfo getMetaForRegion(final RegionInfo regionInfo) { 584 // if (regionInfo.isMetaRegion()) return regionInfo; 585 // TODO: handle multiple meta. if the region provided is not meta lookup 586 // which meta the region belongs to. 587 return RegionInfoBuilder.FIRST_META_REGIONINFO; 588 } 589 590 // TODO: handle multiple meta. 591 private static final Set<RegionInfo> META_REGION_SET = 592 Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO); 593 594 public Set<RegionInfo> getMetaRegionSet() { 595 return META_REGION_SET; 596 } 597 598 // ============================================================================================ 599 // META Event(s) helpers 600 // ============================================================================================ 601 /** 602 * Notice that, this only means the meta region is available on a RS, but the AM may still be 603 * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first 604 * before checking this method, unless you can make sure that your piece of code can only be 605 * executed after AM builds the region states. 606 * @see #isMetaLoaded() 607 */ 608 public boolean isMetaAssigned() { 609 return metaAssignEvent.isReady(); 610 } 611 612 public boolean isMetaRegionInTransition() { 613 return !isMetaAssigned(); 614 } 615 616 /** 617 * Notice that this event does not mean the AM has already finished region state rebuilding. See 618 * the comment of {@link #isMetaAssigned()} for more details. 
619 * @see #isMetaAssigned() 620 */ 621 public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) { 622 return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc); 623 } 624 625 private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) { 626 assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; 627 ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo); 628 if (assigned) { 629 metaAssignEvent.wake(getProcedureScheduler()); 630 } else { 631 metaAssignEvent.suspend(); 632 } 633 } 634 635 private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) { 636 assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; 637 // TODO: handle multiple meta. 638 return metaAssignEvent; 639 } 640 641 /** 642 * Wait until AM finishes the meta loading, i.e, the region states rebuilding. 643 * @see #isMetaLoaded() 644 * @see #waitMetaAssigned(Procedure, RegionInfo) 645 */ 646 public boolean waitMetaLoaded(Procedure<?> proc) { 647 return metaLoadEvent.suspendIfNotReady(proc); 648 } 649 650 /** 651 * This method will be called in master initialization method after calling 652 * {@link #processOfflineRegions()}, as in processOfflineRegions we will generate assign 653 * procedures for offline regions, which may be conflict with creating table. 654 * <p/> 655 * This is a bit dirty, should be reconsidered after we decide whether to keep the 656 * {@link #processOfflineRegions()} method. 657 */ 658 public void wakeMetaLoadedEvent() { 659 metaLoadEvent.wake(getProcedureScheduler()); 660 assert isMetaLoaded() : "expected meta to be loaded"; 661 } 662 663 /** 664 * Return whether AM finishes the meta loading, i.e, the region states rebuilding. 
665 * @see #isMetaAssigned() 666 * @see #waitMetaLoaded(Procedure) 667 */ 668 public boolean isMetaLoaded() { 669 return metaLoadEvent.isReady(); 670 } 671 672 /** 673 * Start a new thread to check if there are region servers whose versions are higher than others. 674 * If so, move all system table regions to RS with the highest version to keep compatibility. The 675 * reason is, RS in new version may not be able to access RS in old version when there are some 676 * incompatible changes. 677 * <p> 678 * This method is called when a new RegionServer is added to cluster only. 679 * </p> 680 */ 681 public void checkIfShouldMoveSystemRegionAsync() { 682 // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that 683 // it should 'move' the system tables from the old server to the new server but 684 // ServerCrashProcedure is on it; and it will take care of the assign without dataloss. 685 if (this.master.getServerManager().countOfRegionServers() <= 1) { 686 return; 687 } 688 // This thread used to run whenever there was a change in the cluster. The ZooKeeper 689 // childrenChanged notification came in before the nodeDeleted message and so this method 690 // cold run before a ServerCrashProcedure could run. That meant that this thread could see 691 // a Crashed Server before ServerCrashProcedure and it could find system regions on the 692 // crashed server and go move them before ServerCrashProcedure had a chance; could be 693 // dataloss too if WALs were not recovered. 694 new Thread(() -> { 695 try { 696 synchronized (checkIfShouldMoveSystemRegionLock) { 697 List<RegionPlan> plans = new ArrayList<>(); 698 // TODO: I don't think this code does a good job if all servers in cluster have same 699 // version. It looks like it will schedule unnecessary moves. 700 for (ServerName server : getExcludedServersForSystemTable()) { 701 if (master.getServerManager().isServerDead(server)) { 702 // TODO: See HBASE-18494 and HBASE-18495. 
Though getExcludedServersForSystemTable() 703 // considers only online servers, the server could be queued for dead server 704 // processing. As region assignments for crashed server is handled by 705 // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through 706 // regular flow of LoadBalancer as a favored node and not to have this special 707 // handling. 708 continue; 709 } 710 List<RegionInfo> regionsShouldMove = getSystemTables(server); 711 if (!regionsShouldMove.isEmpty()) { 712 for (RegionInfo regionInfo : regionsShouldMove) { 713 // null value for dest forces destination server to be selected by balancer 714 RegionPlan plan = new RegionPlan(regionInfo, server, null); 715 if (regionInfo.isMetaRegion()) { 716 // Must move meta region first. 717 LOG.info("Async MOVE of {} to newer Server={}", regionInfo.getEncodedName(), 718 server); 719 moveAsync(plan); 720 } else { 721 plans.add(plan); 722 } 723 } 724 } 725 for (RegionPlan plan : plans) { 726 LOG.info("Async MOVE of {} to newer Server={}", plan.getRegionInfo().getEncodedName(), 727 server); 728 moveAsync(plan); 729 } 730 } 731 } 732 } catch (Throwable t) { 733 LOG.error(t.toString(), t); 734 } 735 }).start(); 736 } 737 738 private List<RegionInfo> getSystemTables(ServerName serverName) { 739 ServerStateNode serverNode = regionStates.getServerNode(serverName); 740 if (serverNode == null) { 741 return Collections.emptyList(); 742 } 743 return serverNode.getSystemRegionInfoList(); 744 } 745 746 private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates) 747 throws HBaseIOException { 748 if (regionNode.getProcedure() != null) { 749 throw new HBaseIOException( 750 regionNode + " is currently in transition; pid=" + regionNode.getProcedure().getProcId()); 751 } 752 if (!regionNode.isInState(expectedStates)) { 753 throw new DoNotRetryRegionException(UNEXPECTED_STATE_REGION + regionNode); 754 } 755 if (isTableDisabled(regionNode.getTable())) { 756 throw new 
DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode); 757 } 758 } 759 760 /** 761 * Create an assign TransitRegionStateProcedure. Makes sure of RegionState. Throws exception if 762 * not appropriate UNLESS override is set. Used by hbck2 but also by straightline 763 * {@link #assign(RegionInfo, ServerName)} and {@link #assignAsync(RegionInfo, ServerName)}. 764 * @see #createAssignProcedure(RegionStateNode, ServerName) for a version that does NO checking 765 * used when only when no checking needed. 766 * @param override If false, check RegionState is appropriate for assign; if not throw exception. 767 */ 768 private TransitRegionStateProcedure createAssignProcedure(RegionInfo regionInfo, ServerName sn, 769 boolean override, boolean force) throws IOException { 770 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 771 regionNode.lock(); 772 try { 773 if (override) { 774 if (!force) { 775 preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN); 776 } 777 if (regionNode.getProcedure() != null) { 778 regionNode.unsetProcedure(regionNode.getProcedure()); 779 } 780 } else { 781 preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN); 782 } 783 assert regionNode.getProcedure() == null; 784 return regionNode.setProcedure( 785 TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn)); 786 } finally { 787 regionNode.unlock(); 788 } 789 } 790 791 /** 792 * Create an assign TransitRegionStateProcedure. Does NO checking of RegionState. Presumes 793 * appriopriate state ripe for assign. 
794 * @see #createAssignProcedure(RegionInfo, ServerName, boolean, boolean) 795 */ 796 private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode, 797 ServerName targetServer) { 798 regionNode.lock(); 799 try { 800 return regionNode.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(), 801 regionNode.getRegionInfo(), targetServer)); 802 } finally { 803 regionNode.unlock(); 804 } 805 } 806 807 public long assign(RegionInfo regionInfo, ServerName sn) throws IOException { 808 TransitRegionStateProcedure proc = createAssignProcedure(regionInfo, sn, false, false); 809 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 810 return proc.getProcId(); 811 } 812 813 public long assign(RegionInfo regionInfo) throws IOException { 814 return assign(regionInfo, null); 815 } 816 817 /** 818 * Submits a procedure that assigns a region to a target server without waiting for it to finish 819 * @param regionInfo the region we would like to assign 820 * @param sn target server name 821 */ 822 public Future<byte[]> assignAsync(RegionInfo regionInfo, ServerName sn) throws IOException { 823 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), 824 createAssignProcedure(regionInfo, sn, false, false)); 825 } 826 827 /** 828 * Submits a procedure that assigns a region without waiting for it to finish 829 * @param regionInfo the region we would like to assign 830 */ 831 public Future<byte[]> assignAsync(RegionInfo regionInfo) throws IOException { 832 return assignAsync(regionInfo, null); 833 } 834 835 public long unassign(RegionInfo regionInfo) throws IOException { 836 RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 837 if (regionNode == null) { 838 throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName()); 839 } 840 TransitRegionStateProcedure proc; 841 regionNode.lock(); 842 try { 843 preTransitCheck(regionNode, 
STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 844 proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo); 845 regionNode.setProcedure(proc); 846 } finally { 847 regionNode.unlock(); 848 } 849 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 850 return proc.getProcId(); 851 } 852 853 public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo, 854 ServerName targetServer) throws HBaseIOException { 855 RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo); 856 if (regionNode == null) { 857 throw new UnknownRegionException( 858 "No RegionStateNode found for " + regionInfo.getEncodedName() + "(Closed/Deleted?)"); 859 } 860 TransitRegionStateProcedure proc; 861 regionNode.lock(); 862 try { 863 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 864 regionNode.checkOnline(); 865 proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer); 866 regionNode.setProcedure(proc); 867 } finally { 868 regionNode.unlock(); 869 } 870 return proc; 871 } 872 873 public void move(RegionInfo regionInfo) throws IOException { 874 TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null); 875 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 876 } 877 878 public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException { 879 TransitRegionStateProcedure proc = 880 createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination()); 881 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); 882 } 883 884 public Future<byte[]> balance(RegionPlan regionPlan) throws HBaseIOException { 885 ServerName current = 886 this.getRegionStates().getRegionAssignments().get(regionPlan.getRegionInfo()); 887 if (current == null || !current.equals(regionPlan.getSource())) { 888 LOG.debug("Skip region plan {}, source server not match, current region 
location is {}", 889 regionPlan, current == null ? "(null)" : current); 890 return null; 891 } 892 return moveAsync(regionPlan); 893 } 894 895 // ============================================================================================ 896 // RegionTransition procedures helpers 897 // ============================================================================================ 898 899 /** 900 * Create round-robin assigns. Use on table creation to distribute out regions across cluster. 901 * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer 902 * to populate the assigns with targets chosen using round-robin (default balancer 903 * scheme). If at assign-time, the target chosen is no longer up, thats fine, the 904 * AssignProcedure will ask the balancer for a new target, and so on. 905 */ 906 public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris, 907 List<ServerName> serversToExclude) { 908 if (hris.isEmpty()) { 909 return new TransitRegionStateProcedure[0]; 910 } 911 912 if ( 913 serversToExclude != null && this.master.getServerManager().getOnlineServersList().size() == 1 914 ) { 915 LOG.debug("Only one region server found and hence going ahead with the assignment"); 916 serversToExclude = null; 917 } 918 try { 919 // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do 920 // a better job if it has all the assignments in the one lump. 921 Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris, 922 this.master.getServerManager().createDestinationServersList(serversToExclude)); 923 // Return mid-method! 924 return createAssignProcedures(assignments); 925 } catch (IOException hioe) { 926 LOG.warn("Failed roundRobinAssignment", hioe); 927 } 928 // If an error above, fall-through to this simpler assign. Last resort. 929 return createAssignProcedures(hris); 930 } 931 932 /** 933 * Create round-robin assigns. 
Use on table creation to distribute out regions across cluster. 934 * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer 935 * to populate the assigns with targets chosen using round-robin (default balancer 936 * scheme). If at assign-time, the target chosen is no longer up, thats fine, the 937 * AssignProcedure will ask the balancer for a new target, and so on. 938 */ 939 public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) { 940 return createRoundRobinAssignProcedures(hris, null); 941 } 942 943 static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) { 944 if (left.getRegion().isMetaRegion()) { 945 if (right.getRegion().isMetaRegion()) { 946 return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion()); 947 } 948 return -1; 949 } else if (right.getRegion().isMetaRegion()) { 950 return +1; 951 } 952 if (left.getRegion().getTable().isSystemTable()) { 953 if (right.getRegion().getTable().isSystemTable()) { 954 return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion()); 955 } 956 return -1; 957 } else if (right.getRegion().getTable().isSystemTable()) { 958 return +1; 959 } 960 return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion()); 961 } 962 963 /** 964 * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server. This 965 * method is called from HBCK2. 966 * @return an assign or null 967 */ 968 public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo ri, boolean override, 969 boolean force) { 970 TransitRegionStateProcedure trsp = null; 971 try { 972 trsp = createAssignProcedure(ri, null, override, force); 973 } catch (IOException ioe) { 974 LOG.info( 975 "Failed {} assign, override={}" 976 + (override ? 
"" : "; set override to by-pass state checks."), 977 ri.getEncodedName(), override, ioe); 978 } 979 return trsp; 980 } 981 982 /** 983 * Create one TransitRegionStateProcedure to unassign a region. This method is called from HBCK2. 984 * @return an unassign or null 985 */ 986 public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo ri, boolean override, 987 boolean force) { 988 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(ri); 989 TransitRegionStateProcedure trsp = null; 990 regionNode.lock(); 991 try { 992 if (override) { 993 if (!force) { 994 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 995 } 996 if (regionNode.getProcedure() != null) { 997 regionNode.unsetProcedure(regionNode.getProcedure()); 998 } 999 } else { 1000 // This is where we could throw an exception; i.e. override is false. 1001 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 1002 } 1003 assert regionNode.getProcedure() == null; 1004 trsp = 1005 TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionNode.getRegionInfo()); 1006 regionNode.setProcedure(trsp); 1007 } catch (IOException ioe) { 1008 // 'override' must be false here. 1009 LOG.info("Failed {} unassign, override=false; set override to by-pass state checks.", 1010 ri.getEncodedName(), ioe); 1011 } finally { 1012 regionNode.unlock(); 1013 } 1014 return trsp; 1015 } 1016 1017 /** 1018 * Create an array of TransitRegionStateProcedure w/o specifying a target server. Used as fallback 1019 * of caller is unable to do {@link #createAssignProcedures(Map)}. 1020 * <p/> 1021 * If no target server, at assign time, we will try to use the former location of the region if 1022 * one exists. This is how we 'retain' the old location across a server restart. 1023 * <p/> 1024 * Should only be called when you can make sure that no one can touch these regions other than 1025 * you. For example, when you are creating or enabling table. 
Presumes all Regions are in 1026 * appropriate state ripe for assign; no checking of Region state is done in here. 1027 * @see #createAssignProcedures(Map) 1028 */ 1029 public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) { 1030 return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri)) 1031 .map(regionNode -> createAssignProcedure(regionNode, null)).sorted(AssignmentManager::compare) 1032 .toArray(TransitRegionStateProcedure[]::new); 1033 } 1034 1035 /** 1036 * Tied to {@link #createAssignProcedures(List)} in that it is called if caller is unable to run 1037 * this method. Presumes all Regions are in appropriate state ripe for assign; no checking of 1038 * Region state is done in here. 1039 * @param assignments Map of assignments from which we produce an array of AssignProcedures. 1040 * @return Assignments made from the passed in <code>assignments</code> 1041 * @see #createAssignProcedures(List) 1042 */ 1043 private TransitRegionStateProcedure[] 1044 createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments) { 1045 return assignments.entrySet().stream() 1046 .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri)) 1047 .map(regionNode -> createAssignProcedure(regionNode, e.getKey()))) 1048 .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new); 1049 } 1050 1051 // for creating unassign TRSP when disabling a table or closing excess region replicas 1052 private TransitRegionStateProcedure forceCreateUnssignProcedure(RegionStateNode regionNode) { 1053 regionNode.lock(); 1054 try { 1055 if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) { 1056 return null; 1057 } 1058 // in general, a split parent should be in CLOSED or SPLIT state, but anyway, let's check it 1059 // here for safety 1060 if (regionNode.getRegionInfo().isSplit()) { 1061 LOG.warn("{} is a split parent but not in CLOSED or SPLIT state", regionNode); 1062 return null; 
1063 } 1064 // As in DisableTableProcedure or ModifyTableProcedure, we will hold the xlock for table, so 1065 // we can make sure that this procedure has not been executed yet, as TRSP will hold the 1066 // shared lock for table all the time. So here we will unset it and when it is actually 1067 // executed, it will find that the attach procedure is not itself and quit immediately. 1068 if (regionNode.getProcedure() != null) { 1069 regionNode.unsetProcedure(regionNode.getProcedure()); 1070 } 1071 return regionNode.setProcedure(TransitRegionStateProcedure.unassign(getProcedureEnvironment(), 1072 regionNode.getRegionInfo())); 1073 } finally { 1074 regionNode.unlock(); 1075 } 1076 } 1077 1078 /** 1079 * Called by DisableTableProcedure to unassign all the regions for a table. 1080 */ 1081 public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) { 1082 return regionStates.getTableRegionStateNodes(tableName).stream() 1083 .map(this::forceCreateUnssignProcedure).filter(p -> p != null) 1084 .toArray(TransitRegionStateProcedure[]::new); 1085 } 1086 1087 /** 1088 * Called by ModifyTableProcedure to unassign all the excess region replicas for a table. Will 1089 * skip submit unassign procedure if the region is in transition, so you may need to call this 1090 * method multiple times. 
1091 * @param tableName the table for closing excess region replicas 1092 * @param newReplicaCount the new replica count, should be less than current replica count 1093 * @param submit for submitting procedure 1094 * @return the number of regions in transition that we can not schedule unassign procedures 1095 */ 1096 public int submitUnassignProcedureForClosingExcessRegionReplicas(TableName tableName, 1097 int newReplicaCount, Consumer<TransitRegionStateProcedure> submit) { 1098 int inTransitionCount = 0; 1099 for (RegionStateNode regionNode : regionStates.getTableRegionStateNodes(tableName)) { 1100 regionNode.lock(); 1101 try { 1102 if (regionNode.getRegionInfo().getReplicaId() >= newReplicaCount) { 1103 if (regionNode.isInTransition()) { 1104 LOG.debug("skip scheduling unassign procedure for {} when closing excess region " 1105 + "replicas since it is in transition", regionNode); 1106 inTransitionCount++; 1107 continue; 1108 } 1109 if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) { 1110 continue; 1111 } 1112 submit.accept(regionNode.setProcedure(TransitRegionStateProcedure 1113 .unassign(getProcedureEnvironment(), regionNode.getRegionInfo()))); 1114 } 1115 } finally { 1116 regionNode.unlock(); 1117 } 1118 } 1119 return inTransitionCount; 1120 } 1121 1122 public int numberOfUnclosedExcessRegionReplicas(TableName tableName, int newReplicaCount) { 1123 int unclosed = 0; 1124 for (RegionStateNode regionNode : regionStates.getTableRegionStateNodes(tableName)) { 1125 regionNode.lock(); 1126 try { 1127 if (regionNode.getRegionInfo().getReplicaId() >= newReplicaCount) { 1128 if (!regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) { 1129 unclosed++; 1130 } 1131 } 1132 } finally { 1133 regionNode.unlock(); 1134 } 1135 } 1136 return unclosed; 1137 } 1138 1139 public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit, 1140 final byte[] splitKey) throws IOException { 1141 return new 
SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey); 1142 } 1143 1144 public TruncateRegionProcedure createTruncateRegionProcedure(final RegionInfo regionToTruncate) 1145 throws IOException { 1146 return new TruncateRegionProcedure(getProcedureEnvironment(), regionToTruncate); 1147 } 1148 1149 public MergeTableRegionsProcedure createMergeProcedure(RegionInfo... ris) throws IOException { 1150 return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false); 1151 } 1152 1153 /** 1154 * Delete the region states. This is called by "DeleteTable" 1155 */ 1156 public void deleteTable(final TableName tableName) throws IOException { 1157 final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName); 1158 regionStateStore.deleteRegions(regions); 1159 for (int i = 0; i < regions.size(); ++i) { 1160 final RegionInfo regionInfo = regions.get(i); 1161 regionStates.deleteRegion(regionInfo); 1162 } 1163 } 1164 1165 // ============================================================================================ 1166 // RS Region Transition Report helpers 1167 // ============================================================================================ 1168 private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder, 1169 ServerStateNode serverNode, List<RegionStateTransition> transitionList) throws IOException { 1170 for (RegionStateTransition transition : transitionList) { 1171 switch (transition.getTransitionCode()) { 1172 case OPENED: 1173 case FAILED_OPEN: 1174 case CLOSED: 1175 assert transition.getRegionInfoCount() == 1 : transition; 1176 final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 1177 long procId = 1178 transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID; 1179 updateRegionTransition(serverNode, transition.getTransitionCode(), hri, 1180 transition.hasOpenSeqNum() ? 
transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId); 1181 break; 1182 case READY_TO_SPLIT: 1183 case SPLIT: 1184 case SPLIT_REVERTED: 1185 assert transition.getRegionInfoCount() == 3 : transition; 1186 final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 1187 final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1)); 1188 final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2)); 1189 updateRegionSplitTransition(serverNode, transition.getTransitionCode(), parent, splitA, 1190 splitB); 1191 break; 1192 case READY_TO_MERGE: 1193 case MERGED: 1194 case MERGE_REVERTED: 1195 assert transition.getRegionInfoCount() == 3 : transition; 1196 final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 1197 final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1)); 1198 final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2)); 1199 updateRegionMergeTransition(serverNode, transition.getTransitionCode(), merged, mergeA, 1200 mergeB); 1201 break; 1202 } 1203 } 1204 } 1205 1206 public ReportRegionStateTransitionResponse reportRegionStateTransition( 1207 final ReportRegionStateTransitionRequest req) throws PleaseHoldException { 1208 ReportRegionStateTransitionResponse.Builder builder = 1209 ReportRegionStateTransitionResponse.newBuilder(); 1210 ServerName serverName = ProtobufUtil.toServerName(req.getServer()); 1211 ServerStateNode serverNode = regionStates.getServerNode(serverName); 1212 if (serverNode == null) { 1213 LOG.warn("No server node for {}", serverName); 1214 builder.setErrorMessage("No server node for " + serverName); 1215 return builder.build(); 1216 } 1217 // here we have to acquire a read lock instead of a simple exclusive lock. This is because that 1218 // we should not block other reportRegionStateTransition call from the same region server. This 1219 // is not only about performance, but also to prevent dead lock. 
Think of the meta region is 1220 // also on the same region server and you hold the lock which blocks the 1221 // reportRegionStateTransition for meta, and since meta is not online, you will block inside the 1222 // lock protection to wait for meta online... 1223 serverNode.readLock().lock(); 1224 try { 1225 // we only accept reportRegionStateTransition if the region server is online, see the comment 1226 // above in submitServerCrash method and HBASE-21508 for more details. 1227 if (serverNode.isInState(ServerState.ONLINE)) { 1228 try { 1229 reportRegionStateTransition(builder, serverNode, req.getTransitionList()); 1230 } catch (PleaseHoldException e) { 1231 LOG.trace("Failed transition ", e); 1232 throw e; 1233 } catch (UnsupportedOperationException | IOException e) { 1234 // TODO: at the moment we have a single error message and the RS will abort 1235 // if the master says that one of the region transitions failed. 1236 LOG.warn("Failed transition", e); 1237 builder.setErrorMessage("Failed transition " + e.getMessage()); 1238 } 1239 } else { 1240 LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call", 1241 serverName); 1242 builder.setErrorMessage("You are dead"); 1243 } 1244 } finally { 1245 serverNode.readLock().unlock(); 1246 } 1247 1248 return builder.build(); 1249 } 1250 1251 private void updateRegionTransition(ServerStateNode serverNode, TransitionCode state, 1252 RegionInfo regionInfo, long seqId, long procId) throws IOException { 1253 checkMetaLoaded(regionInfo); 1254 1255 RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 1256 if (regionNode == null) { 1257 // the table/region is gone. maybe a delete, split, merge 1258 throw new UnexpectedStateException(String.format( 1259 "Server %s was trying to transition region %s to %s. 
but Region is not known.", 1260 serverNode.getServerName(), regionInfo, state)); 1261 } 1262 LOG.trace("Update region transition serverName={} region={} regionState={}", 1263 serverNode.getServerName(), regionNode, state); 1264 1265 regionNode.lock(); 1266 try { 1267 if (!reportTransition(regionNode, serverNode, state, seqId, procId)) { 1268 // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages: 1269 // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for 1270 // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958, 1271 // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition 1272 // to CLOSED 1273 // These happen because on cluster shutdown, we currently let the RegionServers close 1274 // regions. This is the only time that region close is not run by the Master (so cluster 1275 // goes down fast). Consider changing it so Master runs all shutdowns. 1276 if ( 1277 this.master.getServerManager().isClusterShutdown() && state.equals(TransitionCode.CLOSED) 1278 ) { 1279 LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName()); 1280 } else { 1281 LOG.warn("No matching procedure found for {} transition on {} to {}", 1282 serverNode.getServerName(), regionNode, state); 1283 } 1284 } 1285 } finally { 1286 regionNode.unlock(); 1287 } 1288 } 1289 1290 private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode, 1291 TransitionCode state, long seqId, long procId) throws IOException { 1292 ServerName serverName = serverNode.getServerName(); 1293 TransitRegionStateProcedure proc = regionNode.getProcedure(); 1294 if (proc == null) { 1295 return false; 1296 } 1297 proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode, 1298 serverName, state, seqId, procId); 1299 return true; 1300 } 1301 1302 private void updateRegionSplitTransition(final ServerStateNode serverNode, 1303 final 
TransitionCode state, final RegionInfo parent, final RegionInfo hriA, 1304 final RegionInfo hriB) throws IOException { 1305 checkMetaLoaded(parent); 1306 1307 if (state != TransitionCode.READY_TO_SPLIT) { 1308 throw new UnexpectedStateException( 1309 "unsupported split regionState=" + state + " for parent region " + parent 1310 + " maybe an old RS (< 2.0) had the operation in progress"); 1311 } 1312 1313 // sanity check on the request 1314 if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) { 1315 throw new UnsupportedOperationException("unsupported split request with bad keys: parent=" 1316 + parent + " hriA=" + hriA + " hriB=" + hriB); 1317 } 1318 1319 if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) { 1320 LOG.warn("Split switch is off! skip split of " + parent); 1321 throw new DoNotRetryIOException( 1322 "Split region " + parent.getRegionNameAsString() + " failed due to split switch off"); 1323 } 1324 1325 // Submit the Split procedure 1326 final byte[] splitKey = hriB.getStartKey(); 1327 if (LOG.isDebugEnabled()) { 1328 LOG.debug("Split request from {}, parent={}, splitKey={}", serverNode.getServerName(), parent, 1329 Bytes.toStringBinary(splitKey)); 1330 } 1331 // Processing this report happens asynchronously from other activities which can mutate 1332 // the region state. For example, a split procedure may already be running for this parent. 1333 // A split procedure cannot succeed if the parent region is no longer open, so we can 1334 // ignore it in that case. 1335 // Note that submitting more than one split procedure for a given region is 1336 // harmless -- the split is fenced in the procedure handling -- but it would be noisy in 1337 // the logs. Only one procedure can succeed. The other procedure(s) would abort during 1338 // initialization and report failure with WARN level logging. 
1339 RegionState parentState = regionStates.getRegionState(parent); 1340 if (parentState != null && parentState.isOpened()) { 1341 master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey)); 1342 } else { 1343 LOG.info("Ignoring split request from {}, parent={} because parent is unknown or not open", 1344 serverNode.getServerName(), parent); 1345 return; 1346 } 1347 1348 // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split 1349 if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) { 1350 throw new UnsupportedOperationException( 1351 String.format("Split handled by the master: " + "parent=%s hriA=%s hriB=%s", 1352 parent.getShortNameToLog(), hriA, hriB)); 1353 } 1354 } 1355 1356 private void updateRegionMergeTransition(final ServerStateNode serverNode, 1357 final TransitionCode state, final RegionInfo merged, final RegionInfo hriA, 1358 final RegionInfo hriB) throws IOException { 1359 checkMetaLoaded(merged); 1360 1361 if (state != TransitionCode.READY_TO_MERGE) { 1362 throw new UnexpectedStateException( 1363 "Unsupported merge regionState=" + state + " for regionA=" + hriA + " regionB=" + hriB 1364 + " merged=" + merged + " maybe an old RS (< 2.0) had the operation in progress"); 1365 } 1366 1367 if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) { 1368 LOG.warn("Merge switch is off! 
skip merge of regionA=" + hriA + " regionB=" + hriB); 1369 throw new DoNotRetryIOException( 1370 "Merge of regionA=" + hriA + " regionB=" + hriB + " failed because merge switch is off"); 1371 } 1372 1373 // Submit the Merge procedure 1374 if (LOG.isDebugEnabled()) { 1375 LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged); 1376 } 1377 master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB)); 1378 1379 // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge 1380 if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) { 1381 throw new UnsupportedOperationException( 1382 String.format("Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, 1383 merged, hriA, hriB)); 1384 } 1385 } 1386 1387 // ============================================================================================ 1388 // RS Status update (report online regions) helpers 1389 // ============================================================================================ 1390 /** 1391 * The master will call this method when the RS send the regionServerReport(). The report will 1392 * contains the "online regions". This method will check the the online regions against the 1393 * in-memory state of the AM, and we will log a warn message if there is a mismatch. This is 1394 * because that there is no fencing between the reportRegionStateTransition method and 1395 * regionServerReport method, so there could be race and introduce inconsistency here, but 1396 * actually there is no problem. 1397 * <p/> 1398 * Please see HBASE-21421 and HBASE-21463 for more details. 
1399 */ 1400 public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) { 1401 if (!isRunning()) { 1402 return; 1403 } 1404 if (LOG.isTraceEnabled()) { 1405 LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName, 1406 regionNames.size(), isMetaLoaded(), 1407 regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList())); 1408 } 1409 1410 ServerStateNode serverNode = regionStates.getServerNode(serverName); 1411 if (serverNode == null) { 1412 LOG.warn("Got a report from server {} where its server node is null", serverName); 1413 return; 1414 } 1415 serverNode.readLock().lock(); 1416 try { 1417 if (!serverNode.isInState(ServerState.ONLINE)) { 1418 LOG.warn("Got a report from a server result in state {}", serverNode); 1419 return; 1420 } 1421 } finally { 1422 serverNode.readLock().unlock(); 1423 } 1424 1425 // Track the regionserver reported online regions in memory. 1426 synchronized (rsReports) { 1427 rsReports.put(serverName, regionNames); 1428 } 1429 1430 if (regionNames.isEmpty()) { 1431 // nothing to do if we don't have regions 1432 LOG.trace("no online region found on {}", serverName); 1433 return; 1434 } 1435 if (!isMetaLoaded()) { 1436 // we are still on startup, skip checking 1437 return; 1438 } 1439 // The Heartbeat tells us of what regions are on the region serve, check the state. 1440 checkOnlineRegionsReport(serverNode, regionNames); 1441 } 1442 1443 /** 1444 * Close <code>regionName</code> on <code>sn</code> silently and immediately without using a 1445 * Procedure or going via hbase:meta. For case where a RegionServer's hosting of a Region is not 1446 * aligned w/ the Master's accounting of Region state. This is for cleaning up an error in 1447 * accounting. 1448 */ 1449 private void closeRegionSilently(ServerName sn, byte[] regionName) { 1450 try { 1451 RegionInfo ri = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName); 1452 // Pass -1 for timeout. Means do not wait. 
1453 ServerManager.closeRegionSilentlyAndWait(this.master.getAsyncClusterConnection(), sn, ri, -1); 1454 } catch (Exception e) { 1455 LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e); 1456 } 1457 } 1458 1459 /** 1460 * Check that what the RegionServer reports aligns with the Master's image. If disagreement, we 1461 * will tell the RegionServer to expediently close a Region we do not think it should have. 1462 */ 1463 private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) { 1464 ServerName serverName = serverNode.getServerName(); 1465 for (byte[] regionName : regionNames) { 1466 if (!isRunning()) { 1467 return; 1468 } 1469 RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName); 1470 if (regionNode == null) { 1471 String regionNameAsStr = Bytes.toStringBinary(regionName); 1472 LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...", regionNameAsStr, 1473 serverName); 1474 closeRegionSilently(serverNode.getServerName(), regionName); 1475 continue; 1476 } 1477 final long lag = 1000; 1478 // This is just a fallback check designed to identify unexpected data inconsistencies, so we 1479 // use tryLock to attempt to acquire the lock, and if the lock cannot be acquired, we skip the 1480 // check. This will not cause any additional problems and also prevents the regionServerReport 1481 // call from being stuck for too long which may cause deadlock on region assignment. 1482 if (regionNode.tryLock()) { 1483 try { 1484 long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate(); 1485 if (regionNode.isInState(State.OPENING, State.OPEN)) { 1486 // This is possible as a region server has just closed a region but the region server 1487 // report is generated before the closing, but arrive after the closing. Make sure 1488 // there 1489 // is some elapsed time so less false alarms. 
1490 if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) { 1491 LOG.warn("Reporting {} server does not match {} (time since last " 1492 + "update={}ms); closing...", serverName, regionNode, diff); 1493 closeRegionSilently(serverNode.getServerName(), regionName); 1494 } 1495 } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) { 1496 // So, we can get report that a region is CLOSED or SPLIT because a heartbeat 1497 // came in at about same time as a region transition. Make sure there is some 1498 // elapsed time so less false alarms. 1499 if (diff > lag) { 1500 LOG.warn("Reporting {} state does not match {} (time since last update={}ms)", 1501 serverName, regionNode, diff); 1502 } 1503 } 1504 } finally { 1505 regionNode.unlock(); 1506 } 1507 } else { 1508 LOG.warn( 1509 "Unable to acquire lock for regionNode {}. It is likely that another thread is currently holding the lock. To avoid deadlock, skip execution for now.", 1510 regionNode); 1511 } 1512 } 1513 } 1514 1515 // ============================================================================================ 1516 // RIT chore 1517 // ============================================================================================ 1518 private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> { 1519 public RegionInTransitionChore(final int timeoutMsec) { 1520 super(timeoutMsec); 1521 } 1522 1523 @Override 1524 protected void periodicExecute(final MasterProcedureEnv env) { 1525 final AssignmentManager am = env.getAssignmentManager(); 1526 1527 final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat(); 1528 if (ritStat.hasRegionsOverThreshold()) { 1529 for (RegionState hri : ritStat.getRegionOverThreshold()) { 1530 am.handleRegionOverStuckWarningThreshold(hri.getRegion()); 1531 } 1532 } 1533 1534 // update metrics 1535 am.updateRegionsInTransitionMetrics(ritStat); 1536 } 1537 } 1538 1539 private static class DeadServerMetricRegionChore 
1540 extends ProcedureInMemoryChore<MasterProcedureEnv> { 1541 public DeadServerMetricRegionChore(final int timeoutMsec) { 1542 super(timeoutMsec); 1543 } 1544 1545 @Override 1546 protected void periodicExecute(final MasterProcedureEnv env) { 1547 final ServerManager sm = env.getMasterServices().getServerManager(); 1548 final AssignmentManager am = env.getAssignmentManager(); 1549 // To minimize inconsistencies we are not going to snapshot live servers in advance in case 1550 // new servers are added; OTOH we don't want to add heavy sync for a consistent view since 1551 // this is for metrics. Instead, we're going to check each regions as we go; to avoid making 1552 // too many checks, we maintain a local lists of server, limiting us to false negatives. If 1553 // we miss some recently-dead server, we'll just see it next time. 1554 Set<ServerName> recentlyLiveServers = new HashSet<>(); 1555 int deadRegions = 0, unknownRegions = 0; 1556 for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) { 1557 if (rsn.getState() != State.OPEN) { 1558 continue; // Opportunistic check, should quickly skip RITs, offline tables, etc. 1559 } 1560 // Do not need to acquire region state lock as this is only for showing metrics. 1561 ServerName sn = rsn.getRegionLocation(); 1562 State state = rsn.getState(); 1563 if (state != State.OPEN) { 1564 continue; // Mostly skipping RITs that are already being take care of. 1565 } 1566 if (sn == null) { 1567 ++unknownRegions; // Opened on null? 
1568 continue; 1569 } 1570 if (recentlyLiveServers.contains(sn)) { 1571 continue; 1572 } 1573 ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn); 1574 switch (sls) { 1575 case LIVE: 1576 recentlyLiveServers.add(sn); 1577 break; 1578 case DEAD: 1579 ++deadRegions; 1580 break; 1581 case UNKNOWN: 1582 ++unknownRegions; 1583 break; 1584 default: 1585 throw new AssertionError("Unexpected " + sls); 1586 } 1587 } 1588 if (deadRegions > 0 || unknownRegions > 0) { 1589 LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers", 1590 deadRegions, unknownRegions); 1591 } 1592 1593 am.updateDeadServerRegionMetrics(deadRegions, unknownRegions); 1594 } 1595 } 1596 1597 public RegionInTransitionStat computeRegionInTransitionStat() { 1598 final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration()); 1599 rit.update(this); 1600 return rit; 1601 } 1602 1603 public static class RegionInTransitionStat { 1604 private final int ritThreshold; 1605 1606 private HashMap<String, RegionState> ritsOverThreshold = null; 1607 private long statTimestamp; 1608 private long oldestRITTime = 0; 1609 private int totalRITsTwiceThreshold = 0; 1610 private int totalRITs = 0; 1611 1612 public RegionInTransitionStat(final Configuration conf) { 1613 this.ritThreshold = 1614 conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD); 1615 } 1616 1617 public int getRITThreshold() { 1618 return ritThreshold; 1619 } 1620 1621 public long getTimestamp() { 1622 return statTimestamp; 1623 } 1624 1625 public int getTotalRITs() { 1626 return totalRITs; 1627 } 1628 1629 public long getOldestRITTime() { 1630 return oldestRITTime; 1631 } 1632 1633 public int getTotalRITsOverThreshold() { 1634 Map<String, RegionState> m = this.ritsOverThreshold; 1635 return m != null ? 
m.size() : 0; 1636 } 1637 1638 public boolean hasRegionsTwiceOverThreshold() { 1639 return totalRITsTwiceThreshold > 0; 1640 } 1641 1642 public boolean hasRegionsOverThreshold() { 1643 Map<String, RegionState> m = this.ritsOverThreshold; 1644 return m != null && !m.isEmpty(); 1645 } 1646 1647 public Collection<RegionState> getRegionOverThreshold() { 1648 Map<String, RegionState> m = this.ritsOverThreshold; 1649 return m != null ? m.values() : Collections.emptySet(); 1650 } 1651 1652 public boolean isRegionOverThreshold(final RegionInfo regionInfo) { 1653 Map<String, RegionState> m = this.ritsOverThreshold; 1654 return m != null && m.containsKey(regionInfo.getEncodedName()); 1655 } 1656 1657 public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) { 1658 Map<String, RegionState> m = this.ritsOverThreshold; 1659 if (m == null) { 1660 return false; 1661 } 1662 final RegionState state = m.get(regionInfo.getEncodedName()); 1663 if (state == null) { 1664 return false; 1665 } 1666 return (statTimestamp - state.getStamp()) > (ritThreshold * 2); 1667 } 1668 1669 protected void update(final AssignmentManager am) { 1670 final RegionStates regionStates = am.getRegionStates(); 1671 this.statTimestamp = EnvironmentEdgeManager.currentTime(); 1672 update(regionStates.getRegionsStateInTransition(), statTimestamp); 1673 update(regionStates.getRegionFailedOpen(), statTimestamp); 1674 1675 if (LOG.isDebugEnabled() && ritsOverThreshold != null && !ritsOverThreshold.isEmpty()) { 1676 LOG.debug("RITs over threshold: {}", 1677 ritsOverThreshold.entrySet().stream() 1678 .map(e -> e.getKey() + ":" + e.getValue().getState().name()) 1679 .collect(Collectors.joining("\n"))); 1680 } 1681 } 1682 1683 private void update(final Collection<RegionState> regions, final long currentTime) { 1684 for (RegionState state : regions) { 1685 totalRITs++; 1686 final long ritStartedMs = state.getStamp(); 1687 if (ritStartedMs == 0) { 1688 // Don't output bogus values to metrics if they 
accidentally make it here. 1689 LOG.warn("The RIT {} has no start time", state.getRegion()); 1690 continue; 1691 } 1692 final long ritTime = currentTime - ritStartedMs; 1693 if (ritTime > ritThreshold) { 1694 if (ritsOverThreshold == null) { 1695 ritsOverThreshold = new HashMap<String, RegionState>(); 1696 } 1697 ritsOverThreshold.put(state.getRegion().getEncodedName(), state); 1698 totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0; 1699 } 1700 if (oldestRITTime < ritTime) { 1701 oldestRITTime = ritTime; 1702 } 1703 } 1704 } 1705 } 1706 1707 private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) { 1708 metrics.updateRITOldestAge(ritStat.getOldestRITTime()); 1709 metrics.updateRITCount(ritStat.getTotalRITs()); 1710 metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold()); 1711 } 1712 1713 private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) { 1714 metrics.updateDeadServerOpenRegions(deadRegions); 1715 metrics.updateUnknownServerOpenRegions(unknownRegions); 1716 } 1717 1718 private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) { 1719 final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 1720 // if (regionNode.isStuck()) { 1721 LOG.warn("STUCK Region-In-Transition {}", regionNode); 1722 } 1723 1724 // ============================================================================================ 1725 // TODO: Master load/bootstrap 1726 // ============================================================================================ 1727 public void joinCluster() throws IOException { 1728 long startTime = System.nanoTime(); 1729 LOG.debug("Joining cluster..."); 1730 1731 // Scan hbase:meta to build list of existing regions, servers, and assignment. 1732 // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress 1733 // w/o meta. 
1734 loadMeta(); 1735 1736 while (master.getServerManager().countOfRegionServers() < 1) { 1737 LOG.info("Waiting for RegionServers to join; current count={}", 1738 master.getServerManager().countOfRegionServers()); 1739 Threads.sleep(250); 1740 } 1741 LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers()); 1742 1743 // Start the chores 1744 master.getMasterProcedureExecutor().addChore(this.ritChore); 1745 master.getMasterProcedureExecutor().addChore(this.deadMetricChore); 1746 1747 long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); 1748 LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs)); 1749 } 1750 1751 /** 1752 * Create assign procedure for offline regions. Just follow the old 1753 * processofflineServersWithOnlineRegions method. Since now we do not need to deal with dead 1754 * server any more, we only deal with the regions in OFFLINE state in this method. And this is a 1755 * bit strange, that for new regions, we will add it in CLOSED state instead of OFFLINE state, and 1756 * usually there will be a procedure to track them. The processofflineServersWithOnlineRegions is 1757 * a legacy from long ago, as things are going really different now, maybe we do not need this 1758 * method any more. Need to revisit later. 1759 */ 1760 // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online. 1761 // Needs to be done after the table state manager has been started. 
1762 public void processOfflineRegions() { 1763 TransitRegionStateProcedure[] procs = 1764 regionStates.getRegionStateNodes().stream().filter(rsn -> rsn.isInState(State.OFFLINE)) 1765 .filter(rsn -> isTableEnabled(rsn.getRegionInfo().getTable())).map(rsn -> { 1766 rsn.lock(); 1767 try { 1768 if (rsn.getProcedure() != null) { 1769 return null; 1770 } else { 1771 return rsn.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(), 1772 rsn.getRegionInfo(), null)); 1773 } 1774 } finally { 1775 rsn.unlock(); 1776 } 1777 }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new); 1778 if (procs.length > 0) { 1779 master.getMasterProcedureExecutor().submitProcedures(procs); 1780 } 1781 } 1782 1783 /* 1784 * AM internal RegionStateStore.RegionStateVisitor implementation. To be used when scanning META 1785 * table for region rows, using RegionStateStore utility methods. RegionStateStore methods will 1786 * convert Result into proper RegionInfo instances, but those would still need to be added into 1787 * AssignmentManager.regionStates in-memory cache. RegionMetaLoadingVisitor.visitRegionState 1788 * method provides the logic for adding RegionInfo instances as loaded from latest META scan into 1789 * AssignmentManager.regionStates. 1790 */ 1791 private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor { 1792 1793 @Override 1794 public void visitRegionState(Result result, final RegionInfo regionInfo, final State state, 1795 final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) { 1796 if ( 1797 state == null && regionLocation == null && lastHost == null 1798 && openSeqNum == SequenceId.NO_SEQUENCE_ID 1799 ) { 1800 // This is a row with nothing in it. 1801 LOG.warn("Skipping empty row={}", result); 1802 return; 1803 } 1804 State localState = state; 1805 if (localState == null) { 1806 // No region state column data in hbase:meta table! 
Are I doing a rolling upgrade from 1807 // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta? 1808 // In any of these cases, state is empty. For now, presume OFFLINE but there are probably 1809 // cases where we need to probe more to be sure this correct; TODO informed by experience. 1810 LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE); 1811 localState = State.OFFLINE; 1812 } 1813 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 1814 // Do not need to lock on regionNode, as we can make sure that before we finish loading 1815 // meta, all the related procedures can not be executed. The only exception is for meta 1816 // region related operations, but here we do not load the informations for meta region. 1817 regionNode.setState(localState); 1818 regionNode.setLastHost(lastHost); 1819 regionNode.setRegionLocation(regionLocation); 1820 regionNode.setOpenSeqNum(openSeqNum); 1821 1822 // Note: keep consistent with other methods, see region(Opening|Opened|Closing) 1823 // RIT/ServerCrash handling should take care of the transiting regions. 1824 if ( 1825 localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING, State.MERGING) 1826 ) { 1827 assert regionLocation != null : "found null region location for " + regionNode; 1828 // TODO: this could lead to some orphan server state nodes, as it is possible that the 1829 // region server is already dead and its SCP has already finished but we have 1830 // persisted an opening state on this region server. Finally the TRSP will assign the 1831 // region to another region server, so it will not cause critical problems, just waste 1832 // some memory as no one will try to cleanup these orphan server state nodes. 
1833 regionStates.createServer(regionLocation); 1834 regionStates.addRegionToServer(regionNode); 1835 } else if (localState == State.OFFLINE || regionInfo.isOffline()) { 1836 regionStates.addToOfflineRegions(regionNode); 1837 } 1838 if (regionNode.getProcedure() != null) { 1839 regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode); 1840 } 1841 } 1842 }; 1843 1844 /** 1845 * Attempt to load {@code regionInfo} from META, adding any results to the 1846 * {@link #regionStateStore} Is NOT aware of replica regions. 1847 * @param regionInfo the region to be loaded from META. 1848 * @throws IOException If some error occurs while querying META or parsing results. 1849 */ 1850 public void populateRegionStatesFromMeta(@NonNull final RegionInfo regionInfo) 1851 throws IOException { 1852 final String regionEncodedName = RegionInfo.DEFAULT_REPLICA_ID == regionInfo.getReplicaId() 1853 ? regionInfo.getEncodedName() 1854 : RegionInfoBuilder.newBuilder(regionInfo).setReplicaId(RegionInfo.DEFAULT_REPLICA_ID).build() 1855 .getEncodedName(); 1856 populateRegionStatesFromMeta(regionEncodedName); 1857 } 1858 1859 /** 1860 * Attempt to load {@code regionEncodedName} from META, adding any results to the 1861 * {@link #regionStateStore} Is NOT aware of replica regions. 1862 * @param regionEncodedName encoded name for the region to be loaded from META. 1863 * @throws IOException If some error occurs while querying META or parsing results. 1864 */ 1865 public void populateRegionStatesFromMeta(@NonNull String regionEncodedName) throws IOException { 1866 final RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor(); 1867 regionStateStore.visitMetaForRegion(regionEncodedName, visitor); 1868 } 1869 1870 private void loadMeta() throws IOException { 1871 // TODO: use a thread pool 1872 regionStateStore.visitMeta(new RegionMetaLoadingVisitor()); 1873 } 1874 1875 /** 1876 * Used to check if the meta loading is done. 
1877 * <p/> 1878 * if not we throw PleaseHoldException since we are rebuilding the RegionStates 1879 * @param hri region to check if it is already rebuild 1880 * @throws PleaseHoldException if meta has not been loaded yet 1881 */ 1882 private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException { 1883 if (!isRunning()) { 1884 throw new PleaseHoldException("AssignmentManager not running"); 1885 } 1886 boolean meta = isMetaRegion(hri); 1887 boolean metaLoaded = isMetaLoaded(); 1888 if (!meta && !metaLoaded) { 1889 throw new PleaseHoldException( 1890 "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded); 1891 } 1892 } 1893 1894 // ============================================================================================ 1895 // TODO: Metrics 1896 // ============================================================================================ 1897 public int getNumRegionsOpened() { 1898 // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value 1899 return 0; 1900 } 1901 1902 /** 1903 * Usually run by the Master in reaction to server crash during normal processing. Can also be 1904 * invoked via external RPC to effect repair; in the latter case, the 'force' flag is set so we 1905 * push through the SCP though context may indicate already-running-SCP (An old SCP may have 1906 * exited abnormally, or damaged cluster may still have references in hbase:meta to 'Unknown 1907 * Servers' -- servers that are not online or in dead servers list, etc.) 1908 * @param force Set if the request came in externally over RPC (via hbck2). Force means run the 1909 * SCP even if it seems as though there might be an outstanding SCP running. 1910 * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled. 1911 */ 1912 public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) { 1913 // May be an 'Unknown Server' so handle case where serverNode is null. 
1914 ServerStateNode serverNode = regionStates.getServerNode(serverName); 1915 // Remove the in-memory rsReports result 1916 synchronized (rsReports) { 1917 rsReports.remove(serverName); 1918 } 1919 if (serverNode == null) { 1920 if (force) { 1921 LOG.info("Force adding ServerCrashProcedure for {} when server node is null", serverName); 1922 } else { 1923 // for normal case, do not schedule SCP if ServerStateNode is null 1924 LOG.warn("Skip adding ServerCrashProcedure for {} because server node is null", serverName); 1925 return Procedure.NO_PROC_ID; 1926 } 1927 } 1928 1929 ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor(); 1930 // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the 1931 // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from 1932 // this server. This is used to simplify the implementation for TRSP and SCP, where we can make 1933 // sure that, the region list fetched by SCP will not be changed any more. 1934 if (serverNode != null) { 1935 serverNode.writeLock().lock(); 1936 } 1937 try { 1938 1939 boolean carryingMeta = isCarryingMeta(serverName); 1940 if (serverNode != null && !serverNode.isInState(ServerState.ONLINE)) { 1941 if (force) { 1942 LOG.info("Force adding ServerCrashProcedure for {} (meta={}) when state is not {}", 1943 serverNode, carryingMeta, ServerState.ONLINE); 1944 } else { 1945 LOG.info("Skip adding ServerCrashProcedure for {} (meta={}) when state is not {}", 1946 serverNode, carryingMeta, ServerState.ONLINE); 1947 return Procedure.NO_PROC_ID; 1948 } 1949 } 1950 MasterProcedureEnv mpe = procExec.getEnvironment(); 1951 // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead. 1952 // HBCKSCP scours Master in-memory state AND hbase;meta for references to 1953 // serverName just-in-case. An SCP that is scheduled when the server is 1954 // 'Unknown' probably originated externally with HBCK2 fix-it tool. 
1955 ServerState oldState = null; 1956 if (serverNode != null) { 1957 oldState = serverNode.getState(); 1958 serverNode.setState(ServerState.CRASHED); 1959 } 1960 ServerCrashProcedure scp = force 1961 ? new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta) 1962 : new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta); 1963 long pid = procExec.submitProcedure(scp); 1964 LOG.info("Scheduled ServerCrashProcedure pid={} for {} (carryingMeta={}){}.", pid, serverName, 1965 carryingMeta, 1966 serverNode == null ? "" : " " + serverNode.toString() + ", oldState=" + oldState); 1967 return pid; 1968 } finally { 1969 if (serverNode != null) { 1970 serverNode.writeLock().unlock(); 1971 } 1972 } 1973 } 1974 1975 public void offlineRegion(final RegionInfo regionInfo) { 1976 // TODO used by MasterRpcServices 1977 RegionStateNode node = regionStates.getRegionStateNode(regionInfo); 1978 if (node != null) { 1979 node.offline(); 1980 } 1981 } 1982 1983 public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) { 1984 // TODO used by TestSplitTransactionOnCluster.java 1985 } 1986 1987 public Map<ServerName, List<RegionInfo>> 1988 getSnapShotOfAssignment(final Collection<RegionInfo> regions) { 1989 return regionStates.getSnapShotOfAssignment(regions); 1990 } 1991 1992 // ============================================================================================ 1993 // TODO: UTILS/HELPERS? 
1994 // ============================================================================================ 1995 /** 1996 * Used by the client (via master) to identify if all regions have the schema updates 1997 * @return Pair indicating the status of the alter command (pending/total) 1998 */ 1999 public Pair<Integer, Integer> getReopenStatus(TableName tableName) { 2000 if (isTableDisabled(tableName)) { 2001 return new Pair<Integer, Integer>(0, 0); 2002 } 2003 2004 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 2005 int ritCount = 0; 2006 for (RegionState regionState : states) { 2007 if (!regionState.isOpened() && !regionState.isSplit()) { 2008 ritCount++; 2009 } 2010 } 2011 return new Pair<Integer, Integer>(ritCount, states.size()); 2012 } 2013 2014 // ============================================================================================ 2015 // TODO: Region State In Transition 2016 // ============================================================================================ 2017 public boolean hasRegionsInTransition() { 2018 return regionStates.hasRegionsInTransition(); 2019 } 2020 2021 public List<RegionStateNode> getRegionsInTransition() { 2022 return regionStates.getRegionsInTransition(); 2023 } 2024 2025 public List<RegionInfo> getAssignedRegions() { 2026 return regionStates.getAssignedRegions(); 2027 } 2028 2029 /** 2030 * Resolve a cached {@link RegionInfo} from the region name as a {@code byte[]}. 2031 */ 2032 public RegionInfo getRegionInfo(final byte[] regionName) { 2033 final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName); 2034 return regionState != null ? regionState.getRegionInfo() : null; 2035 } 2036 2037 /** 2038 * Resolve a cached {@link RegionInfo} from the encoded region name as a {@code String}. 
2039 */ 2040 public RegionInfo getRegionInfo(final String encodedRegionName) { 2041 final RegionStateNode regionState = 2042 regionStates.getRegionStateNodeFromEncodedRegionName(encodedRegionName); 2043 return regionState != null ? regionState.getRegionInfo() : null; 2044 } 2045 2046 // ============================================================================================ 2047 // Expected states on region state transition. 2048 // Notice that there is expected states for transiting to OPENING state, this is because SCP. 2049 // See the comments in regionOpening method for more details. 2050 // ============================================================================================ 2051 private static final State[] STATES_EXPECTED_ON_OPEN = { State.OPENING, // Normal case 2052 State.OPEN // Retrying 2053 }; 2054 2055 private static final State[] STATES_EXPECTED_ON_CLOSING = { State.OPEN, // Normal case 2056 State.CLOSING, // Retrying 2057 State.SPLITTING, // Offline the split parent 2058 State.MERGING // Offline the merge parents 2059 }; 2060 2061 private static final State[] STATES_EXPECTED_ON_CLOSED = { State.CLOSING, // Normal case 2062 State.CLOSED // Retrying 2063 }; 2064 2065 // This is for manually scheduled region assign, can add other states later if we find out other 2066 // usages 2067 private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE }; 2068 2069 // We only allow unassign or move a region which is in OPEN state. 2070 private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN }; 2071 2072 // ============================================================================================ 2073 // Region Status update 2074 // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking 2075 // and pre-assumptions are very tricky. 
2076 // ============================================================================================ 2077 private CompletableFuture<Void> transitStateAndUpdate(RegionStateNode regionNode, 2078 RegionState.State newState, RegionState.State... expectedStates) { 2079 RegionState.State state = regionNode.getState(); 2080 try { 2081 regionNode.transitionState(newState, expectedStates); 2082 } catch (UnexpectedStateException e) { 2083 return FutureUtils.failedFuture(e); 2084 } 2085 CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode); 2086 FutureUtils.addListener(future, (r, e) -> { 2087 if (e != null) { 2088 // revert 2089 regionNode.setState(state); 2090 } 2091 }); 2092 return future; 2093 } 2094 2095 // should be called within the synchronized block of RegionStateNode 2096 CompletableFuture<Void> regionOpening(RegionStateNode regionNode) { 2097 // As in SCP, for performance reason, there is no TRSP attached with this region, we will not 2098 // update the region state, which means that the region could be in any state when we want to 2099 // assign it after a RS crash. So here we do not pass the expectedStates parameter. 2100 return transitStateAndUpdate(regionNode, State.OPENING).thenAccept(r -> { 2101 ServerStateNode serverNode = regionStates.getServerNode(regionNode.getRegionLocation()); 2102 // Here the server node could be null. For example, we want to assign the region to a given 2103 // region server and it crashes, and it is the region server which holds hbase:meta, then the 2104 // above transitStateAndUpdate call will never succeed until we finishes the SCP for it. But 2105 // after the SCP finishes, the server node will be removed, so when we arrive there, the 2106 // server 2107 // node will be null. 
This is not a big problem if we skip adding it, as later we will fail to 2108 // execute the remote procedure on the region server and then try to assign to another region 2109 // server 2110 if (serverNode != null) { 2111 serverNode.addRegion(regionNode); 2112 } 2113 // update the operation count metrics 2114 metrics.incrementOperationCounter(); 2115 }); 2116 } 2117 2118 // should be called under the RegionStateNode lock 2119 // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then 2120 // we will persist the FAILED_OPEN state into hbase:meta. 2121 CompletableFuture<Void> regionFailedOpen(RegionStateNode regionNode, boolean giveUp) { 2122 RegionState.State state = regionNode.getState(); 2123 ServerName regionLocation = regionNode.getRegionLocation(); 2124 if (!giveUp) { 2125 if (regionLocation != null) { 2126 regionStates.removeRegionFromServer(regionLocation, regionNode); 2127 } 2128 return CompletableFuture.completedFuture(null); 2129 } 2130 regionNode.setState(State.FAILED_OPEN); 2131 regionNode.setRegionLocation(null); 2132 CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode); 2133 FutureUtils.addListener(future, (r, e) -> { 2134 if (e == null) { 2135 if (regionLocation != null) { 2136 regionStates.removeRegionFromServer(regionLocation, regionNode); 2137 } 2138 } else { 2139 // revert 2140 regionNode.setState(state); 2141 regionNode.setRegionLocation(regionLocation); 2142 } 2143 }); 2144 return future; 2145 } 2146 2147 // should be called under the RegionStateNode lock 2148 CompletableFuture<Void> regionClosing(RegionStateNode regionNode) { 2149 return transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING) 2150 .thenAccept(r -> { 2151 RegionInfo hri = regionNode.getRegionInfo(); 2152 // Set meta has not initialized early. 
so people trying to create/edit tables will wait 2153 if (isMetaRegion(hri)) { 2154 setMetaAssigned(hri, false); 2155 } 2156 // update the operation count metrics 2157 metrics.incrementOperationCounter(); 2158 }); 2159 } 2160 2161 // for open and close, they will first be persist to the procedure store in 2162 // RegionRemoteProcedureBase. So here we will first change the in memory state as it is considered 2163 // as succeeded if the persistence to procedure store is succeeded, and then when the 2164 // RegionRemoteProcedureBase is woken up, we will persist the RegionStateNode to hbase:meta. 2165 2166 // should be called under the RegionStateNode lock 2167 void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) 2168 throws UnexpectedStateException { 2169 regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN); 2170 RegionInfo regionInfo = regionNode.getRegionInfo(); 2171 regionStates.addRegionToServer(regionNode); 2172 regionStates.removeFromFailedOpen(regionInfo); 2173 } 2174 2175 // should be called under the RegionStateNode lock 2176 void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) 2177 throws UnexpectedStateException { 2178 ServerName regionLocation = regionNode.getRegionLocation(); 2179 regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED); 2180 regionNode.setRegionLocation(null); 2181 if (regionLocation != null) { 2182 regionNode.setLastHost(regionLocation); 2183 regionStates.removeRegionFromServer(regionLocation, regionNode); 2184 } 2185 } 2186 2187 // should be called under the RegionStateNode lock 2188 CompletableFuture<Void> persistToMeta(RegionStateNode regionNode) { 2189 return regionStateStore.updateRegionLocation(regionNode).thenAccept(r -> { 2190 RegionInfo regionInfo = regionNode.getRegionInfo(); 2191 if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) { 2192 // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it 2193 // can't be disabled -- so 
skip the RPC (besides... enabled is managed by TableStateManager 2194 // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state 2195 // on table that contains state. 2196 setMetaAssigned(regionInfo, true); 2197 } 2198 }); 2199 } 2200 2201 // should be called under the RegionStateNode lock 2202 // for SCP 2203 public CompletableFuture<Void> regionClosedAbnormally(RegionStateNode regionNode) { 2204 RegionState.State state = regionNode.getState(); 2205 ServerName regionLocation = regionNode.getRegionLocation(); 2206 regionNode.setState(State.ABNORMALLY_CLOSED); 2207 regionNode.setRegionLocation(null); 2208 CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode); 2209 FutureUtils.addListener(future, (r, e) -> { 2210 if (e == null) { 2211 if (regionLocation != null) { 2212 regionNode.setLastHost(regionLocation); 2213 regionStates.removeRegionFromServer(regionLocation, regionNode); 2214 } 2215 } else { 2216 // revert 2217 regionNode.setState(state); 2218 regionNode.setRegionLocation(regionLocation); 2219 } 2220 }); 2221 return future; 2222 } 2223 2224 // ============================================================================================ 2225 // The above methods can only be called in TransitRegionStateProcedure(and related procedures) 2226 // ============================================================================================ 2227 2228 public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName, 2229 final RegionInfo daughterA, final RegionInfo daughterB) throws IOException { 2230 // Update hbase:meta. Parent will be marked offline and split up in hbase:meta. 2231 // The parent stays in regionStates until cleared when removed by CatalogJanitor. 
2232 // Update its state in regionStates to it shows as offline and split when read 2233 // later figuring what regions are in a table and what are not: see 2234 // regionStates#getRegionsOfTable 2235 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent); 2236 node.setState(State.SPLIT); 2237 final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA); 2238 nodeA.setState(State.SPLITTING_NEW); 2239 final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB); 2240 nodeB.setState(State.SPLITTING_NEW); 2241 2242 TableDescriptor td = master.getTableDescriptors().get(parent.getTable()); 2243 // TODO: here we just update the parent region info in meta, to set split and offline to true, 2244 // without changing the one in the region node. This is a bit confusing but the region info 2245 // field in RegionStateNode is not expected to be changed in the current design. Need to find a 2246 // possible way to address this problem, or at least adding more comments about the trick to 2247 // deal with this problem, that when you want to filter out split parent, you need to check both 2248 // the RegionState on whether it is split, and also the region info. If one of them matches then 2249 // it is a split parent. And usually only one of them can match, as after restart, the region 2250 // state will be changed from SPLIT to CLOSED. 2251 regionStateStore.splitRegion(parent, daughterA, daughterB, serverName, td); 2252 if (shouldAssignFavoredNodes(parent)) { 2253 List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList(); 2254 getFavoredNodePromoter().generateFavoredNodesForDaughter(onlineServers, parent, daughterA, 2255 daughterB); 2256 } 2257 } 2258 2259 /** 2260 * When called here, the merge has happened. The merged regions have been unassigned and the above 2261 * markRegionClosed has been called on each so they have been disassociated from a hosting Server. 
2262 * The merged region will be open after this call. The merged regions are removed from hbase:meta 2263 * below. Later they are deleted from the filesystem by the catalog janitor running against 2264 * hbase:meta. It notices when the merged region no longer holds references to the old regions 2265 * (References are deleted after a compaction rewrites what the Reference points at but not until 2266 * the archiver chore runs, are the References removed). 2267 */ 2268 public void markRegionAsMerged(final RegionInfo child, final ServerName serverName, 2269 RegionInfo[] mergeParents) throws IOException { 2270 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child); 2271 node.setState(State.MERGED); 2272 for (RegionInfo ri : mergeParents) { 2273 regionStates.deleteRegion(ri); 2274 } 2275 TableDescriptor td = master.getTableDescriptors().get(child.getTable()); 2276 regionStateStore.mergeRegions(child, mergeParents, serverName, td); 2277 if (shouldAssignFavoredNodes(child)) { 2278 getFavoredNodePromoter().generateFavoredNodesForMergedRegion(child, mergeParents); 2279 } 2280 } 2281 2282 /* 2283 * Favored nodes should be applied only when FavoredNodes balancer is configured and the region 2284 * belongs to a non-system table. 
2285 */ 2286 private boolean shouldAssignFavoredNodes(RegionInfo region) { 2287 return this.shouldAssignRegionsWithFavoredNodes 2288 && FavoredNodesManager.isFavoredNodeApplicable(region); 2289 } 2290 2291 // ============================================================================================ 2292 // Assign Queue (Assign/Balance) 2293 // ============================================================================================ 2294 private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>(); 2295 private final ReentrantLock assignQueueLock = new ReentrantLock(); 2296 private final Condition assignQueueFullCond = assignQueueLock.newCondition(); 2297 2298 /** 2299 * Add the assign operation to the assignment queue. The pending assignment operation will be 2300 * processed, and each region will be assigned by a server using the balancer. 2301 */ 2302 protected void queueAssign(final RegionStateNode regionNode) { 2303 regionNode.getProcedureEvent().suspend(); 2304 2305 // TODO: quick-start for meta and the other sys-tables? 
2306 assignQueueLock.lock(); 2307 try { 2308 pendingAssignQueue.add(regionNode); 2309 if ( 2310 regionNode.isSystemTable() || pendingAssignQueue.size() == 1 2311 || pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize 2312 ) { 2313 assignQueueFullCond.signal(); 2314 } 2315 } finally { 2316 assignQueueLock.unlock(); 2317 } 2318 } 2319 2320 private void startAssignmentThread() { 2321 assignThread = new Thread(master.getServerName().toShortString()) { 2322 @Override 2323 public void run() { 2324 while (isRunning()) { 2325 processAssignQueue(); 2326 } 2327 pendingAssignQueue.clear(); 2328 } 2329 }; 2330 assignThread.setDaemon(true); 2331 assignThread.start(); 2332 } 2333 2334 private void stopAssignmentThread() { 2335 assignQueueSignal(); 2336 try { 2337 while (assignThread.isAlive()) { 2338 assignQueueSignal(); 2339 assignThread.join(250); 2340 } 2341 } catch (InterruptedException e) { 2342 LOG.warn("join interrupted", e); 2343 Thread.currentThread().interrupt(); 2344 } 2345 } 2346 2347 private void assignQueueSignal() { 2348 assignQueueLock.lock(); 2349 try { 2350 assignQueueFullCond.signal(); 2351 } finally { 2352 assignQueueLock.unlock(); 2353 } 2354 } 2355 2356 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") 2357 private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() { 2358 HashMap<RegionInfo, RegionStateNode> regions = null; 2359 2360 assignQueueLock.lock(); 2361 try { 2362 if (pendingAssignQueue.isEmpty() && isRunning()) { 2363 assignQueueFullCond.await(); 2364 } 2365 2366 if (!isRunning()) { 2367 return null; 2368 } 2369 assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS); 2370 regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size()); 2371 for (RegionStateNode regionNode : pendingAssignQueue) { 2372 regions.put(regionNode.getRegionInfo(), regionNode); 2373 } 2374 pendingAssignQueue.clear(); 2375 } catch (InterruptedException e) { 2376 LOG.warn("got interrupted ", e); 2377 
Thread.currentThread().interrupt(); 2378 } finally { 2379 assignQueueLock.unlock(); 2380 } 2381 return regions; 2382 } 2383 2384 private void processAssignQueue() { 2385 final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue(); 2386 if (regions == null || regions.size() == 0 || !isRunning()) { 2387 return; 2388 } 2389 2390 if (LOG.isTraceEnabled()) { 2391 LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size()); 2392 } 2393 2394 // TODO: Optimize balancer. pass a RegionPlan? 2395 final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>(); 2396 final List<RegionInfo> userHRIs = new ArrayList<>(regions.size()); 2397 // Regions for system tables requiring reassignment 2398 final List<RegionInfo> systemHRIs = new ArrayList<>(); 2399 for (RegionStateNode regionStateNode : regions.values()) { 2400 boolean sysTable = regionStateNode.isSystemTable(); 2401 final List<RegionInfo> hris = sysTable ? systemHRIs : userHRIs; 2402 if (regionStateNode.getRegionLocation() != null) { 2403 retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation()); 2404 } else { 2405 hris.add(regionStateNode.getRegionInfo()); 2406 } 2407 } 2408 2409 // TODO: connect with the listener to invalidate the cache 2410 2411 // TODO use events 2412 List<ServerName> servers = master.getServerManager().createDestinationServersList(); 2413 for (int i = 0; servers.size() < 1; ++i) { 2414 // Report every fourth time around this loop; try not to flood log. 2415 if (i % 4 == 0) { 2416 LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions."); 2417 } 2418 2419 if (!isRunning()) { 2420 LOG.debug("Stopped! 
Dropping assign of " + regions.size() + " queued regions."); 2421 return; 2422 } 2423 Threads.sleep(250); 2424 servers = master.getServerManager().createDestinationServersList(); 2425 } 2426 2427 if (!systemHRIs.isEmpty()) { 2428 // System table regions requiring reassignment are present, get region servers 2429 // not available for system table regions 2430 final List<ServerName> excludeServers = getExcludedServersForSystemTable(); 2431 List<ServerName> serversForSysTables = 2432 servers.stream().filter(s -> !excludeServers.contains(s)).collect(Collectors.toList()); 2433 if (serversForSysTables.isEmpty()) { 2434 LOG.warn("Filtering old server versions and the excluded produced an empty set; " 2435 + "instead considering all candidate servers!"); 2436 } 2437 LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() 2438 + ", allServersCount=" + servers.size()); 2439 processAssignmentPlans(regions, null, systemHRIs, 2440 serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs) 2441 ? 
servers 2442 : serversForSysTables); 2443 } 2444 2445 processAssignmentPlans(regions, retainMap, userHRIs, servers); 2446 } 2447 2448 private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions, 2449 List<RegionInfo> hirs) { 2450 for (RegionInfo ri : hirs) { 2451 if ( 2452 regions.get(ri).getRegionLocation() != null 2453 && regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME) 2454 ) { 2455 return true; 2456 } 2457 } 2458 return false; 2459 } 2460 2461 private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions, 2462 final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris, 2463 final List<ServerName> servers) { 2464 boolean isTraceEnabled = LOG.isTraceEnabled(); 2465 if (isTraceEnabled) { 2466 LOG.trace("Available servers count=" + servers.size() + ": " + servers); 2467 } 2468 2469 final LoadBalancer balancer = getBalancer(); 2470 // ask the balancer where to place regions 2471 if (retainMap != null && !retainMap.isEmpty()) { 2472 if (isTraceEnabled) { 2473 LOG.trace("retain assign regions=" + retainMap); 2474 } 2475 try { 2476 acceptPlan(regions, balancer.retainAssignment(retainMap, servers)); 2477 } catch (IOException e) { 2478 LOG.warn("unable to retain assignment", e); 2479 addToPendingAssignment(regions, retainMap.keySet()); 2480 } 2481 } 2482 2483 // TODO: Do we need to split retain and round-robin? 2484 // the retain seems to fallback to round-robin/random if the region is not in the map. 
2485 if (!hris.isEmpty()) { 2486 Collections.sort(hris, RegionInfo.COMPARATOR); 2487 if (isTraceEnabled) { 2488 LOG.trace("round robin regions=" + hris); 2489 } 2490 try { 2491 acceptPlan(regions, balancer.roundRobinAssignment(hris, servers)); 2492 } catch (IOException e) { 2493 LOG.warn("unable to round-robin assignment", e); 2494 addToPendingAssignment(regions, hris); 2495 } 2496 } 2497 } 2498 2499 private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions, 2500 final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException { 2501 final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()]; 2502 final long st = EnvironmentEdgeManager.currentTime(); 2503 2504 if (plan.isEmpty()) { 2505 throw new HBaseIOException("unable to compute plans for regions=" + regions.size()); 2506 } 2507 2508 int evcount = 0; 2509 for (Map.Entry<ServerName, List<RegionInfo>> entry : plan.entrySet()) { 2510 final ServerName server = entry.getKey(); 2511 for (RegionInfo hri : entry.getValue()) { 2512 final RegionStateNode regionNode = regions.get(hri); 2513 regionNode.setRegionLocation(server); 2514 if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) { 2515 assignQueueLock.lock(); 2516 try { 2517 pendingAssignQueue.add(regionNode); 2518 } finally { 2519 assignQueueLock.unlock(); 2520 } 2521 } else { 2522 events[evcount++] = regionNode.getProcedureEvent(); 2523 } 2524 } 2525 } 2526 ProcedureEvent.wakeEvents(getProcedureScheduler(), events); 2527 2528 final long et = EnvironmentEdgeManager.currentTime(); 2529 if (LOG.isTraceEnabled()) { 2530 LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + StringUtils.humanTimeDiff(et - st)); 2531 } 2532 } 2533 2534 private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions, 2535 final Collection<RegionInfo> pendingRegions) { 2536 assignQueueLock.lock(); 2537 try { 2538 for (RegionInfo hri : pendingRegions) { 2539 pendingAssignQueue.add(regions.get(hri)); 2540 } 
2541 } finally { 2542 assignQueueLock.unlock(); 2543 } 2544 } 2545 2546 /** 2547 * For a given cluster with mixed versions of servers, get a list of servers with lower versions, 2548 * where system table regions should not be assigned to. For system table, we must assign regions 2549 * to a server with highest version. However, we can disable this exclusion using config: 2550 * "hbase.min.version.move.system.tables" if checkForMinVersion is true. Detailed explanation 2551 * available with definition of minVersionToMoveSysTables. 2552 * @return List of Excluded servers for System table regions. 2553 */ 2554 public List<ServerName> getExcludedServersForSystemTable() { 2555 // TODO: This should be a cached list kept by the ServerManager rather than calculated on each 2556 // move or system region assign. The RegionServerTracker keeps list of online Servers with 2557 // RegionServerInfo that includes Version. 2558 List<Pair<ServerName, String>> serverList = 2559 master.getServerManager().getOnlineServersList().stream() 2560 .map(s -> new Pair<>(s, master.getRegionServerVersion(s))).collect(Collectors.toList()); 2561 if (serverList.isEmpty()) { 2562 return new ArrayList<>(); 2563 } 2564 String highestVersion = Collections 2565 .max(serverList, (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())) 2566 .getSecond(); 2567 if (!DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG.equals(minVersionToMoveSysTables)) { 2568 int comparedValue = VersionInfo.compareVersion(minVersionToMoveSysTables, highestVersion); 2569 if (comparedValue > 0) { 2570 return new ArrayList<>(); 2571 } 2572 } 2573 return serverList.stream().filter(pair -> !pair.getSecond().equals(highestVersion)) 2574 .map(Pair::getFirst).collect(Collectors.toList()); 2575 } 2576 2577 MasterServices getMaster() { 2578 return master; 2579 } 2580 2581 /** Returns a snapshot of rsReports */ 2582 public Map<ServerName, Set<byte[]>> getRSReports() { 2583 Map<ServerName, Set<byte[]>> rsReportsSnapshot = new 
HashMap<>(); 2584 synchronized (rsReports) { 2585 rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue())); 2586 } 2587 return rsReportsSnapshot; 2588 } 2589 2590 /** 2591 * Provide regions state count for given table. e.g howmany regions of give table are 2592 * opened/closed/rit etc 2593 * @param tableName TableName 2594 * @return region states count 2595 */ 2596 public RegionStatesCount getRegionStatesCount(TableName tableName) { 2597 int openRegionsCount = 0; 2598 int closedRegionCount = 0; 2599 int ritCount = 0; 2600 int splitRegionCount = 0; 2601 int totalRegionCount = 0; 2602 if (!isTableDisabled(tableName)) { 2603 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 2604 for (RegionState regionState : states) { 2605 if (regionState.isOpened()) { 2606 openRegionsCount++; 2607 } else if (regionState.isClosed()) { 2608 closedRegionCount++; 2609 } else if (regionState.isSplit()) { 2610 splitRegionCount++; 2611 } 2612 } 2613 totalRegionCount = states.size(); 2614 ritCount = totalRegionCount - openRegionsCount - splitRegionCount; 2615 } 2616 return new RegionStatesCount.RegionStatesCountBuilder().setOpenRegions(openRegionsCount) 2617 .setClosedRegions(closedRegionCount).setSplitRegions(splitRegionCount) 2618 .setRegionsInTransition(ritCount).setTotalRegions(totalRegionCount).build(); 2619 } 2620 2621}