/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.assignment;

import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.procedure.TruncateRegionProcedure;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.regionserver.SequenceId;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;

/**
 * The AssignmentManager is the coordinator for region assign/unassign operations.
 * <ul>
 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
 * </ul>
 * Regions are created by CreateTable, Split, Merge. Regions are deleted by DeleteTable, Split,
 * Merge. Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. Unassigns
 * are triggered by DisableTable, Split, Merge.
 */
@InterfaceAudience.Private
public class AssignmentManager {
  private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);

  // TODO: AMv2
  // - handle region migration from hbase1 to hbase2.
  // - handle sys table assignment first (e.g. acl, namespace)
  // - handle table priorities
  // - If ServerBusyException trying to update hbase:meta, we abort the Master
  //   See updateRegionLocation in RegionStateStore.
  //
  // See also
  // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
  // for other TODOs.

  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
    "hbase.assignment.bootstrap.thread.pool.size";

  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
    "hbase.assignment.dispatch.wait.msec";
  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;

  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
    "hbase.assignment.dispatch.wait.queue.max.size";
  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;

  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
    "hbase.assignment.rit.chore.interval.msec";
  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;

  public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY =
    "hbase.assignment.dead.region.metric.chore.interval.msec";
  private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000;

  public static final String ASSIGN_MAX_ATTEMPTS = "hbase.assignment.maximum.attempts";
  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;

  public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS =
    "hbase.assignment.retry.immediately.maximum.attempts";
  private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3;

  /** Region in Transition metrics threshold time */
  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
    "hbase.metrics.rit.stuck.warning.threshold";
  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
  public static final String UNEXPECTED_STATE_REGION = "Unexpected state for ";

  public static final String FORCE_REGION_RETAINMENT = "hbase.master.scp.retain.assignment.force";

  public static final boolean DEFAULT_FORCE_REGION_RETAINMENT = false;

  /** The wait time in millis before checking again if the region's previous RS is back online */
  public static final String FORCE_REGION_RETAINMENT_WAIT_INTERVAL =
    "hbase.master.scp.retain.assignment.force.wait-interval";

  public static final long DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL = 50;

  /**
   * The number of times to check if the region's previous RS is back online, before giving up and
   * proceeding with assignment on a new RS
   */
  public static final String FORCE_REGION_RETAINMENT_RETRIES =
    "hbase.master.scp.retain.assignment.force.retries";

  public static final int DEFAULT_FORCE_REGION_RETAINMENT_RETRIES = 600;
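
  // Illustrative only (values are hypothetical, not recommendations): the tuning keys above are
  // normally set in hbase-site.xml and are read once in the AssignmentManager constructor below,
  // so changes generally require a master restart. For example, a test might tighten the RIT
  // chore interval:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, 10 * 1000);
  //   conf.setInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, 150);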

  private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");

  private final MetricsAssignmentManager metrics;
  private final RegionInTransitionChore ritChore;
  private final DeadServerMetricRegionChore deadMetricChore;
  private final MasterServices master;

  private final AtomicBoolean running = new AtomicBoolean(false);
  private final RegionStates regionStates = new RegionStates();
  private final RegionStateStore regionStateStore;

  /**
   * When the operator uses this configuration option, any version between the current cluster
   * version and the value of "hbase.min.version.move.system.tables" does not trigger any
   * auto-region movement. Auto-region movement here refers to auto-migration of system table
   * regions to newer server versions. It is assumed that the configured range of versions does not
   * require special handling of moving system table regions to higher versioned RegionServers. This
   * auto-migration is done by {@link #checkIfShouldMoveSystemRegionAsync()}. Example: Let's assume
   * the cluster is on version 1.4.0 and we have set "hbase.min.version.move.system.tables" as
   * "2.0.0". Now if we upgrade one RegionServer on the 1.4.0 cluster to 1.6.0 (< 2.0.0), then
   * AssignmentManager will not move hbase:meta, hbase:namespace and other system table regions to
   * the newly brought up RegionServer 1.6.0 as part of auto-migration. However, if we upgrade one
   * RegionServer on the 1.4.0 cluster to 2.2.0 (> 2.0.0), then AssignmentManager will move all
   * system table regions to the newly brought up RegionServer 2.2.0 as part of auto-migration done
   * by {@link #checkIfShouldMoveSystemRegionAsync()}. "hbase.min.version.move.system.tables" was
   * introduced as part of HBASE-22923.
   */
  private final String minVersionToMoveSysTables;

  private static final String MIN_VERSION_MOVE_SYS_TABLES_CONFIG =
    "hbase.min.version.move.system.tables";
  private static final String DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG = "";

  private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();

  private final boolean shouldAssignRegionsWithFavoredNodes;
  private final int assignDispatchWaitQueueMaxSize;
  private final int assignDispatchWaitMillis;
  private final int assignMaxAttempts;
  private final int assignRetryImmediatelyMaxAttempts;

  private final MasterRegion masterRegion;

  private final Object checkIfShouldMoveSystemRegionLock = new Object();

  private Thread assignThread;

  private final boolean forceRegionRetainment;

  private final long forceRegionRetainmentWaitInterval;

  private final int forceRegionRetainmentRetries;

  public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
    this(master, masterRegion, new RegionStateStore(master, masterRegion));
  }

  AssignmentManager(MasterServices master, MasterRegion masterRegion, RegionStateStore stateStore) {
    this.master = master;
    this.regionStateStore = stateStore;
    this.metrics = new MetricsAssignmentManager();
    this.masterRegion = masterRegion;

    final Configuration conf = master.getConfiguration();

    // Only read favored nodes if using the favored nodes load balancer.
    this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class
      .isAssignableFrom(conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class));

    this.assignDispatchWaitMillis =
      conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
    this.assignDispatchWaitQueueMaxSize =
      conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);

    this.assignMaxAttempts =
      Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS));
    this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS,
      DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS);

    int ritChoreInterval =
      conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC);
    this.ritChore = new RegionInTransitionChore(ritChoreInterval);

    int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY,
      DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC);
    if (deadRegionChoreInterval > 0) {
      this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval);
    } else {
      this.deadMetricChore = null;
    }
    minVersionToMoveSysTables =
      conf.get(MIN_VERSION_MOVE_SYS_TABLES_CONFIG, DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG);

    forceRegionRetainment =
      conf.getBoolean(FORCE_REGION_RETAINMENT, DEFAULT_FORCE_REGION_RETAINMENT);
    forceRegionRetainmentWaitInterval = conf.getLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL,
      DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL);
    forceRegionRetainmentRetries =
      conf.getInt(FORCE_REGION_RETAINMENT_RETRIES, DEFAULT_FORCE_REGION_RETAINMENT_RETRIES);
  }

  private void mirrorMetaLocations() throws IOException, KeeperException {
    // For compatibility, mirror the meta region state to zookeeper. And we still need to use
    // zookeeper to publish the meta region locations to region servers, so they can serve as
    // ClientMetaService.
    ZKWatcher zk = master.getZooKeeper();
    if (zk == null || !zk.getRecoverableZooKeeper().getState().isAlive()) {
      // this is possible in tests, we do not provide a zk watcher or the zk watcher has been closed
      return;
    }
    Collection<RegionStateNode> metaStates = regionStates.getRegionStateNodes();
    for (RegionStateNode metaState : metaStates) {
      MetaTableLocator.setMetaLocation(zk, metaState.getRegionLocation(),
        metaState.getRegionInfo().getReplicaId(), metaState.getState());
    }
    int replicaCount = metaStates.size();
    // remove extra mirror locations
    for (String znode : zk.getMetaReplicaNodes()) {
      int replicaId = zk.getZNodePaths().getMetaReplicaIdFromZNode(znode);
      if (replicaId >= replicaCount) {
        MetaTableLocator.deleteMetaLocation(zk, replicaId);
      }
    }
  }

  public void start() throws IOException, KeeperException {
    if (!running.compareAndSet(false, true)) {
      return;
    }

    LOG.trace("Starting assignment manager");

    // Start the Assignment Thread
    startAssignmentThread();
    // load meta region states.
    // here we are still in the early steps of active master startup. There is only one thread (us)
    // that can access the AssignmentManager and create region nodes, so we do not need to lock the
    // region node here.
    try (ResultScanner scanner =
      masterRegion.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY))) {
      for (;;) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        RegionStateStore
          .visitMetaEntry((r, regionInfo, state, regionLocation, lastHost, openSeqNum) -> {
            RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
            regionNode.setState(state);
            regionNode.setLastHost(lastHost);
            regionNode.setRegionLocation(regionLocation);
            regionNode.setOpenSeqNum(openSeqNum);
            if (regionNode.getProcedure() != null) {
              regionNode.getProcedure().stateLoaded(this, regionNode);
            }
            if (regionLocation != null) {
              // TODO: this could lead to some orphan server state nodes, as it is possible that the
              // region server is already dead and its SCP has already finished but we have
              // persisted an opening state on this region server. Finally the TRSP will assign the
              // region to another region server, so it will not cause critical problems, just waste
              // some memory as no one will try to cleanup these orphan server state nodes.
              regionStates.createServer(regionLocation);
              regionStates.addRegionToServer(regionNode);
            }
            if (RegionReplicaUtil.isDefaultReplica(regionInfo.getReplicaId())) {
              setMetaAssigned(regionInfo, state == State.OPEN);
            }
            LOG.debug("Loaded hbase:meta {}", regionNode);
          }, result);
      }
    }
    mirrorMetaLocations();
  }

  /**
   * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode.
   * <p>
   * This is used to restore the RIT region list, so we do not need to restore it while loading
   * meta. It is also very important because, before submitting a TRSP, we need to attach it to the
   * RegionStateNode, which acts like a guard, so we need to restore this information at the very
   * beginning, before we start processing any procedures.
   */
  public void setupRIT(List<TransitRegionStateProcedure> procs) {
    procs.forEach(proc -> {
      RegionInfo regionInfo = proc.getRegion();
      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
      TransitRegionStateProcedure existingProc = regionNode.getProcedure();
      if (existingProc != null) {
        // This is possible, as we will detach the procedure from the RSN before we actually finish
        // it. The TRSP is detached from the RSN during execution, before the procedure has been
        // marked as done in the pv2 framework, so a new TRSP may be scheduled immediately and,
        // when arriving here, we may find multiple TRSPs for the same region. Only the last one
        // can take charge; the previous ones should all have finished already. So here we compare
        // the proc ids and the greater one wins.
        if (existingProc.getProcId() < proc.getProcId()) {
          // the new one wins, unset and set it to the new one below
          regionNode.unsetProcedure(existingProc);
        } else {
          // the old one wins, skip
          return;
        }
      }
      LOG.info("Attach {} to {} to restore RIT", proc, regionNode);
      regionNode.setProcedure(proc);
    });
  }

  public void stop() {
    if (!running.compareAndSet(true, false)) {
      return;
    }

    LOG.info("Stopping assignment manager");

    // The AM is started before the procedure executor,
    // but the actual work will be loaded/submitted only once we have the executor
    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;

    // Remove the RIT chore
    if (hasProcExecutor) {
      master.getMasterProcedureExecutor().removeChore(this.ritChore);
      if (this.deadMetricChore != null) {
        master.getMasterProcedureExecutor().removeChore(this.deadMetricChore);
      }
    }

    // Stop the Assignment Thread
    stopAssignmentThread();

    // Stop the RegionStateStore
    regionStates.clear();

    // Update meta events (for testing)
    if (hasProcExecutor) {
      metaLoadEvent.suspend();
      for (RegionInfo hri : getMetaRegionSet()) {
        setMetaAssigned(hri, false);
      }
    }
  }

  public boolean isRunning() {
    return running.get();
  }

  public Configuration getConfiguration() {
    return master.getConfiguration();
  }

  public MetricsAssignmentManager getAssignmentManagerMetrics() {
    return metrics;
  }

  private LoadBalancer getBalancer() {
    return master.getLoadBalancer();
  }

  private FavoredNodesPromoter getFavoredNodePromoter() {
    return (FavoredNodesPromoter) ((RSGroupBasedLoadBalancer) master.getLoadBalancer())
      .getInternalBalancer();
  }

  private MasterProcedureEnv getProcedureEnvironment() {
    return master.getMasterProcedureExecutor().getEnvironment();
  }

  private MasterProcedureScheduler getProcedureScheduler() {
    return getProcedureEnvironment().getProcedureScheduler();
  }

  int getAssignMaxAttempts() {
    return assignMaxAttempts;
  }

  public boolean isForceRegionRetainment() {
    return forceRegionRetainment;
  }

  public long getForceRegionRetainmentWaitInterval() {
    return forceRegionRetainmentWaitInterval;
  }

  public int getForceRegionRetainmentRetries() {
    return forceRegionRetainmentRetries;
  }

  int getAssignRetryImmediatelyMaxAttempts() {
    return assignRetryImmediatelyMaxAttempts;
  }

  public RegionStates getRegionStates() {
    return regionStates;
  }

  /**
   * Returns the regions hosted by the specified server.
   * <p/>
   * Notice that, for SCP, after we submit the SCP, no one can change the region list for the
   * ServerStateNode so we do not need any locks here. And for other usage, this can only give you
   * a snapshot of the current region list for this server, which means, right after you get the
   * region list, new regions may be moved to this server or some regions may be moved out from
   * this server, so you should not rely on it if you need strong consistency.
   */
  public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
    ServerStateNode serverInfo = regionStates.getServerNode(serverName);
    if (serverInfo == null) {
      return Collections.emptyList();
    }
    return serverInfo.getRegionInfoList();
  }

  private RegionInfo getRegionInfo(RegionStateNode rsn) {
    if (rsn.isSplit() && !rsn.getRegionInfo().isSplit()) {
      // see the comments in markRegionAsSplit on why we need to do this converting.
      return RegionInfoBuilder.newBuilder(rsn.getRegionInfo()).setSplit(true).setOffline(true)
        .build();
    } else {
      return rsn.getRegionInfo();
    }
  }

  private Stream<RegionStateNode> getRegionStateNodes(TableName tableName,
    boolean excludeOfflinedSplitParents) {
    Stream<RegionStateNode> stream = regionStates.getTableRegionStateNodes(tableName).stream();
    if (excludeOfflinedSplitParents) {
      return stream.filter(rsn -> !rsn.isSplit());
    } else {
      return stream;
    }
  }

  public List<RegionInfo> getTableRegions(TableName tableName,
    boolean excludeOfflinedSplitParents) {
    return getRegionStateNodes(tableName, excludeOfflinedSplitParents).map(this::getRegionInfo)
      .collect(Collectors.toList());
  }

  public List<Pair<RegionInfo, ServerName>> getTableRegionsAndLocations(TableName tableName,
    boolean excludeOfflinedSplitParents) {
    return getRegionStateNodes(tableName, excludeOfflinedSplitParents)
      .map(rsn -> Pair.newPair(getRegionInfo(rsn), rsn.getRegionLocation()))
      .collect(Collectors.toList());
  }

  public RegionStateStore getRegionStateStore() {
    return regionStateStore;
  }

  public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
    return this.shouldAssignRegionsWithFavoredNodes
      ? getFavoredNodePromoter().getFavoredNodes(regionInfo)
      : ServerName.EMPTY_SERVER_LIST;
  }

  // ============================================================================================
  // Table State Manager helpers
  // ============================================================================================
  private TableStateManager getTableStateManager() {
    return master.getTableStateManager();
  }

  private boolean isTableEnabled(final TableName tableName) {
    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
  }

  private boolean isTableDisabled(final TableName tableName) {
    return getTableStateManager().isTableState(tableName, TableState.State.DISABLED,
      TableState.State.DISABLING);
  }

  // ============================================================================================
  // META Helpers
  // ============================================================================================
  private boolean isMetaRegion(final RegionInfo regionInfo) {
    return regionInfo.isMetaRegion();
  }

  public boolean isMetaRegion(final byte[] regionName) {
    return getMetaRegionFromName(regionName) != null;
  }

  public RegionInfo getMetaRegionFromName(final byte[] regionName) {
    for (RegionInfo hri : getMetaRegionSet()) {
      if (Bytes.equals(hri.getRegionName(), regionName)) {
        return hri;
      }
    }
    return null;
  }

  public boolean isCarryingMeta(final ServerName serverName) {
    // TODO: handle multiple meta
    return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
  }

  private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
    // TODO: check for state?
    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
    return (node != null && serverName.equals(node.getRegionLocation()));
  }

  private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
    // if (regionInfo.isMetaRegion()) return regionInfo;
    // TODO: handle multiple meta. if the region provided is not meta lookup
    // which meta the region belongs to.
    return RegionInfoBuilder.FIRST_META_REGIONINFO;
  }

  // TODO: handle multiple meta.
  private static final Set<RegionInfo> META_REGION_SET =
    Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);

  public Set<RegionInfo> getMetaRegionSet() {
    return META_REGION_SET;
  }

  // ============================================================================================
  // META Event(s) helpers
  // ============================================================================================
  /**
   * Notice that, this only means the meta region is available on a RS, but the AM may still be
   * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
   * before checking this method, unless you can make sure that your piece of code can only be
   * executed after AM builds the region states.
   * @see #isMetaLoaded()
   */
  public boolean isMetaAssigned() {
    return metaAssignEvent.isReady();
  }

  public boolean isMetaRegionInTransition() {
    return !isMetaAssigned();
  }

  /**
   * Notice that this event does not mean the AM has already finished region state rebuilding. See
   * the comment of {@link #isMetaAssigned()} for more details.
   * @see #isMetaAssigned()
   */
  public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
    return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
  }

  private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
    ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
    if (assigned) {
      metaAssignEvent.wake(getProcedureScheduler());
    } else {
      metaAssignEvent.suspend();
    }
  }

  private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
    // TODO: handle multiple meta.
    return metaAssignEvent;
  }

  /**
   * Wait until AM finishes the meta loading, i.e, the region states rebuilding.
   * @see #isMetaLoaded()
   * @see #waitMetaAssigned(Procedure, RegionInfo)
   */
  public boolean waitMetaLoaded(Procedure<?> proc) {
    return metaLoadEvent.suspendIfNotReady(proc);
  }

  /**
   * This method will be called in the master initialization method after calling
   * {@link #processOfflineRegions()}, as in processOfflineRegions we will generate assign
   * procedures for offline regions, which may conflict with creating tables.
   * <p/>
   * This is a bit dirty, should be reconsidered after we decide whether to keep the
   * {@link #processOfflineRegions()} method.
   */
  public void wakeMetaLoadedEvent() {
    metaLoadEvent.wake(getProcedureScheduler());
    assert isMetaLoaded() : "expected meta to be loaded";
  }

  /**
   * Return whether AM finishes the meta loading, i.e, the region states rebuilding.
   * @see #isMetaAssigned()
   * @see #waitMetaLoaded(Procedure)
   */
  public boolean isMetaLoaded() {
    return metaLoadEvent.isReady();
  }
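
  // A hedged usage sketch (not lifted from a specific procedure): master procedures typically gate
  // on the events above before relying on region states, along the lines of
  //
  //   if (env.getAssignmentManager().waitMetaLoaded(this)) {
  //     throw new ProcedureSuspendedException(); // resumed once wakeMetaLoadedEvent() fires
  //   }
  //
  // Note that isMetaAssigned() alone is not enough; see the javadoc on that method.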

  /**
   * Start a new thread to check if there are region servers whose versions are higher than others.
   * If so, move all system table regions to the RS with the highest version to keep compatibility.
   * The reason is, an RS on a new version may not be able to access an RS on an old version when
   * there are some incompatible changes.
   * <p>
   * This method is called only when a new RegionServer is added to the cluster.
   * </p>
   */
  public void checkIfShouldMoveSystemRegionAsync() {
    // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
    // it should 'move' the system tables from the old server to the new server but
    // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
    if (this.master.getServerManager().countOfRegionServers() <= 1) {
      return;
    }
    // This thread used to run whenever there was a change in the cluster. The ZooKeeper
    // childrenChanged notification came in before the nodeDeleted message and so this method
    // could run before a ServerCrashProcedure could run. That meant that this thread could see
    // a Crashed Server before ServerCrashProcedure and it could find system regions on the
    // crashed server and go move them before ServerCrashProcedure had a chance; could be
    // dataloss too if WALs were not recovered.
    new Thread(() -> {
      try {
        synchronized (checkIfShouldMoveSystemRegionLock) {
          List<RegionPlan> plans = new ArrayList<>();
          // TODO: I don't think this code does a good job if all servers in cluster have same
          // version. It looks like it will schedule unnecessary moves.
          for (ServerName server : getExcludedServersForSystemTable()) {
            if (master.getServerManager().isServerDead(server)) {
              // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
              // considers only online servers, the server could be queued for dead server
              // processing. As region assignments for a crashed server are handled by
              // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
              // regular flow of LoadBalancer as a favored node and not to have this special
              // handling.
              continue;
            }
            List<RegionInfo> regionsShouldMove = getSystemTables(server);
            if (!regionsShouldMove.isEmpty()) {
              for (RegionInfo regionInfo : regionsShouldMove) {
                // null value for dest forces destination server to be selected by balancer
                RegionPlan plan = new RegionPlan(regionInfo, server, null);
                if (regionInfo.isMetaRegion()) {
                  // Must move meta region first.
                  LOG.info("Async MOVE of {} to newer Server={}", regionInfo.getEncodedName(),
                    server);
                  moveAsync(plan);
                } else {
                  plans.add(plan);
                }
              }
            }
            for (RegionPlan plan : plans) {
              LOG.info("Async MOVE of {} to newer Server={}", plan.getRegionInfo().getEncodedName(),
                server);
              moveAsync(plan);
            }
          }
        }
      } catch (Throwable t) {
        LOG.error(t.toString(), t);
      }
    }).start();
  }

  private List<RegionInfo> getSystemTables(ServerName serverName) {
    ServerStateNode serverNode = regionStates.getServerNode(serverName);
    if (serverNode == null) {
      return Collections.emptyList();
    }
    return serverNode.getSystemRegionInfoList();
  }

  private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
    throws HBaseIOException {
    if (regionNode.getProcedure() != null) {
      throw new HBaseIOException(
        regionNode + " is currently in transition; pid=" + regionNode.getProcedure().getProcId());
    }
    if (!regionNode.isInState(expectedStates)) {
      throw new DoNotRetryRegionException(UNEXPECTED_STATE_REGION + regionNode);
    }
    if (isTableDisabled(regionNode.getTable())) {
      throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
    }
  }

  /**
   * Create an assign TransitRegionStateProcedure. Makes sure of RegionState. Throws exception if
   * not appropriate UNLESS override is set. Used by hbck2 but also by straightline
   * {@link #assign(RegionInfo, ServerName)} and {@link #assignAsync(RegionInfo, ServerName)}.
   * @see #createAssignProcedure(RegionStateNode, ServerName) for a version that does NO checking,
   *      used only when no checking is needed.
   * @param override If false, check RegionState is appropriate for assign; if not throw exception.
   */
  private TransitRegionStateProcedure createAssignProcedure(RegionInfo regionInfo, ServerName sn,
    boolean override, boolean force) throws IOException {
    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
    regionNode.lock();
    try {
      if (override) {
        if (!force) {
          preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
        }
        if (regionNode.getProcedure() != null) {
          regionNode.unsetProcedure(regionNode.getProcedure());
        }
      } else {
        preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
      }
      assert regionNode.getProcedure() == null;
      return regionNode.setProcedure(
        TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn));
    } finally {
      regionNode.unlock();
    }
  }

  /**
   * Create an assign TransitRegionStateProcedure. Does NO checking of RegionState. Presumes
   * appropriate state ripe for assign.
   * @see #createAssignProcedure(RegionInfo, ServerName, boolean, boolean)
   */
  private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode,
    ServerName targetServer) {
    regionNode.lock();
    try {
      return regionNode.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
        regionNode.getRegionInfo(), targetServer));
    } finally {
      regionNode.unlock();
    }
  }

  public long assign(RegionInfo regionInfo, ServerName sn) throws IOException {
    TransitRegionStateProcedure proc = createAssignProcedure(regionInfo, sn, false, false);
    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
    return proc.getProcId();
  }

  public long assign(RegionInfo regionInfo) throws IOException {
    return assign(regionInfo, null);
  }

  /**
   * Submits a procedure that assigns a region to a target server without waiting for it to finish
   * @param regionInfo the region we would like to assign
   * @param sn target server name
   */
  public Future<byte[]> assignAsync(RegionInfo regionInfo, ServerName sn) throws IOException {
    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(),
      createAssignProcedure(regionInfo, sn, false, false));
  }

  /**
   * Submits a procedure that assigns a region without waiting for it to finish
   * @param regionInfo the region we would like to assign
   */
  public Future<byte[]> assignAsync(RegionInfo regionInfo) throws IOException {
    return assignAsync(regionInfo, null);
  }

  public long unassign(RegionInfo regionInfo) throws IOException {
    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
    if (regionNode == null) {
      throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
    }
    TransitRegionStateProcedure proc;
    regionNode.lock();
    try {
      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
      regionNode.setProcedure(proc);
    } finally {
      regionNode.unlock();
    }
    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
    return proc.getProcId();
  }
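
  // Illustrative only (the caller and error handling are assumptions): administrative code holding
  // a reference to this AssignmentManager can drive the APIs above roughly like
  //
  //   long pid = am.assign(regionInfo);               // submits a TRSP and blocks until it is done
  //   Future<byte[]> f = am.assignAsync(regionInfo);  // submits a TRSP and returns immediately
  //   f.get();                                        // wait later, if and when needed
  //
  // unassign(regionInfo) follows the same blocking pattern as assign(regionInfo).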

  public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
    ServerName targetServer) throws HBaseIOException {
    RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
    if (regionNode == null) {
      throw new UnknownRegionException(
        "No RegionStateNode found for " + regionInfo.getEncodedName() + "(Closed/Deleted?)");
    }
    TransitRegionStateProcedure proc;
    regionNode.lock();
    try {
      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
      regionNode.checkOnline();
      proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
      regionNode.setProcedure(proc);
    } finally {
      regionNode.unlock();
    }
    return proc;
  }

  public void move(RegionInfo regionInfo) throws IOException {
    TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null);
    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
  }

  public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException {
    TransitRegionStateProcedure proc =
      createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination());
    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
  }

  public Future<byte[]> balance(RegionPlan regionPlan) throws HBaseIOException {
    ServerName current =
      this.getRegionStates().getRegionAssignments().get(regionPlan.getRegionInfo());
    if (current == null || !current.equals(regionPlan.getSource())) {
      LOG.debug("Skip region plan {}, source server not match, current region location is {}",
        regionPlan, current == null ? "(null)" : current);
      return null;
    }
    return moveAsync(regionPlan);
  }

  // ============================================================================================
  // RegionTransition procedures helpers
  // ============================================================================================

  /**
   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
   *         to populate the assigns with targets chosen using round-robin (default balancer
   *         scheme). If at assign-time, the target chosen is no longer up, that's fine, the
   *         AssignProcedure will ask the balancer for a new target, and so on.
   */
  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris,
    List<ServerName> serversToExclude) {
    if (hris.isEmpty()) {
      return new TransitRegionStateProcedure[0];
    }

    if (
      serversToExclude != null && this.master.getServerManager().getOnlineServersList().size() == 1
    ) {
      LOG.debug("Only one region server found and hence going ahead with the assignment");
      serversToExclude = null;
    }
    try {
      // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
      // a better job if it has all the assignments in the one lump.
      Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
        this.master.getServerManager().createDestinationServersList(serversToExclude));
      // Return mid-method!
      return createAssignProcedures(assignments);
    } catch (IOException hioe) {
      LOG.warn("Failed roundRobinAssignment", hioe);
    }
    // If an error above, fall-through to this simpler assign. Last resort.
    return createAssignProcedures(hris);
  }
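
  // A hedged usage sketch (the caller and submission loop are illustrative, not lifted from the
  // procedures that use this API): table-creation style callers typically do something like
  //
  //   TransitRegionStateProcedure[] assigns = am.createRoundRobinAssignProcedures(newRegions);
  //   for (TransitRegionStateProcedure p : assigns) {
  //     master.getMasterProcedureExecutor().submitProcedure(p);
  //   }
  //
  // so that the regions of a new table are spread across the cluster by the balancer.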

  /**
   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
   *         to populate the assigns with targets chosen using round-robin (default balancer
   *         scheme). If at assign-time, the target chosen is no longer up, that's fine, the
   *         AssignProcedure will ask the balancer for a new target, and so on.
   */
  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
    return createRoundRobinAssignProcedures(hris, null);
  }

  static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
    if (left.getRegion().isMetaRegion()) {
      if (right.getRegion().isMetaRegion()) {
        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
      }
      return -1;
    } else if (right.getRegion().isMetaRegion()) {
      return +1;
    }
    if (left.getRegion().getTable().isSystemTable()) {
      if (right.getRegion().getTable().isSystemTable()) {
        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
      }
      return -1;
    } else if (right.getRegion().getTable().isSystemTable()) {
      return +1;
    }
    return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
  }

  /**
   * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server. This
   * method is called from HBCK2.
   * @return an assign or null
   */
  public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo ri, boolean override,
    boolean force) {
    TransitRegionStateProcedure trsp = null;
    try {
      trsp = createAssignProcedure(ri, null, override, force);
    } catch (IOException ioe) {
      LOG.info(
        "Failed {} assign, override={}"
          + (override ? "" : "; set override to by-pass state checks."),
        ri.getEncodedName(), override, ioe);
    }
    return trsp;
  }

  /**
   * Create one TransitRegionStateProcedure to unassign a region. This method is called from HBCK2.
   * @return an unassign or null
   */
  public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo ri, boolean override,
    boolean force) {
    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(ri);
    TransitRegionStateProcedure trsp = null;
    regionNode.lock();
    try {
      if (override) {
        if (!force) {
          preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
        }
        if (regionNode.getProcedure() != null) {
          regionNode.unsetProcedure(regionNode.getProcedure());
        }
      } else {
        // This is where we could throw an exception; i.e. override is false.
        preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
      }
      assert regionNode.getProcedure() == null;
      trsp =
        TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionNode.getRegionInfo());
      regionNode.setProcedure(trsp);
    } catch (IOException ioe) {
      // 'override' must be false here.
      LOG.info("Failed {} unassign, override=false; set override to by-pass state checks.",
        ri.getEncodedName(), ioe);
    } finally {
      regionNode.unlock();
    }
    return trsp;
  }

  /**
   * Create an array of TransitRegionStateProcedure w/o specifying a target server. Used as a
   * fallback when the caller is unable to do {@link #createAssignProcedures(Map)}.
   * <p/>
   * If no target server, at assign time, we will try to use the former location of the region if
   * one exists.
   * This is how we 'retain' the old location across a server restart.
   * <p/>
   * Should only be called when you can make sure that no one can touch these regions other than
   * you. For example, when you are creating or enabling table. Presumes all Regions are in
   * appropriate state ripe for assign; no checking of Region state is done in here.
   * @see #createAssignProcedures(Map)
   */
  public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) {
    return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
      .map(regionNode -> createAssignProcedure(regionNode, null)).sorted(AssignmentManager::compare)
      .toArray(TransitRegionStateProcedure[]::new);
  }

  /**
   * Tied to {@link #createAssignProcedures(List)} in that it is called if caller is unable to run
   * this method. Presumes all Regions are in appropriate state ripe for assign; no checking of
   * Region state is done in here.
   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
   * @return Assignments made from the passed in <code>assignments</code>
   * @see #createAssignProcedures(List)
   */
  private TransitRegionStateProcedure[]
    createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments) {
    return assignments.entrySet().stream()
      .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
        .map(regionNode -> createAssignProcedure(regionNode, e.getKey())))
      .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
  }

  // for creating unassign TRSP when disabling a table or closing excess region replicas
  private TransitRegionStateProcedure forceCreateUnssignProcedure(RegionStateNode regionNode) {
    regionNode.lock();
    try {
      if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
        return null;
      }
      // in general, a split parent should be in CLOSED or SPLIT state, but anyway, let's check it
      // here for safety
      if (regionNode.getRegionInfo().isSplit()) {
        LOG.warn("{} is a split parent but not in CLOSED or SPLIT state", regionNode);
        return null;
      }
      // As in DisableTableProcedure or ModifyTableProcedure, we will hold the xlock for the table,
      // we can make sure that this procedure has not been executed yet, as TRSP will hold the
      // shared lock for the table all the time. So here we will unset it and when it is actually
      // executed, it will find that the attached procedure is not itself and quit immediately.
      if (regionNode.getProcedure() != null) {
        regionNode.unsetProcedure(regionNode.getProcedure());
      }
      return regionNode.setProcedure(TransitRegionStateProcedure.unassign(getProcedureEnvironment(),
        regionNode.getRegionInfo()));
    } finally {
      regionNode.unlock();
    }
  }

  /**
   * Called by DisableTableProcedure to unassign all the regions for a table.
   */
  public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) {
    return regionStates.getTableRegionStateNodes(tableName).stream()
      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
      .toArray(TransitRegionStateProcedure[]::new);
  }

  private int submitUnassignProcedure(TableName tableName,
    Function<RegionStateNode, Boolean> shouldSubmit, Consumer<RegionStateNode> logRIT,
    Consumer<TransitRegionStateProcedure> submit) {
    int inTransitionCount = 0;
    for (RegionStateNode regionNode : regionStates.getTableRegionStateNodes(tableName)) {
      regionNode.lock();
      try {
        if (shouldSubmit.apply(regionNode)) {
          if (regionNode.isInTransition()) {
            logRIT.accept(regionNode);
            inTransitionCount++;
            continue;
          }
          if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
            continue;
          }
          submit.accept(regionNode.setProcedure(TransitRegionStateProcedure
            .unassign(getProcedureEnvironment(), regionNode.getRegionInfo())));
        }
      } finally {
        regionNode.unlock();
      }
    }
    return inTransitionCount;
  }

  /**
   * Called by DisableTableProcedure to unassign all regions for a table. Will skip submitting the
   * unassign procedure if the region is in transition, so you may need to call this method
   * multiple times.
   * @param tableName the table whose regions are being closed for disabling
   * @param submit for submitting procedure
   * @return the number of regions in transition for which we could not schedule unassign procedures
   */
  public int submitUnassignProcedureForDisablingTable(TableName tableName,
    Consumer<TransitRegionStateProcedure> submit) {
    return submitUnassignProcedure(tableName, rn -> true,
      rn -> LOG.debug("skip scheduling unassign procedure for {} when closing table regions "
        + "for disabling since it is in transition", rn),
      submit);
  }
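
  // Because regions already in transition are skipped, callers are expected to retry until the
  // returned count reaches zero. A minimal sketch only; the loop and pause policy below are
  // assumptions, not lifted from DisableTableProcedure:
  //
  //   int inTransition;
  //   do {
  //     inTransition = am.submitUnassignProcedureForDisablingTable(tableName,
  //       p -> master.getMasterProcedureExecutor().submitProcedure(p));
  //     Threads.sleep(100); // hypothetical pause between rounds
  //   } while (inTransition > 0);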

  /**
   * Called by ModifyTableProcedure to unassign all the excess region replicas for a table. Will
   * skip submitting the unassign procedure if the region is in transition, so you may need to call
   * this method multiple times.
   * @param tableName the table for closing excess region replicas
   * @param newReplicaCount the new replica count, should be less than the current replica count
   * @param submit for submitting procedure
   * @return the number of regions in transition for which we could not schedule unassign procedures
   */
  public int submitUnassignProcedureForClosingExcessRegionReplicas(TableName tableName,
    int newReplicaCount, Consumer<TransitRegionStateProcedure> submit) {
    return submitUnassignProcedure(tableName,
      rn -> rn.getRegionInfo().getReplicaId() >= newReplicaCount,
      rn -> LOG.debug("skip scheduling unassign procedure for {} when closing excess region "
        + "replicas since it is in transition", rn),
      submit);
  }

  private int numberOfUnclosedRegions(TableName tableName,
    Function<RegionStateNode, Boolean> shouldSubmit) {
    int unclosed = 0;
    for (RegionStateNode regionNode : regionStates.getTableRegionStateNodes(tableName)) {
      regionNode.lock();
      try {
        if (shouldSubmit.apply(regionNode)) {
          if (!regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
            unclosed++;
          }
        }
      } finally {
        regionNode.unlock();
      }
    }
    return unclosed;
  }

  public int numberOfUnclosedRegionsForDisabling(TableName tableName) {
    return numberOfUnclosedRegions(tableName, rn -> true);
  }

  public int numberOfUnclosedExcessRegionReplicas(TableName tableName, int newReplicaCount) {
    return numberOfUnclosedRegions(tableName,
      rn -> rn.getRegionInfo().getReplicaId() >= newReplicaCount);
  }

  public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
    final byte[] splitKey) throws IOException {
    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
  }

  public TruncateRegionProcedure createTruncateRegionProcedure(final RegionInfo regionToTruncate)
    throws IOException {
    return new TruncateRegionProcedure(getProcedureEnvironment(), regionToTruncate);
  }

  public MergeTableRegionsProcedure createMergeProcedure(RegionInfo... ris) throws IOException {
    return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
  }

  /**
   * Delete the region states.
   * This is called by "DeleteTable".
   */
  public void deleteTable(final TableName tableName) throws IOException {
    final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
    regionStateStore.deleteRegions(regions);
    for (int i = 0; i < regions.size(); ++i) {
      final RegionInfo regionInfo = regions.get(i);
      regionStates.deleteRegion(regionInfo);
    }
  }

  // ============================================================================================
  // RS Region Transition Report helpers
  // ============================================================================================
  private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
    ServerStateNode serverNode, List<RegionStateTransition> transitionList) throws IOException {
    for (RegionStateTransition transition : transitionList) {
      switch (transition.getTransitionCode()) {
        case OPENED:
        case FAILED_OPEN:
        case CLOSED:
          assert transition.getRegionInfoCount() == 1 : transition;
          final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
          long procId =
            transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
          updateRegionTransition(serverNode, transition.getTransitionCode(), hri,
            transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId);
          break;
        case READY_TO_SPLIT:
        case SPLIT:
        case SPLIT_REVERTED:
          assert transition.getRegionInfoCount() == 3 : transition;
          final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
          final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
          final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
          updateRegionSplitTransition(serverNode, transition.getTransitionCode(), parent, splitA,
            splitB);
          break;
        case READY_TO_MERGE:
        case MERGED:
        case MERGE_REVERTED:
          assert transition.getRegionInfoCount() == 3 : transition;
          final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
          final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
          final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
          updateRegionMergeTransition(serverNode, transition.getTransitionCode(), merged, mergeA,
            mergeB);
          break;
      }
    }
  }

  public ReportRegionStateTransitionResponse reportRegionStateTransition(
    final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
    ReportRegionStateTransitionResponse.Builder builder =
      ReportRegionStateTransitionResponse.newBuilder();
    ServerName serverName = ProtobufUtil.toServerName(req.getServer());
    ServerStateNode serverNode = regionStates.getServerNode(serverName);
    if (serverNode == null) {
      LOG.warn("No server node for {}", serverName);
      builder.setErrorMessage("No server node for " + serverName);
      return builder.build();
    }
    // here we have to acquire a read lock instead of a simple exclusive lock. This is because we
    // should not block other reportRegionStateTransition calls from the same region server. This
    // is not only about performance, but also to prevent deadlock.
    // Consider the case where the meta region is also on the same region server: if you held an
    // exclusive lock that blocked the reportRegionStateTransition for meta, and meta is not online
    // yet, you would block inside the lock protection waiting for meta to come online...
    serverNode.readLock().lock();
    try {
      // we only accept reportRegionStateTransition if the region server is online, see the comment
      // above in submitServerCrash method and HBASE-21508 for more details.
      if (serverNode.isInState(ServerState.ONLINE)) {
        try {
          reportRegionStateTransition(builder, serverNode, req.getTransitionList());
        } catch (PleaseHoldException e) {
          LOG.trace("Failed transition ", e);
          throw e;
        } catch (UnsupportedOperationException | IOException e) {
          // TODO: at the moment we have a single error message and the RS will abort
          // if the master says that one of the region transitions failed.
          LOG.warn("Failed transition", e);
          builder.setErrorMessage("Failed transition " + e.getMessage());
        }
      } else {
        LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
          serverName);
        builder.setErrorMessage("You are dead");
      }
    } finally {
      serverNode.readLock().unlock();
    }

    return builder.build();
  }

  private void updateRegionTransition(ServerStateNode serverNode, TransitionCode state,
    RegionInfo regionInfo, long seqId, long procId) throws IOException {
    checkMetaLoaded(regionInfo);

    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
    if (regionNode == null) {
      // the table/region is gone. maybe a delete, split, merge
      throw new UnexpectedStateException(String.format(
        "Server %s was trying to transition region %s to %s, but the region is not known.",
        serverNode.getServerName(), regionInfo, state));
    }
    LOG.trace("Update region transition serverName={} region={} regionState={}",
      serverNode.getServerName(), regionNode, state);

    regionNode.lock();
    try {
      if (!reportTransition(regionNode, serverNode, state, seqId, procId)) {
        // Don't log WARN if shutting down cluster. During shutdown, avoid the below messages:
        // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
        // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
        // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
        // to CLOSED
        // These happen because on cluster shutdown, we currently let the RegionServers close
        // regions. This is the only time that region close is not run by the Master (so cluster
        // goes down fast). Consider changing it so Master runs all shutdowns.
1311 if ( 1312 this.master.getServerManager().isClusterShutdown() && state.equals(TransitionCode.CLOSED) 1313 ) { 1314 LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName()); 1315 } else { 1316 LOG.warn("No matching procedure found for {} transition on {} to {}", 1317 serverNode.getServerName(), regionNode, state); 1318 } 1319 } 1320 } finally { 1321 regionNode.unlock(); 1322 } 1323 } 1324 1325 private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode, 1326 TransitionCode state, long seqId, long procId) throws IOException { 1327 ServerName serverName = serverNode.getServerName(); 1328 TransitRegionStateProcedure proc = regionNode.getProcedure(); 1329 if (proc == null) { 1330 return false; 1331 } 1332 proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode, 1333 serverName, state, seqId, procId); 1334 return true; 1335 } 1336 1337 private void updateRegionSplitTransition(final ServerStateNode serverNode, 1338 final TransitionCode state, final RegionInfo parent, final RegionInfo hriA, 1339 final RegionInfo hriB) throws IOException { 1340 checkMetaLoaded(parent); 1341 1342 if (state != TransitionCode.READY_TO_SPLIT) { 1343 throw new UnexpectedStateException( 1344 "unsupported split regionState=" + state + " for parent region " + parent 1345 + " maybe an old RS (< 2.0) had the operation in progress"); 1346 } 1347 1348 // sanity check on the request 1349 if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) { 1350 throw new UnsupportedOperationException("unsupported split request with bad keys: parent=" 1351 + parent + " hriA=" + hriA + " hriB=" + hriB); 1352 } 1353 1354 if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) { 1355 LOG.warn("Split switch is off! skip split of " + parent); 1356 throw new DoNotRetryIOException( 1357 "Split region " + parent.getRegionNameAsString() + " failed due to split switch off"); 1358 } 1359 1360 // Submit the Split procedure 1361 final byte[] splitKey = hriB.getStartKey(); 1362 if (LOG.isDebugEnabled()) { 1363 LOG.debug("Split request from {}, parent={}, splitKey={}", serverNode.getServerName(), parent, 1364 Bytes.toStringBinary(splitKey)); 1365 } 1366 // Processing this report happens asynchronously from other activities which can mutate 1367 // the region state. For example, a split procedure may already be running for this parent. 1368 // A split procedure cannot succeed if the parent region is no longer open, so we can 1369 // ignore it in that case. 1370 // Note that submitting more than one split procedure for a given region is 1371 // harmless -- the split is fenced in the procedure handling -- but it would be noisy in 1372 // the logs. Only one procedure can succeed. The other procedure(s) would abort during 1373 // initialization and report failure with WARN level logging. 
    RegionState parentState = regionStates.getRegionState(parent);
    if (parentState != null && parentState.isOpened()) {
      master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
    } else {
      LOG.info("Ignoring split request from {}, parent={} because parent is unknown or not open",
        serverNode.getServerName(), parent);
      return;
    }

    // If the RS is < 2.0, throw an exception to abort the operation; we are handling the split
    if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) {
      throw new UnsupportedOperationException(
        String.format("Split handled by the master: " + "parent=%s hriA=%s hriB=%s",
          parent.getShortNameToLog(), hriA, hriB));
    }
  }

  private void updateRegionMergeTransition(final ServerStateNode serverNode,
    final TransitionCode state, final RegionInfo merged, final RegionInfo hriA,
    final RegionInfo hriB) throws IOException {
    checkMetaLoaded(merged);

    if (state != TransitionCode.READY_TO_MERGE) {
      throw new UnexpectedStateException(
        "Unsupported merge regionState=" + state + " for regionA=" + hriA + " regionB=" + hriB
          + " merged=" + merged + " maybe an old RS (< 2.0) had the operation in progress");
    }

    if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
      LOG.warn("Merge switch is off! skip merge of regionA=" + hriA + " regionB=" + hriB);
      throw new DoNotRetryIOException(
        "Merge of regionA=" + hriA + " regionB=" + hriB + " failed because merge switch is off");
    }

    // Submit the Merge procedure
    if (LOG.isDebugEnabled()) {
      LOG.debug("Handling merge request from RS={}, merged={}", serverNode.getServerName(), merged);
    }
    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));

    // If the RS is < 2.0, throw an exception to abort the operation; we are handling the merge
    if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) {
      throw new UnsupportedOperationException(
        String.format("Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state,
          merged, hriA, hriB));
    }
  }

  // ============================================================================================
  // RS Status update (report online regions) helpers
  // ============================================================================================
  /**
   * The master will call this method when the RS sends the regionServerReport(). The report will
   * contain the "online regions". This method will check the online regions against the
   * in-memory state of the AM, and we will log a warning message if there is a mismatch. This is
   * because there is no fencing between the reportRegionStateTransition method and the
   * regionServerReport method, so a race could introduce an inconsistency here, but in practice
   * there is no problem.
   * <p/>
   * Please see HBASE-21421 and HBASE-21463 for more details.
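   * <p/>
   * Illustrative sketch only (added note, not the production RPC path): one way this check could
   * be exercised, assuming {@code am} is the master's AssignmentManager (for example obtained in a
   * test via {@code master.getAssignmentManager()}):
   *
   * <pre>{@code
   * ServerName serverName = ServerName.valueOf("rs1.example.org", 16020, 1700000000000L);
   * Set<byte[]> onlineRegionNames = new HashSet<>();
   * for (RegionInfo regionInfo : am.getAssignedRegions()) {
   *   onlineRegionNames.add(regionInfo.getRegionName());
   * }
   * // Regions whose reported location disagrees with the AM's in-memory accounting are logged
   * // and may be closed on the reporting server.
   * am.reportOnlineRegions(serverName, onlineRegionNames);
   * }</pre>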
1434 */ 1435 public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) { 1436 if (!isRunning()) { 1437 return; 1438 } 1439 if (LOG.isTraceEnabled()) { 1440 LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName, 1441 regionNames.size(), isMetaLoaded(), 1442 regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList())); 1443 } 1444 1445 ServerStateNode serverNode = regionStates.getServerNode(serverName); 1446 if (serverNode == null) { 1447 LOG.warn("Got a report from server {} where its server node is null", serverName); 1448 return; 1449 } 1450 serverNode.readLock().lock(); 1451 try { 1452 if (!serverNode.isInState(ServerState.ONLINE)) { 1453 LOG.warn("Got a report from a server result in state {}", serverNode); 1454 return; 1455 } 1456 } finally { 1457 serverNode.readLock().unlock(); 1458 } 1459 1460 // Track the regionserver reported online regions in memory. 1461 synchronized (rsReports) { 1462 rsReports.put(serverName, regionNames); 1463 } 1464 1465 if (regionNames.isEmpty()) { 1466 // nothing to do if we don't have regions 1467 LOG.trace("no online region found on {}", serverName); 1468 return; 1469 } 1470 if (!isMetaLoaded()) { 1471 // we are still on startup, skip checking 1472 return; 1473 } 1474 // The Heartbeat tells us of what regions are on the region serve, check the state. 1475 checkOnlineRegionsReport(serverNode, regionNames); 1476 } 1477 1478 /** 1479 * Close <code>regionName</code> on <code>sn</code> silently and immediately without using a 1480 * Procedure or going via hbase:meta. For case where a RegionServer's hosting of a Region is not 1481 * aligned w/ the Master's accounting of Region state. This is for cleaning up an error in 1482 * accounting. 1483 */ 1484 private void closeRegionSilently(ServerName sn, byte[] regionName) { 1485 try { 1486 RegionInfo ri = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName); 1487 // Pass -1 for timeout. Means do not wait. 1488 ServerManager.closeRegionSilentlyAndWait(this.master.getAsyncClusterConnection(), sn, ri, -1); 1489 } catch (Exception e) { 1490 LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e); 1491 } 1492 } 1493 1494 /** 1495 * Check that what the RegionServer reports aligns with the Master's image. If disagreement, we 1496 * will tell the RegionServer to expediently close a Region we do not think it should have. 1497 */ 1498 private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) { 1499 ServerName serverName = serverNode.getServerName(); 1500 for (byte[] regionName : regionNames) { 1501 if (!isRunning()) { 1502 return; 1503 } 1504 RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName); 1505 if (regionNode == null) { 1506 String regionNameAsStr = Bytes.toStringBinary(regionName); 1507 LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...", regionNameAsStr, 1508 serverName); 1509 closeRegionSilently(serverNode.getServerName(), regionName); 1510 continue; 1511 } 1512 final long lag = 1000; 1513 // This is just a fallback check designed to identify unexpected data inconsistencies, so we 1514 // use tryLock to attempt to acquire the lock, and if the lock cannot be acquired, we skip the 1515 // check. This will not cause any additional problems and also prevents the regionServerReport 1516 // call from being stuck for too long which may cause deadlock on region assignment. 
1517 if (regionNode.tryLock()) { 1518 try { 1519 long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate(); 1520 if (regionNode.isInState(State.OPENING, State.OPEN)) { 1521 // This is possible as a region server has just closed a region but the region server 1522 // report is generated before the closing, but arrive after the closing. Make sure 1523 // there 1524 // is some elapsed time so less false alarms. 1525 if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) { 1526 LOG.warn("Reporting {} server does not match {} (time since last " 1527 + "update={}ms); closing...", serverName, regionNode, diff); 1528 closeRegionSilently(serverNode.getServerName(), regionName); 1529 } 1530 } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) { 1531 // So, we can get report that a region is CLOSED or SPLIT because a heartbeat 1532 // came in at about same time as a region transition. Make sure there is some 1533 // elapsed time so less false alarms. 1534 if (diff > lag) { 1535 LOG.warn("Reporting {} state does not match {} (time since last update={}ms)", 1536 serverName, regionNode, diff); 1537 } 1538 } 1539 } finally { 1540 regionNode.unlock(); 1541 } 1542 } else { 1543 LOG.warn( 1544 "Unable to acquire lock for regionNode {}. It is likely that another thread is currently holding the lock. To avoid deadlock, skip execution for now.", 1545 regionNode); 1546 } 1547 } 1548 } 1549 1550 // ============================================================================================ 1551 // RIT chore 1552 // ============================================================================================ 1553 private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> { 1554 public RegionInTransitionChore(final int timeoutMsec) { 1555 super(timeoutMsec); 1556 } 1557 1558 @Override 1559 protected void periodicExecute(final MasterProcedureEnv env) { 1560 final AssignmentManager am = env.getAssignmentManager(); 1561 1562 final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat(); 1563 if (ritStat.hasRegionsOverThreshold()) { 1564 for (RegionState hri : ritStat.getRegionOverThreshold()) { 1565 am.handleRegionOverStuckWarningThreshold(hri.getRegion()); 1566 } 1567 } 1568 1569 // update metrics 1570 am.updateRegionsInTransitionMetrics(ritStat); 1571 } 1572 } 1573 1574 private static class DeadServerMetricRegionChore 1575 extends ProcedureInMemoryChore<MasterProcedureEnv> { 1576 public DeadServerMetricRegionChore(final int timeoutMsec) { 1577 super(timeoutMsec); 1578 } 1579 1580 @Override 1581 protected void periodicExecute(final MasterProcedureEnv env) { 1582 final ServerManager sm = env.getMasterServices().getServerManager(); 1583 final AssignmentManager am = env.getAssignmentManager(); 1584 // To minimize inconsistencies we are not going to snapshot live servers in advance in case 1585 // new servers are added; OTOH we don't want to add heavy sync for a consistent view since 1586 // this is for metrics. Instead, we're going to check each regions as we go; to avoid making 1587 // too many checks, we maintain a local lists of server, limiting us to false negatives. If 1588 // we miss some recently-dead server, we'll just see it next time. 
1589 Set<ServerName> recentlyLiveServers = new HashSet<>(); 1590 int deadRegions = 0, unknownRegions = 0; 1591 for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) { 1592 if (rsn.getState() != State.OPEN) { 1593 continue; // Opportunistic check, should quickly skip RITs, offline tables, etc. 1594 } 1595 // Do not need to acquire region state lock as this is only for showing metrics. 1596 ServerName sn = rsn.getRegionLocation(); 1597 State state = rsn.getState(); 1598 if (state != State.OPEN) { 1599 continue; // Mostly skipping RITs that are already being take care of. 1600 } 1601 if (sn == null) { 1602 ++unknownRegions; // Opened on null? 1603 continue; 1604 } 1605 if (recentlyLiveServers.contains(sn)) { 1606 continue; 1607 } 1608 ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn); 1609 switch (sls) { 1610 case LIVE: 1611 recentlyLiveServers.add(sn); 1612 break; 1613 case DEAD: 1614 ++deadRegions; 1615 break; 1616 case UNKNOWN: 1617 ++unknownRegions; 1618 break; 1619 default: 1620 throw new AssertionError("Unexpected " + sls); 1621 } 1622 } 1623 if (deadRegions > 0 || unknownRegions > 0) { 1624 LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers", 1625 deadRegions, unknownRegions); 1626 } 1627 1628 am.updateDeadServerRegionMetrics(deadRegions, unknownRegions); 1629 } 1630 } 1631 1632 public RegionInTransitionStat computeRegionInTransitionStat() { 1633 final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration()); 1634 rit.update(this); 1635 return rit; 1636 } 1637 1638 public static class RegionInTransitionStat { 1639 private final int ritThreshold; 1640 1641 private HashMap<String, RegionState> ritsOverThreshold = null; 1642 private long statTimestamp; 1643 private long oldestRITTime = 0; 1644 private int totalRITsTwiceThreshold = 0; 1645 private int totalRITs = 0; 1646 1647 public RegionInTransitionStat(final Configuration conf) { 1648 this.ritThreshold = 1649 conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD); 1650 } 1651 1652 public int getRITThreshold() { 1653 return ritThreshold; 1654 } 1655 1656 public long getTimestamp() { 1657 return statTimestamp; 1658 } 1659 1660 public int getTotalRITs() { 1661 return totalRITs; 1662 } 1663 1664 public long getOldestRITTime() { 1665 return oldestRITTime; 1666 } 1667 1668 public int getTotalRITsOverThreshold() { 1669 Map<String, RegionState> m = this.ritsOverThreshold; 1670 return m != null ? m.size() : 0; 1671 } 1672 1673 public boolean hasRegionsTwiceOverThreshold() { 1674 return totalRITsTwiceThreshold > 0; 1675 } 1676 1677 public boolean hasRegionsOverThreshold() { 1678 Map<String, RegionState> m = this.ritsOverThreshold; 1679 return m != null && !m.isEmpty(); 1680 } 1681 1682 public Collection<RegionState> getRegionOverThreshold() { 1683 Map<String, RegionState> m = this.ritsOverThreshold; 1684 return m != null ? 
m.values() : Collections.emptySet(); 1685 } 1686 1687 public boolean isRegionOverThreshold(final RegionInfo regionInfo) { 1688 Map<String, RegionState> m = this.ritsOverThreshold; 1689 return m != null && m.containsKey(regionInfo.getEncodedName()); 1690 } 1691 1692 public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) { 1693 Map<String, RegionState> m = this.ritsOverThreshold; 1694 if (m == null) { 1695 return false; 1696 } 1697 final RegionState state = m.get(regionInfo.getEncodedName()); 1698 if (state == null) { 1699 return false; 1700 } 1701 return (statTimestamp - state.getStamp()) > (ritThreshold * 2); 1702 } 1703 1704 protected void update(final AssignmentManager am) { 1705 final RegionStates regionStates = am.getRegionStates(); 1706 this.statTimestamp = EnvironmentEdgeManager.currentTime(); 1707 update(regionStates.getRegionsStateInTransition(), statTimestamp); 1708 update(regionStates.getRegionFailedOpen(), statTimestamp); 1709 1710 if (LOG.isDebugEnabled() && ritsOverThreshold != null && !ritsOverThreshold.isEmpty()) { 1711 LOG.debug("RITs over threshold: {}", 1712 ritsOverThreshold.entrySet().stream() 1713 .map(e -> e.getKey() + ":" + e.getValue().getState().name()) 1714 .collect(Collectors.joining("\n"))); 1715 } 1716 } 1717 1718 private void update(final Collection<RegionState> regions, final long currentTime) { 1719 for (RegionState state : regions) { 1720 totalRITs++; 1721 final long ritStartedMs = state.getStamp(); 1722 if (ritStartedMs == 0) { 1723 // Don't output bogus values to metrics if they accidentally make it here. 1724 LOG.warn("The RIT {} has no start time", state.getRegion()); 1725 continue; 1726 } 1727 final long ritTime = currentTime - ritStartedMs; 1728 if (ritTime > ritThreshold) { 1729 if (ritsOverThreshold == null) { 1730 ritsOverThreshold = new HashMap<String, RegionState>(); 1731 } 1732 ritsOverThreshold.put(state.getRegion().getEncodedName(), state); 1733 totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0; 1734 } 1735 if (oldestRITTime < ritTime) { 1736 oldestRITTime = ritTime; 1737 } 1738 } 1739 } 1740 } 1741 1742 private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) { 1743 metrics.updateRITOldestAge(ritStat.getOldestRITTime()); 1744 metrics.updateRITCount(ritStat.getTotalRITs()); 1745 metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold()); 1746 } 1747 1748 private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) { 1749 metrics.updateDeadServerOpenRegions(deadRegions); 1750 metrics.updateUnknownServerOpenRegions(unknownRegions); 1751 } 1752 1753 private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) { 1754 final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 1755 // if (regionNode.isStuck()) { 1756 LOG.warn("STUCK Region-In-Transition {}", regionNode); 1757 } 1758 1759 // ============================================================================================ 1760 // TODO: Master load/bootstrap 1761 // ============================================================================================ 1762 public void joinCluster() throws IOException { 1763 long startTime = System.nanoTime(); 1764 LOG.debug("Joining cluster..."); 1765 1766 // Scan hbase:meta to build list of existing regions, servers, and assignment. 1767 // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress 1768 // w/o meta. 
    loadMeta();

    while (master.getServerManager().countOfRegionServers() < 1) {
      LOG.info("Waiting for RegionServers to join; current count={}",
        master.getServerManager().countOfRegionServers());
      Threads.sleep(250);
    }
    LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers());

    // Start the chores
    master.getMasterProcedureExecutor().addChore(this.ritChore);
    master.getMasterProcedureExecutor().addChore(this.deadMetricChore);

    long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
    LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs));
  }

  /**
   * Create assign procedures for offline regions. Just follow the old
   * processofflineServersWithOnlineRegions method. Since we do not need to deal with dead servers
   * any more, we only deal with regions in OFFLINE state in this method. And this is a bit
   * strange: new regions are added in CLOSED state instead of OFFLINE state, and usually there
   * will be a procedure to track them. The processofflineServersWithOnlineRegions is a legacy from
   * long ago; as things work quite differently now, maybe we do not need this method any more.
   * Need to revisit later.
   */
  // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online.
  // Needs to be done after the table state manager has been started.
  public void processOfflineRegions() {
    TransitRegionStateProcedure[] procs =
      regionStates.getRegionStateNodes().stream().filter(rsn -> rsn.isInState(State.OFFLINE))
        .filter(rsn -> isTableEnabled(rsn.getRegionInfo().getTable())).map(rsn -> {
          rsn.lock();
          try {
            if (rsn.getProcedure() != null) {
              return null;
            } else {
              return rsn.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
                rsn.getRegionInfo(), null));
            }
          } finally {
            rsn.unlock();
          }
        }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new);
    if (procs.length > 0) {
      master.getMasterProcedureExecutor().submitProcedures(procs);
    }
  }

  /*
   * AM internal RegionStateStore.RegionStateVisitor implementation. To be used when scanning the
   * META table for region rows, using RegionStateStore utility methods. RegionStateStore methods
   * will convert Result into proper RegionInfo instances, but those would still need to be added
   * into the AssignmentManager.regionStates in-memory cache. RegionMetaLoadingVisitor.visitRegionState
   * method provides the logic for adding RegionInfo instances as loaded from the latest META scan
   * into AssignmentManager.regionStates.
   */
  private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor {

    @Override
    public void visitRegionState(Result result, final RegionInfo regionInfo, final State state,
      final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
      if (
        state == null && regionLocation == null && lastHost == null
          && openSeqNum == SequenceId.NO_SEQUENCE_ID
      ) {
        // This is a row with nothing in it.
        LOG.warn("Skipping empty row={}", result);
        return;
      }
      State localState = state;
      if (localState == null) {
        // No region state column data in hbase:meta table! Am I doing a rolling upgrade from
        // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta?
        // In any of these cases, state is empty. For now, presume OFFLINE but there are probably
        // cases where we need to probe more to be sure this is correct; TODO informed by experience.
        LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);
        localState = State.OFFLINE;
      }
      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
      // Do not need to lock on regionNode, as we can make sure that before we finish loading
      // meta, all the related procedures cannot be executed. The only exception is for meta
      // region related operations, but here we do not load the information for the meta region.
      regionNode.setState(localState);
      regionNode.setLastHost(lastHost);
      regionNode.setRegionLocation(regionLocation);
      regionNode.setOpenSeqNum(openSeqNum);

      // Note: keep consistent with other methods, see region(Opening|Opened|Closing)
      // RIT/ServerCrash handling should take care of the transiting regions.
      if (
        localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING, State.MERGING)
      ) {
        assert regionLocation != null : "found null region location for " + regionNode;
        // TODO: this could lead to some orphan server state nodes, as it is possible that the
        // region server is already dead and its SCP has already finished but we have
        // persisted an opening state on this region server. Finally the TRSP will assign the
        // region to another region server, so it will not cause critical problems, just waste
        // some memory as no one will try to clean up these orphan server state nodes.
        regionStates.createServer(regionLocation);
        regionStates.addRegionToServer(regionNode);
      } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
        regionStates.addToOfflineRegions(regionNode);
      }
      if (regionNode.getProcedure() != null) {
        regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode);
      }
    }
  };

  /**
   * Attempt to load {@code regionInfo} from META, adding any results to the
   * {@link #regionStateStore}. Is NOT aware of replica regions.
   * @param regionInfo the region to be loaded from META.
   * @throws IOException If some error occurs while querying META or parsing results.
   */
  public void populateRegionStatesFromMeta(@NonNull final RegionInfo regionInfo)
    throws IOException {
    final String regionEncodedName = RegionInfo.DEFAULT_REPLICA_ID == regionInfo.getReplicaId()
      ? regionInfo.getEncodedName()
      : RegionInfoBuilder.newBuilder(regionInfo).setReplicaId(RegionInfo.DEFAULT_REPLICA_ID).build()
        .getEncodedName();
    populateRegionStatesFromMeta(regionEncodedName);
  }

  /**
   * Attempt to load {@code regionEncodedName} from META, adding any results to the
   * {@link #regionStateStore}. Is NOT aware of replica regions.
   * @param regionEncodedName encoded name for the region to be loaded from META.
   * @throws IOException If some error occurs while querying META or parsing results.
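   * <p/>
   * For illustration only (added note, not from the original docs): a sketch of refreshing the
   * cached state for one region after an out-of-band change to hbase:meta, assuming {@code am} is
   * the master's AssignmentManager and the encoded name below is just an example value:
   *
   * <pre>{@code
   * String encodedName = "65ab289e2fc1530df65f6c3d7cde7aa5"; // hypothetical encoded region name
   * am.populateRegionStatesFromMeta(encodedName); // may throw IOException
   * RegionInfo refreshed = am.getRegionInfo(encodedName);
   * }</pre>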
1899 */ 1900 public void populateRegionStatesFromMeta(@NonNull String regionEncodedName) throws IOException { 1901 final RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor(); 1902 regionStateStore.visitMetaForRegion(regionEncodedName, visitor); 1903 } 1904 1905 private void loadMeta() throws IOException { 1906 // TODO: use a thread pool 1907 regionStateStore.visitMeta(new RegionMetaLoadingVisitor()); 1908 } 1909 1910 /** 1911 * Used to check if the meta loading is done. 1912 * <p/> 1913 * if not we throw PleaseHoldException since we are rebuilding the RegionStates 1914 * @param hri region to check if it is already rebuild 1915 * @throws PleaseHoldException if meta has not been loaded yet 1916 */ 1917 private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException { 1918 if (!isRunning()) { 1919 throw new PleaseHoldException("AssignmentManager not running"); 1920 } 1921 boolean meta = isMetaRegion(hri); 1922 boolean metaLoaded = isMetaLoaded(); 1923 if (!meta && !metaLoaded) { 1924 throw new PleaseHoldException( 1925 "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded); 1926 } 1927 } 1928 1929 // ============================================================================================ 1930 // TODO: Metrics 1931 // ============================================================================================ 1932 public int getNumRegionsOpened() { 1933 // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value 1934 return 0; 1935 } 1936 1937 /** 1938 * Usually run by the Master in reaction to server crash during normal processing. Can also be 1939 * invoked via external RPC to effect repair; in the latter case, the 'force' flag is set so we 1940 * push through the SCP though context may indicate already-running-SCP (An old SCP may have 1941 * exited abnormally, or damaged cluster may still have references in hbase:meta to 'Unknown 1942 * Servers' -- servers that are not online or in dead servers list, etc.) 1943 * @param force Set if the request came in externally over RPC (via hbck2). Force means run the 1944 * SCP even if it seems as though there might be an outstanding SCP running. 1945 * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled. 1946 */ 1947 public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) { 1948 // May be an 'Unknown Server' so handle case where serverNode is null. 1949 ServerStateNode serverNode = regionStates.getServerNode(serverName); 1950 // Remove the in-memory rsReports result 1951 synchronized (rsReports) { 1952 rsReports.remove(serverName); 1953 } 1954 if (serverNode == null) { 1955 if (force) { 1956 LOG.info("Force adding ServerCrashProcedure for {} when server node is null", serverName); 1957 } else { 1958 // for normal case, do not schedule SCP if ServerStateNode is null 1959 LOG.warn("Skip adding ServerCrashProcedure for {} because server node is null", serverName); 1960 return Procedure.NO_PROC_ID; 1961 } 1962 } 1963 1964 ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor(); 1965 // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the 1966 // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from 1967 // this server. This is used to simplify the implementation for TRSP and SCP, where we can make 1968 // sure that, the region list fetched by SCP will not be changed any more. 
1969 if (serverNode != null) { 1970 serverNode.writeLock().lock(); 1971 } 1972 try { 1973 1974 boolean carryingMeta = isCarryingMeta(serverName); 1975 if (serverNode != null && !serverNode.isInState(ServerState.ONLINE)) { 1976 if (force) { 1977 LOG.info("Force adding ServerCrashProcedure for {} (meta={}) when state is not {}", 1978 serverNode, carryingMeta, ServerState.ONLINE); 1979 } else { 1980 LOG.info("Skip adding ServerCrashProcedure for {} (meta={}) when state is not {}", 1981 serverNode, carryingMeta, ServerState.ONLINE); 1982 return Procedure.NO_PROC_ID; 1983 } 1984 } 1985 MasterProcedureEnv mpe = procExec.getEnvironment(); 1986 // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead. 1987 // HBCKSCP scours Master in-memory state AND hbase;meta for references to 1988 // serverName just-in-case. An SCP that is scheduled when the server is 1989 // 'Unknown' probably originated externally with HBCK2 fix-it tool. 1990 ServerState oldState = null; 1991 if (serverNode != null) { 1992 oldState = serverNode.getState(); 1993 serverNode.setState(ServerState.CRASHED); 1994 } 1995 ServerCrashProcedure scp = force 1996 ? new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta) 1997 : new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta); 1998 long pid = procExec.submitProcedure(scp); 1999 LOG.info("Scheduled ServerCrashProcedure pid={} for {} (carryingMeta={}){}.", pid, serverName, 2000 carryingMeta, 2001 serverNode == null ? "" : " " + serverNode.toString() + ", oldState=" + oldState); 2002 return pid; 2003 } finally { 2004 if (serverNode != null) { 2005 serverNode.writeLock().unlock(); 2006 } 2007 } 2008 } 2009 2010 public void offlineRegion(final RegionInfo regionInfo) { 2011 // TODO used by MasterRpcServices 2012 RegionStateNode node = regionStates.getRegionStateNode(regionInfo); 2013 if (node != null) { 2014 node.offline(); 2015 } 2016 } 2017 2018 public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) { 2019 // TODO used by TestSplitTransactionOnCluster.java 2020 } 2021 2022 public Map<ServerName, List<RegionInfo>> 2023 getSnapShotOfAssignment(final Collection<RegionInfo> regions) { 2024 return regionStates.getSnapShotOfAssignment(regions); 2025 } 2026 2027 // ============================================================================================ 2028 // TODO: UTILS/HELPERS? 
2029 // ============================================================================================ 2030 /** 2031 * Used by the client (via master) to identify if all regions have the schema updates 2032 * @return Pair indicating the status of the alter command (pending/total) 2033 */ 2034 public Pair<Integer, Integer> getReopenStatus(TableName tableName) { 2035 if (isTableDisabled(tableName)) { 2036 return new Pair<Integer, Integer>(0, 0); 2037 } 2038 2039 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 2040 int ritCount = 0; 2041 for (RegionState regionState : states) { 2042 if (!regionState.isOpened() && !regionState.isSplit()) { 2043 ritCount++; 2044 } 2045 } 2046 return new Pair<Integer, Integer>(ritCount, states.size()); 2047 } 2048 2049 // ============================================================================================ 2050 // TODO: Region State In Transition 2051 // ============================================================================================ 2052 public boolean hasRegionsInTransition() { 2053 return regionStates.hasRegionsInTransition(); 2054 } 2055 2056 public List<RegionStateNode> getRegionsInTransition() { 2057 return regionStates.getRegionsInTransition(); 2058 } 2059 2060 public List<RegionInfo> getAssignedRegions() { 2061 return regionStates.getAssignedRegions(); 2062 } 2063 2064 /** 2065 * Resolve a cached {@link RegionInfo} from the region name as a {@code byte[]}. 2066 */ 2067 public RegionInfo getRegionInfo(final byte[] regionName) { 2068 final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName); 2069 return regionState != null ? regionState.getRegionInfo() : null; 2070 } 2071 2072 /** 2073 * Resolve a cached {@link RegionInfo} from the encoded region name as a {@code String}. 2074 */ 2075 public RegionInfo getRegionInfo(final String encodedRegionName) { 2076 final RegionStateNode regionState = 2077 regionStates.getRegionStateNodeFromEncodedRegionName(encodedRegionName); 2078 return regionState != null ? regionState.getRegionInfo() : null; 2079 } 2080 2081 // ============================================================================================ 2082 // Expected states on region state transition. 2083 // Notice that there is expected states for transiting to OPENING state, this is because SCP. 2084 // See the comments in regionOpening method for more details. 2085 // ============================================================================================ 2086 private static final State[] STATES_EXPECTED_ON_OPEN = { State.OPENING, // Normal case 2087 State.OPEN // Retrying 2088 }; 2089 2090 private static final State[] STATES_EXPECTED_ON_CLOSING = { State.OPEN, // Normal case 2091 State.CLOSING, // Retrying 2092 State.SPLITTING, // Offline the split parent 2093 State.MERGING // Offline the merge parents 2094 }; 2095 2096 private static final State[] STATES_EXPECTED_ON_CLOSED = { State.CLOSING, // Normal case 2097 State.CLOSED // Retrying 2098 }; 2099 2100 // This is for manually scheduled region assign, can add other states later if we find out other 2101 // usages 2102 private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE }; 2103 2104 // We only allow unassign or move a region which is in OPEN state. 
  private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN };

  // ============================================================================================
  // Region Status update
  // Should only be called in TransitRegionStateProcedure (and related procedures), as the locking
  // and pre-assumptions are very tricky.
  // ============================================================================================
  private CompletableFuture<Void> transitStateAndUpdate(RegionStateNode regionNode,
    RegionState.State newState, RegionState.State... expectedStates) {
    RegionState.State state = regionNode.getState();
    try {
      regionNode.transitionState(newState, expectedStates);
    } catch (UnexpectedStateException e) {
      return FutureUtils.failedFuture(e);
    }
    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
    FutureUtils.addListener(future, (r, e) -> {
      if (e != null) {
        // revert
        regionNode.setState(state);
      }
    });
    return future;
  }

  // should be called within the synchronized block of RegionStateNode
  CompletableFuture<Void> regionOpening(RegionStateNode regionNode) {
    // In SCP, for performance reasons, there is no TRSP attached to this region and we will not
    // update the region state, which means that the region could be in any state when we want to
    // assign it after a RS crash. So here we do not pass the expectedStates parameter.
    return transitStateAndUpdate(regionNode, State.OPENING).thenAccept(r -> {
      ServerStateNode serverNode = regionStates.getServerNode(regionNode.getRegionLocation());
      // Here the server node could be null. For example, we want to assign the region to a given
      // region server and it crashes, and it is the region server which holds hbase:meta; then the
      // above transitStateAndUpdate call will never succeed until we finish the SCP for it. But
      // after the SCP finishes, the server node will be removed, so when we arrive here, the
      // server node will be null. This is not a big problem if we skip adding it, as later we will
      // fail to execute the remote procedure on the region server and then try to assign to
      // another region server.
      if (serverNode != null) {
        serverNode.addRegion(regionNode);
      }
      // update the operation count metrics
      metrics.incrementOperationCounter();
    });
  }

  // should be called under the RegionStateNode lock
  // The parameter 'giveUp' means whether we will give up and not try to open the region again; if
  // it is true, then we will persist the FAILED_OPEN state into hbase:meta.
  CompletableFuture<Void> regionFailedOpen(RegionStateNode regionNode, boolean giveUp) {
    RegionState.State state = regionNode.getState();
    ServerName regionLocation = regionNode.getRegionLocation();
    if (!giveUp) {
      if (regionLocation != null) {
        regionStates.removeRegionFromServer(regionLocation, regionNode);
      }
      return CompletableFuture.completedFuture(null);
    }
    regionNode.setState(State.FAILED_OPEN);
    regionNode.setRegionLocation(null);
    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
    FutureUtils.addListener(future, (r, e) -> {
      if (e == null) {
        if (regionLocation != null) {
          regionStates.removeRegionFromServer(regionLocation, regionNode);
        }
      } else {
        // revert
        regionNode.setState(state);
        regionNode.setRegionLocation(regionLocation);
      }
    });
    return future;
  }

  // should be called under the RegionStateNode lock
  CompletableFuture<Void> regionClosing(RegionStateNode regionNode) {
    return transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING)
      .thenAccept(r -> {
        RegionInfo hri = regionNode.getRegionInfo();
        // Mark meta as not assigned early, so people trying to create/edit tables will wait
        if (isMetaRegion(hri)) {
          setMetaAssigned(hri, false);
        }
        // update the operation count metrics
        metrics.incrementOperationCounter();
      });
  }

  // For open and close, the transitions will first be persisted to the procedure store in
  // RegionRemoteProcedureBase. So here we will first change the in-memory state, as the operation
  // is considered successful once the persistence to the procedure store has succeeded, and then
  // when the RegionRemoteProcedureBase is woken up, we will persist the RegionStateNode to
  // hbase:meta.

  // should be called under the RegionStateNode lock
  void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode)
    throws UnexpectedStateException {
    regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
    RegionInfo regionInfo = regionNode.getRegionInfo();
    regionStates.addRegionToServer(regionNode);
    regionStates.removeFromFailedOpen(regionInfo);
  }

  // should be called under the RegionStateNode lock
  void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode)
    throws UnexpectedStateException {
    ServerName regionLocation = regionNode.getRegionLocation();
    regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
    regionNode.setRegionLocation(null);
    if (regionLocation != null) {
      regionNode.setLastHost(regionLocation);
      regionStates.removeRegionFromServer(regionLocation, regionNode);
    }
  }

  // should be called under the RegionStateNode lock
  CompletableFuture<Void> persistToMeta(RegionStateNode regionNode) {
    return regionStateStore.updateRegionLocation(regionNode).thenAccept(r -> {
      RegionInfo regionInfo = regionNode.getRegionInfo();
      if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
        // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYS enabled, it
        // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
        // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
        // on the table that contains state).
2231 setMetaAssigned(regionInfo, true); 2232 } 2233 }); 2234 } 2235 2236 // should be called under the RegionStateNode lock 2237 // for SCP 2238 public CompletableFuture<Void> regionClosedAbnormally(RegionStateNode regionNode) { 2239 RegionState.State state = regionNode.getState(); 2240 ServerName regionLocation = regionNode.getRegionLocation(); 2241 regionNode.setState(State.ABNORMALLY_CLOSED); 2242 regionNode.setRegionLocation(null); 2243 CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode); 2244 FutureUtils.addListener(future, (r, e) -> { 2245 if (e == null) { 2246 if (regionLocation != null) { 2247 regionNode.setLastHost(regionLocation); 2248 regionStates.removeRegionFromServer(regionLocation, regionNode); 2249 } 2250 } else { 2251 // revert 2252 regionNode.setState(state); 2253 regionNode.setRegionLocation(regionLocation); 2254 } 2255 }); 2256 return future; 2257 } 2258 2259 // ============================================================================================ 2260 // The above methods can only be called in TransitRegionStateProcedure(and related procedures) 2261 // ============================================================================================ 2262 2263 public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName, 2264 final RegionInfo daughterA, final RegionInfo daughterB) throws IOException { 2265 // Update hbase:meta. Parent will be marked offline and split up in hbase:meta. 2266 // The parent stays in regionStates until cleared when removed by CatalogJanitor. 2267 // Update its state in regionStates to it shows as offline and split when read 2268 // later figuring what regions are in a table and what are not: see 2269 // regionStates#getRegionsOfTable 2270 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent); 2271 node.setState(State.SPLIT); 2272 final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA); 2273 nodeA.setState(State.SPLITTING_NEW); 2274 final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB); 2275 nodeB.setState(State.SPLITTING_NEW); 2276 2277 TableDescriptor td = master.getTableDescriptors().get(parent.getTable()); 2278 // TODO: here we just update the parent region info in meta, to set split and offline to true, 2279 // without changing the one in the region node. This is a bit confusing but the region info 2280 // field in RegionStateNode is not expected to be changed in the current design. Need to find a 2281 // possible way to address this problem, or at least adding more comments about the trick to 2282 // deal with this problem, that when you want to filter out split parent, you need to check both 2283 // the RegionState on whether it is split, and also the region info. If one of them matches then 2284 // it is a split parent. And usually only one of them can match, as after restart, the region 2285 // state will be changed from SPLIT to CLOSED. 2286 regionStateStore.splitRegion(parent, daughterA, daughterB, serverName, td); 2287 if (shouldAssignFavoredNodes(parent)) { 2288 List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList(); 2289 getFavoredNodePromoter().generateFavoredNodesForDaughter(onlineServers, parent, daughterA, 2290 daughterB); 2291 } 2292 } 2293 2294 /** 2295 * When called here, the merge has happened. The merged regions have been unassigned and the above 2296 * markRegionClosed has been called on each so they have been disassociated from a hosting Server. 
   * The merged region will be open after this call. The merged regions are removed from hbase:meta
   * below. Later they are deleted from the filesystem by the catalog janitor running against
   * hbase:meta. It notices when the merged region no longer holds references to the old regions
   * (References are deleted after a compaction rewrites what the Reference points at, but the
   * References are not removed until the archiver chore runs).
   */
  public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
    RegionInfo[] mergeParents) throws IOException {
    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
    node.setState(State.MERGED);
    for (RegionInfo ri : mergeParents) {
      regionStates.deleteRegion(ri);
    }
    TableDescriptor td = master.getTableDescriptors().get(child.getTable());
    regionStateStore.mergeRegions(child, mergeParents, serverName, td);
    if (shouldAssignFavoredNodes(child)) {
      getFavoredNodePromoter().generateFavoredNodesForMergedRegion(child, mergeParents);
    }
  }

  /*
   * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
   * belongs to a non-system table.
   */
  private boolean shouldAssignFavoredNodes(RegionInfo region) {
    return this.shouldAssignRegionsWithFavoredNodes
      && FavoredNodesManager.isFavoredNodeApplicable(region);
  }

  // ============================================================================================
  // Assign Queue (Assign/Balance)
  // ============================================================================================
  private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>();
  private final ReentrantLock assignQueueLock = new ReentrantLock();
  private final Condition assignQueueFullCond = assignQueueLock.newCondition();

  /**
   * Add the assign operation to the assignment queue. The pending assignment operations will be
   * processed, and each region will be assigned to a server picked by the balancer.
   */
  protected void queueAssign(final RegionStateNode regionNode) {
    regionNode.getProcedureEvent().suspend();

    // TODO: quick-start for meta and the other sys-tables?
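    // Note (summary added for clarity; see processAssignQueue/waitOnAssignQueue below): the queue
    // is drained in batches by the assignment thread. The condition is signalled right away for
    // system-table regions, for the first region added to an empty queue, and when the queue
    // reaches assignDispatchWaitQueueMaxSize; the consumer then still waits up to
    // assignDispatchWaitMillis so that more assignments can be batched and handed to the balancer
    // together.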
2341 assignQueueLock.lock(); 2342 try { 2343 pendingAssignQueue.add(regionNode); 2344 if ( 2345 regionNode.isSystemTable() || pendingAssignQueue.size() == 1 2346 || pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize 2347 ) { 2348 assignQueueFullCond.signal(); 2349 } 2350 } finally { 2351 assignQueueLock.unlock(); 2352 } 2353 } 2354 2355 private void startAssignmentThread() { 2356 assignThread = new Thread(master.getServerName().toShortString()) { 2357 @Override 2358 public void run() { 2359 while (isRunning()) { 2360 processAssignQueue(); 2361 } 2362 pendingAssignQueue.clear(); 2363 } 2364 }; 2365 assignThread.setDaemon(true); 2366 assignThread.start(); 2367 } 2368 2369 private void stopAssignmentThread() { 2370 assignQueueSignal(); 2371 try { 2372 while (assignThread.isAlive()) { 2373 assignQueueSignal(); 2374 assignThread.join(250); 2375 } 2376 } catch (InterruptedException e) { 2377 LOG.warn("join interrupted", e); 2378 Thread.currentThread().interrupt(); 2379 } 2380 } 2381 2382 private void assignQueueSignal() { 2383 assignQueueLock.lock(); 2384 try { 2385 assignQueueFullCond.signal(); 2386 } finally { 2387 assignQueueLock.unlock(); 2388 } 2389 } 2390 2391 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") 2392 private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() { 2393 HashMap<RegionInfo, RegionStateNode> regions = null; 2394 2395 assignQueueLock.lock(); 2396 try { 2397 if (pendingAssignQueue.isEmpty() && isRunning()) { 2398 assignQueueFullCond.await(); 2399 } 2400 2401 if (!isRunning()) { 2402 return null; 2403 } 2404 assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS); 2405 regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size()); 2406 for (RegionStateNode regionNode : pendingAssignQueue) { 2407 regions.put(regionNode.getRegionInfo(), regionNode); 2408 } 2409 pendingAssignQueue.clear(); 2410 } catch (InterruptedException e) { 2411 LOG.warn("got interrupted ", e); 2412 Thread.currentThread().interrupt(); 2413 } finally { 2414 assignQueueLock.unlock(); 2415 } 2416 return regions; 2417 } 2418 2419 private void processAssignQueue() { 2420 final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue(); 2421 if (regions == null || regions.size() == 0 || !isRunning()) { 2422 return; 2423 } 2424 2425 if (LOG.isTraceEnabled()) { 2426 LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size()); 2427 } 2428 2429 // TODO: Optimize balancer. pass a RegionPlan? 2430 final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>(); 2431 final List<RegionInfo> userHRIs = new ArrayList<>(regions.size()); 2432 // Regions for system tables requiring reassignment 2433 final List<RegionInfo> systemHRIs = new ArrayList<>(); 2434 for (RegionStateNode regionStateNode : regions.values()) { 2435 boolean sysTable = regionStateNode.isSystemTable(); 2436 final List<RegionInfo> hris = sysTable ? systemHRIs : userHRIs; 2437 if (regionStateNode.getRegionLocation() != null) { 2438 retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation()); 2439 } else { 2440 hris.add(regionStateNode.getRegionInfo()); 2441 } 2442 } 2443 2444 // TODO: connect with the listener to invalidate the cache 2445 2446 // TODO use events 2447 List<ServerName> servers = master.getServerManager().createDestinationServersList(); 2448 for (int i = 0; servers.size() < 1; ++i) { 2449 // Report every fourth time around this loop; try not to flood log. 
2450 if (i % 4 == 0) { 2451 LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions."); 2452 } 2453 2454 if (!isRunning()) { 2455 LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions."); 2456 return; 2457 } 2458 Threads.sleep(250); 2459 servers = master.getServerManager().createDestinationServersList(); 2460 } 2461 2462 if (!systemHRIs.isEmpty()) { 2463 // System table regions requiring reassignment are present, get region servers 2464 // not available for system table regions 2465 final List<ServerName> excludeServers = getExcludedServersForSystemTable(); 2466 List<ServerName> serversForSysTables = 2467 servers.stream().filter(s -> !excludeServers.contains(s)).collect(Collectors.toList()); 2468 if (serversForSysTables.isEmpty()) { 2469 LOG.warn("Filtering old server versions and the excluded produced an empty set; " 2470 + "instead considering all candidate servers!"); 2471 } 2472 LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() 2473 + ", allServersCount=" + servers.size()); 2474 processAssignmentPlans(regions, null, systemHRIs, 2475 serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs) 2476 ? servers 2477 : serversForSysTables); 2478 } 2479 2480 processAssignmentPlans(regions, retainMap, userHRIs, servers); 2481 } 2482 2483 private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions, 2484 List<RegionInfo> hirs) { 2485 for (RegionInfo ri : hirs) { 2486 if ( 2487 regions.get(ri).getRegionLocation() != null 2488 && regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME) 2489 ) { 2490 return true; 2491 } 2492 } 2493 return false; 2494 } 2495 2496 private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions, 2497 final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris, 2498 final List<ServerName> servers) { 2499 boolean isTraceEnabled = LOG.isTraceEnabled(); 2500 if (isTraceEnabled) { 2501 LOG.trace("Available servers count=" + servers.size() + ": " + servers); 2502 } 2503 2504 final LoadBalancer balancer = getBalancer(); 2505 // ask the balancer where to place regions 2506 if (retainMap != null && !retainMap.isEmpty()) { 2507 if (isTraceEnabled) { 2508 LOG.trace("retain assign regions=" + retainMap); 2509 } 2510 try { 2511 acceptPlan(regions, balancer.retainAssignment(retainMap, servers)); 2512 } catch (IOException e) { 2513 LOG.warn("unable to retain assignment", e); 2514 addToPendingAssignment(regions, retainMap.keySet()); 2515 } 2516 } 2517 2518 // TODO: Do we need to split retain and round-robin? 2519 // the retain seems to fallback to round-robin/random if the region is not in the map. 
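    // Note (added for clarity): retainAssignment() tries to keep each region in retainMap on its
    // previous host and, as the TODO above notes, appears to fall back to round-robin/random when
    // that is not possible, while roundRobinAssignment() spreads the regions that have no previous
    // location across the candidate servers. Both return a Map<ServerName, List<RegionInfo>> plan
    // that acceptPlan() applies below.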
2520 if (!hris.isEmpty()) { 2521 Collections.sort(hris, RegionInfo.COMPARATOR); 2522 if (isTraceEnabled) { 2523 LOG.trace("round robin regions=" + hris); 2524 } 2525 try { 2526 acceptPlan(regions, balancer.roundRobinAssignment(hris, servers)); 2527 } catch (IOException e) { 2528 LOG.warn("unable to round-robin assignment", e); 2529 addToPendingAssignment(regions, hris); 2530 } 2531 } 2532 } 2533 2534 private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions, 2535 final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException { 2536 final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()]; 2537 final long st = EnvironmentEdgeManager.currentTime(); 2538 2539 if (plan.isEmpty()) { 2540 throw new HBaseIOException("unable to compute plans for regions=" + regions.size()); 2541 } 2542 2543 int evcount = 0; 2544 for (Map.Entry<ServerName, List<RegionInfo>> entry : plan.entrySet()) { 2545 final ServerName server = entry.getKey(); 2546 for (RegionInfo hri : entry.getValue()) { 2547 final RegionStateNode regionNode = regions.get(hri); 2548 regionNode.setRegionLocation(server); 2549 if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) { 2550 assignQueueLock.lock(); 2551 try { 2552 pendingAssignQueue.add(regionNode); 2553 } finally { 2554 assignQueueLock.unlock(); 2555 } 2556 } else { 2557 events[evcount++] = regionNode.getProcedureEvent(); 2558 } 2559 } 2560 } 2561 ProcedureEvent.wakeEvents(getProcedureScheduler(), events); 2562 2563 final long et = EnvironmentEdgeManager.currentTime(); 2564 if (LOG.isTraceEnabled()) { 2565 LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + StringUtils.humanTimeDiff(et - st)); 2566 } 2567 } 2568 2569 private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions, 2570 final Collection<RegionInfo> pendingRegions) { 2571 assignQueueLock.lock(); 2572 try { 2573 for (RegionInfo hri : pendingRegions) { 2574 pendingAssignQueue.add(regions.get(hri)); 2575 } 2576 } finally { 2577 assignQueueLock.unlock(); 2578 } 2579 } 2580 2581 /** 2582 * For a given cluster with mixed versions of servers, get a list of servers with lower versions, 2583 * where system table regions should not be assigned to. For system table, we must assign regions 2584 * to a server with highest version. However, we can disable this exclusion using config: 2585 * "hbase.min.version.move.system.tables" if checkForMinVersion is true. Detailed explanation 2586 * available with definition of minVersionToMoveSysTables. 2587 * @return List of Excluded servers for System table regions. 2588 */ 2589 public List<ServerName> getExcludedServersForSystemTable() { 2590 // TODO: This should be a cached list kept by the ServerManager rather than calculated on each 2591 // move or system region assign. The RegionServerTracker keeps list of online Servers with 2592 // RegionServerInfo that includes Version. 
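    // Worked example (illustrative values, not from the original source): with online servers at
    // versions {rs1=2.4.13, rs2=2.5.0, rs3=2.5.0}, highestVersion is "2.5.0", so rs1 is returned
    // as excluded and system table regions land on rs2/rs3. If "hbase.min.version.move.system.tables"
    // is set to a version higher than any online server (say "2.6.0"), the comparison below makes
    // the method return an empty list, i.e. no server is excluded.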
    List<Pair<ServerName, String>> serverList =
      master.getServerManager().getOnlineServersList().stream()
        .map(s -> new Pair<>(s, master.getRegionServerVersion(s))).collect(Collectors.toList());
    if (serverList.isEmpty()) {
      return new ArrayList<>();
    }
    String highestVersion = Collections
      .max(serverList, (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond()))
      .getSecond();
    if (!DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG.equals(minVersionToMoveSysTables)) {
      int comparedValue = VersionInfo.compareVersion(minVersionToMoveSysTables, highestVersion);
      if (comparedValue > 0) {
        return new ArrayList<>();
      }
    }
    return serverList.stream().filter(pair -> !pair.getSecond().equals(highestVersion))
      .map(Pair::getFirst).collect(Collectors.toList());
  }

  MasterServices getMaster() {
    return master;
  }

  /** Returns a snapshot of rsReports */
  public Map<ServerName, Set<byte[]>> getRSReports() {
    Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>();
    synchronized (rsReports) {
      rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue()));
    }
    return rsReportsSnapshot;
  }

  /**
   * Provide a region states count for the given table, e.g. how many regions of the given table
   * are opened/closed/rit etc.
   * @param tableName TableName
   * @return region states count
   */
  public RegionStatesCount getRegionStatesCount(TableName tableName) {
    int openRegionsCount = 0;
    int closedRegionCount = 0;
    int ritCount = 0;
    int splitRegionCount = 0;
    int totalRegionCount = 0;
    if (!isTableDisabled(tableName)) {
      final List<RegionState> states = regionStates.getTableRegionStates(tableName);
      for (RegionState regionState : states) {
        if (regionState.isOpened()) {
          openRegionsCount++;
        } else if (regionState.isClosed()) {
          closedRegionCount++;
        } else if (regionState.isSplit()) {
          splitRegionCount++;
        }
      }
      totalRegionCount = states.size();
      ritCount = totalRegionCount - openRegionsCount - splitRegionCount;
    }
    return new RegionStatesCount.RegionStatesCountBuilder().setOpenRegions(openRegionsCount)
      .setClosedRegions(closedRegionCount).setSplitRegions(splitRegionCount)
      .setRegionsInTransition(ritCount).setTotalRegions(totalRegionCount).build();
  }

}