001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.Future;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.locks.Condition;
033import java.util.concurrent.locks.ReentrantLock;
034import java.util.stream.Collectors;
035import java.util.stream.Stream;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.CatalogFamilyFormat;
038import org.apache.hadoop.hbase.DoNotRetryIOException;
039import org.apache.hadoop.hbase.HBaseIOException;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.PleaseHoldException;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.UnknownRegionException;
045import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
046import org.apache.hadoop.hbase.client.MasterSwitchType;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.RegionInfoBuilder;
049import org.apache.hadoop.hbase.client.RegionReplicaUtil;
050import org.apache.hadoop.hbase.client.RegionStatesCount;
051import org.apache.hadoop.hbase.client.Result;
052import org.apache.hadoop.hbase.client.ResultScanner;
053import org.apache.hadoop.hbase.client.Scan;
054import org.apache.hadoop.hbase.client.TableDescriptor;
055import org.apache.hadoop.hbase.client.TableState;
056import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
057import org.apache.hadoop.hbase.favored.FavoredNodesManager;
058import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
059import org.apache.hadoop.hbase.master.LoadBalancer;
060import org.apache.hadoop.hbase.master.MasterServices;
061import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
062import org.apache.hadoop.hbase.master.RegionPlan;
063import org.apache.hadoop.hbase.master.RegionState;
064import org.apache.hadoop.hbase.master.RegionState.State;
065import org.apache.hadoop.hbase.master.ServerManager;
066import org.apache.hadoop.hbase.master.TableStateManager;
067import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
068import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
069import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
070import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
071import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
072import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
073import org.apache.hadoop.hbase.master.region.MasterRegion;
074import org.apache.hadoop.hbase.procedure2.Procedure;
075import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
076import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
077import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
078import org.apache.hadoop.hbase.procedure2.util.StringUtils;
079import org.apache.hadoop.hbase.regionserver.SequenceId;
080import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
081import org.apache.hadoop.hbase.util.Bytes;
082import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
083import org.apache.hadoop.hbase.util.Pair;
084import org.apache.hadoop.hbase.util.Threads;
085import org.apache.hadoop.hbase.util.VersionInfo;
086import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
087import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
088import org.apache.yetus.audience.InterfaceAudience;
089import org.apache.zookeeper.KeeperException;
090import org.slf4j.Logger;
091import org.slf4j.LoggerFactory;
092
093import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
098
099/**
100 * The AssignmentManager is the coordinator for region assign/unassign operations.
101 * <ul>
102 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
103 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
104 * </ul>
105 * Regions are created by CreateTable, Split, Merge.
106 * Regions are deleted by DeleteTable, Split, Merge.
107 * Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash.
 * Unassigns are triggered by DisableTable, Split, Merge.
109 */
110@InterfaceAudience.Private
111public class AssignmentManager {
  private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);

  // TODO: AMv2
  //  - handle region migration from hbase1 to hbase2.
  //  - handle sys table assignment first (e.g. acl, namespace)
  //  - handle table priorities
  //  - If ServerBusyException trying to update hbase:meta, we abort the Master
  //   See updateRegionLocation in RegionStateStore.
  //
  // See also
  // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
  // for other TODOs.

  /**
   * Configuration key for the bootstrap thread pool size.
   * NOTE(review): not read in the visible part of this class; confirm where it is consumed.
   */
  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
      "hbase.assignment.bootstrap.thread.pool.size";

  /** Configuration key for the assign dispatch wait time in milliseconds (default 150). */
  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
      "hbase.assignment.dispatch.wait.msec";
  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;

  /** Configuration key for the maximum size of the assign dispatch wait queue (default 100). */
  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
      "hbase.assignment.dispatch.wait.queue.max.size";
  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;

  /** Configuration key for the interval of the region-in-transition chore (default 60s). */
  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
      "hbase.assignment.rit.chore.interval.msec";
  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;

  /**
   * Configuration key for the interval of the dead-server region metric chore (default 120s).
   * A non-positive value disables that chore (see the constructor).
   */
  public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY =
      "hbase.assignment.dead.region.metric.chore.interval.msec";
  private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000;

  /**
   * Configuration key for the maximum number of assign attempts (default effectively unlimited).
   * Values below 1 are raised to 1 by the constructor.
   */
  public static final String ASSIGN_MAX_ATTEMPTS =
      "hbase.assignment.maximum.attempts";
  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;

  /** Configuration key for how many assign attempts are retried immediately (default 3). */
  public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS =
      "hbase.assignment.retry.immediately.maximum.attempts";
  private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3;

  /** Region in Transition metrics threshold time */
  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
      "hbase.metrics.rit.stuck.warning.threshold";
  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
  /** Message prefix used when a region is not in one of the states expected for a transition. */
  public static final String UNEXPECTED_STATE_REGION = "Unexpected state for ";
157
  /** Event procedures suspend on until hbase:meta is assigned; see waitMetaAssigned. */
  private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
  /** Event procedures suspend on until the region states have been rebuilt; see waitMetaLoaded. */
  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");

  /** Assignment metrics, exposed through getAssignmentManagerMetrics(). */
  private final MetricsAssignmentManager metrics;
  /** Region-in-transition chore; removed from the procedure executor on stop(). */
  private final RegionInTransitionChore ritChore;
  /** Dead-server region metric chore; null when its configured interval is not positive. */
  private final DeadServerMetricRegionChore deadMetricChore;
  private final MasterServices master;

  /** Set to true by start() and back to false by stop(). */
  private final AtomicBoolean running = new AtomicBoolean(false);
  /** In-memory states of regions and servers. */
  private final RegionStates regionStates = new RegionStates();
  private final RegionStateStore regionStateStore;

  /**
   * When the operator uses this configuration option, any version between
   * the current cluster version and the value of "hbase.min.version.move.system.tables"
   * does not trigger any auto-region movement. Auto-region movement here
   * refers to auto-migration of system table regions to newer server versions.
   * It is assumed that the configured range of versions does not require special
   * handling of moving system table regions to higher versioned RegionServer.
   * This auto-migration is done by {@link #checkIfShouldMoveSystemRegionAsync()}.
   * Example: Let's assume the cluster is on version 1.4.0 and we have
   * set "hbase.min.version.move.system.tables" as "2.0.0". Now if we upgrade
   * one RegionServer on 1.4.0 cluster to 1.6.0 (< 2.0.0), then AssignmentManager will
   * not move hbase:meta, hbase:namespace and other system table regions
   * to newly brought up RegionServer 1.6.0 as part of auto-migration.
   * However, if we upgrade one RegionServer on 1.4.0 cluster to 2.2.0 (> 2.0.0),
   * then AssignmentManager will move all system table regions to newly brought
   * up RegionServer 2.2.0 as part of auto-migration done by
   * {@link #checkIfShouldMoveSystemRegionAsync()}.
   * "hbase.min.version.move.system.tables" is introduced as part of HBASE-22923.
   */
  private final String minVersionToMoveSysTables;

  private static final String MIN_VERSION_MOVE_SYS_TABLES_CONFIG =
    "hbase.min.version.move.system.tables";
  // An empty default means no minimum version is configured.
  private static final String DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG = "";

  // NOTE(review): not used in the visible part of this class; presumably the region names last
  // reported by each region server — confirm against its users.
  private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();

  // True when the configured balancer class is a FavoredStochasticBalancer (or a subclass).
  private final boolean shouldAssignRegionsWithFavoredNodes;
  private final int assignDispatchWaitQueueMaxSize;
  private final int assignDispatchWaitMillis;
  // Always at least 1 (clamped in the constructor).
  private final int assignMaxAttempts;
  private final int assignRetryImmediatelyMaxAttempts;

  // Master local region; its catalog family is scanned by start() to load meta region states.
  private final MasterRegion masterRegion;

  // Serializes the background threads spawned by checkIfShouldMoveSystemRegionAsync().
  private final Object checkIfShouldMoveSystemRegionLock = new Object();

  // NOTE(review): presumably managed by startAssignmentThread()/stopAssignmentThread() — confirm.
  private Thread assignThread;
208
209  public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
210    this(master, masterRegion, new RegionStateStore(master, masterRegion));
211  }
212
213  AssignmentManager(MasterServices master, MasterRegion masterRegion, RegionStateStore stateStore) {
214    this.master = master;
215    this.regionStateStore = stateStore;
216    this.metrics = new MetricsAssignmentManager();
217    this.masterRegion = masterRegion;
218
219    final Configuration conf = master.getConfiguration();
220
221    // Only read favored nodes if using the favored nodes load balancer.
222    this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class.isAssignableFrom(
223        conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class));
224
225    this.assignDispatchWaitMillis = conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY,
226        DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
227    this.assignDispatchWaitQueueMaxSize = conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY,
228        DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);
229
230    this.assignMaxAttempts = Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS,
231        DEFAULT_ASSIGN_MAX_ATTEMPTS));
232    this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS,
233        DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS);
234
235    int ritChoreInterval = conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY,
236        DEFAULT_RIT_CHORE_INTERVAL_MSEC);
237    this.ritChore = new RegionInTransitionChore(ritChoreInterval);
238
239    int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY,
240        DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC);
241    if (deadRegionChoreInterval > 0) {
242      this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval);
243    } else {
244      this.deadMetricChore = null;
245    }
246    minVersionToMoveSysTables = conf.get(MIN_VERSION_MOVE_SYS_TABLES_CONFIG,
247      DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG);
248  }
249
250  private void mirrorMetaLocations() throws IOException, KeeperException {
251    // For compatibility, mirror the meta region state to zookeeper
252    // And we still need to use zookeeper to publish the meta region locations to region
253    // server, so they can serve as ClientMetaService
254    ZKWatcher zk = master.getZooKeeper();
255    if (zk == null || !zk.getRecoverableZooKeeper().getState().isAlive()) {
256      // this is possible in tests, we do not provide a zk watcher or the zk watcher has been closed
257      return;
258    }
259    Collection<RegionStateNode> metaStates = regionStates.getRegionStateNodes();
260    for (RegionStateNode metaState : metaStates) {
261      MetaTableLocator.setMetaLocation(zk, metaState.getRegionLocation(),
262        metaState.getRegionInfo().getReplicaId(), metaState.getState());
263    }
264    int replicaCount = metaStates.size();
265    // remove extra mirror locations
266    for (String znode : zk.getMetaReplicaNodes()) {
267      int replicaId = zk.getZNodePaths().getMetaReplicaIdFromZNode(znode);
268      if (replicaId >= replicaCount) {
269        MetaTableLocator.deleteMetaLocation(zk, replicaId);
270      }
271    }
272  }
273
  /**
   * Starts the assignment manager and loads the hbase:meta region states.
   * <p/>
   * Returns immediately if the manager is already running. Otherwise starts the assignment thread,
   * scans the catalog family of the master local region and, for every entry visited, restores
   * the corresponding {@link RegionStateNode} (state, last host, location and open sequence id).
   * Finally the meta locations are mirrored to ZooKeeper.
   * @throws IOException if scanning the master local region fails (or is propagated by the
   *           ZooKeeper mirroring)
   * @throws KeeperException if mirroring the meta locations to ZooKeeper fails
   */
  public void start() throws IOException, KeeperException {
    // Only the first caller transitions false -> true; later calls are no-ops.
    if (!running.compareAndSet(false, true)) {
      return;
    }

    LOG.trace("Starting assignment manager");

    // Start the Assignment Thread
    startAssignmentThread();
    // load meta region states.
    // here we are still in the early steps of active master startup. There is only one thread(us)
    // can access AssignmentManager and create region node, so here we do not need to lock the
    // region node.
    try (ResultScanner scanner =
      masterRegion.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY))) {
      for (;;) {
        Result result = scanner.next();
        // A null result marks the end of the scan.
        if (result == null) {
          break;
        }
        RegionStateStore
          .visitMetaEntry((r, regionInfo, state, regionLocation, lastHost, openSeqNum) -> {
            RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
            regionNode.setState(state);
            regionNode.setLastHost(lastHost);
            regionNode.setRegionLocation(regionLocation);
            regionNode.setOpenSeqNum(openSeqNum);
            // A procedure may already be attached (e.g. by setupRIT); let it react to the
            // freshly loaded state.
            if (regionNode.getProcedure() != null) {
              regionNode.getProcedure().stateLoaded(this, regionNode);
            }
            if (regionLocation != null) {
              regionStates.addRegionToServer(regionNode);
            }
            // Only the default replica drives the meta-assigned event.
            if (RegionReplicaUtil.isDefaultReplica(regionInfo.getReplicaId())) {
              setMetaAssigned(regionInfo, state == State.OPEN);
            }
            LOG.debug("Loaded hbase:meta {}", regionNode);
          }, result);
      }
    }
    mirrorMetaLocations();
  }
