001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.CopyOnWriteArrayList;
031import java.util.concurrent.Future;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicBoolean;
034import java.util.concurrent.locks.Condition;
035import java.util.concurrent.locks.ReentrantLock;
036import java.util.stream.Collectors;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.HBaseIOException;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.PleaseHoldException;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.YouAreDeadException;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.RegionInfoBuilder;
046import org.apache.hadoop.hbase.client.RegionStatesCount;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.TableState;
049import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
050import org.apache.hadoop.hbase.favored.FavoredNodesManager;
051import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
052import org.apache.hadoop.hbase.master.AssignmentListener;
053import org.apache.hadoop.hbase.master.LoadBalancer;
054import org.apache.hadoop.hbase.master.MasterServices;
055import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
056import org.apache.hadoop.hbase.master.RegionPlan;
057import org.apache.hadoop.hbase.master.RegionState;
058import org.apache.hadoop.hbase.master.RegionState.State;
059import org.apache.hadoop.hbase.master.ServerListener;
060import org.apache.hadoop.hbase.master.TableStateManager;
061import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
062import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerState;
063import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerStateNode;
064import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
065import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
066import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
067import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
068import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
069import org.apache.hadoop.hbase.procedure2.Procedure;
070import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
071import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
072import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
073import org.apache.hadoop.hbase.procedure2.util.StringUtils;
074import org.apache.hadoop.hbase.regionserver.SequenceId;
075import org.apache.hadoop.hbase.util.Bytes;
076import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
077import org.apache.hadoop.hbase.util.HasThread;
078import org.apache.hadoop.hbase.util.Pair;
079import org.apache.hadoop.hbase.util.Threads;
080import org.apache.hadoop.hbase.util.VersionInfo;
081import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
082import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
083import org.apache.yetus.audience.InterfaceAudience;
084import org.apache.zookeeper.KeeperException;
085import org.slf4j.Logger;
086import org.slf4j.LoggerFactory;
087
088import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
089
090import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
095
096/**
097 * The AssignmentManager is the coordinator for region assign/unassign operations.
098 * <ul>
099 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
100 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
101 * </ul>
 * <p>Regions are created by CreateTable, Split and Merge, and deleted by DeleteTable, Split and
 * Merge. Assigns are triggered by CreateTable, EnableTable, Split, Merge and ServerCrash;
 * unassigns are triggered by DisableTable, Split and Merge.</p>
106 */
107@InterfaceAudience.Private
108public class AssignmentManager implements ServerListener {
  private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);

  // TODO: AMv2
  //  - handle region migration from hbase1 to hbase2.
  //  - handle sys table assignment first (e.g. acl, namespace)
  //  - handle table priorities
  //  - If ServerBusyException trying to update hbase:meta, we abort the Master
  //   See updateRegionLocation in RegionStateStore.
  //
  // See also
  // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
  // for other TODOs.

  // Configuration key for the assignment bootstrap thread pool size.
  // NOTE(review): not read by the constructor; confirm where it is consumed.
  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
      "hbase.assignment.bootstrap.thread.pool.size";

  // Read by the constructor into assignDispatchWaitMillis (milliseconds, per the key name).
  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
      "hbase.assignment.dispatch.wait.msec";
  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;

  // Read by the constructor into assignDispatchWaitQueueMaxSize.
  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
      "hbase.assignment.dispatch.wait.queue.max.size";
  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;

  // Interval handed to the RegionInTransitionChore built in the constructor (default one minute
  // if, as the key name says, the unit is milliseconds).
  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
      "hbase.assignment.rit.chore.interval.msec";
  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;

  // Maximum assign attempts; the constructor clamps the configured value to at least 1.
  // The default of Integer.MAX_VALUE means no practical limit unless configured.
  public static final String ASSIGN_MAX_ATTEMPTS =
      "hbase.assignment.maximum.attempts";
  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;

  /** Region in Transition metrics threshold time */
  // NOTE(review): not read by the constructor; the unit is presumably milliseconds — confirm.
  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
      "hbase.metrics.rit.stuck.warning.threshold";
  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
145
  // Ready while the meta region is assigned; see setMetaAssigned()/isMetaAssigned().
  private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
  // Ready once region states have been loaded from meta; see isMetaLoaded().
  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");

  /** Listeners that are called on assignment events. */
  private final CopyOnWriteArrayList<AssignmentListener> listeners =
      new CopyOnWriteArrayList<AssignmentListener>();

  private final MetricsAssignmentManager metrics;
  // Region-in-transition chore; removed from the procedure executor in stop().
  private final RegionInTransitionChore ritChore;
  private final MasterServices master;

  // Flipped by start()/stop() via compareAndSet so each runs at most once per transition.
  private final AtomicBoolean running = new AtomicBoolean(false);
  // In-memory region and server state; cleared in stop().
  private final RegionStates regionStates = new RegionStates();
  // Persists region state updates (hbase:meta).
  private final RegionStateStore regionStateStore;

  // NOTE(review): presumably region names last reported per region server — confirm with the
  // code that populates it (not visible here).
  private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();

  // True only when the configured balancer is a FavoredStochasticBalancer.
  private final boolean shouldAssignRegionsWithFavoredNodes;
  private final int assignDispatchWaitQueueMaxSize;
  private final int assignDispatchWaitMillis;
  // Always >= 1 (clamped in the constructor).
  private final int assignMaxAttempts;

  // Serializes runs of the system-region move thread spawned by
  // checkIfShouldMoveSystemRegionAsync().
  private final Object checkIfShouldMoveSystemRegionLock = new Object();

  // NOTE(review): presumably managed by startAssignmentThread()/stopAssignmentThread(), which are
  // not visible here.
  private Thread assignThread;
171
172  public AssignmentManager(final MasterServices master) {
173    this(master, new RegionStateStore(master));
174  }
175
176  public AssignmentManager(final MasterServices master, final RegionStateStore stateStore) {
177    this.master = master;
178    this.regionStateStore = stateStore;
179    this.metrics = new MetricsAssignmentManager();
180
181    final Configuration conf = master.getConfiguration();
182
183    // Only read favored nodes if using the favored nodes load balancer.
184    this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class.isAssignableFrom(
185        conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class));
186
187    this.assignDispatchWaitMillis = conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY,
188        DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
189    this.assignDispatchWaitQueueMaxSize = conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY,
190        DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);
191
192    this.assignMaxAttempts = Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS,
193        DEFAULT_ASSIGN_MAX_ATTEMPTS));
194
195    int ritChoreInterval = conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY,
196        DEFAULT_RIT_CHORE_INTERVAL_MSEC);
197    this.ritChore = new RegionInTransitionChore(ritChoreInterval);
198  }
199
  /**
   * Starts the assignment manager; a no-op if it is already running.
   * <p>Registers this instance as a server listener and starts the assignment thread. When a
   * ZooKeeper watcher is available, it also seeds the in-memory state of the first meta region
   * (location and state) from the meta region state read via MetaTableLocator, and marks meta as
   * assigned only if that state is OPEN.</p>
   * @throws IOException declared by this method; NOTE(review): the raising call is not visible
   *     here — confirm
   * @throws KeeperException presumably from MetaTableLocator.getMetaRegionState — confirm
   */
  public void start() throws IOException, KeeperException {
    // Only the first caller flips running from false to true; later calls return immediately.
    if (!running.compareAndSet(false, true)) {
      return;
    }

    LOG.trace("Starting assignment manager");

    // Register Server Listener
    master.getServerManager().registerListener(this);

    // Start the Assignment Thread
    startAssignmentThread();

    // load meta region state
    ZKWatcher zkw = master.getZooKeeper();
    // it could be null in some tests
    if (zkw != null) {
      RegionState regionState = MetaTableLocator.getMetaRegionState(zkw);
      RegionStateNode regionStateNode =
        regionStates.getOrCreateRegionStateNode(RegionInfoBuilder.FIRST_META_REGIONINFO);
      // Update location, state and the meta-assign event together under the node's monitor.
      synchronized (regionStateNode) {
        regionStateNode.setRegionLocation(regionState.getServerName());
        regionStateNode.setState(regionState.getState());
        setMetaAssigned(regionState.getRegion(), regionState.getState() == State.OPEN);
      }
    }
  }
227
  /**
   * Stops the assignment manager; a no-op if it is not running.
   * <p>Teardown order: remove the RIT chore (only if a procedure executor exists), stop the
   * assignment thread, clear the in-memory RegionStates, unregister the server listener and,
   * when a procedure executor exists, suspend the meta-load and meta-assign events.</p>
   */
  public void stop() {
    // Only the caller that flips running from true to false performs the teardown.
    if (!running.compareAndSet(true, false)) {
      return;
    }

    LOG.info("Stopping assignment manager");

    // The AM is started before the procedure executor,
    // but the actual work will be loaded/submitted only once we have the executor
    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;

    // Remove the RIT chore
    if (hasProcExecutor) {
      master.getMasterProcedureExecutor().removeChore(this.ritChore);
    }

    // Stop the Assignment Thread
    stopAssignmentThread();

    // Clear the in-memory RegionStates (the RegionStateStore itself is not touched here).
    regionStates.clear();

    // Unregister Server Listener
    master.getServerManager().unregisterListener(this);

    // Update meta events (for testing)
    if (hasProcExecutor) {
      metaLoadEvent.suspend();
      for (RegionInfo hri: getMetaRegionSet()) {
        setMetaAssigned(hri, false);
      }
    }
  }
