001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.Future;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.locks.Condition;
033import java.util.concurrent.locks.ReentrantLock;
034import java.util.stream.Collectors;
035
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.DoNotRetryIOException;
038import org.apache.hadoop.hbase.HBaseIOException;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.MetaTableAccessor;
041import org.apache.hadoop.hbase.PleaseHoldException;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.UnknownRegionException;
045import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
046import org.apache.hadoop.hbase.client.MasterSwitchType;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.RegionInfoBuilder;
049import org.apache.hadoop.hbase.client.RegionStatesCount;
050import org.apache.hadoop.hbase.client.Result;
051import org.apache.hadoop.hbase.client.TableState;
052import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
053import org.apache.hadoop.hbase.favored.FavoredNodesManager;
054import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
055import org.apache.hadoop.hbase.master.LoadBalancer;
056import org.apache.hadoop.hbase.master.MasterServices;
057import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
058import org.apache.hadoop.hbase.master.RegionPlan;
059import org.apache.hadoop.hbase.master.RegionState;
060import org.apache.hadoop.hbase.master.RegionState.State;
061import org.apache.hadoop.hbase.master.ServerManager;
062import org.apache.hadoop.hbase.master.TableStateManager;
063import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
064import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
065import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
066import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
067import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
068import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
069import org.apache.hadoop.hbase.procedure2.Procedure;
070import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
071import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
072import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
073import org.apache.hadoop.hbase.procedure2.util.StringUtils;
074import org.apache.hadoop.hbase.regionserver.SequenceId;
075import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
076import org.apache.hadoop.hbase.util.Bytes;
077import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
078import org.apache.hadoop.hbase.util.HasThread;
079import org.apache.hadoop.hbase.util.Pair;
080import org.apache.hadoop.hbase.util.Threads;
081import org.apache.hadoop.hbase.util.VersionInfo;
082import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
083import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
084import org.apache.yetus.audience.InterfaceAudience;
085import org.apache.zookeeper.KeeperException;
086import org.slf4j.Logger;
087import org.slf4j.LoggerFactory;
088
089import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
090
091import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
096
097/**
098 * The AssignmentManager is the coordinator for region assign/unassign operations.
099 * <ul>
100 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
101 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
102 * </ul>
103 * Regions are created by CreateTable, Split, Merge.
104 * Regions are deleted by DeleteTable, Split, Merge.
105 * Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash.
106 * Unassigns are triggered by DisableTable, Split, Merge
107 */
108@InterfaceAudience.Private
109public class AssignmentManager {
110  private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);
111
112  // TODO: AMv2
113  //  - handle region migration from hbase1 to hbase2.
114  //  - handle sys table assignment first (e.g. acl, namespace)
115  //  - handle table priorities
116  //  - If ServerBusyException trying to update hbase:meta, we abort the Master
117  //   See updateRegionLocation in RegionStateStore.
118  //
119  // See also
120  // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
121  // for other TODOs.
122
  // Configuration key named for a bootstrap thread pool size.
  // NOTE(review): not read in the visible part of this class; confirm where it is consumed.
  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
      "hbase.assignment.bootstrap.thread.pool.size";

  // Key read by the constructor into assignDispatchWaitMillis (milliseconds, per the key name).
  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
      "hbase.assignment.dispatch.wait.msec";
  // Fallback used when the key above is not configured.
  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;

  // Key read by the constructor into assignDispatchWaitQueueMaxSize.
  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
      "hbase.assignment.dispatch.wait.queue.max.size";
  // Fallback used when the key above is not configured.
  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;

  // Key whose value the constructor passes to the RegionInTransitionChore it creates.
  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
      "hbase.assignment.rit.chore.interval.msec";
  // Fallback used when the key above is not configured (60 seconds expressed in milliseconds).
  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;

  // Key whose value the constructor passes to the DeadServerMetricRegionChore; a value that is
  // not positive makes the constructor leave deadMetricChore as null.
  public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY =
      "hbase.assignment.dead.region.metric.chore.interval.msec";
  // Fallback used when the key above is not configured (120 seconds expressed in milliseconds).
  private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000;

  // Key read by the constructor into assignMaxAttempts; the constructor raises values below 1
  // to 1.
  public static final String ASSIGN_MAX_ATTEMPTS =
      "hbase.assignment.maximum.attempts";
  // Fallback used when the key above is not configured.
  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;

  // Key read by the constructor into assignRetryImmediatelyMaxAttempts.
  public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS =
      "hbase.assignment.retry.immediately.maximum.attempts";
  // Fallback used when the key above is not configured.
  private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3;

  /** Region in Transition metrics threshold time */
  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
      "hbase.metrics.rit.stuck.warning.threshold";
  // NOTE(review): presumably the default for the key above; it is not read in the visible part
  // of this class.
  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;

  // Event behind isMetaAssigned()/waitMetaAssigned(); woken or suspended by setMetaAssigned().
  private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
  // Event behind isMetaLoaded()/waitMetaLoaded(); woken by wakeMetaLoadedEvent(), suspended in stop().
  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");

  private final MetricsAssignmentManager metrics;
  // Chores created in the constructor and removed from the procedure executor in stop().
  private final RegionInTransitionChore ritChore;
  // May be null: only created when the configured interval is positive.
  private final DeadServerMetricRegionChore deadMetricChore;
  private final MasterServices master;

  // Flipped by start()/stop() via compareAndSet so that each runs its body at most once per cycle.
  private final AtomicBoolean running = new AtomicBoolean(false);
  // In-memory region/server state; cleared in stop().
  private final RegionStates regionStates = new RegionStates();
  private final RegionStateStore regionStateStore;

  // NOTE(review): presumably the regions reported per region server; not used in the visible
  // part of this class, confirm against the code that populates it.
  private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();

  // True when the configured load balancer class is assignable to FavoredStochasticBalancer.
  private final boolean shouldAssignRegionsWithFavoredNodes;
  private final int assignDispatchWaitQueueMaxSize;
  private final int assignDispatchWaitMillis;
  private final int assignMaxAttempts;
  private final int assignRetryImmediatelyMaxAttempts;

  // Monitor held by the thread spawned in checkIfShouldMoveSystemRegionAsync() while it works.
  private final Object checkIfShouldMoveSystemRegionLock = new Object();

  // NOTE(review): presumably managed by startAssignmentThread()/stopAssignmentThread(), which are
  // outside the visible part of this class.
  private Thread assignThread;
178
  /**
   * Creates an AssignmentManager that uses a new {@link RegionStateStore} built from the given
   * master.
   * @param master the master services this manager works against
   */
  public AssignmentManager(final MasterServices master) {
    this(master, new RegionStateStore(master));
  }
