001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import edu.umd.cs.findbugs.annotations.NonNull;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.Comparator;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.SortedSet;
032import java.util.TreeSet;
033import java.util.concurrent.CompletableFuture;
034import java.util.concurrent.Future;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.atomic.AtomicBoolean;
037import java.util.concurrent.locks.Condition;
038import java.util.concurrent.locks.ReentrantLock;
039import java.util.function.Consumer;
040import java.util.function.Function;
041import java.util.stream.Collectors;
042import java.util.stream.Stream;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.hbase.CatalogFamilyFormat;
045import org.apache.hadoop.hbase.DoNotRetryIOException;
046import org.apache.hadoop.hbase.HBaseIOException;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.PleaseHoldException;
049import org.apache.hadoop.hbase.ServerName;
050import org.apache.hadoop.hbase.TableName;
051import org.apache.hadoop.hbase.UnknownRegionException;
052import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
053import org.apache.hadoop.hbase.client.MasterSwitchType;
054import org.apache.hadoop.hbase.client.RegionInfo;
055import org.apache.hadoop.hbase.client.RegionInfoBuilder;
056import org.apache.hadoop.hbase.client.RegionReplicaUtil;
057import org.apache.hadoop.hbase.client.RegionStatesCount;
058import org.apache.hadoop.hbase.client.Result;
059import org.apache.hadoop.hbase.client.ResultScanner;
060import org.apache.hadoop.hbase.client.Scan;
061import org.apache.hadoop.hbase.client.TableDescriptor;
062import org.apache.hadoop.hbase.client.TableState;
063import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
064import org.apache.hadoop.hbase.favored.FavoredNodesManager;
065import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
066import org.apache.hadoop.hbase.master.LoadBalancer;
067import org.apache.hadoop.hbase.master.MasterServices;
068import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
069import org.apache.hadoop.hbase.master.RegionPlan;
070import org.apache.hadoop.hbase.master.RegionState;
071import org.apache.hadoop.hbase.master.RegionState.State;
072import org.apache.hadoop.hbase.master.ServerManager;
073import org.apache.hadoop.hbase.master.TableStateManager;
074import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
075import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
076import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
077import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
078import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
079import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
080import org.apache.hadoop.hbase.master.procedure.TruncateRegionProcedure;
081import org.apache.hadoop.hbase.master.region.MasterRegion;
082import org.apache.hadoop.hbase.procedure2.Procedure;
083import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
084import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
085import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
086import org.apache.hadoop.hbase.procedure2.util.StringUtils;
087import org.apache.hadoop.hbase.regionserver.SequenceId;
088import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
089import org.apache.hadoop.hbase.util.Bytes;
090import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
091import org.apache.hadoop.hbase.util.FutureUtils;
092import org.apache.hadoop.hbase.util.Pair;
093import org.apache.hadoop.hbase.util.Threads;
094import org.apache.hadoop.hbase.util.VersionInfo;
095import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
096import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
097import org.apache.yetus.audience.InterfaceAudience;
098import org.apache.zookeeper.KeeperException;
099import org.slf4j.Logger;
100import org.slf4j.LoggerFactory;
101
102import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
103import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
107
108/**
109 * The AssignmentManager is the coordinator for region assign/unassign operations.
110 * <ul>
111 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
112 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
113 * </ul>
114 * Regions are created by CreateTable, Split, Merge. Regions are deleted by DeleteTable, Split,
115 * Merge. Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. Unassigns
 * are triggered by DisableTable, Split, Merge.
117 */
118@InterfaceAudience.Private
119public class AssignmentManager {
  /** Class-wide SLF4J logger. */
  private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);

  // TODO: AMv2
  // - handle region migration from hbase1 to hbase2.
  // - handle sys table assignment first (e.g. acl, namespace)
  // - handle table priorities
  // - If ServerBusyException trying to update hbase:meta, we abort the Master
  // See updateRegionLocation in RegionStateStore.
  //
  // See also
  // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
  // for other TODOs.

  /**
   * Configuration key for the bootstrap thread pool size. NOTE(review): not read by the
   * constructor; confirm where (or whether) it is consumed.
   */
  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
    "hbase.assignment.bootstrap.thread.pool.size";

  /** Configuration key the constructor reads into {@code assignDispatchWaitMillis}. */
  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
    "hbase.assignment.dispatch.wait.msec";
  /** Fallback for {@link #ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY}. */
  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;

  /** Configuration key the constructor reads into {@code assignDispatchWaitQueueMaxSize}. */
  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
    "hbase.assignment.dispatch.wait.queue.max.size";
  /** Fallback for {@link #ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY}. */
  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;

  /** Configuration key whose value the constructor passes to the region-in-transition chore. */
  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
    "hbase.assignment.rit.chore.interval.msec";
  /** Fallback for {@link #RIT_CHORE_INTERVAL_MSEC_CONF_KEY}. */
  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;

  /**
   * Configuration key for the dead-region metric chore interval. The constructor only creates that
   * chore when the configured value is greater than zero.
   */
  public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY =
    "hbase.assignment.dead.region.metric.chore.interval.msec";
  /** Fallback for {@link #DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY}. */
  private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000;

  /**
   * Configuration key for the maximum number of assign attempts. The constructor raises any
   * configured value below 1 up to 1.
   */
  public static final String ASSIGN_MAX_ATTEMPTS = "hbase.assignment.maximum.attempts";
  /** Fallback for {@link #ASSIGN_MAX_ATTEMPTS}. */
  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;

  /** Configuration key the constructor reads into {@code assignRetryImmediatelyMaxAttempts}. */
  public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS =
    "hbase.assignment.retry.immediately.maximum.attempts";
  /** Fallback for {@link #ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS}. */
  private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3;

  /** Region in Transition metrics threshold time */
  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
    "hbase.metrics.rit.stuck.warning.threshold";
  /**
   * Fallback for {@link #METRICS_RIT_STUCK_WARNING_THRESHOLD}. NOTE(review): presumably
   * milliseconds, matching the other {@code 60 * 1000} defaults; confirm at the point of use.
   */
  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
  /** Message prefix; by its text it is meant to be followed by a description of the region. */
  public static final String UNEXPECTED_STATE_REGION = "Unexpected state for ";

  /** Configuration key for the boolean the constructor reads into {@code forceRegionRetainment}. */
  public static final String FORCE_REGION_RETAINMENT = "hbase.master.scp.retain.assignment.force";

  /** Fallback for {@link #FORCE_REGION_RETAINMENT}. */
  public static final boolean DEFAULT_FORCE_REGION_RETAINMENT = false;

  /** The wait time in millis before checking again if the region's previous RS is back online */
  public static final String FORCE_REGION_RETAINMENT_WAIT_INTERVAL =
    "hbase.master.scp.retain.assignment.force.wait-interval";

  /** Fallback for {@link #FORCE_REGION_RETAINMENT_WAIT_INTERVAL}. */
  public static final long DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL = 50;

  /**
   * The number of times to check if the region's previous RS is back online, before giving up and
   * proceeding with assignment on a new RS
   */
  public static final String FORCE_REGION_RETAINMENT_RETRIES =
    "hbase.master.scp.retain.assignment.force.retries";

  /** Fallback for {@link #FORCE_REGION_RETAINMENT_RETRIES}. */
  public static final int DEFAULT_FORCE_REGION_RETAINMENT_RETRIES = 600;
183
  /** Event woken or suspended by {@code setMetaAssigned}; queried by {@code isMetaAssigned}. */
  private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
  /** Event woken by {@code wakeMetaLoadedEvent} and suspended again in {@code stop}. */
  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");

  /** Metrics object created in the constructor and exposed via the metrics getter. */
  private final MetricsAssignmentManager metrics;
  /** Region-in-transition chore; removed from the procedure executor in {@code stop}. */
  private final RegionInTransitionChore ritChore;
  /** Dead-region metric chore; {@code null} when the configured interval is not positive. */
  private final DeadServerMetricRegionChore deadMetricChore;
  /** The master services this manager was constructed with. */
  private final MasterServices master;

  /** Flipped to true by {@code start} and back to false by {@code stop}. */
  private final AtomicBoolean running = new AtomicBoolean(false);
  /** In-memory region and server states; cleared in {@code stop}. */
  private final RegionStates regionStates = new RegionStates();
  /** State store supplied to (or created by) the constructor. */
  private final RegionStateStore regionStateStore;

  /**
   * When the operator uses this configuration option, any version between the current cluster
   * version and the value of "hbase.min.version.move.system.tables" does not trigger any
   * auto-region movement. Auto-region movement here refers to auto-migration of system table
   * regions to newer server versions. It is assumed that the configured range of versions does not
   * require special handling of moving system table regions to higher versioned RegionServer. This
   * auto-migration is done by {@link #checkIfShouldMoveSystemRegionAsync()}. Example: Let's assume
   * the cluster is on version 1.4.0 and we have set "hbase.min.version.move.system.tables" as
   * "2.0.0". Now if we upgrade one RegionServer on 1.4.0 cluster to 1.6.0 (< 2.0.0), then
   * AssignmentManager will not move hbase:meta, hbase:namespace and other system table regions to
   * newly brought up RegionServer 1.6.0 as part of auto-migration. However, if we upgrade one
   * RegionServer on 1.4.0 cluster to 2.2.0 (> 2.0.0), then AssignmentManager will move all system
   * table regions to newly brought up RegionServer 2.2.0 as part of auto-migration done by
   * {@link #checkIfShouldMoveSystemRegionAsync()}. "hbase.min.version.move.system.tables" is
   * introduced as part of HBASE-22923.
   */
  private final String minVersionToMoveSysTables;

  /** Configuration key the constructor reads into {@code minVersionToMoveSysTables}. */
  private static final String MIN_VERSION_MOVE_SYS_TABLES_CONFIG =
    "hbase.min.version.move.system.tables";
  /** Fallback for {@link #MIN_VERSION_MOVE_SYS_TABLES_CONFIG}: the empty string. */
  private static final String DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG = "";

  // Keyed by server name. NOTE(review): the byte[] entries are presumably the names of the regions
  // each region server reported - confirm against the code that fills this map.
  private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();

  /** True when the configured load balancer class is a {@code FavoredStochasticBalancer}. */
  private final boolean shouldAssignRegionsWithFavoredNodes;
  /** Value of {@link #ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY}. */
  private final int assignDispatchWaitQueueMaxSize;
  /** Value of {@link #ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY}. */
  private final int assignDispatchWaitMillis;
  /** Value of {@link #ASSIGN_MAX_ATTEMPTS}, never less than 1. */
  private final int assignMaxAttempts;
  /** Value of {@link #ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS}. */
  private final int assignRetryImmediatelyMaxAttempts;

  /** Master local region; scanned in {@code start} to load the persisted meta region states. */
  private final MasterRegion masterRegion;

  /** Monitor held by the background thread in {@link #checkIfShouldMoveSystemRegionAsync()}. */
  private final Object checkIfShouldMoveSystemRegionLock = new Object();

  // NOTE(review): presumably managed by startAssignmentThread()/stopAssignmentThread(), which are
  // defined elsewhere in this file - confirm.
  private Thread assignThread;

  /** Value of {@link #FORCE_REGION_RETAINMENT}. */
  private final boolean forceRegionRetainment;

  /** Value of {@link #FORCE_REGION_RETAINMENT_WAIT_INTERVAL}. */
  private final long forceRegionRetainmentWaitInterval;

  /** Value of {@link #FORCE_REGION_RETAINMENT_RETRIES}. */
  private final int forceRegionRetainmentRetries;

  /** Notified of each region node loaded in {@code start}; stopped in {@code stop}. */
  private final RegionInTransitionTracker regionInTransitionTracker =
    new RegionInTransitionTracker();