261
262  public boolean isRunning() {
263    return running.get();
264  }
265
266  public Configuration getConfiguration() {
267    return master.getConfiguration();
268  }
269
270  public MetricsAssignmentManager getAssignmentManagerMetrics() {
271    return metrics;
272  }
273
274  private LoadBalancer getBalancer() {
275    return master.getLoadBalancer();
276  }
277
278  private MasterProcedureEnv getProcedureEnvironment() {
279    return master.getMasterProcedureExecutor().getEnvironment();
280  }
281
282  private MasterProcedureScheduler getProcedureScheduler() {
283    return getProcedureEnvironment().getProcedureScheduler();
284  }
285
286  int getAssignMaxAttempts() {
287    return assignMaxAttempts;
288  }
289
290  /**
291   * Add the listener to the notification list.
292   * @param listener The AssignmentListener to register
293   */
294  public void registerListener(final AssignmentListener listener) {
295    this.listeners.add(listener);
296  }
297
298  /**
299   * Remove the listener from the notification list.
300   * @param listener The AssignmentListener to unregister
301   */
302  public boolean unregisterListener(final AssignmentListener listener) {
303    return this.listeners.remove(listener);
304  }
305
306  public RegionStates getRegionStates() {
307    return regionStates;
308  }
309
310  public RegionStateStore getRegionStateStore() {
311    return regionStateStore;
312  }
313
314  public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
315    return this.shouldAssignRegionsWithFavoredNodes?
316        ((FavoredStochasticBalancer)getBalancer()).getFavoredNodes(regionInfo):
317          ServerName.EMPTY_SERVER_LIST;
318  }
319
320  // ============================================================================================
321  //  Table State Manager helpers
322  // ============================================================================================
323  TableStateManager getTableStateManager() {
324    return master.getTableStateManager();
325  }
326
327  public boolean isTableEnabled(final TableName tableName) {
328    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
329  }
330
331  public boolean isTableDisabled(final TableName tableName) {
332    return getTableStateManager().isTableState(tableName,
333      TableState.State.DISABLED, TableState.State.DISABLING);
334  }
335
336  // ============================================================================================
337  //  META Helpers
338  // ============================================================================================
339  private boolean isMetaRegion(final RegionInfo regionInfo) {
340    return regionInfo.isMetaRegion();
341  }
342
343  public boolean isMetaRegion(final byte[] regionName) {
344    return getMetaRegionFromName(regionName) != null;
345  }
346
347  public RegionInfo getMetaRegionFromName(final byte[] regionName) {
348    for (RegionInfo hri: getMetaRegionSet()) {
349      if (Bytes.equals(hri.getRegionName(), regionName)) {
350        return hri;
351      }
352    }
353    return null;
354  }
355
356  public boolean isCarryingMeta(final ServerName serverName) {
357    // TODO: handle multiple meta
358    return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
359  }
360
361  private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
362    // TODO: check for state?
363    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
364    return(node != null && serverName.equals(node.getRegionLocation()));
365  }
366
367  private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
368    //if (regionInfo.isMetaRegion()) return regionInfo;
369    // TODO: handle multiple meta. if the region provided is not meta lookup
370    // which meta the region belongs to.
371    return RegionInfoBuilder.FIRST_META_REGIONINFO;
372  }
373
374  // TODO: handle multiple meta.
375  private static final Set<RegionInfo> META_REGION_SET =
376      Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);
377  public Set<RegionInfo> getMetaRegionSet() {
378    return META_REGION_SET;
379  }
380
381  // ============================================================================================
382  //  META Event(s) helpers
383  // ============================================================================================
384  /**
385   * Notice that, this only means the meta region is available on a RS, but the AM may still be
386   * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
387   * before checking this method, unless you can make sure that your piece of code can only be
388   * executed after AM builds the region states.
389   * @see #isMetaLoaded()
390   */
391  public boolean isMetaAssigned() {
392    return metaAssignEvent.isReady();
393  }
394
395  public boolean isMetaRegionInTransition() {
396    return !isMetaAssigned();
397  }
398
399  /**
400   * Notice that this event does not mean the AM has already finished region state rebuilding. See
401   * the comment of {@link #isMetaAssigned()} for more details.
402   * @see #isMetaAssigned()
403   */
404  public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
405    return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
406  }
407
408  private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
409    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
410    ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
411    if (assigned) {
412      metaAssignEvent.wake(getProcedureScheduler());
413    } else {
414      metaAssignEvent.suspend();
415    }
416  }
417
418  private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
419    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
420    // TODO: handle multiple meta.
421    return metaAssignEvent;
422  }
423
424  /**
425   * Wait until AM finishes the meta loading, i.e, the region states rebuilding.
426   * @see #isMetaLoaded()
427   * @see #waitMetaAssigned(Procedure, RegionInfo)
428   */
429  public boolean waitMetaLoaded(Procedure<?> proc) {
430    return metaLoadEvent.suspendIfNotReady(proc);
431  }
432
433  @VisibleForTesting
434  void wakeMetaLoadedEvent() {
435    metaLoadEvent.wake(getProcedureScheduler());
436    assert isMetaLoaded() : "expected meta to be loaded";
437  }
438
439  /**
440   * Return whether AM finishes the meta loading, i.e, the region states rebuilding.
441   * @see #isMetaAssigned()
442   * @see #waitMetaLoaded(Procedure)
443   */
444  public boolean isMetaLoaded() {
445    return metaLoadEvent.isReady();
446  }
447
448  /**
449   * Start a new thread to check if there are region servers whose versions are higher than others.
450   * If so, move all system table regions to RS with the highest version to keep compatibility.
451   * The reason is, RS in new version may not be able to access RS in old version when there are
452   * some incompatible changes.
453   * <p>This method is called when a new RegionServer is added to cluster only.</p>
454   */
455  public void checkIfShouldMoveSystemRegionAsync() {
456    // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
457    // it should 'move' the system tables from the old server to the new server but
458    // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
459    if (this.master.getServerManager().countOfRegionServers() <= 1) {
460      return;
461    }
462    // This thread used to run whenever there was a change in the cluster. The ZooKeeper
463    // childrenChanged notification came in before the nodeDeleted message and so this method
464    // cold run before a ServerCrashProcedure could run. That meant that this thread could see
465    // a Crashed Server before ServerCrashProcedure and it could find system regions on the
466    // crashed server and go move them before ServerCrashProcedure had a chance; could be
467    // dataloss too if WALs were not recovered.
468    new Thread(() -> {
469      try {
470        synchronized (checkIfShouldMoveSystemRegionLock) {
471          List<RegionPlan> plans = new ArrayList<>();
472          // TODO: I don't think this code does a good job if all servers in cluster have same
473          // version. It looks like it will schedule unnecessary moves.
474          for (ServerName server : getExcludedServersForSystemTable()) {
475            if (master.getServerManager().isServerDead(server)) {
476              // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
477              // considers only online servers, the server could be queued for dead server
478              // processing. As region assignments for crashed server is handled by
479              // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
480              // regular flow of LoadBalancer as a favored node and not to have this special
481              // handling.
482              continue;
483            }
484            List<RegionInfo> regionsShouldMove = getSystemTables(server);
485            if (!regionsShouldMove.isEmpty()) {
486              for (RegionInfo regionInfo : regionsShouldMove) {
487                // null value for dest forces destination server to be selected by balancer
488                RegionPlan plan = new RegionPlan(regionInfo, server, null);
489                if (regionInfo.isMetaRegion()) {
490                  // Must move meta region first.
491                  LOG.info("Async MOVE of {} to newer Server={}",
492                      regionInfo.getEncodedName(), server);
493                  moveAsync(plan);
494                } else {
495                  plans.add(plan);
496                }
497              }
498            }
499            for (RegionPlan plan : plans) {
500              LOG.info("Async MOVE of {} to newer Server={}",
501                  plan.getRegionInfo().getEncodedName(), server);
502              moveAsync(plan);
503            }
504          }
505        }
506      } catch (Throwable t) {
507        LOG.error(t.toString(), t);
508      }
509    }).start();
510  }
511
512  private List<RegionInfo> getSystemTables(ServerName serverName) {
513    Set<RegionStateNode> regions = this.getRegionStates().getServerNode(serverName).getRegions();
514    if (regions == null) {
515      return new ArrayList<>();
516    }
517    return regions.stream()
518        .map(RegionStateNode::getRegionInfo)
519        .filter(r -> r.getTable().isSystemTable())
520        .collect(Collectors.toList());
521  }
522
523  public void assign(final RegionInfo regionInfo, ServerName sn) throws IOException {
524    AssignProcedure proc = createAssignProcedure(regionInfo, sn);
525    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
526  }
527
528  public void assign(final RegionInfo regionInfo) throws IOException {
529    AssignProcedure proc = createAssignProcedure(regionInfo);
530    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
531  }
532
533  public void unassign(final RegionInfo regionInfo) throws IOException {
534    unassign(regionInfo, false);
535  }
536
537  public void unassign(final RegionInfo regionInfo, final boolean forceNewPlan)
538  throws IOException {
539    // TODO: rename this reassign
540    RegionStateNode node = this.regionStates.getRegionStateNode(regionInfo);
541    ServerName destinationServer = node.getRegionLocation();
542    if (destinationServer == null) {
543      throw new UnexpectedStateException("DestinationServer is null; Assigned? " + node.toString());
544    }
545    assert destinationServer != null; node.toString();
546    UnassignProcedure proc = createUnassignProcedure(regionInfo, destinationServer, forceNewPlan);
547    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
548  }
549
550  public void move(final RegionInfo regionInfo) throws IOException {
551    RegionStateNode node = this.regionStates.getRegionStateNode(regionInfo);
552    ServerName sourceServer = node.getRegionLocation();
553    RegionPlan plan = new RegionPlan(regionInfo, sourceServer, null);
554    MoveRegionProcedure proc = createMoveRegionProcedure(plan);
555    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
556  }
557
558  public Future<byte[]> moveAsync(final RegionPlan regionPlan) throws HBaseIOException {
559    MoveRegionProcedure proc = createMoveRegionProcedure(regionPlan);
560    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
561  }
562
563  /**
564   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
565   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
566   *         to populate the assigns with targets chosen using round-robin (default balancer
567   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
568   *         AssignProcedure will ask the balancer for a new target, and so on.
569   */
570  public AssignProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
571    return createRoundRobinAssignProcedures(hris, null);
572  }
573
574  // ============================================================================================
575  //  RegionTransition procedures helpers
576  // ============================================================================================
577
578  /**
579   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
580   * @return AssignProcedures made out of the passed in <code>hris</code> and a call
581   * to the balancer to populate the assigns with targets chosen using round-robin (default
582   * balancer scheme). If at assign-time, the target chosen is no longer up, thats fine,
583   * the AssignProcedure will ask the balancer for a new target, and so on.
584   */
585  public AssignProcedure[] createRoundRobinAssignProcedures(final List<RegionInfo> hris,
586      List<ServerName> serversToExclude) {
587    if (hris.isEmpty()) {
588      return null;
589    }
590    if (serversToExclude != null
591        && this.master.getServerManager().getOnlineServersList().size() == 1) {
592      LOG.debug("Only one region server found and hence going ahead with the assignment");
593      serversToExclude = null;
594    }
595    try {
596      // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
597      // a better job if it has all the assignments in the one lump.
598      Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
599          this.master.getServerManager().createDestinationServersList(serversToExclude));
600      // Return mid-method!
601      return createAssignProcedures(assignments, hris.size());
602    } catch (HBaseIOException hioe) {
603      LOG.warn("Failed roundRobinAssignment", hioe);
604    }
605    // If an error above, fall-through to this simpler assign. Last resort.
606    return createAssignProcedures(hris);
607  }
608
609  /**
610   * Create an array of AssignProcedures w/o specifying a target server.
611   * If no target server, at assign time, we will try to use the former location of the region
612   * if one exists. This is how we 'retain' the old location across a server restart.
613   * Used by {@link ServerCrashProcedure} assigning regions on a server that has crashed (SCP is
614   * also used across a cluster-restart just-in-case to ensure we do cleanup of any old WALs or
615   * server processes).
616   */
617  public AssignProcedure[] createAssignProcedures(final List<RegionInfo> hris) {
618    if (hris.isEmpty()) {
619      return null;
620    }
621    int index = 0;
622    AssignProcedure [] procedures = new AssignProcedure[hris.size()];
623    for (RegionInfo hri : hris) {
624      // Sort the procedures so meta and system regions are first in the returned array.
625      procedures[index++] = createAssignProcedure(hri);
626    }
627    if (procedures.length > 1) {
628      // Sort the procedures so meta and system regions are first in the returned array.
629      Arrays.sort(procedures, AssignProcedure.COMPARATOR);
630    }
631    return procedures;
632  }
633
634  // Make this static for the method below where we use it typing the AssignProcedure array we
635  // return as result.
636  private static final AssignProcedure [] ASSIGN_PROCEDURE_ARRAY_TYPE = new AssignProcedure[] {};
637
638  /**
639   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
640   * @param size Count of assignments to make (the caller may know the total count)
641   * @return Assignments made from the passed in <code>assignments</code>
642   */
643  private AssignProcedure[] createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments,
644      int size) {
645    List<AssignProcedure> procedures = new ArrayList<>(size > 0? size: 8/*Arbitrary*/);
646    for (Map.Entry<ServerName, List<RegionInfo>> e: assignments.entrySet()) {
647      for (RegionInfo ri: e.getValue()) {
648        AssignProcedure ap = createAssignProcedure(ri, e.getKey());
649        ap.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
650        procedures.add(ap);
651      }
652    }
653    if (procedures.size() > 1) {
654      // Sort the procedures so meta and system regions are first in the returned array.
655      procedures.sort(AssignProcedure.COMPARATOR);
656    }
657    return procedures.toArray(ASSIGN_PROCEDURE_ARRAY_TYPE);
658  }
659
660  // Needed for the following method so it can type the created Array we retur n
661  private static final UnassignProcedure [] UNASSIGN_PROCEDURE_ARRAY_TYPE =
662      new UnassignProcedure[0];
663
664  UnassignProcedure[] createUnassignProcedures(final Collection<RegionStateNode> nodes) {
665    if (nodes.isEmpty()) return null;
666    final List<UnassignProcedure> procs = new ArrayList<UnassignProcedure>(nodes.size());
667    for (RegionStateNode node: nodes) {
668      if (!this.regionStates.include(node, false)) continue;
669      // Look for regions that are offline/closed; i.e. already unassigned.
670      if (this.regionStates.isRegionOffline(node.getRegionInfo())) continue;
671      assert node.getRegionLocation() != null: node.toString();
672      procs.add(createUnassignProcedure(node.getRegionInfo(), node.getRegionLocation(), false));
673    }
674    return procs.toArray(UNASSIGN_PROCEDURE_ARRAY_TYPE);
675  }
676
677  /**
678   * Called by things like DisableTableProcedure to get a list of UnassignProcedure
679   * to unassign the regions of the table.
680   */
681  public AssignProcedure createAssignProcedure(final RegionInfo regionInfo) {
682    return createAssignProcedure(regionInfo, null, false);
683  }
684
685  public AssignProcedure createAssignProcedure(final RegionInfo regionInfo, boolean override) {
686    return createAssignProcedure(regionInfo, null, override);
687  }
688
689  public AssignProcedure createAssignProcedure(final RegionInfo regionInfo,
690      ServerName targetServer) {
691    return createAssignProcedure(regionInfo, targetServer, false);
692  }
693
694  public AssignProcedure createAssignProcedure(final RegionInfo regionInfo,
695      final ServerName targetServer, boolean override) {
696    AssignProcedure proc = new AssignProcedure(regionInfo, targetServer, override);
697    proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
698    return proc;
699  }
700
701  public UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo) {
702    return createUnassignProcedure(regionInfo, null, false);
703  }
704
705  public UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo,
706      boolean override) {
707    return createUnassignProcedure(regionInfo, null, override);
708  }
709
710  public UnassignProcedure[] createUnassignProcedures(final TableName tableName) {
711    return createUnassignProcedures(regionStates.getTableRegionStateNodes(tableName));
712  }
713
714  UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo,
715      final ServerName destinationServer, final boolean force) {
716    return createUnassignProcedure(regionInfo, destinationServer, force, false);
717  }
718
719  UnassignProcedure createUnassignProcedure(final RegionInfo regionInfo,
720      final ServerName destinationServer, final boolean force,
721      final boolean removeAfterUnassigning) {
722    // If destinationServer is null, figure it.
723    ServerName sn = destinationServer != null? destinationServer:
724        getRegionStates().getRegionState(regionInfo).getServerName();
725    assert sn != null;
726    UnassignProcedure proc = new UnassignProcedure(regionInfo, sn, force, removeAfterUnassigning);
727    proc.setOwner(getProcedureEnvironment().getRequestUser().getShortName());
728    return proc;
729  }
730
731  private MoveRegionProcedure createMoveRegionProcedure(RegionPlan plan) throws HBaseIOException {
732    if (plan.getRegionInfo().getTable().isSystemTable()) {
733      List<ServerName> exclude = getExcludedServersForSystemTable();
734      if (plan.getDestination() != null && exclude.contains(plan.getDestination())) {
735        try {
736          LOG.info("Can not move " + plan.getRegionInfo() + " to " + plan.getDestination() +
737            " because the server is not with highest version");
738          plan.setDestination(getBalancer().randomAssignment(plan.getRegionInfo(),
739            this.master.getServerManager().createDestinationServersList(exclude)));
740        } catch (HBaseIOException e) {
741          LOG.warn(e.toString(), e);
742        }
743      }
744    }
745    return new MoveRegionProcedure(getProcedureEnvironment(), plan, true);
746  }
747
748
749  public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
750      final byte[] splitKey) throws IOException {
751    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
752  }
753
754  public MergeTableRegionsProcedure createMergeProcedure(RegionInfo ... ris) throws IOException {
755    return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
756  }
757
758  /**
759   * Delete the region states. This is called by "DeleteTable"
760   */
761  public void deleteTable(final TableName tableName) throws IOException {
762    final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
763    regionStateStore.deleteRegions(regions);
764    for (int i = 0; i < regions.size(); ++i) {
765      final RegionInfo regionInfo = regions.get(i);
766      // we expect the region to be offline
767      regionStates.removeFromOfflineRegions(regionInfo);
768      regionStates.deleteRegion(regionInfo);
769    }
770  }
771
772  // ============================================================================================
773  //  RS Region Transition Report helpers
774  // ============================================================================================
  // TODO: Move this code into MasterRpcServices and call on specific event?