182
183  @VisibleForTesting
184  AssignmentManager(final MasterServices master, final RegionStateStore stateStore) {
185    this.master = master;
186    this.regionStateStore = stateStore;
187    this.metrics = new MetricsAssignmentManager();
188
189    final Configuration conf = master.getConfiguration();
190
191    // Only read favored nodes if using the favored nodes load balancer.
192    this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class.isAssignableFrom(
193        conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class));
194
195    this.assignDispatchWaitMillis = conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY,
196        DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
197    this.assignDispatchWaitQueueMaxSize = conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY,
198        DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);
199
200    this.assignMaxAttempts = Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS,
201        DEFAULT_ASSIGN_MAX_ATTEMPTS));
202    this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS,
203        DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS);
204
205    int ritChoreInterval = conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY,
206        DEFAULT_RIT_CHORE_INTERVAL_MSEC);
207    this.ritChore = new RegionInTransitionChore(ritChoreInterval);
208
209    int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY,
210        DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC);
211    if (deadRegionChoreInterval > 0) {
212      this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval);
213    } else {
214      this.deadMetricChore = null;
215    }
216  }
217
218  public void start() throws IOException, KeeperException {
219    if (!running.compareAndSet(false, true)) {
220      return;
221    }
222
223    LOG.trace("Starting assignment manager");
224
225    // Start the Assignment Thread
226    startAssignmentThread();
227
228    // load meta region state
229    ZKWatcher zkw = master.getZooKeeper();
230    // it could be null in some tests
231    if (zkw != null) {
232      RegionState regionState = MetaTableLocator.getMetaRegionState(zkw);
233      RegionStateNode regionNode =
234        regionStates.getOrCreateRegionStateNode(RegionInfoBuilder.FIRST_META_REGIONINFO);
235      regionNode.lock();
236      try {
237        regionNode.setRegionLocation(regionState.getServerName());
238        regionNode.setState(regionState.getState());
239        if (regionNode.getProcedure() != null) {
240          regionNode.getProcedure().stateLoaded(this, regionNode);
241        }
242        setMetaAssigned(regionState.getRegion(), regionState.getState() == State.OPEN);
243      } finally {
244        regionNode.unlock();
245      }
246    }
247  }
248
249  /**
250   * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode.
251   * <p>
252   * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta
253   * method below. And it is also very important as now before submitting a TRSP, we need to attach
254   * it to the RegionStateNode, which acts like a guard, so we need to restore this information at
255   * the very beginning, before we start processing any procedures.
256   */
257  public void setupRIT(List<TransitRegionStateProcedure> procs) {
258    procs.forEach(proc -> {
259      RegionInfo regionInfo = proc.getRegion();
260      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
261      TransitRegionStateProcedure existingProc = regionNode.getProcedure();
262      if (existingProc != null) {
263        // This is possible, as we will detach the procedure from the RSN before we
264        // actually finish the procedure. This is because that, we will detach the TRSP from the RSN
265        // during execution, at that time, the procedure has not been marked as done in the pv2
266        // framework yet, so it is possible that we schedule a new TRSP immediately and when
267        // arriving here, we will find out that there are multiple TRSPs for the region. But we can
268        // make sure that, only the last one can take the charge, the previous ones should have all
269        // been finished already. So here we will compare the proc id, the greater one will win.
270        if (existingProc.getProcId() < proc.getProcId()) {
271          // the new one wins, unset and set it to the new one below
272          regionNode.unsetProcedure(existingProc);
273        } else {
274          // the old one wins, skip
275          return;
276        }
277      }
278      LOG.info("Attach {} to {} to restore RIT", proc, regionNode);
279      regionNode.setProcedure(proc);
280    });
281  }
282
283  public void stop() {
284    if (!running.compareAndSet(true, false)) {
285      return;
286    }
287
288    LOG.info("Stopping assignment manager");
289
290    // The AM is started before the procedure executor,
291    // but the actual work will be loaded/submitted only once we have the executor
292    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;
293
294    // Remove the RIT chore
295    if (hasProcExecutor) {
296      master.getMasterProcedureExecutor().removeChore(this.ritChore);
297      if (this.deadMetricChore != null) {
298        master.getMasterProcedureExecutor().removeChore(this.deadMetricChore);
299      }
300    }
301
302    // Stop the Assignment Thread
303    stopAssignmentThread();
304
305    // Stop the RegionStateStore
306    regionStates.clear();
307
308    // Update meta events (for testing)
309    if (hasProcExecutor) {
310      metaLoadEvent.suspend();
311      for (RegionInfo hri: getMetaRegionSet()) {
312        setMetaAssigned(hri, false);
313      }
314    }
315  }
316
317  public boolean isRunning() {
318    return running.get();
319  }
320
321  public Configuration getConfiguration() {
322    return master.getConfiguration();
323  }
324
325  public MetricsAssignmentManager getAssignmentManagerMetrics() {
326    return metrics;
327  }
328
329  private LoadBalancer getBalancer() {
330    return master.getLoadBalancer();
331  }
332
333  private FavoredNodesPromoter getFavoredNodePromoter() {
334    return (FavoredNodesPromoter) ((RSGroupBasedLoadBalancer) master.getLoadBalancer())
335      .getInternalBalancer();
336  }
337
338  private MasterProcedureEnv getProcedureEnvironment() {
339    return master.getMasterProcedureExecutor().getEnvironment();
340  }
341
342  private MasterProcedureScheduler getProcedureScheduler() {
343    return getProcedureEnvironment().getProcedureScheduler();
344  }
345
346  int getAssignMaxAttempts() {
347    return assignMaxAttempts;
348  }
349
350  int getAssignRetryImmediatelyMaxAttempts() {
351    return assignRetryImmediatelyMaxAttempts;
352  }
353
354  public RegionStates getRegionStates() {
355    return regionStates;
356  }
357
358  /**
359   * Returns the regions hosted by the specified server.
360   * <p/>
361   * Notice that, for SCP, after we submit the SCP, no one can change the region list for the
362   * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a
363   * snapshot of the current region list for this server, which means, right after you get the
364   * region list, new regions may be moved to this server or some regions may be moved out from this
365   * server, so you should not use it critically if you need strong consistency.
366   */
367  public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
368    ServerStateNode serverInfo = regionStates.getServerNode(serverName);
369    if (serverInfo == null) {
370      return Collections.emptyList();
371    }
372    return serverInfo.getRegionInfoList();
373  }
374
375  public RegionStateStore getRegionStateStore() {
376    return regionStateStore;
377  }
378
379  public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
380    return this.shouldAssignRegionsWithFavoredNodes
381      ? getFavoredNodePromoter().getFavoredNodes(regionInfo)
382      : ServerName.EMPTY_SERVER_LIST;
383  }
384
385  // ============================================================================================
386  //  Table State Manager helpers
387  // ============================================================================================
388  TableStateManager getTableStateManager() {
389    return master.getTableStateManager();
390  }
391
392  public boolean isTableEnabled(final TableName tableName) {
393    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
394  }
395
396  public boolean isTableDisabled(final TableName tableName) {
397    return getTableStateManager().isTableState(tableName,
398      TableState.State.DISABLED, TableState.State.DISABLING);
399  }
400
401  // ============================================================================================
402  //  META Helpers
403  // ============================================================================================
404  private boolean isMetaRegion(final RegionInfo regionInfo) {
405    return regionInfo.isMetaRegion();
406  }
407
408  public boolean isMetaRegion(final byte[] regionName) {
409    return getMetaRegionFromName(regionName) != null;
410  }
411
412  public RegionInfo getMetaRegionFromName(final byte[] regionName) {
413    for (RegionInfo hri: getMetaRegionSet()) {
414      if (Bytes.equals(hri.getRegionName(), regionName)) {
415        return hri;
416      }
417    }
418    return null;
419  }
420
421  public boolean isCarryingMeta(final ServerName serverName) {
422    // TODO: handle multiple meta
423    return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
424  }
425
426  private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
427    // TODO: check for state?
428    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
429    return(node != null && serverName.equals(node.getRegionLocation()));
430  }
431
  /**
   * Returns the meta region responsible for the given region. With a single meta region this
   * is always the first meta region; the argument is currently ignored.
   * @param regionInfo the region whose meta region is wanted
   * @return {@link RegionInfoBuilder#FIRST_META_REGIONINFO}
   */
  private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
    //if (regionInfo.isMetaRegion()) return regionInfo;
    // TODO: handle multiple meta. if the region provided is not meta lookup
    // which meta the region belongs to.
    return RegionInfoBuilder.FIRST_META_REGIONINFO;
  }
