001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.Future;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.locks.Condition;
033import java.util.concurrent.locks.ReentrantLock;
034import java.util.stream.Collectors;
035import java.util.stream.Stream;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.CatalogFamilyFormat;
038import org.apache.hadoop.hbase.DoNotRetryIOException;
039import org.apache.hadoop.hbase.HBaseIOException;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.PleaseHoldException;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.UnknownRegionException;
045import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
046import org.apache.hadoop.hbase.client.MasterSwitchType;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.RegionInfoBuilder;
049import org.apache.hadoop.hbase.client.RegionReplicaUtil;
050import org.apache.hadoop.hbase.client.RegionStatesCount;
051import org.apache.hadoop.hbase.client.Result;
052import org.apache.hadoop.hbase.client.ResultScanner;
053import org.apache.hadoop.hbase.client.Scan;
054import org.apache.hadoop.hbase.client.TableDescriptor;
055import org.apache.hadoop.hbase.client.TableState;
056import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
057import org.apache.hadoop.hbase.favored.FavoredNodesManager;
058import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
059import org.apache.hadoop.hbase.master.LoadBalancer;
060import org.apache.hadoop.hbase.master.MasterServices;
061import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
062import org.apache.hadoop.hbase.master.RegionPlan;
063import org.apache.hadoop.hbase.master.RegionState;
064import org.apache.hadoop.hbase.master.RegionState.State;
065import org.apache.hadoop.hbase.master.ServerManager;
066import org.apache.hadoop.hbase.master.TableStateManager;
067import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
068import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
069import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
070import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
071import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
072import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
073import org.apache.hadoop.hbase.master.region.MasterRegion;
074import org.apache.hadoop.hbase.procedure2.Procedure;
075import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
076import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
077import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
078import org.apache.hadoop.hbase.procedure2.util.StringUtils;
079import org.apache.hadoop.hbase.regionserver.SequenceId;
080import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
081import org.apache.hadoop.hbase.util.Bytes;
082import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
083import org.apache.hadoop.hbase.util.Pair;
084import org.apache.hadoop.hbase.util.Threads;
085import org.apache.hadoop.hbase.util.VersionInfo;
086import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
087import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
088import org.apache.yetus.audience.InterfaceAudience;
089import org.apache.zookeeper.KeeperException;
090import org.slf4j.Logger;
091import org.slf4j.LoggerFactory;
092
093import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
098
099/**
100 * The AssignmentManager is the coordinator for region assign/unassign operations.
101 * <ul>
102 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
103 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
104 * </ul>
105 * Regions are created by CreateTable, Split, Merge. Regions are deleted by DeleteTable, Split,
106 * Merge. Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. Unassigns
 * are triggered by DisableTable, Split, Merge.
108 */
109@InterfaceAudience.Private
110public class AssignmentManager {
  private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);

  // TODO: AMv2
  // - handle region migration from hbase1 to hbase2.
  // - handle sys table assignment first (e.g. acl, namespace)
  // - handle table priorities
  // - If ServerBusyException trying to update hbase:meta, we abort the Master
  // See updateRegionLocation in RegionStateStore.
  //
  // See also
  // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
  // for other TODOs.

  /** Config key for the bootstrap thread pool size. NOTE(review): no reader visible nearby. */
  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
    "hbase.assignment.bootstrap.thread.pool.size";

  /** Config key for the assign dispatch wait time; read as an int by the constructor. */
  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
    "hbase.assignment.dispatch.wait.msec";
  // Used when ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY is not set.
  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;

  /** Config key for the maximum size of the assign dispatch wait queue. */
  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
    "hbase.assignment.dispatch.wait.queue.max.size";
  // Used when ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY is not set.
  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;

  /** Config key for the interval handed to the RegionInTransitionChore by the constructor. */
  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
    "hbase.assignment.rit.chore.interval.msec";
  // Used when RIT_CHORE_INTERVAL_MSEC_CONF_KEY is not set.
  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;

  /**
   * Config key for the interval handed to the DeadServerMetricRegionChore. The constructor does
   * not create that chore at all when the configured value is not positive.
   */
  public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY =
    "hbase.assignment.dead.region.metric.chore.interval.msec";
  // Used when DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY is not set.
  private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000;

  /** Config key for the maximum number of assign attempts; the constructor clamps it to >= 1. */
  public static final String ASSIGN_MAX_ATTEMPTS = "hbase.assignment.maximum.attempts";
  // Used when ASSIGN_MAX_ATTEMPTS is not set.
  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;

  /** Config key for the maximum number of assign attempts that are retried immediately. */
  public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS =
    "hbase.assignment.retry.immediately.maximum.attempts";
  // Used when ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS is not set.
  private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3;

  /** Region in Transition metrics threshold time */
  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
    "hbase.metrics.rit.stuck.warning.threshold";
  // NOTE(review): presumably the default for METRICS_RIT_STUCK_WARNING_THRESHOLD; its reader is
  // not visible in this part of the file.
  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
  /** Message prefix. NOTE(review): presumably used when reporting unexpected region states. */
  public static final String UNEXPECTED_STATE_REGION = "Unexpected state for ";

  /** Config key of the boolean flag exposed through {@link #isForceRegionRetainment()}. */
  public static final String FORCE_REGION_RETAINMENT = "hbase.master.scp.retain.assignment.force";

  /** Default for {@link #FORCE_REGION_RETAINMENT}. */
  public static final boolean DEFAULT_FORCE_REGION_RETAINMENT = false;

  /** The wait time in millis before checking again if the region's previous RS is back online */
  public static final String FORCE_REGION_RETAINMENT_WAIT_INTERVAL =
    "hbase.master.scp.retain.assignment.force.wait-interval";

  /** Default for {@link #FORCE_REGION_RETAINMENT_WAIT_INTERVAL}. */
  public static final long DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL = 50;

  /**
   * The number of times to check if the region's previous RS is back online, before giving up and
   * proceeding with assignment on a new RS
   */
  public static final String FORCE_REGION_RETAINMENT_RETRIES =
    "hbase.master.scp.retain.assignment.force.retries";

  /** Default for {@link #FORCE_REGION_RETAINMENT_RETRIES}. */
  public static final int DEFAULT_FORCE_REGION_RETAINMENT_RETRIES = 600;
174
  // Ready while hbase:meta is considered assigned; woken and suspended by setMetaAssigned.
  private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
  // Ready once region states are loaded; woken by wakeMetaLoadedEvent, suspended by stop.
  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");

  private final MetricsAssignmentManager metrics;
  private final RegionInTransitionChore ritChore;
  // null when the configured dead region metric chore interval is not positive.
  private final DeadServerMetricRegionChore deadMetricChore;
  private final MasterServices master;

  // Flipped to true by start() and back to false by stop().
  private final AtomicBoolean running = new AtomicBoolean(false);
  // In-memory region and server states.
  private final RegionStates regionStates = new RegionStates();
  private final RegionStateStore regionStateStore;

  /**
   * When the operator uses this configuration option, any version between the current cluster
   * version and the value of "hbase.min.version.move.system.tables" does not trigger any
   * auto-region movement. Auto-region movement here refers to auto-migration of system table
   * regions to newer server versions. It is assumed that the configured range of versions does not
   * require special handling of moving system table regions to higher versioned RegionServer. This
   * auto-migration is done by {@link #checkIfShouldMoveSystemRegionAsync()}. Example: Let's assume
   * the cluster is on version 1.4.0 and we have set "hbase.min.version.move.system.tables" as
   * "2.0.0". Now if we upgrade one RegionServer on 1.4.0 cluster to 1.6.0 (< 2.0.0), then
   * AssignmentManager will not move hbase:meta, hbase:namespace and other system table regions to
   * newly brought up RegionServer 1.6.0 as part of auto-migration. However, if we upgrade one
   * RegionServer on 1.4.0 cluster to 2.2.0 (> 2.0.0), then AssignmentManager will move all system
   * table regions to newly brought up RegionServer 2.2.0 as part of auto-migration done by
   * {@link #checkIfShouldMoveSystemRegionAsync()}. "hbase.min.version.move.system.tables" is
   * introduced as part of HBASE-22923.
   */
  private final String minVersionToMoveSysTables;

  // Key and default (empty string) from which minVersionToMoveSysTables is read.
  private static final String MIN_VERSION_MOVE_SYS_TABLES_CONFIG =
    "hbase.min.version.move.system.tables";
  private static final String DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG = "";

  // NOTE(review): presumably the region names last reported by each region server; the code
  // that fills and reads this map is not visible in this part of the file.
  private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();

  // True when the configured load balancer class is a FavoredStochasticBalancer.
  private final boolean shouldAssignRegionsWithFavoredNodes;
  private final int assignDispatchWaitQueueMaxSize;
  private final int assignDispatchWaitMillis;
  // Never less than 1, see the constructor.
  private final int assignMaxAttempts;
  private final int assignRetryImmediatelyMaxAttempts;

  // Scanned by start() to load the persisted region states.
  private final MasterRegion masterRegion;

  // Serializes the background work started by checkIfShouldMoveSystemRegionAsync().
  private final Object checkIfShouldMoveSystemRegionLock = new Object();

  // NOTE(review): presumably managed by startAssignmentThread()/stopAssignmentThread(), which
  // are defined outside the visible part of the file.
  private Thread assignThread;

  // Cached value of FORCE_REGION_RETAINMENT.
  private final boolean forceRegionRetainment;

  // Cached value of FORCE_REGION_RETAINMENT_WAIT_INTERVAL.
  private final long forceRegionRetainmentWaitInterval;

  // Cached value of FORCE_REGION_RETAINMENT_RETRIES.
  private final int forceRegionRetainmentRetries;
228
  /**
   * Creates the manager with a {@link RegionStateStore} newly built from the given
   * {@link MasterServices} and {@link MasterRegion}.
   */
  public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
    this(master, masterRegion, new RegionStateStore(master, masterRegion));
  }
