001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.CopyOnWriteArrayList;
031import java.util.concurrent.Future;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicBoolean;
034import java.util.concurrent.locks.Condition;
035import java.util.concurrent.locks.ReentrantLock;
036import java.util.stream.Collectors;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.HBaseIOException;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.PleaseHoldException;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.YouAreDeadException;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.RegionInfoBuilder;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.TableState;
048import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
049import org.apache.hadoop.hbase.favored.FavoredNodesManager;
050import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
051import org.apache.hadoop.hbase.master.AssignmentListener;
052import org.apache.hadoop.hbase.master.LoadBalancer;
053import org.apache.hadoop.hbase.master.MasterServices;
054import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
055import org.apache.hadoop.hbase.master.RegionPlan;
056import org.apache.hadoop.hbase.master.RegionState;
057import org.apache.hadoop.hbase.master.RegionState.State;
058import org.apache.hadoop.hbase.master.ServerListener;
059import org.apache.hadoop.hbase.master.TableStateManager;
060import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
061import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerState;
062import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerStateNode;
063import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
064import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
065import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
066import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
067import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
068import org.apache.hadoop.hbase.procedure2.Procedure;
069import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
070import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
071import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
072import org.apache.hadoop.hbase.procedure2.util.StringUtils;
073import org.apache.hadoop.hbase.regionserver.SequenceId;
074import org.apache.hadoop.hbase.util.Bytes;
075import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
076import org.apache.hadoop.hbase.util.HasThread;
077import org.apache.hadoop.hbase.util.Pair;
078import org.apache.hadoop.hbase.util.Threads;
079import org.apache.hadoop.hbase.util.VersionInfo;
080import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
081import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
082import org.apache.yetus.audience.InterfaceAudience;
083import org.apache.zookeeper.KeeperException;
084import org.slf4j.Logger;
085import org.slf4j.LoggerFactory;
086
087import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
088
089import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
094
095/**
096 * The AssignmentManager is the coordinator for region assign/unassign operations.
097 * <ul>
098 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
099 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
100 * </ul>
101 * Regions are created by CreateTable, Split, Merge.
102 * Regions are deleted by DeleteTable, Split, Merge.
103 * Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash.
 * Unassigns are triggered by DisableTable, Split, Merge.
105 */
106@InterfaceAudience.Private
107public class AssignmentManager implements ServerListener {
  private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);

  // TODO: AMv2
  //  - handle region migration from hbase1 to hbase2.
  //  - handle sys table assignment first (e.g. acl, namespace)
  //  - handle table priorities
  //  - If ServerBusyException trying to update hbase:meta, we abort the Master
  //   See updateRegionLocation in RegionStateStore.
  //
  // See also
  // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
  // for other TODOs.

  /**
   * Configuration key for a bootstrap thread pool size. Not read in this part of the class;
   * presumably consumed elsewhere — TODO confirm.
   */
  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
      "hbase.assignment.bootstrap.thread.pool.size";

  /**
   * Configuration key for the assign dispatch wait in milliseconds. The constructor reads it into
   * {@code assignDispatchWaitMillis}; the default is 150.
   */
  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
      "hbase.assignment.dispatch.wait.msec";
  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;

  /**
   * Configuration key for the maximum size of the assign dispatch wait queue. The constructor
   * reads it into {@code assignDispatchWaitQueueMaxSize}; the default is 100.
   */
  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
      "hbase.assignment.dispatch.wait.queue.max.size";
  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;

  /**
   * Configuration key for the region-in-transition chore interval in milliseconds. The
   * constructor passes it to the {@code RegionInTransitionChore}; the default is one minute.
   */
  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
      "hbase.assignment.rit.chore.interval.msec";
  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;

  /**
   * Configuration key for the maximum number of assign attempts. The constructor clamps the value
   * to at least 1; the default is {@code Integer.MAX_VALUE}, i.e. effectively unbounded.
   */
  public static final String ASSIGN_MAX_ATTEMPTS =
      "hbase.assignment.maximum.attempts";
  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;

  /** Region in Transition metrics threshold time (default: 60 * 1000, i.e. one minute in ms). */
  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
      "hbase.metrics.rit.stuck.warning.threshold";
  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
144
  /**
   * Woken when hbase:meta is assigned and suspended when it is not. Procedures wait on it via
   * {@link #waitMetaAssigned(Procedure, RegionInfo)}.
   */
  private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
  /**
   * Woken once the AM has finished loading meta, i.e. rebuilding the region states.
   * See {@link #waitMetaLoaded(Procedure)} and {@link #isMetaLoaded()}.
   */
  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");

  /** Listeners that are called on assignment events. */
  private final CopyOnWriteArrayList<AssignmentListener> listeners =
      new CopyOnWriteArrayList<AssignmentListener>();

  private final MetricsAssignmentManager metrics;
  /** Region-in-transition chore built in the constructor; removed from the executor in stop(). */
  private final RegionInTransitionChore ritChore;
  private final MasterServices master;

  /** Flipped to true by start() and back to false by stop(); guards against double start/stop. */
  private final AtomicBoolean running = new AtomicBoolean(false);
  /** In-memory states of regions and servers. */
  private final RegionStates regionStates = new RegionStates();
  /** Handles hbase:meta state updates. */
  private final RegionStateStore regionStateStore;

  // NOTE(review): not referenced in the visible part of this class; presumably per-server region
  // reports keyed by server name — TODO confirm.
  private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();

  /** True when the configured load balancer is a FavoredStochasticBalancer (set in the ctor). */
  private final boolean shouldAssignRegionsWithFavoredNodes;
  private final int assignDispatchWaitQueueMaxSize;
  private final int assignDispatchWaitMillis;
  /** Maximum assign attempts; the constructor never lets this drop below 1. */
  private final int assignMaxAttempts;

  /** Serializes the background threads started by checkIfShouldMoveSystemRegionAsync(). */
  private final Object checkIfShouldMoveSystemRegionLock = new Object();

  // NOTE(review): presumably managed by startAssignmentThread()/stopAssignmentThread(), which are
  // not visible in this part of the file — TODO confirm.
  private Thread assignThread;
170
171  public AssignmentManager(final MasterServices master) {
172    this(master, new RegionStateStore(master));
173  }
174
175  public AssignmentManager(final MasterServices master, final RegionStateStore stateStore) {
176    this.master = master;
177    this.regionStateStore = stateStore;
178    this.metrics = new MetricsAssignmentManager();
179
180    final Configuration conf = master.getConfiguration();
181
182    // Only read favored nodes if using the favored nodes load balancer.
183    this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class.isAssignableFrom(
184        conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class));
185
186    this.assignDispatchWaitMillis = conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY,
187        DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
188    this.assignDispatchWaitQueueMaxSize = conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY,
189        DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);
190
191    this.assignMaxAttempts = Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS,
192        DEFAULT_ASSIGN_MAX_ATTEMPTS));
193
194    int ritChoreInterval = conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY,
195        DEFAULT_RIT_CHORE_INTERVAL_MSEC);
196    this.ritChore = new RegionInTransitionChore(ritChoreInterval);
197  }
198
  /**
   * Starts the assignment manager; a no-op if it is already running.
   * <p>
   * Registers this manager as a server listener, starts the assignment thread and, when a
   * ZooKeeper watcher is available, seeds the in-memory state of the first meta region from the
   * meta region state obtained through {@link MetaTableLocator#getMetaRegionState(ZKWatcher)}.
   * @throws IOException declared by this method; which startup step can raise it is not visible
   *     here — TODO confirm
   * @throws KeeperException presumably raised when reading the meta region state from ZooKeeper
   *     fails — TODO confirm
   */
  public void start() throws IOException, KeeperException {
    // Only the caller that flips running from false to true proceeds.
    if (!running.compareAndSet(false, true)) {
      return;
    }

    LOG.trace("Starting assignment manager");

    // Register Server Listener
    master.getServerManager().registerListener(this);

    // Start the Assignment Thread
    startAssignmentThread();

    // load meta region state
    ZKWatcher zkw = master.getZooKeeper();
    // it could be null in some tests
    if (zkw != null) {
      RegionState regionState = MetaTableLocator.getMetaRegionState(zkw);
      RegionStateNode regionStateNode =
        regionStates.getOrCreateRegionStateNode(RegionInfoBuilder.FIRST_META_REGIONINFO);
      // Location, state and the meta-assigned event are updated together under the node monitor.
      synchronized (regionStateNode) {
        regionStateNode.setRegionLocation(regionState.getServerName());
        regionStateNode.setState(regionState.getState());
        // Meta counts as assigned only when its recorded state is OPEN.
        setMetaAssigned(regionState.getRegion(), regionState.getState() == State.OPEN);
      }
    }
  }
226
  /**
   * Stops the assignment manager; a no-op if it is not running.
   * <p>
   * Removes the RIT chore (when a procedure executor exists), stops the assignment thread, clears
   * the in-memory region states and unregisters from the server manager. When a procedure
   * executor exists, it also suspends the meta-loaded event and marks every meta region as not
   * assigned.
   */
  public void stop() {
    // Only the caller that flips running from true to false proceeds.
    if (!running.compareAndSet(true, false)) {
      return;
    }

    LOG.info("Stopping assignment manager");

    // The AM is started before the procedure executor,
    // but the actual work will be loaded/submitted only once we have the executor
    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;

    // Remove the RIT chore
    if (hasProcExecutor) {
      master.getMasterProcedureExecutor().removeChore(this.ritChore);
    }

    // Stop the Assignment Thread
    stopAssignmentThread();

    // Clear the in-memory region states (the RegionStateStore itself is not touched here)
    regionStates.clear();

    // Unregister Server Listener
    master.getServerManager().unregisterListener(this);

    // Update meta events (for testing)
    if (hasProcExecutor) {
      metaLoadEvent.suspend();
      for (RegionInfo hri: getMetaRegionSet()) {
        setMetaAssigned(hri, false);
      }
    }
  }
