001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.Future;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.locks.Condition;
033import java.util.concurrent.locks.ReentrantLock;
034import java.util.stream.Collectors;
035
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.DoNotRetryIOException;
038import org.apache.hadoop.hbase.HBaseIOException;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.MetaTableAccessor;
041import org.apache.hadoop.hbase.PleaseHoldException;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.UnknownRegionException;
045import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
046import org.apache.hadoop.hbase.client.MasterSwitchType;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.RegionInfoBuilder;
049import org.apache.hadoop.hbase.client.RegionStatesCount;
050import org.apache.hadoop.hbase.client.Result;
051import org.apache.hadoop.hbase.client.TableState;
052import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
053import org.apache.hadoop.hbase.favored.FavoredNodesManager;
054import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
055import org.apache.hadoop.hbase.master.LoadBalancer;
056import org.apache.hadoop.hbase.master.MasterServices;
057import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
058import org.apache.hadoop.hbase.master.RegionPlan;
059import org.apache.hadoop.hbase.master.RegionState;
060import org.apache.hadoop.hbase.master.RegionState.State;
061import org.apache.hadoop.hbase.master.ServerManager;
062import org.apache.hadoop.hbase.master.TableStateManager;
063import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
064import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
065import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
066import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
067import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
068import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
069import org.apache.hadoop.hbase.procedure2.Procedure;
070import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
071import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
072import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
073import org.apache.hadoop.hbase.procedure2.util.StringUtils;
074import org.apache.hadoop.hbase.regionserver.SequenceId;
075import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
076import org.apache.hadoop.hbase.util.Bytes;
077import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
078import org.apache.hadoop.hbase.util.Pair;
079import org.apache.hadoop.hbase.util.Threads;
080import org.apache.hadoop.hbase.util.VersionInfo;
081import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
082import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
083import org.apache.yetus.audience.InterfaceAudience;
084import org.apache.zookeeper.KeeperException;
085import org.slf4j.Logger;
086import org.slf4j.LoggerFactory;
087
088import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
089
090import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
095
096/**
097 * The AssignmentManager is the coordinator for region assign/unassign operations.
098 * <ul>
099 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
100 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
101 * </ul>
102 * Regions are created by CreateTable, Split, Merge.
103 * Regions are deleted by DeleteTable, Split, Merge.
104 * Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash.
105 * Unassigns are triggered by DisableTable, Split, Merge
106 */
107@InterfaceAudience.Private
108public class AssignmentManager {
  private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);

  // TODO: AMv2
  //  - handle region migration from hbase1 to hbase2.
  //  - handle sys table assignment first (e.g. acl, namespace)
  //  - handle table priorities
  //  - If ServerBusyException trying to update hbase:meta, we abort the Master
  //   See updateRegionLocation in RegionStateStore.
  //
  // See also
  // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
  // for other TODOs.

  /** Configuration key for the bootstrap thread pool size (not read in this part of the file). */
  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
      "hbase.assignment.bootstrap.thread.pool.size";

  /** Configuration key read into {@code assignDispatchWaitMillis}; defaults to 150 ms. */
  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
      "hbase.assignment.dispatch.wait.msec";
  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;

  /** Configuration key read into {@code assignDispatchWaitQueueMaxSize}; defaults to 100. */
  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
      "hbase.assignment.dispatch.wait.queue.max.size";
  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;

  /** Configuration key for the region-in-transition chore interval in ms; defaults to 60 s. */
  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
      "hbase.assignment.rit.chore.interval.msec";
  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;

  /**
   * Configuration key for the dead-server region metric chore interval in ms; defaults to 120 s.
   * A value of zero or less disables that chore (the constructor then leaves it {@code null}).
   */
  public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY =
      "hbase.assignment.dead.region.metric.chore.interval.msec";
  private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000;

  /** Configuration key for the maximum number of assign attempts; values below 1 become 1. */
  public static final String ASSIGN_MAX_ATTEMPTS =
      "hbase.assignment.maximum.attempts";
  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;

  /** Configuration key read into {@code assignRetryImmediatelyMaxAttempts}; defaults to 3. */
  public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS =
      "hbase.assignment.retry.immediately.maximum.attempts";
  private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3;

  /**
   * Configuration key for the region-in-transition "stuck" warning threshold used by metrics.
   * Defaults to 60 * 1000 (presumably milliseconds); not read in this part of the file.
   */
  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
      "hbase.metrics.rit.stuck.warning.threshold";
  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;

  // Woken when hbase:meta is reported OPEN, suspended otherwise (see setMetaAssigned).
  private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
  // Woken once region states have been rebuilt from hbase:meta; suspended again by stop().
  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");

  private final MetricsAssignmentManager metrics;
  private final RegionInTransitionChore ritChore;
  // Null when the dead-server region metric chore is disabled by configuration.
  private final DeadServerMetricRegionChore deadMetricChore;
  private final MasterServices master;

  // Flipped with compareAndSet by start()/stop() so that each takes effect only once.
  private final AtomicBoolean running = new AtomicBoolean(false);
  // In-memory state of regions and servers.
  private final RegionStates regionStates = new RegionStates();
  // Handles hbase:meta state updates.
  private final RegionStateStore regionStateStore;

  // NOTE(review): not used in the visible part of this file. byte[] does not override
  // equals/hashCode, so if these sets are hash-based their membership is by array identity;
  // confirm that callers account for that.
  private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();

  // True only when the configured balancer class is (a subclass of) FavoredStochasticBalancer.
  private final boolean shouldAssignRegionsWithFavoredNodes;
  private final int assignDispatchWaitQueueMaxSize;
  private final int assignDispatchWaitMillis;
  private final int assignMaxAttempts;
  private final int assignRetryImmediatelyMaxAttempts;

  // Serializes the background threads started by checkIfShouldMoveSystemRegionAsync().
  private final Object checkIfShouldMoveSystemRegionLock = new Object();

  // NOTE(review): presumably managed by startAssignmentThread()/stopAssignmentThread(), which are
  // outside this view; confirm.
  private Thread assignThread;
