001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import edu.umd.cs.findbugs.annotations.NonNull;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.CompletableFuture;
031import java.util.concurrent.Future;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicBoolean;
034import java.util.concurrent.locks.Condition;
035import java.util.concurrent.locks.ReentrantLock;
036import java.util.function.Consumer;
037import java.util.stream.Collectors;
038import java.util.stream.Stream;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.CatalogFamilyFormat;
041import org.apache.hadoop.hbase.DoNotRetryIOException;
042import org.apache.hadoop.hbase.HBaseIOException;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.PleaseHoldException;
045import org.apache.hadoop.hbase.ServerName;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.UnknownRegionException;
048import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
049import org.apache.hadoop.hbase.client.MasterSwitchType;
050import org.apache.hadoop.hbase.client.RegionInfo;
051import org.apache.hadoop.hbase.client.RegionInfoBuilder;
052import org.apache.hadoop.hbase.client.RegionReplicaUtil;
053import org.apache.hadoop.hbase.client.RegionStatesCount;
054import org.apache.hadoop.hbase.client.Result;
055import org.apache.hadoop.hbase.client.ResultScanner;
056import org.apache.hadoop.hbase.client.Scan;
057import org.apache.hadoop.hbase.client.TableDescriptor;
058import org.apache.hadoop.hbase.client.TableState;
059import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
060import org.apache.hadoop.hbase.favored.FavoredNodesManager;
061import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
062import org.apache.hadoop.hbase.master.LoadBalancer;
063import org.apache.hadoop.hbase.master.MasterServices;
064import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
065import org.apache.hadoop.hbase.master.RegionPlan;
066import org.apache.hadoop.hbase.master.RegionState;
067import org.apache.hadoop.hbase.master.RegionState.State;
068import org.apache.hadoop.hbase.master.ServerManager;
069import org.apache.hadoop.hbase.master.TableStateManager;
070import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
071import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
072import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
073import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
074import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
075import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
076import org.apache.hadoop.hbase.master.procedure.TruncateRegionProcedure;
077import org.apache.hadoop.hbase.master.region.MasterRegion;
078import org.apache.hadoop.hbase.procedure2.Procedure;
079import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
080import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
081import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
082import org.apache.hadoop.hbase.procedure2.util.StringUtils;
083import org.apache.hadoop.hbase.regionserver.SequenceId;
084import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
085import org.apache.hadoop.hbase.util.Bytes;
086import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
087import org.apache.hadoop.hbase.util.FutureUtils;
088import org.apache.hadoop.hbase.util.Pair;
089import org.apache.hadoop.hbase.util.Threads;
090import org.apache.hadoop.hbase.util.VersionInfo;
091import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
092import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
093import org.apache.yetus.audience.InterfaceAudience;
094import org.apache.zookeeper.KeeperException;
095import org.slf4j.Logger;
096import org.slf4j.LoggerFactory;
097
098import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
103
104/**
105 * The AssignmentManager is the coordinator for region assign/unassign operations.
106 * <ul>
107 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
108 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
109 * </ul>
110 * Regions are created by CreateTable, Split, Merge. Regions are deleted by DeleteTable, Split,
111 * Merge. Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. Unassigns
112 * are triggered by DisableTable, Split, Merge
113 */
114@InterfaceAudience.Private
115public class AssignmentManager {
116  private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);
117
118  // TODO: AMv2
119  // - handle region migration from hbase1 to hbase2.
120  // - handle sys table assignment first (e.g. acl, namespace)
121  // - handle table priorities
122  // - If ServerBusyException trying to update hbase:meta, we abort the Master
123  // See updateRegionLocation in RegionStateStore.
124  //
125  // See also
126  // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
127  // for other TODOs.
128
129  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
130    "hbase.assignment.bootstrap.thread.pool.size";
131
132  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
133    "hbase.assignment.dispatch.wait.msec";
134  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;
135
136  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
137    "hbase.assignment.dispatch.wait.queue.max.size";
138  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;
139
140  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
141    "hbase.assignment.rit.chore.interval.msec";
142  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;
143
144  public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY =
145    "hbase.assignment.dead.region.metric.chore.interval.msec";
146  private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000;
147
148  public static final String ASSIGN_MAX_ATTEMPTS = "hbase.assignment.maximum.attempts";
149  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;
150
151  public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS =
152    "hbase.assignment.retry.immediately.maximum.attempts";
153  private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3;
154
155  /** Region in Transition metrics threshold time */
156  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
157    "hbase.metrics.rit.stuck.warning.threshold";
158  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
159  public static final String UNEXPECTED_STATE_REGION = "Unexpected state for ";
160
161  public static final String FORCE_REGION_RETAINMENT = "hbase.master.scp.retain.assignment.force";
162
163  public static final boolean DEFAULT_FORCE_REGION_RETAINMENT = false;
164
165  /** The wait time in millis before checking again if the region's previous RS is back online */
166  public static final String FORCE_REGION_RETAINMENT_WAIT_INTERVAL =
167    "hbase.master.scp.retain.assignment.force.wait-interval";
168
169  public static final long DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL = 50;
170
171  /**
172   * The number of times to check if the region's previous RS is back online, before giving up and
173   * proceeding with assignment on a new RS
174   */
175  public static final String FORCE_REGION_RETAINMENT_RETRIES =
176    "hbase.master.scp.retain.assignment.force.retries";
177
178  public static final int DEFAULT_FORCE_REGION_RETAINMENT_RETRIES = 600;
179
180  private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
181  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");
182
183  private final MetricsAssignmentManager metrics;
184  private final RegionInTransitionChore ritChore;
185  private final DeadServerMetricRegionChore deadMetricChore;
186  private final MasterServices master;
187
188  private final AtomicBoolean running = new AtomicBoolean(false);
189  private final RegionStates regionStates = new RegionStates();
190  private final RegionStateStore regionStateStore;
191
192  /**
193   * When the operator uses this configuration option, any version between the current cluster
194   * version and the value of "hbase.min.version.move.system.tables" does not trigger any
195   * auto-region movement. Auto-region movement here refers to auto-migration of system table
196   * regions to newer server versions. It is assumed that the configured range of versions does not
197   * require special handling of moving system table regions to higher versioned RegionServer. This
198   * auto-migration is done by {@link #checkIfShouldMoveSystemRegionAsync()}. Example: Let's assume
199   * the cluster is on version 1.4.0 and we have set "hbase.min.version.move.system.tables" as
200   * "2.0.0". Now if we upgrade one RegionServer on 1.4.0 cluster to 1.6.0 (< 2.0.0), then
201   * AssignmentManager will not move hbase:meta, hbase:namespace and other system table regions to
202   * newly brought up RegionServer 1.6.0 as part of auto-migration. However, if we upgrade one
203   * RegionServer on 1.4.0 cluster to 2.2.0 (> 2.0.0), then AssignmentManager will move all system
204   * table regions to newly brought up RegionServer 2.2.0 as part of auto-migration done by
205   * {@link #checkIfShouldMoveSystemRegionAsync()}. "hbase.min.version.move.system.tables" is
206   * introduced as part of HBASE-22923.
207   */
208  private final String minVersionToMoveSysTables;
209
210  private static final String MIN_VERSION_MOVE_SYS_TABLES_CONFIG =
211    "hbase.min.version.move.system.tables";
212  private static final String DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG = "";
213
214  private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();
215
216  private final boolean shouldAssignRegionsWithFavoredNodes;
217  private final int assignDispatchWaitQueueMaxSize;
218  private final int assignDispatchWaitMillis;
219  private final int assignMaxAttempts;
220  private final int assignRetryImmediatelyMaxAttempts;
221
222  private final MasterRegion masterRegion;
223
224  private final Object checkIfShouldMoveSystemRegionLock = new Object();
225
226  private Thread assignThread;
227
228  private final boolean forceRegionRetainment;
229
230  private final long forceRegionRetainmentWaitInterval;
231
232  private final int forceRegionRetainmentRetries;
233
234  public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
235    this(master, masterRegion, new RegionStateStore(master, masterRegion));
236  }
237
238  AssignmentManager(MasterServices master, MasterRegion masterRegion, RegionStateStore stateStore) {
239    this.master = master;
240    this.regionStateStore = stateStore;
241    this.metrics = new MetricsAssignmentManager();
242    this.masterRegion = masterRegion;
243
244    final Configuration conf = master.getConfiguration();
245
246    // Only read favored nodes if using the favored nodes load balancer.
247    this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class
248      .isAssignableFrom(conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class));
249
250    this.assignDispatchWaitMillis =
251      conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
252    this.assignDispatchWaitQueueMaxSize =
253      conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);
254
255    this.assignMaxAttempts =
256      Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS));
257    this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS,
258      DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS);
259
260    int ritChoreInterval =
261      conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC);
262    this.ritChore = new RegionInTransitionChore(ritChoreInterval);
263
264    int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY,
265      DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC);
266    if (deadRegionChoreInterval > 0) {
267      this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval);
268    } else {
269      this.deadMetricChore = null;
270    }
271    minVersionToMoveSysTables =
272      conf.get(MIN_VERSION_MOVE_SYS_TABLES_CONFIG, DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG);
273
274    forceRegionRetainment =
275      conf.getBoolean(FORCE_REGION_RETAINMENT, DEFAULT_FORCE_REGION_RETAINMENT);
276    forceRegionRetainmentWaitInterval = conf.getLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL,
277      DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL);
278    forceRegionRetainmentRetries =
279      conf.getInt(FORCE_REGION_RETAINMENT_RETRIES, DEFAULT_FORCE_REGION_RETAINMENT_RETRIES);
280  }
281
282  private void mirrorMetaLocations() throws IOException, KeeperException {
283    // For compatibility, mirror the meta region state to zookeeper
284    // And we still need to use zookeeper to publish the meta region locations to region
285    // server, so they can serve as ClientMetaService
286    ZKWatcher zk = master.getZooKeeper();
287    if (zk == null || !zk.getRecoverableZooKeeper().getState().isAlive()) {
288      // this is possible in tests, we do not provide a zk watcher or the zk watcher has been closed
289      return;
290    }
291    Collection<RegionStateNode> metaStates = regionStates.getRegionStateNodes();
292    for (RegionStateNode metaState : metaStates) {
293      MetaTableLocator.setMetaLocation(zk, metaState.getRegionLocation(),
294        metaState.getRegionInfo().getReplicaId(), metaState.getState());
295    }
296    int replicaCount = metaStates.size();
297    // remove extra mirror locations
298    for (String znode : zk.getMetaReplicaNodes()) {
299      int replicaId = zk.getZNodePaths().getMetaReplicaIdFromZNode(znode);
300      if (replicaId >= replicaCount) {
301        MetaTableLocator.deleteMetaLocation(zk, replicaId);
302      }
303    }
304  }
305
306  public void start() throws IOException, KeeperException {
307    if (!running.compareAndSet(false, true)) {
308      return;
309    }
310
311    LOG.trace("Starting assignment manager");
312
313    // Start the Assignment Thread
314    startAssignmentThread();
315    // load meta region states.
316    // here we are still in the early steps of active master startup. There is only one thread(us)
317    // can access AssignmentManager and create region node, so here we do not need to lock the
318    // region node.
319    try (ResultScanner scanner =
320      masterRegion.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY))) {
321      for (;;) {
322        Result result = scanner.next();
323        if (result == null) {
324          break;
325        }
326        RegionStateStore
327          .visitMetaEntry((r, regionInfo, state, regionLocation, lastHost, openSeqNum) -> {
328            RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
329            regionNode.setState(state);
330            regionNode.setLastHost(lastHost);
331            regionNode.setRegionLocation(regionLocation);
332            regionNode.setOpenSeqNum(openSeqNum);
333            if (regionNode.getProcedure() != null) {
334              regionNode.getProcedure().stateLoaded(this, regionNode);
335            }
336            if (regionLocation != null) {
337              // TODO: this could lead to some orphan server state nodes, as it is possible that the
338              // region server is already dead and its SCP has already finished but we have
339              // persisted an opening state on this region server. Finally the TRSP will assign the
340              // region to another region server, so it will not cause critical problems, just waste
341              // some memory as no one will try to cleanup these orphan server state nodes.
342              regionStates.createServer(regionLocation);
343              regionStates.addRegionToServer(regionNode);
344            }
345            if (RegionReplicaUtil.isDefaultReplica(regionInfo.getReplicaId())) {
346              setMetaAssigned(regionInfo, state == State.OPEN);
347            }
348            LOG.debug("Loaded hbase:meta {}", regionNode);
349          }, result);
350      }
351    }
352    mirrorMetaLocations();
353  }
354
355  /**
356   * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode.
357   * <p>
358   * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta
359   * method below. And it is also very important as now before submitting a TRSP, we need to attach
360   * it to the RegionStateNode, which acts like a guard, so we need to restore this information at
361   * the very beginning, before we start processing any procedures.
362   */
363  public void setupRIT(List<TransitRegionStateProcedure> procs) {
364    procs.forEach(proc -> {
365      RegionInfo regionInfo = proc.getRegion();
366      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
367      TransitRegionStateProcedure existingProc = regionNode.getProcedure();
368      if (existingProc != null) {
369        // This is possible, as we will detach the procedure from the RSN before we
370        // actually finish the procedure. This is because that, we will detach the TRSP from the RSN
371        // during execution, at that time, the procedure has not been marked as done in the pv2
372        // framework yet, so it is possible that we schedule a new TRSP immediately and when
373        // arriving here, we will find out that there are multiple TRSPs for the region. But we can
374        // make sure that, only the last one can take the charge, the previous ones should have all
375        // been finished already. So here we will compare the proc id, the greater one will win.
376        if (existingProc.getProcId() < proc.getProcId()) {
377          // the new one wins, unset and set it to the new one below
378          regionNode.unsetProcedure(existingProc);
379        } else {
380          // the old one wins, skip
381          return;
382        }
383      }
384      LOG.info("Attach {} to {} to restore RIT", proc, regionNode);
385      regionNode.setProcedure(proc);
386    });
387  }
388
389  public void stop() {
390    if (!running.compareAndSet(true, false)) {
391      return;
392    }
393
394    LOG.info("Stopping assignment manager");
395
396    // The AM is started before the procedure executor,
397    // but the actual work will be loaded/submitted only once we have the executor
398    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;
399
400    // Remove the RIT chore
401    if (hasProcExecutor) {
402      master.getMasterProcedureExecutor().removeChore(this.ritChore);
403      if (this.deadMetricChore != null) {
404        master.getMasterProcedureExecutor().removeChore(this.deadMetricChore);
405      }
406    }
407
408    // Stop the Assignment Thread
409    stopAssignmentThread();
410
411    // Stop the RegionStateStore
412    regionStates.clear();
413
414    // Update meta events (for testing)
415    if (hasProcExecutor) {
416      metaLoadEvent.suspend();
417      for (RegionInfo hri : getMetaRegionSet()) {
418        setMetaAssigned(hri, false);
419      }
420    }
421  }
422
423  public boolean isRunning() {
424    return running.get();
425  }
426
427  public Configuration getConfiguration() {
428    return master.getConfiguration();
429  }
430
431  public MetricsAssignmentManager getAssignmentManagerMetrics() {
432    return metrics;
433  }
434
435  private LoadBalancer getBalancer() {
436    return master.getLoadBalancer();
437  }
438
439  private FavoredNodesPromoter getFavoredNodePromoter() {
440    return (FavoredNodesPromoter) ((RSGroupBasedLoadBalancer) master.getLoadBalancer())
441      .getInternalBalancer();
442  }
443
444  private MasterProcedureEnv getProcedureEnvironment() {
445    return master.getMasterProcedureExecutor().getEnvironment();
446  }
447
448  private MasterProcedureScheduler getProcedureScheduler() {
449    return getProcedureEnvironment().getProcedureScheduler();
450  }
451
452  int getAssignMaxAttempts() {
453    return assignMaxAttempts;
454  }
455
456  public boolean isForceRegionRetainment() {
457    return forceRegionRetainment;
458  }
459
460  public long getForceRegionRetainmentWaitInterval() {
461    return forceRegionRetainmentWaitInterval;
462  }
463
464  public int getForceRegionRetainmentRetries() {
465    return forceRegionRetainmentRetries;
466  }
467
468  int getAssignRetryImmediatelyMaxAttempts() {
469    return assignRetryImmediatelyMaxAttempts;
470  }
471
472  public RegionStates getRegionStates() {
473    return regionStates;
474  }
475
476  /**
477   * Returns the regions hosted by the specified server.
478   * <p/>
479   * Notice that, for SCP, after we submit the SCP, no one can change the region list for the
480   * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a
481   * snapshot of the current region list for this server, which means, right after you get the
482   * region list, new regions may be moved to this server or some regions may be moved out from this
483   * server, so you should not use it critically if you need strong consistency.
484   */
485  public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
486    ServerStateNode serverInfo = regionStates.getServerNode(serverName);
487    if (serverInfo == null) {
488      return Collections.emptyList();
489    }
490    return serverInfo.getRegionInfoList();
491  }
492
493  private RegionInfo getRegionInfo(RegionStateNode rsn) {
494    if (rsn.isSplit() && !rsn.getRegionInfo().isSplit()) {
495      // see the comments in markRegionAsSplit on why we need to do this converting.
496      return RegionInfoBuilder.newBuilder(rsn.getRegionInfo()).setSplit(true).setOffline(true)
497        .build();
498    } else {
499      return rsn.getRegionInfo();
500    }
501  }
502
503  private Stream<RegionStateNode> getRegionStateNodes(TableName tableName,
504    boolean excludeOfflinedSplitParents) {
505    Stream<RegionStateNode> stream = regionStates.getTableRegionStateNodes(tableName).stream();
506    if (excludeOfflinedSplitParents) {
507      return stream.filter(rsn -> !rsn.isSplit());
508    } else {
509      return stream;
510    }
511  }
512
513  public List<RegionInfo> getTableRegions(TableName tableName,
514    boolean excludeOfflinedSplitParents) {
515    return getRegionStateNodes(tableName, excludeOfflinedSplitParents).map(this::getRegionInfo)
516      .collect(Collectors.toList());
517  }
518
519  public List<Pair<RegionInfo, ServerName>> getTableRegionsAndLocations(TableName tableName,
520    boolean excludeOfflinedSplitParents) {
521    return getRegionStateNodes(tableName, excludeOfflinedSplitParents)
522      .map(rsn -> Pair.newPair(getRegionInfo(rsn), rsn.getRegionLocation()))
523      .collect(Collectors.toList());
524  }
525
526  public RegionStateStore getRegionStateStore() {
527    return regionStateStore;
528  }
529
530  public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
531    return this.shouldAssignRegionsWithFavoredNodes
532      ? getFavoredNodePromoter().getFavoredNodes(regionInfo)
533      : ServerName.EMPTY_SERVER_LIST;
534  }
535
536  // ============================================================================================
537  // Table State Manager helpers
538  // ============================================================================================
539  private TableStateManager getTableStateManager() {
540    return master.getTableStateManager();
541  }
542
543  private boolean isTableEnabled(final TableName tableName) {
544    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
545  }
546
547  private boolean isTableDisabled(final TableName tableName) {
548    return getTableStateManager().isTableState(tableName, TableState.State.DISABLED,
549      TableState.State.DISABLING);
550  }
551
552  // ============================================================================================
553  // META Helpers
554  // ============================================================================================
555  private boolean isMetaRegion(final RegionInfo regionInfo) {
556    return regionInfo.isMetaRegion();
557  }
558
559  public boolean isMetaRegion(final byte[] regionName) {
560    return getMetaRegionFromName(regionName) != null;
561  }
562
563  public RegionInfo getMetaRegionFromName(final byte[] regionName) {
564    for (RegionInfo hri : getMetaRegionSet()) {
565      if (Bytes.equals(hri.getRegionName(), regionName)) {
566        return hri;
567      }
568    }
569    return null;
570  }
571
572  public boolean isCarryingMeta(final ServerName serverName) {
573    // TODO: handle multiple meta
574    return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
575  }
576
577  private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
578    // TODO: check for state?
579    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
580    return (node != null && serverName.equals(node.getRegionLocation()));
581  }
582
583  private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
584    // if (regionInfo.isMetaRegion()) return regionInfo;
585    // TODO: handle multiple meta. if the region provided is not meta lookup
586    // which meta the region belongs to.
587    return RegionInfoBuilder.FIRST_META_REGIONINFO;
588  }
589
590  // TODO: handle multiple meta.
591  private static final Set<RegionInfo> META_REGION_SET =
592    Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);
593
594  public Set<RegionInfo> getMetaRegionSet() {
595    return META_REGION_SET;
596  }
597
598  // ============================================================================================
599  // META Event(s) helpers
600  // ============================================================================================
601  /**
602   * Notice that, this only means the meta region is available on a RS, but the AM may still be
603   * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
604   * before checking this method, unless you can make sure that your piece of code can only be
605   * executed after AM builds the region states.
606   * @see #isMetaLoaded()
607   */
608  public boolean isMetaAssigned() {
609    return metaAssignEvent.isReady();
610  }
611
612  public boolean isMetaRegionInTransition() {
613    return !isMetaAssigned();
614  }
615
616  /**
617   * Notice that this event does not mean the AM has already finished region state rebuilding. See
618   * the comment of {@link #isMetaAssigned()} for more details.
619   * @see #isMetaAssigned()
620   */
621  public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
622    return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
623  }
624
625  private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
626    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
627    ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
628    if (assigned) {
629      metaAssignEvent.wake(getProcedureScheduler());
630    } else {
631      metaAssignEvent.suspend();
632    }
633  }
634
635  private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
636    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
637    // TODO: handle multiple meta.
638    return metaAssignEvent;
639  }
640
641  /**
642   * Wait until AM finishes the meta loading, i.e, the region states rebuilding.
643   * @see #isMetaLoaded()
644   * @see #waitMetaAssigned(Procedure, RegionInfo)
645   */
646  public boolean waitMetaLoaded(Procedure<?> proc) {
647    return metaLoadEvent.suspendIfNotReady(proc);
648  }
649
650  /**
651   * This method will be called in master initialization method after calling
652   * {@link #processOfflineRegions()}, as in processOfflineRegions we will generate assign
653   * procedures for offline regions, which may be conflict with creating table.
654   * <p/>
655   * This is a bit dirty, should be reconsidered after we decide whether to keep the
656   * {@link #processOfflineRegions()} method.
657   */
658  public void wakeMetaLoadedEvent() {
659    metaLoadEvent.wake(getProcedureScheduler());
660    assert isMetaLoaded() : "expected meta to be loaded";
661  }
662
663  /**
664   * Return whether AM finishes the meta loading, i.e, the region states rebuilding.
665   * @see #isMetaAssigned()
666   * @see #waitMetaLoaded(Procedure)
667   */
668  public boolean isMetaLoaded() {
669    return metaLoadEvent.isReady();
670  }
671
672  /**
673   * Start a new thread to check if there are region servers whose versions are higher than others.
674   * If so, move all system table regions to RS with the highest version to keep compatibility. The
675   * reason is, RS in new version may not be able to access RS in old version when there are some
676   * incompatible changes.
677   * <p>
678   * This method is called when a new RegionServer is added to cluster only.
679   * </p>
680   */
681  public void checkIfShouldMoveSystemRegionAsync() {
682    // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
683    // it should 'move' the system tables from the old server to the new server but
684    // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
685    if (this.master.getServerManager().countOfRegionServers() <= 1) {
686      return;
687    }
688    // This thread used to run whenever there was a change in the cluster. The ZooKeeper
689    // childrenChanged notification came in before the nodeDeleted message and so this method
690    // cold run before a ServerCrashProcedure could run. That meant that this thread could see
691    // a Crashed Server before ServerCrashProcedure and it could find system regions on the
692    // crashed server and go move them before ServerCrashProcedure had a chance; could be
693    // dataloss too if WALs were not recovered.
694    new Thread(() -> {
695      try {
696        synchronized (checkIfShouldMoveSystemRegionLock) {
697          List<RegionPlan> plans = new ArrayList<>();
698          // TODO: I don't think this code does a good job if all servers in cluster have same
699          // version. It looks like it will schedule unnecessary moves.
700          for (ServerName server : getExcludedServersForSystemTable()) {
701            if (master.getServerManager().isServerDead(server)) {
702              // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
703              // considers only online servers, the server could be queued for dead server
704              // processing. As region assignments for crashed server is handled by
705              // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
706              // regular flow of LoadBalancer as a favored node and not to have this special
707              // handling.
708              continue;
709            }
710            List<RegionInfo> regionsShouldMove = getSystemTables(server);
711            if (!regionsShouldMove.isEmpty()) {
712              for (RegionInfo regionInfo : regionsShouldMove) {
713                // null value for dest forces destination server to be selected by balancer
714                RegionPlan plan = new RegionPlan(regionInfo, server, null);
715                if (regionInfo.isMetaRegion()) {
716                  // Must move meta region first.
717                  LOG.info("Async MOVE of {} to newer Server={}", regionInfo.getEncodedName(),
718                    server);
719                  moveAsync(plan);
720                } else {
721                  plans.add(plan);
722                }
723              }
724            }
725            for (RegionPlan plan : plans) {
726              LOG.info("Async MOVE of {} to newer Server={}", plan.getRegionInfo().getEncodedName(),
727                server);
728              moveAsync(plan);
729            }
730          }
731        }
732      } catch (Throwable t) {
733        LOG.error(t.toString(), t);
734      }
735    }).start();
736  }
737
738  private List<RegionInfo> getSystemTables(ServerName serverName) {
739    ServerStateNode serverNode = regionStates.getServerNode(serverName);
740    if (serverNode == null) {
741      return Collections.emptyList();
742    }
743    return serverNode.getSystemRegionInfoList();
744  }
745
746  private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
747    throws HBaseIOException {
748    if (regionNode.getProcedure() != null) {
749      throw new HBaseIOException(
750        regionNode + " is currently in transition; pid=" + regionNode.getProcedure().getProcId());
751    }
752    if (!regionNode.isInState(expectedStates)) {
753      throw new DoNotRetryRegionException(UNEXPECTED_STATE_REGION + regionNode);
754    }
755    if (isTableDisabled(regionNode.getTable())) {
756      throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
757    }
758  }
759
760  /**
761   * Create an assign TransitRegionStateProcedure. Makes sure of RegionState. Throws exception if
762   * not appropriate UNLESS override is set. Used by hbck2 but also by straightline
763   * {@link #assign(RegionInfo, ServerName)} and {@link #assignAsync(RegionInfo, ServerName)}.
764   * @see #createAssignProcedure(RegionStateNode, ServerName) for a version that does NO checking
765   *      used when only when no checking needed.
766   * @param override If false, check RegionState is appropriate for assign; if not throw exception.
767   */
768  private TransitRegionStateProcedure createAssignProcedure(RegionInfo regionInfo, ServerName sn,
769    boolean override, boolean force) throws IOException {
770    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
771    regionNode.lock();
772    try {
773      if (override) {
774        if (!force) {
775          preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
776        }
777        if (regionNode.getProcedure() != null) {
778          regionNode.unsetProcedure(regionNode.getProcedure());
779        }
780      } else {
781        preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
782      }
783      assert regionNode.getProcedure() == null;
784      return regionNode.setProcedure(
785        TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn));
786    } finally {
787      regionNode.unlock();
788    }
789  }
790
791  /**
792   * Create an assign TransitRegionStateProcedure. Does NO checking of RegionState. Presumes
793   * appriopriate state ripe for assign.
794   * @see #createAssignProcedure(RegionInfo, ServerName, boolean, boolean)
795   */
796  private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode,
797    ServerName targetServer) {
798    regionNode.lock();
799    try {
800      return regionNode.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
801        regionNode.getRegionInfo(), targetServer));
802    } finally {
803      regionNode.unlock();
804    }
805  }
806
  /**
   * Create an assign procedure for the region (with neither override nor force set, so the region
   * state is checked), hand it to {@code ProcedureSyncWait.submitAndWaitProcedure} and return its
   * procedure id.
   * @param regionInfo the region to assign
   * @param sn         the target server, passed through to the procedure
   * @return the id of the assign procedure
   * @throws IOException if the procedure cannot be created, or from the submit-and-wait call
   */
  public long assign(RegionInfo regionInfo, ServerName sn) throws IOException {
    TransitRegionStateProcedure proc = createAssignProcedure(regionInfo, sn, false, false);
    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
    return proc.getProcId();
  }
