001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.CopyOnWriteArrayList;
030import java.util.concurrent.Future;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.locks.Condition;
034import java.util.concurrent.locks.ReentrantLock;
035import java.util.stream.Collectors;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.HBaseIOException;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.PleaseHoldException;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.YouAreDeadException;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.client.RegionInfoBuilder;
045import org.apache.hadoop.hbase.client.Result;
046import org.apache.hadoop.hbase.client.TableState;
047import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
048import org.apache.hadoop.hbase.favored.FavoredNodesManager;
049import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
050import org.apache.hadoop.hbase.master.AssignmentListener;
051import org.apache.hadoop.hbase.master.LoadBalancer;
052import org.apache.hadoop.hbase.master.MasterServices;
053import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
054import org.apache.hadoop.hbase.master.RegionPlan;
055import org.apache.hadoop.hbase.master.RegionState;
056import org.apache.hadoop.hbase.master.RegionState.State;
057import org.apache.hadoop.hbase.master.ServerListener;
058import org.apache.hadoop.hbase.master.TableStateManager;
059import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
060import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerState;
061import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerStateNode;
062import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
063import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
064import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
065import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
066import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
067import org.apache.hadoop.hbase.procedure2.Procedure;
068import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
069import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
070import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
071import org.apache.hadoop.hbase.procedure2.util.StringUtils;
072import org.apache.hadoop.hbase.regionserver.SequenceId;
073import org.apache.hadoop.hbase.util.Bytes;
074import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
075import org.apache.hadoop.hbase.util.HasThread;
076import org.apache.hadoop.hbase.util.Pair;
077import org.apache.hadoop.hbase.util.Threads;
078import org.apache.hadoop.hbase.util.VersionInfo;
079import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
080import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
081import org.apache.yetus.audience.InterfaceAudience;
082import org.apache.zookeeper.KeeperException;
083import org.slf4j.Logger;
084import org.slf4j.LoggerFactory;
085
086import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
087
088import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
093
094/**
095 * The AssignmentManager is the coordinator for region assign/unassign operations.
096 * <ul>
097 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
098 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
099 * </ul>
100 * Regions are created by CreateTable, Split, Merge.
101 * Regions are deleted by DeleteTable, Split, Merge.
102 * Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash.
103 * Unassigns are triggered by DisableTable, Split, Merge
104 */
105@InterfaceAudience.Private
106public class AssignmentManager implements ServerListener {
107  private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);
108
109  // TODO: AMv2
110  //  - handle region migration from hbase1 to hbase2.
111  //  - handle sys table assignment first (e.g. acl, namespace)
112  //  - handle table priorities
113  //  - If ServerBusyException trying to update hbase:meta, we abort the Master
114  //   See updateRegionLocation in RegionStateStore.
115  //
116  // See also
117  // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
118  // for other TODOs.
119
120  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
121      "hbase.assignment.bootstrap.thread.pool.size";
122
123  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
124      "hbase.assignment.dispatch.wait.msec";
125  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;
126
127  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
128      "hbase.assignment.dispatch.wait.queue.max.size";
129  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;
130
131  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
132      "hbase.assignment.rit.chore.interval.msec";
133  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;
134
135  public static final String ASSIGN_MAX_ATTEMPTS =
136      "hbase.assignment.maximum.attempts";
137  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;
138
139  /** Region in Transition metrics threshold time */
140  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
141      "hbase.metrics.rit.stuck.warning.threshold";
142  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
143
144  private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
145  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");
146
147  /** Listeners that are called on assignment events. */
148  private final CopyOnWriteArrayList<AssignmentListener> listeners =
149      new CopyOnWriteArrayList<AssignmentListener>();
150
151  private final MetricsAssignmentManager metrics;
152  private final RegionInTransitionChore ritChore;
153  private final MasterServices master;
154
155  private final AtomicBoolean running = new AtomicBoolean(false);
156  private final RegionStates regionStates = new RegionStates();
157  private final RegionStateStore regionStateStore;
158
159  private final boolean shouldAssignRegionsWithFavoredNodes;
160  private final int assignDispatchWaitQueueMaxSize;
161  private final int assignDispatchWaitMillis;
162  private final int assignMaxAttempts;
163
164  private final Object checkIfShouldMoveSystemRegionLock = new Object();
165
166  private Thread assignThread;
167
/**
 * Creates an assignment manager that uses a new {@link RegionStateStore} built on the given
 * master.
 * @param master the master services this manager works for
 */
public AssignmentManager(final MasterServices master) {
  this(master, new RegionStateStore(master));
}
171
/**
 * Creates an assignment manager and reads its tunables from the master's configuration.
 * @param master the master services this manager works for
 * @param stateStore the store kept as this manager's {@link RegionStateStore}
 */