260
261  public boolean isRunning() {
262    return running.get();
263  }
264
265  public Configuration getConfiguration() {
266    return master.getConfiguration();
267  }
268
269  public MetricsAssignmentManager getAssignmentManagerMetrics() {
270    return metrics;
271  }
272
273  private LoadBalancer getBalancer() {
274    return master.getLoadBalancer();
275  }
276
277  private MasterProcedureEnv getProcedureEnvironment() {
278    return master.getMasterProcedureExecutor().getEnvironment();
279  }
280
281  private MasterProcedureScheduler getProcedureScheduler() {
282    return getProcedureEnvironment().getProcedureScheduler();
283  }
284
285  int getAssignMaxAttempts() {
286    return assignMaxAttempts;
287  }
288
289  /**
290   * Add the listener to the notification list.
291   * @param listener The AssignmentListener to register
292   */
293  public void registerListener(final AssignmentListener listener) {
294    this.listeners.add(listener);
295  }
296
297  /**
298   * Remove the listener from the notification list.
299   * @param listener The AssignmentListener to unregister
300   */
301  public boolean unregisterListener(final AssignmentListener listener) {
302    return this.listeners.remove(listener);
303  }
304
305  public RegionStates getRegionStates() {
306    return regionStates;
307  }
308
309  public RegionStateStore getRegionStateStore() {
310    return regionStateStore;
311  }
312
313  public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
314    return this.shouldAssignRegionsWithFavoredNodes?
315        ((FavoredStochasticBalancer)getBalancer()).getFavoredNodes(regionInfo):
316          ServerName.EMPTY_SERVER_LIST;
317  }
318
319  // ============================================================================================
320  //  Table State Manager helpers
321  // ============================================================================================
322  TableStateManager getTableStateManager() {
323    return master.getTableStateManager();
324  }
325
326  public boolean isTableEnabled(final TableName tableName) {
327    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
328  }
329
330  public boolean isTableDisabled(final TableName tableName) {
331    return getTableStateManager().isTableState(tableName,
332      TableState.State.DISABLED, TableState.State.DISABLING);
333  }
334
335  // ============================================================================================
336  //  META Helpers
337  // ============================================================================================
338  private boolean isMetaRegion(final RegionInfo regionInfo) {
339    return regionInfo.isMetaRegion();
340  }
341
342  public boolean isMetaRegion(final byte[] regionName) {
343    return getMetaRegionFromName(regionName) != null;
344  }
345
346  public RegionInfo getMetaRegionFromName(final byte[] regionName) {
347    for (RegionInfo hri: getMetaRegionSet()) {
348      if (Bytes.equals(hri.getRegionName(), regionName)) {
349        return hri;
350      }
351    }
352    return null;
353  }
354
355  public boolean isCarryingMeta(final ServerName serverName) {
356    // TODO: handle multiple meta
357    return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
358  }
359
360  private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
361    // TODO: check for state?
362    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
363    return(node != null && serverName.equals(node.getRegionLocation()));
364  }
365
366  private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
367    //if (regionInfo.isMetaRegion()) return regionInfo;
368    // TODO: handle multiple meta. if the region provided is not meta lookup
369    // which meta the region belongs to.
370    return RegionInfoBuilder.FIRST_META_REGIONINFO;
371  }
372
373  // TODO: handle multiple meta.
374  private static final Set<RegionInfo> META_REGION_SET =
375      Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);
376  public Set<RegionInfo> getMetaRegionSet() {
377    return META_REGION_SET;
378  }
379
380  // ============================================================================================
381  //  META Event(s) helpers
382  // ============================================================================================
383  /**
384   * Notice that, this only means the meta region is available on a RS, but the AM may still be
385   * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
386   * before checking this method, unless you can make sure that your piece of code can only be
387   * executed after AM builds the region states.
388   * @see #isMetaLoaded()
389   */
390  public boolean isMetaAssigned() {
391    return metaAssignEvent.isReady();
392  }
393
394  public boolean isMetaRegionInTransition() {
395    return !isMetaAssigned();
396  }
397
398  /**
399   * Notice that this event does not mean the AM has already finished region state rebuilding. See
400   * the comment of {@link #isMetaAssigned()} for more details.
401   * @see #isMetaAssigned()
402   */
403  public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
404    return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
405  }
406
407  private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
408    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
409    ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
410    if (assigned) {
411      metaAssignEvent.wake(getProcedureScheduler());
412    } else {
413      metaAssignEvent.suspend();
414    }
415  }
416
417  private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
418    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
419    // TODO: handle multiple meta.
420    return metaAssignEvent;
421  }
422
423  /**
424   * Wait until AM finishes the meta loading, i.e, the region states rebuilding.
425   * @see #isMetaLoaded()
426   * @see #waitMetaAssigned(Procedure, RegionInfo)
427   */
428  public boolean waitMetaLoaded(Procedure<?> proc) {
429    return metaLoadEvent.suspendIfNotReady(proc);
430  }
431
432  @VisibleForTesting
433  void wakeMetaLoadedEvent() {
434    metaLoadEvent.wake(getProcedureScheduler());
435    assert isMetaLoaded() : "expected meta to be loaded";
436  }
437
438  /**
439   * Return whether AM finishes the meta loading, i.e, the region states rebuilding.
440   * @see #isMetaAssigned()
441   * @see #waitMetaLoaded(Procedure)
442   */
443  public boolean isMetaLoaded() {
444    return metaLoadEvent.isReady();
445  }
446
447  /**
448   * Start a new thread to check if there are region servers whose versions are higher than others.
449   * If so, move all system table regions to RS with the highest version to keep compatibility.
450   * The reason is, RS in new version may not be able to access RS in old version when there are
451   * some incompatible changes.
452   * <p>This method is called when a new RegionServer is added to cluster only.</p>
453   */
454  public void checkIfShouldMoveSystemRegionAsync() {
455    // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
456    // it should 'move' the system tables from the old server to the new server but
457    // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
458    if (this.master.getServerManager().countOfRegionServers() <= 1) {
459      return;
460    }
461    // This thread used to run whenever there was a change in the cluster. The ZooKeeper
462    // childrenChanged notification came in before the nodeDeleted message and so this method
463    // cold run before a ServerCrashProcedure could run. That meant that this thread could see
464    // a Crashed Server before ServerCrashProcedure and it could find system regions on the
465    // crashed server and go move them before ServerCrashProcedure had a chance; could be
466    // dataloss too if WALs were not recovered.
467    new Thread(() -> {
468      try {
469        synchronized (checkIfShouldMoveSystemRegionLock) {
470          List<RegionPlan> plans = new ArrayList<>();
471          // TODO: I don't think this code does a good job if all servers in cluster have same
472          // version. It looks like it will schedule unnecessary moves.
473          for (ServerName server : getExcludedServersForSystemTable()) {
474            if (master.getServerManager().isServerDead(server)) {
475              // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
476              // considers only online servers, the server could be queued for dead server
477              // processing. As region assignments for crashed server is handled by
478              // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
479              // regular flow of LoadBalancer as a favored node and not to have this special
480              // handling.
481              continue;
482            }
483            List<RegionInfo> regionsShouldMove = getSystemTables(server);
484            if (!regionsShouldMove.isEmpty()) {
485              for (RegionInfo regionInfo : regionsShouldMove) {
486                // null value for dest forces destination server to be selected by balancer
487                RegionPlan plan = new RegionPlan(regionInfo, server, null);
488                if (regionInfo.isMetaRegion()) {
489                  // Must move meta region first.
490                  LOG.info("Async MOVE of {} to newer Server={}",
491                      regionInfo.getEncodedName(), server);
492                  moveAsync(plan);
493                } else {
494                  plans.add(plan);
495                }
496              }
497            }
498            for (RegionPlan plan : plans) {
499              LOG.info("Async MOVE of {} to newer Server={}",
500                  plan.getRegionInfo().getEncodedName(), server);
501              moveAsync(plan);
502            }
503          }
504        }
505      } catch (Throwable t) {
506        LOG.error(t.toString(), t);
507      }
508    }).start();
509  }
510
511  private List<RegionInfo> getSystemTables(ServerName serverName) {
512    Set<RegionStateNode> regions = this.getRegionStates().getServerNode(serverName).getRegions();
513    if (regions == null) {
514      return new ArrayList<>();
515    }
516    return regions.stream()
517        .map(RegionStateNode::getRegionInfo)
518        .filter(r -> r.getTable().isSystemTable())
519        .collect(Collectors.toList());
520  }
521
522  public void assign(final RegionInfo regionInfo, ServerName sn) throws IOException {
523    AssignProcedure proc = createAssignProcedure(regionInfo, sn);
524    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
525  }
526
527  public void assign(final RegionInfo regionInfo) throws IOException {
528    AssignProcedure proc = createAssignProcedure(regionInfo);
529    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
530  }
531
532  public void unassign(final RegionInfo regionInfo) throws IOException {
533    unassign(regionInfo, false);
534  }
535
536  public void unassign(final RegionInfo regionInfo, final boolean forceNewPlan)
537  throws IOException {
538    // TODO: rename this reassign
539    RegionStateNode node = this.regionStates.getRegionStateNode(regionInfo);
540    ServerName destinationServer = node.getRegionLocation();
541    if (destinationServer == null) {
542      throw new UnexpectedStateException("DestinationServer is null; Assigned? " + node.toString());
543    }
544    assert destinationServer != null; node.toString();
545    UnassignProcedure proc = createUnassignProcedure(regionInfo, destinationServer, forceNewPlan);
546    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
547  }
548
549  public void move(final RegionInfo regionInfo) throws IOException {
550    RegionStateNode node = this.regionStates.getRegionStateNode(regionInfo);
551    ServerName sourceServer = node.getRegionLocation();
552    RegionPlan plan = new RegionPlan(regionInfo, sourceServer, null);
553    MoveRegionProcedure proc = createMoveRegionProcedure(plan);
554    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
555  }
556
557  public Future<byte[]> moveAsync(final RegionPlan regionPlan) throws HBaseIOException {
558    MoveRegionProcedure proc = createMoveRegionProcedure(regionPlan);
559    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
560  }
561
562  // ============================================================================================
563  //  RegionTransition procedures helpers
564  // ============================================================================================
565
566  /**
567   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
568   * @return AssignProcedures made out of the passed in <code>hris</code> and a call
569   * to the balancer to populate the assigns with targets chosen using round-robin (default
570   * balancer scheme). If at assign-time, the target chosen is no longer up, thats fine,
571   * the AssignProcedure will ask the balancer for a new target, and so on.
572   */
573  public AssignProcedure[] createRoundRobinAssignProcedures(final List<RegionInfo> hris) {
574    if (hris.isEmpty()) {
575      return null;
576    }
577    try {
578      // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
579      // a better job if it has all the assignments in the one lump.
580      Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
581          this.master.getServerManager().createDestinationServersList(null));
582      // Return mid-method!
583      return createAssignProcedures(assignments, hris.size());
584    } catch (HBaseIOException hioe) {
585      LOG.warn("Failed roundRobinAssignment", hioe);
586    }
587    // If an error above, fall-through to this simpler assign. Last resort.
588    return createAssignProcedures(hris);
589  }
590
591  /**
592   * Create an array of AssignProcedures w/o specifying a target server.
593   * If no target server, at assign time, we will try to use the former location of the region
594   * if one exists. This is how we 'retain' the old location across a server restart.
595   * Used by {@link ServerCrashProcedure} assigning regions on a server that has crashed (SCP is
596   * also used across a cluster-restart just-in-case to ensure we do cleanup of any old WALs or
597   * server processes).
598   */
599  public AssignProcedure[] createAssignProcedures(final List<RegionInfo> hris) {
600    if (hris.isEmpty()) {
601      return null;
602    }
603    int index = 0;
604    AssignProcedure [] procedures = new AssignProcedure[hris.size()];
605    for (RegionInfo hri : hris) {
606      // Sort the procedures so meta and system regions are first in the returned array.
607      procedures[index++] = createAssignProcedure(hri);
608    }
609    if (procedures.length > 1) {
610      // Sort the procedures so meta and system regions are first in the returned array.
611      Arrays.sort(procedures, AssignProcedure.COMPARATOR);
612    }
613    return procedures;
614  }
615
616  // Make this static for the method below where we use it typing the AssignProcedure array we
617  // return as result.
618  private static final AssignProcedure [] ASSIGN_PROCEDURE_ARRAY_TYPE = new AssignProcedure[] {};
619
620  /**
621   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
622   * @param size Count of assignments to make (the caller may know the total count)
623   * @return Assignments made from the passed in <code>assignments</code>
624   */
625  private AssignProcedure[] createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments,
626      int size) {
627    List<AssignProcedure> procedures = new ArrayList<>(size > 0? size: 8/*Arbitrary*/);
628    for (Map.Entry<ServerName, List<RegionInfo>> e: assignments.entrySet()) {
629      for (RegionInfo ri: e.getValue()) {
630        AssignProcedure ap = createAssignProcedure(ri, e.getKey());
631        ap.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
632        procedures.add(ap);
633      }
634    }
635    if (procedures.size() > 1) {
636      // Sort the procedures so meta and system regions are first in the returned array.
637      procedures.sort(AssignProcedure.COMPARATOR);
638    }
639    return procedures.toArray(ASSIGN_PROCEDURE_ARRAY_TYPE);
640  }
641
642  // Needed for the following method so it can type the created Array we retur n
643  private static final UnassignProcedure [] UNASSIGN_PROCEDURE_ARRAY_TYPE =
644      new UnassignProcedure[0];
645
646  UnassignProcedure[] createUnassignProcedures(final Collection<RegionStateNode> nodes) {
647    if (nodes.isEmpty()) return null;
648    final List<UnassignProcedure> procs = new ArrayList<UnassignProcedure>(nodes.size());
649    for (RegionStateNode node: nodes) {
650      if (!this.regionStates.include(node, false)) continue;
651      // Look for regions that are offline/closed; i.e. already unassigned.
652      if (this.regionStates.isRegionOffline(node.getRegionInfo())) continue;
653      assert node.getRegionLocation() != null: node.toString();
654      procs.add(createUnassignProcedure(node.getRegionInfo(), node.getRegionLocation(), false));
655    }
656    return procs.toArray(UNASSIGN_PROCEDURE_ARRAY_TYPE);
657  }
658
659  /**
660   * Called by things like DisableTableProcedure to get a list of UnassignProcedure
661   * to unassign the regions of the table.
662   */
663  public AssignProcedure createAssignProcedure(final RegionInfo regionInfo) {
664    return createAssignProcedure(regionInfo, null, false);
665  }
666
667  public AssignProcedure createAssignProcedure(final RegionInfo regionInfo, boolean override) {
668    return createAssignProcedure(regionInfo, null, override);
669  }
670
671  public AssignProcedure createAssignProcedure(final RegionInfo regionInfo,
672      ServerName targetServer) {
673    return createAssignProcedure(regionInfo, targetServer, false);
674  }
675
676  public AssignProcedure createAssignProcedure(final RegionInfo regionInfo,
677      final ServerName targetServer, boolean override) {
678    AssignProcedure proc = new AssignProcedure(regionInfo, targetServer, override);
679    proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
680    return proc;
681  }
682
683  public UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo) {
684    return createUnassignProcedure(regionInfo, null, false);
685  }
686
687  public UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo,
688      boolean override) {
689    return createUnassignProcedure(regionInfo, null, override);
690  }
691
692  public UnassignProcedure[] createUnassignProcedures(final TableName tableName) {
693    return createUnassignProcedures(regionStates.getTableRegionStateNodes(tableName));
694  }
695
696  UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo,
697      final ServerName destinationServer, final boolean force) {
698    return createUnassignProcedure(regionInfo, destinationServer, force, false);
699  }
700
701  UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo,
702      final ServerName destinationServer, final boolean force,
703      final boolean removeAfterUnassigning) {
704    // If destinationServer is null, figure it.
705    ServerName sn = destinationServer != null? destinationServer:
706        getRegionStates().getRegionState(regionInfo).getServerName();
707    assert sn != null;
708    UnassignProcedure proc = new UnassignProcedure(regionInfo, sn, force, removeAfterUnassigning);
709    proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
710    return proc;
711  }
712
713  private MoveRegionProcedure createMoveRegionProcedure(RegionPlan plan) throws HBaseIOException {
714    if (plan.getRegionInfo().getTable().isSystemTable()) {
715      List<ServerName> exclude = getExcludedServersForSystemTable();
716      if (plan.getDestination() != null && exclude.contains(plan.getDestination())) {
717        try {
718          LOG.info("Can not move " + plan.getRegionInfo() + " to " + plan.getDestination() +
719            " because the server is not with highest version");
720          plan.setDestination(getBalancer().randomAssignment(plan.getRegionInfo(),
721            this.master.getServerManager().createDestinationServersList(exclude)));
722        } catch (HBaseIOException e) {
723          LOG.warn(e.toString(), e);
724        }
725      }
726    }
727    return new MoveRegionProcedure(getProcedureEnvironment(), plan, true);
728  }
729
730
731  public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
732      final byte[] splitKey) throws IOException {
733    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
734  }
735
736  public MergeTableRegionsProcedure createMergeProcedure(RegionInfo ... ris) throws IOException {
737    return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
738  }
739
740  /**
741   * Delete the region states. This is called by "DeleteTable"
742   */
743  public void deleteTable(final TableName tableName) throws IOException {
744    final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
745    regionStateStore.deleteRegions(regions);
746    for (int i = 0; i < regions.size(); ++i) {
747      final RegionInfo regionInfo = regions.get(i);
748      // we expect the region to be offline
749      regionStates.removeFromOfflineRegions(regionInfo);
750      regionStates.deleteRegion(regionInfo);
751    }
752  }
753
754  // ============================================================================================
755  //  RS Region Transition Report helpers
756  // ============================================================================================
757  // TODO: Move this code in MasterRpcServices and call on specific event?
758  public ReportRegionStateTransitionResponse reportRegionStateTransition(
759      final ReportRegionStateTransitionRequest req)
760  throws PleaseHoldException {
761    final ReportRegionStateTransitionResponse.Builder builder =
762        ReportRegionStateTransitionResponse.newBuilder();
763    final ServerName serverName = ProtobufUtil.toServerName(req.getServer());
764    try {
765      for (RegionStateTransition transition: req.getTransitionList()) {
766        switch (transition.getTransitionCode()) {
767          case OPENED:
768          case FAILED_OPEN:
769          case CLOSED:
770            assert transition.getRegionInfoCount() == 1 : transition;
771            final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
772            updateRegionTransition(serverName, transition.getTransitionCode(), hri,
773                transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM);
774            break;
775          case READY_TO_SPLIT:
776          case SPLIT:
777          case SPLIT_REVERTED:
778            assert transition.getRegionInfoCount() == 3 : transition;
779            final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
780            final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
781            final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
782            updateRegionSplitTransition(serverName, transition.getTransitionCode(),
783              parent, splitA, splitB);
784            break;
785          case READY_TO_MERGE:
786          case MERGED:
787          case MERGE_REVERTED:
788            assert transition.getRegionInfoCount() == 3 : transition;
789            final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
790            final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
791            final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
792            updateRegionMergeTransition(serverName, transition.getTransitionCode(),
793              merged, mergeA, mergeB);
794            break;
795        }
796      }
797    } catch (PleaseHoldException e) {
798      if (LOG.isTraceEnabled()) LOG.trace("Failed transition " + e.getMessage());
799      throw e;
800    } catch (UnsupportedOperationException|IOException e) {
801      // TODO: at the moment we have a single error message and the RS will abort
802      // if the master says that one of the region transitions failed.
803      LOG.warn("Failed transition", e);
804      builder.setErrorMessage("Failed transition " + e.getMessage());
805    }
806    return builder.build();
807  }
808
809  /**
810   * Called when the RegionServer wants to report a Procedure transition.
811   * Ends up calling {@link #reportTransition(RegionStateNode, ServerName, TransitionCode, long)}
812   */
813  private void updateRegionTransition(final ServerName serverName, final TransitionCode state,
814      final RegionInfo regionInfo, final long seqId)
815      throws PleaseHoldException, UnexpectedStateException {
816    checkMetaLoaded(regionInfo);
817
818    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
819    if (regionNode == null) {
820      // the table/region is gone. maybe a delete, split, merge
821      throw new UnexpectedStateException(String.format(
822        "Server %s was trying to transition region %s to %s. but the region was removed.",
823        serverName, regionInfo, state));
824    }
825
826    if (LOG.isTraceEnabled()) {
827      LOG.trace(String.format("Update region transition serverName=%s region=%s regionState=%s",
828        serverName, regionNode, state));
829    }
830
831    if (!reportTransition(regionNode, serverName, state, seqId)) {
832      // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
833      // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
834      //   rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
835      //   table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
836      //   to CLOSED
837      // These happen because on cluster shutdown, we currently let the RegionServers close
838      // regions. This is the only time that region close is not run by the Master (so cluster
839      // goes down fast). Consider changing it so Master runs all shutdowns.
840      if (this.master.getServerManager().isClusterShutdown() &&
841          state.equals(TransitionCode.CLOSED)) {
842        LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
843      } else {
844        LOG.warn("No matching procedure found for {} transition to {}", regionNode, state);
845      }
846    }
847  }
848
849  // FYI: regionNode is sometimes synchronized by the caller but not always.
850  private boolean reportTransition(final RegionStateNode regionNode, ServerName serverName,
851      final TransitionCode state, final long seqId)
852      throws UnexpectedStateException {
853    synchronized (regionNode) {
854      final RegionTransitionProcedure proc = regionNode.getProcedure();
855      if (proc == null) return false;
856
857      // serverNode.getReportEvent().removeProcedure(proc);
858      proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(),
859        serverName, state, seqId);
860    }
861    return true;
862  }
863
864  private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state,
865      final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB)
866      throws IOException {
867    checkMetaLoaded(parent);
868
869    if (state != TransitionCode.READY_TO_SPLIT) {
870      throw new UnexpectedStateException("unsupported split regionState=" + state +
871        " for parent region " + parent +
872        " maybe an old RS (< 2.0) had the operation in progress");
873    }
874
875    // sanity check on the request
876    if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) {
877      throw new UnsupportedOperationException(
878        "unsupported split request with bad keys: parent=" + parent +
879        " hriA=" + hriA + " hriB=" + hriB);
880    }
881
882    // Submit the Split procedure
883    final byte[] splitKey = hriB.getStartKey();
884    if (LOG.isDebugEnabled()) {
885      LOG.debug("Split request from " + serverName +
886          ", parent=" + parent + " splitKey=" + Bytes.toStringBinary(splitKey));
887    }
888    master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
889
890    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
891    if (master.getServerManager().getServerVersion(serverName) < 0x0200000) {
892      throw new UnsupportedOperationException(String.format(
893        "Split handled by the master: parent=%s hriA=%s hriB=%s", parent.getShortNameToLog(), hriA, hriB));
894    }
895  }
896
897  private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state,
898      final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
899    checkMetaLoaded(merged);
900
901    if (state != TransitionCode.READY_TO_MERGE) {
902      throw new UnexpectedStateException("Unsupported merge regionState=" + state +
903        " for regionA=" + hriA + " regionB=" + hriB + " merged=" + merged +
904        " maybe an old RS (< 2.0) had the operation in progress");
905    }
906
907    // Submit the Merge procedure
908    if (LOG.isDebugEnabled()) {
909      LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged);
910    }
911    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
912
913    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
914    if (master.getServerManager().getServerVersion(serverName) < 0x0200000) {
915      throw new UnsupportedOperationException(String.format(
916        "Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, merged, hriA,
917          hriB));
918    }
919  }
920
921  // ============================================================================================
922  //  RS Status update (report online regions) helpers
923  // ============================================================================================
924  /**
925   * the master will call this method when the RS send the regionServerReport().
926   * the report will contains the "online regions".
927   * this method will check the the online regions against the in-memory state of the AM,
928   * if there is a mismatch we will try to fence out the RS with the assumption
929   * that something went wrong on the RS side.
930   */
931  public void reportOnlineRegions(final ServerName serverName, final Set<byte[]> regionNames)
932      throws YouAreDeadException {
933    if (!isRunning()) return;
934    if (LOG.isTraceEnabled()) {
935      LOG.trace("ReportOnlineRegions " + serverName + " regionCount=" + regionNames.size() +
936        ", metaLoaded=" + isMetaLoaded() + " " +
937          regionNames.stream().map(element -> Bytes.toStringBinary(element)).
938            collect(Collectors.toList()));
939    }
940
941    // Make sure there is a ServerStateNode for this server that just checked in.
942    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
943    synchronized (serverNode) {
944      if (!serverNode.isInState(ServerState.ONLINE)) {
945        LOG.warn("Got a report from a server result in state " + serverNode.getState());
946        return;
947      }
948    }
949
950    // Track the regionserver reported online regions in memory.
951    synchronized (rsReports) {
952      rsReports.put(serverName, regionNames);
953    }
954
955    if (regionNames.isEmpty()) {
956      // nothing to do if we don't have regions
957      LOG.trace("no online region found on " + serverName);
958    } else if (!isMetaLoaded()) {
959      // if we are still on startup, discard the report unless is from someone holding meta
960      checkOnlineRegionsReportForMeta(serverName, regionNames);
961    } else {
962      // The Heartbeat updates us of what regions are only. check and verify the state.
963      checkOnlineRegionsReport(serverNode, regionNames);
964    }
965
966    // wake report event
967    wakeServerReportEvent(serverNode);
968  }
969
970  void checkOnlineRegionsReportForMeta(final ServerName serverName, final Set<byte[]> regionNames) {
971    try {
972      for (byte[] regionName: regionNames) {
973        final RegionInfo hri = getMetaRegionFromName(regionName);
974        if (hri == null) {
975          if (LOG.isTraceEnabled()) {
976            LOG.trace("Skip online report for region=" + Bytes.toStringBinary(regionName) +
977              " while meta is loading");
978          }
979          continue;
980        }
981
982        final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri);
983        LOG.info("META REPORTED: " + regionNode);
984        if (!reportTransition(regionNode, serverName, TransitionCode.OPENED, 0)) {
985          LOG.warn("META REPORTED but no procedure found (complete?); set location={}", serverName);
986          regionNode.setRegionLocation(serverName);
987        } else if (LOG.isTraceEnabled()) {
988          LOG.trace("META REPORTED: " + regionNode);
989        }
990      }
991    } catch (UnexpectedStateException e) {
992      LOG.warn("KILLING " + serverName + ": " + e.getMessage());
993      killRegionServer(serverName);
994    }
995  }
996
997  void checkOnlineRegionsReport(final ServerStateNode serverNode, final Set<byte[]> regionNames) {
998    final ServerName serverName = serverNode.getServerName();
999    try {
1000      for (byte[] regionName: regionNames) {
1001        if (!isRunning()) return;
1002        final RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
1003        if (regionNode == null) {
1004          throw new UnexpectedStateException("Not online: " + Bytes.toStringBinary(regionName));
1005        }
1006        synchronized (regionNode) {
1007          if (regionNode.isInState(State.OPENING, State.OPEN)) {
1008            if (!regionNode.getRegionLocation().equals(serverName)) {
1009              throw new UnexpectedStateException(regionNode.toString() +
1010                " reported OPEN on server=" + serverName +
1011                " but state has otherwise.");
1012            } else if (regionNode.isInState(State.OPENING)) {
1013              try {
1014                if (!reportTransition(regionNode, serverNode.getServerName(),
1015                    TransitionCode.OPENED, 0)) {
1016                  LOG.warn(regionNode.toString() + " reported OPEN on server=" + serverName +
1017                    " but state has otherwise AND NO procedure is running");
1018                }
1019              } catch (UnexpectedStateException e) {
1020                LOG.warn(regionNode.toString() + " reported unexpteced OPEN: " + e.getMessage(), e);
1021              }
1022            }
1023          } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
1024            long diff = regionNode.getLastUpdate() - EnvironmentEdgeManager.currentTime();
1025            if (diff > 1000/*One Second... make configurable if an issue*/) {
1026              // So, we can get report that a region is CLOSED or SPLIT because a heartbeat
1027              // came in at about same time as a region transition. Make sure there is some
1028              // elapsed time between killing remote server.
1029              throw new UnexpectedStateException(regionNode.toString() +
1030                " reported an unexpected OPEN; time since last update=" + diff);
1031            }
1032          }
1033        }
1034      }
1035    } catch (UnexpectedStateException e) {
1036      //See HBASE-21421, we can count on reportRegionStateTransition calls
1037      //We only log a warming here. It could be a network lag.
1038      LOG.warn("Failed to checkOnlineRegionsReport, maybe due to network lag, "
1039          + "if this message continues, be careful of double assign", e);
1040    }
1041  }
1042
1043  protected boolean waitServerReportEvent(ServerName serverName, Procedure<?> proc) {
1044    ServerStateNode ssn = this.regionStates.getServerNode(serverName);
1045    if (ssn == null) {
1046      LOG.warn("Why is ServerStateNode for {} empty at this point? Creating...", serverName);
1047      ssn = this.regionStates.getOrCreateServer(serverName);
1048    }
1049    return ssn.getReportEvent().suspendIfNotReady(proc);
1050  }
1051
1052  protected void wakeServerReportEvent(final ServerStateNode serverNode) {
1053    serverNode.getReportEvent().wake(getProcedureScheduler());
1054  }
1055
1056  // ============================================================================================
1057  //  RIT chore
1058  // ============================================================================================
1059  private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> {
1060    public RegionInTransitionChore(final int timeoutMsec) {
1061      super(timeoutMsec);
1062    }
1063
1064    @Override
1065    protected void periodicExecute(final MasterProcedureEnv env) {
1066      final AssignmentManager am = env.getAssignmentManager();
1067
1068      final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat();
1069      if (ritStat.hasRegionsOverThreshold()) {
1070        for (RegionState hri: ritStat.getRegionOverThreshold()) {
1071          am.handleRegionOverStuckWarningThreshold(hri.getRegion());
1072        }
1073      }
1074
1075      // update metrics
1076      am.updateRegionsInTransitionMetrics(ritStat);
1077    }
1078  }
1079
1080  public RegionInTransitionStat computeRegionInTransitionStat() {
1081    final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration());
1082    rit.update(this);
1083    return rit;
1084  }
1085
1086  public static class RegionInTransitionStat {
1087    private final int ritThreshold;
1088
1089    private HashMap<String, RegionState> ritsOverThreshold = null;
1090    private long statTimestamp;
1091    private long oldestRITTime = 0;
1092    private int totalRITsTwiceThreshold = 0;
1093    private int totalRITs = 0;
1094
1095    @VisibleForTesting
1096    public RegionInTransitionStat(final Configuration conf) {
1097      this.ritThreshold =
1098        conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD);
1099    }
1100
1101    public int getRITThreshold() {
1102      return ritThreshold;
1103    }
1104
1105    public long getTimestamp() {
1106      return statTimestamp;
1107    }
1108
1109    public int getTotalRITs() {
1110      return totalRITs;
1111    }
1112
1113    public long getOldestRITTime() {
1114      return oldestRITTime;
1115    }
1116
1117    public int getTotalRITsOverThreshold() {
1118      Map<String, RegionState> m = this.ritsOverThreshold;
1119      return m != null ? m.size() : 0;
1120    }
1121
1122    public boolean hasRegionsTwiceOverThreshold() {
1123      return totalRITsTwiceThreshold > 0;
1124    }
1125
1126    public boolean hasRegionsOverThreshold() {
1127      Map<String, RegionState> m = this.ritsOverThreshold;
1128      return m != null && !m.isEmpty();
1129    }
1130
1131    public Collection<RegionState> getRegionOverThreshold() {
1132      Map<String, RegionState> m = this.ritsOverThreshold;
1133      return m != null? m.values(): Collections.emptySet();
1134    }
1135
1136    public boolean isRegionOverThreshold(final RegionInfo regionInfo) {
1137      Map<String, RegionState> m = this.ritsOverThreshold;
1138      return m != null && m.containsKey(regionInfo.getEncodedName());
1139    }
1140
1141    public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) {
1142      Map<String, RegionState> m = this.ritsOverThreshold;
1143      if (m == null) return false;
1144      final RegionState state = m.get(regionInfo.getEncodedName());
1145      if (state == null) return false;
1146      return (statTimestamp - state.getStamp()) > (ritThreshold * 2);
1147    }
1148
1149    protected void update(final AssignmentManager am) {
1150      final RegionStates regionStates = am.getRegionStates();
1151      this.statTimestamp = EnvironmentEdgeManager.currentTime();
1152      update(regionStates.getRegionsStateInTransition(), statTimestamp);
1153      update(regionStates.getRegionFailedOpen(), statTimestamp);
1154    }
1155
1156    private void update(final Collection<RegionState> regions, final long currentTime) {
1157      for (RegionState state: regions) {
1158        totalRITs++;
1159        final long ritTime = currentTime - state.getStamp();
1160        if (ritTime > ritThreshold) {
1161          if (ritsOverThreshold == null) {
1162            ritsOverThreshold = new HashMap<String, RegionState>();
1163          }
1164          ritsOverThreshold.put(state.getRegion().getEncodedName(), state);
1165          totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0;
1166        }
1167        if (oldestRITTime < ritTime) {
1168          oldestRITTime = ritTime;
1169        }
1170      }
1171    }
1172  }
1173
1174  private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) {
1175    metrics.updateRITOldestAge(ritStat.getOldestRITTime());
1176    metrics.updateRITCount(ritStat.getTotalRITs());
1177    metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold());
1178  }
1179
1180  private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
1181    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1182    //if (regionNode.isStuck()) {
1183    LOG.warn("STUCK Region-In-Transition {}", regionNode);
1184  }
1185
1186  // ============================================================================================
1187  //  TODO: Master load/bootstrap
1188  // ============================================================================================
1189  public void joinCluster() throws IOException {
1190    long startTime = System.nanoTime();
1191    LOG.debug("Joining cluster...");
1192
1193    // Scan hbase:meta to build list of existing regions, servers, and assignment.
1194    // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress
1195    // w/o  meta.
1196    loadMeta();
1197
1198    while (master.getServerManager().countOfRegionServers() < 1) {
1199      LOG.info("Waiting for RegionServers to join; current count={}",
1200        master.getServerManager().countOfRegionServers());
1201      Threads.sleep(250);
1202    }
1203    LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers());
1204
1205    // Start the RIT chore
1206    master.getMasterProcedureExecutor().addChore(this.ritChore);
1207
1208    long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
1209    LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs));
1210  }
1211
1212  /**
1213   * Create assign procedure for offline regions.
1214   * Just follow the old processofflineServersWithOnlineRegions method. Since now we do not need to
1215   * deal with dead server any more, we only deal with the regions in OFFLINE state in this method.
1216   * And this is a bit strange, that for new regions, we will add it in CLOSED state instead of
1217   * OFFLINE state, and usually there will be a procedure to track them. The
1218   * processofflineServersWithOnlineRegions is a legacy from long ago, as things are going really
1219   * different now, maybe we do not need this method any more. Need to revisit later.
1220   */
1221  // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online.
1222  // Needs to be done after the table state manager has been started.
1223  public void processOfflineRegions() {
1224    List<RegionInfo> offlineRegions = regionStates.getRegionStates().stream()
1225      .filter(RegionState::isOffline).filter(s -> isTableEnabled(s.getRegion().getTable()))
1226      .map(RegionState::getRegion).collect(Collectors.toList());
1227    if (!offlineRegions.isEmpty()) {
1228      master.getMasterProcedureExecutor().submitProcedures(
1229        master.getAssignmentManager().createRoundRobinAssignProcedures(offlineRegions));
1230    }
1231  }
1232
  /**
   * Rebuilds the in-memory region states by visiting every row of hbase:meta, then wakes the
   * meta-loaded event so that assignments blocked on it can proceed.
   * <p/>
   * For each region not already in transition, the node's state, last host, location and open
   * sequence number are overwritten from meta. The node is then routed by state:
   * <ul>
   * <li>OPEN: attached to its server (the server node is created if needed);</li>
   * <li>OFFLINE state, or a RegionInfo flagged offline: added to the offline regions;</li>
   * <li>CLOSED in a DISABLED/DISABLING table: left alone;</li>
   * <li>anything else: registered as in transition with no procedure.</li>
   * </ul>
   * @throws IOException if visiting hbase:meta fails
   */
  private void loadMeta() throws IOException {
    // TODO: use a thread pool
    regionStateStore.visitMeta(new RegionStateStore.RegionStateVisitor() {
      @Override
      public void visitRegionState(Result result, final RegionInfo regionInfo, final State state,
          final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
        if (state == null && regionLocation == null && lastHost == null &&
            openSeqNum == SequenceId.NO_SEQUENCE_ID) {
          // This is a row with nothing in it.
          LOG.warn("Skipping empty row={}", result);
          return;
        }
        State localState = state;
        if (localState == null) {
          // No region state column data in hbase:meta table! Am I doing a rolling upgrade from
          // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta?
          // In any of these cases, state is empty. For now, presume OFFLINE but there are probably
          // cases where we need to probe more to be sure this correct; TODO informed by experience.
          LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);

          localState = State.OFFLINE;
        }
        final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
        // Lock the node so the read-check-update below is not interleaved with other
        // updaters that synchronize on the same node.
        synchronized (regionNode) {
          // A node already in transition keeps its in-memory state; meta is not applied.
          if (!regionNode.isInTransition()) {
            regionNode.setState(localState);
            regionNode.setLastHost(lastHost);
            regionNode.setRegionLocation(regionLocation);
            regionNode.setOpenSeqNum(openSeqNum);
            if (localState == State.OPEN) {
              assert regionLocation != null : "found null region location for " + regionNode;
              regionStates.getOrCreateServer(regionNode.getRegionLocation());
              regionStates.addRegionToServer(regionNode);
            } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
              regionStates.addToOfflineRegions(regionNode);
            } else if (localState == State.CLOSED && getTableStateManager().
                isTableState(regionNode.getTable(), TableState.State.DISABLED,
                TableState.State.DISABLING)) {
              // The region is CLOSED and the table is DISABLED/ DISABLING, there is nothing to
              // schedule; the region is inert.
            } else {
              // This is region in CLOSING or OPENING state.
              // NOTE(review): this branch also receives any other state not matched above
              // (e.g. CLOSED in an enabled table), not only CLOSING/OPENING.
              // These regions should have a procedure in replay.
              // If they don't, then they will show as STUCK after a while because of the below
              // registration and will need intervention by operator to fix. Add them to a server
              // even if the server is not around so a SCP cleans them up.
              if (regionLocation != null) {
                regionStates.getOrCreateServer(regionLocation);
              }
              regionStates.addRegionInTransition(regionNode, null);
            }
          } else {
            LOG.info("RIT {}", regionNode);
          }
        }
      }
    });

    // every assignment is blocked until meta is loaded.
    wakeMetaLoadedEvent();
  }