776  public ReportRegionStateTransitionResponse reportRegionStateTransition(
777      final ReportRegionStateTransitionRequest req)
778  throws PleaseHoldException {
779    final ReportRegionStateTransitionResponse.Builder builder =
780        ReportRegionStateTransitionResponse.newBuilder();
781    final ServerName serverName = ProtobufUtil.toServerName(req.getServer());
782    try {
783      for (RegionStateTransition transition: req.getTransitionList()) {
784        switch (transition.getTransitionCode()) {
785          case OPENED:
786          case FAILED_OPEN:
787          case CLOSED:
788            assert transition.getRegionInfoCount() == 1 : transition;
789            final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
790            updateRegionTransition(serverName, transition.getTransitionCode(), hri,
791                transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM);
792            break;
793          case READY_TO_SPLIT:
794          case SPLIT:
795          case SPLIT_REVERTED:
796            assert transition.getRegionInfoCount() == 3 : transition;
797            final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
798            final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
799            final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
800            updateRegionSplitTransition(serverName, transition.getTransitionCode(),
801              parent, splitA, splitB);
802            break;
803          case READY_TO_MERGE:
804          case MERGED:
805          case MERGE_REVERTED:
806            assert transition.getRegionInfoCount() == 3 : transition;
807            final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
808            final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
809            final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
810            updateRegionMergeTransition(serverName, transition.getTransitionCode(),
811              merged, mergeA, mergeB);
812            break;
813        }
814      }
815    } catch (PleaseHoldException e) {
816      if (LOG.isTraceEnabled()) LOG.trace("Failed transition " + e.getMessage());
817      throw e;
818    } catch (UnsupportedOperationException|IOException e) {
819      // TODO: at the moment we have a single error message and the RS will abort
820      // if the master says that one of the region transitions failed.
821      LOG.warn("Failed transition", e);
822      builder.setErrorMessage("Failed transition " + e.getMessage());
823    }
824    return builder.build();
825  }
826
827  /**
828   * Called when the RegionServer wants to report a Procedure transition.
829   * Ends up calling {@link #reportTransition(RegionStateNode, ServerName, TransitionCode, long)}
830   */
831  private void updateRegionTransition(final ServerName serverName, final TransitionCode state,
832      final RegionInfo regionInfo, final long seqId)
833      throws PleaseHoldException, UnexpectedStateException {
834    checkMetaLoaded(regionInfo);
835
836    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
837    if (regionNode == null) {
838      // the table/region is gone. maybe a delete, split, merge
839      throw new UnexpectedStateException(String.format(
840        "Server %s was trying to transition region %s to %s. but the region was removed.",
841        serverName, regionInfo, state));
842    }
843
844    if (LOG.isTraceEnabled()) {
845      LOG.trace(String.format("Update region transition serverName=%s region=%s regionState=%s",
846        serverName, regionNode, state));
847    }
848
849    if (!reportTransition(regionNode, serverName, state, seqId)) {
850      // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
851      // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
852      //   rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
853      //   table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
854      //   to CLOSED
855      // These happen because on cluster shutdown, we currently let the RegionServers close
856      // regions. This is the only time that region close is not run by the Master (so cluster
857      // goes down fast). Consider changing it so Master runs all shutdowns.
858      if (this.master.getServerManager().isClusterShutdown() &&
859          state.equals(TransitionCode.CLOSED)) {
860        LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
861      } else {
862        LOG.warn("No matching procedure found for {} transition to {}", regionNode, state);
863      }
864    }
865  }
866
867  // FYI: regionNode is sometimes synchronized by the caller but not always.
868  private boolean reportTransition(final RegionStateNode regionNode, ServerName serverName,
869      final TransitionCode state, final long seqId)
870      throws UnexpectedStateException {
871    synchronized (regionNode) {
872      final RegionTransitionProcedure proc = regionNode.getProcedure();
873      if (proc == null) return false;
874
875      // serverNode.getReportEvent().removeProcedure(proc);
876      proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(),
877        serverName, state, seqId);
878    }
879    return true;
880  }
881
882  private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state,
883      final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB)
884      throws IOException {
885    checkMetaLoaded(parent);
886
887    if (state != TransitionCode.READY_TO_SPLIT) {
888      throw new UnexpectedStateException("unsupported split regionState=" + state +
889        " for parent region " + parent +
890        " maybe an old RS (< 2.0) had the operation in progress");
891    }
892
893    // sanity check on the request
894    if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) {
895      throw new UnsupportedOperationException(
896        "unsupported split request with bad keys: parent=" + parent +
897        " hriA=" + hriA + " hriB=" + hriB);
898    }
899
900    // Submit the Split procedure
901    final byte[] splitKey = hriB.getStartKey();
902    if (LOG.isDebugEnabled()) {
903      LOG.debug("Split request from " + serverName +
904          ", parent=" + parent + " splitKey=" + Bytes.toStringBinary(splitKey));
905    }
906    master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
907
908    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
909    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
910      throw new UnsupportedOperationException(String.format(
911        "Split handled by the master: parent=%s hriA=%s hriB=%s", parent.getShortNameToLog(), hriA, hriB));
912    }
913  }
914
915  private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state,
916      final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
917    checkMetaLoaded(merged);
918
919    if (state != TransitionCode.READY_TO_MERGE) {
920      throw new UnexpectedStateException("Unsupported merge regionState=" + state +
921        " for regionA=" + hriA + " regionB=" + hriB + " merged=" + merged +
922        " maybe an old RS (< 2.0) had the operation in progress");
923    }
924
925    // Submit the Merge procedure
926    if (LOG.isDebugEnabled()) {
927      LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged);
928    }
929    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
930
931    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
932    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
933      throw new UnsupportedOperationException(String.format(
934        "Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, merged, hriA,
935          hriB));
936    }
937  }
938
939  // ============================================================================================
940  //  RS Status update (report online regions) helpers
941  // ============================================================================================
942  /**
943   * the master will call this method when the RS send the regionServerReport().
944   * the report will contains the "online regions".
945   * this method will check the the online regions against the in-memory state of the AM,
946   * if there is a mismatch we will try to fence out the RS with the assumption
947   * that something went wrong on the RS side.
948   */
949  public void reportOnlineRegions(final ServerName serverName, final Set<byte[]> regionNames)
950      throws YouAreDeadException {
951    if (!isRunning()) return;
952    if (LOG.isTraceEnabled()) {
953      LOG.trace("ReportOnlineRegions " + serverName + " regionCount=" + regionNames.size() +
954        ", metaLoaded=" + isMetaLoaded() + " " +
955          regionNames.stream().map(element -> Bytes.toStringBinary(element)).
956            collect(Collectors.toList()));
957    }
958
959    // Make sure there is a ServerStateNode for this server that just checked in.
960    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
961    synchronized (serverNode) {
962      if (!serverNode.isInState(ServerState.ONLINE)) {
963        LOG.warn("Got a report from a server result in state " + serverNode.getState());
964        return;
965      }
966    }
967
968    // Track the regionserver reported online regions in memory.
969    synchronized (rsReports) {
970      rsReports.put(serverName, regionNames);
971    }
972
973    if (regionNames.isEmpty()) {
974      // nothing to do if we don't have regions
975      LOG.trace("no online region found on " + serverName);
976    } else if (!isMetaLoaded()) {
977      // if we are still on startup, discard the report unless is from someone holding meta
978      checkOnlineRegionsReportForMeta(serverName, regionNames);
979    } else {
980      // The Heartbeat updates us of what regions are only. check and verify the state.
981      checkOnlineRegionsReport(serverNode, regionNames);
982    }
983
984    // wake report event
985    wakeServerReportEvent(serverNode);
986  }
987
988  void checkOnlineRegionsReportForMeta(final ServerName serverName, final Set<byte[]> regionNames) {
989    try {
990      for (byte[] regionName: regionNames) {
991        final RegionInfo hri = getMetaRegionFromName(regionName);
992        if (hri == null) {
993          if (LOG.isTraceEnabled()) {
994            LOG.trace("Skip online report for region={} while meta is loading from server={}",
995                Bytes.toStringBinary(regionName), serverName);
996          }
997          continue;
998        }
999
1000        final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(hri);
1001        LOG.info("META REPORTED: " + regionNode);
1002        if (!reportTransition(regionNode, serverName, TransitionCode.OPENED, 0)) {
1003          LOG.warn("META REPORTED but no procedure found (complete?); set location={}", serverName);
1004          regionNode.setRegionLocation(serverName);
1005        } else if (LOG.isTraceEnabled()) {
1006          LOG.trace("META REPORTED: " + regionNode);
1007        }
1008      }
1009    } catch (UnexpectedStateException e) {
1010      LOG.warn("KILLING " + serverName + ": " + e.getMessage());
1011      killRegionServer(serverName);
1012    }
1013  }
1014
1015  void checkOnlineRegionsReport(final ServerStateNode serverNode, final Set<byte[]> regionNames) {
1016    final ServerName serverName = serverNode.getServerName();
1017    try {
1018      for (byte[] regionName: regionNames) {
1019        if (!isRunning()) return;
1020        final RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
1021        if (regionNode == null) {
1022          throw new UnexpectedStateException("Not online: " + Bytes.toStringBinary(regionName));
1023        }
1024        synchronized (regionNode) {
1025          if (regionNode.isInState(State.OPENING, State.OPEN)) {
1026            if (!regionNode.getRegionLocation().equals(serverName)) {
1027              throw new UnexpectedStateException(regionNode.toString() +
1028                " reported OPEN on server=" + serverName +
1029                " but state has otherwise.");
1030            } else if (regionNode.isInState(State.OPENING)) {
1031              try {
1032                if (!reportTransition(regionNode, serverNode.getServerName(),
1033                    TransitionCode.OPENED, 0)) {
1034                  LOG.warn(regionNode.toString() + " reported OPEN on server=" + serverName +
1035                    " but state has otherwise AND NO procedure is running");
1036                }
1037              } catch (UnexpectedStateException e) {
1038                LOG.warn("{} reported unexpteced OPEN: {} sever={}", regionNode.toString(),
1039                    e.getMessage(), serverName, e);
1040              }
1041            }
1042          } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
1043            long diff = regionNode.getLastUpdate() - EnvironmentEdgeManager.currentTime();
1044            if (diff > 1000/*One Second... make configurable if an issue*/) {
1045              // So, we can get report that a region is CLOSED or SPLIT because a heartbeat
1046              // came in at about same time as a region transition. Make sure there is some
1047              // elapsed time between killing remote server.
1048              throw new UnexpectedStateException(regionNode.toString() +
1049                " reported an unexpected OPEN; time since last update=" + diff);
1050            }
1051          }
1052        }
1053      }
1054    } catch (UnexpectedStateException e) {
1055      //See HBASE-21421, we can count on reportRegionStateTransition calls
1056      //We only log a warming here. It could be a network lag.
1057      LOG.warn("Failed to checkOnlineRegionsReport, maybe due to network lag, "
1058          + "if this message continues, be careful of double assign. report from server={}",
1059          serverName, e);
1060    }
1061  }
1062
1063  protected boolean waitServerReportEvent(ServerName serverName, Procedure<?> proc) {
1064    ServerStateNode ssn = this.regionStates.getServerNode(serverName);
1065    if (ssn == null) {
1066      LOG.warn("Why is ServerStateNode for {} empty at this point? Creating...", serverName);
1067      ssn = this.regionStates.getOrCreateServer(serverName);
1068    }
1069    return ssn.getReportEvent().suspendIfNotReady(proc);
1070  }
1071
1072  protected void wakeServerReportEvent(final ServerStateNode serverNode) {
1073    serverNode.getReportEvent().wake(getProcedureScheduler());
1074  }
1075
1076  // ============================================================================================
1077  //  RIT chore
1078  // ============================================================================================
1079  private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> {
1080    public RegionInTransitionChore(final int timeoutMsec) {
1081      super(timeoutMsec);
1082    }
1083
1084    @Override
1085    protected void periodicExecute(final MasterProcedureEnv env) {
1086      final AssignmentManager am = env.getAssignmentManager();
1087
1088      final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat();
1089      if (ritStat.hasRegionsOverThreshold()) {
1090        for (RegionState hri: ritStat.getRegionOverThreshold()) {
1091          am.handleRegionOverStuckWarningThreshold(hri.getRegion());
1092        }
1093      }
1094
1095      // update metrics
1096      am.updateRegionsInTransitionMetrics(ritStat);
1097    }
1098  }
1099
1100  public RegionInTransitionStat computeRegionInTransitionStat() {
1101    final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration());
1102    rit.update(this);
1103    return rit;
1104  }
1105
1106  public static class RegionInTransitionStat {
1107    private final int ritThreshold;
1108
1109    private HashMap<String, RegionState> ritsOverThreshold = null;
1110    private long statTimestamp;
1111    private long oldestRITTime = 0;
1112    private int totalRITsTwiceThreshold = 0;
1113    private int totalRITs = 0;
1114
1115    @VisibleForTesting
1116    public RegionInTransitionStat(final Configuration conf) {
1117      this.ritThreshold =
1118        conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD);
1119    }
1120
1121    public int getRITThreshold() {
1122      return ritThreshold;
1123    }
1124
1125    public long getTimestamp() {
1126      return statTimestamp;
1127    }
1128
1129    public int getTotalRITs() {
1130      return totalRITs;
1131    }
1132
1133    public long getOldestRITTime() {
1134      return oldestRITTime;
1135    }
1136
1137    public int getTotalRITsOverThreshold() {
1138      Map<String, RegionState> m = this.ritsOverThreshold;
1139      return m != null ? m.size() : 0;
1140    }
1141
1142    public boolean hasRegionsTwiceOverThreshold() {
1143      return totalRITsTwiceThreshold > 0;
1144    }
1145
1146    public boolean hasRegionsOverThreshold() {
1147      Map<String, RegionState> m = this.ritsOverThreshold;
1148      return m != null && !m.isEmpty();
1149    }
1150
1151    public Collection<RegionState> getRegionOverThreshold() {
1152      Map<String, RegionState> m = this.ritsOverThreshold;
1153      return m != null? m.values(): Collections.emptySet();
1154    }
1155
1156    public boolean isRegionOverThreshold(final RegionInfo regionInfo) {
1157      Map<String, RegionState> m = this.ritsOverThreshold;
1158      return m != null && m.containsKey(regionInfo.getEncodedName());
1159    }
1160
1161    public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) {
1162      Map<String, RegionState> m = this.ritsOverThreshold;
1163      if (m == null) return false;
1164      final RegionState state = m.get(regionInfo.getEncodedName());
1165      if (state == null) return false;
1166      return (statTimestamp - state.getStamp()) > (ritThreshold * 2);
1167    }
1168
1169    protected void update(final AssignmentManager am) {
1170      final RegionStates regionStates = am.getRegionStates();
1171      this.statTimestamp = EnvironmentEdgeManager.currentTime();
1172      update(regionStates.getRegionsStateInTransition(), statTimestamp);
1173      update(regionStates.getRegionFailedOpen(), statTimestamp);
1174    }
1175
1176    private void update(final Collection<RegionState> regions, final long currentTime) {
1177      for (RegionState state: regions) {
1178        totalRITs++;
1179        final long ritTime = currentTime - state.getStamp();
1180        if (ritTime > ritThreshold) {
1181          if (ritsOverThreshold == null) {
1182            ritsOverThreshold = new HashMap<String, RegionState>();
1183          }
1184          ritsOverThreshold.put(state.getRegion().getEncodedName(), state);
1185          totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0;
1186        }
1187        if (oldestRITTime < ritTime) {
1188          oldestRITTime = ritTime;
1189        }
1190      }
1191    }
1192  }
1193
1194  private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) {
1195    metrics.updateRITOldestAge(ritStat.getOldestRITTime());
1196    metrics.updateRITCount(ritStat.getTotalRITs());
1197    metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold());
1198  }
1199
1200  private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
1201    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1202    //if (regionNode.isStuck()) {
1203    LOG.warn("STUCK Region-In-Transition {}", regionNode);
1204  }
1205
1206  // ============================================================================================
1207  //  TODO: Master load/bootstrap
1208  // ============================================================================================
1209  public void joinCluster() throws IOException {
1210    long startTime = System.nanoTime();
1211    LOG.debug("Joining cluster...");
1212
1213    // Scan hbase:meta to build list of existing regions, servers, and assignment.
1214    // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress
1215    // w/o  meta.
1216    loadMeta();
1217
1218    while (master.getServerManager().countOfRegionServers() < 1) {
1219      LOG.info("Waiting for RegionServers to join; current count={}",
1220        master.getServerManager().countOfRegionServers());
1221      Threads.sleep(250);
1222    }
1223    LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers());
1224
1225    // Start the RIT chore
1226    master.getMasterProcedureExecutor().addChore(this.ritChore);
1227
1228    long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
1229    LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs));
1230  }
1231
1232  /**
1233   * Create assign procedure for offline regions.
1234   * Just follow the old processofflineServersWithOnlineRegions method. Since now we do not need to
1235   * deal with dead server any more, we only deal with the regions in OFFLINE state in this method.
1236   * And this is a bit strange, that for new regions, we will add it in CLOSED state instead of
1237   * OFFLINE state, and usually there will be a procedure to track them. The
1238   * processofflineServersWithOnlineRegions is a legacy from long ago, as things are going really
1239   * different now, maybe we do not need this method any more. Need to revisit later.
1240   */
1241  // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online.
1242  // Needs to be done after the table state manager has been started.
1243  public void processOfflineRegions() {
1244    List<RegionInfo> offlineRegions = regionStates.getRegionStates().stream()
1245      .filter(RegionState::isOffline).filter(s -> isTableEnabled(s.getRegion().getTable()))
1246      .map(RegionState::getRegion).collect(Collectors.toList());
1247    if (!offlineRegions.isEmpty()) {
1248      master.getMasterProcedureExecutor().submitProcedures(
1249        master.getAssignmentManager().createRoundRobinAssignProcedures(offlineRegions));
1250    }
1251  }
1252
  /**
   * Rebuild the in-memory region states from hbase:meta, then wake the meta-loaded event so
   * that assignments blocked on meta loading can proceed.
   * <p/>
   * For every row visited:
   * <ul>
   * <li>A row with no state, no location, no last host and no open seqnum is skipped.</li>
   * <li>A missing state is presumed OFFLINE.</li>
   * <li>A region already in transition is only logged.</li>
   * <li>Otherwise the node's state, last host, location and open seqnum are overwritten. The
   * node is then handled in one of four ways:
   *   <ul>
   *   <li>registered on its server, if OPEN;</li>
   *   <li>added to the offline regions, if OFFLINE or its RegionInfo is offline;</li>
   *   <li>left alone, if CLOSED in a DISABLED/DISABLING table;</li>
   *   <li>recorded as a region in transition with no procedure, in all other cases.</li>
   *   </ul></li>
   * </ul>
   */
  private void loadMeta() throws IOException {
    // TODO: use a thread pool
    regionStateStore.visitMeta(new RegionStateStore.RegionStateVisitor() {
      @Override
      public void visitRegionState(Result result, final RegionInfo regionInfo, final State state,
          final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
        if (state == null && regionLocation == null && lastHost == null &&
            openSeqNum == SequenceId.NO_SEQUENCE_ID) {
          // This is a row with nothing in it.
          LOG.warn("Skipping empty row={}", result);
          return;
        }
        State localState = state;
        if (localState == null) {
          // No region state column data in hbase:meta table! Am I doing a rolling upgrade from
          // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta?
          // In any of these cases, state is empty. For now, presume OFFLINE but there are probably
          // cases where we need to probe more to be sure this is correct; TODO informed by
          // experience.
          LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);

          localState = State.OFFLINE;
        }
        final RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
        synchronized (regionNode) {
          // Nodes already in transition are left as they are; everything else is overwritten
          // from the meta row.
          if (!regionNode.isInTransition()) {
            regionNode.setState(localState);
            regionNode.setLastHost(lastHost);
            regionNode.setRegionLocation(regionLocation);
            regionNode.setOpenSeqNum(openSeqNum);
            if (localState == State.OPEN) {
              assert regionLocation != null : "found null region location for " + regionNode;
              regionStates.getOrCreateServer(regionNode.getRegionLocation());
              regionStates.addRegionToServer(regionNode);
            } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
              regionStates.addToOfflineRegions(regionNode);
            } else if (localState == State.CLOSED && getTableStateManager().
                isTableState(regionNode.getTable(), TableState.State.DISABLED,
                TableState.State.DISABLING)) {
              // The region is CLOSED and the table is DISABLED/DISABLING, so there is nothing
              // to schedule; the region is inert.
            } else {
              // This is a region in CLOSING or OPENING state.
              // These regions should have a procedure in replay.
              // If they don't, then they will show as STUCK after a while because of the below
              // registration and will need intervention by operator to fix. Add them to a server
              // even if the server is not around so a SCP cleans them up.
              if (regionLocation != null) {
                regionStates.getOrCreateServer(regionLocation);
              }
              regionStates.addRegionInTransition(regionNode, null);
            }
          } else {
            LOG.info("RIT {}", regionNode);
          }
        }
      }
    });

    // every assignment is blocked until meta is loaded.
    wakeMetaLoadedEvent();
  }