232
233  AssignmentManager(MasterServices master, MasterRegion masterRegion, RegionStateStore stateStore) {
234    this.master = master;
235    this.regionStateStore = stateStore;
236    this.metrics = new MetricsAssignmentManager();
237    this.masterRegion = masterRegion;
238
239    final Configuration conf = master.getConfiguration();
240
241    // Only read favored nodes if using the favored nodes load balancer.
242    this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class
243      .isAssignableFrom(conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class));
244
245    this.assignDispatchWaitMillis =
246      conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
247    this.assignDispatchWaitQueueMaxSize =
248      conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);
249
250    this.assignMaxAttempts =
251      Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS));
252    this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS,
253      DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS);
254
255    int ritChoreInterval =
256      conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC);
257    this.ritChore = new RegionInTransitionChore(ritChoreInterval);
258
259    int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY,
260      DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC);
261    if (deadRegionChoreInterval > 0) {
262      this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval);
263    } else {
264      this.deadMetricChore = null;
265    }
266    minVersionToMoveSysTables =
267      conf.get(MIN_VERSION_MOVE_SYS_TABLES_CONFIG, DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG);
268
269    forceRegionRetainment =
270      conf.getBoolean(FORCE_REGION_RETAINMENT, DEFAULT_FORCE_REGION_RETAINMENT);
271    forceRegionRetainmentWaitInterval = conf.getLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL,
272      DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL);
273    forceRegionRetainmentRetries =
274      conf.getInt(FORCE_REGION_RETAINMENT_RETRIES, DEFAULT_FORCE_REGION_RETAINMENT_RETRIES);
275  }
276
277  private void mirrorMetaLocations() throws IOException, KeeperException {
278    // For compatibility, mirror the meta region state to zookeeper
279    // And we still need to use zookeeper to publish the meta region locations to region
280    // server, so they can serve as ClientMetaService
281    ZKWatcher zk = master.getZooKeeper();
282    if (zk == null || !zk.getRecoverableZooKeeper().getState().isAlive()) {
283      // this is possible in tests, we do not provide a zk watcher or the zk watcher has been closed
284      return;
285    }
286    Collection<RegionStateNode> metaStates = regionStates.getRegionStateNodes();
287    for (RegionStateNode metaState : metaStates) {
288      MetaTableLocator.setMetaLocation(zk, metaState.getRegionLocation(),
289        metaState.getRegionInfo().getReplicaId(), metaState.getState());
290    }
291    int replicaCount = metaStates.size();
292    // remove extra mirror locations
293    for (String znode : zk.getMetaReplicaNodes()) {
294      int replicaId = zk.getZNodePaths().getMetaReplicaIdFromZNode(znode);
295      if (replicaId >= replicaCount) {
296        MetaTableLocator.deleteMetaLocation(zk, replicaId);
297      }
298    }
299  }
300
301  public void start() throws IOException, KeeperException {
302    if (!running.compareAndSet(false, true)) {
303      return;
304    }
305
306    LOG.trace("Starting assignment manager");
307
308    // Start the Assignment Thread
309    startAssignmentThread();
310    // load meta region states.
311    // here we are still in the early steps of active master startup. There is only one thread(us)
312    // can access AssignmentManager and create region node, so here we do not need to lock the
313    // region node.
314    try (ResultScanner scanner =
315      masterRegion.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY))) {
316      for (;;) {
317        Result result = scanner.next();
318        if (result == null) {
319          break;
320        }
321        RegionStateStore
322          .visitMetaEntry((r, regionInfo, state, regionLocation, lastHost, openSeqNum) -> {
323            RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
324            regionNode.setState(state);
325            regionNode.setLastHost(lastHost);
326            regionNode.setRegionLocation(regionLocation);
327            regionNode.setOpenSeqNum(openSeqNum);
328            if (regionNode.getProcedure() != null) {
329              regionNode.getProcedure().stateLoaded(this, regionNode);
330            }
331            if (regionLocation != null) {
332              regionStates.addRegionToServer(regionNode);
333            }
334            if (RegionReplicaUtil.isDefaultReplica(regionInfo.getReplicaId())) {
335              setMetaAssigned(regionInfo, state == State.OPEN);
336            }
337            LOG.debug("Loaded hbase:meta {}", regionNode);
338          }, result);
339      }
340    }
341    mirrorMetaLocations();
342  }
343
344  /**
345   * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode.
346   * <p>
347   * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta
348   * method below. And it is also very important as now before submitting a TRSP, we need to attach
349   * it to the RegionStateNode, which acts like a guard, so we need to restore this information at
350   * the very beginning, before we start processing any procedures.
351   */
352  public void setupRIT(List<TransitRegionStateProcedure> procs) {
353    procs.forEach(proc -> {
354      RegionInfo regionInfo = proc.getRegion();
355      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
356      TransitRegionStateProcedure existingProc = regionNode.getProcedure();
357      if (existingProc != null) {
358        // This is possible, as we will detach the procedure from the RSN before we
359        // actually finish the procedure. This is because that, we will detach the TRSP from the RSN
360        // during execution, at that time, the procedure has not been marked as done in the pv2
361        // framework yet, so it is possible that we schedule a new TRSP immediately and when
362        // arriving here, we will find out that there are multiple TRSPs for the region. But we can
363        // make sure that, only the last one can take the charge, the previous ones should have all
364        // been finished already. So here we will compare the proc id, the greater one will win.
365        if (existingProc.getProcId() < proc.getProcId()) {
366          // the new one wins, unset and set it to the new one below
367          regionNode.unsetProcedure(existingProc);
368        } else {
369          // the old one wins, skip
370          return;
371        }
372      }
373      LOG.info("Attach {} to {} to restore RIT", proc, regionNode);
374      regionNode.setProcedure(proc);
375    });
376  }
377
378  public void stop() {
379    if (!running.compareAndSet(true, false)) {
380      return;
381    }
382
383    LOG.info("Stopping assignment manager");
384
385    // The AM is started before the procedure executor,
386    // but the actual work will be loaded/submitted only once we have the executor
387    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;
388
389    // Remove the RIT chore
390    if (hasProcExecutor) {
391      master.getMasterProcedureExecutor().removeChore(this.ritChore);
392      if (this.deadMetricChore != null) {
393        master.getMasterProcedureExecutor().removeChore(this.deadMetricChore);
394      }
395    }
396
397    // Stop the Assignment Thread
398    stopAssignmentThread();
399
400    // Stop the RegionStateStore
401    regionStates.clear();
402
403    // Update meta events (for testing)
404    if (hasProcExecutor) {
405      metaLoadEvent.suspend();
406      for (RegionInfo hri : getMetaRegionSet()) {
407        setMetaAssigned(hri, false);
408      }
409    }
410  }
411
412  public boolean isRunning() {
413    return running.get();
414  }
415
416  public Configuration getConfiguration() {
417    return master.getConfiguration();
418  }
419
420  public MetricsAssignmentManager getAssignmentManagerMetrics() {
421    return metrics;
422  }
423
424  private LoadBalancer getBalancer() {
425    return master.getLoadBalancer();
426  }
427
428  private FavoredNodesPromoter getFavoredNodePromoter() {
429    return (FavoredNodesPromoter) ((RSGroupBasedLoadBalancer) master.getLoadBalancer())
430      .getInternalBalancer();
431  }
432
433  private MasterProcedureEnv getProcedureEnvironment() {
434    return master.getMasterProcedureExecutor().getEnvironment();
435  }
436
437  private MasterProcedureScheduler getProcedureScheduler() {
438    return getProcedureEnvironment().getProcedureScheduler();
439  }
440
441  int getAssignMaxAttempts() {
442    return assignMaxAttempts;
443  }
444
445  public boolean isForceRegionRetainment() {
446    return forceRegionRetainment;
447  }
448
449  public long getForceRegionRetainmentWaitInterval() {
450    return forceRegionRetainmentWaitInterval;
451  }
452
453  public int getForceRegionRetainmentRetries() {
454    return forceRegionRetainmentRetries;
455  }
456
457  int getAssignRetryImmediatelyMaxAttempts() {
458    return assignRetryImmediatelyMaxAttempts;
459  }
460
461  public RegionStates getRegionStates() {
462    return regionStates;
463  }
464
465  /**
466   * Returns the regions hosted by the specified server.
467   * <p/>
468   * Notice that, for SCP, after we submit the SCP, no one can change the region list for the
469   * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a
470   * snapshot of the current region list for this server, which means, right after you get the
471   * region list, new regions may be moved to this server or some regions may be moved out from this
472   * server, so you should not use it critically if you need strong consistency.
473   */
474  public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
475    ServerStateNode serverInfo = regionStates.getServerNode(serverName);
476    if (serverInfo == null) {
477      return Collections.emptyList();
478    }
479    return serverInfo.getRegionInfoList();
480  }
481
482  private RegionInfo getRegionInfo(RegionStateNode rsn) {
483    if (rsn.isSplit() && !rsn.getRegionInfo().isSplit()) {
484      // see the comments in markRegionAsSplit on why we need to do this converting.
485      return RegionInfoBuilder.newBuilder(rsn.getRegionInfo()).setSplit(true).setOffline(true)
486        .build();
487    } else {
488      return rsn.getRegionInfo();
489    }
490  }
491
492  private Stream<RegionStateNode> getRegionStateNodes(TableName tableName,
493    boolean excludeOfflinedSplitParents) {
494    Stream<RegionStateNode> stream = regionStates.getTableRegionStateNodes(tableName).stream();
495    if (excludeOfflinedSplitParents) {
496      return stream.filter(rsn -> !rsn.isSplit());
497    } else {
498      return stream;
499    }
500  }
501
502  public List<RegionInfo> getTableRegions(TableName tableName,
503    boolean excludeOfflinedSplitParents) {
504    return getRegionStateNodes(tableName, excludeOfflinedSplitParents).map(this::getRegionInfo)
505      .collect(Collectors.toList());
506  }
507
508  public List<Pair<RegionInfo, ServerName>> getTableRegionsAndLocations(TableName tableName,
509    boolean excludeOfflinedSplitParents) {
510    return getRegionStateNodes(tableName, excludeOfflinedSplitParents)
511      .map(rsn -> Pair.newPair(getRegionInfo(rsn), rsn.getRegionLocation()))
512      .collect(Collectors.toList());
513  }
514
515  public RegionStateStore getRegionStateStore() {
516    return regionStateStore;
517  }
518
519  public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
520    return this.shouldAssignRegionsWithFavoredNodes
521      ? getFavoredNodePromoter().getFavoredNodes(regionInfo)
522      : ServerName.EMPTY_SERVER_LIST;
523  }
524
525  // ============================================================================================
526  // Table State Manager helpers
527  // ============================================================================================
528  private TableStateManager getTableStateManager() {
529    return master.getTableStateManager();
530  }
531
532  private boolean isTableEnabled(final TableName tableName) {
533    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
534  }
535
536  private boolean isTableDisabled(final TableName tableName) {
537    return getTableStateManager().isTableState(tableName, TableState.State.DISABLED,
538      TableState.State.DISABLING);
539  }
540
541  // ============================================================================================
542  // META Helpers
543  // ============================================================================================
  /** Returns what {@link RegionInfo#isMetaRegion()} reports for the given region. */
  private boolean isMetaRegion(final RegionInfo regionInfo) {
    return regionInfo.isMetaRegion();
  }
547
548  public boolean isMetaRegion(final byte[] regionName) {
549    return getMetaRegionFromName(regionName) != null;
550  }
551
552  public RegionInfo getMetaRegionFromName(final byte[] regionName) {
553    for (RegionInfo hri : getMetaRegionSet()) {
554      if (Bytes.equals(hri.getRegionName(), regionName)) {
555        return hri;
556      }
557    }
558    return null;
559  }
560
561  public boolean isCarryingMeta(final ServerName serverName) {
562    // TODO: handle multiple meta
563    return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
564  }
565
566  private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
567    // TODO: check for state?
568    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
569    return (node != null && serverName.equals(node.getRegionLocation()));
570  }
571
  /**
   * Returns the meta region responsible for the given region. The argument is currently ignored
   * and the first meta region is always returned.
   */
  private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
    // if (regionInfo.isMetaRegion()) return regionInfo;
    // TODO: handle multiple meta. if the region provided is not meta lookup
    // which meta the region belongs to.
    return RegionInfoBuilder.FIRST_META_REGIONINFO;
  }
578
  // TODO: handle multiple meta.
  // Immutable single-element set that holds only the first meta region.
  private static final Set<RegionInfo> META_REGION_SET =
    Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);

  /** Returns the set of known meta regions, which currently holds the first meta region only. */
  public Set<RegionInfo> getMetaRegionSet() {
    return META_REGION_SET;
  }