public AssignmentManager(final MasterServices master, final RegionStateStore stateStore) {
  this.master = master;
  this.regionStateStore = stateStore;
  this.metrics = new MetricsAssignmentManager();

  final Configuration conf = master.getConfiguration();

  // Only read favored nodes if using the favored nodes load balancer.
  final Class<?> balancerClass =
      conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class);
  this.shouldAssignRegionsWithFavoredNodes =
      FavoredStochasticBalancer.class.isAssignableFrom(balancerClass);

  this.assignDispatchWaitMillis =
      conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
  this.assignDispatchWaitQueueMaxSize =
      conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);

  // Never allow fewer than one attempt, whatever the configuration says.
  final int configuredMaxAttempts = conf.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS);
  this.assignMaxAttempts = configuredMaxAttempts < 1 ? 1 : configuredMaxAttempts;

  final int ritChoreIntervalMsec =
      conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC);
  this.ritChore = new RegionInTransitionChore(ritChoreIntervalMsec);
}
195
196  public void start() throws IOException, KeeperException {
197    if (!running.compareAndSet(false, true)) {
198      return;
199    }
200
201    LOG.trace("Starting assignment manager");
202
203    // Register Server Listener
204    master.getServerManager().registerListener(this);
205
206    // Start the Assignment Thread
207    startAssignmentThread();
208
209    // load meta region state
210    ZKWatcher zkw = master.getZooKeeper();
211    // it could be null in some tests
212    if (zkw != null) {
213      RegionState regionState = MetaTableLocator.getMetaRegionState(zkw);
214      RegionStateNode regionStateNode =
215        regionStates.getOrCreateRegionStateNode(RegionInfoBuilder.FIRST_META_REGIONINFO);
216      synchronized (regionStateNode) {
217        regionStateNode.setRegionLocation(regionState.getServerName());
218        regionStateNode.setState(regionState.getState());
219        setMetaAssigned(regionState.getRegion(), regionState.getState() == State.OPEN);
220      }
221    }
222  }
223
224  public void stop() {
225    if (!running.compareAndSet(true, false)) {
226      return;
227    }
228
229    LOG.info("Stopping assignment manager");
230
231    // The AM is started before the procedure executor,
232    // but the actual work will be loaded/submitted only once we have the executor
233    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;
234
235    // Remove the RIT chore
236    if (hasProcExecutor) {
237      master.getMasterProcedureExecutor().removeChore(this.ritChore);
238    }
239
240    // Stop the Assignment Thread
241    stopAssignmentThread();
242
243    // Stop the RegionStateStore
244    regionStates.clear();
245
246    // Unregister Server Listener
247    master.getServerManager().unregisterListener(this);
248
249    // Update meta events (for testing)
250    if (hasProcExecutor) {
251      metaLoadEvent.suspend();
252      for (RegionInfo hri: getMetaRegionSet()) {
253        setMetaAssigned(hri, false);
254      }
255    }
256  }
257
258  public boolean isRunning() {
259    return running.get();
260  }
261
262  public Configuration getConfiguration() {
263    return master.getConfiguration();
264  }
265
266  public MetricsAssignmentManager getAssignmentManagerMetrics() {
267    return metrics;
268  }
269
270  private LoadBalancer getBalancer() {
271    return master.getLoadBalancer();
272  }
273
274  private MasterProcedureEnv getProcedureEnvironment() {
275    return master.getMasterProcedureExecutor().getEnvironment();
276  }
277
278  private MasterProcedureScheduler getProcedureScheduler() {
279    return getProcedureEnvironment().getProcedureScheduler();
280  }
281
282  int getAssignMaxAttempts() {
283    return assignMaxAttempts;
284  }
285
286  /**
287   * Add the listener to the notification list.
288   * @param listener The AssignmentListener to register
289   */
290  public void registerListener(final AssignmentListener listener) {
291    this.listeners.add(listener);
292  }
293
294  /**
295   * Remove the listener from the notification list.
296   * @param listener The AssignmentListener to unregister
297   */
298  public boolean unregisterListener(final AssignmentListener listener) {
299    return this.listeners.remove(listener);
300  }
301
302  public RegionStates getRegionStates() {
303    return regionStates;
304  }
305
306  public RegionStateStore getRegionStateStore() {
307    return regionStateStore;
308  }
309
310  public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
311    return this.shouldAssignRegionsWithFavoredNodes?
312        ((FavoredStochasticBalancer)getBalancer()).getFavoredNodes(regionInfo):
313          ServerName.EMPTY_SERVER_LIST;
314  }
315
316  // ============================================================================================
317  //  Table State Manager helpers
318  // ============================================================================================
319  TableStateManager getTableStateManager() {
320    return master.getTableStateManager();
321  }
322
323  public boolean isTableEnabled(final TableName tableName) {
324    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
325  }
326
327  public boolean isTableDisabled(final TableName tableName) {
328    return getTableStateManager().isTableState(tableName,
329      TableState.State.DISABLED, TableState.State.DISABLING);
330  }
331
332  // ============================================================================================
333  //  META Helpers
334  // ============================================================================================
335  private boolean isMetaRegion(final RegionInfo regionInfo) {
336    return regionInfo.isMetaRegion();
337  }
338
339  public boolean isMetaRegion(final byte[] regionName) {
340    return getMetaRegionFromName(regionName) != null;
341  }
342
343  public RegionInfo getMetaRegionFromName(final byte[] regionName) {
344    for (RegionInfo hri: getMetaRegionSet()) {
345      if (Bytes.equals(hri.getRegionName(), regionName)) {
346        return hri;
347      }
348    }
349    return null;
350  }
351
352  public boolean isCarryingMeta(final ServerName serverName) {
353    // TODO: handle multiple meta
354    return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
355  }
356
357  private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
358    // TODO: check for state?
359    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
360    return(node != null && serverName.equals(node.getRegionLocation()));
361  }
362
363  private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
364    //if (regionInfo.isMetaRegion()) return regionInfo;
365    // TODO: handle multiple meta. if the region provided is not meta lookup
366    // which meta the region belongs to.
367    return RegionInfoBuilder.FIRST_META_REGIONINFO;
368  }
369
370  // TODO: handle multiple meta.
371  private static final Set<RegionInfo> META_REGION_SET =
372      Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);
373  public Set<RegionInfo> getMetaRegionSet() {
374    return META_REGION_SET;
375  }
376
377  // ============================================================================================
378  //  META Event(s) helpers
379  // ============================================================================================
380  /**
381   * Notice that, this only means the meta region is available on a RS, but the AM may still be
382   * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
383   * before checking this method, unless you can make sure that your piece of code can only be
384   * executed after AM builds the region states.
385   * @see #isMetaLoaded()
386   */
387  public boolean isMetaAssigned() {
388    return metaAssignEvent.isReady();
389  }
390
391  public boolean isMetaRegionInTransition() {
392    return !isMetaAssigned();
393  }
394
395  /**
396   * Notice that this event does not mean the AM has already finished region state rebuilding. See
397   * the comment of {@link #isMetaAssigned()} for more details.
398   * @see #isMetaAssigned()
399   */
400  public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
401    return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
402  }
403
404  private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
405    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
406    ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
407    if (assigned) {
408      metaAssignEvent.wake(getProcedureScheduler());
409    } else {
410      metaAssignEvent.suspend();
411    }
412  }
413
414  private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
415    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
416    // TODO: handle multiple meta.
417    return metaAssignEvent;
418  }
419
420  /**
421   * Wait until AM finishes the meta loading, i.e, the region states rebuilding.
422   * @see #isMetaLoaded()
423   * @see #waitMetaAssigned(Procedure, RegionInfo)
424   */
425  public boolean waitMetaLoaded(Procedure<?> proc) {
426    return metaLoadEvent.suspendIfNotReady(proc);
427  }
428
429  @VisibleForTesting
430  void wakeMetaLoadedEvent() {
431    metaLoadEvent.wake(getProcedureScheduler());
432    assert isMetaLoaded() : "expected meta to be loaded";
433  }
434
435  /**
436   * Return whether AM finishes the meta loading, i.e, the region states rebuilding.
437   * @see #isMetaAssigned()
438   * @see #waitMetaLoaded(Procedure)
439   */
440  public boolean isMetaLoaded() {
441    return metaLoadEvent.isReady();
442  }
443
444  /**
445   * Start a new thread to check if there are region servers whose versions are higher than others.
446   * If so, move all system table regions to RS with the highest version to keep compatibility.
447   * The reason is, RS in new version may not be able to access RS in old version when there are
448   * some incompatible changes.
449   * <p>This method is called when a new RegionServer is added to cluster only.</p>
450   */
451  public void checkIfShouldMoveSystemRegionAsync() {
452    // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
453    // it should 'move' the system tables from the old server to the new server but
454    // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
455    if (this.master.getServerManager().countOfRegionServers() <= 1) {
456      return;
457    }
458    // This thread used to run whenever there was a change in the cluster. The ZooKeeper
459    // childrenChanged notification came in before the nodeDeleted message and so this method
460    // cold run before a ServerCrashProcedure could run. That meant that this thread could see
461    // a Crashed Server before ServerCrashProcedure and it could find system regions on the
462    // crashed server and go move them before ServerCrashProcedure had a chance; could be
463    // dataloss too if WALs were not recovered.
464    new Thread(() -> {
465      try {
466        synchronized (checkIfShouldMoveSystemRegionLock) {
467          List<RegionPlan> plans = new ArrayList<>();
468          // TODO: I don't think this code does a good job if all servers in cluster have same
469          // version. It looks like it will schedule unnecessary moves.
470          for (ServerName server : getExcludedServersForSystemTable()) {
471            if (master.getServerManager().isServerDead(server)) {
472              // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
473              // considers only online servers, the server could be queued for dead server
474              // processing. As region assignments for crashed server is handled by
475              // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
476              // regular flow of LoadBalancer as a favored node and not to have this special
477              // handling.
478              continue;
479            }
480            List<RegionInfo> regionsShouldMove = getSystemTables(server);
481            if (!regionsShouldMove.isEmpty()) {
482              for (RegionInfo regionInfo : regionsShouldMove) {
483                // null value for dest forces destination server to be selected by balancer
484                RegionPlan plan = new RegionPlan(regionInfo, server, null);
485                if (regionInfo.isMetaRegion()) {
486                  // Must move meta region first.
487                  LOG.info("Async MOVE of {} to newer Server={}",
488                      regionInfo.getEncodedName(), server);
489                  moveAsync(plan);
490                } else {
491                  plans.add(plan);
492                }
493              }
494            }
495            for (RegionPlan plan : plans) {
496              LOG.info("Async MOVE of {} to newer Server={}",
497                  plan.getRegionInfo().getEncodedName(), server);
498              moveAsync(plan);
499            }
500          }
501        }
502      } catch (Throwable t) {
503        LOG.error(t.toString(), t);
504      }
505    }).start();
506  }
507
508  private List<RegionInfo> getSystemTables(ServerName serverName) {
509    Set<RegionStateNode> regions = this.getRegionStates().getServerNode(serverName).getRegions();
510    if (regions == null) {
511      return new ArrayList<>();
512    }
513    return regions.stream()
514        .map(RegionStateNode::getRegionInfo)
515        .filter(r -> r.getTable().isSystemTable())
516        .collect(Collectors.toList());
517  }
518
519  public void assign(final RegionInfo regionInfo, ServerName sn) throws IOException {
520    AssignProcedure proc = createAssignProcedure(regionInfo, sn);
521    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
522  }
523
524  public void assign(final RegionInfo regionInfo) throws IOException {
525    AssignProcedure proc = createAssignProcedure(regionInfo);
526    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
527  }
528
529  public void unassign(final RegionInfo regionInfo) throws IOException {
530    unassign(regionInfo, false);
531  }
532
533  public void unassign(final RegionInfo regionInfo, final boolean forceNewPlan)
534  throws IOException {
535    // TODO: rename this reassign
536    RegionStateNode node = this.regionStates.getRegionStateNode(regionInfo);
537    ServerName destinationServer = node.getRegionLocation();
538    if (destinationServer == null) {
539      throw new UnexpectedStateException("DestinationServer is null; Assigned? " + node.toString());
540    }
541    assert destinationServer != null; node.toString();
542    UnassignProcedure proc = createUnassignProcedure(regionInfo, destinationServer, forceNewPlan);
543    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
544  }
545
546  public void move(final RegionInfo regionInfo) throws IOException {
547    RegionStateNode node = this.regionStates.getRegionStateNode(regionInfo);
548    ServerName sourceServer = node.getRegionLocation();
549    RegionPlan plan = new RegionPlan(regionInfo, sourceServer, null);
550    MoveRegionProcedure proc = createMoveRegionProcedure(plan);
551    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
552  }
553
554  public Future<byte[]> moveAsync(final RegionPlan regionPlan) throws HBaseIOException {
555    MoveRegionProcedure proc = createMoveRegionProcedure(regionPlan);
556    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
557  }
558
559  /**
560   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
561   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
562   *         to populate the assigns with targets chosen using round-robin (default balancer
563   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
564   *         AssignProcedure will ask the balancer for a new target, and so on.
565   */
566  public AssignProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
567    return createRoundRobinAssignProcedures(hris, null);
568  }
569
570  // ============================================================================================
571  //  RegionTransition procedures helpers
572  // ============================================================================================
573
574  /**
575   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
576   * @return AssignProcedures made out of the passed in <code>hris</code> and a call
577   * to the balancer to populate the assigns with targets chosen using round-robin (default
578   * balancer scheme). If at assign-time, the target chosen is no longer up, thats fine,
579   * the AssignProcedure will ask the balancer for a new target, and so on.
580   */
581  public AssignProcedure[] createRoundRobinAssignProcedures(final List<RegionInfo> hris,
582      List<ServerName> serversToExclude) {
583    if (hris.isEmpty()) {
584      return null;
585    }
586    if (serversToExclude != null
587        && this.master.getServerManager().getOnlineServersList().size() == 1) {
588      LOG.debug("Only one region server found and hence going ahead with the assignment");
589      serversToExclude = null;
590    }
591    try {
592      // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
593      // a better job if it has all the assignments in the one lump.
594      Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
595          this.master.getServerManager().createDestinationServersList(serversToExclude));
596      // Return mid-method!
597      return createAssignProcedures(assignments, hris.size());
598    } catch (HBaseIOException hioe) {
599      LOG.warn("Failed roundRobinAssignment", hioe);
600    }
601    // If an error above, fall-through to this simpler assign. Last resort.
602    return createAssignProcedures(hris);
603  }
604
605  /**
606   * Create an array of AssignProcedures w/o specifying a target server.
607   * If no target server, at assign time, we will try to use the former location of the region
608   * if one exists. This is how we 'retain' the old location across a server restart.
609   * Used by {@link ServerCrashProcedure} assigning regions on a server that has crashed (SCP is
610   * also used across a cluster-restart just-in-case to ensure we do cleanup of any old WALs or
611   * server processes).
612   */
613  public AssignProcedure[] createAssignProcedures(final List<RegionInfo> hris) {
614    if (hris.isEmpty()) {
615      return null;
616    }
617    int index = 0;
618    AssignProcedure [] procedures = new AssignProcedure[hris.size()];
619    for (RegionInfo hri : hris) {
620      // Sort the procedures so meta and system regions are first in the returned array.
621      procedures[index++] = createAssignProcedure(hri);
622    }
623    if (procedures.length > 1) {
624      // Sort the procedures so meta and system regions are first in the returned array.
625      Arrays.sort(procedures, AssignProcedure.COMPARATOR);
626    }
627    return procedures;
628  }
629
630  // Make this static for the method below where we use it typing the AssignProcedure array we
631  // return as result.
632  private static final AssignProcedure [] ASSIGN_PROCEDURE_ARRAY_TYPE = new AssignProcedure[] {};
633
634  /**
635   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
636   * @param size Count of assignments to make (the caller may know the total count)
637   * @return Assignments made from the passed in <code>assignments</code>
638   */
639  private AssignProcedure[] createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments,
640      int size) {
641    List<AssignProcedure> procedures = new ArrayList<>(size > 0? size: 8/*Arbitrary*/);
642    for (Map.Entry<ServerName, List<RegionInfo>> e: assignments.entrySet()) {
643      for (RegionInfo ri: e.getValue()) {
644        AssignProcedure ap = createAssignProcedure(ri, e.getKey());
645        ap.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
646        procedures.add(ap);
647      }
648    }
649    if (procedures.size() > 1) {
650      // Sort the procedures so meta and system regions are first in the returned array.
651      procedures.sort(AssignProcedure.COMPARATOR);
652    }
653    return procedures.toArray(ASSIGN_PROCEDURE_ARRAY_TYPE);
654  }
655
656  // Needed for the following method so it can type the created Array we retur n
657  private static final UnassignProcedure [] UNASSIGN_PROCEDURE_ARRAY_TYPE =
658      new UnassignProcedure[0];
659
660  UnassignProcedure[] createUnassignProcedures(final Collection<RegionStateNode> nodes) {
661    if (nodes.isEmpty()) return null;
662    final List<UnassignProcedure> procs = new ArrayList<UnassignProcedure>(nodes.size());
663    for (RegionStateNode node: nodes) {
664      if (!this.regionStates.include(node, false)) continue;
665      // Look for regions that are offline/closed; i.e. already unassigned.
666      if (this.regionStates.isRegionOffline(node.getRegionInfo())) continue;
667      assert node.getRegionLocation() != null: node.toString();
668      procs.add(createUnassignProcedure(node.getRegionInfo(), node.getRegionLocation(), false));
669    }
670    return procs.toArray(UNASSIGN_PROCEDURE_ARRAY_TYPE);
671  }
672
673  /**
674   * Called by things like DisableTableProcedure to get a list of UnassignProcedure
675   * to unassign the regions of the table.
676   */
677  public AssignProcedure createAssignProcedure(final RegionInfo regionInfo) {
678    return createAssignProcedure(regionInfo, null, false);
679  }
680
681  public AssignProcedure createAssignProcedure(final RegionInfo regionInfo, boolean override) {
682    return createAssignProcedure(regionInfo, null, override);
683  }
684
685  public AssignProcedure createAssignProcedure(final RegionInfo regionInfo,
686      ServerName targetServer) {
687    return createAssignProcedure(regionInfo, targetServer, false);
688  }
689
690  public AssignProcedure createAssignProcedure(final RegionInfo regionInfo,
691      final ServerName targetServer, boolean override) {
692    AssignProcedure proc = new AssignProcedure(regionInfo, targetServer, override);
693    proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
694    return proc;
695  }
696
697  public UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo) {
698    return createUnassignProcedure(regionInfo, null, false);
699  }
700
701  public UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo,
702      boolean override) {
703    return createUnassignProcedure(regionInfo, null, override);
704  }
705
706  public UnassignProcedure[] createUnassignProcedures(final TableName tableName) {
707    return createUnassignProcedures(regionStates.getTableRegionStateNodes(tableName));
708  }
709
710  UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo,
711      final ServerName destinationServer, final boolean force) {
712    return createUnassignProcedure(regionInfo, destinationServer, force, false);
713  }
714
715  UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo,
716      final ServerName destinationServer, final boolean force,
717      final boolean removeAfterUnassigning) {
718    // If destinationServer is null, figure it.
719    ServerName sn = destinationServer != null? destinationServer:
720        getRegionStates().getRegionState(regionInfo).getServerName();
721    assert sn != null;
722    UnassignProcedure proc = new UnassignProcedure(regionInfo, sn, force, removeAfterUnassigning);
723    proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
724    return proc;
725  }
726
727  private MoveRegionProcedure createMoveRegionProcedure(RegionPlan plan) throws HBaseIOException {
728    if (plan.getRegionInfo().getTable().isSystemTable()) {
729      List<ServerName> exclude = getExcludedServersForSystemTable();
730      if (plan.getDestination() != null && exclude.contains(plan.getDestination())) {
731        try {
732          LOG.info("Can not move " + plan.getRegionInfo() + " to " + plan.getDestination() +
733            " because the server is not with highest version");
734          plan.setDestination(getBalancer().randomAssignment(plan.getRegionInfo(),
735            this.master.getServerManager().createDestinationServersList(exclude)));
736        } catch (HBaseIOException e) {
737          LOG.warn(e.toString(), e);
738        }
739      }
740    }
741    return new MoveRegionProcedure(getProcedureEnvironment(), plan, true);
742  }
743
744
745  public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
746      final byte[] splitKey) throws IOException {
747    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
748  }
749
750  public MergeTableRegionsProcedure createMergeProcedure(final RegionInfo regionToMergeA,
751      final RegionInfo regionToMergeB) throws IOException {
752    return new MergeTableRegionsProcedure(getProcedureEnvironment(), regionToMergeA,regionToMergeB);
753  }
754
755  /**
756   * Delete the region states. This is called by "DeleteTable"
757   */
758  public void deleteTable(final TableName tableName) throws IOException {
759    final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
760    regionStateStore.deleteRegions(regions);
761    for (int i = 0; i < regions.size(); ++i) {
762      final RegionInfo regionInfo = regions.get(i);
763      // we expect the region to be offline
764      regionStates.removeFromOfflineRegions(regionInfo);
765      regionStates.deleteRegion(regionInfo);
766    }
767  }
768
769  // ============================================================================================
770  //  RS Region Transition Report helpers
771  // ============================================================================================
772  // TODO: Move this code in MasterRpcServices and call on specific event?
773  public ReportRegionStateTransitionResponse reportRegionStateTransition(
774      final ReportRegionStateTransitionRequest req)
775  throws PleaseHoldException {
776    final ReportRegionStateTransitionResponse.Builder builder =
777        ReportRegionStateTransitionResponse.newBuilder();
778    final ServerName serverName = ProtobufUtil.toServerName(req.getServer());
779    try {
780      for (RegionStateTransition transition: req.getTransitionList()) {
781        switch (transition.getTransitionCode()) {
782          case OPENED:
783          case FAILED_OPEN:
784          case CLOSED:
785            assert transition.getRegionInfoCount() == 1 : transition;
786            final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
787            updateRegionTransition(serverName, transition.getTransitionCode(), hri,
788                transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM);
789            break;
790          case READY_TO_SPLIT:
791          case SPLIT:
792          case SPLIT_REVERTED:
793            assert transition.getRegionInfoCount() == 3 : transition;
794            final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
795            final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
796            final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
797            updateRegionSplitTransition(serverName, transition.getTransitionCode(),
798              parent, splitA, splitB);
799            break;
800          case READY_TO_MERGE:
801          case MERGED:
802          case MERGE_REVERTED:
803            assert transition.getRegionInfoCount() == 3 : transition;
804            final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
805            final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
806            final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
807            updateRegionMergeTransition(serverName, transition.getTransitionCode(),
808              merged, mergeA, mergeB);
809            break;
810        }
811      }
812    } catch (PleaseHoldException e) {
813      if (LOG.isTraceEnabled()) LOG.trace("Failed transition " + e.getMessage());
814      throw e;
815    } catch (UnsupportedOperationException|IOException e) {
816      // TODO: at the moment we have a single error message and the RS will abort
817      // if the master says that one of the region transitions failed.
818      LOG.warn("Failed transition", e);
819      builder.setErrorMessage("Failed transition " + e.getMessage());
820    }
821    return builder.build();
822  }
823
824  /**
825   * Called when the RegionServer wants to report a Procedure transition.
826   * Ends up calling {@link #reportTransition(RegionStateNode, ServerName, TransitionCode, long)}
827   */
828  private void updateRegionTransition(final ServerName serverName, final TransitionCode state,
829      final RegionInfo regionInfo, final long seqId)
830      throws PleaseHoldException, UnexpectedStateException {
831    checkMetaLoaded(regionInfo);
832
833    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
834    if (regionNode == null) {
835      // the table/region is gone. maybe a delete, split, merge
836      throw new UnexpectedStateException(String.format(
837        "Server %s was trying to transition region %s to %s. but the region was removed.",
838        serverName, regionInfo, state));
839    }
840
841    if (LOG.isTraceEnabled()) {
842      LOG.trace(String.format("Update region transition serverName=%s region=%s regionState=%s",
843        serverName, regionNode, state));
844    }
845
846    if (!reportTransition(regionNode, serverName, state, seqId)) {
847      // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
848      // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
849      //   rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
850      //   table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
851      //   to CLOSED
852      // These happen because on cluster shutdown, we currently let the RegionServers close
853      // regions. This is the only time that region close is not run by the Master (so cluster
854      // goes down fast). Consider changing it so Master runs all shutdowns.
855      if (this.master.getServerManager().isClusterShutdown() &&
856          state.equals(TransitionCode.CLOSED)) {
857        LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
858      } else {
859        LOG.warn("No matching procedure found for {} transition to {}", regionNode, state);
860      }
861    }
862  }
863
864  // FYI: regionNode is sometimes synchronized by the caller but not always.
865  private boolean reportTransition(final RegionStateNode regionNode, ServerName serverName,
866      final TransitionCode state, final long seqId)
867      throws UnexpectedStateException {
868    synchronized (regionNode) {
869      final RegionTransitionProcedure proc = regionNode.getProcedure();
870      if (proc == null) return false;
871
872      // serverNode.getReportEvent().removeProcedure(proc);
873      proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(),
874        serverName, state, seqId);
875    }
876    return true;
877  }
878
879  private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state,
880      final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB)
881      throws IOException {
882    checkMetaLoaded(parent);
883
884    if (state != TransitionCode.READY_TO_SPLIT) {
885      throw new UnexpectedStateException("unsupported split regionState=" + state +
886        " for parent region " + parent +
887        " maybe an old RS (< 2.0) had the operation in progress");
888    }
889
890    // sanity check on the request
891    if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) {
892      throw new UnsupportedOperationException(
893        "unsupported split request with bad keys: parent=" + parent +
894        " hriA=" + hriA + " hriB=" + hriB);
895    }
896
897    // Submit the Split procedure
898    final byte[] splitKey = hriB.getStartKey();
899    if (LOG.isDebugEnabled()) {
900      LOG.debug("Split request from " + serverName +
901          ", parent=" + parent + " splitKey=" + Bytes.toStringBinary(splitKey));
902    }
903    master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
904
905    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
906    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
907      throw new UnsupportedOperationException(String.format(
908        "Split handled by the master: parent=%s hriA=%s hriB=%s", parent.getShortNameToLog(), hriA, hriB));
909    }
910  }
911
912  private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state,
913      final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
914    checkMetaLoaded(merged);
915
916    if (state != TransitionCode.READY_TO_MERGE) {
917      throw new UnexpectedStateException("Unsupported merge regionState=" + state +
918        " for regionA=" + hriA + " regionB=" + hriB + " merged=" + merged +
919        " maybe an old RS (< 2.0) had the operation in progress");
920    }
921
922    // Submit the Merge procedure
923    if (LOG.isDebugEnabled()) {
924      LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged);
925    }
926    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
927
928    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
929    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
930      throw new UnsupportedOperationException(String.format(
931        "Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, merged, hriA,
932          hriB));
933    }
934  }
935
936  // ============================================================================================
937  //  RS Status update (report online regions) helpers
938  // ============================================================================================
939  /**
940   * the master will call this method when the RS send the regionServerReport().
941   * the report will contains the "online regions".
942   * this method will check the the online regions against the in-memory state of the AM,
943   * if there is a mismatch we will try to fence out the RS with the assumption
944   * that something went wrong on the RS side.
945   */
946  public void reportOnlineRegions(final ServerName serverName, final Set<byte[]> regionNames)
947      throws YouAreDeadException {
948    if (!isRunning()) return;
949    if (LOG.isTraceEnabled()) {
950      LOG.trace("ReportOnlineRegions " + serverName + " regionCount=" + regionNames.size() +
951        ", metaLoaded=" + isMetaLoaded() + " " +
952          regionNames.stream().map(element -> Bytes.toStringBinary(element)).
953            collect(Collectors.toList()));
954    }
955
956    // Make sure there is a ServerStateNode for this server that just checked in.
957    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
958    synchronized (serverNode) {
959      if (!serverNode.isInState(ServerState.ONLINE)) {
960        LOG.warn("Got a report from a server result in state " + serverNode.getState());
961        return;
962      }
963    }
964
965    if (regionNames.isEmpty()) {
966      // nothing to do if we don't have regions
967      LOG.trace("no online region found on " + serverName);
968    } else if (!isMetaLoaded()) {
969      // if we are still on startup, discard the report unless is from someone holding meta
970      checkOnlineRegionsReportForMeta(serverName, regionNames);
971    } else {
972      // The Heartbeat updates us of what regions are only. check and verify the state.
973      checkOnlineRegionsReport(serverNode, regionNames);
974    }
975
976    // wake report event
977    wakeServerReportEvent(serverNode);
978  }
979
980  void checkOnlineRegionsReportForMeta(final ServerName serverName, final Set<byte[]> regionNames) {
981    try {
982      for (byte[] regionName: regionNames) {
983        final RegionInfo hri = getMetaRegionFromName(regionName);
984        if (hri == null) {
985          if (LOG.isTraceEnabled()) {
986            LOG.trace("Skip online report for region=" + Bytes.toStringBinary(regionName) +
987              " while meta is loading");
988          }
989          continue;
990        }
991
992        final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri);
993        LOG.info("META REPORTED: " + regionNode);
994        if (!reportTransition(regionNode, serverName, TransitionCode.OPENED, 0)) {
995          LOG.warn("META REPORTED but no procedure found (complete?); set location={}", serverName);
996          regionNode.setRegionLocation(serverName);
997        } else if (LOG.isTraceEnabled()) {
998          LOG.trace("META REPORTED: " + regionNode);
999        }
1000      }
1001    } catch (UnexpectedStateException e) {
1002      LOG.warn("KILLING " + serverName + ": " + e.getMessage());
1003      killRegionServer(serverName);
1004    }
1005  }
1006
1007  void checkOnlineRegionsReport(final ServerStateNode serverNode, final Set<byte[]> regionNames) {
1008    final ServerName serverName = serverNode.getServerName();
1009    try {
1010      for (byte[] regionName: regionNames) {
1011        if (!isRunning()) return;
1012        final RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
1013        if (regionNode == null) {
1014          throw new UnexpectedStateException("Not online: " + Bytes.toStringBinary(regionName));
1015        }
1016        synchronized (regionNode) {
1017          if (regionNode.isInState(State.OPENING, State.OPEN)) {
1018            if (!regionNode.getRegionLocation().equals(serverName)) {
1019              throw new UnexpectedStateException(regionNode.toString() +
1020                " reported OPEN on server=" + serverName +
1021                " but state has otherwise.");
1022            } else if (regionNode.isInState(State.OPENING)) {
1023              try {
1024                if (!reportTransition(regionNode, serverNode.getServerName(),
1025                    TransitionCode.OPENED, 0)) {
1026                  LOG.warn(regionNode.toString() + " reported OPEN on server=" + serverName +
1027                    " but state has otherwise AND NO procedure is running");
1028                }
1029              } catch (UnexpectedStateException e) {
1030                LOG.warn(regionNode.toString() + " reported unexpteced OPEN: " + e.getMessage(), e);
1031              }
1032            }
1033          } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
1034            long diff = regionNode.getLastUpdate() - EnvironmentEdgeManager.currentTime();
1035            if (diff > 1000/*One Second... make configurable if an issue*/) {
1036              // So, we can get report that a region is CLOSED or SPLIT because a heartbeat
1037              // came in at about same time as a region transition. Make sure there is some
1038              // elapsed time between killing remote server.
1039              throw new UnexpectedStateException(regionNode.toString() +
1040                " reported an unexpected OPEN; time since last update=" + diff);
1041            }
1042          }
1043        }
1044      }
1045    } catch (UnexpectedStateException e) {
1046      //See HBASE-21421, we can count on reportRegionStateTransition calls
1047      //We only log a warming here. It could be a network lag.
1048      LOG.warn("Failed to checkOnlineRegionsReport, maybe due to network lag, "
1049          + "if this message continues, be careful of double assign", e);
1050    }
1051  }
1052
1053  protected boolean waitServerReportEvent(ServerName serverName, Procedure<?> proc) {
1054    ServerStateNode ssn = this.regionStates.getServerNode(serverName);
1055    if (ssn == null) {
1056      LOG.warn("Why is ServerStateNode for {} empty at this point? Creating...", serverName);
1057      ssn = this.regionStates.getOrCreateServer(serverName);
1058    }
1059    return ssn.getReportEvent().suspendIfNotReady(proc);
1060  }
1061
  /**
   * Wakes the report event of the given server node, passing it this manager's procedure
   * scheduler.
   * @param serverNode the server state node whose report event is woken
   */
  protected void wakeServerReportEvent(final ServerStateNode serverNode) {
    serverNode.getReportEvent().wake(getProcedureScheduler());
  }