240
241  public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
242    this(master, masterRegion, new RegionStateStore(master, masterRegion));
243  }
244
245  AssignmentManager(MasterServices master, MasterRegion masterRegion, RegionStateStore stateStore) {
246    this.master = master;
247    this.regionStateStore = stateStore;
248    this.metrics = new MetricsAssignmentManager();
249    this.masterRegion = masterRegion;
250
251    final Configuration conf = master.getConfiguration();
252
253    // Only read favored nodes if using the favored nodes load balancer.
254    this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class
255      .isAssignableFrom(conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class));
256
257    this.assignDispatchWaitMillis =
258      conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
259    this.assignDispatchWaitQueueMaxSize =
260      conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);
261
262    this.assignMaxAttempts =
263      Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS));
264    this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS,
265      DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS);
266
267    int ritChoreInterval =
268      conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC);
269    this.ritChore = new RegionInTransitionChore(ritChoreInterval);
270
271    int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY,
272      DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC);
273    if (deadRegionChoreInterval > 0) {
274      this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval);
275    } else {
276      this.deadMetricChore = null;
277    }
278    minVersionToMoveSysTables =
279      conf.get(MIN_VERSION_MOVE_SYS_TABLES_CONFIG, DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG);
280
281    forceRegionRetainment =
282      conf.getBoolean(FORCE_REGION_RETAINMENT, DEFAULT_FORCE_REGION_RETAINMENT);
283    forceRegionRetainmentWaitInterval = conf.getLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL,
284      DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL);
285    forceRegionRetainmentRetries =
286      conf.getInt(FORCE_REGION_RETAINMENT_RETRIES, DEFAULT_FORCE_REGION_RETAINMENT_RETRIES);
287  }
288
289  private void mirrorMetaLocations() throws IOException, KeeperException {
290    // For compatibility, mirror the meta region state to zookeeper
291    // And we still need to use zookeeper to publish the meta region locations to region
292    // server, so they can serve as ClientMetaService
293    ZKWatcher zk = master.getZooKeeper();
294    if (zk == null || !zk.getRecoverableZooKeeper().getState().isAlive()) {
295      // this is possible in tests, we do not provide a zk watcher or the zk watcher has been closed
296      return;
297    }
298    Collection<RegionStateNode> metaStates = regionStates.getRegionStateNodes();
299    for (RegionStateNode metaState : metaStates) {
300      MetaTableLocator.setMetaLocation(zk, metaState.getRegionLocation(),
301        metaState.getRegionInfo().getReplicaId(), metaState.getState());
302    }
303    int replicaCount = metaStates.size();
304    // remove extra mirror locations
305    for (String znode : zk.getMetaReplicaNodes()) {
306      int replicaId = zk.getZNodePaths().getMetaReplicaIdFromZNode(znode);
307      if (replicaId >= replicaCount) {
308        MetaTableLocator.deleteMetaLocation(zk, replicaId);
309      }
310    }
311  }
312
313  public void start() throws IOException, KeeperException {
314    if (!running.compareAndSet(false, true)) {
315      return;
316    }
317
318    LOG.trace("Starting assignment manager");
319
320    // Start the Assignment Thread
321    startAssignmentThread();
322    // load meta region states.
323    // here we are still in the early steps of active master startup. There is only one thread(us)
324    // can access AssignmentManager and create region node, so here we do not need to lock the
325    // region node.
326    try (ResultScanner scanner =
327      masterRegion.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY))) {
328      for (;;) {
329        Result result = scanner.next();
330        if (result == null) {
331          break;
332        }
333        RegionStateStore
334          .visitMetaEntry((r, regionInfo, state, regionLocation, lastHost, openSeqNum) -> {
335            RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
336            regionNode.setState(state);
337            regionNode.setLastHost(lastHost);
338            regionNode.setRegionLocation(regionLocation);
339            regionNode.setOpenSeqNum(openSeqNum);
340            regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
341
342            if (regionNode.getProcedure() != null) {
343              regionNode.getProcedure().stateLoaded(this, regionNode);
344            }
345            if (regionLocation != null) {
346              // TODO: this could lead to some orphan server state nodes, as it is possible that the
347              // region server is already dead and its SCP has already finished but we have
348              // persisted an opening state on this region server. Finally the TRSP will assign the
349              // region to another region server, so it will not cause critical problems, just waste
350              // some memory as no one will try to cleanup these orphan server state nodes.
351              regionStates.createServer(regionLocation);
352              regionStates.addRegionToServer(regionNode);
353            }
354            if (RegionReplicaUtil.isDefaultReplica(regionInfo.getReplicaId())) {
355              setMetaAssigned(regionInfo, state == State.OPEN);
356            }
357            LOG.debug("Loaded {} {}", TableName.META_TABLE_NAME, regionNode);
358          }, result);
359      }
360    }
361    mirrorMetaLocations();
362  }
363
364  /**
365   * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode.
366   * <p>
367   * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta
368   * method below. And it is also very important as now before submitting a TRSP, we need to attach
369   * it to the RegionStateNode, which acts like a guard, so we need to restore this information at
370   * the very beginning, before we start processing any procedures.
371   */
372  public void setupRIT(List<TransitRegionStateProcedure> procs) {
373    procs.forEach(proc -> {
374      RegionInfo regionInfo = proc.getRegion();
375      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
376      TransitRegionStateProcedure existingProc = regionNode.getProcedure();
377      if (existingProc != null) {
378        // This is possible, as we will detach the procedure from the RSN before we
379        // actually finish the procedure. This is because that, we will detach the TRSP from the RSN
380        // during execution, at that time, the procedure has not been marked as done in the pv2
381        // framework yet, so it is possible that we schedule a new TRSP immediately and when
382        // arriving here, we will find out that there are multiple TRSPs for the region. But we can
383        // make sure that, only the last one can take the charge, the previous ones should have all
384        // been finished already. So here we will compare the proc id, the greater one will win.
385        if (existingProc.getProcId() < proc.getProcId()) {
386          // the new one wins, unset and set it to the new one below
387          regionNode.unsetProcedure(existingProc);
388        } else {
389          // the old one wins, skip
390          return;
391        }
392      }
393      LOG.info("Attach {} to {}", proc, regionNode);
394      regionNode.setProcedure(proc);
395    });
396  }
397
398  public void initializationPostMetaOnline() {
399    // now that we are sure that meta is online, we can set TableStateManger in
400    // regionInTransitionTracker
401    regionInTransitionTracker.setTableStateManager(master.getTableStateManager());
402  }
403
404  public void stop() {
405    if (!running.compareAndSet(true, false)) {
406      return;
407    }
408
409    LOG.info("Stopping assignment manager");
410
411    // The AM is started before the procedure executor,
412    // but the actual work will be loaded/submitted only once we have the executor
413    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;
414
415    // Remove the RIT chore
416    if (hasProcExecutor) {
417      master.getMasterProcedureExecutor().removeChore(this.ritChore);
418      if (this.deadMetricChore != null) {
419        master.getMasterProcedureExecutor().removeChore(this.deadMetricChore);
420      }
421    }
422
423    // Stop the Assignment Thread
424    stopAssignmentThread();
425
426    // Stop the RegionStateStore
427    regionStates.clear();
428    regionInTransitionTracker.stop();
429
430    // Update meta events (for testing)
431    if (hasProcExecutor) {
432      metaLoadEvent.suspend();
433      for (RegionInfo hri : getMetaRegionSet()) {
434        setMetaAssigned(hri, false);
435      }
436    }
437  }
438
439  public boolean isRunning() {
440    return running.get();
441  }
442
443  public Configuration getConfiguration() {
444    return master.getConfiguration();
445  }
446
447  public MetricsAssignmentManager getAssignmentManagerMetrics() {
448    return metrics;
449  }
450
451  private LoadBalancer getBalancer() {
452    return master.getLoadBalancer();
453  }
454
455  private FavoredNodesPromoter getFavoredNodePromoter() {
456    return (FavoredNodesPromoter) ((RSGroupBasedLoadBalancer) master.getLoadBalancer())
457      .getInternalBalancer();
458  }
459
460  private MasterProcedureEnv getProcedureEnvironment() {
461    return master.getMasterProcedureExecutor().getEnvironment();
462  }
463
464  private MasterProcedureScheduler getProcedureScheduler() {
465    return getProcedureEnvironment().getProcedureScheduler();
466  }
467
468  int getAssignMaxAttempts() {
469    return assignMaxAttempts;
470  }
471
472  public boolean isForceRegionRetainment() {
473    return forceRegionRetainment;
474  }
475
476  public long getForceRegionRetainmentWaitInterval() {
477    return forceRegionRetainmentWaitInterval;
478  }
479
480  public int getForceRegionRetainmentRetries() {
481    return forceRegionRetainmentRetries;
482  }
483
484  int getAssignRetryImmediatelyMaxAttempts() {
485    return assignRetryImmediatelyMaxAttempts;
486  }
487
488  public RegionStates getRegionStates() {
489    return regionStates;
490  }
491
492  /**
493   * Returns the regions hosted by the specified server.
494   * <p/>
495   * Notice that, for SCP, after we submit the SCP, no one can change the region list for the
496   * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a
497   * snapshot of the current region list for this server, which means, right after you get the
498   * region list, new regions may be moved to this server or some regions may be moved out from this
499   * server, so you should not use it critically if you need strong consistency.
500   */
501  public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
502    ServerStateNode serverInfo = regionStates.getServerNode(serverName);
503    if (serverInfo == null) {
504      return Collections.emptyList();
505    }
506    return serverInfo.getRegionInfoList();
507  }
508
509  private RegionInfo getRegionInfo(RegionStateNode rsn) {
510    if (rsn.isSplit() && !rsn.getRegionInfo().isSplit()) {
511      // see the comments in markRegionAsSplit on why we need to do this converting.
512      return RegionInfoBuilder.newBuilder(rsn.getRegionInfo()).setSplit(true).setOffline(true)
513        .build();
514    } else {
515      return rsn.getRegionInfo();
516    }
517  }
518
519  private Stream<RegionStateNode> getRegionStateNodes(TableName tableName,
520    boolean excludeOfflinedSplitParents) {
521    Stream<RegionStateNode> stream = regionStates.getTableRegionStateNodes(tableName).stream();
522    if (excludeOfflinedSplitParents) {
523      return stream.filter(rsn -> !rsn.isSplit());
524    } else {
525      return stream;
526    }
527  }
528
529  public List<RegionInfo> getTableRegions(TableName tableName,
530    boolean excludeOfflinedSplitParents) {
531    return getRegionStateNodes(tableName, excludeOfflinedSplitParents).map(this::getRegionInfo)
532      .collect(Collectors.toList());
533  }
534
535  public List<Pair<RegionInfo, ServerName>> getTableRegionsAndLocations(TableName tableName,
536    boolean excludeOfflinedSplitParents) {
537    return getRegionStateNodes(tableName, excludeOfflinedSplitParents)
538      .map(rsn -> Pair.newPair(getRegionInfo(rsn), rsn.getRegionLocation()))
539      .collect(Collectors.toList());
540  }
541
542  public RegionStateStore getRegionStateStore() {
543    return regionStateStore;
544  }
545
546  public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
547    return this.shouldAssignRegionsWithFavoredNodes
548      ? getFavoredNodePromoter().getFavoredNodes(regionInfo)
549      : ServerName.EMPTY_SERVER_LIST;
550  }
551
552  // ============================================================================================
553  // Table State Manager helpers
554  // ============================================================================================
555  private TableStateManager getTableStateManager() {
556    return master.getTableStateManager();
557  }
558
559  private boolean isTableEnabled(final TableName tableName) {
560    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
561  }
562
563  private boolean isTableDisabled(final TableName tableName) {
564    return getTableStateManager().isTableState(tableName, TableState.State.DISABLED,
565      TableState.State.DISABLING);
566  }
567
568  // ============================================================================================
569  // META Helpers
570  // ============================================================================================
571  private boolean isMetaRegion(final RegionInfo regionInfo) {
572    return regionInfo.isMetaRegion();
573  }
574
575  public boolean isMetaRegion(final byte[] regionName) {
576    return getMetaRegionFromName(regionName) != null;
577  }
578
579  public RegionInfo getMetaRegionFromName(final byte[] regionName) {
580    for (RegionInfo hri : getMetaRegionSet()) {
581      if (Bytes.equals(hri.getRegionName(), regionName)) {
582        return hri;
583      }
584    }
585    return null;
586  }
587
588  public boolean isCarryingMeta(final ServerName serverName) {
589    // TODO: handle multiple meta
590    return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
591  }
592
593  private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
594    // TODO: check for state?
595    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
596    return (node != null && serverName.equals(node.getRegionLocation()));
597  }
598
599  private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
600    // if (regionInfo.isMetaRegion()) return regionInfo;
601    // TODO: handle multiple meta. if the region provided is not meta lookup
602    // which meta the region belongs to.
603    return RegionInfoBuilder.FIRST_META_REGIONINFO;
604  }
605
606  // TODO: handle multiple meta.
607  private static final Set<RegionInfo> META_REGION_SET =
608    Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);
609
610  public Set<RegionInfo> getMetaRegionSet() {
611    return META_REGION_SET;
612  }
613
614  // ============================================================================================
615  // META Event(s) helpers
616  // ============================================================================================
617  /**
618   * Notice that, this only means the meta region is available on a RS, but the AM may still be
619   * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
620   * before checking this method, unless you can make sure that your piece of code can only be
621   * executed after AM builds the region states.
622   * @see #isMetaLoaded()
623   */
624  public boolean isMetaAssigned() {
625    return metaAssignEvent.isReady();
626  }
627
628  public boolean isMetaRegionInTransition() {
629    return !isMetaAssigned();
630  }
631
632  /**
633   * Notice that this event does not mean the AM has already finished region state rebuilding. See
634   * the comment of {@link #isMetaAssigned()} for more details.
635   * @see #isMetaAssigned()
636   */
637  public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
638    return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
639  }
640
641  private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
642    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
643    ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
644    if (assigned) {
645      metaAssignEvent.wake(getProcedureScheduler());
646    } else {
647      metaAssignEvent.suspend();
648    }
649  }
650
651  private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
652    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
653    // TODO: handle multiple meta.
654    return metaAssignEvent;
655  }
656
657  /**
658   * Wait until AM finishes the meta loading, i.e, the region states rebuilding.
659   * @see #isMetaLoaded()
660   * @see #waitMetaAssigned(Procedure, RegionInfo)
661   */
662  public boolean waitMetaLoaded(Procedure<?> proc) {
663    return metaLoadEvent.suspendIfNotReady(proc);
664  }
665
666  /**
667   * This method will be called in master initialization method after calling
668   * {@link #processOfflineRegions()}, as in processOfflineRegions we will generate assign
669   * procedures for offline regions, which may be conflict with creating table.
670   * <p/>
671   * This is a bit dirty, should be reconsidered after we decide whether to keep the
672   * {@link #processOfflineRegions()} method.
673   */
674  public void wakeMetaLoadedEvent() {
675    metaLoadEvent.wake(getProcedureScheduler());
676    assert isMetaLoaded() : "expected meta to be loaded";
677  }
678
679  /**
680   * Return whether AM finishes the meta loading, i.e, the region states rebuilding.
681   * @see #isMetaAssigned()
682   * @see #waitMetaLoaded(Procedure)
683   */
684  public boolean isMetaLoaded() {
685    return metaLoadEvent.isReady();
686  }
687
688  /**
689   * Start a new thread to check if there are region servers whose versions are higher than others.
690   * If so, move all system table regions to RS with the highest version to keep compatibility. The
691   * reason is, RS in new version may not be able to access RS in old version when there are some
692   * incompatible changes.
693   * <p>
694   * This method is called when a new RegionServer is added to cluster only.
695   * </p>
696   */
697  public void checkIfShouldMoveSystemRegionAsync() {
698    // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
699    // it should 'move' the system tables from the old server to the new server but
700    // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
701    if (this.master.getServerManager().countOfRegionServers() <= 1) {
702      return;
703    }
704    // This thread used to run whenever there was a change in the cluster. The ZooKeeper
705    // childrenChanged notification came in before the nodeDeleted message and so this method
706    // cold run before a ServerCrashProcedure could run. That meant that this thread could see
707    // a Crashed Server before ServerCrashProcedure and it could find system regions on the
708    // crashed server and go move them before ServerCrashProcedure had a chance; could be
709    // dataloss too if WALs were not recovered.
710    new Thread(() -> {
711      try {
712        synchronized (checkIfShouldMoveSystemRegionLock) {
713          List<RegionPlan> plans = new ArrayList<>();
714          // TODO: I don't think this code does a good job if all servers in cluster have same
715          // version. It looks like it will schedule unnecessary moves.
716          for (ServerName server : getExcludedServersForSystemTable()) {
717            if (master.getServerManager().isServerDead(server)) {
718              // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
719              // considers only online servers, the server could be queued for dead server
720              // processing. As region assignments for crashed server is handled by
721              // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
722              // regular flow of LoadBalancer as a favored node and not to have this special
723              // handling.
724              continue;
725            }
726            List<RegionInfo> regionsShouldMove = getSystemTables(server);
727            if (!regionsShouldMove.isEmpty()) {
728              for (RegionInfo regionInfo : regionsShouldMove) {
729                // null value for dest forces destination server to be selected by balancer
730                RegionPlan plan = new RegionPlan(regionInfo, server, null);
731                if (regionInfo.isMetaRegion()) {
732                  // Must move meta region first.
733                  LOG.info("Async MOVE of {} to newer Server={}", regionInfo.getEncodedName(),
734                    server);
735                  moveAsync(plan);
736                } else {
737                  plans.add(plan);
738                }
739              }
740            }
741            for (RegionPlan plan : plans) {
742              LOG.info("Async MOVE of {} to newer Server={}", plan.getRegionInfo().getEncodedName(),
743                server);
744              moveAsync(plan);
745            }
746          }
747        }
748      } catch (Throwable t) {
749        LOG.error(t.toString(), t);
750      }
751    }).start();
752  }
753
754  private List<RegionInfo> getSystemTables(ServerName serverName) {
755    ServerStateNode serverNode = regionStates.getServerNode(serverName);
756    if (serverNode == null) {
757      return Collections.emptyList();
758    }
759    return serverNode.getSystemRegionInfoList();
760  }
761
762  private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
763    throws HBaseIOException {
764    if (regionNode.getProcedure() != null) {
765      throw new HBaseIOException(
766        regionNode + " is currently in transition; pid=" + regionNode.getProcedure().getProcId());
767    }
768    if (!regionNode.isInState(expectedStates)) {
769      throw new DoNotRetryRegionException(UNEXPECTED_STATE_REGION + regionNode);
770    }
771    if (isTableDisabled(regionNode.getTable())) {
772      throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
773    }
774  }
775
776  /**
777   * Create an assign TransitRegionStateProcedure. Makes sure of RegionState. Throws exception if
778   * not appropriate UNLESS override is set. Used by hbck2 but also by straightline
779   * {@link #assign(RegionInfo, ServerName)} and {@link #assignAsync(RegionInfo, ServerName)}.
780   * @see #createAssignProcedure(RegionStateNode, ServerName) for a version that does NO checking
781   *      used when only when no checking needed.
782   * @param override If false, check RegionState is appropriate for assign; if not throw exception.
783   */
784  private TransitRegionStateProcedure createAssignProcedure(RegionInfo regionInfo, ServerName sn,
785    boolean override, boolean force) throws IOException {
786    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
787    regionNode.lock();
788    try {
789      if (override) {
790        if (!force) {
791          preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
792        }
793        if (regionNode.getProcedure() != null) {
794          regionNode.unsetProcedure(regionNode.getProcedure());
795        }
796      } else {
797        preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
798      }
799      assert regionNode.getProcedure() == null;
800      TransitRegionStateProcedure proc = regionNode.setProcedure(
801        TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn));
802      regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
803      return proc;
804    } finally {
805      regionNode.unlock();
806    }
807  }
808
809  /**
810   * Create an assign TransitRegionStateProcedure. Does NO checking of RegionState. Presumes
811   * appriopriate state ripe for assign.
812   * @see #createAssignProcedure(RegionInfo, ServerName, boolean, boolean)
813   */
814  private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode,
815    ServerName targetServer) {
816    regionNode.lock();
817    try {
818      TransitRegionStateProcedure proc = regionNode.setProcedure(TransitRegionStateProcedure
819        .assign(getProcedureEnvironment(), regionNode.getRegionInfo(), targetServer));
820      regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
821      return proc;
822    } finally {
823      regionNode.unlock();
824    }
825  }
826
827  public long assign(RegionInfo regionInfo, ServerName sn) throws IOException {
828    TransitRegionStateProcedure proc = createAssignProcedure(regionInfo, sn, false, false);
829    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
830    return proc.getProcId();
831  }
832
833  public long assign(RegionInfo regionInfo) throws IOException {
834    return assign(regionInfo, null);
835  }
836
837  /**
838   * Submits a procedure that assigns a region to a target server without waiting for it to finish
839   * @param regionInfo the region we would like to assign
840   * @param sn         target server name
841   */
842  public Future<byte[]> assignAsync(RegionInfo regionInfo, ServerName sn) throws IOException {
843    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(),
844      createAssignProcedure(regionInfo, sn, false, false));
845  }
846
847  /**
848   * Submits a procedure that assigns a region without waiting for it to finish
849   * @param regionInfo the region we would like to assign
850   */
851  public Future<byte[]> assignAsync(RegionInfo regionInfo) throws IOException {
852    return assignAsync(regionInfo, null);
853  }
854
855  public long unassign(RegionInfo regionInfo) throws IOException {
856    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
857    if (regionNode == null) {
858      throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
859    }
860    TransitRegionStateProcedure proc;
861    regionNode.lock();
862    try {
863      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
864      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
865      regionNode.setProcedure(proc);
866    } finally {
867      regionNode.unlock();
868    }
869    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
870    return proc.getProcId();
871  }
872
873  public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
874    ServerName targetServer) throws HBaseIOException {
875    RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
876    if (regionNode == null) {
877      throw new UnknownRegionException(
878        "No RegionStateNode found for " + regionInfo.getEncodedName() + "(Closed/Deleted?)");
879    }
880    TransitRegionStateProcedure proc;
881    regionNode.lock();
882    try {
883      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
884      regionNode.checkOnline();
885      proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
886      regionNode.setProcedure(proc);
887    } finally {
888      regionNode.unlock();
889    }
890    return proc;
891  }
892
893  public void move(RegionInfo regionInfo) throws IOException {
894    TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null);
895    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
896  }
897
898  public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException {
899    TransitRegionStateProcedure proc =
900      createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination());
901    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
902  }
903
904  public Future<byte[]> balance(RegionPlan regionPlan) throws HBaseIOException {
905    ServerName current =
906      this.getRegionStates().getRegionAssignments().get(regionPlan.getRegionInfo());
907    if (current == null || !current.equals(regionPlan.getSource())) {
908      LOG.debug("Skip region plan {}, source server not match, current region location is {}",
909        regionPlan, current == null ? "(null)" : current);
910      return null;
911    }
912    return moveAsync(regionPlan);
913  }
914
915  // ============================================================================================
916  // RegionTransition procedures helpers
917  // ============================================================================================
918
919  /**
920   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
921   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
922   *         to populate the assigns with targets chosen using round-robin (default balancer
923   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
924   *         AssignProcedure will ask the balancer for a new target, and so on.
925   */
926  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris,
927    List<ServerName> serversToExclude) {
928    if (hris.isEmpty()) {
929      return new TransitRegionStateProcedure[0];
930    }
931
932    if (
933      serversToExclude != null && this.master.getServerManager().getOnlineServersList().size() == 1
934    ) {
935      LOG.debug("Only one region server found and hence going ahead with the assignment");
936      serversToExclude = null;
937    }
938    try {
939      // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
940      // a better job if it has all the assignments in the one lump.
941      Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
942        this.master.getServerManager().createDestinationServersList(serversToExclude));
943      // Return mid-method!
944      return createAssignProcedures(assignments);
945    } catch (IOException hioe) {
946      LOG.warn("Failed roundRobinAssignment", hioe);
947    }
948    // If an error above, fall-through to this simpler assign. Last resort.
949    return createAssignProcedures(hris);
950  }
951
952  /**
953   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
954   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
955   *         to populate the assigns with targets chosen using round-robin (default balancer
956   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
957   *         AssignProcedure will ask the balancer for a new target, and so on.
958   */
959  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
960    return createRoundRobinAssignProcedures(hris, null);
961  }
962
963  static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
964    if (left.getRegion().isMetaRegion()) {
965      if (right.getRegion().isMetaRegion()) {
966        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
967      }
968      return -1;
969    } else if (right.getRegion().isMetaRegion()) {
970      return +1;
971    }
972    if (left.getRegion().getTable().isSystemTable()) {
973      if (right.getRegion().getTable().isSystemTable()) {
974        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
975      }
976      return -1;
977    } else if (right.getRegion().getTable().isSystemTable()) {
978      return +1;
979    }
980    return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
981  }
982
983  /**
984   * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server. This
985   * method is called from HBCK2.
986   * @return an assign or null
987   */
988  public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo ri, boolean override,
989    boolean force) {
990    TransitRegionStateProcedure trsp = null;
991    try {
992      trsp = createAssignProcedure(ri, null, override, force);
993    } catch (IOException ioe) {
994      LOG.info(
995        "Failed {} assign, override={}"
996          + (override ? "" : "; set override to by-pass state checks."),
997        ri.getEncodedName(), override, ioe);
998    }
999    return trsp;
1000  }
1001
1002  /**
1003   * Create one TransitRegionStateProcedure to unassign a region. This method is called from HBCK2.
1004   * @return an unassign or null
1005   */
1006  public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo ri, boolean override,
1007    boolean force) {
1008    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(ri);
1009    TransitRegionStateProcedure trsp = null;
1010    regionNode.lock();
1011    try {
1012      if (override) {
1013        if (!force) {
1014          preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
1015        }
1016        if (regionNode.getProcedure() != null) {
1017          regionNode.unsetProcedure(regionNode.getProcedure());
1018        }
1019      } else {
1020        // This is where we could throw an exception; i.e. override is false.
1021        preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
1022      }
1023      assert regionNode.getProcedure() == null;
1024      trsp =
1025        TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionNode.getRegionInfo());
1026      regionNode.setProcedure(trsp);
1027    } catch (IOException ioe) {
1028      // 'override' must be false here.
1029      LOG.info("Failed {} unassign, override=false; set override to by-pass state checks.",
1030        ri.getEncodedName(), ioe);
1031    } finally {
1032      regionNode.unlock();
1033    }
1034    return trsp;
1035  }
1036
1037  /**
1038   * Create an array of TransitRegionStateProcedure w/o specifying a target server. Used as fallback
1039   * of caller is unable to do {@link #createAssignProcedures(Map)}.
1040   * <p/>
1041   * If no target server, at assign time, we will try to use the former location of the region if
1042   * one exists. This is how we 'retain' the old location across a server restart.
1043   * <p/>
1044   * Should only be called when you can make sure that no one can touch these regions other than
1045   * you. For example, when you are creating or enabling table. Presumes all Regions are in
1046   * appropriate state ripe for assign; no checking of Region state is done in here.
1047   * @see #createAssignProcedures(Map)
1048   */
1049  public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) {
1050    return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
1051      .map(regionNode -> createAssignProcedure(regionNode, null)).sorted(AssignmentManager::compare)
1052      .toArray(TransitRegionStateProcedure[]::new);
1053  }
1054
1055  /**
1056   * Tied to {@link #createAssignProcedures(List)} in that it is called if caller is unable to run
1057   * this method. Presumes all Regions are in appropriate state ripe for assign; no checking of
1058   * Region state is done in here.
1059   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
1060   * @return Assignments made from the passed in <code>assignments</code>
1061   * @see #createAssignProcedures(List)
1062   */
1063  private TransitRegionStateProcedure[]
1064    createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments) {
1065    return assignments.entrySet().stream()
1066      .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
1067        .map(regionNode -> createAssignProcedure(regionNode, e.getKey())))
1068      .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
1069  }
1070
1071  // for creating unassign TRSP when disabling a table or closing excess region replicas
1072  private TransitRegionStateProcedure forceCreateUnssignProcedure(RegionStateNode regionNode) {
1073    regionNode.lock();
1074    try {
1075      if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
1076        return null;
1077      }
1078      // in general, a split parent should be in CLOSED or SPLIT state, but anyway, let's check it
1079      // here for safety
1080      if (regionNode.getRegionInfo().isSplit()) {
1081        LOG.warn("{} is a split parent but not in CLOSED or SPLIT state", regionNode);
1082        return null;
1083      }
1084      // As in DisableTableProcedure or ModifyTableProcedure, we will hold the xlock for table, so
1085      // we can make sure that this procedure has not been executed yet, as TRSP will hold the
1086      // shared lock for table all the time. So here we will unset it and when it is actually
1087      // executed, it will find that the attach procedure is not itself and quit immediately.
1088      if (regionNode.getProcedure() != null) {
1089        regionNode.unsetProcedure(regionNode.getProcedure());
1090      }
1091      return regionNode.setProcedure(TransitRegionStateProcedure.unassign(getProcedureEnvironment(),
1092        regionNode.getRegionInfo()));
1093    } finally {
1094      regionNode.unlock();
1095    }
1096  }
1097
1098  /**
1099   * Called by DisableTableProcedure to unassign all the regions for a table.
1100   */
1101  public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) {
1102    return regionStates.getTableRegionStateNodes(tableName).stream()
1103      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
1104      .toArray(TransitRegionStateProcedure[]::new);
1105  }
1106
1107  private int submitUnassignProcedure(TableName tableName,
1108    Function<RegionStateNode, Boolean> shouldSubmit, Consumer<RegionStateNode> logRIT,
1109    Consumer<TransitRegionStateProcedure> submit) {
1110    int inTransitionCount = 0;
1111    for (RegionStateNode regionNode : regionStates.getTableRegionStateNodes(tableName)) {
1112      regionNode.lock();
1113      try {
1114        if (shouldSubmit.apply(regionNode)) {
1115          if (regionNode.isTransitionScheduled()) {
1116            logRIT.accept(regionNode);
1117            inTransitionCount++;
1118            continue;
1119          }
1120          if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
1121            continue;
1122          }
1123          submit.accept(regionNode.setProcedure(TransitRegionStateProcedure
1124            .unassign(getProcedureEnvironment(), regionNode.getRegionInfo())));
1125        }
1126      } finally {
1127        regionNode.unlock();
1128      }
1129    }
1130    return inTransitionCount;
1131  }
1132
1133  /**
1134   * Called by DisableTableProcedure to unassign all regions for a table. Will skip submit unassign
1135   * procedure if the region is in transition, so you may need to call this method multiple times.
1136   * @param tableName the table for closing excess region replicas
1137   * @param submit    for submitting procedure
1138   * @return the number of regions in transition that we can not schedule unassign procedures
1139   */
1140  public int submitUnassignProcedureForDisablingTable(TableName tableName,
1141    Consumer<TransitRegionStateProcedure> submit) {
1142    return submitUnassignProcedure(tableName, rn -> true,
1143      rn -> LOG.debug("skip scheduling unassign procedure for {} when closing table regions "
1144        + "for disabling since it is in transition", rn),
1145      submit);
1146  }
1147
1148  /**
1149   * Called by ModifyTableProcedure to unassign all the excess region replicas for a table. Will
1150   * skip submit unassign procedure if the region is in transition, so you may need to call this
1151   * method multiple times.
1152   * @param tableName       the table for closing excess region replicas
1153   * @param newReplicaCount the new replica count, should be less than current replica count
1154   * @param submit          for submitting procedure
1155   * @return the number of regions in transition that we can not schedule unassign procedures
1156   */
1157  public int submitUnassignProcedureForClosingExcessRegionReplicas(TableName tableName,
1158    int newReplicaCount, Consumer<TransitRegionStateProcedure> submit) {
1159    return submitUnassignProcedure(tableName,
1160      rn -> rn.getRegionInfo().getReplicaId() >= newReplicaCount,
1161      rn -> LOG.debug("skip scheduling unassign procedure for {} when closing excess region "
1162        + "replicas since it is in transition", rn),
1163      submit);
1164  }
1165
1166  private int numberOfUnclosedRegions(TableName tableName,
1167    Function<RegionStateNode, Boolean> shouldSubmit) {
1168    int unclosed = 0;
1169    for (RegionStateNode regionNode : regionStates.getTableRegionStateNodes(tableName)) {
1170      regionNode.lock();
1171      try {
1172        if (shouldSubmit.apply(regionNode)) {
1173          if (!regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
1174            unclosed++;
1175          }
1176        }
1177      } finally {
1178        regionNode.unlock();
1179      }
1180    }
1181    return unclosed;
1182  }
1183
1184  public int numberOfUnclosedRegionsForDisabling(TableName tableName) {
1185    return numberOfUnclosedRegions(tableName, rn -> true);
1186  }
1187
1188  public int numberOfUnclosedExcessRegionReplicas(TableName tableName, int newReplicaCount) {
1189    return numberOfUnclosedRegions(tableName,
1190      rn -> rn.getRegionInfo().getReplicaId() >= newReplicaCount);
1191  }
1192
1193  public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
1194    final byte[] splitKey) throws IOException {
1195    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
1196  }
1197
1198  public TruncateRegionProcedure createTruncateRegionProcedure(final RegionInfo regionToTruncate)
1199    throws IOException {
1200    return new TruncateRegionProcedure(getProcedureEnvironment(), regionToTruncate);
1201  }
1202
1203  public MergeTableRegionsProcedure createMergeProcedure(RegionInfo... ris) throws IOException {
1204    return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
1205  }
1206
1207  /**
1208   * Delete the region states. This is called by "DeleteTable"
1209   */
1210  public void deleteTable(final TableName tableName) throws IOException {
1211    final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
1212    regionStateStore.deleteRegions(regions);
1213    for (int i = 0; i < regions.size(); ++i) {
1214      final RegionInfo regionInfo = regions.get(i);
1215      regionStates.deleteRegion(regionInfo);
1216    }
1217  }
1218
1219  // ============================================================================================
1220  // RS Region Transition Report helpers
1221  // ============================================================================================
1222  private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
1223    ServerStateNode serverNode, List<RegionStateTransition> transitionList) throws IOException {
1224    for (RegionStateTransition transition : transitionList) {
1225      switch (transition.getTransitionCode()) {
1226        case OPENED:
1227        case FAILED_OPEN:
1228        case CLOSED:
1229          assert transition.getRegionInfoCount() == 1 : transition;
1230          final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1231          long procId =
1232            transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
1233          updateRegionTransition(serverNode, transition.getTransitionCode(), hri,
1234            transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId);
1235          break;
1236        case READY_TO_SPLIT:
1237        case SPLIT:
1238        case SPLIT_REVERTED:
1239          assert transition.getRegionInfoCount() == 3 : transition;
1240          final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1241          final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
1242          final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
1243          updateRegionSplitTransition(serverNode, transition.getTransitionCode(), parent, splitA,
1244            splitB);
1245          break;
1246        case READY_TO_MERGE:
1247        case MERGED:
1248        case MERGE_REVERTED:
1249          assert transition.getRegionInfoCount() == 3 : transition;
1250          final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1251          final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
1252          final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
1253          updateRegionMergeTransition(serverNode, transition.getTransitionCode(), merged, mergeA,
1254            mergeB);
1255          break;
1256      }
1257    }
1258  }
1259
1260  public ReportRegionStateTransitionResponse reportRegionStateTransition(
1261    final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
1262    ReportRegionStateTransitionResponse.Builder builder =
1263      ReportRegionStateTransitionResponse.newBuilder();
1264    ServerName serverName = ProtobufUtil.toServerName(req.getServer());
1265    ServerStateNode serverNode = regionStates.getServerNode(serverName);
1266    if (serverNode == null) {
1267      LOG.warn("No server node for {}", serverName);
1268      builder.setErrorMessage("No server node for " + serverName);
1269      return builder.build();
1270    }
1271    // here we have to acquire a read lock instead of a simple exclusive lock. This is because that
1272    // we should not block other reportRegionStateTransition call from the same region server. This
1273    // is not only about performance, but also to prevent dead lock. Think of the meta region is
1274    // also on the same region server and you hold the lock which blocks the
1275    // reportRegionStateTransition for meta, and since meta is not online, you will block inside the
1276    // lock protection to wait for meta online...
1277    serverNode.readLock().lock();
1278    try {
1279      // we only accept reportRegionStateTransition if the region server is online, see the comment
1280      // above in submitServerCrash method and HBASE-21508 for more details.
1281      if (serverNode.isInState(ServerState.ONLINE)) {
1282        try {
1283          reportRegionStateTransition(builder, serverNode, req.getTransitionList());
1284        } catch (PleaseHoldException e) {
1285          LOG.trace("Failed transition ", e);
1286          throw e;
1287        } catch (UnsupportedOperationException | IOException e) {
1288          // TODO: at the moment we have a single error message and the RS will abort
1289          // if the master says that one of the region transitions failed.
1290          LOG.warn("Failed transition", e);
1291          builder.setErrorMessage("Failed transition " + e.getMessage());
1292        }
1293      } else {
1294        LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
1295          serverName);
1296        builder.setErrorMessage("You are dead");
1297      }
1298    } finally {
1299      serverNode.readLock().unlock();
1300    }
1301
1302    return builder.build();
1303  }
1304
1305  private void updateRegionTransition(ServerStateNode serverNode, TransitionCode state,
1306    RegionInfo regionInfo, long seqId, long procId) throws IOException {
1307    checkMetaLoaded(regionInfo, procId);
1308
1309    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1310    if (regionNode == null) {
1311      // the table/region is gone. maybe a delete, split, merge
1312      throw new UnexpectedStateException(String.format(
1313        "Server %s was trying to transition region %s to %s. but Region is not known.",
1314        serverNode.getServerName(), regionInfo, state));
1315    }
1316    LOG.trace("Update region transition serverName={} region={} regionState={}",
1317      serverNode.getServerName(), regionNode, state);
1318
1319    regionNode.lock();
1320    try {
1321      if (!reportTransition(regionNode, serverNode, state, seqId, procId)) {
1322        // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
1323        // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
1324        // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
1325        // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
1326        // to CLOSED
1327        // These happen because on cluster shutdown, we currently let the RegionServers close
1328        // regions. This is the only time that region close is not run by the Master (so cluster
1329        // goes down fast). Consider changing it so Master runs all shutdowns.
1330        if (
1331          this.master.getServerManager().isClusterShutdown() && state.equals(TransitionCode.CLOSED)
1332        ) {
1333          LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
1334        } else {
1335          LOG.warn("No matching procedure found for {} transition on {} to {}",
1336            serverNode.getServerName(), regionNode, state);
1337        }
1338      }
1339    } finally {
1340      regionNode.unlock();
1341    }
1342  }
1343
1344  private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
1345    TransitionCode state, long seqId, long procId) throws IOException {
1346    ServerName serverName = serverNode.getServerName();
1347    TransitRegionStateProcedure proc = regionNode.getProcedure();
1348    if (proc == null) {
1349      return false;
1350    }
1351    proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
1352      serverName, state, seqId, procId);
1353    return true;
1354  }
1355
1356  private void updateRegionSplitTransition(final ServerStateNode serverNode,
1357    final TransitionCode state, final RegionInfo parent, final RegionInfo hriA,
1358    final RegionInfo hriB) throws IOException {
1359    checkMetaLoaded(parent, Procedure.NO_PROC_ID);
1360
1361    if (state != TransitionCode.READY_TO_SPLIT) {
1362      throw new UnexpectedStateException(
1363        "unsupported split regionState=" + state + " for parent region " + parent
1364          + " maybe an old RS (< 2.0) had the operation in progress");
1365    }
1366
1367    // sanity check on the request
1368    if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) {
1369      throw new UnsupportedOperationException("unsupported split request with bad keys: parent="
1370        + parent + " hriA=" + hriA + " hriB=" + hriB);
1371    }
1372
1373    if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
1374      LOG.warn("Split switch is off! skip split of " + parent);
1375      throw new DoNotRetryIOException(
1376        "Split region " + parent.getRegionNameAsString() + " failed due to split switch off");
1377    }
1378
1379    // Submit the Split procedure
1380    final byte[] splitKey = hriB.getStartKey();
1381    if (LOG.isDebugEnabled()) {
1382      LOG.debug("Split request from {}, parent={}, splitKey={}", serverNode.getServerName(), parent,
1383        Bytes.toStringBinary(splitKey));
1384    }
1385    // Processing this report happens asynchronously from other activities which can mutate
1386    // the region state. For example, a split procedure may already be running for this parent.
1387    // A split procedure cannot succeed if the parent region is no longer open, so we can
1388    // ignore it in that case.
1389    // Note that submitting more than one split procedure for a given region is
1390    // harmless -- the split is fenced in the procedure handling -- but it would be noisy in
1391    // the logs. Only one procedure can succeed. The other procedure(s) would abort during
1392    // initialization and report failure with WARN level logging.
1393    RegionState parentState = regionStates.getRegionState(parent);
1394    if (parentState != null && parentState.isOpened()) {
1395      master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
1396    } else {
1397      LOG.info("Ignoring split request from {}, parent={} because parent is unknown or not open",
1398        serverNode.getServerName(), parent);
1399      return;
1400    }
1401
1402    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
1403    if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) {
1404      throw new UnsupportedOperationException(
1405        String.format("Split handled by the master: " + "parent=%s hriA=%s hriB=%s",
1406          parent.getShortNameToLog(), hriA, hriB));
1407    }
1408  }
1409
1410  private void updateRegionMergeTransition(final ServerStateNode serverNode,
1411    final TransitionCode state, final RegionInfo merged, final RegionInfo hriA,
1412    final RegionInfo hriB) throws IOException {
1413    checkMetaLoaded(merged, Procedure.NO_PROC_ID);
1414
1415    if (state != TransitionCode.READY_TO_MERGE) {
1416      throw new UnexpectedStateException(
1417        "Unsupported merge regionState=" + state + " for regionA=" + hriA + " regionB=" + hriB
1418          + " merged=" + merged + " maybe an old RS (< 2.0) had the operation in progress");
1419    }
1420
1421    if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
1422      LOG.warn("Merge switch is off! skip merge of regionA=" + hriA + " regionB=" + hriB);
1423      throw new DoNotRetryIOException(
1424        "Merge of regionA=" + hriA + " regionB=" + hriB + " failed because merge switch is off");
1425    }
1426
1427    // Submit the Merge procedure
1428    if (LOG.isDebugEnabled()) {
1429      LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged);
1430    }
1431    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
1432
1433    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
1434    if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) {
1435      throw new UnsupportedOperationException(
1436        String.format("Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state,
1437          merged, hriA, hriB));
1438    }
1439  }
1440
1441  // ============================================================================================
1442  // RS Status update (report online regions) helpers
1443  // ============================================================================================
1444  /**
1445   * The master will call this method when the RS send the regionServerReport(). The report will
1446   * contains the "online regions". This method will check the the online regions against the
1447   * in-memory state of the AM, and we will log a warn message if there is a mismatch. This is
1448   * because that there is no fencing between the reportRegionStateTransition method and
1449   * regionServerReport method, so there could be race and introduce inconsistency here, but
1450   * actually there is no problem.
1451   * <p/>
1452   * Please see HBASE-21421 and HBASE-21463 for more details.
1453   */
1454  public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) {
1455    if (!isRunning()) {
1456      return;
1457    }
1458    if (LOG.isTraceEnabled()) {
1459      LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName,
1460        regionNames.size(), isMetaLoaded(),
1461        regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList()));
1462    }
1463
1464    ServerStateNode serverNode = regionStates.getServerNode(serverName);
1465    if (serverNode == null) {
1466      LOG.warn("Got a report from server {} where its server node is null", serverName);
1467      return;
1468    }
1469    serverNode.readLock().lock();
1470    try {
1471      if (!serverNode.isInState(ServerState.ONLINE)) {
1472        LOG.warn("Got a report from a server result in state {}", serverNode);
1473        return;
1474      }
1475    } finally {
1476      serverNode.readLock().unlock();
1477    }
1478
1479    // Track the regionserver reported online regions in memory.
1480    synchronized (rsReports) {
1481      rsReports.put(serverName, regionNames);
1482    }
1483
1484    if (regionNames.isEmpty()) {
1485      // nothing to do if we don't have regions
1486      LOG.trace("no online region found on {}", serverName);
1487      return;
1488    }
1489    if (!isMetaLoaded()) {
1490      // we are still on startup, skip checking
1491      return;
1492    }
1493    // The Heartbeat tells us of what regions are on the region serve, check the state.
1494    checkOnlineRegionsReport(serverNode, regionNames);
1495  }
1496
1497  /**
1498   * Close <code>regionName</code> on <code>sn</code> silently and immediately without using a
1499   * Procedure or going via hbase:meta. For case where a RegionServer's hosting of a Region is not
1500   * aligned w/ the Master's accounting of Region state. This is for cleaning up an error in
1501   * accounting.
1502   */
1503  private void closeRegionSilently(ServerName sn, byte[] regionName) {
1504    try {
1505      RegionInfo ri = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName);
1506      // Pass -1 for timeout. Means do not wait.
1507      ServerManager.closeRegionSilentlyAndWait(this.master.getAsyncClusterConnection(), sn, ri, -1);
1508    } catch (Exception e) {
1509      LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e);
1510    }
1511  }
1512
1513  /**
1514   * Check that what the RegionServer reports aligns with the Master's image. If disagreement, we
1515   * will tell the RegionServer to expediently close a Region we do not think it should have.
1516   */
1517  private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) {
1518    ServerName serverName = serverNode.getServerName();
1519    for (byte[] regionName : regionNames) {
1520      if (!isRunning()) {
1521        return;
1522      }
1523      RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
1524      if (regionNode == null) {
1525        String regionNameAsStr = Bytes.toStringBinary(regionName);
1526        LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...", regionNameAsStr,
1527          serverName);
1528        closeRegionSilently(serverNode.getServerName(), regionName);
1529        continue;
1530      }
1531      final long lag = 1000;
1532      // This is just a fallback check designed to identify unexpected data inconsistencies, so we
1533      // use tryLock to attempt to acquire the lock, and if the lock cannot be acquired, we skip the
1534      // check. This will not cause any additional problems and also prevents the regionServerReport
1535      // call from being stuck for too long which may cause deadlock on region assignment.
1536      if (regionNode.tryLock()) {
1537        try {
1538          long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate();
1539          if (regionNode.isInState(State.OPENING, State.OPEN)) {
1540            // This is possible as a region server has just closed a region but the region server
1541            // report is generated before the closing, but arrive after the closing. Make sure
1542            // there
1543            // is some elapsed time so less false alarms.
1544            if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) {
1545              LOG.warn("Reporting {} server does not match {} (time since last "
1546                + "update={}ms); closing...", serverName, regionNode, diff);
1547              closeRegionSilently(serverNode.getServerName(), regionName);
1548            }
1549          } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
1550            // So, we can get report that a region is CLOSED or SPLIT because a heartbeat
1551            // came in at about same time as a region transition. Make sure there is some
1552            // elapsed time so less false alarms.
1553            if (diff > lag) {
1554              LOG.warn("Reporting {} state does not match {} (time since last update={}ms)",
1555                serverName, regionNode, diff);
1556            }
1557          }
1558        } finally {
1559          regionNode.unlock();
1560        }
1561      } else {
1562        LOG.warn(
1563          "Unable to acquire lock for regionNode {}. It is likely that another thread is currently holding the lock. To avoid deadlock, skip execution for now.",
1564          regionNode);
1565      }
1566    }
1567  }
1568
1569  // ============================================================================================
1570  // RIT chore
1571  // ============================================================================================
1572  private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> {
1573    public RegionInTransitionChore(final int timeoutMsec) {
1574      super(timeoutMsec);
1575    }
1576
1577    @Override
1578    protected void periodicExecute(final MasterProcedureEnv env) {
1579      final AssignmentManager am = env.getAssignmentManager();
1580
1581      final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat();
1582      if (ritStat.hasRegionsOverThreshold()) {
1583        for (RegionState hri : ritStat.getRegionOverThreshold()) {
1584          am.handleRegionOverStuckWarningThreshold(hri.getRegion());
1585        }
1586      }
1587
1588      // update metrics
1589      am.updateRegionsInTransitionMetrics(ritStat);
1590    }
1591  }
1592
1593  private static class DeadServerMetricRegionChore
1594    extends ProcedureInMemoryChore<MasterProcedureEnv> {
1595    public DeadServerMetricRegionChore(final int timeoutMsec) {
1596      super(timeoutMsec);
1597    }
1598
1599    @Override
1600    protected void periodicExecute(final MasterProcedureEnv env) {
1601      final ServerManager sm = env.getMasterServices().getServerManager();
1602      final AssignmentManager am = env.getAssignmentManager();
1603      // To minimize inconsistencies we are not going to snapshot live servers in advance in case
1604      // new servers are added; OTOH we don't want to add heavy sync for a consistent view since
1605      // this is for metrics. Instead, we're going to check each regions as we go; to avoid making
1606      // too many checks, we maintain a local lists of server, limiting us to false negatives. If
1607      // we miss some recently-dead server, we'll just see it next time.
1608      Set<ServerName> recentlyLiveServers = new HashSet<>();
1609      int deadRegions = 0, unknownRegions = 0;
1610      for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) {
1611        if (rsn.getState() != State.OPEN) {
1612          continue; // Opportunistic check, should quickly skip RITs, offline tables, etc.
1613        }
1614        // Do not need to acquire region state lock as this is only for showing metrics.
1615        ServerName sn = rsn.getRegionLocation();
1616        State state = rsn.getState();
1617        if (state != State.OPEN) {
1618          continue; // Mostly skipping RITs that are already being take care of.
1619        }
1620        if (sn == null) {
1621          ++unknownRegions; // Opened on null?
1622          continue;
1623        }
1624        if (recentlyLiveServers.contains(sn)) {
1625          continue;
1626        }
1627        ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn);
1628        switch (sls) {
1629          case LIVE:
1630            recentlyLiveServers.add(sn);
1631            break;
1632          case DEAD:
1633            ++deadRegions;
1634            break;
1635          case UNKNOWN:
1636            ++unknownRegions;
1637            break;
1638          default:
1639            throw new AssertionError("Unexpected " + sls);
1640        }
1641      }
1642      if (deadRegions > 0 || unknownRegions > 0) {
1643        LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers",
1644          deadRegions, unknownRegions);
1645      }
1646
1647      am.updateDeadServerRegionMetrics(deadRegions, unknownRegions);
1648    }
1649  }
1650
1651  public RegionInTransitionStat computeRegionInTransitionStat() {
1652    final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration());
1653    rit.update(this);
1654    return rit;
1655  }
1656
1657  public static class RegionInTransitionStat {
1658    private final int ritThreshold;
1659
1660    private HashMap<String, RegionState> ritsOverThreshold = null;
1661    private long statTimestamp;
1662    private long oldestRITTime = 0;
1663    private int totalRITsTwiceThreshold = 0;
1664    private int totalRITs = 0;
1665
1666    public RegionInTransitionStat(final Configuration conf) {
1667      this.ritThreshold =
1668        conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD);
1669    }
1670
1671    public int getRITThreshold() {
1672      return ritThreshold;
1673    }
1674
1675    public long getTimestamp() {
1676      return statTimestamp;
1677    }
1678
1679    public int getTotalRITs() {
1680      return totalRITs;
1681    }
1682
1683    public long getOldestRITTime() {
1684      return oldestRITTime;
1685    }
1686
1687    public int getTotalRITsOverThreshold() {
1688      Map<String, RegionState> m = this.ritsOverThreshold;
1689      return m != null ? m.size() : 0;
1690    }
1691
1692    public boolean hasRegionsTwiceOverThreshold() {
1693      return totalRITsTwiceThreshold > 0;
1694    }
1695
1696    public boolean hasRegionsOverThreshold() {
1697      Map<String, RegionState> m = this.ritsOverThreshold;
1698      return m != null && !m.isEmpty();
1699    }
1700
1701    public Collection<RegionState> getRegionOverThreshold() {
1702      Map<String, RegionState> m = this.ritsOverThreshold;
1703      return m != null ? m.values() : Collections.emptySet();
1704    }
1705
1706    public boolean isRegionOverThreshold(final RegionInfo regionInfo) {
1707      Map<String, RegionState> m = this.ritsOverThreshold;
1708      return m != null && m.containsKey(regionInfo.getEncodedName());
1709    }
1710
1711    public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) {
1712      Map<String, RegionState> m = this.ritsOverThreshold;
1713      if (m == null) {
1714        return false;
1715      }
1716      final RegionState state = m.get(regionInfo.getEncodedName());
1717      if (state == null) {
1718        return false;
1719      }
1720      return (statTimestamp - state.getStamp()) > (ritThreshold * 2);
1721    }
1722
1723    protected void update(final AssignmentManager am) {
1724      this.statTimestamp = EnvironmentEdgeManager.currentTime();
1725      update(am.getRegionsStateInTransition(), statTimestamp);
1726
1727      if (LOG.isDebugEnabled() && ritsOverThreshold != null && !ritsOverThreshold.isEmpty()) {
1728        LOG.debug("RITs over threshold: {}",
1729          ritsOverThreshold.entrySet().stream()
1730            .map(e -> e.getKey() + ":" + e.getValue().getState().name())
1731            .collect(Collectors.joining("\n")));
1732      }
1733    }
1734
1735    private void update(final Collection<RegionState> regions, final long currentTime) {
1736      for (RegionState state : regions) {
1737        totalRITs++;
1738        final long ritStartedMs = state.getStamp();
1739        if (ritStartedMs == 0) {
1740          // Don't output bogus values to metrics if they accidentally make it here.
1741          LOG.warn("The RIT {} has no start time", state.getRegion());
1742          continue;
1743        }
1744        final long ritTime = currentTime - ritStartedMs;
1745        if (ritTime > ritThreshold) {
1746          if (ritsOverThreshold == null) {
1747            ritsOverThreshold = new HashMap<String, RegionState>();
1748          }
1749          ritsOverThreshold.put(state.getRegion().getEncodedName(), state);
1750          totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0;
1751        }
1752        if (oldestRITTime < ritTime) {
1753          oldestRITTime = ritTime;
1754        }
1755      }
1756    }
1757  }
1758
1759  private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) {
1760    metrics.updateRITOldestAge(ritStat.getOldestRITTime());
1761    metrics.updateRITCount(ritStat.getTotalRITs());
1762    metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold());
1763  }
1764
1765  private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) {
1766    metrics.updateDeadServerOpenRegions(deadRegions);
1767    metrics.updateUnknownServerOpenRegions(unknownRegions);
1768  }
1769
1770  private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
1771    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1772    // if (regionNode.isStuck()) {
1773    LOG.warn("STUCK Region-In-Transition {}", regionNode);
1774  }
1775
1776  // ============================================================================================
1777  // TODO: Master load/bootstrap
1778  // ============================================================================================
1779  public void joinCluster() throws IOException {
1780    long startTime = System.nanoTime();
1781    LOG.debug("Joining cluster...");
1782
1783    // Scan hbase:meta to build list of existing regions, servers, and assignment.
1784    // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress
1785    // w/o meta.
1786    loadMeta();
1787
1788    while (master.getServerManager().countOfRegionServers() < 1) {
1789      LOG.info("Waiting for RegionServers to join; current count={}",
1790        master.getServerManager().countOfRegionServers());
1791      Threads.sleep(250);
1792    }
1793    LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers());
1794
1795    // Start the chores
1796    master.getMasterProcedureExecutor().addChore(this.ritChore);
1797    master.getMasterProcedureExecutor().addChore(this.deadMetricChore);
1798
1799    long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
1800    LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs));
1801  }
1802
1803  /**
1804   * Create assign procedure for offline regions. Just follow the old
1805   * processofflineServersWithOnlineRegions method. Since now we do not need to deal with dead
1806   * server any more, we only deal with the regions in OFFLINE state in this method. And this is a
1807   * bit strange, that for new regions, we will add it in CLOSED state instead of OFFLINE state, and
1808   * usually there will be a procedure to track them. The processofflineServersWithOnlineRegions is
1809   * a legacy from long ago, as things are going really different now, maybe we do not need this
1810   * method any more. Need to revisit later.
1811   */
1812  // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online.
1813  // Needs to be done after the table state manager has been started.
1814  public void processOfflineRegions() {
1815    TransitRegionStateProcedure[] procs =
1816      regionStates.getRegionStateNodes().stream().filter(rsn -> rsn.isInState(State.OFFLINE))
1817        .filter(rsn -> isTableEnabled(rsn.getRegionInfo().getTable())).map(rsn -> {
1818          rsn.lock();
1819          try {
1820            if (rsn.getProcedure() != null) {
1821              return null;
1822            } else {
1823              return rsn.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
1824                rsn.getRegionInfo(), null));
1825            }
1826          } finally {
1827            rsn.unlock();
1828          }
1829        }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new);
1830    if (procs.length > 0) {
1831      master.getMasterProcedureExecutor().submitProcedures(procs);
1832    }
1833  }
1834
1835  /*
1836   * AM internal RegionStateStore.RegionStateVisitor implementation. To be used when scanning META
1837   * table for region rows, using RegionStateStore utility methods. RegionStateStore methods will
1838   * convert Result into proper RegionInfo instances, but those would still need to be added into
1839   * AssignmentManager.regionStates in-memory cache. RegionMetaLoadingVisitor.visitRegionState
1840   * method provides the logic for adding RegionInfo instances as loaded from latest META scan into
1841   * AssignmentManager.regionStates.
1842   */
1843  private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor {
1844
1845    @Override
1846    public void visitRegionState(Result result, final RegionInfo regionInfo, final State state,
1847      final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
1848      if (
1849        state == null && regionLocation == null && lastHost == null
1850          && openSeqNum == SequenceId.NO_SEQUENCE_ID
1851      ) {
1852        // This is a row with nothing in it.
1853        LOG.warn("Skipping empty row={}", result);
1854        return;
1855      }
1856      State localState = state;
1857      if (localState == null) {
1858        // No region state column data in hbase:meta table! Are I doing a rolling upgrade from
1859        // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta?
1860        // In any of these cases, state is empty. For now, presume OFFLINE but there are probably
1861        // cases where we need to probe more to be sure this correct; TODO informed by experience.
1862        LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);
1863        localState = State.OFFLINE;
1864      }
1865      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
1866      // Do not need to lock on regionNode, as we can make sure that before we finish loading
1867      // meta, all the related procedures can not be executed. The only exception is for meta
1868      // region related operations, but here we do not load the informations for meta region.
1869      regionNode.setState(localState);
1870      regionNode.setLastHost(lastHost);
1871      regionNode.setRegionLocation(regionLocation);
1872      regionNode.setOpenSeqNum(openSeqNum);
1873
1874      // Note: keep consistent with other methods, see region(Opening|Opened|Closing)
1875      // RIT/ServerCrash handling should take care of the transiting regions.
1876      if (
1877        localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING, State.MERGING)
1878      ) {
1879        assert regionLocation != null : "found null region location for " + regionNode;
1880        // TODO: this could lead to some orphan server state nodes, as it is possible that the
1881        // region server is already dead and its SCP has already finished but we have
1882        // persisted an opening state on this region server. Finally the TRSP will assign the
1883        // region to another region server, so it will not cause critical problems, just waste
1884        // some memory as no one will try to cleanup these orphan server state nodes.
1885        regionStates.createServer(regionLocation);
1886        regionStates.addRegionToServer(regionNode);
1887      } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
1888        regionStates.addToOfflineRegions(regionNode);
1889      }
1890      if (regionNode.getProcedure() != null) {
1891        regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode);
1892      }
1893      // add regions to RIT while visiting the meta
1894      regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
1895      // If region location of region belongs to a dead server mark the region crashed
1896      if (
1897        regionNode.getRegionLocation() != null
1898          && master.getServerManager().isServerDead(regionNode.getRegionLocation())
1899      ) {
1900        long timeOfCrash = master.getServerManager().getDeadServers()
1901          .getDeathTimestamp(regionNode.getRegionLocation());
1902        if (timeOfCrash != 0) {
1903          regionNode.crashed(timeOfCrash);
1904        }
1905        regionInTransitionTracker.regionCrashed(regionNode);
1906      }
1907    }
1908  };
1909
1910  /**
1911   * Attempt to load {@code regionInfo} from META, adding any results to the
1912   * {@link #regionStateStore} Is NOT aware of replica regions.
1913   * @param regionInfo the region to be loaded from META.
1914   * @throws IOException If some error occurs while querying META or parsing results.
1915   */
1916  public void populateRegionStatesFromMeta(@NonNull final RegionInfo regionInfo)
1917    throws IOException {
1918    final String regionEncodedName = RegionInfo.DEFAULT_REPLICA_ID == regionInfo.getReplicaId()
1919      ? regionInfo.getEncodedName()
1920      : RegionInfoBuilder.newBuilder(regionInfo).setReplicaId(RegionInfo.DEFAULT_REPLICA_ID).build()
1921        .getEncodedName();
1922    populateRegionStatesFromMeta(regionEncodedName);
1923  }
1924
1925  /**
1926   * Attempt to load {@code regionEncodedName} from META, adding any results to the
1927   * {@link #regionStateStore} Is NOT aware of replica regions.
1928   * @param regionEncodedName encoded name for the region to be loaded from META.
1929   * @throws IOException If some error occurs while querying META or parsing results.
1930   */
1931  public void populateRegionStatesFromMeta(@NonNull String regionEncodedName) throws IOException {
1932    final RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor();
1933    regionStateStore.visitMetaForRegion(regionEncodedName, visitor);
1934  }
1935
1936  private void loadMeta() throws IOException {
1937    // TODO: use a thread pool
1938    regionStateStore.visitMeta(new RegionMetaLoadingVisitor());
1939  }
1940
1941  /**
1942   * Used to check if the meta loading is done.
1943   * <p/>
1944   * if not we throw PleaseHoldException since we are rebuilding the RegionStates
1945   * @param hri    region to check if it is already rebuild
1946   * @param procId the procedure id for this region operation, or NO_PROC_ID if not available
1947   * @throws PleaseHoldException if meta has not been loaded yet
1948   */
1949  private void checkMetaLoaded(RegionInfo hri, long procId) throws PleaseHoldException {
1950    if (!isRunning()) {
1951      throw new PleaseHoldException("AssignmentManager not running");
1952    }
1953
1954    // Check if the procedure is for a critical system table
1955    // Critical system tables can proceed even if meta is not loaded yet
1956    // We are currently making procId available only for the code path which can execute during the
1957    // cluster boot up. In the future, if additional code paths execute during cluster boot up, we
1958    // will need to make procId available for all those code paths.
1959    if (procId != Procedure.NO_PROC_ID) {
1960      Procedure<?> proc = master.getMasterProcedureExecutor().getProcedure(procId);
1961      if (proc != null && proc.isCriticalSystemTable()) {
1962        return;
1963      }
1964    }
1965
1966    boolean meta = isMetaRegion(hri);
1967    boolean metaLoaded = isMetaLoaded();
1968    if (!meta && !metaLoaded) {
1969      throw new PleaseHoldException("Master not fully online; " + TableName.META_TABLE_NAME + "="
1970        + meta + ", metaLoaded=" + metaLoaded);
1971    }
1972  }
1973
1974  // ============================================================================================
1975  // TODO: Metrics
1976  // ============================================================================================
1977  public int getNumRegionsOpened() {
1978    // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value
1979    return 0;
1980  }
1981
  /**
   * Usually run by the Master in reaction to server crash during normal processing. Can also be
   * invoked via external RPC to effect repair; in the latter case, the 'force' flag is set so we
   * push through the SCP though context may indicate already-running-SCP (An old SCP may have
   * exited abnormally, or damaged cluster may still have references in hbase:meta to 'Unknown
   * Servers' -- servers that are not online or in dead servers list, etc.)
   * <p>
   * As a side effect, the in-memory rsReports entry for the server is always removed, even when no
   * procedure ends up being scheduled.
   * @param serverName     the server to schedule the crash procedure for
   * @param shouldSplitWal passed through unchanged to the scheduled crash procedure
   * @param force          Set if the request came in externally over RPC (via hbck2). Force means
   *                       run the SCP even if it seems as though there might be an outstanding SCP
   *                       running.
   * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled.
   */
  public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) {
    // May be an 'Unknown Server' so handle case where serverNode is null.
    ServerStateNode serverNode = regionStates.getServerNode(serverName);
    // Remove the in-memory rsReports result
    synchronized (rsReports) {
      rsReports.remove(serverName);
    }
    if (serverNode == null) {
      if (force) {
        LOG.info("Force adding ServerCrashProcedure for {} when server node is null", serverName);
      } else {
        // for normal case, do not schedule SCP if ServerStateNode is null
        LOG.warn("Skip adding ServerCrashProcedure for {} because server node is null", serverName);
        return Procedure.NO_PROC_ID;
      }
    }

    ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
    // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the
    // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
    // this server. This is used to simplify the implementation for TRSP and SCP, where we can make
    // sure that, the region list fetched by SCP will not be changed any more.
    // There is nothing to lock when the server node is null (forced 'Unknown Server' case).
    if (serverNode != null) {
      serverNode.writeLock().lock();
    }
    try {

      boolean carryingMeta = isCarryingMeta(serverName);
      // A known server that is not ONLINE is only processed again when forced.
      if (serverNode != null && !serverNode.isInState(ServerState.ONLINE)) {
        if (force) {
          LOG.info("Force adding ServerCrashProcedure for {} (meta={}) when state is not {}",
            serverNode, carryingMeta, ServerState.ONLINE);
        } else {
          LOG.info("Skip adding ServerCrashProcedure for {} (meta={}) when state is not {}",
            serverNode, carryingMeta, ServerState.ONLINE);
          return Procedure.NO_PROC_ID;
        }
      }
      MasterProcedureEnv mpe = procExec.getEnvironment();
      // When 'force' is set we schedule HBCKSCP instead of the plain SCP; this also covers the
      // 'Unknown Server' case (serverNode == null), which can only get this far when forced.
      // HBCKSCP scours Master in-memory state AND hbase:meta for references to
      // serverName just-in-case. An SCP that is scheduled when the server is
      // 'Unknown' probably originated externally with HBCK2 fix-it tool.
      ServerState oldState = null;
      if (serverNode != null) {
        // Remember the previous state for the log line below, then fence the server.
        oldState = serverNode.getState();
        serverNode.setState(ServerState.CRASHED);
      }
      ServerCrashProcedure scp = force
        ? new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta)
        : new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta);
      long pid = procExec.submitProcedure(scp);
      LOG.info("Scheduled ServerCrashProcedure pid={} for {} (carryingMeta={}){}.", pid, serverName,
        carryingMeta,
        serverNode == null ? "" : " " + serverNode.toString() + ", oldState=" + oldState);
      return pid;
    } finally {
      // Mirror the conditional lock acquisition above.
      if (serverNode != null) {
        serverNode.writeLock().unlock();
      }
    }
  }
