001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import edu.umd.cs.findbugs.annotations.NonNull; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.Comparator; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031import java.util.SortedSet; 032import java.util.TreeSet; 033import java.util.concurrent.CompletableFuture; 034import java.util.concurrent.Future; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicBoolean; 037import java.util.concurrent.locks.Condition; 038import java.util.concurrent.locks.ReentrantLock; 039import java.util.function.Consumer; 040import java.util.function.Function; 041import java.util.stream.Collectors; 042import java.util.stream.Stream; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.hbase.CatalogFamilyFormat; 045import org.apache.hadoop.hbase.DoNotRetryIOException; 046import org.apache.hadoop.hbase.HBaseIOException; 047import 
org.apache.hadoop.hbase.HConstants; 048import org.apache.hadoop.hbase.PleaseHoldException; 049import org.apache.hadoop.hbase.ServerName; 050import org.apache.hadoop.hbase.TableName; 051import org.apache.hadoop.hbase.UnknownRegionException; 052import org.apache.hadoop.hbase.client.DoNotRetryRegionException; 053import org.apache.hadoop.hbase.client.MasterSwitchType; 054import org.apache.hadoop.hbase.client.RegionInfo; 055import org.apache.hadoop.hbase.client.RegionInfoBuilder; 056import org.apache.hadoop.hbase.client.RegionReplicaUtil; 057import org.apache.hadoop.hbase.client.RegionStatesCount; 058import org.apache.hadoop.hbase.client.Result; 059import org.apache.hadoop.hbase.client.ResultScanner; 060import org.apache.hadoop.hbase.client.Scan; 061import org.apache.hadoop.hbase.client.TableDescriptor; 062import org.apache.hadoop.hbase.client.TableState; 063import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; 064import org.apache.hadoop.hbase.favored.FavoredNodesManager; 065import org.apache.hadoop.hbase.favored.FavoredNodesPromoter; 066import org.apache.hadoop.hbase.master.LoadBalancer; 067import org.apache.hadoop.hbase.master.MasterServices; 068import org.apache.hadoop.hbase.master.MetricsAssignmentManager; 069import org.apache.hadoop.hbase.master.RegionPlan; 070import org.apache.hadoop.hbase.master.RegionState; 071import org.apache.hadoop.hbase.master.RegionState.State; 072import org.apache.hadoop.hbase.master.ServerManager; 073import org.apache.hadoop.hbase.master.TableStateManager; 074import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer; 075import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure; 076import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 077import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; 078import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; 079import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; 080import 
org.apache.hadoop.hbase.master.procedure.TruncateRegionProcedure; 081import org.apache.hadoop.hbase.master.region.MasterRegion; 082import org.apache.hadoop.hbase.procedure2.Procedure; 083import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 084import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 085import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore; 086import org.apache.hadoop.hbase.procedure2.util.StringUtils; 087import org.apache.hadoop.hbase.regionserver.SequenceId; 088import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer; 089import org.apache.hadoop.hbase.util.Bytes; 090import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 091import org.apache.hadoop.hbase.util.FutureUtils; 092import org.apache.hadoop.hbase.util.Pair; 093import org.apache.hadoop.hbase.util.Threads; 094import org.apache.hadoop.hbase.util.VersionInfo; 095import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 096import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 097import org.apache.yetus.audience.InterfaceAudience; 098import org.apache.zookeeper.KeeperException; 099import org.slf4j.Logger; 100import org.slf4j.LoggerFactory; 101 102import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 103import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; 104import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 105import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 106import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; 107 108/** 109 * The AssignmentManager is the coordinator for region assign/unassign operations. 
110 * <ul> 111 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li> 112 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li> 113 * </ul> 114 * Regions are created by CreateTable, Split, Merge. Regions are deleted by DeleteTable, Split, 115 * Merge. Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. Unassigns 116 * are triggered by DisableTable, Split, Merge 117 */ 118@InterfaceAudience.Private 119public class AssignmentManager { 120 private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class); 121 122 // TODO: AMv2 123 // - handle region migration from hbase1 to hbase2. 124 // - handle sys table assignment first (e.g. acl, namespace) 125 // - handle table priorities 126 // - If ServerBusyException trying to update hbase:meta, we abort the Master 127 // See updateRegionLocation in RegionStateStore. 128 // 129 // See also 130 // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5 131 // for other TODOs. 
/**
 * Configuration key for the bootstrap thread pool size.
 * NOTE(review): not read anywhere in the visible part of this class - confirm where it is used.
 */
public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
  "hbase.assignment.bootstrap.thread.pool.size";

/** Configuration key read by the constructor into {@code assignDispatchWaitMillis}. */
public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
  "hbase.assignment.dispatch.wait.msec";
/** Value used when {@link #ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY} is not configured. */
private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;

/** Configuration key read by the constructor into {@code assignDispatchWaitQueueMaxSize}. */
public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
  "hbase.assignment.dispatch.wait.queue.max.size";
/** Value used when {@link #ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY} is not configured. */
private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;

/** Configuration key whose value the constructor passes to the RegionInTransitionChore. */
public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
  "hbase.assignment.rit.chore.interval.msec";
/** Value used when {@link #RIT_CHORE_INTERVAL_MSEC_CONF_KEY} is not configured. */
private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;

/**
 * Configuration key for the DeadServerMetricRegionChore interval. The constructor only creates
 * that chore when the configured value is greater than zero.
 */
public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY =
  "hbase.assignment.dead.region.metric.chore.interval.msec";
/** Value used when {@link #DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY} is not configured. */
private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000;

/** Configuration key for the maximum assign attempts; the constructor clamps it to at least 1. */
public static final String ASSIGN_MAX_ATTEMPTS = "hbase.assignment.maximum.attempts";
/** Value used when {@link #ASSIGN_MAX_ATTEMPTS} is not configured. */
private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;

/** Configuration key read by the constructor into {@code assignRetryImmediatelyMaxAttempts}. */
public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS =
  "hbase.assignment.retry.immediately.maximum.attempts";
/** Value used when {@link #ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS} is not configured. */
private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3;

/** Region in Transition metrics threshold time */
public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
  "hbase.metrics.rit.stuck.warning.threshold";
// NOTE(review): presumably the default for METRICS_RIT_STUCK_WARNING_THRESHOLD; it is not read in
// the visible part of this class - confirm against its consumer.
private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
/** Message prefix used by preTransitCheck when a region is not in one of the expected states. */
public static final String UNEXPECTED_STATE_REGION = "Unexpected state for ";

/** Configuration key for the boolean returned by {@link #isForceRegionRetainment()}. */
public static final String FORCE_REGION_RETAINMENT = "hbase.master.scp.retain.assignment.force";

/** Value used when {@link #FORCE_REGION_RETAINMENT} is not configured. */
public static final boolean DEFAULT_FORCE_REGION_RETAINMENT = false;

/** The wait time in millis before checking again if the region's previous RS is back online */
public static final String FORCE_REGION_RETAINMENT_WAIT_INTERVAL =
  "hbase.master.scp.retain.assignment.force.wait-interval";

/** Value used when {@link #FORCE_REGION_RETAINMENT_WAIT_INTERVAL} is not configured. */
public static final long DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL = 50;

/**
 * The number of times to check if the region's previous RS is back online, before giving up and
 * proceeding with assignment on a new RS
 */
public static final String FORCE_REGION_RETAINMENT_RETRIES =
  "hbase.master.scp.retain.assignment.force.retries";

/** Value used when {@link #FORCE_REGION_RETAINMENT_RETRIES} is not configured. */
public static final int DEFAULT_FORCE_REGION_RETAINMENT_RETRIES = 600;

// Event that procedures suspend on in waitMetaAssigned(); woken/suspended by setMetaAssigned().
private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
// Event that procedures suspend on in waitMetaLoaded(); woken by wakeMetaLoadedEvent().
private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");

private final MetricsAssignmentManager metrics;
private final RegionInTransitionChore ritChore;
// May be null: the constructor only creates it for a positive configured interval.
private final DeadServerMetricRegionChore deadMetricChore;
private final MasterServices master;

// Flipped with compareAndSet by start() and stop() so that each runs its body at most once.
private final AtomicBoolean running = new AtomicBoolean(false);
private final RegionStates regionStates = new RegionStates();
private final RegionStateStore regionStateStore;