1294
1295  /**
1296   * Used to check if the meta loading is done.
1297   * <p/>
1298   * if not we throw PleaseHoldException since we are rebuilding the RegionStates
1299   * @param hri region to check if it is already rebuild
1300   * @throws PleaseHoldException if meta has not been loaded yet
1301   */
1302  private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException {
1303    if (!isRunning()) {
1304      throw new PleaseHoldException("AssignmentManager not running");
1305    }
1306    boolean meta = isMetaRegion(hri);
1307    boolean metaLoaded = isMetaLoaded();
1308    if (!meta && !metaLoaded) {
1309      throw new PleaseHoldException(
1310        "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded);
1311    }
1312  }
1313
1314  // ============================================================================================
1315  //  TODO: Metrics
1316  // ============================================================================================
1317  public int getNumRegionsOpened() {
1318    // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value
1319    return 0;
1320  }
1321
1322  public long submitServerCrash(final ServerName serverName, final boolean shouldSplitWal) {
1323    boolean carryingMeta = isCarryingMeta(serverName);
1324
1325    // Remove the in-memory rsReports result
1326    synchronized (rsReports) {
1327      rsReports.remove(serverName);
1328    }
1329
1330    ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
1331    long pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
1332        serverName, shouldSplitWal, carryingMeta));
1333    LOG.debug("Added=" + serverName
1334        + " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
1335    return pid;
1336  }
1337
1338  public void offlineRegion(final RegionInfo regionInfo) {
1339    // TODO used by MasterRpcServices ServerCrashProcedure
1340    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
1341    if (node != null) node.offline();
1342  }
1343
1344  public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) {
1345    // TODO used by TestSplitTransactionOnCluster.java
1346  }
1347
1348  public Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment(
1349      final Collection<RegionInfo> regions) {
1350    return regionStates.getSnapShotOfAssignment(regions);
1351  }
1352
1353  // ============================================================================================
1354  //  TODO: UTILS/HELPERS?
1355  // ============================================================================================
1356  /**
1357   * Used by the client (via master) to identify if all regions have the schema updates
1358   *
1359   * @param tableName
1360   * @return Pair indicating the status of the alter command (pending/total)
1361   * @throws IOException
1362   */
1363  public Pair<Integer, Integer> getReopenStatus(TableName tableName) {
1364    if (isTableDisabled(tableName)) return new Pair<Integer, Integer>(0, 0);
1365
1366    final List<RegionState> states = regionStates.getTableRegionStates(tableName);
1367    int ritCount = 0;
1368    for (RegionState regionState: states) {
1369      if (!regionState.isOpened() && !regionState.isSplit()) {
1370        ritCount++;
1371      }
1372    }
1373    return new Pair<Integer, Integer>(ritCount, states.size());
1374  }
1375
1376  // ============================================================================================
1377  //  TODO: Region State In Transition
1378  // ============================================================================================
1379  protected boolean addRegionInTransition(final RegionStateNode regionNode,
1380      final RegionTransitionProcedure procedure) {
1381    return regionStates.addRegionInTransition(regionNode, procedure);
1382  }
1383
1384  protected void removeRegionInTransition(final RegionStateNode regionNode,
1385      final RegionTransitionProcedure procedure) {
1386    regionStates.removeRegionInTransition(regionNode, procedure);
1387  }
1388
1389  public boolean hasRegionsInTransition() {
1390    return regionStates.hasRegionsInTransition();
1391  }
1392
1393  public List<RegionStateNode> getRegionsInTransition() {
1394    return regionStates.getRegionsInTransition();
1395  }
1396
1397  public List<RegionInfo> getAssignedRegions() {
1398    return regionStates.getAssignedRegions();
1399  }
1400
1401  public RegionInfo getRegionInfo(final byte[] regionName) {
1402    final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName);
1403    return regionState != null ? regionState.getRegionInfo() : null;
1404  }
1405
1406  // ============================================================================================
1407  //  TODO: Region Status update
1408  // ============================================================================================
1409  private void sendRegionOpenedNotification(final RegionInfo regionInfo,
1410      final ServerName serverName) {
1411    getBalancer().regionOnline(regionInfo, serverName);
1412    if (!this.listeners.isEmpty()) {
1413      for (AssignmentListener listener : this.listeners) {
1414        listener.regionOpened(regionInfo, serverName);
1415      }
1416    }
1417  }
1418
1419  private void sendRegionClosedNotification(final RegionInfo regionInfo) {
1420    getBalancer().regionOffline(regionInfo);
1421    if (!this.listeners.isEmpty()) {
1422      for (AssignmentListener listener : this.listeners) {
1423        listener.regionClosed(regionInfo);
1424      }
1425    }
1426  }
1427
  /**
   * Moves the region to OPENING, registers it with its server in the in-memory region states
   * and persists the new state/location through the region state store. All three steps run
   * while holding the region node's monitor; the operation-count metric is bumped afterwards,
   * outside the lock.
   *
   * @param regionNode the region being opened
   * @throws IOException if the state store update fails; presumably also if the current state
   *           is not one of {@code STATES_EXPECTED_ON_OPEN} -- confirm in
   *           RegionStateNode#transitionState
   */
  public void markRegionAsOpening(final RegionStateNode regionNode) throws IOException {
    synchronized (regionNode) {
      regionNode.transitionState(State.OPENING, RegionStates.STATES_EXPECTED_ON_OPEN);
      regionStates.addRegionToServer(regionNode);
      regionStateStore.updateRegionLocation(regionNode);
    }

    // update the operation count metrics
    metrics.incrementOperationCounter();
  }