586
587  // ============================================================================================
588  // META Event(s) helpers
589  // ============================================================================================
590  /**
591   * Notice that, this only means the meta region is available on a RS, but the AM may still be
592   * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
593   * before checking this method, unless you can make sure that your piece of code can only be
594   * executed after AM builds the region states.
595   * @see #isMetaLoaded()
596   */
597  public boolean isMetaAssigned() {
598    return metaAssignEvent.isReady();
599  }
600
601  public boolean isMetaRegionInTransition() {
602    return !isMetaAssigned();
603  }
604
605  /**
606   * Notice that this event does not mean the AM has already finished region state rebuilding. See
607   * the comment of {@link #isMetaAssigned()} for more details.
608   * @see #isMetaAssigned()
609   */
610  public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
611    return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
612  }
613
614  private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
615    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
616    ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
617    if (assigned) {
618      metaAssignEvent.wake(getProcedureScheduler());
619    } else {
620      metaAssignEvent.suspend();
621    }
622  }
623
624  private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
625    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
626    // TODO: handle multiple meta.
627    return metaAssignEvent;
628  }
629
630  /**
631   * Wait until AM finishes the meta loading, i.e, the region states rebuilding.
632   * @see #isMetaLoaded()
633   * @see #waitMetaAssigned(Procedure, RegionInfo)
634   */
635  public boolean waitMetaLoaded(Procedure<?> proc) {
636    return metaLoadEvent.suspendIfNotReady(proc);
637  }
638
639  /**
640   * This method will be called in master initialization method after calling
641   * {@link #processOfflineRegions()}, as in processOfflineRegions we will generate assign
642   * procedures for offline regions, which may be conflict with creating table.
643   * <p/>
644   * This is a bit dirty, should be reconsidered after we decide whether to keep the
645   * {@link #processOfflineRegions()} method.
646   */
647  public void wakeMetaLoadedEvent() {
648    metaLoadEvent.wake(getProcedureScheduler());
649    assert isMetaLoaded() : "expected meta to be loaded";
650  }
651
652  /**
653   * Return whether AM finishes the meta loading, i.e, the region states rebuilding.
654   * @see #isMetaAssigned()
655   * @see #waitMetaLoaded(Procedure)
656   */
657  public boolean isMetaLoaded() {
658    return metaLoadEvent.isReady();
659  }
660
661  /**
662   * Start a new thread to check if there are region servers whose versions are higher than others.
663   * If so, move all system table regions to RS with the highest version to keep compatibility. The
664   * reason is, RS in new version may not be able to access RS in old version when there are some
665   * incompatible changes.
666   * <p>
667   * This method is called when a new RegionServer is added to cluster only.
668   * </p>
669   */
670  public void checkIfShouldMoveSystemRegionAsync() {
671    // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
672    // it should 'move' the system tables from the old server to the new server but
673    // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
674    if (this.master.getServerManager().countOfRegionServers() <= 1) {
675      return;
676    }
677    // This thread used to run whenever there was a change in the cluster. The ZooKeeper
678    // childrenChanged notification came in before the nodeDeleted message and so this method
679    // cold run before a ServerCrashProcedure could run. That meant that this thread could see
680    // a Crashed Server before ServerCrashProcedure and it could find system regions on the
681    // crashed server and go move them before ServerCrashProcedure had a chance; could be
682    // dataloss too if WALs were not recovered.
683    new Thread(() -> {
684      try {
685        synchronized (checkIfShouldMoveSystemRegionLock) {
686          List<RegionPlan> plans = new ArrayList<>();
687          // TODO: I don't think this code does a good job if all servers in cluster have same
688          // version. It looks like it will schedule unnecessary moves.
689          for (ServerName server : getExcludedServersForSystemTable()) {
690            if (master.getServerManager().isServerDead(server)) {
691              // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
692              // considers only online servers, the server could be queued for dead server
693              // processing. As region assignments for crashed server is handled by
694              // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
695              // regular flow of LoadBalancer as a favored node and not to have this special
696              // handling.
697              continue;
698            }
699            List<RegionInfo> regionsShouldMove = getSystemTables(server);
700            if (!regionsShouldMove.isEmpty()) {
701              for (RegionInfo regionInfo : regionsShouldMove) {
702                // null value for dest forces destination server to be selected by balancer
703                RegionPlan plan = new RegionPlan(regionInfo, server, null);
704                if (regionInfo.isMetaRegion()) {
705                  // Must move meta region first.
706                  LOG.info("Async MOVE of {} to newer Server={}", regionInfo.getEncodedName(),
707                    server);
708                  moveAsync(plan);
709                } else {
710                  plans.add(plan);
711                }
712              }
713            }
714            for (RegionPlan plan : plans) {
715              LOG.info("Async MOVE of {} to newer Server={}", plan.getRegionInfo().getEncodedName(),
716                server);
717              moveAsync(plan);
718            }
719          }
720        }
721      } catch (Throwable t) {
722        LOG.error(t.toString(), t);
723      }
724    }).start();
725  }
726
727  private List<RegionInfo> getSystemTables(ServerName serverName) {
728    ServerStateNode serverNode = regionStates.getServerNode(serverName);
729    if (serverNode == null) {
730      return Collections.emptyList();
731    }
732    return serverNode.getSystemRegionInfoList();
733  }
734
735  private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
736    throws HBaseIOException {
737    if (regionNode.getProcedure() != null) {
738      throw new HBaseIOException(
739        regionNode + " is currently in transition; pid=" + regionNode.getProcedure().getProcId());
740    }
741    if (!regionNode.isInState(expectedStates)) {
742      throw new DoNotRetryRegionException(UNEXPECTED_STATE_REGION + regionNode);
743    }
744    if (isTableDisabled(regionNode.getTable())) {
745      throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
746    }
747  }
748
749  /**
750   * Create an assign TransitRegionStateProcedure. Makes sure of RegionState. Throws exception if
751   * not appropriate UNLESS override is set. Used by hbck2 but also by straightline
752   * {@link #assign(RegionInfo, ServerName)} and {@link #assignAsync(RegionInfo, ServerName)}.
753   * @see #createAssignProcedure(RegionStateNode, ServerName) for a version that does NO checking
754   *      used when only when no checking needed.
755   * @param override If false, check RegionState is appropriate for assign; if not throw exception.
756   */
757  private TransitRegionStateProcedure createAssignProcedure(RegionInfo regionInfo, ServerName sn,
758    boolean override) throws IOException {
759    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
760    regionNode.lock();
761    try {
762      if (override) {
763        if (regionNode.getProcedure() != null) {
764          regionNode.unsetProcedure(regionNode.getProcedure());
765        }
766      } else {
767        preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
768      }
769      assert regionNode.getProcedure() == null;
770      return regionNode.setProcedure(
771        TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn));
772    } finally {
773      regionNode.unlock();
774    }
775  }
776
777  /**
778   * Create an assign TransitRegionStateProcedure. Does NO checking of RegionState. Presumes
779   * appriopriate state ripe for assign.
780   * @see #createAssignProcedure(RegionInfo, ServerName, boolean)
781   */
782  private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode,
783    ServerName targetServer) {
784    regionNode.lock();
785    try {
786      return regionNode.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
787        regionNode.getRegionInfo(), targetServer));
788    } finally {
789      regionNode.unlock();
790    }
791  }
792
793  public long assign(RegionInfo regionInfo, ServerName sn) throws IOException {
794    TransitRegionStateProcedure proc = createAssignProcedure(regionInfo, sn, false);
795    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
796    return proc.getProcId();
797  }
798
799  public long assign(RegionInfo regionInfo) throws IOException {
800    return assign(regionInfo, null);
801  }
802
803  /**
804   * Submits a procedure that assigns a region to a target server without waiting for it to finish
805   * @param regionInfo the region we would like to assign
806   * @param sn         target server name
807   */
808  public Future<byte[]> assignAsync(RegionInfo regionInfo, ServerName sn) throws IOException {
809    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(),
810      createAssignProcedure(regionInfo, sn, false));
811  }
812
813  /**
814   * Submits a procedure that assigns a region without waiting for it to finish
815   * @param regionInfo the region we would like to assign
816   */
817  public Future<byte[]> assignAsync(RegionInfo regionInfo) throws IOException {
818    return assignAsync(regionInfo, null);
819  }
820
821  public long unassign(RegionInfo regionInfo) throws IOException {
822    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
823    if (regionNode == null) {
824      throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
825    }
826    TransitRegionStateProcedure proc;
827    regionNode.lock();
828    try {
829      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
830      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
831      regionNode.setProcedure(proc);
832    } finally {
833      regionNode.unlock();
834    }
835    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
836    return proc.getProcId();
837  }
838
839  public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
840    ServerName targetServer) throws HBaseIOException {
841    RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
842    if (regionNode == null) {
843      throw new UnknownRegionException(
844        "No RegionStateNode found for " + regionInfo.getEncodedName() + "(Closed/Deleted?)");
845    }
846    TransitRegionStateProcedure proc;
847    regionNode.lock();
848    try {
849      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
850      regionNode.checkOnline();
851      proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
852      regionNode.setProcedure(proc);
853    } finally {
854      regionNode.unlock();
855    }
856    return proc;
857  }
858
859  public void move(RegionInfo regionInfo) throws IOException {
860    TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null);
861    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
862  }
863
864  public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException {
865    TransitRegionStateProcedure proc =
866      createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination());
867    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
868  }
869
870  public Future<byte[]> balance(RegionPlan regionPlan) throws HBaseIOException {
871    ServerName current =
872      this.getRegionStates().getRegionAssignments().get(regionPlan.getRegionInfo());
873    if (current == null || !current.equals(regionPlan.getSource())) {
874      LOG.debug("Skip region plan {}, source server not match, current region location is {}",
875        regionPlan, current == null ? "(null)" : current);
876      return null;
877    }
878    return moveAsync(regionPlan);
879  }
880
881  // ============================================================================================
882  // RegionTransition procedures helpers
883  // ============================================================================================
884
885  /**
886   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
887   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
888   *         to populate the assigns with targets chosen using round-robin (default balancer
889   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
890   *         AssignProcedure will ask the balancer for a new target, and so on.
891   */
892  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris,
893    List<ServerName> serversToExclude) {
894    if (hris.isEmpty()) {
895      return new TransitRegionStateProcedure[0];
896    }
897
898    if (
899      serversToExclude != null && this.master.getServerManager().getOnlineServersList().size() == 1
900    ) {
901      LOG.debug("Only one region server found and hence going ahead with the assignment");
902      serversToExclude = null;
903    }
904    try {
905      // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
906      // a better job if it has all the assignments in the one lump.
907      Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
908        this.master.getServerManager().createDestinationServersList(serversToExclude));
909      // Return mid-method!
910      return createAssignProcedures(assignments);
911    } catch (IOException hioe) {
912      LOG.warn("Failed roundRobinAssignment", hioe);
913    }
914    // If an error above, fall-through to this simpler assign. Last resort.
915    return createAssignProcedures(hris);
916  }
917
918  /**
919   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
920   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
921   *         to populate the assigns with targets chosen using round-robin (default balancer
922   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
923   *         AssignProcedure will ask the balancer for a new target, and so on.
924   */
925  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
926    return createRoundRobinAssignProcedures(hris, null);
927  }
928
929  static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
930    if (left.getRegion().isMetaRegion()) {
931      if (right.getRegion().isMetaRegion()) {
932        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
933      }
934      return -1;
935    } else if (right.getRegion().isMetaRegion()) {
936      return +1;
937    }
938    if (left.getRegion().getTable().isSystemTable()) {
939      if (right.getRegion().getTable().isSystemTable()) {
940        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
941      }
942      return -1;
943    } else if (right.getRegion().getTable().isSystemTable()) {
944      return +1;
945    }
946    return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
947  }
948
949  /**
950   * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server. This
951   * method is called from HBCK2.
952   * @return an assign or null
953   */
954  public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo ri, boolean override) {
955    TransitRegionStateProcedure trsp = null;
956    try {
957      trsp = createAssignProcedure(ri, null, override);
958    } catch (IOException ioe) {
959      LOG.info(
960        "Failed {} assign, override={}"
961          + (override ? "" : "; set override to by-pass state checks."),
962        ri.getEncodedName(), override, ioe);
963    }
964    return trsp;
965  }
966
967  /**
968   * Create one TransitRegionStateProcedure to unassign a region. This method is called from HBCK2.
969   * @return an unassign or null
970   */
971  public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo ri, boolean override) {
972    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(ri);
973    TransitRegionStateProcedure trsp = null;
974    regionNode.lock();
975    try {
976      if (override) {
977        if (regionNode.getProcedure() != null) {
978          regionNode.unsetProcedure(regionNode.getProcedure());
979        }
980      } else {
981        // This is where we could throw an exception; i.e. override is false.
982        preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
983      }
984      assert regionNode.getProcedure() == null;
985      trsp =
986        TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionNode.getRegionInfo());
987      regionNode.setProcedure(trsp);
988    } catch (IOException ioe) {
989      // 'override' must be false here.
990      LOG.info("Failed {} unassign, override=false; set override to by-pass state checks.",
991        ri.getEncodedName(), ioe);
992    } finally {
993      regionNode.unlock();
994    }
995    return trsp;
996  }
997
998  /**
999   * Create an array of TransitRegionStateProcedure w/o specifying a target server. Used as fallback
1000   * of caller is unable to do {@link #createAssignProcedures(Map)}.
1001   * <p/>
1002   * If no target server, at assign time, we will try to use the former location of the region if
1003   * one exists. This is how we 'retain' the old location across a server restart.
1004   * <p/>
1005   * Should only be called when you can make sure that no one can touch these regions other than
1006   * you. For example, when you are creating or enabling table. Presumes all Regions are in
1007   * appropriate state ripe for assign; no checking of Region state is done in here.
1008   * @see #createAssignProcedures(Map)
1009   */
1010  public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) {
1011    return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
1012      .map(regionNode -> createAssignProcedure(regionNode, null)).sorted(AssignmentManager::compare)
1013      .toArray(TransitRegionStateProcedure[]::new);
1014  }
1015
1016  /**
1017   * Tied to {@link #createAssignProcedures(List)} in that it is called if caller is unable to run
1018   * this method. Presumes all Regions are in appropriate state ripe for assign; no checking of
1019   * Region state is done in here.
1020   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
1021   * @return Assignments made from the passed in <code>assignments</code>
1022   * @see #createAssignProcedures(List)
1023   */
1024  private TransitRegionStateProcedure[]
1025    createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments) {
1026    return assignments.entrySet().stream()
1027      .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
1028        .map(regionNode -> createAssignProcedure(regionNode, e.getKey())))
1029      .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
1030  }
1031
1032  // for creating unassign TRSP when disabling a table or closing excess region replicas
1033  private TransitRegionStateProcedure forceCreateUnssignProcedure(RegionStateNode regionNode) {
1034    regionNode.lock();
1035    try {
1036      if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
1037        return null;
1038      }
1039      // in general, a split parent should be in CLOSED or SPLIT state, but anyway, let's check it
1040      // here for safety
1041      if (regionNode.getRegionInfo().isSplit()) {
1042        LOG.warn("{} is a split parent but not in CLOSED or SPLIT state", regionNode);
1043        return null;
1044      }
1045      // As in DisableTableProcedure or ModifyTableProcedure, we will hold the xlock for table, so
1046      // we can make sure that this procedure has not been executed yet, as TRSP will hold the
1047      // shared lock for table all the time. So here we will unset it and when it is actually
1048      // executed, it will find that the attach procedure is not itself and quit immediately.
1049      if (regionNode.getProcedure() != null) {
1050        regionNode.unsetProcedure(regionNode.getProcedure());
1051      }
1052      return regionNode.setProcedure(TransitRegionStateProcedure.unassign(getProcedureEnvironment(),
1053        regionNode.getRegionInfo()));
1054    } finally {
1055      regionNode.unlock();
1056    }
1057  }
1058
1059  /**
1060   * Called by DisableTableProcedure to unassign all the regions for a table.
1061   */
1062  public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) {
1063    return regionStates.getTableRegionStateNodes(tableName).stream()
1064      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
1065      .toArray(TransitRegionStateProcedure[]::new);
1066  }
1067
1068  /**
1069   * Called by ModifyTableProcedures to unassign all the excess region replicas for a table.
1070   */
1071  public TransitRegionStateProcedure[] createUnassignProceduresForClosingExcessRegionReplicas(
1072    TableName tableName, int newReplicaCount) {
1073    return regionStates.getTableRegionStateNodes(tableName).stream()
1074      .filter(regionNode -> regionNode.getRegionInfo().getReplicaId() >= newReplicaCount)
1075      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
1076      .toArray(TransitRegionStateProcedure[]::new);
1077  }
1078
1079  public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
1080    final byte[] splitKey) throws IOException {
1081    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
1082  }
1083
1084  public MergeTableRegionsProcedure createMergeProcedure(RegionInfo... ris) throws IOException {
1085    return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
1086  }
1087
1088  /**
1089   * Delete the region states. This is called by "DeleteTable"
1090   */
1091  public void deleteTable(final TableName tableName) throws IOException {
1092    final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
1093    regionStateStore.deleteRegions(regions);
1094    for (int i = 0; i < regions.size(); ++i) {
1095      final RegionInfo regionInfo = regions.get(i);
1096      // we expect the region to be offline
1097      regionStates.removeFromOfflineRegions(regionInfo);
1098      regionStates.deleteRegion(regionInfo);
1099    }
1100  }
1101
1102  // ============================================================================================
1103  // RS Region Transition Report helpers
1104  // ============================================================================================
1105  private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
1106    ServerName serverName, List<RegionStateTransition> transitionList) throws IOException {
1107    for (RegionStateTransition transition : transitionList) {
1108      switch (transition.getTransitionCode()) {
1109        case OPENED:
1110        case FAILED_OPEN:
1111        case CLOSED:
1112          assert transition.getRegionInfoCount() == 1 : transition;
1113          final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1114          long procId =
1115            transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
1116          updateRegionTransition(serverName, transition.getTransitionCode(), hri,
1117            transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId);
1118          break;
1119        case READY_TO_SPLIT:
1120        case SPLIT:
1121        case SPLIT_REVERTED:
1122          assert transition.getRegionInfoCount() == 3 : transition;
1123          final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1124          final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
1125          final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
1126          updateRegionSplitTransition(serverName, transition.getTransitionCode(), parent, splitA,
1127            splitB);
1128          break;
1129        case READY_TO_MERGE:
1130        case MERGED:
1131        case MERGE_REVERTED:
1132          assert transition.getRegionInfoCount() == 3 : transition;
1133          final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1134          final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
1135          final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
1136          updateRegionMergeTransition(serverName, transition.getTransitionCode(), merged, mergeA,
1137            mergeB);
1138          break;
1139      }
1140    }
1141  }
1142
1143  public ReportRegionStateTransitionResponse reportRegionStateTransition(
1144    final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
1145    ReportRegionStateTransitionResponse.Builder builder =
1146      ReportRegionStateTransitionResponse.newBuilder();
1147    ServerName serverName = ProtobufUtil.toServerName(req.getServer());
1148    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
1149    // here we have to acquire a read lock instead of a simple exclusive lock. This is because that
1150    // we should not block other reportRegionStateTransition call from the same region server. This
1151    // is not only about performance, but also to prevent dead lock. Think of the meta region is
1152    // also on the same region server and you hold the lock which blocks the
1153    // reportRegionStateTransition for meta, and since meta is not online, you will block inside the
1154    // lock protection to wait for meta online...
1155    serverNode.readLock().lock();
1156    try {
1157      // we only accept reportRegionStateTransition if the region server is online, see the comment
1158      // above in submitServerCrash method and HBASE-21508 for more details.
1159      if (serverNode.isInState(ServerState.ONLINE)) {
1160        try {
1161          reportRegionStateTransition(builder, serverName, req.getTransitionList());
1162        } catch (PleaseHoldException e) {
1163          LOG.trace("Failed transition ", e);
1164          throw e;
1165        } catch (UnsupportedOperationException | IOException e) {
1166          // TODO: at the moment we have a single error message and the RS will abort
1167          // if the master says that one of the region transitions failed.
1168          LOG.warn("Failed transition", e);
1169          builder.setErrorMessage("Failed transition " + e.getMessage());
1170        }
1171      } else {
1172        LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
1173          serverName);
1174        builder.setErrorMessage("You are dead");
1175      }
1176    } finally {
1177      serverNode.readLock().unlock();
1178    }
1179
1180    return builder.build();
1181  }
1182
1183  private void updateRegionTransition(ServerName serverName, TransitionCode state,
1184    RegionInfo regionInfo, long seqId, long procId) throws IOException {
1185    checkMetaLoaded(regionInfo);
1186
1187    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1188    if (regionNode == null) {
1189      // the table/region is gone. maybe a delete, split, merge
1190      throw new UnexpectedStateException(String.format(
1191        "Server %s was trying to transition region %s to %s. but Region is not known.", serverName,
1192        regionInfo, state));
1193    }
1194    LOG.trace("Update region transition serverName={} region={} regionState={}", serverName,
1195      regionNode, state);
1196
1197    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
1198    regionNode.lock();
1199    try {
1200      if (!reportTransition(regionNode, serverNode, state, seqId, procId)) {
1201        // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
1202        // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
1203        // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
1204        // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
1205        // to CLOSED
1206        // These happen because on cluster shutdown, we currently let the RegionServers close
1207        // regions. This is the only time that region close is not run by the Master (so cluster
1208        // goes down fast). Consider changing it so Master runs all shutdowns.
1209        if (
1210          this.master.getServerManager().isClusterShutdown() && state.equals(TransitionCode.CLOSED)
1211        ) {
1212          LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
1213        } else {
1214          LOG.warn("No matching procedure found for {} transition on {} to {}", serverName,
1215            regionNode, state);
1216        }
1217      }
1218    } finally {
1219      regionNode.unlock();
1220    }
1221  }
1222
1223  private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
1224    TransitionCode state, long seqId, long procId) throws IOException {
1225    ServerName serverName = serverNode.getServerName();
1226    TransitRegionStateProcedure proc = regionNode.getProcedure();
1227    if (proc == null) {
1228      return false;
1229    }
1230    proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
1231      serverName, state, seqId, procId);
1232    return true;
1233  }
1234
1235  private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state,
1236    final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
1237    checkMetaLoaded(parent);
1238
1239    if (state != TransitionCode.READY_TO_SPLIT) {
1240      throw new UnexpectedStateException(
1241        "unsupported split regionState=" + state + " for parent region " + parent
1242          + " maybe an old RS (< 2.0) had the operation in progress");
1243    }
1244
1245    // sanity check on the request
1246    if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) {
1247      throw new UnsupportedOperationException("unsupported split request with bad keys: parent="
1248        + parent + " hriA=" + hriA + " hriB=" + hriB);
1249    }
1250
1251    if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
1252      LOG.warn("Split switch is off! skip split of " + parent);
1253      throw new DoNotRetryIOException(
1254        "Split region " + parent.getRegionNameAsString() + " failed due to split switch off");
1255    }
1256
1257    // Submit the Split procedure
1258    final byte[] splitKey = hriB.getStartKey();
1259    if (LOG.isDebugEnabled()) {
1260      LOG.debug("Split request from " + serverName + ", parent=" + parent + " splitKey="
1261        + Bytes.toStringBinary(splitKey));
1262    }
1263    // Processing this report happens asynchronously from other activities which can mutate
1264    // the region state. For example, a split procedure may already be running for this parent.
1265    // A split procedure cannot succeed if the parent region is no longer open, so we can
1266    // ignore it in that case.
1267    // Note that submitting more than one split procedure for a given region is
1268    // harmless -- the split is fenced in the procedure handling -- but it would be noisy in
1269    // the logs. Only one procedure can succeed. The other procedure(s) would abort during
1270    // initialization and report failure with WARN level logging.
1271    RegionState parentState = regionStates.getRegionState(parent);
1272    if (parentState != null && parentState.isOpened()) {
1273      master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
1274    } else {
1275      LOG.info("Ignoring split request from " + serverName + ", parent=" + parent
1276        + " because parent is unknown or not open");
1277      return;
1278    }
1279
1280    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
1281    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
1282      throw new UnsupportedOperationException(
1283        String.format("Split handled by the master: " + "parent=%s hriA=%s hriB=%s",
1284          parent.getShortNameToLog(), hriA, hriB));
1285    }
1286  }
1287
1288  private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state,
1289    final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
1290    checkMetaLoaded(merged);
1291
1292    if (state != TransitionCode.READY_TO_MERGE) {
1293      throw new UnexpectedStateException(
1294        "Unsupported merge regionState=" + state + " for regionA=" + hriA + " regionB=" + hriB
1295          + " merged=" + merged + " maybe an old RS (< 2.0) had the operation in progress");
1296    }
1297
1298    if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
1299      LOG.warn("Merge switch is off! skip merge of regionA=" + hriA + " regionB=" + hriB);
1300      throw new DoNotRetryIOException(
1301        "Merge of regionA=" + hriA + " regionB=" + hriB + " failed because merge switch is off");
1302    }
1303
1304    // Submit the Merge procedure
1305    if (LOG.isDebugEnabled()) {
1306      LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged);
1307    }
1308    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
1309
1310    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
1311    if (master.getServerManager().getVersionNumber(serverName) < 0x0200000) {
1312      throw new UnsupportedOperationException(
1313        String.format("Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state,
1314          merged, hriA, hriB));
1315    }
1316  }
1317
1318  // ============================================================================================
1319  // RS Status update (report online regions) helpers
1320  // ============================================================================================
1321  /**
1322   * The master will call this method when the RS send the regionServerReport(). The report will
1323   * contains the "online regions". This method will check the the online regions against the
1324   * in-memory state of the AM, and we will log a warn message if there is a mismatch. This is
1325   * because that there is no fencing between the reportRegionStateTransition method and
1326   * regionServerReport method, so there could be race and introduce inconsistency here, but
1327   * actually there is no problem.
1328   * <p/>
1329   * Please see HBASE-21421 and HBASE-21463 for more details.
1330   */
1331  public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) {
1332    if (!isRunning()) {
1333      return;
1334    }
1335    if (LOG.isTraceEnabled()) {
1336      LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName,
1337        regionNames.size(), isMetaLoaded(),
1338        regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList()));
1339    }
1340
1341    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
1342    synchronized (serverNode) {
1343      if (!serverNode.isInState(ServerState.ONLINE)) {
1344        LOG.warn("Got a report from a server result in state " + serverNode.getState());
1345        return;
1346      }
1347    }
1348
1349    // Track the regionserver reported online regions in memory.
1350    synchronized (rsReports) {
1351      rsReports.put(serverName, regionNames);
1352    }
1353
1354    if (regionNames.isEmpty()) {
1355      // nothing to do if we don't have regions
1356      LOG.trace("no online region found on {}", serverName);
1357      return;
1358    }
1359    if (!isMetaLoaded()) {
1360      // we are still on startup, skip checking
1361      return;
1362    }
1363    // The Heartbeat tells us of what regions are on the region serve, check the state.
1364    checkOnlineRegionsReport(serverNode, regionNames);
1365  }
1366
1367  /**
1368   * Close <code>regionName</code> on <code>sn</code> silently and immediately without using a
1369   * Procedure or going via hbase:meta. For case where a RegionServer's hosting of a Region is not
1370   * aligned w/ the Master's accounting of Region state. This is for cleaning up an error in
1371   * accounting.
1372   */
1373  private void closeRegionSilently(ServerName sn, byte[] regionName) {
1374    try {
1375      RegionInfo ri = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName);
1376      // Pass -1 for timeout. Means do not wait.
1377      ServerManager.closeRegionSilentlyAndWait(this.master.getAsyncClusterConnection(), sn, ri, -1);
1378    } catch (Exception e) {
1379      LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e);
1380    }
1381  }
1382
1383  /**
1384   * Check that what the RegionServer reports aligns with the Master's image. If disagreement, we
1385   * will tell the RegionServer to expediently close a Region we do not think it should have.
1386   */
1387  private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) {
1388    ServerName serverName = serverNode.getServerName();
1389    for (byte[] regionName : regionNames) {
1390      if (!isRunning()) {
1391        return;
1392      }
1393      RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
1394      if (regionNode == null) {
1395        String regionNameAsStr = Bytes.toStringBinary(regionName);
1396        LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...", regionNameAsStr,
1397          serverName);
1398        closeRegionSilently(serverNode.getServerName(), regionName);
1399        continue;
1400      }
1401      final long lag = 1000;
1402      regionNode.lock();
1403      try {
1404        long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate();
1405        if (regionNode.isInState(State.OPENING, State.OPEN)) {
1406          // This is possible as a region server has just closed a region but the region server
1407          // report is generated before the closing, but arrive after the closing. Make sure there
1408          // is some elapsed time so less false alarms.
1409          if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) {
1410            LOG.warn("Reporting {} server does not match {} (time since last "
1411              + "update={}ms); closing...", serverName, regionNode, diff);
1412            closeRegionSilently(serverNode.getServerName(), regionName);
1413          }
1414        } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
1415          // So, we can get report that a region is CLOSED or SPLIT because a heartbeat
1416          // came in at about same time as a region transition. Make sure there is some
1417          // elapsed time so less false alarms.
1418          if (diff > lag) {
1419            LOG.warn("Reporting {} state does not match {} (time since last update={}ms)",
1420              serverName, regionNode, diff);
1421          }
1422        }
1423      } finally {
1424        regionNode.unlock();
1425      }
1426    }
1427  }
1428
1429  // ============================================================================================
1430  // RIT chore
1431  // ============================================================================================
1432  private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> {
1433    public RegionInTransitionChore(final int timeoutMsec) {
1434      super(timeoutMsec);
1435    }
1436
1437    @Override
1438    protected void periodicExecute(final MasterProcedureEnv env) {
1439      final AssignmentManager am = env.getAssignmentManager();
1440
1441      final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat();
1442      if (ritStat.hasRegionsOverThreshold()) {
1443        for (RegionState hri : ritStat.getRegionOverThreshold()) {
1444          am.handleRegionOverStuckWarningThreshold(hri.getRegion());
1445        }
1446      }
1447
1448      // update metrics
1449      am.updateRegionsInTransitionMetrics(ritStat);
1450    }
1451  }
1452
1453  private static class DeadServerMetricRegionChore
1454    extends ProcedureInMemoryChore<MasterProcedureEnv> {
1455    public DeadServerMetricRegionChore(final int timeoutMsec) {
1456      super(timeoutMsec);
1457    }
1458
1459    @Override
1460    protected void periodicExecute(final MasterProcedureEnv env) {
1461      final ServerManager sm = env.getMasterServices().getServerManager();
1462      final AssignmentManager am = env.getAssignmentManager();
1463      // To minimize inconsistencies we are not going to snapshot live servers in advance in case
1464      // new servers are added; OTOH we don't want to add heavy sync for a consistent view since
1465      // this is for metrics. Instead, we're going to check each regions as we go; to avoid making
1466      // too many checks, we maintain a local lists of server, limiting us to false negatives. If
1467      // we miss some recently-dead server, we'll just see it next time.
1468      Set<ServerName> recentlyLiveServers = new HashSet<>();
1469      int deadRegions = 0, unknownRegions = 0;
1470      for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) {
1471        if (rsn.getState() != State.OPEN) {
1472          continue; // Opportunistic check, should quickly skip RITs, offline tables, etc.
1473        }
1474        // Do not need to acquire region state lock as this is only for showing metrics.
1475        ServerName sn = rsn.getRegionLocation();
1476        State state = rsn.getState();
1477        if (state != State.OPEN) {
1478          continue; // Mostly skipping RITs that are already being take care of.
1479        }
1480        if (sn == null) {
1481          ++unknownRegions; // Opened on null?
1482          continue;
1483        }
1484        if (recentlyLiveServers.contains(sn)) {
1485          continue;
1486        }
1487        ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn);
1488        switch (sls) {
1489          case LIVE:
1490            recentlyLiveServers.add(sn);
1491            break;
1492          case DEAD:
1493            ++deadRegions;
1494            break;
1495          case UNKNOWN:
1496            ++unknownRegions;
1497            break;
1498          default:
1499            throw new AssertionError("Unexpected " + sls);
1500        }
1501      }
1502      if (deadRegions > 0 || unknownRegions > 0) {
1503        LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers",
1504          deadRegions, unknownRegions);
1505      }
1506
1507      am.updateDeadServerRegionMetrics(deadRegions, unknownRegions);
1508    }
1509  }
1510
1511  public RegionInTransitionStat computeRegionInTransitionStat() {
1512    final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration());
1513    rit.update(this);
1514    return rit;
1515  }
1516
1517  public static class RegionInTransitionStat {
1518    private final int ritThreshold;
1519
1520    private HashMap<String, RegionState> ritsOverThreshold = null;
1521    private long statTimestamp;
1522    private long oldestRITTime = 0;
1523    private int totalRITsTwiceThreshold = 0;
1524    private int totalRITs = 0;
1525
1526    public RegionInTransitionStat(final Configuration conf) {
1527      this.ritThreshold =
1528        conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD);
1529    }
1530
1531    public int getRITThreshold() {
1532      return ritThreshold;
1533    }
1534
1535    public long getTimestamp() {
1536      return statTimestamp;
1537    }
1538
1539    public int getTotalRITs() {
1540      return totalRITs;
1541    }
1542
1543    public long getOldestRITTime() {
1544      return oldestRITTime;
1545    }
1546
1547    public int getTotalRITsOverThreshold() {
1548      Map<String, RegionState> m = this.ritsOverThreshold;
1549      return m != null ? m.size() : 0;
1550    }
1551
1552    public boolean hasRegionsTwiceOverThreshold() {
1553      return totalRITsTwiceThreshold > 0;
1554    }
1555
1556    public boolean hasRegionsOverThreshold() {
1557      Map<String, RegionState> m = this.ritsOverThreshold;
1558      return m != null && !m.isEmpty();
1559    }
1560
1561    public Collection<RegionState> getRegionOverThreshold() {
1562      Map<String, RegionState> m = this.ritsOverThreshold;
1563      return m != null ? m.values() : Collections.emptySet();
1564    }
1565
1566    public boolean isRegionOverThreshold(final RegionInfo regionInfo) {
1567      Map<String, RegionState> m = this.ritsOverThreshold;
1568      return m != null && m.containsKey(regionInfo.getEncodedName());
1569    }
1570
1571    public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) {
1572      Map<String, RegionState> m = this.ritsOverThreshold;
1573      if (m == null) {
1574        return false;
1575      }
1576      final RegionState state = m.get(regionInfo.getEncodedName());
1577      if (state == null) {
1578        return false;
1579      }
1580      return (statTimestamp - state.getStamp()) > (ritThreshold * 2);
1581    }
1582
1583    protected void update(final AssignmentManager am) {
1584      final RegionStates regionStates = am.getRegionStates();
1585      this.statTimestamp = EnvironmentEdgeManager.currentTime();
1586      update(regionStates.getRegionsStateInTransition(), statTimestamp);
1587      update(regionStates.getRegionFailedOpen(), statTimestamp);
1588
1589      if (LOG.isDebugEnabled() && ritsOverThreshold != null && !ritsOverThreshold.isEmpty()) {
1590        LOG.debug("RITs over threshold: {}",
1591          ritsOverThreshold.entrySet().stream()
1592            .map(e -> e.getKey() + ":" + e.getValue().getState().name())
1593            .collect(Collectors.joining("\n")));
1594      }
1595    }
1596
1597    private void update(final Collection<RegionState> regions, final long currentTime) {
1598      for (RegionState state : regions) {
1599        totalRITs++;
1600        final long ritStartedMs = state.getStamp();
1601        if (ritStartedMs == 0) {
1602          // Don't output bogus values to metrics if they accidentally make it here.
1603          LOG.warn("The RIT {} has no start time", state.getRegion());
1604          continue;
1605        }
1606        final long ritTime = currentTime - ritStartedMs;
1607        if (ritTime > ritThreshold) {
1608          if (ritsOverThreshold == null) {
1609            ritsOverThreshold = new HashMap<String, RegionState>();
1610          }
1611          ritsOverThreshold.put(state.getRegion().getEncodedName(), state);
1612          totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0;
1613        }
1614        if (oldestRITTime < ritTime) {
1615          oldestRITTime = ritTime;
1616        }
1617      }
1618    }
1619  }
1620
1621  private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) {
1622    metrics.updateRITOldestAge(ritStat.getOldestRITTime());
1623    metrics.updateRITCount(ritStat.getTotalRITs());
1624    metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold());
1625  }
1626
1627  private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) {
1628    metrics.updateDeadServerOpenRegions(deadRegions);
1629    metrics.updateUnknownServerOpenRegions(unknownRegions);
1630  }
1631
1632  private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
1633    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1634    // if (regionNode.isStuck()) {
1635    LOG.warn("STUCK Region-In-Transition {}", regionNode);
1636  }
1637
1638  // ============================================================================================
1639  // TODO: Master load/bootstrap
1640  // ============================================================================================
1641  public void joinCluster() throws IOException {
1642    long startTime = System.nanoTime();
1643    LOG.debug("Joining cluster...");
1644
1645    // Scan hbase:meta to build list of existing regions, servers, and assignment.
1646    // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress
1647    // w/o meta.
1648    loadMeta();
1649
1650    while (master.getServerManager().countOfRegionServers() < 1) {
1651      LOG.info("Waiting for RegionServers to join; current count={}",
1652        master.getServerManager().countOfRegionServers());
1653      Threads.sleep(250);
1654    }
1655    LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers());
1656
1657    // Start the chores
1658    master.getMasterProcedureExecutor().addChore(this.ritChore);
1659    master.getMasterProcedureExecutor().addChore(this.deadMetricChore);
1660
1661    long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
1662    LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs));
1663  }
1664
1665  /**
1666   * Create assign procedure for offline regions. Just follow the old
1667   * processofflineServersWithOnlineRegions method. Since now we do not need to deal with dead
1668   * server any more, we only deal with the regions in OFFLINE state in this method. And this is a
1669   * bit strange, that for new regions, we will add it in CLOSED state instead of OFFLINE state, and
1670   * usually there will be a procedure to track them. The processofflineServersWithOnlineRegions is
1671   * a legacy from long ago, as things are going really different now, maybe we do not need this
1672   * method any more. Need to revisit later.
1673   */
1674  // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online.
1675  // Needs to be done after the table state manager has been started.
1676  public void processOfflineRegions() {
1677    TransitRegionStateProcedure[] procs =
1678      regionStates.getRegionStateNodes().stream().filter(rsn -> rsn.isInState(State.OFFLINE))
1679        .filter(rsn -> isTableEnabled(rsn.getRegionInfo().getTable())).map(rsn -> {
1680          rsn.lock();
1681          try {
1682            if (rsn.getProcedure() != null) {
1683              return null;
1684            } else {
1685              return rsn.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
1686                rsn.getRegionInfo(), null));
1687            }
1688          } finally {
1689            rsn.unlock();
1690          }
1691        }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new);
1692    if (procs.length > 0) {
1693      master.getMasterProcedureExecutor().submitProcedures(procs);
1694    }
1695  }
1696
1697  /*
1698   * AM internal RegionStateStore.RegionStateVisitor implementation. To be used when scanning META
1699   * table for region rows, using RegionStateStore utility methods. RegionStateStore methods will
1700   * convert Result into proper RegionInfo instances, but those would still need to be added into
1701   * AssignmentManager.regionStates in-memory cache. RegionMetaLoadingVisitor.visitRegionState
1702   * method provides the logic for adding RegionInfo instances as loaded from latest META scan into
1703   * AssignmentManager.regionStates.
1704   */
1705  private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor {
1706
1707    @Override
1708    public void visitRegionState(Result result, final RegionInfo regionInfo, final State state,
1709      final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
1710      if (
1711        state == null && regionLocation == null && lastHost == null
1712          && openSeqNum == SequenceId.NO_SEQUENCE_ID
1713      ) {
1714        // This is a row with nothing in it.
1715        LOG.warn("Skipping empty row={}", result);
1716        return;
1717      }
1718      State localState = state;
1719      if (localState == null) {
1720        // No region state column data in hbase:meta table! Are I doing a rolling upgrade from
1721        // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta?
1722        // In any of these cases, state is empty. For now, presume OFFLINE but there are probably
1723        // cases where we need to probe more to be sure this correct; TODO informed by experience.
1724        LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);
1725        localState = State.OFFLINE;
1726      }
1727      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
1728      // Do not need to lock on regionNode, as we can make sure that before we finish loading
1729      // meta, all the related procedures can not be executed. The only exception is for meta
1730      // region related operations, but here we do not load the informations for meta region.
1731      regionNode.setState(localState);
1732      regionNode.setLastHost(lastHost);
1733      regionNode.setRegionLocation(regionLocation);
1734      regionNode.setOpenSeqNum(openSeqNum);
1735
1736      // Note: keep consistent with other methods, see region(Opening|Opened|Closing)
1737      // RIT/ServerCrash handling should take care of the transiting regions.
1738      if (
1739        localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING, State.MERGING)
1740      ) {
1741        assert regionLocation != null : "found null region location for " + regionNode;
1742        regionStates.addRegionToServer(regionNode);
1743      } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
1744        regionStates.addToOfflineRegions(regionNode);
1745      }
1746      if (regionNode.getProcedure() != null) {
1747        regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode);
1748      }
1749    }
1750  };
1751
1752  /**
1753   * Query META if the given <code>RegionInfo</code> exists, adding to
1754   * <code>AssignmentManager.regionStateStore</code> cache if the region is found in META.
1755   * @param regionEncodedName encoded name for the region to be loaded from META into
1756   *                          <code>AssignmentManager.regionStateStore</code> cache
1757   * @return <code>RegionInfo</code> instance for the given region if it is present in META and got
1758   *         successfully loaded into <code>AssignmentManager.regionStateStore</code> cache,
1759   *         <b>null</b> otherwise.
1760   * @throws UnknownRegionException if any errors occur while querying meta.
1761   */
1762  public RegionInfo loadRegionFromMeta(String regionEncodedName) throws UnknownRegionException {
1763    try {
1764      RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor();
1765      regionStateStore.visitMetaForRegion(regionEncodedName, visitor);
1766      return regionStates.getRegionState(regionEncodedName) == null
1767        ? null
1768        : regionStates.getRegionState(regionEncodedName).getRegion();
1769    } catch (IOException e) {
1770      throw new UnknownRegionException(
1771        "Error trying to load region " + regionEncodedName + " from META", e);
1772    }
1773  }
1774
1775  private void loadMeta() throws IOException {
1776    // TODO: use a thread pool
1777    regionStateStore.visitMeta(new RegionMetaLoadingVisitor());
1778  }
1779
1780  /**
1781   * Used to check if the meta loading is done.
1782   * <p/>
1783   * if not we throw PleaseHoldException since we are rebuilding the RegionStates
1784   * @param hri region to check if it is already rebuild
1785   * @throws PleaseHoldException if meta has not been loaded yet
1786   */
1787  private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException {
1788    if (!isRunning()) {
1789      throw new PleaseHoldException("AssignmentManager not running");
1790    }
1791    boolean meta = isMetaRegion(hri);
1792    boolean metaLoaded = isMetaLoaded();
1793    if (!meta && !metaLoaded) {
1794      throw new PleaseHoldException(
1795        "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded);
1796    }
1797  }
1798
1799  // ============================================================================================
1800  // TODO: Metrics
1801  // ============================================================================================
1802  public int getNumRegionsOpened() {
1803    // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value
1804    return 0;
1805  }
1806
1807  /**
1808   * Usually run by the Master in reaction to server crash during normal processing. Can also be
1809   * invoked via external RPC to effect repair; in the latter case, the 'force' flag is set so we
1810   * push through the SCP though context may indicate already-running-SCP (An old SCP may have
1811   * exited abnormally, or damaged cluster may still have references in hbase:meta to 'Unknown
1812   * Servers' -- servers that are not online or in dead servers list, etc.)
1813   * @param force Set if the request came in externally over RPC (via hbck2). Force means run the
1814   *              SCP even if it seems as though there might be an outstanding SCP running.
1815   * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled.
1816   */
1817  public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) {
1818    // May be an 'Unknown Server' so handle case where serverNode is null.
1819    ServerStateNode serverNode = regionStates.getServerNode(serverName);
1820    // Remove the in-memory rsReports result
1821    synchronized (rsReports) {
1822      rsReports.remove(serverName);
1823    }
1824
1825    // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the
1826    // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
1827    // this server. This is used to simplify the implementation for TRSP and SCP, where we can make
1828    // sure that, the region list fetched by SCP will not be changed any more.
1829    if (serverNode != null) {
1830      serverNode.writeLock().lock();
1831    }
1832    boolean carryingMeta;
1833    long pid;
1834    try {
1835      ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
1836      carryingMeta = isCarryingMeta(serverName);
1837      if (!force && serverNode != null && !serverNode.isInState(ServerState.ONLINE)) {
1838        LOG.info("Skip adding ServerCrashProcedure for {} (meta={}) -- running?", serverNode,
1839          carryingMeta);
1840        return Procedure.NO_PROC_ID;
1841      } else {
1842        MasterProcedureEnv mpe = procExec.getEnvironment();
1843        // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead.
1844        // HBCKSCP scours Master in-memory state AND hbase;meta for references to
1845        // serverName just-in-case. An SCP that is scheduled when the server is
1846        // 'Unknown' probably originated externally with HBCK2 fix-it tool.
1847        ServerState oldState = null;
1848        if (serverNode != null) {
1849          oldState = serverNode.getState();
1850          serverNode.setState(ServerState.CRASHED);
1851        }
1852
1853        if (force) {
1854          pid = procExec.submitProcedure(
1855            new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
1856        } else {
1857          pid = procExec.submitProcedure(
1858            new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
1859        }
1860        LOG.info("Scheduled ServerCrashProcedure pid={} for {} (carryingMeta={}){}.", pid,
1861          serverName, carryingMeta,
1862          serverNode == null ? "" : " " + serverNode.toString() + ", oldState=" + oldState);
1863      }
1864    } finally {
1865      if (serverNode != null) {
1866        serverNode.writeLock().unlock();
1867      }
1868    }
1869    return pid;
1870  }
1871
1872  public void offlineRegion(final RegionInfo regionInfo) {
1873    // TODO used by MasterRpcServices
1874    RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
1875    if (node != null) {
1876      node.offline();
1877    }
1878  }
1879
1880  public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) {
1881    // TODO used by TestSplitTransactionOnCluster.java
1882  }
1883
1884  public Map<ServerName, List<RegionInfo>>
1885    getSnapShotOfAssignment(final Collection<RegionInfo> regions) {
1886    return regionStates.getSnapShotOfAssignment(regions);
1887  }
1888
1889  // ============================================================================================
1890  // TODO: UTILS/HELPERS?
1891  // ============================================================================================
1892  /**
1893   * Used by the client (via master) to identify if all regions have the schema updates
1894   * @return Pair indicating the status of the alter command (pending/total)
1895   */
1896  public Pair<Integer, Integer> getReopenStatus(TableName tableName) {
1897    if (isTableDisabled(tableName)) {
1898      return new Pair<Integer, Integer>(0, 0);
1899    }
1900
1901    final List<RegionState> states = regionStates.getTableRegionStates(tableName);
1902    int ritCount = 0;
1903    for (RegionState regionState : states) {
1904      if (!regionState.isOpened() && !regionState.isSplit()) {
1905        ritCount++;
1906      }
1907    }
1908    return new Pair<Integer, Integer>(ritCount, states.size());
1909  }
1910
1911  // ============================================================================================
1912  // TODO: Region State In Transition
1913  // ============================================================================================
1914  public boolean hasRegionsInTransition() {
1915    return regionStates.hasRegionsInTransition();
1916  }
1917
1918  public List<RegionStateNode> getRegionsInTransition() {
1919    return regionStates.getRegionsInTransition();
1920  }
1921
1922  public List<RegionInfo> getAssignedRegions() {
1923    return regionStates.getAssignedRegions();
1924  }
1925
1926  public RegionInfo getRegionInfo(final byte[] regionName) {
1927    final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName);
1928    return regionState != null ? regionState.getRegionInfo() : null;
1929  }
1930
1931  // ============================================================================================
1932  // Expected states on region state transition.
  // Notice that there are no expected states for transiting to OPENING state, this is because of
  // SCP. See the comments in regionOpening method for more details.