812
  /**
   * Same as {@link #assign(RegionInfo, ServerName)} with a null target server.
   * @param regionInfo the region to assign
   * @return the id of the assign procedure
   */
  public long assign(RegionInfo regionInfo) throws IOException {
    return assign(regionInfo, null);
  }
816
817  /**
818   * Submits a procedure that assigns a region to a target server without waiting for it to finish
819   * @param regionInfo the region we would like to assign
820   * @param sn         target server name
821   */
822  public Future<byte[]> assignAsync(RegionInfo regionInfo, ServerName sn) throws IOException {
823    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(),
824      createAssignProcedure(regionInfo, sn, false, false));
825  }
826
827  /**
828   * Submits a procedure that assigns a region without waiting for it to finish
829   * @param regionInfo the region we would like to assign
830   */
831  public Future<byte[]> assignAsync(RegionInfo regionInfo) throws IOException {
832    return assignAsync(regionInfo, null);
833  }
834
835  public long unassign(RegionInfo regionInfo) throws IOException {
836    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
837    if (regionNode == null) {
838      throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
839    }
840    TransitRegionStateProcedure proc;
841    regionNode.lock();
842    try {
843      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
844      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
845      regionNode.setProcedure(proc);
846    } finally {
847      regionNode.unlock();
848    }
849    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
850    return proc.getProcId();
851  }
852
853  public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
854    ServerName targetServer) throws HBaseIOException {
855    RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
856    if (regionNode == null) {
857      throw new UnknownRegionException(
858        "No RegionStateNode found for " + regionInfo.getEncodedName() + "(Closed/Deleted?)");
859    }
860    TransitRegionStateProcedure proc;
861    regionNode.lock();
862    try {
863      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
864      regionNode.checkOnline();
865      proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
866      regionNode.setProcedure(proc);
867    } finally {
868      regionNode.unlock();
869    }
870    return proc;
871  }
872
  /**
   * Create a move procedure for the region with a null target server and hand it to
   * {@code ProcedureSyncWait.submitAndWaitProcedure}.
   * @param regionInfo the region to move
   * @throws IOException if the move procedure cannot be created, or from the submit-and-wait call
   */
  public void move(RegionInfo regionInfo) throws IOException {
    TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null);
    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
  }