1438
1439  public void undoRegionAsOpening(final RegionStateNode regionNode) {
1440    boolean opening = false;
1441    synchronized (regionNode) {
1442      if (regionNode.isInState(State.OPENING)) {
1443        opening = true;
1444        regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
1445      }
1446      // Should we update hbase:meta?
1447    }
1448    if (opening) {
1449      // TODO: Metrics. Do opposite of metrics.incrementOperationCounter();
1450    }
1451  }
1452
  /**
   * Moves the region to OPEN and publishes that fact. While holding the region node's monitor:
   * transitions the state, flags hbase:meta as assigned when this is the meta region, registers
   * the region with its server, persists the location through the region state store, and
   * notifies the balancer and the registered listeners.
   *
   * NOTE(review): listeners are invoked while the region node's monitor is held.
   *
   * @param regionNode the region that has opened
   * @throws IOException if the state store update fails; presumably also if the current state
   *           is not one of {@code STATES_EXPECTED_ON_OPEN} -- confirm in
   *           RegionStateNode#transitionState
   */
  public void markRegionAsOpened(final RegionStateNode regionNode) throws IOException {
    final RegionInfo hri = regionNode.getRegionInfo();
    synchronized (regionNode) {
      regionNode.transitionState(State.OPEN, RegionStates.STATES_EXPECTED_ON_OPEN);
      if (isMetaRegion(hri)) {
        // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
        // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
        // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
        // on table that contains state.
        setMetaAssigned(hri, true);
      }
      regionStates.addRegionToServer(regionNode);
      // TODO: OPENING Updates hbase:meta too... we need to do both here and there?
      // That is a lot of hbase:meta writing.
      regionStateStore.updateRegionLocation(regionNode);
      sendRegionOpenedNotification(hri, regionNode.getRegionLocation());
    }
  }