1065
1066  // ============================================================================================
1067  //  RIT chore
1068  // ============================================================================================
1069  private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> {
1070    public RegionInTransitionChore(final int timeoutMsec) {
1071      super(timeoutMsec);
1072    }
1073
1074    @Override
1075    protected void periodicExecute(final MasterProcedureEnv env) {
1076      final AssignmentManager am = env.getAssignmentManager();
1077
1078      final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat();
1079      if (ritStat.hasRegionsOverThreshold()) {
1080        for (RegionState hri: ritStat.getRegionOverThreshold()) {
1081          am.handleRegionOverStuckWarningThreshold(hri.getRegion());
1082        }
1083      }
1084
1085      // update metrics
1086      am.updateRegionsInTransitionMetrics(ritStat);
1087    }
1088  }
1089
1090  public RegionInTransitionStat computeRegionInTransitionStat() {
1091    final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration());
1092    rit.update(this);
1093    return rit;
1094  }
1095
1096  public static class RegionInTransitionStat {
1097    private final int ritThreshold;
1098
1099    private HashMap<String, RegionState> ritsOverThreshold = null;
1100    private long statTimestamp;
1101    private long oldestRITTime = 0;
1102    private int totalRITsTwiceThreshold = 0;
1103    private int totalRITs = 0;
1104
1105    @VisibleForTesting
1106    public RegionInTransitionStat(final Configuration conf) {
1107      this.ritThreshold =
1108        conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD);
1109    }
1110
1111    public int getRITThreshold() {
1112      return ritThreshold;
1113    }
1114
1115    public long getTimestamp() {
1116      return statTimestamp;
1117    }
1118
1119    public int getTotalRITs() {
1120      return totalRITs;
1121    }
1122
1123    public long getOldestRITTime() {
1124      return oldestRITTime;
1125    }
1126
1127    public int getTotalRITsOverThreshold() {
1128      Map<String, RegionState> m = this.ritsOverThreshold;
1129      return m != null ? m.size() : 0;
1130    }
1131
1132    public boolean hasRegionsTwiceOverThreshold() {
1133      return totalRITsTwiceThreshold > 0;
1134    }
1135
1136    public boolean hasRegionsOverThreshold() {
1137      Map<String, RegionState> m = this.ritsOverThreshold;
1138      return m != null && !m.isEmpty();
1139    }
1140
1141    public Collection<RegionState> getRegionOverThreshold() {
1142      Map<String, RegionState> m = this.ritsOverThreshold;
1143      return m != null? m.values(): Collections.emptySet();
1144    }
1145
1146    public boolean isRegionOverThreshold(final RegionInfo regionInfo) {
1147      Map<String, RegionState> m = this.ritsOverThreshold;
1148      return m != null && m.containsKey(regionInfo.getEncodedName());
1149    }
1150
1151    public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) {
1152      Map<String, RegionState> m = this.ritsOverThreshold;
1153      if (m == null) return false;
1154      final RegionState state = m.get(regionInfo.getEncodedName());
1155      if (state == null) return false;
1156      return (statTimestamp - state.getStamp()) > (ritThreshold * 2);
1157    }
1158
1159    protected void update(final AssignmentManager am) {
1160      final RegionStates regionStates = am.getRegionStates();
1161      this.statTimestamp = EnvironmentEdgeManager.currentTime();
1162      update(regionStates.getRegionsStateInTransition(), statTimestamp);
1163      update(regionStates.getRegionFailedOpen(), statTimestamp);
1164    }
1165
1166    private void update(final Collection<RegionState> regions, final long currentTime) {
1167      for (RegionState state: regions) {
1168        totalRITs++;
1169        final long ritTime = currentTime - state.getStamp();
1170        if (ritTime > ritThreshold) {
1171          if (ritsOverThreshold == null) {
1172            ritsOverThreshold = new HashMap<String, RegionState>();
1173          }
1174          ritsOverThreshold.put(state.getRegion().getEncodedName(), state);
1175          totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0;
1176        }
1177        if (oldestRITTime < ritTime) {
1178          oldestRITTime = ritTime;
1179        }
1180      }
1181    }
1182  }
1183
1184  private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) {
1185    metrics.updateRITOldestAge(ritStat.getOldestRITTime());
1186    metrics.updateRITCount(ritStat.getTotalRITs());
1187    metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold());
1188  }
1189
1190  private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
1191    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1192    //if (regionNode.isStuck()) {
1193    LOG.warn("STUCK Region-In-Transition {}", regionNode);
1194  }
1195
1196  // ============================================================================================
1197  //  TODO: Master load/bootstrap
1198  // ============================================================================================
1199  public void joinCluster() throws IOException {
1200    long startTime = System.nanoTime();
1201    LOG.debug("Joining cluster...");
1202
1203    // Scan hbase:meta to build list of existing regions, servers, and assignment.
1204    // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress
1205    // w/o  meta.
1206    loadMeta();
1207
1208    while (master.getServerManager().countOfRegionServers() < 1) {
1209      LOG.info("Waiting for RegionServers to join; current count={}",
1210        master.getServerManager().countOfRegionServers());
1211      Threads.sleep(250);
1212    }
1213    LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers());
1214
1215    // Start the RIT chore
1216    master.getMasterProcedureExecutor().addChore(this.ritChore);
1217
1218    long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
1219    LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs));
1220  }
1221
1222  /**
1223   * Create assign procedure for offline regions.
1224   * Just follow the old processofflineServersWithOnlineRegions method. Since now we do not need to
1225   * deal with dead server any more, we only deal with the regions in OFFLINE state in this method.
1226   * And this is a bit strange, that for new regions, we will add it in CLOSED state instead of
1227   * OFFLINE state, and usually there will be a procedure to track them. The
1228   * processofflineServersWithOnlineRegions is a legacy from long ago, as things are going really
1229   * different now, maybe we do not need this method any more. Need to revisit later.
1230   */
1231  // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online.
1232  // Needs to be done after the table state manager has been started.
1233  public void processOfflineRegions() {
1234    List<RegionInfo> offlineRegions = regionStates.getRegionStates().stream()
1235      .filter(RegionState::isOffline).filter(s -> isTableEnabled(s.getRegion().getTable()))
1236      .map(RegionState::getRegion).collect(Collectors.toList());
1237    if (!offlineRegions.isEmpty()) {
1238      master.getMasterProcedureExecutor().submitProcedures(
1239        master.getAssignmentManager().createRoundRobinAssignProcedures(offlineRegions));
1240    }
1241  }
1242
  /**
   * Rebuilds the in-memory region states by visiting every region row the region state store
   * reads from meta, then wakes the meta-loaded event.
   * Rows with no state, location, last host or sequence id are skipped. A missing state is
   * treated as OFFLINE. Nodes that are already in transition are left untouched.
   * @throws IOException if visiting meta fails
   */
  private void loadMeta() throws IOException {
    // TODO: use a thread pool
    regionStateStore.visitMeta(new RegionStateStore.RegionStateVisitor() {
      @Override
      public void visitRegionState(Result result, final RegionInfo regionInfo, final State state,
          final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
        if (state == null && regionLocation == null && lastHost == null &&
            openSeqNum == SequenceId.NO_SEQUENCE_ID) {
          // This is a row with nothing in it.
          LOG.warn("Skipping empty row={}", result);
          return;
        }
        State localState = state;
        if (localState == null) {
          // No region state column data in hbase:meta table! Am I doing a rolling upgrade from
          // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta?
          // In any of these cases, state is empty. For now, presume OFFLINE but there are probably
          // cases where we need to probe more to be sure this is correct; TODO informed by
          // experience.
          LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);

          localState = State.OFFLINE;
        }
        final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
        // Hold the node's monitor while its fields and registrations are updated together.
        synchronized (regionNode) {
          if (!regionNode.isInTransition()) {
            regionNode.setState(localState);
            regionNode.setLastHost(lastHost);
            regionNode.setRegionLocation(regionLocation);
            regionNode.setOpenSeqNum(openSeqNum);
            if (localState == State.OPEN) {
              // OPEN regions are registered with the server they are located on.
              assert regionLocation != null : "found null region location for " + regionNode;
              regionStates.getOrCreateServer(regionNode.getRegionLocation());
              regionStates.addRegionToServer(regionNode);
            } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
              regionStates.addToOfflineRegions(regionNode);
            } else if (localState == State.CLOSED && getTableStateManager().
                isTableState(regionNode.getTable(), TableState.State.DISABLED,
                TableState.State.DISABLING)) {
              // The region is CLOSED and the table is DISABLED/ DISABLING, there is nothing to
              // schedule; the region is inert.
            } else {
              // This is region in CLOSING or OPENING state.
              // These regions should have a procedure in replay.
              // If they don't, then they will show as STUCK after a while because of the below
              // registration and will need intervention by operator to fix. Add them to a server
              // even if the server is not around so a SCP cleans them up.
              if (regionLocation != null) {
                regionStates.getOrCreateServer(regionLocation);
              }
              regionStates.addRegionInTransition(regionNode, null);
            }
          } else {
            // Already in transition: keep the existing node state and only log it.
            LOG.info("RIT {}", regionNode);
          }
        }
      }
    });

    // every assignment is blocked until meta is loaded.
    wakeMetaLoadedEvent();
  }
