001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.Future;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.locks.Condition;
033import java.util.concurrent.locks.ReentrantLock;
034import java.util.stream.Collectors;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.DoNotRetryIOException;
037import org.apache.hadoop.hbase.HBaseIOException;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.PleaseHoldException;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.UnknownRegionException;
043import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.RegionInfoBuilder;
046import org.apache.hadoop.hbase.client.RegionStatesCount;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.TableState;
049import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
050import org.apache.hadoop.hbase.favored.FavoredNodesManager;
051import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
052import org.apache.hadoop.hbase.master.LoadBalancer;
053import org.apache.hadoop.hbase.master.MasterServices;
054import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
055import org.apache.hadoop.hbase.master.RegionPlan;
056import org.apache.hadoop.hbase.master.RegionState;
057import org.apache.hadoop.hbase.master.RegionState.State;
058import org.apache.hadoop.hbase.master.ServerManager;
059import org.apache.hadoop.hbase.master.TableStateManager;
060import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
061import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
062import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
063import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
064import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
065import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
066import org.apache.hadoop.hbase.procedure2.Procedure;
067import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
068import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
069import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
070import org.apache.hadoop.hbase.procedure2.util.StringUtils;
071import org.apache.hadoop.hbase.regionserver.SequenceId;
072import org.apache.hadoop.hbase.util.Bytes;
073import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
074import org.apache.hadoop.hbase.util.HasThread;
075import org.apache.hadoop.hbase.util.Pair;
076import org.apache.hadoop.hbase.util.Threads;
077import org.apache.hadoop.hbase.util.VersionInfo;
078import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
079import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
080import org.apache.yetus.audience.InterfaceAudience;
081import org.apache.zookeeper.KeeperException;
082import org.slf4j.Logger;
083import org.slf4j.LoggerFactory;
084
085import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
086
087import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
092
093/**
094 * The AssignmentManager is the coordinator for region assign/unassign operations.
095 * <ul>
096 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
097 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
098 * </ul>
099 * Regions are created by CreateTable, Split, Merge.
100 * Regions are deleted by DeleteTable, Split, Merge.
101 * Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash.
102 * Unassigns are triggered by DisableTable, Split, Merge
103 */
104@InterfaceAudience.Private
105public class AssignmentManager {
  /** Class logger. */
  private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);

  // TODO: AMv2
  //  - handle region migration from hbase1 to hbase2.
  //  - handle sys table assignment first (e.g. acl, namespace)
  //  - handle table priorities
  //  - If ServerBusyException trying to update hbase:meta, we abort the Master
  //   See updateRegionLocation in RegionStateStore.
  //
  // See also
  // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
  // for other TODOs.

  /**
   * Configuration key for the bootstrap thread pool size.
   * NOTE(review): not read by the constructor below; presumably consumed elsewhere.
   */
  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
      "hbase.assignment.bootstrap.thread.pool.size";

  /** Configuration key (milliseconds) read by the constructor into assignDispatchWaitMillis. */
  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
      "hbase.assignment.dispatch.wait.msec";
  /** Value used when {@link #ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY} is not set. */
  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;

  /** Configuration key read by the constructor into assignDispatchWaitQueueMaxSize. */
  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
      "hbase.assignment.dispatch.wait.queue.max.size";
  /** Value used when {@link #ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY} is not set. */
  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;

  /** Configuration key (milliseconds) for the interval handed to the RegionInTransitionChore. */
  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
      "hbase.assignment.rit.chore.interval.msec";
  /** Value used when {@link #RIT_CHORE_INTERVAL_MSEC_CONF_KEY} is not set (one minute). */
  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;

  /**
   * Configuration key (milliseconds) for the interval handed to the DeadServerMetricRegionChore.
   * The constructor creates that chore only when the configured value is greater than zero.
   */
  public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY =
      "hbase.assignment.dead.region.metric.chore.interval.msec";
  /** Value used when {@link #DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY} is not set. */
  private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000;

  /**
   * Configuration key for the maximum number of assign attempts. The constructor raises any
   * configured value below 1 to 1.
   */
  public static final String ASSIGN_MAX_ATTEMPTS =
      "hbase.assignment.maximum.attempts";
  /** Value used when {@link #ASSIGN_MAX_ATTEMPTS} is not set. */
  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;

  /** Configuration key read by the constructor into assignRetryImmediatelyMaxAttempts. */
  public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS =
      "hbase.assignment.retry.immediately.maximum.attempts";
  /** Value used when {@link #ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS} is not set. */
  private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3;

  /** Region in Transition metrics threshold time */
  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
      "hbase.metrics.rit.stuck.warning.threshold";
  /** Value used when {@link #METRICS_RIT_STUCK_WARNING_THRESHOLD} is not set (one minute). */
  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
150
  // Event behind isMetaAssigned()/waitMetaAssigned(); woken or suspended by setMetaAssigned().
  private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
  // Event behind isMetaLoaded()/waitMetaLoaded(); woken by wakeMetaLoadedEvent(), suspended by
  // stop().
  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");

  // Metrics sink, created in the constructor and exposed via getAssignmentManagerMetrics().
  private final MetricsAssignmentManager metrics;
  // Chore created in the constructor with the configured RIT interval; removed from the
  // procedure executor in stop().
  private final RegionInTransitionChore ritChore;
  // Null when the configured dead-region metric interval is not greater than zero.
  private final DeadServerMetricRegionChore deadMetricChore;
  // Owning master; source of configuration, ZooKeeper, balancer and procedure executor.
  private final MasterServices master;

  // Flipped by start()/stop() with compareAndSet so that each runs at most once per transition.
  private final AtomicBoolean running = new AtomicBoolean(false);
  // In-memory region/server state; cleared in stop().
  private final RegionStates regionStates = new RegionStates();
  // Store supplied to (or created by) the constructor; exposed via getRegionStateStore().
  private final RegionStateStore regionStateStore;

  // Per-server sets of byte[] values. NOTE(review): presumably the region names each region
  // server last reported; it is populated outside the part of the file reviewed here -- confirm.
  private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();

  // True when the configured load balancer class is assignable to FavoredStochasticBalancer.
  private final boolean shouldAssignRegionsWithFavoredNodes;
  // Values read once from configuration in the constructor.
  private final int assignDispatchWaitQueueMaxSize;
  private final int assignDispatchWaitMillis;
  private final int assignMaxAttempts;
  private final int assignRetryImmediatelyMaxAttempts;

  // Monitor held by the background thread started in checkIfShouldMoveSystemRegionAsync().
  private final Object checkIfShouldMoveSystemRegionLock = new Object();

  // NOTE(review): presumably managed by startAssignmentThread()/stopAssignmentThread(), which
  // are defined outside the part of the file reviewed here -- confirm.
  private Thread assignThread;
174
  /**
   * Creates an AssignmentManager for the given master, backed by a freshly created
   * {@link RegionStateStore} built from that same master.
   * @param master the master services this manager works against
   */
  public AssignmentManager(final MasterServices master) {
    this(master, new RegionStateStore(master));
  }