2054
2055  public void offlineRegion(final RegionInfo regionInfo) {
2056    // TODO used by MasterRpcServices
2057    RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
2058    if (node != null) {
2059      node.offline();
2060    }
2061  }
2062
  /**
   * Intentionally a no-op: both arguments are ignored and no state is changed.
   */
  public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) {
    // TODO used by TestSplitTransactionOnCluster.java
  }
2066
2067  public Map<ServerName, List<RegionInfo>>
2068    getSnapShotOfAssignment(final Collection<RegionInfo> regions) {
2069    return regionStates.getSnapShotOfAssignment(regions);
2070  }
2071
2072  // ============================================================================================
2073  // TODO: UTILS/HELPERS?
2074  // ============================================================================================
2075  /**
2076   * Used by the client (via master) to identify if all regions have the schema updates
2077   * @return Pair indicating the status of the alter command (pending/total)
2078   */
2079  public Pair<Integer, Integer> getReopenStatus(TableName tableName) {
2080    if (isTableDisabled(tableName)) {
2081      return new Pair<Integer, Integer>(0, 0);
2082    }
2083
2084    final List<RegionState> states = regionStates.getTableRegionStates(tableName);
2085    int ritCount = 0;
2086    for (RegionState regionState : states) {
2087      if (!regionState.isOpened() && !regionState.isSplit()) {
2088        ritCount++;
2089      }
2090    }
2091    return new Pair<Integer, Integer>(ritCount, states.size());
2092  }
2093
2094  // This comparator sorts the RegionStates by time stamp then Region name.
2095  // Comparing by timestamp alone can lead us to discard different RegionStates that happen
2096  // to share a timestamp.
2097  private final static class RegionStateStampComparator implements Comparator<RegionState> {
2098    @Override
2099    public int compare(final RegionState l, final RegionState r) {
2100      int stampCmp = Long.compare(l.getStamp(), r.getStamp());
2101      return stampCmp != 0 ? stampCmp : RegionInfo.COMPARATOR.compare(l.getRegion(), r.getRegion());
2102    }
2103  }
2104
  // Shared comparator instance; used in this class to order the set built by
  // getRegionsStateInTransition().
  public final static RegionStateStampComparator REGION_STATE_STAMP_COMPARATOR =
    new RegionStateStampComparator();
