001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.Future;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.locks.Condition;
033import java.util.concurrent.locks.ReentrantLock;
034import java.util.stream.Collectors;
035import java.util.stream.Stream;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.DoNotRetryIOException;
038import org.apache.hadoop.hbase.HBaseIOException;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.MetaTableAccessor;
041import org.apache.hadoop.hbase.PleaseHoldException;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.UnknownRegionException;
045import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
046import org.apache.hadoop.hbase.client.MasterSwitchType;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.RegionInfoBuilder;
049import org.apache.hadoop.hbase.client.RegionReplicaUtil;
050import org.apache.hadoop.hbase.client.RegionStatesCount;
051import org.apache.hadoop.hbase.client.Result;
052import org.apache.hadoop.hbase.client.ResultScanner;
053import org.apache.hadoop.hbase.client.Scan;
054import org.apache.hadoop.hbase.client.TableState;
055import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
056import org.apache.hadoop.hbase.favored.FavoredNodesManager;
057import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
058import org.apache.hadoop.hbase.master.LoadBalancer;
059import org.apache.hadoop.hbase.master.MasterServices;
060import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
061import org.apache.hadoop.hbase.master.RegionPlan;
062import org.apache.hadoop.hbase.master.RegionState;
063import org.apache.hadoop.hbase.master.RegionState.State;
064import org.apache.hadoop.hbase.master.ServerManager;
065import org.apache.hadoop.hbase.master.TableStateManager;
066import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
067import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
068import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
069import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
070import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
071import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
072import org.apache.hadoop.hbase.master.region.MasterRegion;
073import org.apache.hadoop.hbase.procedure2.Procedure;
074import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
075import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
076import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
077import org.apache.hadoop.hbase.procedure2.util.StringUtils;
078import org.apache.hadoop.hbase.regionserver.SequenceId;
079import org.apache.hadoop.hbase.util.Bytes;
080import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
081import org.apache.hadoop.hbase.util.Pair;
082import org.apache.hadoop.hbase.util.Threads;
083import org.apache.hadoop.hbase.util.VersionInfo;
084import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
085import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
086import org.apache.yetus.audience.InterfaceAudience;
087import org.apache.zookeeper.KeeperException;
088import org.slf4j.Logger;
089import org.slf4j.LoggerFactory;
090
091import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
096
097/**
098 * The AssignmentManager is the coordinator for region assign/unassign operations.
099 * <ul>
100 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
101 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
102 * </ul>
103 * Regions are created by CreateTable, Split, Merge. Regions are deleted by DeleteTable, Split,
104 * Merge. Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. Unassigns
 * are triggered by DisableTable, Split, Merge.
106 */
107@InterfaceAudience.Private
108public class AssignmentManager {
  private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);

  // TODO: AMv2
  // - handle region migration from hbase1 to hbase2.
  // - handle sys table assignment first (e.g. acl, namespace)
  // - handle table priorities
  // - If ServerBusyException trying to update hbase:meta, we abort the Master
  // See updateRegionLocation in RegionStateStore.
  //
  // See also
  // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
  // for other TODOs.

  /**
   * Configuration key for the bootstrap thread pool size.
   * NOTE(review): not read in the constructor; confirm where it is consumed.
   */
  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
    "hbase.assignment.bootstrap.thread.pool.size";

  /** Read by the constructor into {@code assignDispatchWaitMillis}; default 150 (msec per key name). */
  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
    "hbase.assignment.dispatch.wait.msec";
  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;

  /** Read by the constructor into {@code assignDispatchWaitQueueMaxSize}; default 100. */
  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
    "hbase.assignment.dispatch.wait.queue.max.size";
  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;

  /** Interval handed to the {@code RegionInTransitionChore}; default one minute. */
  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
    "hbase.assignment.rit.chore.interval.msec";
  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;

  /**
   * Interval for the {@code DeadServerMetricRegionChore}; default two minutes. A value of zero or
   * less disables the chore (the constructor then leaves {@code deadMetricChore} null).
   */
  public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY =
    "hbase.assignment.dead.region.metric.chore.interval.msec";
  private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000;

  /** Maximum assign attempts; the constructor clamps this to at least 1. Default is unbounded. */
  public static final String ASSIGN_MAX_ATTEMPTS = "hbase.assignment.maximum.attempts";
  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;

  /** Exposed via {@code getAssignRetryImmediatelyMaxAttempts()}; default 3. */
  public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS =
    "hbase.assignment.retry.immediately.maximum.attempts";
  private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3;

  /**
   * Region in Transition metrics threshold time; default one minute.
   * NOTE(review): not read in the constructor; confirm the consumer and the unit.
   */
  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
    "hbase.metrics.rit.stuck.warning.threshold";
  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
  /** Message prefix used when a region is not in one of the states expected for a transition. */
  public static final String UNEXPECTED_STATE_REGION = "Unexpected state for ";

  // Woken/suspended through setMetaAssigned(); procedures park on it via waitMetaAssigned().
  private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
  // Woken by wakeMetaLoadedEvent(), suspended again by stop().
  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");

  private final MetricsAssignmentManager metrics;
  private final RegionInTransitionChore ritChore;
  // Null when DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY is configured as zero or less.
  private final DeadServerMetricRegionChore deadMetricChore;
  private final MasterServices master;

  // Guards start()/stop() so each only takes effect once per transition (compareAndSet).
  private final AtomicBoolean running = new AtomicBoolean(false);
  // In-memory region and server state; cleared by stop().
  private final RegionStates regionStates = new RegionStates();
  private final RegionStateStore regionStateStore;

  /**
   * When the operator uses this configuration option, any version between the current cluster
   * version and the value of "hbase.min.version.move.system.tables" does not trigger any
   * auto-region movement. Auto-region movement here refers to auto-migration of system table
   * regions to newer server versions. It is assumed that the configured range of versions does not
   * require special handling of moving system table regions to higher versioned RegionServer. This
   * auto-migration is done by {@link #checkIfShouldMoveSystemRegionAsync()}. Example: Let's assume
   * the cluster is on version 1.4.0 and we have set "hbase.min.version.move.system.tables" as
   * "2.0.0". Now if we upgrade one RegionServer on 1.4.0 cluster to 1.6.0 (lower than 2.0.0), then
   * AssignmentManager will not move hbase:meta, hbase:namespace and other system table regions to
   * newly brought up RegionServer 1.6.0 as part of auto-migration. However, if we upgrade one
   * RegionServer on 1.4.0 cluster to 2.2.0 (higher than 2.0.0), then AssignmentManager will move
   * all system table regions to newly brought up RegionServer 2.2.0 as part of auto-migration done
   * by {@link #checkIfShouldMoveSystemRegionAsync()}. "hbase.min.version.move.system.tables" is
   * introduced as part of HBASE-22923.
   */
  private final String minVersionToMoveSysTables;

  private static final String MIN_VERSION_MOVE_SYS_TABLES_CONFIG =
    "hbase.min.version.move.system.tables";
  // Empty by default, i.e. no configured minimum version.
  private static final String DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG = "";

  // NOTE(review): keyed by region server; presumably holds the regions each server last
  // reported. Confirm against the code that populates it (not visible here).
  private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();

  // True only when the configured balancer class is (a subclass of) FavoredStochasticBalancer.
  private final boolean shouldAssignRegionsWithFavoredNodes;
  private final int assignDispatchWaitQueueMaxSize;
  private final int assignDispatchWaitMillis;
  private final int assignMaxAttempts;
  private final int assignRetryImmediatelyMaxAttempts;

  // Master local region; start() scans its catalog family to rebuild meta region states.
  private final MasterRegion masterRegion;

  // Serializes the background threads spawned by checkIfShouldMoveSystemRegionAsync().
  private final Object checkIfShouldMoveSystemRegionLock = new Object();

  // NOTE(review): presumably managed by startAssignmentThread()/stopAssignmentThread(),
  // which are not visible here.
  private Thread assignThread;