1935  // ============================================================================================
1936  private static final State[] STATES_EXPECTED_ON_OPEN = { State.OPENING, // Normal case
1937    State.OPEN // Retrying
1938  };
1939
1940  private static final State[] STATES_EXPECTED_ON_CLOSING = { State.OPEN, // Normal case
1941    State.CLOSING, // Retrying
1942    State.SPLITTING, // Offline the split parent
1943    State.MERGING // Offline the merge parents
1944  };
1945
1946  private static final State[] STATES_EXPECTED_ON_CLOSED = { State.CLOSING, // Normal case
1947    State.CLOSED // Retrying
1948  };
1949
1950  // This is for manually scheduled region assign, can add other states later if we find out other
1951  // usages
1952  private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE };
1953
1954  // We only allow unassign or move a region which is in OPEN state.
1955  private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN };
1956
1957  // ============================================================================================
1958  // Region Status update
1959  // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking
1960  // and pre-assumptions are very tricky.
1961  // ============================================================================================
1962  private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState,
1963    RegionState.State... expectedStates) throws IOException {
1964    RegionState.State state = regionNode.getState();
1965    regionNode.transitionState(newState, expectedStates);
1966    boolean succ = false;
1967    try {
1968      regionStateStore.updateRegionLocation(regionNode);
1969      succ = true;
1970    } finally {
1971      if (!succ) {
1972        // revert
1973        regionNode.setState(state);
1974      }
1975    }
1976  }
1977
1978  // should be called within the synchronized block of RegionStateNode
1979  void regionOpening(RegionStateNode regionNode) throws IOException {
1980    // As in SCP, for performance reason, there is no TRSP attached with this region, we will not
1981    // update the region state, which means that the region could be in any state when we want to
1982    // assign it after a RS crash. So here we do not pass the expectedStates parameter.
1983    transitStateAndUpdate(regionNode, State.OPENING);
1984    regionStates.addRegionToServer(regionNode);
1985    // update the operation count metrics
1986    metrics.incrementOperationCounter();
1987  }
1988
1989  // should be called under the RegionStateNode lock
1990  // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
1991  // we will persist the FAILED_OPEN state into hbase:meta.
1992  void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException {
1993    RegionState.State state = regionNode.getState();
1994    ServerName regionLocation = regionNode.getRegionLocation();
1995    if (giveUp) {
1996      regionNode.setState(State.FAILED_OPEN);
1997      regionNode.setRegionLocation(null);
1998      boolean succ = false;
1999      try {
2000        regionStateStore.updateRegionLocation(regionNode);
2001        succ = true;
2002      } finally {
2003        if (!succ) {
2004          // revert
2005          regionNode.setState(state);
2006          regionNode.setRegionLocation(regionLocation);
2007        }
2008      }
2009    }
2010    if (regionLocation != null) {
2011      regionStates.removeRegionFromServer(regionLocation, regionNode);
2012    }
2013  }
2014
2015  // should be called under the RegionStateNode lock
2016  void regionClosing(RegionStateNode regionNode) throws IOException {
2017    transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING);
2018
2019    RegionInfo hri = regionNode.getRegionInfo();
2020    // Set meta has not initialized early. so people trying to create/edit tables will wait
2021    if (isMetaRegion(hri)) {
2022      setMetaAssigned(hri, false);
2023    }
2024    regionStates.addRegionToServer(regionNode);
2025    // update the operation count metrics
2026    metrics.incrementOperationCounter();
2027  }
2028
  // For open and close, the result is first persisted to the procedure store in
  // RegionRemoteProcedureBase. So here we first change the in-memory state, as the operation is
  // considered succeeded once the persistence to the procedure store has succeeded, and then, when
  // the RegionRemoteProcedureBase is woken up, we persist the RegionStateNode to hbase:meta.