178
179  @VisibleForTesting
180  AssignmentManager(final MasterServices master, final RegionStateStore stateStore) {
181    this.master = master;
182    this.regionStateStore = stateStore;
183    this.metrics = new MetricsAssignmentManager();
184
185    final Configuration conf = master.getConfiguration();
186
187    // Only read favored nodes if using the favored nodes load balancer.
188    this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class.isAssignableFrom(
189        conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class));
190
191    this.assignDispatchWaitMillis = conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY,
192        DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
193    this.assignDispatchWaitQueueMaxSize = conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY,
194        DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);
195
196    this.assignMaxAttempts = Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS,
197        DEFAULT_ASSIGN_MAX_ATTEMPTS));
198    this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS,
199        DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS);
200
201    int ritChoreInterval = conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY,
202        DEFAULT_RIT_CHORE_INTERVAL_MSEC);
203    this.ritChore = new RegionInTransitionChore(ritChoreInterval);
204
205    int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY,
206        DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC);
207    if (deadRegionChoreInterval > 0) {
208      this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval);
209    } else {
210      this.deadMetricChore = null;
211    }
212  }
213
214  public void start() throws IOException, KeeperException {
215    if (!running.compareAndSet(false, true)) {
216      return;
217    }
218
219    LOG.trace("Starting assignment manager");
220
221    // Start the Assignment Thread
222    startAssignmentThread();
223
224    // load meta region state
225    ZKWatcher zkw = master.getZooKeeper();
226    // it could be null in some tests
227    if (zkw != null) {
228      RegionState regionState = MetaTableLocator.getMetaRegionState(zkw);
229      RegionStateNode regionNode =
230        regionStates.getOrCreateRegionStateNode(RegionInfoBuilder.FIRST_META_REGIONINFO);
231      regionNode.lock();
232      try {
233        regionNode.setRegionLocation(regionState.getServerName());
234        regionNode.setState(regionState.getState());
235        if (regionNode.getProcedure() != null) {
236          regionNode.getProcedure().stateLoaded(this, regionNode);
237        }
238        setMetaAssigned(regionState.getRegion(), regionState.getState() == State.OPEN);
239      } finally {
240        regionNode.unlock();
241      }
242    }
243  }
244
245  /**
246   * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode.
247   * <p>
248   * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta
249   * method below. And it is also very important as now before submitting a TRSP, we need to attach
250   * it to the RegionStateNode, which acts like a guard, so we need to restore this information at
251   * the very beginning, before we start processing any procedures.
252   */
253  public void setupRIT(List<TransitRegionStateProcedure> procs) {
254    procs.forEach(proc -> {
255      RegionInfo regionInfo = proc.getRegion();
256      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
257      TransitRegionStateProcedure existingProc = regionNode.getProcedure();
258      if (existingProc != null) {
259        // This is possible, as we will detach the procedure from the RSN before we
260        // actually finish the procedure. This is because that, we will detach the TRSP from the RSN
261        // during execution, at that time, the procedure has not been marked as done in the pv2
262        // framework yet, so it is possible that we schedule a new TRSP immediately and when
263        // arriving here, we will find out that there are multiple TRSPs for the region. But we can
264        // make sure that, only the last one can take the charge, the previous ones should have all
265        // been finished already. So here we will compare the proc id, the greater one will win.
266        if (existingProc.getProcId() < proc.getProcId()) {
267          // the new one wins, unset and set it to the new one below
268          regionNode.unsetProcedure(existingProc);
269        } else {
270          // the old one wins, skip
271          return;
272        }
273      }
274      LOG.info("Attach {} to {} to restore RIT", proc, regionNode);
275      regionNode.setProcedure(proc);
276    });
277  }
278
279  public void stop() {
280    if (!running.compareAndSet(true, false)) {
281      return;
282    }
283
284    LOG.info("Stopping assignment manager");
285
286    // The AM is started before the procedure executor,
287    // but the actual work will be loaded/submitted only once we have the executor
288    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;
289
290    // Remove the RIT chore
291    if (hasProcExecutor) {
292      master.getMasterProcedureExecutor().removeChore(this.ritChore);
293      if (this.deadMetricChore != null) {
294        master.getMasterProcedureExecutor().removeChore(this.deadMetricChore);
295      }
296    }
297
298    // Stop the Assignment Thread
299    stopAssignmentThread();
300
301    // Stop the RegionStateStore
302    regionStates.clear();
303
304    // Update meta events (for testing)
305    if (hasProcExecutor) {
306      metaLoadEvent.suspend();
307      for (RegionInfo hri: getMetaRegionSet()) {
308        setMetaAssigned(hri, false);
309      }
310    }
311  }
312
313  public boolean isRunning() {
314    return running.get();
315  }
316
317  public Configuration getConfiguration() {
318    return master.getConfiguration();
319  }
320
321  public MetricsAssignmentManager getAssignmentManagerMetrics() {
322    return metrics;
323  }
324
325  private LoadBalancer getBalancer() {
326    return master.getLoadBalancer();
327  }
328
329  private MasterProcedureEnv getProcedureEnvironment() {
330    return master.getMasterProcedureExecutor().getEnvironment();
331  }
332
333  private MasterProcedureScheduler getProcedureScheduler() {
334    return getProcedureEnvironment().getProcedureScheduler();
335  }
336
337  int getAssignMaxAttempts() {
338    return assignMaxAttempts;
339  }
340
341  int getAssignRetryImmediatelyMaxAttempts() {
342    return assignRetryImmediatelyMaxAttempts;
343  }
344
345  public RegionStates getRegionStates() {
346    return regionStates;
347  }
348
349  /**
350   * Returns the regions hosted by the specified server.
351   * <p/>
352   * Notice that, for SCP, after we submit the SCP, no one can change the region list for the
353   * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a
354   * snapshot of the current region list for this server, which means, right after you get the
355   * region list, new regions may be moved to this server or some regions may be moved out from this
356   * server, so you should not use it critically if you need strong consistency.
357   */
358  public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
359    ServerStateNode serverInfo = regionStates.getServerNode(serverName);
360    if (serverInfo == null) {
361      return Collections.emptyList();
362    }
363    return serverInfo.getRegionInfoList();
364  }
365
366  public RegionStateStore getRegionStateStore() {
367    return regionStateStore;
368  }
369
370  public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
371    return this.shouldAssignRegionsWithFavoredNodes
372      ? ((FavoredStochasticBalancer) getBalancer()).getFavoredNodes(regionInfo)
373      : ServerName.EMPTY_SERVER_LIST;
374  }
375
376  // ============================================================================================
377  //  Table State Manager helpers
378  // ============================================================================================
379  TableStateManager getTableStateManager() {
380    return master.getTableStateManager();
381  }
382
383  public boolean isTableEnabled(final TableName tableName) {
384    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
385  }
386
387  public boolean isTableDisabled(final TableName tableName) {
388    return getTableStateManager().isTableState(tableName,
389      TableState.State.DISABLED, TableState.State.DISABLING);
390  }
391
392  // ============================================================================================
393  //  META Helpers
394  // ============================================================================================
395  private boolean isMetaRegion(final RegionInfo regionInfo) {
396    return regionInfo.isMetaRegion();
397  }
398
399  public boolean isMetaRegion(final byte[] regionName) {
400    return getMetaRegionFromName(regionName) != null;
401  }
402
403  public RegionInfo getMetaRegionFromName(final byte[] regionName) {
404    for (RegionInfo hri: getMetaRegionSet()) {
405      if (Bytes.equals(hri.getRegionName(), regionName)) {
406        return hri;
407      }
408    }
409    return null;
410  }
411
412  public boolean isCarryingMeta(final ServerName serverName) {
413    // TODO: handle multiple meta
414    return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
415  }
416
417  private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
418    // TODO: check for state?
419    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
420    return(node != null && serverName.equals(node.getRegionLocation()));
421  }
422
423  private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
424    //if (regionInfo.isMetaRegion()) return regionInfo;
425    // TODO: handle multiple meta. if the region provided is not meta lookup
426    // which meta the region belongs to.
427    return RegionInfoBuilder.FIRST_META_REGIONINFO;
428  }
429
430  // TODO: handle multiple meta.
431  private static final Set<RegionInfo> META_REGION_SET =
432      Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);
433  public Set<RegionInfo> getMetaRegionSet() {
434    return META_REGION_SET;
435  }
436
437  // ============================================================================================
438  //  META Event(s) helpers
439  // ============================================================================================
440  /**
441   * Notice that, this only means the meta region is available on a RS, but the AM may still be
442   * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
443   * before checking this method, unless you can make sure that your piece of code can only be
444   * executed after AM builds the region states.
445   * @see #isMetaLoaded()
446   */
447  public boolean isMetaAssigned() {
448    return metaAssignEvent.isReady();
449  }
450
451  public boolean isMetaRegionInTransition() {
452    return !isMetaAssigned();
453  }
454
455  /**
456   * Notice that this event does not mean the AM has already finished region state rebuilding. See
457   * the comment of {@link #isMetaAssigned()} for more details.
458   * @see #isMetaAssigned()
459   */
460  public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
461    return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
462  }
463
464  private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
465    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
466    ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
467    if (assigned) {
468      metaAssignEvent.wake(getProcedureScheduler());
469    } else {
470      metaAssignEvent.suspend();
471    }
472  }
473
474  private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
475    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
476    // TODO: handle multiple meta.
477    return metaAssignEvent;
478  }
479
480  /**
481   * Wait until AM finishes the meta loading, i.e, the region states rebuilding.
482   * @see #isMetaLoaded()
483   * @see #waitMetaAssigned(Procedure, RegionInfo)
484   */
485  public boolean waitMetaLoaded(Procedure<?> proc) {
486    return metaLoadEvent.suspendIfNotReady(proc);
487  }
488
489  @VisibleForTesting
490  void wakeMetaLoadedEvent() {
491    metaLoadEvent.wake(getProcedureScheduler());
492    assert isMetaLoaded() : "expected meta to be loaded";
493  }
494
495  /**
496   * Return whether AM finishes the meta loading, i.e, the region states rebuilding.
497   * @see #isMetaAssigned()
498   * @see #waitMetaLoaded(Procedure)
499   */
500  public boolean isMetaLoaded() {
501    return metaLoadEvent.isReady();
502  }
503
504  /**
505   * Start a new thread to check if there are region servers whose versions are higher than others.
506   * If so, move all system table regions to RS with the highest version to keep compatibility.
507   * The reason is, RS in new version may not be able to access RS in old version when there are
508   * some incompatible changes.
509   * <p>This method is called when a new RegionServer is added to cluster only.</p>
510   */
511  public void checkIfShouldMoveSystemRegionAsync() {
512    // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
513    // it should 'move' the system tables from the old server to the new server but
514    // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
515    if (this.master.getServerManager().countOfRegionServers() <= 1) {
516      return;
517    }
518    // This thread used to run whenever there was a change in the cluster. The ZooKeeper
519    // childrenChanged notification came in before the nodeDeleted message and so this method
520    // cold run before a ServerCrashProcedure could run. That meant that this thread could see
521    // a Crashed Server before ServerCrashProcedure and it could find system regions on the
522    // crashed server and go move them before ServerCrashProcedure had a chance; could be
523    // dataloss too if WALs were not recovered.
524    new Thread(() -> {
525      try {
526        synchronized (checkIfShouldMoveSystemRegionLock) {
527          List<RegionPlan> plans = new ArrayList<>();
528          // TODO: I don't think this code does a good job if all servers in cluster have same
529          // version. It looks like it will schedule unnecessary moves.
530          for (ServerName server : getExcludedServersForSystemTable()) {
531            if (master.getServerManager().isServerDead(server)) {
532              // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
533              // considers only online servers, the server could be queued for dead server
534              // processing. As region assignments for crashed server is handled by
535              // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
536              // regular flow of LoadBalancer as a favored node and not to have this special
537              // handling.
538              continue;
539            }
540            List<RegionInfo> regionsShouldMove = getSystemTables(server);
541            if (!regionsShouldMove.isEmpty()) {
542              for (RegionInfo regionInfo : regionsShouldMove) {
543                // null value for dest forces destination server to be selected by balancer
544                RegionPlan plan = new RegionPlan(regionInfo, server, null);
545                if (regionInfo.isMetaRegion()) {
546                  // Must move meta region first.
547                  LOG.info("Async MOVE of {} to newer Server={}",
548                      regionInfo.getEncodedName(), server);
549                  moveAsync(plan);
550                } else {
551                  plans.add(plan);
552                }
553              }
554            }
555            for (RegionPlan plan : plans) {
556              LOG.info("Async MOVE of {} to newer Server={}",
557                  plan.getRegionInfo().getEncodedName(), server);
558              moveAsync(plan);
559            }
560          }
561        }
562      } catch (Throwable t) {
563        LOG.error(t.toString(), t);
564      }
565    }).start();
566  }
567
568  private List<RegionInfo> getSystemTables(ServerName serverName) {
569    ServerStateNode serverNode = regionStates.getServerNode(serverName);
570    if (serverNode == null) {
571      return Collections.emptyList();
572    }
573    return serverNode.getSystemRegionInfoList();
574  }
575
576  private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
577      throws HBaseIOException {
578    if (regionNode.getProcedure() != null) {
579      throw new HBaseIOException(regionNode + " is currently in transition");
580    }
581    if (!regionNode.isInState(expectedStates)) {
582      throw new DoNotRetryRegionException("Unexpected state for " + regionNode);
583    }
584    if (isTableDisabled(regionNode.getTable())) {
585      throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
586    }
587  }
588
589  // TODO: Need an async version of this for hbck2.
590  public long assign(RegionInfo regionInfo, ServerName sn) throws IOException {
591    // TODO: should we use getRegionStateNode?
592    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
593    TransitRegionStateProcedure proc;
594    regionNode.lock();
595    try {
596      preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
597      proc = TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn);
598      regionNode.setProcedure(proc);
599    } finally {
600      regionNode.unlock();
601    }
602    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
603    return proc.getProcId();
604  }
605
606  public long assign(RegionInfo regionInfo) throws IOException {
607    return assign(regionInfo, null);
608  }
609
610  public long unassign(RegionInfo regionInfo) throws IOException {
611    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
612    if (regionNode == null) {
613      throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
614    }
615    TransitRegionStateProcedure proc;
616    regionNode.lock();
617    try {
618      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
619      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
620      regionNode.setProcedure(proc);
621    } finally {
622      regionNode.unlock();
623    }
624    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
625    return proc.getProcId();
626  }
627
628  public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
629      ServerName targetServer) throws HBaseIOException {
630    RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
631    if (regionNode == null) {
632      throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
633    }
634    TransitRegionStateProcedure proc;
635    regionNode.lock();
636    try {
637      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
638      regionNode.checkOnline();
639      proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
640      regionNode.setProcedure(proc);
641    } finally {
642      regionNode.unlock();
643    }
644    return proc;
645  }
646
647  public void move(RegionInfo regionInfo) throws IOException {
648    TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null);
649    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
650  }
651
652  public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException {
653    TransitRegionStateProcedure proc =
654      createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination());
655    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
656  }
657
658  // ============================================================================================
659  //  RegionTransition procedures helpers
660  // ============================================================================================
661
662  /**
663   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
664   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
665   *         to populate the assigns with targets chosen using round-robin (default balancer
666   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
667   *         AssignProcedure will ask the balancer for a new target, and so on.
668   */
669  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris,
670      List<ServerName> serversToExclude) {
671    if (hris.isEmpty()) {
672      return new TransitRegionStateProcedure[0];
673    }
674
675    if (serversToExclude != null
676        && this.master.getServerManager().getOnlineServersList().size() == 1) {
677      LOG.debug("Only one region server found and hence going ahead with the assignment");
678      serversToExclude = null;
679    }
680    try {
681      // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
682      // a better job if it has all the assignments in the one lump.
683      Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
684        this.master.getServerManager().createDestinationServersList(serversToExclude));
685      // Return mid-method!
686      return createAssignProcedures(assignments);
687    } catch (HBaseIOException hioe) {
688      LOG.warn("Failed roundRobinAssignment", hioe);
689    }
690    // If an error above, fall-through to this simpler assign. Last resort.
691    return createAssignProcedures(hris);
692  }
693
694  /**
695   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
696   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
697   *         to populate the assigns with targets chosen using round-robin (default balancer
698   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
699   *         AssignProcedure will ask the balancer for a new target, and so on.
700   */
701  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
702    return createRoundRobinAssignProcedures(hris, null);
703  }
704
705  @VisibleForTesting
706  static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
707    if (left.getRegion().isMetaRegion()) {
708      if (right.getRegion().isMetaRegion()) {
709        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
710      }
711      return -1;
712    } else if (right.getRegion().isMetaRegion()) {
713      return +1;
714    }
715    if (left.getRegion().getTable().isSystemTable()) {
716      if (right.getRegion().getTable().isSystemTable()) {
717        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
718      }
719      return -1;
720    } else if (right.getRegion().getTable().isSystemTable()) {
721      return +1;
722    }
723    return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
724  }
725
726  private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode,
727      ServerName targetServer, boolean override) {
728    TransitRegionStateProcedure proc;
729    regionNode.lock();
730    try {
731      if(override && regionNode.getProcedure() != null) {
732        regionNode.unsetProcedure(regionNode.getProcedure());
733      }
734      assert regionNode.getProcedure() == null;
735      proc = TransitRegionStateProcedure.assign(getProcedureEnvironment(),
736        regionNode.getRegionInfo(), targetServer);
737      regionNode.setProcedure(proc);
738    } finally {
739      regionNode.unlock();
740    }
741    return proc;
742  }
743
744  private TransitRegionStateProcedure createUnassignProcedure(RegionStateNode regionNode,
745      boolean override) {
746    TransitRegionStateProcedure proc;
747    regionNode.lock();
748    try {
749      if(override && regionNode.getProcedure() != null) {
750        regionNode.unsetProcedure(regionNode.getProcedure());
751      }
752      assert regionNode.getProcedure() == null;
753      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(),
754          regionNode.getRegionInfo());
755      regionNode.setProcedure(proc);
756    } finally {
757      regionNode.unlock();
758    }
759    return proc;
760  }
761
762  /**
763   * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server.
764   * This method is specified for HBCK2
765   */
766  public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo hri, boolean override) {
767    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri);
768    return createAssignProcedure(regionNode, null, override);
769  }
770
771  /**
772   * Create one TransitRegionStateProcedure to unassign a region.
773   * This method is specified for HBCK2
774   */
775  public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo hri, boolean override) {
776    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri);
777    return createUnassignProcedure(regionNode, override);
778  }
779
780  /**
781   * Create an array of TransitRegionStateProcedure w/o specifying a target server.
782   * <p/>
783   * If no target server, at assign time, we will try to use the former location of the region if
784   * one exists. This is how we 'retain' the old location across a server restart.
785   * <p/>
786   * Should only be called when you can make sure that no one can touch these regions other than
787   * you. For example, when you are creating table.
788   */
789  public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) {
790    return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
791        .map(regionNode -> createAssignProcedure(regionNode, null, false))
792        .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
793  }
794
795  /**
796   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
797   * @return Assignments made from the passed in <code>assignments</code>
798   */
799  private TransitRegionStateProcedure[] createAssignProcedures(
800      Map<ServerName, List<RegionInfo>> assignments) {
801    return assignments.entrySet().stream()
802      .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
803        .map(regionNode -> createAssignProcedure(regionNode, e.getKey(), false)))
804      .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
805  }
806
807  /**
808   * Called by DisableTableProcedure to unassign all the regions for a table.
809   */
810  public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) {
811    return regionStates.getTableRegionStateNodes(tableName).stream().map(regionNode -> {
812      regionNode.lock();
813      try {
814        if (!regionStates.include(regionNode, false) ||
815          regionStates.isRegionOffline(regionNode.getRegionInfo())) {
816          return null;
817        }
818        // As in DisableTableProcedure, we will hold the xlock for table, so we can make sure that
819        // this procedure has not been executed yet, as TRSP will hold the shared lock for table all
820        // the time. So here we will unset it and when it is actually executed, it will find that
821        // the attach procedure is not itself and quit immediately.
822        if (regionNode.getProcedure() != null) {
823          regionNode.unsetProcedure(regionNode.getProcedure());
824        }
825        TransitRegionStateProcedure proc = TransitRegionStateProcedure
826          .unassign(getProcedureEnvironment(), regionNode.getRegionInfo());
827        regionNode.setProcedure(proc);
828        return proc;
829      } finally {
830        regionNode.unlock();
831      }
832    }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new);
833  }
834
835  public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
836      final byte[] splitKey) throws IOException {
837    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
838  }
839
840  public MergeTableRegionsProcedure createMergeProcedure(RegionInfo ... ris) throws IOException {
841    return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
842  }
843
844  /**
845   * Delete the region states. This is called by "DeleteTable"
846   */
847  public void deleteTable(final TableName tableName) throws IOException {
848    final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
849    regionStateStore.deleteRegions(regions);
850    for (int i = 0; i < regions.size(); ++i) {
851      final RegionInfo regionInfo = regions.get(i);
852      // we expect the region to be offline
853      regionStates.removeFromOfflineRegions(regionInfo);
854      regionStates.deleteRegion(regionInfo);
855    }
856  }
857
858  // ============================================================================================
859  //  RS Region Transition Report helpers
860  // ============================================================================================
861  private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
862      ServerName serverName, List<RegionStateTransition> transitionList) throws IOException {
863    for (RegionStateTransition transition : transitionList) {
864      switch (transition.getTransitionCode()) {
865        case OPENED:
866        case FAILED_OPEN:
867        case CLOSED:
868          assert transition.getRegionInfoCount() == 1 : transition;
869          final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
870          long procId =
871            transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
872          updateRegionTransition(serverName, transition.getTransitionCode(), hri,
873            transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId);
874          break;
875        case READY_TO_SPLIT:
876        case SPLIT:
877        case SPLIT_REVERTED:
878          assert transition.getRegionInfoCount() == 3 : transition;
879          final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
880          final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
881          final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
882          updateRegionSplitTransition(serverName, transition.getTransitionCode(), parent, splitA,
883            splitB);
884          break;
885        case READY_TO_MERGE:
886        case MERGED:
887        case MERGE_REVERTED:
888          assert transition.getRegionInfoCount() == 3 : transition;
889          final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
890          final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
891          final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
892          updateRegionMergeTransition(serverName, transition.getTransitionCode(), merged, mergeA,
893            mergeB);
894          break;
895      }
896    }
897  }
898
899  public ReportRegionStateTransitionResponse reportRegionStateTransition(
900      final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
901    ReportRegionStateTransitionResponse.Builder builder =
902        ReportRegionStateTransitionResponse.newBuilder();
903    ServerName serverName = ProtobufUtil.toServerName(req.getServer());
904    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
905    // here we have to acquire a read lock instead of a simple exclusive lock. This is because that
906    // we should not block other reportRegionStateTransition call from the same region server. This
907    // is not only about performance, but also to prevent dead lock. Think of the meta region is
908    // also on the same region server and you hold the lock which blocks the
909    // reportRegionStateTransition for meta, and since meta is not online, you will block inside the
910    // lock protection to wait for meta online...
911    serverNode.readLock().lock();
912    try {
913      // we only accept reportRegionStateTransition if the region server is online, see the comment
914      // above in submitServerCrash method and HBASE-21508 for more details.
915      if (serverNode.isInState(ServerState.ONLINE)) {
916        try {
917          reportRegionStateTransition(builder, serverName, req.getTransitionList());
918        } catch (PleaseHoldException e) {
919          LOG.trace("Failed transition ", e);
920          throw e;
921        } catch (UnsupportedOperationException | IOException e) {
922          // TODO: at the moment we have a single error message and the RS will abort
923          // if the master says that one of the region transitions failed.
924          LOG.warn("Failed transition", e);
925          builder.setErrorMessage("Failed transition " + e.getMessage());
926        }
927      } else {
928        LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
929          serverName);
930        builder.setErrorMessage("You are dead");
931      }
932    } finally {
933      serverNode.readLock().unlock();
934    }
935
936    return builder.build();
937  }
938
939  private void updateRegionTransition(ServerName serverName, TransitionCode state,
940      RegionInfo regionInfo, long seqId, long procId) throws IOException {
941    checkMetaLoaded(regionInfo);
942
943    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
944    if (regionNode == null) {
945      // the table/region is gone. maybe a delete, split, merge
946      throw new UnexpectedStateException(String.format(
947        "Server %s was trying to transition region %s to %s. but the region was removed.",
948        serverName, regionInfo, state));
949    }
950    LOG.trace("Update region transition serverName={} region={} regionState={}", serverName,
951      regionNode, state);
952
953    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
954    regionNode.lock();
955    try {
956      if (!reportTransition(regionNode, serverNode, state, seqId, procId)) {
957        // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
958        // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
959        // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
960        // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
961        // to CLOSED
962        // These happen because on cluster shutdown, we currently let the RegionServers close
963        // regions. This is the only time that region close is not run by the Master (so cluster
964        // goes down fast). Consider changing it so Master runs all shutdowns.
965        if (this.master.getServerManager().isClusterShutdown() &&
966          state.equals(TransitionCode.CLOSED)) {
967          LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
968        } else {
969          LOG.warn("No matching procedure found for {} transition to {}", regionNode, state);
970        }
971      }
972    } finally {
973      regionNode.unlock();
974    }
975  }
976
977  private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
978      TransitionCode state, long seqId, long procId) throws IOException {
979    ServerName serverName = serverNode.getServerName();
980    TransitRegionStateProcedure proc = regionNode.getProcedure();
981    if (proc == null) {
982      return false;
983    }
984    proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
985      serverName, state, seqId, procId);
986    return true;
987  }
988
989  private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state,
990      final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB)
991      throws IOException {
992    checkMetaLoaded(parent);
993
994    if (state != TransitionCode.READY_TO_SPLIT) {
995      throw new UnexpectedStateException("unsupported split regionState=" + state +
996        " for parent region " + parent +
997        " maybe an old RS (< 2.0) had the operation in progress");
998    }
999
1000    // sanity check on the request
1001    if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) {
1002      throw new UnsupportedOperationException(
1003        "unsupported split request with bad keys: parent=" + parent +
1004        " hriA=" + hriA + " hriB=" + hriB);
1005    }
1006
1007    // Submit the Split procedure
1008    final byte[] splitKey = hriB.getStartKey();
1009    if (LOG.isDebugEnabled()) {
1010      LOG.debug("Split request from " + serverName +
1011          ", parent=" + parent + " splitKey=" + Bytes.toStringBinary(splitKey));
1012    }
1013    master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
1014
1015    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
1016    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
1017      throw new UnsupportedOperationException(String.format(
1018        "Split handled by the master: parent=%s hriA=%s hriB=%s", parent.getShortNameToLog(), hriA, hriB));
1019    }
1020  }
1021
1022  private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state,
1023      final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
1024    checkMetaLoaded(merged);
1025
1026    if (state != TransitionCode.READY_TO_MERGE) {
1027      throw new UnexpectedStateException("Unsupported merge regionState=" + state +
1028        " for regionA=" + hriA + " regionB=" + hriB + " merged=" + merged +
1029        " maybe an old RS (< 2.0) had the operation in progress");
1030    }
1031
1032    // Submit the Merge procedure
1033    if (LOG.isDebugEnabled()) {
1034      LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged);
1035    }
1036    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
1037
1038    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
1039    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
1040      throw new UnsupportedOperationException(String.format(
1041        "Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, merged, hriA,
1042          hriB));
1043    }
1044  }
1045
1046  // ============================================================================================
1047  //  RS Status update (report online regions) helpers
1048  // ============================================================================================
1049  /**
1050   * The master will call this method when the RS send the regionServerReport(). The report will
1051   * contains the "online regions". This method will check the the online regions against the
1052   * in-memory state of the AM, and we will log a warn message if there is a mismatch. This is
1053   * because that there is no fencing between the reportRegionStateTransition method and
1054   * regionServerReport method, so there could be race and introduce inconsistency here, but
1055   * actually there is no problem.
1056   * <p/>
1057   * Please see HBASE-21421 and HBASE-21463 for more details.
1058   */
1059  public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) {
1060    if (!isRunning()) {
1061      return;
1062    }
1063    if (LOG.isTraceEnabled()) {
1064      LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName,
1065        regionNames.size(), isMetaLoaded(),
1066        regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList()));
1067    }
1068
1069    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
1070    synchronized (serverNode) {
1071      if (!serverNode.isInState(ServerState.ONLINE)) {
1072        LOG.warn("Got a report from a server result in state " + serverNode.getState());
1073        return;
1074      }
1075    }
1076
1077    // Track the regionserver reported online regions in memory.
1078    synchronized (rsReports) {
1079      rsReports.put(serverName, regionNames);
1080    }
1081
1082    if (regionNames.isEmpty()) {
1083      // nothing to do if we don't have regions
1084      LOG.trace("no online region found on {}", serverName);
1085      return;
1086    }
1087    if (!isMetaLoaded()) {
1088      // we are still on startup, skip checking
1089      return;
1090    }
1091    // The Heartbeat tells us of what regions are on the region serve, check the state.
1092    checkOnlineRegionsReport(serverNode, regionNames);
1093  }
1094
1095  // just check and output possible inconsistency, without actually doing anything
1096  private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) {
1097    ServerName serverName = serverNode.getServerName();
1098    for (byte[] regionName : regionNames) {
1099      if (!isRunning()) {
1100        return;
1101      }
1102      RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
1103      if (regionNode == null) {
1104        LOG.warn("No region state node for {}, it should already be on {}",
1105          Bytes.toStringBinary(regionName), serverName);
1106        continue;
1107      }
1108      regionNode.lock();
1109      try {
1110        long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate();
1111        if (regionNode.isInState(State.OPENING, State.OPEN)) {
1112          // This is possible as a region server has just closed a region but the region server
1113          // report is generated before the closing, but arrive after the closing. Make sure there
1114          // is some elapsed time so less false alarms.
1115          if (!regionNode.getRegionLocation().equals(serverName) && diff > 1000) {
1116            LOG.warn("{} reported OPEN on server={} but state has otherwise", regionNode,
1117              serverName);
1118          }
1119        } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
1120          // So, we can get report that a region is CLOSED or SPLIT because a heartbeat
1121          // came in at about same time as a region transition. Make sure there is some
1122          // elapsed time so less false alarms.
1123          if (diff > 1000) {
1124            LOG.warn("{} reported an unexpected OPEN on {}; time since last update={}ms",
1125              regionNode, serverName, diff);
1126          }
1127        }
1128      } finally {
1129        regionNode.unlock();
1130      }
1131    }
1132  }
1133
1134  // ============================================================================================
1135  //  RIT chore
1136  // ============================================================================================
1137  private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> {
1138    public RegionInTransitionChore(final int timeoutMsec) {
1139      super(timeoutMsec);
1140    }
1141
1142    @Override
1143    protected void periodicExecute(final MasterProcedureEnv env) {
1144      final AssignmentManager am = env.getAssignmentManager();
1145
1146      final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat();
1147      if (ritStat.hasRegionsOverThreshold()) {
1148        for (RegionState hri: ritStat.getRegionOverThreshold()) {
1149          am.handleRegionOverStuckWarningThreshold(hri.getRegion());
1150        }
1151      }
1152
1153      // update metrics
1154      am.updateRegionsInTransitionMetrics(ritStat);
1155    }
1156  }
1157
1158  private static class DeadServerMetricRegionChore
1159      extends ProcedureInMemoryChore<MasterProcedureEnv> {
1160    public DeadServerMetricRegionChore(final int timeoutMsec) {
1161      super(timeoutMsec);
1162    }
1163
1164    @Override
1165    protected void periodicExecute(final MasterProcedureEnv env) {
1166      final ServerManager sm = env.getMasterServices().getServerManager();
1167      final AssignmentManager am = env.getAssignmentManager();
1168      // To minimize inconsistencies we are not going to snapshot live servers in advance in case
1169      // new servers are added; OTOH we don't want to add heavy sync for a consistent view since
1170      // this is for metrics. Instead, we're going to check each regions as we go; to avoid making
1171      // too many checks, we maintain a local lists of server, limiting us to false negatives. If
1172      // we miss some recently-dead server, we'll just see it next time.
1173      Set<ServerName> recentlyLiveServers = new HashSet<>();
1174      int deadRegions = 0, unknownRegions = 0;
1175      for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) {
1176        if (rsn.getState() != State.OPEN) {
1177          continue; // Opportunistic check, should quickly skip RITs, offline tables, etc.
1178        }
1179        ServerName sn;
1180        State state;
1181        rsn.lock();
1182        try {
1183          sn = rsn.getRegionLocation();
1184          state = rsn.getState();
1185        } finally {
1186          rsn.unlock();
1187        }
1188        if (state != State.OPEN) {
1189          continue; // Mostly skipping RITs that are already being take care of.
1190        }
1191        if (sn == null) {
1192          ++unknownRegions; // Opened on null?
1193          continue;
1194        }
1195        if (recentlyLiveServers.contains(sn)) {
1196          continue;
1197        }
1198        ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn);
1199        switch (sls) {
1200          case LIVE:
1201            recentlyLiveServers.add(sn);
1202            break;
1203          case DEAD:
1204            ++deadRegions;
1205            break;
1206          case UNKNOWN:
1207            ++unknownRegions;
1208            break;
1209          default: throw new AssertionError("Unexpected " + sls);
1210        }
1211      }
1212      if (deadRegions > 0 || unknownRegions > 0) {
1213        LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers",
1214          deadRegions, unknownRegions);
1215      }
1216
1217      am.updateDeadServerRegionMetrics(deadRegions, unknownRegions);
1218    }
1219  }
1220
1221  public RegionInTransitionStat computeRegionInTransitionStat() {
1222    final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration());
1223    rit.update(this);
1224    return rit;
1225  }
1226
1227  public static class RegionInTransitionStat {
1228    private final int ritThreshold;
1229
1230    private HashMap<String, RegionState> ritsOverThreshold = null;
1231    private long statTimestamp;
1232    private long oldestRITTime = 0;
1233    private int totalRITsTwiceThreshold = 0;
1234    private int totalRITs = 0;
1235
1236    @VisibleForTesting
1237    public RegionInTransitionStat(final Configuration conf) {
1238      this.ritThreshold =
1239        conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD);
1240    }
1241
1242    public int getRITThreshold() {
1243      return ritThreshold;
1244    }
1245
1246    public long getTimestamp() {
1247      return statTimestamp;
1248    }
1249
1250    public int getTotalRITs() {
1251      return totalRITs;
1252    }
1253
1254    public long getOldestRITTime() {
1255      return oldestRITTime;
1256    }
1257
1258    public int getTotalRITsOverThreshold() {
1259      Map<String, RegionState> m = this.ritsOverThreshold;
1260      return m != null ? m.size() : 0;
1261    }
1262
1263    public boolean hasRegionsTwiceOverThreshold() {
1264      return totalRITsTwiceThreshold > 0;
1265    }
1266
1267    public boolean hasRegionsOverThreshold() {
1268      Map<String, RegionState> m = this.ritsOverThreshold;
1269      return m != null && !m.isEmpty();
1270    }
1271
1272    public Collection<RegionState> getRegionOverThreshold() {
1273      Map<String, RegionState> m = this.ritsOverThreshold;
1274      return m != null? m.values(): Collections.emptySet();
1275    }
1276
1277    public boolean isRegionOverThreshold(final RegionInfo regionInfo) {
1278      Map<String, RegionState> m = this.ritsOverThreshold;
1279      return m != null && m.containsKey(regionInfo.getEncodedName());
1280    }
1281
1282    public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) {
1283      Map<String, RegionState> m = this.ritsOverThreshold;
1284      if (m == null) return false;
1285      final RegionState state = m.get(regionInfo.getEncodedName());
1286      if (state == null) return false;
1287      return (statTimestamp - state.getStamp()) > (ritThreshold * 2);
1288    }
1289
1290    protected void update(final AssignmentManager am) {
1291      final RegionStates regionStates = am.getRegionStates();
1292      this.statTimestamp = EnvironmentEdgeManager.currentTime();
1293      update(regionStates.getRegionsStateInTransition(), statTimestamp);
1294      update(regionStates.getRegionFailedOpen(), statTimestamp);
1295    }
1296
1297    private void update(final Collection<RegionState> regions, final long currentTime) {
1298      for (RegionState state: regions) {
1299        totalRITs++;
1300        final long ritStartedMs = state.getStamp();
1301        if (ritStartedMs == 0) {
1302          // Don't output bogus values to metrics if they accidentally make it here.
1303          LOG.warn("The RIT {} has no start time", state.getRegion());
1304          continue;
1305        }
1306        final long ritTime = currentTime - ritStartedMs;
1307        if (ritTime > ritThreshold) {
1308          if (ritsOverThreshold == null) {
1309            ritsOverThreshold = new HashMap<String, RegionState>();
1310          }
1311          ritsOverThreshold.put(state.getRegion().getEncodedName(), state);
1312          totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0;
1313        }
1314        if (oldestRITTime < ritTime) {
1315          oldestRITTime = ritTime;
1316        }
1317      }
1318    }
1319  }
1320
1321  private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) {
1322    metrics.updateRITOldestAge(ritStat.getOldestRITTime());
1323    metrics.updateRITCount(ritStat.getTotalRITs());
1324    metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold());
1325  }
1326
1327  private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) {
1328    metrics.updateDeadServerOpenRegions(deadRegions);
1329    metrics.updateUnknownServerOpenRegions(unknownRegions);
1330  }
1331
1332  private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
1333    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1334    //if (regionNode.isStuck()) {
1335    LOG.warn("STUCK Region-In-Transition {}", regionNode);
1336  }
1337
1338  // ============================================================================================
1339  //  TODO: Master load/bootstrap
1340  // ============================================================================================
1341  public void joinCluster() throws IOException {
1342    long startTime = System.nanoTime();
1343    LOG.debug("Joining cluster...");
1344
1345    // Scan hbase:meta to build list of existing regions, servers, and assignment.
1346    // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress
1347    // w/o  meta.
1348    loadMeta();
1349
1350    while (master.getServerManager().countOfRegionServers() < 1) {
1351      LOG.info("Waiting for RegionServers to join; current count={}",
1352        master.getServerManager().countOfRegionServers());
1353      Threads.sleep(250);
1354    }
1355    LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers());
1356
1357    // Start the chores
1358    master.getMasterProcedureExecutor().addChore(this.ritChore);
1359    master.getMasterProcedureExecutor().addChore(this.deadMetricChore);
1360
1361    long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
1362    LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs));
1363  }
1364
1365  /**
1366   * Create assign procedure for offline regions.
1367   * Just follow the old processofflineServersWithOnlineRegions method. Since now we do not need to
1368   * deal with dead server any more, we only deal with the regions in OFFLINE state in this method.
1369   * And this is a bit strange, that for new regions, we will add it in CLOSED state instead of
1370   * OFFLINE state, and usually there will be a procedure to track them. The
1371   * processofflineServersWithOnlineRegions is a legacy from long ago, as things are going really
1372   * different now, maybe we do not need this method any more. Need to revisit later.
1373   */
1374  // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online.
1375  // Needs to be done after the table state manager has been started.
1376  public void processOfflineRegions() {
1377    List<RegionInfo> offlineRegions = regionStates.getRegionStates().stream()
1378      .filter(RegionState::isOffline).filter(s -> isTableEnabled(s.getRegion().getTable()))
1379      .map(RegionState::getRegion).collect(Collectors.toList());
1380    if (!offlineRegions.isEmpty()) {
1381      master.getMasterProcedureExecutor().submitProcedures(
1382        master.getAssignmentManager().createRoundRobinAssignProcedures(offlineRegions));
1383    }
1384  }
1385
  /* AM internal RegionStateStore.RegionStateVisitor implementation. To be used when
   * scanning META table for region rows, using RegionStateStore utility methods. RegionStateStore
   * methods will convert Result into proper RegionInfo instances, but those would still need to be
   * added into AssignmentManager.regionStates in-memory cache.
   * RegionMetaLoadingVisitor.visitRegionState method provides the logic for adding RegionInfo
   * instances as loaded from latest META scan into AssignmentManager.regionStates.
   * It is a non-static inner class because it reads and updates the enclosing
   * AssignmentManager's regionStates.
   */
  private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor  {

    /**
     * Copies one hbase:meta row into the in-memory RegionStateNode for the region.
     * Rows that carry no state, no location, no last host and no sequence id are skipped.
     * A missing state is treated as OFFLINE.
     */
    @Override
    public void visitRegionState(Result result, final RegionInfo regionInfo, final State state,
      final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
      if (state == null && regionLocation == null && lastHost == null &&
        openSeqNum == SequenceId.NO_SEQUENCE_ID) {
        // This is a row with nothing in it.
        LOG.warn("Skipping empty row={}", result);
        return;
      }
      State localState = state;
      if (localState == null) {
        // No region state column data in hbase:meta table! Am I doing a rolling upgrade from
        // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta?
        // In any of these cases, state is empty. For now, presume OFFLINE but there are probably
        // cases where we need to probe more to be sure this is correct; TODO informed by
        // experience.
        LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);
        localState = State.OFFLINE;
      }
      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
      // Do not need to lock on regionNode, as we can make sure that before we finish loading
      // meta, all the related procedures can not be executed. The only exception is for meta
      // region related operations, but here we do not load the information for meta region.
      regionNode.setState(localState);
      regionNode.setLastHost(lastHost);
      regionNode.setRegionLocation(regionLocation);
      regionNode.setOpenSeqNum(openSeqNum);

      // Note: keep consistent with other methods, see region(Opening|Opened|Closing)
      //       RIT/ServerCrash handling should take care of the transiting regions.
      if (localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING,
        State.MERGING)) {
        // These states imply the region is hosted somewhere, so register it with its server.
        assert regionLocation != null : "found null region location for " + regionNode;
        regionStates.addRegionToServer(regionNode);
      } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
        regionStates.addToOfflineRegions(regionNode);
      }
      // If a procedure is already attached to this node, tell it the state has been loaded.
      if (regionNode.getProcedure() != null) {
        regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode);
      }
    }
  };