201
202  public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
203    this(master, masterRegion, new RegionStateStore(master, masterRegion));
204  }
205
206  AssignmentManager(MasterServices master, MasterRegion masterRegion, RegionStateStore stateStore) {
207    this.master = master;
208    this.regionStateStore = stateStore;
209    this.metrics = new MetricsAssignmentManager();
210    this.masterRegion = masterRegion;
211
212    final Configuration conf = master.getConfiguration();
213
214    // Only read favored nodes if using the favored nodes load balancer.
215    this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class
216      .isAssignableFrom(conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class));
217
218    this.assignDispatchWaitMillis =
219      conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
220    this.assignDispatchWaitQueueMaxSize =
221      conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);
222
223    this.assignMaxAttempts =
224      Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS));
225    this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS,
226      DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS);
227
228    int ritChoreInterval =
229      conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC);
230    this.ritChore = new RegionInTransitionChore(ritChoreInterval);
231
232    int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY,
233      DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC);
234    if (deadRegionChoreInterval > 0) {
235      this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval);
236    } else {
237      this.deadMetricChore = null;
238    }
239    minVersionToMoveSysTables =
240      conf.get(MIN_VERSION_MOVE_SYS_TABLES_CONFIG, DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG);
241  }
242
243  private void mirrorMetaLocations() throws IOException, KeeperException {
244    // For compatibility, mirror the meta region state to zookeeper
245    // And we still need to use zookeeper to publish the meta region locations to region
246    // server, so they can serve as ClientMetaService
247    ZKWatcher zk = master.getZooKeeper();
248    if (zk == null || !zk.getRecoverableZooKeeper().getState().isAlive()) {
249      // this is possible in tests, we do not provide a zk watcher or the zk watcher has been closed
250      return;
251    }
252    Collection<RegionStateNode> metaStates = regionStates.getRegionStateNodes();
253    for (RegionStateNode metaState : metaStates) {
254      MetaTableLocator.setMetaLocation(zk, metaState.getRegionLocation(),
255        metaState.getRegionInfo().getReplicaId(), metaState.getState());
256    }
257    int replicaCount = metaStates.size();
258    // remove extra mirror locations
259    for (String znode : zk.getMetaReplicaNodes()) {
260      int replicaId = zk.getZNodePaths().getMetaReplicaIdFromZNode(znode);
261      if (replicaId >= replicaCount) {
262        MetaTableLocator.deleteMetaLocation(zk, replicaId);
263      }
264    }
265  }
266
267  public void start() throws IOException, KeeperException {
268    if (!running.compareAndSet(false, true)) {
269      return;
270    }
271
272    LOG.trace("Starting assignment manager");
273
274    // Start the Assignment Thread
275    startAssignmentThread();
276    // load meta region states.
277    // here we are still in the early steps of active master startup. There is only one thread(us)
278    // can access AssignmentManager and create region node, so here we do not need to lock the
279    // region node.
280    try (ResultScanner scanner =
281      masterRegion.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY))) {
282      for (;;) {
283        Result result = scanner.next();
284        if (result == null) {
285          break;
286        }
287        RegionStateStore
288          .visitMetaEntry((r, regionInfo, state, regionLocation, lastHost, openSeqNum) -> {
289            RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
290            regionNode.setState(state);
291            regionNode.setLastHost(lastHost);
292            regionNode.setRegionLocation(regionLocation);
293            regionNode.setOpenSeqNum(openSeqNum);
294            if (regionNode.getProcedure() != null) {
295              regionNode.getProcedure().stateLoaded(this, regionNode);
296            }
297            if (regionLocation != null) {
298              regionStates.addRegionToServer(regionNode);
299            }
300            if (RegionReplicaUtil.isDefaultReplica(regionInfo.getReplicaId())) {
301              setMetaAssigned(regionInfo, state == State.OPEN);
302            }
303            LOG.debug("Loaded hbase:meta {}", regionNode);
304          }, result);
305      }
306    }
307    mirrorMetaLocations();
308  }
309
310  /**
311   * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode.
312   * <p>
313   * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta
314   * method below. And it is also very important as now before submitting a TRSP, we need to attach
315   * it to the RegionStateNode, which acts like a guard, so we need to restore this information at
316   * the very beginning, before we start processing any procedures.
317   */
318  public void setupRIT(List<TransitRegionStateProcedure> procs) {
319    procs.forEach(proc -> {
320      RegionInfo regionInfo = proc.getRegion();
321      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
322      TransitRegionStateProcedure existingProc = regionNode.getProcedure();
323      if (existingProc != null) {
324        // This is possible, as we will detach the procedure from the RSN before we
325        // actually finish the procedure. This is because that, we will detach the TRSP from the RSN
326        // during execution, at that time, the procedure has not been marked as done in the pv2
327        // framework yet, so it is possible that we schedule a new TRSP immediately and when
328        // arriving here, we will find out that there are multiple TRSPs for the region. But we can
329        // make sure that, only the last one can take the charge, the previous ones should have all
330        // been finished already. So here we will compare the proc id, the greater one will win.
331        if (existingProc.getProcId() < proc.getProcId()) {
332          // the new one wins, unset and set it to the new one below
333          regionNode.unsetProcedure(existingProc);
334        } else {
335          // the old one wins, skip
336          return;
337        }
338      }
339      LOG.info("Attach {} to {} to restore RIT", proc, regionNode);
340      regionNode.setProcedure(proc);
341    });
342  }
343
344  public void stop() {
345    if (!running.compareAndSet(true, false)) {
346      return;
347    }
348
349    LOG.info("Stopping assignment manager");
350
351    // The AM is started before the procedure executor,
352    // but the actual work will be loaded/submitted only once we have the executor
353    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;
354
355    // Remove the RIT chore
356    if (hasProcExecutor) {
357      master.getMasterProcedureExecutor().removeChore(this.ritChore);
358      if (this.deadMetricChore != null) {
359        master.getMasterProcedureExecutor().removeChore(this.deadMetricChore);
360      }
361    }
362
363    // Stop the Assignment Thread
364    stopAssignmentThread();
365
366    // Stop the RegionStateStore
367    regionStates.clear();
368
369    // Update meta events (for testing)
370    if (hasProcExecutor) {
371      metaLoadEvent.suspend();
372      for (RegionInfo hri : getMetaRegionSet()) {
373        setMetaAssigned(hri, false);
374      }
375    }
376  }
377
378  public boolean isRunning() {
379    return running.get();
380  }
381
382  public Configuration getConfiguration() {
383    return master.getConfiguration();
384  }
385
386  public MetricsAssignmentManager getAssignmentManagerMetrics() {
387    return metrics;
388  }
389
390  private LoadBalancer getBalancer() {
391    return master.getLoadBalancer();
392  }
393
394  private MasterProcedureEnv getProcedureEnvironment() {
395    return master.getMasterProcedureExecutor().getEnvironment();
396  }
397
398  private MasterProcedureScheduler getProcedureScheduler() {
399    return getProcedureEnvironment().getProcedureScheduler();
400  }
401
402  int getAssignMaxAttempts() {
403    return assignMaxAttempts;
404  }
405
406  int getAssignRetryImmediatelyMaxAttempts() {
407    return assignRetryImmediatelyMaxAttempts;
408  }
409
410  public RegionStates getRegionStates() {
411    return regionStates;
412  }
413
414  /**
415   * Returns the regions hosted by the specified server.
416   * <p/>
417   * Notice that, for SCP, after we submit the SCP, no one can change the region list for the
418   * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a
419   * snapshot of the current region list for this server, which means, right after you get the
420   * region list, new regions may be moved to this server or some regions may be moved out from this
421   * server, so you should not use it critically if you need strong consistency.
422   */
423  public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
424    ServerStateNode serverInfo = regionStates.getServerNode(serverName);
425    if (serverInfo == null) {
426      return Collections.emptyList();
427    }
428    return serverInfo.getRegionInfoList();
429  }
430
431  private RegionInfo getRegionInfo(RegionStateNode rsn) {
432    if (rsn.isSplit() && !rsn.getRegionInfo().isSplit()) {
433      // see the comments in markRegionAsSplit on why we need to do this converting.
434      return RegionInfoBuilder.newBuilder(rsn.getRegionInfo()).setSplit(true).setOffline(true)
435        .build();
436    } else {
437      return rsn.getRegionInfo();
438    }
439  }
440
441  private Stream<RegionStateNode> getRegionStateNodes(TableName tableName,
442    boolean excludeOfflinedSplitParents) {
443    Stream<RegionStateNode> stream = regionStates.getTableRegionStateNodes(tableName).stream();
444    if (excludeOfflinedSplitParents) {
445      return stream.filter(rsn -> !rsn.isSplit());
446    } else {
447      return stream;
448    }
449  }
450
451  public List<RegionInfo> getTableRegions(TableName tableName,
452    boolean excludeOfflinedSplitParents) {
453    return getRegionStateNodes(tableName, excludeOfflinedSplitParents).map(this::getRegionInfo)
454      .collect(Collectors.toList());
455  }
456
457  public List<Pair<RegionInfo, ServerName>> getTableRegionsAndLocations(TableName tableName,
458    boolean excludeOfflinedSplitParents) {
459    return getRegionStateNodes(tableName, excludeOfflinedSplitParents)
460      .map(rsn -> Pair.newPair(getRegionInfo(rsn), rsn.getRegionLocation()))
461      .collect(Collectors.toList());
462  }
463
464  public RegionStateStore getRegionStateStore() {
465    return regionStateStore;
466  }
467
468  public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
469    return this.shouldAssignRegionsWithFavoredNodes
470      ? ((FavoredStochasticBalancer) getBalancer()).getFavoredNodes(regionInfo)
471      : ServerName.EMPTY_SERVER_LIST;
472  }
473
474  // ============================================================================================
475  // Table State Manager helpers
476  // ============================================================================================
477  private TableStateManager getTableStateManager() {
478    return master.getTableStateManager();
479  }
480
481  private boolean isTableEnabled(final TableName tableName) {
482    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
483  }
484
485  private boolean isTableDisabled(final TableName tableName) {
486    return getTableStateManager().isTableState(tableName, TableState.State.DISABLED,
487      TableState.State.DISABLING);
488  }
489
490  // ============================================================================================
491  // META Helpers
492  // ============================================================================================
493  private boolean isMetaRegion(final RegionInfo regionInfo) {
494    return regionInfo.isMetaRegion();
495  }
496
497  public boolean isMetaRegion(final byte[] regionName) {
498    return getMetaRegionFromName(regionName) != null;
499  }
500
501  public RegionInfo getMetaRegionFromName(final byte[] regionName) {
502    for (RegionInfo hri : getMetaRegionSet()) {
503      if (Bytes.equals(hri.getRegionName(), regionName)) {
504        return hri;
505      }
506    }
507    return null;
508  }
509
510  public boolean isCarryingMeta(final ServerName serverName) {
511    // TODO: handle multiple meta
512    return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
513  }
514
515  private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
516    // TODO: check for state?
517    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
518    return (node != null && serverName.equals(node.getRegionLocation()));
519  }
520
521  private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
522    // if (regionInfo.isMetaRegion()) return regionInfo;
523    // TODO: handle multiple meta. if the region provided is not meta lookup
524    // which meta the region belongs to.
525    return RegionInfoBuilder.FIRST_META_REGIONINFO;
526  }
527
528  // TODO: handle multiple meta.
529  private static final Set<RegionInfo> META_REGION_SET =
530    Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);
531
532  public Set<RegionInfo> getMetaRegionSet() {
533    return META_REGION_SET;
534  }
535
536  // ============================================================================================
537  // META Event(s) helpers
538  // ============================================================================================
539  /**
540   * Notice that, this only means the meta region is available on a RS, but the AM may still be
541   * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
542   * before checking this method, unless you can make sure that your piece of code can only be
543   * executed after AM builds the region states.
544   * @see #isMetaLoaded()
545   */
546  public boolean isMetaAssigned() {
547    return metaAssignEvent.isReady();
548  }
549
550  public boolean isMetaRegionInTransition() {
551    return !isMetaAssigned();
552  }
553
554  /**
555   * Notice that this event does not mean the AM has already finished region state rebuilding. See
556   * the comment of {@link #isMetaAssigned()} for more details.
557   * @see #isMetaAssigned()
558   */
559  public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
560    return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
561  }
562
563  private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
564    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
565    ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
566    if (assigned) {
567      metaAssignEvent.wake(getProcedureScheduler());
568    } else {
569      metaAssignEvent.suspend();
570    }
571  }
572
573  private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
574    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
575    // TODO: handle multiple meta.
576    return metaAssignEvent;
577  }
578
579  /**
580   * Wait until AM finishes the meta loading, i.e, the region states rebuilding.
581   * @see #isMetaLoaded()
582   * @see #waitMetaAssigned(Procedure, RegionInfo)
583   */
584  public boolean waitMetaLoaded(Procedure<?> proc) {
585    return metaLoadEvent.suspendIfNotReady(proc);
586  }
587
588  /**
589   * This method will be called in master initialization method after calling
590   * {@link #processOfflineRegions()}, as in processOfflineRegions we will generate assign
591   * procedures for offline regions, which may be conflict with creating table.
592   * <p/>
593   * This is a bit dirty, should be reconsidered after we decide whether to keep the
594   * {@link #processOfflineRegions()} method.
595   */
596  public void wakeMetaLoadedEvent() {
597    metaLoadEvent.wake(getProcedureScheduler());
598    assert isMetaLoaded() : "expected meta to be loaded";
599  }
600
601  /**
602   * Return whether AM finishes the meta loading, i.e, the region states rebuilding.
603   * @see #isMetaAssigned()
604   * @see #waitMetaLoaded(Procedure)
605   */
606  public boolean isMetaLoaded() {
607    return metaLoadEvent.isReady();
608  }
609
610  /**
611   * Start a new thread to check if there are region servers whose versions are higher than others.
612   * If so, move all system table regions to RS with the highest version to keep compatibility. The
613   * reason is, RS in new version may not be able to access RS in old version when there are some
614   * incompatible changes.
615   * <p>
616   * This method is called when a new RegionServer is added to cluster only.
617   * </p>
618   */
619  public void checkIfShouldMoveSystemRegionAsync() {
620    // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
621    // it should 'move' the system tables from the old server to the new server but
622    // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
623    if (this.master.getServerManager().countOfRegionServers() <= 1) {
624      return;
625    }
626    // This thread used to run whenever there was a change in the cluster. The ZooKeeper
627    // childrenChanged notification came in before the nodeDeleted message and so this method
628    // cold run before a ServerCrashProcedure could run. That meant that this thread could see
629    // a Crashed Server before ServerCrashProcedure and it could find system regions on the
630    // crashed server and go move them before ServerCrashProcedure had a chance; could be
631    // dataloss too if WALs were not recovered.
632    new Thread(() -> {
633      try {
634        synchronized (checkIfShouldMoveSystemRegionLock) {
635          List<RegionPlan> plans = new ArrayList<>();
636          // TODO: I don't think this code does a good job if all servers in cluster have same
637          // version. It looks like it will schedule unnecessary moves.
638          for (ServerName server : getExcludedServersForSystemTable()) {
639            if (master.getServerManager().isServerDead(server)) {
640              // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
641              // considers only online servers, the server could be queued for dead server
642              // processing. As region assignments for crashed server is handled by
643              // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
644              // regular flow of LoadBalancer as a favored node and not to have this special
645              // handling.
646              continue;
647            }
648            List<RegionInfo> regionsShouldMove = getSystemTables(server);
649            if (!regionsShouldMove.isEmpty()) {
650              for (RegionInfo regionInfo : regionsShouldMove) {
651                // null value for dest forces destination server to be selected by balancer
652                RegionPlan plan = new RegionPlan(regionInfo, server, null);
653                if (regionInfo.isMetaRegion()) {
654                  // Must move meta region first.
655                  LOG.info("Async MOVE of {} to newer Server={}", regionInfo.getEncodedName(),
656                    server);
657                  moveAsync(plan);
658                } else {
659                  plans.add(plan);
660                }
661              }
662            }
663            for (RegionPlan plan : plans) {
664              LOG.info("Async MOVE of {} to newer Server={}", plan.getRegionInfo().getEncodedName(),
665                server);
666              moveAsync(plan);
667            }
668          }
669        }
670      } catch (Throwable t) {
671        LOG.error(t.toString(), t);
672      }
673    }).start();
674  }
675
676  private List<RegionInfo> getSystemTables(ServerName serverName) {
677    ServerStateNode serverNode = regionStates.getServerNode(serverName);
678    if (serverNode == null) {
679      return Collections.emptyList();
680    }
681    return serverNode.getSystemRegionInfoList();
682  }
683
684  private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
685    throws HBaseIOException {
686    if (regionNode.getProcedure() != null) {
687      throw new HBaseIOException(
688        regionNode + " is currently in transition; pid=" + regionNode.getProcedure().getProcId());
689    }
690    if (!regionNode.isInState(expectedStates)) {
691      throw new DoNotRetryRegionException(UNEXPECTED_STATE_REGION + regionNode);
692    }
693    if (isTableDisabled(regionNode.getTable())) {
694      throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
695    }
696  }
697
698  /**
699   * Create an assign TransitRegionStateProcedure. Makes sure of RegionState. Throws exception if
700   * not appropriate UNLESS override is set. Used by hbck2 but also by straightline
701   * {@link #assign(RegionInfo, ServerName)} and {@link #assignAsync(RegionInfo, ServerName)}.
702   * @see #createAssignProcedure(RegionStateNode, ServerName) for a version that does NO checking
703   *      used when only when no checking needed.
704   * @param override If false, check RegionState is appropriate for assign; if not throw exception.
705   */
706  private TransitRegionStateProcedure createAssignProcedure(RegionInfo regionInfo, ServerName sn,
707    boolean override) throws IOException {
708    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
709    regionNode.lock();
710    try {
711      if (override) {
712        if (regionNode.getProcedure() != null) {
713          regionNode.unsetProcedure(regionNode.getProcedure());
714        }
715      } else {
716        preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
717      }
718      assert regionNode.getProcedure() == null;
719      return regionNode.setProcedure(
720        TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn));
721    } finally {
722      regionNode.unlock();
723    }
724  }
725
726  /**
727   * Create an assign TransitRegionStateProcedure. Does NO checking of RegionState. Presumes
728   * appriopriate state ripe for assign.
729   * @see #createAssignProcedure(RegionInfo, ServerName, boolean)
730   */
731  private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode,
732    ServerName targetServer) {
733    regionNode.lock();
734    try {
735      return regionNode.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
736        regionNode.getRegionInfo(), targetServer));
737    } finally {
738      regionNode.unlock();
739    }
740  }
741
742  public long assign(RegionInfo regionInfo, ServerName sn) throws IOException {
743    TransitRegionStateProcedure proc = createAssignProcedure(regionInfo, sn, false);
744    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
745    return proc.getProcId();
746  }
747
748  public long assign(RegionInfo regionInfo) throws IOException {
749    return assign(regionInfo, null);
750  }
751
752  /**
753   * Submits a procedure that assigns a region to a target server without waiting for it to finish
754   * @param regionInfo the region we would like to assign
755   * @param sn         target server name
756   */
757  public Future<byte[]> assignAsync(RegionInfo regionInfo, ServerName sn) throws IOException {
758    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(),
759      createAssignProcedure(regionInfo, sn, false));
760  }
761
762  /**
763   * Submits a procedure that assigns a region without waiting for it to finish
764   * @param regionInfo the region we would like to assign
765   */
766  public Future<byte[]> assignAsync(RegionInfo regionInfo) throws IOException {
767    return assignAsync(regionInfo, null);
768  }
769
770  public long unassign(RegionInfo regionInfo) throws IOException {
771    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
772    if (regionNode == null) {
773      throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
774    }
775    TransitRegionStateProcedure proc;
776    regionNode.lock();
777    try {
778      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
779      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
780      regionNode.setProcedure(proc);
781    } finally {
782      regionNode.unlock();
783    }
784    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
785    return proc.getProcId();
786  }
787
788  public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
789    ServerName targetServer) throws HBaseIOException {
790    RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
791    if (regionNode == null) {
792      throw new UnknownRegionException(
793        "No RegionStateNode found for " + regionInfo.getEncodedName() + "(Closed/Deleted?)");
794    }
795    TransitRegionStateProcedure proc;
796    regionNode.lock();
797    try {
798      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
799      regionNode.checkOnline();
800      proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
801      regionNode.setProcedure(proc);
802    } finally {
803      regionNode.unlock();
804    }
805    return proc;
806  }
807
808  public void move(RegionInfo regionInfo) throws IOException {
809    TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null);
810    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
811  }
812
813  public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException {
814    TransitRegionStateProcedure proc =
815      createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination());
816    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
817  }
818
819  public Future<byte[]> balance(RegionPlan regionPlan) throws HBaseIOException {
820    ServerName current =
821      this.getRegionStates().getRegionAssignments().get(regionPlan.getRegionInfo());
822    if (current == null || !current.equals(regionPlan.getSource())) {
823      LOG.debug("Skip region plan {}, source server not match, current region location is {}",
824        regionPlan, current == null ? "(null)" : current);
825      return null;
826    }
827    return moveAsync(regionPlan);
828  }
829
830  // ============================================================================================
831  // RegionTransition procedures helpers
832  // ============================================================================================
833
834  /**
835   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
836   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
837   *         to populate the assigns with targets chosen using round-robin (default balancer
838   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
839   *         AssignProcedure will ask the balancer for a new target, and so on.
840   */
841  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris,
842    List<ServerName> serversToExclude) {
843    if (hris.isEmpty()) {
844      return new TransitRegionStateProcedure[0];
845    }
846
847    if (
848      serversToExclude != null && this.master.getServerManager().getOnlineServersList().size() == 1
849    ) {
850      LOG.debug("Only one region server found and hence going ahead with the assignment");
851      serversToExclude = null;
852    }
853    try {
854      // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
855      // a better job if it has all the assignments in the one lump.
856      Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
857        this.master.getServerManager().createDestinationServersList(serversToExclude));
858      // Return mid-method!
859      return createAssignProcedures(assignments);
860    } catch (HBaseIOException hioe) {
861      LOG.warn("Failed roundRobinAssignment", hioe);
862    }
863    // If an error above, fall-through to this simpler assign. Last resort.
864    return createAssignProcedures(hris);
865  }
866
867  /**
868   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
869   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
870   *         to populate the assigns with targets chosen using round-robin (default balancer
871   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
872   *         AssignProcedure will ask the balancer for a new target, and so on.
873   */
874  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
875    return createRoundRobinAssignProcedures(hris, null);
876  }
877
878  static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
879    if (left.getRegion().isMetaRegion()) {
880      if (right.getRegion().isMetaRegion()) {
881        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
882      }
883      return -1;
884    } else if (right.getRegion().isMetaRegion()) {
885      return +1;
886    }
887    if (left.getRegion().getTable().isSystemTable()) {
888      if (right.getRegion().getTable().isSystemTable()) {
889        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
890      }
891      return -1;
892    } else if (right.getRegion().getTable().isSystemTable()) {
893      return +1;
894    }
895    return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
896  }
897
898  /**
899   * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server. This
900   * method is called from HBCK2.
901   * @return an assign or null
902   */
903  public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo ri, boolean override) {
904    TransitRegionStateProcedure trsp = null;
905    try {
906      trsp = createAssignProcedure(ri, null, override);
907    } catch (IOException ioe) {
908      LOG.info(
909        "Failed {} assign, override={}"
910          + (override ? "" : "; set override to by-pass state checks."),
911        ri.getEncodedName(), override, ioe);
912    }
913    return trsp;
914  }
915
916  /**
917   * Create one TransitRegionStateProcedure to unassign a region. This method is called from HBCK2.
918   * @return an unassign or null
919   */
920  public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo ri, boolean override) {
921    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(ri);
922    TransitRegionStateProcedure trsp = null;
923    regionNode.lock();
924    try {
925      if (override) {
926        if (regionNode.getProcedure() != null) {
927          regionNode.unsetProcedure(regionNode.getProcedure());
928        }
929      } else {
930        // This is where we could throw an exception; i.e. override is false.
931        preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
932      }
933      assert regionNode.getProcedure() == null;
934      trsp =
935        TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionNode.getRegionInfo());
936      regionNode.setProcedure(trsp);
937    } catch (IOException ioe) {
938      // 'override' must be false here.
939      LOG.info("Failed {} unassign, override=false; set override to by-pass state checks.",
940        ri.getEncodedName(), ioe);
941    } finally {
942      regionNode.unlock();
943    }
944    return trsp;
945  }
946
947  /**
948   * Create an array of TransitRegionStateProcedure w/o specifying a target server. Used as fallback
949   * of caller is unable to do {@link #createAssignProcedures(Map)}.
950   * <p/>
951   * If no target server, at assign time, we will try to use the former location of the region if
952   * one exists. This is how we 'retain' the old location across a server restart.
953   * <p/>
954   * Should only be called when you can make sure that no one can touch these regions other than
955   * you. For example, when you are creating or enabling table. Presumes all Regions are in
956   * appropriate state ripe for assign; no checking of Region state is done in here.
957   * @see #createAssignProcedures(Map)
958   */
959  public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) {
960    return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
961      .map(regionNode -> createAssignProcedure(regionNode, null)).sorted(AssignmentManager::compare)
962      .toArray(TransitRegionStateProcedure[]::new);
963  }
964
965  /**
966   * Tied to {@link #createAssignProcedures(List)} in that it is called if caller is unable to run
967   * this method. Presumes all Regions are in appropriate state ripe for assign; no checking of
968   * Region state is done in here.
969   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
970   * @return Assignments made from the passed in <code>assignments</code>
971   * @see #createAssignProcedures(List)
972   */
973  private TransitRegionStateProcedure[]
974    createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments) {
975    return assignments.entrySet().stream()
976      .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
977        .map(regionNode -> createAssignProcedure(regionNode, e.getKey())))
978      .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
979  }
980
981  // for creating unassign TRSP when disabling a table or closing excess region replicas
982  private TransitRegionStateProcedure forceCreateUnssignProcedure(RegionStateNode regionNode) {
983    regionNode.lock();
984    try {
985      if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
986        return null;
987      }
988      // in general, a split parent should be in CLOSED or SPLIT state, but anyway, let's check it
989      // here for safety
990      if (regionNode.getRegionInfo().isSplit()) {
991        LOG.warn("{} is a split parent but not in CLOSED or SPLIT state", regionNode);
992        return null;
993      }
994      // As in DisableTableProcedure or ModifyTableProcedure, we will hold the xlock for table, so
995      // we can make sure that this procedure has not been executed yet, as TRSP will hold the
996      // shared lock for table all the time. So here we will unset it and when it is actually
997      // executed, it will find that the attach procedure is not itself and quit immediately.
998      if (regionNode.getProcedure() != null) {
999        regionNode.unsetProcedure(regionNode.getProcedure());
1000      }
1001      return regionNode.setProcedure(TransitRegionStateProcedure.unassign(getProcedureEnvironment(),
1002        regionNode.getRegionInfo()));
1003    } finally {
1004      regionNode.unlock();
1005    }
1006  }
1007
1008  /**
1009   * Called by DisableTableProcedure to unassign all the regions for a table.
1010   */
1011  public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) {
1012    return regionStates.getTableRegionStateNodes(tableName).stream()
1013      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
1014      .toArray(TransitRegionStateProcedure[]::new);
1015  }
1016
1017  /**
1018   * Called by ModifyTableProcedures to unassign all the excess region replicas for a table.
1019   */
1020  public TransitRegionStateProcedure[] createUnassignProceduresForClosingExcessRegionReplicas(
1021    TableName tableName, int newReplicaCount) {
1022    return regionStates.getTableRegionStateNodes(tableName).stream()
1023      .filter(regionNode -> regionNode.getRegionInfo().getReplicaId() >= newReplicaCount)
1024      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
1025      .toArray(TransitRegionStateProcedure[]::new);
1026  }
1027
1028  public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
1029    final byte[] splitKey) throws IOException {
1030    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
1031  }
1032
1033  public MergeTableRegionsProcedure createMergeProcedure(RegionInfo... ris) throws IOException {
1034    return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
1035  }
1036
1037  /**
1038   * Delete the region states. This is called by "DeleteTable"
1039   */
1040  public void deleteTable(final TableName tableName) throws IOException {
1041    final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
1042    regionStateStore.deleteRegions(regions);
1043    for (int i = 0; i < regions.size(); ++i) {
1044      final RegionInfo regionInfo = regions.get(i);
1045      // we expect the region to be offline
1046      regionStates.removeFromOfflineRegions(regionInfo);
1047      regionStates.deleteRegion(regionInfo);
1048    }
1049  }
1050
1051  // ============================================================================================
1052  // RS Region Transition Report helpers
1053  // ============================================================================================
1054  private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
1055    ServerName serverName, List<RegionStateTransition> transitionList) throws IOException {
1056    for (RegionStateTransition transition : transitionList) {
1057      switch (transition.getTransitionCode()) {
1058        case OPENED:
1059        case FAILED_OPEN:
1060        case CLOSED:
1061          assert transition.getRegionInfoCount() == 1 : transition;
1062          final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1063          long procId =
1064            transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
1065          updateRegionTransition(serverName, transition.getTransitionCode(), hri,
1066            transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId);
1067          break;
1068        case READY_TO_SPLIT:
1069        case SPLIT:
1070        case SPLIT_REVERTED:
1071          assert transition.getRegionInfoCount() == 3 : transition;
1072          final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1073          final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
1074          final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
1075          updateRegionSplitTransition(serverName, transition.getTransitionCode(), parent, splitA,
1076            splitB);
1077          break;
1078        case READY_TO_MERGE:
1079        case MERGED:
1080        case MERGE_REVERTED:
1081          assert transition.getRegionInfoCount() == 3 : transition;
1082          final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1083          final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
1084          final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
1085          updateRegionMergeTransition(serverName, transition.getTransitionCode(), merged, mergeA,
1086            mergeB);
1087          break;
1088      }
1089    }
1090  }
1091
1092  public ReportRegionStateTransitionResponse reportRegionStateTransition(
1093    final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
1094    ReportRegionStateTransitionResponse.Builder builder =
1095      ReportRegionStateTransitionResponse.newBuilder();
1096    ServerName serverName = ProtobufUtil.toServerName(req.getServer());
1097    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
1098    // here we have to acquire a read lock instead of a simple exclusive lock. This is because that
1099    // we should not block other reportRegionStateTransition call from the same region server. This
1100    // is not only about performance, but also to prevent dead lock. Think of the meta region is
1101    // also on the same region server and you hold the lock which blocks the
1102    // reportRegionStateTransition for meta, and since meta is not online, you will block inside the
1103    // lock protection to wait for meta online...
1104    serverNode.readLock().lock();
1105    try {
1106      // we only accept reportRegionStateTransition if the region server is online, see the comment
1107      // above in submitServerCrash method and HBASE-21508 for more details.
1108      if (serverNode.isInState(ServerState.ONLINE)) {
1109        try {
1110          reportRegionStateTransition(builder, serverName, req.getTransitionList());
1111        } catch (PleaseHoldException e) {
1112          LOG.trace("Failed transition ", e);
1113          throw e;
1114        } catch (UnsupportedOperationException | IOException e) {
1115          // TODO: at the moment we have a single error message and the RS will abort
1116          // if the master says that one of the region transitions failed.
1117          LOG.warn("Failed transition", e);
1118          builder.setErrorMessage("Failed transition " + e.getMessage());
1119        }
1120      } else {
1121        LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
1122          serverName);
1123        builder.setErrorMessage("You are dead");
1124      }
1125    } finally {
1126      serverNode.readLock().unlock();
1127    }
1128
1129    return builder.build();
1130  }
1131
1132  private void updateRegionTransition(ServerName serverName, TransitionCode state,
1133    RegionInfo regionInfo, long seqId, long procId) throws IOException {
1134    checkMetaLoaded(regionInfo);
1135
1136    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1137    if (regionNode == null) {
1138      // the table/region is gone. maybe a delete, split, merge
1139      throw new UnexpectedStateException(String.format(
1140        "Server %s was trying to transition region %s to %s. but Region is not known.", serverName,
1141        regionInfo, state));
1142    }
1143    LOG.trace("Update region transition serverName={} region={} regionState={}", serverName,
1144      regionNode, state);
1145
1146    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
1147    regionNode.lock();
1148    try {
1149      if (!reportTransition(regionNode, serverNode, state, seqId, procId)) {
1150        // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
1151        // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
1152        // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
1153        // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
1154        // to CLOSED
1155        // These happen because on cluster shutdown, we currently let the RegionServers close
1156        // regions. This is the only time that region close is not run by the Master (so cluster
1157        // goes down fast). Consider changing it so Master runs all shutdowns.
1158        if (
1159          this.master.getServerManager().isClusterShutdown() && state.equals(TransitionCode.CLOSED)
1160        ) {
1161          LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
1162        } else {
1163          LOG.warn("No matching procedure found for {} transition on {} to {}", serverName,
1164            regionNode, state);
1165        }
1166      }
1167    } finally {
1168      regionNode.unlock();
1169    }
1170  }
1171
1172  private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
1173    TransitionCode state, long seqId, long procId) throws IOException {
1174    ServerName serverName = serverNode.getServerName();
1175    TransitRegionStateProcedure proc = regionNode.getProcedure();
1176    if (proc == null) {
1177      return false;
1178    }
1179    proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
1180      serverName, state, seqId, procId);
1181    return true;
1182  }
1183
1184  private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state,
1185    final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
1186    checkMetaLoaded(parent);
1187
1188    if (state != TransitionCode.READY_TO_SPLIT) {
1189      throw new UnexpectedStateException(
1190        "unsupported split regionState=" + state + " for parent region " + parent
1191          + " maybe an old RS (< 2.0) had the operation in progress");
1192    }
1193
1194    // sanity check on the request
1195    if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) {
1196      throw new UnsupportedOperationException("unsupported split request with bad keys: parent="
1197        + parent + " hriA=" + hriA + " hriB=" + hriB);
1198    }
1199
1200    if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
1201      LOG.warn("Split switch is off! skip split of " + parent);
1202      throw new DoNotRetryIOException(
1203        "Split region " + parent.getRegionNameAsString() + " failed due to split switch off");
1204    }
1205
1206    // Submit the Split procedure
1207    final byte[] splitKey = hriB.getStartKey();
1208    if (LOG.isDebugEnabled()) {
1209      LOG.debug("Split request from " + serverName + ", parent=" + parent + " splitKey="
1210        + Bytes.toStringBinary(splitKey));
1211    }
1212    // Processing this report happens asynchronously from other activities which can mutate
1213    // the region state. For example, a split procedure may already be running for this parent.
1214    // A split procedure cannot succeed if the parent region is no longer open, so we can
1215    // ignore it in that case.
1216    // Note that submitting more than one split procedure for a given region is
1217    // harmless -- the split is fenced in the procedure handling -- but it would be noisy in
1218    // the logs. Only one procedure can succeed. The other procedure(s) would abort during
1219    // initialization and report failure with WARN level logging.
1220    RegionState parentState = regionStates.getRegionState(parent);
1221    if (parentState != null && parentState.isOpened()) {
1222      master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
1223    } else {
1224      LOG.info("Ignoring split request from " + serverName + ", parent=" + parent
1225        + " because parent is unknown or not open");
1226      return;
1227    }
1228
1229    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
1230    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
1231      throw new UnsupportedOperationException(
1232        String.format("Split handled by the master: " + "parent=%s hriA=%s hriB=%s",
1233          parent.getShortNameToLog(), hriA, hriB));
1234    }
1235  }
1236
1237  private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state,
1238    final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
1239    checkMetaLoaded(merged);
1240
1241    if (state != TransitionCode.READY_TO_MERGE) {
1242      throw new UnexpectedStateException(
1243        "Unsupported merge regionState=" + state + " for regionA=" + hriA + " regionB=" + hriB
1244          + " merged=" + merged + " maybe an old RS (< 2.0) had the operation in progress");
1245    }
1246
1247    if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
1248      LOG.warn("Merge switch is off! skip merge of regionA=" + hriA + " regionB=" + hriB);
1249      throw new DoNotRetryIOException(
1250        "Merge of regionA=" + hriA + " regionB=" + hriB + " failed because merge switch is off");
1251    }
1252
1253    // Submit the Merge procedure
1254    if (LOG.isDebugEnabled()) {
1255      LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged);
1256    }
1257    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
1258
1259    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
1260    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
1261      throw new UnsupportedOperationException(
1262        String.format("Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state,
1263          merged, hriA, hriB));
1264    }
1265  }
1266
1267  // ============================================================================================
1268  // RS Status update (report online regions) helpers
1269  // ============================================================================================
1270  /**
1271   * The master will call this method when the RS send the regionServerReport(). The report will
1272   * contains the "online regions". This method will check the the online regions against the
1273   * in-memory state of the AM, and we will log a warn message if there is a mismatch. This is
1274   * because that there is no fencing between the reportRegionStateTransition method and
1275   * regionServerReport method, so there could be race and introduce inconsistency here, but
1276   * actually there is no problem.
1277   * <p/>
1278   * Please see HBASE-21421 and HBASE-21463 for more details.
1279   */
1280  public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) {
1281    if (!isRunning()) {
1282      return;
1283    }
1284    if (LOG.isTraceEnabled()) {
1285      LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName,
1286        regionNames.size(), isMetaLoaded(),
1287        regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList()));
1288    }
1289
1290    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
1291    synchronized (serverNode) {
1292      if (!serverNode.isInState(ServerState.ONLINE)) {
1293        LOG.warn("Got a report from a server result in state " + serverNode.getState());
1294        return;
1295      }
1296    }
1297
1298    // Track the regionserver reported online regions in memory.
1299    synchronized (rsReports) {
1300      rsReports.put(serverName, regionNames);
1301    }
1302
1303    if (regionNames.isEmpty()) {
1304      // nothing to do if we don't have regions
1305      LOG.trace("no online region found on {}", serverName);
1306      return;
1307    }
1308    if (!isMetaLoaded()) {
1309      // we are still on startup, skip checking
1310      return;
1311    }
1312    // The Heartbeat tells us of what regions are on the region serve, check the state.
1313    checkOnlineRegionsReport(serverNode, regionNames);
1314  }
1315
1316  /**
1317   * Close <code>regionName</code> on <code>sn</code> silently and immediately without using a
1318   * Procedure or going via hbase:meta. For case where a RegionServer's hosting of a Region is not
1319   * aligned w/ the Master's accounting of Region state. This is for cleaning up an error in
1320   * accounting.
1321   */
1322  private void closeRegionSilently(ServerName sn, byte[] regionName) {
1323    try {
1324      RegionInfo ri = MetaTableAccessor.parseRegionInfoFromRegionName(regionName);
1325      // Pass -1 for timeout. Means do not wait.
1326      ServerManager.closeRegionSilentlyAndWait(this.master.getClusterConnection(), sn, ri, -1);
1327    } catch (Exception e) {
1328      LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e);
1329    }
1330  }
1331
1332  /**
1333   * Check that what the RegionServer reports aligns with the Master's image. If disagreement, we
1334   * will tell the RegionServer to expediently close a Region we do not think it should have.
1335   */
1336  private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) {
1337    ServerName serverName = serverNode.getServerName();
1338    for (byte[] regionName : regionNames) {
1339      if (!isRunning()) {
1340        return;
1341      }
1342      RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
1343      if (regionNode == null) {
1344        String regionNameAsStr = Bytes.toStringBinary(regionName);
1345        LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...", regionNameAsStr,
1346          serverName);
1347        closeRegionSilently(serverNode.getServerName(), regionName);
1348        continue;
1349      }
1350      final long lag = 1000;
1351      regionNode.lock();
1352      try {
1353        long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate();
1354        if (regionNode.isInState(State.OPENING, State.OPEN)) {
1355          // This is possible as a region server has just closed a region but the region server
1356          // report is generated before the closing, but arrive after the closing. Make sure there
1357          // is some elapsed time so less false alarms.
1358          if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) {
1359            LOG.warn("Reporting {} server does not match {} (time since last "
1360              + "update={}ms); closing...", serverName, regionNode, diff);
1361            closeRegionSilently(serverNode.getServerName(), regionName);
1362          }
1363        } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
1364          // So, we can get report that a region is CLOSED or SPLIT because a heartbeat
1365          // came in at about same time as a region transition. Make sure there is some
1366          // elapsed time so less false alarms.
1367          if (diff > lag) {
1368            LOG.warn("Reporting {} state does not match {} (time since last update={}ms)",
1369              serverName, regionNode, diff);
1370          }
1371        }
1372      } finally {
1373        regionNode.unlock();
1374      }
1375    }
1376  }
1377
1378  // ============================================================================================
1379  // RIT chore
1380  // ============================================================================================
1381  private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> {
1382    public RegionInTransitionChore(final int timeoutMsec) {
1383      super(timeoutMsec);
1384    }
1385
1386    @Override
1387    protected void periodicExecute(final MasterProcedureEnv env) {
1388      final AssignmentManager am = env.getAssignmentManager();
1389
1390      final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat();
1391      if (ritStat.hasRegionsOverThreshold()) {
1392        for (RegionState hri : ritStat.getRegionOverThreshold()) {
1393          am.handleRegionOverStuckWarningThreshold(hri.getRegion());
1394        }
1395      }
1396
1397      // update metrics
1398      am.updateRegionsInTransitionMetrics(ritStat);
1399    }
1400  }
1401
1402  private static class DeadServerMetricRegionChore
1403    extends ProcedureInMemoryChore<MasterProcedureEnv> {
1404    public DeadServerMetricRegionChore(final int timeoutMsec) {
1405      super(timeoutMsec);
1406    }
1407
1408    @Override
1409    protected void periodicExecute(final MasterProcedureEnv env) {
1410      final ServerManager sm = env.getMasterServices().getServerManager();
1411      final AssignmentManager am = env.getAssignmentManager();
1412      // To minimize inconsistencies we are not going to snapshot live servers in advance in case
1413      // new servers are added; OTOH we don't want to add heavy sync for a consistent view since
1414      // this is for metrics. Instead, we're going to check each regions as we go; to avoid making
1415      // too many checks, we maintain a local lists of server, limiting us to false negatives. If
1416      // we miss some recently-dead server, we'll just see it next time.
1417      Set<ServerName> recentlyLiveServers = new HashSet<>();
1418      int deadRegions = 0, unknownRegions = 0;
1419      for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) {
1420        if (rsn.getState() != State.OPEN) {
1421          continue; // Opportunistic check, should quickly skip RITs, offline tables, etc.
1422        }
1423        // Do not need to acquire region state lock as this is only for showing metrics.
1424        ServerName sn = rsn.getRegionLocation();
1425        State state = rsn.getState();
1426        if (state != State.OPEN) {
1427          continue; // Mostly skipping RITs that are already being take care of.
1428        }
1429        if (sn == null) {
1430          ++unknownRegions; // Opened on null?
1431          continue;
1432        }
1433        if (recentlyLiveServers.contains(sn)) {
1434          continue;
1435        }
1436        ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn);
1437        switch (sls) {
1438          case LIVE:
1439            recentlyLiveServers.add(sn);
1440            break;
1441          case DEAD:
1442            ++deadRegions;
1443            break;
1444          case UNKNOWN:
1445            ++unknownRegions;
1446            break;
1447          default:
1448            throw new AssertionError("Unexpected " + sls);
1449        }
1450      }
1451      if (deadRegions > 0 || unknownRegions > 0) {
1452        LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers",
1453          deadRegions, unknownRegions);
1454      }
1455
1456      am.updateDeadServerRegionMetrics(deadRegions, unknownRegions);
1457    }
1458  }
1459
1460  public RegionInTransitionStat computeRegionInTransitionStat() {
1461    final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration());
1462    rit.update(this);
1463    return rit;
1464  }
1465
1466  public static class RegionInTransitionStat {
1467    private final int ritThreshold;
1468
1469    private HashMap<String, RegionState> ritsOverThreshold = null;
1470    private long statTimestamp;
1471    private long oldestRITTime = 0;
1472    private int totalRITsTwiceThreshold = 0;
1473    private int totalRITs = 0;
1474
1475    public RegionInTransitionStat(final Configuration conf) {
1476      this.ritThreshold =
1477        conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD);
1478    }
1479
1480    public int getRITThreshold() {
1481      return ritThreshold;
1482    }
1483
1484    public long getTimestamp() {
1485      return statTimestamp;
1486    }
1487
1488    public int getTotalRITs() {
1489      return totalRITs;
1490    }
1491
1492    public long getOldestRITTime() {
1493      return oldestRITTime;
1494    }
1495
1496    public int getTotalRITsOverThreshold() {
1497      Map<String, RegionState> m = this.ritsOverThreshold;
1498      return m != null ? m.size() : 0;
1499    }
1500
1501    public boolean hasRegionsTwiceOverThreshold() {
1502      return totalRITsTwiceThreshold > 0;
1503    }
1504
1505    public boolean hasRegionsOverThreshold() {
1506      Map<String, RegionState> m = this.ritsOverThreshold;
1507      return m != null && !m.isEmpty();
1508    }
1509
1510    public Collection<RegionState> getRegionOverThreshold() {
1511      Map<String, RegionState> m = this.ritsOverThreshold;
1512      return m != null ? m.values() : Collections.emptySet();
1513    }
1514
1515    public boolean isRegionOverThreshold(final RegionInfo regionInfo) {
1516      Map<String, RegionState> m = this.ritsOverThreshold;
1517      return m != null && m.containsKey(regionInfo.getEncodedName());
1518    }
1519
1520    public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) {
1521      Map<String, RegionState> m = this.ritsOverThreshold;
1522      if (m == null) {
1523        return false;
1524      }
1525      final RegionState state = m.get(regionInfo.getEncodedName());
1526      if (state == null) {
1527        return false;
1528      }
1529      return (statTimestamp - state.getStamp()) > (ritThreshold * 2);
1530    }
1531
1532    protected void update(final AssignmentManager am) {
1533      final RegionStates regionStates = am.getRegionStates();
1534      this.statTimestamp = EnvironmentEdgeManager.currentTime();
1535      update(regionStates.getRegionsStateInTransition(), statTimestamp);
1536      update(regionStates.getRegionFailedOpen(), statTimestamp);
1537
1538      if (LOG.isDebugEnabled() && ritsOverThreshold != null && !ritsOverThreshold.isEmpty()) {
1539        LOG.debug("RITs over threshold: {}",
1540          ritsOverThreshold.entrySet().stream()
1541            .map(e -> e.getKey() + ":" + e.getValue().getState().name())
1542            .collect(Collectors.joining("\n")));
1543      }
1544    }
1545
1546    private void update(final Collection<RegionState> regions, final long currentTime) {
1547      for (RegionState state : regions) {
1548        totalRITs++;
1549        final long ritTime = currentTime - state.getStamp();
1550        if (ritTime > ritThreshold) {
1551          if (ritsOverThreshold == null) {
1552            ritsOverThreshold = new HashMap<String, RegionState>();
1553          }
1554          ritsOverThreshold.put(state.getRegion().getEncodedName(), state);
1555          totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0;
1556        }
1557        if (oldestRITTime < ritTime) {
1558          oldestRITTime = ritTime;
1559        }
1560      }
1561    }
1562  }
1563
1564  private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) {
1565    metrics.updateRITOldestAge(ritStat.getOldestRITTime());
1566    metrics.updateRITCount(ritStat.getTotalRITs());
1567    metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold());
1568  }
1569
1570  private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) {
1571    metrics.updateDeadServerOpenRegions(deadRegions);
1572    metrics.updateUnknownServerOpenRegions(unknownRegions);
1573  }
1574
1575  private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
1576    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1577    // if (regionNode.isStuck()) {
1578    LOG.warn("STUCK Region-In-Transition {}", regionNode);
1579  }
1580
1581  // ============================================================================================
1582  // TODO: Master load/bootstrap
1583  // ============================================================================================
1584  public void joinCluster() throws IOException {
1585    long startTime = System.nanoTime();
1586    LOG.debug("Joining cluster...");
1587
1588    // Scan hbase:meta to build list of existing regions, servers, and assignment.
1589    // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress
1590    // w/o meta.
1591    loadMeta();
1592
1593    while (master.getServerManager().countOfRegionServers() < 1) {
1594      LOG.info("Waiting for RegionServers to join; current count={}",
1595        master.getServerManager().countOfRegionServers());
1596      Threads.sleep(250);
1597    }
1598    LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers());
1599
1600    // Start the chores
1601    master.getMasterProcedureExecutor().addChore(this.ritChore);
1602    master.getMasterProcedureExecutor().addChore(this.deadMetricChore);
1603
1604    long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
1605    LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs));
1606  }
1607
1608  /**
1609   * Create assign procedure for offline regions. Just follow the old
1610   * processofflineServersWithOnlineRegions method. Since now we do not need to deal with dead
1611   * server any more, we only deal with the regions in OFFLINE state in this method. And this is a
1612   * bit strange, that for new regions, we will add it in CLOSED state instead of OFFLINE state, and
1613   * usually there will be a procedure to track them. The processofflineServersWithOnlineRegions is
1614   * a legacy from long ago, as things are going really different now, maybe we do not need this
1615   * method any more. Need to revisit later.
1616   */
1617  // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online.
1618  // Needs to be done after the table state manager has been started.
1619  public void processOfflineRegions() {
1620    TransitRegionStateProcedure[] procs =
1621      regionStates.getRegionStateNodes().stream().filter(rsn -> rsn.isInState(State.OFFLINE))
1622        .filter(rsn -> isTableEnabled(rsn.getRegionInfo().getTable())).map(rsn -> {
1623          rsn.lock();
1624          try {
1625            if (rsn.getProcedure() != null) {
1626              return null;
1627            } else {
1628              return rsn.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
1629                rsn.getRegionInfo(), null));
1630            }
1631          } finally {
1632            rsn.unlock();
1633          }
1634        }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new);
1635    if (procs.length > 0) {
1636      master.getMasterProcedureExecutor().submitProcedures(procs);
1637    }
1638  }
1639
1640  /*
1641   * AM internal RegionStateStore.RegionStateVisitor implementation. To be used when scanning META
1642   * table for region rows, using RegionStateStore utility methods. RegionStateStore methods will
1643   * convert Result into proper RegionInfo instances, but those would still need to be added into
1644   * AssignmentManager.regionStates in-memory cache. RegionMetaLoadingVisitor.visitRegionState
1645   * method provides the logic for adding RegionInfo instances as loaded from latest META scan into
1646   * AssignmentManager.regionStates.
1647   */
1648  private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor {
1649
1650    @Override
1651    public void visitRegionState(Result result, final RegionInfo regionInfo, final State state,
1652      final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
1653      if (
1654        state == null && regionLocation == null && lastHost == null
1655          && openSeqNum == SequenceId.NO_SEQUENCE_ID
1656      ) {
1657        // This is a row with nothing in it.
1658        LOG.warn("Skipping empty row={}", result);
1659        return;
1660      }
1661      State localState = state;
1662      if (localState == null) {
1663        // No region state column data in hbase:meta table! Are I doing a rolling upgrade from
1664        // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta?
1665        // In any of these cases, state is empty. For now, presume OFFLINE but there are probably
1666        // cases where we need to probe more to be sure this correct; TODO informed by experience.
1667        LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);
1668        localState = State.OFFLINE;
1669      }
1670      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
1671      // Do not need to lock on regionNode, as we can make sure that before we finish loading
1672      // meta, all the related procedures can not be executed. The only exception is for meta
1673      // region related operations, but here we do not load the informations for meta region.
1674      regionNode.setState(localState);
1675      regionNode.setLastHost(lastHost);
1676      regionNode.setRegionLocation(regionLocation);
1677      regionNode.setOpenSeqNum(openSeqNum);
1678
1679      // Note: keep consistent with other methods, see region(Opening|Opened|Closing)
1680      // RIT/ServerCrash handling should take care of the transiting regions.
1681      if (
1682        localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING, State.MERGING)
1683      ) {
1684        assert regionLocation != null : "found null region location for " + regionNode;
1685        regionStates.addRegionToServer(regionNode);
1686      } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
1687        regionStates.addToOfflineRegions(regionNode);
1688      }
1689      if (regionNode.getProcedure() != null) {
1690        regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode);
1691      }
1692    }
1693  };
1694
1695  /**
1696   * Query META if the given <code>RegionInfo</code> exists, adding to
1697   * <code>AssignmentManager.regionStateStore</code> cache if the region is found in META.
1698   * @param regionEncodedName encoded name for the region to be loaded from META into
1699   *                          <code>AssignmentManager.regionStateStore</code> cache
1700   * @return <code>RegionInfo</code> instance for the given region if it is present in META and got
1701   *         successfully loaded into <code>AssignmentManager.regionStateStore</code> cache,
1702   *         <b>null</b> otherwise.
1703   * @throws UnknownRegionException if any errors occur while querying meta.
1704   */
1705  public RegionInfo loadRegionFromMeta(String regionEncodedName) throws UnknownRegionException {
1706    try {
1707      RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor();
1708      regionStateStore.visitMetaForRegion(regionEncodedName, visitor);
1709      return regionStates.getRegionState(regionEncodedName) == null
1710        ? null
1711        : regionStates.getRegionState(regionEncodedName).getRegion();
1712    } catch (IOException e) {
1713      throw new UnknownRegionException(
1714        "Error trying to load region " + regionEncodedName + " from META", e);
1715    }
1716  }
1717
1718  private void loadMeta() throws IOException {
1719    // TODO: use a thread pool
1720    regionStateStore.visitMeta(new RegionMetaLoadingVisitor());
1721  }
1722
1723  /**
1724   * Used to check if the meta loading is done.
1725   * <p/>
1726   * if not we throw PleaseHoldException since we are rebuilding the RegionStates
1727   * @param hri region to check if it is already rebuild
1728   * @throws PleaseHoldException if meta has not been loaded yet
1729   */
1730  private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException {
1731    if (!isRunning()) {
1732      throw new PleaseHoldException("AssignmentManager not running");
1733    }
1734    boolean meta = isMetaRegion(hri);
1735    boolean metaLoaded = isMetaLoaded();
1736    if (!meta && !metaLoaded) {
1737      throw new PleaseHoldException(
1738        "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded);
1739    }
1740  }
1741
1742  // ============================================================================================
1743  // TODO: Metrics
1744  // ============================================================================================
1745  public int getNumRegionsOpened() {
1746    // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value
1747    return 0;
1748  }
1749
1750  /**
1751   * Usually run by the Master in reaction to server crash during normal processing. Can also be
1752   * invoked via external RPC to effect repair; in the latter case, the 'force' flag is set so we
1753   * push through the SCP though context may indicate already-running-SCP (An old SCP may have
1754   * exited abnormally, or damaged cluster may still have references in hbase:meta to 'Unknown
1755   * Servers' -- servers that are not online or in dead servers list, etc.)
1756   * @param force Set if the request came in externally over RPC (via hbck2). Force means run the
1757   *              SCP even if it seems as though there might be an outstanding SCP running.
1758   * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled.
1759   */
1760  public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) {
1761    // May be an 'Unknown Server' so handle case where serverNode is null.
1762    ServerStateNode serverNode = regionStates.getServerNode(serverName);
1763    // Remove the in-memory rsReports result
1764    synchronized (rsReports) {
1765      rsReports.remove(serverName);
1766    }
1767
1768    // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the
1769    // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
1770    // this server. This is used to simplify the implementation for TRSP and SCP, where we can make
1771    // sure that, the region list fetched by SCP will not be changed any more.
1772    if (serverNode != null) {
1773      serverNode.writeLock().lock();
1774    }
1775    boolean carryingMeta;
1776    long pid;
1777    try {
1778      ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
1779      carryingMeta = isCarryingMeta(serverName);
1780      if (!force && serverNode != null && !serverNode.isInState(ServerState.ONLINE)) {
1781        LOG.info("Skip adding ServerCrashProcedure for {} (meta={}) -- running?", serverNode,
1782          carryingMeta);
1783        return Procedure.NO_PROC_ID;
1784      } else {
1785        MasterProcedureEnv mpe = procExec.getEnvironment();
1786        // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead.
1787        // HBCKSCP scours Master in-memory state AND hbase;meta for references to
1788        // serverName just-in-case. An SCP that is scheduled when the server is
1789        // 'Unknown' probably originated externally with HBCK2 fix-it tool.
1790        ServerState oldState = null;
1791        if (serverNode != null) {
1792          oldState = serverNode.getState();
1793          serverNode.setState(ServerState.CRASHED);
1794        }
1795
1796        if (force) {
1797          pid = procExec.submitProcedure(
1798            new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
1799        } else {
1800          pid = procExec.submitProcedure(
1801            new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
1802        }
1803        LOG.info("Scheduled ServerCrashProcedure pid={} for {} (carryingMeta={}){}.", pid,
1804          serverName, carryingMeta,
1805          serverNode == null ? "" : " " + serverNode.toString() + ", oldState=" + oldState);
1806      }
1807    } finally {
1808      if (serverNode != null) {
1809        serverNode.writeLock().unlock();
1810      }
1811    }
1812    return pid;
1813  }
1814
1815  public void offlineRegion(final RegionInfo regionInfo) {
1816    // TODO used by MasterRpcServices
1817    RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
1818    if (node != null) {
1819      node.offline();
1820    }
1821  }
1822
1823  public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) {
1824    // TODO used by TestSplitTransactionOnCluster.java
1825  }
1826
1827  public Map<ServerName, List<RegionInfo>>
1828    getSnapShotOfAssignment(final Collection<RegionInfo> regions) {
1829    return regionStates.getSnapShotOfAssignment(regions);
1830  }
1831
1832  // ============================================================================================
1833  // TODO: UTILS/HELPERS?
1834  // ============================================================================================
1835  /**
1836   * Used by the client (via master) to identify if all regions have the schema updates
1837   * @return Pair indicating the status of the alter command (pending/total)
1838   */
1839  public Pair<Integer, Integer> getReopenStatus(TableName tableName) {
1840    if (isTableDisabled(tableName)) {
1841      return new Pair<Integer, Integer>(0, 0);
1842    }
1843
1844    final List<RegionState> states = regionStates.getTableRegionStates(tableName);
1845    int ritCount = 0;
1846    for (RegionState regionState : states) {
1847      if (!regionState.isOpened() && !regionState.isSplit()) {
1848        ritCount++;
1849      }
1850    }
1851    return new Pair<Integer, Integer>(ritCount, states.size());
1852  }
1853
1854  // ============================================================================================
1855  // TODO: Region State In Transition
1856  // ============================================================================================
1857  public boolean hasRegionsInTransition() {
1858    return regionStates.hasRegionsInTransition();
1859  }
1860
1861  public List<RegionStateNode> getRegionsInTransition() {
1862    return regionStates.getRegionsInTransition();
1863  }
1864
1865  public List<RegionInfo> getAssignedRegions() {
1866    return regionStates.getAssignedRegions();
1867  }
1868
1869  public RegionInfo getRegionInfo(final byte[] regionName) {
1870    final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName);
1871    return regionState != null ? regionState.getRegionInfo() : null;
1872  }
1873
1874  // ============================================================================================
1875  // Expected states on region state transition.
  // Notice that there are no expected states for transiting to OPENING state; this is because of
  // SCP. See the comments in the regionOpening method for more details.