877
  /**
   * Create a move procedure from the plan's region and destination and submit it through
   * {@code ProcedureSyncWait.submitProcedure}.
   * @param regionPlan supplies the region to move and the destination server
   * @return the future handed back by the submit call
   * @throws HBaseIOException if the move procedure cannot be created
   */
  public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException {
    TransitRegionStateProcedure proc =
      createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination());
    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
  }
883
884  public Future<byte[]> balance(RegionPlan regionPlan) throws HBaseIOException {
885    ServerName current =
886      this.getRegionStates().getRegionAssignments().get(regionPlan.getRegionInfo());
887    if (current == null || !current.equals(regionPlan.getSource())) {
888      LOG.debug("Skip region plan {}, source server not match, current region location is {}",
889        regionPlan, current == null ? "(null)" : current);
890      return null;
891    }
892    return moveAsync(regionPlan);
893  }
894
895  // ============================================================================================
896  // RegionTransition procedures helpers
897  // ============================================================================================
898
899  /**
900   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
901   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
902   *         to populate the assigns with targets chosen using round-robin (default balancer
903   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
904   *         AssignProcedure will ask the balancer for a new target, and so on.
905   */
906  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris,
907    List<ServerName> serversToExclude) {
908    if (hris.isEmpty()) {
909      return new TransitRegionStateProcedure[0];
910    }
911
912    if (
913      serversToExclude != null && this.master.getServerManager().getOnlineServersList().size() == 1
914    ) {
915      LOG.debug("Only one region server found and hence going ahead with the assignment");
916      serversToExclude = null;
917    }
918    try {
919      // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
920      // a better job if it has all the assignments in the one lump.
921      Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
922        this.master.getServerManager().createDestinationServersList(serversToExclude));
923      // Return mid-method!
924      return createAssignProcedures(assignments);
925    } catch (IOException hioe) {
926      LOG.warn("Failed roundRobinAssignment", hioe);
927    }
928    // If an error above, fall-through to this simpler assign. Last resort.
929    return createAssignProcedures(hris);
930  }
931
932  /**
933   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
934   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
935   *         to populate the assigns with targets chosen using round-robin (default balancer
936   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
937   *         AssignProcedure will ask the balancer for a new target, and so on.
938   */
939  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
940    return createRoundRobinAssignProcedures(hris, null);
941  }
942
943  static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
944    if (left.getRegion().isMetaRegion()) {
945      if (right.getRegion().isMetaRegion()) {
946        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
947      }
948      return -1;
949    } else if (right.getRegion().isMetaRegion()) {
950      return +1;
951    }
952    if (left.getRegion().getTable().isSystemTable()) {
953      if (right.getRegion().getTable().isSystemTable()) {
954        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
955      }
956      return -1;
957    } else if (right.getRegion().getTable().isSystemTable()) {
958      return +1;
959    }
960    return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
961  }
962
963  /**
964   * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server. This
965   * method is called from HBCK2.
966   * @return an assign or null
967   */
968  public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo ri, boolean override,
969    boolean force) {
970    TransitRegionStateProcedure trsp = null;
971    try {
972      trsp = createAssignProcedure(ri, null, override, force);
973    } catch (IOException ioe) {
974      LOG.info(
975        "Failed {} assign, override={}"
976          + (override ? "" : "; set override to by-pass state checks."),
977        ri.getEncodedName(), override, ioe);
978    }
979    return trsp;
980  }
981
982  /**
983   * Create one TransitRegionStateProcedure to unassign a region. This method is called from HBCK2.
984   * @return an unassign or null
985   */
986  public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo ri, boolean override,
987    boolean force) {
988    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(ri);
989    TransitRegionStateProcedure trsp = null;
990    regionNode.lock();
991    try {
992      if (override) {
993        if (!force) {
994          preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
995        }
996        if (regionNode.getProcedure() != null) {
997          regionNode.unsetProcedure(regionNode.getProcedure());
998        }
999      } else {
1000        // This is where we could throw an exception; i.e. override is false.
1001        preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
1002      }
1003      assert regionNode.getProcedure() == null;
1004      trsp =
1005        TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionNode.getRegionInfo());
1006      regionNode.setProcedure(trsp);
1007    } catch (IOException ioe) {
1008      // 'override' must be false here.
1009      LOG.info("Failed {} unassign, override=false; set override to by-pass state checks.",
1010        ri.getEncodedName(), ioe);
1011    } finally {
1012      regionNode.unlock();
1013    }
1014    return trsp;
1015  }
1016
1017  /**
1018   * Create an array of TransitRegionStateProcedure w/o specifying a target server. Used as fallback
1019   * of caller is unable to do {@link #createAssignProcedures(Map)}.
1020   * <p/>
1021   * If no target server, at assign time, we will try to use the former location of the region if
1022   * one exists. This is how we 'retain' the old location across a server restart.
1023   * <p/>
1024   * Should only be called when you can make sure that no one can touch these regions other than
1025   * you. For example, when you are creating or enabling table. Presumes all Regions are in
1026   * appropriate state ripe for assign; no checking of Region state is done in here.
1027   * @see #createAssignProcedures(Map)
1028   */
1029  public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) {
1030    return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
1031      .map(regionNode -> createAssignProcedure(regionNode, null)).sorted(AssignmentManager::compare)
1032      .toArray(TransitRegionStateProcedure[]::new);
1033  }
1034
1035  /**
1036   * Tied to {@link #createAssignProcedures(List)} in that it is called if caller is unable to run
1037   * this method. Presumes all Regions are in appropriate state ripe for assign; no checking of
1038   * Region state is done in here.
1039   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
1040   * @return Assignments made from the passed in <code>assignments</code>
1041   * @see #createAssignProcedures(List)
1042   */
1043  private TransitRegionStateProcedure[]
1044    createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments) {
1045    return assignments.entrySet().stream()
1046      .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
1047        .map(regionNode -> createAssignProcedure(regionNode, e.getKey())))
1048      .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
1049  }
1050
1051  // for creating unassign TRSP when disabling a table or closing excess region replicas
1052  private TransitRegionStateProcedure forceCreateUnssignProcedure(RegionStateNode regionNode) {
1053    regionNode.lock();
1054    try {
1055      if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
1056        return null;
1057      }
1058      // in general, a split parent should be in CLOSED or SPLIT state, but anyway, let's check it
1059      // here for safety
1060      if (regionNode.getRegionInfo().isSplit()) {
1061        LOG.warn("{} is a split parent but not in CLOSED or SPLIT state", regionNode);
1062        return null;
1063      }
1064      // As in DisableTableProcedure or ModifyTableProcedure, we will hold the xlock for table, so
1065      // we can make sure that this procedure has not been executed yet, as TRSP will hold the
1066      // shared lock for table all the time. So here we will unset it and when it is actually
1067      // executed, it will find that the attach procedure is not itself and quit immediately.
1068      if (regionNode.getProcedure() != null) {
1069        regionNode.unsetProcedure(regionNode.getProcedure());
1070      }
1071      return regionNode.setProcedure(TransitRegionStateProcedure.unassign(getProcedureEnvironment(),
1072        regionNode.getRegionInfo()));
1073    } finally {
1074      regionNode.unlock();
1075    }
1076  }
1077
1078  /**
1079   * Called by DisableTableProcedure to unassign all the regions for a table.
1080   */
1081  public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) {
1082    return regionStates.getTableRegionStateNodes(tableName).stream()
1083      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
1084      .toArray(TransitRegionStateProcedure[]::new);
1085  }
1086
1087  /**
1088   * Called by ModifyTableProcedure to unassign all the excess region replicas for a table. Will
1089   * skip submit unassign procedure if the region is in transition, so you may need to call this
1090   * method multiple times.
1091   * @param tableName       the table for closing excess region replicas
1092   * @param newReplicaCount the new replica count, should be less than current replica count
1093   * @param submit          for submitting procedure
1094   * @return the number of regions in transition that we can not schedule unassign procedures
1095   */
1096  public int submitUnassignProcedureForClosingExcessRegionReplicas(TableName tableName,
1097    int newReplicaCount, Consumer<TransitRegionStateProcedure> submit) {
1098    int inTransitionCount = 0;
1099    for (RegionStateNode regionNode : regionStates.getTableRegionStateNodes(tableName)) {
1100      regionNode.lock();
1101      try {
1102        if (regionNode.getRegionInfo().getReplicaId() >= newReplicaCount) {
1103          if (regionNode.isInTransition()) {
1104            LOG.debug("skip scheduling unassign procedure for {} when closing excess region "
1105              + "replicas since it is in transition", regionNode);
1106            inTransitionCount++;
1107            continue;
1108          }
1109          if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
1110            continue;
1111          }
1112          submit.accept(regionNode.setProcedure(TransitRegionStateProcedure
1113            .unassign(getProcedureEnvironment(), regionNode.getRegionInfo())));
1114        }
1115      } finally {
1116        regionNode.unlock();
1117      }
1118    }
1119    return inTransitionCount;
1120  }
1121
1122  public int numberOfUnclosedExcessRegionReplicas(TableName tableName, int newReplicaCount) {
1123    int unclosed = 0;
1124    for (RegionStateNode regionNode : regionStates.getTableRegionStateNodes(tableName)) {
1125      regionNode.lock();
1126      try {
1127        if (regionNode.getRegionInfo().getReplicaId() >= newReplicaCount) {
1128          if (!regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
1129            unclosed++;
1130          }
1131        }
1132      } finally {
1133        regionNode.unlock();
1134      }
1135    }
1136    return unclosed;
1137  }
1138
  /**
   * Construct a {@link SplitTableRegionProcedure} for the given region and split key, using this
   * manager's procedure environment. The procedure is only constructed here, not submitted.
   * @param regionToSplit the region to split
   * @param splitKey      the split key handed to the procedure
   * @return the new procedure
   * @throws IOException propagated from the procedure's constructor
   */
  public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
    final byte[] splitKey) throws IOException {
    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
  }
