/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.assignment;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.regionserver.SequenceId;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;

/**
 * The AssignmentManager is the coordinator for region assign/unassign operations.
 * <ul>
 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
 * </ul>
 * Regions are created by CreateTable, Split, Merge. Regions are deleted by DeleteTable, Split,
 * Merge. Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. Unassigns
 * are triggered by DisableTable, Split, Merge.
 */
@InterfaceAudience.Private
public class AssignmentManager {
  private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);

  // TODO: AMv2
  // - handle region migration from hbase1 to hbase2.
  // - handle sys table assignment first (e.g. acl, namespace)
  // - handle table priorities
  // - If ServerBusyException trying to update hbase:meta, we abort the Master
  // See updateRegionLocation in RegionStateStore.
  //
  // See also
  // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
  // for other TODOs.

  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
    "hbase.assignment.bootstrap.thread.pool.size";

  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
    "hbase.assignment.dispatch.wait.msec";
  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;

  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
    "hbase.assignment.dispatch.wait.queue.max.size";
  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;

  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
    "hbase.assignment.rit.chore.interval.msec";
  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;

  public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY =
    "hbase.assignment.dead.region.metric.chore.interval.msec";
  private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000;

  public static final String ASSIGN_MAX_ATTEMPTS = "hbase.assignment.maximum.attempts";
  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;

  public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS =
    "hbase.assignment.retry.immediately.maximum.attempts";
  private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3;

  /** Region in Transition metrics threshold time */
  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
    "hbase.metrics.rit.stuck.warning.threshold";
  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
  public static final String UNEXPECTED_STATE_REGION = "Unexpected state for ";

  public static final String FORCE_REGION_RETAINMENT = "hbase.master.scp.retain.assignment.force";

  public static final boolean DEFAULT_FORCE_REGION_RETAINMENT = false;

  /** The wait time in millis before checking again if the region's previous RS is back online */
  public static final String FORCE_REGION_RETAINMENT_WAIT_INTERVAL =
    "hbase.master.scp.retain.assignment.force.wait-interval";

  public static final long DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL = 50;

  /**
   * The number of times to check if the region's previous RS is back online, before giving up and
   * proceeding with assignment on a new RS
   */
  public static final String FORCE_REGION_RETAINMENT_RETRIES =
    "hbase.master.scp.retain.assignment.force.retries";

  public static final int DEFAULT_FORCE_REGION_RETAINMENT_RETRIES = 600;

  private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");

  private final MetricsAssignmentManager metrics;
  private final RegionInTransitionChore ritChore;
  private final DeadServerMetricRegionChore deadMetricChore;
  private final MasterServices master;

  private final AtomicBoolean running = new AtomicBoolean(false);
  private final RegionStates regionStates = new RegionStates();
  private final RegionStateStore regionStateStore;

  /**
   * When the operator uses this configuration option, any version between the current cluster
   * version and the value of "hbase.min.version.move.system.tables" does not trigger any
   * auto-region movement. Auto-region movement here refers to auto-migration of system table
   * regions to newer server versions.
   * It is assumed that the configured range of versions does not
   * require special handling of moving system table regions to higher versioned RegionServers.
   * This auto-migration is done by {@link #checkIfShouldMoveSystemRegionAsync()}. Example: let's
   * assume the cluster is on version 1.4.0 and we have set "hbase.min.version.move.system.tables"
   * to "2.0.0". Now if we upgrade one RegionServer on the 1.4.0 cluster to 1.6.0 (< 2.0.0), then
   * AssignmentManager will not move hbase:meta, hbase:namespace and other system table regions to
   * the newly brought up RegionServer 1.6.0 as part of auto-migration. However, if we upgrade one
   * RegionServer on the 1.4.0 cluster to 2.2.0 (> 2.0.0), then AssignmentManager will move all
   * system table regions to the newly brought up RegionServer 2.2.0 as part of the auto-migration
   * done by {@link #checkIfShouldMoveSystemRegionAsync()}. "hbase.min.version.move.system.tables"
   * was introduced as part of HBASE-22923.
   */
  private final String minVersionToMoveSysTables;

  private static final String MIN_VERSION_MOVE_SYS_TABLES_CONFIG =
    "hbase.min.version.move.system.tables";
  private static final String DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG = "";

  private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();

  private final boolean shouldAssignRegionsWithFavoredNodes;
  private final int assignDispatchWaitQueueMaxSize;
  private final int assignDispatchWaitMillis;
  private final int assignMaxAttempts;
  private final int assignRetryImmediatelyMaxAttempts;

  private final MasterRegion masterRegion;

  private final Object checkIfShouldMoveSystemRegionLock = new Object();

  private Thread assignThread;

  private final boolean forceRegionRetainment;

  private final long forceRegionRetainmentWaitInterval;

  private final int forceRegionRetainmentRetries;

  public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
    this(master, masterRegion, new RegionStateStore(master, masterRegion));
  }

  AssignmentManager(MasterServices master, MasterRegion masterRegion, RegionStateStore stateStore) {
    this.master = master;
    this.regionStateStore = stateStore;
    this.metrics = new MetricsAssignmentManager();
    this.masterRegion = masterRegion;

    final Configuration conf = master.getConfiguration();

    // Only read favored nodes if using the favored nodes load balancer.
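    // i.e. only when the configured hbase.master.loadbalancer.class resolves to
    // FavoredStochasticBalancer or a subclass of it (see the isAssignableFrom check below).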
    this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class
      .isAssignableFrom(conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class));

    this.assignDispatchWaitMillis =
      conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
    this.assignDispatchWaitQueueMaxSize =
      conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);

    this.assignMaxAttempts =
      Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS));
    this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS,
      DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS);

    int ritChoreInterval =
      conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC);
    this.ritChore = new RegionInTransitionChore(ritChoreInterval);

    int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY,
      DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC);
    if (deadRegionChoreInterval > 0) {
      this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval);
    } else {
      this.deadMetricChore = null;
    }
    minVersionToMoveSysTables =
      conf.get(MIN_VERSION_MOVE_SYS_TABLES_CONFIG, DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG);

    forceRegionRetainment =
      conf.getBoolean(FORCE_REGION_RETAINMENT, DEFAULT_FORCE_REGION_RETAINMENT);
    forceRegionRetainmentWaitInterval = conf.getLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL,
      DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL);
    forceRegionRetainmentRetries =
      conf.getInt(FORCE_REGION_RETAINMENT_RETRIES, DEFAULT_FORCE_REGION_RETAINMENT_RETRIES);
  }

  private void mirrorMetaLocations() throws IOException, KeeperException {
    // For compatibility, mirror the meta region state to zookeeper.
    // And we still need to use zookeeper to publish the meta region locations to region
    // servers, so they can serve as ClientMetaService.
    ZKWatcher zk = master.getZooKeeper();
    if (zk == null || !zk.getRecoverableZooKeeper().getState().isAlive()) {
      // this is possible in tests; we do not provide a zk watcher or the zk watcher has been closed
      return;
    }
    Collection<RegionStateNode> metaStates = regionStates.getRegionStateNodes();
    for (RegionStateNode metaState : metaStates) {
      MetaTableLocator.setMetaLocation(zk, metaState.getRegionLocation(),
        metaState.getRegionInfo().getReplicaId(), metaState.getState());
    }
    int replicaCount = metaStates.size();
    // remove extra mirror locations
    for (String znode : zk.getMetaReplicaNodes()) {
      int replicaId = zk.getZNodePaths().getMetaReplicaIdFromZNode(znode);
      if (replicaId >= replicaCount) {
        MetaTableLocator.deleteMetaLocation(zk, replicaId);
      }
    }
  }

  public void start() throws IOException, KeeperException {
    if (!running.compareAndSet(false, true)) {
      return;
    }

    LOG.trace("Starting assignment manager");

    // Start the Assignment Thread
    startAssignmentThread();
    // Load meta region states.
    // Here we are still in the early steps of active master startup. There is only one thread (us)
    // that can access AssignmentManager and create region nodes, so we do not need to lock the
    // region node.
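    // Each catalog-family row scanned from the master local region below describes one
    // hbase:meta region replica; each row is replayed into a RegionStateNode.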
    try (ResultScanner scanner =
      masterRegion.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY))) {
      for (;;) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        RegionStateStore
          .visitMetaEntry((r, regionInfo, state, regionLocation, lastHost, openSeqNum) -> {
            RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
            regionNode.setState(state);
            regionNode.setLastHost(lastHost);
            regionNode.setRegionLocation(regionLocation);
            regionNode.setOpenSeqNum(openSeqNum);
            if (regionNode.getProcedure() != null) {
              regionNode.getProcedure().stateLoaded(this, regionNode);
            }
            if (regionLocation != null) {
              regionStates.addRegionToServer(regionNode);
            }
            if (RegionReplicaUtil.isDefaultReplica(regionInfo.getReplicaId())) {
              setMetaAssigned(regionInfo, state == State.OPEN);
            }
            LOG.debug("Loaded hbase:meta {}", regionNode);
          }, result);
      }
    }
    mirrorMetaLocations();
  }

  /**
   * Create RegionStateNodes based on the TRSP list, and attach the TRSPs to the RegionStateNodes.
   * <p>
   * This is used to restore the RIT region list, so we do not need to restore it in the
   * loadingMeta method below. It is also very important because now, before submitting a TRSP, we
   * need to attach it to the RegionStateNode, which acts like a guard, so we need to restore this
   * information at the very beginning, before we start processing any procedures.
   */
  public void setupRIT(List<TransitRegionStateProcedure> procs) {
    procs.forEach(proc -> {
      RegionInfo regionInfo = proc.getRegion();
      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
      TransitRegionStateProcedure existingProc = regionNode.getProcedure();
      if (existingProc != null) {
        // This is possible, as we will detach the procedure from the RSN before we actually finish
        // it. Because the TRSP is detached from the RSN during execution, before the procedure has
        // been marked as done in the pv2 framework, it is possible that we schedule a new TRSP
        // immediately and, when arriving here, find multiple TRSPs for the same region. We can be
        // sure that only the last one is in charge; the previous ones should all have finished
        // already. So here we compare the proc ids and the greater one wins.
        if (existingProc.getProcId() < proc.getProcId()) {
          // the new one wins, unset and set it to the new one below
          regionNode.unsetProcedure(existingProc);
        } else {
          // the old one wins, skip
          return;
        }
      }
      LOG.info("Attach {} to {} to restore RIT", proc, regionNode);
      regionNode.setProcedure(proc);
    });
  }

  public void stop() {
    if (!running.compareAndSet(true, false)) {
      return;
    }

    LOG.info("Stopping assignment manager");

    // The AM is started before the procedure executor,
    // but the actual work will be loaded/submitted only once we have the executor
    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;

    // Remove the RIT chore
    if (hasProcExecutor) {
      master.getMasterProcedureExecutor().removeChore(this.ritChore);
      if (this.deadMetricChore != null) {
        master.getMasterProcedureExecutor().removeChore(this.deadMetricChore);
      }
    }

    // Stop the Assignment Thread
    stopAssignmentThread();

    // Stop the RegionStateStore
    regionStates.clear();

    // Update meta events (for testing)
    if (hasProcExecutor) {
      metaLoadEvent.suspend();
      for (RegionInfo hri : getMetaRegionSet()) {
        setMetaAssigned(hri, false);
      }
    }
  }

  public boolean isRunning() {
    return running.get();
  }

  public Configuration getConfiguration() {
    return master.getConfiguration();
  }

  public MetricsAssignmentManager getAssignmentManagerMetrics() {
    return metrics;
  }

  private LoadBalancer getBalancer() {
    return master.getLoadBalancer();
  }

  private FavoredNodesPromoter getFavoredNodePromoter() {
    return (FavoredNodesPromoter) ((RSGroupBasedLoadBalancer) master.getLoadBalancer())
      .getInternalBalancer();
  }

  private MasterProcedureEnv getProcedureEnvironment() {
    return master.getMasterProcedureExecutor().getEnvironment();
  }

  private MasterProcedureScheduler getProcedureScheduler() {
    return getProcedureEnvironment().getProcedureScheduler();
  }

  int getAssignMaxAttempts() {
    return assignMaxAttempts;
  }

  public boolean isForceRegionRetainment() {
    return forceRegionRetainment;
  }

  public long getForceRegionRetainmentWaitInterval() {
    return forceRegionRetainmentWaitInterval;
  }

  public int getForceRegionRetainmentRetries() {
    return forceRegionRetainmentRetries;
  }

  int getAssignRetryImmediatelyMaxAttempts() {
    return assignRetryImmediatelyMaxAttempts;
  }

  public RegionStates getRegionStates() {
    return regionStates;
  }

  /**
   * Returns the regions hosted by the specified server.
   * <p/>
   * Notice that, for SCP, after we submit the SCP, no one can change the region list for the
   * ServerStateNode, so we do not need any locks here. For other usages, this only gives you a
   * snapshot of the current region list for this server, which means that right after you get the
   * list, new regions may be moved to this server or some regions may be moved out, so you should
   * not rely on it if you need strong consistency.
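   * <p/>
   * A minimal usage sketch (the {@code am} handle below is illustrative, not part of this class):
   *
   * <pre>
   * // Snapshot of what the AM currently records for this server; it may already be stale by the
   * // time it is used, so treat it as advisory only.
   * List&lt;RegionInfo&gt; hosted = am.getRegionsOnServer(serverName);
   * </pre>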
   */
  public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
    ServerStateNode serverInfo = regionStates.getServerNode(serverName);
    if (serverInfo == null) {
      return Collections.emptyList();
    }
    return serverInfo.getRegionInfoList();
  }

  private RegionInfo getRegionInfo(RegionStateNode rsn) {
    if (rsn.isSplit() && !rsn.getRegionInfo().isSplit()) {
      // see the comments in markRegionAsSplit on why we need to do this converting.
      return RegionInfoBuilder.newBuilder(rsn.getRegionInfo()).setSplit(true).setOffline(true)
        .build();
    } else {
      return rsn.getRegionInfo();
    }
  }

  private Stream<RegionStateNode> getRegionStateNodes(TableName tableName,
    boolean excludeOfflinedSplitParents) {
    Stream<RegionStateNode> stream = regionStates.getTableRegionStateNodes(tableName).stream();
    if (excludeOfflinedSplitParents) {
      return stream.filter(rsn -> !rsn.isSplit());
    } else {
      return stream;
    }
  }

  public List<RegionInfo> getTableRegions(TableName tableName,
    boolean excludeOfflinedSplitParents) {
    return getRegionStateNodes(tableName, excludeOfflinedSplitParents).map(this::getRegionInfo)
      .collect(Collectors.toList());
  }

  public List<Pair<RegionInfo, ServerName>> getTableRegionsAndLocations(TableName tableName,
    boolean excludeOfflinedSplitParents) {
    return getRegionStateNodes(tableName, excludeOfflinedSplitParents)
      .map(rsn -> Pair.newPair(getRegionInfo(rsn), rsn.getRegionLocation()))
      .collect(Collectors.toList());
  }

  public RegionStateStore getRegionStateStore() {
    return regionStateStore;
  }

  public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
    return this.shouldAssignRegionsWithFavoredNodes
      ? getFavoredNodePromoter().getFavoredNodes(regionInfo)
      : ServerName.EMPTY_SERVER_LIST;
  }

  // ============================================================================================
  // Table State Manager helpers
  // ============================================================================================
  private TableStateManager getTableStateManager() {
    return master.getTableStateManager();
  }

  private boolean isTableEnabled(final TableName tableName) {
    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
  }

  private boolean isTableDisabled(final TableName tableName) {
    return getTableStateManager().isTableState(tableName, TableState.State.DISABLED,
      TableState.State.DISABLING);
  }

  // ============================================================================================
  // META Helpers
  // ============================================================================================
  private boolean isMetaRegion(final RegionInfo regionInfo) {
    return regionInfo.isMetaRegion();
  }

  public boolean isMetaRegion(final byte[] regionName) {
    return getMetaRegionFromName(regionName) != null;
  }

  public RegionInfo getMetaRegionFromName(final byte[] regionName) {
    for (RegionInfo hri : getMetaRegionSet()) {
      if (Bytes.equals(hri.getRegionName(), regionName)) {
        return hri;
      }
    }
    return null;
  }

  public boolean isCarryingMeta(final ServerName serverName) {
    // TODO: handle multiple meta
    return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
  }

  private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
    // TODO: check for state?
    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
    return (node != null && serverName.equals(node.getRegionLocation()));
  }

  private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
    // if (regionInfo.isMetaRegion()) return regionInfo;
    // TODO: handle multiple meta. if the region provided is not meta lookup
    // which meta the region belongs to.
    return RegionInfoBuilder.FIRST_META_REGIONINFO;
  }

  // TODO: handle multiple meta.
  private static final Set<RegionInfo> META_REGION_SET =
    Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);

  public Set<RegionInfo> getMetaRegionSet() {
    return META_REGION_SET;
  }

  // ============================================================================================
  // META Event(s) helpers
  // ============================================================================================
  /**
   * Notice that this only means the meta region is available on a RS, but the AM may still be
   * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()}
   * before checking this method, unless you can make sure that your piece of code can only be
   * executed after the AM has built the region states.
   * @see #isMetaLoaded()
   */
  public boolean isMetaAssigned() {
    return metaAssignEvent.isReady();
  }

  public boolean isMetaRegionInTransition() {
    return !isMetaAssigned();
  }

  /**
   * Notice that this event does not mean the AM has already finished region state rebuilding. See
   * the comment of {@link #isMetaAssigned()} for more details.
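   * <p/>
   * A hedged sketch of the usual call pattern from inside a procedure (the surrounding procedure
   * class and its {@code env}/{@code region} variables are assumptions for illustration):
   *
   * <pre>
   * // In execute(): suspend on the meta-assign event if meta is not yet ready.
   * if (env.getAssignmentManager().waitMetaAssigned(this, region)) {
   *   throw new ProcedureSuspendedException();
   * }
   * </pre>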
   * @see #isMetaAssigned()
   */
  public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
    return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
  }

  private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
    ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
    if (assigned) {
      metaAssignEvent.wake(getProcedureScheduler());
    } else {
      metaAssignEvent.suspend();
    }
  }

  private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
    // TODO: handle multiple meta.
    return metaAssignEvent;
  }

  /**
   * Wait until the AM finishes the meta loading, i.e. the region states rebuilding.
   * @see #isMetaLoaded()
   * @see #waitMetaAssigned(Procedure, RegionInfo)
   */
  public boolean waitMetaLoaded(Procedure<?> proc) {
    return metaLoadEvent.suspendIfNotReady(proc);
  }

  /**
   * This method will be called in the master initialization method after calling
   * {@link #processOfflineRegions()}, as in processOfflineRegions we will generate assign
   * procedures for offline regions, which may conflict with creating tables.
   * <p/>
   * This is a bit dirty and should be reconsidered after we decide whether to keep the
   * {@link #processOfflineRegions()} method.
   */
  public void wakeMetaLoadedEvent() {
    metaLoadEvent.wake(getProcedureScheduler());
    assert isMetaLoaded() : "expected meta to be loaded";
  }

  /**
   * Return whether the AM has finished the meta loading, i.e. the region states rebuilding.
   * @see #isMetaAssigned()
   * @see #waitMetaLoaded(Procedure)
   */
  public boolean isMetaLoaded() {
    return metaLoadEvent.isReady();
  }

  /**
   * Start a new thread to check if there are region servers whose versions are higher than others.
   * If so, move all system table regions to the RS with the highest version to keep compatibility.
   * The reason is that an RS on a new version may not be able to access an RS on an old version
   * when there are some incompatible changes.
   * <p>
   * This method is called only when a new RegionServer is added to the cluster.
   * </p>
   */
  public void checkIfShouldMoveSystemRegionAsync() {
    // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
    // it should 'move' the system tables from the old server to the new server but
    // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
    if (this.master.getServerManager().countOfRegionServers() <= 1) {
      return;
    }
    // This thread used to run whenever there was a change in the cluster. The ZooKeeper
    // childrenChanged notification came in before the nodeDeleted message and so this method
    // could run before a ServerCrashProcedure could run. That meant that this thread could see
    // a crashed server before ServerCrashProcedure and it could find system regions on the
    // crashed server and go move them before ServerCrashProcedure had a chance; could be
    // dataloss too if WALs were not recovered.
    new Thread(() -> {
      try {
        synchronized (checkIfShouldMoveSystemRegionLock) {
          List<RegionPlan> plans = new ArrayList<>();
          // TODO: I don't think this code does a good job if all servers in cluster have same
          // version. It looks like it will schedule unnecessary moves.
          for (ServerName server : getExcludedServersForSystemTable()) {
            if (master.getServerManager().isServerDead(server)) {
              // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
              // considers only online servers, the server could be queued for dead server
              // processing. As region assignments for a crashed server are handled by
              // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
              // the regular flow of LoadBalancer as a favored node and not to have this special
              // handling.
              continue;
            }
            List<RegionInfo> regionsShouldMove = getSystemTables(server);
            if (!regionsShouldMove.isEmpty()) {
              for (RegionInfo regionInfo : regionsShouldMove) {
                // null value for dest forces destination server to be selected by balancer
                RegionPlan plan = new RegionPlan(regionInfo, server, null);
                if (regionInfo.isMetaRegion()) {
                  // Must move meta region first.
                  LOG.info("Async MOVE of {} to newer Server={}", regionInfo.getEncodedName(),
                    server);
                  moveAsync(plan);
                } else {
                  plans.add(plan);
                }
              }
            }
            for (RegionPlan plan : plans) {
              LOG.info("Async MOVE of {} to newer Server={}", plan.getRegionInfo().getEncodedName(),
                server);
              moveAsync(plan);
            }
          }
        }
      } catch (Throwable t) {
        LOG.error(t.toString(), t);
      }
    }).start();
  }

  private List<RegionInfo> getSystemTables(ServerName serverName) {
    ServerStateNode serverNode = regionStates.getServerNode(serverName);
    if (serverNode == null) {
      return Collections.emptyList();
    }
    return serverNode.getSystemRegionInfoList();
  }

  private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
    throws HBaseIOException {
    if (regionNode.getProcedure() != null) {
      throw new HBaseIOException(
        regionNode + " is currently in transition; pid=" + regionNode.getProcedure().getProcId());
    }
    if (!regionNode.isInState(expectedStates)) {
      throw new DoNotRetryRegionException(UNEXPECTED_STATE_REGION + regionNode);
    }
    if (isTableDisabled(regionNode.getTable())) {
      throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
    }
  }

  /**
   * Create an assign TransitRegionStateProcedure. Checks that the RegionState is appropriate and
   * throws an exception if not, UNLESS override is set. Used by hbck2 but also by the straightline
   * {@link #assign(RegionInfo, ServerName)} and {@link #assignAsync(RegionInfo, ServerName)}.
   * @see #createAssignProcedure(RegionStateNode, ServerName) for a version that does NO checking,
   *      to be used only when no checking is needed.
   * @param override If false, check that the RegionState is appropriate for assign and throw an
   *                 exception if not.
   */
  private TransitRegionStateProcedure createAssignProcedure(RegionInfo regionInfo, ServerName sn,
    boolean override) throws IOException {
    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
    regionNode.lock();
    try {
      if (override) {
        if (regionNode.getProcedure() != null) {
          regionNode.unsetProcedure(regionNode.getProcedure());
        }
      } else {
        preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
      }
      assert regionNode.getProcedure() == null;
      return regionNode.setProcedure(
        TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn));
    } finally {
      regionNode.unlock();
    }
  }

  /**
   * Create an assign TransitRegionStateProcedure. Does NO checking of RegionState. Presumes the
   * region is in an appropriate state, ripe for assign.
   * @see #createAssignProcedure(RegionInfo, ServerName, boolean)
   */
  private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode,
    ServerName targetServer) {
    regionNode.lock();
    try {
      return regionNode.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
        regionNode.getRegionInfo(), targetServer));
    } finally {
      regionNode.unlock();
    }
  }

  public long assign(RegionInfo regionInfo, ServerName sn) throws IOException {
    TransitRegionStateProcedure proc = createAssignProcedure(regionInfo, sn, false);
    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
    return proc.getProcId();
  }

  public long assign(RegionInfo regionInfo) throws IOException {
    return assign(regionInfo, null);
  }

  /**
   * Submits a procedure that assigns a region to a target server without waiting for it to finish.
   * @param regionInfo the region we would like to assign
   * @param sn         target server name
   */
  public Future<byte[]> assignAsync(RegionInfo regionInfo, ServerName sn) throws IOException {
    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(),
      createAssignProcedure(regionInfo, sn, false));
  }

  /**
   * Submits a procedure that assigns a region without waiting for it to finish.
   * @param regionInfo the region we would like to assign
   */
  public Future<byte[]> assignAsync(RegionInfo regionInfo) throws IOException {
    return assignAsync(regionInfo, null);
  }

  public long unassign(RegionInfo regionInfo) throws IOException {
    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
    if (regionNode == null) {
      throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
    }
    TransitRegionStateProcedure proc;
    regionNode.lock();
    try {
      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
      regionNode.setProcedure(proc);
    } finally {
      regionNode.unlock();
    }
    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
    return proc.getProcId();
  }

  public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
    ServerName targetServer) throws HBaseIOException {
    RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
    if (regionNode == null) {
      throw new UnknownRegionException(
        "No RegionStateNode found for " + regionInfo.getEncodedName() + "(Closed/Deleted?)");
    }
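    // Attach the move TRSP to the RegionStateNode under the node lock; preTransitCheck below
    // rejects the request if the region is already in transition or its table is disabled.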
    TransitRegionStateProcedure proc;
    regionNode.lock();
    try {
      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
      regionNode.checkOnline();
      proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
      regionNode.setProcedure(proc);
    } finally {
      regionNode.unlock();
    }
    return proc;
  }

  public void move(RegionInfo regionInfo) throws IOException {
    TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null);
    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
  }

  public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException {
    TransitRegionStateProcedure proc =
      createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination());
    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
  }

  public Future<byte[]> balance(RegionPlan regionPlan) throws HBaseIOException {
    ServerName current =
      this.getRegionStates().getRegionAssignments().get(regionPlan.getRegionInfo());
    if (current == null || !current.equals(regionPlan.getSource())) {
      LOG.debug("Skip region plan {}, source server does not match, current region location is {}",
        regionPlan, current == null ? "(null)" : current);
      return null;
    }
    return moveAsync(regionPlan);
  }

  // ============================================================================================
  // RegionTransition procedures helpers
  // ============================================================================================

  /**
   * Create round-robin assigns. Use on table creation to distribute regions across the cluster.
   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
   *         to populate the assigns with targets chosen using round-robin (default balancer
   *         scheme). If at assign-time the target chosen is no longer up, that's fine: the
   *         AssignProcedure will ask the balancer for a new target, and so on.
   */
  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris,
    List<ServerName> serversToExclude) {
    if (hris.isEmpty()) {
      return new TransitRegionStateProcedure[0];
    }

    if (
      serversToExclude != null && this.master.getServerManager().getOnlineServersList().size() == 1
    ) {
      LOG.debug("Only one region server found and hence going ahead with the assignment");
      serversToExclude = null;
    }
    try {
      // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
      // a better job if it has all the assignments in one lump.
      Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
        this.master.getServerManager().createDestinationServersList(serversToExclude));
      // Return mid-method!
      return createAssignProcedures(assignments);
    } catch (IOException hioe) {
      LOG.warn("Failed roundRobinAssignment", hioe);
    }
    // If an error above, fall through to this simpler assign. Last resort.
    return createAssignProcedures(hris);
  }

  /**
   * Create round-robin assigns. Use on table creation to distribute regions across the cluster.
   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
   *         to populate the assigns with targets chosen using round-robin (default balancer
   *         scheme). If at assign-time the target chosen is no longer up, that's fine: the
   *         AssignProcedure will ask the balancer for a new target, and so on.
   */
  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
    return createRoundRobinAssignProcedures(hris, null);
  }

  static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
    if (left.getRegion().isMetaRegion()) {
      if (right.getRegion().isMetaRegion()) {
        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
      }
      return -1;
    } else if (right.getRegion().isMetaRegion()) {
      return +1;
    }
    if (left.getRegion().getTable().isSystemTable()) {
      if (right.getRegion().getTable().isSystemTable()) {
        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
      }
      return -1;
    } else if (right.getRegion().getTable().isSystemTable()) {
      return +1;
    }
    return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
  }

  /**
   * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server. This
   * method is called from HBCK2.
   * @return an assign or null
   */
  public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo ri, boolean override) {
    TransitRegionStateProcedure trsp = null;
    try {
      trsp = createAssignProcedure(ri, null, override);
    } catch (IOException ioe) {
      LOG.info(
        "Failed {} assign, override={}"
          + (override ? "" : "; set override to by-pass state checks."),
        ri.getEncodedName(), override, ioe);
    }
    return trsp;
  }

  /**
   * Create one TransitRegionStateProcedure to unassign a region. This method is called from HBCK2.
   * @return an unassign or null
   */
  public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo ri, boolean override) {
    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(ri);
    TransitRegionStateProcedure trsp = null;
    regionNode.lock();
    try {
      if (override) {
        if (regionNode.getProcedure() != null) {
          regionNode.unsetProcedure(regionNode.getProcedure());
        }
      } else {
        // This is where we could throw an exception; i.e. override is false.
        preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
      }
      assert regionNode.getProcedure() == null;
      trsp =
        TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionNode.getRegionInfo());
      regionNode.setProcedure(trsp);
    } catch (IOException ioe) {
      // 'override' must be false here.
      LOG.info("Failed {} unassign, override=false; set override to by-pass state checks.",
        ri.getEncodedName(), ioe);
    } finally {
      regionNode.unlock();
    }
    return trsp;
  }

  /**
   * Create an array of TransitRegionStateProcedures w/o specifying a target server. Used as a
   * fallback when the caller is unable to do {@link #createAssignProcedures(Map)}.
   * <p/>
   * If no target server is given, at assign time we will try to use the former location of the
   * region if one exists. This is how we 'retain' the old location across a server restart.
   * <p/>
   * Should only be called when you can make sure that no one else can touch these regions, for
   * example when you are creating or enabling a table. Presumes all Regions are in an appropriate
   * state, ripe for assign; no checking of Region state is done in here.
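   * <p/>
   * Illustrative sketch of a call site (the {@code procedureExecutor} handle and the region list
   * are assumptions, not part of this method):
   *
   * <pre>
   * TransitRegionStateProcedure[] assigns = am.createAssignProcedures(regionsOfNewTable);
   * for (TransitRegionStateProcedure assign : assigns) {
   *   procedureExecutor.submitProcedure(assign);
   * }
   * </pre>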
   * @see #createAssignProcedures(Map)
   */
  public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) {
    return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
      .map(regionNode -> createAssignProcedure(regionNode, null)).sorted(AssignmentManager::compare)
      .toArray(TransitRegionStateProcedure[]::new);
  }

  /**
   * Tied to {@link #createAssignProcedures(List)} in that the List version is called when the
   * caller is unable to run this method. Presumes all Regions are in an appropriate state, ripe
   * for assign; no checking of Region state is done in here.
   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
   * @return Assignments made from the passed in <code>assignments</code>
   * @see #createAssignProcedures(List)
   */
  private TransitRegionStateProcedure[]
    createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments) {
    return assignments.entrySet().stream()
      .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
        .map(regionNode -> createAssignProcedure(regionNode, e.getKey())))
      .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
  }

  // for creating unassign TRSP when disabling a table or closing excess region replicas
  private TransitRegionStateProcedure forceCreateUnssignProcedure(RegionStateNode regionNode) {
    regionNode.lock();
    try {
      if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
        return null;
      }
      // In general, a split parent should be in CLOSED or SPLIT state, but anyway, let's check it
      // here for safety.
      if (regionNode.getRegionInfo().isSplit()) {
        LOG.warn("{} is a split parent but not in CLOSED or SPLIT state", regionNode);
        return null;
      }
      // As in DisableTableProcedure or ModifyTableProcedure, we will hold the xlock for the table,
      // so we can make sure that this procedure has not been executed yet, as a TRSP will hold the
      // shared lock for the table all the time. So here we will unset it and, when it is actually
      // executed, it will find that the attached procedure is not itself and quit immediately.
      if (regionNode.getProcedure() != null) {
        regionNode.unsetProcedure(regionNode.getProcedure());
      }
      return regionNode.setProcedure(TransitRegionStateProcedure.unassign(getProcedureEnvironment(),
        regionNode.getRegionInfo()));
    } finally {
      regionNode.unlock();
    }
  }

  /**
   * Called by DisableTableProcedure to unassign all the regions for a table.
   */
  public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) {
    return regionStates.getTableRegionStateNodes(tableName).stream()
      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
      .toArray(TransitRegionStateProcedure[]::new);
  }

  /**
   * Called by ModifyTableProcedure to unassign all the excess region replicas for a table.
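   * <p/>
   * A hedged sketch of the intended use (variable names here are illustrative only):
   *
   * <pre>
   * // Replicas with replicaId at or above newReplicaCount get an unassign TRSP; the caller
   * // submits them to the procedure executor.
   * TransitRegionStateProcedure[] unassigns =
   *   am.createUnassignProceduresForClosingExcessRegionReplicas(tableName, newReplicaCount);
   * </pre>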
   */
  public TransitRegionStateProcedure[] createUnassignProceduresForClosingExcessRegionReplicas(
    TableName tableName, int newReplicaCount) {
    return regionStates.getTableRegionStateNodes(tableName).stream()
      .filter(regionNode -> regionNode.getRegionInfo().getReplicaId() >= newReplicaCount)
      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
      .toArray(TransitRegionStateProcedure[]::new);
  }

  public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
    final byte[] splitKey) throws IOException {
    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
  }

  public MergeTableRegionsProcedure createMergeProcedure(RegionInfo... ris) throws IOException {
    return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
  }

  /**
   * Delete the region states. This is called by "DeleteTable".
   */
  public void deleteTable(final TableName tableName) throws IOException {
    final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
    regionStateStore.deleteRegions(regions);
    for (int i = 0; i < regions.size(); ++i) {
      final RegionInfo regionInfo = regions.get(i);
      // we expect the region to be offline
      regionStates.removeFromOfflineRegions(regionInfo);
      regionStates.deleteRegion(regionInfo);
    }
  }

  // ============================================================================================
  // RS Region Transition Report helpers
  // ============================================================================================
  private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
    ServerName serverName, List<RegionStateTransition> transitionList) throws IOException {
    for (RegionStateTransition transition : transitionList) {
      switch (transition.getTransitionCode()) {
        case OPENED:
        case FAILED_OPEN:
        case CLOSED:
          assert transition.getRegionInfoCount() == 1 : transition;
          final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
          long procId =
            transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
          updateRegionTransition(serverName, transition.getTransitionCode(), hri,
            transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId);
          break;
        case READY_TO_SPLIT:
        case SPLIT:
        case SPLIT_REVERTED:
          assert transition.getRegionInfoCount() == 3 : transition;
          final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
          final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
          final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
          updateRegionSplitTransition(serverName, transition.getTransitionCode(), parent, splitA,
            splitB);
          break;
        case READY_TO_MERGE:
        case MERGED:
        case MERGE_REVERTED:
          assert transition.getRegionInfoCount() == 3 : transition;
          final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
          final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
          final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
          updateRegionMergeTransition(serverName, transition.getTransitionCode(), merged, mergeA,
            mergeB);
          break;
      }
    }
  }

  public ReportRegionStateTransitionResponse reportRegionStateTransition(
    final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
    ReportRegionStateTransitionResponse.Builder builder =
      ReportRegionStateTransitionResponse.newBuilder();
    ServerName serverName = ProtobufUtil.toServerName(req.getServer());
    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
    // Here we have to acquire a read lock instead of a simple exclusive lock. This is because we
    // should not block other reportRegionStateTransition calls from the same region server. This
    // is not only about performance, but also to prevent deadlock: think of the meta region being
    // on the same region server while you hold the lock which blocks the
    // reportRegionStateTransition for meta; since meta is not online, you will block inside the
    // lock protection waiting for meta to come online...
    serverNode.readLock().lock();
    try {
      // we only accept reportRegionStateTransition if the region server is online, see the comment
      // above in submitServerCrash method and HBASE-21508 for more details.
      if (serverNode.isInState(ServerState.ONLINE)) {
        try {
          reportRegionStateTransition(builder, serverName, req.getTransitionList());
        } catch (PleaseHoldException e) {
          LOG.trace("Failed transition ", e);
          throw e;
        } catch (UnsupportedOperationException | IOException e) {
          // TODO: at the moment we have a single error message and the RS will abort
          // if the master says that one of the region transitions failed.
          LOG.warn("Failed transition", e);
          builder.setErrorMessage("Failed transition " + e.getMessage());
        }
      } else {
        LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
          serverName);
        builder.setErrorMessage("You are dead");
      }
    } finally {
      serverNode.readLock().unlock();
    }

    return builder.build();
  }

  private void updateRegionTransition(ServerName serverName, TransitionCode state,
    RegionInfo regionInfo, long seqId, long procId) throws IOException {
    checkMetaLoaded(regionInfo);

    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
    if (regionNode == null) {
      // the table/region is gone; maybe a delete, split or merge
      throw new UnexpectedStateException(String.format(
        "Server %s was trying to transition region %s to %s. but Region is not known.", serverName,
        regionInfo, state));
    }
    LOG.trace("Update region transition serverName={} region={} regionState={}", serverName,
      regionNode, state);

    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
    regionNode.lock();
    try {
      if (!reportTransition(regionNode, serverNode, state, seqId, procId)) {
        // Don't log WARN if shutting down the cluster; during shutdown, avoid the below messages:
        // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
        // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
        // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
        // to CLOSED
        // These happen because on cluster shutdown, we currently let the RegionServers close
        // regions. This is the only time that region close is not run by the Master (so the
        // cluster goes down fast). Consider changing it so the Master runs all shutdowns.
        if (
          this.master.getServerManager().isClusterShutdown() && state.equals(TransitionCode.CLOSED)
        ) {
          LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
        } else {
          LOG.warn("No matching procedure found for {} transition on {} to {}", serverName,
            regionNode, state);
        }
      }
    } finally {
      regionNode.unlock();
    }
  }

  private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
    TransitionCode state, long seqId, long procId) throws IOException {
    ServerName serverName = serverNode.getServerName();
    TransitRegionStateProcedure proc = regionNode.getProcedure();
    if (proc == null) {
      return false;
    }
    proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
      serverName, state, seqId, procId);
    return true;
  }

  private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state,
    final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
    checkMetaLoaded(parent);

    if (state != TransitionCode.READY_TO_SPLIT) {
      throw new UnexpectedStateException(
        "unsupported split regionState=" + state + " for parent region " + parent
          + " maybe an old RS (< 2.0) had the operation in progress");
    }

    // sanity check on the request
    if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) {
      throw new UnsupportedOperationException("unsupported split request with bad keys: parent="
        + parent + " hriA=" + hriA + " hriB=" + hriB);
    }

    if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
      LOG.warn("Split switch is off! skip split of " + parent);
      throw new DoNotRetryIOException(
        "Split region " + parent.getRegionNameAsString() + " failed due to split switch off");
    }

    // Submit the Split procedure
    final byte[] splitKey = hriB.getStartKey();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Split request from " + serverName + ", parent=" + parent + " splitKey="
        + Bytes.toStringBinary(splitKey));
    }
    // Processing this report happens asynchronously from other activities which can mutate
    // the region state. For example, a split procedure may already be running for this parent.
    // A split procedure cannot succeed if the parent region is no longer open, so we can
    // ignore it in that case.
    // Note that submitting more than one split procedure for a given region is
    // harmless -- the split is fenced in the procedure handling -- but it would be noisy in
    // the logs. Only one procedure can succeed. The other procedure(s) would abort during
    // initialization and report failure with WARN level logging.
    RegionState parentState = regionStates.getRegionState(parent);
    if (parentState != null && parentState.isOpened()) {
      master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
    } else {
      LOG.info("Ignoring split request from " + serverName + ", parent=" + parent
        + " because parent is unknown or not open");
      return;
    }

    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
      throw new UnsupportedOperationException(
        String.format("Split handled by the master: " + "parent=%s hriA=%s hriB=%s",
          parent.getShortNameToLog(), hriA, hriB));
    }
  }

  private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state,
    final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
    checkMetaLoaded(merged);

    if (state != TransitionCode.READY_TO_MERGE) {
      throw new UnexpectedStateException(
        "Unsupported merge regionState=" + state + " for regionA=" + hriA + " regionB=" + hriB
          + " merged=" + merged + " maybe an old RS (< 2.0) had the operation in progress");
    }

    if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
      LOG.warn("Merge switch is off! skip merge of regionA=" + hriA + " regionB=" + hriB);
      throw new DoNotRetryIOException(
        "Merge of regionA=" + hriA + " regionB=" + hriB + " failed because merge switch is off");
    }

    // Submit the Merge procedure
    if (LOG.isDebugEnabled()) {
      LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged);
    }
    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));

    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
      throw new UnsupportedOperationException(
        String.format("Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state,
          merged, hriA, hriB));
    }
  }

  // ============================================================================================
  // RS Status update (report online regions) helpers
  // ============================================================================================
  /**
   * The master will call this method when the RS sends the regionServerReport(). The report
   * contains the "online regions". This method checks the online regions against the in-memory
   * state of the AM and logs a warning message if there is a mismatch. The mismatch can happen
   * because there is no fencing between the reportRegionStateTransition method and the
   * regionServerReport method, so there could be a race that introduces inconsistency here, but
   * actually it is not a problem.
   * <p/>
   * Please see HBASE-21421 and HBASE-21463 for more details.
1330 */ 1331 public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) { 1332 if (!isRunning()) { 1333 return; 1334 } 1335 if (LOG.isTraceEnabled()) { 1336 LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName, 1337 regionNames.size(), isMetaLoaded(), 1338 regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList())); 1339 } 1340 1341 ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); 1342 synchronized (serverNode) { 1343 if (!serverNode.isInState(ServerState.ONLINE)) { 1344 LOG.warn("Got a report from a server in state " + serverNode.getState()); 1345 return; 1346 } 1347 } 1348 1349 // Track the regionserver reported online regions in memory. 1350 synchronized (rsReports) { 1351 rsReports.put(serverName, regionNames); 1352 } 1353 1354 if (regionNames.isEmpty()) { 1355 // nothing to do if we don't have regions 1356 LOG.trace("no online region found on {}", serverName); 1357 return; 1358 } 1359 if (!isMetaLoaded()) { 1360 // we are still on startup, skip checking 1361 return; 1362 } 1363 // The heartbeat tells us which regions are on the region server; check their state. 1364 checkOnlineRegionsReport(serverNode, regionNames); 1365 } 1366 1367 /** 1368 * Close <code>regionName</code> on <code>sn</code> silently and immediately without using a 1369 * Procedure or going via hbase:meta. For the case where a RegionServer's hosting of a Region is 1370 * not aligned w/ the Master's accounting of Region state. This is for cleaning up an error in 1371 * accounting. 1372 */ 1373 private void closeRegionSilently(ServerName sn, byte[] regionName) { 1374 try { 1375 RegionInfo ri = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName); 1376 // Pass -1 for timeout. Means do not wait. 1377 ServerManager.closeRegionSilentlyAndWait(this.master.getAsyncClusterConnection(), sn, ri, -1); 1378 } catch (Exception e) { 1379 LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e); 1380 } 1381 } 1382 1383 /** 1384 * Check that what the RegionServer reports aligns with the Master's image. If disagreement, we 1385 * will tell the RegionServer to expediently close a Region we do not think it should have. 1386 */ 1387 private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) { 1388 ServerName serverName = serverNode.getServerName(); 1389 for (byte[] regionName : regionNames) { 1390 if (!isRunning()) { 1391 return; 1392 } 1393 RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName); 1394 if (regionNode == null) { 1395 String regionNameAsStr = Bytes.toStringBinary(regionName); 1396 LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...", regionNameAsStr, 1397 serverName); 1398 closeRegionSilently(serverNode.getServerName(), regionName); 1399 continue; 1400 } 1401 final long lag = 1000; 1402 regionNode.lock(); 1403 try { 1404 long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate(); 1405 if (regionNode.isInState(State.OPENING, State.OPEN)) { 1406 // This is possible when a region server has just closed a region but the region server 1407 // report was generated before the close and arrives after it. Make sure some time has 1408 // elapsed to reduce false alarms.
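// We only act if the mismatch has persisted for longer than the 'lag' grace period (1000 ms)
// declared above, measured from the region node's last update.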
1409 if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) { 1410 LOG.warn("Reporting {} server does not match {} (time since last " 1411 + "update={}ms); closing...", serverName, regionNode, diff); 1412 closeRegionSilently(serverNode.getServerName(), regionName); 1413 } 1414 } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) { 1415 // So, we can get a report that a region is CLOSED or SPLIT because a heartbeat 1416 // came in at about the same time as a region transition. Make sure some time has 1417 // elapsed to reduce false alarms. 1418 if (diff > lag) { 1419 LOG.warn("Reporting {} state does not match {} (time since last update={}ms)", 1420 serverName, regionNode, diff); 1421 } 1422 } 1423 } finally { 1424 regionNode.unlock(); 1425 } 1426 } 1427 } 1428 1429 // ============================================================================================ 1430 // RIT chore 1431 // ============================================================================================ 1432 private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> { 1433 public RegionInTransitionChore(final int timeoutMsec) { 1434 super(timeoutMsec); 1435 } 1436 1437 @Override 1438 protected void periodicExecute(final MasterProcedureEnv env) { 1439 final AssignmentManager am = env.getAssignmentManager(); 1440 1441 final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat(); 1442 if (ritStat.hasRegionsOverThreshold()) { 1443 for (RegionState hri : ritStat.getRegionOverThreshold()) { 1444 am.handleRegionOverStuckWarningThreshold(hri.getRegion()); 1445 } 1446 } 1447 1448 // update metrics 1449 am.updateRegionsInTransitionMetrics(ritStat); 1450 } 1451 } 1452 1453 private static class DeadServerMetricRegionChore 1454 extends ProcedureInMemoryChore<MasterProcedureEnv> { 1455 public DeadServerMetricRegionChore(final int timeoutMsec) { 1456 super(timeoutMsec); 1457 } 1458 1459 @Override 1460 protected void periodicExecute(final MasterProcedureEnv env) { 1461 final ServerManager sm = env.getMasterServices().getServerManager(); 1462 final AssignmentManager am = env.getAssignmentManager(); 1463 // To minimize inconsistencies we are not going to snapshot live servers in advance in case 1464 // new servers are added; OTOH we don't want to add heavy sync for a consistent view since 1465 // this is for metrics. Instead, we're going to check each region as we go; to avoid making 1466 // too many checks, we maintain a local list of servers, limiting us to false negatives. If 1467 // we miss some recently-dead server, we'll just see it next time. 1468 Set<ServerName> recentlyLiveServers = new HashSet<>(); 1469 int deadRegions = 0, unknownRegions = 0; 1470 for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) { 1471 if (rsn.getState() != State.OPEN) { 1472 continue; // Opportunistic check, should quickly skip RITs, offline tables, etc. 1473 } 1474 // Do not need to acquire region state lock as this is only for showing metrics. 1475 ServerName sn = rsn.getRegionLocation(); 1476 State state = rsn.getState(); 1477 if (state != State.OPEN) { 1478 continue; // Mostly skipping RITs that are already being taken care of. 1479 } 1480 if (sn == null) { 1481 ++unknownRegions; // Opened on null?
1482 continue; 1483 } 1484 if (recentlyLiveServers.contains(sn)) { 1485 continue; 1486 } 1487 ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn); 1488 switch (sls) { 1489 case LIVE: 1490 recentlyLiveServers.add(sn); 1491 break; 1492 case DEAD: 1493 ++deadRegions; 1494 break; 1495 case UNKNOWN: 1496 ++unknownRegions; 1497 break; 1498 default: 1499 throw new AssertionError("Unexpected " + sls); 1500 } 1501 } 1502 if (deadRegions > 0 || unknownRegions > 0) { 1503 LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers", 1504 deadRegions, unknownRegions); 1505 } 1506 1507 am.updateDeadServerRegionMetrics(deadRegions, unknownRegions); 1508 } 1509 } 1510 1511 public RegionInTransitionStat computeRegionInTransitionStat() { 1512 final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration()); 1513 rit.update(this); 1514 return rit; 1515 } 1516 1517 public static class RegionInTransitionStat { 1518 private final int ritThreshold; 1519 1520 private HashMap<String, RegionState> ritsOverThreshold = null; 1521 private long statTimestamp; 1522 private long oldestRITTime = 0; 1523 private int totalRITsTwiceThreshold = 0; 1524 private int totalRITs = 0; 1525 1526 public RegionInTransitionStat(final Configuration conf) { 1527 this.ritThreshold = 1528 conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD); 1529 } 1530 1531 public int getRITThreshold() { 1532 return ritThreshold; 1533 } 1534 1535 public long getTimestamp() { 1536 return statTimestamp; 1537 } 1538 1539 public int getTotalRITs() { 1540 return totalRITs; 1541 } 1542 1543 public long getOldestRITTime() { 1544 return oldestRITTime; 1545 } 1546 1547 public int getTotalRITsOverThreshold() { 1548 Map<String, RegionState> m = this.ritsOverThreshold; 1549 return m != null ? m.size() : 0; 1550 } 1551 1552 public boolean hasRegionsTwiceOverThreshold() { 1553 return totalRITsTwiceThreshold > 0; 1554 } 1555 1556 public boolean hasRegionsOverThreshold() { 1557 Map<String, RegionState> m = this.ritsOverThreshold; 1558 return m != null && !m.isEmpty(); 1559 } 1560 1561 public Collection<RegionState> getRegionOverThreshold() { 1562 Map<String, RegionState> m = this.ritsOverThreshold; 1563 return m != null ? 
m.values() : Collections.emptySet(); 1564 } 1565 1566 public boolean isRegionOverThreshold(final RegionInfo regionInfo) { 1567 Map<String, RegionState> m = this.ritsOverThreshold; 1568 return m != null && m.containsKey(regionInfo.getEncodedName()); 1569 } 1570 1571 public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) { 1572 Map<String, RegionState> m = this.ritsOverThreshold; 1573 if (m == null) { 1574 return false; 1575 } 1576 final RegionState state = m.get(regionInfo.getEncodedName()); 1577 if (state == null) { 1578 return false; 1579 } 1580 return (statTimestamp - state.getStamp()) > (ritThreshold * 2); 1581 } 1582 1583 protected void update(final AssignmentManager am) { 1584 final RegionStates regionStates = am.getRegionStates(); 1585 this.statTimestamp = EnvironmentEdgeManager.currentTime(); 1586 update(regionStates.getRegionsStateInTransition(), statTimestamp); 1587 update(regionStates.getRegionFailedOpen(), statTimestamp); 1588 1589 if (LOG.isDebugEnabled() && ritsOverThreshold != null && !ritsOverThreshold.isEmpty()) { 1590 LOG.debug("RITs over threshold: {}", 1591 ritsOverThreshold.entrySet().stream() 1592 .map(e -> e.getKey() + ":" + e.getValue().getState().name()) 1593 .collect(Collectors.joining("\n"))); 1594 } 1595 } 1596 1597 private void update(final Collection<RegionState> regions, final long currentTime) { 1598 for (RegionState state : regions) { 1599 totalRITs++; 1600 final long ritStartedMs = state.getStamp(); 1601 if (ritStartedMs == 0) { 1602 // Don't output bogus values to metrics if they accidentally make it here. 1603 LOG.warn("The RIT {} has no start time", state.getRegion()); 1604 continue; 1605 } 1606 final long ritTime = currentTime - ritStartedMs; 1607 if (ritTime > ritThreshold) { 1608 if (ritsOverThreshold == null) { 1609 ritsOverThreshold = new HashMap<String, RegionState>(); 1610 } 1611 ritsOverThreshold.put(state.getRegion().getEncodedName(), state); 1612 totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0; 1613 } 1614 if (oldestRITTime < ritTime) { 1615 oldestRITTime = ritTime; 1616 } 1617 } 1618 } 1619 } 1620 1621 private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) { 1622 metrics.updateRITOldestAge(ritStat.getOldestRITTime()); 1623 metrics.updateRITCount(ritStat.getTotalRITs()); 1624 metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold()); 1625 } 1626 1627 private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) { 1628 metrics.updateDeadServerOpenRegions(deadRegions); 1629 metrics.updateUnknownServerOpenRegions(unknownRegions); 1630 } 1631 1632 private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) { 1633 final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 1634 // if (regionNode.isStuck()) { 1635 LOG.warn("STUCK Region-In-Transition {}", regionNode); 1636 } 1637 1638 // ============================================================================================ 1639 // TODO: Master load/bootstrap 1640 // ============================================================================================ 1641 public void joinCluster() throws IOException { 1642 long startTime = System.nanoTime(); 1643 LOG.debug("Joining cluster..."); 1644 1645 // Scan hbase:meta to build list of existing regions, servers, and assignment. 1646 // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress 1647 // w/o meta. 
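// Note: loadMeta() below populates RegionStates via RegionMetaLoadingVisitor; while meta is
// still being loaded, checkMetaLoaded will throw PleaseHoldException for user-region operations.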
1648 loadMeta(); 1649 1650 while (master.getServerManager().countOfRegionServers() < 1) { 1651 LOG.info("Waiting for RegionServers to join; current count={}", 1652 master.getServerManager().countOfRegionServers()); 1653 Threads.sleep(250); 1654 } 1655 LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers()); 1656 1657 // Start the chores 1658 master.getMasterProcedureExecutor().addChore(this.ritChore); 1659 master.getMasterProcedureExecutor().addChore(this.deadMetricChore); 1660 1661 long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); 1662 LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs)); 1663 } 1664 1665 /** 1666 * Create assign procedures for offline regions. This follows the old 1667 * processofflineServersWithOnlineRegions method. Since we no longer need to deal with dead 1668 * servers, we only deal with regions in the OFFLINE state in this method. It is a bit strange 1669 * that new regions are added in the CLOSED state instead of the OFFLINE state, and usually 1670 * there will be a procedure to track them. The processofflineServersWithOnlineRegions is a 1671 * legacy from long ago; as things work quite differently now, maybe we do not need this method 1672 * any more. Need to revisit later. 1673 */ 1674 // Public so it can be run by the Master as part of startup. Needs hbase:meta to be online. 1675 // Needs to be done after the table state manager has been started. 1676 public void processOfflineRegions() { 1677 TransitRegionStateProcedure[] procs = 1678 regionStates.getRegionStateNodes().stream().filter(rsn -> rsn.isInState(State.OFFLINE)) 1679 .filter(rsn -> isTableEnabled(rsn.getRegionInfo().getTable())).map(rsn -> { 1680 rsn.lock(); 1681 try { 1682 if (rsn.getProcedure() != null) { 1683 return null; 1684 } else { 1685 return rsn.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(), 1686 rsn.getRegionInfo(), null)); 1687 } 1688 } finally { 1689 rsn.unlock(); 1690 } 1691 }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new); 1692 if (procs.length > 0) { 1693 master.getMasterProcedureExecutor().submitProcedures(procs); 1694 } 1695 } 1696 1697 /* 1698 * AM internal RegionStateStore.RegionStateVisitor implementation. To be used when scanning META 1699 * table for region rows, using RegionStateStore utility methods. RegionStateStore methods will 1700 * convert Result into proper RegionInfo instances, but those would still need to be added into 1701 * AssignmentManager.regionStates in-memory cache. RegionMetaLoadingVisitor.visitRegionState 1702 * method provides the logic for adding RegionInfo instances as loaded from latest META scan into 1703 * AssignmentManager.regionStates. 1704 */ 1705 private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor { 1706 1707 @Override 1708 public void visitRegionState(Result result, final RegionInfo regionInfo, final State state, 1709 final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) { 1710 if ( 1711 state == null && regionLocation == null && lastHost == null 1712 && openSeqNum == SequenceId.NO_SEQUENCE_ID 1713 ) { 1714 // This is a row with nothing in it. 1715 LOG.warn("Skipping empty row={}", result); 1716 return; 1717 } 1718 State localState = state; 1719 if (localState == null) { 1720 // No region state column data in hbase:meta table! Am I doing a rolling upgrade from 1721 // hbase1 to hbase2?
Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta? 1722 // In any of these cases, state is empty. For now, presume OFFLINE but there are probably 1723 // cases where we need to probe more to be sure this is correct; TODO informed by experience. 1724 LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE); 1725 localState = State.OFFLINE; 1726 } 1727 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 1728 // No need to lock on regionNode, as we can make sure that none of the related procedures can 1729 // be executed before we finish loading meta. The only exception is meta region related 1730 // operations, but here we do not load the information for the meta region. 1731 regionNode.setState(localState); 1732 regionNode.setLastHost(lastHost); 1733 regionNode.setRegionLocation(regionLocation); 1734 regionNode.setOpenSeqNum(openSeqNum); 1735 1736 // Note: keep consistent with other methods, see region(Opening|Opened|Closing) 1737 // RIT/ServerCrash handling should take care of the transiting regions. 1738 if ( 1739 localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING, State.MERGING) 1740 ) { 1741 assert regionLocation != null : "found null region location for " + regionNode; 1742 regionStates.addRegionToServer(regionNode); 1743 } else if (localState == State.OFFLINE || regionInfo.isOffline()) { 1744 regionStates.addToOfflineRegions(regionNode); 1745 } 1746 if (regionNode.getProcedure() != null) { 1747 regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode); 1748 } 1749 } 1750 }; 1751 1752 /** 1753 * Query META if the given <code>RegionInfo</code> exists, adding to 1754 * <code>AssignmentManager.regionStateStore</code> cache if the region is found in META. 1755 * @param regionEncodedName encoded name for the region to be loaded from META into 1756 * <code>AssignmentManager.regionStateStore</code> cache 1757 * @return <code>RegionInfo</code> instance for the given region if it is present in META and got 1758 * successfully loaded into <code>AssignmentManager.regionStateStore</code> cache, 1759 * <b>null</b> otherwise. 1760 * @throws UnknownRegionException if any errors occur while querying meta. 1761 */ 1762 public RegionInfo loadRegionFromMeta(String regionEncodedName) throws UnknownRegionException { 1763 try { 1764 RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor(); 1765 regionStateStore.visitMetaForRegion(regionEncodedName, visitor); 1766 return regionStates.getRegionState(regionEncodedName) == null 1767 ? null 1768 : regionStates.getRegionState(regionEncodedName).getRegion(); 1769 } catch (IOException e) { 1770 throw new UnknownRegionException( 1771 "Error trying to load region " + regionEncodedName + " from META", e); 1772 } 1773 } 1774 1775 private void loadMeta() throws IOException { 1776 // TODO: use a thread pool 1777 regionStateStore.visitMeta(new RegionMetaLoadingVisitor()); 1778 } 1779 1780 /** 1781 * Used to check if the meta loading is done.
1782 * <p/> 1783 * If not, we throw a PleaseHoldException since we are still rebuilding the RegionStates. 1784 * @param hri region to check if it is already rebuilt 1785 * @throws PleaseHoldException if meta has not been loaded yet 1786 */ 1787 private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException { 1788 if (!isRunning()) { 1789 throw new PleaseHoldException("AssignmentManager not running"); 1790 } 1791 boolean meta = isMetaRegion(hri); 1792 boolean metaLoaded = isMetaLoaded(); 1793 if (!meta && !metaLoaded) { 1794 throw new PleaseHoldException( 1795 "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded); 1796 } 1797 } 1798 1799 // ============================================================================================ 1800 // TODO: Metrics 1801 // ============================================================================================ 1802 public int getNumRegionsOpened() { 1803 // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value 1804 return 0; 1805 } 1806 1807 /** 1808 * Usually run by the Master in reaction to server crash during normal processing. Can also be 1809 * invoked via external RPC to effect repair; in the latter case, the 'force' flag is set so we 1810 * push through the SCP though context may indicate an already-running SCP (an old SCP may have 1811 * exited abnormally, or a damaged cluster may still have references in hbase:meta to 'Unknown 1812 * Servers' -- servers that are not online or in the dead servers list, etc.). 1813 * @param force Set if the request came in externally over RPC (via hbck2). Force means run the 1814 * SCP even if it seems as though there might be an outstanding SCP running. 1815 * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled. 1816 */ 1817 public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) { 1818 // May be an 'Unknown Server' so handle case where serverNode is null. 1819 ServerStateNode serverNode = regionStates.getServerNode(serverName); 1820 // Remove the in-memory rsReports result 1821 synchronized (rsReports) { 1822 rsReports.remove(serverName); 1823 } 1824 1825 // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the 1826 // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from 1827 // this server. This is used to simplify the implementation for TRSP and SCP, where we can make 1828 // sure that the region list fetched by SCP will not be changed any more. 1829 if (serverNode != null) { 1830 serverNode.writeLock().lock(); 1831 } 1832 boolean carryingMeta; 1833 long pid; 1834 try { 1835 ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor(); 1836 carryingMeta = isCarryingMeta(serverName); 1837 if (!force && serverNode != null && !serverNode.isInState(ServerState.ONLINE)) { 1838 LOG.info("Skip adding ServerCrashProcedure for {} (meta={}) -- running?", serverNode, 1839 carryingMeta); 1840 return Procedure.NO_PROC_ID; 1841 } else { 1842 MasterProcedureEnv mpe = procExec.getEnvironment(); 1843 // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead. 1844 // HBCKSCP scours Master in-memory state AND hbase:meta for references to 1845 // serverName just-in-case. An SCP that is scheduled when the server is 1846 // 'Unknown' probably originated externally with HBCK2 fix-it tool.
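// Remember the previous state purely for the log line below; the node (if present) is flipped
// to CRASHED while we still hold its write lock, so no further reports from this server are
// accepted.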
1847 ServerState oldState = null; 1848 if (serverNode != null) { 1849 oldState = serverNode.getState(); 1850 serverNode.setState(ServerState.CRASHED); 1851 } 1852 1853 if (force) { 1854 pid = procExec.submitProcedure( 1855 new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta)); 1856 } else { 1857 pid = procExec.submitProcedure( 1858 new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta)); 1859 } 1860 LOG.info("Scheduled ServerCrashProcedure pid={} for {} (carryingMeta={}){}.", pid, 1861 serverName, carryingMeta, 1862 serverNode == null ? "" : " " + serverNode.toString() + ", oldState=" + oldState); 1863 } 1864 } finally { 1865 if (serverNode != null) { 1866 serverNode.writeLock().unlock(); 1867 } 1868 } 1869 return pid; 1870 } 1871 1872 public void offlineRegion(final RegionInfo regionInfo) { 1873 // TODO used by MasterRpcServices 1874 RegionStateNode node = regionStates.getRegionStateNode(regionInfo); 1875 if (node != null) { 1876 node.offline(); 1877 } 1878 } 1879 1880 public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) { 1881 // TODO used by TestSplitTransactionOnCluster.java 1882 } 1883 1884 public Map<ServerName, List<RegionInfo>> 1885 getSnapShotOfAssignment(final Collection<RegionInfo> regions) { 1886 return regionStates.getSnapShotOfAssignment(regions); 1887 } 1888 1889 // ============================================================================================ 1890 // TODO: UTILS/HELPERS? 1891 // ============================================================================================ 1892 /** 1893 * Used by the client (via master) to identify if all regions have the schema updates 1894 * @return Pair indicating the status of the alter command (pending/total) 1895 */ 1896 public Pair<Integer, Integer> getReopenStatus(TableName tableName) { 1897 if (isTableDisabled(tableName)) { 1898 return new Pair<Integer, Integer>(0, 0); 1899 } 1900 1901 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 1902 int ritCount = 0; 1903 for (RegionState regionState : states) { 1904 if (!regionState.isOpened() && !regionState.isSplit()) { 1905 ritCount++; 1906 } 1907 } 1908 return new Pair<Integer, Integer>(ritCount, states.size()); 1909 } 1910 1911 // ============================================================================================ 1912 // TODO: Region State In Transition 1913 // ============================================================================================ 1914 public boolean hasRegionsInTransition() { 1915 return regionStates.hasRegionsInTransition(); 1916 } 1917 1918 public List<RegionStateNode> getRegionsInTransition() { 1919 return regionStates.getRegionsInTransition(); 1920 } 1921 1922 public List<RegionInfo> getAssignedRegions() { 1923 return regionStates.getAssignedRegions(); 1924 } 1925 1926 public RegionInfo getRegionInfo(final byte[] regionName) { 1927 final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName); 1928 return regionState != null ? regionState.getRegionInfo() : null; 1929 } 1930 1931 // ============================================================================================ 1932 // Expected states on region state transition. 1933 // Notice that there is expected states for transiting to OPENING state, this is because SCP. 1934 // See the comments in regionOpening method for more details. 
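// Each array below lists the states a region is expected to already be in when the
// corresponding transition is requested.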
1935 // ============================================================================================ 1936 private static final State[] STATES_EXPECTED_ON_OPEN = { State.OPENING, // Normal case 1937 State.OPEN // Retrying 1938 }; 1939 1940 private static final State[] STATES_EXPECTED_ON_CLOSING = { State.OPEN, // Normal case 1941 State.CLOSING, // Retrying 1942 State.SPLITTING, // Offline the split parent 1943 State.MERGING // Offline the merge parents 1944 }; 1945 1946 private static final State[] STATES_EXPECTED_ON_CLOSED = { State.CLOSING, // Normal case 1947 State.CLOSED // Retrying 1948 }; 1949 1950 // This is for manually scheduled region assigns; we can add other states later if we find other 1951 // usages 1952 private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE }; 1953 1954 // We only allow unassigning or moving a region which is in the OPEN state. 1955 private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN }; 1956 1957 // ============================================================================================ 1958 // Region Status update 1959 // Should only be called in TransitRegionStateProcedure (and related procedures), as the locking 1960 // and pre-assumptions are very tricky. 1961 // ============================================================================================ 1962 private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState, 1963 RegionState.State... expectedStates) throws IOException { 1964 RegionState.State state = regionNode.getState(); 1965 regionNode.transitionState(newState, expectedStates); 1966 boolean succ = false; 1967 try { 1968 regionStateStore.updateRegionLocation(regionNode); 1969 succ = true; 1970 } finally { 1971 if (!succ) { 1972 // revert 1973 regionNode.setState(state); 1974 } 1975 } 1976 } 1977 1978 // should be called within the synchronized block of RegionStateNode 1979 void regionOpening(RegionStateNode regionNode) throws IOException { 1980 // As in SCP, for performance reasons, there is no TRSP attached to this region and we do not 1981 // update the region state, which means that the region could be in any state when we want to 1982 // assign it after a RS crash. So here we do not pass the expectedStates parameter. 1983 transitStateAndUpdate(regionNode, State.OPENING); 1984 regionStates.addRegionToServer(regionNode); 1985 // update the operation count metrics 1986 metrics.incrementOperationCounter(); 1987 } 1988 1989 // should be called under the RegionStateNode lock 1990 // The parameter 'giveUp' indicates whether we should stop trying to open the region; if it is 1991 // true, we persist the FAILED_OPEN state into hbase:meta.
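// When 'giveUp' is false the node keeps its current state and location so the owning procedure
// can retry the open; only the in-memory region-to-server mapping is cleared below.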
1992 void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException { 1993 RegionState.State state = regionNode.getState(); 1994 ServerName regionLocation = regionNode.getRegionLocation(); 1995 if (giveUp) { 1996 regionNode.setState(State.FAILED_OPEN); 1997 regionNode.setRegionLocation(null); 1998 boolean succ = false; 1999 try { 2000 regionStateStore.updateRegionLocation(regionNode); 2001 succ = true; 2002 } finally { 2003 if (!succ) { 2004 // revert 2005 regionNode.setState(state); 2006 regionNode.setRegionLocation(regionLocation); 2007 } 2008 } 2009 } 2010 if (regionLocation != null) { 2011 regionStates.removeRegionFromServer(regionLocation, regionNode); 2012 } 2013 } 2014 2015 // should be called under the RegionStateNode lock 2016 void regionClosing(RegionStateNode regionNode) throws IOException { 2017 transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING); 2018 2019 RegionInfo hri = regionNode.getRegionInfo(); 2020 // Mark meta as not initialized early, so people trying to create/edit tables will wait 2021 if (isMetaRegion(hri)) { 2022 setMetaAssigned(hri, false); 2023 } 2024 regionStates.addRegionToServer(regionNode); 2025 // update the operation count metrics 2026 metrics.incrementOperationCounter(); 2027 } 2028 2029 // For open and close, the transition is first persisted to the procedure store in 2030 // RegionRemoteProcedureBase. So here we first change the in-memory state, as it is considered 2031 // to have succeeded once the persistence to the procedure store succeeds, and then when the 2032 // RegionRemoteProcedureBase is woken up, we persist the RegionStateNode to hbase:meta. 2033 2034 // should be called under the RegionStateNode lock 2035 void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException { 2036 regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN); 2037 RegionInfo regionInfo = regionNode.getRegionInfo(); 2038 regionStates.addRegionToServer(regionNode); 2039 regionStates.removeFromFailedOpen(regionInfo); 2040 } 2041 2042 // should be called under the RegionStateNode lock 2043 void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException { 2044 ServerName regionLocation = regionNode.getRegionLocation(); 2045 regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED); 2046 regionNode.setRegionLocation(null); 2047 if (regionLocation != null) { 2048 regionNode.setLastHost(regionLocation); 2049 regionStates.removeRegionFromServer(regionLocation, regionNode); 2050 } 2051 } 2052 2053 // should be called under the RegionStateNode lock 2054 // for SCP 2055 public void regionClosedAbnormally(RegionStateNode regionNode) throws IOException { 2056 RegionState.State state = regionNode.getState(); 2057 ServerName regionLocation = regionNode.getRegionLocation(); 2058 regionNode.transitionState(State.ABNORMALLY_CLOSED); 2059 regionNode.setRegionLocation(null); 2060 boolean succ = false; 2061 try { 2062 regionStateStore.updateRegionLocation(regionNode); 2063 succ = true; 2064 } finally { 2065 if (!succ) { 2066 // revert 2067 regionNode.setState(state); 2068 regionNode.setRegionLocation(regionLocation); 2069 } 2070 } 2071 if (regionLocation != null) { 2072 regionNode.setLastHost(regionLocation); 2073 regionStates.removeRegionFromServer(regionLocation, regionNode); 2074 } 2075 } 2076 2077 void persistToMeta(RegionStateNode regionNode) throws IOException { 2078 regionStateStore.updateRegionLocation(regionNode); 2079 RegionInfo regionInfo = regionNode.getRegionInfo(); 2080
if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) { 2081 // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYS enabled, it 2082 // can't be disabled -- so skip the RPC (besides, enabled is managed by TableStateManager, 2083 // which is backed by hbase:meta). Avoid setting ENABLED to avoid having to update state 2084 // on the table that contains state. 2085 setMetaAssigned(regionInfo, true); 2086 } 2087 } 2088 2089 // ============================================================================================ 2090 // The above methods can only be called in TransitRegionStateProcedure (and related procedures) 2091 // ============================================================================================ 2092 2093 public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName, 2094 final RegionInfo daughterA, final RegionInfo daughterB) throws IOException { 2095 // Update hbase:meta. Parent will be marked offline and split up in hbase:meta. 2096 // The parent stays in regionStates until cleared when removed by CatalogJanitor. 2097 // Update its state in regionStates so it shows as offline and split when read 2098 // later while figuring out what regions are in a table and what are not: see 2099 // regionStates#getRegionsOfTable 2100 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent); 2101 node.setState(State.SPLIT); 2102 final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA); 2103 nodeA.setState(State.SPLITTING_NEW); 2104 final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB); 2105 nodeB.setState(State.SPLITTING_NEW); 2106 2107 TableDescriptor td = master.getTableDescriptors().get(parent.getTable()); 2108 // TODO: here we just update the parent region info in meta, to set split and offline to true, 2109 // without changing the one in the region node. This is a bit confusing but the region info 2110 // field in RegionStateNode is not expected to be changed in the current design. Need to find a 2111 // possible way to address this problem, or at least add more comments about the trick used to 2112 // deal with this problem: when you want to filter out a split parent, you need to check both 2113 // the RegionState on whether it is split, and also the region info. If one of them matches then 2114 // it is a split parent. And usually only one of them can match, as after restart, the region 2115 // state will be changed from SPLIT to CLOSED. 2116 regionStateStore.splitRegion(parent, daughterA, daughterB, serverName, td); 2117 if (shouldAssignFavoredNodes(parent)) { 2118 List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList(); 2119 getFavoredNodePromoter().generateFavoredNodesForDaughter(onlineServers, parent, daughterA, 2120 daughterB); 2121 } 2122 } 2123 2124 /** 2125 * When called here, the merge has happened. The merge parents have been unassigned and 2126 * markRegionClosed has been called on each, so they have been disassociated from a hosting Server. 2127 * The merged region will be open after this call. The merge parents are removed from hbase:meta 2128 * below. Later they are deleted from the filesystem by the catalog janitor running against 2129 * hbase:meta. It notices when the merged region no longer holds references to the old regions 2130 * (References are deleted after a compaction rewrites what the Reference points at, but the 2131 * References themselves are not removed until the archiver chore runs).
2132 */ 2133 public void markRegionAsMerged(final RegionInfo child, final ServerName serverName, 2134 RegionInfo[] mergeParents) throws IOException { 2135 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child); 2136 node.setState(State.MERGED); 2137 for (RegionInfo ri : mergeParents) { 2138 regionStates.deleteRegion(ri); 2139 } 2140 TableDescriptor td = master.getTableDescriptors().get(child.getTable()); 2141 regionStateStore.mergeRegions(child, mergeParents, serverName, td); 2142 if (shouldAssignFavoredNodes(child)) { 2143 getFavoredNodePromoter().generateFavoredNodesForMergedRegion(child, mergeParents); 2144 } 2145 } 2146 2147 /* 2148 * Favored nodes should be applied only when FavoredNodes balancer is configured and the region 2149 * belongs to a non-system table. 2150 */ 2151 private boolean shouldAssignFavoredNodes(RegionInfo region) { 2152 return this.shouldAssignRegionsWithFavoredNodes 2153 && FavoredNodesManager.isFavoredNodeApplicable(region); 2154 } 2155 2156 // ============================================================================================ 2157 // Assign Queue (Assign/Balance) 2158 // ============================================================================================ 2159 private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>(); 2160 private final ReentrantLock assignQueueLock = new ReentrantLock(); 2161 private final Condition assignQueueFullCond = assignQueueLock.newCondition(); 2162 2163 /** 2164 * Add the assign operation to the assignment queue. The pending assignment operations will be 2165 * processed, and each region will be assigned to a server chosen by the balancer. 2166 */ 2167 protected void queueAssign(final RegionStateNode regionNode) { 2168 regionNode.getProcedureEvent().suspend(); 2169 2170 // TODO: quick-start for meta and the other sys-tables?
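// The assignment thread is signalled immediately for system-table regions, for the first region
// queued, or once the queue reaches assignDispatchWaitQueueMaxSize; otherwise regions are batched
// briefly (see waitOnAssignQueue) before being dispatched.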
2171 assignQueueLock.lock(); 2172 try { 2173 pendingAssignQueue.add(regionNode); 2174 if ( 2175 regionNode.isSystemTable() || pendingAssignQueue.size() == 1 2176 || pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize 2177 ) { 2178 assignQueueFullCond.signal(); 2179 } 2180 } finally { 2181 assignQueueLock.unlock(); 2182 } 2183 } 2184 2185 private void startAssignmentThread() { 2186 assignThread = new Thread(master.getServerName().toShortString()) { 2187 @Override 2188 public void run() { 2189 while (isRunning()) { 2190 processAssignQueue(); 2191 } 2192 pendingAssignQueue.clear(); 2193 } 2194 }; 2195 assignThread.setDaemon(true); 2196 assignThread.start(); 2197 } 2198 2199 private void stopAssignmentThread() { 2200 assignQueueSignal(); 2201 try { 2202 while (assignThread.isAlive()) { 2203 assignQueueSignal(); 2204 assignThread.join(250); 2205 } 2206 } catch (InterruptedException e) { 2207 LOG.warn("join interrupted", e); 2208 Thread.currentThread().interrupt(); 2209 } 2210 } 2211 2212 private void assignQueueSignal() { 2213 assignQueueLock.lock(); 2214 try { 2215 assignQueueFullCond.signal(); 2216 } finally { 2217 assignQueueLock.unlock(); 2218 } 2219 } 2220 2221 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") 2222 private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() { 2223 HashMap<RegionInfo, RegionStateNode> regions = null; 2224 2225 assignQueueLock.lock(); 2226 try { 2227 if (pendingAssignQueue.isEmpty() && isRunning()) { 2228 assignQueueFullCond.await(); 2229 } 2230 2231 if (!isRunning()) { 2232 return null; 2233 } 2234 assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS); 2235 regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size()); 2236 for (RegionStateNode regionNode : pendingAssignQueue) { 2237 regions.put(regionNode.getRegionInfo(), regionNode); 2238 } 2239 pendingAssignQueue.clear(); 2240 } catch (InterruptedException e) { 2241 LOG.warn("got interrupted ", e); 2242 Thread.currentThread().interrupt(); 2243 } finally { 2244 assignQueueLock.unlock(); 2245 } 2246 return regions; 2247 } 2248 2249 private void processAssignQueue() { 2250 final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue(); 2251 if (regions == null || regions.size() == 0 || !isRunning()) { 2252 return; 2253 } 2254 2255 if (LOG.isTraceEnabled()) { 2256 LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size()); 2257 } 2258 2259 // TODO: Optimize balancer. pass a RegionPlan? 2260 final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>(); 2261 final List<RegionInfo> userHRIs = new ArrayList<>(regions.size()); 2262 // Regions for system tables requiring reassignment 2263 final List<RegionInfo> systemHRIs = new ArrayList<>(); 2264 for (RegionStateNode regionStateNode : regions.values()) { 2265 boolean sysTable = regionStateNode.isSystemTable(); 2266 final List<RegionInfo> hris = sysTable ? systemHRIs : userHRIs; 2267 if (regionStateNode.getRegionLocation() != null) { 2268 retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation()); 2269 } else { 2270 hris.add(regionStateNode.getRegionInfo()); 2271 } 2272 } 2273 2274 // TODO: connect with the listener to invalidate the cache 2275 2276 // TODO use events 2277 List<ServerName> servers = master.getServerManager().createDestinationServersList(); 2278 for (int i = 0; servers.size() < 1; ++i) { 2279 // Report every fourth time around this loop; try not to flood log. 
2280 if (i % 4 == 0) { 2281 LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions."); 2282 } 2283 2284 if (!isRunning()) { 2285 LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions."); 2286 return; 2287 } 2288 Threads.sleep(250); 2289 servers = master.getServerManager().createDestinationServersList(); 2290 } 2291 2292 if (!systemHRIs.isEmpty()) { 2293 // System table regions requiring reassignment are present, get region servers 2294 // not available for system table regions 2295 final List<ServerName> excludeServers = getExcludedServersForSystemTable(); 2296 List<ServerName> serversForSysTables = 2297 servers.stream().filter(s -> !excludeServers.contains(s)).collect(Collectors.toList()); 2298 if (serversForSysTables.isEmpty()) { 2299 LOG.warn("Filtering old server versions and the excluded produced an empty set; " 2300 + "instead considering all candidate servers!"); 2301 } 2302 LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() 2303 + ", allServersCount=" + servers.size()); 2304 processAssignmentPlans(regions, null, systemHRIs, 2305 serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs) 2306 ? servers 2307 : serversForSysTables); 2308 } 2309 2310 processAssignmentPlans(regions, retainMap, userHRIs, servers); 2311 } 2312 2313 private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions, 2314 List<RegionInfo> hirs) { 2315 for (RegionInfo ri : hirs) { 2316 if ( 2317 regions.get(ri).getRegionLocation() != null 2318 && regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME) 2319 ) { 2320 return true; 2321 } 2322 } 2323 return false; 2324 } 2325 2326 private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions, 2327 final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris, 2328 final List<ServerName> servers) { 2329 boolean isTraceEnabled = LOG.isTraceEnabled(); 2330 if (isTraceEnabled) { 2331 LOG.trace("Available servers count=" + servers.size() + ": " + servers); 2332 } 2333 2334 final LoadBalancer balancer = getBalancer(); 2335 // ask the balancer where to place regions 2336 if (retainMap != null && !retainMap.isEmpty()) { 2337 if (isTraceEnabled) { 2338 LOG.trace("retain assign regions=" + retainMap); 2339 } 2340 try { 2341 acceptPlan(regions, balancer.retainAssignment(retainMap, servers)); 2342 } catch (IOException e) { 2343 LOG.warn("unable to retain assignment", e); 2344 addToPendingAssignment(regions, retainMap.keySet()); 2345 } 2346 } 2347 2348 // TODO: Do we need to split retain and round-robin? 2349 // the retain seems to fallback to round-robin/random if the region is not in the map. 
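// Regions with no previous location are not in the retain map; they are spread across the
// candidate servers by the balancer's round-robin assignment below.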
2350 if (!hris.isEmpty()) { 2351 Collections.sort(hris, RegionInfo.COMPARATOR); 2352 if (isTraceEnabled) { 2353 LOG.trace("round robin regions=" + hris); 2354 } 2355 try { 2356 acceptPlan(regions, balancer.roundRobinAssignment(hris, servers)); 2357 } catch (IOException e) { 2358 LOG.warn("unable to round-robin assignment", e); 2359 addToPendingAssignment(regions, hris); 2360 } 2361 } 2362 } 2363 2364 private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions, 2365 final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException { 2366 final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()]; 2367 final long st = EnvironmentEdgeManager.currentTime(); 2368 2369 if (plan.isEmpty()) { 2370 throw new HBaseIOException("unable to compute plans for regions=" + regions.size()); 2371 } 2372 2373 int evcount = 0; 2374 for (Map.Entry<ServerName, List<RegionInfo>> entry : plan.entrySet()) { 2375 final ServerName server = entry.getKey(); 2376 for (RegionInfo hri : entry.getValue()) { 2377 final RegionStateNode regionNode = regions.get(hri); 2378 regionNode.setRegionLocation(server); 2379 if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) { 2380 assignQueueLock.lock(); 2381 try { 2382 pendingAssignQueue.add(regionNode); 2383 } finally { 2384 assignQueueLock.unlock(); 2385 } 2386 } else { 2387 events[evcount++] = regionNode.getProcedureEvent(); 2388 } 2389 } 2390 } 2391 ProcedureEvent.wakeEvents(getProcedureScheduler(), events); 2392 2393 final long et = EnvironmentEdgeManager.currentTime(); 2394 if (LOG.isTraceEnabled()) { 2395 LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + StringUtils.humanTimeDiff(et - st)); 2396 } 2397 } 2398 2399 private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions, 2400 final Collection<RegionInfo> pendingRegions) { 2401 assignQueueLock.lock(); 2402 try { 2403 for (RegionInfo hri : pendingRegions) { 2404 pendingAssignQueue.add(regions.get(hri)); 2405 } 2406 } finally { 2407 assignQueueLock.unlock(); 2408 } 2409 } 2410 2411 /** 2412 * For a given cluster with mixed versions of servers, get a list of servers with lower versions, 2413 * to which system table regions should not be assigned. For system tables, we must assign regions 2414 * to a server with the highest version. However, we can disable this exclusion using the config 2415 * "hbase.min.version.move.system.tables" if checkForMinVersion is true. A detailed explanation is 2416 * available with the definition of minVersionToMoveSysTables. 2417 * @return List of excluded servers for system table regions. 2418 */ 2419 public List<ServerName> getExcludedServersForSystemTable() { 2420 // TODO: This should be a cached list kept by the ServerManager rather than calculated on each 2421 // move or system region assign. The RegionServerTracker keeps a list of online Servers with 2422 // RegionServerInfo that includes Version.
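// Find the highest version among the online servers and exclude every server running a lower
// one, unless minVersionToMoveSysTables is configured and is newer than anything in the cluster,
// in which case nothing is excluded.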
2423 List<Pair<ServerName, String>> serverList = 2424 master.getServerManager().getOnlineServersList().stream() 2425 .map(s -> new Pair<>(s, master.getRegionServerVersion(s))).collect(Collectors.toList()); 2426 if (serverList.isEmpty()) { 2427 return new ArrayList<>(); 2428 } 2429 String highestVersion = Collections 2430 .max(serverList, (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())) 2431 .getSecond(); 2432 if (!DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG.equals(minVersionToMoveSysTables)) { 2433 int comparedValue = VersionInfo.compareVersion(minVersionToMoveSysTables, highestVersion); 2434 if (comparedValue > 0) { 2435 return new ArrayList<>(); 2436 } 2437 } 2438 return serverList.stream().filter(pair -> !pair.getSecond().equals(highestVersion)) 2439 .map(Pair::getFirst).collect(Collectors.toList()); 2440 } 2441 2442 MasterServices getMaster() { 2443 return master; 2444 } 2445 2446 /** Returns a snapshot of rsReports */ 2447 public Map<ServerName, Set<byte[]>> getRSReports() { 2448 Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>(); 2449 synchronized (rsReports) { 2450 rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue())); 2451 } 2452 return rsReportsSnapshot; 2453 } 2454 2455 /** 2456 * Provide region state counts for the given table, e.g. how many regions of the given table are 2457 * opened/closed/in transition, etc. 2458 * @param tableName TableName 2459 * @return region states count 2460 */ 2461 public RegionStatesCount getRegionStatesCount(TableName tableName) { 2462 int openRegionsCount = 0; 2463 int closedRegionCount = 0; 2464 int ritCount = 0; 2465 int splitRegionCount = 0; 2466 int totalRegionCount = 0; 2467 if (!isTableDisabled(tableName)) { 2468 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 2469 for (RegionState regionState : states) { 2470 if (regionState.isOpened()) { 2471 openRegionsCount++; 2472 } else if (regionState.isClosed()) { 2473 closedRegionCount++; 2474 } else if (regionState.isSplit()) { 2475 splitRegionCount++; 2476 } 2477 } 2478 totalRegionCount = states.size(); 2479 ritCount = totalRegionCount - openRegionsCount - splitRegionCount; 2480 } 2481 return new RegionStatesCount.RegionStatesCountBuilder().setOpenRegions(openRegionsCount) 2482 .setClosedRegions(closedRegionCount).setSplitRegions(splitRegionCount) 2483 .setRegionsInTransition(ritCount).setTotalRegions(totalRegionCount).build(); 2484 } 2485 2486}