1314
1315  /**
1316   * Used to check if the meta loading is done.
1317   * <p/>
1318   * if not we throw PleaseHoldException since we are rebuilding the RegionStates
1319   * @param hri region to check if it is already rebuild
1320   * @throws PleaseHoldException if meta has not been loaded yet
1321   */
1322  private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException {
1323    if (!isRunning()) {
1324      throw new PleaseHoldException("AssignmentManager not running");
1325    }
1326    boolean meta = isMetaRegion(hri);
1327    boolean metaLoaded = isMetaLoaded();
1328    if (!meta && !metaLoaded) {
1329      throw new PleaseHoldException(
1330        "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded);
1331    }
1332  }
1333
1334  // ============================================================================================
1335  //  TODO: Metrics
1336  // ============================================================================================
1337  public int getNumRegionsOpened() {
1338    // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value
1339    return 0;
1340  }
1341
1342  public long submitServerCrash(final ServerName serverName, final boolean shouldSplitWal) {
1343    boolean carryingMeta = isCarryingMeta(serverName);
1344
1345    // Remove the in-memory rsReports result
1346    synchronized (rsReports) {
1347      rsReports.remove(serverName);
1348    }
1349
1350    ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
1351    long pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
1352        serverName, shouldSplitWal, carryingMeta));
1353    LOG.debug("Added=" + serverName
1354        + " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
1355    return pid;
1356  }
1357
1358  public void offlineRegion(final RegionInfo regionInfo) {
1359    // TODO used by MasterRpcServices ServerCrashProcedure
1360    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
1361    if (node != null) node.offline();
1362  }
1363
1364  public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) {
1365    // TODO used by TestSplitTransactionOnCluster.java
1366  }
1367
1368  public Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment(
1369      final Collection<RegionInfo> regions) {
1370    return regionStates.getSnapShotOfAssignment(regions);
1371  }
1372
1373  // ============================================================================================
1374  //  TODO: UTILS/HELPERS?
1375  // ============================================================================================
1376  /**
1377   * Used by the client (via master) to identify if all regions have the schema updates
1378   *
1379   * @param tableName
1380   * @return Pair indicating the status of the alter command (pending/total)
1381   * @throws IOException
1382   */
1383  public Pair<Integer, Integer> getReopenStatus(TableName tableName) {
1384    if (isTableDisabled(tableName)) return new Pair<Integer, Integer>(0, 0);
1385
1386    final List<RegionState> states = regionStates.getTableRegionStates(tableName);
1387    int ritCount = 0;
1388    for (RegionState regionState: states) {
1389      if (!regionState.isOpened() && !regionState.isSplit()) {
1390        ritCount++;
1391      }
1392    }
1393    return new Pair<Integer, Integer>(ritCount, states.size());
1394  }
1395
1396  // ============================================================================================
1397  //  TODO: Region State In Transition
1398  // ============================================================================================
1399  protected boolean addRegionInTransition(final RegionStateNode regionNode,
1400      final RegionTransitionProcedure procedure) {
1401    return regionStates.addRegionInTransition(regionNode, procedure);
1402  }
1403
1404  protected void removeRegionInTransition(final RegionStateNode regionNode,
1405      final RegionTransitionProcedure procedure) {
1406    regionStates.removeRegionInTransition(regionNode, procedure);
1407  }
1408
1409  public boolean hasRegionsInTransition() {
1410    return regionStates.hasRegionsInTransition();
1411  }
1412
1413  public List<RegionStateNode> getRegionsInTransition() {
1414    return regionStates.getRegionsInTransition();
1415  }
1416
1417  public List<RegionInfo> getAssignedRegions() {
1418    return regionStates.getAssignedRegions();
1419  }
1420
1421  public RegionInfo getRegionInfo(final byte[] regionName) {
1422    final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName);
1423    return regionState != null ? regionState.getRegionInfo() : null;
1424  }
1425
1426  // ============================================================================================
1427  //  TODO: Region Status update
1428  // ============================================================================================
1429  private void sendRegionOpenedNotification(final RegionInfo regionInfo,
1430      final ServerName serverName) {
1431    getBalancer().regionOnline(regionInfo, serverName);
1432    if (!this.listeners.isEmpty()) {
1433      for (AssignmentListener listener : this.listeners) {
1434        listener.regionOpened(regionInfo, serverName);
1435      }
1436    }
1437  }
1438
1439  private void sendRegionClosedNotification(final RegionInfo regionInfo) {
1440    getBalancer().regionOffline(regionInfo);
1441    if (!this.listeners.isEmpty()) {
1442      for (AssignmentListener listener : this.listeners) {
1443        listener.regionClosed(regionInfo);
1444      }
1445    }
1446  }
1447
  /**
   * Moves a region to OPENING. While holding the node's monitor it transitions the state
   * (expecting one of {@code RegionStates.STATES_EXPECTED_ON_OPEN}), records the node against
   * its server in regionStates and writes the location through the region state store. The
   * operation-counter metric is bumped afterwards, outside the monitor.
   * @param regionNode the region to mark as opening
   * @throws IOException if one of the calls above fails (which one is not visible here)
   */
  public void markRegionAsOpening(final RegionStateNode regionNode) throws IOException {
    synchronized (regionNode) {
      regionNode.transitionState(State.OPENING, RegionStates.STATES_EXPECTED_ON_OPEN);
      regionStates.addRegionToServer(regionNode);
      regionStateStore.updateRegionLocation(regionNode);
    }

    // update the operation count metrics
    metrics.incrementOperationCounter();
  }