1878  // ============================================================================================
1879  private static final State[] STATES_EXPECTED_ON_OPEN = { State.OPENING, // Normal case
1880    State.OPEN // Retrying
1881  };
1882
1883  private static final State[] STATES_EXPECTED_ON_CLOSING = { State.OPEN, // Normal case
1884    State.CLOSING, // Retrying
1885    State.SPLITTING, // Offline the split parent
1886    State.MERGING // Offline the merge parents
1887  };
1888
1889  private static final State[] STATES_EXPECTED_ON_CLOSED = { State.CLOSING, // Normal case
1890    State.CLOSED // Retrying
1891  };
1892
1893  // This is for manually scheduled region assign, can add other states later if we find out other
1894  // usages
1895  private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE };
1896
1897  // We only allow unassign or move a region which is in OPEN state.
1898  private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN };
1899
1900  // ============================================================================================
1901  // Region Status update
  // Should only be called in TransitRegionStateProcedure (and related procedures), as the locking
  // and pre-assumptions are very tricky.
1904  // ============================================================================================
1905  private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState,
1906    RegionState.State... expectedStates) throws IOException {
1907    RegionState.State state = regionNode.getState();
1908    regionNode.transitionState(newState, expectedStates);
1909    boolean succ = false;
1910    try {
1911      regionStateStore.updateRegionLocation(regionNode);
1912      succ = true;
1913    } finally {
1914      if (!succ) {
1915        // revert
1916        regionNode.setState(state);
1917      }
1918    }
1919  }
1920
1921  // should be called within the synchronized block of RegionStateNode
1922  void regionOpening(RegionStateNode regionNode) throws IOException {
1923    // As in SCP, for performance reason, there is no TRSP attached with this region, we will not
1924    // update the region state, which means that the region could be in any state when we want to
1925    // assign it after a RS crash. So here we do not pass the expectedStates parameter.
1926    transitStateAndUpdate(regionNode, State.OPENING);
1927    regionStates.addRegionToServer(regionNode);
1928    // update the operation count metrics
1929    metrics.incrementOperationCounter();
1930  }
1931
1932  // should be called under the RegionStateNode lock
1933  // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
1934  // we will persist the FAILED_OPEN state into hbase:meta.
1935  void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException {
1936    RegionState.State state = regionNode.getState();
1937    ServerName regionLocation = regionNode.getRegionLocation();
1938    if (giveUp) {
1939      regionNode.setState(State.FAILED_OPEN);
1940      regionNode.setRegionLocation(null);
1941      boolean succ = false;
1942      try {
1943        regionStateStore.updateRegionLocation(regionNode);
1944        succ = true;
1945      } finally {
1946        if (!succ) {
1947          // revert
1948          regionNode.setState(state);
1949          regionNode.setRegionLocation(regionLocation);
1950        }
1951      }
1952    }
1953    if (regionLocation != null) {
1954      regionStates.removeRegionFromServer(regionLocation, regionNode);
1955    }
1956  }
1957
1958  // should be called under the RegionStateNode lock
1959  void regionClosing(RegionStateNode regionNode) throws IOException {
1960    transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING);
1961
1962    RegionInfo hri = regionNode.getRegionInfo();
1963    // Set meta has not initialized early. so people trying to create/edit tables will wait
1964    if (isMetaRegion(hri)) {
1965      setMetaAssigned(hri, false);
1966    }
1967    regionStates.addRegionToServer(regionNode);
1968    // update the operation count metrics
1969    metrics.incrementOperationCounter();
1970  }
1971
  // For open and close, the transition is first persisted to the procedure store in
  // RegionRemoteProcedureBase. So here we only change the in-memory state: the transition counts
  // as succeeded once the procedure store write succeeds. Later, when RegionRemoteProcedureBase is
  // woken up, we persist the RegionStateNode to hbase:meta.