/**
 * When the operator uses this configuration option, any version between the current cluster
 * version and the value of "hbase.min.version.move.system.tables" does not trigger any
 * auto-region movement. Auto-region movement here refers to auto-migration of system table
 * regions to newer server versions. It is assumed that the configured range of versions does not
 * require special handling of moving system table regions to higher versioned RegionServer. This
 * auto-migration is done by {@link #checkIfShouldMoveSystemRegionAsync()}. Example: Let's assume
 * the cluster is on version 1.4.0 and we have set "hbase.min.version.move.system.tables" as
 * "2.0.0".
Now if we upgrade one RegionServer on 1.4.0 cluster to 1.6.0 (< 2.0.0), then 205 * AssignmentManager will not move hbase:meta, hbase:namespace and other system table regions to 206 * newly brought up RegionServer 1.6.0 as part of auto-migration. However, if we upgrade one 207 * RegionServer on 1.4.0 cluster to 2.2.0 (> 2.0.0), then AssignmentManager will move all system 208 * table regions to newly brought up RegionServer 2.2.0 as part of auto-migration done by 209 * {@link #checkIfShouldMoveSystemRegionAsync()}. "hbase.min.version.move.system.tables" is 210 * introduced as part of HBASE-22923. 211 */ 212 private final String minVersionToMoveSysTables; 213 214 private static final String MIN_VERSION_MOVE_SYS_TABLES_CONFIG = 215 "hbase.min.version.move.system.tables"; 216 private static final String DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG = ""; 217 218 private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>(); 219 220 private final boolean shouldAssignRegionsWithFavoredNodes; 221 private final int assignDispatchWaitQueueMaxSize; 222 private final int assignDispatchWaitMillis; 223 private final int assignMaxAttempts; 224 private final int assignRetryImmediatelyMaxAttempts; 225 226 private final MasterRegion masterRegion; 227 228 private final Object checkIfShouldMoveSystemRegionLock = new Object(); 229 230 private Thread assignThread; 231 232 private final boolean forceRegionRetainment; 233 234 private final long forceRegionRetainmentWaitInterval; 235 236 private final int forceRegionRetainmentRetries; 237 238 private final RegionInTransitionTracker regionInTransitionTracker = 239 new RegionInTransitionTracker(); 240 241 public AssignmentManager(MasterServices master, MasterRegion masterRegion) { 242 this(master, masterRegion, new RegionStateStore(master, masterRegion)); 243 } 244 245 AssignmentManager(MasterServices master, MasterRegion masterRegion, RegionStateStore stateStore) { 246 this.master = master; 247 this.regionStateStore = stateStore; 248 
this.metrics = new MetricsAssignmentManager(); 249 this.masterRegion = masterRegion; 250 251 final Configuration conf = master.getConfiguration(); 252 253 // Only read favored nodes if using the favored nodes load balancer. 254 this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class 255 .isAssignableFrom(conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class)); 256 257 this.assignDispatchWaitMillis = 258 conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC); 259 this.assignDispatchWaitQueueMaxSize = 260 conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX); 261 262 this.assignMaxAttempts = 263 Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS)); 264 this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS, 265 DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS); 266 267 int ritChoreInterval = 268 conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC); 269 this.ritChore = new RegionInTransitionChore(ritChoreInterval); 270 271 int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY, 272 DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC); 273 if (deadRegionChoreInterval > 0) { 274 this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval); 275 } else { 276 this.deadMetricChore = null; 277 } 278 minVersionToMoveSysTables = 279 conf.get(MIN_VERSION_MOVE_SYS_TABLES_CONFIG, DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG); 280 281 forceRegionRetainment = 282 conf.getBoolean(FORCE_REGION_RETAINMENT, DEFAULT_FORCE_REGION_RETAINMENT); 283 forceRegionRetainmentWaitInterval = conf.getLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL, 284 DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL); 285 forceRegionRetainmentRetries = 286 conf.getInt(FORCE_REGION_RETAINMENT_RETRIES, DEFAULT_FORCE_REGION_RETAINMENT_RETRIES); 287 } 288 289 private void mirrorMetaLocations() throws 
IOException, KeeperException { 290 // For compatibility, mirror the meta region state to zookeeper 291 // And we still need to use zookeeper to publish the meta region locations to region 292 // server, so they can serve as ClientMetaService 293 ZKWatcher zk = master.getZooKeeper(); 294 if (zk == null || !zk.getRecoverableZooKeeper().getState().isAlive()) { 295 // this is possible in tests, we do not provide a zk watcher or the zk watcher has been closed 296 return; 297 } 298 Collection<RegionStateNode> metaStates = regionStates.getRegionStateNodes(); 299 for (RegionStateNode metaState : metaStates) { 300 MetaTableLocator.setMetaLocation(zk, metaState.getRegionLocation(), 301 metaState.getRegionInfo().getReplicaId(), metaState.getState()); 302 } 303 int replicaCount = metaStates.size(); 304 // remove extra mirror locations 305 for (String znode : zk.getMetaReplicaNodes()) { 306 int replicaId = zk.getZNodePaths().getMetaReplicaIdFromZNode(znode); 307 if (replicaId >= replicaCount) { 308 MetaTableLocator.deleteMetaLocation(zk, replicaId); 309 } 310 } 311 } 312 313 public void start() throws IOException, KeeperException { 314 if (!running.compareAndSet(false, true)) { 315 return; 316 } 317 318 LOG.trace("Starting assignment manager"); 319 320 // Start the Assignment Thread 321 startAssignmentThread(); 322 // load meta region states. 323 // here we are still in the early steps of active master startup. There is only one thread(us) 324 // can access AssignmentManager and create region node, so here we do not need to lock the 325 // region node. 
326 try (ResultScanner scanner = 327 masterRegion.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY))) { 328 for (;;) { 329 Result result = scanner.next(); 330 if (result == null) { 331 break; 332 } 333 RegionStateStore 334 .visitMetaEntry((r, regionInfo, state, regionLocation, lastHost, openSeqNum) -> { 335 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 336 regionNode.setState(state); 337 regionNode.setLastHost(lastHost); 338 regionNode.setRegionLocation(regionLocation); 339 regionNode.setOpenSeqNum(openSeqNum); 340 regionInTransitionTracker.handleRegionStateNodeOperation(regionNode); 341 342 if (regionNode.getProcedure() != null) { 343 regionNode.getProcedure().stateLoaded(this, regionNode); 344 } 345 if (regionLocation != null) { 346 // TODO: this could lead to some orphan server state nodes, as it is possible that the 347 // region server is already dead and its SCP has already finished but we have 348 // persisted an opening state on this region server. Finally the TRSP will assign the 349 // region to another region server, so it will not cause critical problems, just waste 350 // some memory as no one will try to cleanup these orphan server state nodes. 351 regionStates.createServer(regionLocation); 352 regionStates.addRegionToServer(regionNode); 353 } 354 if (RegionReplicaUtil.isDefaultReplica(regionInfo.getReplicaId())) { 355 setMetaAssigned(regionInfo, state == State.OPEN); 356 } 357 LOG.debug("Loaded {} {}", TableName.META_TABLE_NAME, regionNode); 358 }, result); 359 } 360 } 361 mirrorMetaLocations(); 362 } 363 364 /** 365 * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode. 366 * <p> 367 * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta 368 * method below. 
And it is also very important as now before submitting a TRSP, we need to attach 369 * it to the RegionStateNode, which acts like a guard, so we need to restore this information at 370 * the very beginning, before we start processing any procedures. 371 */ 372 public void setupRIT(List<TransitRegionStateProcedure> procs) { 373 procs.forEach(proc -> { 374 RegionInfo regionInfo = proc.getRegion(); 375 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 376 TransitRegionStateProcedure existingProc = regionNode.getProcedure(); 377 if (existingProc != null) { 378 // This is possible, as we will detach the procedure from the RSN before we 379 // actually finish the procedure. This is because that, we will detach the TRSP from the RSN 380 // during execution, at that time, the procedure has not been marked as done in the pv2 381 // framework yet, so it is possible that we schedule a new TRSP immediately and when 382 // arriving here, we will find out that there are multiple TRSPs for the region. But we can 383 // make sure that, only the last one can take the charge, the previous ones should have all 384 // been finished already. So here we will compare the proc id, the greater one will win. 
385 if (existingProc.getProcId() < proc.getProcId()) { 386 // the new one wins, unset and set it to the new one below 387 regionNode.unsetProcedure(existingProc); 388 } else { 389 // the old one wins, skip 390 return; 391 } 392 } 393 LOG.info("Attach {} to {}", proc, regionNode); 394 regionNode.setProcedure(proc); 395 }); 396 } 397 398 public void initializationPostMetaOnline() { 399 // now that we are sure that meta is online, we can set TableStateManger in 400 // regionInTransitionTracker 401 regionInTransitionTracker.setTableStateManager(master.getTableStateManager()); 402 } 403 404 public void stop() { 405 if (!running.compareAndSet(true, false)) { 406 return; 407 } 408 409 LOG.info("Stopping assignment manager"); 410 411 // The AM is started before the procedure executor, 412 // but the actual work will be loaded/submitted only once we have the executor 413 final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null; 414 415 // Remove the RIT chore 416 if (hasProcExecutor) { 417 master.getMasterProcedureExecutor().removeChore(this.ritChore); 418 if (this.deadMetricChore != null) { 419 master.getMasterProcedureExecutor().removeChore(this.deadMetricChore); 420 } 421 } 422 423 // Stop the Assignment Thread 424 stopAssignmentThread(); 425 426 // Stop the RegionStateStore 427 regionStates.clear(); 428 regionInTransitionTracker.stop(); 429 430 // Update meta events (for testing) 431 if (hasProcExecutor) { 432 metaLoadEvent.suspend(); 433 for (RegionInfo hri : getMetaRegionSet()) { 434 setMetaAssigned(hri, false); 435 } 436 } 437 } 438 439 public boolean isRunning() { 440 return running.get(); 441 } 442 443 public Configuration getConfiguration() { 444 return master.getConfiguration(); 445 } 446 447 public MetricsAssignmentManager getAssignmentManagerMetrics() { 448 return metrics; 449 } 450 451 private LoadBalancer getBalancer() { 452 return master.getLoadBalancer(); 453 } 454 455 private FavoredNodesPromoter getFavoredNodePromoter() { 456 return 
(FavoredNodesPromoter) ((RSGroupBasedLoadBalancer) master.getLoadBalancer()) 457 .getInternalBalancer(); 458 } 459 460 private MasterProcedureEnv getProcedureEnvironment() { 461 return master.getMasterProcedureExecutor().getEnvironment(); 462 } 463 464 private MasterProcedureScheduler getProcedureScheduler() { 465 return getProcedureEnvironment().getProcedureScheduler(); 466 } 467 468 int getAssignMaxAttempts() { 469 return assignMaxAttempts; 470 } 471 472 public boolean isForceRegionRetainment() { 473 return forceRegionRetainment; 474 } 475 476 public long getForceRegionRetainmentWaitInterval() { 477 return forceRegionRetainmentWaitInterval; 478 } 479 480 public int getForceRegionRetainmentRetries() { 481 return forceRegionRetainmentRetries; 482 } 483 484 int getAssignRetryImmediatelyMaxAttempts() { 485 return assignRetryImmediatelyMaxAttempts; 486 } 487 488 public RegionStates getRegionStates() { 489 return regionStates; 490 } 491 492 /** 493 * Returns the regions hosted by the specified server. 494 * <p/> 495 * Notice that, for SCP, after we submit the SCP, no one can change the region list for the 496 * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a 497 * snapshot of the current region list for this server, which means, right after you get the 498 * region list, new regions may be moved to this server or some regions may be moved out from this 499 * server, so you should not use it critically if you need strong consistency. 500 */ 501 public List<RegionInfo> getRegionsOnServer(ServerName serverName) { 502 ServerStateNode serverInfo = regionStates.getServerNode(serverName); 503 if (serverInfo == null) { 504 return Collections.emptyList(); 505 } 506 return serverInfo.getRegionInfoList(); 507 } 508 509 private RegionInfo getRegionInfo(RegionStateNode rsn) { 510 if (rsn.isSplit() && !rsn.getRegionInfo().isSplit()) { 511 // see the comments in markRegionAsSplit on why we need to do this converting. 
512 return RegionInfoBuilder.newBuilder(rsn.getRegionInfo()).setSplit(true).setOffline(true) 513 .build(); 514 } else { 515 return rsn.getRegionInfo(); 516 } 517 } 518 519 private Stream<RegionStateNode> getRegionStateNodes(TableName tableName, 520 boolean excludeOfflinedSplitParents) { 521 Stream<RegionStateNode> stream = regionStates.getTableRegionStateNodes(tableName).stream(); 522 if (excludeOfflinedSplitParents) { 523 return stream.filter(rsn -> !rsn.isSplit()); 524 } else { 525 return stream; 526 } 527 } 528 529 public List<RegionInfo> getTableRegions(TableName tableName, 530 boolean excludeOfflinedSplitParents) { 531 return getRegionStateNodes(tableName, excludeOfflinedSplitParents).map(this::getRegionInfo) 532 .collect(Collectors.toList()); 533 } 534 535 public List<Pair<RegionInfo, ServerName>> getTableRegionsAndLocations(TableName tableName, 536 boolean excludeOfflinedSplitParents) { 537 return getRegionStateNodes(tableName, excludeOfflinedSplitParents) 538 .map(rsn -> Pair.newPair(getRegionInfo(rsn), rsn.getRegionLocation())) 539 .collect(Collectors.toList()); 540 } 541 542 public RegionStateStore getRegionStateStore() { 543 return regionStateStore; 544 } 545 546 public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) { 547 return this.shouldAssignRegionsWithFavoredNodes 548 ? 
getFavoredNodePromoter().getFavoredNodes(regionInfo) 549 : ServerName.EMPTY_SERVER_LIST; 550 } 551 552 // ============================================================================================ 553 // Table State Manager helpers 554 // ============================================================================================ 555 private TableStateManager getTableStateManager() { 556 return master.getTableStateManager(); 557 } 558 559 private boolean isTableEnabled(final TableName tableName) { 560 return getTableStateManager().isTableState(tableName, TableState.State.ENABLED); 561 } 562 563 private boolean isTableDisabled(final TableName tableName) { 564 return getTableStateManager().isTableState(tableName, TableState.State.DISABLED, 565 TableState.State.DISABLING); 566 } 567 568 // ============================================================================================ 569 // META Helpers 570 // ============================================================================================ 571 private boolean isMetaRegion(final RegionInfo regionInfo) { 572 return regionInfo.isMetaRegion(); 573 } 574 575 public boolean isMetaRegion(final byte[] regionName) { 576 return getMetaRegionFromName(regionName) != null; 577 } 578 579 public RegionInfo getMetaRegionFromName(final byte[] regionName) { 580 for (RegionInfo hri : getMetaRegionSet()) { 581 if (Bytes.equals(hri.getRegionName(), regionName)) { 582 return hri; 583 } 584 } 585 return null; 586 } 587 588 public boolean isCarryingMeta(final ServerName serverName) { 589 // TODO: handle multiple meta 590 return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO); 591 } 592 593 private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) { 594 // TODO: check for state? 
595 final RegionStateNode node = regionStates.getRegionStateNode(regionInfo); 596 return (node != null && serverName.equals(node.getRegionLocation())); 597 } 598 599 private RegionInfo getMetaForRegion(final RegionInfo regionInfo) { 600 // if (regionInfo.isMetaRegion()) return regionInfo; 601 // TODO: handle multiple meta. if the region provided is not meta lookup 602 // which meta the region belongs to. 603 return RegionInfoBuilder.FIRST_META_REGIONINFO; 604 } 605 606 // TODO: handle multiple meta. 607 private static final Set<RegionInfo> META_REGION_SET = 608 Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO); 609 610 public Set<RegionInfo> getMetaRegionSet() { 611 return META_REGION_SET; 612 } 613 614 // ============================================================================================ 615 // META Event(s) helpers 616 // ============================================================================================ 617 /** 618 * Notice that, this only means the meta region is available on a RS, but the AM may still be 619 * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first 620 * before checking this method, unless you can make sure that your piece of code can only be 621 * executed after AM builds the region states. 622 * @see #isMetaLoaded() 623 */ 624 public boolean isMetaAssigned() { 625 return metaAssignEvent.isReady(); 626 } 627 628 public boolean isMetaRegionInTransition() { 629 return !isMetaAssigned(); 630 } 631 632 /** 633 * Notice that this event does not mean the AM has already finished region state rebuilding. See 634 * the comment of {@link #isMetaAssigned()} for more details. 
635 * @see #isMetaAssigned() 636 */ 637 public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) { 638 return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc); 639 } 640 641 private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) { 642 assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; 643 ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo); 644 if (assigned) { 645 metaAssignEvent.wake(getProcedureScheduler()); 646 } else { 647 metaAssignEvent.suspend(); 648 } 649 } 650 651 private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) { 652 assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; 653 // TODO: handle multiple meta. 654 return metaAssignEvent; 655 } 656 657 /** 658 * Wait until AM finishes the meta loading, i.e, the region states rebuilding. 659 * @see #isMetaLoaded() 660 * @see #waitMetaAssigned(Procedure, RegionInfo) 661 */ 662 public boolean waitMetaLoaded(Procedure<?> proc) { 663 return metaLoadEvent.suspendIfNotReady(proc); 664 } 665 666 /** 667 * This method will be called in master initialization method after calling 668 * {@link #processOfflineRegions()}, as in processOfflineRegions we will generate assign 669 * procedures for offline regions, which may be conflict with creating table. 670 * <p/> 671 * This is a bit dirty, should be reconsidered after we decide whether to keep the 672 * {@link #processOfflineRegions()} method. 673 */ 674 public void wakeMetaLoadedEvent() { 675 metaLoadEvent.wake(getProcedureScheduler()); 676 assert isMetaLoaded() : "expected meta to be loaded"; 677 } 678 679 /** 680 * Return whether AM finishes the meta loading, i.e, the region states rebuilding. 
681 * @see #isMetaAssigned() 682 * @see #waitMetaLoaded(Procedure) 683 */ 684 public boolean isMetaLoaded() { 685 return metaLoadEvent.isReady(); 686 } 687 688 /** 689 * Start a new thread to check if there are region servers whose versions are higher than others. 690 * If so, move all system table regions to RS with the highest version to keep compatibility. The 691 * reason is, RS in new version may not be able to access RS in old version when there are some 692 * incompatible changes. 693 * <p> 694 * This method is called when a new RegionServer is added to cluster only. 695 * </p> 696 */ 697 public void checkIfShouldMoveSystemRegionAsync() { 698 // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that 699 // it should 'move' the system tables from the old server to the new server but 700 // ServerCrashProcedure is on it; and it will take care of the assign without dataloss. 701 if (this.master.getServerManager().countOfRegionServers() <= 1) { 702 return; 703 } 704 // This thread used to run whenever there was a change in the cluster. The ZooKeeper 705 // childrenChanged notification came in before the nodeDeleted message and so this method 706 // cold run before a ServerCrashProcedure could run. That meant that this thread could see 707 // a Crashed Server before ServerCrashProcedure and it could find system regions on the 708 // crashed server and go move them before ServerCrashProcedure had a chance; could be 709 // dataloss too if WALs were not recovered. 710 new Thread(() -> { 711 try { 712 synchronized (checkIfShouldMoveSystemRegionLock) { 713 List<RegionPlan> plans = new ArrayList<>(); 714 // TODO: I don't think this code does a good job if all servers in cluster have same 715 // version. It looks like it will schedule unnecessary moves. 716 for (ServerName server : getExcludedServersForSystemTable()) { 717 if (master.getServerManager().isServerDead(server)) { 718 // TODO: See HBASE-18494 and HBASE-18495. 
Though getExcludedServersForSystemTable() 719 // considers only online servers, the server could be queued for dead server 720 // processing. As region assignments for crashed server is handled by 721 // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through 722 // regular flow of LoadBalancer as a favored node and not to have this special 723 // handling. 724 continue; 725 } 726 List<RegionInfo> regionsShouldMove = getSystemTables(server); 727 if (!regionsShouldMove.isEmpty()) { 728 for (RegionInfo regionInfo : regionsShouldMove) { 729 // null value for dest forces destination server to be selected by balancer 730 RegionPlan plan = new RegionPlan(regionInfo, server, null); 731 if (regionInfo.isMetaRegion()) { 732 // Must move meta region first. 733 LOG.info("Async MOVE of {} to newer Server={}", regionInfo.getEncodedName(), 734 server); 735 moveAsync(plan); 736 } else { 737 plans.add(plan); 738 } 739 } 740 } 741 for (RegionPlan plan : plans) { 742 LOG.info("Async MOVE of {} to newer Server={}", plan.getRegionInfo().getEncodedName(), 743 server); 744 moveAsync(plan); 745 } 746 } 747 } 748 } catch (Throwable t) { 749 LOG.error(t.toString(), t); 750 } 751 }).start(); 752 } 753 754 private List<RegionInfo> getSystemTables(ServerName serverName) { 755 ServerStateNode serverNode = regionStates.getServerNode(serverName); 756 if (serverNode == null) { 757 return Collections.emptyList(); 758 } 759 return serverNode.getSystemRegionInfoList(); 760 } 761 762 private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates) 763 throws HBaseIOException { 764 if (regionNode.getProcedure() != null) { 765 throw new HBaseIOException( 766 regionNode + " is currently in transition; pid=" + regionNode.getProcedure().getProcId()); 767 } 768 if (!regionNode.isInState(expectedStates)) { 769 throw new DoNotRetryRegionException(UNEXPECTED_STATE_REGION + regionNode); 770 } 771 if (isTableDisabled(regionNode.getTable())) { 772 throw new 
DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode); 773 } 774 } 775 776 /** 777 * Create an assign TransitRegionStateProcedure. Makes sure of RegionState. Throws exception if 778 * not appropriate UNLESS override is set. Used by hbck2 but also by straightline 779 * {@link #assign(RegionInfo, ServerName)} and {@link #assignAsync(RegionInfo, ServerName)}. 780 * @see #createAssignProcedure(RegionStateNode, ServerName) for a version that does NO checking 781 * used when only when no checking needed. 782 * @param override If false, check RegionState is appropriate for assign; if not throw exception. 783 */ 784 private TransitRegionStateProcedure createAssignProcedure(RegionInfo regionInfo, ServerName sn, 785 boolean override, boolean force) throws IOException { 786 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 787 regionNode.lock(); 788 try { 789 if (override) { 790 if (!force) { 791 preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN); 792 } 793 if (regionNode.getProcedure() != null) { 794 regionNode.unsetProcedure(regionNode.getProcedure()); 795 } 796 } else { 797 preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN); 798 } 799 assert regionNode.getProcedure() == null; 800 TransitRegionStateProcedure proc = regionNode.setProcedure( 801 TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn)); 802 regionInTransitionTracker.handleRegionStateNodeOperation(regionNode); 803 return proc; 804 } finally { 805 regionNode.unlock(); 806 } 807 } 808 809 /** 810 * Create an assign TransitRegionStateProcedure. Does NO checking of RegionState. Presumes 811 * appriopriate state ripe for assign. 
812 * @see #createAssignProcedure(RegionInfo, ServerName, boolean, boolean) 813 */ 814 private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode, 815 ServerName targetServer) { 816 regionNode.lock(); 817 try { 818 TransitRegionStateProcedure proc = regionNode.setProcedure(TransitRegionStateProcedure 819 .assign(getProcedureEnvironment(), regionNode.getRegionInfo(), targetServer)); 820 regionInTransitionTracker.handleRegionStateNodeOperation(regionNode); 821 return proc; 822 } finally { 823 regionNode.unlock(); 824 } 825 } 826 827 public long assign(RegionInfo regionInfo, ServerName sn) throws IOException { 828 TransitRegionStateProcedure proc = createAssignProcedure(regionInfo, sn, false, false); 829 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 830 return proc.getProcId(); 831 } 832 833 public long assign(RegionInfo regionInfo) throws IOException { 834 return assign(regionInfo, null); 835 } 836 837 /** 838 * Submits a procedure that assigns a region to a target server without waiting for it to finish 839 * @param regionInfo the region we would like to assign 840 * @param sn target server name 841 */ 842 public Future<byte[]> assignAsync(RegionInfo regionInfo, ServerName sn) throws IOException { 843 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), 844 createAssignProcedure(regionInfo, sn, false, false)); 845 } 846 847 /** 848 * Submits a procedure that assigns a region without waiting for it to finish 849 * @param regionInfo the region we would like to assign 850 */ 851 public Future<byte[]> assignAsync(RegionInfo regionInfo) throws IOException { 852 return assignAsync(regionInfo, null); 853 } 854 855 public long unassign(RegionInfo regionInfo) throws IOException { 856 RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 857 if (regionNode == null) { 858 throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName()); 859 } 
860 TransitRegionStateProcedure proc; 861 regionNode.lock(); 862 try { 863 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 864 proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo); 865 regionNode.setProcedure(proc); 866 } finally { 867 regionNode.unlock(); 868 } 869 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 870 return proc.getProcId(); 871 } 872 873 public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo, 874 ServerName targetServer) throws HBaseIOException { 875 RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo); 876 if (regionNode == null) { 877 throw new UnknownRegionException( 878 "No RegionStateNode found for " + regionInfo.getEncodedName() + "(Closed/Deleted?)"); 879 } 880 TransitRegionStateProcedure proc; 881 regionNode.lock(); 882 try { 883 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 884 regionNode.checkOnline(); 885 proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer); 886 regionNode.setProcedure(proc); 887 } finally { 888 regionNode.unlock(); 889 } 890 return proc; 891 } 892 893 public void move(RegionInfo regionInfo) throws IOException { 894 TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null); 895 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 896 } 897 898 public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException { 899 TransitRegionStateProcedure proc = 900 createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination()); 901 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); 902 } 903 904 public Future<byte[]> balance(RegionPlan regionPlan) throws HBaseIOException { 905 ServerName current = 906 this.getRegionStates().getRegionAssignments().get(regionPlan.getRegionInfo()); 907 if (current == null || 
!current.equals(regionPlan.getSource())) { 908 LOG.debug("Skip region plan {}, source server not match, current region location is {}", 909 regionPlan, current == null ? "(null)" : current); 910 return null; 911 } 912 return moveAsync(regionPlan); 913 } 914 915 // ============================================================================================ 916 // RegionTransition procedures helpers 917 // ============================================================================================ 918 919 /** 920 * Create round-robin assigns. Use on table creation to distribute out regions across cluster. 921 * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer 922 * to populate the assigns with targets chosen using round-robin (default balancer 923 * scheme). If at assign-time, the target chosen is no longer up, thats fine, the 924 * AssignProcedure will ask the balancer for a new target, and so on. 925 */ 926 public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris, 927 List<ServerName> serversToExclude) { 928 if (hris.isEmpty()) { 929 return new TransitRegionStateProcedure[0]; 930 } 931 932 if ( 933 serversToExclude != null && this.master.getServerManager().getOnlineServersList().size() == 1 934 ) { 935 LOG.debug("Only one region server found and hence going ahead with the assignment"); 936 serversToExclude = null; 937 } 938 try { 939 // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do 940 // a better job if it has all the assignments in the one lump. 941 Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris, 942 this.master.getServerManager().createDestinationServersList(serversToExclude)); 943 // Return mid-method! 944 return createAssignProcedures(assignments); 945 } catch (IOException hioe) { 946 LOG.warn("Failed roundRobinAssignment", hioe); 947 } 948 // If an error above, fall-through to this simpler assign. 
Last resort. 949 return createAssignProcedures(hris); 950 } 951 952 /** 953 * Create round-robin assigns. Use on table creation to distribute out regions across cluster. 954 * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer 955 * to populate the assigns with targets chosen using round-robin (default balancer 956 * scheme). If at assign-time, the target chosen is no longer up, thats fine, the 957 * AssignProcedure will ask the balancer for a new target, and so on. 958 */ 959 public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) { 960 return createRoundRobinAssignProcedures(hris, null); 961 } 962 963 static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) { 964 if (left.getRegion().isMetaRegion()) { 965 if (right.getRegion().isMetaRegion()) { 966 return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion()); 967 } 968 return -1; 969 } else if (right.getRegion().isMetaRegion()) { 970 return +1; 971 } 972 if (left.getRegion().getTable().isSystemTable()) { 973 if (right.getRegion().getTable().isSystemTable()) { 974 return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion()); 975 } 976 return -1; 977 } else if (right.getRegion().getTable().isSystemTable()) { 978 return +1; 979 } 980 return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion()); 981 } 982 983 /** 984 * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server. This 985 * method is called from HBCK2. 986 * @return an assign or null 987 */ 988 public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo ri, boolean override, 989 boolean force) { 990 TransitRegionStateProcedure trsp = null; 991 try { 992 trsp = createAssignProcedure(ri, null, override, force); 993 } catch (IOException ioe) { 994 LOG.info( 995 "Failed {} assign, override={}" 996 + (override ? 
"" : "; set override to by-pass state checks."), 997 ri.getEncodedName(), override, ioe); 998 } 999 return trsp; 1000 } 1001 1002 /** 1003 * Create one TransitRegionStateProcedure to unassign a region. This method is called from HBCK2. 1004 * @return an unassign or null 1005 */ 1006 public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo ri, boolean override, 1007 boolean force) { 1008 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(ri); 1009 TransitRegionStateProcedure trsp = null; 1010 regionNode.lock(); 1011 try { 1012 if (override) { 1013 if (!force) { 1014 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 1015 } 1016 if (regionNode.getProcedure() != null) { 1017 regionNode.unsetProcedure(regionNode.getProcedure()); 1018 } 1019 } else { 1020 // This is where we could throw an exception; i.e. override is false. 1021 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 1022 } 1023 assert regionNode.getProcedure() == null; 1024 trsp = 1025 TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionNode.getRegionInfo()); 1026 regionNode.setProcedure(trsp); 1027 } catch (IOException ioe) { 1028 // 'override' must be false here. 1029 LOG.info("Failed {} unassign, override=false; set override to by-pass state checks.", 1030 ri.getEncodedName(), ioe); 1031 } finally { 1032 regionNode.unlock(); 1033 } 1034 return trsp; 1035 } 1036 1037 /** 1038 * Create an array of TransitRegionStateProcedure w/o specifying a target server. Used as fallback 1039 * of caller is unable to do {@link #createAssignProcedures(Map)}. 1040 * <p/> 1041 * If no target server, at assign time, we will try to use the former location of the region if 1042 * one exists. This is how we 'retain' the old location across a server restart. 1043 * <p/> 1044 * Should only be called when you can make sure that no one can touch these regions other than 1045 * you. For example, when you are creating or enabling table. 
Presumes all Regions are in 1046 * appropriate state ripe for assign; no checking of Region state is done in here. 1047 * @see #createAssignProcedures(Map) 1048 */ 1049 public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) { 1050 return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri)) 1051 .map(regionNode -> createAssignProcedure(regionNode, null)).sorted(AssignmentManager::compare) 1052 .toArray(TransitRegionStateProcedure[]::new); 1053 } 1054 1055 /** 1056 * Tied to {@link #createAssignProcedures(List)} in that it is called if caller is unable to run 1057 * this method. Presumes all Regions are in appropriate state ripe for assign; no checking of 1058 * Region state is done in here. 1059 * @param assignments Map of assignments from which we produce an array of AssignProcedures. 1060 * @return Assignments made from the passed in <code>assignments</code> 1061 * @see #createAssignProcedures(List) 1062 */ 1063 private TransitRegionStateProcedure[] 1064 createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments) { 1065 return assignments.entrySet().stream() 1066 .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri)) 1067 .map(regionNode -> createAssignProcedure(regionNode, e.getKey()))) 1068 .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new); 1069 } 1070 1071 // for creating unassign TRSP when disabling a table or closing excess region replicas 1072 private TransitRegionStateProcedure forceCreateUnssignProcedure(RegionStateNode regionNode) { 1073 regionNode.lock(); 1074 try { 1075 if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) { 1076 return null; 1077 } 1078 // in general, a split parent should be in CLOSED or SPLIT state, but anyway, let's check it 1079 // here for safety 1080 if (regionNode.getRegionInfo().isSplit()) { 1081 LOG.warn("{} is a split parent but not in CLOSED or SPLIT state", regionNode); 1082 return null; 
1083 } 1084 // As in DisableTableProcedure or ModifyTableProcedure, we will hold the xlock for table, so 1085 // we can make sure that this procedure has not been executed yet, as TRSP will hold the 1086 // shared lock for table all the time. So here we will unset it and when it is actually 1087 // executed, it will find that the attach procedure is not itself and quit immediately. 1088 if (regionNode.getProcedure() != null) { 1089 regionNode.unsetProcedure(regionNode.getProcedure()); 1090 } 1091 return regionNode.setProcedure(TransitRegionStateProcedure.unassign(getProcedureEnvironment(), 1092 regionNode.getRegionInfo())); 1093 } finally { 1094 regionNode.unlock(); 1095 } 1096 } 1097 1098 /** 1099 * Called by DisableTableProcedure to unassign all the regions for a table. 1100 */ 1101 public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) { 1102 return regionStates.getTableRegionStateNodes(tableName).stream() 1103 .map(this::forceCreateUnssignProcedure).filter(p -> p != null) 1104 .toArray(TransitRegionStateProcedure[]::new); 1105 } 1106 1107 private int submitUnassignProcedure(TableName tableName, 1108 Function<RegionStateNode, Boolean> shouldSubmit, Consumer<RegionStateNode> logRIT, 1109 Consumer<TransitRegionStateProcedure> submit) { 1110 int inTransitionCount = 0; 1111 for (RegionStateNode regionNode : regionStates.getTableRegionStateNodes(tableName)) { 1112 regionNode.lock(); 1113 try { 1114 if (shouldSubmit.apply(regionNode)) { 1115 if (regionNode.isTransitionScheduled()) { 1116 logRIT.accept(regionNode); 1117 inTransitionCount++; 1118 continue; 1119 } 1120 if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) { 1121 continue; 1122 } 1123 submit.accept(regionNode.setProcedure(TransitRegionStateProcedure 1124 .unassign(getProcedureEnvironment(), regionNode.getRegionInfo()))); 1125 } 1126 } finally { 1127 regionNode.unlock(); 1128 } 1129 } 1130 return inTransitionCount; 1131 } 1132 1133 /** 1134 * Called by 
DisableTableProcedure to unassign all regions for a table. Will skip submit unassign 1135 * procedure if the region is in transition, so you may need to call this method multiple times. 1136 * @param tableName the table for closing excess region replicas 1137 * @param submit for submitting procedure 1138 * @return the number of regions in transition that we can not schedule unassign procedures 1139 */ 1140 public int submitUnassignProcedureForDisablingTable(TableName tableName, 1141 Consumer<TransitRegionStateProcedure> submit) { 1142 return submitUnassignProcedure(tableName, rn -> true, 1143 rn -> LOG.debug("skip scheduling unassign procedure for {} when closing table regions " 1144 + "for disabling since it is in transition", rn), 1145 submit); 1146 } 1147 1148 /** 1149 * Called by ModifyTableProcedure to unassign all the excess region replicas for a table. Will 1150 * skip submit unassign procedure if the region is in transition, so you may need to call this 1151 * method multiple times. 
1152 * @param tableName the table for closing excess region replicas 1153 * @param newReplicaCount the new replica count, should be less than current replica count 1154 * @param submit for submitting procedure 1155 * @return the number of regions in transition that we can not schedule unassign procedures 1156 */ 1157 public int submitUnassignProcedureForClosingExcessRegionReplicas(TableName tableName, 1158 int newReplicaCount, Consumer<TransitRegionStateProcedure> submit) { 1159 return submitUnassignProcedure(tableName, 1160 rn -> rn.getRegionInfo().getReplicaId() >= newReplicaCount, 1161 rn -> LOG.debug("skip scheduling unassign procedure for {} when closing excess region " 1162 + "replicas since it is in transition", rn), 1163 submit); 1164 } 1165 1166 private int numberOfUnclosedRegions(TableName tableName, 1167 Function<RegionStateNode, Boolean> shouldSubmit) { 1168 int unclosed = 0; 1169 for (RegionStateNode regionNode : regionStates.getTableRegionStateNodes(tableName)) { 1170 regionNode.lock(); 1171 try { 1172 if (shouldSubmit.apply(regionNode)) { 1173 if (!regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) { 1174 unclosed++; 1175 } 1176 } 1177 } finally { 1178 regionNode.unlock(); 1179 } 1180 } 1181 return unclosed; 1182 } 1183 1184 public int numberOfUnclosedRegionsForDisabling(TableName tableName) { 1185 return numberOfUnclosedRegions(tableName, rn -> true); 1186 } 1187 1188 public int numberOfUnclosedExcessRegionReplicas(TableName tableName, int newReplicaCount) { 1189 return numberOfUnclosedRegions(tableName, 1190 rn -> rn.getRegionInfo().getReplicaId() >= newReplicaCount); 1191 } 1192 1193 public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit, 1194 final byte[] splitKey) throws IOException { 1195 return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey); 1196 } 1197 1198 public TruncateRegionProcedure createTruncateRegionProcedure(final RegionInfo regionToTruncate) 1199 throws 
IOException { 1200 return new TruncateRegionProcedure(getProcedureEnvironment(), regionToTruncate); 1201 } 1202 1203 public MergeTableRegionsProcedure createMergeProcedure(RegionInfo... ris) throws IOException { 1204 return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false); 1205 } 1206 1207 /** 1208 * Delete the region states. This is called by "DeleteTable" 1209 */ 1210 public void deleteTable(final TableName tableName) throws IOException { 1211 final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName); 1212 regionStateStore.deleteRegions(regions); 1213 for (int i = 0; i < regions.size(); ++i) { 1214 final RegionInfo regionInfo = regions.get(i); 1215 regionStates.deleteRegion(regionInfo); 1216 } 1217 } 1218 1219 // ============================================================================================ 1220 // RS Region Transition Report helpers 1221 // ============================================================================================ 1222 private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder, 1223 ServerStateNode serverNode, List<RegionStateTransition> transitionList) throws IOException { 1224 for (RegionStateTransition transition : transitionList) { 1225 switch (transition.getTransitionCode()) { 1226 case OPENED: 1227 case FAILED_OPEN: 1228 case CLOSED: 1229 assert transition.getRegionInfoCount() == 1 : transition; 1230 final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 1231 long procId = 1232 transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID; 1233 updateRegionTransition(serverNode, transition.getTransitionCode(), hri, 1234 transition.hasOpenSeqNum() ? 
transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId); 1235 break; 1236 case READY_TO_SPLIT: 1237 case SPLIT: 1238 case SPLIT_REVERTED: 1239 assert transition.getRegionInfoCount() == 3 : transition; 1240 final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 1241 final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1)); 1242 final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2)); 1243 updateRegionSplitTransition(serverNode, transition.getTransitionCode(), parent, splitA, 1244 splitB); 1245 break; 1246 case READY_TO_MERGE: 1247 case MERGED: 1248 case MERGE_REVERTED: 1249 assert transition.getRegionInfoCount() == 3 : transition; 1250 final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 1251 final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1)); 1252 final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2)); 1253 updateRegionMergeTransition(serverNode, transition.getTransitionCode(), merged, mergeA, 1254 mergeB); 1255 break; 1256 } 1257 } 1258 } 1259 1260 public ReportRegionStateTransitionResponse reportRegionStateTransition( 1261 final ReportRegionStateTransitionRequest req) throws PleaseHoldException { 1262 ReportRegionStateTransitionResponse.Builder builder = 1263 ReportRegionStateTransitionResponse.newBuilder(); 1264 ServerName serverName = ProtobufUtil.toServerName(req.getServer()); 1265 ServerStateNode serverNode = regionStates.getServerNode(serverName); 1266 if (serverNode == null) { 1267 LOG.warn("No server node for {}", serverName); 1268 builder.setErrorMessage("No server node for " + serverName); 1269 return builder.build(); 1270 } 1271 // here we have to acquire a read lock instead of a simple exclusive lock. This is because that 1272 // we should not block other reportRegionStateTransition call from the same region server. This 1273 // is not only about performance, but also to prevent dead lock. 
Think of the meta region is 1274 // also on the same region server and you hold the lock which blocks the 1275 // reportRegionStateTransition for meta, and since meta is not online, you will block inside the 1276 // lock protection to wait for meta online... 1277 serverNode.readLock().lock(); 1278 try { 1279 // we only accept reportRegionStateTransition if the region server is online, see the comment 1280 // above in submitServerCrash method and HBASE-21508 for more details. 1281 if (serverNode.isInState(ServerState.ONLINE)) { 1282 try { 1283 reportRegionStateTransition(builder, serverNode, req.getTransitionList()); 1284 } catch (PleaseHoldException e) { 1285 LOG.trace("Failed transition ", e); 1286 throw e; 1287 } catch (UnsupportedOperationException | IOException e) { 1288 // TODO: at the moment we have a single error message and the RS will abort 1289 // if the master says that one of the region transitions failed. 1290 LOG.warn("Failed transition", e); 1291 builder.setErrorMessage("Failed transition " + e.getMessage()); 1292 } 1293 } else { 1294 LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call", 1295 serverName); 1296 builder.setErrorMessage("You are dead"); 1297 } 1298 } finally { 1299 serverNode.readLock().unlock(); 1300 } 1301 1302 return builder.build(); 1303 } 1304 1305 private void updateRegionTransition(ServerStateNode serverNode, TransitionCode state, 1306 RegionInfo regionInfo, long seqId, long procId) throws IOException { 1307 checkMetaLoaded(regionInfo, procId); 1308 1309 RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 1310 if (regionNode == null) { 1311 // the table/region is gone. maybe a delete, split, merge 1312 throw new UnexpectedStateException(String.format( 1313 "Server %s was trying to transition region %s to %s. 
but Region is not known.", 1314 serverNode.getServerName(), regionInfo, state)); 1315 } 1316 LOG.trace("Update region transition serverName={} region={} regionState={}", 1317 serverNode.getServerName(), regionNode, state); 1318 1319 regionNode.lock(); 1320 try { 1321 if (!reportTransition(regionNode, serverNode, state, seqId, procId)) { 1322 // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages: 1323 // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for 1324 // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958, 1325 // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition 1326 // to CLOSED 1327 // These happen because on cluster shutdown, we currently let the RegionServers close 1328 // regions. This is the only time that region close is not run by the Master (so cluster 1329 // goes down fast). Consider changing it so Master runs all shutdowns. 1330 if ( 1331 this.master.getServerManager().isClusterShutdown() && state.equals(TransitionCode.CLOSED) 1332 ) { 1333 LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName()); 1334 } else { 1335 LOG.warn("No matching procedure found for {} transition on {} to {}", 1336 serverNode.getServerName(), regionNode, state); 1337 } 1338 } 1339 } finally { 1340 regionNode.unlock(); 1341 } 1342 } 1343 1344 private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode, 1345 TransitionCode state, long seqId, long procId) throws IOException { 1346 ServerName serverName = serverNode.getServerName(); 1347 TransitRegionStateProcedure proc = regionNode.getProcedure(); 1348 if (proc == null) { 1349 return false; 1350 } 1351 proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode, 1352 serverName, state, seqId, procId); 1353 return true; 1354 } 1355 1356 private void updateRegionSplitTransition(final ServerStateNode serverNode, 1357 final 
TransitionCode state, final RegionInfo parent, final RegionInfo hriA, 1358 final RegionInfo hriB) throws IOException { 1359 checkMetaLoaded(parent, Procedure.NO_PROC_ID); 1360 1361 if (state != TransitionCode.READY_TO_SPLIT) { 1362 throw new UnexpectedStateException( 1363 "unsupported split regionState=" + state + " for parent region " + parent 1364 + " maybe an old RS (< 2.0) had the operation in progress"); 1365 } 1366 1367 // sanity check on the request 1368 if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) { 1369 throw new UnsupportedOperationException("unsupported split request with bad keys: parent=" 1370 + parent + " hriA=" + hriA + " hriB=" + hriB); 1371 } 1372 1373 if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) { 1374 LOG.warn("Split switch is off! skip split of " + parent); 1375 throw new DoNotRetryIOException( 1376 "Split region " + parent.getRegionNameAsString() + " failed due to split switch off"); 1377 } 1378 1379 // Submit the Split procedure 1380 final byte[] splitKey = hriB.getStartKey(); 1381 if (LOG.isDebugEnabled()) { 1382 LOG.debug("Split request from {}, parent={}, splitKey={}", serverNode.getServerName(), parent, 1383 Bytes.toStringBinary(splitKey)); 1384 } 1385 // Processing this report happens asynchronously from other activities which can mutate 1386 // the region state. For example, a split procedure may already be running for this parent. 1387 // A split procedure cannot succeed if the parent region is no longer open, so we can 1388 // ignore it in that case. 1389 // Note that submitting more than one split procedure for a given region is 1390 // harmless -- the split is fenced in the procedure handling -- but it would be noisy in 1391 // the logs. Only one procedure can succeed. The other procedure(s) would abort during 1392 // initialization and report failure with WARN level logging. 
1393 RegionState parentState = regionStates.getRegionState(parent); 1394 if (parentState != null && parentState.isOpened()) { 1395 master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey)); 1396 } else { 1397 LOG.info("Ignoring split request from {}, parent={} because parent is unknown or not open", 1398 serverNode.getServerName(), parent); 1399 return; 1400 } 1401 1402 // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split 1403 if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) { 1404 throw new UnsupportedOperationException( 1405 String.format("Split handled by the master: " + "parent=%s hriA=%s hriB=%s", 1406 parent.getShortNameToLog(), hriA, hriB)); 1407 } 1408 } 1409 1410 private void updateRegionMergeTransition(final ServerStateNode serverNode, 1411 final TransitionCode state, final RegionInfo merged, final RegionInfo hriA, 1412 final RegionInfo hriB) throws IOException { 1413 checkMetaLoaded(merged, Procedure.NO_PROC_ID); 1414 1415 if (state != TransitionCode.READY_TO_MERGE) { 1416 throw new UnexpectedStateException( 1417 "Unsupported merge regionState=" + state + " for regionA=" + hriA + " regionB=" + hriB 1418 + " merged=" + merged + " maybe an old RS (< 2.0) had the operation in progress"); 1419 } 1420 1421 if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) { 1422 LOG.warn("Merge switch is off! 
skip merge of regionA=" + hriA + " regionB=" + hriB); 1423 throw new DoNotRetryIOException( 1424 "Merge of regionA=" + hriA + " regionB=" + hriB + " failed because merge switch is off"); 1425 } 1426 1427 // Submit the Merge procedure 1428 if (LOG.isDebugEnabled()) { 1429 LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged); 1430 } 1431 master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB)); 1432 1433 // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge 1434 if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) { 1435 throw new UnsupportedOperationException( 1436 String.format("Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, 1437 merged, hriA, hriB)); 1438 } 1439 } 1440 1441 // ============================================================================================ 1442 // RS Status update (report online regions) helpers 1443 // ============================================================================================ 1444 /** 1445 * The master will call this method when the RS send the regionServerReport(). The report will 1446 * contains the "online regions". This method will check the the online regions against the 1447 * in-memory state of the AM, and we will log a warn message if there is a mismatch. This is 1448 * because that there is no fencing between the reportRegionStateTransition method and 1449 * regionServerReport method, so there could be race and introduce inconsistency here, but 1450 * actually there is no problem. 1451 * <p/> 1452 * Please see HBASE-21421 and HBASE-21463 for more details. 
1453 */ 1454 public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) { 1455 if (!isRunning()) { 1456 return; 1457 } 1458 if (LOG.isTraceEnabled()) { 1459 LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName, 1460 regionNames.size(), isMetaLoaded(), 1461 regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList())); 1462 } 1463 1464 ServerStateNode serverNode = regionStates.getServerNode(serverName); 1465 if (serverNode == null) { 1466 LOG.warn("Got a report from server {} where its server node is null", serverName); 1467 return; 1468 } 1469 serverNode.readLock().lock(); 1470 try { 1471 if (!serverNode.isInState(ServerState.ONLINE)) { 1472 LOG.warn("Got a report from a server result in state {}", serverNode); 1473 return; 1474 } 1475 } finally { 1476 serverNode.readLock().unlock(); 1477 } 1478 1479 // Track the regionserver reported online regions in memory. 1480 synchronized (rsReports) { 1481 rsReports.put(serverName, regionNames); 1482 } 1483 1484 if (regionNames.isEmpty()) { 1485 // nothing to do if we don't have regions 1486 LOG.trace("no online region found on {}", serverName); 1487 return; 1488 } 1489 if (!isMetaLoaded()) { 1490 // we are still on startup, skip checking 1491 return; 1492 } 1493 // The Heartbeat tells us of what regions are on the region serve, check the state. 1494 checkOnlineRegionsReport(serverNode, regionNames); 1495 } 1496 1497 /** 1498 * Close <code>regionName</code> on <code>sn</code> silently and immediately without using a 1499 * Procedure or going via hbase:meta. For case where a RegionServer's hosting of a Region is not 1500 * aligned w/ the Master's accounting of Region state. This is for cleaning up an error in 1501 * accounting. 1502 */ 1503 private void closeRegionSilently(ServerName sn, byte[] regionName) { 1504 try { 1505 RegionInfo ri = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName); 1506 // Pass -1 for timeout. Means do not wait. 
1507 ServerManager.closeRegionSilentlyAndWait(this.master.getAsyncClusterConnection(), sn, ri, -1); 1508 } catch (Exception e) { 1509 LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e); 1510 } 1511 } 1512 1513 /** 1514 * Check that what the RegionServer reports aligns with the Master's image. If disagreement, we 1515 * will tell the RegionServer to expediently close a Region we do not think it should have. 1516 */ 1517 private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) { 1518 ServerName serverName = serverNode.getServerName(); 1519 for (byte[] regionName : regionNames) { 1520 if (!isRunning()) { 1521 return; 1522 } 1523 RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName); 1524 if (regionNode == null) { 1525 String regionNameAsStr = Bytes.toStringBinary(regionName); 1526 LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...", regionNameAsStr, 1527 serverName); 1528 closeRegionSilently(serverNode.getServerName(), regionName); 1529 continue; 1530 } 1531 final long lag = 1000; 1532 // This is just a fallback check designed to identify unexpected data inconsistencies, so we 1533 // use tryLock to attempt to acquire the lock, and if the lock cannot be acquired, we skip the 1534 // check. This will not cause any additional problems and also prevents the regionServerReport 1535 // call from being stuck for too long which may cause deadlock on region assignment. 1536 if (regionNode.tryLock()) { 1537 try { 1538 long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate(); 1539 if (regionNode.isInState(State.OPENING, State.OPEN)) { 1540 // This is possible as a region server has just closed a region but the region server 1541 // report is generated before the closing, but arrive after the closing. Make sure 1542 // there 1543 // is some elapsed time so less false alarms. 
1544 if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) { 1545 LOG.warn("Reporting {} server does not match {} (time since last " 1546 + "update={}ms); closing...", serverName, regionNode, diff); 1547 closeRegionSilently(serverNode.getServerName(), regionName); 1548 } 1549 } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) { 1550 // So, we can get report that a region is CLOSED or SPLIT because a heartbeat 1551 // came in at about same time as a region transition. Make sure there is some 1552 // elapsed time so less false alarms. 1553 if (diff > lag) { 1554 LOG.warn("Reporting {} state does not match {} (time since last update={}ms)", 1555 serverName, regionNode, diff); 1556 } 1557 } 1558 } finally { 1559 regionNode.unlock(); 1560 } 1561 } else { 1562 LOG.warn( 1563 "Unable to acquire lock for regionNode {}. It is likely that another thread is currently holding the lock. To avoid deadlock, skip execution for now.", 1564 regionNode); 1565 } 1566 } 1567 } 1568 1569 // ============================================================================================ 1570 // RIT chore 1571 // ============================================================================================ 1572 private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> { 1573 public RegionInTransitionChore(final int timeoutMsec) { 1574 super(timeoutMsec); 1575 } 1576 1577 @Override 1578 protected void periodicExecute(final MasterProcedureEnv env) { 1579 final AssignmentManager am = env.getAssignmentManager(); 1580 1581 final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat(); 1582 if (ritStat.hasRegionsOverThreshold()) { 1583 for (RegionState hri : ritStat.getRegionOverThreshold()) { 1584 am.handleRegionOverStuckWarningThreshold(hri.getRegion()); 1585 } 1586 } 1587 1588 // update metrics 1589 am.updateRegionsInTransitionMetrics(ritStat); 1590 } 1591 } 1592 1593 private static class DeadServerMetricRegionChore 
1594 extends ProcedureInMemoryChore<MasterProcedureEnv> { 1595 public DeadServerMetricRegionChore(final int timeoutMsec) { 1596 super(timeoutMsec); 1597 } 1598 1599 @Override 1600 protected void periodicExecute(final MasterProcedureEnv env) { 1601 final ServerManager sm = env.getMasterServices().getServerManager(); 1602 final AssignmentManager am = env.getAssignmentManager(); 1603 // To minimize inconsistencies we are not going to snapshot live servers in advance in case 1604 // new servers are added; OTOH we don't want to add heavy sync for a consistent view since 1605 // this is for metrics. Instead, we're going to check each regions as we go; to avoid making 1606 // too many checks, we maintain a local lists of server, limiting us to false negatives. If 1607 // we miss some recently-dead server, we'll just see it next time. 1608 Set<ServerName> recentlyLiveServers = new HashSet<>(); 1609 int deadRegions = 0, unknownRegions = 0; 1610 for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) { 1611 if (rsn.getState() != State.OPEN) { 1612 continue; // Opportunistic check, should quickly skip RITs, offline tables, etc. 1613 } 1614 // Do not need to acquire region state lock as this is only for showing metrics. 1615 ServerName sn = rsn.getRegionLocation(); 1616 State state = rsn.getState(); 1617 if (state != State.OPEN) { 1618 continue; // Mostly skipping RITs that are already being take care of. 1619 } 1620 if (sn == null) { 1621 ++unknownRegions; // Opened on null? 
1622 continue; 1623 } 1624 if (recentlyLiveServers.contains(sn)) { 1625 continue; 1626 } 1627 ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn); 1628 switch (sls) { 1629 case LIVE: 1630 recentlyLiveServers.add(sn); 1631 break; 1632 case DEAD: 1633 ++deadRegions; 1634 break; 1635 case UNKNOWN: 1636 ++unknownRegions; 1637 break; 1638 default: 1639 throw new AssertionError("Unexpected " + sls); 1640 } 1641 } 1642 if (deadRegions > 0 || unknownRegions > 0) { 1643 LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers", 1644 deadRegions, unknownRegions); 1645 } 1646 1647 am.updateDeadServerRegionMetrics(deadRegions, unknownRegions); 1648 } 1649 } 1650 1651 public RegionInTransitionStat computeRegionInTransitionStat() { 1652 final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration()); 1653 rit.update(this); 1654 return rit; 1655 } 1656 1657 public static class RegionInTransitionStat { 1658 private final int ritThreshold; 1659 1660 private HashMap<String, RegionState> ritsOverThreshold = null; 1661 private long statTimestamp; 1662 private long oldestRITTime = 0; 1663 private int totalRITsTwiceThreshold = 0; 1664 private int totalRITs = 0; 1665 1666 public RegionInTransitionStat(final Configuration conf) { 1667 this.ritThreshold = 1668 conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD); 1669 } 1670 1671 public int getRITThreshold() { 1672 return ritThreshold; 1673 } 1674 1675 public long getTimestamp() { 1676 return statTimestamp; 1677 } 1678 1679 public int getTotalRITs() { 1680 return totalRITs; 1681 } 1682 1683 public long getOldestRITTime() { 1684 return oldestRITTime; 1685 } 1686 1687 public int getTotalRITsOverThreshold() { 1688 Map<String, RegionState> m = this.ritsOverThreshold; 1689 return m != null ? 
m.size() : 0; 1690 } 1691 1692 public boolean hasRegionsTwiceOverThreshold() { 1693 return totalRITsTwiceThreshold > 0; 1694 } 1695 1696 public boolean hasRegionsOverThreshold() { 1697 Map<String, RegionState> m = this.ritsOverThreshold; 1698 return m != null && !m.isEmpty(); 1699 } 1700 1701 public Collection<RegionState> getRegionOverThreshold() { 1702 Map<String, RegionState> m = this.ritsOverThreshold; 1703 return m != null ? m.values() : Collections.emptySet(); 1704 } 1705 1706 public boolean isRegionOverThreshold(final RegionInfo regionInfo) { 1707 Map<String, RegionState> m = this.ritsOverThreshold; 1708 return m != null && m.containsKey(regionInfo.getEncodedName()); 1709 } 1710 1711 public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) { 1712 Map<String, RegionState> m = this.ritsOverThreshold; 1713 if (m == null) { 1714 return false; 1715 } 1716 final RegionState state = m.get(regionInfo.getEncodedName()); 1717 if (state == null) { 1718 return false; 1719 } 1720 return (statTimestamp - state.getStamp()) > (ritThreshold * 2); 1721 } 1722 1723 protected void update(final AssignmentManager am) { 1724 this.statTimestamp = EnvironmentEdgeManager.currentTime(); 1725 update(am.getRegionsStateInTransition(), statTimestamp); 1726 1727 if (LOG.isDebugEnabled() && ritsOverThreshold != null && !ritsOverThreshold.isEmpty()) { 1728 LOG.debug("RITs over threshold: {}", 1729 ritsOverThreshold.entrySet().stream() 1730 .map(e -> e.getKey() + ":" + e.getValue().getState().name()) 1731 .collect(Collectors.joining("\n"))); 1732 } 1733 } 1734 1735 private void update(final Collection<RegionState> regions, final long currentTime) { 1736 for (RegionState state : regions) { 1737 totalRITs++; 1738 final long ritStartedMs = state.getStamp(); 1739 if (ritStartedMs == 0) { 1740 // Don't output bogus values to metrics if they accidentally make it here. 
1741 LOG.warn("The RIT {} has no start time", state.getRegion()); 1742 continue; 1743 } 1744 final long ritTime = currentTime - ritStartedMs; 1745 if (ritTime > ritThreshold) { 1746 if (ritsOverThreshold == null) { 1747 ritsOverThreshold = new HashMap<String, RegionState>(); 1748 } 1749 ritsOverThreshold.put(state.getRegion().getEncodedName(), state); 1750 totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0; 1751 } 1752 if (oldestRITTime < ritTime) { 1753 oldestRITTime = ritTime; 1754 } 1755 } 1756 } 1757 } 1758 1759 private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) { 1760 metrics.updateRITOldestAge(ritStat.getOldestRITTime()); 1761 metrics.updateRITCount(ritStat.getTotalRITs()); 1762 metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold()); 1763 } 1764 1765 private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) { 1766 metrics.updateDeadServerOpenRegions(deadRegions); 1767 metrics.updateUnknownServerOpenRegions(unknownRegions); 1768 } 1769 1770 private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) { 1771 final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 1772 // if (regionNode.isStuck()) { 1773 LOG.warn("STUCK Region-In-Transition {}", regionNode); 1774 } 1775 1776 // ============================================================================================ 1777 // TODO: Master load/bootstrap 1778 // ============================================================================================ 1779 public void joinCluster() throws IOException { 1780 long startTime = System.nanoTime(); 1781 LOG.debug("Joining cluster..."); 1782 1783 // Scan hbase:meta to build list of existing regions, servers, and assignment. 1784 // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress 1785 // w/o meta. 
1786 loadMeta(); 1787 1788 while (master.getServerManager().countOfRegionServers() < 1) { 1789 LOG.info("Waiting for RegionServers to join; current count={}", 1790 master.getServerManager().countOfRegionServers()); 1791 Threads.sleep(250); 1792 } 1793 LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers()); 1794 1795 // Start the chores 1796 master.getMasterProcedureExecutor().addChore(this.ritChore); 1797 master.getMasterProcedureExecutor().addChore(this.deadMetricChore); 1798 1799 long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); 1800 LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs)); 1801 } 1802 1803 /** 1804 * Create assign procedure for offline regions. Just follow the old 1805 * processofflineServersWithOnlineRegions method. Since now we do not need to deal with dead 1806 * server any more, we only deal with the regions in OFFLINE state in this method. And this is a 1807 * bit strange, that for new regions, we will add it in CLOSED state instead of OFFLINE state, and 1808 * usually there will be a procedure to track them. The processofflineServersWithOnlineRegions is 1809 * a legacy from long ago, as things are going really different now, maybe we do not need this 1810 * method any more. Need to revisit later. 1811 */ 1812 // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online. 1813 // Needs to be done after the table state manager has been started. 
1814 public void processOfflineRegions() { 1815 TransitRegionStateProcedure[] procs = 1816 regionStates.getRegionStateNodes().stream().filter(rsn -> rsn.isInState(State.OFFLINE)) 1817 .filter(rsn -> isTableEnabled(rsn.getRegionInfo().getTable())).map(rsn -> { 1818 rsn.lock(); 1819 try { 1820 if (rsn.getProcedure() != null) { 1821 return null; 1822 } else { 1823 return rsn.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(), 1824 rsn.getRegionInfo(), null)); 1825 } 1826 } finally { 1827 rsn.unlock(); 1828 } 1829 }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new); 1830 if (procs.length > 0) { 1831 master.getMasterProcedureExecutor().submitProcedures(procs); 1832 } 1833 } 1834 1835 /* 1836 * AM internal RegionStateStore.RegionStateVisitor implementation. To be used when scanning META 1837 * table for region rows, using RegionStateStore utility methods. RegionStateStore methods will 1838 * convert Result into proper RegionInfo instances, but those would still need to be added into 1839 * AssignmentManager.regionStates in-memory cache. RegionMetaLoadingVisitor.visitRegionState 1840 * method provides the logic for adding RegionInfo instances as loaded from latest META scan into 1841 * AssignmentManager.regionStates. 1842 */ 1843 private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor { 1844 1845 @Override 1846 public void visitRegionState(Result result, final RegionInfo regionInfo, final State state, 1847 final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) { 1848 if ( 1849 state == null && regionLocation == null && lastHost == null 1850 && openSeqNum == SequenceId.NO_SEQUENCE_ID 1851 ) { 1852 // This is a row with nothing in it. 1853 LOG.warn("Skipping empty row={}", result); 1854 return; 1855 } 1856 State localState = state; 1857 if (localState == null) { 1858 // No region state column data in hbase:meta table! 
Are I doing a rolling upgrade from 1859 // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta? 1860 // In any of these cases, state is empty. For now, presume OFFLINE but there are probably 1861 // cases where we need to probe more to be sure this correct; TODO informed by experience. 1862 LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE); 1863 localState = State.OFFLINE; 1864 } 1865 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 1866 // Do not need to lock on regionNode, as we can make sure that before we finish loading 1867 // meta, all the related procedures can not be executed. The only exception is for meta 1868 // region related operations, but here we do not load the informations for meta region. 1869 regionNode.setState(localState); 1870 regionNode.setLastHost(lastHost); 1871 regionNode.setRegionLocation(regionLocation); 1872 regionNode.setOpenSeqNum(openSeqNum); 1873 1874 // Note: keep consistent with other methods, see region(Opening|Opened|Closing) 1875 // RIT/ServerCrash handling should take care of the transiting regions. 1876 if ( 1877 localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING, State.MERGING) 1878 ) { 1879 assert regionLocation != null : "found null region location for " + regionNode; 1880 // TODO: this could lead to some orphan server state nodes, as it is possible that the 1881 // region server is already dead and its SCP has already finished but we have 1882 // persisted an opening state on this region server. Finally the TRSP will assign the 1883 // region to another region server, so it will not cause critical problems, just waste 1884 // some memory as no one will try to cleanup these orphan server state nodes. 
1885 regionStates.createServer(regionLocation); 1886 regionStates.addRegionToServer(regionNode); 1887 } else if (localState == State.OFFLINE || regionInfo.isOffline()) { 1888 regionStates.addToOfflineRegions(regionNode); 1889 } 1890 if (regionNode.getProcedure() != null) { 1891 regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode); 1892 } 1893 // add regions to RIT while visiting the meta 1894 regionInTransitionTracker.handleRegionStateNodeOperation(regionNode); 1895 // If region location of region belongs to a dead server mark the region crashed 1896 if ( 1897 regionNode.getRegionLocation() != null 1898 && master.getServerManager().isServerDead(regionNode.getRegionLocation()) 1899 ) { 1900 long timeOfCrash = master.getServerManager().getDeadServers() 1901 .getDeathTimestamp(regionNode.getRegionLocation()); 1902 if (timeOfCrash != 0) { 1903 regionNode.crashed(timeOfCrash); 1904 } 1905 regionInTransitionTracker.regionCrashed(regionNode); 1906 } 1907 } 1908 }; 1909 1910 /** 1911 * Attempt to load {@code regionInfo} from META, adding any results to the 1912 * {@link #regionStateStore} Is NOT aware of replica regions. 1913 * @param regionInfo the region to be loaded from META. 1914 * @throws IOException If some error occurs while querying META or parsing results. 1915 */ 1916 public void populateRegionStatesFromMeta(@NonNull final RegionInfo regionInfo) 1917 throws IOException { 1918 final String regionEncodedName = RegionInfo.DEFAULT_REPLICA_ID == regionInfo.getReplicaId() 1919 ? regionInfo.getEncodedName() 1920 : RegionInfoBuilder.newBuilder(regionInfo).setReplicaId(RegionInfo.DEFAULT_REPLICA_ID).build() 1921 .getEncodedName(); 1922 populateRegionStatesFromMeta(regionEncodedName); 1923 } 1924 1925 /** 1926 * Attempt to load {@code regionEncodedName} from META, adding any results to the 1927 * {@link #regionStateStore} Is NOT aware of replica regions. 1928 * @param regionEncodedName encoded name for the region to be loaded from META. 
1929 * @throws IOException If some error occurs while querying META or parsing results. 1930 */ 1931 public void populateRegionStatesFromMeta(@NonNull String regionEncodedName) throws IOException { 1932 final RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor(); 1933 regionStateStore.visitMetaForRegion(regionEncodedName, visitor); 1934 } 1935 1936 private void loadMeta() throws IOException { 1937 // TODO: use a thread pool 1938 regionStateStore.visitMeta(new RegionMetaLoadingVisitor()); 1939 } 1940 1941 /** 1942 * Used to check if the meta loading is done. 1943 * <p/> 1944 * if not we throw PleaseHoldException since we are rebuilding the RegionStates 1945 * @param hri region to check if it is already rebuild 1946 * @param procId the procedure id for this region operation, or NO_PROC_ID if not available 1947 * @throws PleaseHoldException if meta has not been loaded yet 1948 */ 1949 private void checkMetaLoaded(RegionInfo hri, long procId) throws PleaseHoldException { 1950 if (!isRunning()) { 1951 throw new PleaseHoldException("AssignmentManager not running"); 1952 } 1953 1954 // Check if the procedure is for a critical system table 1955 // Critical system tables can proceed even if meta is not loaded yet 1956 // We are currently making procId available only for the code path which can execute during the 1957 // cluster boot up. In the future, if additional code paths execute during cluster boot up, we 1958 // will need to make procId available for all those code paths. 
1959 if (procId != Procedure.NO_PROC_ID) { 1960 Procedure<?> proc = master.getMasterProcedureExecutor().getProcedure(procId); 1961 if (proc != null && proc.isCriticalSystemTable()) { 1962 return; 1963 } 1964 } 1965 1966 boolean meta = isMetaRegion(hri); 1967 boolean metaLoaded = isMetaLoaded(); 1968 if (!meta && !metaLoaded) { 1969 throw new PleaseHoldException("Master not fully online; " + TableName.META_TABLE_NAME + "=" 1970 + meta + ", metaLoaded=" + metaLoaded); 1971 } 1972 } 1973 1974 // ============================================================================================ 1975 // TODO: Metrics 1976 // ============================================================================================ 1977 public int getNumRegionsOpened() { 1978 // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value 1979 return 0; 1980 } 1981 1982 /** 1983 * Usually run by the Master in reaction to server crash during normal processing. Can also be 1984 * invoked via external RPC to effect repair; in the latter case, the 'force' flag is set so we 1985 * push through the SCP though context may indicate already-running-SCP (An old SCP may have 1986 * exited abnormally, or damaged cluster may still have references in hbase:meta to 'Unknown 1987 * Servers' -- servers that are not online or in dead servers list, etc.) 1988 * @param force Set if the request came in externally over RPC (via hbck2). Force means run the 1989 * SCP even if it seems as though there might be an outstanding SCP running. 1990 * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled. 1991 */ 1992 public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) { 1993 // May be an 'Unknown Server' so handle case where serverNode is null. 
1994 ServerStateNode serverNode = regionStates.getServerNode(serverName); 1995 // Remove the in-memory rsReports result 1996 synchronized (rsReports) { 1997 rsReports.remove(serverName); 1998 } 1999 if (serverNode == null) { 2000 if (force) { 2001 LOG.info("Force adding ServerCrashProcedure for {} when server node is null", serverName); 2002 } else { 2003 // for normal case, do not schedule SCP if ServerStateNode is null 2004 LOG.warn("Skip adding ServerCrashProcedure for {} because server node is null", serverName); 2005 return Procedure.NO_PROC_ID; 2006 } 2007 } 2008 2009 ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor(); 2010 // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the 2011 // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from 2012 // this server. This is used to simplify the implementation for TRSP and SCP, where we can make 2013 // sure that, the region list fetched by SCP will not be changed any more. 2014 if (serverNode != null) { 2015 serverNode.writeLock().lock(); 2016 } 2017 try { 2018 2019 boolean carryingMeta = isCarryingMeta(serverName); 2020 if (serverNode != null && !serverNode.isInState(ServerState.ONLINE)) { 2021 if (force) { 2022 LOG.info("Force adding ServerCrashProcedure for {} (meta={}) when state is not {}", 2023 serverNode, carryingMeta, ServerState.ONLINE); 2024 } else { 2025 LOG.info("Skip adding ServerCrashProcedure for {} (meta={}) when state is not {}", 2026 serverNode, carryingMeta, ServerState.ONLINE); 2027 return Procedure.NO_PROC_ID; 2028 } 2029 } 2030 MasterProcedureEnv mpe = procExec.getEnvironment(); 2031 // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead. 2032 // HBCKSCP scours Master in-memory state AND hbase;meta for references to 2033 // serverName just-in-case. An SCP that is scheduled when the server is 2034 // 'Unknown' probably originated externally with HBCK2 fix-it tool. 
2035 ServerState oldState = null; 2036 if (serverNode != null) { 2037 oldState = serverNode.getState(); 2038 serverNode.setState(ServerState.CRASHED); 2039 } 2040 ServerCrashProcedure scp = force 2041 ? new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta) 2042 : new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta); 2043 long pid = procExec.submitProcedure(scp); 2044 LOG.info("Scheduled ServerCrashProcedure pid={} for {} (carryingMeta={}){}.", pid, serverName, 2045 carryingMeta, 2046 serverNode == null ? "" : " " + serverNode.toString() + ", oldState=" + oldState); 2047 return pid; 2048 } finally { 2049 if (serverNode != null) { 2050 serverNode.writeLock().unlock(); 2051 } 2052 } 2053 } 2054 2055 public void offlineRegion(final RegionInfo regionInfo) { 2056 // TODO used by MasterRpcServices 2057 RegionStateNode node = regionStates.getRegionStateNode(regionInfo); 2058 if (node != null) { 2059 node.offline(); 2060 } 2061 } 2062 2063 public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) { 2064 // TODO used by TestSplitTransactionOnCluster.java 2065 } 2066 2067 public Map<ServerName, List<RegionInfo>> 2068 getSnapShotOfAssignment(final Collection<RegionInfo> regions) { 2069 return regionStates.getSnapShotOfAssignment(regions); 2070 } 2071 2072 // ============================================================================================ 2073 // TODO: UTILS/HELPERS? 
2074 // ============================================================================================ 2075 /** 2076 * Used by the client (via master) to identify if all regions have the schema updates 2077 * @return Pair indicating the status of the alter command (pending/total) 2078 */ 2079 public Pair<Integer, Integer> getReopenStatus(TableName tableName) { 2080 if (isTableDisabled(tableName)) { 2081 return new Pair<Integer, Integer>(0, 0); 2082 } 2083 2084 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 2085 int ritCount = 0; 2086 for (RegionState regionState : states) { 2087 if (!regionState.isOpened() && !regionState.isSplit()) { 2088 ritCount++; 2089 } 2090 } 2091 return new Pair<Integer, Integer>(ritCount, states.size()); 2092 } 2093 2094 // This comparator sorts the RegionStates by time stamp then Region name. 2095 // Comparing by timestamp alone can lead us to discard different RegionStates that happen 2096 // to share a timestamp. 2097 private final static class RegionStateStampComparator implements Comparator<RegionState> { 2098 @Override 2099 public int compare(final RegionState l, final RegionState r) { 2100 int stampCmp = Long.compare(l.getStamp(), r.getStamp()); 2101 return stampCmp != 0 ? 
stampCmp : RegionInfo.COMPARATOR.compare(l.getRegion(), r.getRegion()); 2102 } 2103 } 2104 2105 public final static RegionStateStampComparator REGION_STATE_STAMP_COMPARATOR = 2106 new RegionStateStampComparator(); 2107 2108 // ============================================================================================ 2109 // TODO: Region State In Transition 2110 // ============================================================================================ 2111 public boolean hasRegionsInTransition() { 2112 return regionInTransitionTracker.hasRegionsInTransition(); 2113 } 2114 2115 public List<RegionStateNode> getRegionsInTransition() { 2116 return regionInTransitionTracker.getRegionsInTransition(); 2117 } 2118 2119 public boolean isRegionInTransition(final RegionInfo regionInfo) { 2120 return regionInTransitionTracker.isRegionInTransition(regionInfo); 2121 } 2122 2123 public int getRegionTransitScheduledCount() { 2124 return regionStates.getRegionTransitScheduledCount(); 2125 } 2126 2127 /** 2128 * Get the number of regions in transition. 2129 */ 2130 public int getRegionsInTransitionCount() { 2131 return regionInTransitionTracker.getRegionsInTransition().size(); 2132 } 2133 2134 public SortedSet<RegionState> getRegionsStateInTransition() { 2135 final SortedSet<RegionState> rit = new TreeSet<RegionState>(REGION_STATE_STAMP_COMPARATOR); 2136 for (RegionStateNode node : getRegionsInTransition()) { 2137 rit.add(node.toRegionState()); 2138 } 2139 return rit; 2140 } 2141 2142 public List<RegionInfo> getAssignedRegions() { 2143 return regionStates.getAssignedRegions(); 2144 } 2145 2146 /** 2147 * Resolve a cached {@link RegionInfo} from the region name as a {@code byte[]}. 2148 */ 2149 public RegionInfo getRegionInfo(final byte[] regionName) { 2150 final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName); 2151 return regionState != null ? 
regionState.getRegionInfo() : null; 2152 } 2153 2154 /** 2155 * Resolve a cached {@link RegionInfo} from the encoded region name as a {@code String}. 2156 */ 2157 public RegionInfo getRegionInfo(final String encodedRegionName) { 2158 final RegionStateNode regionState = 2159 regionStates.getRegionStateNodeFromEncodedRegionName(encodedRegionName); 2160 return regionState != null ? regionState.getRegionInfo() : null; 2161 } 2162 2163 // ============================================================================================ 2164 // Expected states on region state transition. 2165 // Notice that there is expected states for transiting to OPENING state, this is because SCP. 2166 // See the comments in regionOpening method for more details. 2167 // ============================================================================================ 2168 private static final State[] STATES_EXPECTED_ON_OPEN = { State.OPENING, // Normal case 2169 State.OPEN // Retrying 2170 }; 2171 2172 private static final State[] STATES_EXPECTED_ON_CLOSING = { State.OPEN, // Normal case 2173 State.CLOSING, // Retrying 2174 State.SPLITTING, // Offline the split parent 2175 State.MERGING // Offline the merge parents 2176 }; 2177 2178 private static final State[] STATES_EXPECTED_ON_CLOSED = { State.CLOSING, // Normal case 2179 State.CLOSED // Retrying 2180 }; 2181 2182 // This is for manually scheduled region assign, can add other states later if we find out other 2183 // usages 2184 private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE }; 2185 2186 // We only allow unassign or move a region which is in OPEN state. 
  private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN };

  // ============================================================================================
  // Region Status update
  // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking
  // and pre-assumptions are very tricky.
  // ============================================================================================
  /**
   * Move {@code regionNode} to {@code newState} in memory, then persist the node through the
   * region state store. If the persist fails, only the in-memory state is put back to what it was
   * before the call; on success the RIT tracker is notified.
   * @param regionNode     the node to transition
   * @param newState       the state to move to
   * @param expectedStates states passed to {@code transitionState} as the expected current states
   * @return the future of the persist, or an already-failed future if {@code transitionState}
   *         rejects the transition
   */
  private CompletableFuture<Void> transitStateAndUpdate(RegionStateNode regionNode,
    RegionState.State newState, RegionState.State... expectedStates) {
    // Remember the current state so the listener below can revert to it.
    RegionState.State state = regionNode.getState();
    try {
      regionNode.transitionState(newState, expectedStates);
    } catch (UnexpectedStateException e) {
      return FutureUtils.failedFuture(e);
    }
    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
    FutureUtils.addListener(future, (r, e) -> {
      if (e != null) {
        // revert
        regionNode.setState(state);
      } else {
        regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
      }
    });
    return future;
  }

  // should be called under the RegionStateNode lock
  CompletableFuture<Void> regionOpening(RegionStateNode regionNode) {
    // As in SCP, for performance reason, there is no TRSP attached with this region, we will not
    // update the region state, which means that the region could be in any state when we want to
    // assign it after a RS crash. So here we do not pass the expectedStates parameter.
    return transitStateAndUpdate(regionNode, State.OPENING).thenAccept(r -> {
      ServerStateNode serverNode = regionStates.getServerNode(regionNode.getRegionLocation());
      // Here the server node could be null. For example, we want to assign the region to a given
      // region server and it crashes, and it is the region server which holds hbase:meta, then the
      // above transitStateAndUpdate call will never succeed until we finish the SCP for it. But
      // after the SCP finishes, the server node will be removed, so when we arrive here, the
      // server node will be null. This is not a big problem if we skip adding it, as later we will
      // fail to execute the remote procedure on the region server and then try to assign to
      // another region server
      if (serverNode != null) {
        serverNode.addRegion(regionNode);
      }
      // update the operation count metrics
      metrics.incrementOperationCounter();
    });
  }

  // should be called under the RegionStateNode lock
  // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
  // we will persist the FAILED_OPEN state into hbase:meta.
  CompletableFuture<Void> regionFailedOpen(RegionStateNode regionNode, boolean giveUp) {
    // Captured up front so the listener below can revert to them if the persist fails.
    RegionState.State state = regionNode.getState();
    ServerName regionLocation = regionNode.getRegionLocation();
    if (!giveUp) {
      // Not giving up: only detach the region from its server in memory, nothing is persisted.
      if (regionLocation != null) {
        regionStates.removeRegionFromServer(regionLocation, regionNode);
      }
      return CompletableFuture.completedFuture(null);
    }
    regionNode.setState(State.FAILED_OPEN);
    regionNode.setRegionLocation(null);
    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
    FutureUtils.addListener(future, (r, e) -> {
      if (e == null) {
        if (regionLocation != null) {
          regionStates.removeRegionFromServer(regionLocation, regionNode);
        }
        regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
      } else {
        // revert
        regionNode.setState(state);
        regionNode.setRegionLocation(regionLocation);
      }
    });
    return future;
  }

  // should be called under the RegionStateNode lock
  CompletableFuture<Void> regionClosing(RegionStateNode regionNode) {
    return transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING)
      .thenAccept(r -> {
        RegionInfo hri = regionNode.getRegionInfo();
        // Mark meta as not assigned early, so people trying to create/edit tables will wait
        if (isMetaRegion(hri)) {
          setMetaAssigned(hri, false);
        }
        // update the operation count metrics
        metrics.incrementOperationCounter();
      });
  }

  // for open and close, they will first be persist to the procedure store in
  // RegionRemoteProcedureBase. So here we will first change the in memory state as it is considered
  // as succeeded if the persistence to procedure store is succeeded, and then when the
  // RegionRemoteProcedureBase is woken up, we will persist the RegionStateNode to hbase:meta.

  // should be called under the RegionStateNode lock
  void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode)
    throws UnexpectedStateException {
    regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
    RegionInfo regionInfo = regionNode.getRegionInfo();
    regionStates.addRegionToServer(regionNode);
    regionStates.removeFromFailedOpen(regionInfo);
  }

  // should be called under the RegionStateNode lock
  void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode)
    throws UnexpectedStateException {
    // Read the location before it is cleared below; it becomes the node's last host.
    ServerName regionLocation = regionNode.getRegionLocation();
    regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
    regionNode.setRegionLocation(null);
    if (regionLocation != null) {
      regionNode.setLastHost(regionLocation);
      regionStates.removeRegionFromServer(regionLocation, regionNode);
    }
  }

  // should be called under the RegionStateNode lock
  CompletableFuture<Void> persistToMeta(RegionStateNode regionNode) {
    return regionStateStore.updateRegionLocation(regionNode).thenAccept(r -> {
      RegionInfo regionInfo = regionNode.getRegionInfo();
      if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
        // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
        // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
        // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
        // on table that contains state.
        setMetaAssigned(regionInfo, true);
      }
      regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
    });
  }

  // should be called under the RegionStateNode lock
  // for SCP
  public CompletableFuture<Void> regionClosedAbnormally(RegionStateNode regionNode) {
    // Captured up front so the listener below can revert to them if the persist fails.
    RegionState.State state = regionNode.getState();
    ServerName regionLocation = regionNode.getRegionLocation();
    regionNode.setState(State.ABNORMALLY_CLOSED);
    regionNode.setRegionLocation(null);
    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
    FutureUtils.addListener(future, (r, e) -> {
      if (e == null) {
        if (regionLocation != null) {
          regionNode.setLastHost(regionLocation);
          regionStates.removeRegionFromServer(regionLocation, regionNode);
        }
        regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
      } else {
        // revert
        regionNode.setState(state);
        regionNode.setRegionLocation(regionLocation);
      }
    });
    return future;
  }

  // ============================================================================================
  // The above methods can only be called in TransitRegionStateProcedure(and related procedures)
  // ============================================================================================

  // As soon as a server a crashed, region hosting on that are un-available, this method helps to