177
178  public AssignmentManager(final MasterServices master) {
179    this(master, new RegionStateStore(master));
180  }
181
182  @VisibleForTesting
183  AssignmentManager(final MasterServices master, final RegionStateStore stateStore) {
184    this.master = master;
185    this.regionStateStore = stateStore;
186    this.metrics = new MetricsAssignmentManager();
187
188    final Configuration conf = master.getConfiguration();
189
190    // Only read favored nodes if using the favored nodes load balancer.
191    this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class.isAssignableFrom(
192        conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class));
193
194    this.assignDispatchWaitMillis = conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY,
195        DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
196    this.assignDispatchWaitQueueMaxSize = conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY,
197        DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);
198
199    this.assignMaxAttempts = Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS,
200        DEFAULT_ASSIGN_MAX_ATTEMPTS));
201    this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS,
202        DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS);
203
204    int ritChoreInterval = conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY,
205        DEFAULT_RIT_CHORE_INTERVAL_MSEC);
206    this.ritChore = new RegionInTransitionChore(ritChoreInterval);
207
208    int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY,
209        DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC);
210    if (deadRegionChoreInterval > 0) {
211      this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval);
212    } else {
213      this.deadMetricChore = null;
214    }
215  }
216
217  public void start() throws IOException, KeeperException {
218    if (!running.compareAndSet(false, true)) {
219      return;
220    }
221
222    LOG.trace("Starting assignment manager");
223
224    // Start the Assignment Thread
225    startAssignmentThread();
226
227    // load meta region state
228    ZKWatcher zkw = master.getZooKeeper();
229    // it could be null in some tests
230    if (zkw != null) {
231      RegionState regionState = MetaTableLocator.getMetaRegionState(zkw);
232      RegionStateNode regionNode =
233        regionStates.getOrCreateRegionStateNode(RegionInfoBuilder.FIRST_META_REGIONINFO);
234      regionNode.lock();
235      try {
236        regionNode.setRegionLocation(regionState.getServerName());
237        regionNode.setState(regionState.getState());
238        if (regionNode.getProcedure() != null) {
239          regionNode.getProcedure().stateLoaded(this, regionNode);
240        }
241        setMetaAssigned(regionState.getRegion(), regionState.getState() == State.OPEN);
242      } finally {
243        regionNode.unlock();
244      }
245    }
246  }
247
248  /**
249   * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode.
250   * <p>
251   * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta
252   * method below. And it is also very important as now before submitting a TRSP, we need to attach
253   * it to the RegionStateNode, which acts like a guard, so we need to restore this information at
254   * the very beginning, before we start processing any procedures.
255   */
256  public void setupRIT(List<TransitRegionStateProcedure> procs) {
257    procs.forEach(proc -> {
258      RegionInfo regionInfo = proc.getRegion();
259      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
260      TransitRegionStateProcedure existingProc = regionNode.getProcedure();
261      if (existingProc != null) {
262        // This is possible, as we will detach the procedure from the RSN before we
263        // actually finish the procedure. This is because that, we will detach the TRSP from the RSN
264        // during execution, at that time, the procedure has not been marked as done in the pv2
265        // framework yet, so it is possible that we schedule a new TRSP immediately and when
266        // arriving here, we will find out that there are multiple TRSPs for the region. But we can
267        // make sure that, only the last one can take the charge, the previous ones should have all
268        // been finished already. So here we will compare the proc id, the greater one will win.
269        if (existingProc.getProcId() < proc.getProcId()) {
270          // the new one wins, unset and set it to the new one below
271          regionNode.unsetProcedure(existingProc);
272        } else {
273          // the old one wins, skip
274          return;
275        }
276      }
277      LOG.info("Attach {} to {} to restore RIT", proc, regionNode);
278      regionNode.setProcedure(proc);
279    });
280  }
281
282  public void stop() {
283    if (!running.compareAndSet(true, false)) {
284      return;
285    }
286
287    LOG.info("Stopping assignment manager");
288
289    // The AM is started before the procedure executor,
290    // but the actual work will be loaded/submitted only once we have the executor
291    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;
292
293    // Remove the RIT chore
294    if (hasProcExecutor) {
295      master.getMasterProcedureExecutor().removeChore(this.ritChore);
296      if (this.deadMetricChore != null) {
297        master.getMasterProcedureExecutor().removeChore(this.deadMetricChore);
298      }
299    }
300
301    // Stop the Assignment Thread
302    stopAssignmentThread();
303
304    // Stop the RegionStateStore
305    regionStates.clear();
306
307    // Update meta events (for testing)
308    if (hasProcExecutor) {
309      metaLoadEvent.suspend();
310      for (RegionInfo hri: getMetaRegionSet()) {
311        setMetaAssigned(hri, false);
312      }
313    }
314  }
315
316  public boolean isRunning() {
317    return running.get();
318  }
319
320  public Configuration getConfiguration() {
321    return master.getConfiguration();
322  }
323
324  public MetricsAssignmentManager getAssignmentManagerMetrics() {
325    return metrics;
326  }
327
328  private LoadBalancer getBalancer() {
329    return master.getLoadBalancer();
330  }
331
332  private FavoredNodesPromoter getFavoredNodePromoter() {
333    return (FavoredNodesPromoter) ((RSGroupBasedLoadBalancer) master.getLoadBalancer())
334      .getInternalBalancer();
335  }
336
337  private MasterProcedureEnv getProcedureEnvironment() {
338    return master.getMasterProcedureExecutor().getEnvironment();
339  }
340
341  private MasterProcedureScheduler getProcedureScheduler() {
342    return getProcedureEnvironment().getProcedureScheduler();
343  }
344
345  int getAssignMaxAttempts() {
346    return assignMaxAttempts;
347  }
348
349  int getAssignRetryImmediatelyMaxAttempts() {
350    return assignRetryImmediatelyMaxAttempts;
351  }
352
353  public RegionStates getRegionStates() {
354    return regionStates;
355  }
356
357  /**
358   * Returns the regions hosted by the specified server.
359   * <p/>
360   * Notice that, for SCP, after we submit the SCP, no one can change the region list for the
361   * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a
362   * snapshot of the current region list for this server, which means, right after you get the
363   * region list, new regions may be moved to this server or some regions may be moved out from this
364   * server, so you should not use it critically if you need strong consistency.
365   */
366  public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
367    ServerStateNode serverInfo = regionStates.getServerNode(serverName);
368    if (serverInfo == null) {
369      return Collections.emptyList();
370    }
371    return serverInfo.getRegionInfoList();
372  }
373
374  public RegionStateStore getRegionStateStore() {
375    return regionStateStore;
376  }
377
378  public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
379    return this.shouldAssignRegionsWithFavoredNodes
380      ? getFavoredNodePromoter().getFavoredNodes(regionInfo)
381      : ServerName.EMPTY_SERVER_LIST;
382  }
383
384  // ============================================================================================
385  //  Table State Manager helpers
386  // ============================================================================================
387  TableStateManager getTableStateManager() {
388    return master.getTableStateManager();
389  }
390
391  public boolean isTableEnabled(final TableName tableName) {
392    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
393  }
394
395  public boolean isTableDisabled(final TableName tableName) {
396    return getTableStateManager().isTableState(tableName,
397      TableState.State.DISABLED, TableState.State.DISABLING);
398  }
399
400  // ============================================================================================
401  //  META Helpers
402  // ============================================================================================
403  private boolean isMetaRegion(final RegionInfo regionInfo) {
404    return regionInfo.isMetaRegion();
405  }
406
407  public boolean isMetaRegion(final byte[] regionName) {
408    return getMetaRegionFromName(regionName) != null;
409  }
410
411  public RegionInfo getMetaRegionFromName(final byte[] regionName) {
412    for (RegionInfo hri: getMetaRegionSet()) {
413      if (Bytes.equals(hri.getRegionName(), regionName)) {
414        return hri;
415      }
416    }
417    return null;
418  }
419
420  public boolean isCarryingMeta(final ServerName serverName) {
421    // TODO: handle multiple meta
422    return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
423  }
424
425  private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
426    // TODO: check for state?
427    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
428    return(node != null && serverName.equals(node.getRegionLocation()));
429  }
430
431  private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
432    //if (regionInfo.isMetaRegion()) return regionInfo;
433    // TODO: handle multiple meta. if the region provided is not meta lookup
434    // which meta the region belongs to.
435    return RegionInfoBuilder.FIRST_META_REGIONINFO;
436  }
437
438  // TODO: handle multiple meta.
439  private static final Set<RegionInfo> META_REGION_SET =
440      Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);
441  public Set<RegionInfo> getMetaRegionSet() {
442    return META_REGION_SET;
443  }
444
445  // ============================================================================================
446  //  META Event(s) helpers
447  // ============================================================================================
448  /**
449   * Notice that, this only means the meta region is available on a RS, but the AM may still be
450   * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
451   * before checking this method, unless you can make sure that your piece of code can only be
452   * executed after AM builds the region states.
453   * @see #isMetaLoaded()
454   */
455  public boolean isMetaAssigned() {
456    return metaAssignEvent.isReady();
457  }
458
459  public boolean isMetaRegionInTransition() {
460    return !isMetaAssigned();
461  }
462
463  /**
464   * Notice that this event does not mean the AM has already finished region state rebuilding. See
465   * the comment of {@link #isMetaAssigned()} for more details.
466   * @see #isMetaAssigned()
467   */
468  public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
469    return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
470  }
471
472  private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
473    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
474    ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
475    if (assigned) {
476      metaAssignEvent.wake(getProcedureScheduler());
477    } else {
478      metaAssignEvent.suspend();
479    }
480  }
481
482  private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
483    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
484    // TODO: handle multiple meta.
485    return metaAssignEvent;
486  }
487
488  /**
489   * Wait until AM finishes the meta loading, i.e, the region states rebuilding.
490   * @see #isMetaLoaded()
491   * @see #waitMetaAssigned(Procedure, RegionInfo)
492   */
493  public boolean waitMetaLoaded(Procedure<?> proc) {
494    return metaLoadEvent.suspendIfNotReady(proc);
495  }
496
497  @VisibleForTesting
498  void wakeMetaLoadedEvent() {
499    metaLoadEvent.wake(getProcedureScheduler());
500    assert isMetaLoaded() : "expected meta to be loaded";
501  }
502
503  /**
504   * Return whether AM finishes the meta loading, i.e, the region states rebuilding.
505   * @see #isMetaAssigned()
506   * @see #waitMetaLoaded(Procedure)
507   */
508  public boolean isMetaLoaded() {
509    return metaLoadEvent.isReady();
510  }
511
512  /**
513   * Start a new thread to check if there are region servers whose versions are higher than others.
514   * If so, move all system table regions to RS with the highest version to keep compatibility.
515   * The reason is, RS in new version may not be able to access RS in old version when there are
516   * some incompatible changes.
517   * <p>This method is called when a new RegionServer is added to cluster only.</p>
518   */
519  public void checkIfShouldMoveSystemRegionAsync() {
520    // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
521    // it should 'move' the system tables from the old server to the new server but
522    // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
523    if (this.master.getServerManager().countOfRegionServers() <= 1) {
524      return;
525    }
526    // This thread used to run whenever there was a change in the cluster. The ZooKeeper
527    // childrenChanged notification came in before the nodeDeleted message and so this method
528    // cold run before a ServerCrashProcedure could run. That meant that this thread could see
529    // a Crashed Server before ServerCrashProcedure and it could find system regions on the
530    // crashed server and go move them before ServerCrashProcedure had a chance; could be
531    // dataloss too if WALs were not recovered.
532    new Thread(() -> {
533      try {
534        synchronized (checkIfShouldMoveSystemRegionLock) {
535          List<RegionPlan> plans = new ArrayList<>();
536          // TODO: I don't think this code does a good job if all servers in cluster have same
537          // version. It looks like it will schedule unnecessary moves.
538          for (ServerName server : getExcludedServersForSystemTable()) {
539            if (master.getServerManager().isServerDead(server)) {
540              // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
541              // considers only online servers, the server could be queued for dead server
542              // processing. As region assignments for crashed server is handled by
543              // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
544              // regular flow of LoadBalancer as a favored node and not to have this special
545              // handling.
546              continue;
547            }
548            List<RegionInfo> regionsShouldMove = getSystemTables(server);
549            if (!regionsShouldMove.isEmpty()) {
550              for (RegionInfo regionInfo : regionsShouldMove) {
551                // null value for dest forces destination server to be selected by balancer
552                RegionPlan plan = new RegionPlan(regionInfo, server, null);
553                if (regionInfo.isMetaRegion()) {
554                  // Must move meta region first.
555                  LOG.info("Async MOVE of {} to newer Server={}",
556                      regionInfo.getEncodedName(), server);
557                  moveAsync(plan);
558                } else {
559                  plans.add(plan);
560                }
561              }
562            }
563            for (RegionPlan plan : plans) {
564              LOG.info("Async MOVE of {} to newer Server={}",
565                  plan.getRegionInfo().getEncodedName(), server);
566              moveAsync(plan);
567            }
568          }
569        }
570      } catch (Throwable t) {
571        LOG.error(t.toString(), t);
572      }
573    }).start();
574  }
575
576  private List<RegionInfo> getSystemTables(ServerName serverName) {
577    ServerStateNode serverNode = regionStates.getServerNode(serverName);
578    if (serverNode == null) {
579      return Collections.emptyList();
580    }
581    return serverNode.getSystemRegionInfoList();
582  }
583
584  private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
585      throws HBaseIOException {
586    if (regionNode.getProcedure() != null) {
587      throw new HBaseIOException(regionNode + " is currently in transition");
588    }
589    if (!regionNode.isInState(expectedStates)) {
590      throw new DoNotRetryRegionException("Unexpected state for " + regionNode);
591    }
592    if (isTableDisabled(regionNode.getTable())) {
593      throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
594    }
595  }
596
597  // TODO: Need an async version of this for hbck2.
598  public long assign(RegionInfo regionInfo, ServerName sn) throws IOException {
599    // TODO: should we use getRegionStateNode?
600    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
601    TransitRegionStateProcedure proc;
602    regionNode.lock();
603    try {
604      preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
605      proc = TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn);
606      regionNode.setProcedure(proc);
607    } finally {
608      regionNode.unlock();
609    }
610    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
611    return proc.getProcId();
612  }
613
614  public long assign(RegionInfo regionInfo) throws IOException {
615    return assign(regionInfo, null);
616  }
617
618  public long unassign(RegionInfo regionInfo) throws IOException {
619    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
620    if (regionNode == null) {
621      throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
622    }
623    TransitRegionStateProcedure proc;
624    regionNode.lock();
625    try {
626      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
627      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
628      regionNode.setProcedure(proc);
629    } finally {
630      regionNode.unlock();
631    }
632    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
633    return proc.getProcId();
634  }
635
636  public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
637      ServerName targetServer) throws HBaseIOException {
638    RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
639    if (regionNode == null) {
640      throw new UnknownRegionException("No RegionStateNode found for " +
641          regionInfo.getEncodedName() + "(Closed/Deleted?)");
642    }
643    TransitRegionStateProcedure proc;
644    regionNode.lock();
645    try {
646      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
647      regionNode.checkOnline();
648      proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
649      regionNode.setProcedure(proc);
650    } finally {
651      regionNode.unlock();
652    }
653    return proc;
654  }
655
656  public void move(RegionInfo regionInfo) throws IOException {
657    TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null);
658    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
659  }
660
661  public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException {
662    TransitRegionStateProcedure proc =
663      createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination());
664    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
665  }
666
667  // ============================================================================================
668  //  RegionTransition procedures helpers
669  // ============================================================================================
670
671  /**
672   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
673   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
674   *         to populate the assigns with targets chosen using round-robin (default balancer
675   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
676   *         AssignProcedure will ask the balancer for a new target, and so on.
677   */
678  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris,
679      List<ServerName> serversToExclude) {
680    if (hris.isEmpty()) {
681      return new TransitRegionStateProcedure[0];
682    }
683
684    if (serversToExclude != null
685        && this.master.getServerManager().getOnlineServersList().size() == 1) {
686      LOG.debug("Only one region server found and hence going ahead with the assignment");
687      serversToExclude = null;
688    }
689    try {
690      // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
691      // a better job if it has all the assignments in the one lump.
692      Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
693        this.master.getServerManager().createDestinationServersList(serversToExclude));
694      // Return mid-method!
695      return createAssignProcedures(assignments);
696    } catch (IOException hioe) {
697      LOG.warn("Failed roundRobinAssignment", hioe);
698    }
699    // If an error above, fall-through to this simpler assign. Last resort.
700    return createAssignProcedures(hris);
701  }
702
703  /**
704   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
705   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
706   *         to populate the assigns with targets chosen using round-robin (default balancer
707   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
708   *         AssignProcedure will ask the balancer for a new target, and so on.
709   */
710  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
711    return createRoundRobinAssignProcedures(hris, null);
712  }
713
714  @VisibleForTesting
715  static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
716    if (left.getRegion().isMetaRegion()) {
717      if (right.getRegion().isMetaRegion()) {
718        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
719      }
720      return -1;
721    } else if (right.getRegion().isMetaRegion()) {
722      return +1;
723    }
724    if (left.getRegion().getTable().isSystemTable()) {
725      if (right.getRegion().getTable().isSystemTable()) {
726        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
727      }
728      return -1;
729    } else if (right.getRegion().getTable().isSystemTable()) {
730      return +1;
731    }
732    return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
733  }
734
735  private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode,
736      ServerName targetServer, boolean override) {
737    TransitRegionStateProcedure proc;
738    regionNode.lock();
739    try {
740      if(override && regionNode.getProcedure() != null) {
741        regionNode.unsetProcedure(regionNode.getProcedure());
742      }
743      assert regionNode.getProcedure() == null;
744      proc = TransitRegionStateProcedure.assign(getProcedureEnvironment(),
745        regionNode.getRegionInfo(), targetServer);
746      regionNode.setProcedure(proc);
747    } finally {
748      regionNode.unlock();
749    }
750    return proc;
751  }
752
753  private TransitRegionStateProcedure createUnassignProcedure(RegionStateNode regionNode,
754      boolean override) {
755    TransitRegionStateProcedure proc;
756    regionNode.lock();
757    try {
758      if(override && regionNode.getProcedure() != null) {
759        regionNode.unsetProcedure(regionNode.getProcedure());
760      }
761      assert regionNode.getProcedure() == null;
762      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(),
763          regionNode.getRegionInfo());
764      regionNode.setProcedure(proc);
765    } finally {
766      regionNode.unlock();
767    }
768    return proc;
769  }
770
771  /**
772   * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server.
773   * This method is specified for HBCK2
774   */
775  public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo hri, boolean override) {
776    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri);
777    return createAssignProcedure(regionNode, null, override);
778  }
779
780  /**
781   * Create one TransitRegionStateProcedure to unassign a region.
782   * This method is specified for HBCK2
783   */
784  public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo hri, boolean override) {
785    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri);
786    return createUnassignProcedure(regionNode, override);
787  }
788
789  /**
790   * Create an array of TransitRegionStateProcedure w/o specifying a target server.
791   * <p/>
792   * If no target server, at assign time, we will try to use the former location of the region if
793   * one exists. This is how we 'retain' the old location across a server restart.
794   * <p/>
795   * Should only be called when you can make sure that no one can touch these regions other than
796   * you. For example, when you are creating table.
797   */
798  public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) {
799    return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
800        .map(regionNode -> createAssignProcedure(regionNode, null, false))
801        .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
802  }
803
804  /**
805   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
806   * @return Assignments made from the passed in <code>assignments</code>
807   */
808  private TransitRegionStateProcedure[] createAssignProcedures(
809      Map<ServerName, List<RegionInfo>> assignments) {
810    return assignments.entrySet().stream()
811      .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
812        .map(regionNode -> createAssignProcedure(regionNode, e.getKey(), false)))
813      .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
814  }
815
816  /**
817   * Called by DisableTableProcedure to unassign all the regions for a table.
818   */
819  public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) {
820    return regionStates.getTableRegionStateNodes(tableName).stream().map(regionNode -> {
821      regionNode.lock();
822      try {
823        if (!regionStates.include(regionNode, false) ||
824          regionStates.isRegionOffline(regionNode.getRegionInfo())) {
825          return null;
826        }
827        // As in DisableTableProcedure, we will hold the xlock for table, so we can make sure that
828        // this procedure has not been executed yet, as TRSP will hold the shared lock for table all
829        // the time. So here we will unset it and when it is actually executed, it will find that
830        // the attach procedure is not itself and quit immediately.
831        if (regionNode.getProcedure() != null) {
832          regionNode.unsetProcedure(regionNode.getProcedure());
833        }
834        TransitRegionStateProcedure proc = TransitRegionStateProcedure
835          .unassign(getProcedureEnvironment(), regionNode.getRegionInfo());
836        regionNode.setProcedure(proc);
837        return proc;
838      } finally {
839        regionNode.unlock();
840      }
841    }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new);
842  }
843
844  public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
845      final byte[] splitKey) throws IOException {
846    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
847  }
848
849  public MergeTableRegionsProcedure createMergeProcedure(RegionInfo ... ris) throws IOException {
850    return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
851  }
852
853  /**
854   * Delete the region states. This is called by "DeleteTable"
855   */
856  public void deleteTable(final TableName tableName) throws IOException {
857    final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
858    regionStateStore.deleteRegions(regions);
859    for (int i = 0; i < regions.size(); ++i) {
860      final RegionInfo regionInfo = regions.get(i);
861      // we expect the region to be offline
862      regionStates.removeFromOfflineRegions(regionInfo);
863      regionStates.deleteRegion(regionInfo);
864    }
865  }
866
867  // ============================================================================================
868  //  RS Region Transition Report helpers
869  // ============================================================================================
870  private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
871      ServerName serverName, List<RegionStateTransition> transitionList) throws IOException {
872    for (RegionStateTransition transition : transitionList) {
873      switch (transition.getTransitionCode()) {
874        case OPENED:
875        case FAILED_OPEN:
876        case CLOSED:
877          assert transition.getRegionInfoCount() == 1 : transition;
878          final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
879          long procId =
880            transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
881          updateRegionTransition(serverName, transition.getTransitionCode(), hri,
882            transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId);
883          break;
884        case READY_TO_SPLIT:
885        case SPLIT:
886        case SPLIT_REVERTED:
887          assert transition.getRegionInfoCount() == 3 : transition;
888          final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
889          final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
890          final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
891          updateRegionSplitTransition(serverName, transition.getTransitionCode(), parent, splitA,
892            splitB);
893          break;
894        case READY_TO_MERGE:
895        case MERGED:
896        case MERGE_REVERTED:
897          assert transition.getRegionInfoCount() == 3 : transition;
898          final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
899          final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
900          final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
901          updateRegionMergeTransition(serverName, transition.getTransitionCode(), merged, mergeA,
902            mergeB);
903          break;
904      }
905    }
906  }
907
908  public ReportRegionStateTransitionResponse reportRegionStateTransition(
909      final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
910    ReportRegionStateTransitionResponse.Builder builder =
911        ReportRegionStateTransitionResponse.newBuilder();
912    ServerName serverName = ProtobufUtil.toServerName(req.getServer());
913    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
914    // here we have to acquire a read lock instead of a simple exclusive lock. This is because that
915    // we should not block other reportRegionStateTransition call from the same region server. This
916    // is not only about performance, but also to prevent dead lock. Think of the meta region is
917    // also on the same region server and you hold the lock which blocks the
918    // reportRegionStateTransition for meta, and since meta is not online, you will block inside the
919    // lock protection to wait for meta online...
920    serverNode.readLock().lock();
921    try {
922      // we only accept reportRegionStateTransition if the region server is online, see the comment
923      // above in submitServerCrash method and HBASE-21508 for more details.
924      if (serverNode.isInState(ServerState.ONLINE)) {
925        try {
926          reportRegionStateTransition(builder, serverName, req.getTransitionList());
927        } catch (PleaseHoldException e) {
928          LOG.trace("Failed transition ", e);
929          throw e;
930        } catch (UnsupportedOperationException | IOException e) {
931          // TODO: at the moment we have a single error message and the RS will abort
932          // if the master says that one of the region transitions failed.
933          LOG.warn("Failed transition", e);
934          builder.setErrorMessage("Failed transition " + e.getMessage());
935        }
936      } else {
937        LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
938          serverName);
939        builder.setErrorMessage("You are dead");
940      }
941    } finally {
942      serverNode.readLock().unlock();
943    }
944
945    return builder.build();
946  }
947
948  private void updateRegionTransition(ServerName serverName, TransitionCode state,
949      RegionInfo regionInfo, long seqId, long procId) throws IOException {
950    checkMetaLoaded(regionInfo);
951
952    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
953    if (regionNode == null) {
954      // the table/region is gone. maybe a delete, split, merge
955      throw new UnexpectedStateException(String.format(
956        "Server %s was trying to transition region %s to %s. but Region is not known.",
957        serverName, regionInfo, state));
958    }
959    LOG.trace("Update region transition serverName={} region={} regionState={}", serverName,
960      regionNode, state);
961
962    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
963    regionNode.lock();
964    try {
965      if (!reportTransition(regionNode, serverNode, state, seqId, procId)) {
966        // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
967        // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
968        // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
969        // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
970        // to CLOSED
971        // These happen because on cluster shutdown, we currently let the RegionServers close
972        // regions. This is the only time that region close is not run by the Master (so cluster
973        // goes down fast). Consider changing it so Master runs all shutdowns.
974        if (this.master.getServerManager().isClusterShutdown() &&
975          state.equals(TransitionCode.CLOSED)) {
976          LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
977        } else {
978          LOG.warn("No matching procedure found for {} transition on {} to {}",
979              serverName, regionNode, state);
980        }
981      }
982    } finally {
983      regionNode.unlock();
984    }
985  }
986
987  private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
988      TransitionCode state, long seqId, long procId) throws IOException {
989    ServerName serverName = serverNode.getServerName();
990    TransitRegionStateProcedure proc = regionNode.getProcedure();
991    if (proc == null) {
992      return false;
993    }
994    proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
995      serverName, state, seqId, procId);
996    return true;
997  }
998
999  private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state,
1000      final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB)
1001      throws IOException {
1002    checkMetaLoaded(parent);
1003
1004    if (state != TransitionCode.READY_TO_SPLIT) {
1005      throw new UnexpectedStateException("unsupported split regionState=" + state +
1006        " for parent region " + parent +
1007        " maybe an old RS (< 2.0) had the operation in progress");
1008    }
1009
1010    // sanity check on the request
1011    if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) {
1012      throw new UnsupportedOperationException(
1013        "unsupported split request with bad keys: parent=" + parent +
1014        " hriA=" + hriA + " hriB=" + hriB);
1015    }
1016
1017    if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
1018      LOG.warn("Split switch is off! skip split of " + parent);
1019      throw new DoNotRetryIOException("Split region " + parent.getRegionNameAsString() +
1020          " failed due to split switch off");
1021    }
1022
1023    // Submit the Split procedure
1024    final byte[] splitKey = hriB.getStartKey();
1025    if (LOG.isDebugEnabled()) {
1026      LOG.debug("Split request from " + serverName +
1027          ", parent=" + parent + " splitKey=" + Bytes.toStringBinary(splitKey));
1028    }
1029    master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
1030
1031    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
1032    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
1033      throw new UnsupportedOperationException(String.format(
1034        "Split handled by the master: parent=%s hriA=%s hriB=%s", parent.getShortNameToLog(), hriA, hriB));
1035    }
1036  }
1037
1038  private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state,
1039      final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
1040    checkMetaLoaded(merged);
1041
1042    if (state != TransitionCode.READY_TO_MERGE) {
1043      throw new UnexpectedStateException("Unsupported merge regionState=" + state +
1044        " for regionA=" + hriA + " regionB=" + hriB + " merged=" + merged +
1045        " maybe an old RS (< 2.0) had the operation in progress");
1046    }
1047
1048    if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
1049      LOG.warn("Merge switch is off! skip merge of regionA=" + hriA + " regionB=" + hriB);
1050      throw new DoNotRetryIOException("Merge of regionA=" + hriA + " regionB=" + hriB +
1051        " failed because merge switch is off");
1052    }
1053
1054    // Submit the Merge procedure
1055    if (LOG.isDebugEnabled()) {
1056      LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged);
1057    }
1058    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
1059
1060    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
1061    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
1062      throw new UnsupportedOperationException(String.format(
1063        "Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, merged, hriA,
1064          hriB));
1065    }
1066  }
1067
1068  // ============================================================================================
1069  //  RS Status update (report online regions) helpers
1070  // ============================================================================================
1071  /**
1072   * The master will call this method when the RS send the regionServerReport(). The report will
1073   * contains the "online regions". This method will check the the online regions against the
1074   * in-memory state of the AM, and we will log a warn message if there is a mismatch. This is
1075   * because that there is no fencing between the reportRegionStateTransition method and
1076   * regionServerReport method, so there could be race and introduce inconsistency here, but
1077   * actually there is no problem.
1078   * <p/>
1079   * Please see HBASE-21421 and HBASE-21463 for more details.
1080   */
1081  public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) {
1082    if (!isRunning()) {
1083      return;
1084    }
1085    if (LOG.isTraceEnabled()) {
1086      LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName,
1087        regionNames.size(), isMetaLoaded(),
1088        regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList()));
1089    }
1090
1091    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
1092    synchronized (serverNode) {
1093      if (!serverNode.isInState(ServerState.ONLINE)) {
1094        LOG.warn("Got a report from a server result in state " + serverNode.getState());
1095        return;
1096      }
1097    }
1098
1099    // Track the regionserver reported online regions in memory.
1100    synchronized (rsReports) {
1101      rsReports.put(serverName, regionNames);
1102    }
1103
1104    if (regionNames.isEmpty()) {
1105      // nothing to do if we don't have regions
1106      LOG.trace("no online region found on {}", serverName);
1107      return;
1108    }
1109    if (!isMetaLoaded()) {
1110      // we are still on startup, skip checking
1111      return;
1112    }
1113    // The Heartbeat tells us of what regions are on the region serve, check the state.
1114    checkOnlineRegionsReport(serverNode, regionNames);
1115  }
1116
1117  /**
1118   * Close <code>regionName</code> on <code>sn</code> silently and immediately without
1119   * using a Procedure or going via hbase:meta. For case where a RegionServer's hosting
1120   * of a Region is not aligned w/ the Master's accounting of Region state. This is for
1121   * cleaning up an error in accounting.
1122   */
1123  private void closeRegionSilently(ServerName sn, byte [] regionName) {
1124    try {
1125      RegionInfo ri = MetaTableAccessor.parseRegionInfoFromRegionName(regionName);
1126      // Pass -1 for timeout. Means do not wait.
1127      ServerManager.closeRegionSilentlyAndWait(this.master.getAsyncClusterConnection(), sn, ri, -1);
1128    } catch (Exception e) {
1129      LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e);
1130    }
1131  }
1132
1133  /**
1134   * Check that what the RegionServer reports aligns with the Master's image.
1135   * If disagreement, we will tell the RegionServer to expediently close
1136   * a Region we do not think it should have.
1137   */
1138  private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) {
1139    ServerName serverName = serverNode.getServerName();
1140    for (byte[] regionName : regionNames) {
1141      if (!isRunning()) {
1142        return;
1143      }
1144      RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
1145      if (regionNode == null) {
1146        String regionNameAsStr = Bytes.toStringBinary(regionName);
1147        LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...",
1148            regionNameAsStr, serverName);
1149        closeRegionSilently(serverNode.getServerName(), regionName);
1150        continue;
1151      }
1152      final long lag = 1000;
1153      regionNode.lock();
1154      try {
1155        long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate();
1156        if (regionNode.isInState(State.OPENING, State.OPEN)) {
1157          // This is possible as a region server has just closed a region but the region server
1158          // report is generated before the closing, but arrive after the closing. Make sure there
1159          // is some elapsed time so less false alarms.
1160          if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) {
1161            LOG.warn("Reporting {} server does not match {} (time since last " +
1162                    "update={}ms); closing...",
1163              serverName, regionNode, diff);
1164            closeRegionSilently(serverNode.getServerName(), regionName);
1165          }
1166        } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
1167          // So, we can get report that a region is CLOSED or SPLIT because a heartbeat
1168          // came in at about same time as a region transition. Make sure there is some
1169          // elapsed time so less false alarms.
1170          if (diff > lag) {
1171            LOG.warn("Reporting {} state does not match {} (time since last update={}ms)",
1172              serverName, regionNode, diff);
1173          }
1174        }
1175      } finally {
1176        regionNode.unlock();
1177      }
1178    }
1179  }
1180
1181  // ============================================================================================
1182  //  RIT chore
1183  // ============================================================================================
1184  private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> {
1185    public RegionInTransitionChore(final int timeoutMsec) {
1186      super(timeoutMsec);
1187    }
1188
1189    @Override
1190    protected void periodicExecute(final MasterProcedureEnv env) {
1191      final AssignmentManager am = env.getAssignmentManager();
1192
1193      final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat();
1194      if (ritStat.hasRegionsOverThreshold()) {
1195        for (RegionState hri: ritStat.getRegionOverThreshold()) {
1196          am.handleRegionOverStuckWarningThreshold(hri.getRegion());
1197        }
1198      }
1199
1200      // update metrics
1201      am.updateRegionsInTransitionMetrics(ritStat);
1202    }
1203  }
1204
1205  private static class DeadServerMetricRegionChore
1206      extends ProcedureInMemoryChore<MasterProcedureEnv> {
1207    public DeadServerMetricRegionChore(final int timeoutMsec) {
1208      super(timeoutMsec);
1209    }
1210
1211    @Override
1212    protected void periodicExecute(final MasterProcedureEnv env) {
1213      final ServerManager sm = env.getMasterServices().getServerManager();
1214      final AssignmentManager am = env.getAssignmentManager();
1215      // To minimize inconsistencies we are not going to snapshot live servers in advance in case
1216      // new servers are added; OTOH we don't want to add heavy sync for a consistent view since
1217      // this is for metrics. Instead, we're going to check each regions as we go; to avoid making
1218      // too many checks, we maintain a local lists of server, limiting us to false negatives. If
1219      // we miss some recently-dead server, we'll just see it next time.
1220      Set<ServerName> recentlyLiveServers = new HashSet<>();
1221      int deadRegions = 0, unknownRegions = 0;
1222      for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) {
1223        if (rsn.getState() != State.OPEN) {
1224          continue; // Opportunistic check, should quickly skip RITs, offline tables, etc.
1225        }
1226        // Do not need to acquire region state lock as this is only for showing metrics.
1227        ServerName sn = rsn.getRegionLocation();
1228        State state = rsn.getState();
1229        if (state != State.OPEN) {
1230          continue; // Mostly skipping RITs that are already being take care of.
1231        }
1232        if (sn == null) {
1233          ++unknownRegions; // Opened on null?
1234          continue;
1235        }
1236        if (recentlyLiveServers.contains(sn)) {
1237          continue;
1238        }
1239        ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn);
1240        switch (sls) {
1241          case LIVE:
1242            recentlyLiveServers.add(sn);
1243            break;
1244          case DEAD:
1245            ++deadRegions;
1246            break;
1247          case UNKNOWN:
1248            ++unknownRegions;
1249            break;
1250          default: throw new AssertionError("Unexpected " + sls);
1251        }
1252      }
1253      if (deadRegions > 0 || unknownRegions > 0) {
1254        LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers",
1255          deadRegions, unknownRegions);
1256      }
1257
1258      am.updateDeadServerRegionMetrics(deadRegions, unknownRegions);
1259    }
1260  }
1261
1262  public RegionInTransitionStat computeRegionInTransitionStat() {
1263    final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration());
1264    rit.update(this);
1265    return rit;
1266  }
1267
1268  public static class RegionInTransitionStat {
1269    private final int ritThreshold;
1270
1271    private HashMap<String, RegionState> ritsOverThreshold = null;
1272    private long statTimestamp;
1273    private long oldestRITTime = 0;
1274    private int totalRITsTwiceThreshold = 0;
1275    private int totalRITs = 0;
1276
1277    @VisibleForTesting
1278    public RegionInTransitionStat(final Configuration conf) {
1279      this.ritThreshold =
1280        conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD);
1281    }
1282
1283    public int getRITThreshold() {
1284      return ritThreshold;
1285    }
1286
1287    public long getTimestamp() {
1288      return statTimestamp;
1289    }
1290
1291    public int getTotalRITs() {
1292      return totalRITs;
1293    }
1294
1295    public long getOldestRITTime() {
1296      return oldestRITTime;
1297    }
1298
1299    public int getTotalRITsOverThreshold() {
1300      Map<String, RegionState> m = this.ritsOverThreshold;
1301      return m != null ? m.size() : 0;
1302    }
1303
1304    public boolean hasRegionsTwiceOverThreshold() {
1305      return totalRITsTwiceThreshold > 0;
1306    }
1307
1308    public boolean hasRegionsOverThreshold() {
1309      Map<String, RegionState> m = this.ritsOverThreshold;
1310      return m != null && !m.isEmpty();
1311    }
1312
1313    public Collection<RegionState> getRegionOverThreshold() {
1314      Map<String, RegionState> m = this.ritsOverThreshold;
1315      return m != null? m.values(): Collections.emptySet();
1316    }
1317
1318    public boolean isRegionOverThreshold(final RegionInfo regionInfo) {
1319      Map<String, RegionState> m = this.ritsOverThreshold;
1320      return m != null && m.containsKey(regionInfo.getEncodedName());
1321    }
1322
1323    public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) {
1324      Map<String, RegionState> m = this.ritsOverThreshold;
1325      if (m == null) return false;
1326      final RegionState state = m.get(regionInfo.getEncodedName());
1327      if (state == null) return false;
1328      return (statTimestamp - state.getStamp()) > (ritThreshold * 2);
1329    }
1330
1331    protected void update(final AssignmentManager am) {
1332      final RegionStates regionStates = am.getRegionStates();
1333      this.statTimestamp = EnvironmentEdgeManager.currentTime();
1334      update(regionStates.getRegionsStateInTransition(), statTimestamp);
1335      update(regionStates.getRegionFailedOpen(), statTimestamp);
1336    }
1337
1338    private void update(final Collection<RegionState> regions, final long currentTime) {
1339      for (RegionState state: regions) {
1340        totalRITs++;
1341        final long ritStartedMs = state.getStamp();
1342        if (ritStartedMs == 0) {
1343          // Don't output bogus values to metrics if they accidentally make it here.
1344          LOG.warn("The RIT {} has no start time", state.getRegion());
1345          continue;
1346        }
1347        final long ritTime = currentTime - ritStartedMs;
1348        if (ritTime > ritThreshold) {
1349          if (ritsOverThreshold == null) {
1350            ritsOverThreshold = new HashMap<String, RegionState>();
1351          }
1352          ritsOverThreshold.put(state.getRegion().getEncodedName(), state);
1353          totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0;
1354        }
1355        if (oldestRITTime < ritTime) {
1356          oldestRITTime = ritTime;
1357        }
1358      }
1359    }
1360  }
1361
1362  private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) {
1363    metrics.updateRITOldestAge(ritStat.getOldestRITTime());
1364    metrics.updateRITCount(ritStat.getTotalRITs());
1365    metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold());
1366  }
1367
1368  private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) {
1369    metrics.updateDeadServerOpenRegions(deadRegions);
1370    metrics.updateUnknownServerOpenRegions(unknownRegions);
1371  }
1372
1373  private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
1374    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1375    //if (regionNode.isStuck()) {
1376    LOG.warn("STUCK Region-In-Transition {}", regionNode);
1377  }
1378
1379  // ============================================================================================
1380  //  TODO: Master load/bootstrap
1381  // ============================================================================================
1382  public void joinCluster() throws IOException {
1383    long startTime = System.nanoTime();
1384    LOG.debug("Joining cluster...");
1385
1386    // Scan hbase:meta to build list of existing regions, servers, and assignment.
1387    // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress
1388    // w/o  meta.
1389    loadMeta();
1390
1391    while (master.getServerManager().countOfRegionServers() < 1) {
1392      LOG.info("Waiting for RegionServers to join; current count={}",
1393        master.getServerManager().countOfRegionServers());
1394      Threads.sleep(250);
1395    }
1396    LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers());
1397
1398    // Start the chores
1399    master.getMasterProcedureExecutor().addChore(this.ritChore);
1400    master.getMasterProcedureExecutor().addChore(this.deadMetricChore);
1401
1402    long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
1403    LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs));
1404  }
1405
1406  /**
1407   * Create assign procedure for offline regions.
1408   * Just follow the old processofflineServersWithOnlineRegions method. Since now we do not need to
1409   * deal with dead server any more, we only deal with the regions in OFFLINE state in this method.
1410   * And this is a bit strange, that for new regions, we will add it in CLOSED state instead of
1411   * OFFLINE state, and usually there will be a procedure to track them. The
1412   * processofflineServersWithOnlineRegions is a legacy from long ago, as things are going really
1413   * different now, maybe we do not need this method any more. Need to revisit later.
1414   */
1415  // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online.
1416  // Needs to be done after the table state manager has been started.
1417  public void processOfflineRegions() {
1418    List<RegionInfo> offlineRegions = regionStates.getRegionStates().stream()
1419      .filter(RegionState::isOffline).filter(s -> isTableEnabled(s.getRegion().getTable()))
1420      .map(RegionState::getRegion).collect(Collectors.toList());
1421    if (!offlineRegions.isEmpty()) {
1422      master.getMasterProcedureExecutor().submitProcedures(
1423        master.getAssignmentManager().createRoundRobinAssignProcedures(offlineRegions));
1424    }
1425  }
1426
1427  /* AM internal RegionStateStore.RegionStateVisitor implementation. To be used when
1428   * scanning META table for region rows, using RegionStateStore utility methods. RegionStateStore
1429   * methods will convert Result into proper RegionInfo instances, but those would still need to be
1430   * added into AssignmentManager.regionStates in-memory cache.
1431   * RegionMetaLoadingVisitor.visitRegionState method provides the logic for adding RegionInfo
1432   * instances as loaded from latest META scan into AssignmentManager.regionStates.
1433   */
1434  private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor  {
1435
1436    @Override
1437    public void visitRegionState(Result result, final RegionInfo regionInfo, final State state,
1438      final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
1439      if (state == null && regionLocation == null && lastHost == null &&
1440        openSeqNum == SequenceId.NO_SEQUENCE_ID) {
1441        // This is a row with nothing in it.
1442        LOG.warn("Skipping empty row={}", result);
1443        return;
1444      }
1445      State localState = state;
1446      if (localState == null) {
1447        // No region state column data in hbase:meta table! Are I doing a rolling upgrade from
1448        // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta?
1449        // In any of these cases, state is empty. For now, presume OFFLINE but there are probably
1450        // cases where we need to probe more to be sure this correct; TODO informed by experience.
1451        LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);
1452        localState = State.OFFLINE;
1453      }
1454      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
1455      // Do not need to lock on regionNode, as we can make sure that before we finish loading
1456      // meta, all the related procedures can not be executed. The only exception is for meta
1457      // region related operations, but here we do not load the informations for meta region.
1458      regionNode.setState(localState);
1459      regionNode.setLastHost(lastHost);
1460      regionNode.setRegionLocation(regionLocation);
1461      regionNode.setOpenSeqNum(openSeqNum);
1462
1463      // Note: keep consistent with other methods, see region(Opening|Opened|Closing)
1464      //       RIT/ServerCrash handling should take care of the transiting regions.
1465      if (localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING,
1466        State.MERGING)) {
1467        assert regionLocation != null : "found null region location for " + regionNode;
1468        regionStates.addRegionToServer(regionNode);
1469      } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
1470        regionStates.addToOfflineRegions(regionNode);
1471      }
1472      if (regionNode.getProcedure() != null) {
1473        regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode);
1474      }
1475    }
1476  };
1477
1478  /**
1479   * Query META if the given <code>RegionInfo</code> exists, adding to
1480   * <code>AssignmentManager.regionStateStore</code> cache if the region is found in META.
1481   * @param regionEncodedName encoded name for the region to be loaded from META into
1482   *                          <code>AssignmentManager.regionStateStore</code> cache
1483   * @return <code>RegionInfo</code> instance for the given region if it is present in META
1484   *          and got successfully loaded into <code>AssignmentManager.regionStateStore</code>
1485   *          cache, <b>null</b> otherwise.
1486   * @throws UnknownRegionException if any errors occur while querying meta.
1487   */
1488  public RegionInfo loadRegionFromMeta(String regionEncodedName) throws UnknownRegionException {
1489    try {
1490      RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor();
1491      regionStateStore.visitMetaForRegion(regionEncodedName, visitor);
1492      return regionStates.getRegionState(regionEncodedName) == null ? null :
1493        regionStates.getRegionState(regionEncodedName).getRegion();
1494    } catch(IOException e) {
1495      LOG.error("Error trying to load region {} from META", regionEncodedName, e);
1496      throw new UnknownRegionException("Error while trying load region from meta");
1497    }
1498  }
1499
1500  private void loadMeta() throws IOException {
1501    // TODO: use a thread pool
1502    regionStateStore.visitMeta(new RegionMetaLoadingVisitor());
1503    // every assignment is blocked until meta is loaded.
1504    wakeMetaLoadedEvent();
1505  }
1506
1507  /**
1508   * Used to check if the meta loading is done.
1509   * <p/>
1510   * if not we throw PleaseHoldException since we are rebuilding the RegionStates
1511   * @param hri region to check if it is already rebuild
1512   * @throws PleaseHoldException if meta has not been loaded yet
1513   */
1514  private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException {
1515    if (!isRunning()) {
1516      throw new PleaseHoldException("AssignmentManager not running");
1517    }
1518    boolean meta = isMetaRegion(hri);
1519    boolean metaLoaded = isMetaLoaded();
1520    if (!meta && !metaLoaded) {
1521      throw new PleaseHoldException(
1522        "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded);
1523    }
1524  }
1525
1526  // ============================================================================================
1527  //  TODO: Metrics
1528  // ============================================================================================
1529  public int getNumRegionsOpened() {
1530    // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value
1531    return 0;
1532  }
1533
1534  /**
1535   * Usually run by the Master in reaction to server crash during normal processing.
1536   * Can also be invoked via external RPC to effect repair; in the latter case,
1537   * the 'force' flag is set so we push through the SCP though context may indicate
1538   * already-running-SCP (An old SCP may have exited abnormally, or damaged cluster
1539   * may still have references in hbase:meta to 'Unknown Servers' -- servers that
1540   * are not online or in dead servers list, etc.)
1541   * @param force Set if the request came in externally over RPC (via hbck2). Force means
1542   *              run the SCP even if it seems as though there might be an outstanding
1543   *              SCP running.
1544   * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled.
1545   */
1546  public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) {
1547    // May be an 'Unknown Server' so handle case where serverNode is null.
1548    ServerStateNode serverNode = regionStates.getServerNode(serverName);
1549    // Remove the in-memory rsReports result
1550    synchronized (rsReports) {
1551      rsReports.remove(serverName);
1552    }
1553
1554    // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the
1555    // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
1556    // this server. This is used to simplify the implementation for TRSP and SCP, where we can make
1557    // sure that, the region list fetched by SCP will not be changed any more.
1558    if (serverNode != null) {
1559      serverNode.writeLock().lock();
1560    }
1561    boolean carryingMeta;
1562    long pid;
1563    try {
1564      ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
1565      carryingMeta = isCarryingMeta(serverName);
1566      if (!force && serverNode != null && !serverNode.isInState(ServerState.ONLINE)) {
1567        LOG.info("Skip adding SCP for {} (meta={}) -- running?", serverNode, carryingMeta);
1568        return Procedure.NO_PROC_ID;
1569      } else {
1570        MasterProcedureEnv mpe = procExec.getEnvironment();
1571        // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead.
1572        // HBCKSCP scours Master in-memory state AND hbase;meta for references to
1573        // serverName just-in-case. An SCP that is scheduled when the server is
1574        // 'Unknown' probably originated externally with HBCK2 fix-it tool.
1575        ServerState oldState = null;
1576        if (serverNode != null) {
1577          oldState = serverNode.getState();
1578          serverNode.setState(ServerState.CRASHED);
1579        }
1580
1581        if (force) {
1582          pid = procExec.submitProcedure(
1583              new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
1584        } else {
1585          pid = procExec.submitProcedure(
1586              new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
1587        }
1588        LOG.info("Scheduled SCP pid={} for {} (carryingMeta={}){}.", pid, serverName, carryingMeta,
1589            serverNode == null? "": " " + serverNode.toString() + ", oldState=" + oldState);
1590      }
1591    } finally {
1592      if (serverNode != null) {
1593        serverNode.writeLock().unlock();
1594      }
1595    }
1596    return pid;
1597  }
1598
1599  public void offlineRegion(final RegionInfo regionInfo) {
1600    // TODO used by MasterRpcServices
1601    RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
1602    if (node != null) {
1603      node.offline();
1604    }
1605  }
1606
1607  public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) {
1608    // TODO used by TestSplitTransactionOnCluster.java
1609  }
1610
1611  public Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment(
1612      final Collection<RegionInfo> regions) {
1613    return regionStates.getSnapShotOfAssignment(regions);
1614  }
1615
1616  // ============================================================================================
1617  //  TODO: UTILS/HELPERS?
1618  // ============================================================================================
1619  /**
1620   * Used by the client (via master) to identify if all regions have the schema updates
1621   *
1622   * @param tableName
1623   * @return Pair indicating the status of the alter command (pending/total)
1624   * @throws IOException
1625   */
1626  public Pair<Integer, Integer> getReopenStatus(TableName tableName) {
1627    if (isTableDisabled(tableName)) return new Pair<Integer, Integer>(0, 0);
1628
1629    final List<RegionState> states = regionStates.getTableRegionStates(tableName);
1630    int ritCount = 0;
1631    for (RegionState regionState: states) {
1632      if (!regionState.isOpened() && !regionState.isSplit()) {
1633        ritCount++;
1634      }
1635    }
1636    return new Pair<Integer, Integer>(ritCount, states.size());
1637  }
1638
1639  // ============================================================================================
1640  //  TODO: Region State In Transition
1641  // ============================================================================================
1642  public boolean hasRegionsInTransition() {
1643    return regionStates.hasRegionsInTransition();
1644  }
1645
1646  public List<RegionStateNode> getRegionsInTransition() {
1647    return regionStates.getRegionsInTransition();
1648  }
1649
1650  public List<RegionInfo> getAssignedRegions() {
1651    return regionStates.getAssignedRegions();
1652  }
1653
1654  public RegionInfo getRegionInfo(final byte[] regionName) {
1655    final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName);
1656    return regionState != null ? regionState.getRegionInfo() : null;
1657  }
1658
1659  // ============================================================================================
1660  //  Expected states on region state transition.
  //  Notice that there are no expected states for transitioning to OPENING state; this is because of SCP.