1976
1977  // should be called under the RegionStateNode lock
1978  void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
1979    regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
1980    RegionInfo regionInfo = regionNode.getRegionInfo();
1981    regionStates.addRegionToServer(regionNode);
1982    regionStates.removeFromFailedOpen(regionInfo);
1983  }
1984
1985  // should be called under the RegionStateNode lock
1986  void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
1987    ServerName regionLocation = regionNode.getRegionLocation();
1988    regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
1989    regionNode.setRegionLocation(null);
1990    if (regionLocation != null) {
1991      regionNode.setLastHost(regionLocation);
1992      regionStates.removeRegionFromServer(regionLocation, regionNode);
1993    }
1994  }
1995
1996  // should be called under the RegionStateNode lock
1997  // for SCP
1998  public void regionClosedAbnormally(RegionStateNode regionNode) throws IOException {
1999    RegionState.State state = regionNode.getState();
2000    ServerName regionLocation = regionNode.getRegionLocation();
2001    regionNode.transitionState(State.ABNORMALLY_CLOSED);
2002    regionNode.setRegionLocation(null);
2003    boolean succ = false;
2004    try {
2005      regionStateStore.updateRegionLocation(regionNode);
2006      succ = true;
2007    } finally {
2008      if (!succ) {
2009        // revert
2010        regionNode.setState(state);
2011        regionNode.setRegionLocation(regionLocation);
2012      }
2013    }
2014    if (regionLocation != null) {
2015      regionNode.setLastHost(regionLocation);
2016      regionStates.removeRegionFromServer(regionLocation, regionNode);
2017    }
2018  }
2019
2020  void persistToMeta(RegionStateNode regionNode) throws IOException {
2021    regionStateStore.updateRegionLocation(regionNode);
2022    RegionInfo regionInfo = regionNode.getRegionInfo();
2023    if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
2024      // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
2025      // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
2026      // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
2027      // on table that contains state.
2028      setMetaAssigned(regionInfo, true);
2029    }
2030  }
2031
2032  // ============================================================================================
2033  // The above methods can only be called in TransitRegionStateProcedure(and related procedures)
2034  // ============================================================================================
2035
2036  public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
2037    final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
2038    // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
2039    // The parent stays in regionStates until cleared when removed by CatalogJanitor.
2040    // Update its state in regionStates to it shows as offline and split when read
2041    // later figuring what regions are in a table and what are not: see
2042    // regionStates#getRegionsOfTable
2043    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent);
2044    node.setState(State.SPLIT);
2045    final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA);
2046    nodeA.setState(State.SPLITTING_NEW);
2047    final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB);
2048    nodeB.setState(State.SPLITTING_NEW);
2049
2050    // TODO: here we just update the parent region info in meta, to set split and offline to true,
2051    // without changing the one in the region node. This is a bit confusing but the region info
2052    // field in RegionStateNode is not expected to be changed in the current design. Need to find a
2053    // possible way to address this problem, or at least adding more comments about the trick to
2054    // deal with this problem, that when you want to filter out split parent, you need to check both
2055    // the RegionState on whether it is split, and also the region info. If one of them matches then
2056    // it is a split parent. And usually only one of them can match, as after restart, the region
2057    // state will be changed from SPLIT to CLOSED.
2058    regionStateStore.splitRegion(parent, daughterA, daughterB, serverName);
2059    if (shouldAssignFavoredNodes(parent)) {
2060      List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList();
2061      ((FavoredNodesPromoter) getBalancer()).generateFavoredNodesForDaughter(onlineServers, parent,
2062        daughterA, daughterB);
2063    }
2064  }
2065
2066  /**
2067   * When called here, the merge has happened. The merged regions have been unassigned and the above
2068   * markRegionClosed has been called on each so they have been disassociated from a hosting Server.
2069   * The merged region will be open after this call. The merged regions are removed from hbase:meta
2070   * below. Later they are deleted from the filesystem by the catalog janitor running against
2071   * hbase:meta. It notices when the merged region no longer holds references to the old regions
2072   * (References are deleted after a compaction rewrites what the Reference points at but not until
2073   * the archiver chore runs, are the References removed).
2074   */
2075  public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
2076    RegionInfo[] mergeParents) throws IOException {
2077    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
2078    node.setState(State.MERGED);
2079    for (RegionInfo ri : mergeParents) {
2080      regionStates.deleteRegion(ri);
2081
2082    }
2083    regionStateStore.mergeRegions(child, mergeParents, serverName);
2084    if (shouldAssignFavoredNodes(child)) {
2085      ((FavoredNodesPromoter) getBalancer()).generateFavoredNodesForMergedRegion(child,
2086        mergeParents);
2087    }
2088  }
2089
2090  /*
2091   * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
2092   * belongs to a non-system table.
2093   */
2094  private boolean shouldAssignFavoredNodes(RegionInfo region) {
2095    return this.shouldAssignRegionsWithFavoredNodes
2096      && FavoredNodesManager.isFavoredNodeApplicable(region);
2097  }
2098
2099  // ============================================================================================
2100  // Assign Queue (Assign/Balance)
2101  // ============================================================================================
2102  private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>();
2103  private final ReentrantLock assignQueueLock = new ReentrantLock();
2104  private final Condition assignQueueFullCond = assignQueueLock.newCondition();
2105
2106  /**
2107   * Add the assign operation to the assignment queue. The pending assignment operation will be
2108   * processed, and each region will be assigned by a server using the balancer.
2109   */
2110  protected void queueAssign(final RegionStateNode regionNode) {
2111    regionNode.getProcedureEvent().suspend();
2112
2113    // TODO: quick-start for meta and the other sys-tables?
2114    assignQueueLock.lock();
2115    try {
2116      pendingAssignQueue.add(regionNode);
2117      if (
2118        regionNode.isSystemTable() || pendingAssignQueue.size() == 1
2119          || pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize
2120      ) {
2121        assignQueueFullCond.signal();
2122      }
2123    } finally {
2124      assignQueueLock.unlock();
2125    }
2126  }
2127
2128  private void startAssignmentThread() {
2129    assignThread = new Thread(master.getServerName().toShortString()) {
2130      @Override
2131      public void run() {
2132        while (isRunning()) {
2133          processAssignQueue();
2134        }
2135        pendingAssignQueue.clear();
2136      }
2137    };
2138    assignThread.setDaemon(true);
2139    assignThread.start();
2140  }
2141
2142  private void stopAssignmentThread() {
2143    assignQueueSignal();
2144    try {
2145      while (assignThread.isAlive()) {
2146        assignQueueSignal();
2147        assignThread.join(250);
2148      }
2149    } catch (InterruptedException e) {
2150      LOG.warn("join interrupted", e);
2151      Thread.currentThread().interrupt();
2152    }
2153  }
2154
2155  private void assignQueueSignal() {
2156    assignQueueLock.lock();
2157    try {
2158      assignQueueFullCond.signal();
2159    } finally {
2160      assignQueueLock.unlock();
2161    }
2162  }
2163
  /**
   * Waits for work in the pending assignment queue, then drains the queue into a map keyed by
   * RegionInfo. If several queued nodes share a RegionInfo, the later one wins.
   * <p>
   * If the queue is empty, this blocks until it is signalled. It then waits up to
   * assignDispatchWaitMillis more, or until signalled again, so that further queued regions can
   * join the same batch.
   * @return the drained regions (possibly empty, e.g. after a spurious wake-up), or null if the
   *         manager is no longer running or the thread was interrupted
   */
  // A single un-looped await is intentional: the caller runs in a loop and re-checks isRunning(),
  // hence the findbugs suppression.
  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
  private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() {
    HashMap<RegionInfo, RegionStateNode> regions = null;

    assignQueueLock.lock();
    try {
      // Sleep until queueAssign()/assignQueueSignal() wakes us, unless there is already work.
      if (pendingAssignQueue.isEmpty() && isRunning()) {
        assignQueueFullCond.await();
      }

      if (!isRunning()) {
        return null;
      }
      // Batching window: let more regions accumulate. This ends early if the condition is
      // signalled again (system-table region queued, queue full, or shutdown).
      assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS);
      regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size());
      for (RegionStateNode regionNode : pendingAssignQueue) {
        regions.put(regionNode.getRegionInfo(), regionNode);
      }
      pendingAssignQueue.clear();
    } catch (InterruptedException e) {
      LOG.warn("got interrupted ", e);
      Thread.currentThread().interrupt();
    } finally {
      assignQueueLock.unlock();
    }
    return regions;
  }