2351 // track those un-available regions. This method can only be called from ServerCrashProcedure. 2352 public void markRegionsAsCrashed(List<RegionInfo> regionsOnCrashedServer, 2353 ServerCrashProcedure scp) { 2354 ServerName crashedServerName = scp.getServerName(); 2355 assert crashedServerName != null; 2356 for (RegionInfo regionInfo : regionsOnCrashedServer) { 2357 RegionStateNode node = regionStates.getOrCreateRegionStateNode(regionInfo); 2358 if (crashedServerName.equals(node.getRegionLocation())) { 2359 node.crashed(scp.getSubmittedTime()); 2360 regionInTransitionTracker.regionCrashed(node); 2361 } else { 2362 LOG.warn("Region {} should be on crashed region server {} but is recorded on {}", 2363 regionInfo, crashedServerName, node.getRegionLocation()); 2364 } 2365 } 2366 } 2367 2368 public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName, 2369 final RegionInfo daughterA, final RegionInfo daughterB) throws IOException { 2370 // Update hbase:meta. Parent will be marked offline and split up in hbase:meta. 2371 // The parent stays in regionStates until cleared when removed by CatalogJanitor. 2372 // Update its state in regionStates to it shows as offline and split when read 2373 // later figuring what regions are in a table and what are not: see 2374 // regionStates#getRegionsOfTable 2375 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent); 2376 node.setState(State.SPLIT); 2377 final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA); 2378 nodeA.setState(State.SPLITTING_NEW); 2379 final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB); 2380 nodeB.setState(State.SPLITTING_NEW); 2381 2382 TableDescriptor td = master.getTableDescriptors().get(parent.getTable()); 2383 // TODO: here we just update the parent region info in meta, to set split and offline to true, 2384 // without changing the one in the region node. 
This is a bit confusing but the region info 2385 // field in RegionStateNode is not expected to be changed in the current design. Need to find a 2386 // possible way to address this problem, or at least adding more comments about the trick to 2387 // deal with this problem, that when you want to filter out split parent, you need to check both 2388 // the RegionState on whether it is split, and also the region info. If one of them matches then 2389 // it is a split parent. And usually only one of them can match, as after restart, the region 2390 // state will be changed from SPLIT to CLOSED. 2391 regionStateStore.splitRegion(parent, daughterA, daughterB, serverName, td); 2392 regionInTransitionTracker.handleRegionStateNodeOperation(node); 2393 regionInTransitionTracker.handleRegionStateNodeOperation(nodeA); 2394 regionInTransitionTracker.handleRegionStateNodeOperation(nodeB); 2395 if (shouldAssignFavoredNodes(parent)) { 2396 List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList(); 2397 getFavoredNodePromoter().generateFavoredNodesForDaughter(onlineServers, parent, daughterA, 2398 daughterB); 2399 } 2400 } 2401 2402 /** 2403 * When called here, the merge has happened. The merged regions have been unassigned and the above 2404 * markRegionClosed has been called on each so they have been disassociated from a hosting Server. 2405 * The merged region will be open after this call. The merged regions are removed from hbase:meta 2406 * below. Later they are deleted from the filesystem by the catalog janitor running against 2407 * hbase:meta. It notices when the merged region no longer holds references to the old regions 2408 * (References are deleted after a compaction rewrites what the Reference points at but not until 2409 * the archiver chore runs, are the References removed). 
2410 */ 2411 public void markRegionAsMerged(final RegionInfo child, final ServerName serverName, 2412 RegionInfo[] mergeParents) throws IOException { 2413 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child); 2414 for (RegionInfo ri : mergeParents) { 2415 regionStates.deleteRegion(ri); 2416 regionInTransitionTracker.handleRegionDelete(ri); 2417 } 2418 2419 TableDescriptor td = master.getTableDescriptors().get(child.getTable()); 2420 regionStateStore.mergeRegions(child, mergeParents, serverName, td); 2421 regionInTransitionTracker.handleRegionStateNodeOperation(node); 2422 if (shouldAssignFavoredNodes(child)) { 2423 getFavoredNodePromoter().generateFavoredNodesForMergedRegion(child, mergeParents); 2424 } 2425 } 2426 2427 /* 2428 * Favored nodes should be applied only when FavoredNodes balancer is configured and the region 2429 * belongs to a non-system table. 2430 */ 2431 private boolean shouldAssignFavoredNodes(RegionInfo region) { 2432 return this.shouldAssignRegionsWithFavoredNodes 2433 && FavoredNodesManager.isFavoredNodeApplicable(region); 2434 } 2435 2436 // ============================================================================================ 2437 // Assign Queue (Assign/Balance) 2438 // ============================================================================================ 2439 private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>(); 2440 private final ReentrantLock assignQueueLock = new ReentrantLock(); 2441 private final Condition assignQueueFullCond = assignQueueLock.newCondition(); 2442 2443 /** 2444 * Add the assign operation to the assignment queue. The pending assignment operation will be 2445 * processed, and each region will be assigned by a server using the balancer. 2446 */ 2447 protected void queueAssign(final RegionStateNode regionNode) { 2448 regionNode.getProcedureEvent().suspend(); 2449 2450 // TODO: quick-start for meta and the other sys-tables? 
2451 assignQueueLock.lock(); 2452 try { 2453 pendingAssignQueue.add(regionNode); 2454 if ( 2455 regionNode.isSystemTable() || pendingAssignQueue.size() == 1 2456 || pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize 2457 ) { 2458 assignQueueFullCond.signal(); 2459 } 2460 } finally { 2461 assignQueueLock.unlock(); 2462 } 2463 } 2464 2465 private void startAssignmentThread() { 2466 assignThread = new Thread(master.getServerName().toShortString()) { 2467 @Override 2468 public void run() { 2469 while (isRunning()) { 2470 processAssignQueue(); 2471 } 2472 pendingAssignQueue.clear(); 2473 } 2474 }; 2475 assignThread.setDaemon(true); 2476 assignThread.start(); 2477 } 2478 2479 private void stopAssignmentThread() { 2480 assignQueueSignal(); 2481 try { 2482 while (assignThread.isAlive()) { 2483 assignQueueSignal(); 2484 assignThread.join(250); 2485 } 2486 } catch (InterruptedException e) { 2487 LOG.warn("join interrupted", e); 2488 Thread.currentThread().interrupt(); 2489 } 2490 } 2491 2492 private void assignQueueSignal() { 2493 assignQueueLock.lock(); 2494 try { 2495 assignQueueFullCond.signal(); 2496 } finally { 2497 assignQueueLock.unlock(); 2498 } 2499 } 2500 2501 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") 2502 private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() { 2503 HashMap<RegionInfo, RegionStateNode> regions = null; 2504 2505 assignQueueLock.lock(); 2506 try { 2507 if (pendingAssignQueue.isEmpty() && isRunning()) { 2508 assignQueueFullCond.await(); 2509 } 2510 2511 if (!isRunning()) { 2512 return null; 2513 } 2514 assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS); 2515 regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size()); 2516 for (RegionStateNode regionNode : pendingAssignQueue) { 2517 regions.put(regionNode.getRegionInfo(), regionNode); 2518 } 2519 pendingAssignQueue.clear(); 2520 } catch (InterruptedException e) { 2521 LOG.warn("got interrupted ", e); 2522 
Thread.currentThread().interrupt(); 2523 } finally { 2524 assignQueueLock.unlock(); 2525 } 2526 return regions; 2527 } 2528 2529 private void processAssignQueue() { 2530 final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue(); 2531 if (regions == null || regions.size() == 0 || !isRunning()) { 2532 return; 2533 } 2534 2535 if (LOG.isTraceEnabled()) { 2536 LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size()); 2537 } 2538 2539 // TODO: Optimize balancer. pass a RegionPlan? 2540 final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>(); 2541 final List<RegionInfo> userHRIs = new ArrayList<>(regions.size()); 2542 // Regions for system tables requiring reassignment 2543 final List<RegionInfo> systemHRIs = new ArrayList<>(); 2544 for (RegionStateNode regionStateNode : regions.values()) { 2545 boolean sysTable = regionStateNode.isSystemTable(); 2546 final List<RegionInfo> hris = sysTable ? systemHRIs : userHRIs; 2547 if (regionStateNode.getRegionLocation() != null) { 2548 retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation()); 2549 } else { 2550 hris.add(regionStateNode.getRegionInfo()); 2551 } 2552 } 2553 2554 // TODO: connect with the listener to invalidate the cache 2555 2556 // TODO use events 2557 List<ServerName> servers = master.getServerManager().createDestinationServersList(); 2558 for (int i = 0; servers.size() < 1; ++i) { 2559 // Report every fourth time around this loop; try not to flood log. 2560 if (i % 4 == 0) { 2561 LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions."); 2562 } 2563 2564 if (!isRunning()) { 2565 LOG.debug("Stopped! 
Dropping assign of " + regions.size() + " queued regions."); 2566 return; 2567 } 2568 Threads.sleep(250); 2569 servers = master.getServerManager().createDestinationServersList(); 2570 } 2571 2572 if (!systemHRIs.isEmpty()) { 2573 // System table regions requiring reassignment are present, get region servers 2574 // not available for system table regions 2575 final List<ServerName> excludeServers = getExcludedServersForSystemTable(); 2576 List<ServerName> serversForSysTables = 2577 servers.stream().filter(s -> !excludeServers.contains(s)).collect(Collectors.toList()); 2578 if (serversForSysTables.isEmpty()) { 2579 LOG.warn("Filtering old server versions and the excluded produced an empty set; " 2580 + "instead considering all candidate servers!"); 2581 } 2582 LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() 2583 + ", allServersCount=" + servers.size()); 2584 processAssignmentPlans(regions, null, systemHRIs, 2585 serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs) 2586 ? 
servers 2587 : serversForSysTables); 2588 } 2589 2590 processAssignmentPlans(regions, retainMap, userHRIs, servers); 2591 } 2592 2593 private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions, 2594 List<RegionInfo> hirs) { 2595 for (RegionInfo ri : hirs) { 2596 if ( 2597 regions.get(ri).getRegionLocation() != null 2598 && regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME) 2599 ) { 2600 return true; 2601 } 2602 } 2603 return false; 2604 } 2605 2606 private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions, 2607 final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris, 2608 final List<ServerName> servers) { 2609 boolean isTraceEnabled = LOG.isTraceEnabled(); 2610 if (isTraceEnabled) { 2611 LOG.trace("Available servers count=" + servers.size() + ": " + servers); 2612 } 2613 2614 final LoadBalancer balancer = getBalancer(); 2615 // ask the balancer where to place regions 2616 if (retainMap != null && !retainMap.isEmpty()) { 2617 if (isTraceEnabled) { 2618 LOG.trace("retain assign regions=" + retainMap); 2619 } 2620 try { 2621 acceptPlan(regions, balancer.retainAssignment(retainMap, servers)); 2622 } catch (IOException e) { 2623 LOG.warn("unable to retain assignment", e); 2624 addToPendingAssignment(regions, retainMap.keySet()); 2625 } 2626 } 2627 2628 // TODO: Do we need to split retain and round-robin? 2629 // the retain seems to fallback to round-robin/random if the region is not in the map. 
2630 if (!hris.isEmpty()) { 2631 Collections.sort(hris, RegionInfo.COMPARATOR); 2632 if (isTraceEnabled) { 2633 LOG.trace("round robin regions=" + hris); 2634 } 2635 try { 2636 acceptPlan(regions, balancer.roundRobinAssignment(hris, servers)); 2637 } catch (IOException e) { 2638 LOG.warn("unable to round-robin assignment", e); 2639 addToPendingAssignment(regions, hris); 2640 } 2641 } 2642 } 2643 2644 private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions, 2645 final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException { 2646 final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()]; 2647 final long st = EnvironmentEdgeManager.currentTime(); 2648 2649 if (plan.isEmpty()) { 2650 throw new HBaseIOException("unable to compute plans for regions=" + regions.size()); 2651 } 2652 2653 int evcount = 0; 2654 for (Map.Entry<ServerName, List<RegionInfo>> entry : plan.entrySet()) { 2655 final ServerName server = entry.getKey(); 2656 for (RegionInfo hri : entry.getValue()) { 2657 final RegionStateNode regionNode = regions.get(hri); 2658 regionNode.setRegionLocation(server); 2659 if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) { 2660 assignQueueLock.lock(); 2661 try { 2662 pendingAssignQueue.add(regionNode); 2663 } finally { 2664 assignQueueLock.unlock(); 2665 } 2666 } else { 2667 events[evcount++] = regionNode.getProcedureEvent(); 2668 } 2669 } 2670 } 2671 ProcedureEvent.wakeEvents(getProcedureScheduler(), events); 2672 2673 final long et = EnvironmentEdgeManager.currentTime(); 2674 if (LOG.isTraceEnabled()) { 2675 LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + StringUtils.humanTimeDiff(et - st)); 2676 } 2677 } 2678 2679 private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions, 2680 final Collection<RegionInfo> pendingRegions) { 2681 assignQueueLock.lock(); 2682 try { 2683 for (RegionInfo hri : pendingRegions) { 2684 pendingAssignQueue.add(regions.get(hri)); 2685 } 
2686 } finally { 2687 assignQueueLock.unlock(); 2688 } 2689 } 2690 2691 /** 2692 * For a given cluster with mixed versions of servers, get a list of servers with lower versions, 2693 * where system table regions should not be assigned to. For system table, we must assign regions 2694 * to a server with highest version. However, we can disable this exclusion using config: 2695 * "hbase.min.version.move.system.tables" if checkForMinVersion is true. Detailed explanation 2696 * available with definition of minVersionToMoveSysTables. 2697 * @return List of Excluded servers for System table regions. 2698 */ 2699 public List<ServerName> getExcludedServersForSystemTable() { 2700 // TODO: This should be a cached list kept by the ServerManager rather than calculated on each 2701 // move or system region assign. The RegionServerTracker keeps list of online Servers with 2702 // RegionServerInfo that includes Version. 2703 List<Pair<ServerName, String>> serverList = 2704 master.getServerManager().getOnlineServersList().stream() 2705 .map(s -> new Pair<>(s, master.getRegionServerVersion(s))).collect(Collectors.toList()); 2706 if (serverList.isEmpty()) { 2707 return new ArrayList<>(); 2708 } 2709 String highestVersion = Collections 2710 .max(serverList, (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())) 2711 .getSecond(); 2712 if (!DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG.equals(minVersionToMoveSysTables)) { 2713 int comparedValue = VersionInfo.compareVersion(minVersionToMoveSysTables, highestVersion); 2714 if (comparedValue > 0) { 2715 return new ArrayList<>(); 2716 } 2717 } 2718 return serverList.stream().filter(pair -> !pair.getSecond().equals(highestVersion)) 2719 .map(Pair::getFirst).collect(Collectors.toList()); 2720 } 2721 2722 MasterServices getMaster() { 2723 return master; 2724 } 2725 2726 /** Returns a snapshot of rsReports */ 2727 public Map<ServerName, Set<byte[]>> getRSReports() { 2728 Map<ServerName, Set<byte[]>> rsReportsSnapshot = new 
HashMap<>(); 2729 synchronized (rsReports) { 2730 rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue())); 2731 } 2732 return rsReportsSnapshot; 2733 } 2734 2735 /** 2736 * Provide regions state count for given table. e.g howmany regions of give table are 2737 * opened/closed/rit etc 2738 * @param tableName TableName 2739 * @return region states count 2740 */ 2741 public RegionStatesCount getRegionStatesCount(TableName tableName) { 2742 int openRegionsCount = 0; 2743 int closedRegionCount = 0; 2744 int ritCount = 0; 2745 int splitRegionCount = 0; 2746 int totalRegionCount = 0; 2747 if (!isTableDisabled(tableName)) { 2748 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 2749 for (RegionState regionState : states) { 2750 if (regionState.isOpened()) { 2751 openRegionsCount++; 2752 } else if (regionState.isClosed()) { 2753 closedRegionCount++; 2754 } else if (regionState.isSplit()) { 2755 splitRegionCount++; 2756 } 2757 } 2758 totalRegionCount = states.size(); 2759 ritCount = totalRegionCount - openRegionsCount - splitRegionCount; 2760 } 2761 return new RegionStatesCount.RegionStatesCountBuilder().setOpenRegions(openRegionsCount) 2762 .setClosedRegions(closedRegionCount).setSplitRegions(splitRegionCount) 2763 .setRegionsInTransition(ritCount).setTotalRegions(totalRegionCount).build(); 2764 } 2765 2766}