438
439  // TODO: handle multiple meta.
440  private static final Set<RegionInfo> META_REGION_SET =
441      Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);
442  public Set<RegionInfo> getMetaRegionSet() {
443    return META_REGION_SET;
444  }
445
446  // ============================================================================================
447  //  META Event(s) helpers
448  // ============================================================================================
449  /**
450   * Notice that, this only means the meta region is available on a RS, but the AM may still be
451   * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
452   * before checking this method, unless you can make sure that your piece of code can only be
453   * executed after AM builds the region states.
454   * @see #isMetaLoaded()
455   */
456  public boolean isMetaAssigned() {
457    return metaAssignEvent.isReady();
458  }
459
460  public boolean isMetaRegionInTransition() {
461    return !isMetaAssigned();
462  }
463
464  /**
465   * Notice that this event does not mean the AM has already finished region state rebuilding. See
466   * the comment of {@link #isMetaAssigned()} for more details.
467   * @see #isMetaAssigned()
468   */
469  public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
470    return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
471  }
472
473  private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
474    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
475    ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
476    if (assigned) {
477      metaAssignEvent.wake(getProcedureScheduler());
478    } else {
479      metaAssignEvent.suspend();
480    }
481  }
482
483  private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
484    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
485    // TODO: handle multiple meta.
486    return metaAssignEvent;
487  }
488
489  /**
490   * Wait until AM finishes the meta loading, i.e, the region states rebuilding.
491   * @see #isMetaLoaded()
492   * @see #waitMetaAssigned(Procedure, RegionInfo)
493   */
494  public boolean waitMetaLoaded(Procedure<?> proc) {
495    return metaLoadEvent.suspendIfNotReady(proc);
496  }
497
498  @VisibleForTesting
499  void wakeMetaLoadedEvent() {
500    metaLoadEvent.wake(getProcedureScheduler());
501    assert isMetaLoaded() : "expected meta to be loaded";
502  }
503
504  /**
505   * Return whether AM finishes the meta loading, i.e, the region states rebuilding.
506   * @see #isMetaAssigned()
507   * @see #waitMetaLoaded(Procedure)
508   */
509  public boolean isMetaLoaded() {
510    return metaLoadEvent.isReady();
511  }
512
513  /**
514   * Start a new thread to check if there are region servers whose versions are higher than others.
515   * If so, move all system table regions to RS with the highest version to keep compatibility.
516   * The reason is, RS in new version may not be able to access RS in old version when there are
517   * some incompatible changes.
518   * <p>This method is called when a new RegionServer is added to cluster only.</p>
519   */
520  public void checkIfShouldMoveSystemRegionAsync() {
521    // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
522    // it should 'move' the system tables from the old server to the new server but
523    // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
524    if (this.master.getServerManager().countOfRegionServers() <= 1) {
525      return;
526    }
527    // This thread used to run whenever there was a change in the cluster. The ZooKeeper
528    // childrenChanged notification came in before the nodeDeleted message and so this method
529    // cold run before a ServerCrashProcedure could run. That meant that this thread could see
530    // a Crashed Server before ServerCrashProcedure and it could find system regions on the
531    // crashed server and go move them before ServerCrashProcedure had a chance; could be
532    // dataloss too if WALs were not recovered.
533    new Thread(() -> {
534      try {
535        synchronized (checkIfShouldMoveSystemRegionLock) {
536          List<RegionPlan> plans = new ArrayList<>();
537          // TODO: I don't think this code does a good job if all servers in cluster have same
538          // version. It looks like it will schedule unnecessary moves.
539          for (ServerName server : getExcludedServersForSystemTable()) {
540            if (master.getServerManager().isServerDead(server)) {
541              // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
542              // considers only online servers, the server could be queued for dead server
543              // processing. As region assignments for crashed server is handled by
544              // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
545              // regular flow of LoadBalancer as a favored node and not to have this special
546              // handling.
547              continue;
548            }
549            List<RegionInfo> regionsShouldMove = getSystemTables(server);
550            if (!regionsShouldMove.isEmpty()) {
551              for (RegionInfo regionInfo : regionsShouldMove) {
552                // null value for dest forces destination server to be selected by balancer
553                RegionPlan plan = new RegionPlan(regionInfo, server, null);
554                if (regionInfo.isMetaRegion()) {
555                  // Must move meta region first.
556                  LOG.info("Async MOVE of {} to newer Server={}",
557                      regionInfo.getEncodedName(), server);
558                  moveAsync(plan);
559                } else {
560                  plans.add(plan);
561                }
562              }
563            }
564            for (RegionPlan plan : plans) {
565              LOG.info("Async MOVE of {} to newer Server={}",
566                  plan.getRegionInfo().getEncodedName(), server);
567              moveAsync(plan);
568            }
569          }
570        }
571      } catch (Throwable t) {
572        LOG.error(t.toString(), t);
573      }
574    }).start();
575  }
576
577  private List<RegionInfo> getSystemTables(ServerName serverName) {
578    ServerStateNode serverNode = regionStates.getServerNode(serverName);
579    if (serverNode == null) {
580      return Collections.emptyList();
581    }
582    return serverNode.getSystemRegionInfoList();
583  }
584
585  private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
586      throws HBaseIOException {
587    if (regionNode.getProcedure() != null) {
588      throw new HBaseIOException(regionNode + " is currently in transition");
589    }
590    if (!regionNode.isInState(expectedStates)) {
591      throw new DoNotRetryRegionException("Unexpected state for " + regionNode);
592    }
593    if (isTableDisabled(regionNode.getTable())) {
594      throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
595    }
596  }
597
598  // TODO: Need an async version of this for hbck2.
599  public long assign(RegionInfo regionInfo, ServerName sn) throws IOException {
600    // TODO: should we use getRegionStateNode?
601    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
602    TransitRegionStateProcedure proc;
603    regionNode.lock();
604    try {
605      preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
606      proc = TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn);
607      regionNode.setProcedure(proc);
608    } finally {
609      regionNode.unlock();
610    }
611    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
612    return proc.getProcId();
613  }
614
615  public long assign(RegionInfo regionInfo) throws IOException {
616    return assign(regionInfo, null);
617  }
618
619  public long unassign(RegionInfo regionInfo) throws IOException {
620    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
621    if (regionNode == null) {
622      throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
623    }
624    TransitRegionStateProcedure proc;
625    regionNode.lock();
626    try {
627      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
628      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
629      regionNode.setProcedure(proc);
630    } finally {
631      regionNode.unlock();
632    }
633    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
634    return proc.getProcId();
635  }
636
637  public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
638      ServerName targetServer) throws HBaseIOException {
639    RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
640    if (regionNode == null) {
641      throw new UnknownRegionException("No RegionStateNode found for " +
642          regionInfo.getEncodedName() + "(Closed/Deleted?)");
643    }
644    TransitRegionStateProcedure proc;
645    regionNode.lock();
646    try {
647      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
648      regionNode.checkOnline();
649      proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
650      regionNode.setProcedure(proc);
651    } finally {
652      regionNode.unlock();
653    }
654    return proc;
655  }
656
657  public void move(RegionInfo regionInfo) throws IOException {
658    TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null);
659    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
660  }
661
662  public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException {
663    TransitRegionStateProcedure proc =
664      createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination());
665    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
666  }
667
668  // ============================================================================================
669  //  RegionTransition procedures helpers
670  // ============================================================================================
671
672  /**
673   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
674   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
675   *         to populate the assigns with targets chosen using round-robin (default balancer
676   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
677   *         AssignProcedure will ask the balancer for a new target, and so on.
678   */
679  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris,
680      List<ServerName> serversToExclude) {
681    if (hris.isEmpty()) {
682      return new TransitRegionStateProcedure[0];
683    }
684
685    if (serversToExclude != null
686        && this.master.getServerManager().getOnlineServersList().size() == 1) {
687      LOG.debug("Only one region server found and hence going ahead with the assignment");
688      serversToExclude = null;
689    }
690    try {
691      // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
692      // a better job if it has all the assignments in the one lump.
693      Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
694        this.master.getServerManager().createDestinationServersList(serversToExclude));
695      // Return mid-method!
696      return createAssignProcedures(assignments);
697    } catch (IOException hioe) {
698      LOG.warn("Failed roundRobinAssignment", hioe);
699    }
700    // If an error above, fall-through to this simpler assign. Last resort.
701    return createAssignProcedures(hris);
702  }
703
704  /**
705   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
706   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
707   *         to populate the assigns with targets chosen using round-robin (default balancer
708   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
709   *         AssignProcedure will ask the balancer for a new target, and so on.
710   */
711  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
712    return createRoundRobinAssignProcedures(hris, null);
713  }
714
715  @VisibleForTesting
716  static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
717    if (left.getRegion().isMetaRegion()) {
718      if (right.getRegion().isMetaRegion()) {
719        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
720      }
721      return -1;
722    } else if (right.getRegion().isMetaRegion()) {
723      return +1;
724    }
725    if (left.getRegion().getTable().isSystemTable()) {
726      if (right.getRegion().getTable().isSystemTable()) {
727        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
728      }
729      return -1;
730    } else if (right.getRegion().getTable().isSystemTable()) {
731      return +1;
732    }
733    return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
734  }
735
736  private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode,
737      ServerName targetServer, boolean override) {
738    TransitRegionStateProcedure proc;
739    regionNode.lock();
740    try {
741      if(override && regionNode.getProcedure() != null) {
742        regionNode.unsetProcedure(regionNode.getProcedure());
743      }
744      assert regionNode.getProcedure() == null;
745      proc = TransitRegionStateProcedure.assign(getProcedureEnvironment(),
746        regionNode.getRegionInfo(), targetServer);
747      regionNode.setProcedure(proc);
748    } finally {
749      regionNode.unlock();
750    }
751    return proc;
752  }
753
754  private TransitRegionStateProcedure createUnassignProcedure(RegionStateNode regionNode,
755      boolean override) {
756    TransitRegionStateProcedure proc;
757    regionNode.lock();
758    try {
759      if(override && regionNode.getProcedure() != null) {
760        regionNode.unsetProcedure(regionNode.getProcedure());
761      }
762      assert regionNode.getProcedure() == null;
763      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(),
764          regionNode.getRegionInfo());
765      regionNode.setProcedure(proc);
766    } finally {
767      regionNode.unlock();
768    }
769    return proc;
770  }
771
772  /**
773   * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server.
774   * This method is specified for HBCK2
775   */
776  public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo hri, boolean override) {
777    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri);
778    return createAssignProcedure(regionNode, null, override);
779  }
780
781  /**
782   * Create one TransitRegionStateProcedure to unassign a region.
783   * This method is specified for HBCK2
784   */
785  public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo hri, boolean override) {
786    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri);
787    return createUnassignProcedure(regionNode, override);
788  }
789
790  /**
791   * Create an array of TransitRegionStateProcedure w/o specifying a target server.
792   * <p/>
793   * If no target server, at assign time, we will try to use the former location of the region if
794   * one exists. This is how we 'retain' the old location across a server restart.
795   * <p/>
796   * Should only be called when you can make sure that no one can touch these regions other than
797   * you. For example, when you are creating table.
798   */
799  public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) {
800    return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
801        .map(regionNode -> createAssignProcedure(regionNode, null, false))
802        .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
803  }
804
805  /**
806   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
807   * @return Assignments made from the passed in <code>assignments</code>
808   */
809  private TransitRegionStateProcedure[] createAssignProcedures(
810      Map<ServerName, List<RegionInfo>> assignments) {
811    return assignments.entrySet().stream()
812      .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
813        .map(regionNode -> createAssignProcedure(regionNode, e.getKey(), false)))
814      .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
815  }
816
817  /**
818   * Called by DisableTableProcedure to unassign all the regions for a table.
819   */
820  public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) {
821    return regionStates.getTableRegionStateNodes(tableName).stream().map(regionNode -> {
822      regionNode.lock();
823      try {
824        if (!regionStates.include(regionNode, false) ||
825          regionStates.isRegionOffline(regionNode.getRegionInfo())) {
826          return null;
827        }
828        // As in DisableTableProcedure, we will hold the xlock for table, so we can make sure that
829        // this procedure has not been executed yet, as TRSP will hold the shared lock for table all
830        // the time. So here we will unset it and when it is actually executed, it will find that
831        // the attach procedure is not itself and quit immediately.
832        if (regionNode.getProcedure() != null) {
833          regionNode.unsetProcedure(regionNode.getProcedure());
834        }
835        TransitRegionStateProcedure proc = TransitRegionStateProcedure
836          .unassign(getProcedureEnvironment(), regionNode.getRegionInfo());
837        regionNode.setProcedure(proc);
838        return proc;
839      } finally {
840        regionNode.unlock();
841      }
842    }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new);
843  }
844
845  public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
846      final byte[] splitKey) throws IOException {
847    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
848  }
849
850  public MergeTableRegionsProcedure createMergeProcedure(RegionInfo ... ris) throws IOException {
851    return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
852  }
853
854  /**
855   * Delete the region states. This is called by "DeleteTable"
856   */
857  public void deleteTable(final TableName tableName) throws IOException {
858    final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
859    regionStateStore.deleteRegions(regions);
860    for (int i = 0; i < regions.size(); ++i) {
861      final RegionInfo regionInfo = regions.get(i);
862      // we expect the region to be offline
863      regionStates.removeFromOfflineRegions(regionInfo);
864      regionStates.deleteRegion(regionInfo);
865    }
866  }
867
868  // ============================================================================================
869  //  RS Region Transition Report helpers
870  // ============================================================================================
871  private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
872      ServerName serverName, List<RegionStateTransition> transitionList) throws IOException {
873    for (RegionStateTransition transition : transitionList) {
874      switch (transition.getTransitionCode()) {
875        case OPENED:
876        case FAILED_OPEN:
877        case CLOSED:
878          assert transition.getRegionInfoCount() == 1 : transition;
879          final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
880          long procId =
881            transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
882          updateRegionTransition(serverName, transition.getTransitionCode(), hri,
883            transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId);
884          break;
885        case READY_TO_SPLIT:
886        case SPLIT:
887        case SPLIT_REVERTED:
888          assert transition.getRegionInfoCount() == 3 : transition;
889          final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
890          final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
891          final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
892          updateRegionSplitTransition(serverName, transition.getTransitionCode(), parent, splitA,
893            splitB);
894          break;
895        case READY_TO_MERGE:
896        case MERGED:
897        case MERGE_REVERTED:
898          assert transition.getRegionInfoCount() == 3 : transition;
899          final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
900          final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
901          final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
902          updateRegionMergeTransition(serverName, transition.getTransitionCode(), merged, mergeA,
903            mergeB);
904          break;
905      }
906    }
907  }
908
909  public ReportRegionStateTransitionResponse reportRegionStateTransition(
910      final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
911    ReportRegionStateTransitionResponse.Builder builder =
912        ReportRegionStateTransitionResponse.newBuilder();
913    ServerName serverName = ProtobufUtil.toServerName(req.getServer());
914    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
915    // here we have to acquire a read lock instead of a simple exclusive lock. This is because that
916    // we should not block other reportRegionStateTransition call from the same region server. This
917    // is not only about performance, but also to prevent dead lock. Think of the meta region is
918    // also on the same region server and you hold the lock which blocks the
919    // reportRegionStateTransition for meta, and since meta is not online, you will block inside the
920    // lock protection to wait for meta online...
921    serverNode.readLock().lock();
922    try {
923      // we only accept reportRegionStateTransition if the region server is online, see the comment
924      // above in submitServerCrash method and HBASE-21508 for more details.
925      if (serverNode.isInState(ServerState.ONLINE)) {
926        try {
927          reportRegionStateTransition(builder, serverName, req.getTransitionList());
928        } catch (PleaseHoldException e) {
929          LOG.trace("Failed transition ", e);
930          throw e;
931        } catch (UnsupportedOperationException | IOException e) {
932          // TODO: at the moment we have a single error message and the RS will abort
933          // if the master says that one of the region transitions failed.
934          LOG.warn("Failed transition", e);
935          builder.setErrorMessage("Failed transition " + e.getMessage());
936        }
937      } else {
938        LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
939          serverName);
940        builder.setErrorMessage("You are dead");
941      }
942    } finally {
943      serverNode.readLock().unlock();
944    }
945
946    return builder.build();
947  }
948
949  private void updateRegionTransition(ServerName serverName, TransitionCode state,
950      RegionInfo regionInfo, long seqId, long procId) throws IOException {
951    checkMetaLoaded(regionInfo);
952
953    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
954    if (regionNode == null) {
955      // the table/region is gone. maybe a delete, split, merge
956      throw new UnexpectedStateException(String.format(
957        "Server %s was trying to transition region %s to %s. but Region is not known.",
958        serverName, regionInfo, state));
959    }
960    LOG.trace("Update region transition serverName={} region={} regionState={}", serverName,
961      regionNode, state);
962
963    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
964    regionNode.lock();
965    try {
966      if (!reportTransition(regionNode, serverNode, state, seqId, procId)) {
967        // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
968        // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
969        // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
970        // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
971        // to CLOSED
972        // These happen because on cluster shutdown, we currently let the RegionServers close
973        // regions. This is the only time that region close is not run by the Master (so cluster
974        // goes down fast). Consider changing it so Master runs all shutdowns.
975        if (this.master.getServerManager().isClusterShutdown() &&
976          state.equals(TransitionCode.CLOSED)) {
977          LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
978        } else {
979          LOG.warn("No matching procedure found for {} transition on {} to {}",
980              serverName, regionNode, state);
981        }
982      }
983    } finally {
984      regionNode.unlock();
985    }
986  }
987
988  private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
989      TransitionCode state, long seqId, long procId) throws IOException {
990    ServerName serverName = serverNode.getServerName();
991    TransitRegionStateProcedure proc = regionNode.getProcedure();
992    if (proc == null) {
993      return false;
994    }
995    proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
996      serverName, state, seqId, procId);
997    return true;
998  }
999
1000  private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state,
1001      final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB)
1002      throws IOException {
1003    checkMetaLoaded(parent);
1004
1005    if (state != TransitionCode.READY_TO_SPLIT) {
1006      throw new UnexpectedStateException("unsupported split regionState=" + state +
1007        " for parent region " + parent +
1008        " maybe an old RS (< 2.0) had the operation in progress");
1009    }
1010
1011    // sanity check on the request
1012    if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) {
1013      throw new UnsupportedOperationException(
1014        "unsupported split request with bad keys: parent=" + parent +
1015        " hriA=" + hriA + " hriB=" + hriB);
1016    }
1017
1018    if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
1019      LOG.warn("Split switch is off! skip split of " + parent);
1020      throw new DoNotRetryIOException("Split region " + parent.getRegionNameAsString() +
1021          " failed due to split switch off");
1022    }
1023
1024    // Submit the Split procedure
1025    final byte[] splitKey = hriB.getStartKey();
1026    if (LOG.isDebugEnabled()) {
1027      LOG.debug("Split request from " + serverName +
1028          ", parent=" + parent + " splitKey=" + Bytes.toStringBinary(splitKey));
1029    }
1030    master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
1031
1032    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
1033    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
1034      throw new UnsupportedOperationException(String.format(
1035        "Split handled by the master: parent=%s hriA=%s hriB=%s", parent.getShortNameToLog(), hriA, hriB));
1036    }
1037  }
1038
1039  private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state,
1040      final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
1041    checkMetaLoaded(merged);
1042
1043    if (state != TransitionCode.READY_TO_MERGE) {
1044      throw new UnexpectedStateException("Unsupported merge regionState=" + state +
1045        " for regionA=" + hriA + " regionB=" + hriB + " merged=" + merged +
1046        " maybe an old RS (< 2.0) had the operation in progress");
1047    }
1048
1049    if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
1050      LOG.warn("Merge switch is off! skip merge of regionA=" + hriA + " regionB=" + hriB);
1051      throw new DoNotRetryIOException("Merge of regionA=" + hriA + " regionB=" + hriB +
1052        " failed because merge switch is off");
1053    }
1054
1055    // Submit the Merge procedure
1056    if (LOG.isDebugEnabled()) {
1057      LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged);
1058    }
1059    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
1060
1061    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
1062    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
1063      throw new UnsupportedOperationException(String.format(
1064        "Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, merged, hriA,
1065          hriB));
1066    }
1067  }
1068
1069  // ============================================================================================
1070  //  RS Status update (report online regions) helpers
1071  // ============================================================================================
1072  /**
1073   * The master will call this method when the RS send the regionServerReport(). The report will
1074   * contains the "online regions". This method will check the the online regions against the
1075   * in-memory state of the AM, and we will log a warn message if there is a mismatch. This is
1076   * because that there is no fencing between the reportRegionStateTransition method and
1077   * regionServerReport method, so there could be race and introduce inconsistency here, but
1078   * actually there is no problem.
1079   * <p/>
1080   * Please see HBASE-21421 and HBASE-21463 for more details.
1081   */
1082  public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) {
1083    if (!isRunning()) {
1084      return;
1085    }
1086    if (LOG.isTraceEnabled()) {
1087      LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName,
1088        regionNames.size(), isMetaLoaded(),
1089        regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList()));
1090    }
1091
1092    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
1093    synchronized (serverNode) {
1094      if (!serverNode.isInState(ServerState.ONLINE)) {
1095        LOG.warn("Got a report from a server result in state " + serverNode.getState());
1096        return;
1097      }
1098    }
1099
1100    // Track the regionserver reported online regions in memory.
1101    synchronized (rsReports) {
1102      rsReports.put(serverName, regionNames);
1103    }
1104
1105    if (regionNames.isEmpty()) {
1106      // nothing to do if we don't have regions
1107      LOG.trace("no online region found on {}", serverName);
1108      return;
1109    }
1110    if (!isMetaLoaded()) {
1111      // we are still on startup, skip checking
1112      return;
1113    }
1114    // The Heartbeat tells us of what regions are on the region serve, check the state.
1115    checkOnlineRegionsReport(serverNode, regionNames);
1116  }
1117
1118  /**
1119   * Close <code>regionName</code> on <code>sn</code> silently and immediately without
1120   * using a Procedure or going via hbase:meta. For case where a RegionServer's hosting
1121   * of a Region is not aligned w/ the Master's accounting of Region state. This is for
1122   * cleaning up an error in accounting.
1123   */
1124  private void closeRegionSilently(ServerName sn, byte [] regionName) {
1125    try {
1126      RegionInfo ri = MetaTableAccessor.parseRegionInfoFromRegionName(regionName);
1127      // Pass -1 for timeout. Means do not wait.
1128      ServerManager.closeRegionSilentlyAndWait(this.master.getAsyncClusterConnection(), sn, ri, -1);
1129    } catch (Exception e) {
1130      LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e);
1131    }
1132  }
1133
1134  /**
1135   * Check that what the RegionServer reports aligns with the Master's image.
1136   * If disagreement, we will tell the RegionServer to expediently close
1137   * a Region we do not think it should have.
1138   */
1139  private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) {
1140    ServerName serverName = serverNode.getServerName();
1141    for (byte[] regionName : regionNames) {
1142      if (!isRunning()) {
1143        return;
1144      }
1145      RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
1146      if (regionNode == null) {
1147        String regionNameAsStr = Bytes.toStringBinary(regionName);
1148        LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...",
1149            regionNameAsStr, serverName);
1150        closeRegionSilently(serverNode.getServerName(), regionName);
1151        continue;
1152      }
1153      final long lag = 1000;
1154      regionNode.lock();
1155      try {
1156        long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate();
1157        if (regionNode.isInState(State.OPENING, State.OPEN)) {
1158          // This is possible as a region server has just closed a region but the region server
1159          // report is generated before the closing, but arrive after the closing. Make sure there
1160          // is some elapsed time so less false alarms.
1161          if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) {
1162            LOG.warn("Reporting {} server does not match {} (time since last " +
1163                    "update={}ms); closing...",
1164              serverName, regionNode, diff);
1165            closeRegionSilently(serverNode.getServerName(), regionName);
1166          }
1167        } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
1168          // So, we can get report that a region is CLOSED or SPLIT because a heartbeat
1169          // came in at about same time as a region transition. Make sure there is some
1170          // elapsed time so less false alarms.
1171          if (diff > lag) {
1172            LOG.warn("Reporting {} state does not match {} (time since last update={}ms)",
1173              serverName, regionNode, diff);
1174          }
1175        }
1176      } finally {
1177        regionNode.unlock();
1178      }
1179    }
1180  }
1181
1182  // ============================================================================================
1183  //  RIT chore
1184  // ============================================================================================
1185  private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> {
1186    public RegionInTransitionChore(final int timeoutMsec) {
1187      super(timeoutMsec);
1188    }
1189
1190    @Override
1191    protected void periodicExecute(final MasterProcedureEnv env) {
1192      final AssignmentManager am = env.getAssignmentManager();
1193
1194      final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat();
1195      if (ritStat.hasRegionsOverThreshold()) {
1196        for (RegionState hri: ritStat.getRegionOverThreshold()) {
1197          am.handleRegionOverStuckWarningThreshold(hri.getRegion());
1198        }
1199      }
1200
1201      // update metrics
1202      am.updateRegionsInTransitionMetrics(ritStat);
1203    }
1204  }
1205
1206  private static class DeadServerMetricRegionChore
1207      extends ProcedureInMemoryChore<MasterProcedureEnv> {
1208    public DeadServerMetricRegionChore(final int timeoutMsec) {
1209      super(timeoutMsec);
1210    }
1211
1212    @Override
1213    protected void periodicExecute(final MasterProcedureEnv env) {
1214      final ServerManager sm = env.getMasterServices().getServerManager();
1215      final AssignmentManager am = env.getAssignmentManager();
1216      // To minimize inconsistencies we are not going to snapshot live servers in advance in case
1217      // new servers are added; OTOH we don't want to add heavy sync for a consistent view since
1218      // this is for metrics. Instead, we're going to check each regions as we go; to avoid making
1219      // too many checks, we maintain a local lists of server, limiting us to false negatives. If
1220      // we miss some recently-dead server, we'll just see it next time.
1221      Set<ServerName> recentlyLiveServers = new HashSet<>();
1222      int deadRegions = 0, unknownRegions = 0;
1223      for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) {
1224        if (rsn.getState() != State.OPEN) {
1225          continue; // Opportunistic check, should quickly skip RITs, offline tables, etc.
1226        }
1227        // Do not need to acquire region state lock as this is only for showing metrics.
1228        ServerName sn = rsn.getRegionLocation();
1229        State state = rsn.getState();
1230        if (state != State.OPEN) {
1231          continue; // Mostly skipping RITs that are already being take care of.
1232        }
1233        if (sn == null) {
1234          ++unknownRegions; // Opened on null?
1235          continue;
1236        }
1237        if (recentlyLiveServers.contains(sn)) {
1238          continue;
1239        }
1240        ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn);
1241        switch (sls) {
1242          case LIVE:
1243            recentlyLiveServers.add(sn);
1244            break;
1245          case DEAD:
1246            ++deadRegions;
1247            break;
1248          case UNKNOWN:
1249            ++unknownRegions;
1250            break;
1251          default: throw new AssertionError("Unexpected " + sls);
1252        }
1253      }
1254      if (deadRegions > 0 || unknownRegions > 0) {
1255        LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers",
1256          deadRegions, unknownRegions);
1257      }
1258
1259      am.updateDeadServerRegionMetrics(deadRegions, unknownRegions);
1260    }
1261  }
1262
1263  public RegionInTransitionStat computeRegionInTransitionStat() {
1264    final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration());
1265    rit.update(this);
1266    return rit;
1267  }
1268
1269  public static class RegionInTransitionStat {
1270    private final int ritThreshold;
1271
1272    private HashMap<String, RegionState> ritsOverThreshold = null;
1273    private long statTimestamp;
1274    private long oldestRITTime = 0;
1275    private int totalRITsTwiceThreshold = 0;
1276    private int totalRITs = 0;
1277
1278    @VisibleForTesting
1279    public RegionInTransitionStat(final Configuration conf) {
1280      this.ritThreshold =
1281        conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD);
1282    }
1283
1284    public int getRITThreshold() {
1285      return ritThreshold;
1286    }
1287
1288    public long getTimestamp() {
1289      return statTimestamp;
1290    }
1291
1292    public int getTotalRITs() {
1293      return totalRITs;
1294    }
1295
1296    public long getOldestRITTime() {
1297      return oldestRITTime;
1298    }
1299
1300    public int getTotalRITsOverThreshold() {
1301      Map<String, RegionState> m = this.ritsOverThreshold;
1302      return m != null ? m.size() : 0;
1303    }
1304
1305    public boolean hasRegionsTwiceOverThreshold() {
1306      return totalRITsTwiceThreshold > 0;
1307    }
1308
1309    public boolean hasRegionsOverThreshold() {
1310      Map<String, RegionState> m = this.ritsOverThreshold;
1311      return m != null && !m.isEmpty();
1312    }
1313
1314    public Collection<RegionState> getRegionOverThreshold() {
1315      Map<String, RegionState> m = this.ritsOverThreshold;
1316      return m != null? m.values(): Collections.emptySet();
1317    }
1318
1319    public boolean isRegionOverThreshold(final RegionInfo regionInfo) {
1320      Map<String, RegionState> m = this.ritsOverThreshold;
1321      return m != null && m.containsKey(regionInfo.getEncodedName());
1322    }
1323
1324    public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) {
1325      Map<String, RegionState> m = this.ritsOverThreshold;
1326      if (m == null) return false;
1327      final RegionState state = m.get(regionInfo.getEncodedName());
1328      if (state == null) return false;
1329      return (statTimestamp - state.getStamp()) > (ritThreshold * 2);
1330    }
1331
1332    protected void update(final AssignmentManager am) {
1333      final RegionStates regionStates = am.getRegionStates();
1334      this.statTimestamp = EnvironmentEdgeManager.currentTime();
1335      update(regionStates.getRegionsStateInTransition(), statTimestamp);
1336      update(regionStates.getRegionFailedOpen(), statTimestamp);
1337    }
1338
1339    private void update(final Collection<RegionState> regions, final long currentTime) {
1340      for (RegionState state: regions) {
1341        totalRITs++;
1342        final long ritStartedMs = state.getStamp();
1343        if (ritStartedMs == 0) {
1344          // Don't output bogus values to metrics if they accidentally make it here.
1345          LOG.warn("The RIT {} has no start time", state.getRegion());
1346          continue;
1347        }
1348        final long ritTime = currentTime - ritStartedMs;
1349        if (ritTime > ritThreshold) {
1350          if (ritsOverThreshold == null) {
1351            ritsOverThreshold = new HashMap<String, RegionState>();
1352          }
1353          ritsOverThreshold.put(state.getRegion().getEncodedName(), state);
1354          totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0;
1355        }
1356        if (oldestRITTime < ritTime) {
1357          oldestRITTime = ritTime;
1358        }
1359      }
1360    }
1361  }
1362
1363  private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) {
1364    metrics.updateRITOldestAge(ritStat.getOldestRITTime());
1365    metrics.updateRITCount(ritStat.getTotalRITs());
1366    metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold());
1367  }
1368
1369  private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) {
1370    metrics.updateDeadServerOpenRegions(deadRegions);
1371    metrics.updateUnknownServerOpenRegions(unknownRegions);
1372  }
1373
1374  private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
1375    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1376    //if (regionNode.isStuck()) {
1377    LOG.warn("STUCK Region-In-Transition {}", regionNode);
1378  }
1379
1380  // ============================================================================================
1381  //  TODO: Master load/bootstrap
1382  // ============================================================================================
1383  public void joinCluster() throws IOException {
1384    long startTime = System.nanoTime();
1385    LOG.debug("Joining cluster...");
1386
1387    // Scan hbase:meta to build list of existing regions, servers, and assignment.
1388    // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress
1389    // w/o  meta.
1390    loadMeta();
1391
1392    while (master.getServerManager().countOfRegionServers() < 1) {
1393      LOG.info("Waiting for RegionServers to join; current count={}",
1394        master.getServerManager().countOfRegionServers());
1395      Threads.sleep(250);
1396    }
1397    LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers());
1398
1399    // Start the chores
1400    master.getMasterProcedureExecutor().addChore(this.ritChore);
1401    master.getMasterProcedureExecutor().addChore(this.deadMetricChore);
1402
1403    long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
1404    LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs));
1405  }
1406
1407  /**
1408   * Create assign procedure for offline regions.
1409   * Just follow the old processofflineServersWithOnlineRegions method. Since now we do not need to
1410   * deal with dead server any more, we only deal with the regions in OFFLINE state in this method.
1411   * And this is a bit strange, that for new regions, we will add it in CLOSED state instead of
1412   * OFFLINE state, and usually there will be a procedure to track them. The
1413   * processofflineServersWithOnlineRegions is a legacy from long ago, as things are going really
1414   * different now, maybe we do not need this method any more. Need to revisit later.
1415   */
1416  // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online.
1417  // Needs to be done after the table state manager has been started.
1418  public void processOfflineRegions() {
1419    List<RegionInfo> offlineRegions = regionStates.getRegionStates().stream()
1420      .filter(RegionState::isOffline).filter(s -> isTableEnabled(s.getRegion().getTable()))
1421      .map(RegionState::getRegion).collect(Collectors.toList());
1422    if (!offlineRegions.isEmpty()) {
1423      master.getMasterProcedureExecutor().submitProcedures(
1424        master.getAssignmentManager().createRoundRobinAssignProcedures(offlineRegions));
1425    }
1426  }
1427
1428  /* AM internal RegionStateStore.RegionStateVisitor implementation. To be used when
1429   * scanning META table for region rows, using RegionStateStore utility methods. RegionStateStore
1430   * methods will convert Result into proper RegionInfo instances, but those would still need to be
1431   * added into AssignmentManager.regionStates in-memory cache.
1432   * RegionMetaLoadingVisitor.visitRegionState method provides the logic for adding RegionInfo
1433   * instances as loaded from latest META scan into AssignmentManager.regionStates.
1434   */
1435  private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor  {
1436
1437    @Override
1438    public void visitRegionState(Result result, final RegionInfo regionInfo, final State state,
1439      final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
1440      if (state == null && regionLocation == null && lastHost == null &&
1441        openSeqNum == SequenceId.NO_SEQUENCE_ID) {
1442        // This is a row with nothing in it.
1443        LOG.warn("Skipping empty row={}", result);
1444        return;
1445      }
1446      State localState = state;
1447      if (localState == null) {
1448        // No region state column data in hbase:meta table! Are I doing a rolling upgrade from
1449        // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta?
1450        // In any of these cases, state is empty. For now, presume OFFLINE but there are probably
1451        // cases where we need to probe more to be sure this correct; TODO informed by experience.
1452        LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);
1453        localState = State.OFFLINE;
1454      }
1455      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
1456      // Do not need to lock on regionNode, as we can make sure that before we finish loading
1457      // meta, all the related procedures can not be executed. The only exception is for meta
1458      // region related operations, but here we do not load the informations for meta region.
1459      regionNode.setState(localState);
1460      regionNode.setLastHost(lastHost);
1461      regionNode.setRegionLocation(regionLocation);
1462      regionNode.setOpenSeqNum(openSeqNum);
1463
1464      // Note: keep consistent with other methods, see region(Opening|Opened|Closing)
1465      //       RIT/ServerCrash handling should take care of the transiting regions.
1466      if (localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING,
1467        State.MERGING)) {
1468        assert regionLocation != null : "found null region location for " + regionNode;
1469        regionStates.addRegionToServer(regionNode);
1470      } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
1471        regionStates.addToOfflineRegions(regionNode);
1472      }
1473      if (regionNode.getProcedure() != null) {
1474        regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode);
1475      }
1476    }
1477  };
1478
1479  /**
1480   * Query META if the given <code>RegionInfo</code> exists, adding to
1481   * <code>AssignmentManager.regionStateStore</code> cache if the region is found in META.
1482   * @param regionEncodedName encoded name for the region to be loaded from META into
1483   *                          <code>AssignmentManager.regionStateStore</code> cache
1484   * @return <code>RegionInfo</code> instance for the given region if it is present in META
1485   *          and got successfully loaded into <code>AssignmentManager.regionStateStore</code>
1486   *          cache, <b>null</b> otherwise.
1487   * @throws UnknownRegionException if any errors occur while querying meta.
1488   */
1489  public RegionInfo loadRegionFromMeta(String regionEncodedName) throws UnknownRegionException {
1490    try {
1491      RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor();
1492      regionStateStore.visitMetaForRegion(regionEncodedName, visitor);
1493      return regionStates.getRegionState(regionEncodedName) == null ? null :
1494        regionStates.getRegionState(regionEncodedName).getRegion();
1495    } catch(IOException e) {
1496      LOG.error("Error trying to load region {} from META", regionEncodedName, e);
1497      throw new UnknownRegionException("Error while trying load region from meta");
1498    }
1499  }
1500
1501  private void loadMeta() throws IOException {
1502    // TODO: use a thread pool
1503    regionStateStore.visitMeta(new RegionMetaLoadingVisitor());
1504    // every assignment is blocked until meta is loaded.
1505    wakeMetaLoadedEvent();
1506  }
1507
1508  /**
1509   * Used to check if the meta loading is done.
1510   * <p/>
1511   * if not we throw PleaseHoldException since we are rebuilding the RegionStates
1512   * @param hri region to check if it is already rebuild
1513   * @throws PleaseHoldException if meta has not been loaded yet
1514   */
1515  private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException {
1516    if (!isRunning()) {
1517      throw new PleaseHoldException("AssignmentManager not running");
1518    }
1519    boolean meta = isMetaRegion(hri);
1520    boolean metaLoaded = isMetaLoaded();
1521    if (!meta && !metaLoaded) {
1522      throw new PleaseHoldException(
1523        "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded);
1524    }
1525  }
1526
1527  // ============================================================================================
1528  //  TODO: Metrics
1529  // ============================================================================================
  /**
   * Placeholder that always returns 0.
   * @return 0
   */
  public int getNumRegionsOpened() {
    // TODO: Used by TestRegionPlacement.java, which assumes a monotonically increasing value
    return 0;
  }
1534
  /**
   * Usually run by the Master in reaction to server crash during normal processing.
   * Can also be invoked via external RPC to effect repair; in the latter case,
   * the 'force' flag is set so we push through the SCP though context may indicate
   * already-running-SCP (An old SCP may have exited abnormally, or damaged cluster
   * may still have references in hbase:meta to 'Unknown Servers' -- servers that
   * are not online or in dead servers list, etc.)
   * @param serverName the server to schedule a crash procedure for
   * @param shouldSplitWal passed through to the scheduled crash procedure
   * @param force Set if the request came in externally over RPC (via hbck2). Force means
   *              run the SCP even if it seems as though there might be an outstanding
   *              SCP running.
   * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled.
   */
  public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) {
    // May be an 'Unknown Server' so handle case where serverNode is null.
    ServerStateNode serverNode = regionStates.getServerNode(serverName);
    // Remove the in-memory rsReports result
    synchronized (rsReports) {
      rsReports.remove(serverName);
    }

    // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the
    // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
    // this server. This is used to simplify the implementation for TRSP and SCP, where we can make
    // sure that, the region list fetched by SCP will not be changed any more.
    if (serverNode != null) {
      serverNode.writeLock().lock();
    }
    boolean carryingMeta;
    long pid;
    try {
      ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
      carryingMeta = isCarryingMeta(serverName);
      // Without 'force', a known server that is no longer ONLINE is skipped: an SCP is presumed
      // to be handling it already.
      if (!force && serverNode != null && !serverNode.isInState(ServerState.ONLINE)) {
        LOG.info("Skip adding SCP for {} (meta={}) -- running?", serverNode, carryingMeta);
        return Procedure.NO_PROC_ID;
      } else {
        MasterProcedureEnv mpe = procExec.getEnvironment();
        // serverNode == null means 'Unknown Server'. An SCP that is scheduled when the server is
        // 'Unknown' probably originated externally with the HBCK2 fix-it tool.
        // HBCKSCP scours Master in-memory state AND hbase:meta for references to
        // serverName just-in-case. Note the choice of HBCKSCP below is driven by 'force',
        // not by serverNode being null.
        ServerState oldState = null;
        if (serverNode != null) {
          oldState = serverNode.getState();
          serverNode.setState(ServerState.CRASHED);
        }

        if (force) {
          pid = procExec.submitProcedure(
              new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
        } else {
          pid = procExec.submitProcedure(
              new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
        }
        LOG.info("Scheduled SCP pid={} for {} (carryingMeta={}){}.", pid, serverName, carryingMeta,
            serverNode == null? "": " " + serverNode.toString() + ", oldState=" + oldState);
      }
    } finally {
      // Release the fencing lock on every exit path, including the early return above.
      if (serverNode != null) {
        serverNode.writeLock().unlock();
      }
    }
    return pid;
  }
1599
1600  public void offlineRegion(final RegionInfo regionInfo) {
1601    // TODO used by MasterRpcServices
1602    RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
1603    if (node != null) {
1604      node.offline();
1605    }
1606  }
1607
  /**
   * Currently a no-op: the body is empty and both parameters are ignored.
   * @param regionInfo unused
   * @param serverName unused
   */
  public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) {
    // TODO used by TestSplitTransactionOnCluster.java
  }
1611
  /**
   * Delegates to {@code regionStates.getSnapShotOfAssignment(regions)}.
   * @param regions regions to look up
   * @return the map of server to regions produced by {@code regionStates}
   */
  public Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment(
      final Collection<RegionInfo> regions) {
    return regionStates.getSnapShotOfAssignment(regions);
  }
1616
1617  // ============================================================================================
1618  //  TODO: UTILS/HELPERS?
1619  // ============================================================================================
1620  /**
1621   * Used by the client (via master) to identify if all regions have the schema updates
1622   *
1623   * @param tableName
1624   * @return Pair indicating the status of the alter command (pending/total)
1625   * @throws IOException
1626   */
1627  public Pair<Integer, Integer> getReopenStatus(TableName tableName) {
1628    if (isTableDisabled(tableName)) return new Pair<Integer, Integer>(0, 0);
1629
1630    final List<RegionState> states = regionStates.getTableRegionStates(tableName);
1631    int ritCount = 0;
1632    for (RegionState regionState: states) {
1633      if (!regionState.isOpened() && !regionState.isSplit()) {
1634        ritCount++;
1635      }
1636    }
1637    return new Pair<Integer, Integer>(ritCount, states.size());
1638  }
1639
1640  // ============================================================================================
1641  //  TODO: Region State In Transition
1642  // ============================================================================================
  /**
   * Delegates to {@code regionStates.hasRegionsInTransition()}.
   * @return whatever {@code regionStates} reports
   */
  public boolean hasRegionsInTransition() {
    return regionStates.hasRegionsInTransition();
  }
1646
  /**
   * Delegates to {@code regionStates.getRegionsInTransition()}.
   * @return the list of region state nodes returned by {@code regionStates}
   */
  public List<RegionStateNode> getRegionsInTransition() {
    return regionStates.getRegionsInTransition();
  }
1650
  /**
   * Delegates to {@code regionStates.getAssignedRegions()}.
   * @return the list of regions returned by {@code regionStates}
   */
  public List<RegionInfo> getAssignedRegions() {
    return regionStates.getAssignedRegions();
  }
1654
1655  public RegionInfo getRegionInfo(final byte[] regionName) {
1656    final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName);
1657    return regionState != null ? regionState.getRegionInfo() : null;
1658  }
1659
  // ============================================================================================
  //  Expected states on region state transition.
  //  Notice that there are no expected states for transiting to OPENING state; this is because
  //  of SCP. See the comments in regionOpening method for more details.
  // ============================================================================================
  // States a region may be in when it transitions to OPEN.
  private static final State[] STATES_EXPECTED_ON_OPEN = {
    State.OPENING, // Normal case
    State.OPEN // Retrying
  };

  // States a region may be in when it transitions to CLOSING.
  private static final State[] STATES_EXPECTED_ON_CLOSING = {
    State.OPEN, // Normal case
    State.CLOSING, // Retrying
    State.SPLITTING, // Offline the split parent
    State.MERGING // Offline the merge parents
  };

  // States a region may be in when it transitions to CLOSED.
  private static final State[] STATES_EXPECTED_ON_CLOSED = {
    State.CLOSING, // Normal case
    State.CLOSED // Retrying
  };

  // This is for manually scheduled region assign, can add other states later if we find out other
  // usages
  private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE };

  // We only allow unassign or move a region which is in OPEN state.
  private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN };
1688
1689  // ============================================================================================
1690  // Region Status update
1691  // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking
1692  // and pre-assumptions are very tricky.
1693  // ============================================================================================
1694  private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState,
1695      RegionState.State... expectedStates) throws IOException {
1696    RegionState.State state = regionNode.getState();
1697    regionNode.transitionState(newState, expectedStates);
1698    boolean succ = false;
1699    try {
1700      regionStateStore.updateRegionLocation(regionNode);
1701      succ = true;
1702    } finally {
1703      if (!succ) {
1704        // revert
1705        regionNode.setState(state);
1706      }
1707    }
1708  }
1709
  // Should be called within the synchronized block of RegionStateNode.
  // Transitions the node to OPENING (persisting it via transitStateAndUpdate), registers the
  // region with its server in regionStates, and bumps the operation counter metric.
  void regionOpening(RegionStateNode regionNode) throws IOException {
    // As in SCP, for performance reason, there is no TRSP attached with this region, we will not
    // update the region state, which means that the region could be in any state when we want to
    // assign it after a RS crash. So here we do not pass the expectedStates parameter.
    transitStateAndUpdate(regionNode, State.OPENING);
    regionStates.addRegionToServer(regionNode);
    // update the operation count metrics
    metrics.incrementOperationCounter();
  }