1662  //  See the comments in regionOpening method for more details.
1663  // ============================================================================================
1664  private static final State[] STATES_EXPECTED_ON_OPEN = {
1665    State.OPENING, // Normal case
1666    State.OPEN // Retrying
1667  };
1668
1669  private static final State[] STATES_EXPECTED_ON_CLOSING = {
1670    State.OPEN, // Normal case
1671    State.CLOSING, // Retrying
1672    State.SPLITTING, // Offline the split parent
1673    State.MERGING // Offline the merge parents
1674  };
1675
1676  private static final State[] STATES_EXPECTED_ON_CLOSED = {
1677    State.CLOSING, // Normal case
1678    State.CLOSED // Retrying
1679  };
1680
1681  // This is for manually scheduled region assign, can add other states later if we find out other
1682  // usages
1683  private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE };
1684
1685  // We only allow unassign or move a region which is in OPEN state.
1686  private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN };
1687
1688  // ============================================================================================
1689  // Region Status update
1690  // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking
1691  // and pre-assumptions are very tricky.
1692  // ============================================================================================
1693  private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState,
1694      RegionState.State... expectedStates) throws IOException {
1695    RegionState.State state = regionNode.getState();
1696    regionNode.transitionState(newState, expectedStates);
1697    boolean succ = false;
1698    try {
1699      regionStateStore.updateRegionLocation(regionNode);
1700      succ = true;
1701    } finally {
1702      if (!succ) {
1703        // revert
1704        regionNode.setState(state);
1705      }
1706    }
1707  }
1708
1709  // should be called within the synchronized block of RegionStateNode
1710  void regionOpening(RegionStateNode regionNode) throws IOException {
1711    // As in SCP, for performance reason, there is no TRSP attached with this region, we will not
1712    // update the region state, which means that the region could be in any state when we want to
1713    // assign it after a RS crash. So here we do not pass the expectedStates parameter.
1714    transitStateAndUpdate(regionNode, State.OPENING);
1715    regionStates.addRegionToServer(regionNode);
1716    // update the operation count metrics
1717    metrics.incrementOperationCounter();
1718  }
1719
1720  // should be called under the RegionStateNode lock
1721  // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
1722  // we will persist the FAILED_OPEN state into hbase:meta.
1723  void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException {
1724    RegionState.State state = regionNode.getState();
1725    ServerName regionLocation = regionNode.getRegionLocation();
1726    if (giveUp) {
1727      regionNode.setState(State.FAILED_OPEN);
1728      regionNode.setRegionLocation(null);
1729      boolean succ = false;
1730      try {
1731        regionStateStore.updateRegionLocation(regionNode);
1732        succ = true;
1733      } finally {
1734        if (!succ) {
1735          // revert
1736          regionNode.setState(state);
1737          regionNode.setRegionLocation(regionLocation);
1738        }
1739      }
1740    }
1741    if (regionLocation != null) {
1742      regionStates.removeRegionFromServer(regionLocation, regionNode);
1743    }
1744  }
1745
1746  // should be called under the RegionStateNode lock
1747  void regionClosing(RegionStateNode regionNode) throws IOException {
1748    transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING);
1749
1750    RegionInfo hri = regionNode.getRegionInfo();
1751    // Set meta has not initialized early. so people trying to create/edit tables will wait
1752    if (isMetaRegion(hri)) {
1753      setMetaAssigned(hri, false);
1754    }
1755    regionStates.addRegionToServer(regionNode);
1756    // update the operation count metrics
1757    metrics.incrementOperationCounter();
1758  }
1759
  // For open and close, the transition is first persisted to the procedure store in
  // RegionRemoteProcedureBase. So here we only change the in-memory state, since the transition is
  // considered successful once it has been persisted to the procedure store; later, when the
  // RegionRemoteProcedureBase is woken up, we persist the RegionStateNode to hbase:meta.