2191
  /**
   * Runs one pass of the assign thread: drains a batch from the pending assignment queue and asks
   * the balancer to place it.
   * <p>
   * Returns immediately if no batch was obtained or the manager has stopped. The batch is split
   * three ways:
   * <ul>
   * <li>regions that already have a location (user or system table) go to a retain map;</li>
   * <li>system-table regions without a location are planned round-robin;</li>
   * <li>user-table regions without a location are planned round-robin.</li>
   * </ul>
   * If there are no destination servers, this polls every 250ms until some appear, or drops the
   * batch if the manager stops meanwhile. System-table regions are planned first, against servers
   * not returned by getExcludedServersForSystemTable(). Retained and user-table regions are then
   * planned against all destination servers.
   */
  private void processAssignQueue() {
    final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue();
    if (regions == null || regions.size() == 0 || !isRunning()) {
      return;
    }

    if (LOG.isTraceEnabled()) {
      LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size());
    }

    // TODO: Optimize balancer. pass a RegionPlan?
    final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>();
    final List<RegionInfo> userHRIs = new ArrayList<>(regions.size());
    // Regions for system tables requiring reassignment
    final List<RegionInfo> systemHRIs = new ArrayList<>();
    for (RegionStateNode regionStateNode : regions.values()) {
      boolean sysTable = regionStateNode.isSystemTable();
      final List<RegionInfo> hris = sysTable ? systemHRIs : userHRIs;
      // A region with a location goes to retainMap even if it belongs to a system table. Such
      // regions are later planned against all servers, not the system-table-filtered list.
      // NOTE(review): confirm that is intended.
      if (regionStateNode.getRegionLocation() != null) {
        retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
      } else {
        hris.add(regionStateNode.getRegionInfo());
      }
    }

    // TODO: connect with the listener to invalidate the cache

    // TODO use events
    // Poll (every 250ms) until at least one destination server is available.
    List<ServerName> servers = master.getServerManager().createDestinationServersList();
    for (int i = 0; servers.size() < 1; ++i) {
      // Report every fourth time around this loop; try not to flood log.
      if (i % 4 == 0) {
        LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions.");
      }

      if (!isRunning()) {
        LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions.");
        return;
      }
      Threads.sleep(250);
      servers = master.getServerManager().createDestinationServersList();
    }

    if (!systemHRIs.isEmpty()) {
      // System table regions requiring reassignment are present, get region servers
      // not available for system table regions
      final List<ServerName> excludeServers = getExcludedServersForSystemTable();
      List<ServerName> serversForSysTables =
        servers.stream().filter(s -> !excludeServers.contains(s)).collect(Collectors.toList());
      if (serversForSysTables.isEmpty()) {
        LOG.warn("Filtering old server versions and the excluded produced an empty set; "
          + "instead considering all candidate servers!");
      }
      LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size()
        + ", allServersCount=" + servers.size());
      // If filtering left nothing, fall back to all servers. The exception is when some of these
      // regions are located on BOGUS_SERVER_NAME: then the empty list is passed through, and the
      // warning above overstates what happens.
      // NOTE(review): systemHRIs only holds regions whose location was null when the batch was
      // split above, so containsBogusAssignments() looks like it can only return true if a
      // location was set concurrently. Confirm intent.
      processAssignmentPlans(regions, null, systemHRIs,
        serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs)
          ? servers
          : serversForSysTables);
    }

    processAssignmentPlans(regions, retainMap, userHRIs, servers);
  }