2033
2034  // should be called under the RegionStateNode lock
2035  void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
2036    regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
2037    RegionInfo regionInfo = regionNode.getRegionInfo();
2038    regionStates.addRegionToServer(regionNode);
2039    regionStates.removeFromFailedOpen(regionInfo);
2040  }
2041
2042  // should be called under the RegionStateNode lock
2043  void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
2044    ServerName regionLocation = regionNode.getRegionLocation();
2045    regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
2046    regionNode.setRegionLocation(null);
2047    if (regionLocation != null) {
2048      regionNode.setLastHost(regionLocation);
2049      regionStates.removeRegionFromServer(regionLocation, regionNode);
2050    }
2051  }
2052
2053  // should be called under the RegionStateNode lock
2054  // for SCP
2055  public void regionClosedAbnormally(RegionStateNode regionNode) throws IOException {
2056    RegionState.State state = regionNode.getState();
2057    ServerName regionLocation = regionNode.getRegionLocation();
2058    regionNode.transitionState(State.ABNORMALLY_CLOSED);
2059    regionNode.setRegionLocation(null);
2060    boolean succ = false;
2061    try {
2062      regionStateStore.updateRegionLocation(regionNode);
2063      succ = true;
2064    } finally {
2065      if (!succ) {
2066        // revert
2067        regionNode.setState(state);
2068        regionNode.setRegionLocation(regionLocation);
2069      }
2070    }
2071    if (regionLocation != null) {
2072      regionNode.setLastHost(regionLocation);
2073      regionStates.removeRegionFromServer(regionLocation, regionNode);
2074    }
2075  }
2076
2077  void persistToMeta(RegionStateNode regionNode) throws IOException {
2078    regionStateStore.updateRegionLocation(regionNode);
2079    RegionInfo regionInfo = regionNode.getRegionInfo();
2080    if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
2081      // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
2082      // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
2083      // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
2084      // on table that contains state.
2085      setMetaAssigned(regionInfo, true);
2086    }
2087  }
2088
2089  // ============================================================================================
2090  // The above methods can only be called in TransitRegionStateProcedure(and related procedures)
2091  // ============================================================================================
2092
2093  public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
2094    final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
2095    // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
2096    // The parent stays in regionStates until cleared when removed by CatalogJanitor.
2097    // Update its state in regionStates to it shows as offline and split when read
2098    // later figuring what regions are in a table and what are not: see
2099    // regionStates#getRegionsOfTable
2100    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent);
2101    node.setState(State.SPLIT);
2102    final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA);
2103    nodeA.setState(State.SPLITTING_NEW);
2104    final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB);
2105    nodeB.setState(State.SPLITTING_NEW);
2106
2107    TableDescriptor td = master.getTableDescriptors().get(parent.getTable());
2108    // TODO: here we just update the parent region info in meta, to set split and offline to true,
2109    // without changing the one in the region node. This is a bit confusing but the region info
2110    // field in RegionStateNode is not expected to be changed in the current design. Need to find a
2111    // possible way to address this problem, or at least adding more comments about the trick to
2112    // deal with this problem, that when you want to filter out split parent, you need to check both
2113    // the RegionState on whether it is split, and also the region info. If one of them matches then
2114    // it is a split parent. And usually only one of them can match, as after restart, the region
2115    // state will be changed from SPLIT to CLOSED.
2116    regionStateStore.splitRegion(parent, daughterA, daughterB, serverName, td);
2117    if (shouldAssignFavoredNodes(parent)) {
2118      List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList();
2119      getFavoredNodePromoter().generateFavoredNodesForDaughter(onlineServers, parent, daughterA,
2120        daughterB);
2121    }
2122  }
2123
2124  /**
2125   * When called here, the merge has happened. The merged regions have been unassigned and the above
2126   * markRegionClosed has been called on each so they have been disassociated from a hosting Server.
2127   * The merged region will be open after this call. The merged regions are removed from hbase:meta
2128   * below. Later they are deleted from the filesystem by the catalog janitor running against
2129   * hbase:meta. It notices when the merged region no longer holds references to the old regions
2130   * (References are deleted after a compaction rewrites what the Reference points at but not until
2131   * the archiver chore runs, are the References removed).
2132   */
2133  public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
2134    RegionInfo[] mergeParents) throws IOException {
2135    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
2136    node.setState(State.MERGED);
2137    for (RegionInfo ri : mergeParents) {
2138      regionStates.deleteRegion(ri);
2139    }
2140    TableDescriptor td = master.getTableDescriptors().get(child.getTable());
2141    regionStateStore.mergeRegions(child, mergeParents, serverName, td);
2142    if (shouldAssignFavoredNodes(child)) {
2143      getFavoredNodePromoter().generateFavoredNodesForMergedRegion(child, mergeParents);
2144    }
2145  }
2146
2147  /*
2148   * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
2149   * belongs to a non-system table.
2150   */
2151  private boolean shouldAssignFavoredNodes(RegionInfo region) {
2152    return this.shouldAssignRegionsWithFavoredNodes
2153      && FavoredNodesManager.isFavoredNodeApplicable(region);
2154  }
2155
2156  // ============================================================================================
2157  // Assign Queue (Assign/Balance)
2158  // ============================================================================================
2159  private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>();
2160  private final ReentrantLock assignQueueLock = new ReentrantLock();
2161  private final Condition assignQueueFullCond = assignQueueLock.newCondition();
2162
2163  /**
2164   * Add the assign operation to the assignment queue. The pending assignment operation will be
2165   * processed, and each region will be assigned by a server using the balancer.
2166   */
2167  protected void queueAssign(final RegionStateNode regionNode) {
2168    regionNode.getProcedureEvent().suspend();
2169
2170    // TODO: quick-start for meta and the other sys-tables?
2171    assignQueueLock.lock();
2172    try {
2173      pendingAssignQueue.add(regionNode);
2174      if (
2175        regionNode.isSystemTable() || pendingAssignQueue.size() == 1
2176          || pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize
2177      ) {
2178        assignQueueFullCond.signal();
2179      }
2180    } finally {
2181      assignQueueLock.unlock();
2182    }
2183  }
2184
2185  private void startAssignmentThread() {
2186    assignThread = new Thread(master.getServerName().toShortString()) {
2187      @Override
2188      public void run() {
2189        while (isRunning()) {
2190          processAssignQueue();
2191        }
2192        pendingAssignQueue.clear();
2193      }
2194    };
2195    assignThread.setDaemon(true);
2196    assignThread.start();
2197  }
2198
2199  private void stopAssignmentThread() {
2200    assignQueueSignal();
2201    try {
2202      while (assignThread.isAlive()) {
2203        assignQueueSignal();
2204        assignThread.join(250);
2205      }
2206    } catch (InterruptedException e) {
2207      LOG.warn("join interrupted", e);
2208      Thread.currentThread().interrupt();
2209    }
2210  }
2211
2212  private void assignQueueSignal() {
2213    assignQueueLock.lock();
2214    try {
2215      assignQueueFullCond.signal();
2216    } finally {
2217      assignQueueLock.unlock();
2218    }
2219  }
2220
2221  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
2222  private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() {
2223    HashMap<RegionInfo, RegionStateNode> regions = null;
2224
2225    assignQueueLock.lock();
2226    try {
2227      if (pendingAssignQueue.isEmpty() && isRunning()) {
2228        assignQueueFullCond.await();
2229      }
2230
2231      if (!isRunning()) {
2232        return null;
2233      }
2234      assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS);
2235      regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size());
2236      for (RegionStateNode regionNode : pendingAssignQueue) {
2237        regions.put(regionNode.getRegionInfo(), regionNode);
2238      }
2239      pendingAssignQueue.clear();
2240    } catch (InterruptedException e) {
2241      LOG.warn("got interrupted ", e);
2242      Thread.currentThread().interrupt();
2243    } finally {
2244      assignQueueLock.unlock();
2245    }
2246    return regions;
2247  }
2248
2249  private void processAssignQueue() {
2250    final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue();
2251    if (regions == null || regions.size() == 0 || !isRunning()) {
2252      return;
2253    }
2254
2255    if (LOG.isTraceEnabled()) {
2256      LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size());
2257    }
2258
2259    // TODO: Optimize balancer. pass a RegionPlan?
2260    final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>();
2261    final List<RegionInfo> userHRIs = new ArrayList<>(regions.size());
2262    // Regions for system tables requiring reassignment
2263    final List<RegionInfo> systemHRIs = new ArrayList<>();
2264    for (RegionStateNode regionStateNode : regions.values()) {
2265      boolean sysTable = regionStateNode.isSystemTable();
2266      final List<RegionInfo> hris = sysTable ? systemHRIs : userHRIs;
2267      if (regionStateNode.getRegionLocation() != null) {
2268        retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
2269      } else {
2270        hris.add(regionStateNode.getRegionInfo());
2271      }
2272    }
2273
2274    // TODO: connect with the listener to invalidate the cache
2275
2276    // TODO use events
2277    List<ServerName> servers = master.getServerManager().createDestinationServersList();
2278    for (int i = 0; servers.size() < 1; ++i) {
2279      // Report every fourth time around this loop; try not to flood log.
2280      if (i % 4 == 0) {
2281        LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions.");
2282      }
2283
2284      if (!isRunning()) {
2285        LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions.");
2286        return;
2287      }
2288      Threads.sleep(250);
2289      servers = master.getServerManager().createDestinationServersList();
2290    }
2291
2292    if (!systemHRIs.isEmpty()) {
2293      // System table regions requiring reassignment are present, get region servers
2294      // not available for system table regions
2295      final List<ServerName> excludeServers = getExcludedServersForSystemTable();
2296      List<ServerName> serversForSysTables =
2297        servers.stream().filter(s -> !excludeServers.contains(s)).collect(Collectors.toList());
2298      if (serversForSysTables.isEmpty()) {
2299        LOG.warn("Filtering old server versions and the excluded produced an empty set; "
2300          + "instead considering all candidate servers!");
2301      }
2302      LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size()
2303        + ", allServersCount=" + servers.size());
2304      processAssignmentPlans(regions, null, systemHRIs,
2305        serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs)
2306          ? servers
2307          : serversForSysTables);
2308    }
2309
2310    processAssignmentPlans(regions, retainMap, userHRIs, servers);
2311  }
2312
2313  private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions,
2314    List<RegionInfo> hirs) {
2315    for (RegionInfo ri : hirs) {
2316      if (
2317        regions.get(ri).getRegionLocation() != null
2318          && regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME)
2319      ) {
2320        return true;
2321      }
2322    }
2323    return false;
2324  }
2325
2326  private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions,
2327    final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris,
2328    final List<ServerName> servers) {
2329    boolean isTraceEnabled = LOG.isTraceEnabled();
2330    if (isTraceEnabled) {
2331      LOG.trace("Available servers count=" + servers.size() + ": " + servers);
2332    }
2333
2334    final LoadBalancer balancer = getBalancer();
2335    // ask the balancer where to place regions
2336    if (retainMap != null && !retainMap.isEmpty()) {
2337      if (isTraceEnabled) {
2338        LOG.trace("retain assign regions=" + retainMap);
2339      }
2340      try {
2341        acceptPlan(regions, balancer.retainAssignment(retainMap, servers));
2342      } catch (IOException e) {
2343        LOG.warn("unable to retain assignment", e);
2344        addToPendingAssignment(regions, retainMap.keySet());
2345      }
2346    }
2347
2348    // TODO: Do we need to split retain and round-robin?
2349    // the retain seems to fallback to round-robin/random if the region is not in the map.
2350    if (!hris.isEmpty()) {
2351      Collections.sort(hris, RegionInfo.COMPARATOR);
2352      if (isTraceEnabled) {
2353        LOG.trace("round robin regions=" + hris);
2354      }
2355      try {
2356        acceptPlan(regions, balancer.roundRobinAssignment(hris, servers));
2357      } catch (IOException e) {
2358        LOG.warn("unable to round-robin assignment", e);
2359        addToPendingAssignment(regions, hris);
2360      }
2361    }
2362  }
2363
2364  private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions,
2365    final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException {
2366    final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()];
2367    final long st = EnvironmentEdgeManager.currentTime();
2368
2369    if (plan.isEmpty()) {
2370      throw new HBaseIOException("unable to compute plans for regions=" + regions.size());
2371    }
2372
2373    int evcount = 0;
2374    for (Map.Entry<ServerName, List<RegionInfo>> entry : plan.entrySet()) {
2375      final ServerName server = entry.getKey();
2376      for (RegionInfo hri : entry.getValue()) {
2377        final RegionStateNode regionNode = regions.get(hri);
2378        regionNode.setRegionLocation(server);
2379        if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) {
2380          assignQueueLock.lock();
2381          try {
2382            pendingAssignQueue.add(regionNode);
2383          } finally {
2384            assignQueueLock.unlock();
2385          }
2386        } else {
2387          events[evcount++] = regionNode.getProcedureEvent();
2388        }
2389      }
2390    }
2391    ProcedureEvent.wakeEvents(getProcedureScheduler(), events);
2392
2393    final long et = EnvironmentEdgeManager.currentTime();
2394    if (LOG.isTraceEnabled()) {
2395      LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + StringUtils.humanTimeDiff(et - st));
2396    }
2397  }
2398
2399  private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions,
2400    final Collection<RegionInfo> pendingRegions) {
2401    assignQueueLock.lock();
2402    try {
2403      for (RegionInfo hri : pendingRegions) {
2404        pendingAssignQueue.add(regions.get(hri));
2405      }
2406    } finally {
2407      assignQueueLock.unlock();
2408    }
2409  }
2410
2411  /**
2412   * For a given cluster with mixed versions of servers, get a list of servers with lower versions,
2413   * where system table regions should not be assigned to. For system table, we must assign regions
2414   * to a server with highest version. However, we can disable this exclusion using config:
2415   * "hbase.min.version.move.system.tables" if checkForMinVersion is true. Detailed explanation
2416   * available with definition of minVersionToMoveSysTables.
2417   * @return List of Excluded servers for System table regions.
2418   */
2419  public List<ServerName> getExcludedServersForSystemTable() {
2420    // TODO: This should be a cached list kept by the ServerManager rather than calculated on each
2421    // move or system region assign. The RegionServerTracker keeps list of online Servers with
2422    // RegionServerInfo that includes Version.
2423    List<Pair<ServerName, String>> serverList =
2424      master.getServerManager().getOnlineServersList().stream()
2425        .map(s -> new Pair<>(s, master.getRegionServerVersion(s))).collect(Collectors.toList());
2426    if (serverList.isEmpty()) {
2427      return new ArrayList<>();
2428    }
2429    String highestVersion = Collections
2430      .max(serverList, (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond()))
2431      .getSecond();
2432    if (!DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG.equals(minVersionToMoveSysTables)) {
2433      int comparedValue = VersionInfo.compareVersion(minVersionToMoveSysTables, highestVersion);
2434      if (comparedValue > 0) {
2435        return new ArrayList<>();
2436      }
2437    }
2438    return serverList.stream().filter(pair -> !pair.getSecond().equals(highestVersion))
2439      .map(Pair::getFirst).collect(Collectors.toList());
2440  }
2441
2442  MasterServices getMaster() {
2443    return master;
2444  }
2445
2446  /** Returns a snapshot of rsReports */
2447  public Map<ServerName, Set<byte[]>> getRSReports() {
2448    Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>();
2449    synchronized (rsReports) {
2450      rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue()));
2451    }
2452    return rsReportsSnapshot;
2453  }
2454
2455  /**
2456   * Provide regions state count for given table. e.g howmany regions of give table are
2457   * opened/closed/rit etc
2458   * @param tableName TableName
2459   * @return region states count
2460   */
2461  public RegionStatesCount getRegionStatesCount(TableName tableName) {
2462    int openRegionsCount = 0;
2463    int closedRegionCount = 0;
2464    int ritCount = 0;
2465    int splitRegionCount = 0;
2466    int totalRegionCount = 0;
2467    if (!isTableDisabled(tableName)) {
2468      final List<RegionState> states = regionStates.getTableRegionStates(tableName);
2469      for (RegionState regionState : states) {
2470        if (regionState.isOpened()) {
2471          openRegionsCount++;
2472        } else if (regionState.isClosed()) {
2473          closedRegionCount++;
2474        } else if (regionState.isSplit()) {
2475          splitRegionCount++;
2476        }
2477      }
2478      totalRegionCount = states.size();
2479      ritCount = totalRegionCount - openRegionsCount - splitRegionCount;
2480    }
2481    return new RegionStatesCount.RegionStatesCountBuilder().setOpenRegions(openRegionsCount)
2482      .setClosedRegions(closedRegionCount).setSplitRegions(splitRegionCount)
2483      .setRegionsInTransition(ritCount).setTotalRegions(totalRegionCount).build();
2484  }
2485
2486}