2107
2108  // ============================================================================================
2109  // TODO: Region State In Transition
2110  // ============================================================================================
2111  public boolean hasRegionsInTransition() {
2112    return regionInTransitionTracker.hasRegionsInTransition();
2113  }
2114
2115  public List<RegionStateNode> getRegionsInTransition() {
2116    return regionInTransitionTracker.getRegionsInTransition();
2117  }
2118
2119  public boolean isRegionInTransition(final RegionInfo regionInfo) {
2120    return regionInTransitionTracker.isRegionInTransition(regionInfo);
2121  }
2122
2123  public int getRegionTransitScheduledCount() {
2124    return regionStates.getRegionTransitScheduledCount();
2125  }
2126
2127  /**
2128   * Get the number of regions in transition.
2129   */
2130  public int getRegionsInTransitionCount() {
2131    return regionInTransitionTracker.getRegionsInTransition().size();
2132  }
2133
2134  public SortedSet<RegionState> getRegionsStateInTransition() {
2135    final SortedSet<RegionState> rit = new TreeSet<RegionState>(REGION_STATE_STAMP_COMPARATOR);
2136    for (RegionStateNode node : getRegionsInTransition()) {
2137      rit.add(node.toRegionState());
2138    }
2139    return rit;
2140  }
2141
2142  public List<RegionInfo> getAssignedRegions() {
2143    return regionStates.getAssignedRegions();
2144  }
2145
2146  /**
2147   * Resolve a cached {@link RegionInfo} from the region name as a {@code byte[]}.
2148   */
2149  public RegionInfo getRegionInfo(final byte[] regionName) {
2150    final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName);
2151    return regionState != null ? regionState.getRegionInfo() : null;
2152  }
2153
2154  /**
2155   * Resolve a cached {@link RegionInfo} from the encoded region name as a {@code String}.
2156   */
2157  public RegionInfo getRegionInfo(final String encodedRegionName) {
2158    final RegionStateNode regionState =
2159      regionStates.getRegionStateNodeFromEncodedRegionName(encodedRegionName);
2160    return regionState != null ? regionState.getRegionInfo() : null;
2161  }
2162
  // ============================================================================================
  // Expected states on region state transition.
  // Each array lists the states a region may be in when the corresponding transition is made.
  // Notice that there is no expected states array for transiting to OPENING state; this is
  // because of SCP. See the comments in regionOpening method for more details.
  // ============================================================================================
  private static final State[] STATES_EXPECTED_ON_OPEN = { State.OPENING, // Normal case
    State.OPEN // Retrying
  };

  private static final State[] STATES_EXPECTED_ON_CLOSING = { State.OPEN, // Normal case
    State.CLOSING, // Retrying
    State.SPLITTING, // Offline the split parent
    State.MERGING // Offline the merge parents
  };

  private static final State[] STATES_EXPECTED_ON_CLOSED = { State.CLOSING, // Normal case
    State.CLOSED // Retrying
  };

  // This is for manually scheduled region assign, can add other states later if we find out other
  // usages
  private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE };

  // We only allow unassign or move a region which is in OPEN state.
  private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN };
2188
2189  // ============================================================================================
2190  // Region Status update
2191  // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking
2192  // and pre-assumptions are very tricky.
2193  // ============================================================================================
2194  private CompletableFuture<Void> transitStateAndUpdate(RegionStateNode regionNode,
2195    RegionState.State newState, RegionState.State... expectedStates) {
2196    RegionState.State state = regionNode.getState();
2197    try {
2198      regionNode.transitionState(newState, expectedStates);
2199    } catch (UnexpectedStateException e) {
2200      return FutureUtils.failedFuture(e);
2201    }
2202    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
2203    FutureUtils.addListener(future, (r, e) -> {
2204      if (e != null) {
2205        // revert
2206        regionNode.setState(state);
2207      } else {
2208        regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
2209      }
2210    });
2211    return future;
2212  }
2213
2214  // should be called within the synchronized block of RegionStateNode
2215  CompletableFuture<Void> regionOpening(RegionStateNode regionNode) {
2216    // As in SCP, for performance reason, there is no TRSP attached with this region, we will not
2217    // update the region state, which means that the region could be in any state when we want to
2218    // assign it after a RS crash. So here we do not pass the expectedStates parameter.
2219    return transitStateAndUpdate(regionNode, State.OPENING).thenAccept(r -> {
2220      ServerStateNode serverNode = regionStates.getServerNode(regionNode.getRegionLocation());
2221      // Here the server node could be null. For example, we want to assign the region to a given
2222      // region server and it crashes, and it is the region server which holds hbase:meta, then the
2223      // above transitStateAndUpdate call will never succeed until we finishes the SCP for it. But
2224      // after the SCP finishes, the server node will be removed, so when we arrive there, the
2225      // server
2226      // node will be null. This is not a big problem if we skip adding it, as later we will fail to
2227      // execute the remote procedure on the region server and then try to assign to another region
2228      // server
2229      if (serverNode != null) {
2230        serverNode.addRegion(regionNode);
2231      }
2232      // update the operation count metrics
2233      metrics.incrementOperationCounter();
2234    });
2235  }
2236
2237  // should be called under the RegionStateNode lock
2238  // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
2239  // we will persist the FAILED_OPEN state into hbase:meta.
2240  CompletableFuture<Void> regionFailedOpen(RegionStateNode regionNode, boolean giveUp) {
2241    RegionState.State state = regionNode.getState();
2242    ServerName regionLocation = regionNode.getRegionLocation();
2243    if (!giveUp) {
2244      if (regionLocation != null) {
2245        regionStates.removeRegionFromServer(regionLocation, regionNode);
2246      }
2247      return CompletableFuture.completedFuture(null);
2248    }
2249    regionNode.setState(State.FAILED_OPEN);
2250    regionNode.setRegionLocation(null);
2251    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
2252    FutureUtils.addListener(future, (r, e) -> {
2253      if (e == null) {
2254        if (regionLocation != null) {
2255          regionStates.removeRegionFromServer(regionLocation, regionNode);
2256        }
2257        regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
2258      } else {
2259        // revert
2260        regionNode.setState(state);
2261        regionNode.setRegionLocation(regionLocation);
2262      }
2263    });
2264    return future;
2265  }
2266
2267  // should be called under the RegionStateNode lock
2268  CompletableFuture<Void> regionClosing(RegionStateNode regionNode) {
2269    return transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING)
2270      .thenAccept(r -> {
2271        RegionInfo hri = regionNode.getRegionInfo();
2272        // Set meta has not initialized early. so people trying to create/edit tables will wait
2273        if (isMetaRegion(hri)) {
2274          setMetaAssigned(hri, false);
2275        }
2276        // update the operation count metrics
2277        metrics.incrementOperationCounter();
2278      });
2279  }
2280
  // For open and close, the result is first persisted to the procedure store in
  // RegionRemoteProcedureBase. So here we first change the in-memory state, as the operation is
  // considered succeeded once the persistence to the procedure store has succeeded, and then,
  // when the RegionRemoteProcedureBase is woken up, we persist the RegionStateNode to hbase:meta.