1143
  /**
   * Construct a {@link TruncateRegionProcedure} for the given region, using this manager's
   * procedure environment. The procedure is only constructed here, not submitted.
   * @param regionToTruncate the region to truncate
   * @return the new procedure
   * @throws IOException propagated from the procedure's constructor
   */
  public TruncateRegionProcedure createTruncateRegionProcedure(final RegionInfo regionToTruncate)
    throws IOException {
    return new TruncateRegionProcedure(getProcedureEnvironment(), regionToTruncate);
  }
1148
  /**
   * Construct a {@link MergeTableRegionsProcedure} for the given regions, using this manager's
   * procedure environment and passing <code>false</code> as the constructor's last argument. The
   * procedure is only constructed here, not submitted.
   * @param ris the regions to merge
   * @return the new procedure
   * @throws IOException propagated from the procedure's constructor
   */
  public MergeTableRegionsProcedure createMergeProcedure(RegionInfo... ris) throws IOException {
    return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
  }
1152
1153  /**
1154   * Delete the region states. This is called by "DeleteTable"
1155   */
1156  public void deleteTable(final TableName tableName) throws IOException {
1157    final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
1158    regionStateStore.deleteRegions(regions);
1159    for (int i = 0; i < regions.size(); ++i) {
1160      final RegionInfo regionInfo = regions.get(i);
1161      regionStates.deleteRegion(regionInfo);
1162    }
1163  }
1164
1165  // ============================================================================================
1166  // RS Region Transition Report helpers
1167  // ============================================================================================
1168  private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
1169    ServerStateNode serverNode, List<RegionStateTransition> transitionList) throws IOException {
1170    for (RegionStateTransition transition : transitionList) {
1171      switch (transition.getTransitionCode()) {
1172        case OPENED:
1173        case FAILED_OPEN:
1174        case CLOSED:
1175          assert transition.getRegionInfoCount() == 1 : transition;
1176          final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1177          long procId =
1178            transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
1179          updateRegionTransition(serverNode, transition.getTransitionCode(), hri,
1180            transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId);
1181          break;
1182        case READY_TO_SPLIT:
1183        case SPLIT:
1184        case SPLIT_REVERTED:
1185          assert transition.getRegionInfoCount() == 3 : transition;
1186          final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1187          final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
1188          final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
1189          updateRegionSplitTransition(serverNode, transition.getTransitionCode(), parent, splitA,
1190            splitB);
1191          break;
1192        case READY_TO_MERGE:
1193        case MERGED:
1194        case MERGE_REVERTED:
1195          assert transition.getRegionInfoCount() == 3 : transition;
1196          final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1197          final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
1198          final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
1199          updateRegionMergeTransition(serverNode, transition.getTransitionCode(), merged, mergeA,
1200            mergeB);
1201          break;
1202      }
1203    }
1204  }
1205
  /**
   * Handle a region state transition report sent by a region server.
   * <p/>
   * The transitions are processed only while the state node of the reporting server is ONLINE, and
   * the read lock of that node is held while they are processed. Failures are handed back through
   * the error message of the response, except for {@link PleaseHoldException}, which is rethrown.
   * @param req the report, carrying the reporting server and its list of transitions
   * @return the response; its error message is set when there is no state node for the server,
   *         when the server is not ONLINE, or when processing a transition failed
   * @throws PleaseHoldException rethrown when raised while processing the transitions
   */
  public ReportRegionStateTransitionResponse reportRegionStateTransition(
    final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
    ReportRegionStateTransitionResponse.Builder builder =
      ReportRegionStateTransitionResponse.newBuilder();
    ServerName serverName = ProtobufUtil.toServerName(req.getServer());
    ServerStateNode serverNode = regionStates.getServerNode(serverName);
    if (serverNode == null) {
      // Unknown server: answer with an error instead of processing anything.
      LOG.warn("No server node for {}", serverName);
      builder.setErrorMessage("No server node for " + serverName);
      return builder.build();
    }
    // here we have to acquire a read lock instead of a simple exclusive lock. This is because that
    // we should not block other reportRegionStateTransition call from the same region server. This
    // is not only about performance, but also to prevent dead lock. Think of the meta region is
    // also on the same region server and you hold the lock which blocks the
    // reportRegionStateTransition for meta, and since meta is not online, you will block inside the
    // lock protection to wait for meta online...
    serverNode.readLock().lock();
    try {
      // we only accept reportRegionStateTransition if the region server is online, see the comment
      // above in submitServerCrash method and HBASE-21508 for more details.
      if (serverNode.isInState(ServerState.ONLINE)) {
        try {
          reportRegionStateTransition(builder, serverNode, req.getTransitionList());
        } catch (PleaseHoldException e) {
          // Rethrown as is; it must not be turned into an error message.
          LOG.trace("Failed transition ", e);
          throw e;
        } catch (UnsupportedOperationException | IOException e) {
          // TODO: at the moment we have a single error message and the RS will abort
          // if the master says that one of the region transitions failed.
          LOG.warn("Failed transition", e);
          builder.setErrorMessage("Failed transition " + e.getMessage());
        }
      } else {
        LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
          serverName);
        builder.setErrorMessage("You are dead");
      }
    } finally {
      serverNode.readLock().unlock();
    }

    return builder.build();
  }