1720
1721  // should be called under the RegionStateNode lock
1722  // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
1723  // we will persist the FAILED_OPEN state into hbase:meta.
1724  void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException {
1725    RegionState.State state = regionNode.getState();
1726    ServerName regionLocation = regionNode.getRegionLocation();
1727    if (giveUp) {
1728      regionNode.setState(State.FAILED_OPEN);
1729      regionNode.setRegionLocation(null);
1730      boolean succ = false;
1731      try {
1732        regionStateStore.updateRegionLocation(regionNode);
1733        succ = true;
1734      } finally {
1735        if (!succ) {
1736          // revert
1737          regionNode.setState(state);
1738          regionNode.setRegionLocation(regionLocation);
1739        }
1740      }
1741    }
1742    if (regionLocation != null) {
1743      regionStates.removeRegionFromServer(regionLocation, regionNode);
1744    }
1745  }
1746
1747  // should be called under the RegionStateNode lock
1748  void regionClosing(RegionStateNode regionNode) throws IOException {
1749    transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING);
1750
1751    RegionInfo hri = regionNode.getRegionInfo();
1752    // Set meta has not initialized early. so people trying to create/edit tables will wait
1753    if (isMetaRegion(hri)) {
1754      setMetaAssigned(hri, false);
1755    }
1756    regionStates.addRegionToServer(regionNode);
1757    // update the operation count metrics
1758    metrics.incrementOperationCounter();
1759  }
1760
  // Open and close results are first persisted to the procedure store by
  // RegionRemoteProcedureBase. So here we first change the in-memory state, since the operation
  // is considered successful once it has been persisted to the procedure store; then, when the
  // RegionRemoteProcedureBase is woken up, we persist the RegionStateNode to hbase:meta.