316
317  /**
318   * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode.
319   * <p>
320   * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta
321   * method below. And it is also very important as now before submitting a TRSP, we need to attach
322   * it to the RegionStateNode, which acts like a guard, so we need to restore this information at
323   * the very beginning, before we start processing any procedures.
324   */
325  public void setupRIT(List<TransitRegionStateProcedure> procs) {
326    procs.forEach(proc -> {
327      RegionInfo regionInfo = proc.getRegion();
328      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
329      TransitRegionStateProcedure existingProc = regionNode.getProcedure();
330      if (existingProc != null) {
331        // This is possible, as we will detach the procedure from the RSN before we
332        // actually finish the procedure. This is because that, we will detach the TRSP from the RSN
333        // during execution, at that time, the procedure has not been marked as done in the pv2
334        // framework yet, so it is possible that we schedule a new TRSP immediately and when
335        // arriving here, we will find out that there are multiple TRSPs for the region. But we can
336        // make sure that, only the last one can take the charge, the previous ones should have all
337        // been finished already. So here we will compare the proc id, the greater one will win.
338        if (existingProc.getProcId() < proc.getProcId()) {
339          // the new one wins, unset and set it to the new one below
340          regionNode.unsetProcedure(existingProc);
341        } else {
342          // the old one wins, skip
343          return;
344        }
345      }
346      LOG.info("Attach {} to {} to restore RIT", proc, regionNode);
347      regionNode.setProcedure(proc);
348    });
349  }
350
351  public void stop() {
352    if (!running.compareAndSet(true, false)) {
353      return;
354    }
355
356    LOG.info("Stopping assignment manager");
357
358    // The AM is started before the procedure executor,
359    // but the actual work will be loaded/submitted only once we have the executor
360    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;
361
362    // Remove the RIT chore
363    if (hasProcExecutor) {
364      master.getMasterProcedureExecutor().removeChore(this.ritChore);
365      if (this.deadMetricChore != null) {
366        master.getMasterProcedureExecutor().removeChore(this.deadMetricChore);
367      }
368    }
369
370    // Stop the Assignment Thread
371    stopAssignmentThread();
372
373    // Stop the RegionStateStore
374    regionStates.clear();
375
376    // Update meta events (for testing)
377    if (hasProcExecutor) {
378      metaLoadEvent.suspend();
379      for (RegionInfo hri: getMetaRegionSet()) {
380        setMetaAssigned(hri, false);
381      }
382    }
383  }
384
385  public boolean isRunning() {
386    return running.get();
387  }
388
389  public Configuration getConfiguration() {
390    return master.getConfiguration();
391  }
392
393  public MetricsAssignmentManager getAssignmentManagerMetrics() {
394    return metrics;
395  }
396
397  private LoadBalancer getBalancer() {
398    return master.getLoadBalancer();
399  }
400
401  private FavoredNodesPromoter getFavoredNodePromoter() {
402    return (FavoredNodesPromoter) ((RSGroupBasedLoadBalancer) master.getLoadBalancer())
403      .getInternalBalancer();
404  }
405
406  private MasterProcedureEnv getProcedureEnvironment() {
407    return master.getMasterProcedureExecutor().getEnvironment();
408  }
409
410  private MasterProcedureScheduler getProcedureScheduler() {
411    return getProcedureEnvironment().getProcedureScheduler();
412  }
413
414  int getAssignMaxAttempts() {
415    return assignMaxAttempts;
416  }
417
418  int getAssignRetryImmediatelyMaxAttempts() {
419    return assignRetryImmediatelyMaxAttempts;
420  }
421
422  public RegionStates getRegionStates() {
423    return regionStates;
424  }
425
426  /**
427   * Returns the regions hosted by the specified server.
428   * <p/>
429   * Notice that, for SCP, after we submit the SCP, no one can change the region list for the
430   * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a
431   * snapshot of the current region list for this server, which means, right after you get the
432   * region list, new regions may be moved to this server or some regions may be moved out from this
433   * server, so you should not use it critically if you need strong consistency.
434   */
435  public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
436    ServerStateNode serverInfo = regionStates.getServerNode(serverName);
437    if (serverInfo == null) {
438      return Collections.emptyList();
439    }
440    return serverInfo.getRegionInfoList();
441  }
442
443  private RegionInfo getRegionInfo(RegionStateNode rsn) {
444    if (rsn.isSplit() && !rsn.getRegionInfo().isSplit()) {
445      // see the comments in markRegionAsSplit on why we need to do this converting.
446      return RegionInfoBuilder.newBuilder(rsn.getRegionInfo()).setSplit(true).setOffline(true)
447        .build();
448    } else {
449      return rsn.getRegionInfo();
450    }
451  }
452
453  private Stream<RegionStateNode> getRegionStateNodes(TableName tableName,
454    boolean excludeOfflinedSplitParents) {
455    Stream<RegionStateNode> stream = regionStates.getTableRegionStateNodes(tableName).stream();
456    if (excludeOfflinedSplitParents) {
457      return stream.filter(rsn -> !rsn.isSplit());
458    } else {
459      return stream;
460    }
461  }
462
463  public List<RegionInfo> getTableRegions(TableName tableName,
464    boolean excludeOfflinedSplitParents) {
465    return getRegionStateNodes(tableName, excludeOfflinedSplitParents)
466      .map(this::getRegionInfo).collect(Collectors.toList());
467  }
468
469  public List<Pair<RegionInfo, ServerName>> getTableRegionsAndLocations(TableName tableName,
470    boolean excludeOfflinedSplitParents) {
471    return getRegionStateNodes(tableName, excludeOfflinedSplitParents)
472      .map(rsn -> Pair.newPair(getRegionInfo(rsn), rsn.getRegionLocation()))
473      .collect(Collectors.toList());
474  }
475
476  public RegionStateStore getRegionStateStore() {
477    return regionStateStore;
478  }
479
480  public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
481    return this.shouldAssignRegionsWithFavoredNodes
482      ? getFavoredNodePromoter().getFavoredNodes(regionInfo)
483      : ServerName.EMPTY_SERVER_LIST;
484  }
485
486  // ============================================================================================
487  //  Table State Manager helpers
488  // ============================================================================================
489  private TableStateManager getTableStateManager() {
490    return master.getTableStateManager();
491  }
492
493  private boolean isTableEnabled(final TableName tableName) {
494    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
495  }
496
497  private boolean isTableDisabled(final TableName tableName) {
498    return getTableStateManager().isTableState(tableName,
499      TableState.State.DISABLED, TableState.State.DISABLING);
500  }
501
502  // ============================================================================================
503  //  META Helpers
504  // ============================================================================================
505  private boolean isMetaRegion(final RegionInfo regionInfo) {
506    return regionInfo.isMetaRegion();
507  }
508
509  public boolean isMetaRegion(final byte[] regionName) {
510    return getMetaRegionFromName(regionName) != null;
511  }
512
513  public RegionInfo getMetaRegionFromName(final byte[] regionName) {
514    for (RegionInfo hri: getMetaRegionSet()) {
515      if (Bytes.equals(hri.getRegionName(), regionName)) {
516        return hri;
517      }
518    }
519    return null;
520  }
521
522  public boolean isCarryingMeta(final ServerName serverName) {
523    // TODO: handle multiple meta
524    return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
525  }
526
527  private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
528    // TODO: check for state?
529    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
530    return(node != null && serverName.equals(node.getRegionLocation()));
531  }
532
533  private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
534    //if (regionInfo.isMetaRegion()) return regionInfo;
535    // TODO: handle multiple meta. if the region provided is not meta lookup
536    // which meta the region belongs to.
537    return RegionInfoBuilder.FIRST_META_REGIONINFO;
538  }
539
540  // TODO: handle multiple meta.
541  private static final Set<RegionInfo> META_REGION_SET =
542      Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);
543  public Set<RegionInfo> getMetaRegionSet() {
544    return META_REGION_SET;
545  }
546
547  // ============================================================================================
548  //  META Event(s) helpers
549  // ============================================================================================
550  /**
   * Note that this only means the meta region is available on a RS, but the AM may still be
552   * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
553   * before checking this method, unless you can make sure that your piece of code can only be
554   * executed after AM builds the region states.
555   * @see #isMetaLoaded()
556   */
557  public boolean isMetaAssigned() {
558    return metaAssignEvent.isReady();
559  }
560
561  public boolean isMetaRegionInTransition() {
562    return !isMetaAssigned();
563  }
564
565  /**
566   * Notice that this event does not mean the AM has already finished region state rebuilding. See
567   * the comment of {@link #isMetaAssigned()} for more details.
568   * @see #isMetaAssigned()
569   */
570  public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
571    return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
572  }
573
574  private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
575    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
576    ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
577    if (assigned) {
578      metaAssignEvent.wake(getProcedureScheduler());
579    } else {
580      metaAssignEvent.suspend();
581    }
582  }
583
584  private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
585    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
586    // TODO: handle multiple meta.
587    return metaAssignEvent;
588  }
589
590  /**
591   * Wait until AM finishes the meta loading, i.e, the region states rebuilding.
592   * @see #isMetaLoaded()
593   * @see #waitMetaAssigned(Procedure, RegionInfo)
594   */
595  public boolean waitMetaLoaded(Procedure<?> proc) {
596    return metaLoadEvent.suspendIfNotReady(proc);
597  }
598
599  /**
   * This method will be called in the master initialization method after calling
   * {@link #processOfflineRegions()}, as in processOfflineRegions we will generate assign
   * procedures for offline regions, which may conflict with table creation.
603   * <p/>
604   * This is a bit dirty, should be reconsidered after we decide whether to keep the
605   * {@link #processOfflineRegions()} method.
606   */
607  public void wakeMetaLoadedEvent() {
608    metaLoadEvent.wake(getProcedureScheduler());
609    assert isMetaLoaded() : "expected meta to be loaded";
610  }
611
612  /**
613   * Return whether AM finishes the meta loading, i.e, the region states rebuilding.
614   * @see #isMetaAssigned()
615   * @see #waitMetaLoaded(Procedure)
616   */
617  public boolean isMetaLoaded() {
618    return metaLoadEvent.isReady();
619  }
620
621  /**
622   * Start a new thread to check if there are region servers whose versions are higher than others.
623   * If so, move all system table regions to RS with the highest version to keep compatibility.
624   * The reason is, RS in new version may not be able to access RS in old version when there are
625   * some incompatible changes.
626   * <p>This method is called when a new RegionServer is added to cluster only.</p>
627   */
628  public void checkIfShouldMoveSystemRegionAsync() {
629    // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
630    // it should 'move' the system tables from the old server to the new server but
631    // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
632    if (this.master.getServerManager().countOfRegionServers() <= 1) {
633      return;
634    }
635    // This thread used to run whenever there was a change in the cluster. The ZooKeeper
636    // childrenChanged notification came in before the nodeDeleted message and so this method
637    // cold run before a ServerCrashProcedure could run. That meant that this thread could see
638    // a Crashed Server before ServerCrashProcedure and it could find system regions on the
639    // crashed server and go move them before ServerCrashProcedure had a chance; could be
640    // dataloss too if WALs were not recovered.
641    new Thread(() -> {
642      try {
643        synchronized (checkIfShouldMoveSystemRegionLock) {
644          List<RegionPlan> plans = new ArrayList<>();
645          // TODO: I don't think this code does a good job if all servers in cluster have same
646          // version. It looks like it will schedule unnecessary moves.
647          for (ServerName server : getExcludedServersForSystemTable()) {
648            if (master.getServerManager().isServerDead(server)) {
649              // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
650              // considers only online servers, the server could be queued for dead server
651              // processing. As region assignments for crashed server is handled by
652              // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
653              // regular flow of LoadBalancer as a favored node and not to have this special
654              // handling.
655              continue;
656            }
657            List<RegionInfo> regionsShouldMove = getSystemTables(server);
658            if (!regionsShouldMove.isEmpty()) {
659              for (RegionInfo regionInfo : regionsShouldMove) {
660                // null value for dest forces destination server to be selected by balancer
661                RegionPlan plan = new RegionPlan(regionInfo, server, null);
662                if (regionInfo.isMetaRegion()) {
663                  // Must move meta region first.
664                  LOG.info("Async MOVE of {} to newer Server={}",
665                      regionInfo.getEncodedName(), server);
666                  moveAsync(plan);
667                } else {
668                  plans.add(plan);
669                }
670              }
671            }
672            for (RegionPlan plan : plans) {
673              LOG.info("Async MOVE of {} to newer Server={}",
674                  plan.getRegionInfo().getEncodedName(), server);
675              moveAsync(plan);
676            }
677          }
678        }
679      } catch (Throwable t) {
680        LOG.error(t.toString(), t);
681      }
682    }).start();
683  }
684
685  private List<RegionInfo> getSystemTables(ServerName serverName) {
686    ServerStateNode serverNode = regionStates.getServerNode(serverName);
687    if (serverNode == null) {
688      return Collections.emptyList();
689    }
690    return serverNode.getSystemRegionInfoList();
691  }
692
693  private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
694      throws HBaseIOException {
695    if (regionNode.getProcedure() != null) {
696      throw new HBaseIOException(regionNode + " is currently in transition; pid=" +
697        regionNode.getProcedure().getProcId());
698    }
699    if (!regionNode.isInState(expectedStates)) {
700      throw new DoNotRetryRegionException(UNEXPECTED_STATE_REGION + regionNode);
701    }
702    if (isTableDisabled(regionNode.getTable())) {
703      throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
704    }
705  }
706
707  /**
708   * Create an assign TransitRegionStateProcedure. Makes sure of RegionState.
709   * Throws exception if not appropriate UNLESS override is set. Used by hbck2 but also by
710   * straightline {@link #assign(RegionInfo, ServerName)} and
711   * {@link #assignAsync(RegionInfo, ServerName)}.
712   * @see #createAssignProcedure(RegionStateNode, ServerName) for a version that does NO checking
713   *   used when only when no checking needed.
714   * @param override If false, check RegionState is appropriate for assign; if not throw exception.
715   */
716  private TransitRegionStateProcedure createAssignProcedure(RegionInfo regionInfo, ServerName sn,
717      boolean override) throws IOException {
718    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
719    regionNode.lock();
720    try {
721      if (override) {
722        if (regionNode.getProcedure() != null) {
723          regionNode.unsetProcedure(regionNode.getProcedure());
724        }
725      } else {
726        preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
727      }
728      assert regionNode.getProcedure() == null;
729      return regionNode.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
730        regionInfo, sn));
731    } finally {
732      regionNode.unlock();
733    }
734  }
735
736  /**
737   * Create an assign TransitRegionStateProcedure. Does NO checking of RegionState.
738   * Presumes appriopriate state ripe for assign.
739   * @see #createAssignProcedure(RegionInfo, ServerName, boolean)
740   */
741  private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode,
742      ServerName targetServer) {
743    regionNode.lock();
744    try {
745      return regionNode.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
746        regionNode.getRegionInfo(), targetServer));
747    } finally {
748      regionNode.unlock();
749    }
750  }
751
752  public long assign(RegionInfo regionInfo, ServerName sn) throws IOException {
753    TransitRegionStateProcedure proc = createAssignProcedure(regionInfo, sn, false);
754    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
755    return proc.getProcId();
756  }
757
758  public long assign(RegionInfo regionInfo) throws IOException {
759    return assign(regionInfo, null);
760  }
761
762  /**
763   * Submits a procedure that assigns a region to a target server without waiting for it to finish
764   * @param regionInfo the region we would like to assign
765   * @param sn target server name
766   */
767  public Future<byte[]> assignAsync(RegionInfo regionInfo, ServerName sn) throws IOException {
768    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(),
769      createAssignProcedure(regionInfo, sn, false));
770  }
771
772  /**
773   * Submits a procedure that assigns a region without waiting for it to finish
774   * @param regionInfo the region we would like to assign
775   */
776  public Future<byte[]> assignAsync(RegionInfo regionInfo) throws IOException {
777    return assignAsync(regionInfo, null);
778  }
779
780  public long unassign(RegionInfo regionInfo) throws IOException {
781    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
782    if (regionNode == null) {
783      throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
784    }
785    TransitRegionStateProcedure proc;
786    regionNode.lock();
787    try {
788      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
789      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
790      regionNode.setProcedure(proc);
791    } finally {
792      regionNode.unlock();
793    }
794    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
795    return proc.getProcId();
796  }
797
798  public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
799      ServerName targetServer) throws HBaseIOException {
800    RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
801    if (regionNode == null) {
802      throw new UnknownRegionException("No RegionStateNode found for " +
803          regionInfo.getEncodedName() + "(Closed/Deleted?)");
804    }
805    TransitRegionStateProcedure proc;
806    regionNode.lock();
807    try {
808      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
809      regionNode.checkOnline();
810      proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
811      regionNode.setProcedure(proc);
812    } finally {
813      regionNode.unlock();
814    }
815    return proc;
816  }
817
818  public void move(RegionInfo regionInfo) throws IOException {
819    TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null);
820    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
821  }
822
823  public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException {
824    TransitRegionStateProcedure proc =
825      createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination());
826    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
827  }
828
829  public Future<byte[]> balance(RegionPlan regionPlan) throws HBaseIOException {
830    ServerName current =
831      this.getRegionStates().getRegionAssignments().get(regionPlan.getRegionInfo());
832    if (!current.equals(regionPlan.getSource())) {
833      LOG.debug("Skip region plan {}, source server not match, current region location is {}",
834        regionPlan, current);
835      return null;
836    }
837    return moveAsync(regionPlan);
838  }
839
840  // ============================================================================================
841  //  RegionTransition procedures helpers
842  // ============================================================================================
843
844  /**
845   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
846   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
847   *         to populate the assigns with targets chosen using round-robin (default balancer
848   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
849   *         AssignProcedure will ask the balancer for a new target, and so on.
850   */
851  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris,
852      List<ServerName> serversToExclude) {
853    if (hris.isEmpty()) {
854      return new TransitRegionStateProcedure[0];
855    }
856
857    if (serversToExclude != null
858        && this.master.getServerManager().getOnlineServersList().size() == 1) {
859      LOG.debug("Only one region server found and hence going ahead with the assignment");
860      serversToExclude = null;
861    }
862    try {
863      // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
864      // a better job if it has all the assignments in the one lump.
865      Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
866        this.master.getServerManager().createDestinationServersList(serversToExclude));
867      // Return mid-method!
868      return createAssignProcedures(assignments);
869    } catch (IOException hioe) {
870      LOG.warn("Failed roundRobinAssignment", hioe);
871    }
872    // If an error above, fall-through to this simpler assign. Last resort.
873    return createAssignProcedures(hris);
874  }
875
876  /**
877   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
878   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
879   *         to populate the assigns with targets chosen using round-robin (default balancer
880   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
881   *         AssignProcedure will ask the balancer for a new target, and so on.
882   */
883  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
884    return createRoundRobinAssignProcedures(hris, null);
885  }
886
887  static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
888    if (left.getRegion().isMetaRegion()) {
889      if (right.getRegion().isMetaRegion()) {
890        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
891      }
892      return -1;
893    } else if (right.getRegion().isMetaRegion()) {
894      return +1;
895    }
896    if (left.getRegion().getTable().isSystemTable()) {
897      if (right.getRegion().getTable().isSystemTable()) {
898        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
899      }
900      return -1;
901    } else if (right.getRegion().getTable().isSystemTable()) {
902      return +1;
903    }
904    return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
905  }
906
907  /**
908   * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server.
909   * This method is called from HBCK2.
910   * @return an assign or null
911   */
912  public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo ri, boolean override) {
913    TransitRegionStateProcedure trsp = null;
914    try {
915      trsp = createAssignProcedure(ri, null, override);
916    } catch (IOException ioe) {
917      LOG.info("Failed {} assign, override={}" +
918        (override? "": "; set override to by-pass state checks."),
919        ri.getEncodedName(), override, ioe);
920    }
921    return trsp;
922  }
923
924  /**
925   * Create one TransitRegionStateProcedure to unassign a region.
926   * This method is called from HBCK2.
927   * @return an unassign or null
928   */
929  public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo ri, boolean override) {
930    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(ri);
931    TransitRegionStateProcedure trsp = null;
932    regionNode.lock();
933    try {
934      if (override) {
935        if (regionNode.getProcedure() != null) {
936          regionNode.unsetProcedure(regionNode.getProcedure());
937        }
938      } else {
939        // This is where we could throw an exception; i.e. override is false.
940        preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
941      }
942      assert regionNode.getProcedure() == null;
943      trsp = TransitRegionStateProcedure.unassign(getProcedureEnvironment(),
944        regionNode.getRegionInfo());
945      regionNode.setProcedure(trsp);
946    } catch (IOException ioe) {
947      // 'override' must be false here.
948      LOG.info("Failed {} unassign, override=false; set override to by-pass state checks.",
949        ri.getEncodedName(), ioe);
950    } finally{
951      regionNode.unlock();
952    }
953    return trsp;
954  }
955
956  /**
957   * Create an array of TransitRegionStateProcedure w/o specifying a target server.
958   * Used as fallback of caller is unable to do {@link #createAssignProcedures(Map)}.
959   * <p/>
960   * If no target server, at assign time, we will try to use the former location of the region if
961   * one exists. This is how we 'retain' the old location across a server restart.
962   * <p/>
963   * Should only be called when you can make sure that no one can touch these regions other than
964   * you. For example, when you are creating or enabling table. Presumes all Regions are in
965   * appropriate state ripe for assign; no checking of Region state is done in here.
966   * @see #createAssignProcedures(Map)
967   */
968  public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) {
969    return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
970        .map(regionNode -> createAssignProcedure(regionNode, null))
971        .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
972  }
973
974  /**
975   * Tied to {@link #createAssignProcedures(List)} in that it is called if caller is unable to run
976   * this method. Presumes all Regions are in appropriate state ripe for assign; no checking
977   * of Region state is done in here.
978   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
979   * @return Assignments made from the passed in <code>assignments</code>
980   * @see #createAssignProcedures(List)
981   */
982  private TransitRegionStateProcedure[] createAssignProcedures(
983      Map<ServerName, List<RegionInfo>> assignments) {
984    return assignments.entrySet().stream()
985      .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
986        .map(regionNode -> createAssignProcedure(regionNode, e.getKey())))
987      .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
988  }
989
990  // for creating unassign TRSP when disabling a table or closing excess region replicas
991  private TransitRegionStateProcedure forceCreateUnssignProcedure(RegionStateNode regionNode) {
992    regionNode.lock();
993    try {
994      if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
995        return null;
996      }
997      // in general, a split parent should be in CLOSED or SPLIT state, but anyway, let's check it
998      // here for safety
999      if (regionNode.getRegionInfo().isSplit()) {
1000        LOG.warn("{} is a split parent but not in CLOSED or SPLIT state", regionNode);
1001        return null;
1002      }
1003      // As in DisableTableProcedure or ModifyTableProcedure, we will hold the xlock for table, so
1004      // we can make sure that this procedure has not been executed yet, as TRSP will hold the
1005      // shared lock for table all the time. So here we will unset it and when it is actually
1006      // executed, it will find that the attach procedure is not itself and quit immediately.
1007      if (regionNode.getProcedure() != null) {
1008        regionNode.unsetProcedure(regionNode.getProcedure());
1009      }
1010      return regionNode.setProcedure(TransitRegionStateProcedure.unassign(getProcedureEnvironment(),
1011        regionNode.getRegionInfo()));
1012    } finally {
1013      regionNode.unlock();
1014    }
1015  }
1016
1017  /**
1018   * Called by DisableTableProcedure to unassign all the regions for a table.
1019   */
1020  public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) {
1021    return regionStates.getTableRegionStateNodes(tableName).stream()
1022      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
1023      .toArray(TransitRegionStateProcedure[]::new);
1024  }
1025
1026  /**
1027   * Called by ModifyTableProcedures to unassign all the excess region replicas
1028   * for a table.
1029   */
1030  public TransitRegionStateProcedure[] createUnassignProceduresForClosingExcessRegionReplicas(
1031    TableName tableName, int newReplicaCount) {
1032    return regionStates.getTableRegionStateNodes(tableName).stream()
1033      .filter(regionNode -> regionNode.getRegionInfo().getReplicaId() >= newReplicaCount)
1034      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
1035      .toArray(TransitRegionStateProcedure[]::new);
1036  }
1037
1038  public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
1039      final byte[] splitKey) throws IOException {
1040    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
1041  }
1042
1043  public MergeTableRegionsProcedure createMergeProcedure(RegionInfo ... ris) throws IOException {
1044    return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
1045  }
1046
1047  /**
1048   * Delete the region states. This is called by "DeleteTable"
1049   */
1050  public void deleteTable(final TableName tableName) throws IOException {
1051    final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
1052    regionStateStore.deleteRegions(regions);
1053    for (int i = 0; i < regions.size(); ++i) {
1054      final RegionInfo regionInfo = regions.get(i);
1055      // we expect the region to be offline
1056      regionStates.removeFromOfflineRegions(regionInfo);
1057      regionStates.deleteRegion(regionInfo);
1058    }
1059  }
1060
1061  // ============================================================================================
1062  //  RS Region Transition Report helpers
1063  // ============================================================================================
1064  private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
1065      ServerName serverName, List<RegionStateTransition> transitionList) throws IOException {
1066    for (RegionStateTransition transition : transitionList) {
1067      switch (transition.getTransitionCode()) {
1068        case OPENED:
1069        case FAILED_OPEN:
1070        case CLOSED:
1071          assert transition.getRegionInfoCount() == 1 : transition;
1072          final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1073          long procId =
1074            transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
1075          updateRegionTransition(serverName, transition.getTransitionCode(), hri,
1076            transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId);
1077          break;
1078        case READY_TO_SPLIT:
1079        case SPLIT:
1080        case SPLIT_REVERTED:
1081          assert transition.getRegionInfoCount() == 3 : transition;
1082          final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1083          final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
1084          final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
1085          updateRegionSplitTransition(serverName, transition.getTransitionCode(), parent, splitA,
1086            splitB);
1087          break;
1088        case READY_TO_MERGE:
1089        case MERGED:
1090        case MERGE_REVERTED:
1091          assert transition.getRegionInfoCount() == 3 : transition;
1092          final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1093          final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
1094          final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
1095          updateRegionMergeTransition(serverName, transition.getTransitionCode(), merged, mergeA,
1096            mergeB);
1097          break;
1098      }
1099    }
1100  }
1101
1102  public ReportRegionStateTransitionResponse reportRegionStateTransition(
1103      final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
1104    ReportRegionStateTransitionResponse.Builder builder =
1105        ReportRegionStateTransitionResponse.newBuilder();
1106    ServerName serverName = ProtobufUtil.toServerName(req.getServer());
1107    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
1108    // here we have to acquire a read lock instead of a simple exclusive lock. This is because that
1109    // we should not block other reportRegionStateTransition call from the same region server. This
1110    // is not only about performance, but also to prevent dead lock. Think of the meta region is
1111    // also on the same region server and you hold the lock which blocks the
1112    // reportRegionStateTransition for meta, and since meta is not online, you will block inside the
1113    // lock protection to wait for meta online...
1114    serverNode.readLock().lock();
1115    try {
1116      // we only accept reportRegionStateTransition if the region server is online, see the comment
1117      // above in submitServerCrash method and HBASE-21508 for more details.
1118      if (serverNode.isInState(ServerState.ONLINE)) {
1119        try {
1120          reportRegionStateTransition(builder, serverName, req.getTransitionList());
1121        } catch (PleaseHoldException e) {
1122          LOG.trace("Failed transition ", e);
1123          throw e;
1124        } catch (UnsupportedOperationException | IOException e) {
1125          // TODO: at the moment we have a single error message and the RS will abort
1126          // if the master says that one of the region transitions failed.
1127          LOG.warn("Failed transition", e);
1128          builder.setErrorMessage("Failed transition " + e.getMessage());
1129        }
1130      } else {
1131        LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
1132          serverName);
1133        builder.setErrorMessage("You are dead");
1134      }
1135    } finally {
1136      serverNode.readLock().unlock();
1137    }
1138
1139    return builder.build();
1140  }
1141
1142  private void updateRegionTransition(ServerName serverName, TransitionCode state,
1143      RegionInfo regionInfo, long seqId, long procId) throws IOException {
1144    checkMetaLoaded(regionInfo);
1145
1146    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1147    if (regionNode == null) {
1148      // the table/region is gone. maybe a delete, split, merge
1149      throw new UnexpectedStateException(String.format(
1150        "Server %s was trying to transition region %s to %s. but Region is not known.",
1151        serverName, regionInfo, state));
1152    }
1153    LOG.trace("Update region transition serverName={} region={} regionState={}", serverName,
1154      regionNode, state);
1155
1156    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
1157    regionNode.lock();
1158    try {
1159      if (!reportTransition(regionNode, serverNode, state, seqId, procId)) {
1160        // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
1161        // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
1162        // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
1163        // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
1164        // to CLOSED
1165        // These happen because on cluster shutdown, we currently let the RegionServers close
1166        // regions. This is the only time that region close is not run by the Master (so cluster
1167        // goes down fast). Consider changing it so Master runs all shutdowns.
1168        if (this.master.getServerManager().isClusterShutdown() &&
1169          state.equals(TransitionCode.CLOSED)) {
1170          LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
1171        } else {
1172          LOG.warn("No matching procedure found for {} transition on {} to {}",
1173              serverName, regionNode, state);
1174        }
1175      }
1176    } finally {
1177      regionNode.unlock();
1178    }
1179  }
1180
1181  private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
1182      TransitionCode state, long seqId, long procId) throws IOException {
1183    ServerName serverName = serverNode.getServerName();
1184    TransitRegionStateProcedure proc = regionNode.getProcedure();
1185    if (proc == null) {
1186      return false;
1187    }
1188    proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
1189      serverName, state, seqId, procId);
1190    return true;
1191  }
1192
1193  private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state,
1194      final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB)
1195      throws IOException {
1196    checkMetaLoaded(parent);
1197
1198    if (state != TransitionCode.READY_TO_SPLIT) {
1199      throw new UnexpectedStateException("unsupported split regionState=" + state +
1200        " for parent region " + parent +
1201        " maybe an old RS (< 2.0) had the operation in progress");
1202    }
1203
1204    // sanity check on the request
1205    if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) {
1206      throw new UnsupportedOperationException(
1207        "unsupported split request with bad keys: parent=" + parent +
1208        " hriA=" + hriA + " hriB=" + hriB);
1209    }
1210
1211    if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
1212      LOG.warn("Split switch is off! skip split of " + parent);
1213      throw new DoNotRetryIOException("Split region " + parent.getRegionNameAsString() +
1214          " failed due to split switch off");
1215    }
1216
1217    // Submit the Split procedure
1218    final byte[] splitKey = hriB.getStartKey();
1219    if (LOG.isDebugEnabled()) {
1220      LOG.debug("Split request from " + serverName +
1221          ", parent=" + parent + " splitKey=" + Bytes.toStringBinary(splitKey));
1222    }
1223    // Processing this report happens asynchronously from other activities which can mutate
1224    // the region state. For example, a split procedure may already be running for this parent.
1225    // A split procedure cannot succeed if the parent region is no longer open, so we can
1226    // ignore it in that case.
1227    // Note that submitting more than one split procedure for a given region is
1228    // harmless -- the split is fenced in the procedure handling -- but it would be noisy in
1229    // the logs. Only one procedure can succeed. The other procedure(s) would abort during
1230    // initialization and report failure with WARN level logging.
1231    RegionState parentState = regionStates.getRegionState(parent);
1232    if (parentState != null && parentState.isOpened()) {
1233      master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent,
1234        splitKey));
1235    } else {
1236      LOG.info("Ignoring split request from " + serverName +
1237        ", parent=" + parent + " because parent is unknown or not open");
1238      return;
1239    }
1240
1241    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
1242    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
1243      throw new UnsupportedOperationException(String.format("Split handled by the master: " +
1244        "parent=%s hriA=%s hriB=%s", parent.getShortNameToLog(), hriA, hriB));
1245    }
1246  }
1247
1248  private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state,
1249      final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
1250    checkMetaLoaded(merged);
1251
1252    if (state != TransitionCode.READY_TO_MERGE) {
1253      throw new UnexpectedStateException("Unsupported merge regionState=" + state +
1254        " for regionA=" + hriA + " regionB=" + hriB + " merged=" + merged +
1255        " maybe an old RS (< 2.0) had the operation in progress");
1256    }
1257
1258    if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
1259      LOG.warn("Merge switch is off! skip merge of regionA=" + hriA + " regionB=" + hriB);
1260      throw new DoNotRetryIOException("Merge of regionA=" + hriA + " regionB=" + hriB +
1261        " failed because merge switch is off");
1262    }
1263
1264    // Submit the Merge procedure
1265    if (LOG.isDebugEnabled()) {
1266      LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged);
1267    }
1268    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
1269
1270    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
1271    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
1272      throw new UnsupportedOperationException(String.format(
1273        "Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, merged, hriA,
1274          hriB));
1275    }
1276  }
1277
1278  // ============================================================================================
1279  //  RS Status update (report online regions) helpers
1280  // ============================================================================================
1281  /**
1282   * The master will call this method when the RS send the regionServerReport(). The report will
1283   * contains the "online regions". This method will check the the online regions against the
1284   * in-memory state of the AM, and we will log a warn message if there is a mismatch. This is
1285   * because that there is no fencing between the reportRegionStateTransition method and
1286   * regionServerReport method, so there could be race and introduce inconsistency here, but
1287   * actually there is no problem.
1288   * <p/>
1289   * Please see HBASE-21421 and HBASE-21463 for more details.
1290   */
1291  public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) {
1292    if (!isRunning()) {
1293      return;
1294    }
1295    if (LOG.isTraceEnabled()) {
1296      LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName,
1297        regionNames.size(), isMetaLoaded(),
1298        regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList()));
1299    }
1300
1301    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
1302    synchronized (serverNode) {
1303      if (!serverNode.isInState(ServerState.ONLINE)) {
1304        LOG.warn("Got a report from a server result in state " + serverNode.getState());
1305        return;
1306      }
1307    }
1308
1309    // Track the regionserver reported online regions in memory.
1310    synchronized (rsReports) {
1311      rsReports.put(serverName, regionNames);
1312    }
1313
1314    if (regionNames.isEmpty()) {
1315      // nothing to do if we don't have regions
1316      LOG.trace("no online region found on {}", serverName);
1317      return;
1318    }
1319    if (!isMetaLoaded()) {
1320      // we are still on startup, skip checking
1321      return;
1322    }
1323    // The Heartbeat tells us of what regions are on the region serve, check the state.
1324    checkOnlineRegionsReport(serverNode, regionNames);
1325  }
1326
1327  /**
1328   * Close <code>regionName</code> on <code>sn</code> silently and immediately without
1329   * using a Procedure or going via hbase:meta. For case where a RegionServer's hosting
1330   * of a Region is not aligned w/ the Master's accounting of Region state. This is for
1331   * cleaning up an error in accounting.
1332   */
1333  private void closeRegionSilently(ServerName sn, byte [] regionName) {
1334    try {
1335      RegionInfo ri = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName);
1336      // Pass -1 for timeout. Means do not wait.
1337      ServerManager.closeRegionSilentlyAndWait(this.master.getAsyncClusterConnection(), sn, ri, -1);
1338    } catch (Exception e) {
1339      LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e);
1340    }
1341  }
1342
1343  /**
1344   * Check that what the RegionServer reports aligns with the Master's image.
1345   * If disagreement, we will tell the RegionServer to expediently close
1346   * a Region we do not think it should have.
1347   */
1348  private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) {
1349    ServerName serverName = serverNode.getServerName();
1350    for (byte[] regionName : regionNames) {
1351      if (!isRunning()) {
1352        return;
1353      }
1354      RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
1355      if (regionNode == null) {
1356        String regionNameAsStr = Bytes.toStringBinary(regionName);
1357        LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...",
1358            regionNameAsStr, serverName);
1359        closeRegionSilently(serverNode.getServerName(), regionName);
1360        continue;
1361      }
1362      final long lag = 1000;
1363      regionNode.lock();
1364      try {
1365        long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate();
1366        if (regionNode.isInState(State.OPENING, State.OPEN)) {
1367          // This is possible as a region server has just closed a region but the region server
1368          // report is generated before the closing, but arrive after the closing. Make sure there
1369          // is some elapsed time so less false alarms.
1370          if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) {
1371            LOG.warn("Reporting {} server does not match {} (time since last " +
1372                    "update={}ms); closing...",
1373              serverName, regionNode, diff);
1374            closeRegionSilently(serverNode.getServerName(), regionName);
1375          }
1376        } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
1377          // So, we can get report that a region is CLOSED or SPLIT because a heartbeat
1378          // came in at about same time as a region transition. Make sure there is some
1379          // elapsed time so less false alarms.
1380          if (diff > lag) {
1381            LOG.warn("Reporting {} state does not match {} (time since last update={}ms)",
1382              serverName, regionNode, diff);
1383          }
1384        }
1385      } finally {
1386        regionNode.unlock();
1387      }
1388    }
1389  }
1390
1391  // ============================================================================================
1392  //  RIT chore
1393  // ============================================================================================
1394  private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> {
1395    public RegionInTransitionChore(final int timeoutMsec) {
1396      super(timeoutMsec);
1397    }
1398
1399    @Override
1400    protected void periodicExecute(final MasterProcedureEnv env) {
1401      final AssignmentManager am = env.getAssignmentManager();
1402
1403      final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat();
1404      if (ritStat.hasRegionsOverThreshold()) {
1405        for (RegionState hri: ritStat.getRegionOverThreshold()) {
1406          am.handleRegionOverStuckWarningThreshold(hri.getRegion());
1407        }
1408      }
1409
1410      // update metrics
1411      am.updateRegionsInTransitionMetrics(ritStat);
1412    }
1413  }
1414
1415  private static class DeadServerMetricRegionChore
1416      extends ProcedureInMemoryChore<MasterProcedureEnv> {
1417    public DeadServerMetricRegionChore(final int timeoutMsec) {
1418      super(timeoutMsec);
1419    }
1420
1421    @Override
1422    protected void periodicExecute(final MasterProcedureEnv env) {
1423      final ServerManager sm = env.getMasterServices().getServerManager();
1424      final AssignmentManager am = env.getAssignmentManager();
1425      // To minimize inconsistencies we are not going to snapshot live servers in advance in case
1426      // new servers are added; OTOH we don't want to add heavy sync for a consistent view since
1427      // this is for metrics. Instead, we're going to check each regions as we go; to avoid making
1428      // too many checks, we maintain a local lists of server, limiting us to false negatives. If
1429      // we miss some recently-dead server, we'll just see it next time.
1430      Set<ServerName> recentlyLiveServers = new HashSet<>();
1431      int deadRegions = 0, unknownRegions = 0;
1432      for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) {
1433        if (rsn.getState() != State.OPEN) {
1434          continue; // Opportunistic check, should quickly skip RITs, offline tables, etc.
1435        }
1436        // Do not need to acquire region state lock as this is only for showing metrics.
1437        ServerName sn = rsn.getRegionLocation();
1438        State state = rsn.getState();
1439        if (state != State.OPEN) {
1440          continue; // Mostly skipping RITs that are already being take care of.
1441        }
1442        if (sn == null) {
1443          ++unknownRegions; // Opened on null?
1444          continue;
1445        }
1446        if (recentlyLiveServers.contains(sn)) {
1447          continue;
1448        }
1449        ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn);
1450        switch (sls) {
1451          case LIVE:
1452            recentlyLiveServers.add(sn);
1453            break;
1454          case DEAD:
1455            ++deadRegions;
1456            break;
1457          case UNKNOWN:
1458            ++unknownRegions;
1459            break;
1460          default: throw new AssertionError("Unexpected " + sls);
1461        }
1462      }
1463      if (deadRegions > 0 || unknownRegions > 0) {
1464        LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers",
1465          deadRegions, unknownRegions);
1466      }
1467
1468      am.updateDeadServerRegionMetrics(deadRegions, unknownRegions);
1469    }
1470  }
1471
1472  public RegionInTransitionStat computeRegionInTransitionStat() {
1473    final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration());
1474    rit.update(this);
1475    return rit;
1476  }
1477
1478  public static class RegionInTransitionStat {
1479    private final int ritThreshold;
1480
1481    private HashMap<String, RegionState> ritsOverThreshold = null;
1482    private long statTimestamp;
1483    private long oldestRITTime = 0;
1484    private int totalRITsTwiceThreshold = 0;
1485    private int totalRITs = 0;
1486
1487    public RegionInTransitionStat(final Configuration conf) {
1488      this.ritThreshold =
1489        conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD);
1490    }
1491
1492    public int getRITThreshold() {
1493      return ritThreshold;
1494    }
1495
1496    public long getTimestamp() {
1497      return statTimestamp;
1498    }
1499
1500    public int getTotalRITs() {
1501      return totalRITs;
1502    }
1503
1504    public long getOldestRITTime() {
1505      return oldestRITTime;
1506    }
1507
1508    public int getTotalRITsOverThreshold() {
1509      Map<String, RegionState> m = this.ritsOverThreshold;
1510      return m != null ? m.size() : 0;
1511    }
1512
1513    public boolean hasRegionsTwiceOverThreshold() {
1514      return totalRITsTwiceThreshold > 0;
1515    }
1516
1517    public boolean hasRegionsOverThreshold() {
1518      Map<String, RegionState> m = this.ritsOverThreshold;
1519      return m != null && !m.isEmpty();
1520    }
1521
1522    public Collection<RegionState> getRegionOverThreshold() {
1523      Map<String, RegionState> m = this.ritsOverThreshold;
1524      return m != null? m.values(): Collections.emptySet();
1525    }
1526
1527    public boolean isRegionOverThreshold(final RegionInfo regionInfo) {
1528      Map<String, RegionState> m = this.ritsOverThreshold;
1529      return m != null && m.containsKey(regionInfo.getEncodedName());
1530    }
1531
1532    public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) {
1533      Map<String, RegionState> m = this.ritsOverThreshold;
1534      if (m == null) {
1535        return false;
1536      }
1537      final RegionState state = m.get(regionInfo.getEncodedName());
1538      if (state == null) {
1539        return false;
1540      }
1541      return (statTimestamp - state.getStamp()) > (ritThreshold * 2);
1542    }
1543
1544    protected void update(final AssignmentManager am) {
1545      final RegionStates regionStates = am.getRegionStates();
1546      this.statTimestamp = EnvironmentEdgeManager.currentTime();
1547      update(regionStates.getRegionsStateInTransition(), statTimestamp);
1548      update(regionStates.getRegionFailedOpen(), statTimestamp);
1549
1550      if (LOG.isDebugEnabled() && ritsOverThreshold != null && !ritsOverThreshold.isEmpty()) {
1551        LOG.debug("RITs over threshold: {}",
1552          ritsOverThreshold.entrySet().stream()
1553            .map(e -> e.getKey() + ":" + e.getValue().getState().name())
1554            .collect(Collectors.joining("\n")));
1555      }
1556    }
1557
1558    private void update(final Collection<RegionState> regions, final long currentTime) {
1559      for (RegionState state: regions) {
1560        totalRITs++;
1561        final long ritStartedMs = state.getStamp();
1562        if (ritStartedMs == 0) {
1563          // Don't output bogus values to metrics if they accidentally make it here.
1564          LOG.warn("The RIT {} has no start time", state.getRegion());
1565          continue;
1566        }
1567        final long ritTime = currentTime - ritStartedMs;
1568        if (ritTime > ritThreshold) {
1569          if (ritsOverThreshold == null) {
1570            ritsOverThreshold = new HashMap<String, RegionState>();
1571          }
1572          ritsOverThreshold.put(state.getRegion().getEncodedName(), state);
1573          totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0;
1574        }
1575        if (oldestRITTime < ritTime) {
1576          oldestRITTime = ritTime;
1577        }
1578      }
1579    }
1580  }
1581
1582  private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) {
1583    metrics.updateRITOldestAge(ritStat.getOldestRITTime());
1584    metrics.updateRITCount(ritStat.getTotalRITs());
1585    metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold());
1586  }
1587
1588  private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) {
1589    metrics.updateDeadServerOpenRegions(deadRegions);
1590    metrics.updateUnknownServerOpenRegions(unknownRegions);
1591  }
1592
1593  private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
1594    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1595    //if (regionNode.isStuck()) {
1596    LOG.warn("STUCK Region-In-Transition {}", regionNode);
1597  }
1598
1599  // ============================================================================================
1600  //  TODO: Master load/bootstrap
1601  // ============================================================================================
1602  public void joinCluster() throws IOException {
1603    long startTime = System.nanoTime();
1604    LOG.debug("Joining cluster...");
1605
1606    // Scan hbase:meta to build list of existing regions, servers, and assignment.
1607    // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress
1608    // w/o  meta.
1609    loadMeta();
1610
1611    while (master.getServerManager().countOfRegionServers() < 1) {
1612      LOG.info("Waiting for RegionServers to join; current count={}",
1613        master.getServerManager().countOfRegionServers());
1614      Threads.sleep(250);
1615    }
1616    LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers());
1617
1618    // Start the chores
1619    master.getMasterProcedureExecutor().addChore(this.ritChore);
1620    master.getMasterProcedureExecutor().addChore(this.deadMetricChore);
1621
1622    long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
1623    LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs));
1624  }
1625
1626  /**
1627   * Create assign procedure for offline regions.
1628   * Just follow the old processofflineServersWithOnlineRegions method. Since now we do not need to
1629   * deal with dead server any more, we only deal with the regions in OFFLINE state in this method.
1630   * And this is a bit strange, that for new regions, we will add it in CLOSED state instead of
1631   * OFFLINE state, and usually there will be a procedure to track them. The
1632   * processofflineServersWithOnlineRegions is a legacy from long ago, as things are going really
1633   * different now, maybe we do not need this method any more. Need to revisit later.
1634   */
1635  // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online.
1636  // Needs to be done after the table state manager has been started.
1637  public void processOfflineRegions() {
1638    TransitRegionStateProcedure[] procs =
1639      regionStates.getRegionStateNodes().stream().filter(rsn -> rsn.isInState(State.OFFLINE))
1640        .filter(rsn -> isTableEnabled(rsn.getRegionInfo().getTable())).map(rsn -> {
1641          rsn.lock();
1642          try {
1643            if (rsn.getProcedure() != null) {
1644              return null;
1645            } else {
1646              return rsn.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
1647                rsn.getRegionInfo(), null));
1648            }
1649          } finally {
1650            rsn.unlock();
1651          }
1652        }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new);
1653    if (procs.length > 0) {
1654      master.getMasterProcedureExecutor().submitProcedures(procs);
1655    }
1656  }
1657
1658  /* AM internal RegionStateStore.RegionStateVisitor implementation. To be used when
1659   * scanning META table for region rows, using RegionStateStore utility methods. RegionStateStore
1660   * methods will convert Result into proper RegionInfo instances, but those would still need to be
1661   * added into AssignmentManager.regionStates in-memory cache.
1662   * RegionMetaLoadingVisitor.visitRegionState method provides the logic for adding RegionInfo
1663   * instances as loaded from latest META scan into AssignmentManager.regionStates.
1664   */
1665  private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor  {
1666
1667    @Override
1668    public void visitRegionState(Result result, final RegionInfo regionInfo, final State state,
1669      final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
1670      if (state == null && regionLocation == null && lastHost == null &&
1671        openSeqNum == SequenceId.NO_SEQUENCE_ID) {
1672        // This is a row with nothing in it.
1673        LOG.warn("Skipping empty row={}", result);
1674        return;
1675      }
1676      State localState = state;
1677      if (localState == null) {
1678        // No region state column data in hbase:meta table! Are I doing a rolling upgrade from
1679        // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta?
1680        // In any of these cases, state is empty. For now, presume OFFLINE but there are probably
1681        // cases where we need to probe more to be sure this correct; TODO informed by experience.
1682        LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);
1683        localState = State.OFFLINE;
1684      }
1685      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
1686      // Do not need to lock on regionNode, as we can make sure that before we finish loading
1687      // meta, all the related procedures can not be executed. The only exception is for meta
1688      // region related operations, but here we do not load the informations for meta region.
1689      regionNode.setState(localState);
1690      regionNode.setLastHost(lastHost);
1691      regionNode.setRegionLocation(regionLocation);
1692      regionNode.setOpenSeqNum(openSeqNum);
1693
1694      // Note: keep consistent with other methods, see region(Opening|Opened|Closing)
1695      //       RIT/ServerCrash handling should take care of the transiting regions.
1696      if (localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING,
1697        State.MERGING)) {
1698        assert regionLocation != null : "found null region location for " + regionNode;
1699        regionStates.addRegionToServer(regionNode);
1700      } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
1701        regionStates.addToOfflineRegions(regionNode);
1702      }
1703      if (regionNode.getProcedure() != null) {
1704        regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode);
1705      }
1706    }
1707  };
1708
1709  /**
1710   * Query META if the given <code>RegionInfo</code> exists, adding to
1711   * <code>AssignmentManager.regionStateStore</code> cache if the region is found in META.
1712   * @param regionEncodedName encoded name for the region to be loaded from META into
1713   *                          <code>AssignmentManager.regionStateStore</code> cache
1714   * @return <code>RegionInfo</code> instance for the given region if it is present in META
1715   *          and got successfully loaded into <code>AssignmentManager.regionStateStore</code>
1716   *          cache, <b>null</b> otherwise.
1717   * @throws UnknownRegionException if any errors occur while querying meta.
1718   */
1719  public RegionInfo loadRegionFromMeta(String regionEncodedName) throws UnknownRegionException {
1720    try {
1721      RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor();
1722      regionStateStore.visitMetaForRegion(regionEncodedName, visitor);
1723      return regionStates.getRegionState(regionEncodedName) == null ? null :
1724        regionStates.getRegionState(regionEncodedName).getRegion();
1725    } catch (IOException e) {
1726      throw new UnknownRegionException(
1727          "Error trying to load region " + regionEncodedName + " from META", e);
1728    }
1729  }
1730
1731  private void loadMeta() throws IOException {
1732    // TODO: use a thread pool
1733    regionStateStore.visitMeta(new RegionMetaLoadingVisitor());
1734  }
1735
1736  /**
1737   * Used to check if the meta loading is done.
1738   * <p/>
1739   * if not we throw PleaseHoldException since we are rebuilding the RegionStates
1740   * @param hri region to check if it is already rebuild
1741   * @throws PleaseHoldException if meta has not been loaded yet
1742   */
1743  private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException {
1744    if (!isRunning()) {
1745      throw new PleaseHoldException("AssignmentManager not running");
1746    }
1747    boolean meta = isMetaRegion(hri);
1748    boolean metaLoaded = isMetaLoaded();
1749    if (!meta && !metaLoaded) {
1750      throw new PleaseHoldException(
1751        "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded);
1752    }
1753  }
1754
1755  // ============================================================================================
1756  //  TODO: Metrics
1757  // ============================================================================================
1758  public int getNumRegionsOpened() {
1759    // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value
1760    return 0;
1761  }
1762
1763  /**
1764   * Usually run by the Master in reaction to server crash during normal processing.
1765   * Can also be invoked via external RPC to effect repair; in the latter case,
1766   * the 'force' flag is set so we push through the SCP though context may indicate
1767   * already-running-SCP (An old SCP may have exited abnormally, or damaged cluster
1768   * may still have references in hbase:meta to 'Unknown Servers' -- servers that
1769   * are not online or in dead servers list, etc.)
1770   * @param force Set if the request came in externally over RPC (via hbck2). Force means
1771   *              run the SCP even if it seems as though there might be an outstanding
1772   *              SCP running.
1773   * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled.
1774   */
1775  public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) {
1776    // May be an 'Unknown Server' so handle case where serverNode is null.
1777    ServerStateNode serverNode = regionStates.getServerNode(serverName);
1778    // Remove the in-memory rsReports result
1779    synchronized (rsReports) {
1780      rsReports.remove(serverName);
1781    }
1782
1783    // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the
1784    // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
1785    // this server. This is used to simplify the implementation for TRSP and SCP, where we can make
1786    // sure that, the region list fetched by SCP will not be changed any more.
1787    if (serverNode != null) {
1788      serverNode.writeLock().lock();
1789    }
1790    boolean carryingMeta;
1791    long pid;
1792    try {
1793      ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
1794      carryingMeta = isCarryingMeta(serverName);
1795      if (!force && serverNode != null && !serverNode.isInState(ServerState.ONLINE)) {
1796        LOG.info("Skip adding ServerCrashProcedure for {} (meta={}) -- running?",
1797          serverNode, carryingMeta);
1798        return Procedure.NO_PROC_ID;
1799      } else {
1800        MasterProcedureEnv mpe = procExec.getEnvironment();
1801        // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead.
1802        // HBCKSCP scours Master in-memory state AND hbase;meta for references to
1803        // serverName just-in-case. An SCP that is scheduled when the server is
1804        // 'Unknown' probably originated externally with HBCK2 fix-it tool.
1805        ServerState oldState = null;
1806        if (serverNode != null) {
1807          oldState = serverNode.getState();
1808          serverNode.setState(ServerState.CRASHED);
1809        }
1810
1811        if (force) {
1812          pid = procExec.submitProcedure(
1813              new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
1814        } else {
1815          pid = procExec.submitProcedure(
1816              new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
1817        }
1818        LOG.info("Scheduled ServerCrashProcedure pid={} for {} (carryingMeta={}){}.",
1819          pid, serverName, carryingMeta,
1820          serverNode == null? "": " " + serverNode.toString() + ", oldState=" + oldState);
1821      }
1822    } finally {
1823      if (serverNode != null) {
1824        serverNode.writeLock().unlock();
1825      }
1826    }
1827    return pid;
1828  }
1829
1830  public void offlineRegion(final RegionInfo regionInfo) {
1831    // TODO used by MasterRpcServices
1832    RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
1833    if (node != null) {
1834      node.offline();
1835    }
1836  }
1837
1838  public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) {
1839    // TODO used by TestSplitTransactionOnCluster.java
1840  }
1841
1842  public Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment(
1843      final Collection<RegionInfo> regions) {
1844    return regionStates.getSnapShotOfAssignment(regions);
1845  }
1846
1847  // ============================================================================================
1848  //  TODO: UTILS/HELPERS?
1849  // ============================================================================================
1850  /**
1851   * Used by the client (via master) to identify if all regions have the schema updates
1852   * @return Pair indicating the status of the alter command (pending/total)
1853   */
1854  public Pair<Integer, Integer> getReopenStatus(TableName tableName) {
1855    if (isTableDisabled(tableName)) {
1856      return new Pair<Integer, Integer>(0, 0);
1857    }
1858
1859    final List<RegionState> states = regionStates.getTableRegionStates(tableName);
1860    int ritCount = 0;
1861    for (RegionState regionState: states) {
1862      if (!regionState.isOpened() && !regionState.isSplit()) {
1863        ritCount++;
1864      }
1865    }
1866    return new Pair<Integer, Integer>(ritCount, states.size());
1867  }
1868
1869  // ============================================================================================
1870  //  TODO: Region State In Transition
1871  // ============================================================================================
1872  public boolean hasRegionsInTransition() {
1873    return regionStates.hasRegionsInTransition();
1874  }
1875
1876  public List<RegionStateNode> getRegionsInTransition() {
1877    return regionStates.getRegionsInTransition();
1878  }
1879
1880  public List<RegionInfo> getAssignedRegions() {
1881    return regionStates.getAssignedRegions();
1882  }
1883
1884  public RegionInfo getRegionInfo(final byte[] regionName) {
1885    final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName);
1886    return regionState != null ? regionState.getRegionInfo() : null;
1887  }
1888
1889  // ============================================================================================
1890  //  Expected states on region state transition.
1891  //  Notice that there is expected states for transiting to OPENING state, this is because SCP.
1892  //  See the comments in regionOpening method for more details.
1893  // ============================================================================================
1894  private static final State[] STATES_EXPECTED_ON_OPEN = {
1895    State.OPENING, // Normal case
1896    State.OPEN // Retrying
1897  };
1898
1899  private static final State[] STATES_EXPECTED_ON_CLOSING = {
1900    State.OPEN, // Normal case
1901    State.CLOSING, // Retrying
1902    State.SPLITTING, // Offline the split parent
1903    State.MERGING // Offline the merge parents
1904  };
1905
1906  private static final State[] STATES_EXPECTED_ON_CLOSED = {
1907    State.CLOSING, // Normal case
1908    State.CLOSED // Retrying
1909  };
1910
1911  // This is for manually scheduled region assign, can add other states later if we find out other
1912  // usages
1913  private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE };
1914
1915  // We only allow unassign or move a region which is in OPEN state.
1916  private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN };
1917
1918  // ============================================================================================
1919  // Region Status update
1920  // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking
1921  // and pre-assumptions are very tricky.
1922  // ============================================================================================
1923  private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState,
1924      RegionState.State... expectedStates) throws IOException {
1925    RegionState.State state = regionNode.getState();
1926    regionNode.transitionState(newState, expectedStates);
1927    boolean succ = false;
1928    try {
1929      regionStateStore.updateRegionLocation(regionNode);
1930      succ = true;
1931    } finally {
1932      if (!succ) {
1933        // revert
1934        regionNode.setState(state);
1935      }
1936    }
1937  }
1938
1939  // should be called within the synchronized block of RegionStateNode
1940  void regionOpening(RegionStateNode regionNode) throws IOException {
1941    // As in SCP, for performance reason, there is no TRSP attached with this region, we will not
1942    // update the region state, which means that the region could be in any state when we want to
1943    // assign it after a RS crash. So here we do not pass the expectedStates parameter.
1944    transitStateAndUpdate(regionNode, State.OPENING);
1945    regionStates.addRegionToServer(regionNode);
1946    // update the operation count metrics
1947    metrics.incrementOperationCounter();
1948  }
1949
1950  // should be called under the RegionStateNode lock
1951  // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
1952  // we will persist the FAILED_OPEN state into hbase:meta.
1953  void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException {
1954    RegionState.State state = regionNode.getState();
1955    ServerName regionLocation = regionNode.getRegionLocation();
1956    if (giveUp) {
1957      regionNode.setState(State.FAILED_OPEN);
1958      regionNode.setRegionLocation(null);
1959      boolean succ = false;
1960      try {
1961        regionStateStore.updateRegionLocation(regionNode);
1962        succ = true;
1963      } finally {
1964        if (!succ) {
1965          // revert
1966          regionNode.setState(state);
1967          regionNode.setRegionLocation(regionLocation);
1968        }
1969      }
1970    }
1971    if (regionLocation != null) {
1972      regionStates.removeRegionFromServer(regionLocation, regionNode);
1973    }
1974  }
1975
1976  // should be called under the RegionStateNode lock
1977  void regionClosing(RegionStateNode regionNode) throws IOException {
1978    transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING);
1979
1980    RegionInfo hri = regionNode.getRegionInfo();
1981    // Set meta has not initialized early. so people trying to create/edit tables will wait
1982    if (isMetaRegion(hri)) {
1983      setMetaAssigned(hri, false);
1984    }
1985    regionStates.addRegionToServer(regionNode);
1986    // update the operation count metrics
1987    metrics.incrementOperationCounter();
1988  }
1989
1990  // for open and close, they will first be persist to the procedure store in
1991  // RegionRemoteProcedureBase. So here we will first change the in memory state as it is considered
1992  // as succeeded if the persistence to procedure store is succeeded, and then when the
1993  // RegionRemoteProcedureBase is woken up, we will persist the RegionStateNode to hbase:meta.
1994
1995  // should be called under the RegionStateNode lock
1996  void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
1997    regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
1998    RegionInfo regionInfo = regionNode.getRegionInfo();
1999    regionStates.addRegionToServer(regionNode);
2000    regionStates.removeFromFailedOpen(regionInfo);
2001  }
2002
2003  // should be called under the RegionStateNode lock
2004  void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
2005    ServerName regionLocation = regionNode.getRegionLocation();
2006    regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
2007    regionNode.setRegionLocation(null);
2008    if (regionLocation != null) {
2009      regionNode.setLastHost(regionLocation);
2010      regionStates.removeRegionFromServer(regionLocation, regionNode);
2011    }
2012  }
2013
2014  // should be called under the RegionStateNode lock
2015  // for SCP
2016  public void regionClosedAbnormally(RegionStateNode regionNode) throws IOException {
2017    RegionState.State state = regionNode.getState();
2018    ServerName regionLocation = regionNode.getRegionLocation();
2019    regionNode.transitionState(State.ABNORMALLY_CLOSED);
2020    regionNode.setRegionLocation(null);
2021    boolean succ = false;
2022    try {
2023      regionStateStore.updateRegionLocation(regionNode);
2024      succ = true;
2025    } finally {
2026      if (!succ) {
2027        // revert
2028        regionNode.setState(state);
2029        regionNode.setRegionLocation(regionLocation);
2030      }
2031    }
2032    if (regionLocation != null) {
2033      regionNode.setLastHost(regionLocation);
2034      regionStates.removeRegionFromServer(regionLocation, regionNode);
2035    }
2036  }
2037
2038  void persistToMeta(RegionStateNode regionNode) throws IOException {
2039    regionStateStore.updateRegionLocation(regionNode);
2040    RegionInfo regionInfo = regionNode.getRegionInfo();
2041    if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
2042      // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
2043      // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
2044      // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
2045      // on table that contains state.
2046      setMetaAssigned(regionInfo, true);
2047    }
2048  }
2049
2050  // ============================================================================================
2051  // The above methods can only be called in TransitRegionStateProcedure(and related procedures)
2052  // ============================================================================================
2053
2054  public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
2055      final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
2056    // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
2057    // The parent stays in regionStates until cleared when removed by CatalogJanitor.
2058    // Update its state in regionStates to it shows as offline and split when read
2059    // later figuring what regions are in a table and what are not: see
2060    // regionStates#getRegionsOfTable
2061    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent);
2062    node.setState(State.SPLIT);
2063    final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA);
2064    nodeA.setState(State.SPLITTING_NEW);
2065    final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB);
2066    nodeB.setState(State.SPLITTING_NEW);
2067
2068    TableDescriptor td = master.getTableDescriptors().get(parent.getTable());
2069    // TODO: here we just update the parent region info in meta, to set split and offline to true,
2070    // without changing the one in the region node. This is a bit confusing but the region info
2071    // field in RegionStateNode is not expected to be changed in the current design. Need to find a
2072    // possible way to address this problem, or at least adding more comments about the trick to
2073    // deal with this problem, that when you want to filter out split parent, you need to check both
2074    // the RegionState on whether it is split, and also the region info. If one of them matches then
2075    // it is a split parent. And usually only one of them can match, as after restart, the region
2076    // state will be changed from SPLIT to CLOSED.
2077    regionStateStore.splitRegion(parent, daughterA, daughterB, serverName, td);
2078    if (shouldAssignFavoredNodes(parent)) {
2079      List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList();
2080      getFavoredNodePromoter().generateFavoredNodesForDaughter(onlineServers, parent, daughterA,
2081        daughterB);
2082    }
2083  }
2084
2085  /**
2086   * When called here, the merge has happened. The merged regions have been
2087   * unassigned and the above markRegionClosed has been called on each so they have been
2088   * disassociated from a hosting Server. The merged region will be open after this call. The
2089   * merged regions are removed from hbase:meta below. Later they are deleted from the filesystem
2090   * by the catalog janitor running against hbase:meta. It notices when the merged region no
2091   * longer holds references to the old regions (References are deleted after a compaction
2092   * rewrites what the Reference points at but not until the archiver chore runs, are the
2093   * References removed).
2094   */
2095  public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
2096        RegionInfo [] mergeParents)
2097      throws IOException {
2098    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
2099    node.setState(State.MERGED);
2100    for (RegionInfo ri: mergeParents) {
2101      regionStates.deleteRegion(ri);
2102    }
2103    TableDescriptor td = master.getTableDescriptors().get(child.getTable());
2104    regionStateStore.mergeRegions(child, mergeParents, serverName, td);
2105    if (shouldAssignFavoredNodes(child)) {
2106      getFavoredNodePromoter().generateFavoredNodesForMergedRegion(child, mergeParents);
2107    }
2108  }
2109
2110  /*
2111   * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
2112   * belongs to a non-system table.
2113   */
2114  private boolean shouldAssignFavoredNodes(RegionInfo region) {
2115    return this.shouldAssignRegionsWithFavoredNodes &&
2116        FavoredNodesManager.isFavoredNodeApplicable(region);
2117  }
2118
2119  // ============================================================================================
2120  //  Assign Queue (Assign/Balance)
2121  // ============================================================================================
2122  private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>();
2123  private final ReentrantLock assignQueueLock = new ReentrantLock();
2124  private final Condition assignQueueFullCond = assignQueueLock.newCondition();
2125
2126  /**
2127   * Add the assign operation to the assignment queue.
2128   * The pending assignment operation will be processed,
2129   * and each region will be assigned by a server using the balancer.
2130   */
2131  protected void queueAssign(final RegionStateNode regionNode) {
2132    regionNode.getProcedureEvent().suspend();
2133
2134    // TODO: quick-start for meta and the other sys-tables?
2135    assignQueueLock.lock();
2136    try {
2137      pendingAssignQueue.add(regionNode);
2138      if (regionNode.isSystemTable() ||
2139          pendingAssignQueue.size() == 1 ||
2140          pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize) {
2141        assignQueueFullCond.signal();
2142      }
2143    } finally {
2144      assignQueueLock.unlock();
2145    }
2146  }
2147
2148  private void startAssignmentThread() {
2149    assignThread = new Thread(master.getServerName().toShortString()) {
2150      @Override
2151      public void run() {
2152        while (isRunning()) {
2153          processAssignQueue();
2154        }
2155        pendingAssignQueue.clear();
2156      }
2157    };
2158    assignThread.setDaemon(true);
2159    assignThread.start();
2160  }
2161
2162  private void stopAssignmentThread() {
2163    assignQueueSignal();
2164    try {
2165      while (assignThread.isAlive()) {
2166        assignQueueSignal();
2167        assignThread.join(250);
2168      }
2169    } catch (InterruptedException e) {
2170      LOG.warn("join interrupted", e);
2171      Thread.currentThread().interrupt();
2172    }
2173  }
2174
2175  private void assignQueueSignal() {
2176    assignQueueLock.lock();
2177    try {
2178      assignQueueFullCond.signal();
2179    } finally {
2180      assignQueueLock.unlock();
2181    }
2182  }
2183
  /**
   * Blocks the assignment thread until there is a batch to process, then drains the queue.
   * <p>
   * All of this runs while holding assignQueueLock:
   * <ol>
   * <li>If the queue is empty and the manager is running, waits for a signal.</li>
   * <li>If the manager has stopped, returns {@code null}.</li>
   * <li>Otherwise waits up to {@code assignDispatchWaitMillis} more, returning early if
   * signalled. This wait runs even when the queue was already non-empty, so more regions can
   * accumulate into the batch.</li>
   * <li>Moves every queued node into a map keyed by RegionInfo and clears the queue. If the same
   * RegionInfo was queued twice, the later node wins.</li>
   * </ol>
   *
   * @return the drained regions keyed by RegionInfo (possibly empty), or {@code null} if the
   *         manager is no longer running or the thread was interrupted while waiting
   */
  // The first await is deliberately not in a loop: after a spurious wakeup the timed wait below
  // still runs, and processAssignQueue ignores an empty result, hence the suppression.
  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
  private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() {
    HashMap<RegionInfo, RegionStateNode> regions = null;

    assignQueueLock.lock();
    try {
      // Nothing queued yet: sleep until queueAssign / assignQueueSignal wakes us.
      if (pendingAssignQueue.isEmpty() && isRunning()) {
        assignQueueFullCond.await();
      }

      if (!isRunning()) {
        return null;
      }
      // Give more regions a chance to arrive so they are placed in a single balancer call.
      assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS);
      regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size());
      for (RegionStateNode regionNode: pendingAssignQueue) {
        regions.put(regionNode.getRegionInfo(), regionNode);
      }
      pendingAssignQueue.clear();
    } catch (InterruptedException e) {
      // regions is still null here; restore the interrupt flag for the caller.
      LOG.warn("got interrupted ", e);
      Thread.currentThread().interrupt();
    } finally {
      assignQueueLock.unlock();
    }
    return regions;
  }