1436
1437  /**
1438   * Query META if the given <code>RegionInfo</code> exists, adding to
1439   * <code>AssignmentManager.regionStateStore</code> cache if the region is found in META.
1440   * @param regionEncodedName encoded name for the region to be loaded from META into
1441   *                          <code>AssignmentManager.regionStateStore</code> cache
1442   * @return <code>RegionInfo</code> instance for the given region if it is present in META
1443   *          and got successfully loaded into <code>AssignmentManager.regionStateStore</code>
1444   *          cache, <b>null</b> otherwise.
1445   * @throws UnknownRegionException if any errors occur while querying meta.
1446   */
1447  public RegionInfo loadRegionFromMeta(String regionEncodedName) throws UnknownRegionException {
1448    try {
1449      RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor();
1450      regionStateStore.visitMetaForRegion(regionEncodedName, visitor);
1451      return regionStates.getRegionState(regionEncodedName) == null ? null :
1452        regionStates.getRegionState(regionEncodedName).getRegion();
1453    } catch(IOException e) {
1454      LOG.error("Error trying to load region {} from META", regionEncodedName, e);
1455      throw new UnknownRegionException("Error while trying load region from meta");
1456    }
1457  }
1458
  /**
   * Visits hbase:meta with a {@link RegionMetaLoadingVisitor} to populate the in-memory region
   * states, then wakes the meta-loaded event.
   * @throws IOException if visiting meta fails; the meta-loaded event is not woken in that case
   */
  private void loadMeta() throws IOException {
    // TODO: use a thread pool
    regionStateStore.visitMeta(new RegionMetaLoadingVisitor());
    // every assignment is blocked until meta is loaded.
    wakeMetaLoadedEvent();
  }