1458
1459  public void undoRegionAsOpening(final RegionStateNode regionNode) {
1460    boolean opening = false;
1461    synchronized (regionNode) {
1462      if (regionNode.isInState(State.OPENING)) {
1463        opening = true;
1464        regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
1465      }
1466      // Should we update hbase:meta?
1467    }
1468    if (opening) {
1469      // TODO: Metrics. Do opposite of metrics.incrementOperationCounter();
1470    }
1471  }
1472
  /**
   * Moves a region to OPEN. All steps run while holding the node's monitor: transition the state
   * (expecting one of {@code RegionStates.STATES_EXPECTED_ON_OPEN}), flag meta as assigned when
   * this is the meta region, record the node against its server, write the location through the
   * region state store, and finally notify the balancer and AssignmentListeners.
   * @param regionNode the region that has opened
   * @throws IOException if one of the calls above fails (which one is not visible here)
   */
  public void markRegionAsOpened(final RegionStateNode regionNode) throws IOException {
    final RegionInfo hri = regionNode.getRegionInfo();
    synchronized (regionNode) {
      regionNode.transitionState(State.OPEN, RegionStates.STATES_EXPECTED_ON_OPEN);
      if (isMetaRegion(hri)) {
        // Usually we'd set a table ENABLED at this stage, but hbase:meta is always enabled and
        // can't be disabled -- so skip the RPC. (Besides, ENABLED is managed by
        // TableStateManager, which is backed by hbase:meta; avoid setting ENABLED so we don't
        // have to update state on the very table that holds the state.)
        setMetaAssigned(hri, true);
      }
      regionStates.addRegionToServer(regionNode);
      // TODO: OPENING Updates hbase:meta too... we need to do both here and there?
      // That is a lot of hbase:meta writing.
      regionStateStore.updateRegionLocation(regionNode);
      sendRegionOpenedNotification(hri, regionNode.getRegionLocation());
    }
  }