2211
2212  private void processAssignQueue() {
2213    final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue();
2214    if (regions == null || regions.size() == 0 || !isRunning()) {
2215      return;
2216    }
2217
2218    if (LOG.isTraceEnabled()) {
2219      LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size());
2220    }
2221
2222    // TODO: Optimize balancer. pass a RegionPlan?
2223    final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>();
2224    final List<RegionInfo> userHRIs = new ArrayList<>(regions.size());
2225    // Regions for system tables requiring reassignment
2226    final List<RegionInfo> systemHRIs = new ArrayList<>();
2227    for (RegionStateNode regionStateNode: regions.values()) {
2228      boolean sysTable = regionStateNode.isSystemTable();
2229      final List<RegionInfo> hris = sysTable? systemHRIs: userHRIs;
2230      if (regionStateNode.getRegionLocation() != null) {
2231        retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
2232      } else {
2233        hris.add(regionStateNode.getRegionInfo());
2234      }
2235    }
2236
2237    // TODO: connect with the listener to invalidate the cache
2238
2239    // TODO use events
2240    List<ServerName> servers = master.getServerManager().createDestinationServersList();
2241    for (int i = 0; servers.size() < 1; ++i) {
2242      // Report every fourth time around this loop; try not to flood log.
2243      if (i % 4 == 0) {
2244        LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions.");
2245      }
2246
2247      if (!isRunning()) {
2248        LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions.");
2249        return;
2250      }
2251      Threads.sleep(250);
2252      servers = master.getServerManager().createDestinationServersList();
2253    }
2254
2255    if (!systemHRIs.isEmpty()) {
2256      // System table regions requiring reassignment are present, get region servers
2257      // not available for system table regions
2258      final List<ServerName> excludeServers = getExcludedServersForSystemTable();
2259      List<ServerName> serversForSysTables = servers.stream()
2260          .filter(s -> !excludeServers.contains(s)).collect(Collectors.toList());
2261      if (serversForSysTables.isEmpty()) {
2262        LOG.warn("Filtering old server versions and the excluded produced an empty set; " +
2263            "instead considering all candidate servers!");
2264      }
2265      LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() +
2266          ", allServersCount=" + servers.size());
2267      processAssignmentPlans(regions, null, systemHRIs,
2268          serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs) ?
2269              servers: serversForSysTables);
2270    }
2271
2272    processAssignmentPlans(regions, retainMap, userHRIs, servers);
2273  }
2274
2275  private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions,
2276      List<RegionInfo> hirs) {
2277    for (RegionInfo ri : hirs) {
2278      if (regions.get(ri).getRegionLocation() != null &&
2279          regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME)){
2280        return true;
2281      }
2282    }
2283    return false;
2284  }
2285
2286  private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions,
2287      final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris,
2288      final List<ServerName> servers) {
2289    boolean isTraceEnabled = LOG.isTraceEnabled();
2290    if (isTraceEnabled) {
2291      LOG.trace("Available servers count=" + servers.size() + ": " + servers);
2292    }
2293
2294    final LoadBalancer balancer = getBalancer();
2295    // ask the balancer where to place regions
2296    if (retainMap != null && !retainMap.isEmpty()) {
2297      if (isTraceEnabled) {
2298        LOG.trace("retain assign regions=" + retainMap);
2299      }
2300      try {
2301        acceptPlan(regions, balancer.retainAssignment(retainMap, servers));
2302      } catch (IOException e) {
2303        LOG.warn("unable to retain assignment", e);
2304        addToPendingAssignment(regions, retainMap.keySet());
2305      }
2306    }
2307
2308    // TODO: Do we need to split retain and round-robin?
2309    // the retain seems to fallback to round-robin/random if the region is not in the map.
2310    if (!hris.isEmpty()) {
2311      Collections.sort(hris, RegionInfo.COMPARATOR);
2312      if (isTraceEnabled) {
2313        LOG.trace("round robin regions=" + hris);
2314      }
2315      try {
2316        acceptPlan(regions, balancer.roundRobinAssignment(hris, servers));
2317      } catch (IOException e) {
2318        LOG.warn("unable to round-robin assignment", e);
2319        addToPendingAssignment(regions, hris);
2320      }
2321    }
2322  }
2323
2324  private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions,
2325      final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException {
2326    final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()];
2327    final long st = EnvironmentEdgeManager.currentTime();
2328
2329    if (plan.isEmpty()) {
2330      throw new HBaseIOException("unable to compute plans for regions=" + regions.size());
2331    }
2332
2333    int evcount = 0;
2334    for (Map.Entry<ServerName, List<RegionInfo>> entry: plan.entrySet()) {
2335      final ServerName server = entry.getKey();
2336      for (RegionInfo hri: entry.getValue()) {
2337        final RegionStateNode regionNode = regions.get(hri);
2338        regionNode.setRegionLocation(server);
2339        if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) {
2340          assignQueueLock.lock();
2341          try {
2342            pendingAssignQueue.add(regionNode);
2343          } finally {
2344            assignQueueLock.unlock();
2345          }
2346        }else {
2347          events[evcount++] = regionNode.getProcedureEvent();
2348        }
2349      }
2350    }
2351    ProcedureEvent.wakeEvents(getProcedureScheduler(), events);
2352
2353    final long et = EnvironmentEdgeManager.currentTime();
2354    if (LOG.isTraceEnabled()) {
2355      LOG.trace("ASSIGN ACCEPT " + events.length + " -> " +
2356          StringUtils.humanTimeDiff(et - st));
2357    }
2358  }
2359
2360  private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions,
2361      final Collection<RegionInfo> pendingRegions) {
2362    assignQueueLock.lock();
2363    try {
2364      for (RegionInfo hri: pendingRegions) {
2365        pendingAssignQueue.add(regions.get(hri));
2366      }
2367    } finally {
2368      assignQueueLock.unlock();
2369    }
2370  }
2371
2372  /**
2373   * For a given cluster with mixed versions of servers, get a list of
2374   * servers with lower versions, where system table regions should not be
2375   * assigned to.
2376   * For system table, we must assign regions to a server with highest version.
2377   * However, we can disable this exclusion using config:
2378   * "hbase.min.version.move.system.tables" if checkForMinVersion is true.
2379   * Detailed explanation available with definition of minVersionToMoveSysTables.
2380   *
2381   * @return List of Excluded servers for System table regions.
2382   */
2383  public List<ServerName> getExcludedServersForSystemTable() {
2384    // TODO: This should be a cached list kept by the ServerManager rather than calculated on each
2385    // move or system region assign. The RegionServerTracker keeps list of online Servers with
2386    // RegionServerInfo that includes Version.
2387    List<Pair<ServerName, String>> serverList = master.getServerManager().getOnlineServersList()
2388      .stream()
2389      .map(s->new Pair<>(s, master.getRegionServerVersion(s)))
2390      .collect(Collectors.toList());
2391    if (serverList.isEmpty()) {
2392      return new ArrayList<>();
2393    }
2394    String highestVersion = Collections.max(serverList,
2395      (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond();
2396    if (!DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG.equals(minVersionToMoveSysTables)) {
2397      int comparedValue = VersionInfo.compareVersion(minVersionToMoveSysTables,
2398        highestVersion);
2399      if (comparedValue > 0) {
2400        return new ArrayList<>();
2401      }
2402    }
2403    return serverList.stream()
2404      .filter(pair -> !pair.getSecond().equals(highestVersion))
2405      .map(Pair::getFirst)
2406      .collect(Collectors.toList());
2407  }
2408
2409
2410  MasterServices getMaster() {
2411    return master;
2412  }
2413
2414  /**
2415   * @return a snapshot of rsReports
2416   */
2417  public Map<ServerName, Set<byte[]>> getRSReports() {
2418    Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>();
2419    synchronized (rsReports) {
2420      rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue()));
2421    }
2422    return rsReportsSnapshot;
2423  }
2424
2425  /**
2426   * Provide regions state count for given table.
2427   * e.g howmany regions of give table are opened/closed/rit etc
2428   *
2429   * @param tableName TableName
2430   * @return region states count
2431   */
2432  public RegionStatesCount getRegionStatesCount(TableName tableName) {
2433    int openRegionsCount = 0;
2434    int closedRegionCount = 0;
2435    int ritCount = 0;
2436    int splitRegionCount = 0;
2437    int totalRegionCount = 0;
2438    if (!isTableDisabled(tableName)) {
2439      final List<RegionState> states = regionStates.getTableRegionStates(tableName);
2440      for (RegionState regionState : states) {
2441        if (regionState.isOpened()) {
2442          openRegionsCount++;
2443        } else if (regionState.isClosed()) {
2444          closedRegionCount++;
2445        } else if (regionState.isSplit()) {
2446          splitRegionCount++;
2447        }
2448      }
2449      totalRegionCount = states.size();
2450      ritCount = totalRegionCount - openRegionsCount - splitRegionCount;
2451    }
2452    return new RegionStatesCount.RegionStatesCountBuilder()
2453      .setOpenRegions(openRegionsCount)
2454      .setClosedRegions(closedRegionCount)
2455      .setSplitRegions(splitRegionCount)
2456      .setRegionsInTransition(ritCount)
2457      .setTotalRegions(totalRegionCount)
2458      .build();
2459  }
2460
2461}