1471
  /**
   * Moves the region to CLOSING. While holding the region node's monitor: transitions the state,
   * flags hbase:meta as not assigned when this is the meta region, keeps the region registered
   * with its server and persists the state through the region state store. The operation-count
   * metric is bumped afterwards, outside the lock.
   *
   * @param regionNode the region being closed
   * @throws IOException if the state store update fails; presumably also if the current state
   *           is not one of {@code STATES_EXPECTED_ON_CLOSE} -- confirm in
   *           RegionStateNode#transitionState
   */
  public void markRegionAsClosing(final RegionStateNode regionNode) throws IOException {
    final RegionInfo hri = regionNode.getRegionInfo();
    synchronized (regionNode) {
      regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE);
      // Mark meta as not assigned early, so that people trying to create/edit tables will wait.
      if (isMetaRegion(hri)) {
        setMetaAssigned(hri, false);
      }
      regionStates.addRegionToServer(regionNode);
      regionStateStore.updateRegionLocation(regionNode);
    }

    // update the operation count metrics
    metrics.incrementOperationCounter();
  }
1487
  /**
   * Counterpart of {@code markRegionAsClosing} for rollback. Currently a no-op: the body is
   * empty, so neither the region state nor the operation-count metric is changed.
   *
   * @param regionNode ignored
   */
  public void undoRegionAsClosing(final RegionStateNode regionNode) {
    // TODO: Metrics. Do opposite of metrics.incrementOperationCounter();
    // There is nothing to undo?
  }
