001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.Future;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.locks.Condition;
033import java.util.concurrent.locks.ReentrantLock;
034import java.util.stream.Collectors;
035import java.util.stream.Stream;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.CatalogFamilyFormat;
038import org.apache.hadoop.hbase.DoNotRetryIOException;
039import org.apache.hadoop.hbase.HBaseIOException;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.PleaseHoldException;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.UnknownRegionException;
045import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
046import org.apache.hadoop.hbase.client.MasterSwitchType;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.RegionInfoBuilder;
049import org.apache.hadoop.hbase.client.RegionReplicaUtil;
050import org.apache.hadoop.hbase.client.RegionStatesCount;
051import org.apache.hadoop.hbase.client.Result;
052import org.apache.hadoop.hbase.client.ResultScanner;
053import org.apache.hadoop.hbase.client.Scan;
054import org.apache.hadoop.hbase.client.TableDescriptor;
055import org.apache.hadoop.hbase.client.TableState;
056import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
057import org.apache.hadoop.hbase.favored.FavoredNodesManager;
058import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
059import org.apache.hadoop.hbase.master.LoadBalancer;
060import org.apache.hadoop.hbase.master.MasterServices;
061import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
062import org.apache.hadoop.hbase.master.RegionPlan;
063import org.apache.hadoop.hbase.master.RegionState;
064import org.apache.hadoop.hbase.master.RegionState.State;
065import org.apache.hadoop.hbase.master.ServerManager;
066import org.apache.hadoop.hbase.master.TableStateManager;
067import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
068import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
069import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
070import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
071import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
072import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
073import org.apache.hadoop.hbase.master.region.MasterRegion;
074import org.apache.hadoop.hbase.procedure2.Procedure;
075import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
076import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
077import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
078import org.apache.hadoop.hbase.procedure2.util.StringUtils;
079import org.apache.hadoop.hbase.regionserver.SequenceId;
080import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
081import org.apache.hadoop.hbase.util.Bytes;
082import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
083import org.apache.hadoop.hbase.util.Pair;
084import org.apache.hadoop.hbase.util.Threads;
085import org.apache.hadoop.hbase.util.VersionInfo;
086import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
087import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
088import org.apache.yetus.audience.InterfaceAudience;
089import org.apache.zookeeper.KeeperException;
090import org.slf4j.Logger;
091import org.slf4j.LoggerFactory;
092
093import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
098
099/**
100 * The AssignmentManager is the coordinator for region assign/unassign operations.
101 * <ul>
102 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
103 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
104 * </ul>
105 * Regions are created by CreateTable, Split, Merge. Regions are deleted by DeleteTable, Split,
106 * Merge. Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. Unassigns
107 * are triggered by DisableTable, Split, Merge
108 */
109@InterfaceAudience.Private
110public class AssignmentManager {
  private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);

  // TODO: AMv2
  // - handle region migration from hbase1 to hbase2.
  // - handle sys table assignment first (e.g. acl, namespace)
  // - handle table priorities
  // - If ServerBusyException trying to update hbase:meta, we abort the Master
  // See updateRegionLocation in RegionStateStore.
  //
  // See also
  // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
  // for other TODOs.

  /** Configuration key for the bootstrap thread pool size. */
  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
    "hbase.assignment.bootstrap.thread.pool.size";

  /** Configuration key for the assign dispatch wait time in milliseconds; read in the constructor. */
  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
    "hbase.assignment.dispatch.wait.msec";
  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;

  /** Configuration key for the maximum size of the assign dispatch wait queue. */
  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
    "hbase.assignment.dispatch.wait.queue.max.size";
  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;

  /** Configuration key for the interval, in milliseconds, of the region-in-transition chore. */
  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
    "hbase.assignment.rit.chore.interval.msec";
  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;

  /**
   * Configuration key for the interval, in milliseconds, of the dead region metric chore. The
   * constructor only creates that chore when the configured value is greater than zero.
   */
  public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY =
    "hbase.assignment.dead.region.metric.chore.interval.msec";
  private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000;

  /**
   * Configuration key for the maximum number of assign attempts. The constructor clamps the
   * configured value to be at least 1.
   */
  public static final String ASSIGN_MAX_ATTEMPTS = "hbase.assignment.maximum.attempts";
  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;

  /** Configuration key for the maximum number of assign attempts that are retried immediately. */
  public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS =
    "hbase.assignment.retry.immediately.maximum.attempts";
  private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3;

  /** Region in Transition metrics threshold time */
  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
    "hbase.metrics.rit.stuck.warning.threshold";
  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
  /** Message prefix used when a region is found in a state that does not allow the transition. */
  public static final String UNEXPECTED_STATE_REGION = "Unexpected state for ";
155
  // Woken by setMetaAssigned(..., true) and suspended by setMetaAssigned(..., false).
  private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
  // Woken by wakeMetaLoadedEvent() and suspended again in stop().
  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");

  private final MetricsAssignmentManager metrics;
  private final RegionInTransitionChore ritChore;
  // May be null: only created when the configured chore interval is greater than zero.
  private final DeadServerMetricRegionChore deadMetricChore;
  private final MasterServices master;

  // Flipped to true by start() and back to false by stop().
  private final AtomicBoolean running = new AtomicBoolean(false);
  // In-memory region and server states.
  private final RegionStates regionStates = new RegionStates();
  private final RegionStateStore regionStateStore;

  /**
   * When the operator uses this configuration option, any version between the current cluster
   * version and the value of "hbase.min.version.move.system.tables" does not trigger any
   * auto-region movement. Auto-region movement here refers to auto-migration of system table
   * regions to newer server versions. It is assumed that the configured range of versions does not
   * require special handling of moving system table regions to higher versioned RegionServer. This
   * auto-migration is done by {@link #checkIfShouldMoveSystemRegionAsync()}. Example: Let's assume
   * the cluster is on version 1.4.0 and we have set "hbase.min.version.move.system.tables" as
   * "2.0.0". Now if we upgrade one RegionServer on 1.4.0 cluster to 1.6.0 (< 2.0.0), then
   * AssignmentManager will not move hbase:meta, hbase:namespace and other system table regions to
   * newly brought up RegionServer 1.6.0 as part of auto-migration. However, if we upgrade one
   * RegionServer on 1.4.0 cluster to 2.2.0 (> 2.0.0), then AssignmentManager will move all system
   * table regions to newly brought up RegionServer 2.2.0 as part of auto-migration done by
   * {@link #checkIfShouldMoveSystemRegionAsync()}. "hbase.min.version.move.system.tables" is
   * introduced as part of HBASE-22923.
   */
  private final String minVersionToMoveSysTables;

  private static final String MIN_VERSION_MOVE_SYS_TABLES_CONFIG =
    "hbase.min.version.move.system.tables";
  private static final String DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG = "";

  // NOTE(review): presumably the region names last reported by each region server; the code that
  // reads and writes this map is not in this part of the file, confirm there. It is a plain
  // HashMap, so any required synchronization has to be done by its users.
  private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();

  // True when the configured load balancer class is a FavoredStochasticBalancer (or a subclass).
  private final boolean shouldAssignRegionsWithFavoredNodes;
  private final int assignDispatchWaitQueueMaxSize;
  private final int assignDispatchWaitMillis;
  // Always >= 1, see the constructor.
  private final int assignMaxAttempts;
  private final int assignRetryImmediatelyMaxAttempts;

  // Scanned in start() to load the meta region states.
  private final MasterRegion masterRegion;

  // Monitor held by the background thread spawned in checkIfShouldMoveSystemRegionAsync(), so that
  // only one such thread schedules moves at a time.
  private final Object checkIfShouldMoveSystemRegionLock = new Object();

  // NOTE(review): presumably managed by startAssignmentThread()/stopAssignmentThread(); confirm.
  private Thread assignThread;
203
  /**
   * Creates an AssignmentManager backed by a new {@link RegionStateStore} that is built from the
   * given master and master region.
   * @param master       the master services this manager works for
   * @param masterRegion the master region passed to the state store and kept by this manager
   */
  public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
    this(master, masterRegion, new RegionStateStore(master, masterRegion));
  }
