001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import edu.umd.cs.findbugs.annotations.NonNull; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.CompletableFuture; 031import java.util.concurrent.Future; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicBoolean; 034import java.util.concurrent.locks.Condition; 035import java.util.concurrent.locks.ReentrantLock; 036import java.util.stream.Collectors; 037import java.util.stream.Stream; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.hbase.CatalogFamilyFormat; 040import org.apache.hadoop.hbase.DoNotRetryIOException; 041import org.apache.hadoop.hbase.HBaseIOException; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.PleaseHoldException; 044import org.apache.hadoop.hbase.ServerName; 045import org.apache.hadoop.hbase.TableName; 046import org.apache.hadoop.hbase.UnknownRegionException; 047import org.apache.hadoop.hbase.client.DoNotRetryRegionException; 048import org.apache.hadoop.hbase.client.MasterSwitchType; 049import org.apache.hadoop.hbase.client.RegionInfo; 050import org.apache.hadoop.hbase.client.RegionInfoBuilder; 051import org.apache.hadoop.hbase.client.RegionReplicaUtil; 052import org.apache.hadoop.hbase.client.RegionStatesCount; 053import org.apache.hadoop.hbase.client.Result; 054import org.apache.hadoop.hbase.client.ResultScanner; 055import org.apache.hadoop.hbase.client.Scan; 056import org.apache.hadoop.hbase.client.TableDescriptor; 057import org.apache.hadoop.hbase.client.TableState; 058import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; 059import org.apache.hadoop.hbase.favored.FavoredNodesManager; 060import org.apache.hadoop.hbase.favored.FavoredNodesPromoter; 061import org.apache.hadoop.hbase.master.LoadBalancer; 062import org.apache.hadoop.hbase.master.MasterServices; 063import org.apache.hadoop.hbase.master.MetricsAssignmentManager; 064import org.apache.hadoop.hbase.master.RegionPlan; 065import org.apache.hadoop.hbase.master.RegionState; 066import org.apache.hadoop.hbase.master.RegionState.State; 067import org.apache.hadoop.hbase.master.ServerManager; 068import org.apache.hadoop.hbase.master.TableStateManager; 069import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer; 070import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure; 071import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 072import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; 073import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; 074import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; 075import org.apache.hadoop.hbase.master.procedure.TruncateRegionProcedure; 076import org.apache.hadoop.hbase.master.region.MasterRegion; 077import org.apache.hadoop.hbase.procedure2.Procedure; 078import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 079import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 080import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore; 081import org.apache.hadoop.hbase.procedure2.util.StringUtils; 082import org.apache.hadoop.hbase.regionserver.SequenceId; 083import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer; 084import org.apache.hadoop.hbase.util.Bytes; 085import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 086import org.apache.hadoop.hbase.util.FutureUtils; 087import org.apache.hadoop.hbase.util.Pair; 088import org.apache.hadoop.hbase.util.Threads; 089import org.apache.hadoop.hbase.util.VersionInfo; 090import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 091import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 092import org.apache.yetus.audience.InterfaceAudience; 093import org.apache.zookeeper.KeeperException; 094import org.slf4j.Logger; 095import org.slf4j.LoggerFactory; 096 097import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 098import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; 099import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 100import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 101import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; 102 103/** 104 * The AssignmentManager is the coordinator for region assign/unassign operations. 105 * <ul> 106 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li> 107 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li> 108 * </ul> 109 * Regions are created by CreateTable, Split, Merge. Regions are deleted by DeleteTable, Split, 110 * Merge. Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. Unassigns 111 * are triggered by DisableTable, Split, Merge 112 */ 113@InterfaceAudience.Private 114public class AssignmentManager { 115 private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class); 116 117 // TODO: AMv2 118 // - handle region migration from hbase1 to hbase2. 119 // - handle sys table assignment first (e.g. acl, namespace) 120 // - handle table priorities 121 // - If ServerBusyException trying to update hbase:meta, we abort the Master 122 // See updateRegionLocation in RegionStateStore. 123 // 124 // See also 125 // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5 126 // for other TODOs. 127 128 public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY = 129 "hbase.assignment.bootstrap.thread.pool.size"; 130 131 public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY = 132 "hbase.assignment.dispatch.wait.msec"; 133 private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150; 134 135 public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY = 136 "hbase.assignment.dispatch.wait.queue.max.size"; 137 private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100; 138 139 public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY = 140 "hbase.assignment.rit.chore.interval.msec"; 141 private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000; 142 143 public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY = 144 "hbase.assignment.dead.region.metric.chore.interval.msec"; 145 private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000; 146 147 public static final String ASSIGN_MAX_ATTEMPTS = "hbase.assignment.maximum.attempts"; 148 private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE; 149 150 public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 151 "hbase.assignment.retry.immediately.maximum.attempts"; 152 private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3; 153 154 /** Region in Transition metrics threshold time */ 155 public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD = 156 "hbase.metrics.rit.stuck.warning.threshold"; 157 private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000; 158 public static final String UNEXPECTED_STATE_REGION = "Unexpected state for "; 159 160 public static final String FORCE_REGION_RETAINMENT = "hbase.master.scp.retain.assignment.force"; 161 162 public static final boolean DEFAULT_FORCE_REGION_RETAINMENT = false; 163 164 /** The wait time in millis before checking again if the region's previous RS is back online */ 165 public static final String FORCE_REGION_RETAINMENT_WAIT_INTERVAL = 166 "hbase.master.scp.retain.assignment.force.wait-interval"; 167 168 public static final long DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL = 50; 169 170 /** 171 * The number of times to check if the region's previous RS is back online, before giving up and 172 * proceeding with assignment on a new RS 173 */ 174 public static final String FORCE_REGION_RETAINMENT_RETRIES = 175 "hbase.master.scp.retain.assignment.force.retries"; 176 177 public static final int DEFAULT_FORCE_REGION_RETAINMENT_RETRIES = 600; 178 179 private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign"); 180 private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load"); 181 182 private final MetricsAssignmentManager metrics; 183 private final RegionInTransitionChore ritChore; 184 private final DeadServerMetricRegionChore deadMetricChore; 185 private final MasterServices master; 186 187 private final AtomicBoolean running = new AtomicBoolean(false); 188 private final RegionStates regionStates = new RegionStates(); 189 private final RegionStateStore regionStateStore; 190 191 /** 192 * When the operator uses this configuration option, any version between the current cluster 193 * version and the value of "hbase.min.version.move.system.tables" does not trigger any 194 * auto-region movement. Auto-region movement here refers to auto-migration of system table 195 * regions to newer server versions. It is assumed that the configured range of versions does not 196 * require special handling of moving system table regions to higher versioned RegionServer. This 197 * auto-migration is done by {@link #checkIfShouldMoveSystemRegionAsync()}. Example: Let's assume 198 * the cluster is on version 1.4.0 and we have set "hbase.min.version.move.system.tables" as 199 * "2.0.0". Now if we upgrade one RegionServer on 1.4.0 cluster to 1.6.0 (< 2.0.0), then 200 * AssignmentManager will not move hbase:meta, hbase:namespace and other system table regions to 201 * newly brought up RegionServer 1.6.0 as part of auto-migration. However, if we upgrade one 202 * RegionServer on 1.4.0 cluster to 2.2.0 (> 2.0.0), then AssignmentManager will move all system 203 * table regions to newly brought up RegionServer 2.2.0 as part of auto-migration done by 204 * {@link #checkIfShouldMoveSystemRegionAsync()}. "hbase.min.version.move.system.tables" is 205 * introduced as part of HBASE-22923. 206 */ 207 private final String minVersionToMoveSysTables; 208 209 private static final String MIN_VERSION_MOVE_SYS_TABLES_CONFIG = 210 "hbase.min.version.move.system.tables"; 211 private static final String DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG = ""; 212 213 private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>(); 214 215 private final boolean shouldAssignRegionsWithFavoredNodes; 216 private final int assignDispatchWaitQueueMaxSize; 217 private final int assignDispatchWaitMillis; 218 private final int assignMaxAttempts; 219 private final int assignRetryImmediatelyMaxAttempts; 220 221 private final MasterRegion masterRegion; 222 223 private final Object checkIfShouldMoveSystemRegionLock = new Object(); 224 225 private Thread assignThread; 226 227 private final boolean forceRegionRetainment; 228 229 private final long forceRegionRetainmentWaitInterval; 230 231 private final int forceRegionRetainmentRetries; 232 233 public AssignmentManager(MasterServices master, MasterRegion masterRegion) { 234 this(master, masterRegion, new RegionStateStore(master, masterRegion)); 235 } 236 237 AssignmentManager(MasterServices master, MasterRegion masterRegion, RegionStateStore stateStore) { 238 this.master = master; 239 this.regionStateStore = stateStore; 240 this.metrics = new MetricsAssignmentManager(); 241 this.masterRegion = masterRegion; 242 243 final Configuration conf = master.getConfiguration(); 244 245 // Only read favored nodes if using the favored nodes load balancer. 246 this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class 247 .isAssignableFrom(conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class)); 248 249 this.assignDispatchWaitMillis = 250 conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC); 251 this.assignDispatchWaitQueueMaxSize = 252 conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX); 253 254 this.assignMaxAttempts = 255 Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS)); 256 this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS, 257 DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS); 258 259 int ritChoreInterval = 260 conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC); 261 this.ritChore = new RegionInTransitionChore(ritChoreInterval); 262 263 int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY, 264 DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC); 265 if (deadRegionChoreInterval > 0) { 266 this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval); 267 } else { 268 this.deadMetricChore = null; 269 } 270 minVersionToMoveSysTables = 271 conf.get(MIN_VERSION_MOVE_SYS_TABLES_CONFIG, DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG); 272 273 forceRegionRetainment = 274 conf.getBoolean(FORCE_REGION_RETAINMENT, DEFAULT_FORCE_REGION_RETAINMENT); 275 forceRegionRetainmentWaitInterval = conf.getLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL, 276 DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL); 277 forceRegionRetainmentRetries = 278 conf.getInt(FORCE_REGION_RETAINMENT_RETRIES, DEFAULT_FORCE_REGION_RETAINMENT_RETRIES); 279 } 280 281 private void mirrorMetaLocations() throws IOException, KeeperException { 282 // For compatibility, mirror the meta region state to zookeeper 283 // And we still need to use zookeeper to publish the meta region locations to region 284 // server, so they can serve as ClientMetaService 285 ZKWatcher zk = master.getZooKeeper(); 286 if (zk == null || !zk.getRecoverableZooKeeper().getState().isAlive()) { 287 // this is possible in tests, we do not provide a zk watcher or the zk watcher has been closed 288 return; 289 } 290 Collection<RegionStateNode> metaStates = regionStates.getRegionStateNodes(); 291 for (RegionStateNode metaState : metaStates) { 292 MetaTableLocator.setMetaLocation(zk, metaState.getRegionLocation(), 293 metaState.getRegionInfo().getReplicaId(), metaState.getState()); 294 } 295 int replicaCount = metaStates.size(); 296 // remove extra mirror locations 297 for (String znode : zk.getMetaReplicaNodes()) { 298 int replicaId = zk.getZNodePaths().getMetaReplicaIdFromZNode(znode); 299 if (replicaId >= replicaCount) { 300 MetaTableLocator.deleteMetaLocation(zk, replicaId); 301 } 302 } 303 } 304 305 public void start() throws IOException, KeeperException { 306 if (!running.compareAndSet(false, true)) { 307 return; 308 } 309 310 LOG.trace("Starting assignment manager"); 311 312 // Start the Assignment Thread 313 startAssignmentThread(); 314 // load meta region states. 315 // here we are still in the early steps of active master startup. There is only one thread(us) 316 // can access AssignmentManager and create region node, so here we do not need to lock the 317 // region node. 318 try (ResultScanner scanner = 319 masterRegion.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY))) { 320 for (;;) { 321 Result result = scanner.next(); 322 if (result == null) { 323 break; 324 } 325 RegionStateStore 326 .visitMetaEntry((r, regionInfo, state, regionLocation, lastHost, openSeqNum) -> { 327 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 328 regionNode.setState(state); 329 regionNode.setLastHost(lastHost); 330 regionNode.setRegionLocation(regionLocation); 331 regionNode.setOpenSeqNum(openSeqNum); 332 if (regionNode.getProcedure() != null) { 333 regionNode.getProcedure().stateLoaded(this, regionNode); 334 } 335 if (regionLocation != null) { 336 // TODO: this could lead to some orphan server state nodes, as it is possible that the 337 // region server is already dead and its SCP has already finished but we have 338 // persisted an opening state on this region server. Finally the TRSP will assign the 339 // region to another region server, so it will not cause critical problems, just waste 340 // some memory as no one will try to cleanup these orphan server state nodes. 341 regionStates.createServer(regionLocation); 342 regionStates.addRegionToServer(regionNode); 343 } 344 if (RegionReplicaUtil.isDefaultReplica(regionInfo.getReplicaId())) { 345 setMetaAssigned(regionInfo, state == State.OPEN); 346 } 347 LOG.debug("Loaded hbase:meta {}", regionNode); 348 }, result); 349 } 350 } 351 mirrorMetaLocations(); 352 } 353 354 /** 355 * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode. 356 * <p> 357 * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta 358 * method below. And it is also very important as now before submitting a TRSP, we need to attach 359 * it to the RegionStateNode, which acts like a guard, so we need to restore this information at 360 * the very beginning, before we start processing any procedures. 361 */ 362 public void setupRIT(List<TransitRegionStateProcedure> procs) { 363 procs.forEach(proc -> { 364 RegionInfo regionInfo = proc.getRegion(); 365 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 366 TransitRegionStateProcedure existingProc = regionNode.getProcedure(); 367 if (existingProc != null) { 368 // This is possible, as we will detach the procedure from the RSN before we 369 // actually finish the procedure. This is because that, we will detach the TRSP from the RSN 370 // during execution, at that time, the procedure has not been marked as done in the pv2 371 // framework yet, so it is possible that we schedule a new TRSP immediately and when 372 // arriving here, we will find out that there are multiple TRSPs for the region. But we can 373 // make sure that, only the last one can take the charge, the previous ones should have all 374 // been finished already. So here we will compare the proc id, the greater one will win. 375 if (existingProc.getProcId() < proc.getProcId()) { 376 // the new one wins, unset and set it to the new one below 377 regionNode.unsetProcedure(existingProc); 378 } else { 379 // the old one wins, skip 380 return; 381 } 382 } 383 LOG.info("Attach {} to {} to restore RIT", proc, regionNode); 384 regionNode.setProcedure(proc); 385 }); 386 } 387 388 public void stop() { 389 if (!running.compareAndSet(true, false)) { 390 return; 391 } 392 393 LOG.info("Stopping assignment manager"); 394 395 // The AM is started before the procedure executor, 396 // but the actual work will be loaded/submitted only once we have the executor 397 final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null; 398 399 // Remove the RIT chore 400 if (hasProcExecutor) { 401 master.getMasterProcedureExecutor().removeChore(this.ritChore); 402 if (this.deadMetricChore != null) { 403 master.getMasterProcedureExecutor().removeChore(this.deadMetricChore); 404 } 405 } 406 407 // Stop the Assignment Thread 408 stopAssignmentThread(); 409 410 // Stop the RegionStateStore 411 regionStates.clear(); 412 413 // Update meta events (for testing) 414 if (hasProcExecutor) { 415 metaLoadEvent.suspend(); 416 for (RegionInfo hri : getMetaRegionSet()) { 417 setMetaAssigned(hri, false); 418 } 419 } 420 } 421 422 public boolean isRunning() { 423 return running.get(); 424 } 425 426 public Configuration getConfiguration() { 427 return master.getConfiguration(); 428 } 429 430 public MetricsAssignmentManager getAssignmentManagerMetrics() { 431 return metrics; 432 } 433 434 private LoadBalancer getBalancer() { 435 return master.getLoadBalancer(); 436 } 437 438 private FavoredNodesPromoter getFavoredNodePromoter() { 439 return (FavoredNodesPromoter) ((RSGroupBasedLoadBalancer) master.getLoadBalancer()) 440 .getInternalBalancer(); 441 } 442 443 private MasterProcedureEnv getProcedureEnvironment() { 444 return master.getMasterProcedureExecutor().getEnvironment(); 445 } 446 447 private MasterProcedureScheduler getProcedureScheduler() { 448 return getProcedureEnvironment().getProcedureScheduler(); 449 } 450 451 int getAssignMaxAttempts() { 452 return assignMaxAttempts; 453 } 454 455 public boolean isForceRegionRetainment() { 456 return forceRegionRetainment; 457 } 458 459 public long getForceRegionRetainmentWaitInterval() { 460 return forceRegionRetainmentWaitInterval; 461 } 462 463 public int getForceRegionRetainmentRetries() { 464 return forceRegionRetainmentRetries; 465 } 466 467 int getAssignRetryImmediatelyMaxAttempts() { 468 return assignRetryImmediatelyMaxAttempts; 469 } 470 471 public RegionStates getRegionStates() { 472 return regionStates; 473 } 474 475 /** 476 * Returns the regions hosted by the specified server. 477 * <p/> 478 * Notice that, for SCP, after we submit the SCP, no one can change the region list for the 479 * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a 480 * snapshot of the current region list for this server, which means, right after you get the 481 * region list, new regions may be moved to this server or some regions may be moved out from this 482 * server, so you should not use it critically if you need strong consistency. 483 */ 484 public List<RegionInfo> getRegionsOnServer(ServerName serverName) { 485 ServerStateNode serverInfo = regionStates.getServerNode(serverName); 486 if (serverInfo == null) { 487 return Collections.emptyList(); 488 } 489 return serverInfo.getRegionInfoList(); 490 } 491 492 private RegionInfo getRegionInfo(RegionStateNode rsn) { 493 if (rsn.isSplit() && !rsn.getRegionInfo().isSplit()) { 494 // see the comments in markRegionAsSplit on why we need to do this converting. 495 return RegionInfoBuilder.newBuilder(rsn.getRegionInfo()).setSplit(true).setOffline(true) 496 .build(); 497 } else { 498 return rsn.getRegionInfo(); 499 } 500 } 501 502 private Stream<RegionStateNode> getRegionStateNodes(TableName tableName, 503 boolean excludeOfflinedSplitParents) { 504 Stream<RegionStateNode> stream = regionStates.getTableRegionStateNodes(tableName).stream(); 505 if (excludeOfflinedSplitParents) { 506 return stream.filter(rsn -> !rsn.isSplit()); 507 } else { 508 return stream; 509 } 510 } 511 512 public List<RegionInfo> getTableRegions(TableName tableName, 513 boolean excludeOfflinedSplitParents) { 514 return getRegionStateNodes(tableName, excludeOfflinedSplitParents).map(this::getRegionInfo) 515 .collect(Collectors.toList()); 516 } 517 518 public List<Pair<RegionInfo, ServerName>> getTableRegionsAndLocations(TableName tableName, 519 boolean excludeOfflinedSplitParents) { 520 return getRegionStateNodes(tableName, excludeOfflinedSplitParents) 521 .map(rsn -> Pair.newPair(getRegionInfo(rsn), rsn.getRegionLocation())) 522 .collect(Collectors.toList()); 523 } 524 525 public RegionStateStore getRegionStateStore() { 526 return regionStateStore; 527 } 528 529 public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) { 530 return this.shouldAssignRegionsWithFavoredNodes 531 ? getFavoredNodePromoter().getFavoredNodes(regionInfo) 532 : ServerName.EMPTY_SERVER_LIST; 533 } 534 535 // ============================================================================================ 536 // Table State Manager helpers 537 // ============================================================================================ 538 private TableStateManager getTableStateManager() { 539 return master.getTableStateManager(); 540 } 541 542 private boolean isTableEnabled(final TableName tableName) { 543 return getTableStateManager().isTableState(tableName, TableState.State.ENABLED); 544 } 545 546 private boolean isTableDisabled(final TableName tableName) { 547 return getTableStateManager().isTableState(tableName, TableState.State.DISABLED, 548 TableState.State.DISABLING); 549 } 550 551 // ============================================================================================ 552 // META Helpers 553 // ============================================================================================ 554 private boolean isMetaRegion(final RegionInfo regionInfo) { 555 return regionInfo.isMetaRegion(); 556 } 557 558 public boolean isMetaRegion(final byte[] regionName) { 559 return getMetaRegionFromName(regionName) != null; 560 } 561 562 public RegionInfo getMetaRegionFromName(final byte[] regionName) { 563 for (RegionInfo hri : getMetaRegionSet()) { 564 if (Bytes.equals(hri.getRegionName(), regionName)) { 565 return hri; 566 } 567 } 568 return null; 569 } 570 571 public boolean isCarryingMeta(final ServerName serverName) { 572 // TODO: handle multiple meta 573 return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO); 574 } 575 576 private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) { 577 // TODO: check for state? 578 final RegionStateNode node = regionStates.getRegionStateNode(regionInfo); 579 return (node != null && serverName.equals(node.getRegionLocation())); 580 } 581 582 private RegionInfo getMetaForRegion(final RegionInfo regionInfo) { 583 // if (regionInfo.isMetaRegion()) return regionInfo; 584 // TODO: handle multiple meta. if the region provided is not meta lookup 585 // which meta the region belongs to. 586 return RegionInfoBuilder.FIRST_META_REGIONINFO; 587 } 588 589 // TODO: handle multiple meta. 590 private static final Set<RegionInfo> META_REGION_SET = 591 Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO); 592 593 public Set<RegionInfo> getMetaRegionSet() { 594 return META_REGION_SET; 595 } 596 597 // ============================================================================================ 598 // META Event(s) helpers 599 // ============================================================================================ 600 /** 601 * Notice that, this only means the meta region is available on a RS, but the AM may still be 602 * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first 603 * before checking this method, unless you can make sure that your piece of code can only be 604 * executed after AM builds the region states. 605 * @see #isMetaLoaded() 606 */ 607 public boolean isMetaAssigned() { 608 return metaAssignEvent.isReady(); 609 } 610 611 public boolean isMetaRegionInTransition() { 612 return !isMetaAssigned(); 613 } 614 615 /** 616 * Notice that this event does not mean the AM has already finished region state rebuilding. See 617 * the comment of {@link #isMetaAssigned()} for more details. 618 * @see #isMetaAssigned() 619 */ 620 public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) { 621 return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc); 622 } 623 624 private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) { 625 assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; 626 ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo); 627 if (assigned) { 628 metaAssignEvent.wake(getProcedureScheduler()); 629 } else { 630 metaAssignEvent.suspend(); 631 } 632 } 633 634 private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) { 635 assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; 636 // TODO: handle multiple meta. 637 return metaAssignEvent; 638 } 639 640 /** 641 * Wait until AM finishes the meta loading, i.e, the region states rebuilding. 642 * @see #isMetaLoaded() 643 * @see #waitMetaAssigned(Procedure, RegionInfo) 644 */ 645 public boolean waitMetaLoaded(Procedure<?> proc) { 646 return metaLoadEvent.suspendIfNotReady(proc); 647 } 648 649 /** 650 * This method will be called in master initialization method after calling 651 * {@link #processOfflineRegions()}, as in processOfflineRegions we will generate assign 652 * procedures for offline regions, which may be conflict with creating table. 653 * <p/> 654 * This is a bit dirty, should be reconsidered after we decide whether to keep the 655 * {@link #processOfflineRegions()} method. 656 */ 657 public void wakeMetaLoadedEvent() { 658 metaLoadEvent.wake(getProcedureScheduler()); 659 assert isMetaLoaded() : "expected meta to be loaded"; 660 } 661 662 /** 663 * Return whether AM finishes the meta loading, i.e, the region states rebuilding. 664 * @see #isMetaAssigned() 665 * @see #waitMetaLoaded(Procedure) 666 */ 667 public boolean isMetaLoaded() { 668 return metaLoadEvent.isReady(); 669 } 670 671 /** 672 * Start a new thread to check if there are region servers whose versions are higher than others. 673 * If so, move all system table regions to RS with the highest version to keep compatibility. The 674 * reason is, RS in new version may not be able to access RS in old version when there are some 675 * incompatible changes. 676 * <p> 677 * This method is called when a new RegionServer is added to cluster only. 678 * </p> 679 */ 680 public void checkIfShouldMoveSystemRegionAsync() { 681 // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that 682 // it should 'move' the system tables from the old server to the new server but 683 // ServerCrashProcedure is on it; and it will take care of the assign without dataloss. 684 if (this.master.getServerManager().countOfRegionServers() <= 1) { 685 return; 686 } 687 // This thread used to run whenever there was a change in the cluster. The ZooKeeper 688 // childrenChanged notification came in before the nodeDeleted message and so this method 689 // cold run before a ServerCrashProcedure could run. That meant that this thread could see 690 // a Crashed Server before ServerCrashProcedure and it could find system regions on the 691 // crashed server and go move them before ServerCrashProcedure had a chance; could be 692 // dataloss too if WALs were not recovered. 693 new Thread(() -> { 694 try { 695 synchronized (checkIfShouldMoveSystemRegionLock) { 696 List<RegionPlan> plans = new ArrayList<>(); 697 // TODO: I don't think this code does a good job if all servers in cluster have same 698 // version. It looks like it will schedule unnecessary moves. 699 for (ServerName server : getExcludedServersForSystemTable()) { 700 if (master.getServerManager().isServerDead(server)) { 701 // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable() 702 // considers only online servers, the server could be queued for dead server 703 // processing. As region assignments for crashed server is handled by 704 // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through 705 // regular flow of LoadBalancer as a favored node and not to have this special 706 // handling. 707 continue; 708 } 709 List<RegionInfo> regionsShouldMove = getSystemTables(server); 710 if (!regionsShouldMove.isEmpty()) { 711 for (RegionInfo regionInfo : regionsShouldMove) { 712 // null value for dest forces destination server to be selected by balancer 713 RegionPlan plan = new RegionPlan(regionInfo, server, null); 714 if (regionInfo.isMetaRegion()) { 715 // Must move meta region first. 716 LOG.info("Async MOVE of {} to newer Server={}", regionInfo.getEncodedName(), 717 server); 718 moveAsync(plan); 719 } else { 720 plans.add(plan); 721 } 722 } 723 } 724 for (RegionPlan plan : plans) { 725 LOG.info("Async MOVE of {} to newer Server={}", plan.getRegionInfo().getEncodedName(), 726 server); 727 moveAsync(plan); 728 } 729 } 730 } 731 } catch (Throwable t) { 732 LOG.error(t.toString(), t); 733 } 734 }).start(); 735 } 736 737 private List<RegionInfo> getSystemTables(ServerName serverName) { 738 ServerStateNode serverNode = regionStates.getServerNode(serverName); 739 if (serverNode == null) { 740 return Collections.emptyList(); 741 } 742 return serverNode.getSystemRegionInfoList(); 743 } 744 745 private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates) 746 throws HBaseIOException { 747 if (regionNode.getProcedure() != null) { 748 throw new HBaseIOException( 749 regionNode + " is currently in transition; pid=" + regionNode.getProcedure().getProcId()); 750 } 751 if (!regionNode.isInState(expectedStates)) { 752 throw new DoNotRetryRegionException(UNEXPECTED_STATE_REGION + regionNode); 753 } 754 if (isTableDisabled(regionNode.getTable())) { 755 throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode); 756 } 757 } 758 759 /** 760 * Create an assign TransitRegionStateProcedure. Makes sure of RegionState. Throws exception if 761 * not appropriate UNLESS override is set. Used by hbck2 but also by straightline 762 * {@link #assign(RegionInfo, ServerName)} and {@link #assignAsync(RegionInfo, ServerName)}. 763 * @see #createAssignProcedure(RegionStateNode, ServerName) for a version that does NO checking 764 * used when only when no checking needed. 765 * @param override If false, check RegionState is appropriate for assign; if not throw exception. 766 */ 767 private TransitRegionStateProcedure createAssignProcedure(RegionInfo regionInfo, ServerName sn, 768 boolean override, boolean force) throws IOException { 769 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 770 regionNode.lock(); 771 try { 772 if (override) { 773 if (!force) { 774 preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN); 775 } 776 if (regionNode.getProcedure() != null) { 777 regionNode.unsetProcedure(regionNode.getProcedure()); 778 } 779 } else { 780 preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN); 781 } 782 assert regionNode.getProcedure() == null; 783 return regionNode.setProcedure( 784 TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn)); 785 } finally { 786 regionNode.unlock(); 787 } 788 } 789 790 /** 791 * Create an assign TransitRegionStateProcedure. Does NO checking of RegionState. Presumes 792 * appriopriate state ripe for assign. 793 * @see #createAssignProcedure(RegionInfo, ServerName, boolean, boolean) 794 */ 795 private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode, 796 ServerName targetServer) { 797 regionNode.lock(); 798 try { 799 return regionNode.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(), 800 regionNode.getRegionInfo(), targetServer)); 801 } finally { 802 regionNode.unlock(); 803 } 804 } 805 806 public long assign(RegionInfo regionInfo, ServerName sn) throws IOException { 807 TransitRegionStateProcedure proc = createAssignProcedure(regionInfo, sn, false, false); 808 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 809 return proc.getProcId(); 810 } 811 812 public long assign(RegionInfo regionInfo) throws IOException { 813 return assign(regionInfo, null); 814 } 815 816 /** 817 * Submits a procedure that assigns a region to a target server without waiting for it to finish 818 * @param regionInfo the region we would like to assign 819 * @param sn target server name 820 */ 821 public Future<byte[]> assignAsync(RegionInfo regionInfo, ServerName sn) throws IOException { 822 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), 823 createAssignProcedure(regionInfo, sn, false, false)); 824 } 825 826 /** 827 * Submits a procedure that assigns a region without waiting for it to finish 828 * @param regionInfo the region we would like to assign 829 */ 830 public Future<byte[]> assignAsync(RegionInfo regionInfo) throws IOException { 831 return assignAsync(regionInfo, null); 832 } 833 834 public long unassign(RegionInfo regionInfo) throws IOException { 835 RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 836 if (regionNode == null) { 837 throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName()); 838 } 839 TransitRegionStateProcedure proc; 840 regionNode.lock(); 841 try { 842 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 843 proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo); 844 regionNode.setProcedure(proc); 845 } finally { 846 regionNode.unlock(); 847 } 848 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 849 return proc.getProcId(); 850 } 851 852 public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo, 853 ServerName targetServer) throws HBaseIOException { 854 RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo); 855 if (regionNode == null) { 856 throw new UnknownRegionException( 857 "No RegionStateNode found for " + regionInfo.getEncodedName() + "(Closed/Deleted?)"); 858 } 859 TransitRegionStateProcedure proc; 860 regionNode.lock(); 861 try { 862 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 863 regionNode.checkOnline(); 864 proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer); 865 regionNode.setProcedure(proc); 866 } finally { 867 regionNode.unlock(); 868 } 869 return proc; 870 } 871 872 public void move(RegionInfo regionInfo) throws IOException { 873 TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null); 874 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 875 } 876 877 public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException { 878 TransitRegionStateProcedure proc = 879 createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination()); 880 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); 881 } 882 883 public Future<byte[]> balance(RegionPlan regionPlan) throws HBaseIOException { 884 ServerName current = 885 this.getRegionStates().getRegionAssignments().get(regionPlan.getRegionInfo()); 886 if (current == null || !current.equals(regionPlan.getSource())) { 887 LOG.debug("Skip region plan {}, source server not match, current region location is {}", 888 regionPlan, current == null ? "(null)" : current); 889 return null; 890 } 891 return moveAsync(regionPlan); 892 } 893 894 // ============================================================================================ 895 // RegionTransition procedures helpers 896 // ============================================================================================ 897 898 /** 899 * Create round-robin assigns. Use on table creation to distribute out regions across cluster. 900 * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer 901 * to populate the assigns with targets chosen using round-robin (default balancer 902 * scheme). If at assign-time, the target chosen is no longer up, thats fine, the 903 * AssignProcedure will ask the balancer for a new target, and so on. 904 */ 905 public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris, 906 List<ServerName> serversToExclude) { 907 if (hris.isEmpty()) { 908 return new TransitRegionStateProcedure[0]; 909 } 910 911 if ( 912 serversToExclude != null && this.master.getServerManager().getOnlineServersList().size() == 1 913 ) { 914 LOG.debug("Only one region server found and hence going ahead with the assignment"); 915 serversToExclude = null; 916 } 917 try { 918 // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do 919 // a better job if it has all the assignments in the one lump. 920 Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris, 921 this.master.getServerManager().createDestinationServersList(serversToExclude)); 922 // Return mid-method! 923 return createAssignProcedures(assignments); 924 } catch (IOException hioe) { 925 LOG.warn("Failed roundRobinAssignment", hioe); 926 } 927 // If an error above, fall-through to this simpler assign. Last resort. 928 return createAssignProcedures(hris); 929 } 930 931 /** 932 * Create round-robin assigns. Use on table creation to distribute out regions across cluster. 933 * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer 934 * to populate the assigns with targets chosen using round-robin (default balancer 935 * scheme). If at assign-time, the target chosen is no longer up, thats fine, the 936 * AssignProcedure will ask the balancer for a new target, and so on. 937 */ 938 public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) { 939 return createRoundRobinAssignProcedures(hris, null); 940 } 941 942 static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) { 943 if (left.getRegion().isMetaRegion()) { 944 if (right.getRegion().isMetaRegion()) { 945 return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion()); 946 } 947 return -1; 948 } else if (right.getRegion().isMetaRegion()) { 949 return +1; 950 } 951 if (left.getRegion().getTable().isSystemTable()) { 952 if (right.getRegion().getTable().isSystemTable()) { 953 return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion()); 954 } 955 return -1; 956 } else if (right.getRegion().getTable().isSystemTable()) { 957 return +1; 958 } 959 return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion()); 960 } 961 962 /** 963 * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server. This 964 * method is called from HBCK2. 965 * @return an assign or null 966 */ 967 public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo ri, boolean override, 968 boolean force) { 969 TransitRegionStateProcedure trsp = null; 970 try { 971 trsp = createAssignProcedure(ri, null, override, force); 972 } catch (IOException ioe) { 973 LOG.info( 974 "Failed {} assign, override={}" 975 + (override ? "" : "; set override to by-pass state checks."), 976 ri.getEncodedName(), override, ioe); 977 } 978 return trsp; 979 } 980 981 /** 982 * Create one TransitRegionStateProcedure to unassign a region. This method is called from HBCK2. 983 * @return an unassign or null 984 */ 985 public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo ri, boolean override, 986 boolean force) { 987 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(ri); 988 TransitRegionStateProcedure trsp = null; 989 regionNode.lock(); 990 try { 991 if (override) { 992 if (!force) { 993 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 994 } 995 if (regionNode.getProcedure() != null) { 996 regionNode.unsetProcedure(regionNode.getProcedure()); 997 } 998 } else { 999 // This is where we could throw an exception; i.e. override is false. 1000 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 1001 } 1002 assert regionNode.getProcedure() == null; 1003 trsp = 1004 TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionNode.getRegionInfo()); 1005 regionNode.setProcedure(trsp); 1006 } catch (IOException ioe) { 1007 // 'override' must be false here. 1008 LOG.info("Failed {} unassign, override=false; set override to by-pass state checks.", 1009 ri.getEncodedName(), ioe); 1010 } finally { 1011 regionNode.unlock(); 1012 } 1013 return trsp; 1014 } 1015 1016 /** 1017 * Create an array of TransitRegionStateProcedure w/o specifying a target server. Used as fallback 1018 * of caller is unable to do {@link #createAssignProcedures(Map)}. 1019 * <p/> 1020 * If no target server, at assign time, we will try to use the former location of the region if 1021 * one exists. This is how we 'retain' the old location across a server restart. 1022 * <p/> 1023 * Should only be called when you can make sure that no one can touch these regions other than 1024 * you. For example, when you are creating or enabling table. Presumes all Regions are in 1025 * appropriate state ripe for assign; no checking of Region state is done in here. 1026 * @see #createAssignProcedures(Map) 1027 */ 1028 public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) { 1029 return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri)) 1030 .map(regionNode -> createAssignProcedure(regionNode, null)).sorted(AssignmentManager::compare) 1031 .toArray(TransitRegionStateProcedure[]::new); 1032 } 1033 1034 /** 1035 * Tied to {@link #createAssignProcedures(List)} in that it is called if caller is unable to run 1036 * this method. Presumes all Regions are in appropriate state ripe for assign; no checking of 1037 * Region state is done in here. 1038 * @param assignments Map of assignments from which we produce an array of AssignProcedures. 1039 * @return Assignments made from the passed in <code>assignments</code> 1040 * @see #createAssignProcedures(List) 1041 */ 1042 private TransitRegionStateProcedure[] 1043 createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments) { 1044 return assignments.entrySet().stream() 1045 .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri)) 1046 .map(regionNode -> createAssignProcedure(regionNode, e.getKey()))) 1047 .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new); 1048 } 1049 1050 // for creating unassign TRSP when disabling a table or closing excess region replicas 1051 private TransitRegionStateProcedure forceCreateUnssignProcedure(RegionStateNode regionNode) { 1052 regionNode.lock(); 1053 try { 1054 if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) { 1055 return null; 1056 } 1057 // in general, a split parent should be in CLOSED or SPLIT state, but anyway, let's check it 1058 // here for safety 1059 if (regionNode.getRegionInfo().isSplit()) { 1060 LOG.warn("{} is a split parent but not in CLOSED or SPLIT state", regionNode); 1061 return null; 1062 } 1063 // As in DisableTableProcedure or ModifyTableProcedure, we will hold the xlock for table, so 1064 // we can make sure that this procedure has not been executed yet, as TRSP will hold the 1065 // shared lock for table all the time. So here we will unset it and when it is actually 1066 // executed, it will find that the attach procedure is not itself and quit immediately. 1067 if (regionNode.getProcedure() != null) { 1068 regionNode.unsetProcedure(regionNode.getProcedure()); 1069 } 1070 return regionNode.setProcedure(TransitRegionStateProcedure.unassign(getProcedureEnvironment(), 1071 regionNode.getRegionInfo())); 1072 } finally { 1073 regionNode.unlock(); 1074 } 1075 } 1076 1077 /** 1078 * Called by DisableTableProcedure to unassign all the regions for a table. 1079 */ 1080 public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) { 1081 return regionStates.getTableRegionStateNodes(tableName).stream() 1082 .map(this::forceCreateUnssignProcedure).filter(p -> p != null) 1083 .toArray(TransitRegionStateProcedure[]::new); 1084 } 1085 1086 /** 1087 * Called by ModifyTableProcedures to unassign all the excess region replicas for a table. 1088 */ 1089 public TransitRegionStateProcedure[] createUnassignProceduresForClosingExcessRegionReplicas( 1090 TableName tableName, int newReplicaCount) { 1091 return regionStates.getTableRegionStateNodes(tableName).stream() 1092 .filter(regionNode -> regionNode.getRegionInfo().getReplicaId() >= newReplicaCount) 1093 .map(this::forceCreateUnssignProcedure).filter(p -> p != null) 1094 .toArray(TransitRegionStateProcedure[]::new); 1095 } 1096 1097 public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit, 1098 final byte[] splitKey) throws IOException { 1099 return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey); 1100 } 1101 1102 public TruncateRegionProcedure createTruncateRegionProcedure(final RegionInfo regionToTruncate) 1103 throws IOException { 1104 return new TruncateRegionProcedure(getProcedureEnvironment(), regionToTruncate); 1105 } 1106 1107 public MergeTableRegionsProcedure createMergeProcedure(RegionInfo... ris) throws IOException { 1108 return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false); 1109 } 1110 1111 /** 1112 * Delete the region states. This is called by "DeleteTable" 1113 */ 1114 public void deleteTable(final TableName tableName) throws IOException { 1115 final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName); 1116 regionStateStore.deleteRegions(regions); 1117 for (int i = 0; i < regions.size(); ++i) { 1118 final RegionInfo regionInfo = regions.get(i); 1119 regionStates.deleteRegion(regionInfo); 1120 } 1121 } 1122 1123 // ============================================================================================ 1124 // RS Region Transition Report helpers 1125 // ============================================================================================ 1126 private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder, 1127 ServerStateNode serverNode, List<RegionStateTransition> transitionList) throws IOException { 1128 for (RegionStateTransition transition : transitionList) { 1129 switch (transition.getTransitionCode()) { 1130 case OPENED: 1131 case FAILED_OPEN: 1132 case CLOSED: 1133 assert transition.getRegionInfoCount() == 1 : transition; 1134 final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 1135 long procId = 1136 transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID; 1137 updateRegionTransition(serverNode, transition.getTransitionCode(), hri, 1138 transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId); 1139 break; 1140 case READY_TO_SPLIT: 1141 case SPLIT: 1142 case SPLIT_REVERTED: 1143 assert transition.getRegionInfoCount() == 3 : transition; 1144 final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 1145 final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1)); 1146 final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2)); 1147 updateRegionSplitTransition(serverNode, transition.getTransitionCode(), parent, splitA, 1148 splitB); 1149 break; 1150 case READY_TO_MERGE: 1151 case MERGED: 1152 case MERGE_REVERTED: 1153 assert transition.getRegionInfoCount() == 3 : transition; 1154 final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 1155 final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1)); 1156 final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2)); 1157 updateRegionMergeTransition(serverNode, transition.getTransitionCode(), merged, mergeA, 1158 mergeB); 1159 break; 1160 } 1161 } 1162 } 1163 1164 public ReportRegionStateTransitionResponse reportRegionStateTransition( 1165 final ReportRegionStateTransitionRequest req) throws PleaseHoldException { 1166 ReportRegionStateTransitionResponse.Builder builder = 1167 ReportRegionStateTransitionResponse.newBuilder(); 1168 ServerName serverName = ProtobufUtil.toServerName(req.getServer()); 1169 ServerStateNode serverNode = regionStates.getServerNode(serverName); 1170 if (serverNode == null) { 1171 LOG.warn("No server node for {}", serverName); 1172 builder.setErrorMessage("No server node for " + serverName); 1173 return builder.build(); 1174 } 1175 // here we have to acquire a read lock instead of a simple exclusive lock. This is because that 1176 // we should not block other reportRegionStateTransition call from the same region server. This 1177 // is not only about performance, but also to prevent dead lock. Think of the meta region is 1178 // also on the same region server and you hold the lock which blocks the 1179 // reportRegionStateTransition for meta, and since meta is not online, you will block inside the 1180 // lock protection to wait for meta online... 1181 serverNode.readLock().lock(); 1182 try { 1183 // we only accept reportRegionStateTransition if the region server is online, see the comment 1184 // above in submitServerCrash method and HBASE-21508 for more details. 1185 if (serverNode.isInState(ServerState.ONLINE)) { 1186 try { 1187 reportRegionStateTransition(builder, serverNode, req.getTransitionList()); 1188 } catch (PleaseHoldException e) { 1189 LOG.trace("Failed transition ", e); 1190 throw e; 1191 } catch (UnsupportedOperationException | IOException e) { 1192 // TODO: at the moment we have a single error message and the RS will abort 1193 // if the master says that one of the region transitions failed. 1194 LOG.warn("Failed transition", e); 1195 builder.setErrorMessage("Failed transition " + e.getMessage()); 1196 } 1197 } else { 1198 LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call", 1199 serverName); 1200 builder.setErrorMessage("You are dead"); 1201 } 1202 } finally { 1203 serverNode.readLock().unlock(); 1204 } 1205 1206 return builder.build(); 1207 } 1208 1209 private void updateRegionTransition(ServerStateNode serverNode, TransitionCode state, 1210 RegionInfo regionInfo, long seqId, long procId) throws IOException { 1211 checkMetaLoaded(regionInfo); 1212 1213 RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 1214 if (regionNode == null) { 1215 // the table/region is gone. maybe a delete, split, merge 1216 throw new UnexpectedStateException(String.format( 1217 "Server %s was trying to transition region %s to %s. but Region is not known.", 1218 serverNode.getServerName(), regionInfo, state)); 1219 } 1220 LOG.trace("Update region transition serverName={} region={} regionState={}", 1221 serverNode.getServerName(), regionNode, state); 1222 1223 regionNode.lock(); 1224 try { 1225 if (!reportTransition(regionNode, serverNode, state, seqId, procId)) { 1226 // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages: 1227 // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for 1228 // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958, 1229 // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition 1230 // to CLOSED 1231 // These happen because on cluster shutdown, we currently let the RegionServers close 1232 // regions. This is the only time that region close is not run by the Master (so cluster 1233 // goes down fast). Consider changing it so Master runs all shutdowns. 1234 if ( 1235 this.master.getServerManager().isClusterShutdown() && state.equals(TransitionCode.CLOSED) 1236 ) { 1237 LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName()); 1238 } else { 1239 LOG.warn("No matching procedure found for {} transition on {} to {}", 1240 serverNode.getServerName(), regionNode, state); 1241 } 1242 } 1243 } finally { 1244 regionNode.unlock(); 1245 } 1246 } 1247 1248 private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode, 1249 TransitionCode state, long seqId, long procId) throws IOException { 1250 ServerName serverName = serverNode.getServerName(); 1251 TransitRegionStateProcedure proc = regionNode.getProcedure(); 1252 if (proc == null) { 1253 return false; 1254 } 1255 proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode, 1256 serverName, state, seqId, procId); 1257 return true; 1258 } 1259 1260 private void updateRegionSplitTransition(final ServerStateNode serverNode, 1261 final TransitionCode state, final RegionInfo parent, final RegionInfo hriA, 1262 final RegionInfo hriB) throws IOException { 1263 checkMetaLoaded(parent); 1264 1265 if (state != TransitionCode.READY_TO_SPLIT) { 1266 throw new UnexpectedStateException( 1267 "unsupported split regionState=" + state + " for parent region " + parent 1268 + " maybe an old RS (< 2.0) had the operation in progress"); 1269 } 1270 1271 // sanity check on the request 1272 if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) { 1273 throw new UnsupportedOperationException("unsupported split request with bad keys: parent=" 1274 + parent + " hriA=" + hriA + " hriB=" + hriB); 1275 } 1276 1277 if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) { 1278 LOG.warn("Split switch is off! skip split of " + parent); 1279 throw new DoNotRetryIOException( 1280 "Split region " + parent.getRegionNameAsString() + " failed due to split switch off"); 1281 } 1282 1283 // Submit the Split procedure 1284 final byte[] splitKey = hriB.getStartKey(); 1285 if (LOG.isDebugEnabled()) { 1286 LOG.debug("Split request from {}, parent={}, splitKey={}", serverNode.getServerName(), parent, 1287 Bytes.toStringBinary(splitKey)); 1288 } 1289 // Processing this report happens asynchronously from other activities which can mutate 1290 // the region state. For example, a split procedure may already be running for this parent. 1291 // A split procedure cannot succeed if the parent region is no longer open, so we can 1292 // ignore it in that case. 1293 // Note that submitting more than one split procedure for a given region is 1294 // harmless -- the split is fenced in the procedure handling -- but it would be noisy in 1295 // the logs. Only one procedure can succeed. The other procedure(s) would abort during 1296 // initialization and report failure with WARN level logging. 1297 RegionState parentState = regionStates.getRegionState(parent); 1298 if (parentState != null && parentState.isOpened()) { 1299 master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey)); 1300 } else { 1301 LOG.info("Ignoring split request from {}, parent={} because parent is unknown or not open", 1302 serverNode.getServerName(), parent); 1303 return; 1304 } 1305 1306 // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split 1307 if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) { 1308 throw new UnsupportedOperationException( 1309 String.format("Split handled by the master: " + "parent=%s hriA=%s hriB=%s", 1310 parent.getShortNameToLog(), hriA, hriB)); 1311 } 1312 } 1313 1314 private void updateRegionMergeTransition(final ServerStateNode serverNode, 1315 final TransitionCode state, final RegionInfo merged, final RegionInfo hriA, 1316 final RegionInfo hriB) throws IOException { 1317 checkMetaLoaded(merged); 1318 1319 if (state != TransitionCode.READY_TO_MERGE) { 1320 throw new UnexpectedStateException( 1321 "Unsupported merge regionState=" + state + " for regionA=" + hriA + " regionB=" + hriB 1322 + " merged=" + merged + " maybe an old RS (< 2.0) had the operation in progress"); 1323 } 1324 1325 if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) { 1326 LOG.warn("Merge switch is off! skip merge of regionA=" + hriA + " regionB=" + hriB); 1327 throw new DoNotRetryIOException( 1328 "Merge of regionA=" + hriA + " regionB=" + hriB + " failed because merge switch is off"); 1329 } 1330 1331 // Submit the Merge procedure 1332 if (LOG.isDebugEnabled()) { 1333 LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged); 1334 } 1335 master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB)); 1336 1337 // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge 1338 if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) { 1339 throw new UnsupportedOperationException( 1340 String.format("Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, 1341 merged, hriA, hriB)); 1342 } 1343 } 1344 1345 // ============================================================================================ 1346 // RS Status update (report online regions) helpers 1347 // ============================================================================================ 1348 /** 1349 * The master will call this method when the RS send the regionServerReport(). The report will 1350 * contains the "online regions". This method will check the the online regions against the 1351 * in-memory state of the AM, and we will log a warn message if there is a mismatch. This is 1352 * because that there is no fencing between the reportRegionStateTransition method and 1353 * regionServerReport method, so there could be race and introduce inconsistency here, but 1354 * actually there is no problem. 1355 * <p/> 1356 * Please see HBASE-21421 and HBASE-21463 for more details. 1357 */ 1358 public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) { 1359 if (!isRunning()) { 1360 return; 1361 } 1362 if (LOG.isTraceEnabled()) { 1363 LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName, 1364 regionNames.size(), isMetaLoaded(), 1365 regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList())); 1366 } 1367 1368 ServerStateNode serverNode = regionStates.getServerNode(serverName); 1369 if (serverNode == null) { 1370 LOG.warn("Got a report from server {} where its server node is null", serverName); 1371 return; 1372 } 1373 serverNode.readLock().lock(); 1374 try { 1375 if (!serverNode.isInState(ServerState.ONLINE)) { 1376 LOG.warn("Got a report from a server result in state {}", serverNode); 1377 return; 1378 } 1379 } finally { 1380 serverNode.readLock().unlock(); 1381 } 1382 1383 // Track the regionserver reported online regions in memory. 1384 synchronized (rsReports) { 1385 rsReports.put(serverName, regionNames); 1386 } 1387 1388 if (regionNames.isEmpty()) { 1389 // nothing to do if we don't have regions 1390 LOG.trace("no online region found on {}", serverName); 1391 return; 1392 } 1393 if (!isMetaLoaded()) { 1394 // we are still on startup, skip checking 1395 return; 1396 } 1397 // The Heartbeat tells us of what regions are on the region serve, check the state. 1398 checkOnlineRegionsReport(serverNode, regionNames); 1399 } 1400 1401 /** 1402 * Close <code>regionName</code> on <code>sn</code> silently and immediately without using a 1403 * Procedure or going via hbase:meta. For case where a RegionServer's hosting of a Region is not 1404 * aligned w/ the Master's accounting of Region state. This is for cleaning up an error in 1405 * accounting. 1406 */ 1407 private void closeRegionSilently(ServerName sn, byte[] regionName) { 1408 try { 1409 RegionInfo ri = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName); 1410 // Pass -1 for timeout. Means do not wait. 1411 ServerManager.closeRegionSilentlyAndWait(this.master.getAsyncClusterConnection(), sn, ri, -1); 1412 } catch (Exception e) { 1413 LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e); 1414 } 1415 } 1416 1417 /** 1418 * Check that what the RegionServer reports aligns with the Master's image. If disagreement, we 1419 * will tell the RegionServer to expediently close a Region we do not think it should have. 1420 */ 1421 private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) { 1422 ServerName serverName = serverNode.getServerName(); 1423 for (byte[] regionName : regionNames) { 1424 if (!isRunning()) { 1425 return; 1426 } 1427 RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName); 1428 if (regionNode == null) { 1429 String regionNameAsStr = Bytes.toStringBinary(regionName); 1430 LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...", regionNameAsStr, 1431 serverName); 1432 closeRegionSilently(serverNode.getServerName(), regionName); 1433 continue; 1434 } 1435 final long lag = 1000; 1436 // This is just a fallback check designed to identify unexpected data inconsistencies, so we 1437 // use tryLock to attempt to acquire the lock, and if the lock cannot be acquired, we skip the 1438 // check. This will not cause any additional problems and also prevents the regionServerReport 1439 // call from being stuck for too long which may cause deadlock on region assignment. 1440 if (regionNode.tryLock()) { 1441 try { 1442 long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate(); 1443 if (regionNode.isInState(State.OPENING, State.OPEN)) { 1444 // This is possible as a region server has just closed a region but the region server 1445 // report is generated before the closing, but arrive after the closing. Make sure 1446 // there 1447 // is some elapsed time so less false alarms. 1448 if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) { 1449 LOG.warn("Reporting {} server does not match {} (time since last " 1450 + "update={}ms); closing...", serverName, regionNode, diff); 1451 closeRegionSilently(serverNode.getServerName(), regionName); 1452 } 1453 } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) { 1454 // So, we can get report that a region is CLOSED or SPLIT because a heartbeat 1455 // came in at about same time as a region transition. Make sure there is some 1456 // elapsed time so less false alarms. 1457 if (diff > lag) { 1458 LOG.warn("Reporting {} state does not match {} (time since last update={}ms)", 1459 serverName, regionNode, diff); 1460 } 1461 } 1462 } finally { 1463 regionNode.unlock(); 1464 } 1465 } else { 1466 LOG.warn( 1467 "Unable to acquire lock for regionNode {}. It is likely that another thread is currently holding the lock. To avoid deadlock, skip execution for now.", 1468 regionNode); 1469 } 1470 } 1471 } 1472 1473 // ============================================================================================ 1474 // RIT chore 1475 // ============================================================================================ 1476 private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> { 1477 public RegionInTransitionChore(final int timeoutMsec) { 1478 super(timeoutMsec); 1479 } 1480 1481 @Override 1482 protected void periodicExecute(final MasterProcedureEnv env) { 1483 final AssignmentManager am = env.getAssignmentManager(); 1484 1485 final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat(); 1486 if (ritStat.hasRegionsOverThreshold()) { 1487 for (RegionState hri : ritStat.getRegionOverThreshold()) { 1488 am.handleRegionOverStuckWarningThreshold(hri.getRegion()); 1489 } 1490 } 1491 1492 // update metrics 1493 am.updateRegionsInTransitionMetrics(ritStat); 1494 } 1495 } 1496 1497 private static class DeadServerMetricRegionChore 1498 extends ProcedureInMemoryChore<MasterProcedureEnv> { 1499 public DeadServerMetricRegionChore(final int timeoutMsec) { 1500 super(timeoutMsec); 1501 } 1502 1503 @Override 1504 protected void periodicExecute(final MasterProcedureEnv env) { 1505 final ServerManager sm = env.getMasterServices().getServerManager(); 1506 final AssignmentManager am = env.getAssignmentManager(); 1507 // To minimize inconsistencies we are not going to snapshot live servers in advance in case 1508 // new servers are added; OTOH we don't want to add heavy sync for a consistent view since 1509 // this is for metrics. Instead, we're going to check each regions as we go; to avoid making 1510 // too many checks, we maintain a local lists of server, limiting us to false negatives. If 1511 // we miss some recently-dead server, we'll just see it next time. 1512 Set<ServerName> recentlyLiveServers = new HashSet<>(); 1513 int deadRegions = 0, unknownRegions = 0; 1514 for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) { 1515 if (rsn.getState() != State.OPEN) { 1516 continue; // Opportunistic check, should quickly skip RITs, offline tables, etc. 1517 } 1518 // Do not need to acquire region state lock as this is only for showing metrics. 1519 ServerName sn = rsn.getRegionLocation(); 1520 State state = rsn.getState(); 1521 if (state != State.OPEN) { 1522 continue; // Mostly skipping RITs that are already being take care of. 1523 } 1524 if (sn == null) { 1525 ++unknownRegions; // Opened on null? 1526 continue; 1527 } 1528 if (recentlyLiveServers.contains(sn)) { 1529 continue; 1530 } 1531 ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn); 1532 switch (sls) { 1533 case LIVE: 1534 recentlyLiveServers.add(sn); 1535 break; 1536 case DEAD: 1537 ++deadRegions; 1538 break; 1539 case UNKNOWN: 1540 ++unknownRegions; 1541 break; 1542 default: 1543 throw new AssertionError("Unexpected " + sls); 1544 } 1545 } 1546 if (deadRegions > 0 || unknownRegions > 0) { 1547 LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers", 1548 deadRegions, unknownRegions); 1549 } 1550 1551 am.updateDeadServerRegionMetrics(deadRegions, unknownRegions); 1552 } 1553 } 1554 1555 public RegionInTransitionStat computeRegionInTransitionStat() { 1556 final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration()); 1557 rit.update(this); 1558 return rit; 1559 } 1560 1561 public static class RegionInTransitionStat { 1562 private final int ritThreshold; 1563 1564 private HashMap<String, RegionState> ritsOverThreshold = null; 1565 private long statTimestamp; 1566 private long oldestRITTime = 0; 1567 private int totalRITsTwiceThreshold = 0; 1568 private int totalRITs = 0; 1569 1570 public RegionInTransitionStat(final Configuration conf) { 1571 this.ritThreshold = 1572 conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD); 1573 } 1574 1575 public int getRITThreshold() { 1576 return ritThreshold; 1577 } 1578 1579 public long getTimestamp() { 1580 return statTimestamp; 1581 } 1582 1583 public int getTotalRITs() { 1584 return totalRITs; 1585 } 1586 1587 public long getOldestRITTime() { 1588 return oldestRITTime; 1589 } 1590 1591 public int getTotalRITsOverThreshold() { 1592 Map<String, RegionState> m = this.ritsOverThreshold; 1593 return m != null ? m.size() : 0; 1594 } 1595 1596 public boolean hasRegionsTwiceOverThreshold() { 1597 return totalRITsTwiceThreshold > 0; 1598 } 1599 1600 public boolean hasRegionsOverThreshold() { 1601 Map<String, RegionState> m = this.ritsOverThreshold; 1602 return m != null && !m.isEmpty(); 1603 } 1604 1605 public Collection<RegionState> getRegionOverThreshold() { 1606 Map<String, RegionState> m = this.ritsOverThreshold; 1607 return m != null ? m.values() : Collections.emptySet(); 1608 } 1609 1610 public boolean isRegionOverThreshold(final RegionInfo regionInfo) { 1611 Map<String, RegionState> m = this.ritsOverThreshold; 1612 return m != null && m.containsKey(regionInfo.getEncodedName()); 1613 } 1614 1615 public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) { 1616 Map<String, RegionState> m = this.ritsOverThreshold; 1617 if (m == null) { 1618 return false; 1619 } 1620 final RegionState state = m.get(regionInfo.getEncodedName()); 1621 if (state == null) { 1622 return false; 1623 } 1624 return (statTimestamp - state.getStamp()) > (ritThreshold * 2); 1625 } 1626 1627 protected void update(final AssignmentManager am) { 1628 final RegionStates regionStates = am.getRegionStates(); 1629 this.statTimestamp = EnvironmentEdgeManager.currentTime(); 1630 update(regionStates.getRegionsStateInTransition(), statTimestamp); 1631 update(regionStates.getRegionFailedOpen(), statTimestamp); 1632 1633 if (LOG.isDebugEnabled() && ritsOverThreshold != null && !ritsOverThreshold.isEmpty()) { 1634 LOG.debug("RITs over threshold: {}", 1635 ritsOverThreshold.entrySet().stream() 1636 .map(e -> e.getKey() + ":" + e.getValue().getState().name()) 1637 .collect(Collectors.joining("\n"))); 1638 } 1639 } 1640 1641 private void update(final Collection<RegionState> regions, final long currentTime) { 1642 for (RegionState state : regions) { 1643 totalRITs++; 1644 final long ritStartedMs = state.getStamp(); 1645 if (ritStartedMs == 0) { 1646 // Don't output bogus values to metrics if they accidentally make it here. 1647 LOG.warn("The RIT {} has no start time", state.getRegion()); 1648 continue; 1649 } 1650 final long ritTime = currentTime - ritStartedMs; 1651 if (ritTime > ritThreshold) { 1652 if (ritsOverThreshold == null) { 1653 ritsOverThreshold = new HashMap<String, RegionState>(); 1654 } 1655 ritsOverThreshold.put(state.getRegion().getEncodedName(), state); 1656 totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0; 1657 } 1658 if (oldestRITTime < ritTime) { 1659 oldestRITTime = ritTime; 1660 } 1661 } 1662 } 1663 } 1664 1665 private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) { 1666 metrics.updateRITOldestAge(ritStat.getOldestRITTime()); 1667 metrics.updateRITCount(ritStat.getTotalRITs()); 1668 metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold()); 1669 } 1670 1671 private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) { 1672 metrics.updateDeadServerOpenRegions(deadRegions); 1673 metrics.updateUnknownServerOpenRegions(unknownRegions); 1674 } 1675 1676 private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) { 1677 final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 1678 // if (regionNode.isStuck()) { 1679 LOG.warn("STUCK Region-In-Transition {}", regionNode); 1680 } 1681 1682 // ============================================================================================ 1683 // TODO: Master load/bootstrap 1684 // ============================================================================================ 1685 public void joinCluster() throws IOException { 1686 long startTime = System.nanoTime(); 1687 LOG.debug("Joining cluster..."); 1688 1689 // Scan hbase:meta to build list of existing regions, servers, and assignment. 1690 // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress 1691 // w/o meta. 1692 loadMeta(); 1693 1694 while (master.getServerManager().countOfRegionServers() < 1) { 1695 LOG.info("Waiting for RegionServers to join; current count={}", 1696 master.getServerManager().countOfRegionServers()); 1697 Threads.sleep(250); 1698 } 1699 LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers()); 1700 1701 // Start the chores 1702 master.getMasterProcedureExecutor().addChore(this.ritChore); 1703 master.getMasterProcedureExecutor().addChore(this.deadMetricChore); 1704 1705 long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); 1706 LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs)); 1707 } 1708 1709 /** 1710 * Create assign procedure for offline regions. Just follow the old 1711 * processofflineServersWithOnlineRegions method. Since now we do not need to deal with dead 1712 * server any more, we only deal with the regions in OFFLINE state in this method. And this is a 1713 * bit strange, that for new regions, we will add it in CLOSED state instead of OFFLINE state, and 1714 * usually there will be a procedure to track them. The processofflineServersWithOnlineRegions is 1715 * a legacy from long ago, as things are going really different now, maybe we do not need this 1716 * method any more. Need to revisit later. 1717 */ 1718 // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online. 1719 // Needs to be done after the table state manager has been started. 1720 public void processOfflineRegions() { 1721 TransitRegionStateProcedure[] procs = 1722 regionStates.getRegionStateNodes().stream().filter(rsn -> rsn.isInState(State.OFFLINE)) 1723 .filter(rsn -> isTableEnabled(rsn.getRegionInfo().getTable())).map(rsn -> { 1724 rsn.lock(); 1725 try { 1726 if (rsn.getProcedure() != null) { 1727 return null; 1728 } else { 1729 return rsn.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(), 1730 rsn.getRegionInfo(), null)); 1731 } 1732 } finally { 1733 rsn.unlock(); 1734 } 1735 }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new); 1736 if (procs.length > 0) { 1737 master.getMasterProcedureExecutor().submitProcedures(procs); 1738 } 1739 } 1740 1741 /* 1742 * AM internal RegionStateStore.RegionStateVisitor implementation. To be used when scanning META 1743 * table for region rows, using RegionStateStore utility methods. RegionStateStore methods will 1744 * convert Result into proper RegionInfo instances, but those would still need to be added into 1745 * AssignmentManager.regionStates in-memory cache. RegionMetaLoadingVisitor.visitRegionState 1746 * method provides the logic for adding RegionInfo instances as loaded from latest META scan into 1747 * AssignmentManager.regionStates. 1748 */ 1749 private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor { 1750 1751 @Override 1752 public void visitRegionState(Result result, final RegionInfo regionInfo, final State state, 1753 final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) { 1754 if ( 1755 state == null && regionLocation == null && lastHost == null 1756 && openSeqNum == SequenceId.NO_SEQUENCE_ID 1757 ) { 1758 // This is a row with nothing in it. 1759 LOG.warn("Skipping empty row={}", result); 1760 return; 1761 } 1762 State localState = state; 1763 if (localState == null) { 1764 // No region state column data in hbase:meta table! Are I doing a rolling upgrade from 1765 // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta? 1766 // In any of these cases, state is empty. For now, presume OFFLINE but there are probably 1767 // cases where we need to probe more to be sure this correct; TODO informed by experience. 1768 LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE); 1769 localState = State.OFFLINE; 1770 } 1771 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 1772 // Do not need to lock on regionNode, as we can make sure that before we finish loading 1773 // meta, all the related procedures can not be executed. The only exception is for meta 1774 // region related operations, but here we do not load the informations for meta region. 1775 regionNode.setState(localState); 1776 regionNode.setLastHost(lastHost); 1777 regionNode.setRegionLocation(regionLocation); 1778 regionNode.setOpenSeqNum(openSeqNum); 1779 1780 // Note: keep consistent with other methods, see region(Opening|Opened|Closing) 1781 // RIT/ServerCrash handling should take care of the transiting regions. 1782 if ( 1783 localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING, State.MERGING) 1784 ) { 1785 assert regionLocation != null : "found null region location for " + regionNode; 1786 // TODO: this could lead to some orphan server state nodes, as it is possible that the 1787 // region server is already dead and its SCP has already finished but we have 1788 // persisted an opening state on this region server. Finally the TRSP will assign the 1789 // region to another region server, so it will not cause critical problems, just waste 1790 // some memory as no one will try to cleanup these orphan server state nodes. 1791 regionStates.createServer(regionLocation); 1792 regionStates.addRegionToServer(regionNode); 1793 } else if (localState == State.OFFLINE || regionInfo.isOffline()) { 1794 regionStates.addToOfflineRegions(regionNode); 1795 } 1796 if (regionNode.getProcedure() != null) { 1797 regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode); 1798 } 1799 } 1800 }; 1801 1802 /** 1803 * Attempt to load {@code regionInfo} from META, adding any results to the 1804 * {@link #regionStateStore} Is NOT aware of replica regions. 1805 * @param regionInfo the region to be loaded from META. 1806 * @throws IOException If some error occurs while querying META or parsing results. 1807 */ 1808 public void populateRegionStatesFromMeta(@NonNull final RegionInfo regionInfo) 1809 throws IOException { 1810 final String regionEncodedName = RegionInfo.DEFAULT_REPLICA_ID == regionInfo.getReplicaId() 1811 ? regionInfo.getEncodedName() 1812 : RegionInfoBuilder.newBuilder(regionInfo).setReplicaId(RegionInfo.DEFAULT_REPLICA_ID).build() 1813 .getEncodedName(); 1814 populateRegionStatesFromMeta(regionEncodedName); 1815 } 1816 1817 /** 1818 * Attempt to load {@code regionEncodedName} from META, adding any results to the 1819 * {@link #regionStateStore} Is NOT aware of replica regions. 1820 * @param regionEncodedName encoded name for the region to be loaded from META. 1821 * @throws IOException If some error occurs while querying META or parsing results. 1822 */ 1823 public void populateRegionStatesFromMeta(@NonNull String regionEncodedName) throws IOException { 1824 final RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor(); 1825 regionStateStore.visitMetaForRegion(regionEncodedName, visitor); 1826 } 1827 1828 private void loadMeta() throws IOException { 1829 // TODO: use a thread pool 1830 regionStateStore.visitMeta(new RegionMetaLoadingVisitor()); 1831 } 1832 1833 /** 1834 * Used to check if the meta loading is done. 1835 * <p/> 1836 * if not we throw PleaseHoldException since we are rebuilding the RegionStates 1837 * @param hri region to check if it is already rebuild 1838 * @throws PleaseHoldException if meta has not been loaded yet 1839 */ 1840 private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException { 1841 if (!isRunning()) { 1842 throw new PleaseHoldException("AssignmentManager not running"); 1843 } 1844 boolean meta = isMetaRegion(hri); 1845 boolean metaLoaded = isMetaLoaded(); 1846 if (!meta && !metaLoaded) { 1847 throw new PleaseHoldException( 1848 "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded); 1849 } 1850 } 1851 1852 // ============================================================================================ 1853 // TODO: Metrics 1854 // ============================================================================================ 1855 public int getNumRegionsOpened() { 1856 // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value 1857 return 0; 1858 } 1859 1860 /** 1861 * Usually run by the Master in reaction to server crash during normal processing. Can also be 1862 * invoked via external RPC to effect repair; in the latter case, the 'force' flag is set so we 1863 * push through the SCP though context may indicate already-running-SCP (An old SCP may have 1864 * exited abnormally, or damaged cluster may still have references in hbase:meta to 'Unknown 1865 * Servers' -- servers that are not online or in dead servers list, etc.) 1866 * @param force Set if the request came in externally over RPC (via hbck2). Force means run the 1867 * SCP even if it seems as though there might be an outstanding SCP running. 1868 * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled. 1869 */ 1870 public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) { 1871 // May be an 'Unknown Server' so handle case where serverNode is null. 1872 ServerStateNode serverNode = regionStates.getServerNode(serverName); 1873 // Remove the in-memory rsReports result 1874 synchronized (rsReports) { 1875 rsReports.remove(serverName); 1876 } 1877 if (serverNode == null) { 1878 if (force) { 1879 LOG.info("Force adding ServerCrashProcedure for {} when server node is null", serverName); 1880 } else { 1881 // for normal case, do not schedule SCP if ServerStateNode is null 1882 LOG.warn("Skip adding ServerCrashProcedure for {} because server node is null", serverName); 1883 return Procedure.NO_PROC_ID; 1884 } 1885 } 1886 1887 ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor(); 1888 // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the 1889 // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from 1890 // this server. This is used to simplify the implementation for TRSP and SCP, where we can make 1891 // sure that, the region list fetched by SCP will not be changed any more. 1892 if (serverNode != null) { 1893 serverNode.writeLock().lock(); 1894 } 1895 try { 1896 1897 boolean carryingMeta = isCarryingMeta(serverName); 1898 if (serverNode != null && !serverNode.isInState(ServerState.ONLINE)) { 1899 if (force) { 1900 LOG.info("Force adding ServerCrashProcedure for {} (meta={}) when state is not {}", 1901 serverNode, carryingMeta, ServerState.ONLINE); 1902 } else { 1903 LOG.info("Skip adding ServerCrashProcedure for {} (meta={}) when state is not {}", 1904 serverNode, carryingMeta, ServerState.ONLINE); 1905 return Procedure.NO_PROC_ID; 1906 } 1907 } 1908 MasterProcedureEnv mpe = procExec.getEnvironment(); 1909 // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead. 1910 // HBCKSCP scours Master in-memory state AND hbase;meta for references to 1911 // serverName just-in-case. An SCP that is scheduled when the server is 1912 // 'Unknown' probably originated externally with HBCK2 fix-it tool. 1913 ServerState oldState = null; 1914 if (serverNode != null) { 1915 oldState = serverNode.getState(); 1916 serverNode.setState(ServerState.CRASHED); 1917 } 1918 ServerCrashProcedure scp = force 1919 ? new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta) 1920 : new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta); 1921 long pid = procExec.submitProcedure(scp); 1922 LOG.info("Scheduled ServerCrashProcedure pid={} for {} (carryingMeta={}){}.", pid, serverName, 1923 carryingMeta, 1924 serverNode == null ? "" : " " + serverNode.toString() + ", oldState=" + oldState); 1925 return pid; 1926 } finally { 1927 if (serverNode != null) { 1928 serverNode.writeLock().unlock(); 1929 } 1930 } 1931 } 1932 1933 public void offlineRegion(final RegionInfo regionInfo) { 1934 // TODO used by MasterRpcServices 1935 RegionStateNode node = regionStates.getRegionStateNode(regionInfo); 1936 if (node != null) { 1937 node.offline(); 1938 } 1939 } 1940 1941 public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) { 1942 // TODO used by TestSplitTransactionOnCluster.java 1943 } 1944 1945 public Map<ServerName, List<RegionInfo>> 1946 getSnapShotOfAssignment(final Collection<RegionInfo> regions) { 1947 return regionStates.getSnapShotOfAssignment(regions); 1948 } 1949 1950 // ============================================================================================ 1951 // TODO: UTILS/HELPERS? 1952 // ============================================================================================ 1953 /** 1954 * Used by the client (via master) to identify if all regions have the schema updates 1955 * @return Pair indicating the status of the alter command (pending/total) 1956 */ 1957 public Pair<Integer, Integer> getReopenStatus(TableName tableName) { 1958 if (isTableDisabled(tableName)) { 1959 return new Pair<Integer, Integer>(0, 0); 1960 } 1961 1962 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 1963 int ritCount = 0; 1964 for (RegionState regionState : states) { 1965 if (!regionState.isOpened() && !regionState.isSplit()) { 1966 ritCount++; 1967 } 1968 } 1969 return new Pair<Integer, Integer>(ritCount, states.size()); 1970 } 1971 1972 // ============================================================================================ 1973 // TODO: Region State In Transition 1974 // ============================================================================================ 1975 public boolean hasRegionsInTransition() { 1976 return regionStates.hasRegionsInTransition(); 1977 } 1978 1979 public List<RegionStateNode> getRegionsInTransition() { 1980 return regionStates.getRegionsInTransition(); 1981 } 1982 1983 public List<RegionInfo> getAssignedRegions() { 1984 return regionStates.getAssignedRegions(); 1985 } 1986 1987 /** 1988 * Resolve a cached {@link RegionInfo} from the region name as a {@code byte[]}. 1989 */ 1990 public RegionInfo getRegionInfo(final byte[] regionName) { 1991 final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName); 1992 return regionState != null ? regionState.getRegionInfo() : null; 1993 } 1994 1995 /** 1996 * Resolve a cached {@link RegionInfo} from the encoded region name as a {@code String}. 1997 */ 1998 public RegionInfo getRegionInfo(final String encodedRegionName) { 1999 final RegionStateNode regionState = 2000 regionStates.getRegionStateNodeFromEncodedRegionName(encodedRegionName); 2001 return regionState != null ? regionState.getRegionInfo() : null; 2002 } 2003 2004 // ============================================================================================ 2005 // Expected states on region state transition. 2006 // Notice that there is expected states for transiting to OPENING state, this is because SCP. 2007 // See the comments in regionOpening method for more details. 2008 // ============================================================================================ 2009 private static final State[] STATES_EXPECTED_ON_OPEN = { State.OPENING, // Normal case 2010 State.OPEN // Retrying 2011 }; 2012 2013 private static final State[] STATES_EXPECTED_ON_CLOSING = { State.OPEN, // Normal case 2014 State.CLOSING, // Retrying 2015 State.SPLITTING, // Offline the split parent 2016 State.MERGING // Offline the merge parents 2017 }; 2018 2019 private static final State[] STATES_EXPECTED_ON_CLOSED = { State.CLOSING, // Normal case 2020 State.CLOSED // Retrying 2021 }; 2022 2023 // This is for manually scheduled region assign, can add other states later if we find out other 2024 // usages 2025 private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE }; 2026 2027 // We only allow unassign or move a region which is in OPEN state. 2028 private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN }; 2029 2030 // ============================================================================================ 2031 // Region Status update 2032 // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking 2033 // and pre-assumptions are very tricky. 2034 // ============================================================================================ 2035 private CompletableFuture<Void> transitStateAndUpdate(RegionStateNode regionNode, 2036 RegionState.State newState, RegionState.State... expectedStates) { 2037 RegionState.State state = regionNode.getState(); 2038 try { 2039 regionNode.transitionState(newState, expectedStates); 2040 } catch (UnexpectedStateException e) { 2041 return FutureUtils.failedFuture(e); 2042 } 2043 CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode); 2044 FutureUtils.addListener(future, (r, e) -> { 2045 if (e != null) { 2046 // revert 2047 regionNode.setState(state); 2048 } 2049 }); 2050 return future; 2051 } 2052 2053 // should be called within the synchronized block of RegionStateNode 2054 CompletableFuture<Void> regionOpening(RegionStateNode regionNode) { 2055 // As in SCP, for performance reason, there is no TRSP attached with this region, we will not 2056 // update the region state, which means that the region could be in any state when we want to 2057 // assign it after a RS crash. So here we do not pass the expectedStates parameter. 2058 return transitStateAndUpdate(regionNode, State.OPENING).thenAccept(r -> { 2059 ServerStateNode serverNode = regionStates.getServerNode(regionNode.getRegionLocation()); 2060 // Here the server node could be null. For example, we want to assign the region to a given 2061 // region server and it crashes, and it is the region server which holds hbase:meta, then the 2062 // above transitStateAndUpdate call will never succeed until we finishes the SCP for it. But 2063 // after the SCP finishes, the server node will be removed, so when we arrive there, the 2064 // server 2065 // node will be null. This is not a big problem if we skip adding it, as later we will fail to 2066 // execute the remote procedure on the region server and then try to assign to another region 2067 // server 2068 if (serverNode != null) { 2069 serverNode.addRegion(regionNode); 2070 } 2071 // update the operation count metrics 2072 metrics.incrementOperationCounter(); 2073 }); 2074 } 2075 2076 // should be called under the RegionStateNode lock 2077 // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then 2078 // we will persist the FAILED_OPEN state into hbase:meta. 2079 CompletableFuture<Void> regionFailedOpen(RegionStateNode regionNode, boolean giveUp) { 2080 RegionState.State state = regionNode.getState(); 2081 ServerName regionLocation = regionNode.getRegionLocation(); 2082 if (!giveUp) { 2083 if (regionLocation != null) { 2084 regionStates.removeRegionFromServer(regionLocation, regionNode); 2085 } 2086 return CompletableFuture.completedFuture(null); 2087 } 2088 regionNode.setState(State.FAILED_OPEN); 2089 regionNode.setRegionLocation(null); 2090 CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode); 2091 FutureUtils.addListener(future, (r, e) -> { 2092 if (e == null) { 2093 if (regionLocation != null) { 2094 regionStates.removeRegionFromServer(regionLocation, regionNode); 2095 } 2096 } else { 2097 // revert 2098 regionNode.setState(state); 2099 regionNode.setRegionLocation(regionLocation); 2100 } 2101 }); 2102 return future; 2103 } 2104 2105 // should be called under the RegionStateNode lock 2106 CompletableFuture<Void> regionClosing(RegionStateNode regionNode) { 2107 return transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING) 2108 .thenAccept(r -> { 2109 RegionInfo hri = regionNode.getRegionInfo(); 2110 // Set meta has not initialized early. so people trying to create/edit tables will wait 2111 if (isMetaRegion(hri)) { 2112 setMetaAssigned(hri, false); 2113 } 2114 // update the operation count metrics 2115 metrics.incrementOperationCounter(); 2116 }); 2117 } 2118 2119 // for open and close, they will first be persist to the procedure store in 2120 // RegionRemoteProcedureBase. So here we will first change the in memory state as it is considered 2121 // as succeeded if the persistence to procedure store is succeeded, and then when the 2122 // RegionRemoteProcedureBase is woken up, we will persist the RegionStateNode to hbase:meta. 2123 2124 // should be called under the RegionStateNode lock 2125 void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) 2126 throws UnexpectedStateException { 2127 regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN); 2128 RegionInfo regionInfo = regionNode.getRegionInfo(); 2129 regionStates.addRegionToServer(regionNode); 2130 regionStates.removeFromFailedOpen(regionInfo); 2131 } 2132 2133 // should be called under the RegionStateNode lock 2134 void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) 2135 throws UnexpectedStateException { 2136 ServerName regionLocation = regionNode.getRegionLocation(); 2137 regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED); 2138 regionNode.setRegionLocation(null); 2139 if (regionLocation != null) { 2140 regionNode.setLastHost(regionLocation); 2141 regionStates.removeRegionFromServer(regionLocation, regionNode); 2142 } 2143 } 2144 2145 // should be called under the RegionStateNode lock 2146 CompletableFuture<Void> persistToMeta(RegionStateNode regionNode) { 2147 return regionStateStore.updateRegionLocation(regionNode).thenAccept(r -> { 2148 RegionInfo regionInfo = regionNode.getRegionInfo(); 2149 if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) { 2150 // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it 2151 // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager 2152 // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state 2153 // on table that contains state. 2154 setMetaAssigned(regionInfo, true); 2155 } 2156 }); 2157 } 2158 2159 // should be called under the RegionStateNode lock 2160 // for SCP 2161 public CompletableFuture<Void> regionClosedAbnormally(RegionStateNode regionNode) { 2162 RegionState.State state = regionNode.getState(); 2163 ServerName regionLocation = regionNode.getRegionLocation(); 2164 regionNode.setState(State.ABNORMALLY_CLOSED); 2165 regionNode.setRegionLocation(null); 2166 CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode); 2167 FutureUtils.addListener(future, (r, e) -> { 2168 if (e == null) { 2169 if (regionLocation != null) { 2170 regionNode.setLastHost(regionLocation); 2171 regionStates.removeRegionFromServer(regionLocation, regionNode); 2172 } 2173 } else { 2174 // revert 2175 regionNode.setState(state); 2176 regionNode.setRegionLocation(regionLocation); 2177 } 2178 }); 2179 return future; 2180 } 2181 2182 // ============================================================================================ 2183 // The above methods can only be called in TransitRegionStateProcedure(and related procedures) 2184 // ============================================================================================ 2185 2186 public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName, 2187 final RegionInfo daughterA, final RegionInfo daughterB) throws IOException { 2188 // Update hbase:meta. Parent will be marked offline and split up in hbase:meta. 2189 // The parent stays in regionStates until cleared when removed by CatalogJanitor. 2190 // Update its state in regionStates to it shows as offline and split when read 2191 // later figuring what regions are in a table and what are not: see 2192 // regionStates#getRegionsOfTable 2193 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent); 2194 node.setState(State.SPLIT); 2195 final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA); 2196 nodeA.setState(State.SPLITTING_NEW); 2197 final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB); 2198 nodeB.setState(State.SPLITTING_NEW); 2199 2200 TableDescriptor td = master.getTableDescriptors().get(parent.getTable()); 2201 // TODO: here we just update the parent region info in meta, to set split and offline to true, 2202 // without changing the one in the region node. This is a bit confusing but the region info 2203 // field in RegionStateNode is not expected to be changed in the current design. Need to find a 2204 // possible way to address this problem, or at least adding more comments about the trick to 2205 // deal with this problem, that when you want to filter out split parent, you need to check both 2206 // the RegionState on whether it is split, and also the region info. If one of them matches then 2207 // it is a split parent. And usually only one of them can match, as after restart, the region 2208 // state will be changed from SPLIT to CLOSED. 2209 regionStateStore.splitRegion(parent, daughterA, daughterB, serverName, td); 2210 if (shouldAssignFavoredNodes(parent)) { 2211 List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList(); 2212 getFavoredNodePromoter().generateFavoredNodesForDaughter(onlineServers, parent, daughterA, 2213 daughterB); 2214 } 2215 } 2216 2217 /** 2218 * When called here, the merge has happened. The merged regions have been unassigned and the above 2219 * markRegionClosed has been called on each so they have been disassociated from a hosting Server. 2220 * The merged region will be open after this call. The merged regions are removed from hbase:meta 2221 * below. Later they are deleted from the filesystem by the catalog janitor running against 2222 * hbase:meta. It notices when the merged region no longer holds references to the old regions 2223 * (References are deleted after a compaction rewrites what the Reference points at but not until 2224 * the archiver chore runs, are the References removed). 2225 */ 2226 public void markRegionAsMerged(final RegionInfo child, final ServerName serverName, 2227 RegionInfo[] mergeParents) throws IOException { 2228 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child); 2229 node.setState(State.MERGED); 2230 for (RegionInfo ri : mergeParents) { 2231 regionStates.deleteRegion(ri); 2232 } 2233 TableDescriptor td = master.getTableDescriptors().get(child.getTable()); 2234 regionStateStore.mergeRegions(child, mergeParents, serverName, td); 2235 if (shouldAssignFavoredNodes(child)) { 2236 getFavoredNodePromoter().generateFavoredNodesForMergedRegion(child, mergeParents); 2237 } 2238 } 2239 2240 /* 2241 * Favored nodes should be applied only when FavoredNodes balancer is configured and the region 2242 * belongs to a non-system table. 2243 */ 2244 private boolean shouldAssignFavoredNodes(RegionInfo region) { 2245 return this.shouldAssignRegionsWithFavoredNodes 2246 && FavoredNodesManager.isFavoredNodeApplicable(region); 2247 } 2248 2249 // ============================================================================================ 2250 // Assign Queue (Assign/Balance) 2251 // ============================================================================================ 2252 private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>(); 2253 private final ReentrantLock assignQueueLock = new ReentrantLock(); 2254 private final Condition assignQueueFullCond = assignQueueLock.newCondition(); 2255 2256 /** 2257 * Add the assign operation to the assignment queue. The pending assignment operation will be 2258 * processed, and each region will be assigned by a server using the balancer. 2259 */ 2260 protected void queueAssign(final RegionStateNode regionNode) { 2261 regionNode.getProcedureEvent().suspend(); 2262 2263 // TODO: quick-start for meta and the other sys-tables? 2264 assignQueueLock.lock(); 2265 try { 2266 pendingAssignQueue.add(regionNode); 2267 if ( 2268 regionNode.isSystemTable() || pendingAssignQueue.size() == 1 2269 || pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize 2270 ) { 2271 assignQueueFullCond.signal(); 2272 } 2273 } finally { 2274 assignQueueLock.unlock(); 2275 } 2276 } 2277 2278 private void startAssignmentThread() { 2279 assignThread = new Thread(master.getServerName().toShortString()) { 2280 @Override 2281 public void run() { 2282 while (isRunning()) { 2283 processAssignQueue(); 2284 } 2285 pendingAssignQueue.clear(); 2286 } 2287 }; 2288 assignThread.setDaemon(true); 2289 assignThread.start(); 2290 } 2291 2292 private void stopAssignmentThread() { 2293 assignQueueSignal(); 2294 try { 2295 while (assignThread.isAlive()) { 2296 assignQueueSignal(); 2297 assignThread.join(250); 2298 } 2299 } catch (InterruptedException e) { 2300 LOG.warn("join interrupted", e); 2301 Thread.currentThread().interrupt(); 2302 } 2303 } 2304 2305 private void assignQueueSignal() { 2306 assignQueueLock.lock(); 2307 try { 2308 assignQueueFullCond.signal(); 2309 } finally { 2310 assignQueueLock.unlock(); 2311 } 2312 } 2313 2314 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") 2315 private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() { 2316 HashMap<RegionInfo, RegionStateNode> regions = null; 2317 2318 assignQueueLock.lock(); 2319 try { 2320 if (pendingAssignQueue.isEmpty() && isRunning()) { 2321 assignQueueFullCond.await(); 2322 } 2323 2324 if (!isRunning()) { 2325 return null; 2326 } 2327 assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS); 2328 regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size()); 2329 for (RegionStateNode regionNode : pendingAssignQueue) { 2330 regions.put(regionNode.getRegionInfo(), regionNode); 2331 } 2332 pendingAssignQueue.clear(); 2333 } catch (InterruptedException e) { 2334 LOG.warn("got interrupted ", e); 2335 Thread.currentThread().interrupt(); 2336 } finally { 2337 assignQueueLock.unlock(); 2338 } 2339 return regions; 2340 } 2341 2342 private void processAssignQueue() { 2343 final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue(); 2344 if (regions == null || regions.size() == 0 || !isRunning()) { 2345 return; 2346 } 2347 2348 if (LOG.isTraceEnabled()) { 2349 LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size()); 2350 } 2351 2352 // TODO: Optimize balancer. pass a RegionPlan? 2353 final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>(); 2354 final List<RegionInfo> userHRIs = new ArrayList<>(regions.size()); 2355 // Regions for system tables requiring reassignment 2356 final List<RegionInfo> systemHRIs = new ArrayList<>(); 2357 for (RegionStateNode regionStateNode : regions.values()) { 2358 boolean sysTable = regionStateNode.isSystemTable(); 2359 final List<RegionInfo> hris = sysTable ? systemHRIs : userHRIs; 2360 if (regionStateNode.getRegionLocation() != null) { 2361 retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation()); 2362 } else { 2363 hris.add(regionStateNode.getRegionInfo()); 2364 } 2365 } 2366 2367 // TODO: connect with the listener to invalidate the cache 2368 2369 // TODO use events 2370 List<ServerName> servers = master.getServerManager().createDestinationServersList(); 2371 for (int i = 0; servers.size() < 1; ++i) { 2372 // Report every fourth time around this loop; try not to flood log. 2373 if (i % 4 == 0) { 2374 LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions."); 2375 } 2376 2377 if (!isRunning()) { 2378 LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions."); 2379 return; 2380 } 2381 Threads.sleep(250); 2382 servers = master.getServerManager().createDestinationServersList(); 2383 } 2384 2385 if (!systemHRIs.isEmpty()) { 2386 // System table regions requiring reassignment are present, get region servers 2387 // not available for system table regions 2388 final List<ServerName> excludeServers = getExcludedServersForSystemTable(); 2389 List<ServerName> serversForSysTables = 2390 servers.stream().filter(s -> !excludeServers.contains(s)).collect(Collectors.toList()); 2391 if (serversForSysTables.isEmpty()) { 2392 LOG.warn("Filtering old server versions and the excluded produced an empty set; " 2393 + "instead considering all candidate servers!"); 2394 } 2395 LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() 2396 + ", allServersCount=" + servers.size()); 2397 processAssignmentPlans(regions, null, systemHRIs, 2398 serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs) 2399 ? servers 2400 : serversForSysTables); 2401 } 2402 2403 processAssignmentPlans(regions, retainMap, userHRIs, servers); 2404 } 2405 2406 private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions, 2407 List<RegionInfo> hirs) { 2408 for (RegionInfo ri : hirs) { 2409 if ( 2410 regions.get(ri).getRegionLocation() != null 2411 && regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME) 2412 ) { 2413 return true; 2414 } 2415 } 2416 return false; 2417 } 2418 2419 private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions, 2420 final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris, 2421 final List<ServerName> servers) { 2422 boolean isTraceEnabled = LOG.isTraceEnabled(); 2423 if (isTraceEnabled) { 2424 LOG.trace("Available servers count=" + servers.size() + ": " + servers); 2425 } 2426 2427 final LoadBalancer balancer = getBalancer(); 2428 // ask the balancer where to place regions 2429 if (retainMap != null && !retainMap.isEmpty()) { 2430 if (isTraceEnabled) { 2431 LOG.trace("retain assign regions=" + retainMap); 2432 } 2433 try { 2434 acceptPlan(regions, balancer.retainAssignment(retainMap, servers)); 2435 } catch (IOException e) { 2436 LOG.warn("unable to retain assignment", e); 2437 addToPendingAssignment(regions, retainMap.keySet()); 2438 } 2439 } 2440 2441 // TODO: Do we need to split retain and round-robin? 2442 // the retain seems to fallback to round-robin/random if the region is not in the map. 2443 if (!hris.isEmpty()) { 2444 Collections.sort(hris, RegionInfo.COMPARATOR); 2445 if (isTraceEnabled) { 2446 LOG.trace("round robin regions=" + hris); 2447 } 2448 try { 2449 acceptPlan(regions, balancer.roundRobinAssignment(hris, servers)); 2450 } catch (IOException e) { 2451 LOG.warn("unable to round-robin assignment", e); 2452 addToPendingAssignment(regions, hris); 2453 } 2454 } 2455 } 2456 2457 private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions, 2458 final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException { 2459 final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()]; 2460 final long st = EnvironmentEdgeManager.currentTime(); 2461 2462 if (plan.isEmpty()) { 2463 throw new HBaseIOException("unable to compute plans for regions=" + regions.size()); 2464 } 2465 2466 int evcount = 0; 2467 for (Map.Entry<ServerName, List<RegionInfo>> entry : plan.entrySet()) { 2468 final ServerName server = entry.getKey(); 2469 for (RegionInfo hri : entry.getValue()) { 2470 final RegionStateNode regionNode = regions.get(hri); 2471 regionNode.setRegionLocation(server); 2472 if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) { 2473 assignQueueLock.lock(); 2474 try { 2475 pendingAssignQueue.add(regionNode); 2476 } finally { 2477 assignQueueLock.unlock(); 2478 } 2479 } else { 2480 events[evcount++] = regionNode.getProcedureEvent(); 2481 } 2482 } 2483 } 2484 ProcedureEvent.wakeEvents(getProcedureScheduler(), events); 2485 2486 final long et = EnvironmentEdgeManager.currentTime(); 2487 if (LOG.isTraceEnabled()) { 2488 LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + StringUtils.humanTimeDiff(et - st)); 2489 } 2490 } 2491 2492 private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions, 2493 final Collection<RegionInfo> pendingRegions) { 2494 assignQueueLock.lock(); 2495 try { 2496 for (RegionInfo hri : pendingRegions) { 2497 pendingAssignQueue.add(regions.get(hri)); 2498 } 2499 } finally { 2500 assignQueueLock.unlock(); 2501 } 2502 } 2503 2504 /** 2505 * For a given cluster with mixed versions of servers, get a list of servers with lower versions, 2506 * where system table regions should not be assigned to. For system table, we must assign regions 2507 * to a server with highest version. However, we can disable this exclusion using config: 2508 * "hbase.min.version.move.system.tables" if checkForMinVersion is true. Detailed explanation 2509 * available with definition of minVersionToMoveSysTables. 2510 * @return List of Excluded servers for System table regions. 2511 */ 2512 public List<ServerName> getExcludedServersForSystemTable() { 2513 // TODO: This should be a cached list kept by the ServerManager rather than calculated on each 2514 // move or system region assign. The RegionServerTracker keeps list of online Servers with 2515 // RegionServerInfo that includes Version. 2516 List<Pair<ServerName, String>> serverList = 2517 master.getServerManager().getOnlineServersList().stream() 2518 .map(s -> new Pair<>(s, master.getRegionServerVersion(s))).collect(Collectors.toList()); 2519 if (serverList.isEmpty()) { 2520 return new ArrayList<>(); 2521 } 2522 String highestVersion = Collections 2523 .max(serverList, (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())) 2524 .getSecond(); 2525 if (!DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG.equals(minVersionToMoveSysTables)) { 2526 int comparedValue = VersionInfo.compareVersion(minVersionToMoveSysTables, highestVersion); 2527 if (comparedValue > 0) { 2528 return new ArrayList<>(); 2529 } 2530 } 2531 return serverList.stream().filter(pair -> !pair.getSecond().equals(highestVersion)) 2532 .map(Pair::getFirst).collect(Collectors.toList()); 2533 } 2534 2535 MasterServices getMaster() { 2536 return master; 2537 } 2538 2539 /** Returns a snapshot of rsReports */ 2540 public Map<ServerName, Set<byte[]>> getRSReports() { 2541 Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>(); 2542 synchronized (rsReports) { 2543 rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue())); 2544 } 2545 return rsReportsSnapshot; 2546 } 2547 2548 /** 2549 * Provide regions state count for given table. e.g howmany regions of give table are 2550 * opened/closed/rit etc 2551 * @param tableName TableName 2552 * @return region states count 2553 */ 2554 public RegionStatesCount getRegionStatesCount(TableName tableName) { 2555 int openRegionsCount = 0; 2556 int closedRegionCount = 0; 2557 int ritCount = 0; 2558 int splitRegionCount = 0; 2559 int totalRegionCount = 0; 2560 if (!isTableDisabled(tableName)) { 2561 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 2562 for (RegionState regionState : states) { 2563 if (regionState.isOpened()) { 2564 openRegionsCount++; 2565 } else if (regionState.isClosed()) { 2566 closedRegionCount++; 2567 } else if (regionState.isSplit()) { 2568 splitRegionCount++; 2569 } 2570 } 2571 totalRegionCount = states.size(); 2572 ritCount = totalRegionCount - openRegionsCount - splitRegionCount; 2573 } 2574 return new RegionStatesCount.RegionStatesCountBuilder().setOpenRegions(openRegionsCount) 2575 .setClosedRegions(closedRegionCount).setSplitRegions(splitRegionCount) 2576 .setRegionsInTransition(ritCount).setTotalRegions(totalRegionCount).build(); 2577 } 2578 2579}