1250
1251  private void updateRegionTransition(ServerStateNode serverNode, TransitionCode state,
1252    RegionInfo regionInfo, long seqId, long procId) throws IOException {
1253    checkMetaLoaded(regionInfo);
1254
1255    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1256    if (regionNode == null) {
1257      // the table/region is gone. maybe a delete, split, merge
1258      throw new UnexpectedStateException(String.format(
1259        "Server %s was trying to transition region %s to %s. but Region is not known.",
1260        serverNode.getServerName(), regionInfo, state));
1261    }
1262    LOG.trace("Update region transition serverName={} region={} regionState={}",
1263      serverNode.getServerName(), regionNode, state);
1264
1265    regionNode.lock();
1266    try {
1267      if (!reportTransition(regionNode, serverNode, state, seqId, procId)) {
1268        // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
1269        // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
1270        // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
1271        // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
1272        // to CLOSED
1273        // These happen because on cluster shutdown, we currently let the RegionServers close
1274        // regions. This is the only time that region close is not run by the Master (so cluster
1275        // goes down fast). Consider changing it so Master runs all shutdowns.
1276        if (
1277          this.master.getServerManager().isClusterShutdown() && state.equals(TransitionCode.CLOSED)
1278        ) {
1279          LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
1280        } else {
1281          LOG.warn("No matching procedure found for {} transition on {} to {}",
1282            serverNode.getServerName(), regionNode, state);
1283        }
1284      }
1285    } finally {
1286      regionNode.unlock();
1287    }
1288  }
1289
1290  private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
1291    TransitionCode state, long seqId, long procId) throws IOException {
1292    ServerName serverName = serverNode.getServerName();
1293    TransitRegionStateProcedure proc = regionNode.getProcedure();
1294    if (proc == null) {
1295      return false;
1296    }
1297    proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
1298      serverName, state, seqId, procId);
1299    return true;
1300  }
1301
1302  private void updateRegionSplitTransition(final ServerStateNode serverNode,
1303    final TransitionCode state, final RegionInfo parent, final RegionInfo hriA,
1304    final RegionInfo hriB) throws IOException {
1305    checkMetaLoaded(parent);
1306
1307    if (state != TransitionCode.READY_TO_SPLIT) {
1308      throw new UnexpectedStateException(
1309        "unsupported split regionState=" + state + " for parent region " + parent
1310          + " maybe an old RS (< 2.0) had the operation in progress");
1311    }
1312
1313    // sanity check on the request
1314    if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) {
1315      throw new UnsupportedOperationException("unsupported split request with bad keys: parent="
1316        + parent + " hriA=" + hriA + " hriB=" + hriB);
1317    }
1318
1319    if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
1320      LOG.warn("Split switch is off! skip split of " + parent);
1321      throw new DoNotRetryIOException(
1322        "Split region " + parent.getRegionNameAsString() + " failed due to split switch off");
1323    }
1324
1325    // Submit the Split procedure
1326    final byte[] splitKey = hriB.getStartKey();
1327    if (LOG.isDebugEnabled()) {
1328      LOG.debug("Split request from {}, parent={}, splitKey={}", serverNode.getServerName(), parent,
1329        Bytes.toStringBinary(splitKey));
1330    }
1331    // Processing this report happens asynchronously from other activities which can mutate
1332    // the region state. For example, a split procedure may already be running for this parent.
1333    // A split procedure cannot succeed if the parent region is no longer open, so we can
1334    // ignore it in that case.
1335    // Note that submitting more than one split procedure for a given region is
1336    // harmless -- the split is fenced in the procedure handling -- but it would be noisy in
1337    // the logs. Only one procedure can succeed. The other procedure(s) would abort during
1338    // initialization and report failure with WARN level logging.
1339    RegionState parentState = regionStates.getRegionState(parent);
1340    if (parentState != null && parentState.isOpened()) {
1341      master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
1342    } else {
1343      LOG.info("Ignoring split request from {}, parent={} because parent is unknown or not open",
1344        serverNode.getServerName(), parent);
1345      return;
1346    }
1347
1348    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
1349    if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) {
1350      throw new UnsupportedOperationException(
1351        String.format("Split handled by the master: " + "parent=%s hriA=%s hriB=%s",
1352          parent.getShortNameToLog(), hriA, hriB));
1353    }
1354  }
1355
1356  private void updateRegionMergeTransition(final ServerStateNode serverNode,
1357    final TransitionCode state, final RegionInfo merged, final RegionInfo hriA,
1358    final RegionInfo hriB) throws IOException {
1359    checkMetaLoaded(merged);
1360
1361    if (state != TransitionCode.READY_TO_MERGE) {
1362      throw new UnexpectedStateException(
1363        "Unsupported merge regionState=" + state + " for regionA=" + hriA + " regionB=" + hriB
1364          + " merged=" + merged + " maybe an old RS (< 2.0) had the operation in progress");
1365    }
1366
1367    if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
1368      LOG.warn("Merge switch is off! skip merge of regionA=" + hriA + " regionB=" + hriB);
1369      throw new DoNotRetryIOException(
1370        "Merge of regionA=" + hriA + " regionB=" + hriB + " failed because merge switch is off");
1371    }
1372
1373    // Submit the Merge procedure
1374    if (LOG.isDebugEnabled()) {
1375      LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged);
1376    }
1377    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
1378
1379    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
1380    if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) {
1381      throw new UnsupportedOperationException(
1382        String.format("Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state,
1383          merged, hriA, hriB));
1384    }
1385  }
1386
1387  // ============================================================================================
1388  // RS Status update (report online regions) helpers
1389  // ============================================================================================
1390  /**
1391   * The master will call this method when the RS send the regionServerReport(). The report will
1392   * contains the "online regions". This method will check the the online regions against the
1393   * in-memory state of the AM, and we will log a warn message if there is a mismatch. This is
1394   * because that there is no fencing between the reportRegionStateTransition method and
1395   * regionServerReport method, so there could be race and introduce inconsistency here, but
1396   * actually there is no problem.
1397   * <p/>
1398   * Please see HBASE-21421 and HBASE-21463 for more details.
1399   */
1400  public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) {
1401    if (!isRunning()) {
1402      return;
1403    }
1404    if (LOG.isTraceEnabled()) {
1405      LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName,
1406        regionNames.size(), isMetaLoaded(),
1407        regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList()));
1408    }
1409
1410    ServerStateNode serverNode = regionStates.getServerNode(serverName);
1411    if (serverNode == null) {
1412      LOG.warn("Got a report from server {} where its server node is null", serverName);
1413      return;
1414    }
1415    serverNode.readLock().lock();
1416    try {
1417      if (!serverNode.isInState(ServerState.ONLINE)) {
1418        LOG.warn("Got a report from a server result in state {}", serverNode);
1419        return;
1420      }
1421    } finally {
1422      serverNode.readLock().unlock();
1423    }
1424
1425    // Track the regionserver reported online regions in memory.
1426    synchronized (rsReports) {
1427      rsReports.put(serverName, regionNames);
1428    }
1429
1430    if (regionNames.isEmpty()) {
1431      // nothing to do if we don't have regions
1432      LOG.trace("no online region found on {}", serverName);
1433      return;
1434    }
1435    if (!isMetaLoaded()) {
1436      // we are still on startup, skip checking
1437      return;
1438    }
1439    // The Heartbeat tells us of what regions are on the region serve, check the state.
1440    checkOnlineRegionsReport(serverNode, regionNames);
1441  }
1442
1443  /**
1444   * Close <code>regionName</code> on <code>sn</code> silently and immediately without using a
1445   * Procedure or going via hbase:meta. For case where a RegionServer's hosting of a Region is not
1446   * aligned w/ the Master's accounting of Region state. This is for cleaning up an error in
1447   * accounting.
1448   */
1449  private void closeRegionSilently(ServerName sn, byte[] regionName) {
1450    try {
1451      RegionInfo ri = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName);
1452      // Pass -1 for timeout. Means do not wait.
1453      ServerManager.closeRegionSilentlyAndWait(this.master.getAsyncClusterConnection(), sn, ri, -1);
1454    } catch (Exception e) {
1455      LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e);
1456    }
1457  }
1458
1459  /**
1460   * Check that what the RegionServer reports aligns with the Master's image. If disagreement, we
1461   * will tell the RegionServer to expediently close a Region we do not think it should have.
1462   */
1463  private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) {
1464    ServerName serverName = serverNode.getServerName();
1465    for (byte[] regionName : regionNames) {
1466      if (!isRunning()) {
1467        return;
1468      }
1469      RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
1470      if (regionNode == null) {
1471        String regionNameAsStr = Bytes.toStringBinary(regionName);
1472        LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...", regionNameAsStr,
1473          serverName);
1474        closeRegionSilently(serverNode.getServerName(), regionName);
1475        continue;
1476      }
1477      final long lag = 1000;
1478      // This is just a fallback check designed to identify unexpected data inconsistencies, so we
1479      // use tryLock to attempt to acquire the lock, and if the lock cannot be acquired, we skip the
1480      // check. This will not cause any additional problems and also prevents the regionServerReport
1481      // call from being stuck for too long which may cause deadlock on region assignment.
1482      if (regionNode.tryLock()) {
1483        try {
1484          long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate();
1485          if (regionNode.isInState(State.OPENING, State.OPEN)) {
1486            // This is possible as a region server has just closed a region but the region server
1487            // report is generated before the closing, but arrive after the closing. Make sure
1488            // there
1489            // is some elapsed time so less false alarms.
1490            if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) {
1491              LOG.warn("Reporting {} server does not match {} (time since last "
1492                + "update={}ms); closing...", serverName, regionNode, diff);
1493              closeRegionSilently(serverNode.getServerName(), regionName);
1494            }
1495          } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
1496            // So, we can get report that a region is CLOSED or SPLIT because a heartbeat
1497            // came in at about same time as a region transition. Make sure there is some
1498            // elapsed time so less false alarms.
1499            if (diff > lag) {
1500              LOG.warn("Reporting {} state does not match {} (time since last update={}ms)",
1501                serverName, regionNode, diff);
1502            }
1503          }
1504        } finally {
1505          regionNode.unlock();
1506        }
1507      } else {
1508        LOG.warn(
1509          "Unable to acquire lock for regionNode {}. It is likely that another thread is currently holding the lock. To avoid deadlock, skip execution for now.",
1510          regionNode);
1511      }
1512    }
1513  }
1514
1515  // ============================================================================================
1516  // RIT chore
1517  // ============================================================================================
1518  private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> {
1519    public RegionInTransitionChore(final int timeoutMsec) {
1520      super(timeoutMsec);
1521    }
1522
1523    @Override
1524    protected void periodicExecute(final MasterProcedureEnv env) {
1525      final AssignmentManager am = env.getAssignmentManager();
1526
1527      final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat();
1528      if (ritStat.hasRegionsOverThreshold()) {
1529        for (RegionState hri : ritStat.getRegionOverThreshold()) {
1530          am.handleRegionOverStuckWarningThreshold(hri.getRegion());
1531        }
1532      }
1533
1534      // update metrics
1535      am.updateRegionsInTransitionMetrics(ritStat);
1536    }
1537  }
1538
1539  private static class DeadServerMetricRegionChore
1540    extends ProcedureInMemoryChore<MasterProcedureEnv> {
1541    public DeadServerMetricRegionChore(final int timeoutMsec) {
1542      super(timeoutMsec);
1543    }
1544
1545    @Override
1546    protected void periodicExecute(final MasterProcedureEnv env) {
1547      final ServerManager sm = env.getMasterServices().getServerManager();
1548      final AssignmentManager am = env.getAssignmentManager();
1549      // To minimize inconsistencies we are not going to snapshot live servers in advance in case
1550      // new servers are added; OTOH we don't want to add heavy sync for a consistent view since
1551      // this is for metrics. Instead, we're going to check each regions as we go; to avoid making
1552      // too many checks, we maintain a local lists of server, limiting us to false negatives. If
1553      // we miss some recently-dead server, we'll just see it next time.
1554      Set<ServerName> recentlyLiveServers = new HashSet<>();
1555      int deadRegions = 0, unknownRegions = 0;
1556      for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) {
1557        if (rsn.getState() != State.OPEN) {
1558          continue; // Opportunistic check, should quickly skip RITs, offline tables, etc.
1559        }
1560        // Do not need to acquire region state lock as this is only for showing metrics.
1561        ServerName sn = rsn.getRegionLocation();
1562        State state = rsn.getState();
1563        if (state != State.OPEN) {
1564          continue; // Mostly skipping RITs that are already being take care of.
1565        }
1566        if (sn == null) {
1567          ++unknownRegions; // Opened on null?
1568          continue;
1569        }
1570        if (recentlyLiveServers.contains(sn)) {
1571          continue;
1572        }
1573        ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn);
1574        switch (sls) {
1575          case LIVE:
1576            recentlyLiveServers.add(sn);
1577            break;
1578          case DEAD:
1579            ++deadRegions;
1580            break;
1581          case UNKNOWN:
1582            ++unknownRegions;
1583            break;
1584          default:
1585            throw new AssertionError("Unexpected " + sls);
1586        }
1587      }
1588      if (deadRegions > 0 || unknownRegions > 0) {
1589        LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers",
1590          deadRegions, unknownRegions);
1591      }
1592
1593      am.updateDeadServerRegionMetrics(deadRegions, unknownRegions);
1594    }
1595  }
1596
1597  public RegionInTransitionStat computeRegionInTransitionStat() {
1598    final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration());
1599    rit.update(this);
1600    return rit;
1601  }
1602
1603  public static class RegionInTransitionStat {
1604    private final int ritThreshold;
1605
1606    private HashMap<String, RegionState> ritsOverThreshold = null;
1607    private long statTimestamp;
1608    private long oldestRITTime = 0;
1609    private int totalRITsTwiceThreshold = 0;
1610    private int totalRITs = 0;
1611
1612    public RegionInTransitionStat(final Configuration conf) {
1613      this.ritThreshold =
1614        conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD);
1615    }
1616
1617    public int getRITThreshold() {
1618      return ritThreshold;
1619    }
1620
1621    public long getTimestamp() {
1622      return statTimestamp;
1623    }
1624
1625    public int getTotalRITs() {
1626      return totalRITs;
1627    }
1628
1629    public long getOldestRITTime() {
1630      return oldestRITTime;
1631    }
1632
1633    public int getTotalRITsOverThreshold() {
1634      Map<String, RegionState> m = this.ritsOverThreshold;
1635      return m != null ? m.size() : 0;
1636    }
1637
1638    public boolean hasRegionsTwiceOverThreshold() {
1639      return totalRITsTwiceThreshold > 0;
1640    }
1641
1642    public boolean hasRegionsOverThreshold() {
1643      Map<String, RegionState> m = this.ritsOverThreshold;
1644      return m != null && !m.isEmpty();
1645    }
1646
1647    public Collection<RegionState> getRegionOverThreshold() {
1648      Map<String, RegionState> m = this.ritsOverThreshold;
1649      return m != null ? m.values() : Collections.emptySet();
1650    }
1651
1652    public boolean isRegionOverThreshold(final RegionInfo regionInfo) {
1653      Map<String, RegionState> m = this.ritsOverThreshold;
1654      return m != null && m.containsKey(regionInfo.getEncodedName());
1655    }
1656
1657    public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) {
1658      Map<String, RegionState> m = this.ritsOverThreshold;
1659      if (m == null) {
1660        return false;
1661      }
1662      final RegionState state = m.get(regionInfo.getEncodedName());
1663      if (state == null) {
1664        return false;
1665      }
1666      return (statTimestamp - state.getStamp()) > (ritThreshold * 2);
1667    }
1668
1669    protected void update(final AssignmentManager am) {
1670      final RegionStates regionStates = am.getRegionStates();
1671      this.statTimestamp = EnvironmentEdgeManager.currentTime();
1672      update(regionStates.getRegionsStateInTransition(), statTimestamp);
1673      update(regionStates.getRegionFailedOpen(), statTimestamp);
1674
1675      if (LOG.isDebugEnabled() && ritsOverThreshold != null && !ritsOverThreshold.isEmpty()) {
1676        LOG.debug("RITs over threshold: {}",
1677          ritsOverThreshold.entrySet().stream()
1678            .map(e -> e.getKey() + ":" + e.getValue().getState().name())
1679            .collect(Collectors.joining("\n")));
1680      }
1681    }
1682
1683    private void update(final Collection<RegionState> regions, final long currentTime) {
1684      for (RegionState state : regions) {
1685        totalRITs++;
1686        final long ritStartedMs = state.getStamp();
1687        if (ritStartedMs == 0) {
1688          // Don't output bogus values to metrics if they accidentally make it here.
1689          LOG.warn("The RIT {} has no start time", state.getRegion());
1690          continue;
1691        }
1692        final long ritTime = currentTime - ritStartedMs;
1693        if (ritTime > ritThreshold) {
1694          if (ritsOverThreshold == null) {
1695            ritsOverThreshold = new HashMap<String, RegionState>();
1696          }
1697          ritsOverThreshold.put(state.getRegion().getEncodedName(), state);
1698          totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0;
1699        }
1700        if (oldestRITTime < ritTime) {
1701          oldestRITTime = ritTime;
1702        }
1703      }
1704    }
1705  }
1706
  /**
   * Publishes the given region-in-transition statistics to {@code metrics}: the oldest RIT time,
   * the total RIT count and the count of RITs over the threshold.
   * @param ritStat statistics to publish
   */
  private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) {
    metrics.updateRITOldestAge(ritStat.getOldestRITTime());
    metrics.updateRITCount(ritStat.getTotalRITs());
    metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold());
  }
1712
  /**
   * Publishes the two open-region counts to {@code metrics}.
   * @param deadRegions    count passed to {@code updateDeadServerOpenRegions}
   * @param unknownRegions count passed to {@code updateUnknownServerOpenRegions}
   */
  private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) {
    metrics.updateDeadServerOpenRegions(deadRegions);
    metrics.updateUnknownServerOpenRegions(unknownRegions);
  }