1304
1305  /**
1306   * Used to check if the meta loading is done.
1307   * <p/>
1308   * if not we throw PleaseHoldException since we are rebuilding the RegionStates
1309   * @param hri region to check if it is already rebuild
1310   * @throws PleaseHoldException if meta has not been loaded yet
1311   */
1312  private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException {
1313    if (!isRunning()) {
1314      throw new PleaseHoldException("AssignmentManager not running");
1315    }
1316    boolean meta = isMetaRegion(hri);
1317    boolean metaLoaded = isMetaLoaded();
1318    if (!meta && !metaLoaded) {
1319      throw new PleaseHoldException(
1320        "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded);
1321    }
1322  }
1323
1324  // ============================================================================================
1325  //  TODO: Metrics
1326  // ============================================================================================
  /**
   * Placeholder that is not implemented yet.
   * @return always 0
   */
  public int getNumRegionsOpened() {
    // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value
    return 0;
  }
1331
1332  public long submitServerCrash(final ServerName serverName, final boolean shouldSplitWal) {
1333    boolean carryingMeta = isCarryingMeta(serverName);
1334    ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
1335    long pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
1336        serverName, shouldSplitWal, carryingMeta));
1337    LOG.debug("Added=" + serverName
1338        + " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
1339    return pid;
1340  }
1341
1342  public void offlineRegion(final RegionInfo regionInfo) {
1343    // TODO used by MasterRpcServices ServerCrashProcedure
1344    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
1345    if (node != null) node.offline();
1346  }
1347
  /**
   * Currently a no-op: the body is empty and both arguments are ignored.
   * @param regionInfo unused
   * @param serverName unused
   */
  public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) {
    // TODO used by TestSplitTransactionOnCluster.java
  }
