/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.assignment;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.regionserver.SequenceId;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;

/**
 * The AssignmentManager is the coordinator for region assign/unassign operations.
 * <ul>
 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
 * </ul>
 * Regions are created by CreateTable, Split, Merge. Regions are deleted by DeleteTable, Split,
 * Merge. Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. Unassigns
 * are triggered by DisableTable, Split, Merge
 */
@InterfaceAudience.Private
public class AssignmentManager {
  private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);

  // TODO: AMv2
  // - handle region migration from hbase1 to hbase2.
  // - handle sys table assignment first (e.g. acl, namespace)
  // - handle table priorities
  // - If ServerBusyException trying to update hbase:meta, we abort the Master
  //   See updateRegionLocation in RegionStateStore.
  //
  // See also
  // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
  // for other TODOs.
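
  // Illustration (not part of the original source): the keys declared below are read from the
  // master Configuration, so an operator could, for example, raise the dispatch wait by setting
  // "hbase.assignment.dispatch.wait.msec" in hbase-site.xml; the DEFAULT_* constants show the
  // values that apply when a key is left unset.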

  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
    "hbase.assignment.bootstrap.thread.pool.size";

  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
    "hbase.assignment.dispatch.wait.msec";
  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;

  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
    "hbase.assignment.dispatch.wait.queue.max.size";
  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;

  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
    "hbase.assignment.rit.chore.interval.msec";
  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;

  public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY =
    "hbase.assignment.dead.region.metric.chore.interval.msec";
  private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000;

  public static final String ASSIGN_MAX_ATTEMPTS = "hbase.assignment.maximum.attempts";
  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;

  public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS =
    "hbase.assignment.retry.immediately.maximum.attempts";
  private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3;

  /** Region in Transition metrics threshold time */
  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
    "hbase.metrics.rit.stuck.warning.threshold";
  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
  public static final String UNEXPECTED_STATE_REGION = "Unexpected state for ";

  private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");

  private final MetricsAssignmentManager metrics;
  private final RegionInTransitionChore ritChore;
  private final DeadServerMetricRegionChore deadMetricChore;
  private final MasterServices master;

  private final AtomicBoolean running = new AtomicBoolean(false);
  private final RegionStates regionStates = new RegionStates();
  private final RegionStateStore regionStateStore;

  /**
   * When the operator uses this configuration option, any version between the current cluster
   * version and the value of "hbase.min.version.move.system.tables" does not trigger any
   * auto-region movement. Auto-region movement here refers to auto-migration of system table
   * regions to newer server versions. It is assumed that the configured range of versions does not
   * require special handling of moving system table regions to higher versioned RegionServer. This
   * auto-migration is done by {@link #checkIfShouldMoveSystemRegionAsync()}. Example: Let's assume
   * the cluster is on version 1.4.0 and we have set "hbase.min.version.move.system.tables" as
   * "2.0.0". Now if we upgrade one RegionServer on 1.4.0 cluster to 1.6.0 (< 2.0.0), then
   * AssignmentManager will not move hbase:meta, hbase:namespace and other system table regions to
   * newly brought up RegionServer 1.6.0 as part of auto-migration. However, if we upgrade one
   * RegionServer on 1.4.0 cluster to 2.2.0 (> 2.0.0), then AssignmentManager will move all system
   * table regions to newly brought up RegionServer 2.2.0 as part of auto-migration done by
   * {@link #checkIfShouldMoveSystemRegionAsync()}. "hbase.min.version.move.system.tables" is
   * introduced as part of HBASE-22923.
   */
  private final String minVersionToMoveSysTables;

  private static final String MIN_VERSION_MOVE_SYS_TABLES_CONFIG =
    "hbase.min.version.move.system.tables";
  private static final String DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG = "";

  private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();

  private final boolean shouldAssignRegionsWithFavoredNodes;
  private final int assignDispatchWaitQueueMaxSize;
  private final int assignDispatchWaitMillis;
  private final int assignMaxAttempts;
  private final int assignRetryImmediatelyMaxAttempts;

  private final MasterRegion masterRegion;

  private final Object checkIfShouldMoveSystemRegionLock = new Object();

  private Thread assignThread;

  public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
    this(master, masterRegion, new RegionStateStore(master, masterRegion));
  }

  AssignmentManager(MasterServices master, MasterRegion masterRegion, RegionStateStore stateStore) {
    this.master = master;
    this.regionStateStore = stateStore;
    this.metrics = new MetricsAssignmentManager();
    this.masterRegion = masterRegion;

    final Configuration conf = master.getConfiguration();

    // Only read favored nodes if using the favored nodes load balancer.
    this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class
      .isAssignableFrom(conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class));

    this.assignDispatchWaitMillis =
      conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
    this.assignDispatchWaitQueueMaxSize =
      conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);

    this.assignMaxAttempts =
      Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS));
    this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS,
      DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS);

    int ritChoreInterval =
      conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC);
    this.ritChore = new RegionInTransitionChore(ritChoreInterval);

    int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY,
      DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC);
    if (deadRegionChoreInterval > 0) {
      this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval);
    } else {
      this.deadMetricChore = null;
    }
    minVersionToMoveSysTables =
      conf.get(MIN_VERSION_MOVE_SYS_TABLES_CONFIG, DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG);
  }

  private void mirrorMetaLocations() throws IOException, KeeperException {
    // For compatibility, mirror the meta region state to zookeeper
    // And we still need to use zookeeper to publish the meta region locations to region
    // server, so they can serve as ClientMetaService
    ZKWatcher zk = master.getZooKeeper();
    if (zk == null || !zk.getRecoverableZooKeeper().getState().isAlive()) {
      // this is possible in tests, we do not provide a zk watcher or the zk watcher has been closed
      return;
    }
    Collection<RegionStateNode> metaStates = regionStates.getRegionStateNodes();
    for (RegionStateNode metaState : metaStates) {
      MetaTableLocator.setMetaLocation(zk, metaState.getRegionLocation(),
        metaState.getRegionInfo().getReplicaId(), metaState.getState());
    }
    int replicaCount = metaStates.size();
    // remove extra mirror locations
    for (String znode : zk.getMetaReplicaNodes()) {
      int replicaId = zk.getZNodePaths().getMetaReplicaIdFromZNode(znode);
      if (replicaId >= replicaCount) {
        MetaTableLocator.deleteMetaLocation(zk, replicaId);
      }
    }
  }

  public void start() throws IOException, KeeperException {
    if (!running.compareAndSet(false, true)) {
      return;
    }

    LOG.trace("Starting assignment manager");

    // Start the Assignment Thread
    startAssignmentThread();
    // load meta region states.
    // here we are still in the early steps of active master startup. There is only one thread (us)
    // that can access AssignmentManager and create region nodes, so we do not need to lock the
    // region node here.
    try (ResultScanner scanner =
      masterRegion.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY))) {
      for (;;) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        RegionStateStore
          .visitMetaEntry((r, regionInfo, state, regionLocation, lastHost, openSeqNum) -> {
            RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
            regionNode.setState(state);
            regionNode.setLastHost(lastHost);
            regionNode.setRegionLocation(regionLocation);
            regionNode.setOpenSeqNum(openSeqNum);
            if (regionNode.getProcedure() != null) {
              regionNode.getProcedure().stateLoaded(this, regionNode);
            }
            if (regionLocation != null) {
              regionStates.addRegionToServer(regionNode);
            }
            if (RegionReplicaUtil.isDefaultReplica(regionInfo.getReplicaId())) {
              setMetaAssigned(regionInfo, state == State.OPEN);
            }
            LOG.debug("Loaded hbase:meta {}", regionNode);
          }, result);
      }
    }
    mirrorMetaLocations();
  }

  /**
   * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode.
   * <p>
   * This is used to restore the RIT region list, so we do not need to restore it in the meta
   * loading done in {@link #start()} above. And it is also very important, as before submitting a
   * TRSP we need to attach it to the RegionStateNode, which acts like a guard, so we need to
   * restore this information at the very beginning, before we start processing any procedures.
   */
  public void setupRIT(List<TransitRegionStateProcedure> procs) {
    procs.forEach(proc -> {
      RegionInfo regionInfo = proc.getRegion();
      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
      TransitRegionStateProcedure existingProc = regionNode.getProcedure();
      if (existingProc != null) {
        // This is possible, as we will detach the TRSP from the RSN during execution, before the
        // procedure is actually finished. At that point the procedure has not yet been marked as
        // done in the pv2 framework, so a new TRSP may be scheduled immediately and, when arriving
        // here, we can find multiple TRSPs for the region. Only the last one can take charge; the
        // previous ones should all have finished already. So here we compare the proc ids and the
        // greater one wins.
        if (existingProc.getProcId() < proc.getProcId()) {
          // the new one wins, unset and set it to the new one below
          regionNode.unsetProcedure(existingProc);
        } else {
          // the old one wins, skip
          return;
        }
      }
      LOG.info("Attach {} to {} to restore RIT", proc, regionNode);
      regionNode.setProcedure(proc);
    });
  }

  public void stop() {
    if (!running.compareAndSet(true, false)) {
      return;
    }

    LOG.info("Stopping assignment manager");

    // The AM is started before the procedure executor,
    // but the actual work will be loaded/submitted only once we have the executor
    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;

    // Remove the RIT chore
    if (hasProcExecutor) {
      master.getMasterProcedureExecutor().removeChore(this.ritChore);
      if (this.deadMetricChore != null) {
        master.getMasterProcedureExecutor().removeChore(this.deadMetricChore);
      }
    }

    // Stop the Assignment Thread
    stopAssignmentThread();

    // Stop the RegionStateStore
    regionStates.clear();

    // Update meta events (for testing)
    if (hasProcExecutor) {
      metaLoadEvent.suspend();
      for (RegionInfo hri : getMetaRegionSet()) {
        setMetaAssigned(hri, false);
      }
    }
  }

  public boolean isRunning() {
    return running.get();
  }

  public Configuration getConfiguration() {
    return master.getConfiguration();
  }

  public MetricsAssignmentManager getAssignmentManagerMetrics() {
    return metrics;
  }

  private LoadBalancer getBalancer() {
    return master.getLoadBalancer();
  }

  private MasterProcedureEnv getProcedureEnvironment() {
    return master.getMasterProcedureExecutor().getEnvironment();
  }

  private MasterProcedureScheduler getProcedureScheduler() {
    return getProcedureEnvironment().getProcedureScheduler();
  }

  int getAssignMaxAttempts() {
    return assignMaxAttempts;
  }

  int getAssignRetryImmediatelyMaxAttempts() {
    return assignRetryImmediatelyMaxAttempts;
  }

  public RegionStates getRegionStates() {
    return regionStates;
  }

  /**
   * Returns the regions hosted by the specified server.
   * <p/>
   * Notice that, for SCP, after we submit the SCP, no one can change the region list for the
   * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a
   * snapshot of the current region list for this server, which means, right after you get the
   * region list, new regions may be moved to this server or some regions may be moved out from this
   * server, so you should not use it critically if you need strong consistency.
   */
  public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
    ServerStateNode serverInfo = regionStates.getServerNode(serverName);
    if (serverInfo == null) {
      return Collections.emptyList();
    }
    return serverInfo.getRegionInfoList();
  }

  private RegionInfo getRegionInfo(RegionStateNode rsn) {
    if (rsn.isSplit() && !rsn.getRegionInfo().isSplit()) {
      // see the comments in markRegionAsSplit on why we need to do this converting.
      return RegionInfoBuilder.newBuilder(rsn.getRegionInfo()).setSplit(true).setOffline(true)
        .build();
    } else {
      return rsn.getRegionInfo();
    }
  }

  private Stream<RegionStateNode> getRegionStateNodes(TableName tableName,
    boolean excludeOfflinedSplitParents) {
    Stream<RegionStateNode> stream = regionStates.getTableRegionStateNodes(tableName).stream();
    if (excludeOfflinedSplitParents) {
      return stream.filter(rsn -> !rsn.isSplit());
    } else {
      return stream;
    }
  }

  public List<RegionInfo> getTableRegions(TableName tableName,
    boolean excludeOfflinedSplitParents) {
    return getRegionStateNodes(tableName, excludeOfflinedSplitParents).map(this::getRegionInfo)
      .collect(Collectors.toList());
  }

  public List<Pair<RegionInfo, ServerName>> getTableRegionsAndLocations(TableName tableName,
    boolean excludeOfflinedSplitParents) {
    return getRegionStateNodes(tableName, excludeOfflinedSplitParents)
      .map(rsn -> Pair.newPair(getRegionInfo(rsn), rsn.getRegionLocation()))
      .collect(Collectors.toList());
  }

  public RegionStateStore getRegionStateStore() {
    return regionStateStore;
  }

  public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
    return this.shouldAssignRegionsWithFavoredNodes
      ? ((FavoredStochasticBalancer) getBalancer()).getFavoredNodes(regionInfo)
      : ServerName.EMPTY_SERVER_LIST;
  }

  // ============================================================================================
  // Table State Manager helpers
  // ============================================================================================
  private TableStateManager getTableStateManager() {
    return master.getTableStateManager();
  }

  private boolean isTableEnabled(final TableName tableName) {
    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
  }

  private boolean isTableDisabled(final TableName tableName) {
    return getTableStateManager().isTableState(tableName, TableState.State.DISABLED,
      TableState.State.DISABLING);
  }

  // ============================================================================================
  // META Helpers
  // ============================================================================================
  private boolean isMetaRegion(final RegionInfo regionInfo) {
    return regionInfo.isMetaRegion();
  }

  public boolean isMetaRegion(final byte[] regionName) {
    return getMetaRegionFromName(regionName) != null;
  }

  public RegionInfo getMetaRegionFromName(final byte[] regionName) {
    for (RegionInfo hri : getMetaRegionSet()) {
      if (Bytes.equals(hri.getRegionName(), regionName)) {
        return hri;
      }
    }
    return null;
  }

  public boolean isCarryingMeta(final ServerName serverName) {
    // TODO: handle multiple meta
    return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
  }

  private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
    // TODO: check for state?
    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
    return (node != null && serverName.equals(node.getRegionLocation()));
  }

  private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
    // if (regionInfo.isMetaRegion()) return regionInfo;
    // TODO: handle multiple meta. if the region provided is not meta lookup
    // which meta the region belongs to.
    return RegionInfoBuilder.FIRST_META_REGIONINFO;
  }

  // TODO: handle multiple meta.
  private static final Set<RegionInfo> META_REGION_SET =
    Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);

  public Set<RegionInfo> getMetaRegionSet() {
    return META_REGION_SET;
  }

  // ============================================================================================
  // META Event(s) helpers
  // ============================================================================================
  /**
   * Notice that this only means the meta region is available on a RS, but the AM may still be
   * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
   * before checking this method, unless you can make sure that your piece of code can only be
   * executed after AM builds the region states.
   * @see #isMetaLoaded()
   */
  public boolean isMetaAssigned() {
    return metaAssignEvent.isReady();
  }

  public boolean isMetaRegionInTransition() {
    return !isMetaAssigned();
  }

  /**
   * Notice that this event does not mean the AM has already finished region state rebuilding. See
   * the comment of {@link #isMetaAssigned()} for more details.
   * @see #isMetaAssigned()
   */
  public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
    return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
  }

  private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
    ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
    if (assigned) {
      metaAssignEvent.wake(getProcedureScheduler());
    } else {
      metaAssignEvent.suspend();
    }
  }

  private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
    // TODO: handle multiple meta.
    return metaAssignEvent;
  }

  /**
   * Wait until AM finishes the meta loading, i.e., the region states rebuilding.
   * @see #isMetaLoaded()
   * @see #waitMetaAssigned(Procedure, RegionInfo)
   */
  public boolean waitMetaLoaded(Procedure<?> proc) {
    return metaLoadEvent.suspendIfNotReady(proc);
  }

  /**
   * This method will be called in the master initialization method after calling
   * {@link #processOfflineRegions()}, as in processOfflineRegions we will generate assign
   * procedures for offline regions, which may conflict with creating tables.
   * <p/>
   * This is a bit dirty, should be reconsidered after we decide whether to keep the
   * {@link #processOfflineRegions()} method.
   */
  public void wakeMetaLoadedEvent() {
    metaLoadEvent.wake(getProcedureScheduler());
    assert isMetaLoaded() : "expected meta to be loaded";
  }

  /**
   * Return whether AM finishes the meta loading, i.e., the region states rebuilding.
   * @see #isMetaAssigned()
   * @see #waitMetaLoaded(Procedure)
   */
  public boolean isMetaLoaded() {
    return metaLoadEvent.isReady();
  }
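
  // Note (illustrative, not from the original source): waitMetaAssigned and waitMetaLoaded follow
  // the ProcedureEvent pattern -- they return true when the calling procedure has been suspended on
  // the event (meta not yet assigned/loaded) and will be re-scheduled once setMetaAssigned(..., true)
  // or wakeMetaLoadedEvent() wakes the event, so a hypothetical caller would typically bail out of
  // its current execution step when the method returns true.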

  /**
   * Start a new thread to check if there are region servers whose versions are higher than others.
   * If so, move all system table regions to RS with the highest version to keep compatibility. The
   * reason is, RS in new version may not be able to access RS in old version when there are some
   * incompatible changes.
   * <p>
   * This method is called only when a new RegionServer is added to the cluster.
   * </p>
   */
  public void checkIfShouldMoveSystemRegionAsync() {
    // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
    // it should 'move' the system tables from the old server to the new server but
    // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
    if (this.master.getServerManager().countOfRegionServers() <= 1) {
      return;
    }
    // This thread used to run whenever there was a change in the cluster. The ZooKeeper
    // childrenChanged notification came in before the nodeDeleted message and so this method
    // could run before a ServerCrashProcedure could run. That meant that this thread could see
    // a Crashed Server before ServerCrashProcedure and it could find system regions on the
    // crashed server and go move them before ServerCrashProcedure had a chance; could be
    // dataloss too if WALs were not recovered.
    new Thread(() -> {
      try {
        synchronized (checkIfShouldMoveSystemRegionLock) {
          List<RegionPlan> plans = new ArrayList<>();
          // TODO: I don't think this code does a good job if all servers in cluster have same
          // version. It looks like it will schedule unnecessary moves.
          for (ServerName server : getExcludedServersForSystemTable()) {
            if (master.getServerManager().isServerDead(server)) {
              // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
              // considers only online servers, the server could be queued for dead server
              // processing. As region assignments for a crashed server are handled by
              // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
              // the regular flow of LoadBalancer as a favored node and not to have this special
              // handling.
              continue;
            }
            List<RegionInfo> regionsShouldMove = getSystemTables(server);
            if (!regionsShouldMove.isEmpty()) {
              for (RegionInfo regionInfo : regionsShouldMove) {
                // null value for dest forces destination server to be selected by balancer
                RegionPlan plan = new RegionPlan(regionInfo, server, null);
                if (regionInfo.isMetaRegion()) {
                  // Must move meta region first.
                  LOG.info("Async MOVE of {} to newer Server={}", regionInfo.getEncodedName(),
                    server);
                  moveAsync(plan);
                } else {
                  plans.add(plan);
                }
              }
            }
            for (RegionPlan plan : plans) {
              LOG.info("Async MOVE of {} to newer Server={}", plan.getRegionInfo().getEncodedName(),
                server);
              moveAsync(plan);
            }
          }
        }
      } catch (Throwable t) {
        LOG.error(t.toString(), t);
      }
    }).start();
  }

  private List<RegionInfo> getSystemTables(ServerName serverName) {
    ServerStateNode serverNode = regionStates.getServerNode(serverName);
    if (serverNode == null) {
      return Collections.emptyList();
    }
    return serverNode.getSystemRegionInfoList();
  }

  private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
    throws HBaseIOException {
    if (regionNode.getProcedure() != null) {
      throw new HBaseIOException(
        regionNode + " is currently in transition; pid=" + regionNode.getProcedure().getProcId());
    }
    if (!regionNode.isInState(expectedStates)) {
      throw new DoNotRetryRegionException(UNEXPECTED_STATE_REGION + regionNode);
    }
    if (isTableDisabled(regionNode.getTable())) {
      throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
    }
  }

  /**
   * Create an assign TransitRegionStateProcedure. Makes sure of RegionState. Throws exception if
   * not appropriate UNLESS override is set. Used by hbck2 but also by straightline
   * {@link #assign(RegionInfo, ServerName)} and {@link #assignAsync(RegionInfo, ServerName)}.
   * @see #createAssignProcedure(RegionStateNode, ServerName) for a version that does NO checking,
   *      used only when no checking is needed.
   * @param override If false, check RegionState is appropriate for assign; if not, throw exception.
   */
  private TransitRegionStateProcedure createAssignProcedure(RegionInfo regionInfo, ServerName sn,
    boolean override) throws IOException {
    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
    regionNode.lock();
    try {
      if (override) {
        if (regionNode.getProcedure() != null) {
          regionNode.unsetProcedure(regionNode.getProcedure());
        }
      } else {
        preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
      }
      assert regionNode.getProcedure() == null;
      return regionNode.setProcedure(
        TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn));
    } finally {
      regionNode.unlock();
    }
  }
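
  // Note (descriptive, not from the original source): with override=true the method above detaches
  // any TRSP currently attached to the RegionStateNode and forces a fresh assign; this is the
  // repair path reached via createOneAssignProcedure() for HBCK2, while the normal assign paths
  // pass override=false and go through preTransitCheck() instead.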

  /**
   * Create an assign TransitRegionStateProcedure. Does NO checking of RegionState. Presumes
   * appropriate state ripe for assign.
   * @see #createAssignProcedure(RegionInfo, ServerName, boolean)
   */
  private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode,
    ServerName targetServer) {
    regionNode.lock();
    try {
      return regionNode.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
        regionNode.getRegionInfo(), targetServer));
    } finally {
      regionNode.unlock();
    }
  }

  public long assign(RegionInfo regionInfo, ServerName sn) throws IOException {
    TransitRegionStateProcedure proc = createAssignProcedure(regionInfo, sn, false);
    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
    return proc.getProcId();
  }

  public long assign(RegionInfo regionInfo) throws IOException {
    return assign(regionInfo, null);
  }

  /**
   * Submits a procedure that assigns a region to a target server without waiting for it to finish.
   * @param regionInfo the region we would like to assign
   * @param sn target server name
   */
  public Future<byte[]> assignAsync(RegionInfo regionInfo, ServerName sn) throws IOException {
    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(),
      createAssignProcedure(regionInfo, sn, false));
  }

  /**
   * Submits a procedure that assigns a region without waiting for it to finish.
   * @param regionInfo the region we would like to assign
   */
  public Future<byte[]> assignAsync(RegionInfo regionInfo) throws IOException {
    return assignAsync(regionInfo, null);
  }

  public long unassign(RegionInfo regionInfo) throws IOException {
    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
    if (regionNode == null) {
      throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
    }
    TransitRegionStateProcedure proc;
    regionNode.lock();
    try {
      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
      regionNode.setProcedure(proc);
    } finally {
      regionNode.unlock();
    }
    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
    return proc.getProcId();
  }

  public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
    ServerName targetServer) throws HBaseIOException {
    RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
    if (regionNode == null) {
      throw new UnknownRegionException(
        "No RegionStateNode found for " + regionInfo.getEncodedName() + " (Closed/Deleted?)");
    }
    TransitRegionStateProcedure proc;
    regionNode.lock();
    try {
      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
      regionNode.checkOnline();
      proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
      regionNode.setProcedure(proc);
    } finally {
      regionNode.unlock();
    }
    return proc;
  }

  public void move(RegionInfo regionInfo) throws IOException {
    TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null);
    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
  }

  public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException {
    TransitRegionStateProcedure proc =
      createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination());
    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
  }

  public Future<byte[]> balance(RegionPlan regionPlan) throws HBaseIOException {
    ServerName current =
      this.getRegionStates().getRegionAssignments().get(regionPlan.getRegionInfo());
    if (current == null || !current.equals(regionPlan.getSource())) {
      LOG.debug("Skip region plan {}, source server not match, current region location is {}",
        regionPlan, current == null ? "(null)" : current);
      return null;
    }
    return moveAsync(regionPlan);
  }

  // ============================================================================================
  // RegionTransition procedures helpers
  // ============================================================================================

  /**
   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
   *         to populate the assigns with targets chosen using round-robin (default balancer
   *         scheme). If at assign-time, the target chosen is no longer up, that's fine, the
   *         AssignProcedure will ask the balancer for a new target, and so on.
   */
  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris,
    List<ServerName> serversToExclude) {
    if (hris.isEmpty()) {
      return new TransitRegionStateProcedure[0];
    }

    if (
      serversToExclude != null && this.master.getServerManager().getOnlineServersList().size() == 1
    ) {
      LOG.debug("Only one region server found and hence going ahead with the assignment");
      serversToExclude = null;
    }
    try {
      // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
      // a better job if it has all the assignments in the one lump.
      Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
        this.master.getServerManager().createDestinationServersList(serversToExclude));
      // Return mid-method!
      return createAssignProcedures(assignments);
    } catch (HBaseIOException hioe) {
      LOG.warn("Failed roundRobinAssignment", hioe);
    }
    // If an error above, fall-through to this simpler assign. Last resort.
    return createAssignProcedures(hris);
  }

  /**
   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
   *         to populate the assigns with targets chosen using round-robin (default balancer
   *         scheme). If at assign-time, the target chosen is no longer up, that's fine, the
   *         AssignProcedure will ask the balancer for a new target, and so on.
   */
  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
    return createRoundRobinAssignProcedures(hris, null);
  }
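
  // Descriptive note (not from the original source): compare() below orders TRSPs so that meta
  // regions sort first, then regions of other system tables, then user regions; the
  // createAssignProcedures() variants sort with it so meta and system tables are submitted ahead
  // of user-table assigns.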

  static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
    if (left.getRegion().isMetaRegion()) {
      if (right.getRegion().isMetaRegion()) {
        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
      }
      return -1;
    } else if (right.getRegion().isMetaRegion()) {
      return +1;
    }
    if (left.getRegion().getTable().isSystemTable()) {
      if (right.getRegion().getTable().isSystemTable()) {
        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
      }
      return -1;
    } else if (right.getRegion().getTable().isSystemTable()) {
      return +1;
    }
    return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
  }

  /**
   * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server. This
   * method is called from HBCK2.
   * @return an assign or null
   */
  public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo ri, boolean override) {
    TransitRegionStateProcedure trsp = null;
    try {
      trsp = createAssignProcedure(ri, null, override);
    } catch (IOException ioe) {
      LOG.info(
        "Failed {} assign, override={}"
          + (override ? "" : "; set override to by-pass state checks."),
        ri.getEncodedName(), override, ioe);
    }
    return trsp;
  }

  /**
   * Create one TransitRegionStateProcedure to unassign a region. This method is called from HBCK2.
   * @return an unassign or null
   */
  public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo ri, boolean override) {
    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(ri);
    TransitRegionStateProcedure trsp = null;
    regionNode.lock();
    try {
      if (override) {
        if (regionNode.getProcedure() != null) {
          regionNode.unsetProcedure(regionNode.getProcedure());
        }
      } else {
        // This is where we could throw an exception; i.e. override is false.
        preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
      }
      assert regionNode.getProcedure() == null;
      trsp =
        TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionNode.getRegionInfo());
      regionNode.setProcedure(trsp);
    } catch (IOException ioe) {
      // 'override' must be false here.
      LOG.info("Failed {} unassign, override=false; set override to by-pass state checks.",
        ri.getEncodedName(), ioe);
    } finally {
      regionNode.unlock();
    }
    return trsp;
  }
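
  // Note (descriptive, not from the original source): both createOne*Procedure methods above log
  // and return null when the precondition check fails and override is false, so callers must be
  // prepared to handle a null (no procedure scheduled) result.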

  /**
   * Create an array of TransitRegionStateProcedure w/o specifying a target server. Used as a
   * fallback when the caller is unable to do {@link #createAssignProcedures(Map)}.
   * <p/>
   * If no target server, at assign time, we will try to use the former location of the region if
   * one exists. This is how we 'retain' the old location across a server restart.
   * <p/>
   * Should only be called when you can make sure that no one can touch these regions other than
   * you. For example, when you are creating or enabling table. Presumes all Regions are in
   * appropriate state ripe for assign; no checking of Region state is done in here.
   * @see #createAssignProcedures(Map)
   */
  public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) {
    return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
      .map(regionNode -> createAssignProcedure(regionNode, null)).sorted(AssignmentManager::compare)
      .toArray(TransitRegionStateProcedure[]::new);
  }

  /**
   * Tied to {@link #createAssignProcedures(List)} in that it is called if caller is unable to run
   * this method. Presumes all Regions are in appropriate state ripe for assign; no checking of
   * Region state is done in here.
   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
   * @return Assignments made from the passed in <code>assignments</code>
   * @see #createAssignProcedures(List)
   */
  private TransitRegionStateProcedure[]
    createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments) {
    return assignments.entrySet().stream()
      .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
        .map(regionNode -> createAssignProcedure(regionNode, e.getKey())))
      .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
  }

  // for creating unassign TRSP when disabling a table or closing excess region replicas
  private TransitRegionStateProcedure forceCreateUnssignProcedure(RegionStateNode regionNode) {
    regionNode.lock();
    try {
      if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
        return null;
      }
      // in general, a split parent should be in CLOSED or SPLIT state, but anyway, let's check it
      // here for safety
      if (regionNode.getRegionInfo().isSplit()) {
        LOG.warn("{} is a split parent but not in CLOSED or SPLIT state", regionNode);
        return null;
      }
      // As in DisableTableProcedure or ModifyTableProcedure, we will hold the xlock for the table,
      // so we can make sure that this procedure has not been executed yet, as TRSP will hold the
      // shared lock for the table all the time. So here we will unset it and when it is actually
      // executed, it will find that the attached procedure is not itself and quit immediately.
      if (regionNode.getProcedure() != null) {
        regionNode.unsetProcedure(regionNode.getProcedure());
      }
      return regionNode.setProcedure(TransitRegionStateProcedure.unassign(getProcedureEnvironment(),
        regionNode.getRegionInfo()));
    } finally {
      regionNode.unlock();
    }
  }

  /**
   * Called by DisableTableProcedure to unassign all the regions for a table.
   */
  public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) {
    return regionStates.getTableRegionStateNodes(tableName).stream()
      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
      .toArray(TransitRegionStateProcedure[]::new);
  }

  /**
   * Called by ModifyTableProcedure to unassign all the excess region replicas for a table.
   */
  public TransitRegionStateProcedure[] createUnassignProceduresForClosingExcessRegionReplicas(
    TableName tableName, int newReplicaCount) {
    return regionStates.getTableRegionStateNodes(tableName).stream()
      .filter(regionNode -> regionNode.getRegionInfo().getReplicaId() >= newReplicaCount)
      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
      .toArray(TransitRegionStateProcedure[]::new);
  }

  public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
    final byte[] splitKey) throws IOException {
    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
  }

  public MergeTableRegionsProcedure createMergeProcedure(RegionInfo... ris) throws IOException {
    return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
  }

  /**
   * Delete the region states. This is called by "DeleteTable".
   */
  public void deleteTable(final TableName tableName) throws IOException {
    final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
    regionStateStore.deleteRegions(regions);
    for (int i = 0; i < regions.size(); ++i) {
      final RegionInfo regionInfo = regions.get(i);
      // we expect the region to be offline
      regionStates.removeFromOfflineRegions(regionInfo);
      regionStates.deleteRegion(regionInfo);
    }
  }
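
  // Overview (descriptive, not from the original source): the helpers below service
  // reportRegionStateTransition() calls from RegionServers. OPENED/FAILED_OPEN/CLOSED reports are
  // forwarded to the TransitRegionStateProcedure attached to the region node, while
  // READY_TO_SPLIT/READY_TO_MERGE reports cause the master to submit Split/Merge procedures itself.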

  // ============================================================================================
  // RS Region Transition Report helpers
  // ============================================================================================
  private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
    ServerName serverName, List<RegionStateTransition> transitionList) throws IOException {
    for (RegionStateTransition transition : transitionList) {
      switch (transition.getTransitionCode()) {
        case OPENED:
        case FAILED_OPEN:
        case CLOSED:
          assert transition.getRegionInfoCount() == 1 : transition;
          final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
          long procId =
            transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
          updateRegionTransition(serverName, transition.getTransitionCode(), hri,
            transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId);
          break;
        case READY_TO_SPLIT:
        case SPLIT:
        case SPLIT_REVERTED:
          assert transition.getRegionInfoCount() == 3 : transition;
          final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
          final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
          final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
          updateRegionSplitTransition(serverName, transition.getTransitionCode(), parent, splitA,
            splitB);
          break;
        case READY_TO_MERGE:
        case MERGED:
        case MERGE_REVERTED:
          assert transition.getRegionInfoCount() == 3 : transition;
          final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
          final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
          final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
          updateRegionMergeTransition(serverName, transition.getTransitionCode(), merged, mergeA,
            mergeB);
          break;
      }
    }
  }

  public ReportRegionStateTransitionResponse reportRegionStateTransition(
    final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
    ReportRegionStateTransitionResponse.Builder builder =
      ReportRegionStateTransitionResponse.newBuilder();
    ServerName serverName = ProtobufUtil.toServerName(req.getServer());
    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
    // here we have to acquire a read lock instead of a simple exclusive lock. This is because we
    // should not block other reportRegionStateTransition calls from the same region server. This
    // is not only about performance, but also to prevent deadlock. Think of the case where the
    // meta region is also on the same region server: if you hold the lock which blocks the
    // reportRegionStateTransition for meta, and meta is not online yet, you will block inside the
    // lock protection waiting for meta to come online...
    serverNode.readLock().lock();
    try {
      // we only accept reportRegionStateTransition if the region server is online, see the comment
      // above in submitServerCrash method and HBASE-21508 for more details.
      if (serverNode.isInState(ServerState.ONLINE)) {
        try {
          reportRegionStateTransition(builder, serverName, req.getTransitionList());
        } catch (PleaseHoldException e) {
          LOG.trace("Failed transition ", e);
          throw e;
        } catch (UnsupportedOperationException | IOException e) {
          // TODO: at the moment we have a single error message and the RS will abort
          // if the master says that one of the region transitions failed.
          LOG.warn("Failed transition", e);
          builder.setErrorMessage("Failed transition " + e.getMessage());
        }
      } else {
        LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
          serverName);
        builder.setErrorMessage("You are dead");
      }
    } finally {
      serverNode.readLock().unlock();
    }

    return builder.build();
  }

  private void updateRegionTransition(ServerName serverName, TransitionCode state,
    RegionInfo regionInfo, long seqId, long procId) throws IOException {
    checkMetaLoaded(regionInfo);

    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
    if (regionNode == null) {
      // the table/region is gone, maybe a delete, split, merge
      throw new UnexpectedStateException(String.format(
        "Server %s was trying to transition region %s to %s, but the Region is not known.",
        serverName, regionInfo, state));
    }
    LOG.trace("Update region transition serverName={} region={} regionState={}", serverName,
      regionNode, state);

    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
    regionNode.lock();
    try {
      if (!reportTransition(regionNode, serverNode, state, seqId, procId)) {
        // Don't log WARN if shutting down the cluster; avoid messages like the below during
        // shutdown:
        // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
        // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
        // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
        // to CLOSED
        // These happen because on cluster shutdown, we currently let the RegionServers close
        // regions. This is the only time that region close is not run by the Master (so cluster
        // goes down fast). Consider changing it so Master runs all shutdowns.
        if (
          this.master.getServerManager().isClusterShutdown() && state.equals(TransitionCode.CLOSED)
        ) {
          LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
        } else {
          LOG.warn("No matching procedure found for {} transition on {} to {}", serverName,
            regionNode, state);
        }
      }
    } finally {
      regionNode.unlock();
    }
  }

  private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
    TransitionCode state, long seqId, long procId) throws IOException {
    ServerName serverName = serverNode.getServerName();
    TransitRegionStateProcedure proc = regionNode.getProcedure();
    if (proc == null) {
      return false;
    }
    proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
      serverName, state, seqId, procId);
    return true;
  }

  private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state,
    final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
    checkMetaLoaded(parent);

    if (state != TransitionCode.READY_TO_SPLIT) {
      throw new UnexpectedStateException(
        "unsupported split regionState=" + state + " for parent region " + parent
          + " maybe an old RS (< 2.0) had the operation in progress");
    }

    // sanity check on the request
    if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) {
      throw new UnsupportedOperationException("unsupported split request with bad keys: parent="
        + parent + " hriA=" + hriA + " hriB=" + hriB);
    }

    if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
      LOG.warn("Split switch is off! skip split of " + parent);
      throw new DoNotRetryIOException(
        "Split region " + parent.getRegionNameAsString() + " failed due to split switch off");
    }

    // Submit the Split procedure
    final byte[] splitKey = hriB.getStartKey();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Split request from " + serverName + ", parent=" + parent + " splitKey="
        + Bytes.toStringBinary(splitKey));
    }
    // Processing this report happens asynchronously from other activities which can mutate
    // the region state. For example, a split procedure may already be running for this parent.
    // A split procedure cannot succeed if the parent region is no longer open, so we can
    // ignore it in that case.
    // Note that submitting more than one split procedure for a given region is
    // harmless -- the split is fenced in the procedure handling -- but it would be noisy in
    // the logs. Only one procedure can succeed. The other procedure(s) would abort during
    // initialization and report failure with WARN level logging.
    RegionState parentState = regionStates.getRegionState(parent);
    if (parentState != null && parentState.isOpened()) {
      master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
    } else {
      LOG.info("Ignoring split request from " + serverName + ", parent=" + parent
        + " because parent is unknown or not open");
      return;
    }

    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
      throw new UnsupportedOperationException(
        String.format("Split handled by the master: " + "parent=%s hriA=%s hriB=%s",
          parent.getShortNameToLog(), hriA, hriB));
    }
  }

  private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state,
    final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
    checkMetaLoaded(merged);

    if (state != TransitionCode.READY_TO_MERGE) {
      throw new UnexpectedStateException(
        "Unsupported merge regionState=" + state + " for regionA=" + hriA + " regionB=" + hriB
          + " merged=" + merged + " maybe an old RS (< 2.0) had the operation in progress");
    }

    if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
      LOG.warn("Merge switch is off! skip merge of regionA=" + hriA + " regionB=" + hriB);
      throw new DoNotRetryIOException(
        "Merge of regionA=" + hriA + " regionB=" + hriB + " failed because merge switch is off");
    }

    // Submit the Merge procedure
    if (LOG.isDebugEnabled()) {
      LOG.debug("Handling merge request from RS=" + serverName + ", merged=" + merged);
    }
    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));

    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
      throw new UnsupportedOperationException(
        String.format("Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state,
          merged, hriA, hriB));
    }
  }

  // ============================================================================================
  // RS Status update (report online regions) helpers
  // ============================================================================================
  /**
   * The master will call this method when the RS sends the regionServerReport(). The report
   * contains the "online regions". This method will check the online regions against the in-memory
   * state of the AM, and we will log a warn message if there is a mismatch. This is because there
   * is no fencing between the reportRegionStateTransition method and the regionServerReport
   * method, so there could be races that introduce inconsistency here, but actually there is no
   * problem.
   * <p/>
   * Please see HBASE-21421 and HBASE-21463 for more details.
1279 */ 1280 public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) { 1281 if (!isRunning()) { 1282 return; 1283 } 1284 if (LOG.isTraceEnabled()) { 1285 LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName, 1286 regionNames.size(), isMetaLoaded(), 1287 regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList())); 1288 } 1289 1290 ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); 1291 synchronized (serverNode) { 1292 if (!serverNode.isInState(ServerState.ONLINE)) { 1293 LOG.warn("Got a report from a server in state " + serverNode.getState()); 1294 return; 1295 } 1296 } 1297 1298 // Track the regionserver reported online regions in memory. 1299 synchronized (rsReports) { 1300 rsReports.put(serverName, regionNames); 1301 } 1302 1303 if (regionNames.isEmpty()) { 1304 // nothing to do if we don't have regions 1305 LOG.trace("no online region found on {}", serverName); 1306 return; 1307 } 1308 if (!isMetaLoaded()) { 1309 // we are still on startup, skip checking 1310 return; 1311 } 1312 // The heartbeat tells us what regions are on the region server; check the state. 1313 checkOnlineRegionsReport(serverNode, regionNames); 1314 } 1315 1316 /** 1317 * Close <code>regionName</code> on <code>sn</code> silently and immediately without using a 1318 * Procedure or going via hbase:meta. For the case where a RegionServer's hosting of a Region is 1319 * not aligned w/ the Master's accounting of Region state. This is for cleaning up an error in 1320 * accounting. 1321 */ 1322 private void closeRegionSilently(ServerName sn, byte[] regionName) { 1323 try { 1324 RegionInfo ri = MetaTableAccessor.parseRegionInfoFromRegionName(regionName); 1325 // Pass -1 for timeout. Means do not wait. 1326 ServerManager.closeRegionSilentlyAndWait(this.master.getClusterConnection(), sn, ri, -1); 1327 } catch (Exception e) { 1328 LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e); 1329 } 1330 } 1331 1332 /** 1333 * Check that what the RegionServer reports aligns with the Master's image. If there is a disagreement, we 1334 * will tell the RegionServer to expediently close a Region we do not think it should have. 1335 */ 1336 private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) { 1337 ServerName serverName = serverNode.getServerName(); 1338 for (byte[] regionName : regionNames) { 1339 if (!isRunning()) { 1340 return; 1341 } 1342 RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName); 1343 if (regionNode == null) { 1344 String regionNameAsStr = Bytes.toStringBinary(regionName); 1345 LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...", regionNameAsStr, 1346 serverName); 1347 closeRegionSilently(serverNode.getServerName(), regionName); 1348 continue; 1349 } 1350 final long lag = 1000; 1351 regionNode.lock(); 1352 try { 1353 long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate(); 1354 if (regionNode.isInState(State.OPENING, State.OPEN)) { 1355 // This is possible when a region server has just closed a region but the region server 1356 // report was generated before the close and arrives after it. Make sure some time has 1357 // elapsed to reduce false alarms.
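// Only act when the report disagrees with our accounting AND the region has not changed state
// for more than 'lag' (1000 ms above) -- a report that raced with a very recent transition is
// expected to disagree briefly and is not treated as an error.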
if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) { 1359 LOG.warn("Reporting {} server does not match {} (time since last " 1360 + "update={}ms); closing...", serverName, regionNode, diff); 1361 closeRegionSilently(serverNode.getServerName(), regionName); 1362 } 1363 } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) { 1364 // So, we can get a report that a region is CLOSED or SPLIT because a heartbeat 1365 // came in at about the same time as a region transition. Make sure some time has 1366 // elapsed to reduce false alarms. 1367 if (diff > lag) { 1368 LOG.warn("Reporting {} state does not match {} (time since last update={}ms)", 1369 serverName, regionNode, diff); 1370 } 1371 } 1372 } finally { 1373 regionNode.unlock(); 1374 } 1375 } 1376 } 1377 1378 // ============================================================================================ 1379 // RIT chore 1380 // ============================================================================================ 1381 private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> { 1382 public RegionInTransitionChore(final int timeoutMsec) { 1383 super(timeoutMsec); 1384 } 1385 1386 @Override 1387 protected void periodicExecute(final MasterProcedureEnv env) { 1388 final AssignmentManager am = env.getAssignmentManager(); 1389 1390 final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat(); 1391 if (ritStat.hasRegionsOverThreshold()) { 1392 for (RegionState hri : ritStat.getRegionOverThreshold()) { 1393 am.handleRegionOverStuckWarningThreshold(hri.getRegion()); 1394 } 1395 } 1396 1397 // update metrics 1398 am.updateRegionsInTransitionMetrics(ritStat); 1399 } 1400 } 1401 1402 private static class DeadServerMetricRegionChore 1403 extends ProcedureInMemoryChore<MasterProcedureEnv> { 1404 public DeadServerMetricRegionChore(final int timeoutMsec) { 1405 super(timeoutMsec); 1406 } 1407 1408 @Override 1409 protected void periodicExecute(final MasterProcedureEnv env) { 1410 final ServerManager sm = env.getMasterServices().getServerManager(); 1411 final AssignmentManager am = env.getAssignmentManager(); 1412 // To minimize inconsistencies we are not going to snapshot live servers in advance in case 1413 // new servers are added; OTOH we don't want to add heavy sync for a consistent view since 1414 // this is for metrics. Instead, we're going to check each region as we go; to avoid making 1415 // too many checks, we maintain a local list of servers, limiting us to false negatives. If 1416 // we miss some recently-dead server, we'll just see it next time. 1417 Set<ServerName> recentlyLiveServers = new HashSet<>(); 1418 int deadRegions = 0, unknownRegions = 0; 1419 for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) { 1420 if (rsn.getState() != State.OPEN) { 1421 continue; // Opportunistic check, should quickly skip RITs, offline tables, etc. 1422 } 1423 // No need to acquire the region state lock as this is only for showing metrics. 1424 ServerName sn = rsn.getRegionLocation(); 1425 State state = rsn.getState(); 1426 if (state != State.OPEN) { 1427 continue; // Mostly skipping RITs that are already being taken care of. 1428 } 1429 if (sn == null) { 1430 ++unknownRegions; // Opened on null?
1431 continue; 1432 } 1433 if (recentlyLiveServers.contains(sn)) { 1434 continue; 1435 } 1436 ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn); 1437 switch (sls) { 1438 case LIVE: 1439 recentlyLiveServers.add(sn); 1440 break; 1441 case DEAD: 1442 ++deadRegions; 1443 break; 1444 case UNKNOWN: 1445 ++unknownRegions; 1446 break; 1447 default: 1448 throw new AssertionError("Unexpected " + sls); 1449 } 1450 } 1451 if (deadRegions > 0 || unknownRegions > 0) { 1452 LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers", 1453 deadRegions, unknownRegions); 1454 } 1455 1456 am.updateDeadServerRegionMetrics(deadRegions, unknownRegions); 1457 } 1458 } 1459 1460 public RegionInTransitionStat computeRegionInTransitionStat() { 1461 final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration()); 1462 rit.update(this); 1463 return rit; 1464 } 1465 1466 public static class RegionInTransitionStat { 1467 private final int ritThreshold; 1468 1469 private HashMap<String, RegionState> ritsOverThreshold = null; 1470 private long statTimestamp; 1471 private long oldestRITTime = 0; 1472 private int totalRITsTwiceThreshold = 0; 1473 private int totalRITs = 0; 1474 1475 public RegionInTransitionStat(final Configuration conf) { 1476 this.ritThreshold = 1477 conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD); 1478 } 1479 1480 public int getRITThreshold() { 1481 return ritThreshold; 1482 } 1483 1484 public long getTimestamp() { 1485 return statTimestamp; 1486 } 1487 1488 public int getTotalRITs() { 1489 return totalRITs; 1490 } 1491 1492 public long getOldestRITTime() { 1493 return oldestRITTime; 1494 } 1495 1496 public int getTotalRITsOverThreshold() { 1497 Map<String, RegionState> m = this.ritsOverThreshold; 1498 return m != null ? m.size() : 0; 1499 } 1500 1501 public boolean hasRegionsTwiceOverThreshold() { 1502 return totalRITsTwiceThreshold > 0; 1503 } 1504 1505 public boolean hasRegionsOverThreshold() { 1506 Map<String, RegionState> m = this.ritsOverThreshold; 1507 return m != null && !m.isEmpty(); 1508 } 1509 1510 public Collection<RegionState> getRegionOverThreshold() { 1511 Map<String, RegionState> m = this.ritsOverThreshold; 1512 return m != null ? 
m.values() : Collections.emptySet(); 1513 } 1514 1515 public boolean isRegionOverThreshold(final RegionInfo regionInfo) { 1516 Map<String, RegionState> m = this.ritsOverThreshold; 1517 return m != null && m.containsKey(regionInfo.getEncodedName()); 1518 } 1519 1520 public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) { 1521 Map<String, RegionState> m = this.ritsOverThreshold; 1522 if (m == null) { 1523 return false; 1524 } 1525 final RegionState state = m.get(regionInfo.getEncodedName()); 1526 if (state == null) { 1527 return false; 1528 } 1529 return (statTimestamp - state.getStamp()) > (ritThreshold * 2); 1530 } 1531 1532 protected void update(final AssignmentManager am) { 1533 final RegionStates regionStates = am.getRegionStates(); 1534 this.statTimestamp = EnvironmentEdgeManager.currentTime(); 1535 update(regionStates.getRegionsStateInTransition(), statTimestamp); 1536 update(regionStates.getRegionFailedOpen(), statTimestamp); 1537 1538 if (LOG.isDebugEnabled() && ritsOverThreshold != null && !ritsOverThreshold.isEmpty()) { 1539 LOG.debug("RITs over threshold: {}", 1540 ritsOverThreshold.entrySet().stream() 1541 .map(e -> e.getKey() + ":" + e.getValue().getState().name()) 1542 .collect(Collectors.joining("\n"))); 1543 } 1544 } 1545 1546 private void update(final Collection<RegionState> regions, final long currentTime) { 1547 for (RegionState state : regions) { 1548 totalRITs++; 1549 final long ritTime = currentTime - state.getStamp(); 1550 if (ritTime > ritThreshold) { 1551 if (ritsOverThreshold == null) { 1552 ritsOverThreshold = new HashMap<String, RegionState>(); 1553 } 1554 ritsOverThreshold.put(state.getRegion().getEncodedName(), state); 1555 totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0; 1556 } 1557 if (oldestRITTime < ritTime) { 1558 oldestRITTime = ritTime; 1559 } 1560 } 1561 } 1562 } 1563 1564 private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) { 1565 metrics.updateRITOldestAge(ritStat.getOldestRITTime()); 1566 metrics.updateRITCount(ritStat.getTotalRITs()); 1567 metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold()); 1568 } 1569 1570 private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) { 1571 metrics.updateDeadServerOpenRegions(deadRegions); 1572 metrics.updateUnknownServerOpenRegions(unknownRegions); 1573 } 1574 1575 private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) { 1576 final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 1577 // if (regionNode.isStuck()) { 1578 LOG.warn("STUCK Region-In-Transition {}", regionNode); 1579 } 1580 1581 // ============================================================================================ 1582 // TODO: Master load/bootstrap 1583 // ============================================================================================ 1584 public void joinCluster() throws IOException { 1585 long startTime = System.nanoTime(); 1586 LOG.debug("Joining cluster..."); 1587 1588 // Scan hbase:meta to build list of existing regions, servers, and assignment. 1589 // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress 1590 // w/o meta. 
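// loadMeta() below populates regionStates via RegionMetaLoadingVisitor; only after it returns do
// we wait for at least one RegionServer and then start the RIT and dead-server-metric chores.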
loadMeta(); 1592 1593 while (master.getServerManager().countOfRegionServers() < 1) { 1594 LOG.info("Waiting for RegionServers to join; current count={}", 1595 master.getServerManager().countOfRegionServers()); 1596 Threads.sleep(250); 1597 } 1598 LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers()); 1599 1600 // Start the chores 1601 master.getMasterProcedureExecutor().addChore(this.ritChore); 1602 master.getMasterProcedureExecutor().addChore(this.deadMetricChore); 1603 1604 long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); 1605 LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs)); 1606 } 1607 1608 /** 1609 * Create assign procedures for offline regions. This follows the old 1610 * processofflineServersWithOnlineRegions method. Since we no longer need to deal with dead 1611 * servers, we only deal with regions in the OFFLINE state in this method. It is a 1612 * bit strange that new regions are added in the CLOSED state instead of the OFFLINE state, and 1613 * usually there will be a procedure to track them. processofflineServersWithOnlineRegions is 1614 * a legacy from long ago; as things work quite differently now, maybe we do not need this 1615 * method any more. Need to revisit later. 1616 */ 1617 // Public so it can be run by the Master as part of startup. Needs hbase:meta to be online. 1618 // Needs to be done after the table state manager has been started. 1619 public void processOfflineRegions() { 1620 TransitRegionStateProcedure[] procs = 1621 regionStates.getRegionStateNodes().stream().filter(rsn -> rsn.isInState(State.OFFLINE)) 1622 .filter(rsn -> isTableEnabled(rsn.getRegionInfo().getTable())).map(rsn -> { 1623 rsn.lock(); 1624 try { 1625 if (rsn.getProcedure() != null) { 1626 return null; 1627 } else { 1628 return rsn.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(), 1629 rsn.getRegionInfo(), null)); 1630 } 1631 } finally { 1632 rsn.unlock(); 1633 } 1634 }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new); 1635 if (procs.length > 0) { 1636 master.getMasterProcedureExecutor().submitProcedures(procs); 1637 } 1638 } 1639 1640 /* 1641 * AM internal RegionStateStore.RegionStateVisitor implementation. To be used when scanning the 1642 * META table for region rows, using RegionStateStore utility methods. RegionStateStore methods 1643 * will convert Result into proper RegionInfo instances, but those still need to be added 1644 * into the AssignmentManager.regionStates in-memory cache. The RegionMetaLoadingVisitor.visitRegionState 1645 * method provides the logic for adding RegionInfo instances as loaded from the latest META scan 1646 * into AssignmentManager.regionStates. 1647 */ 1648 private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor { 1649 1650 @Override 1651 public void visitRegionState(Result result, final RegionInfo regionInfo, final State state, 1652 final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) { 1653 if ( 1654 state == null && regionLocation == null && lastHost == null 1655 && openSeqNum == SequenceId.NO_SEQUENCE_ID 1656 ) { 1657 // This is a row with nothing in it. 1658 LOG.warn("Skipping empty row={}", result); 1659 return; 1660 } 1661 State localState = state; 1662 if (localState == null) { 1663 // No region state column data in hbase:meta table! Am I doing a rolling upgrade from 1664 // hbase1 to hbase2?
Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta? 1665 // In any of these cases, state is empty. For now, presume OFFLINE but there are probably 1666 // cases where we need to probe more to be sure this is correct; TODO informed by experience. 1667 LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE); 1668 localState = State.OFFLINE; 1669 } 1670 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 1671 // No need to lock on regionNode, as we can make sure that before we finish loading 1672 // meta, none of the related procedures can be executed. The only exception is meta 1673 // region related operations, but here we do not load the information for the meta region. 1674 regionNode.setState(localState); 1675 regionNode.setLastHost(lastHost); 1676 regionNode.setRegionLocation(regionLocation); 1677 regionNode.setOpenSeqNum(openSeqNum); 1678 1679 // Note: keep consistent with other methods, see region(Opening|Opened|Closing). 1680 // RIT/ServerCrash handling should take care of the transiting regions. 1681 if ( 1682 localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING, State.MERGING) 1683 ) { 1684 assert regionLocation != null : "found null region location for " + regionNode; 1685 regionStates.addRegionToServer(regionNode); 1686 } else if (localState == State.OFFLINE || regionInfo.isOffline()) { 1687 regionStates.addToOfflineRegions(regionNode); 1688 } 1689 if (regionNode.getProcedure() != null) { 1690 regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode); 1691 } 1692 } 1693 }; 1694 1695 /** 1696 * Query META if the given <code>RegionInfo</code> exists, adding to 1697 * <code>AssignmentManager.regionStateStore</code> cache if the region is found in META. 1698 * @param regionEncodedName encoded name for the region to be loaded from META into 1699 * <code>AssignmentManager.regionStateStore</code> cache 1700 * @return <code>RegionInfo</code> instance for the given region if it is present in META and got 1701 * successfully loaded into <code>AssignmentManager.regionStateStore</code> cache, 1702 * <b>null</b> otherwise. 1703 * @throws UnknownRegionException if any errors occur while querying meta. 1704 */ 1705 public RegionInfo loadRegionFromMeta(String regionEncodedName) throws UnknownRegionException { 1706 try { 1707 RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor(); 1708 regionStateStore.visitMetaForRegion(regionEncodedName, visitor); 1709 return regionStates.getRegionState(regionEncodedName) == null 1710 ? null 1711 : regionStates.getRegionState(regionEncodedName).getRegion(); 1712 } catch (IOException e) { 1713 throw new UnknownRegionException( 1714 "Error trying to load region " + regionEncodedName + " from META", e); 1715 } 1716 } 1717 1718 private void loadMeta() throws IOException { 1719 // TODO: use a thread pool 1720 regionStateStore.visitMeta(new RegionMetaLoadingVisitor()); 1721 } 1722 1723 /** 1724 * Used to check if the meta loading is done.
* <p/> 1726 * If not, we throw a PleaseHoldException since we are rebuilding the RegionStates. 1727 * @param hri region to check if it is already rebuilt 1728 * @throws PleaseHoldException if meta has not been loaded yet 1729 */ 1730 private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException { 1731 if (!isRunning()) { 1732 throw new PleaseHoldException("AssignmentManager not running"); 1733 } 1734 boolean meta = isMetaRegion(hri); 1735 boolean metaLoaded = isMetaLoaded(); 1736 if (!meta && !metaLoaded) { 1737 throw new PleaseHoldException( 1738 "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded); 1739 } 1740 } 1741 1742 // ============================================================================================ 1743 // TODO: Metrics 1744 // ============================================================================================ 1745 public int getNumRegionsOpened() { 1746 // TODO: Used by TestRegionPlacement.java which assumes a monotonically increasing value 1747 return 0; 1748 } 1749 1750 /** 1751 * Usually run by the Master in reaction to a server crash during normal processing. Can also be 1752 * invoked via external RPC to effect repair; in the latter case, the 'force' flag is set so we 1753 * push through the SCP even though context may indicate an already-running SCP (an old SCP may 1754 * have exited abnormally, or a damaged cluster may still have references in hbase:meta to 1755 * 'Unknown Servers' -- servers that are not online or in the dead servers list, etc.) 1756 * @param force Set if the request came in externally over RPC (via hbck2). Force means run the 1757 * SCP even if it seems as though there might be an outstanding SCP running. 1758 * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled. 1759 */ 1760 public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) { 1761 // May be an 'Unknown Server' so handle the case where serverNode is null. 1762 ServerStateNode serverNode = regionStates.getServerNode(serverName); 1763 // Remove the in-memory rsReports result 1764 synchronized (rsReports) { 1765 rsReports.remove(serverName); 1766 } 1767 1768 // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the 1769 // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from 1770 // this server. This is used to simplify the implementation for TRSP and SCP, where we can make 1771 // sure that the region list fetched by SCP will not be changed any more. 1772 if (serverNode != null) { 1773 serverNode.writeLock().lock(); 1774 } 1775 boolean carryingMeta; 1776 long pid; 1777 try { 1778 ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor(); 1779 carryingMeta = isCarryingMeta(serverName); 1780 if (!force && serverNode != null && !serverNode.isInState(ServerState.ONLINE)) { 1781 LOG.info("Skip adding ServerCrashProcedure for {} (meta={}) -- running?", serverNode, 1782 carryingMeta); 1783 return Procedure.NO_PROC_ID; 1784 } else { 1785 MasterProcedureEnv mpe = procExec.getEnvironment(); 1786 // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead. 1787 // HBCKSCP scours Master in-memory state AND hbase:meta for references to 1788 // serverName just-in-case. An SCP that is scheduled when the server is 1789 // 'Unknown' probably originated externally with the HBCK2 fix-it tool.
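// Below: remember the previous server state (if any), flip the node to CRASHED while still
// holding the write lock, then submit either HBCKServerCrashProcedure (force) or the regular
// ServerCrashProcedure.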
1790 ServerState oldState = null; 1791 if (serverNode != null) { 1792 oldState = serverNode.getState(); 1793 serverNode.setState(ServerState.CRASHED); 1794 } 1795 1796 if (force) { 1797 pid = procExec.submitProcedure( 1798 new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta)); 1799 } else { 1800 pid = procExec.submitProcedure( 1801 new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta)); 1802 } 1803 LOG.info("Scheduled ServerCrashProcedure pid={} for {} (carryingMeta={}){}.", pid, 1804 serverName, carryingMeta, 1805 serverNode == null ? "" : " " + serverNode.toString() + ", oldState=" + oldState); 1806 } 1807 } finally { 1808 if (serverNode != null) { 1809 serverNode.writeLock().unlock(); 1810 } 1811 } 1812 return pid; 1813 } 1814 1815 public void offlineRegion(final RegionInfo regionInfo) { 1816 // TODO used by MasterRpcServices 1817 RegionStateNode node = regionStates.getRegionStateNode(regionInfo); 1818 if (node != null) { 1819 node.offline(); 1820 } 1821 } 1822 1823 public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) { 1824 // TODO used by TestSplitTransactionOnCluster.java 1825 } 1826 1827 public Map<ServerName, List<RegionInfo>> 1828 getSnapShotOfAssignment(final Collection<RegionInfo> regions) { 1829 return regionStates.getSnapShotOfAssignment(regions); 1830 } 1831 1832 // ============================================================================================ 1833 // TODO: UTILS/HELPERS? 1834 // ============================================================================================ 1835 /** 1836 * Used by the client (via master) to identify if all regions have the schema updates 1837 * @return Pair indicating the status of the alter command (pending/total) 1838 */ 1839 public Pair<Integer, Integer> getReopenStatus(TableName tableName) { 1840 if (isTableDisabled(tableName)) { 1841 return new Pair<Integer, Integer>(0, 0); 1842 } 1843 1844 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 1845 int ritCount = 0; 1846 for (RegionState regionState : states) { 1847 if (!regionState.isOpened() && !regionState.isSplit()) { 1848 ritCount++; 1849 } 1850 } 1851 return new Pair<Integer, Integer>(ritCount, states.size()); 1852 } 1853 1854 // ============================================================================================ 1855 // TODO: Region State In Transition 1856 // ============================================================================================ 1857 public boolean hasRegionsInTransition() { 1858 return regionStates.hasRegionsInTransition(); 1859 } 1860 1861 public List<RegionStateNode> getRegionsInTransition() { 1862 return regionStates.getRegionsInTransition(); 1863 } 1864 1865 public List<RegionInfo> getAssignedRegions() { 1866 return regionStates.getAssignedRegions(); 1867 } 1868 1869 public RegionInfo getRegionInfo(final byte[] regionName) { 1870 final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName); 1871 return regionState != null ? regionState.getRegionInfo() : null; 1872 } 1873 1874 // ============================================================================================ 1875 // Expected states on region state transition. 1876 // Notice that there is expected states for transiting to OPENING state, this is because SCP. 1877 // See the comments in regionOpening method for more details. 
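// For example, STATES_EXPECTED_ON_OPEN below accepts OPENING (the normal case) and OPEN (a retry
// that already took effect); a transition attempted from any other state is rejected when
// transitStateAndUpdate calls regionNode.transitionState with these expected states.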
1878 // ============================================================================================ 1879 private static final State[] STATES_EXPECTED_ON_OPEN = { State.OPENING, // Normal case 1880 State.OPEN // Retrying 1881 }; 1882 1883 private static final State[] STATES_EXPECTED_ON_CLOSING = { State.OPEN, // Normal case 1884 State.CLOSING, // Retrying 1885 State.SPLITTING, // Offline the split parent 1886 State.MERGING // Offline the merge parents 1887 }; 1888 1889 private static final State[] STATES_EXPECTED_ON_CLOSED = { State.CLOSING, // Normal case 1890 State.CLOSED // Retrying 1891 }; 1892 1893 // This is for manually scheduled region assigns; we can add other states later if we find other 1894 // usages 1895 private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE }; 1896 1897 // We only allow unassigning or moving a region which is in the OPEN state. 1898 private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN }; 1899 1900 // ============================================================================================ 1901 // Region Status update 1902 // Should only be called from TransitRegionStateProcedure (and related procedures), as the locking 1903 // and pre-assumptions are very tricky. 1904 // ============================================================================================ 1905 private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState, 1906 RegionState.State... expectedStates) throws IOException { 1907 RegionState.State state = regionNode.getState(); 1908 regionNode.transitionState(newState, expectedStates); 1909 boolean succ = false; 1910 try { 1911 regionStateStore.updateRegionLocation(regionNode); 1912 succ = true; 1913 } finally { 1914 if (!succ) { 1915 // revert 1916 regionNode.setState(state); 1917 } 1918 } 1919 } 1920 1921 // should be called within the synchronized block of RegionStateNode 1922 void regionOpening(RegionStateNode regionNode) throws IOException { 1923 // As in SCP, for performance reasons, there is no TRSP attached to this region, so we will not 1924 // update the region state, which means that the region could be in any state when we want to 1925 // assign it after a RS crash. So here we do not pass the expectedStates parameter. 1926 transitStateAndUpdate(regionNode, State.OPENING); 1927 regionStates.addRegionToServer(regionNode); 1928 // update the operation count metrics 1929 metrics.incrementOperationCounter(); 1930 } 1931 1932 // should be called under the RegionStateNode lock 1933 // The parameter 'giveUp' means whether we will give up on opening the region again; if it is 1934 // true, we persist the FAILED_OPEN state into hbase:meta.
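// Note the asymmetry below: when giveUp is false the failure is treated as transient, so
// regionFailedOpen leaves the region state and hbase:meta untouched and only removes the region
// from its current server (if any).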
1935 void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException { 1936 RegionState.State state = regionNode.getState(); 1937 ServerName regionLocation = regionNode.getRegionLocation(); 1938 if (giveUp) { 1939 regionNode.setState(State.FAILED_OPEN); 1940 regionNode.setRegionLocation(null); 1941 boolean succ = false; 1942 try { 1943 regionStateStore.updateRegionLocation(regionNode); 1944 succ = true; 1945 } finally { 1946 if (!succ) { 1947 // revert 1948 regionNode.setState(state); 1949 regionNode.setRegionLocation(regionLocation); 1950 } 1951 } 1952 } 1953 if (regionLocation != null) { 1954 regionStates.removeRegionFromServer(regionLocation, regionNode); 1955 } 1956 } 1957 1958 // should be called under the RegionStateNode lock 1959 void regionClosing(RegionStateNode regionNode) throws IOException { 1960 transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING); 1961 1962 RegionInfo hri = regionNode.getRegionInfo(); 1963 // Set meta has not initialized early. so people trying to create/edit tables will wait 1964 if (isMetaRegion(hri)) { 1965 setMetaAssigned(hri, false); 1966 } 1967 regionStates.addRegionToServer(regionNode); 1968 // update the operation count metrics 1969 metrics.incrementOperationCounter(); 1970 } 1971 1972 // for open and close, they will first be persist to the procedure store in 1973 // RegionRemoteProcedureBase. So here we will first change the in memory state as it is considered 1974 // as succeeded if the persistence to procedure store is succeeded, and then when the 1975 // RegionRemoteProcedureBase is woken up, we will persist the RegionStateNode to hbase:meta. 1976 1977 // should be called under the RegionStateNode lock 1978 void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException { 1979 regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN); 1980 RegionInfo regionInfo = regionNode.getRegionInfo(); 1981 regionStates.addRegionToServer(regionNode); 1982 regionStates.removeFromFailedOpen(regionInfo); 1983 } 1984 1985 // should be called under the RegionStateNode lock 1986 void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException { 1987 ServerName regionLocation = regionNode.getRegionLocation(); 1988 regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED); 1989 regionNode.setRegionLocation(null); 1990 if (regionLocation != null) { 1991 regionNode.setLastHost(regionLocation); 1992 regionStates.removeRegionFromServer(regionLocation, regionNode); 1993 } 1994 } 1995 1996 // should be called under the RegionStateNode lock 1997 // for SCP 1998 public void regionClosedAbnormally(RegionStateNode regionNode) throws IOException { 1999 RegionState.State state = regionNode.getState(); 2000 ServerName regionLocation = regionNode.getRegionLocation(); 2001 regionNode.transitionState(State.ABNORMALLY_CLOSED); 2002 regionNode.setRegionLocation(null); 2003 boolean succ = false; 2004 try { 2005 regionStateStore.updateRegionLocation(regionNode); 2006 succ = true; 2007 } finally { 2008 if (!succ) { 2009 // revert 2010 regionNode.setState(state); 2011 regionNode.setRegionLocation(regionLocation); 2012 } 2013 } 2014 if (regionLocation != null) { 2015 regionNode.setLastHost(regionLocation); 2016 regionStates.removeRegionFromServer(regionLocation, regionNode); 2017 } 2018 } 2019 2020 void persistToMeta(RegionStateNode regionNode) throws IOException { 2021 regionStateStore.updateRegionLocation(regionNode); 2022 RegionInfo regionInfo = regionNode.getRegionInfo(); 2023 
if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) { 2024 // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYS enabled; it 2025 // can't be disabled -- so skip the RPC (besides, enabled is managed by TableStateManager, 2026 // which is backed by hbase:meta). Avoid setting ENABLED to avoid having to update state 2027 // on the table that contains state. 2028 setMetaAssigned(regionInfo, true); 2029 } 2030 } 2031 2032 // ============================================================================================ 2033 // The above methods can only be called in TransitRegionStateProcedure (and related procedures) 2034 // ============================================================================================ 2035 2036 public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName, 2037 final RegionInfo daughterA, final RegionInfo daughterB) throws IOException { 2038 // Update hbase:meta. Parent will be marked offline and split up in hbase:meta. 2039 // The parent stays in regionStates until cleared when removed by CatalogJanitor. 2040 // Update its state in regionStates so it shows as offline and split when read 2041 // later when figuring what regions are in a table and what are not: see 2042 // regionStates#getRegionsOfTable 2043 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent); 2044 node.setState(State.SPLIT); 2045 final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA); 2046 nodeA.setState(State.SPLITTING_NEW); 2047 final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB); 2048 nodeB.setState(State.SPLITTING_NEW); 2049 2050 // TODO: here we just update the parent region info in meta, to set split and offline to true, 2051 // without changing the one in the region node. This is a bit confusing but the region info 2052 // field in RegionStateNode is not expected to be changed in the current design. Need to find a 2053 // possible way to address this problem, or at least add more comments about the trick used to 2054 // deal with this problem: when you want to filter out a split parent, you need to check both 2055 // the RegionState on whether it is split, and also the region info. If one of them matches then 2056 // it is a split parent. Usually only one of them can match, as after a restart the region 2057 // state will be changed from SPLIT to CLOSED. 2058 regionStateStore.splitRegion(parent, daughterA, daughterB, serverName); 2059 if (shouldAssignFavoredNodes(parent)) { 2060 List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList(); 2061 ((FavoredNodesPromoter) getBalancer()).generateFavoredNodesForDaughter(onlineServers, parent, 2062 daughterA, daughterB); 2063 } 2064 } 2065 2066 /** 2067 * When called here, the merge has happened. The merged regions have been unassigned and the above 2068 * markRegionClosed has been called on each so they have been disassociated from a hosting Server. 2069 * The merged region will be open after this call. The merged regions are removed from hbase:meta 2070 * below. Later they are deleted from the filesystem by the catalog janitor running against 2071 * hbase:meta. It notices when the merged region no longer holds references to the old regions 2072 * (References are deleted after a compaction rewrites what the Reference points at, but the 2073 * References are not removed until the archiver chore runs).
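 * <p/>
 * Note (describing the code below): the parent RegionStateNodes are deleted from regionStates
 * here, while their hbase:meta rows are removed by regionStateStore.mergeRegions; favored nodes
 * for the merged region are only generated when the favored-nodes balancer is in use (see
 * shouldAssignFavoredNodes).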
2074 */ 2075 public void markRegionAsMerged(final RegionInfo child, final ServerName serverName, 2076 RegionInfo[] mergeParents) throws IOException { 2077 final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child); 2078 node.setState(State.MERGED); 2079 for (RegionInfo ri : mergeParents) { 2080 regionStates.deleteRegion(ri); 2081 2082 } 2083 regionStateStore.mergeRegions(child, mergeParents, serverName); 2084 if (shouldAssignFavoredNodes(child)) { 2085 ((FavoredNodesPromoter) getBalancer()).generateFavoredNodesForMergedRegion(child, 2086 mergeParents); 2087 } 2088 } 2089 2090 /* 2091 * Favored nodes should be applied only when the FavoredNodes balancer is configured and the 2092 * region belongs to a non-system table. 2093 */ 2094 private boolean shouldAssignFavoredNodes(RegionInfo region) { 2095 return this.shouldAssignRegionsWithFavoredNodes 2096 && FavoredNodesManager.isFavoredNodeApplicable(region); 2097 } 2098 2099 // ============================================================================================ 2100 // Assign Queue (Assign/Balance) 2101 // ============================================================================================ 2102 private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>(); 2103 private final ReentrantLock assignQueueLock = new ReentrantLock(); 2104 private final Condition assignQueueFullCond = assignQueueLock.newCondition(); 2105 2106 /** 2107 * Add the assign operation to the assignment queue. The pending assignment operations will be 2108 * processed, and each region will be assigned to a server using the balancer. 2109 */ 2110 protected void queueAssign(final RegionStateNode regionNode) { 2111 regionNode.getProcedureEvent().suspend(); 2112 2113 // TODO: quick-start for meta and the other sys-tables?
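// Batching: the queue is drained by the assignment thread (see processAssignQueue). We signal
// that thread immediately for a system-table region, for the first region added, or once the
// queue reaches assignDispatchWaitQueueMaxSize; the thread then drains the queue after a short
// additional wait (up to assignDispatchWaitMillis) so more regions can batch up.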
2114 assignQueueLock.lock(); 2115 try { 2116 pendingAssignQueue.add(regionNode); 2117 if ( 2118 regionNode.isSystemTable() || pendingAssignQueue.size() == 1 2119 || pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize 2120 ) { 2121 assignQueueFullCond.signal(); 2122 } 2123 } finally { 2124 assignQueueLock.unlock(); 2125 } 2126 } 2127 2128 private void startAssignmentThread() { 2129 assignThread = new Thread(master.getServerName().toShortString()) { 2130 @Override 2131 public void run() { 2132 while (isRunning()) { 2133 processAssignQueue(); 2134 } 2135 pendingAssignQueue.clear(); 2136 } 2137 }; 2138 assignThread.setDaemon(true); 2139 assignThread.start(); 2140 } 2141 2142 private void stopAssignmentThread() { 2143 assignQueueSignal(); 2144 try { 2145 while (assignThread.isAlive()) { 2146 assignQueueSignal(); 2147 assignThread.join(250); 2148 } 2149 } catch (InterruptedException e) { 2150 LOG.warn("join interrupted", e); 2151 Thread.currentThread().interrupt(); 2152 } 2153 } 2154 2155 private void assignQueueSignal() { 2156 assignQueueLock.lock(); 2157 try { 2158 assignQueueFullCond.signal(); 2159 } finally { 2160 assignQueueLock.unlock(); 2161 } 2162 } 2163 2164 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") 2165 private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() { 2166 HashMap<RegionInfo, RegionStateNode> regions = null; 2167 2168 assignQueueLock.lock(); 2169 try { 2170 if (pendingAssignQueue.isEmpty() && isRunning()) { 2171 assignQueueFullCond.await(); 2172 } 2173 2174 if (!isRunning()) { 2175 return null; 2176 } 2177 assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS); 2178 regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size()); 2179 for (RegionStateNode regionNode : pendingAssignQueue) { 2180 regions.put(regionNode.getRegionInfo(), regionNode); 2181 } 2182 pendingAssignQueue.clear(); 2183 } catch (InterruptedException e) { 2184 LOG.warn("got interrupted ", e); 2185 Thread.currentThread().interrupt(); 2186 } finally { 2187 assignQueueLock.unlock(); 2188 } 2189 return regions; 2190 } 2191 2192 private void processAssignQueue() { 2193 final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue(); 2194 if (regions == null || regions.size() == 0 || !isRunning()) { 2195 return; 2196 } 2197 2198 if (LOG.isTraceEnabled()) { 2199 LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size()); 2200 } 2201 2202 // TODO: Optimize balancer. pass a RegionPlan? 2203 final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>(); 2204 final List<RegionInfo> userHRIs = new ArrayList<>(regions.size()); 2205 // Regions for system tables requiring reassignment 2206 final List<RegionInfo> systemHRIs = new ArrayList<>(); 2207 for (RegionStateNode regionStateNode : regions.values()) { 2208 boolean sysTable = regionStateNode.isSystemTable(); 2209 final List<RegionInfo> hris = sysTable ? systemHRIs : userHRIs; 2210 if (regionStateNode.getRegionLocation() != null) { 2211 retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation()); 2212 } else { 2213 hris.add(regionStateNode.getRegionInfo()); 2214 } 2215 } 2216 2217 // TODO: connect with the listener to invalidate the cache 2218 2219 // TODO use events 2220 List<ServerName> servers = master.getServerManager().createDestinationServersList(); 2221 for (int i = 0; servers.size() < 1; ++i) { 2222 // Report every fourth time around this loop; try not to flood log. 
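// i.e. on iterations 0, 4, 8, ...; with the 250 ms sleep below that is roughly once per second.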
2223 if (i % 4 == 0) { 2224 LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions."); 2225 } 2226 2227 if (!isRunning()) { 2228 LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions."); 2229 return; 2230 } 2231 Threads.sleep(250); 2232 servers = master.getServerManager().createDestinationServersList(); 2233 } 2234 2235 if (!systemHRIs.isEmpty()) { 2236 // System table regions requiring reassignment are present, get region servers 2237 // not available for system table regions 2238 final List<ServerName> excludeServers = getExcludedServersForSystemTable(); 2239 List<ServerName> serversForSysTables = 2240 servers.stream().filter(s -> !excludeServers.contains(s)).collect(Collectors.toList()); 2241 if (serversForSysTables.isEmpty()) { 2242 LOG.warn("Filtering old server versions and the excluded produced an empty set; " 2243 + "instead considering all candidate servers!"); 2244 } 2245 LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() 2246 + ", allServersCount=" + servers.size()); 2247 processAssignmentPlans(regions, null, systemHRIs, 2248 serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs) 2249 ? servers 2250 : serversForSysTables); 2251 } 2252 2253 processAssignmentPlans(regions, retainMap, userHRIs, servers); 2254 } 2255 2256 private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions, 2257 List<RegionInfo> hirs) { 2258 for (RegionInfo ri : hirs) { 2259 if ( 2260 regions.get(ri).getRegionLocation() != null 2261 && regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME) 2262 ) { 2263 return true; 2264 } 2265 } 2266 return false; 2267 } 2268 2269 private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions, 2270 final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris, 2271 final List<ServerName> servers) { 2272 boolean isTraceEnabled = LOG.isTraceEnabled(); 2273 if (isTraceEnabled) { 2274 LOG.trace("Available servers count=" + servers.size() + ": " + servers); 2275 } 2276 2277 final LoadBalancer balancer = getBalancer(); 2278 // ask the balancer where to place regions 2279 if (retainMap != null && !retainMap.isEmpty()) { 2280 if (isTraceEnabled) { 2281 LOG.trace("retain assign regions=" + retainMap); 2282 } 2283 try { 2284 acceptPlan(regions, balancer.retainAssignment(retainMap, servers)); 2285 } catch (HBaseIOException e) { 2286 LOG.warn("unable to retain assignment", e); 2287 addToPendingAssignment(regions, retainMap.keySet()); 2288 } 2289 } 2290 2291 // TODO: Do we need to split retain and round-robin? 2292 // the retain seems to fallback to round-robin/random if the region is not in the map. 
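// Round-robin pass: regions with no previous location (hris) are sorted and handed to
// balancer.roundRobinAssignment; on HBaseIOException they are re-queued via
// addToPendingAssignment, mirroring the retain pass above.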
2293 if (!hris.isEmpty()) { 2294 Collections.sort(hris, RegionInfo.COMPARATOR); 2295 if (isTraceEnabled) { 2296 LOG.trace("round robin regions=" + hris); 2297 } 2298 try { 2299 acceptPlan(regions, balancer.roundRobinAssignment(hris, servers)); 2300 } catch (HBaseIOException e) { 2301 LOG.warn("unable to round-robin assignment", e); 2302 addToPendingAssignment(regions, hris); 2303 } 2304 } 2305 } 2306 2307 private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions, 2308 final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException { 2309 final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()]; 2310 final long st = EnvironmentEdgeManager.currentTime(); 2311 2312 if (plan.isEmpty()) { 2313 throw new HBaseIOException("unable to compute plans for regions=" + regions.size()); 2314 } 2315 2316 int evcount = 0; 2317 for (Map.Entry<ServerName, List<RegionInfo>> entry : plan.entrySet()) { 2318 final ServerName server = entry.getKey(); 2319 for (RegionInfo hri : entry.getValue()) { 2320 final RegionStateNode regionNode = regions.get(hri); 2321 regionNode.setRegionLocation(server); 2322 if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) { 2323 assignQueueLock.lock(); 2324 try { 2325 pendingAssignQueue.add(regionNode); 2326 } finally { 2327 assignQueueLock.unlock(); 2328 } 2329 } else { 2330 events[evcount++] = regionNode.getProcedureEvent(); 2331 } 2332 } 2333 } 2334 ProcedureEvent.wakeEvents(getProcedureScheduler(), events); 2335 2336 final long et = EnvironmentEdgeManager.currentTime(); 2337 if (LOG.isTraceEnabled()) { 2338 LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + StringUtils.humanTimeDiff(et - st)); 2339 } 2340 } 2341 2342 private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions, 2343 final Collection<RegionInfo> pendingRegions) { 2344 assignQueueLock.lock(); 2345 try { 2346 for (RegionInfo hri : pendingRegions) { 2347 pendingAssignQueue.add(regions.get(hri)); 2348 } 2349 } finally { 2350 assignQueueLock.unlock(); 2351 } 2352 } 2353 2354 /** 2355 * For a given cluster with mixed versions of servers, get a list of servers with lower versions, 2356 * where system table regions should not be assigned to. For system table, we must assign regions 2357 * to a server with highest version. However, we can disable this exclusion using config: 2358 * "hbase.min.version.move.system.tables" if checkForMinVersion is true. Detailed explanation 2359 * available with definition of minVersionToMoveSysTables. 2360 * @return List of Excluded servers for System table regions. 2361 */ 2362 public List<ServerName> getExcludedServersForSystemTable() { 2363 // TODO: This should be a cached list kept by the ServerManager rather than calculated on each 2364 // move or system region assign. The RegionServerTracker keeps list of online Servers with 2365 // RegionServerInfo that includes Version. 
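// Illustrative example (hypothetical versions): with servers on 2.4.17 and 2.5.8, highestVersion
// resolves to 2.5.8 and the 2.4.17 servers are returned as excluded; if
// hbase.min.version.move.system.tables is set to a version greater than 2.5.8, nothing is
// excluded.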
List<Pair<ServerName, String>> serverList = 2367 master.getServerManager().getOnlineServersList().stream() 2368 .map(s -> new Pair<>(s, master.getRegionServerVersion(s))).collect(Collectors.toList()); 2369 if (serverList.isEmpty()) { 2370 return new ArrayList<>(); 2371 } 2372 String highestVersion = Collections 2373 .max(serverList, (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())) 2374 .getSecond(); 2375 if (!DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG.equals(minVersionToMoveSysTables)) { 2376 int comparedValue = VersionInfo.compareVersion(minVersionToMoveSysTables, highestVersion); 2377 if (comparedValue > 0) { 2378 return new ArrayList<>(); 2379 } 2380 } 2381 return serverList.stream().filter(pair -> !pair.getSecond().equals(highestVersion)) 2382 .map(Pair::getFirst).collect(Collectors.toList()); 2383 } 2384 2385 MasterServices getMaster() { 2386 return master; 2387 } 2388 2389 /** Returns a snapshot of rsReports */ 2390 public Map<ServerName, Set<byte[]>> getRSReports() { 2391 Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>(); 2392 synchronized (rsReports) { 2393 rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue())); 2394 } 2395 return rsReportsSnapshot; 2396 } 2397 2398 /** 2399 * Provide region states count for the given table, e.g. how many regions of the given table are 2400 * opened/closed/in transition, etc. 2401 * @param tableName TableName 2402 * @return region states count 2403 */ 2404 public RegionStatesCount getRegionStatesCount(TableName tableName) { 2405 int openRegionsCount = 0; 2406 int closedRegionCount = 0; 2407 int ritCount = 0; 2408 int splitRegionCount = 0; 2409 int totalRegionCount = 0; 2410 if (!isTableDisabled(tableName)) { 2411 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 2412 for (RegionState regionState : states) { 2413 if (regionState.isOpened()) { 2414 openRegionsCount++; 2415 } else if (regionState.isClosed()) { 2416 closedRegionCount++; 2417 } else if (regionState.isSplit()) { 2418 splitRegionCount++; 2419 } 2420 } 2421 totalRegionCount = states.size(); 2422 ritCount = totalRegionCount - openRegionsCount - splitRegionCount; 2423 } 2424 return new RegionStatesCount.RegionStatesCountBuilder().setOpenRegions(openRegionsCount) 2425 .setClosedRegions(closedRegionCount).setSplitRegions(splitRegionCount) 2426 .setRegionsInTransition(ritCount).setTotalRegions(totalRegionCount).build(); 2427 } 2428 2429}