1717
1718  private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
1719    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1720    // if (regionNode.isStuck()) {
1721    LOG.warn("STUCK Region-In-Transition {}", regionNode);
1722  }
1723
1724  // ============================================================================================
1725  // TODO: Master load/bootstrap
1726  // ============================================================================================
1727  public void joinCluster() throws IOException {
1728    long startTime = System.nanoTime();
1729    LOG.debug("Joining cluster...");
1730
1731    // Scan hbase:meta to build list of existing regions, servers, and assignment.
1732    // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress
1733    // w/o meta.
1734    loadMeta();
1735
1736    while (master.getServerManager().countOfRegionServers() < 1) {
1737      LOG.info("Waiting for RegionServers to join; current count={}",
1738        master.getServerManager().countOfRegionServers());
1739      Threads.sleep(250);
1740    }
1741    LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers());
1742
1743    // Start the chores
1744    master.getMasterProcedureExecutor().addChore(this.ritChore);
1745    master.getMasterProcedureExecutor().addChore(this.deadMetricChore);
1746
1747    long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
1748    LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs));
1749  }
1750
1751  /**
1752   * Create assign procedure for offline regions. Just follow the old
1753   * processofflineServersWithOnlineRegions method. Since now we do not need to deal with dead
1754   * server any more, we only deal with the regions in OFFLINE state in this method. And this is a
1755   * bit strange, that for new regions, we will add it in CLOSED state instead of OFFLINE state, and
1756   * usually there will be a procedure to track them. The processofflineServersWithOnlineRegions is
1757   * a legacy from long ago, as things are going really different now, maybe we do not need this
1758   * method any more. Need to revisit later.
1759   */
1760  // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online.
1761  // Needs to be done after the table state manager has been started.
1762  public void processOfflineRegions() {
1763    TransitRegionStateProcedure[] procs =
1764      regionStates.getRegionStateNodes().stream().filter(rsn -> rsn.isInState(State.OFFLINE))
1765        .filter(rsn -> isTableEnabled(rsn.getRegionInfo().getTable())).map(rsn -> {
1766          rsn.lock();
1767          try {
1768            if (rsn.getProcedure() != null) {
1769              return null;
1770            } else {
1771              return rsn.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
1772                rsn.getRegionInfo(), null));
1773            }
1774          } finally {
1775            rsn.unlock();
1776          }
1777        }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new);
1778    if (procs.length > 0) {
1779      master.getMasterProcedureExecutor().submitProcedures(procs);
1780    }
1781  }
1782
1783  /*
1784   * AM internal RegionStateStore.RegionStateVisitor implementation. To be used when scanning META
1785   * table for region rows, using RegionStateStore utility methods. RegionStateStore methods will
1786   * convert Result into proper RegionInfo instances, but those would still need to be added into
1787   * AssignmentManager.regionStates in-memory cache. RegionMetaLoadingVisitor.visitRegionState
1788   * method provides the logic for adding RegionInfo instances as loaded from latest META scan into
1789   * AssignmentManager.regionStates.
1790   */
1791  private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor {
1792
1793    @Override
1794    public void visitRegionState(Result result, final RegionInfo regionInfo, final State state,
1795      final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
1796      if (
1797        state == null && regionLocation == null && lastHost == null
1798          && openSeqNum == SequenceId.NO_SEQUENCE_ID
1799      ) {
1800        // This is a row with nothing in it.
1801        LOG.warn("Skipping empty row={}", result);
1802        return;
1803      }
1804      State localState = state;
1805      if (localState == null) {
1806        // No region state column data in hbase:meta table! Are I doing a rolling upgrade from
1807        // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta?
1808        // In any of these cases, state is empty. For now, presume OFFLINE but there are probably
1809        // cases where we need to probe more to be sure this correct; TODO informed by experience.
1810        LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);
1811        localState = State.OFFLINE;
1812      }
1813      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
1814      // Do not need to lock on regionNode, as we can make sure that before we finish loading
1815      // meta, all the related procedures can not be executed. The only exception is for meta
1816      // region related operations, but here we do not load the informations for meta region.
1817      regionNode.setState(localState);
1818      regionNode.setLastHost(lastHost);
1819      regionNode.setRegionLocation(regionLocation);
1820      regionNode.setOpenSeqNum(openSeqNum);
1821
1822      // Note: keep consistent with other methods, see region(Opening|Opened|Closing)
1823      // RIT/ServerCrash handling should take care of the transiting regions.
1824      if (
1825        localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING, State.MERGING)
1826      ) {
1827        assert regionLocation != null : "found null region location for " + regionNode;
1828        // TODO: this could lead to some orphan server state nodes, as it is possible that the
1829        // region server is already dead and its SCP has already finished but we have
1830        // persisted an opening state on this region server. Finally the TRSP will assign the
1831        // region to another region server, so it will not cause critical problems, just waste
1832        // some memory as no one will try to cleanup these orphan server state nodes.
1833        regionStates.createServer(regionLocation);
1834        regionStates.addRegionToServer(regionNode);
1835      } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
1836        regionStates.addToOfflineRegions(regionNode);
1837      }
1838      if (regionNode.getProcedure() != null) {
1839        regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode);
1840      }
1841    }
1842  };
1843
1844  /**
1845   * Attempt to load {@code regionInfo} from META, adding any results to the
1846   * {@link #regionStateStore} Is NOT aware of replica regions.
1847   * @param regionInfo the region to be loaded from META.
1848   * @throws IOException If some error occurs while querying META or parsing results.
1849   */
1850  public void populateRegionStatesFromMeta(@NonNull final RegionInfo regionInfo)
1851    throws IOException {
1852    final String regionEncodedName = RegionInfo.DEFAULT_REPLICA_ID == regionInfo.getReplicaId()
1853      ? regionInfo.getEncodedName()
1854      : RegionInfoBuilder.newBuilder(regionInfo).setReplicaId(RegionInfo.DEFAULT_REPLICA_ID).build()
1855        .getEncodedName();
1856    populateRegionStatesFromMeta(regionEncodedName);
1857  }
1858
1859  /**
1860   * Attempt to load {@code regionEncodedName} from META, adding any results to the
1861   * {@link #regionStateStore} Is NOT aware of replica regions.
1862   * @param regionEncodedName encoded name for the region to be loaded from META.
1863   * @throws IOException If some error occurs while querying META or parsing results.
1864   */
1865  public void populateRegionStatesFromMeta(@NonNull String regionEncodedName) throws IOException {
1866    final RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor();
1867    regionStateStore.visitMetaForRegion(regionEncodedName, visitor);
1868  }
1869
1870  private void loadMeta() throws IOException {
1871    // TODO: use a thread pool
1872    regionStateStore.visitMeta(new RegionMetaLoadingVisitor());
1873  }
1874
1875  /**
1876   * Used to check if the meta loading is done.
1877   * <p/>
1878   * if not we throw PleaseHoldException since we are rebuilding the RegionStates
1879   * @param hri region to check if it is already rebuild
1880   * @throws PleaseHoldException if meta has not been loaded yet
1881   */
1882  private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException {
1883    if (!isRunning()) {
1884      throw new PleaseHoldException("AssignmentManager not running");
1885    }
1886    boolean meta = isMetaRegion(hri);
1887    boolean metaLoaded = isMetaLoaded();
1888    if (!meta && !metaLoaded) {
1889      throw new PleaseHoldException(
1890        "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded);
1891    }
1892  }
1893
1894  // ============================================================================================
1895  // TODO: Metrics
1896  // ============================================================================================
  /**
   * Placeholder that is not implemented yet.
   * @return always 0
   */
  public int getNumRegionsOpened() {
    // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value
    return 0;
  }
1901
  /**
   * Usually run by the Master in reaction to server crash during normal processing. Can also be
   * invoked via external RPC to effect repair; in the latter case, the 'force' flag is set so we
   * push through the SCP though context may indicate already-running-SCP (An old SCP may have
   * exited abnormally, or damaged cluster may still have references in hbase:meta to 'Unknown
   * Servers' -- servers that are not online or in dead servers list, etc.)
   * <p/>
   * Besides scheduling the procedure, this drops the server's entry from {@code rsReports} and,
   * when a server node exists, moves it to {@code CRASHED} under the node's write lock.
   * @param serverName     the crashed server
   * @param shouldSplitWal passed through unchanged to the scheduled crash procedure
   * @param force          Set if the request came in externally over RPC (via hbck2). Force means
   *                       run the SCP even if it seems as though there might be an outstanding SCP
   *                       running.
   * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled.
   */
  public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) {
    // May be an 'Unknown Server' so handle case where serverNode is null.
    ServerStateNode serverNode = regionStates.getServerNode(serverName);
    // Remove the in-memory rsReports result
    synchronized (rsReports) {
      rsReports.remove(serverName);
    }
    if (serverNode == null) {
      if (force) {
        LOG.info("Force adding ServerCrashProcedure for {} when server node is null", serverName);
      } else {
        // for normal case, do not schedule SCP if ServerStateNode is null
        LOG.warn("Skip adding ServerCrashProcedure for {} because server node is null", serverName);
        return Procedure.NO_PROC_ID;
      }
    }

    ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
    // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the
    // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
    // this server. This is used to simplify the implementation for TRSP and SCP, where we can make
    // sure that, the region list fetched by SCP will not be changed any more.
    // There is nothing to lock when the server is unknown (serverNode == null).
    if (serverNode != null) {
      serverNode.writeLock().lock();
    }
    try {

      boolean carryingMeta = isCarryingMeta(serverName);
      // A server that is no longer ONLINE is only processed again when forced.
      if (serverNode != null && !serverNode.isInState(ServerState.ONLINE)) {
        if (force) {
          LOG.info("Force adding ServerCrashProcedure for {} (meta={}) when state is not {}",
            serverNode, carryingMeta, ServerState.ONLINE);
        } else {
          LOG.info("Skip adding ServerCrashProcedure for {} (meta={}) when state is not {}",
            serverNode, carryingMeta, ServerState.ONLINE);
          return Procedure.NO_PROC_ID;
        }
      }
      MasterProcedureEnv mpe = procExec.getEnvironment();
      // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead.
      // HBCKSCP scours Master in-memory state AND hbase:meta for references to
      // serverName just-in-case. An SCP that is scheduled when the server is
      // 'Unknown' probably originated externally with HBCK2 fix-it tool.
      // Note that the choice below is keyed on 'force': a null serverNode only gets this far
      // when force is set, so an 'Unknown Server' always ends up with the HBCKSCP.
      ServerState oldState = null;
      if (serverNode != null) {
        // Remember the previous state for the log line below, then fence off the server.
        oldState = serverNode.getState();
        serverNode.setState(ServerState.CRASHED);
      }
      ServerCrashProcedure scp = force
        ? new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta)
        : new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta);
      long pid = procExec.submitProcedure(scp);
      LOG.info("Scheduled ServerCrashProcedure pid={} for {} (carryingMeta={}){}.", pid, serverName,
        carryingMeta,
        serverNode == null ? "" : " " + serverNode.toString() + ", oldState=" + oldState);
      return pid;
    } finally {
      // Release the write lock on every exit path, including the early return above.
      if (serverNode != null) {
        serverNode.writeLock().unlock();
      }
    }
  }
1974
1975  public void offlineRegion(final RegionInfo regionInfo) {
1976    // TODO used by MasterRpcServices
1977    RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
1978    if (node != null) {
1979      node.offline();
1980    }
1981  }
1982
  /**
   * Currently a no-op: the body is empty and both arguments are ignored.
   */
  public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) {
    // TODO used by TestSplitTransactionOnCluster.java
  }
1986
  /**
   * Returns the assignment snapshot that {@code regionStates} computes for the given regions.
   * @param regions the regions to look up
   * @return a map from server to regions, exactly as returned by
   *         {@code regionStates.getSnapShotOfAssignment(regions)}
   */
  public Map<ServerName, List<RegionInfo>>
    getSnapShotOfAssignment(final Collection<RegionInfo> regions) {
    return regionStates.getSnapShotOfAssignment(regions);
  }
1991
1992  // ============================================================================================
1993  // TODO: UTILS/HELPERS?
1994  // ============================================================================================
1995  /**
1996   * Used by the client (via master) to identify if all regions have the schema updates
1997   * @return Pair indicating the status of the alter command (pending/total)
1998   */
1999  public Pair<Integer, Integer> getReopenStatus(TableName tableName) {
2000    if (isTableDisabled(tableName)) {
2001      return new Pair<Integer, Integer>(0, 0);
2002    }
2003
2004    final List<RegionState> states = regionStates.getTableRegionStates(tableName);
2005    int ritCount = 0;
2006    for (RegionState regionState : states) {
2007      if (!regionState.isOpened() && !regionState.isSplit()) {
2008        ritCount++;
2009      }
2010    }
2011    return new Pair<Integer, Integer>(ritCount, states.size());
2012  }
2013
2014  // ============================================================================================
2015  // TODO: Region State In Transition
2016  // ============================================================================================
  /** Returns whether {@code regionStates} reports any region in transition. */
  public boolean hasRegionsInTransition() {
    return regionStates.hasRegionsInTransition();
  }