2285
2286  // should be called under the RegionStateNode lock
2287  void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode)
2288    throws UnexpectedStateException {
2289    regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
2290    RegionInfo regionInfo = regionNode.getRegionInfo();
2291    regionStates.addRegionToServer(regionNode);
2292    regionStates.removeFromFailedOpen(regionInfo);
2293  }
2294
2295  // should be called under the RegionStateNode lock
2296  void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode)
2297    throws UnexpectedStateException {
2298    ServerName regionLocation = regionNode.getRegionLocation();
2299    regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
2300    regionNode.setRegionLocation(null);
2301    if (regionLocation != null) {
2302      regionNode.setLastHost(regionLocation);
2303      regionStates.removeRegionFromServer(regionLocation, regionNode);
2304    }
2305  }
2306
2307  // should be called under the RegionStateNode lock
2308  CompletableFuture<Void> persistToMeta(RegionStateNode regionNode) {
2309    return regionStateStore.updateRegionLocation(regionNode).thenAccept(r -> {
2310      RegionInfo regionInfo = regionNode.getRegionInfo();
2311      if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
2312        // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
2313        // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
2314        // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
2315        // on table that contains state.
2316        setMetaAssigned(regionInfo, true);
2317      }
2318      regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
2319    });
2320  }
2321
2322  // should be called under the RegionStateNode lock
2323  // for SCP
2324  public CompletableFuture<Void> regionClosedAbnormally(RegionStateNode regionNode) {
2325    RegionState.State state = regionNode.getState();
2326    ServerName regionLocation = regionNode.getRegionLocation();
2327    regionNode.setState(State.ABNORMALLY_CLOSED);
2328    regionNode.setRegionLocation(null);
2329    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
2330    FutureUtils.addListener(future, (r, e) -> {
2331      if (e == null) {
2332        if (regionLocation != null) {
2333          regionNode.setLastHost(regionLocation);
2334          regionStates.removeRegionFromServer(regionLocation, regionNode);
2335        }
2336        regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
2337      } else {
2338        // revert
2339        regionNode.setState(state);
2340        regionNode.setRegionLocation(regionLocation);
2341      }
2342    });
2343    return future;
2344  }
2345
2346  // ============================================================================================
2347  // The above methods can only be called in TransitRegionStateProcedure(and related procedures)
2348  // ============================================================================================
2349
2350  // As soon as a server a crashed, region hosting on that are un-available, this method helps to
2351  // track those un-available regions. This method can only be called from ServerCrashProcedure.
2352  public void markRegionsAsCrashed(List<RegionInfo> regionsOnCrashedServer,
2353    ServerCrashProcedure scp) {
2354    ServerName crashedServerName = scp.getServerName();
2355    assert crashedServerName != null;
2356    for (RegionInfo regionInfo : regionsOnCrashedServer) {
2357      RegionStateNode node = regionStates.getOrCreateRegionStateNode(regionInfo);
2358      if (crashedServerName.equals(node.getRegionLocation())) {
2359        node.crashed(scp.getSubmittedTime());
2360        regionInTransitionTracker.regionCrashed(node);
2361      } else {
2362        LOG.warn("Region {} should be on crashed region server {} but is recorded on {}",
2363          regionInfo, crashedServerName, node.getRegionLocation());
2364      }
2365    }
2366  }
2367
2368  public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
2369    final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
2370    // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
2371    // The parent stays in regionStates until cleared when removed by CatalogJanitor.
2372    // Update its state in regionStates to it shows as offline and split when read
2373    // later figuring what regions are in a table and what are not: see
2374    // regionStates#getRegionsOfTable
2375    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent);
2376    node.setState(State.SPLIT);
2377    final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA);
2378    nodeA.setState(State.SPLITTING_NEW);
2379    final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB);
2380    nodeB.setState(State.SPLITTING_NEW);
2381
2382    TableDescriptor td = master.getTableDescriptors().get(parent.getTable());
2383    // TODO: here we just update the parent region info in meta, to set split and offline to true,
2384    // without changing the one in the region node. This is a bit confusing but the region info
2385    // field in RegionStateNode is not expected to be changed in the current design. Need to find a
2386    // possible way to address this problem, or at least adding more comments about the trick to
2387    // deal with this problem, that when you want to filter out split parent, you need to check both
2388    // the RegionState on whether it is split, and also the region info. If one of them matches then
2389    // it is a split parent. And usually only one of them can match, as after restart, the region
2390    // state will be changed from SPLIT to CLOSED.
2391    regionStateStore.splitRegion(parent, daughterA, daughterB, serverName, td);
2392    regionInTransitionTracker.handleRegionStateNodeOperation(node);
2393    regionInTransitionTracker.handleRegionStateNodeOperation(nodeA);
2394    regionInTransitionTracker.handleRegionStateNodeOperation(nodeB);
2395    if (shouldAssignFavoredNodes(parent)) {
2396      List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList();
2397      getFavoredNodePromoter().generateFavoredNodesForDaughter(onlineServers, parent, daughterA,
2398        daughterB);
2399    }
2400  }
2401
2402  /**
2403   * When called here, the merge has happened. The merged regions have been unassigned and the above
2404   * markRegionClosed has been called on each so they have been disassociated from a hosting Server.
2405   * The merged region will be open after this call. The merged regions are removed from hbase:meta
2406   * below. Later they are deleted from the filesystem by the catalog janitor running against
2407   * hbase:meta. It notices when the merged region no longer holds references to the old regions
2408   * (References are deleted after a compaction rewrites what the Reference points at but not until
2409   * the archiver chore runs, are the References removed).
2410   */
2411  public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
2412    RegionInfo[] mergeParents) throws IOException {
2413    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
2414    for (RegionInfo ri : mergeParents) {
2415      regionStates.deleteRegion(ri);
2416      regionInTransitionTracker.handleRegionDelete(ri);
2417    }
2418
2419    TableDescriptor td = master.getTableDescriptors().get(child.getTable());
2420    regionStateStore.mergeRegions(child, mergeParents, serverName, td);
2421    regionInTransitionTracker.handleRegionStateNodeOperation(node);
2422    if (shouldAssignFavoredNodes(child)) {
2423      getFavoredNodePromoter().generateFavoredNodesForMergedRegion(child, mergeParents);
2424    }
2425  }
2426
2427  /*
2428   * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
2429   * belongs to a non-system table.
2430   */
2431  private boolean shouldAssignFavoredNodes(RegionInfo region) {
2432    return this.shouldAssignRegionsWithFavoredNodes
2433      && FavoredNodesManager.isFavoredNodeApplicable(region);
2434  }
2435
2436  // ============================================================================================
2437  // Assign Queue (Assign/Balance)
2438  // ============================================================================================
  // Region nodes waiting for the assignment thread to pick a server for them.
  private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>();
  // Held while adding to, draining or signalling on the pending queue.
  private final ReentrantLock assignQueueLock = new ReentrantLock();
  // Signalled by queueAssign() and assignQueueSignal(); awaited in waitOnAssignQueue().
  private final Condition assignQueueFullCond = assignQueueLock.newCondition();