207
208  AssignmentManager(MasterServices master, MasterRegion masterRegion, RegionStateStore stateStore) {
209    this.master = master;
210    this.regionStateStore = stateStore;
211    this.metrics = new MetricsAssignmentManager();
212    this.masterRegion = masterRegion;
213
214    final Configuration conf = master.getConfiguration();
215
216    // Only read favored nodes if using the favored nodes load balancer.
217    this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class
218      .isAssignableFrom(conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class));
219
220    this.assignDispatchWaitMillis =
221      conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
222    this.assignDispatchWaitQueueMaxSize =
223      conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);
224
225    this.assignMaxAttempts =
226      Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS));
227    this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS,
228      DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS);
229
230    int ritChoreInterval =
231      conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC);
232    this.ritChore = new RegionInTransitionChore(ritChoreInterval);
233
234    int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY,
235      DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC);
236    if (deadRegionChoreInterval > 0) {
237      this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval);
238    } else {
239      this.deadMetricChore = null;
240    }
241    minVersionToMoveSysTables =
242      conf.get(MIN_VERSION_MOVE_SYS_TABLES_CONFIG, DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG);
243  }
244
245  private void mirrorMetaLocations() throws IOException, KeeperException {
246    // For compatibility, mirror the meta region state to zookeeper
247    // And we still need to use zookeeper to publish the meta region locations to region
248    // server, so they can serve as ClientMetaService
249    ZKWatcher zk = master.getZooKeeper();
250    if (zk == null || !zk.getRecoverableZooKeeper().getState().isAlive()) {
251      // this is possible in tests, we do not provide a zk watcher or the zk watcher has been closed
252      return;
253    }
254    Collection<RegionStateNode> metaStates = regionStates.getRegionStateNodes();
255    for (RegionStateNode metaState : metaStates) {
256      MetaTableLocator.setMetaLocation(zk, metaState.getRegionLocation(),
257        metaState.getRegionInfo().getReplicaId(), metaState.getState());
258    }
259    int replicaCount = metaStates.size();
260    // remove extra mirror locations
261    for (String znode : zk.getMetaReplicaNodes()) {
262      int replicaId = zk.getZNodePaths().getMetaReplicaIdFromZNode(znode);
263      if (replicaId >= replicaCount) {
264        MetaTableLocator.deleteMetaLocation(zk, replicaId);
265      }
266    }
267  }
268
269  public void start() throws IOException, KeeperException {
270    if (!running.compareAndSet(false, true)) {
271      return;
272    }
273
274    LOG.trace("Starting assignment manager");
275
276    // Start the Assignment Thread
277    startAssignmentThread();
278    // load meta region states.
279    // here we are still in the early steps of active master startup. There is only one thread(us)
280    // can access AssignmentManager and create region node, so here we do not need to lock the
281    // region node.
282    try (ResultScanner scanner =
283      masterRegion.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY))) {
284      for (;;) {
285        Result result = scanner.next();
286        if (result == null) {
287          break;
288        }
289        RegionStateStore
290          .visitMetaEntry((r, regionInfo, state, regionLocation, lastHost, openSeqNum) -> {
291            RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
292            regionNode.setState(state);
293            regionNode.setLastHost(lastHost);
294            regionNode.setRegionLocation(regionLocation);
295            regionNode.setOpenSeqNum(openSeqNum);
296            if (regionNode.getProcedure() != null) {
297              regionNode.getProcedure().stateLoaded(this, regionNode);
298            }
299            if (regionLocation != null) {
300              regionStates.addRegionToServer(regionNode);
301            }
302            if (RegionReplicaUtil.isDefaultReplica(regionInfo.getReplicaId())) {
303              setMetaAssigned(regionInfo, state == State.OPEN);
304            }
305            LOG.debug("Loaded hbase:meta {}", regionNode);
306          }, result);
307      }
308    }
309    mirrorMetaLocations();
310  }
311
312  /**
313   * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode.
314   * <p>
315   * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta
316   * method below. And it is also very important as now before submitting a TRSP, we need to attach
317   * it to the RegionStateNode, which acts like a guard, so we need to restore this information at
318   * the very beginning, before we start processing any procedures.
319   */
320  public void setupRIT(List<TransitRegionStateProcedure> procs) {
321    procs.forEach(proc -> {
322      RegionInfo regionInfo = proc.getRegion();
323      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
324      TransitRegionStateProcedure existingProc = regionNode.getProcedure();
325      if (existingProc != null) {
326        // This is possible, as we will detach the procedure from the RSN before we
327        // actually finish the procedure. This is because that, we will detach the TRSP from the RSN
328        // during execution, at that time, the procedure has not been marked as done in the pv2
329        // framework yet, so it is possible that we schedule a new TRSP immediately and when
330        // arriving here, we will find out that there are multiple TRSPs for the region. But we can
331        // make sure that, only the last one can take the charge, the previous ones should have all
332        // been finished already. So here we will compare the proc id, the greater one will win.
333        if (existingProc.getProcId() < proc.getProcId()) {
334          // the new one wins, unset and set it to the new one below
335          regionNode.unsetProcedure(existingProc);
336        } else {
337          // the old one wins, skip
338          return;
339        }
340      }
341      LOG.info("Attach {} to {} to restore RIT", proc, regionNode);
342      regionNode.setProcedure(proc);
343    });
344  }
345
346  public void stop() {
347    if (!running.compareAndSet(true, false)) {
348      return;
349    }
350
351    LOG.info("Stopping assignment manager");
352
353    // The AM is started before the procedure executor,
354    // but the actual work will be loaded/submitted only once we have the executor
355    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;
356
357    // Remove the RIT chore
358    if (hasProcExecutor) {
359      master.getMasterProcedureExecutor().removeChore(this.ritChore);
360      if (this.deadMetricChore != null) {
361        master.getMasterProcedureExecutor().removeChore(this.deadMetricChore);
362      }
363    }
364
365    // Stop the Assignment Thread
366    stopAssignmentThread();
367
368    // Stop the RegionStateStore
369    regionStates.clear();
370
371    // Update meta events (for testing)
372    if (hasProcExecutor) {
373      metaLoadEvent.suspend();
374      for (RegionInfo hri : getMetaRegionSet()) {
375        setMetaAssigned(hri, false);
376      }
377    }
378  }
379
380  public boolean isRunning() {
381    return running.get();
382  }
383
384  public Configuration getConfiguration() {
385    return master.getConfiguration();
386  }
387
388  public MetricsAssignmentManager getAssignmentManagerMetrics() {
389    return metrics;
390  }
391
392  private LoadBalancer getBalancer() {
393    return master.getLoadBalancer();
394  }
395
396  private FavoredNodesPromoter getFavoredNodePromoter() {
397    return (FavoredNodesPromoter) ((RSGroupBasedLoadBalancer) master.getLoadBalancer())
398      .getInternalBalancer();
399  }
400
401  private MasterProcedureEnv getProcedureEnvironment() {
402    return master.getMasterProcedureExecutor().getEnvironment();
403  }
404
405  private MasterProcedureScheduler getProcedureScheduler() {
406    return getProcedureEnvironment().getProcedureScheduler();
407  }
408
409  int getAssignMaxAttempts() {
410    return assignMaxAttempts;
411  }
412
413  int getAssignRetryImmediatelyMaxAttempts() {
414    return assignRetryImmediatelyMaxAttempts;
415  }
416
417  public RegionStates getRegionStates() {
418    return regionStates;
419  }
420
421  /**
422   * Returns the regions hosted by the specified server.
423   * <p/>
424   * Notice that, for SCP, after we submit the SCP, no one can change the region list for the
425   * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a
426   * snapshot of the current region list for this server, which means, right after you get the
427   * region list, new regions may be moved to this server or some regions may be moved out from this
428   * server, so you should not use it critically if you need strong consistency.
429   */
430  public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
431    ServerStateNode serverInfo = regionStates.getServerNode(serverName);
432    if (serverInfo == null) {
433      return Collections.emptyList();
434    }
435    return serverInfo.getRegionInfoList();
436  }
437
438  private RegionInfo getRegionInfo(RegionStateNode rsn) {
439    if (rsn.isSplit() && !rsn.getRegionInfo().isSplit()) {
440      // see the comments in markRegionAsSplit on why we need to do this converting.
441      return RegionInfoBuilder.newBuilder(rsn.getRegionInfo()).setSplit(true).setOffline(true)
442        .build();
443    } else {
444      return rsn.getRegionInfo();
445    }
446  }
447
448  private Stream<RegionStateNode> getRegionStateNodes(TableName tableName,
449    boolean excludeOfflinedSplitParents) {
450    Stream<RegionStateNode> stream = regionStates.getTableRegionStateNodes(tableName).stream();
451    if (excludeOfflinedSplitParents) {
452      return stream.filter(rsn -> !rsn.isSplit());
453    } else {
454      return stream;
455    }
456  }
457
458  public List<RegionInfo> getTableRegions(TableName tableName,
459    boolean excludeOfflinedSplitParents) {
460    return getRegionStateNodes(tableName, excludeOfflinedSplitParents).map(this::getRegionInfo)
461      .collect(Collectors.toList());
462  }
463
464  public List<Pair<RegionInfo, ServerName>> getTableRegionsAndLocations(TableName tableName,
465    boolean excludeOfflinedSplitParents) {
466    return getRegionStateNodes(tableName, excludeOfflinedSplitParents)
467      .map(rsn -> Pair.newPair(getRegionInfo(rsn), rsn.getRegionLocation()))
468      .collect(Collectors.toList());
469  }
470
471  public RegionStateStore getRegionStateStore() {
472    return regionStateStore;
473  }
474
475  public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
476    return this.shouldAssignRegionsWithFavoredNodes
477      ? getFavoredNodePromoter().getFavoredNodes(regionInfo)
478      : ServerName.EMPTY_SERVER_LIST;
479  }
480
481  // ============================================================================================
482  // Table State Manager helpers
483  // ============================================================================================
484  private TableStateManager getTableStateManager() {
485    return master.getTableStateManager();
486  }
487
488  private boolean isTableEnabled(final TableName tableName) {
489    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
490  }
491
492  private boolean isTableDisabled(final TableName tableName) {
493    return getTableStateManager().isTableState(tableName, TableState.State.DISABLED,
494      TableState.State.DISABLING);
495  }
496
497  // ============================================================================================
498  // META Helpers
499  // ============================================================================================
500  private boolean isMetaRegion(final RegionInfo regionInfo) {
501    return regionInfo.isMetaRegion();
502  }
503
504  public boolean isMetaRegion(final byte[] regionName) {
505    return getMetaRegionFromName(regionName) != null;
506  }
507
508  public RegionInfo getMetaRegionFromName(final byte[] regionName) {
509    for (RegionInfo hri : getMetaRegionSet()) {
510      if (Bytes.equals(hri.getRegionName(), regionName)) {
511        return hri;
512      }
513    }
514    return null;
515  }
516
517  public boolean isCarryingMeta(final ServerName serverName) {
518    // TODO: handle multiple meta
519    return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
520  }
521
522  private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
523    // TODO: check for state?
524    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
525    return (node != null && serverName.equals(node.getRegionLocation()));
526  }
527
528  private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
529    // if (regionInfo.isMetaRegion()) return regionInfo;
530    // TODO: handle multiple meta. if the region provided is not meta lookup
531    // which meta the region belongs to.
532    return RegionInfoBuilder.FIRST_META_REGIONINFO;
533  }
534
535  // TODO: handle multiple meta.
536  private static final Set<RegionInfo> META_REGION_SET =
537    Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);
538
539  public Set<RegionInfo> getMetaRegionSet() {
540    return META_REGION_SET;
541  }
542
543  // ============================================================================================
544  // META Event(s) helpers
545  // ============================================================================================
546  /**
547   * Notice that, this only means the meta region is available on a RS, but the AM may still be
548   * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
549   * before checking this method, unless you can make sure that your piece of code can only be
550   * executed after AM builds the region states.
551   * @see #isMetaLoaded()
552   */
553  public boolean isMetaAssigned() {
554    return metaAssignEvent.isReady();
555  }
556
557  public boolean isMetaRegionInTransition() {
558    return !isMetaAssigned();
559  }
560
561  /**
562   * Notice that this event does not mean the AM has already finished region state rebuilding. See
563   * the comment of {@link #isMetaAssigned()} for more details.
564   * @see #isMetaAssigned()
565   */
566  public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
567    return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
568  }
569
570  private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
571    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
572    ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
573    if (assigned) {
574      metaAssignEvent.wake(getProcedureScheduler());
575    } else {
576      metaAssignEvent.suspend();
577    }
578  }
579
580  private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
581    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
582    // TODO: handle multiple meta.
583    return metaAssignEvent;
584  }
585
586  /**
587   * Wait until AM finishes the meta loading, i.e, the region states rebuilding.
588   * @see #isMetaLoaded()
589   * @see #waitMetaAssigned(Procedure, RegionInfo)
590   */
591  public boolean waitMetaLoaded(Procedure<?> proc) {
592    return metaLoadEvent.suspendIfNotReady(proc);
593  }
594
595  /**
596   * This method will be called in master initialization method after calling
597   * {@link #processOfflineRegions()}, as in processOfflineRegions we will generate assign
598   * procedures for offline regions, which may be conflict with creating table.
599   * <p/>
600   * This is a bit dirty, should be reconsidered after we decide whether to keep the
601   * {@link #processOfflineRegions()} method.
602   */
603  public void wakeMetaLoadedEvent() {
604    metaLoadEvent.wake(getProcedureScheduler());
605    assert isMetaLoaded() : "expected meta to be loaded";
606  }
607
608  /**
609   * Return whether AM finishes the meta loading, i.e, the region states rebuilding.
610   * @see #isMetaAssigned()
611   * @see #waitMetaLoaded(Procedure)
612   */
613  public boolean isMetaLoaded() {
614    return metaLoadEvent.isReady();
615  }
616
617  /**
618   * Start a new thread to check if there are region servers whose versions are higher than others.
619   * If so, move all system table regions to RS with the highest version to keep compatibility. The
620   * reason is, RS in new version may not be able to access RS in old version when there are some
621   * incompatible changes.
622   * <p>
623   * This method is called when a new RegionServer is added to cluster only.
624   * </p>
625   */
626  public void checkIfShouldMoveSystemRegionAsync() {
627    // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
628    // it should 'move' the system tables from the old server to the new server but
629    // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
630    if (this.master.getServerManager().countOfRegionServers() <= 1) {
631      return;
632    }
633    // This thread used to run whenever there was a change in the cluster. The ZooKeeper
634    // childrenChanged notification came in before the nodeDeleted message and so this method
635    // cold run before a ServerCrashProcedure could run. That meant that this thread could see
636    // a Crashed Server before ServerCrashProcedure and it could find system regions on the
637    // crashed server and go move them before ServerCrashProcedure had a chance; could be
638    // dataloss too if WALs were not recovered.
639    new Thread(() -> {
640      try {
641        synchronized (checkIfShouldMoveSystemRegionLock) {
642          List<RegionPlan> plans = new ArrayList<>();
643          // TODO: I don't think this code does a good job if all servers in cluster have same
644          // version. It looks like it will schedule unnecessary moves.
645          for (ServerName server : getExcludedServersForSystemTable()) {
646            if (master.getServerManager().isServerDead(server)) {
647              // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
648              // considers only online servers, the server could be queued for dead server
649              // processing. As region assignments for crashed server is handled by
650              // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
651              // regular flow of LoadBalancer as a favored node and not to have this special
652              // handling.
653              continue;
654            }
655            List<RegionInfo> regionsShouldMove = getSystemTables(server);
656            if (!regionsShouldMove.isEmpty()) {
657              for (RegionInfo regionInfo : regionsShouldMove) {
658                // null value for dest forces destination server to be selected by balancer
659                RegionPlan plan = new RegionPlan(regionInfo, server, null);
660                if (regionInfo.isMetaRegion()) {
661                  // Must move meta region first.
662                  LOG.info("Async MOVE of {} to newer Server={}", regionInfo.getEncodedName(),
663                    server);
664                  moveAsync(plan);
665                } else {
666                  plans.add(plan);
667                }
668              }
669            }
670            for (RegionPlan plan : plans) {
671              LOG.info("Async MOVE of {} to newer Server={}", plan.getRegionInfo().getEncodedName(),
672                server);
673              moveAsync(plan);
674            }
675          }
676        }
677      } catch (Throwable t) {
678        LOG.error(t.toString(), t);
679      }
680    }).start();
681  }
682
683  private List<RegionInfo> getSystemTables(ServerName serverName) {
684    ServerStateNode serverNode = regionStates.getServerNode(serverName);
685    if (serverNode == null) {
686      return Collections.emptyList();
687    }
688    return serverNode.getSystemRegionInfoList();
689  }
690
691  private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
692    throws HBaseIOException {
693    if (regionNode.getProcedure() != null) {
694      throw new HBaseIOException(
695        regionNode + " is currently in transition; pid=" + regionNode.getProcedure().getProcId());
696    }
697    if (!regionNode.isInState(expectedStates)) {
698      throw new DoNotRetryRegionException(UNEXPECTED_STATE_REGION + regionNode);
699    }
700    if (isTableDisabled(regionNode.getTable())) {
701      throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
702    }
703  }
704
705  /**
706   * Create an assign TransitRegionStateProcedure. Makes sure of RegionState. Throws exception if
707   * not appropriate UNLESS override is set. Used by hbck2 but also by straightline
708   * {@link #assign(RegionInfo, ServerName)} and {@link #assignAsync(RegionInfo, ServerName)}.
709   * @see #createAssignProcedure(RegionStateNode, ServerName) for a version that does NO checking
710   *      used when only when no checking needed.
711   * @param override If false, check RegionState is appropriate for assign; if not throw exception.
712   */
713  private TransitRegionStateProcedure createAssignProcedure(RegionInfo regionInfo, ServerName sn,
714    boolean override) throws IOException {
715    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
716    regionNode.lock();
717    try {
718      if (override) {
719        if (regionNode.getProcedure() != null) {
720          regionNode.unsetProcedure(regionNode.getProcedure());
721        }
722      } else {
723        preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
724      }
725      assert regionNode.getProcedure() == null;
726      return regionNode.setProcedure(
727        TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn));
728    } finally {
729      regionNode.unlock();
730    }
731  }
732
733  /**
734   * Create an assign TransitRegionStateProcedure. Does NO checking of RegionState. Presumes
735   * appriopriate state ripe for assign.
736   * @see #createAssignProcedure(RegionInfo, ServerName, boolean)
737   */
738  private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode,
739    ServerName targetServer) {
740    regionNode.lock();
741    try {
742      return regionNode.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
743        regionNode.getRegionInfo(), targetServer));
744    } finally {
745      regionNode.unlock();
746    }
747  }
748
749  public long assign(RegionInfo regionInfo, ServerName sn) throws IOException {
750    TransitRegionStateProcedure proc = createAssignProcedure(regionInfo, sn, false);
751    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
752    return proc.getProcId();
753  }
754
755  public long assign(RegionInfo regionInfo) throws IOException {
756    return assign(regionInfo, null);
757  }
758
759  /**
760   * Submits a procedure that assigns a region to a target server without waiting for it to finish
761   * @param regionInfo the region we would like to assign
762   * @param sn         target server name
763   */
764  public Future<byte[]> assignAsync(RegionInfo regionInfo, ServerName sn) throws IOException {
765    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(),
766      createAssignProcedure(regionInfo, sn, false));
767  }
768
769  /**
770   * Submits a procedure that assigns a region without waiting for it to finish
771   * @param regionInfo the region we would like to assign
772   */
773  public Future<byte[]> assignAsync(RegionInfo regionInfo) throws IOException {
774    return assignAsync(regionInfo, null);
775  }
776
777  public long unassign(RegionInfo regionInfo) throws IOException {
778    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
779    if (regionNode == null) {
780      throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
781    }
782    TransitRegionStateProcedure proc;
783    regionNode.lock();
784    try {
785      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
786      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
787      regionNode.setProcedure(proc);
788    } finally {
789      regionNode.unlock();
790    }
791    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
792    return proc.getProcId();
793  }
794
795  public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
796    ServerName targetServer) throws HBaseIOException {
797    RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
798    if (regionNode == null) {
799      throw new UnknownRegionException(
800        "No RegionStateNode found for " + regionInfo.getEncodedName() + "(Closed/Deleted?)");
801    }
802    TransitRegionStateProcedure proc;
803    regionNode.lock();
804    try {
805      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
806      regionNode.checkOnline();
807      proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
808      regionNode.setProcedure(proc);
809    } finally {
810      regionNode.unlock();
811    }
812    return proc;
813  }
814
815  public void move(RegionInfo regionInfo) throws IOException {
816    TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null);
817    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
818  }
819
820  public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException {
821    TransitRegionStateProcedure proc =
822      createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination());
823    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
824  }
825
826  public Future<byte[]> balance(RegionPlan regionPlan) throws HBaseIOException {
827    ServerName current =
828      this.getRegionStates().getRegionAssignments().get(regionPlan.getRegionInfo());
829    if (current == null || !current.equals(regionPlan.getSource())) {
830      LOG.debug("Skip region plan {}, source server not match, current region location is {}",
831        regionPlan, current == null ? "(null)" : current);
832      return null;
833    }
834    return moveAsync(regionPlan);
835  }
836
837  // ============================================================================================
838  // RegionTransition procedures helpers
839  // ============================================================================================
840
841  /**
842   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
843   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
844   *         to populate the assigns with targets chosen using round-robin (default balancer
845   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
846   *         AssignProcedure will ask the balancer for a new target, and so on.
847   */
848  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris,
849    List<ServerName> serversToExclude) {
850    if (hris.isEmpty()) {
851      return new TransitRegionStateProcedure[0];
852    }
853
854    if (
855      serversToExclude != null && this.master.getServerManager().getOnlineServersList().size() == 1
856    ) {
857      LOG.debug("Only one region server found and hence going ahead with the assignment");
858      serversToExclude = null;
859    }
860    try {
861      // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
862      // a better job if it has all the assignments in the one lump.
863      Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
864        this.master.getServerManager().createDestinationServersList(serversToExclude));
865      // Return mid-method!
866      return createAssignProcedures(assignments);
867    } catch (IOException hioe) {
868      LOG.warn("Failed roundRobinAssignment", hioe);
869    }
870    // If an error above, fall-through to this simpler assign. Last resort.
871    return createAssignProcedures(hris);
872  }
873
874  /**
875   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
876   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
877   *         to populate the assigns with targets chosen using round-robin (default balancer
878   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
879   *         AssignProcedure will ask the balancer for a new target, and so on.
880   */
881  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
882    return createRoundRobinAssignProcedures(hris, null);
883  }
884
885  static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
886    if (left.getRegion().isMetaRegion()) {
887      if (right.getRegion().isMetaRegion()) {
888        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
889      }
890      return -1;
891    } else if (right.getRegion().isMetaRegion()) {
892      return +1;
893    }
894    if (left.getRegion().getTable().isSystemTable()) {
895      if (right.getRegion().getTable().isSystemTable()) {
896        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
897      }
898      return -1;
899    } else if (right.getRegion().getTable().isSystemTable()) {
900      return +1;
901    }
902    return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
903  }
904
905  /**
906   * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server. This
907   * method is called from HBCK2.
908   * @return an assign or null
909   */
910  public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo ri, boolean override) {
911    TransitRegionStateProcedure trsp = null;
912    try {
913      trsp = createAssignProcedure(ri, null, override);
914    } catch (IOException ioe) {
915      LOG.info(
916        "Failed {} assign, override={}"
917          + (override ? "" : "; set override to by-pass state checks."),
918        ri.getEncodedName(), override, ioe);
919    }
920    return trsp;
921  }
922
923  /**
924   * Create one TransitRegionStateProcedure to unassign a region. This method is called from HBCK2.
925   * @return an unassign or null
926   */
927  public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo ri, boolean override) {
928    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(ri);
929    TransitRegionStateProcedure trsp = null;
930    regionNode.lock();
931    try {
932      if (override) {
933        if (regionNode.getProcedure() != null) {
934          regionNode.unsetProcedure(regionNode.getProcedure());
935        }
936      } else {
937        // This is where we could throw an exception; i.e. override is false.
938        preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
939      }
940      assert regionNode.getProcedure() == null;
941      trsp =
942        TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionNode.getRegionInfo());
943      regionNode.setProcedure(trsp);
944    } catch (IOException ioe) {
945      // 'override' must be false here.
946      LOG.info("Failed {} unassign, override=false; set override to by-pass state checks.",
947        ri.getEncodedName(), ioe);
948    } finally {
949      regionNode.unlock();
950    }
951    return trsp;
952  }
953
954  /**
955   * Create an array of TransitRegionStateProcedure w/o specifying a target server. Used as fallback
956   * of caller is unable to do {@link #createAssignProcedures(Map)}.
957   * <p/>
958   * If no target server, at assign time, we will try to use the former location of the region if
959   * one exists. This is how we 'retain' the old location across a server restart.
960   * <p/>
961   * Should only be called when you can make sure that no one can touch these regions other than
962   * you. For example, when you are creating or enabling table. Presumes all Regions are in
963   * appropriate state ripe for assign; no checking of Region state is done in here.
964   * @see #createAssignProcedures(Map)
965   */
966  public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) {
967    return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
968      .map(regionNode -> createAssignProcedure(regionNode, null)).sorted(AssignmentManager::compare)
969      .toArray(TransitRegionStateProcedure[]::new);
970  }
971
972  /**
973   * Tied to {@link #createAssignProcedures(List)} in that it is called if caller is unable to run
974   * this method. Presumes all Regions are in appropriate state ripe for assign; no checking of
975   * Region state is done in here.
976   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
977   * @return Assignments made from the passed in <code>assignments</code>
978   * @see #createAssignProcedures(List)
979   */
980  private TransitRegionStateProcedure[]
981    createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments) {
982    return assignments.entrySet().stream()
983      .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
984        .map(regionNode -> createAssignProcedure(regionNode, e.getKey())))
985      .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
986  }
987
988  // for creating unassign TRSP when disabling a table or closing excess region replicas
989  private TransitRegionStateProcedure forceCreateUnssignProcedure(RegionStateNode regionNode) {
990    regionNode.lock();
991    try {
992      if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
993        return null;
994      }
995      // in general, a split parent should be in CLOSED or SPLIT state, but anyway, let's check it
996      // here for safety
997      if (regionNode.getRegionInfo().isSplit()) {
998        LOG.warn("{} is a split parent but not in CLOSED or SPLIT state", regionNode);
999        return null;
1000      }
1001      // As in DisableTableProcedure or ModifyTableProcedure, we will hold the xlock for table, so
1002      // we can make sure that this procedure has not been executed yet, as TRSP will hold the
1003      // shared lock for table all the time. So here we will unset it and when it is actually
1004      // executed, it will find that the attach procedure is not itself and quit immediately.
1005      if (regionNode.getProcedure() != null) {
1006        regionNode.unsetProcedure(regionNode.getProcedure());
1007      }
1008      return regionNode.setProcedure(TransitRegionStateProcedure.unassign(getProcedureEnvironment(),
1009        regionNode.getRegionInfo()));
1010    } finally {
1011      regionNode.unlock();
1012    }
1013  }
1014
1015  /**
1016   * Called by DisableTableProcedure to unassign all the regions for a table.
1017   */
1018  public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) {
1019    return regionStates.getTableRegionStateNodes(tableName).stream()
1020      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
1021      .toArray(TransitRegionStateProcedure[]::new);
1022  }
1023
1024  /**
1025   * Called by ModifyTableProcedures to unassign all the excess region replicas for a table.
1026   */
1027  public TransitRegionStateProcedure[] createUnassignProceduresForClosingExcessRegionReplicas(
1028    TableName tableName, int newReplicaCount) {
1029    return regionStates.getTableRegionStateNodes(tableName).stream()
1030      .filter(regionNode -> regionNode.getRegionInfo().getReplicaId() >= newReplicaCount)
1031      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
1032      .toArray(TransitRegionStateProcedure[]::new);
1033  }
1034
1035  public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
1036    final byte[] splitKey) throws IOException {
1037    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
1038  }
1039
1040  public MergeTableRegionsProcedure createMergeProcedure(RegionInfo... ris) throws IOException {
1041    return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
1042  }
1043
1044  /**
1045   * Delete the region states. This is called by "DeleteTable"
1046   */
1047  public void deleteTable(final TableName tableName) throws IOException {
1048    final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
1049    regionStateStore.deleteRegions(regions);
1050    for (int i = 0; i < regions.size(); ++i) {
1051      final RegionInfo regionInfo = regions.get(i);
1052      // we expect the region to be offline
1053      regionStates.removeFromOfflineRegions(regionInfo);
1054      regionStates.deleteRegion(regionInfo);
1055    }
1056  }
1057
1058  // ============================================================================================
1059  // RS Region Transition Report helpers
1060  // ============================================================================================
1061  private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
1062    ServerName serverName, List<RegionStateTransition> transitionList) throws IOException {
1063    for (RegionStateTransition transition : transitionList) {
1064      switch (transition.getTransitionCode()) {
1065        case OPENED:
1066        case FAILED_OPEN:
1067        case CLOSED:
1068          assert transition.getRegionInfoCount() == 1 : transition;
1069          final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1070          long procId =
1071            transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
1072          updateRegionTransition(serverName, transition.getTransitionCode(), hri,
1073            transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId);
1074          break;
1075        case READY_TO_SPLIT:
1076        case SPLIT:
1077        case SPLIT_REVERTED:
1078          assert transition.getRegionInfoCount() == 3 : transition;
1079          final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1080          final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
1081          final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
1082          updateRegionSplitTransition(serverName, transition.getTransitionCode(), parent, splitA,
1083            splitB);
1084          break;
1085        case READY_TO_MERGE:
1086        case MERGED:
1087        case MERGE_REVERTED:
1088          assert transition.getRegionInfoCount() == 3 : transition;
1089          final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1090          final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
1091          final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
1092          updateRegionMergeTransition(serverName, transition.getTransitionCode(), merged, mergeA,
1093            mergeB);
1094          break;
1095      }
1096    }
1097  }
1098
1099  public ReportRegionStateTransitionResponse reportRegionStateTransition(
1100    final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
1101    ReportRegionStateTransitionResponse.Builder builder =
1102      ReportRegionStateTransitionResponse.newBuilder();
1103    ServerName serverName = ProtobufUtil.toServerName(req.getServer());
1104    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
1105    // here we have to acquire a read lock instead of a simple exclusive lock. This is because that
1106    // we should not block other reportRegionStateTransition call from the same region server. This
1107    // is not only about performance, but also to prevent dead lock. Think of the meta region is
1108    // also on the same region server and you hold the lock which blocks the
1109    // reportRegionStateTransition for meta, and since meta is not online, you will block inside the
1110    // lock protection to wait for meta online...
1111    serverNode.readLock().lock();
1112    try {
1113      // we only accept reportRegionStateTransition if the region server is online, see the comment
1114      // above in submitServerCrash method and HBASE-21508 for more details.
1115      if (serverNode.isInState(ServerState.ONLINE)) {
1116        try {
1117          reportRegionStateTransition(builder, serverName, req.getTransitionList());
1118        } catch (PleaseHoldException e) {
1119          LOG.trace("Failed transition ", e);
1120          throw e;
1121        } catch (UnsupportedOperationException | IOException e) {
1122          // TODO: at the moment we have a single error message and the RS will abort
1123          // if the master says that one of the region transitions failed.
1124          LOG.warn("Failed transition", e);
1125          builder.setErrorMessage("Failed transition " + e.getMessage());
1126        }
1127      } else {
1128        LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
1129          serverName);
1130        builder.setErrorMessage("You are dead");
1131      }
1132    } finally {
1133      serverNode.readLock().unlock();
1134    }
1135
1136    return builder.build();
1137  }
1138
1139  private void updateRegionTransition(ServerName serverName, TransitionCode state,
1140    RegionInfo regionInfo, long seqId, long procId) throws IOException {
1141    checkMetaLoaded(regionInfo);
1142
1143    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1144    if (regionNode == null) {
1145      // the table/region is gone. maybe a delete, split, merge
1146      throw new UnexpectedStateException(String.format(
1147        "Server %s was trying to transition region %s to %s. but Region is not known.", serverName,
1148        regionInfo, state));
1149    }
1150    LOG.trace("Update region transition serverName={} region={} regionState={}", serverName,
1151      regionNode, state);
1152
1153    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
1154    regionNode.lock();
1155    try {
1156      if (!reportTransition(regionNode, serverNode, state, seqId, procId)) {
1157        // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
1158        // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
1159        // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
1160        // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
1161        // to CLOSED
1162        // These happen because on cluster shutdown, we currently let the RegionServers close
1163        // regions. This is the only time that region close is not run by the Master (so cluster
1164        // goes down fast). Consider changing it so Master runs all shutdowns.
1165        if (
1166          this.master.getServerManager().isClusterShutdown() && state.equals(TransitionCode.CLOSED)
1167        ) {
1168          LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
1169        } else {
1170          LOG.warn("No matching procedure found for {} transition on {} to {}", serverName,
1171            regionNode, state);
1172        }
1173      }
1174    } finally {
1175      regionNode.unlock();
1176    }
1177  }
1178
1179  private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
1180    TransitionCode state, long seqId, long procId) throws IOException {
1181    ServerName serverName = serverNode.getServerName();
1182    TransitRegionStateProcedure proc = regionNode.getProcedure();
1183    if (proc == null) {
1184      return false;
1185    }
1186    proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
1187      serverName, state, seqId, procId);
1188    return true;
1189  }
1190
1191  private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state,
1192    final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
1193    checkMetaLoaded(parent);
1194
1195    if (state != TransitionCode.READY_TO_SPLIT) {
1196      throw new UnexpectedStateException(
1197        "unsupported split regionState=" + state + " for parent region " + parent
1198          + " maybe an old RS (< 2.0) had the operation in progress");
1199    }
1200
1201    // sanity check on the request
1202    if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) {
1203      throw new UnsupportedOperationException("unsupported split request with bad keys: parent="
1204        + parent + " hriA=" + hriA + " hriB=" + hriB);
1205    }
1206
1207    if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
1208      LOG.warn("Split switch is off! skip split of " + parent);
1209      throw new DoNotRetryIOException(
1210        "Split region " + parent.getRegionNameAsString() + " failed due to split switch off");
1211    }
1212
1213    // Submit the Split procedure
1214    final byte[] splitKey = hriB.getStartKey();
1215    if (LOG.isDebugEnabled()) {
1216      LOG.debug("Split request from " + serverName + ", parent=" + parent + " splitKey="
1217        + Bytes.toStringBinary(splitKey));
1218    }
1219    // Processing this report happens asynchronously from other activities which can mutate
1220    // the region state. For example, a split procedure may already be running for this parent.
1221    // A split procedure cannot succeed if the parent region is no longer open, so we can
1222    // ignore it in that case.
1223    // Note that submitting more than one split procedure for a given region is
1224    // harmless -- the split is fenced in the procedure handling -- but it would be noisy in
1225    // the logs. Only one procedure can succeed. The other procedure(s) would abort during
1226    // initialization and report failure with WARN level logging.
1227    RegionState parentState = regionStates.getRegionState(parent);
1228    if (parentState != null && parentState.isOpened()) {
1229      master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
1230    } else {
1231      LOG.info("Ignoring split request from " + serverName + ", parent=" + parent
1232        + " because parent is unknown or not open");
1233      return;
1234    }
1235
1236    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
1237    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
1238      throw new UnsupportedOperationException(
1239        String.format("Split handled by the master: " + "parent=%s hriA=%s hriB=%s",
1240          parent.getShortNameToLog(), hriA, hriB));
1241    }
1242  }
1243
1244  private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state,
1245    final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
1246    checkMetaLoaded(merged);
1247
1248    if (state != TransitionCode.READY_TO_MERGE) {
1249      throw new UnexpectedStateException(
1250        "Unsupported merge regionState=" + state + " for regionA=" + hriA + " regionB=" + hriB
1251          + " merged=" + merged + " maybe an old RS (< 2.0) had the operation in progress");
1252    }
1253
1254    if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
1255      LOG.warn("Merge switch is off! skip merge of regionA=" + hriA + " regionB=" + hriB);
1256      throw new DoNotRetryIOException(
1257        "Merge of regionA=" + hriA + " regionB=" + hriB + " failed because merge switch is off");
1258    }
1259
1260    // Submit the Merge procedure
1261    if (LOG.isDebugEnabled()) {
1262      LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged);
1263    }
1264    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
1265
1266    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
1267    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
1268      throw new UnsupportedOperationException(
1269        String.format("Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state,
1270          merged, hriA, hriB));
1271    }
1272  }
1273
1274  // ============================================================================================
1275  // RS Status update (report online regions) helpers
1276  // ============================================================================================
1277  /**
1278   * The master will call this method when the RS send the regionServerReport(). The report will
1279   * contains the "online regions". This method will check the the online regions against the
1280   * in-memory state of the AM, and we will log a warn message if there is a mismatch. This is
1281   * because that there is no fencing between the reportRegionStateTransition method and
1282   * regionServerReport method, so there could be race and introduce inconsistency here, but
1283   * actually there is no problem.
1284   * <p/>
1285   * Please see HBASE-21421 and HBASE-21463 for more details.
1286   */
1287  public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) {
1288    if (!isRunning()) {
1289      return;
1290    }
1291    if (LOG.isTraceEnabled()) {
1292      LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName,
1293        regionNames.size(), isMetaLoaded(),
1294        regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList()));
1295    }
1296
1297    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
1298    synchronized (serverNode) {
1299      if (!serverNode.isInState(ServerState.ONLINE)) {
1300        LOG.warn("Got a report from a server result in state " + serverNode.getState());
1301        return;
1302      }
1303    }
1304
1305    // Track the regionserver reported online regions in memory.
1306    synchronized (rsReports) {
1307      rsReports.put(serverName, regionNames);
1308    }
1309
1310    if (regionNames.isEmpty()) {
1311      // nothing to do if we don't have regions
1312      LOG.trace("no online region found on {}", serverName);
1313      return;
1314    }
1315    if (!isMetaLoaded()) {
1316      // we are still on startup, skip checking
1317      return;
1318    }
1319    // The Heartbeat tells us of what regions are on the region serve, check the state.
1320    checkOnlineRegionsReport(serverNode, regionNames);
1321  }
1322
1323  /**
1324   * Close <code>regionName</code> on <code>sn</code> silently and immediately without using a
1325   * Procedure or going via hbase:meta. For case where a RegionServer's hosting of a Region is not
1326   * aligned w/ the Master's accounting of Region state. This is for cleaning up an error in
1327   * accounting.
1328   */
1329  private void closeRegionSilently(ServerName sn, byte[] regionName) {
1330    try {
1331      RegionInfo ri = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName);
1332      // Pass -1 for timeout. Means do not wait.
1333      ServerManager.closeRegionSilentlyAndWait(this.master.getAsyncClusterConnection(), sn, ri, -1);
1334    } catch (Exception e) {
1335      LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e);
1336    }
1337  }
1338
1339  /**
1340   * Check that what the RegionServer reports aligns with the Master's image. If disagreement, we
1341   * will tell the RegionServer to expediently close a Region we do not think it should have.
1342   */
1343  private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) {
1344    ServerName serverName = serverNode.getServerName();
1345    for (byte[] regionName : regionNames) {
1346      if (!isRunning()) {
1347        return;
1348      }
1349      RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
1350      if (regionNode == null) {
1351        String regionNameAsStr = Bytes.toStringBinary(regionName);
1352        LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...", regionNameAsStr,
1353          serverName);
1354        closeRegionSilently(serverNode.getServerName(), regionName);
1355        continue;
1356      }
1357      final long lag = 1000;
1358      regionNode.lock();
1359      try {
1360        long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate();
1361        if (regionNode.isInState(State.OPENING, State.OPEN)) {
1362          // This is possible as a region server has just closed a region but the region server
1363          // report is generated before the closing, but arrive after the closing. Make sure there
1364          // is some elapsed time so less false alarms.
1365          if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) {
1366            LOG.warn("Reporting {} server does not match {} (time since last "
1367              + "update={}ms); closing...", serverName, regionNode, diff);
1368            closeRegionSilently(serverNode.getServerName(), regionName);
1369          }
1370        } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
1371          // So, we can get report that a region is CLOSED or SPLIT because a heartbeat
1372          // came in at about same time as a region transition. Make sure there is some
1373          // elapsed time so less false alarms.
1374          if (diff > lag) {
1375            LOG.warn("Reporting {} state does not match {} (time since last update={}ms)",
1376              serverName, regionNode, diff);
1377          }
1378        }
1379      } finally {
1380        regionNode.unlock();
1381      }
1382    }
1383  }
1384
1385  // ============================================================================================
1386  // RIT chore
1387  // ============================================================================================
1388  private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> {
1389    public RegionInTransitionChore(final int timeoutMsec) {
1390      super(timeoutMsec);
1391    }
1392
1393    @Override
1394    protected void periodicExecute(final MasterProcedureEnv env) {
1395      final AssignmentManager am = env.getAssignmentManager();
1396
1397      final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat();
1398      if (ritStat.hasRegionsOverThreshold()) {
1399        for (RegionState hri : ritStat.getRegionOverThreshold()) {
1400          am.handleRegionOverStuckWarningThreshold(hri.getRegion());
1401        }
1402      }
1403
1404      // update metrics
1405      am.updateRegionsInTransitionMetrics(ritStat);
1406    }
1407  }
1408
1409  private static class DeadServerMetricRegionChore
1410    extends ProcedureInMemoryChore<MasterProcedureEnv> {
1411    public DeadServerMetricRegionChore(final int timeoutMsec) {
1412      super(timeoutMsec);
1413    }
1414
1415    @Override
1416    protected void periodicExecute(final MasterProcedureEnv env) {
1417      final ServerManager sm = env.getMasterServices().getServerManager();
1418      final AssignmentManager am = env.getAssignmentManager();
1419      // To minimize inconsistencies we are not going to snapshot live servers in advance in case
1420      // new servers are added; OTOH we don't want to add heavy sync for a consistent view since
1421      // this is for metrics. Instead, we're going to check each regions as we go; to avoid making
1422      // too many checks, we maintain a local lists of server, limiting us to false negatives. If
1423      // we miss some recently-dead server, we'll just see it next time.
1424      Set<ServerName> recentlyLiveServers = new HashSet<>();
1425      int deadRegions = 0, unknownRegions = 0;
1426      for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) {
1427        if (rsn.getState() != State.OPEN) {
1428          continue; // Opportunistic check, should quickly skip RITs, offline tables, etc.
1429        }
1430        // Do not need to acquire region state lock as this is only for showing metrics.
1431        ServerName sn = rsn.getRegionLocation();
1432        State state = rsn.getState();
1433        if (state != State.OPEN) {
1434          continue; // Mostly skipping RITs that are already being take care of.
1435        }
1436        if (sn == null) {
1437          ++unknownRegions; // Opened on null?
1438          continue;
1439        }
1440        if (recentlyLiveServers.contains(sn)) {
1441          continue;
1442        }
1443        ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn);
1444        switch (sls) {
1445          case LIVE:
1446            recentlyLiveServers.add(sn);
1447            break;
1448          case DEAD:
1449            ++deadRegions;
1450            break;
1451          case UNKNOWN:
1452            ++unknownRegions;
1453            break;
1454          default:
1455            throw new AssertionError("Unexpected " + sls);
1456        }
1457      }
1458      if (deadRegions > 0 || unknownRegions > 0) {
1459        LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers",
1460          deadRegions, unknownRegions);
1461      }
1462
1463      am.updateDeadServerRegionMetrics(deadRegions, unknownRegions);
1464    }
1465  }
1466
1467  public RegionInTransitionStat computeRegionInTransitionStat() {
1468    final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration());
1469    rit.update(this);
1470    return rit;
1471  }
1472
1473  public static class RegionInTransitionStat {
1474    private final int ritThreshold;
1475
1476    private HashMap<String, RegionState> ritsOverThreshold = null;
1477    private long statTimestamp;
1478    private long oldestRITTime = 0;
1479    private int totalRITsTwiceThreshold = 0;
1480    private int totalRITs = 0;
1481
1482    public RegionInTransitionStat(final Configuration conf) {
1483      this.ritThreshold =
1484        conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD);
1485    }
1486
1487    public int getRITThreshold() {
1488      return ritThreshold;
1489    }
1490
1491    public long getTimestamp() {
1492      return statTimestamp;
1493    }
1494
1495    public int getTotalRITs() {
1496      return totalRITs;
1497    }
1498
1499    public long getOldestRITTime() {
1500      return oldestRITTime;
1501    }
1502
1503    public int getTotalRITsOverThreshold() {
1504      Map<String, RegionState> m = this.ritsOverThreshold;
1505      return m != null ? m.size() : 0;
1506    }
1507
1508    public boolean hasRegionsTwiceOverThreshold() {
1509      return totalRITsTwiceThreshold > 0;
1510    }
1511
1512    public boolean hasRegionsOverThreshold() {
1513      Map<String, RegionState> m = this.ritsOverThreshold;
1514      return m != null && !m.isEmpty();
1515    }
1516
1517    public Collection<RegionState> getRegionOverThreshold() {
1518      Map<String, RegionState> m = this.ritsOverThreshold;
1519      return m != null ? m.values() : Collections.emptySet();
1520    }
1521
1522    public boolean isRegionOverThreshold(final RegionInfo regionInfo) {
1523      Map<String, RegionState> m = this.ritsOverThreshold;
1524      return m != null && m.containsKey(regionInfo.getEncodedName());
1525    }
1526
1527    public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) {
1528      Map<String, RegionState> m = this.ritsOverThreshold;
1529      if (m == null) {
1530        return false;
1531      }
1532      final RegionState state = m.get(regionInfo.getEncodedName());
1533      if (state == null) {
1534        return false;
1535      }
1536      return (statTimestamp - state.getStamp()) > (ritThreshold * 2);
1537    }
1538
1539    protected void update(final AssignmentManager am) {
1540      final RegionStates regionStates = am.getRegionStates();
1541      this.statTimestamp = EnvironmentEdgeManager.currentTime();
1542      update(regionStates.getRegionsStateInTransition(), statTimestamp);
1543      update(regionStates.getRegionFailedOpen(), statTimestamp);
1544
1545      if (LOG.isDebugEnabled() && ritsOverThreshold != null && !ritsOverThreshold.isEmpty()) {
1546        LOG.debug("RITs over threshold: {}",
1547          ritsOverThreshold.entrySet().stream()
1548            .map(e -> e.getKey() + ":" + e.getValue().getState().name())
1549            .collect(Collectors.joining("\n")));
1550      }
1551    }
1552
1553    private void update(final Collection<RegionState> regions, final long currentTime) {
1554      for (RegionState state : regions) {
1555        totalRITs++;
1556        final long ritStartedMs = state.getStamp();
1557        if (ritStartedMs == 0) {
1558          // Don't output bogus values to metrics if they accidentally make it here.
1559          LOG.warn("The RIT {} has no start time", state.getRegion());
1560          continue;
1561        }
1562        final long ritTime = currentTime - ritStartedMs;
1563        if (ritTime > ritThreshold) {
1564          if (ritsOverThreshold == null) {
1565            ritsOverThreshold = new HashMap<String, RegionState>();
1566          }
1567          ritsOverThreshold.put(state.getRegion().getEncodedName(), state);
1568          totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0;
1569        }
1570        if (oldestRITTime < ritTime) {
1571          oldestRITTime = ritTime;
1572        }
1573      }
1574    }
1575  }
1576
1577  private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) {
1578    metrics.updateRITOldestAge(ritStat.getOldestRITTime());
1579    metrics.updateRITCount(ritStat.getTotalRITs());
1580    metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold());
1581  }
1582
1583  private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) {
1584    metrics.updateDeadServerOpenRegions(deadRegions);
1585    metrics.updateUnknownServerOpenRegions(unknownRegions);
1586  }
1587
1588  private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
1589    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1590    // if (regionNode.isStuck()) {
1591    LOG.warn("STUCK Region-In-Transition {}", regionNode);
1592  }
1593
1594  // ============================================================================================
1595  // TODO: Master load/bootstrap
1596  // ============================================================================================
1597  public void joinCluster() throws IOException {
1598    long startTime = System.nanoTime();
1599    LOG.debug("Joining cluster...");
1600
1601    // Scan hbase:meta to build list of existing regions, servers, and assignment.
1602    // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress
1603    // w/o meta.
1604    loadMeta();
1605
1606    while (master.getServerManager().countOfRegionServers() < 1) {
1607      LOG.info("Waiting for RegionServers to join; current count={}",
1608        master.getServerManager().countOfRegionServers());
1609      Threads.sleep(250);
1610    }
1611    LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers());
1612
1613    // Start the chores
1614    master.getMasterProcedureExecutor().addChore(this.ritChore);
1615    master.getMasterProcedureExecutor().addChore(this.deadMetricChore);
1616
1617    long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
1618    LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs));
1619  }
1620
1621  /**
1622   * Create assign procedure for offline regions. Just follow the old
1623   * processofflineServersWithOnlineRegions method. Since now we do not need to deal with dead
1624   * server any more, we only deal with the regions in OFFLINE state in this method. And this is a
1625   * bit strange, that for new regions, we will add it in CLOSED state instead of OFFLINE state, and
1626   * usually there will be a procedure to track them. The processofflineServersWithOnlineRegions is
1627   * a legacy from long ago, as things are going really different now, maybe we do not need this
1628   * method any more. Need to revisit later.
1629   */
1630  // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online.
1631  // Needs to be done after the table state manager has been started.
1632  public void processOfflineRegions() {
1633    TransitRegionStateProcedure[] procs =
1634      regionStates.getRegionStateNodes().stream().filter(rsn -> rsn.isInState(State.OFFLINE))
1635        .filter(rsn -> isTableEnabled(rsn.getRegionInfo().getTable())).map(rsn -> {
1636          rsn.lock();
1637          try {
1638            if (rsn.getProcedure() != null) {
1639              return null;
1640            } else {
1641              return rsn.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
1642                rsn.getRegionInfo(), null));
1643            }
1644          } finally {
1645            rsn.unlock();
1646          }
1647        }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new);
1648    if (procs.length > 0) {
1649      master.getMasterProcedureExecutor().submitProcedures(procs);
1650    }
1651  }
1652
1653  /*
1654   * AM internal RegionStateStore.RegionStateVisitor implementation. To be used when scanning META
1655   * table for region rows, using RegionStateStore utility methods. RegionStateStore methods will
1656   * convert Result into proper RegionInfo instances, but those would still need to be added into
1657   * AssignmentManager.regionStates in-memory cache. RegionMetaLoadingVisitor.visitRegionState
1658   * method provides the logic for adding RegionInfo instances as loaded from latest META scan into
1659   * AssignmentManager.regionStates.
1660   */
1661  private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor {
1662
1663    @Override
1664    public void visitRegionState(Result result, final RegionInfo regionInfo, final State state,
1665      final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
1666      if (
1667        state == null && regionLocation == null && lastHost == null
1668          && openSeqNum == SequenceId.NO_SEQUENCE_ID
1669      ) {
1670        // This is a row with nothing in it.
1671        LOG.warn("Skipping empty row={}", result);
1672        return;
1673      }
1674      State localState = state;
1675      if (localState == null) {
1676        // No region state column data in hbase:meta table! Are I doing a rolling upgrade from
1677        // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta?
1678        // In any of these cases, state is empty. For now, presume OFFLINE but there are probably
1679        // cases where we need to probe more to be sure this correct; TODO informed by experience.
1680        LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);
1681        localState = State.OFFLINE;
1682      }
1683      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
1684      // Do not need to lock on regionNode, as we can make sure that before we finish loading
1685      // meta, all the related procedures can not be executed. The only exception is for meta
1686      // region related operations, but here we do not load the informations for meta region.
1687      regionNode.setState(localState);
1688      regionNode.setLastHost(lastHost);
1689      regionNode.setRegionLocation(regionLocation);
1690      regionNode.setOpenSeqNum(openSeqNum);
1691
1692      // Note: keep consistent with other methods, see region(Opening|Opened|Closing)
1693      // RIT/ServerCrash handling should take care of the transiting regions.
1694      if (
1695        localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING, State.MERGING)
1696      ) {
1697        assert regionLocation != null : "found null region location for " + regionNode;
1698        regionStates.addRegionToServer(regionNode);
1699      } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
1700        regionStates.addToOfflineRegions(regionNode);
1701      }
1702      if (regionNode.getProcedure() != null) {
1703        regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode);
1704      }
1705    }
1706  };
1707
1708  /**
1709   * Query META if the given <code>RegionInfo</code> exists, adding to
1710   * <code>AssignmentManager.regionStateStore</code> cache if the region is found in META.
1711   * @param regionEncodedName encoded name for the region to be loaded from META into
1712   *                          <code>AssignmentManager.regionStateStore</code> cache
1713   * @return <code>RegionInfo</code> instance for the given region if it is present in META and got
1714   *         successfully loaded into <code>AssignmentManager.regionStateStore</code> cache,
1715   *         <b>null</b> otherwise.
1716   * @throws UnknownRegionException if any errors occur while querying meta.
1717   */
1718  public RegionInfo loadRegionFromMeta(String regionEncodedName) throws UnknownRegionException {
1719    try {
1720      RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor();
1721      regionStateStore.visitMetaForRegion(regionEncodedName, visitor);
1722      return regionStates.getRegionState(regionEncodedName) == null
1723        ? null
1724        : regionStates.getRegionState(regionEncodedName).getRegion();
1725    } catch (IOException e) {
1726      throw new UnknownRegionException(
1727        "Error trying to load region " + regionEncodedName + " from META", e);
1728    }
1729  }
1730
1731  private void loadMeta() throws IOException {
1732    // TODO: use a thread pool
1733    regionStateStore.visitMeta(new RegionMetaLoadingVisitor());
1734  }
1735
1736  /**
1737   * Used to check if the meta loading is done.
1738   * <p/>
1739   * if not we throw PleaseHoldException since we are rebuilding the RegionStates
1740   * @param hri region to check if it is already rebuild
1741   * @throws PleaseHoldException if meta has not been loaded yet
1742   */
1743  private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException {
1744    if (!isRunning()) {
1745      throw new PleaseHoldException("AssignmentManager not running");
1746    }
1747    boolean meta = isMetaRegion(hri);
1748    boolean metaLoaded = isMetaLoaded();
1749    if (!meta && !metaLoaded) {
1750      throw new PleaseHoldException(
1751        "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded);
1752    }
1753  }
1754
1755  // ============================================================================================
1756  // TODO: Metrics
1757  // ============================================================================================
1758  public int getNumRegionsOpened() {
1759    // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value
1760    return 0;
1761  }
1762
1763  /**
1764   * Usually run by the Master in reaction to server crash during normal processing. Can also be
1765   * invoked via external RPC to effect repair; in the latter case, the 'force' flag is set so we
1766   * push through the SCP though context may indicate already-running-SCP (An old SCP may have
1767   * exited abnormally, or damaged cluster may still have references in hbase:meta to 'Unknown
1768   * Servers' -- servers that are not online or in dead servers list, etc.)
1769   * @param force Set if the request came in externally over RPC (via hbck2). Force means run the
1770   *              SCP even if it seems as though there might be an outstanding SCP running.
1771   * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled.
1772   */
1773  public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) {
1774    // May be an 'Unknown Server' so handle case where serverNode is null.
1775    ServerStateNode serverNode = regionStates.getServerNode(serverName);
1776    // Remove the in-memory rsReports result
1777    synchronized (rsReports) {
1778      rsReports.remove(serverName);
1779    }
1780
1781    // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the
1782    // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
1783    // this server. This is used to simplify the implementation for TRSP and SCP, where we can make
1784    // sure that, the region list fetched by SCP will not be changed any more.
1785    if (serverNode != null) {
1786      serverNode.writeLock().lock();
1787    }
1788    boolean carryingMeta;
1789    long pid;
1790    try {
1791      ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
1792      carryingMeta = isCarryingMeta(serverName);
1793      if (!force && serverNode != null && !serverNode.isInState(ServerState.ONLINE)) {
1794        LOG.info("Skip adding ServerCrashProcedure for {} (meta={}) -- running?", serverNode,
1795          carryingMeta);
1796        return Procedure.NO_PROC_ID;
1797      } else {
1798        MasterProcedureEnv mpe = procExec.getEnvironment();
1799        // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead.
1800        // HBCKSCP scours Master in-memory state AND hbase;meta for references to
1801        // serverName just-in-case. An SCP that is scheduled when the server is
1802        // 'Unknown' probably originated externally with HBCK2 fix-it tool.
1803        ServerState oldState = null;
1804        if (serverNode != null) {
1805          oldState = serverNode.getState();
1806          serverNode.setState(ServerState.CRASHED);
1807        }
1808
1809        if (force) {
1810          pid = procExec.submitProcedure(
1811            new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
1812        } else {
1813          pid = procExec.submitProcedure(
1814            new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
1815        }
1816        LOG.info("Scheduled ServerCrashProcedure pid={} for {} (carryingMeta={}){}.", pid,
1817          serverName, carryingMeta,
1818          serverNode == null ? "" : " " + serverNode.toString() + ", oldState=" + oldState);
1819      }
1820    } finally {
1821      if (serverNode != null) {
1822        serverNode.writeLock().unlock();
1823      }
1824    }
1825    return pid;
1826  }
1827
1828  public void offlineRegion(final RegionInfo regionInfo) {
1829    // TODO used by MasterRpcServices
1830    RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
1831    if (node != null) {
1832      node.offline();
1833    }
1834  }
1835
1836  public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) {
1837    // TODO used by TestSplitTransactionOnCluster.java
1838  }
1839
1840  public Map<ServerName, List<RegionInfo>>
1841    getSnapShotOfAssignment(final Collection<RegionInfo> regions) {
1842    return regionStates.getSnapShotOfAssignment(regions);
1843  }
1844
1845  // ============================================================================================
1846  // TODO: UTILS/HELPERS?
1847  // ============================================================================================
1848  /**
1849   * Used by the client (via master) to identify if all regions have the schema updates
1850   * @return Pair indicating the status of the alter command (pending/total)
1851   */
1852  public Pair<Integer, Integer> getReopenStatus(TableName tableName) {
1853    if (isTableDisabled(tableName)) {
1854      return new Pair<Integer, Integer>(0, 0);
1855    }
1856
1857    final List<RegionState> states = regionStates.getTableRegionStates(tableName);
1858    int ritCount = 0;
1859    for (RegionState regionState : states) {
1860      if (!regionState.isOpened() && !regionState.isSplit()) {
1861        ritCount++;
1862      }
1863    }
1864    return new Pair<Integer, Integer>(ritCount, states.size());
1865  }
1866
1867  // ============================================================================================
1868  // TODO: Region State In Transition
1869  // ============================================================================================
1870  public boolean hasRegionsInTransition() {
1871    return regionStates.hasRegionsInTransition();
1872  }
1873
1874  public List<RegionStateNode> getRegionsInTransition() {
1875    return regionStates.getRegionsInTransition();
1876  }
1877
1878  public List<RegionInfo> getAssignedRegions() {
1879    return regionStates.getAssignedRegions();
1880  }
1881
1882  public RegionInfo getRegionInfo(final byte[] regionName) {
1883    final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName);
1884    return regionState != null ? regionState.getRegionInfo() : null;
1885  }
1886
1887  // ============================================================================================
1888  // Expected states on region state transition.
  // Notice that there are no expected states for transiting to OPENING state; this is because of SCP.