1351
1352  public Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment(
1353      final Collection<RegionInfo> regions) {
1354    return regionStates.getSnapShotOfAssignment(regions);
1355  }
1356
1357  // ============================================================================================
1358  //  TODO: UTILS/HELPERS?
1359  // ============================================================================================
1360  /**
1361   * Used by the client (via master) to identify if all regions have the schema updates
1362   *
1363   * @param tableName
1364   * @return Pair indicating the status of the alter command (pending/total)
1365   * @throws IOException
1366   */
1367  public Pair<Integer, Integer> getReopenStatus(TableName tableName) {
1368    if (isTableDisabled(tableName)) return new Pair<Integer, Integer>(0, 0);
1369
1370    final List<RegionState> states = regionStates.getTableRegionStates(tableName);
1371    int ritCount = 0;
1372    for (RegionState regionState: states) {
1373      if (!regionState.isOpened() && !regionState.isSplit()) {
1374        ritCount++;
1375      }
1376    }
1377    return new Pair<Integer, Integer>(ritCount, states.size());
1378  }
1379
1380  // ============================================================================================
1381  //  TODO: Region State In Transition
1382  // ============================================================================================
1383  protected boolean addRegionInTransition(final RegionStateNode regionNode,
1384      final RegionTransitionProcedure procedure) {
1385    return regionStates.addRegionInTransition(regionNode, procedure);
1386  }
1387
1388  protected void removeRegionInTransition(final RegionStateNode regionNode,
1389      final RegionTransitionProcedure procedure) {
1390    regionStates.removeRegionInTransition(regionNode, procedure);
1391  }
1392
1393  public boolean hasRegionsInTransition() {
1394    return regionStates.hasRegionsInTransition();
1395  }
1396
1397  public List<RegionStateNode> getRegionsInTransition() {
1398    return regionStates.getRegionsInTransition();
1399  }
1400
1401  public List<RegionInfo> getAssignedRegions() {
1402    return regionStates.getAssignedRegions();
1403  }
1404
1405  public RegionInfo getRegionInfo(final byte[] regionName) {
1406    final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName);
1407    return regionState != null ? regionState.getRegionInfo() : null;
1408  }
1409
1410  // ============================================================================================
1411  //  TODO: Region Status update
1412  // ============================================================================================
1413  private void sendRegionOpenedNotification(final RegionInfo regionInfo,
1414      final ServerName serverName) {
1415    getBalancer().regionOnline(regionInfo, serverName);
1416    if (!this.listeners.isEmpty()) {
1417      for (AssignmentListener listener : this.listeners) {
1418        listener.regionOpened(regionInfo, serverName);
1419      }
1420    }
1421  }
1422
1423  private void sendRegionClosedNotification(final RegionInfo regionInfo) {
1424    getBalancer().regionOffline(regionInfo);
1425    if (!this.listeners.isEmpty()) {
1426      for (AssignmentListener listener : this.listeners) {
1427        listener.regionClosed(regionInfo);
1428      }
1429    }
1430  }
1431
  /**
   * Moves the region node to {@code State.OPENING}, registers it with its server in
   * {@code regionStates} and writes the node through {@code regionStateStore}. The three steps
   * run while holding the monitor of {@code regionNode}; the operation counter is bumped
   * afterwards, outside the monitor.
   * @param regionNode node of the region that is about to be opened
   * @throws IOException propagated from the calls made inside the synchronized block
   */
  public void markRegionAsOpening(final RegionStateNode regionNode) throws IOException {
    synchronized (regionNode) {
      // NOTE(review): presumably transitionState rejects a current state that is not in
      // STATES_EXPECTED_ON_OPEN -- confirm against RegionStateNode.
      regionNode.transitionState(State.OPENING, RegionStates.STATES_EXPECTED_ON_OPEN);
      regionStates.addRegionToServer(regionNode);
      regionStateStore.updateRegionLocation(regionNode);
    }

    // update the operation count metrics
    metrics.incrementOperationCounter();
  }