1492
  /**
   * Moves the region to CLOSED. While holding the region node's monitor: transitions the state,
   * removes the region from its server in the region states, records the former location as the
   * last host, clears the current location, persists through the region state store and
   * notifies the balancer and the registered listeners.
   *
   * @param regionNode the region that has closed
   * @throws IOException if the state store update fails; presumably also if the current state
   *           is not one of {@code STATES_EXPECTED_ON_CLOSE} -- confirm in
   *           RegionStateNode#transitionState
   */
  public void markRegionAsClosed(final RegionStateNode regionNode) throws IOException {
    final RegionInfo hri = regionNode.getRegionInfo();
    synchronized (regionNode) {
      regionNode.transitionState(State.CLOSED, RegionStates.STATES_EXPECTED_ON_CLOSE);
      regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
      // Remember where the region was before clearing its location.
      regionNode.setLastHost(regionNode.getRegionLocation());
      regionNode.setRegionLocation(null);
      regionStateStore.updateRegionLocation(regionNode);
      sendRegionClosedNotification(hri);
    }
  }
1504
  /**
   * Records a completed split: the parent's in-memory state becomes SPLIT, both daughters'
   * become SPLITTING_NEW (creating their state nodes if needed), and the split is written via
   * the region state store. When favored nodes apply to the parent, the balancer (cast to
   * FavoredNodesPromoter) generates favored nodes for the daughters.
   *
   * NOTE(review): these setState calls are not made while holding the nodes' monitors.
   *
   * @param parent the region that was split
   * @param serverName server passed to the state store's splitRegion
   * @param daughterA first daughter region
   * @param daughterB second daughter region
   * @throws IOException if the state store update fails
   */
  public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
      final RegionInfo daughterA, final RegionInfo daughterB)
  throws IOException {
    // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
    // The parent stays in regionStates until cleared when removed by CatalogJanitor.
    // Update its state in regionStates so it shows as offline and split when read
    // later figuring what regions are in a table and what are not: see
    // regionStates#getRegionsOfTable
    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent);
    node.setState(State.SPLIT);
    final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA);
    nodeA.setState(State.SPLITTING_NEW);
    final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB);
    nodeB.setState(State.SPLITTING_NEW);

    regionStateStore.splitRegion(parent, daughterA, daughterB, serverName);
    if (shouldAssignFavoredNodes(parent)) {
      List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList();
      ((FavoredNodesPromoter)getBalancer()).
          generateFavoredNodesForDaughter(onlineServers, parent, daughterA, daughterB);
    }
  }