1465
1466  /**
1467   * Used to check if the meta loading is done.
1468   * <p/>
1469   * if not we throw PleaseHoldException since we are rebuilding the RegionStates
1470   * @param hri region to check if it is already rebuild
1471   * @throws PleaseHoldException if meta has not been loaded yet
1472   */
1473  private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException {
1474    if (!isRunning()) {
1475      throw new PleaseHoldException("AssignmentManager not running");
1476    }
1477    boolean meta = isMetaRegion(hri);
1478    boolean metaLoaded = isMetaLoaded();
1479    if (!meta && !metaLoaded) {
1480      throw new PleaseHoldException(
1481        "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded);
1482    }
1483  }
1484
1485  // ============================================================================================
1486  //  TODO: Metrics
1487  // ============================================================================================
  /**
   * Placeholder metric: currently always returns 0.
   * @return 0
   */
  public int getNumRegionsOpened() {
    // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value
    return 0;
  }
1492
  /**
   * Usually run by the Master in reaction to server crash during normal processing.
   * Can also be invoked via external RPC to effect repair; in the latter case,
   * the 'force' flag is set so we push through the SCP though context may indicate
   * already-running-SCP (An old SCP may have exited abnormally, or damaged cluster
   * may still have references in hbase:meta to 'Unknown Servers' -- servers that
   * are not online or in dead servers list, etc.)
   * <p/>
   * Drops the in-memory region server report for the server, marks its server node (if any)
   * CRASHED under the node's write lock and submits the crash procedure.
   * @param serverName the server to schedule a crash procedure for
   * @param shouldSplitWal passed through to the scheduled crash procedure
   * @param force Set if the request came in externally over RPC (via hbck2). Force means
   *              run the SCP even if it seems as though there might be an outstanding
   *              SCP running.
   * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled.
   */
  public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) {
    // May be an 'Unknown Server' so handle case where serverNode is null.
    ServerStateNode serverNode = regionStates.getServerNode(serverName);
    // Remove the in-memory rsReports result
    synchronized (rsReports) {
      rsReports.remove(serverName);
    }

    // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the
    // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
    // this server. This is used to simplify the implementation for TRSP and SCP, where we can make
    // sure that, the region list fetched by SCP will not be changed any more.
    if (serverNode != null) {
      serverNode.writeLock().lock();
    }
    boolean carryingMeta;
    long pid;
    try {
      ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
      carryingMeta = isCarryingMeta(serverName);
      // Without 'force', a known server that is no longer ONLINE is taken to already have an
      // SCP, so nothing new is scheduled.
      if (!force && serverNode != null && !serverNode.isInState(ServerState.ONLINE)) {
        LOG.info("Skip adding SCP for {} (meta={}) -- running?", serverNode, carryingMeta);
        return Procedure.NO_PROC_ID;
      } else {
        MasterProcedureEnv mpe = procExec.getEnvironment();
        // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead.
        // HBCKSCP scours Master in-memory state AND hbase:meta for references to
        // serverName just-in-case. An SCP that is scheduled when the server is
        // 'Unknown' probably originated externally with HBCK2 fix-it tool.
        // NOTE(review): the choice below is keyed on 'force', not on serverNode == null;
        // an unknown server without 'force' gets a plain ServerCrashProcedure -- confirm intent.
        ServerState oldState = null;
        if (serverNode != null) {
          oldState = serverNode.getState();
          serverNode.setState(ServerState.CRASHED);
        }

        if (force) {
          pid = procExec.submitProcedure(
              new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
        } else {
          pid = procExec.submitProcedure(
              new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
        }
        LOG.info("Scheduled SCP pid={} for {} (carryingMeta={}){}.", pid, serverName, carryingMeta,
            serverNode == null? "": " " + serverNode.toString() + ", oldState=" + oldState);
      }
    } finally {
      // Release the fence whether or not a procedure was submitted.
      if (serverNode != null) {
        serverNode.writeLock().unlock();
      }
    }
    return pid;
  }
1557
  /**
   * Marks the in-memory state node of the given region offline, if such a node exists.
   * Regions without a state node are silently ignored.
   * @param regionInfo the region to mark offline
   */
  public void offlineRegion(final RegionInfo regionInfo) {
    // TODO used by MasterRpcServices
    RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
    if (node != null) {
      node.offline();
    }
  }
1565
  /**
   * Currently a no-op; the body is empty.
   * @param regionInfo ignored
   * @param serverName ignored
   */
  public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) {
    // TODO used by TestSplitTransactionOnCluster.java
  }
1569
  /**
   * Delegates to {@code regionStates.getSnapShotOfAssignment(regions)}.
   * @param regions the regions to look up
   * @return the server-to-regions map produced by the in-memory region states
   */
  public Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment(
      final Collection<RegionInfo> regions) {
    return regionStates.getSnapShotOfAssignment(regions);
  }
1574
1575  // ============================================================================================
1576  //  TODO: UTILS/HELPERS?
1577  // ============================================================================================
1578  /**
1579   * Used by the client (via master) to identify if all regions have the schema updates
1580   *
1581   * @param tableName
1582   * @return Pair indicating the status of the alter command (pending/total)
1583   * @throws IOException
1584   */
1585  public Pair<Integer, Integer> getReopenStatus(TableName tableName) {
1586    if (isTableDisabled(tableName)) return new Pair<Integer, Integer>(0, 0);
1587
1588    final List<RegionState> states = regionStates.getTableRegionStates(tableName);
1589    int ritCount = 0;
1590    for (RegionState regionState: states) {
1591      if (!regionState.isOpened() && !regionState.isSplit()) {
1592        ritCount++;
1593      }
1594    }
1595    return new Pair<Integer, Integer>(ritCount, states.size());
1596  }
1597
1598  // ============================================================================================
1599  //  TODO: Region State In Transition
1600  // ============================================================================================
  /**
   * Delegates to {@code regionStates.hasRegionsInTransition()}.
   * @return whether the in-memory region states report any region in transition
   */
  public boolean hasRegionsInTransition() {
    return regionStates.hasRegionsInTransition();
  }
1604
  /**
   * Delegates to {@code regionStates.getRegionsInTransition()}.
   * @return the state nodes the in-memory region states report as in transition
   */
  public List<RegionStateNode> getRegionsInTransition() {
    return regionStates.getRegionsInTransition();
  }
1608
  /**
   * Delegates to {@code regionStates.getAssignedRegions()}.
   * @return the regions the in-memory region states report as assigned
   */
  public List<RegionInfo> getAssignedRegions() {
    return regionStates.getAssignedRegions();
  }
1612
1613  public RegionInfo getRegionInfo(final byte[] regionName) {
1614    final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName);
1615    return regionState != null ? regionState.getRegionInfo() : null;
1616  }
1617
  // ============================================================================================
  //  Expected states on region state transition.
  //  Notice that there are no expected states for transiting to OPENING state, this is because
  //  of SCP. See the comments in regionOpening method for more details.
  // ============================================================================================
  // States a region may be in when it is reported OPEN.
  private static final State[] STATES_EXPECTED_ON_OPEN = {
    State.OPENING, // Normal case
    State.OPEN // Retrying
  };

  // States a region may be in when it starts CLOSING.
  private static final State[] STATES_EXPECTED_ON_CLOSING = {
    State.OPEN, // Normal case
    State.CLOSING, // Retrying
    State.SPLITTING, // Offline the split parent
    State.MERGING // Offline the merge parents
  };

  // States a region may be in when it is reported CLOSED.
  private static final State[] STATES_EXPECTED_ON_CLOSED = {
    State.CLOSING, // Normal case
    State.CLOSED // Retrying
  };

  // This is for manually scheduled region assign, can add other states later if we find out other
  // usages
  private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE };

  // We only allow unassign or move a region which is in OPEN state.
  private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN };
1646
1647  // ============================================================================================
1648  // Region Status update
1649  // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking
1650  // and pre-assumptions are very tricky.
1651  // ============================================================================================
1652  private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState,
1653      RegionState.State... expectedStates) throws IOException {
1654    RegionState.State state = regionNode.getState();
1655    regionNode.transitionState(newState, expectedStates);
1656    boolean succ = false;
1657    try {
1658      regionStateStore.updateRegionLocation(regionNode);
1659      succ = true;
1660    } finally {
1661      if (!succ) {
1662        // revert
1663        regionNode.setState(state);
1664      }
1665    }
1666  }
1667
  /**
   * Moves the region to OPENING (in memory and in the region state store), registers it with
   * its server and bumps the operation counter metric.
   * @throws IOException if the state cannot be persisted
   */
  // should be called within the synchronized block of RegionStateNode
  void regionOpening(RegionStateNode regionNode) throws IOException {
    // As in SCP, for performance reason, there is no TRSP attached with this region, we will not
    // update the region state, which means that the region could be in any state when we want to
    // assign it after a RS crash. So here we do not pass the expectedStates parameter.
    transitStateAndUpdate(regionNode, State.OPENING);
    regionStates.addRegionToServer(regionNode);
    // update the operation count metrics
    metrics.incrementOperationCounter();
  }