2255
2256  private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions,
2257    List<RegionInfo> hirs) {
2258    for (RegionInfo ri : hirs) {
2259      if (
2260        regions.get(ri).getRegionLocation() != null
2261          && regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME)
2262      ) {
2263        return true;
2264      }
2265    }
2266    return false;
2267  }
2268
2269  private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions,
2270    final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris,
2271    final List<ServerName> servers) {
2272    boolean isTraceEnabled = LOG.isTraceEnabled();
2273    if (isTraceEnabled) {
2274      LOG.trace("Available servers count=" + servers.size() + ": " + servers);
2275    }
2276
2277    final LoadBalancer balancer = getBalancer();
2278    // ask the balancer where to place regions
2279    if (retainMap != null && !retainMap.isEmpty()) {
2280      if (isTraceEnabled) {
2281        LOG.trace("retain assign regions=" + retainMap);
2282      }
2283      try {
2284        acceptPlan(regions, balancer.retainAssignment(retainMap, servers));
2285      } catch (HBaseIOException e) {
2286        LOG.warn("unable to retain assignment", e);
2287        addToPendingAssignment(regions, retainMap.keySet());
2288      }
2289    }
2290
2291    // TODO: Do we need to split retain and round-robin?
2292    // the retain seems to fallback to round-robin/random if the region is not in the map.
2293    if (!hris.isEmpty()) {
2294      Collections.sort(hris, RegionInfo.COMPARATOR);
2295      if (isTraceEnabled) {
2296        LOG.trace("round robin regions=" + hris);
2297      }
2298      try {
2299        acceptPlan(regions, balancer.roundRobinAssignment(hris, servers));
2300      } catch (HBaseIOException e) {
2301        LOG.warn("unable to round-robin assignment", e);
2302        addToPendingAssignment(regions, hris);
2303      }
2304    }
2305  }
2306
2307  private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions,
2308    final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException {
2309    final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()];
2310    final long st = EnvironmentEdgeManager.currentTime();
2311
2312    if (plan.isEmpty()) {
2313      throw new HBaseIOException("unable to compute plans for regions=" + regions.size());
2314    }
2315
2316    int evcount = 0;
2317    for (Map.Entry<ServerName, List<RegionInfo>> entry : plan.entrySet()) {
2318      final ServerName server = entry.getKey();
2319      for (RegionInfo hri : entry.getValue()) {
2320        final RegionStateNode regionNode = regions.get(hri);
2321        regionNode.setRegionLocation(server);
2322        if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) {
2323          assignQueueLock.lock();
2324          try {
2325            pendingAssignQueue.add(regionNode);
2326          } finally {
2327            assignQueueLock.unlock();
2328          }
2329        } else {
2330          events[evcount++] = regionNode.getProcedureEvent();
2331        }
2332      }
2333    }
2334    ProcedureEvent.wakeEvents(getProcedureScheduler(), events);
2335
2336    final long et = EnvironmentEdgeManager.currentTime();
2337    if (LOG.isTraceEnabled()) {
2338      LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + StringUtils.humanTimeDiff(et - st));
2339    }
2340  }
2341
2342  private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions,
2343    final Collection<RegionInfo> pendingRegions) {
2344    assignQueueLock.lock();
2345    try {
2346      for (RegionInfo hri : pendingRegions) {
2347        pendingAssignQueue.add(regions.get(hri));
2348      }
2349    } finally {
2350      assignQueueLock.unlock();
2351    }
2352  }
2353
2354  /**
2355   * For a given cluster with mixed versions of servers, get a list of servers with lower versions,
2356   * where system table regions should not be assigned to. For system table, we must assign regions
2357   * to a server with highest version. However, we can disable this exclusion using config:
2358   * "hbase.min.version.move.system.tables" if checkForMinVersion is true. Detailed explanation
2359   * available with definition of minVersionToMoveSysTables.
2360   * @return List of Excluded servers for System table regions.
2361   */
2362  public List<ServerName> getExcludedServersForSystemTable() {
2363    // TODO: This should be a cached list kept by the ServerManager rather than calculated on each
2364    // move or system region assign. The RegionServerTracker keeps list of online Servers with
2365    // RegionServerInfo that includes Version.
2366    List<Pair<ServerName, String>> serverList =
2367      master.getServerManager().getOnlineServersList().stream()
2368        .map(s -> new Pair<>(s, master.getRegionServerVersion(s))).collect(Collectors.toList());
2369    if (serverList.isEmpty()) {
2370      return new ArrayList<>();
2371    }
2372    String highestVersion = Collections
2373      .max(serverList, (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond()))
2374      .getSecond();
2375    if (!DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG.equals(minVersionToMoveSysTables)) {
2376      int comparedValue = VersionInfo.compareVersion(minVersionToMoveSysTables, highestVersion);
2377      if (comparedValue > 0) {
2378        return new ArrayList<>();
2379      }
2380    }
2381    return serverList.stream().filter(pair -> !pair.getSecond().equals(highestVersion))
2382      .map(Pair::getFirst).collect(Collectors.toList());
2383  }
2384
2385  MasterServices getMaster() {
2386    return master;
2387  }
2388
2389  /** Returns a snapshot of rsReports */
2390  public Map<ServerName, Set<byte[]>> getRSReports() {
2391    Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>();
2392    synchronized (rsReports) {
2393      rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue()));
2394    }
2395    return rsReportsSnapshot;
2396  }
2397
2398  /**
2399   * Provide regions state count for given table. e.g howmany regions of give table are
2400   * opened/closed/rit etc
2401   * @param tableName TableName
2402   * @return region states count
2403   */
2404  public RegionStatesCount getRegionStatesCount(TableName tableName) {
2405    int openRegionsCount = 0;
2406    int closedRegionCount = 0;
2407    int ritCount = 0;
2408    int splitRegionCount = 0;
2409    int totalRegionCount = 0;
2410    if (!isTableDisabled(tableName)) {
2411      final List<RegionState> states = regionStates.getTableRegionStates(tableName);
2412      for (RegionState regionState : states) {
2413        if (regionState.isOpened()) {
2414          openRegionsCount++;
2415        } else if (regionState.isClosed()) {
2416          closedRegionCount++;
2417        } else if (regionState.isSplit()) {
2418          splitRegionCount++;
2419        }
2420      }
2421      totalRegionCount = states.size();
2422      ritCount = totalRegionCount - openRegionsCount - splitRegionCount;
2423    }
2424    return new RegionStatesCount.RegionStatesCountBuilder().setOpenRegions(openRegionsCount)
2425      .setClosedRegions(closedRegionCount).setSplitRegions(splitRegionCount)
2426      .setRegionsInTransition(ritCount).setTotalRegions(totalRegionCount).build();
2427  }
2428
2429}