1527
1528  /**
1529   * When called here, the merge has happened. The merged regions have been
1530   * unassigned and the above markRegionClosed has been called on each so they have been
1531   * disassociated from a hosting Server. The merged region will be open after this call. The
1532   * merged regions are removed from hbase:meta below. Later they are deleted from the filesystem
1533   * by the catalog janitor running against hbase:meta. It notices when the merged region no
1534   * longer holds references to the old regions (References are deleted after a compaction
1535   * rewrites what the Reference points at but not until the archiver chore runs, are the
1536   * References removed).
1537   */
1538  public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
1539        RegionInfo [] mergeParents)
1540      throws IOException {
1541    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
1542    node.setState(State.MERGED);
1543    for (RegionInfo ri: mergeParents) {
1544      regionStates.deleteRegion(ri);
1545
1546    }
1547    regionStateStore.mergeRegions(child, mergeParents, serverName);
1548    if (shouldAssignFavoredNodes(child)) {
1549      ((FavoredNodesPromoter)getBalancer()).
1550        generateFavoredNodesForMergedRegion(child, mergeParents);
1551    }
1552  }
1553
1554  /*
1555   * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
1556   * belongs to a non-system table.
1557   */
1558  private boolean shouldAssignFavoredNodes(RegionInfo region) {
1559    return this.shouldAssignRegionsWithFavoredNodes &&
1560        FavoredNodesManager.isFavoredNodeApplicable(region);
1561  }
1562
1563  // ============================================================================================
1564  //  Assign Queue (Assign/Balance)
1565  // ============================================================================================
  // Regions waiting to be placed by the balancer on the assignment thread. A plain ArrayList:
  // queueAssign, waitOnAssignQueue and addToPendingAssignment access it under assignQueueLock.
  private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>();
  // Guards pendingAssignQueue and backs assignQueueFullCond.
  private final ReentrantLock assignQueueLock = new ReentrantLock();
  // Signalled to wake the assignment thread (see queueAssign and assignQueueSignal).
  private final Condition assignQueueFullCond = assignQueueLock.newCondition();
1569
1570  /**
1571   * Add the assign operation to the assignment queue.
1572   * The pending assignment operation will be processed,
1573   * and each region will be assigned by a server using the balancer.
1574   */
1575  protected void queueAssign(final RegionStateNode regionNode) {
1576    regionNode.getProcedureEvent().suspend();
1577
1578    // TODO: quick-start for meta and the other sys-tables?
1579    assignQueueLock.lock();
1580    try {
1581      pendingAssignQueue.add(regionNode);
1582      if (regionNode.isSystemTable() ||
1583          pendingAssignQueue.size() == 1 ||
1584          pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize) {
1585        assignQueueFullCond.signal();
1586      }
1587    } finally {
1588      assignQueueLock.unlock();
1589    }
1590  }
1591
1592  private void startAssignmentThread() {
1593    // Get Server Thread name. Sometimes the Server is mocked so may not implement HasThread.
1594    // For example, in tests.
1595    String name = master instanceof HasThread? ((HasThread)master).getName():
1596        master.getServerName().toShortString();
1597    assignThread = new Thread(name) {
1598      @Override
1599      public void run() {
1600        while (isRunning()) {
1601          processAssignQueue();
1602        }
1603        pendingAssignQueue.clear();
1604      }
1605    };
1606    assignThread.setDaemon(true);
1607    assignThread.start();
1608  }
1609
1610  private void stopAssignmentThread() {
1611    assignQueueSignal();
1612    try {
1613      while (assignThread.isAlive()) {
1614        assignQueueSignal();
1615        assignThread.join(250);
1616      }
1617    } catch (InterruptedException e) {
1618      LOG.warn("join interrupted", e);
1619      Thread.currentThread().interrupt();
1620    }
1621  }
1622
1623  private void assignQueueSignal() {
1624    assignQueueLock.lock();
1625    try {
1626      assignQueueFullCond.signal();
1627    } finally {
1628      assignQueueLock.unlock();
1629    }
1630  }
1631
  /**
   * Blocks the assignment thread until work is queued, then waits up to
   * {@code assignDispatchWaitMillis} more (or until signalled again) so further assigns can batch
   * up, and finally drains the whole queue.
   *
   * The first await is not in a loop (hence the WA_AWAIT_NOT_IN_LOOP suppression). After a
   * spurious wakeup the queue may still be empty, in which case an empty map is returned.
   *
   * NOTE(review): after an InterruptedException the interrupt flag is restored, so the next call's
   * await() presumably throws again immediately. If the thread is ever interrupted while still
   * running, this would spin without draining the queue. Confirm nothing interrupts assignThread.
   *
   * @return the drained regions keyed by RegionInfo (possibly empty); {@code null} if the manager
   *         is no longer running or the wait was interrupted
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
  private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() {
    HashMap<RegionInfo, RegionStateNode> regions = null;

    assignQueueLock.lock();
    try {
      // Sleep until queueAssign()/assignQueueSignal() signals.
      if (pendingAssignQueue.isEmpty() && isRunning()) {
        assignQueueFullCond.await();
      }

      if (!isRunning()) return null;
      // Batching window: returns early if signalled again (e.g. queue hit its max size).
      assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS);
      regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size());
      for (RegionStateNode regionNode: pendingAssignQueue) {
        regions.put(regionNode.getRegionInfo(), regionNode);
      }
      pendingAssignQueue.clear();
    } catch (InterruptedException e) {
      LOG.warn("got interrupted ", e);
      Thread.currentThread().interrupt();
    } finally {
      assignQueueLock.unlock();
    }
    return regions;
  }
1657
  /**
   * One iteration of the assignment thread. Drains the queue, splits regions into those with a
   * known location (retain), system-table regions and user-table regions, waits (polling every
   * 250ms) until at least one destination server exists, then asks the balancer for plans:
   * system regions first, on servers not excluded by getExcludedServersForSystemTable (falling
   * back to all servers if that leaves none), then retained and user regions on all servers.
   *
   * NOTE(review): regions with a non-null location go into retainMap regardless of whether they
   * are system-table regions, so retained system regions are placed without the excluded-server
   * filter. Also, if the manager stops while waiting for servers, the drained regions are dropped
   * without being re-queued; their procedure events (suspended in queueAssign) are not woken
   * here.
   */
  private void processAssignQueue() {
    final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue();
    if (regions == null || regions.size() == 0 || !isRunning()) {
      return;
    }

    if (LOG.isTraceEnabled()) {
      LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size());
    }

    // TODO: Optimize balancer. pass a RegionPlan?
    final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>();
    final List<RegionInfo> userHRIs = new ArrayList<>(regions.size());
    // Regions for system tables requiring reassignment
    final List<RegionInfo> systemHRIs = new ArrayList<>();
    for (RegionStateNode regionStateNode: regions.values()) {
      boolean sysTable = regionStateNode.isSystemTable();
      final List<RegionInfo> hris = sysTable? systemHRIs: userHRIs;
      if (regionStateNode.getRegionLocation() != null) {
        retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
      } else {
        hris.add(regionStateNode.getRegionInfo());
      }
    }

    // TODO: connect with the listener to invalidate the cache

    // TODO use events
    List<ServerName> servers = master.getServerManager().createDestinationServersList();
    for (int i = 0; servers.size() < 1; ++i) {
      // Report every fourth time around this loop; try not to flood log.
      if (i % 4 == 0) {
        LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions.");
      }

      if (!isRunning()) {
        LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions.");
        return;
      }
      Threads.sleep(250);
      servers = master.getServerManager().createDestinationServersList();
    }

    if (!systemHRIs.isEmpty()) {
      // System table regions requiring reassignment are present, get region servers
      // not available for system table regions
      final List<ServerName> excludeServers = getExcludedServersForSystemTable();
      List<ServerName> serversForSysTables = servers.stream()
          .filter(s -> !excludeServers.contains(s)).collect(Collectors.toList());
      if (serversForSysTables.isEmpty()) {
        LOG.warn("Filtering old server versions and the excluded produced an empty set; " +
            "instead considering all candidate servers!");
      }
      LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() +
          ", allServersCount=" + servers.size());
      processAssignmentPlans(regions, null, systemHRIs,
          serversForSysTables.isEmpty()? servers: serversForSysTables);
    }

    processAssignmentPlans(regions, retainMap, userHRIs, servers);
  }