1764
1765  // should be called under the RegionStateNode lock
1766  void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
1767    regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
1768    RegionInfo regionInfo = regionNode.getRegionInfo();
1769    regionStates.addRegionToServer(regionNode);
1770    regionStates.removeFromFailedOpen(regionInfo);
1771  }
1772
1773  // should be called under the RegionStateNode lock
1774  void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
1775    ServerName regionLocation = regionNode.getRegionLocation();
1776    regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
1777    regionNode.setRegionLocation(null);
1778    if (regionLocation != null) {
1779      regionNode.setLastHost(regionLocation);
1780      regionStates.removeRegionFromServer(regionLocation, regionNode);
1781    }
1782  }
1783
1784  // should be called under the RegionStateNode lock
1785  // for SCP
1786  public void regionClosedAbnormally(RegionStateNode regionNode) throws IOException {
1787    RegionState.State state = regionNode.getState();
1788    ServerName regionLocation = regionNode.getRegionLocation();
1789    regionNode.transitionState(State.ABNORMALLY_CLOSED);
1790    regionNode.setRegionLocation(null);
1791    boolean succ = false;
1792    try {
1793      regionStateStore.updateRegionLocation(regionNode);
1794      succ = true;
1795    } finally {
1796      if (!succ) {
1797        // revert
1798        regionNode.setState(state);
1799        regionNode.setRegionLocation(regionLocation);
1800      }
1801    }
1802    if (regionLocation != null) {
1803      regionNode.setLastHost(regionLocation);
1804      regionStates.removeRegionFromServer(regionLocation, regionNode);
1805    }
1806  }
1807
1808  void persistToMeta(RegionStateNode regionNode) throws IOException {
1809    regionStateStore.updateRegionLocation(regionNode);
1810    RegionInfo regionInfo = regionNode.getRegionInfo();
1811    if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
1812      // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
1813      // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
1814      // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
1815      // on table that contains state.
1816      setMetaAssigned(regionInfo, true);
1817    }
1818  }
1819
1820  // ============================================================================================
1821  // The above methods can only be called in TransitRegionStateProcedure(and related procedures)
1822  // ============================================================================================
1823
1824  public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
1825      final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
1826    // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
1827    // The parent stays in regionStates until cleared when removed by CatalogJanitor.
1828    // Update its state in regionStates to it shows as offline and split when read
1829    // later figuring what regions are in a table and what are not: see
1830    // regionStates#getRegionsOfTable
1831    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent);
1832    node.setState(State.SPLIT);
1833    final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA);
1834    nodeA.setState(State.SPLITTING_NEW);
1835    final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB);
1836    nodeB.setState(State.SPLITTING_NEW);
1837
1838    regionStateStore.splitRegion(parent, daughterA, daughterB, serverName);
1839    if (shouldAssignFavoredNodes(parent)) {
1840      List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList();
1841      getFavoredNodePromoter().generateFavoredNodesForDaughter(onlineServers, parent, daughterA,
1842        daughterB);
1843    }
1844  }
1845
1846  /**
1847   * When called here, the merge has happened. The merged regions have been
1848   * unassigned and the above markRegionClosed has been called on each so they have been
1849   * disassociated from a hosting Server. The merged region will be open after this call. The
1850   * merged regions are removed from hbase:meta below. Later they are deleted from the filesystem
1851   * by the catalog janitor running against hbase:meta. It notices when the merged region no
1852   * longer holds references to the old regions (References are deleted after a compaction
1853   * rewrites what the Reference points at but not until the archiver chore runs, are the
1854   * References removed).
1855   */
1856  public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
1857        RegionInfo [] mergeParents)
1858      throws IOException {
1859    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
1860    node.setState(State.MERGED);
1861    for (RegionInfo ri: mergeParents) {
1862      regionStates.deleteRegion(ri);
1863
1864    }
1865    regionStateStore.mergeRegions(child, mergeParents, serverName);
1866    if (shouldAssignFavoredNodes(child)) {
1867      getFavoredNodePromoter().generateFavoredNodesForMergedRegion(child, mergeParents);
1868    }
1869  }
1870
1871  /*
1872   * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
1873   * belongs to a non-system table.
1874   */
1875  private boolean shouldAssignFavoredNodes(RegionInfo region) {
1876    return this.shouldAssignRegionsWithFavoredNodes &&
1877        FavoredNodesManager.isFavoredNodeApplicable(region);
1878  }
1879
1880  // ============================================================================================
1881  //  Assign Queue (Assign/Balance)
1882  // ============================================================================================
1883  private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>();
1884  private final ReentrantLock assignQueueLock = new ReentrantLock();
1885  private final Condition assignQueueFullCond = assignQueueLock.newCondition();
1886
1887  /**
1888   * Add the assign operation to the assignment queue.
1889   * The pending assignment operation will be processed,
1890   * and each region will be assigned by a server using the balancer.
1891   */
1892  protected void queueAssign(final RegionStateNode regionNode) {
1893    regionNode.getProcedureEvent().suspend();
1894
1895    // TODO: quick-start for meta and the other sys-tables?
1896    assignQueueLock.lock();
1897    try {
1898      pendingAssignQueue.add(regionNode);
1899      if (regionNode.isSystemTable() ||
1900          pendingAssignQueue.size() == 1 ||
1901          pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize) {
1902        assignQueueFullCond.signal();
1903      }
1904    } finally {
1905      assignQueueLock.unlock();
1906    }
1907  }
1908
1909  private void startAssignmentThread() {
1910    assignThread = new Thread(master.getServerName().toShortString()) {
1911      @Override
1912      public void run() {
1913        while (isRunning()) {
1914          processAssignQueue();
1915        }
1916        pendingAssignQueue.clear();
1917      }
1918    };
1919    assignThread.setDaemon(true);
1920    assignThread.start();
1921  }
1922
1923  private void stopAssignmentThread() {
1924    assignQueueSignal();
1925    try {
1926      while (assignThread.isAlive()) {
1927        assignQueueSignal();
1928        assignThread.join(250);
1929      }
1930    } catch (InterruptedException e) {
1931      LOG.warn("join interrupted", e);
1932      Thread.currentThread().interrupt();
1933    }
1934  }
1935
1936  private void assignQueueSignal() {
1937    assignQueueLock.lock();
1938    try {
1939      assignQueueFullCond.signal();
1940    } finally {
1941      assignQueueLock.unlock();
1942    }
1943  }
1944
  /**
   * Blocks the assignment thread until there is work in the pending assign queue, then drains it.
   * <p/>
   * If the queue is empty and the AssignmentManager is running, this waits for a signal (from
   * queueAssign or assignQueueSignal). It then waits up to {@code assignDispatchWaitMillis} more,
   * or until signalled again, so that additional regions can be batched. Finally it moves every
   * queued node into a map keyed by RegionInfo and clears the queue.
   * <p/>
   * The awaits are intentionally not in a loop (hence the WA_AWAIT_NOT_IN_LOOP suppression). A
   * spurious wakeup only yields a possibly empty batch, which processAssignQueue ignores.
   * @return the drained regions, or {@code null} if the AssignmentManager is not running or the
   *         thread was interrupted while waiting
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
  private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() {
    HashMap<RegionInfo, RegionStateNode> regions = null;

    assignQueueLock.lock();
    try {
      // Wait for the first region (or a shutdown signal).
      if (pendingAssignQueue.isEmpty() && isRunning()) {
        assignQueueFullCond.await();
      }

      if (!isRunning()) return null;
      // Give more regions a short window to join this batch.
      assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS);
      regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size());
      for (RegionStateNode regionNode: pendingAssignQueue) {
        regions.put(regionNode.getRegionInfo(), regionNode);
      }
      pendingAssignQueue.clear();
    } catch (InterruptedException e) {
      // NOTE(review): the interrupt flag is restored here, and the assignment thread loops on
      // processAssignQueue() while isRunning(). The next await() will then throw immediately,
      // which looks like it could spin and log repeatedly; confirm this is acceptable.
      LOG.warn("got interrupted ", e);
      Thread.currentThread().interrupt();
    } finally {
      assignQueueLock.unlock();
    }
    return regions;
  }
1970
1971  private void processAssignQueue() {
1972    final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue();
1973    if (regions == null || regions.size() == 0 || !isRunning()) {
1974      return;
1975    }
1976
1977    if (LOG.isTraceEnabled()) {
1978      LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size());
1979    }
1980
1981    // TODO: Optimize balancer. pass a RegionPlan?
1982    final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>();
1983    final List<RegionInfo> userHRIs = new ArrayList<>(regions.size());
1984    // Regions for system tables requiring reassignment
1985    final List<RegionInfo> systemHRIs = new ArrayList<>();
1986    for (RegionStateNode regionStateNode: regions.values()) {
1987      boolean sysTable = regionStateNode.isSystemTable();
1988      final List<RegionInfo> hris = sysTable? systemHRIs: userHRIs;
1989      if (regionStateNode.getRegionLocation() != null) {
1990        retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
1991      } else {
1992        hris.add(regionStateNode.getRegionInfo());
1993      }
1994    }
1995
1996    // TODO: connect with the listener to invalidate the cache
1997
1998    // TODO use events
1999    List<ServerName> servers = master.getServerManager().createDestinationServersList();
2000    for (int i = 0; servers.size() < 1; ++i) {
2001      // Report every fourth time around this loop; try not to flood log.
2002      if (i % 4 == 0) {
2003        LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions.");
2004      }
2005
2006      if (!isRunning()) {
2007        LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions.");
2008        return;
2009      }
2010      Threads.sleep(250);
2011      servers = master.getServerManager().createDestinationServersList();
2012    }
2013
2014    if (!systemHRIs.isEmpty()) {
2015      // System table regions requiring reassignment are present, get region servers
2016      // not available for system table regions
2017      final List<ServerName> excludeServers = getExcludedServersForSystemTable();
2018      List<ServerName> serversForSysTables = servers.stream()
2019          .filter(s -> !excludeServers.contains(s)).collect(Collectors.toList());
2020      if (serversForSysTables.isEmpty()) {
2021        LOG.warn("Filtering old server versions and the excluded produced an empty set; " +
2022            "instead considering all candidate servers!");
2023      }
2024      LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() +
2025          ", allServersCount=" + servers.size());
2026      processAssignmentPlans(regions, null, systemHRIs,
2027          serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs) ?
2028              servers: serversForSysTables);
2029    }
2030
2031    processAssignmentPlans(regions, retainMap, userHRIs, servers);
2032  }
2033
2034  private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions,
2035      List<RegionInfo> hirs) {
2036    for (RegionInfo ri : hirs) {
2037      if (regions.get(ri).getRegionLocation() != null &&
2038          regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME)){
2039        return true;
2040      }
2041    }
2042    return false;
2043  }
2044
2045  private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions,
2046      final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris,
2047      final List<ServerName> servers) {
2048    boolean isTraceEnabled = LOG.isTraceEnabled();
2049    if (isTraceEnabled) {
2050      LOG.trace("Available servers count=" + servers.size() + ": " + servers);
2051    }
2052
2053    final LoadBalancer balancer = getBalancer();
2054    // ask the balancer where to place regions
2055    if (retainMap != null && !retainMap.isEmpty()) {
2056      if (isTraceEnabled) {
2057        LOG.trace("retain assign regions=" + retainMap);
2058      }
2059      try {
2060        acceptPlan(regions, balancer.retainAssignment(retainMap, servers));
2061      } catch (IOException e) {
2062        LOG.warn("unable to retain assignment", e);
2063        addToPendingAssignment(regions, retainMap.keySet());
2064      }
2065    }
2066
2067    // TODO: Do we need to split retain and round-robin?
2068    // the retain seems to fallback to round-robin/random if the region is not in the map.
2069    if (!hris.isEmpty()) {
2070      Collections.sort(hris, RegionInfo.COMPARATOR);
2071      if (isTraceEnabled) {
2072        LOG.trace("round robin regions=" + hris);
2073      }
2074      try {
2075        acceptPlan(regions, balancer.roundRobinAssignment(hris, servers));
2076      } catch (IOException e) {
2077        LOG.warn("unable to round-robin assignment", e);
2078        addToPendingAssignment(regions, hris);
2079      }
2080    }
2081  }
2082
2083  private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions,
2084      final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException {
2085    final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()];
2086    final long st = System.currentTimeMillis();
2087
2088    if (plan == null) {
2089      throw new HBaseIOException("unable to compute plans for regions=" + regions.size());
2090    }
2091
2092    if (plan.isEmpty()) return;
2093
2094    int evcount = 0;
2095    for (Map.Entry<ServerName, List<RegionInfo>> entry: plan.entrySet()) {
2096      final ServerName server = entry.getKey();
2097      for (RegionInfo hri: entry.getValue()) {
2098        final RegionStateNode regionNode = regions.get(hri);
2099        regionNode.setRegionLocation(server);
2100        if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) {
2101          assignQueueLock.lock();
2102          try {
2103            pendingAssignQueue.add(regionNode);
2104          } finally {
2105            assignQueueLock.unlock();
2106          }
2107        }else {
2108          events[evcount++] = regionNode.getProcedureEvent();
2109        }
2110      }
2111    }
2112    ProcedureEvent.wakeEvents(getProcedureScheduler(), events);
2113
2114    final long et = System.currentTimeMillis();
2115    if (LOG.isTraceEnabled()) {
2116      LOG.trace("ASSIGN ACCEPT " + events.length + " -> " +
2117          StringUtils.humanTimeDiff(et - st));
2118    }
2119  }
2120
2121  private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions,
2122      final Collection<RegionInfo> pendingRegions) {
2123    assignQueueLock.lock();
2124    try {
2125      for (RegionInfo hri: pendingRegions) {
2126        pendingAssignQueue.add(regions.get(hri));
2127      }
2128    } finally {
2129      assignQueueLock.unlock();
2130    }
2131  }
2132
2133  /**
2134   * Get a list of servers that this region cannot be assigned to.
2135   * For system tables, we must assign them to a server with highest version.
2136   */
2137  public List<ServerName> getExcludedServersForSystemTable() {
2138    // TODO: This should be a cached list kept by the ServerManager rather than calculated on each
2139    // move or system region assign. The RegionServerTracker keeps list of online Servers with
2140    // RegionServerInfo that includes Version.
2141    List<Pair<ServerName, String>> serverList = master.getServerManager().getOnlineServersList()
2142        .stream()
2143        .map((s)->new Pair<>(s, master.getRegionServerVersion(s)))
2144        .collect(Collectors.toList());
2145    if (serverList.isEmpty()) {
2146      return Collections.emptyList();
2147    }
2148    String highestVersion = Collections.max(serverList,
2149        (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond();
2150    return serverList.stream()
2151        .filter((p)->!p.getSecond().equals(highestVersion))
2152        .map(Pair::getFirst)
2153        .collect(Collectors.toList());
2154  }
2155
2156  @VisibleForTesting
2157  MasterServices getMaster() {
2158    return master;
2159  }
2160
2161  /**
2162   * @return a snapshot of rsReports
2163   */
2164  public Map<ServerName, Set<byte[]>> getRSReports() {
2165    Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>();
2166    synchronized (rsReports) {
2167      rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue()));
2168    }
2169    return rsReportsSnapshot;
2170  }
2171
2172  /**
2173   * Provide regions state count for given table.
2174   * e.g howmany regions of give table are opened/closed/rit etc
2175   *
2176   * @param tableName TableName
2177   * @return region states count
2178   */
2179  public RegionStatesCount getRegionStatesCount(TableName tableName) {
2180    int openRegionsCount = 0;
2181    int closedRegionCount = 0;
2182    int ritCount = 0;
2183    int splitRegionCount = 0;
2184    int totalRegionCount = 0;
2185    if (!isTableDisabled(tableName)) {
2186      final List<RegionState> states = regionStates.getTableRegionStates(tableName);
2187      for (RegionState regionState : states) {
2188        if (regionState.isOpened()) {
2189          openRegionsCount++;
2190        } else if (regionState.isClosed()) {
2191          closedRegionCount++;
2192        } else if (regionState.isSplit()) {
2193          splitRegionCount++;
2194        }
2195      }
2196      totalRegionCount = states.size();
2197      ritCount = totalRegionCount - openRegionsCount - splitRegionCount;
2198    }
2199    return new RegionStatesCount.RegionStatesCountBuilder()
2200      .setOpenRegions(openRegionsCount)
2201      .setClosedRegions(closedRegionCount)
2202      .setSplitRegions(splitRegionCount)
2203      .setRegionsInTransition(ritCount)
2204      .setTotalRegions(totalRegionCount)
2205      .build();
2206  }
2207
2208}