1491
  /**
   * Moves a region to CLOSING. While holding the node's monitor it transitions the state
   * (expecting one of {@code RegionStates.STATES_EXPECTED_ON_CLOSE}), clears the meta-assigned
   * flag when this is the meta region, records the node against its server and writes the
   * location through the region state store. The operation-counter metric is bumped afterwards,
   * outside the monitor.
   * @param regionNode the region that is about to close
   * @throws IOException if one of the calls above fails (which one is not visible here)
   */
  public void markRegionAsClosing(final RegionStateNode regionNode) throws IOException {
    final RegionInfo hri = regionNode.getRegionInfo();
    synchronized (regionNode) {
      regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE);
      // Mark meta as not assigned early, so people trying to create/edit tables will wait.
      if (isMetaRegion(hri)) {
        setMetaAssigned(hri, false);
      }
      regionStates.addRegionToServer(regionNode);
      regionStateStore.updateRegionLocation(regionNode);
    }

    // update the operation count metrics
    metrics.incrementOperationCounter();
  }
1507
1508  public void undoRegionAsClosing(final RegionStateNode regionNode) {
1509    // TODO: Metrics. Do opposite of metrics.incrementOperationCounter();
1510    // There is nothing to undo?
1511  }
1512
  /**
   * Moves a region to CLOSED. All steps run while holding the node's monitor: transition the
   * state (expecting one of {@code RegionStates.STATES_EXPECTED_ON_CLOSE}), detach the node from
   * its server, remember that server as the last host, clear the location, write the location
   * through the region state store, and notify the balancer and AssignmentListeners.
   * @param regionNode the region that has closed
   * @throws IOException if one of the calls above fails (which one is not visible here)
   */
  public void markRegionAsClosed(final RegionStateNode regionNode) throws IOException {
    final RegionInfo hri = regionNode.getRegionInfo();
    synchronized (regionNode) {
      regionNode.transitionState(State.CLOSED, RegionStates.STATES_EXPECTED_ON_CLOSE);
      // The current location must still be set here: it is needed to find the server entry and
      // is saved as lastHost just below, before being cleared.
      regionStates.removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
      regionNode.setLastHost(regionNode.getRegionLocation());
      regionNode.setRegionLocation(null);
      regionStateStore.updateRegionLocation(regionNode);
      sendRegionClosedNotification(hri);
    }
  }