2442
2443  /**
2444   * Add the assign operation to the assignment queue. The pending assignment operation will be
2445   * processed, and each region will be assigned by a server using the balancer.
2446   */
2447  protected void queueAssign(final RegionStateNode regionNode) {
2448    regionNode.getProcedureEvent().suspend();
2449
2450    // TODO: quick-start for meta and the other sys-tables?
2451    assignQueueLock.lock();
2452    try {
2453      pendingAssignQueue.add(regionNode);
2454      if (
2455        regionNode.isSystemTable() || pendingAssignQueue.size() == 1
2456          || pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize
2457      ) {
2458        assignQueueFullCond.signal();
2459      }
2460    } finally {
2461      assignQueueLock.unlock();
2462    }
2463  }
2464
2465  private void startAssignmentThread() {
2466    assignThread = new Thread(master.getServerName().toShortString()) {
2467      @Override
2468      public void run() {
2469        while (isRunning()) {
2470          processAssignQueue();
2471        }
2472        pendingAssignQueue.clear();
2473      }
2474    };
2475    assignThread.setDaemon(true);
2476    assignThread.start();
2477  }
2478
  /**
   * Waits for the assignment thread to terminate, signalling the assign queue condition before
   * every join attempt so that a thread blocked waiting on the queue gets woken up. If the
   * calling thread is interrupted, the wait is abandoned and the interrupt flag is restored.
   */
  private void stopAssignmentThread() {
    assignQueueSignal();
    try {
      while (assignThread.isAlive()) {
        // Signal again on each round; join for at most 250 ms before re-checking.
        assignQueueSignal();
        assignThread.join(250);
      }
    } catch (InterruptedException e) {
      LOG.warn("join interrupted", e);
      Thread.currentThread().interrupt();
    }
  }