1765
1766  // should be called under the RegionStateNode lock
1767  void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
1768    regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
1769    RegionInfo regionInfo = regionNode.getRegionInfo();
1770    regionStates.addRegionToServer(regionNode);
1771    regionStates.removeFromFailedOpen(regionInfo);
1772  }
1773
1774  // should be called under the RegionStateNode lock
1775  void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
1776    ServerName regionLocation = regionNode.getRegionLocation();
1777    regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
1778    regionNode.setRegionLocation(null);
1779    if (regionLocation != null) {
1780      regionNode.setLastHost(regionLocation);
1781      regionStates.removeRegionFromServer(regionLocation, regionNode);
1782    }
1783  }
1784
1785  // should be called under the RegionStateNode lock
1786  // for SCP
1787  public void regionClosedAbnormally(RegionStateNode regionNode) throws IOException {
1788    RegionState.State state = regionNode.getState();
1789    ServerName regionLocation = regionNode.getRegionLocation();
1790    regionNode.transitionState(State.ABNORMALLY_CLOSED);
1791    regionNode.setRegionLocation(null);
1792    boolean succ = false;
1793    try {
1794      regionStateStore.updateRegionLocation(regionNode);
1795      succ = true;
1796    } finally {
1797      if (!succ) {
1798        // revert
1799        regionNode.setState(state);
1800        regionNode.setRegionLocation(regionLocation);
1801      }
1802    }
1803    if (regionLocation != null) {
1804      regionNode.setLastHost(regionLocation);
1805      regionStates.removeRegionFromServer(regionLocation, regionNode);
1806    }
1807  }
1808
1809  void persistToMeta(RegionStateNode regionNode) throws IOException {
1810    regionStateStore.updateRegionLocation(regionNode);
1811    RegionInfo regionInfo = regionNode.getRegionInfo();
1812    if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
1813      // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
1814      // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
1815      // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
1816      // on table that contains state.
1817      setMetaAssigned(regionInfo, true);
1818    }
1819  }
1820
1821  // ============================================================================================
1822  // The above methods can only be called in TransitRegionStateProcedure(and related procedures)
1823  // ============================================================================================
1824
1825  public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
1826      final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
1827    // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
1828    // The parent stays in regionStates until cleared when removed by CatalogJanitor.
1829    // Update its state in regionStates to it shows as offline and split when read
1830    // later figuring what regions are in a table and what are not: see
1831    // regionStates#getRegionsOfTable
1832    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent);
1833    node.setState(State.SPLIT);
1834    final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA);
1835    nodeA.setState(State.SPLITTING_NEW);
1836    final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB);
1837    nodeB.setState(State.SPLITTING_NEW);
1838
1839    regionStateStore.splitRegion(parent, daughterA, daughterB, serverName);
1840    if (shouldAssignFavoredNodes(parent)) {
1841      List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList();
1842      getFavoredNodePromoter().generateFavoredNodesForDaughter(onlineServers, parent, daughterA,
1843        daughterB);
1844    }
1845  }
1846
1847  /**
1848   * When called here, the merge has happened. The merged regions have been
1849   * unassigned and the above markRegionClosed has been called on each so they have been
1850   * disassociated from a hosting Server. The merged region will be open after this call. The
1851   * merged regions are removed from hbase:meta below. Later they are deleted from the filesystem
1852   * by the catalog janitor running against hbase:meta. It notices when the merged region no
1853   * longer holds references to the old regions (References are deleted after a compaction
1854   * rewrites what the Reference points at but not until the archiver chore runs, are the
1855   * References removed).
1856   */
1857  public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
1858        RegionInfo [] mergeParents)
1859      throws IOException {
1860    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
1861    node.setState(State.MERGED);
1862    for (RegionInfo ri: mergeParents) {
1863      regionStates.deleteRegion(ri);
1864
1865    }
1866    regionStateStore.mergeRegions(child, mergeParents, serverName);
1867    if (shouldAssignFavoredNodes(child)) {
1868      getFavoredNodePromoter().generateFavoredNodesForMergedRegion(child, mergeParents);
1869    }
1870  }
1871
1872  /*
1873   * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
1874   * belongs to a non-system table.
1875   */
1876  private boolean shouldAssignFavoredNodes(RegionInfo region) {
1877    return this.shouldAssignRegionsWithFavoredNodes &&
1878        FavoredNodesManager.isFavoredNodeApplicable(region);
1879  }
1880
1881  // ============================================================================================
1882  //  Assign Queue (Assign/Balance)
1883  // ============================================================================================
  // Region nodes waiting to be handed to the balancer. queueAssign adds to it and
  // waitOnAssignQueue drains it, both while holding assignQueueLock.
  private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>();
  // Lock taken around queue additions, draining and signalling.
  private final ReentrantLock assignQueueLock = new ReentrantLock();
  // Signalled by queueAssign and assignQueueSignal; awaited by waitOnAssignQueue.
  private final Condition assignQueueFullCond = assignQueueLock.newCondition();