2020
  /** Returns the region state nodes that {@code regionStates} reports as being in transition. */
  public List<RegionStateNode> getRegionsInTransition() {
    return regionStates.getRegionsInTransition();
  }
2024
  /** Returns the regions that {@code regionStates} reports as assigned. */
  public List<RegionInfo> getAssignedRegions() {
    return regionStates.getAssignedRegions();
  }
2028
2029  /**
2030   * Resolve a cached {@link RegionInfo} from the region name as a {@code byte[]}.
2031   */
2032  public RegionInfo getRegionInfo(final byte[] regionName) {
2033    final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName);
2034    return regionState != null ? regionState.getRegionInfo() : null;
2035  }
2036
2037  /**
2038   * Resolve a cached {@link RegionInfo} from the encoded region name as a {@code String}.
2039   */
2040  public RegionInfo getRegionInfo(final String encodedRegionName) {
2041    final RegionStateNode regionState =
2042      regionStates.getRegionStateNodeFromEncodedRegionName(encodedRegionName);
2043    return regionState != null ? regionState.getRegionInfo() : null;
2044  }
2045
2046  // ============================================================================================
2047  // Expected states on region state transition.
  // Notice that there are no expected states for transitioning to the OPENING state; this is
  // because of SCP. See the comments in the regionOpening method for more details.
2050  // ============================================================================================
  // Current states accepted when a region node is moved to OPEN (see
  // regionOpenedWithoutPersistingToMeta).
  private static final State[] STATES_EXPECTED_ON_OPEN = { State.OPENING, // Normal case
    State.OPEN // Retrying
  };

  // Current states accepted when a region node is moved to CLOSING (see regionClosing).
  private static final State[] STATES_EXPECTED_ON_CLOSING = { State.OPEN, // Normal case
    State.CLOSING, // Retrying
    State.SPLITTING, // Offline the split parent
    State.MERGING // Offline the merge parents
  };

  // Current states accepted when a region node is moved to CLOSED (see
  // regionClosedWithoutPersistingToMeta).
  private static final State[] STATES_EXPECTED_ON_CLOSED = { State.CLOSING, // Normal case
    State.CLOSED // Retrying
  };

  // This is for manually scheduled region assign, can add other states later if we find out other
  // usages
  private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE };

  // We only allow unassign or move a region which is in OPEN state.
  private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN };
2071
2072  // ============================================================================================
2073  // Region Status update
2074  // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking
2075  // and pre-assumptions are very tricky.
2076  // ============================================================================================
2077  private CompletableFuture<Void> transitStateAndUpdate(RegionStateNode regionNode,
2078    RegionState.State newState, RegionState.State... expectedStates) {
2079    RegionState.State state = regionNode.getState();
2080    try {
2081      regionNode.transitionState(newState, expectedStates);
2082    } catch (UnexpectedStateException e) {
2083      return FutureUtils.failedFuture(e);
2084    }
2085    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
2086    FutureUtils.addListener(future, (r, e) -> {
2087      if (e != null) {
2088        // revert
2089        regionNode.setState(state);
2090      }
2091    });
2092    return future;
2093  }
2094
2095  // should be called within the synchronized block of RegionStateNode
2096  CompletableFuture<Void> regionOpening(RegionStateNode regionNode) {
2097    // As in SCP, for performance reason, there is no TRSP attached with this region, we will not
2098    // update the region state, which means that the region could be in any state when we want to
2099    // assign it after a RS crash. So here we do not pass the expectedStates parameter.
2100    return transitStateAndUpdate(regionNode, State.OPENING).thenAccept(r -> {
2101      ServerStateNode serverNode = regionStates.getServerNode(regionNode.getRegionLocation());
2102      // Here the server node could be null. For example, we want to assign the region to a given
2103      // region server and it crashes, and it is the region server which holds hbase:meta, then the
2104      // above transitStateAndUpdate call will never succeed until we finishes the SCP for it. But
2105      // after the SCP finishes, the server node will be removed, so when we arrive there, the
2106      // server
2107      // node will be null. This is not a big problem if we skip adding it, as later we will fail to
2108      // execute the remote procedure on the region server and then try to assign to another region
2109      // server
2110      if (serverNode != null) {
2111        serverNode.addRegion(regionNode);
2112      }
2113      // update the operation count metrics
2114      metrics.incrementOperationCounter();
2115    });
2116  }
2117
2118  // should be called under the RegionStateNode lock
2119  // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
2120  // we will persist the FAILED_OPEN state into hbase:meta.
2121  CompletableFuture<Void> regionFailedOpen(RegionStateNode regionNode, boolean giveUp) {
2122    RegionState.State state = regionNode.getState();
2123    ServerName regionLocation = regionNode.getRegionLocation();
2124    if (!giveUp) {
2125      if (regionLocation != null) {
2126        regionStates.removeRegionFromServer(regionLocation, regionNode);
2127      }
2128      return CompletableFuture.completedFuture(null);
2129    }
2130    regionNode.setState(State.FAILED_OPEN);
2131    regionNode.setRegionLocation(null);
2132    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
2133    FutureUtils.addListener(future, (r, e) -> {
2134      if (e == null) {
2135        if (regionLocation != null) {
2136          regionStates.removeRegionFromServer(regionLocation, regionNode);
2137        }
2138      } else {
2139        // revert
2140        regionNode.setState(state);
2141        regionNode.setRegionLocation(regionLocation);
2142      }
2143    });
2144    return future;
2145  }
2146
2147  // should be called under the RegionStateNode lock
2148  CompletableFuture<Void> regionClosing(RegionStateNode regionNode) {
2149    return transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING)
2150      .thenAccept(r -> {
2151        RegionInfo hri = regionNode.getRegionInfo();
2152        // Set meta has not initialized early. so people trying to create/edit tables will wait
2153        if (isMetaRegion(hri)) {
2154          setMetaAssigned(hri, false);
2155        }
2156        // update the operation count metrics
2157        metrics.incrementOperationCounter();
2158      });
2159  }
2160
  // For open and close, the result is first persisted to the procedure store in
  // RegionRemoteProcedureBase. So here we first change the in-memory state, as the operation is
  // considered successful once it has been persisted to the procedure store, and then, when the
  // RegionRemoteProcedureBase is woken up, we persist the RegionStateNode to hbase:meta.
2165
2166  // should be called under the RegionStateNode lock
2167  void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode)
2168    throws UnexpectedStateException {
2169    regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
2170    RegionInfo regionInfo = regionNode.getRegionInfo();
2171    regionStates.addRegionToServer(regionNode);
2172    regionStates.removeFromFailedOpen(regionInfo);
2173  }
2174
2175  // should be called under the RegionStateNode lock
2176  void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode)
2177    throws UnexpectedStateException {
2178    ServerName regionLocation = regionNode.getRegionLocation();
2179    regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
2180    regionNode.setRegionLocation(null);
2181    if (regionLocation != null) {
2182      regionNode.setLastHost(regionLocation);
2183      regionStates.removeRegionFromServer(regionLocation, regionNode);
2184    }
2185  }
2186
2187  // should be called under the RegionStateNode lock
2188  CompletableFuture<Void> persistToMeta(RegionStateNode regionNode) {
2189    return regionStateStore.updateRegionLocation(regionNode).thenAccept(r -> {
2190      RegionInfo regionInfo = regionNode.getRegionInfo();
2191      if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
2192        // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
2193        // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
2194        // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
2195        // on table that contains state.
2196        setMetaAssigned(regionInfo, true);
2197      }
2198    });
2199  }
2200
2201  // should be called under the RegionStateNode lock
2202  // for SCP
2203  public CompletableFuture<Void> regionClosedAbnormally(RegionStateNode regionNode) {
2204    RegionState.State state = regionNode.getState();
2205    ServerName regionLocation = regionNode.getRegionLocation();
2206    regionNode.setState(State.ABNORMALLY_CLOSED);
2207    regionNode.setRegionLocation(null);
2208    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
2209    FutureUtils.addListener(future, (r, e) -> {
2210      if (e == null) {
2211        if (regionLocation != null) {
2212          regionNode.setLastHost(regionLocation);
2213          regionStates.removeRegionFromServer(regionLocation, regionNode);
2214        }
2215      } else {
2216        // revert
2217        regionNode.setState(state);
2218        regionNode.setRegionLocation(regionLocation);
2219      }
2220    });
2221    return future;
2222  }
2223
2224  // ============================================================================================
2225  // The above methods can only be called in TransitRegionStateProcedure(and related procedures)
2226  // ============================================================================================
2227
2228  public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
2229    final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
2230    // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
2231    // The parent stays in regionStates until cleared when removed by CatalogJanitor.
2232    // Update its state in regionStates to it shows as offline and split when read
2233    // later figuring what regions are in a table and what are not: see
2234    // regionStates#getRegionsOfTable
2235    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent);
2236    node.setState(State.SPLIT);
2237    final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA);
2238    nodeA.setState(State.SPLITTING_NEW);
2239    final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB);
2240    nodeB.setState(State.SPLITTING_NEW);
2241
2242    TableDescriptor td = master.getTableDescriptors().get(parent.getTable());
2243    // TODO: here we just update the parent region info in meta, to set split and offline to true,
2244    // without changing the one in the region node. This is a bit confusing but the region info
2245    // field in RegionStateNode is not expected to be changed in the current design. Need to find a
2246    // possible way to address this problem, or at least adding more comments about the trick to
2247    // deal with this problem, that when you want to filter out split parent, you need to check both
2248    // the RegionState on whether it is split, and also the region info. If one of them matches then
2249    // it is a split parent. And usually only one of them can match, as after restart, the region
2250    // state will be changed from SPLIT to CLOSED.
2251    regionStateStore.splitRegion(parent, daughterA, daughterB, serverName, td);
2252    if (shouldAssignFavoredNodes(parent)) {
2253      List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList();
2254      getFavoredNodePromoter().generateFavoredNodesForDaughter(onlineServers, parent, daughterA,
2255        daughterB);
2256    }
2257  }
2258
2259  /**
2260   * When called here, the merge has happened. The merged regions have been unassigned and the above
2261   * markRegionClosed has been called on each so they have been disassociated from a hosting Server.
2262   * The merged region will be open after this call. The merged regions are removed from hbase:meta
2263   * below. Later they are deleted from the filesystem by the catalog janitor running against
2264   * hbase:meta. It notices when the merged region no longer holds references to the old regions
2265   * (References are deleted after a compaction rewrites what the Reference points at but not until
2266   * the archiver chore runs, are the References removed).
2267   */
2268  public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
2269    RegionInfo[] mergeParents) throws IOException {
2270    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
2271    node.setState(State.MERGED);
2272    for (RegionInfo ri : mergeParents) {
2273      regionStates.deleteRegion(ri);
2274    }
2275    TableDescriptor td = master.getTableDescriptors().get(child.getTable());
2276    regionStateStore.mergeRegions(child, mergeParents, serverName, td);
2277    if (shouldAssignFavoredNodes(child)) {
2278      getFavoredNodePromoter().generateFavoredNodesForMergedRegion(child, mergeParents);
2279    }
2280  }
2281
2282  /*
2283   * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
2284   * belongs to a non-system table.
2285   */
2286  private boolean shouldAssignFavoredNodes(RegionInfo region) {
2287    return this.shouldAssignRegionsWithFavoredNodes
2288      && FavoredNodesManager.isFavoredNodeApplicable(region);
2289  }
2290
2291  // ============================================================================================
2292  // Assign Queue (Assign/Balance)
2293  // ============================================================================================
  // Region nodes waiting for the assign thread to pick a target server for them. Accesses in
  // queueAssign, waitOnAssignQueue, acceptPlan and addToPendingAssignment hold assignQueueLock.
  private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>();
  // Lock taken around accesses to pendingAssignQueue and around use of assignQueueFullCond.
  private final ReentrantLock assignQueueLock = new ReentrantLock();
  // Signalled by queueAssign and assignQueueSignal; awaited by waitOnAssignQueue.
  private final Condition assignQueueFullCond = assignQueueLock.newCondition();