1678
1679  // should be called under the RegionStateNode lock
1680  // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
1681  // we will persist the FAILED_OPEN state into hbase:meta.
1682  void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException {
1683    RegionState.State state = regionNode.getState();
1684    ServerName regionLocation = regionNode.getRegionLocation();
1685    if (giveUp) {
1686      regionNode.setState(State.FAILED_OPEN);
1687      regionNode.setRegionLocation(null);
1688      boolean succ = false;
1689      try {
1690        regionStateStore.updateRegionLocation(regionNode);
1691        succ = true;
1692      } finally {
1693        if (!succ) {
1694          // revert
1695          regionNode.setState(state);
1696          regionNode.setRegionLocation(regionLocation);
1697        }
1698      }
1699    }
1700    if (regionLocation != null) {
1701      regionStates.removeRegionFromServer(regionLocation, regionNode);
1702    }
1703  }
1704
  /**
   * Moves the region to CLOSING (in memory and in the region state store). For a meta region
   * the meta-assigned flag is cleared. The node is then registered with its server and the
   * operation counter metric is bumped.
   * @throws IOException if the current state is not one of STATES_EXPECTED_ON_CLOSING or the
   *           state cannot be persisted
   */
  // should be called under the RegionStateNode lock
  void regionClosing(RegionStateNode regionNode) throws IOException {
    transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING);

    RegionInfo hri = regionNode.getRegionInfo();
    // Mark meta as not assigned early, so people trying to create/edit tables will wait
    if (isMetaRegion(hri)) {
      setMetaAssigned(hri, false);
    }
    regionStates.addRegionToServer(regionNode);
    // update the operation count metrics
    metrics.incrementOperationCounter();
  }