1887
1888  /**
1889   * Add the assign operation to the assignment queue.
1890   * The pending assignment operation will be processed,
1891   * and each region will be assigned by a server using the balancer.
1892   */
1893  protected void queueAssign(final RegionStateNode regionNode) {
1894    regionNode.getProcedureEvent().suspend();
1895
1896    // TODO: quick-start for meta and the other sys-tables?
1897    assignQueueLock.lock();
1898    try {
1899      pendingAssignQueue.add(regionNode);
1900      if (regionNode.isSystemTable() ||
1901          pendingAssignQueue.size() == 1 ||
1902          pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize) {
1903        assignQueueFullCond.signal();
1904      }
1905    } finally {
1906      assignQueueLock.unlock();
1907    }
1908  }
1909
1910  private void startAssignmentThread() {
1911    // Get Server Thread name. Sometimes the Server is mocked so may not implement HasThread.
1912    // For example, in tests.
1913    String name = master instanceof HasThread? ((HasThread)master).getName():
1914        master.getServerName().toShortString();
1915    assignThread = new Thread(name) {
1916      @Override
1917      public void run() {
1918        while (isRunning()) {
1919          processAssignQueue();
1920        }
1921        pendingAssignQueue.clear();
1922      }
1923    };
1924    assignThread.setDaemon(true);
1925    assignThread.start();
1926  }
1927
  /**
   * Signals the assign queue condition and then waits for the assign thread to die, re-signalling
   * and joining in 250ms slices while the thread is still alive. If the wait is interrupted, a
   * warning is logged, the interrupt flag is restored and the method returns.
   */
  // NOTE(review): assumes assignThread has been set by startAssignmentThread -- confirm callers.
  private void stopAssignmentThread() {
    assignQueueSignal();
    try {
      while (assignThread.isAlive()) {
        assignQueueSignal();
        assignThread.join(250);
      }
    } catch (InterruptedException e) {
      LOG.warn("join interrupted", e);
      Thread.currentThread().interrupt();
    }
  }
1940
  /**
   * Signals {@code assignQueueFullCond} while holding {@code assignQueueLock}.
   */
  private void assignQueueSignal() {
    assignQueueLock.lock();
    try {
      assignQueueFullCond.signal();
    } finally {
      assignQueueLock.unlock();
    }
  }