1524
  /**
   * Records a completed split: the parent's node is set to SPLIT, both daughters' nodes to
   * SPLITTING_NEW (creating nodes as needed), the split is written through the region state
   * store, and favored nodes are generated for the daughters when
   * {@code shouldAssignFavoredNodes(parent)} holds.
   * @param parent the region that was split
   * @param serverName passed to {@code regionStateStore.splitRegion}
   * @param daughterA first daughter region
   * @param daughterB second daughter region
   * @throws IOException if one of the calls above fails (which one is not visible here)
   */
  public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
      final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
    // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
    // The parent stays in regionStates until cleared when removed by CatalogJanitor.
    // Update its state in regionStates so it shows as offline and split when read
    // later figuring what regions are in a table and what are not: see
    // regionStates#getRegionsOfTable
    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent);
    node.setState(State.SPLIT);
    final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA);
    nodeA.setState(State.SPLITTING_NEW);
    final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB);
    nodeB.setState(State.SPLITTING_NEW);

    regionStateStore.splitRegion(parent, daughterA, daughterB, serverName);
    if (shouldAssignFavoredNodes(parent)) {
      // Only reached when favored-nodes assignment is configured, in which case the balancer is
      // expected to be a FavoredNodesPromoter (the cast below relies on it).
      List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList();
      ((FavoredNodesPromoter)getBalancer()).
          generateFavoredNodesForDaughter(onlineServers, parent, daughterA, daughterB);
    }
  }
1546
1547  /**
1548   * When called here, the merge has happened. The merged regions have been
1549   * unassigned and the above markRegionClosed has been called on each so they have been
1550   * disassociated from a hosting Server. The merged region will be open after this call. The
1551   * merged regions are removed from hbase:meta below. Later they are deleted from the filesystem
1552   * by the catalog janitor running against hbase:meta. It notices when the merged region no
1553   * longer holds references to the old regions (References are deleted after a compaction
1554   * rewrites what the Reference points at but not until the archiver chore runs, are the
1555   * References removed).
1556   */
1557  public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
1558        RegionInfo [] mergeParents)
1559      throws IOException {
1560    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
1561    node.setState(State.MERGED);
1562    for (RegionInfo ri: mergeParents) {
1563      regionStates.deleteRegion(ri);
1564
1565    }
1566    regionStateStore.mergeRegions(child, mergeParents, serverName);
1567    if (shouldAssignFavoredNodes(child)) {
1568      ((FavoredNodesPromoter)getBalancer()).
1569        generateFavoredNodesForMergedRegion(child, mergeParents);
1570    }
1571  }
1572
1573  /*
1574   * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
1575   * belongs to a non-system table.
1576   */
1577  private boolean shouldAssignFavoredNodes(RegionInfo region) {
1578    return this.shouldAssignRegionsWithFavoredNodes &&
1579        FavoredNodesManager.isFavoredNodeApplicable(region);
1580  }
1581
1582  // ============================================================================================
1583  //  Assign Queue (Assign/Balance)
1584  // ============================================================================================
1585  private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>();
1586  private final ReentrantLock assignQueueLock = new ReentrantLock();
1587  private final Condition assignQueueFullCond = assignQueueLock.newCondition();
1588
1589  /**
1590   * Add the assign operation to the assignment queue.
1591   * The pending assignment operation will be processed,
1592   * and each region will be assigned by a server using the balancer.
1593   */
1594  protected void queueAssign(final RegionStateNode regionNode) {
1595    regionNode.getProcedureEvent().suspend();
1596
1597    // TODO: quick-start for meta and the other sys-tables?
1598    assignQueueLock.lock();
1599    try {
1600      pendingAssignQueue.add(regionNode);
1601      if (regionNode.isSystemTable() ||
1602          pendingAssignQueue.size() == 1 ||
1603          pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize) {
1604        assignQueueFullCond.signal();
1605      }
1606    } finally {
1607      assignQueueLock.unlock();
1608    }
1609  }
1610
1611  private void startAssignmentThread() {
1612    // Get Server Thread name. Sometimes the Server is mocked so may not implement HasThread.
1613    // For example, in tests.
1614    String name = master instanceof HasThread? ((HasThread)master).getName():
1615        master.getServerName().toShortString();
1616    assignThread = new Thread(name) {
1617      @Override
1618      public void run() {
1619        while (isRunning()) {
1620          processAssignQueue();
1621        }
1622        pendingAssignQueue.clear();
1623      }
1624    };
1625    assignThread.setDaemon(true);
1626    assignThread.start();
1627  }
1628
1629  private void stopAssignmentThread() {
1630    assignQueueSignal();
1631    try {
1632      while (assignThread.isAlive()) {
1633        assignQueueSignal();
1634        assignThread.join(250);
1635      }
1636    } catch (InterruptedException e) {
1637      LOG.warn("join interrupted", e);
1638      Thread.currentThread().interrupt();
1639    }
1640  }
1641
1642  private void assignQueueSignal() {
1643    assignQueueLock.lock();
1644    try {
1645      assignQueueFullCond.signal();
1646    } finally {
1647      assignQueueLock.unlock();
1648    }
1649  }
1650
  /**
   * Blocks until there is work on the assign queue, then drains it.
   * <p>
   * If the queue is empty, waits once on assignQueueFullCond. It then waits a further
   * {@code assignDispatchWaitMillis} (or until signalled again), so that more regions can
   * accumulate into the batch. Finally it moves every queued node into a map keyed by
   * RegionInfo and clears the queue.
   * @return the drained batch, which may be empty (e.g. after a spurious wakeup); {@code null} if
   *         the manager is no longer running, or if the thread was interrupted
   */
  // The await below is deliberately not in a loop: a spurious wakeup just yields a possibly
  // empty batch, which processAssignQueue() tolerates.
  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
  private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() {
    HashMap<RegionInfo, RegionStateNode> regions = null;

    assignQueueLock.lock();
    try {
      if (pendingAssignQueue.isEmpty() && isRunning()) {
        assignQueueFullCond.await();
      }

      if (!isRunning()) return null;
      // Batching window: give other regions a chance to be queued before dispatching.
      assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS);
      regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size());
      for (RegionStateNode regionNode: pendingAssignQueue) {
        regions.put(regionNode.getRegionInfo(), regionNode);
      }
      pendingAssignQueue.clear();
    } catch (InterruptedException e) {
      // NOTE(review): the interrupt flag is restored, and the assignment thread's run() loop
      // calls back in while isRunning(). Condition.await throws immediately when the flag is
      // set, so an interrupted assign thread may spin here logging warnings. Confirm that
      // nothing interrupts that thread.
      LOG.warn("got interrupted ", e);
      Thread.currentThread().interrupt();
    } finally {
      assignQueueLock.unlock();
    }
    return regions;
  }
1676
  /**
   * Processes one batch from the assign queue.
   * <p>
   * The batch is partitioned three ways:
   * <ul>
   *   <li>regions that already have a location go into a retain map (region -> location);</li>
   *   <li>system-table regions without a location go into one list;</li>
   *   <li>user regions without a location go into another.</li>
   * </ul>
   * It then polls every 250 ms until at least one destination server exists, and gives up if
   * the manager stops meanwhile. System regions are planned first, against servers not returned
   * by getExcludedServersForSystemTable(), falling back to all servers if that filter leaves
   * none. Retained regions and user regions are then planned against all servers.
   */
  private void processAssignQueue() {
    final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue();
    if (regions == null || regions.size() == 0 || !isRunning()) {
      return;
    }

    if (LOG.isTraceEnabled()) {
      LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size());
    }

    // TODO: Optimize balancer. pass a RegionPlan?
    final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>();
    final List<RegionInfo> userHRIs = new ArrayList<>(regions.size());
    // Regions for system tables requiring reassignment
    final List<RegionInfo> systemHRIs = new ArrayList<>();
    for (RegionStateNode regionStateNode: regions.values()) {
      boolean sysTable = regionStateNode.isSystemTable();
      final List<RegionInfo> hris = sysTable? systemHRIs: userHRIs;
      // NOTE(review): system regions that already have a location land in retainMap and are
      // later planned against all servers, not the version-filtered set.
      if (regionStateNode.getRegionLocation() != null) {
        retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
      } else {
        hris.add(regionStateNode.getRegionInfo());
      }
    }

    // TODO: connect with the listener to invalidate the cache

    // TODO use events
    // Wait (polling) until some server can take regions; bail out if we are stopped meanwhile.
    List<ServerName> servers = master.getServerManager().createDestinationServersList();
    for (int i = 0; servers.size() < 1; ++i) {
      // Report every fourth time around this loop; try not to flood log.
      if (i % 4 == 0) {
        LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions.");
      }

      if (!isRunning()) {
        LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions.");
        return;
      }
      Threads.sleep(250);
      servers = master.getServerManager().createDestinationServersList();
    }

    if (!systemHRIs.isEmpty()) {
      // System table regions requiring reassignment are present, get region servers
      // not available for system table regions
      final List<ServerName> excludeServers = getExcludedServersForSystemTable();
      List<ServerName> serversForSysTables = servers.stream()
          .filter(s -> !excludeServers.contains(s)).collect(Collectors.toList());
      if (serversForSysTables.isEmpty()) {
        LOG.warn("Filtering old server versions and the excluded produced an empty set; " +
            "instead considering all candidate servers!");
      }
      LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() +
          ", allServersCount=" + servers.size());
      processAssignmentPlans(regions, null, systemHRIs,
          serversForSysTables.isEmpty()? servers: serversForSysTables);
    }

    processAssignmentPlans(regions, retainMap, userHRIs, servers);
  }