1890  // See the comments in regionOpening method for more details.
1891  // ============================================================================================
1892  private static final State[] STATES_EXPECTED_ON_OPEN = { State.OPENING, // Normal case
1893    State.OPEN // Retrying
1894  };
1895
1896  private static final State[] STATES_EXPECTED_ON_CLOSING = { State.OPEN, // Normal case
1897    State.CLOSING, // Retrying
1898    State.SPLITTING, // Offline the split parent
1899    State.MERGING // Offline the merge parents
1900  };
1901
1902  private static final State[] STATES_EXPECTED_ON_CLOSED = { State.CLOSING, // Normal case
1903    State.CLOSED // Retrying
1904  };
1905
1906  // This is for manually scheduled region assign, can add other states later if we find out other
1907  // usages
1908  private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE };
1909
1910  // We only allow unassign or move a region which is in OPEN state.
1911  private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN };
1912
1913  // ============================================================================================
1914  // Region Status update
1915  // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking
1916  // and pre-assumptions are very tricky.
1917  // ============================================================================================
1918  private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState,
1919    RegionState.State... expectedStates) throws IOException {
1920    RegionState.State state = regionNode.getState();
1921    regionNode.transitionState(newState, expectedStates);
1922    boolean succ = false;
1923    try {
1924      regionStateStore.updateRegionLocation(regionNode);
1925      succ = true;
1926    } finally {
1927      if (!succ) {
1928        // revert
1929        regionNode.setState(state);
1930      }
1931    }
1932  }
1933
1934  // should be called within the synchronized block of RegionStateNode
1935  void regionOpening(RegionStateNode regionNode) throws IOException {
1936    // As in SCP, for performance reason, there is no TRSP attached with this region, we will not
1937    // update the region state, which means that the region could be in any state when we want to
1938    // assign it after a RS crash. So here we do not pass the expectedStates parameter.
1939    transitStateAndUpdate(regionNode, State.OPENING);
1940    regionStates.addRegionToServer(regionNode);
1941    // update the operation count metrics
1942    metrics.incrementOperationCounter();
1943  }
1944
1945  // should be called under the RegionStateNode lock
1946  // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
1947  // we will persist the FAILED_OPEN state into hbase:meta.
1948  void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException {
1949    RegionState.State state = regionNode.getState();
1950    ServerName regionLocation = regionNode.getRegionLocation();
1951    if (giveUp) {
1952      regionNode.setState(State.FAILED_OPEN);
1953      regionNode.setRegionLocation(null);
1954      boolean succ = false;
1955      try {
1956        regionStateStore.updateRegionLocation(regionNode);
1957        succ = true;
1958      } finally {
1959        if (!succ) {
1960          // revert
1961          regionNode.setState(state);
1962          regionNode.setRegionLocation(regionLocation);
1963        }
1964      }
1965    }
1966    if (regionLocation != null) {
1967      regionStates.removeRegionFromServer(regionLocation, regionNode);
1968    }
1969  }
1970
1971  // should be called under the RegionStateNode lock
1972  void regionClosing(RegionStateNode regionNode) throws IOException {
1973    transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING);
1974
1975    RegionInfo hri = regionNode.getRegionInfo();
1976    // Set meta has not initialized early. so people trying to create/edit tables will wait
1977    if (isMetaRegion(hri)) {
1978      setMetaAssigned(hri, false);
1979    }
1980    regionStates.addRegionToServer(regionNode);
1981    // update the operation count metrics
1982    metrics.incrementOperationCounter();
1983  }
1984
  // Open and close are first persisted to the procedure store in RegionRemoteProcedureBase. So
  // here we first change the in-memory state, since the operation is considered successful once
  // the persistence to the procedure store has succeeded; then, when the
  // RegionRemoteProcedureBase is woken up, we persist the RegionStateNode to hbase:meta.