2297
2298  /**
2299   * Add the assign operation to the assignment queue. The pending assignment operation will be
2300   * processed, and each region will be assigned by a server using the balancer.
2301   */
2302  protected void queueAssign(final RegionStateNode regionNode) {
2303    regionNode.getProcedureEvent().suspend();
2304
2305    // TODO: quick-start for meta and the other sys-tables?
2306    assignQueueLock.lock();
2307    try {
2308      pendingAssignQueue.add(regionNode);
2309      if (
2310        regionNode.isSystemTable() || pendingAssignQueue.size() == 1
2311          || pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize
2312      ) {
2313        assignQueueFullCond.signal();
2314      }
2315    } finally {
2316      assignQueueLock.unlock();
2317    }
2318  }
2319
2320  private void startAssignmentThread() {
2321    assignThread = new Thread(master.getServerName().toShortString()) {
2322      @Override
2323      public void run() {
2324        while (isRunning()) {
2325          processAssignQueue();
2326        }
2327        pendingAssignQueue.clear();
2328      }
2329    };
2330    assignThread.setDaemon(true);
2331    assignThread.start();
2332  }
2333
  /**
   * Waits for the assign thread to terminate. The assign queue condition is signalled once up
   * front and again before every join attempt (each join waits at most 250 ms), which wakes a
   * thread that is waiting on that condition. If the calling thread is interrupted while joining,
   * the wait is abandoned, a warning is logged and the interrupt flag is set again.
   */
  private void stopAssignmentThread() {
    assignQueueSignal();
    try {
      while (assignThread.isAlive()) {
        assignQueueSignal();
        assignThread.join(250);
      }
    } catch (InterruptedException e) {
      LOG.warn("join interrupted", e);
      Thread.currentThread().interrupt();
    }
  }
2346
  /**
   * Signals {@code assignQueueFullCond} while holding {@code assignQueueLock}, waking one thread
   * that is waiting on the condition (if any).
   */
  private void assignQueueSignal() {
    assignQueueLock.lock();
    try {
      assignQueueFullCond.signal();
    } finally {
      assignQueueLock.unlock();
    }
  }
2355
  /**
   * Waits for work on the pending assign queue and then drains it, all under
   * {@code assignQueueLock}.
   * <p>
   * If the queue is empty and the manager is running, this first waits (without timeout) for a
   * signal on {@code assignQueueFullCond}. It then waits up to {@code assignDispatchWaitMillis}
   * more before taking everything that is queued at that point.
   * @return a map from region info to region node with the drained queue content (possibly empty),
   *         or {@code null} when the manager is no longer running or the wait was interrupted
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
  private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() {
    HashMap<RegionInfo, RegionStateNode> regions = null;

    assignQueueLock.lock();
    try {
      // Single await, deliberately not in a loop (hence the suppressed findbugs warning): any
      // wake-up falls through to the checks below.
      if (pendingAssignQueue.isEmpty() && isRunning()) {
        assignQueueFullCond.await();
      }

      if (!isRunning()) {
        return null;
      }
      // Timed wait before draining; presumably this lets more regions queue up so they are
      // handed to the balancer as one batch -- NOTE(review): confirm intent. A signal from
      // queueAssign ends the wait early.
      assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS);
      regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size());
      for (RegionStateNode regionNode : pendingAssignQueue) {
        regions.put(regionNode.getRegionInfo(), regionNode);
      }
      pendingAssignQueue.clear();
    } catch (InterruptedException e) {
      // Both awaits happen before the map is created, so regions is still null here.
      LOG.warn("got interrupted ", e);
      Thread.currentThread().interrupt();
    } finally {
      assignQueueLock.unlock();
    }
    return regions;
  }
2383
  /**
   * Runs one round of the assign thread: drains the pending queue via {@code waitOnAssignQueue()},
   * sorts the drained regions into buckets, waits until at least one destination server exists and
   * then passes the buckets to {@code processAssignmentPlans}.
   * <p>
   * Regions that already have a location go into the retain map, whether they belong to a system
   * table or not. Regions without a location go into the system or the user list. The system list
   * is planned first, against the servers left after removing those returned by
   * {@code getExcludedServersForSystemTable()}; the retain map and the user list are then planned
   * against all destination servers.
   */
  private void processAssignQueue() {
    final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue();
    // nothing drained (or interrupted / stopped): nothing to do in this round
    if (regions == null || regions.size() == 0 || !isRunning()) {
      return;
    }

    if (LOG.isTraceEnabled()) {
      LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size());
    }

    // TODO: Optimize balancer. pass a RegionPlan?
    final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>();
    final List<RegionInfo> userHRIs = new ArrayList<>(regions.size());
    // Regions for system tables requiring reassignment
    final List<RegionInfo> systemHRIs = new ArrayList<>();
    for (RegionStateNode regionStateNode : regions.values()) {
      boolean sysTable = regionStateNode.isSystemTable();
      final List<RegionInfo> hris = sysTable ? systemHRIs : userHRIs;
      // a known location means "try to retain"; only location-less regions use the lists
      if (regionStateNode.getRegionLocation() != null) {
        retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
      } else {
        hris.add(regionStateNode.getRegionInfo());
      }
    }

    // TODO: connect with the listener to invalidate the cache

    // TODO use events
    List<ServerName> servers = master.getServerManager().createDestinationServersList();
    // Poll every 250 ms until at least one destination server shows up or we are stopped.
    for (int i = 0; servers.size() < 1; ++i) {
      // Report every fourth time around this loop; try not to flood log.
      if (i % 4 == 0) {
        LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions.");
      }

      if (!isRunning()) {
        LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions.");
        return;
      }
      Threads.sleep(250);
      servers = master.getServerManager().createDestinationServersList();
    }

    if (!systemHRIs.isEmpty()) {
      // System table regions requiring reassignment are present, get region servers
      // not available for system table regions
      final List<ServerName> excludeServers = getExcludedServersForSystemTable();
      List<ServerName> serversForSysTables =
        servers.stream().filter(s -> !excludeServers.contains(s)).collect(Collectors.toList());
      if (serversForSysTables.isEmpty()) {
        LOG.warn("Filtering old server versions and the excluded produced an empty set; "
          + "instead considering all candidate servers!");
      }
      LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size()
        + ", allServersCount=" + servers.size());
      // Fall back to all servers only when the filtered list is empty AND none of the system
      // regions is currently located on BOGUS_SERVER_NAME; otherwise use the filtered list, even
      // if it is empty.
      processAssignmentPlans(regions, null, systemHRIs,
        serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs)
          ? servers
          : serversForSysTables);
    }

    processAssignmentPlans(regions, retainMap, userHRIs, servers);
  }
2447
2448  private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions,
2449    List<RegionInfo> hirs) {
2450    for (RegionInfo ri : hirs) {
2451      if (
2452        regions.get(ri).getRegionLocation() != null
2453          && regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME)
2454      ) {
2455        return true;
2456      }
2457    }
2458    return false;
2459  }
2460
2461  private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions,
2462    final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris,
2463    final List<ServerName> servers) {
2464    boolean isTraceEnabled = LOG.isTraceEnabled();
2465    if (isTraceEnabled) {
2466      LOG.trace("Available servers count=" + servers.size() + ": " + servers);
2467    }
2468
2469    final LoadBalancer balancer = getBalancer();
2470    // ask the balancer where to place regions
2471    if (retainMap != null && !retainMap.isEmpty()) {
2472      if (isTraceEnabled) {
2473        LOG.trace("retain assign regions=" + retainMap);
2474      }
2475      try {
2476        acceptPlan(regions, balancer.retainAssignment(retainMap, servers));
2477      } catch (IOException e) {
2478        LOG.warn("unable to retain assignment", e);
2479        addToPendingAssignment(regions, retainMap.keySet());
2480      }
2481    }
2482
2483    // TODO: Do we need to split retain and round-robin?
2484    // the retain seems to fallback to round-robin/random if the region is not in the map.
2485    if (!hris.isEmpty()) {
2486      Collections.sort(hris, RegionInfo.COMPARATOR);
2487      if (isTraceEnabled) {
2488        LOG.trace("round robin regions=" + hris);
2489      }
2490      try {
2491        acceptPlan(regions, balancer.roundRobinAssignment(hris, servers));
2492      } catch (IOException e) {
2493        LOG.warn("unable to round-robin assignment", e);
2494        addToPendingAssignment(regions, hris);
2495      }
2496    }
2497  }
2498
2499  private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions,
2500    final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException {
2501    final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()];
2502    final long st = EnvironmentEdgeManager.currentTime();
2503
2504    if (plan.isEmpty()) {
2505      throw new HBaseIOException("unable to compute plans for regions=" + regions.size());
2506    }
2507
2508    int evcount = 0;
2509    for (Map.Entry<ServerName, List<RegionInfo>> entry : plan.entrySet()) {
2510      final ServerName server = entry.getKey();
2511      for (RegionInfo hri : entry.getValue()) {
2512        final RegionStateNode regionNode = regions.get(hri);
2513        regionNode.setRegionLocation(server);
2514        if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) {
2515          assignQueueLock.lock();
2516          try {
2517            pendingAssignQueue.add(regionNode);
2518          } finally {
2519            assignQueueLock.unlock();
2520          }
2521        } else {
2522          events[evcount++] = regionNode.getProcedureEvent();
2523        }
2524      }
2525    }
2526    ProcedureEvent.wakeEvents(getProcedureScheduler(), events);
2527
2528    final long et = EnvironmentEdgeManager.currentTime();
2529    if (LOG.isTraceEnabled()) {
2530      LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + StringUtils.humanTimeDiff(et - st));
2531    }
2532  }
2533
2534  private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions,
2535    final Collection<RegionInfo> pendingRegions) {
2536    assignQueueLock.lock();
2537    try {
2538      for (RegionInfo hri : pendingRegions) {
2539        pendingAssignQueue.add(regions.get(hri));
2540      }
2541    } finally {
2542      assignQueueLock.unlock();
2543    }
2544  }
2545
2546  /**
2547   * For a given cluster with mixed versions of servers, get a list of servers with lower versions,
2548   * where system table regions should not be assigned to. For system table, we must assign regions
2549   * to a server with highest version. However, we can disable this exclusion using config:
2550   * "hbase.min.version.move.system.tables" if checkForMinVersion is true. Detailed explanation
2551   * available with definition of minVersionToMoveSysTables.
2552   * @return List of Excluded servers for System table regions.
2553   */
2554  public List<ServerName> getExcludedServersForSystemTable() {
2555    // TODO: This should be a cached list kept by the ServerManager rather than calculated on each
2556    // move or system region assign. The RegionServerTracker keeps list of online Servers with
2557    // RegionServerInfo that includes Version.
2558    List<Pair<ServerName, String>> serverList =
2559      master.getServerManager().getOnlineServersList().stream()
2560        .map(s -> new Pair<>(s, master.getRegionServerVersion(s))).collect(Collectors.toList());
2561    if (serverList.isEmpty()) {
2562      return new ArrayList<>();
2563    }
2564    String highestVersion = Collections
2565      .max(serverList, (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond()))
2566      .getSecond();
2567    if (!DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG.equals(minVersionToMoveSysTables)) {
2568      int comparedValue = VersionInfo.compareVersion(minVersionToMoveSysTables, highestVersion);
2569      if (comparedValue > 0) {
2570        return new ArrayList<>();
2571      }
2572    }
2573    return serverList.stream().filter(pair -> !pair.getSecond().equals(highestVersion))
2574      .map(Pair::getFirst).collect(Collectors.toList());
2575  }
2576
  /** Returns the {@code master} field of this instance. */
  MasterServices getMaster() {
    return master;
  }
2580
2581  /** Returns a snapshot of rsReports */
2582  public Map<ServerName, Set<byte[]>> getRSReports() {
2583    Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>();
2584    synchronized (rsReports) {
2585      rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue()));
2586    }
2587    return rsReportsSnapshot;
2588  }
2589
2590  /**
2591   * Provide regions state count for given table. e.g howmany regions of give table are
2592   * opened/closed/rit etc
2593   * @param tableName TableName
2594   * @return region states count
2595   */
2596  public RegionStatesCount getRegionStatesCount(TableName tableName) {
2597    int openRegionsCount = 0;
2598    int closedRegionCount = 0;
2599    int ritCount = 0;
2600    int splitRegionCount = 0;
2601    int totalRegionCount = 0;
2602    if (!isTableDisabled(tableName)) {
2603      final List<RegionState> states = regionStates.getTableRegionStates(tableName);
2604      for (RegionState regionState : states) {
2605        if (regionState.isOpened()) {
2606          openRegionsCount++;
2607        } else if (regionState.isClosed()) {
2608          closedRegionCount++;
2609        } else if (regionState.isSplit()) {
2610          splitRegionCount++;
2611        }
2612      }
2613      totalRegionCount = states.size();
2614      ritCount = totalRegionCount - openRegionsCount - splitRegionCount;
2615    }
2616    return new RegionStatesCount.RegionStatesCountBuilder().setOpenRegions(openRegionsCount)
2617      .setClosedRegions(closedRegionCount).setSplitRegions(splitRegionCount)
2618      .setRegionsInTransition(ritCount).setTotalRegions(totalRegionCount).build();
2619  }
2620
2621}