1442
1443  public void undoRegionAsOpening(final RegionStateNode regionNode) {
1444    boolean opening = false;
1445    synchronized (regionNode) {
1446      if (regionNode.isInState(State.OPENING)) {
1447        opening = true;
1448        regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
1449      }
1450      // Should we update hbase:meta?
1451    }
1452    if (opening) {
1453      // TODO: Metrics. Do opposite of metrics.incrementOperationCounter();
1454    }
1455  }
1456
1457  public void markRegionAsOpened(final RegionStateNode regionNode) throws IOException {
1458    final RegionInfo hri = regionNode.getRegionInfo();
1459    synchronized (regionNode) {
1460      regionNode.transitionState(State.OPEN, RegionStates.STATES_EXPECTED_ON_OPEN);
1461      if (isMetaRegion(hri)) {
1462        // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
1463        // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
1464        // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
1465        // on table that contains state.
1466        setMetaAssigned(hri, true);
1467      }
1468      regionStates.addRegionToServer(regionNode);
1469      // TODO: OPENING Updates hbase:meta too... we need to do both here and there?
1470      // That is a lot of hbase:meta writing.
1471      regionStateStore.updateRegionLocation(regionNode);
1472      sendRegionOpenedNotification(hri, regionNode.getRegionLocation());
1473    }
1474  }
1475
1476  public void markRegionAsClosing(final RegionStateNode regionNode) throws IOException {
1477    final RegionInfo hri = regionNode.getRegionInfo();
1478    synchronized (regionNode) {
1479      regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE);
1480      // Set meta has not initialized early. so people trying to create/edit tables will wait
1481      if (isMetaRegion(hri)) {
1482        setMetaAssigned(hri, false);
1483      }
1484      regionStates.addRegionToServer(regionNode);
1485      regionStateStore.updateRegionLocation(regionNode);
1486    }
1487
1488    // update the operation count metrics
1489    metrics.incrementOperationCounter();
1490  }
1491
  /**
   * Currently a no-op counterpart of {@code markRegionAsClosing}; the argument is ignored.
   * @param regionNode unused
   */
  public void undoRegionAsClosing(final RegionStateNode regionNode) {
    // TODO: Metrics. Do opposite of metrics.incrementOperationCounter();
    // There is nothing to undo?
  }