1718
  // for open and close, they will first be persisted to the procedure store in
  // RegionRemoteProcedureBase. So here we will first change the in memory state as it is considered
  // as succeeded if the persistence to procedure store is succeeded, and then when the
  // RegionRemoteProcedureBase is woken up, we will persist the RegionStateNode to hbase:meta.

  /**
   * Moves the region to OPEN in memory only, registers it with its server and removes it from
   * the failed-open bookkeeping. Nothing is written to the region state store here.
   * @throws IOException if the current state is not one of STATES_EXPECTED_ON_OPEN
   */
  // should be called under the RegionStateNode lock
  void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
    regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
    RegionInfo regionInfo = regionNode.getRegionInfo();
    regionStates.addRegionToServer(regionNode);
    regionStates.removeFromFailedOpen(regionInfo);
  }
1731
1732  // should be called under the RegionStateNode lock
1733  void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
1734    ServerName regionLocation = regionNode.getRegionLocation();
1735    regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
1736    regionNode.setRegionLocation(null);
1737    if (regionLocation != null) {
1738      regionNode.setLastHost(regionLocation);
1739      regionStates.removeRegionFromServer(regionLocation, regionNode);
1740    }
1741  }
1742
1743  // should be called under the RegionStateNode lock
1744  // for SCP
1745  void regionClosedAbnormally(RegionStateNode regionNode) throws IOException {
1746    RegionState.State state = regionNode.getState();
1747    ServerName regionLocation = regionNode.getRegionLocation();
1748    regionNode.transitionState(State.ABNORMALLY_CLOSED);
1749    regionNode.setRegionLocation(null);
1750    boolean succ = false;
1751    try {
1752      regionStateStore.updateRegionLocation(regionNode);
1753      succ = true;
1754    } finally {
1755      if (!succ) {
1756        // revert
1757        regionNode.setState(state);
1758        regionNode.setRegionLocation(regionLocation);
1759      }
1760    }
1761    if (regionLocation != null) {
1762      regionNode.setLastHost(regionLocation);
1763      regionStates.removeRegionFromServer(regionLocation, regionNode);
1764    }
1765  }
1766
1767  void persistToMeta(RegionStateNode regionNode) throws IOException {
1768    regionStateStore.updateRegionLocation(regionNode);
1769    RegionInfo regionInfo = regionNode.getRegionInfo();
1770    if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
1771      // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
1772      // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
1773      // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
1774      // on table that contains state.
1775      setMetaAssigned(regionInfo, true);
1776    }
1777  }
1778
1779  // ============================================================================================
1780  // The above methods can only be called in TransitRegionStateProcedure(and related procedures)
1781  // ============================================================================================
1782
  /**
   * Records a split: the parent is set to SPLIT and both daughters to SPLITTING_NEW in memory,
   * the split is written through regionStateStore, and favored nodes are generated for the
   * daughters when favored-node assignment applies to the parent.
   * @param parent the region that was split
   * @param serverName passed to {@code regionStateStore.splitRegion}
   * @param daughterA first daughter region
   * @param daughterB second daughter region
   * @throws IOException if the region state store update fails
   */
  public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
      final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
    // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
    // The parent stays in regionStates until cleared when removed by CatalogJanitor.
    // Update its state in regionStates so it shows as offline and split when read
    // later figuring what regions are in a table and what are not: see
    // regionStates#getRegionsOfTable
    // NOTE(review): the in-memory states below are set before the store update and are not
    // reverted if splitRegion throws.
    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent);
    node.setState(State.SPLIT);
    final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA);
    nodeA.setState(State.SPLITTING_NEW);
    final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB);
    nodeB.setState(State.SPLITTING_NEW);

    regionStateStore.splitRegion(parent, daughterA, daughterB, serverName);
    if (shouldAssignFavoredNodes(parent)) {
      List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList();
      ((FavoredNodesPromoter)getBalancer()).
          generateFavoredNodesForDaughter(onlineServers, parent, daughterA, daughterB);
    }
  }
1804
  /**
   * When called here, the merge has happened. The merged regions have been
   * unassigned and the above markRegionClosed has been called on each so they have been
   * disassociated from a hosting Server. The merged region will be open after this call. The
   * merged regions are removed from hbase:meta below. Later they are deleted from the filesystem
   * by the catalog janitor running against hbase:meta. It notices when the merged region no
   * longer holds references to the old regions (References are deleted after a compaction
   * rewrites what the Reference points at but not until the archiver chore runs, are the
   * References removed).
   * @param child the region produced by the merge; set to MERGED in memory
   * @param serverName passed to {@code regionStateStore.mergeRegions}
   * @param mergeParents the regions that were merged; removed from the in-memory region states
   * @throws IOException if the region state store update fails
   */
  public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
        RegionInfo [] mergeParents)
      throws IOException {
    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
    node.setState(State.MERGED);
    // Drop the parents from the in-memory states before the store is updated.
    for (RegionInfo ri: mergeParents) {
      regionStates.deleteRegion(ri);

    }
    regionStateStore.mergeRegions(child, mergeParents, serverName);
    if (shouldAssignFavoredNodes(child)) {
      ((FavoredNodesPromoter)getBalancer()).
        generateFavoredNodesForMergedRegion(child, mergeParents);
    }
  }
1830
  /*
   * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
   * belongs to a non-system table.
   * Returns true only when the shouldAssignRegionsWithFavoredNodes flag is set and
   * FavoredNodesManager.isFavoredNodeApplicable accepts the region.
   */
  private boolean shouldAssignFavoredNodes(RegionInfo region) {
    return this.shouldAssignRegionsWithFavoredNodes &&
        FavoredNodesManager.isFavoredNodeApplicable(region);
  }
1839
1840  // ============================================================================================
1841  //  Assign Queue (Assign/Balance)
1842  // ============================================================================================
  // Region nodes waiting for the assignment thread to pick a destination server.
  private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>();
  // Lock taken around accesses to pendingAssignQueue.
  private final ReentrantLock assignQueueLock = new ReentrantLock();
  // Signalled to wake the assignment thread (queue has work, or the manager is stopping).
  private final Condition assignQueueFullCond = assignQueueLock.newCondition();
1846
1847  /**
1848   * Add the assign operation to the assignment queue.
1849   * The pending assignment operation will be processed,
1850   * and each region will be assigned by a server using the balancer.
1851   */
1852  protected void queueAssign(final RegionStateNode regionNode) {
1853    regionNode.getProcedureEvent().suspend();
1854
1855    // TODO: quick-start for meta and the other sys-tables?
1856    assignQueueLock.lock();
1857    try {
1858      pendingAssignQueue.add(regionNode);
1859      if (regionNode.isSystemTable() ||
1860          pendingAssignQueue.size() == 1 ||
1861          pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize) {
1862        assignQueueFullCond.signal();
1863      }
1864    } finally {
1865      assignQueueLock.unlock();
1866    }
1867  }
1868
1869  private void startAssignmentThread() {
1870    // Get Server Thread name. Sometimes the Server is mocked so may not implement HasThread.
1871    // For example, in tests.
1872    String name = master instanceof HasThread? ((HasThread)master).getName():
1873        master.getServerName().toShortString();
1874    assignThread = new Thread(name) {
1875      @Override
1876      public void run() {
1877        while (isRunning()) {
1878          processAssignQueue();
1879        }
1880        pendingAssignQueue.clear();
1881      }
1882    };
1883    assignThread.setDaemon(true);
1884    assignThread.start();
1885  }
1886
  /**
   * Waits for the assignment thread to exit, signalling the queue condition before each
   * 250ms join so a thread blocked on the condition wakes up. If the calling thread is
   * interrupted, the wait is abandoned and the interrupt flag is restored.
   */
  private void stopAssignmentThread() {
    assignQueueSignal();
    try {
      while (assignThread.isAlive()) {
        assignQueueSignal();
        assignThread.join(250);
      }
    } catch (InterruptedException e) {
      LOG.warn("join interrupted", e);
      Thread.currentThread().interrupt();
    }
  }
1899
  /**
   * Signals assignQueueFullCond while holding assignQueueLock, waking one waiter if any.
   */
  private void assignQueueSignal() {
    assignQueueLock.lock();
    try {
      assignQueueFullCond.signal();
    } finally {
      assignQueueLock.unlock();
    }
  }