1719
1720  private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions,
1721      final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris,
1722      final List<ServerName> servers) {
1723    boolean isTraceEnabled = LOG.isTraceEnabled();
1724    if (isTraceEnabled) {
1725      LOG.trace("Available servers count=" + servers.size() + ": " + servers);
1726    }
1727
1728    final LoadBalancer balancer = getBalancer();
1729    // ask the balancer where to place regions
1730    if (retainMap != null && !retainMap.isEmpty()) {
1731      if (isTraceEnabled) {
1732        LOG.trace("retain assign regions=" + retainMap);
1733      }
1734      try {
1735        acceptPlan(regions, balancer.retainAssignment(retainMap, servers));
1736      } catch (HBaseIOException e) {
1737        LOG.warn("unable to retain assignment", e);
1738        addToPendingAssignment(regions, retainMap.keySet());
1739      }
1740    }
1741
1742    // TODO: Do we need to split retain and round-robin?
1743    // the retain seems to fallback to round-robin/random if the region is not in the map.
1744    if (!hris.isEmpty()) {
1745      Collections.sort(hris, RegionInfo.COMPARATOR);
1746      if (isTraceEnabled) {
1747        LOG.trace("round robin regions=" + hris);
1748      }
1749      try {
1750        acceptPlan(regions, balancer.roundRobinAssignment(hris, servers));
1751      } catch (HBaseIOException e) {
1752        LOG.warn("unable to round-robin assignment", e);
1753        addToPendingAssignment(regions, hris);
1754      }
1755    }
1756  }
1757
1758  private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions,
1759      final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException {
1760    final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()];
1761    final long st = System.currentTimeMillis();
1762
1763    if (plan == null) {
1764      throw new HBaseIOException("unable to compute plans for regions=" + regions.size());
1765    }
1766
1767    if (plan.isEmpty()) return;
1768
1769    List<RegionInfo> bogusRegions = plan.remove(LoadBalancer.BOGUS_SERVER_NAME);
1770    if (bogusRegions != null && !bogusRegions.isEmpty()) {
1771      addToPendingAssignment(regions, bogusRegions);
1772    }
1773
1774    int evcount = 0;
1775    for (Map.Entry<ServerName, List<RegionInfo>> entry: plan.entrySet()) {
1776      final ServerName server = entry.getKey();
1777      for (RegionInfo hri: entry.getValue()) {
1778        final RegionStateNode regionNode = regions.get(hri);
1779        regionNode.setRegionLocation(server);
1780        events[evcount++] = regionNode.getProcedureEvent();
1781      }
1782    }
1783    ProcedureEvent.wakeEvents(getProcedureScheduler(), events);
1784
1785    final long et = System.currentTimeMillis();
1786    if (LOG.isTraceEnabled()) {
1787      LOG.trace("ASSIGN ACCEPT " + events.length + " -> " +
1788          StringUtils.humanTimeDiff(et - st));
1789    }
1790  }
1791
1792  private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions,
1793      final Collection<RegionInfo> pendingRegions) {
1794    assignQueueLock.lock();
1795    try {
1796      for (RegionInfo hri: pendingRegions) {
1797        pendingAssignQueue.add(regions.get(hri));
1798      }
1799    } finally {
1800      assignQueueLock.unlock();
1801    }
1802  }
1803
1804  /**
1805   * Get a list of servers that this region cannot be assigned to.
1806   * For system tables, we must assign them to a server with highest version.
1807   */
1808  public List<ServerName> getExcludedServersForSystemTable() {
1809    // TODO: This should be a cached list kept by the ServerManager rather than calculated on each
1810    // move or system region assign. The RegionServerTracker keeps list of online Servers with
1811    // RegionServerInfo that includes Version.
1812    List<Pair<ServerName, String>> serverList = master.getServerManager().getOnlineServersList()
1813        .stream()
1814        .map((s)->new Pair<>(s, master.getRegionServerVersion(s)))
1815        .collect(Collectors.toList());
1816    if (serverList.isEmpty()) {
1817      return Collections.emptyList();
1818    }
1819    String highestVersion = Collections.max(serverList,
1820        (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond();
1821    return serverList.stream()
1822        .filter((p)->!p.getSecond().equals(highestVersion))
1823        .map(Pair::getFirst)
1824        .collect(Collectors.toList());
1825  }
1826
1827  // ============================================================================================
1828  //  Server Helpers
1829  // ============================================================================================
  /**
   * Server-added callback. Currently a no-op: the body is empty.
   *
   * @param serverName ignored
   */
  @Override
  public void serverAdded(final ServerName serverName) {
  }
1833
1834  @Override
1835  public void serverRemoved(final ServerName serverName) {
1836    final ServerStateNode serverNode = regionStates.getServerNode(serverName);
1837    if (serverNode == null) return;
1838
1839    // just in case, wake procedures waiting for this server report
1840    wakeServerReportEvent(serverNode);
1841  }
1842
1843  private void killRegionServer(final ServerName serverName) {
1844    master.getServerManager().expireServer(serverName);
1845  }
1846
1847  /**
1848   * <p>
1849   * This is a very particular check. The {@link org.apache.hadoop.hbase.master.ServerManager} is
1850   * where you go to check on state of 'Servers', what Servers are online, etc.
1851   * </p>
1852   * <p>
1853   * Here we are checking the state of a server that is post expiration, a ServerManager function
1854   * that moves a server from online to dead. Here we are seeing if the server has moved beyond a
1855   * particular point in the recovery process such that it is safe to move on with assigns; etc.
1856   * </p>
1857   * <p>
1858   * For now it is only used in
1859   * {@link UnassignProcedure#remoteCallFailed(MasterProcedureEnv, RegionStateNode, IOException)} to
1860   * see whether we can safely quit without losing data.
1861   * </p>
1862   * @param meta whether to check for meta log splitting
1863   * @return {@code true} if the server does not exist or the log splitting is done, i.e, the server
1864   *         is in OFFLINE state, or for meta log, is in SPLITTING_META_DONE state. If null,
1865   *         presumes the ServerStateNode was cleaned up by SCP.
1866   * @see UnassignProcedure#remoteCallFailed(MasterProcedureEnv, RegionStateNode, IOException)
1867   */
1868  boolean isLogSplittingDone(ServerName serverName, boolean meta) {
1869    ServerStateNode ssn = this.regionStates.getServerNode(serverName);
1870    if (ssn == null) {
1871      return true;
1872    }
1873    ServerState[] inState =
1874      meta
1875        ? new ServerState[] { ServerState.SPLITTING_META_DONE, ServerState.SPLITTING,
1876          ServerState.OFFLINE }
1877        : new ServerState[] { ServerState.OFFLINE };
1878    synchronized (ssn) {
1879      return ssn.isInState(inState);
1880    }
1881  }
1882
1883  @VisibleForTesting
1884  MasterServices getMaster() {
1885    return master;
1886  }
1887
1888  /**
1889   * @return a snapshot of rsReports
1890   */
1891  public Map<ServerName, Set<byte[]>> getRSReports() {
1892    Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>();
1893    synchronized (rsReports) {
1894      rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue()));
1895    }
1896    return rsReportsSnapshot;
1897  }
1898}