1496
1497  public void markRegionAsClosed(final RegionStateNode regionNode) throws IOException {
1498    final RegionInfo hri = regionNode.getRegionInfo();
1499    synchronized (regionNode) {
1500      regionNode.transitionState(State.CLOSED, RegionStates.STATES_EXPECTED_ON_CLOSE);
1501      regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
1502      regionNode.setLastHost(regionNode.getRegionLocation());
1503      regionNode.setRegionLocation(null);
1504      regionStateStore.updateRegionLocation(regionNode);
1505      sendRegionClosedNotification(hri);
1506    }
1507  }
1508
1509  public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
1510      final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
1511    // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
1512    // The parent stays in regionStates until cleared when removed by CatalogJanitor.
1513    // Update its state in regionStates to it shows as offline and split when read
1514    // later figuring what regions are in a table and what are not: see
1515    // regionStates#getRegionsOfTable
1516    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent);
1517    node.setState(State.SPLIT);
1518    final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA);
1519    nodeA.setState(State.SPLITTING_NEW);
1520    final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB);
1521    nodeB.setState(State.SPLITTING_NEW);
1522
1523    regionStateStore.splitRegion(parent, daughterA, daughterB, serverName);
1524    if (shouldAssignFavoredNodes(parent)) {
1525      List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList();
1526      ((FavoredNodesPromoter)getBalancer()).
1527          generateFavoredNodesForDaughter(onlineServers, parent, daughterA, daughterB);
1528    }
1529  }
1530
1531  /**
1532   * When called here, the merge has happened. The two merged regions have been
1533   * unassigned and the above markRegionClosed has been called on each so they have been
1534   * disassociated from a hosting Server. The merged region will be open after this call. The
1535   * merged regions are removed from hbase:meta below> Later they are deleted from the filesystem
1536   * by the catalog janitor running against hbase:meta. It notices when the merged region no
1537   * longer holds references to the old regions.
1538   */
1539  public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
1540      final RegionInfo mother, final RegionInfo father) throws IOException {
1541    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
1542    node.setState(State.MERGED);
1543    regionStates.deleteRegion(mother);
1544    regionStates.deleteRegion(father);
1545    regionStateStore.mergeRegions(child, mother, father, serverName);
1546    if (shouldAssignFavoredNodes(child)) {
1547      ((FavoredNodesPromoter)getBalancer()).
1548        generateFavoredNodesForMergedRegion(child, mother, father);
1549    }
1550  }
1551
1552  /*
1553   * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
1554   * belongs to a non-system table.
1555   */
1556  private boolean shouldAssignFavoredNodes(RegionInfo region) {
1557    return this.shouldAssignRegionsWithFavoredNodes &&
1558        FavoredNodesManager.isFavoredNodeApplicable(region);
1559  }
1560
1561  // ============================================================================================
1562  //  Assign Queue (Assign/Balance)
1563  // ============================================================================================
  // Region nodes waiting for a destination server; filled by queueAssign and
  // addToPendingAssignment, drained by the assignment thread in waitOnAssignQueue.
  private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>();
  // Held around the queue accesses in queueAssign, waitOnAssignQueue and
  // addToPendingAssignment, and around signalling of assignQueueFullCond.
  private final ReentrantLock assignQueueLock = new ReentrantLock();
  // Condition the assignment thread waits on; signalled by queueAssign and assignQueueSignal.
  private final Condition assignQueueFullCond = assignQueueLock.newCondition();
1567
1568  /**
1569   * Add the assign operation to the assignment queue.
1570   * The pending assignment operation will be processed,
1571   * and each region will be assigned by a server using the balancer.
1572   */
1573  protected void queueAssign(final RegionStateNode regionNode) {
1574    regionNode.getProcedureEvent().suspend();
1575
1576    // TODO: quick-start for meta and the other sys-tables?
1577    assignQueueLock.lock();
1578    try {
1579      pendingAssignQueue.add(regionNode);
1580      if (regionNode.isSystemTable() ||
1581          pendingAssignQueue.size() == 1 ||
1582          pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize) {
1583        assignQueueFullCond.signal();
1584      }
1585    } finally {
1586      assignQueueLock.unlock();
1587    }
1588  }
1589
1590  private void startAssignmentThread() {
1591    // Get Server Thread name. Sometimes the Server is mocked so may not implement HasThread.
1592    // For example, in tests.
1593    String name = master instanceof HasThread? ((HasThread)master).getName():
1594        master.getServerName().toShortString();
1595    assignThread = new Thread(name) {
1596      @Override
1597      public void run() {
1598        while (isRunning()) {
1599          processAssignQueue();
1600        }
1601        pendingAssignQueue.clear();
1602      }
1603    };
1604    assignThread.setDaemon(true);
1605    assignThread.start();
1606  }
1607
  /**
   * Signals the assignment thread and waits until it is no longer alive.
   * If the calling thread is interrupted while joining, the wait is abandoned, a warning is
   * logged and the interrupt flag is restored.
   */
  private void stopAssignmentThread() {
    assignQueueSignal();
    try {
      // Re-signal before each bounded join, so that a thread parked on the queue condition
      // gets another wake-up and can re-check isRunning().
      while (assignThread.isAlive()) {
        assignQueueSignal();
        assignThread.join(250);
      }
    } catch (InterruptedException e) {
      LOG.warn("join interrupted", e);
      Thread.currentThread().interrupt();
    }
  }