1908
  /**
   * Waits for work on the pending assign queue and drains it.
   * <p/>
   * If the queue is empty and the manager is running, blocks on the queue condition once (the
   * await is intentionally not in a loop, so the result may be an empty map). It then waits up
   * to assignDispatchWaitMillis more before snapshotting the queue into a map and clearing it.
   * @return the drained regions keyed by RegionInfo (possibly empty), or null if the manager
   *         is not running or the wait was interrupted
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
  private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() {
    HashMap<RegionInfo, RegionStateNode> regions = null;

    assignQueueLock.lock();
    try {
      if (pendingAssignQueue.isEmpty() && isRunning()) {
        assignQueueFullCond.await();
      }

      if (!isRunning()) return null;
      // Presumably gives further queueAssign calls a chance to join this batch; a signal ends
      // the wait early.
      assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS);
      regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size());
      for (RegionStateNode regionNode: pendingAssignQueue) {
        regions.put(regionNode.getRegionInfo(), regionNode);
      }
      pendingAssignQueue.clear();
    } catch (InterruptedException e) {
      // regions is still null here, so the caller sees no batch; the queue is left untouched.
      LOG.warn("got interrupted ", e);
      Thread.currentThread().interrupt();
    } finally {
      assignQueueLock.unlock();
    }
    return regions;
  }
1934
  /**
   * Runs one round of the assignment thread: drains the pending queue, splits the regions into
   * those with a known location (retained), system-table regions and user regions, waits until
   * at least one destination server exists, and hands each group to processAssignmentPlans.
   * Returns early when there is nothing to do or the manager stops.
   */
  private void processAssignQueue() {
    final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue();
    if (regions == null || regions.size() == 0 || !isRunning()) {
      return;
    }

    if (LOG.isTraceEnabled()) {
      LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size());
    }

    // TODO: Optimize balancer. pass a RegionPlan?
    final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>();
    final List<RegionInfo> userHRIs = new ArrayList<>(regions.size());
    // Regions for system tables requiring reassignment
    final List<RegionInfo> systemHRIs = new ArrayList<>();
    for (RegionStateNode regionStateNode: regions.values()) {
      boolean sysTable = regionStateNode.isSystemTable();
      final List<RegionInfo> hris = sysTable? systemHRIs: userHRIs;
      // Any region that already has a location goes to retainMap, system table or not; only
      // regions without a location land in systemHRIs/userHRIs.
      if (regionStateNode.getRegionLocation() != null) {
        retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
      } else {
        hris.add(regionStateNode.getRegionInfo());
      }
    }

    // TODO: connect with the listener to invalidate the cache

    // TODO use events
    // Poll every 250ms until the server manager offers at least one destination server.
    List<ServerName> servers = master.getServerManager().createDestinationServersList();
    for (int i = 0; servers.size() < 1; ++i) {
      // Report every fourth time around this loop; try not to flood log.
      if (i % 4 == 0) {
        LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions.");
      }

      if (!isRunning()) {
        LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions.");
        return;
      }
      Threads.sleep(250);
      servers = master.getServerManager().createDestinationServersList();
    }

    if (!systemHRIs.isEmpty()) {
      // System table regions requiring reassignment are present, get region servers
      // not available for system table regions
      final List<ServerName> excludeServers = getExcludedServersForSystemTable();
      List<ServerName> serversForSysTables = servers.stream()
          .filter(s -> !excludeServers.contains(s)).collect(Collectors.toList());
      if (serversForSysTables.isEmpty()) {
        LOG.warn("Filtering old server versions and the excluded produced an empty set; " +
            "instead considering all candidate servers!");
      }
      LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() +
          ", allServersCount=" + servers.size());
      // Fall back to all servers only when filtering left none and no system region is
      // currently placed on the bogus server.
      // NOTE(review): systemHRIs were collected with a null location above, so
      // containsBogusAssignments looks like it can only be true if a location was set since
      // -- confirm.
      processAssignmentPlans(regions, null, systemHRIs,
          serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs) ?
              servers: serversForSysTables);
    }

    // Retained regions (of any table) and location-less user regions use the full server list.
    processAssignmentPlans(regions, retainMap, userHRIs, servers);
  }
1997
1998  private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions,
1999      List<RegionInfo> hirs) {
2000    for (RegionInfo ri : hirs) {
2001      if (regions.get(ri).getRegionLocation() != null &&
2002          regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME)){
2003        return true;
2004      }
2005    }
2006    return false;
2007  }
2008
2009  private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions,
2010      final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris,
2011      final List<ServerName> servers) {
2012    boolean isTraceEnabled = LOG.isTraceEnabled();
2013    if (isTraceEnabled) {
2014      LOG.trace("Available servers count=" + servers.size() + ": " + servers);
2015    }
2016
2017    final LoadBalancer balancer = getBalancer();
2018    // ask the balancer where to place regions
2019    if (retainMap != null && !retainMap.isEmpty()) {
2020      if (isTraceEnabled) {
2021        LOG.trace("retain assign regions=" + retainMap);
2022      }
2023      try {
2024        acceptPlan(regions, balancer.retainAssignment(retainMap, servers));
2025      } catch (HBaseIOException e) {
2026        LOG.warn("unable to retain assignment", e);
2027        addToPendingAssignment(regions, retainMap.keySet());
2028      }
2029    }
2030
2031    // TODO: Do we need to split retain and round-robin?
2032    // the retain seems to fallback to round-robin/random if the region is not in the map.
2033    if (!hris.isEmpty()) {
2034      Collections.sort(hris, RegionInfo.COMPARATOR);
2035      if (isTraceEnabled) {
2036        LOG.trace("round robin regions=" + hris);
2037      }
2038      try {
2039        acceptPlan(regions, balancer.roundRobinAssignment(hris, servers));
2040      } catch (HBaseIOException e) {
2041        LOG.warn("unable to round-robin assignment", e);
2042        addToPendingAssignment(regions, hris);
2043      }
2044    }
2045  }
2046
  /**
   * Applies a balancer plan: sets each planned region's location to its target server and
   * wakes the region's procedure event. System-table regions planned onto
   * LoadBalancer.BOGUS_SERVER_NAME are put back on the pending assign queue instead of being
   * woken.
   * @param regions state nodes of the current batch, keyed by region
   * @param plan server-to-regions placement from the balancer; an empty plan is a no-op
   * @throws HBaseIOException if plan is null
   */
  private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions,
      final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException {
    // Sized for the whole batch; only the first evcount slots get filled, the rest stay null.
    // NOTE(review): relies on wakeEvents tolerating null entries -- confirm.
    final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()];
    final long st = System.currentTimeMillis();

    if (plan == null) {
      throw new HBaseIOException("unable to compute plans for regions=" + regions.size());
    }

    if (plan.isEmpty()) return;

    int evcount = 0;
    for (Map.Entry<ServerName, List<RegionInfo>> entry: plan.entrySet()) {
      final ServerName server = entry.getKey();
      for (RegionInfo hri: entry.getValue()) {
        // Assumes every region in the plan is present in 'regions'; a miss would NPE here.
        final RegionStateNode regionNode = regions.get(hri);
        regionNode.setRegionLocation(server);
        if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) {
          // No usable server for this system region; requeue it rather than waking it.
          assignQueueLock.lock();
          try {
            pendingAssignQueue.add(regionNode);
          } finally {
            assignQueueLock.unlock();
          }
        }else {
          events[evcount++] = regionNode.getProcedureEvent();
        }
      }
    }
    ProcedureEvent.wakeEvents(getProcedureScheduler(), events);

    final long et = System.currentTimeMillis();
    if (LOG.isTraceEnabled()) {
      // NOTE(review): events.length is the batch size, not the number of events woken
      // (evcount).
      LOG.trace("ASSIGN ACCEPT " + events.length + " -> " +
          StringUtils.humanTimeDiff(et - st));
    }
  }
2084
2085  private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions,
2086      final Collection<RegionInfo> pendingRegions) {
2087    assignQueueLock.lock();
2088    try {
2089      for (RegionInfo hri: pendingRegions) {
2090        pendingAssignQueue.add(regions.get(hri));
2091      }
2092    } finally {
2093      assignQueueLock.unlock();
2094    }
2095  }
2096
2097  /**
2098   * Get a list of servers that this region cannot be assigned to.
2099   * For system tables, we must assign them to a server with highest version.
2100   */
2101  public List<ServerName> getExcludedServersForSystemTable() {
2102    // TODO: This should be a cached list kept by the ServerManager rather than calculated on each
2103    // move or system region assign. The RegionServerTracker keeps list of online Servers with
2104    // RegionServerInfo that includes Version.
2105    List<Pair<ServerName, String>> serverList = master.getServerManager().getOnlineServersList()
2106        .stream()
2107        .map((s)->new Pair<>(s, master.getRegionServerVersion(s)))
2108        .collect(Collectors.toList());
2109    if (serverList.isEmpty()) {
2110      return Collections.emptyList();
2111    }
2112    String highestVersion = Collections.max(serverList,
2113        (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond();
2114    return serverList.stream()
2115        .filter((p)->!p.getSecond().equals(highestVersion))
2116        .map(Pair::getFirst)
2117        .collect(Collectors.toList());
2118  }
2119
2120  @VisibleForTesting
2121  MasterServices getMaster() {
2122    return master;
2123  }
2124
2125  /**
2126   * @return a snapshot of rsReports
2127   */
2128  public Map<ServerName, Set<byte[]>> getRSReports() {
2129    Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>();
2130    synchronized (rsReports) {
2131      rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue()));
2132    }
2133    return rsReportsSnapshot;
2134  }
2135
2136  /**
2137   * Provide regions state count for given table.
2138   * e.g howmany regions of give table are opened/closed/rit etc
2139   *
2140   * @param tableName TableName
2141   * @return region states count
2142   */
2143  public RegionStatesCount getRegionStatesCount(TableName tableName) {
2144    int openRegionsCount = 0;
2145    int closedRegionCount = 0;
2146    int ritCount = 0;
2147    int splitRegionCount = 0;
2148    int totalRegionCount = 0;
2149    if (!isTableDisabled(tableName)) {
2150      final List<RegionState> states = regionStates.getTableRegionStates(tableName);
2151      for (RegionState regionState : states) {
2152        if (regionState.isOpened()) {
2153          openRegionsCount++;
2154        } else if (regionState.isClosed()) {
2155          closedRegionCount++;
2156        } else if (regionState.isSplit()) {
2157          splitRegionCount++;
2158        }
2159      }
2160      totalRegionCount = states.size();
2161      ritCount = totalRegionCount - openRegionsCount - splitRegionCount;
2162    }
2163    return new RegionStatesCount.RegionStatesCountBuilder()
2164      .setOpenRegions(openRegionsCount)
2165      .setClosedRegions(closedRegionCount)
2166      .setSplitRegions(splitRegionCount)
2167      .setRegionsInTransition(ritCount)
2168      .setTotalRegions(totalRegionCount)
2169      .build();
2170  }
2171
2172}