1949
  /**
   * Waits for queued assignments and drains them into a map keyed by RegionInfo.
   * <p>
   * Under {@code assignQueueLock}: if the queue is empty and the manager is running, awaits the
   * queue condition once (not in a loop, hence the suppressed findbugs warning). If the manager
   * is no longer running, returns null. Otherwise awaits the condition again for up to
   * {@code assignDispatchWaitMillis} milliseconds, then copies every queued node into the result
   * map and clears the queue.
   * @return the drained nodes keyed by their RegionInfo (possibly empty); null if the manager is
   *         not running or the wait was interrupted
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
  private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() {
    HashMap<RegionInfo, RegionStateNode> regions = null;

    assignQueueLock.lock();
    try {
      if (pendingAssignQueue.isEmpty() && isRunning()) {
        assignQueueFullCond.await();
      }

      if (!isRunning()) return null;
      // Timed wait before draining -- presumably to let more assignments accumulate into one
      // batch; it ends early if the condition is signalled.
      assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS);
      regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size());
      for (RegionStateNode regionNode: pendingAssignQueue) {
        regions.put(regionNode.getRegionInfo(), regionNode);
      }
      pendingAssignQueue.clear();
    } catch (InterruptedException e) {
      // Both awaits happen before the map is created, so 'regions' is still null here.
      LOG.warn("got interrupted ", e);
      Thread.currentThread().interrupt();
    } finally {
      assignQueueLock.unlock();
    }
    return regions;
  }
1975
  /**
   * Runs one round of assignment: drains the pending queue via waitOnAssignQueue, sorts the
   * drained regions into a retain map (regions whose node already has a location) and two
   * round-robin lists (system-table and user regions without a location), waits until the
   * ServerManager offers at least one destination server, and then hands the regions to
   * processAssignmentPlans.
   * <p>
   * Returns without doing anything when nothing was drained or the manager is not running, and
   * drops the drained regions if the manager stops while waiting for servers.
   */
  private void processAssignQueue() {
    final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue();
    if (regions == null || regions.size() == 0 || !isRunning()) {
      return;
    }

    if (LOG.isTraceEnabled()) {
      LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size());
    }

    // TODO: Optimize balancer. pass a RegionPlan?
    final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>();
    final List<RegionInfo> userHRIs = new ArrayList<>(regions.size());
    // Regions for system tables requiring reassignment
    final List<RegionInfo> systemHRIs = new ArrayList<>();
    for (RegionStateNode regionStateNode: regions.values()) {
      boolean sysTable = regionStateNode.isSystemTable();
      final List<RegionInfo> hris = sysTable? systemHRIs: userHRIs;
      // A node that already has a location goes to the retain map regardless of table type;
      // only nodes without a location land in the system/user lists.
      if (regionStateNode.getRegionLocation() != null) {
        retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
      } else {
        hris.add(regionStateNode.getRegionInfo());
      }
    }

    // TODO: connect with the listener to invalidate the cache

    // TODO use events
    // Poll every 250ms until at least one destination server is available.
    List<ServerName> servers = master.getServerManager().createDestinationServersList();
    for (int i = 0; servers.size() < 1; ++i) {
      // Report every fourth time around this loop; try not to flood log.
      if (i % 4 == 0) {
        LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions.");
      }

      if (!isRunning()) {
        LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions.");
        return;
      }
      Threads.sleep(250);
      servers = master.getServerManager().createDestinationServersList();
    }

    if (!systemHRIs.isEmpty()) {
      // System table regions requiring reassignment are present, get region servers
      // not available for system table regions
      final List<ServerName> excludeServers = getExcludedServersForSystemTable();
      List<ServerName> serversForSysTables = servers.stream()
          .filter(s -> !excludeServers.contains(s)).collect(Collectors.toList());
      if (serversForSysTables.isEmpty()) {
        LOG.warn("Filtering old server versions and the excluded produced an empty set; " +
            "instead considering all candidate servers!");
      }
      LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() +
          ", allServersCount=" + servers.size());
      // Fall back to the full server list only when filtering left nothing and none of the
      // system regions carries the bogus server name as its location.
      processAssignmentPlans(regions, null, systemHRIs,
          serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs) ?
              servers: serversForSysTables);
    }

    processAssignmentPlans(regions, retainMap, userHRIs, servers);
  }
2038
2039  private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions,
2040      List<RegionInfo> hirs) {
2041    for (RegionInfo ri : hirs) {
2042      if (regions.get(ri).getRegionLocation() != null &&
2043          regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME)){
2044        return true;
2045      }
2046    }
2047    return false;
2048  }
2049
2050  private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions,
2051      final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris,
2052      final List<ServerName> servers) {
2053    boolean isTraceEnabled = LOG.isTraceEnabled();
2054    if (isTraceEnabled) {
2055      LOG.trace("Available servers count=" + servers.size() + ": " + servers);
2056    }
2057
2058    final LoadBalancer balancer = getBalancer();
2059    // ask the balancer where to place regions
2060    if (retainMap != null && !retainMap.isEmpty()) {
2061      if (isTraceEnabled) {
2062        LOG.trace("retain assign regions=" + retainMap);
2063      }
2064      try {
2065        acceptPlan(regions, balancer.retainAssignment(retainMap, servers));
2066      } catch (IOException e) {
2067        LOG.warn("unable to retain assignment", e);
2068        addToPendingAssignment(regions, retainMap.keySet());
2069      }
2070    }
2071
2072    // TODO: Do we need to split retain and round-robin?
2073    // the retain seems to fallback to round-robin/random if the region is not in the map.
2074    if (!hris.isEmpty()) {
2075      Collections.sort(hris, RegionInfo.COMPARATOR);
2076      if (isTraceEnabled) {
2077        LOG.trace("round robin regions=" + hris);
2078      }
2079      try {
2080        acceptPlan(regions, balancer.roundRobinAssignment(hris, servers));
2081      } catch (IOException e) {
2082        LOG.warn("unable to round-robin assignment", e);
2083        addToPendingAssignment(regions, hris);
2084      }
2085    }
2086  }
2087
2088  private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions,
2089      final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException {
2090    final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()];
2091    final long st = System.currentTimeMillis();
2092
2093    if (plan == null) {
2094      throw new HBaseIOException("unable to compute plans for regions=" + regions.size());
2095    }
2096
2097    if (plan.isEmpty()) return;
2098
2099    int evcount = 0;
2100    for (Map.Entry<ServerName, List<RegionInfo>> entry: plan.entrySet()) {
2101      final ServerName server = entry.getKey();
2102      for (RegionInfo hri: entry.getValue()) {
2103        final RegionStateNode regionNode = regions.get(hri);
2104        regionNode.setRegionLocation(server);
2105        if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) {
2106          assignQueueLock.lock();
2107          try {
2108            pendingAssignQueue.add(regionNode);
2109          } finally {
2110            assignQueueLock.unlock();
2111          }
2112        }else {
2113          events[evcount++] = regionNode.getProcedureEvent();
2114        }
2115      }
2116    }
2117    ProcedureEvent.wakeEvents(getProcedureScheduler(), events);
2118
2119    final long et = System.currentTimeMillis();
2120    if (LOG.isTraceEnabled()) {
2121      LOG.trace("ASSIGN ACCEPT " + events.length + " -> " +
2122          StringUtils.humanTimeDiff(et - st));
2123    }
2124  }
2125
2126  private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions,
2127      final Collection<RegionInfo> pendingRegions) {
2128    assignQueueLock.lock();
2129    try {
2130      for (RegionInfo hri: pendingRegions) {
2131        pendingAssignQueue.add(regions.get(hri));
2132      }
2133    } finally {
2134      assignQueueLock.unlock();
2135    }
2136  }
2137
2138  /**
2139   * Get a list of servers that this region cannot be assigned to.
2140   * For system tables, we must assign them to a server with highest version.
2141   */
2142  public List<ServerName> getExcludedServersForSystemTable() {
2143    // TODO: This should be a cached list kept by the ServerManager rather than calculated on each
2144    // move or system region assign. The RegionServerTracker keeps list of online Servers with
2145    // RegionServerInfo that includes Version.
2146    List<Pair<ServerName, String>> serverList = master.getServerManager().getOnlineServersList()
2147        .stream()
2148        .map((s)->new Pair<>(s, master.getRegionServerVersion(s)))
2149        .collect(Collectors.toList());
2150    if (serverList.isEmpty()) {
2151      return Collections.emptyList();
2152    }
2153    String highestVersion = Collections.max(serverList,
2154        (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond();
2155    return serverList.stream()
2156        .filter((p)->!p.getSecond().equals(highestVersion))
2157        .map(Pair::getFirst)
2158        .collect(Collectors.toList());
2159  }
2160
2161  @VisibleForTesting
2162  MasterServices getMaster() {
2163    return master;
2164  }
2165
2166  /**
2167   * @return a snapshot of rsReports
2168   */
2169  public Map<ServerName, Set<byte[]>> getRSReports() {
2170    Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>();
2171    synchronized (rsReports) {
2172      rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue()));
2173    }
2174    return rsReportsSnapshot;
2175  }
2176
2177  /**
2178   * Provide regions state count for given table.
2179   * e.g howmany regions of give table are opened/closed/rit etc
2180   *
2181   * @param tableName TableName
2182   * @return region states count
2183   */
2184  public RegionStatesCount getRegionStatesCount(TableName tableName) {
2185    int openRegionsCount = 0;
2186    int closedRegionCount = 0;
2187    int ritCount = 0;
2188    int splitRegionCount = 0;
2189    int totalRegionCount = 0;
2190    if (!isTableDisabled(tableName)) {
2191      final List<RegionState> states = regionStates.getTableRegionStates(tableName);
2192      for (RegionState regionState : states) {
2193        if (regionState.isOpened()) {
2194          openRegionsCount++;
2195        } else if (regionState.isClosed()) {
2196          closedRegionCount++;
2197        } else if (regionState.isSplit()) {
2198          splitRegionCount++;
2199        }
2200      }
2201      totalRegionCount = states.size();
2202      ritCount = totalRegionCount - openRegionsCount - splitRegionCount;
2203    }
2204    return new RegionStatesCount.RegionStatesCountBuilder()
2205      .setOpenRegions(openRegionsCount)
2206      .setClosedRegions(closedRegionCount)
2207      .setSplitRegions(splitRegionCount)
2208      .setRegionsInTransition(ritCount)
2209      .setTotalRegions(totalRegionCount)
2210      .build();
2211  }
2212
2213}