1738
1739  private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions,
1740      final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris,
1741      final List<ServerName> servers) {
1742    boolean isTraceEnabled = LOG.isTraceEnabled();
1743    if (isTraceEnabled) {
1744      LOG.trace("Available servers count=" + servers.size() + ": " + servers);
1745    }
1746
1747    final LoadBalancer balancer = getBalancer();
1748    // ask the balancer where to place regions
1749    if (retainMap != null && !retainMap.isEmpty()) {
1750      if (isTraceEnabled) {
1751        LOG.trace("retain assign regions=" + retainMap);
1752      }
1753      try {
1754        acceptPlan(regions, balancer.retainAssignment(retainMap, servers));
1755      } catch (HBaseIOException e) {
1756        LOG.warn("unable to retain assignment", e);
1757        addToPendingAssignment(regions, retainMap.keySet());
1758      }
1759    }
1760
1761    // TODO: Do we need to split retain and round-robin?
1762    // the retain seems to fallback to round-robin/random if the region is not in the map.
1763    if (!hris.isEmpty()) {
1764      Collections.sort(hris, RegionInfo.COMPARATOR);
1765      if (isTraceEnabled) {
1766        LOG.trace("round robin regions=" + hris);
1767      }
1768      try {
1769        acceptPlan(regions, balancer.roundRobinAssignment(hris, servers));
1770      } catch (HBaseIOException e) {
1771        LOG.warn("unable to round-robin assignment", e);
1772        addToPendingAssignment(regions, hris);
1773      }
1774    }
1775  }
1776
1777  private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions,
1778      final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException {
1779    final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()];
1780    final long st = System.currentTimeMillis();
1781
1782    if (plan == null) {
1783      throw new HBaseIOException("unable to compute plans for regions=" + regions.size());
1784    }
1785
1786    if (plan.isEmpty()) return;
1787
1788    List<RegionInfo> bogusRegions = plan.remove(LoadBalancer.BOGUS_SERVER_NAME);
1789    if (bogusRegions != null && !bogusRegions.isEmpty()) {
1790      addToPendingAssignment(regions, bogusRegions);
1791    }
1792
1793    int evcount = 0;
1794    for (Map.Entry<ServerName, List<RegionInfo>> entry: plan.entrySet()) {
1795      final ServerName server = entry.getKey();
1796      for (RegionInfo hri: entry.getValue()) {
1797        final RegionStateNode regionNode = regions.get(hri);
1798        regionNode.setRegionLocation(server);
1799        events[evcount++] = regionNode.getProcedureEvent();
1800      }
1801    }
1802    ProcedureEvent.wakeEvents(getProcedureScheduler(), events);
1803
1804    final long et = System.currentTimeMillis();
1805    if (LOG.isTraceEnabled()) {
1806      LOG.trace("ASSIGN ACCEPT " + events.length + " -> " +
1807          StringUtils.humanTimeDiff(et - st));
1808    }
1809  }
1810
1811  private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions,
1812      final Collection<RegionInfo> pendingRegions) {
1813    assignQueueLock.lock();
1814    try {
1815      for (RegionInfo hri: pendingRegions) {
1816        pendingAssignQueue.add(regions.get(hri));
1817      }
1818    } finally {
1819      assignQueueLock.unlock();
1820    }
1821  }
1822
1823  /**
1824   * Get a list of servers that this region cannot be assigned to.
1825   * For system tables, we must assign them to a server with highest version.
1826   */
1827  public List<ServerName> getExcludedServersForSystemTable() {
1828    // TODO: This should be a cached list kept by the ServerManager rather than calculated on each
1829    // move or system region assign. The RegionServerTracker keeps list of online Servers with
1830    // RegionServerInfo that includes Version.
1831    List<Pair<ServerName, String>> serverList = master.getServerManager().getOnlineServersList()
1832        .stream()
1833        .map((s)->new Pair<>(s, master.getRegionServerVersion(s)))
1834        .collect(Collectors.toList());
1835    if (serverList.isEmpty()) {
1836      return Collections.emptyList();
1837    }
1838    String highestVersion = Collections.max(serverList,
1839        (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond();
1840    return serverList.stream()
1841        .filter((p)->!p.getSecond().equals(highestVersion))
1842        .map(Pair::getFirst)
1843        .collect(Collectors.toList());
1844  }
1845
1846  // ============================================================================================
1847  //  Server Helpers
1848  // ============================================================================================
1849  @Override
1850  public void serverAdded(final ServerName serverName) {
1851  }
1852
1853  @Override
1854  public void serverRemoved(final ServerName serverName) {
1855    final ServerStateNode serverNode = regionStates.getServerNode(serverName);
1856    if (serverNode == null) return;
1857
1858    // just in case, wake procedures waiting for this server report
1859    wakeServerReportEvent(serverNode);
1860  }
1861
1862  private void killRegionServer(final ServerName serverName) {
1863    master.getServerManager().expireServer(serverName);
1864  }
1865
1866  /**
1867   * <p>
1868   * This is a very particular check. The {@link org.apache.hadoop.hbase.master.ServerManager} is
1869   * where you go to check on state of 'Servers', what Servers are online, etc.
1870   * </p>
1871   * <p>
1872   * Here we are checking the state of a server that is post expiration, a ServerManager function
1873   * that moves a server from online to dead. Here we are seeing if the server has moved beyond a
1874   * particular point in the recovery process such that it is safe to move on with assigns; etc.
1875   * </p>
1876   * <p>
1877   * For now it is only used in
1878   * {@link UnassignProcedure#remoteCallFailed(MasterProcedureEnv, RegionStateNode, IOException)} to
1879   * see whether we can safely quit without losing data.
1880   * </p>
1881   * @param meta whether to check for meta log splitting
1882   * @return {@code true} if the server does not exist or the log splitting is done, i.e, the server
1883   *         is in OFFLINE state, or for meta log, is in SPLITTING_META_DONE state. If null,
1884   *         presumes the ServerStateNode was cleaned up by SCP.
1885   * @see UnassignProcedure#remoteCallFailed(MasterProcedureEnv, RegionStateNode, IOException)
1886   */
1887  boolean isLogSplittingDone(ServerName serverName, boolean meta) {
1888    ServerStateNode ssn = this.regionStates.getServerNode(serverName);
1889    if (ssn == null) {
1890      return true;
1891    }
1892    ServerState[] inState =
1893      meta
1894        ? new ServerState[] { ServerState.SPLITTING_META_DONE, ServerState.SPLITTING,
1895          ServerState.OFFLINE }
1896        : new ServerState[] { ServerState.OFFLINE };
1897    synchronized (ssn) {
1898      return ssn.isInState(inState);
1899    }
1900  }
1901
1902  @VisibleForTesting
1903  MasterServices getMaster() {
1904    return master;
1905  }
1906
1907  /**
1908   * @return a snapshot of rsReports
1909   */
1910  public Map<ServerName, Set<byte[]>> getRSReports() {
1911    Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>();
1912    synchronized (rsReports) {
1913      rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue()));
1914    }
1915    return rsReportsSnapshot;
1916  }
1917
1918  /**
1919   * Provide regions state count for given table.
1920   * e.g howmany regions of give table are opened/closed/rit etc
1921   *
1922   * @param tableName TableName
1923   * @return region states count
1924   */
1925  public RegionStatesCount getRegionStatesCount(TableName tableName) {
1926    int openRegionsCount = 0;
1927    int closedRegionCount = 0;
1928    int ritCount = 0;
1929    int splitRegionCount = 0;
1930    int totalRegionCount = 0;
1931    if (!isTableDisabled(tableName)) {
1932      final List<RegionState> states = regionStates.getTableRegionStates(tableName);
1933      for (RegionState regionState : states) {
1934        if (regionState.isOpened()) {
1935          openRegionsCount++;
1936        } else if (regionState.isClosed()) {
1937          closedRegionCount++;
1938        } else if (regionState.isSplit()) {
1939          splitRegionCount++;
1940        }
1941      }
1942      totalRegionCount = states.size();
1943      ritCount = totalRegionCount - openRegionsCount - splitRegionCount;
1944    }
1945    return new RegionStatesCount.RegionStatesCountBuilder()
1946      .setOpenRegions(openRegionsCount)
1947      .setClosedRegions(closedRegionCount)
1948      .setSplitRegions(splitRegionCount)
1949      .setRegionsInTransition(ritCount)
1950      .setTotalRegions(totalRegionCount)
1951      .build();
1952  }
1953
1954}