1620
1621  private void assignQueueSignal() {
1622    assignQueueLock.lock();
1623    try {
1624      assignQueueFullCond.signal();
1625    } finally {
1626      assignQueueLock.unlock();
1627    }
1628  }
1629
  /**
   * Waits for queued region nodes and drains them into a map keyed by region info.
   * <p>
   * Under {@code assignQueueLock}: if the queue is empty and the manager is running, blocks on
   * the condition once; then, if still running, waits up to {@code assignDispatchWaitMillis}
   * more before draining the queue.
   * @return the drained nodes keyed by their {@link RegionInfo}; {@code null} when the manager
   *         is no longer running or the wait was interrupted
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
  private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() {
    HashMap<RegionInfo, RegionStateNode> regions = null;

    assignQueueLock.lock();
    try {
      // Single await, deliberately not in a loop (see the suppressed findbugs warning): any
      // wake-up falls through, and the queue may still be empty afterwards.
      if (pendingAssignQueue.isEmpty() && isRunning()) {
        assignQueueFullCond.await();
      }

      if (!isRunning()) return null;
      // Bounded wait before draining; queueAssign may signal earlier.
      assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS);
      regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size());
      for (RegionStateNode regionNode: pendingAssignQueue) {
        regions.put(regionNode.getRegionInfo(), regionNode);
      }
      pendingAssignQueue.clear();
    } catch (InterruptedException e) {
      // Both awaits precede the map creation, so 'regions' is still null on this path and the
      // queue has not been drained.
      LOG.warn("got interrupted ", e);
      Thread.currentThread().interrupt();
    } finally {
      assignQueueLock.unlock();
    }
    return regions;
  }
1655
1656  private void processAssignQueue() {
1657    final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue();
1658    if (regions == null || regions.size() == 0 || !isRunning()) {
1659      return;
1660    }
1661
1662    if (LOG.isTraceEnabled()) {
1663      LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size());
1664    }
1665
1666    // TODO: Optimize balancer. pass a RegionPlan?
1667    final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>();
1668    final List<RegionInfo> userHRIs = new ArrayList<>(regions.size());
1669    // Regions for system tables requiring reassignment
1670    final List<RegionInfo> systemHRIs = new ArrayList<>();
1671    for (RegionStateNode regionStateNode: regions.values()) {
1672      boolean sysTable = regionStateNode.isSystemTable();
1673      final List<RegionInfo> hris = sysTable? systemHRIs: userHRIs;
1674      if (regionStateNode.getRegionLocation() != null) {
1675        retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
1676      } else {
1677        hris.add(regionStateNode.getRegionInfo());
1678      }
1679    }
1680
1681    // TODO: connect with the listener to invalidate the cache
1682
1683    // TODO use events
1684    List<ServerName> servers = master.getServerManager().createDestinationServersList();
1685    for (int i = 0; servers.size() < 1; ++i) {
1686      // Report every fourth time around this loop; try not to flood log.
1687      if (i % 4 == 0) {
1688        LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions.");
1689      }
1690
1691      if (!isRunning()) {
1692        LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions.");
1693        return;
1694      }
1695      Threads.sleep(250);
1696      servers = master.getServerManager().createDestinationServersList();
1697    }
1698
1699    if (!systemHRIs.isEmpty()) {
1700      // System table regions requiring reassignment are present, get region servers
1701      // not available for system table regions
1702      final List<ServerName> excludeServers = getExcludedServersForSystemTable();
1703      List<ServerName> serversForSysTables = servers.stream()
1704          .filter(s -> !excludeServers.contains(s)).collect(Collectors.toList());
1705      if (serversForSysTables.isEmpty()) {
1706        LOG.warn("Filtering old server versions and the excluded produced an empty set; " +
1707            "instead considering all candidate servers!");
1708      }
1709      LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() +
1710          ", allServersCount=" + servers.size());
1711      processAssignmentPlans(regions, null, systemHRIs,
1712          serversForSysTables.isEmpty()? servers: serversForSysTables);
1713    }
1714
1715    processAssignmentPlans(regions, retainMap, userHRIs, servers);
1716  }
1717
1718  private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions,
1719      final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris,
1720      final List<ServerName> servers) {
1721    boolean isTraceEnabled = LOG.isTraceEnabled();
1722    if (isTraceEnabled) {
1723      LOG.trace("Available servers count=" + servers.size() + ": " + servers);
1724    }
1725
1726    final LoadBalancer balancer = getBalancer();
1727    // ask the balancer where to place regions
1728    if (retainMap != null && !retainMap.isEmpty()) {
1729      if (isTraceEnabled) {
1730        LOG.trace("retain assign regions=" + retainMap);
1731      }
1732      try {
1733        acceptPlan(regions, balancer.retainAssignment(retainMap, servers));
1734      } catch (HBaseIOException e) {
1735        LOG.warn("unable to retain assignment", e);
1736        addToPendingAssignment(regions, retainMap.keySet());
1737      }
1738    }
1739
1740    // TODO: Do we need to split retain and round-robin?
1741    // the retain seems to fallback to round-robin/random if the region is not in the map.
1742    if (!hris.isEmpty()) {
1743      Collections.sort(hris, RegionInfo.COMPARATOR);
1744      if (isTraceEnabled) {
1745        LOG.trace("round robin regions=" + hris);
1746      }
1747      try {
1748        acceptPlan(regions, balancer.roundRobinAssignment(hris, servers));
1749      } catch (HBaseIOException e) {
1750        LOG.warn("unable to round-robin assignment", e);
1751        addToPendingAssignment(regions, hris);
1752      }
1753    }
1754  }
1755
1756  private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions,
1757      final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException {
1758    final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()];
1759    final long st = System.currentTimeMillis();
1760
1761    if (plan == null) {
1762      throw new HBaseIOException("unable to compute plans for regions=" + regions.size());
1763    }
1764
1765    if (plan.isEmpty()) return;
1766
1767    int evcount = 0;
1768    for (Map.Entry<ServerName, List<RegionInfo>> entry: plan.entrySet()) {
1769      final ServerName server = entry.getKey();
1770      for (RegionInfo hri: entry.getValue()) {
1771        final RegionStateNode regionNode = regions.get(hri);
1772        regionNode.setRegionLocation(server);
1773        events[evcount++] = regionNode.getProcedureEvent();
1774      }
1775    }
1776    ProcedureEvent.wakeEvents(getProcedureScheduler(), events);
1777
1778    final long et = System.currentTimeMillis();
1779    if (LOG.isTraceEnabled()) {
1780      LOG.trace("ASSIGN ACCEPT " + events.length + " -> " +
1781          StringUtils.humanTimeDiff(et - st));
1782    }
1783  }
1784
1785  private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions,
1786      final Collection<RegionInfo> pendingRegions) {
1787    assignQueueLock.lock();
1788    try {
1789      for (RegionInfo hri: pendingRegions) {
1790        pendingAssignQueue.add(regions.get(hri));
1791      }
1792    } finally {
1793      assignQueueLock.unlock();
1794    }
1795  }
1796
1797  /**
1798   * Get a list of servers that this region cannot be assigned to.
1799   * For system tables, we must assign them to a server with highest version.
1800   */
1801  public List<ServerName> getExcludedServersForSystemTable() {
1802    // TODO: This should be a cached list kept by the ServerManager rather than calculated on each
1803    // move or system region assign. The RegionServerTracker keeps list of online Servers with
1804    // RegionServerInfo that includes Version.
1805    List<Pair<ServerName, String>> serverList = master.getServerManager().getOnlineServersList()
1806        .stream()
1807        .map((s)->new Pair<>(s, master.getRegionServerVersion(s)))
1808        .collect(Collectors.toList());
1809    if (serverList.isEmpty()) {
1810      return Collections.emptyList();
1811    }
1812    String highestVersion = Collections.max(serverList,
1813        (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond();
1814    return serverList.stream()
1815        .filter((p)->!p.getSecond().equals(highestVersion))
1816        .map(Pair::getFirst)
1817        .collect(Collectors.toList());
1818  }
1819
1820  // ============================================================================================
1821  //  Server Helpers
1822  // ============================================================================================
  /**
   * Listener callback for an added server; intentionally does nothing here.
   * @param serverName unused
   */
  @Override
  public void serverAdded(final ServerName serverName) {
  }
1826
1827  @Override
1828  public void serverRemoved(final ServerName serverName) {
1829    final ServerStateNode serverNode = regionStates.getServerNode(serverName);
1830    if (serverNode == null) return;
1831
1832    // just in case, wake procedures waiting for this server report
1833    wakeServerReportEvent(serverNode);
1834  }
1835
1836  private void killRegionServer(final ServerName serverName) {
1837    master.getServerManager().expireServer(serverName);
1838  }
1839
1840  /**
1841   * <p>
1842   * This is a very particular check. The {@link org.apache.hadoop.hbase.master.ServerManager} is
1843   * where you go to check on state of 'Servers', what Servers are online, etc.
1844   * </p>
1845   * <p>
1846   * Here we are checking the state of a server that is post expiration, a ServerManager function
1847   * that moves a server from online to dead. Here we are seeing if the server has moved beyond a
1848   * particular point in the recovery process such that it is safe to move on with assigns; etc.
1849   * </p>
1850   * <p>
1851   * For now it is only used in
1852   * {@link UnassignProcedure#remoteCallFailed(MasterProcedureEnv, RegionStateNode, IOException)} to
1853   * see whether we can safely quit without losing data.
1854   * </p>
1855   * @param meta whether to check for meta log splitting
1856   * @return {@code true} if the server does not exist or the log splitting is done, i.e, the server
1857   *         is in OFFLINE state, or for meta log, is in SPLITTING_META_DONE state. If null,
1858   *         presumes the ServerStateNode was cleaned up by SCP.
1859   * @see UnassignProcedure#remoteCallFailed(MasterProcedureEnv, RegionStateNode, IOException)
1860   */
1861  boolean isLogSplittingDone(ServerName serverName, boolean meta) {
1862    ServerStateNode ssn = this.regionStates.getServerNode(serverName);
1863    if (ssn == null) {
1864      return true;
1865    }
1866    ServerState[] inState =
1867      meta
1868        ? new ServerState[] { ServerState.SPLITTING_META_DONE, ServerState.SPLITTING,
1869          ServerState.OFFLINE }
1870        : new ServerState[] { ServerState.OFFLINE };
1871    synchronized (ssn) {
1872      return ssn.isInState(inState);
1873    }
1874  }
1875
1876  @VisibleForTesting
1877  MasterServices getMaster() {
1878    return master;
1879  }
1880}