1989
1990  // should be called under the RegionStateNode lock
1991  void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
1992    regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
1993    RegionInfo regionInfo = regionNode.getRegionInfo();
1994    regionStates.addRegionToServer(regionNode);
1995    regionStates.removeFromFailedOpen(regionInfo);
1996  }
1997
1998  // should be called under the RegionStateNode lock
1999  void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
2000    ServerName regionLocation = regionNode.getRegionLocation();
2001    regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
2002    regionNode.setRegionLocation(null);
2003    if (regionLocation != null) {
2004      regionNode.setLastHost(regionLocation);
2005      regionStates.removeRegionFromServer(regionLocation, regionNode);
2006    }
2007  }
2008
2009  // should be called under the RegionStateNode lock
2010  // for SCP
2011  public void regionClosedAbnormally(RegionStateNode regionNode) throws IOException {
2012    RegionState.State state = regionNode.getState();
2013    ServerName regionLocation = regionNode.getRegionLocation();
2014    regionNode.transitionState(State.ABNORMALLY_CLOSED);
2015    regionNode.setRegionLocation(null);
2016    boolean succ = false;
2017    try {
2018      regionStateStore.updateRegionLocation(regionNode);
2019      succ = true;
2020    } finally {
2021      if (!succ) {
2022        // revert
2023        regionNode.setState(state);
2024        regionNode.setRegionLocation(regionLocation);
2025      }
2026    }
2027    if (regionLocation != null) {
2028      regionNode.setLastHost(regionLocation);
2029      regionStates.removeRegionFromServer(regionLocation, regionNode);
2030    }
2031  }
2032
2033  void persistToMeta(RegionStateNode regionNode) throws IOException {
2034    regionStateStore.updateRegionLocation(regionNode);
2035    RegionInfo regionInfo = regionNode.getRegionInfo();
2036    if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
2037      // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
2038      // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
2039      // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
2040      // on table that contains state.
2041      setMetaAssigned(regionInfo, true);
2042    }
2043  }
2044
2045  // ============================================================================================
2046  // The above methods can only be called in TransitRegionStateProcedure(and related procedures)
2047  // ============================================================================================
2048
2049  public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
2050    final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
2051    // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
2052    // The parent stays in regionStates until cleared when removed by CatalogJanitor.
2053    // Update its state in regionStates to it shows as offline and split when read
2054    // later figuring what regions are in a table and what are not: see
2055    // regionStates#getRegionsOfTable
2056    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent);
2057    node.setState(State.SPLIT);
2058    final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA);
2059    nodeA.setState(State.SPLITTING_NEW);
2060    final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB);
2061    nodeB.setState(State.SPLITTING_NEW);
2062
2063    TableDescriptor td = master.getTableDescriptors().get(parent.getTable());
2064    // TODO: here we just update the parent region info in meta, to set split and offline to true,
2065    // without changing the one in the region node. This is a bit confusing but the region info
2066    // field in RegionStateNode is not expected to be changed in the current design. Need to find a
2067    // possible way to address this problem, or at least adding more comments about the trick to
2068    // deal with this problem, that when you want to filter out split parent, you need to check both
2069    // the RegionState on whether it is split, and also the region info. If one of them matches then
2070    // it is a split parent. And usually only one of them can match, as after restart, the region
2071    // state will be changed from SPLIT to CLOSED.
2072    regionStateStore.splitRegion(parent, daughterA, daughterB, serverName, td);
2073    if (shouldAssignFavoredNodes(parent)) {
2074      List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList();
2075      getFavoredNodePromoter().generateFavoredNodesForDaughter(onlineServers, parent, daughterA,
2076        daughterB);
2077    }
2078  }
2079
2080  /**
2081   * When called here, the merge has happened. The merged regions have been unassigned and the above
2082   * markRegionClosed has been called on each so they have been disassociated from a hosting Server.
2083   * The merged region will be open after this call. The merged regions are removed from hbase:meta
2084   * below. Later they are deleted from the filesystem by the catalog janitor running against
2085   * hbase:meta. It notices when the merged region no longer holds references to the old regions
2086   * (References are deleted after a compaction rewrites what the Reference points at but not until
2087   * the archiver chore runs, are the References removed).
2088   */
2089  public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
2090    RegionInfo[] mergeParents) throws IOException {
2091    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
2092    node.setState(State.MERGED);
2093    for (RegionInfo ri : mergeParents) {
2094      regionStates.deleteRegion(ri);
2095    }
2096    TableDescriptor td = master.getTableDescriptors().get(child.getTable());
2097    regionStateStore.mergeRegions(child, mergeParents, serverName, td);
2098    if (shouldAssignFavoredNodes(child)) {
2099      getFavoredNodePromoter().generateFavoredNodesForMergedRegion(child, mergeParents);
2100    }
2101  }
2102
2103  /*
2104   * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
2105   * belongs to a non-system table.
2106   */
2107  private boolean shouldAssignFavoredNodes(RegionInfo region) {
2108    return this.shouldAssignRegionsWithFavoredNodes
2109      && FavoredNodesManager.isFavoredNodeApplicable(region);
2110  }
2111
2112  // ============================================================================================
2113  // Assign Queue (Assign/Balance)
2114  // ============================================================================================
2115  private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>();
2116  private final ReentrantLock assignQueueLock = new ReentrantLock();
2117  private final Condition assignQueueFullCond = assignQueueLock.newCondition();
2118
2119  /**
2120   * Add the assign operation to the assignment queue. The pending assignment operation will be
2121   * processed, and each region will be assigned by a server using the balancer.
2122   */
2123  protected void queueAssign(final RegionStateNode regionNode) {
2124    regionNode.getProcedureEvent().suspend();
2125
2126    // TODO: quick-start for meta and the other sys-tables?
2127    assignQueueLock.lock();
2128    try {
2129      pendingAssignQueue.add(regionNode);
2130      if (
2131        regionNode.isSystemTable() || pendingAssignQueue.size() == 1
2132          || pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize
2133      ) {
2134        assignQueueFullCond.signal();
2135      }
2136    } finally {
2137      assignQueueLock.unlock();
2138    }
2139  }
2140
2141  private void startAssignmentThread() {
2142    assignThread = new Thread(master.getServerName().toShortString()) {
2143      @Override
2144      public void run() {
2145        while (isRunning()) {
2146          processAssignQueue();
2147        }
2148        pendingAssignQueue.clear();
2149      }
2150    };
2151    assignThread.setDaemon(true);
2152    assignThread.start();
2153  }
2154
2155  private void stopAssignmentThread() {
2156    assignQueueSignal();
2157    try {
2158      while (assignThread.isAlive()) {
2159        assignQueueSignal();
2160        assignThread.join(250);
2161      }
2162    } catch (InterruptedException e) {
2163      LOG.warn("join interrupted", e);
2164      Thread.currentThread().interrupt();
2165    }
2166  }
2167
2168  private void assignQueueSignal() {
2169    assignQueueLock.lock();
2170    try {
2171      assignQueueFullCond.signal();
2172    } finally {
2173      assignQueueLock.unlock();
2174    }
2175  }
2176
2177  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
2178  private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() {
2179    HashMap<RegionInfo, RegionStateNode> regions = null;
2180
2181    assignQueueLock.lock();
2182    try {
2183      if (pendingAssignQueue.isEmpty() && isRunning()) {
2184        assignQueueFullCond.await();
2185      }
2186
2187      if (!isRunning()) {
2188        return null;
2189      }
2190      assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS);
2191      regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size());
2192      for (RegionStateNode regionNode : pendingAssignQueue) {
2193        regions.put(regionNode.getRegionInfo(), regionNode);
2194      }
2195      pendingAssignQueue.clear();
2196    } catch (InterruptedException e) {
2197      LOG.warn("got interrupted ", e);
2198      Thread.currentThread().interrupt();
2199    } finally {
2200      assignQueueLock.unlock();
2201    }
2202    return regions;
2203  }
2204
2205  private void processAssignQueue() {
2206    final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue();
2207    if (regions == null || regions.size() == 0 || !isRunning()) {
2208      return;
2209    }
2210
2211    if (LOG.isTraceEnabled()) {
2212      LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size());
2213    }
2214
2215    // TODO: Optimize balancer. pass a RegionPlan?
2216    final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>();
2217    final List<RegionInfo> userHRIs = new ArrayList<>(regions.size());
2218    // Regions for system tables requiring reassignment
2219    final List<RegionInfo> systemHRIs = new ArrayList<>();
2220    for (RegionStateNode regionStateNode : regions.values()) {
2221      boolean sysTable = regionStateNode.isSystemTable();
2222      final List<RegionInfo> hris = sysTable ? systemHRIs : userHRIs;
2223      if (regionStateNode.getRegionLocation() != null) {
2224        retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
2225      } else {
2226        hris.add(regionStateNode.getRegionInfo());
2227      }
2228    }
2229
2230    // TODO: connect with the listener to invalidate the cache
2231
2232    // TODO use events
2233    List<ServerName> servers = master.getServerManager().createDestinationServersList();
2234    for (int i = 0; servers.size() < 1; ++i) {
2235      // Report every fourth time around this loop; try not to flood log.
2236      if (i % 4 == 0) {
2237        LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions.");
2238      }
2239
2240      if (!isRunning()) {
2241        LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions.");
2242        return;
2243      }
2244      Threads.sleep(250);
2245      servers = master.getServerManager().createDestinationServersList();
2246    }
2247
2248    if (!systemHRIs.isEmpty()) {
2249      // System table regions requiring reassignment are present, get region servers
2250      // not available for system table regions
2251      final List<ServerName> excludeServers = getExcludedServersForSystemTable();
2252      List<ServerName> serversForSysTables =
2253        servers.stream().filter(s -> !excludeServers.contains(s)).collect(Collectors.toList());
2254      if (serversForSysTables.isEmpty()) {
2255        LOG.warn("Filtering old server versions and the excluded produced an empty set; "
2256          + "instead considering all candidate servers!");
2257      }
2258      LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size()
2259        + ", allServersCount=" + servers.size());
2260      processAssignmentPlans(regions, null, systemHRIs,
2261        serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs)
2262          ? servers
2263          : serversForSysTables);
2264    }
2265
2266    processAssignmentPlans(regions, retainMap, userHRIs, servers);
2267  }
2268
2269  private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions,
2270    List<RegionInfo> hirs) {
2271    for (RegionInfo ri : hirs) {
2272      if (
2273        regions.get(ri).getRegionLocation() != null
2274          && regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME)
2275      ) {
2276        return true;
2277      }
2278    }
2279    return false;
2280  }
2281
2282  private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions,
2283    final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris,
2284    final List<ServerName> servers) {
2285    boolean isTraceEnabled = LOG.isTraceEnabled();
2286    if (isTraceEnabled) {
2287      LOG.trace("Available servers count=" + servers.size() + ": " + servers);
2288    }
2289
2290    final LoadBalancer balancer = getBalancer();
2291    // ask the balancer where to place regions
2292    if (retainMap != null && !retainMap.isEmpty()) {
2293      if (isTraceEnabled) {
2294        LOG.trace("retain assign regions=" + retainMap);
2295      }
2296      try {
2297        acceptPlan(regions, balancer.retainAssignment(retainMap, servers));
2298      } catch (IOException e) {
2299        LOG.warn("unable to retain assignment", e);
2300        addToPendingAssignment(regions, retainMap.keySet());
2301      }
2302    }
2303
2304    // TODO: Do we need to split retain and round-robin?
2305    // the retain seems to fallback to round-robin/random if the region is not in the map.
2306    if (!hris.isEmpty()) {
2307      Collections.sort(hris, RegionInfo.COMPARATOR);
2308      if (isTraceEnabled) {
2309        LOG.trace("round robin regions=" + hris);
2310      }
2311      try {
2312        acceptPlan(regions, balancer.roundRobinAssignment(hris, servers));
2313      } catch (IOException e) {
2314        LOG.warn("unable to round-robin assignment", e);
2315        addToPendingAssignment(regions, hris);
2316      }
2317    }
2318  }
2319
2320  private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions,
2321    final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException {
2322    final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()];
2323    final long st = EnvironmentEdgeManager.currentTime();
2324
2325    if (plan.isEmpty()) {
2326      throw new HBaseIOException("unable to compute plans for regions=" + regions.size());
2327    }
2328
2329    int evcount = 0;
2330    for (Map.Entry<ServerName, List<RegionInfo>> entry : plan.entrySet()) {
2331      final ServerName server = entry.getKey();
2332      for (RegionInfo hri : entry.getValue()) {
2333        final RegionStateNode regionNode = regions.get(hri);
2334        regionNode.setRegionLocation(server);
2335        if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) {
2336          assignQueueLock.lock();
2337          try {
2338            pendingAssignQueue.add(regionNode);
2339          } finally {
2340            assignQueueLock.unlock();
2341          }
2342        } else {
2343          events[evcount++] = regionNode.getProcedureEvent();
2344        }
2345      }
2346    }
2347    ProcedureEvent.wakeEvents(getProcedureScheduler(), events);
2348
2349    final long et = EnvironmentEdgeManager.currentTime();
2350    if (LOG.isTraceEnabled()) {
2351      LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + StringUtils.humanTimeDiff(et - st));
2352    }
2353  }
2354
2355  private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions,
2356    final Collection<RegionInfo> pendingRegions) {
2357    assignQueueLock.lock();
2358    try {
2359      for (RegionInfo hri : pendingRegions) {
2360        pendingAssignQueue.add(regions.get(hri));
2361      }
2362    } finally {
2363      assignQueueLock.unlock();
2364    }
2365  }
2366
2367  /**
2368   * For a given cluster with mixed versions of servers, get a list of servers with lower versions,
2369   * where system table regions should not be assigned to. For system table, we must assign regions
2370   * to a server with highest version. However, we can disable this exclusion using config:
2371   * "hbase.min.version.move.system.tables" if checkForMinVersion is true. Detailed explanation
2372   * available with definition of minVersionToMoveSysTables.
2373   * @return List of Excluded servers for System table regions.
2374   */
2375  public List<ServerName> getExcludedServersForSystemTable() {
2376    // TODO: This should be a cached list kept by the ServerManager rather than calculated on each
2377    // move or system region assign. The RegionServerTracker keeps list of online Servers with
2378    // RegionServerInfo that includes Version.
2379    List<Pair<ServerName, String>> serverList =
2380      master.getServerManager().getOnlineServersList().stream()
2381        .map(s -> new Pair<>(s, master.getRegionServerVersion(s))).collect(Collectors.toList());
2382    if (serverList.isEmpty()) {
2383      return new ArrayList<>();
2384    }
2385    String highestVersion = Collections
2386      .max(serverList, (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond()))
2387      .getSecond();
2388    if (!DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG.equals(minVersionToMoveSysTables)) {
2389      int comparedValue = VersionInfo.compareVersion(minVersionToMoveSysTables, highestVersion);
2390      if (comparedValue > 0) {
2391        return new ArrayList<>();
2392      }
2393    }
2394    return serverList.stream().filter(pair -> !pair.getSecond().equals(highestVersion))
2395      .map(Pair::getFirst).collect(Collectors.toList());
2396  }
2397
2398  MasterServices getMaster() {
2399    return master;
2400  }
2401
2402  /**
2403   * @return a snapshot of rsReports
2404   */
2405  public Map<ServerName, Set<byte[]>> getRSReports() {
2406    Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>();
2407    synchronized (rsReports) {
2408      rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue()));
2409    }
2410    return rsReportsSnapshot;
2411  }
2412
2413  /**
   * Provide the region state counts for the given table, e.g. how many regions of the given table
   * are opened/closed/rit etc.
2416   * @param tableName TableName
2417   * @return region states count
2418   */
2419  public RegionStatesCount getRegionStatesCount(TableName tableName) {
2420    int openRegionsCount = 0;
2421    int closedRegionCount = 0;
2422    int ritCount = 0;
2423    int splitRegionCount = 0;
2424    int totalRegionCount = 0;
2425    if (!isTableDisabled(tableName)) {
2426      final List<RegionState> states = regionStates.getTableRegionStates(tableName);
2427      for (RegionState regionState : states) {
2428        if (regionState.isOpened()) {
2429          openRegionsCount++;
2430        } else if (regionState.isClosed()) {
2431          closedRegionCount++;
2432        } else if (regionState.isSplit()) {
2433          splitRegionCount++;
2434        }
2435      }
2436      totalRegionCount = states.size();
2437      ritCount = totalRegionCount - openRegionsCount - splitRegionCount;
2438    }
2439    return new RegionStatesCount.RegionStatesCountBuilder().setOpenRegions(openRegionsCount)
2440      .setClosedRegions(closedRegionCount).setSplitRegions(splitRegionCount)
2441      .setRegionsInTransition(ritCount).setTotalRegions(totalRegionCount).build();
2442  }
2443
2444}