2491
  /**
   * Signals {@code assignQueueFullCond} while holding {@code assignQueueLock}, without adding
   * anything to the queue.
   */
  private void assignQueueSignal() {
    assignQueueLock.lock();
    try {
      assignQueueFullCond.signal();
    } finally {
      assignQueueLock.unlock();
    }
  }
2500
  /**
   * Blocks until there is something in the pending assign queue (or the condition is signalled),
   * then drains the queue into a map keyed by region info.
   * @return the drained regions (may be empty, e.g. after a wake-up with nothing queued), or
   *         {@code null} when {@code isRunning()} is false after the first wait or when the
   *         thread was interrupted while waiting
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
  private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() {
    HashMap<RegionInfo, RegionStateNode> regions = null;

    assignQueueLock.lock();
    try {
      // Untimed wait, deliberately not in a loop (see the suppressed warning): a wake-up with an
      // empty queue simply yields an empty map, which the caller treats as nothing to do.
      if (pendingAssignQueue.isEmpty() && isRunning()) {
        assignQueueFullCond.await();
      }

      if (!isRunning()) {
        return null;
      }
      // Timed wait of up to assignDispatchWaitMillis before draining; it ends early on a signal.
      // NOTE(review): presumably this lets more regions accumulate into one batch -- confirm.
      assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS);
      regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size());
      for (RegionStateNode regionNode : pendingAssignQueue) {
        regions.put(regionNode.getRegionInfo(), regionNode);
      }
      pendingAssignQueue.clear();
    } catch (InterruptedException e) {
      // regions is still null here, as it is only assigned after both waits.
      LOG.warn("got interrupted ", e);
      Thread.currentThread().interrupt();
    } finally {
      assignQueueLock.unlock();
    }
    return regions;
  }
2528
  /**
   * One round of the assignment thread: drains the pending queue, waits until at least one
   * destination server is available, and then hands the regions to the balancer. Regions without
   * a location that belong to system tables are planned first, against a server list that leaves
   * out the servers excluded for system tables; everything else is planned against all
   * destination servers.
   */
  private void processAssignQueue() {
    final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue();
    if (regions == null || regions.size() == 0 || !isRunning()) {
      return;
    }

    if (LOG.isTraceEnabled()) {
      LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size());
    }

    // TODO: Optimize balancer. pass a RegionPlan?
    // Regions that already carry a location go to retainMap, whether system or user table;
    // regions without one go to the system or the user list.
    final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>();
    final List<RegionInfo> userHRIs = new ArrayList<>(regions.size());
    // Regions for system tables requiring reassignment
    final List<RegionInfo> systemHRIs = new ArrayList<>();
    for (RegionStateNode regionStateNode : regions.values()) {
      boolean sysTable = regionStateNode.isSystemTable();
      final List<RegionInfo> hris = sysTable ? systemHRIs : userHRIs;
      if (regionStateNode.getRegionLocation() != null) {
        retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
      } else {
        hris.add(regionStateNode.getRegionInfo());
      }
    }

    // TODO: connect with the listener to invalidate the cache

    // TODO use events
    // Poll for destination servers, re-checking after each Threads.sleep(250), until at least
    // one shows up or we are asked to stop.
    List<ServerName> servers = master.getServerManager().createDestinationServersList();
    for (int i = 0; servers.size() < 1; ++i) {
      // Report every fourth time around this loop; try not to flood log.
      if (i % 4 == 0) {
        LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions.");
      }

      if (!isRunning()) {
        LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions.");
        return;
      }
      Threads.sleep(250);
      servers = master.getServerManager().createDestinationServersList();
    }

    if (!systemHRIs.isEmpty()) {
      // System table regions requiring reassignment are present, get region servers
      // not available for system table regions
      final List<ServerName> excludeServers = getExcludedServersForSystemTable();
      List<ServerName> serversForSysTables =
        servers.stream().filter(s -> !excludeServers.contains(s)).collect(Collectors.toList());
      if (serversForSysTables.isEmpty()) {
        LOG.warn("Filtering old server versions and the excluded produced an empty set; "
          + "instead considering all candidate servers!");
      }
      LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size()
        + ", allServersCount=" + servers.size());
      // No retain map for this call. Fall back to all servers only when the filtered list is
      // empty and none of the system regions is recorded on the bogus server.
      processAssignmentPlans(regions, null, systemHRIs,
        serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs)
          ? servers
          : serversForSysTables);
    }

    // Retained regions (system and user alike) and the user regions go against all servers.
    processAssignmentPlans(regions, retainMap, userHRIs, servers);
  }
2592
2593  private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions,
2594    List<RegionInfo> hirs) {
2595    for (RegionInfo ri : hirs) {
2596      if (
2597        regions.get(ri).getRegionLocation() != null
2598          && regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME)
2599      ) {
2600        return true;
2601      }
2602    }
2603    return false;
2604  }
2605
2606  private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions,
2607    final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris,
2608    final List<ServerName> servers) {
2609    boolean isTraceEnabled = LOG.isTraceEnabled();
2610    if (isTraceEnabled) {
2611      LOG.trace("Available servers count=" + servers.size() + ": " + servers);
2612    }
2613
2614    final LoadBalancer balancer = getBalancer();
2615    // ask the balancer where to place regions
2616    if (retainMap != null && !retainMap.isEmpty()) {
2617      if (isTraceEnabled) {
2618        LOG.trace("retain assign regions=" + retainMap);
2619      }
2620      try {
2621        acceptPlan(regions, balancer.retainAssignment(retainMap, servers));
2622      } catch (IOException e) {
2623        LOG.warn("unable to retain assignment", e);
2624        addToPendingAssignment(regions, retainMap.keySet());
2625      }
2626    }
2627
2628    // TODO: Do we need to split retain and round-robin?
2629    // the retain seems to fallback to round-robin/random if the region is not in the map.
2630    if (!hris.isEmpty()) {
2631      Collections.sort(hris, RegionInfo.COMPARATOR);
2632      if (isTraceEnabled) {
2633        LOG.trace("round robin regions=" + hris);
2634      }
2635      try {
2636        acceptPlan(regions, balancer.roundRobinAssignment(hris, servers));
2637      } catch (IOException e) {
2638        LOG.warn("unable to round-robin assignment", e);
2639        addToPendingAssignment(regions, hris);
2640      }
2641    }
2642  }
2643
2644  private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions,
2645    final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException {
2646    final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()];
2647    final long st = EnvironmentEdgeManager.currentTime();
2648
2649    if (plan.isEmpty()) {
2650      throw new HBaseIOException("unable to compute plans for regions=" + regions.size());
2651    }
2652
2653    int evcount = 0;
2654    for (Map.Entry<ServerName, List<RegionInfo>> entry : plan.entrySet()) {
2655      final ServerName server = entry.getKey();
2656      for (RegionInfo hri : entry.getValue()) {
2657        final RegionStateNode regionNode = regions.get(hri);
2658        regionNode.setRegionLocation(server);
2659        if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) {
2660          assignQueueLock.lock();
2661          try {
2662            pendingAssignQueue.add(regionNode);
2663          } finally {
2664            assignQueueLock.unlock();
2665          }
2666        } else {
2667          events[evcount++] = regionNode.getProcedureEvent();
2668        }
2669      }
2670    }
2671    ProcedureEvent.wakeEvents(getProcedureScheduler(), events);
2672
2673    final long et = EnvironmentEdgeManager.currentTime();
2674    if (LOG.isTraceEnabled()) {
2675      LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + StringUtils.humanTimeDiff(et - st));
2676    }
2677  }
2678
2679  private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions,
2680    final Collection<RegionInfo> pendingRegions) {
2681    assignQueueLock.lock();
2682    try {
2683      for (RegionInfo hri : pendingRegions) {
2684        pendingAssignQueue.add(regions.get(hri));
2685      }
2686    } finally {
2687      assignQueueLock.unlock();
2688    }
2689  }
2690
2691  /**
2692   * For a given cluster with mixed versions of servers, get a list of servers with lower versions,
2693   * where system table regions should not be assigned to. For system table, we must assign regions
2694   * to a server with highest version. However, we can disable this exclusion using config:
2695   * "hbase.min.version.move.system.tables" if checkForMinVersion is true. Detailed explanation
2696   * available with definition of minVersionToMoveSysTables.
2697   * @return List of Excluded servers for System table regions.
2698   */
2699  public List<ServerName> getExcludedServersForSystemTable() {
2700    // TODO: This should be a cached list kept by the ServerManager rather than calculated on each
2701    // move or system region assign. The RegionServerTracker keeps list of online Servers with
2702    // RegionServerInfo that includes Version.
2703    List<Pair<ServerName, String>> serverList =
2704      master.getServerManager().getOnlineServersList().stream()
2705        .map(s -> new Pair<>(s, master.getRegionServerVersion(s))).collect(Collectors.toList());
2706    if (serverList.isEmpty()) {
2707      return new ArrayList<>();
2708    }
2709    String highestVersion = Collections
2710      .max(serverList, (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond()))
2711      .getSecond();
2712    if (!DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG.equals(minVersionToMoveSysTables)) {
2713      int comparedValue = VersionInfo.compareVersion(minVersionToMoveSysTables, highestVersion);
2714      if (comparedValue > 0) {
2715        return new ArrayList<>();
2716      }
2717    }
2718    return serverList.stream().filter(pair -> !pair.getSecond().equals(highestVersion))
2719      .map(Pair::getFirst).collect(Collectors.toList());
2720  }
2721
2722  MasterServices getMaster() {
2723    return master;
2724  }
2725
2726  /** Returns a snapshot of rsReports */
2727  public Map<ServerName, Set<byte[]>> getRSReports() {
2728    Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>();
2729    synchronized (rsReports) {
2730      rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue()));
2731    }
2732    return rsReportsSnapshot;
2733  }
2734
2735  /**
2736   * Provide regions state count for given table. e.g howmany regions of give table are
2737   * opened/closed/rit etc
2738   * @param tableName TableName
2739   * @return region states count
2740   */
2741  public RegionStatesCount getRegionStatesCount(TableName tableName) {
2742    int openRegionsCount = 0;
2743    int closedRegionCount = 0;
2744    int ritCount = 0;
2745    int splitRegionCount = 0;
2746    int totalRegionCount = 0;
2747    if (!isTableDisabled(tableName)) {
2748      final List<RegionState> states = regionStates.getTableRegionStates(tableName);
2749      for (RegionState regionState : states) {
2750        if (regionState.isOpened()) {
2751          openRegionsCount++;
2752        } else if (regionState.isClosed()) {
2753          closedRegionCount++;
2754        } else if (regionState.isSplit()) {
2755          splitRegionCount++;
2756        }
2757      }
2758      totalRegionCount = states.size();
2759      ritCount = totalRegionCount - openRegionsCount - splitRegionCount;
2760    }
2761    return new RegionStatesCount.RegionStatesCountBuilder().setOpenRegions(openRegionsCount)
2762      .setClosedRegions(closedRegionCount).setSplitRegions(splitRegionCount)
2763      .setRegionsInTransition(ritCount).setTotalRegions(totalRegionCount).build();
2764  }
2765
2766}