001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import edu.umd.cs.findbugs.annotations.NonNull;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.CompletableFuture;
031import java.util.concurrent.Future;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicBoolean;
034import java.util.concurrent.locks.Condition;
035import java.util.concurrent.locks.ReentrantLock;
036import java.util.stream.Collectors;
037import java.util.stream.Stream;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.CatalogFamilyFormat;
040import org.apache.hadoop.hbase.DoNotRetryIOException;
041import org.apache.hadoop.hbase.HBaseIOException;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.PleaseHoldException;
044import org.apache.hadoop.hbase.ServerName;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.UnknownRegionException;
047import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
048import org.apache.hadoop.hbase.client.MasterSwitchType;
049import org.apache.hadoop.hbase.client.RegionInfo;
050import org.apache.hadoop.hbase.client.RegionInfoBuilder;
051import org.apache.hadoop.hbase.client.RegionReplicaUtil;
052import org.apache.hadoop.hbase.client.RegionStatesCount;
053import org.apache.hadoop.hbase.client.Result;
054import org.apache.hadoop.hbase.client.ResultScanner;
055import org.apache.hadoop.hbase.client.Scan;
056import org.apache.hadoop.hbase.client.TableDescriptor;
057import org.apache.hadoop.hbase.client.TableState;
058import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
059import org.apache.hadoop.hbase.favored.FavoredNodesManager;
060import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
061import org.apache.hadoop.hbase.master.LoadBalancer;
062import org.apache.hadoop.hbase.master.MasterServices;
063import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
064import org.apache.hadoop.hbase.master.RegionPlan;
065import org.apache.hadoop.hbase.master.RegionState;
066import org.apache.hadoop.hbase.master.RegionState.State;
067import org.apache.hadoop.hbase.master.ServerManager;
068import org.apache.hadoop.hbase.master.TableStateManager;
069import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
070import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
071import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
072import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
073import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
074import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
075import org.apache.hadoop.hbase.master.procedure.TruncateRegionProcedure;
076import org.apache.hadoop.hbase.master.region.MasterRegion;
077import org.apache.hadoop.hbase.procedure2.Procedure;
078import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
079import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
080import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
081import org.apache.hadoop.hbase.procedure2.util.StringUtils;
082import org.apache.hadoop.hbase.regionserver.SequenceId;
083import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
084import org.apache.hadoop.hbase.util.Bytes;
085import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
086import org.apache.hadoop.hbase.util.FutureUtils;
087import org.apache.hadoop.hbase.util.Pair;
088import org.apache.hadoop.hbase.util.Threads;
089import org.apache.hadoop.hbase.util.VersionInfo;
090import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
091import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
092import org.apache.yetus.audience.InterfaceAudience;
093import org.apache.zookeeper.KeeperException;
094import org.slf4j.Logger;
095import org.slf4j.LoggerFactory;
096
097import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
102
103/**
104 * The AssignmentManager is the coordinator for region assign/unassign operations.
105 * <ul>
106 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
107 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
108 * </ul>
109 * Regions are created by CreateTable, Split, Merge. Regions are deleted by DeleteTable, Split,
110 * Merge. Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. Unassigns
111 * are triggered by DisableTable, Split, Merge
112 */
113@InterfaceAudience.Private
114public class AssignmentManager {
115  private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);
116
117  // TODO: AMv2
118  // - handle region migration from hbase1 to hbase2.
119  // - handle sys table assignment first (e.g. acl, namespace)
120  // - handle table priorities
121  // - If ServerBusyException trying to update hbase:meta, we abort the Master
122  // See updateRegionLocation in RegionStateStore.
123  //
124  // See also
125  // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
126  // for other TODOs.
127
  /**
   * Configuration key for the bootstrap thread pool size. NOTE(review): this key is not read in
   * the part of the class visible here - confirm where it is consumed.
   */
128  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
129    "hbase.assignment.bootstrap.thread.pool.size";
130
  /**
   * Configuration key the constructor reads into {@code assignDispatchWaitMillis}; when unset the
   * value is {@code DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC}.
   */
131  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
132    "hbase.assignment.dispatch.wait.msec";
133  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;
134
  /**
   * Configuration key the constructor reads into {@code assignDispatchWaitQueueMaxSize}; when
   * unset the value is {@code DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX}.
   */
135  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
136    "hbase.assignment.dispatch.wait.queue.max.size";
137  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;
138
  /**
   * Configuration key for the interval handed to the {@code RegionInTransitionChore} created by
   * the constructor; when unset the value is {@code DEFAULT_RIT_CHORE_INTERVAL_MSEC}.
   */
139  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
140    "hbase.assignment.rit.chore.interval.msec";
141  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;
142
  /**
   * Configuration key for the interval handed to the {@code DeadServerMetricRegionChore}. The
   * constructor only creates that chore when the configured value is greater than zero; otherwise
   * the chore field is left {@code null}.
   */
143  public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY =
144    "hbase.assignment.dead.region.metric.chore.interval.msec";
145  private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000;
146
  /**
   * Configuration key for the maximum number of assign attempts. The constructor raises any
   * configured value below one to one.
   */
147  public static final String ASSIGN_MAX_ATTEMPTS = "hbase.assignment.maximum.attempts";
148  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;
149
  /**
   * Configuration key the constructor reads into {@code assignRetryImmediatelyMaxAttempts}; when
   * unset the value is {@code DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS}.
   */
150  public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS =
151    "hbase.assignment.retry.immediately.maximum.attempts";
152  private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3;
153
154  /** Region in Transition metrics threshold time */
155  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
156    "hbase.metrics.rit.stuck.warning.threshold";
157  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
  /**
   * Prefix text for "unexpected state" messages. NOTE(review): its users are outside the part of
   * the file visible here.
   */
158  public static final String UNEXPECTED_STATE_REGION = "Unexpected state for ";
159
  /**
   * Configuration key the constructor reads into {@code forceRegionRetainment}; defaults to
   * {@link #DEFAULT_FORCE_REGION_RETAINMENT}.
   */
160  public static final String FORCE_REGION_RETAINMENT = "hbase.master.scp.retain.assignment.force";
161
  /** Default for {@link #FORCE_REGION_RETAINMENT}. */
162  public static final boolean DEFAULT_FORCE_REGION_RETAINMENT = false;
163
164  /** The wait time in millis before checking again if the region's previous RS is back online */
165  public static final String FORCE_REGION_RETAINMENT_WAIT_INTERVAL =
166    "hbase.master.scp.retain.assignment.force.wait-interval";
167
  /** Default for {@link #FORCE_REGION_RETAINMENT_WAIT_INTERVAL}. */
168  public static final long DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL = 50;
169
170  /**
171   * The number of times to check if the region's previous RS is back online, before giving up and
172   * proceeding with assignment on a new RS
173   */
174  public static final String FORCE_REGION_RETAINMENT_RETRIES =
175    "hbase.master.scp.retain.assignment.force.retries";
176
  /** Default for {@link #FORCE_REGION_RETAINMENT_RETRIES}. */
177  public static final int DEFAULT_FORCE_REGION_RETAINMENT_RETRIES = 600;
178
  // Event behind isMetaAssigned()/waitMetaAssigned(); woken or suspended through setMetaAssigned().
179  private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
  // Event behind isMetaLoaded()/waitMetaLoaded(); woken by wakeMetaLoadedEvent(), suspended by
  // stop().
180  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");
181
  // Metrics object created in the constructor and exposed by getAssignmentManagerMetrics().
182  private final MetricsAssignmentManager metrics;
  // Chore built in the constructor from the RIT chore interval; removed from the executor in stop().
183  private final RegionInTransitionChore ritChore;
  // May be null: only created when the configured dead-region chore interval is greater than zero.
184  private final DeadServerMetricRegionChore deadMetricChore;
  // Owning master; source of configuration, ZooKeeper watcher, balancer and procedure executor.
185  private final MasterServices master;
186
  // Flipped with compareAndSet by start() and stop() so that each of them runs at most once per
  // transition.
187  private final AtomicBoolean running = new AtomicBoolean(false);
  // In-memory region and server state; cleared by stop().
188  private final RegionStates regionStates = new RegionStates();
  // State store supplied to (or created by) the constructor; exposed by getRegionStateStore().
189  private final RegionStateStore regionStateStore;
190
191  /**
192   * When the operator uses this configuration option, any version between the current cluster
193   * version and the value of "hbase.min.version.move.system.tables" does not trigger any
194   * auto-region movement. Auto-region movement here refers to auto-migration of system table
195   * regions to newer server versions. It is assumed that the configured range of versions does not
196   * require special handling of moving system table regions to higher versioned RegionServer. This
197   * auto-migration is done by {@link #checkIfShouldMoveSystemRegionAsync()}. Example: Let's assume
198   * the cluster is on version 1.4.0 and we have set "hbase.min.version.move.system.tables" as
199   * "2.0.0". Now if we upgrade one RegionServer on 1.4.0 cluster to 1.6.0 (< 2.0.0), then
200   * AssignmentManager will not move hbase:meta, hbase:namespace and other system table regions to
201   * newly brought up RegionServer 1.6.0 as part of auto-migration. However, if we upgrade one
202   * RegionServer on 1.4.0 cluster to 2.2.0 (> 2.0.0), then AssignmentManager will move all system
203   * table regions to newly brought up RegionServer 2.2.0 as part of auto-migration done by
204   * {@link #checkIfShouldMoveSystemRegionAsync()}. "hbase.min.version.move.system.tables" is
205   * introduced as part of HBASE-22923.
206   */
207  private final String minVersionToMoveSysTables;
208
  // Key and default (empty string) used by the constructor to populate minVersionToMoveSysTables.
209  private static final String MIN_VERSION_MOVE_SYS_TABLES_CONFIG =
210    "hbase.min.version.move.system.tables";
211  private static final String DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG = "";
212
  // NOTE(review): presumably the region names last reported by each region server; this map is
  // neither written nor read in the part of the file visible here - confirm against its users.
213  private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();
214
  // True when the configured load balancer class is assignable to FavoredStochasticBalancer;
  // getFavoredNodes() returns an empty list otherwise.
215  private final boolean shouldAssignRegionsWithFavoredNodes;
  // The four values below are read from the configuration in the constructor.
216  private final int assignDispatchWaitQueueMaxSize;
217  private final int assignDispatchWaitMillis;
  // Never below one, see the constructor.
218  private final int assignMaxAttempts;
219  private final int assignRetryImmediatelyMaxAttempts;
220
  // Master local region; start() scans its catalog family to rebuild the region state nodes.
221  private final MasterRegion masterRegion;
222
  // Monitor held by the worker thread spawned in checkIfShouldMoveSystemRegionAsync().
223  private final Object checkIfShouldMoveSystemRegionLock = new Object();
224
  // NOTE(review): presumably managed by startAssignmentThread()/stopAssignmentThread(), which are
  // defined outside the part of the file visible here.
225  private Thread assignThread;
226
  // Values of the "force region retainment" settings, read in the constructor and exposed through
  // the matching getters.
227  private final boolean forceRegionRetainment;
228
229  private final long forceRegionRetainmentWaitInterval;
230
231  private final int forceRegionRetainmentRetries;
232
233  public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
234    this(master, masterRegion, new RegionStateStore(master, masterRegion));
235  }
236
237  AssignmentManager(MasterServices master, MasterRegion masterRegion, RegionStateStore stateStore) {
238    this.master = master;
239    this.regionStateStore = stateStore;
240    this.metrics = new MetricsAssignmentManager();
241    this.masterRegion = masterRegion;
242
243    final Configuration conf = master.getConfiguration();
244
245    // Only read favored nodes if using the favored nodes load balancer.
246    this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class
247      .isAssignableFrom(conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class));
248
249    this.assignDispatchWaitMillis =
250      conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
251    this.assignDispatchWaitQueueMaxSize =
252      conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);
253
254    this.assignMaxAttempts =
255      Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS));
256    this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS,
257      DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS);
258
259    int ritChoreInterval =
260      conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC);
261    this.ritChore = new RegionInTransitionChore(ritChoreInterval);
262
263    int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY,
264      DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC);
265    if (deadRegionChoreInterval > 0) {
266      this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval);
267    } else {
268      this.deadMetricChore = null;
269    }
270    minVersionToMoveSysTables =
271      conf.get(MIN_VERSION_MOVE_SYS_TABLES_CONFIG, DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG);
272
273    forceRegionRetainment =
274      conf.getBoolean(FORCE_REGION_RETAINMENT, DEFAULT_FORCE_REGION_RETAINMENT);
275    forceRegionRetainmentWaitInterval = conf.getLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL,
276      DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL);
277    forceRegionRetainmentRetries =
278      conf.getInt(FORCE_REGION_RETAINMENT_RETRIES, DEFAULT_FORCE_REGION_RETAINMENT_RETRIES);
279  }
280
281  private void mirrorMetaLocations() throws IOException, KeeperException {
282    // For compatibility, mirror the meta region state to zookeeper
283    // And we still need to use zookeeper to publish the meta region locations to region
284    // server, so they can serve as ClientMetaService
285    ZKWatcher zk = master.getZooKeeper();
286    if (zk == null || !zk.getRecoverableZooKeeper().getState().isAlive()) {
287      // this is possible in tests, we do not provide a zk watcher or the zk watcher has been closed
288      return;
289    }
290    Collection<RegionStateNode> metaStates = regionStates.getRegionStateNodes();
291    for (RegionStateNode metaState : metaStates) {
292      MetaTableLocator.setMetaLocation(zk, metaState.getRegionLocation(),
293        metaState.getRegionInfo().getReplicaId(), metaState.getState());
294    }
295    int replicaCount = metaStates.size();
296    // remove extra mirror locations
297    for (String znode : zk.getMetaReplicaNodes()) {
298      int replicaId = zk.getZNodePaths().getMetaReplicaIdFromZNode(znode);
299      if (replicaId >= replicaCount) {
300        MetaTableLocator.deleteMetaLocation(zk, replicaId);
301      }
302    }
303  }
304
305  public void start() throws IOException, KeeperException {
306    if (!running.compareAndSet(false, true)) {
307      return;
308    }
309
310    LOG.trace("Starting assignment manager");
311
312    // Start the Assignment Thread
313    startAssignmentThread();
314    // load meta region states.
315    // here we are still in the early steps of active master startup. There is only one thread(us)
316    // can access AssignmentManager and create region node, so here we do not need to lock the
317    // region node.
318    try (ResultScanner scanner =
319      masterRegion.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY))) {
320      for (;;) {
321        Result result = scanner.next();
322        if (result == null) {
323          break;
324        }
325        RegionStateStore
326          .visitMetaEntry((r, regionInfo, state, regionLocation, lastHost, openSeqNum) -> {
327            RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
328            regionNode.setState(state);
329            regionNode.setLastHost(lastHost);
330            regionNode.setRegionLocation(regionLocation);
331            regionNode.setOpenSeqNum(openSeqNum);
332            if (regionNode.getProcedure() != null) {
333              regionNode.getProcedure().stateLoaded(this, regionNode);
334            }
335            if (regionLocation != null) {
336              // TODO: this could lead to some orphan server state nodes, as it is possible that the
337              // region server is already dead and its SCP has already finished but we have
338              // persisted an opening state on this region server. Finally the TRSP will assign the
339              // region to another region server, so it will not cause critical problems, just waste
340              // some memory as no one will try to cleanup these orphan server state nodes.
341              regionStates.createServer(regionLocation);
342              regionStates.addRegionToServer(regionNode);
343            }
344            if (RegionReplicaUtil.isDefaultReplica(regionInfo.getReplicaId())) {
345              setMetaAssigned(regionInfo, state == State.OPEN);
346            }
347            LOG.debug("Loaded hbase:meta {}", regionNode);
348          }, result);
349      }
350    }
351    mirrorMetaLocations();
352  }
353
354  /**
355   * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode.
356   * <p>
357   * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta
358   * method below. And it is also very important as now before submitting a TRSP, we need to attach
359   * it to the RegionStateNode, which acts like a guard, so we need to restore this information at
360   * the very beginning, before we start processing any procedures.
361   */
362  public void setupRIT(List<TransitRegionStateProcedure> procs) {
363    procs.forEach(proc -> {
364      RegionInfo regionInfo = proc.getRegion();
365      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
366      TransitRegionStateProcedure existingProc = regionNode.getProcedure();
367      if (existingProc != null) {
368        // This is possible, as we will detach the procedure from the RSN before we
369        // actually finish the procedure. This is because that, we will detach the TRSP from the RSN
370        // during execution, at that time, the procedure has not been marked as done in the pv2
371        // framework yet, so it is possible that we schedule a new TRSP immediately and when
372        // arriving here, we will find out that there are multiple TRSPs for the region. But we can
373        // make sure that, only the last one can take the charge, the previous ones should have all
374        // been finished already. So here we will compare the proc id, the greater one will win.
375        if (existingProc.getProcId() < proc.getProcId()) {
376          // the new one wins, unset and set it to the new one below
377          regionNode.unsetProcedure(existingProc);
378        } else {
379          // the old one wins, skip
380          return;
381        }
382      }
383      LOG.info("Attach {} to {} to restore RIT", proc, regionNode);
384      regionNode.setProcedure(proc);
385    });
386  }
387
388  public void stop() {
389    if (!running.compareAndSet(true, false)) {
390      return;
391    }
392
393    LOG.info("Stopping assignment manager");
394
395    // The AM is started before the procedure executor,
396    // but the actual work will be loaded/submitted only once we have the executor
397    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;
398
399    // Remove the RIT chore
400    if (hasProcExecutor) {
401      master.getMasterProcedureExecutor().removeChore(this.ritChore);
402      if (this.deadMetricChore != null) {
403        master.getMasterProcedureExecutor().removeChore(this.deadMetricChore);
404      }
405    }
406
407    // Stop the Assignment Thread
408    stopAssignmentThread();
409
410    // Stop the RegionStateStore
411    regionStates.clear();
412
413    // Update meta events (for testing)
414    if (hasProcExecutor) {
415      metaLoadEvent.suspend();
416      for (RegionInfo hri : getMetaRegionSet()) {
417        setMetaAssigned(hri, false);
418      }
419    }
420  }
421
422  public boolean isRunning() {
423    return running.get();
424  }
425
426  public Configuration getConfiguration() {
427    return master.getConfiguration();
428  }
429
430  public MetricsAssignmentManager getAssignmentManagerMetrics() {
431    return metrics;
432  }
433
434  private LoadBalancer getBalancer() {
435    return master.getLoadBalancer();
436  }
437
438  private FavoredNodesPromoter getFavoredNodePromoter() {
439    return (FavoredNodesPromoter) ((RSGroupBasedLoadBalancer) master.getLoadBalancer())
440      .getInternalBalancer();
441  }
442
443  private MasterProcedureEnv getProcedureEnvironment() {
444    return master.getMasterProcedureExecutor().getEnvironment();
445  }
446
447  private MasterProcedureScheduler getProcedureScheduler() {
448    return getProcedureEnvironment().getProcedureScheduler();
449  }
450
451  int getAssignMaxAttempts() {
452    return assignMaxAttempts;
453  }
454
455  public boolean isForceRegionRetainment() {
456    return forceRegionRetainment;
457  }
458
459  public long getForceRegionRetainmentWaitInterval() {
460    return forceRegionRetainmentWaitInterval;
461  }
462
463  public int getForceRegionRetainmentRetries() {
464    return forceRegionRetainmentRetries;
465  }
466
467  int getAssignRetryImmediatelyMaxAttempts() {
468    return assignRetryImmediatelyMaxAttempts;
469  }
470
471  public RegionStates getRegionStates() {
472    return regionStates;
473  }
474
475  /**
476   * Returns the regions hosted by the specified server.
477   * <p/>
478   * Notice that, for SCP, after we submit the SCP, no one can change the region list for the
479   * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a
480   * snapshot of the current region list for this server, which means, right after you get the
481   * region list, new regions may be moved to this server or some regions may be moved out from this
482   * server, so you should not use it critically if you need strong consistency.
483   */
484  public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
485    ServerStateNode serverInfo = regionStates.getServerNode(serverName);
486    if (serverInfo == null) {
487      return Collections.emptyList();
488    }
489    return serverInfo.getRegionInfoList();
490  }
491
492  private RegionInfo getRegionInfo(RegionStateNode rsn) {
493    if (rsn.isSplit() && !rsn.getRegionInfo().isSplit()) {
494      // see the comments in markRegionAsSplit on why we need to do this converting.
495      return RegionInfoBuilder.newBuilder(rsn.getRegionInfo()).setSplit(true).setOffline(true)
496        .build();
497    } else {
498      return rsn.getRegionInfo();
499    }
500  }
501
502  private Stream<RegionStateNode> getRegionStateNodes(TableName tableName,
503    boolean excludeOfflinedSplitParents) {
504    Stream<RegionStateNode> stream = regionStates.getTableRegionStateNodes(tableName).stream();
505    if (excludeOfflinedSplitParents) {
506      return stream.filter(rsn -> !rsn.isSplit());
507    } else {
508      return stream;
509    }
510  }
511
512  public List<RegionInfo> getTableRegions(TableName tableName,
513    boolean excludeOfflinedSplitParents) {
514    return getRegionStateNodes(tableName, excludeOfflinedSplitParents).map(this::getRegionInfo)
515      .collect(Collectors.toList());
516  }
517
518  public List<Pair<RegionInfo, ServerName>> getTableRegionsAndLocations(TableName tableName,
519    boolean excludeOfflinedSplitParents) {
520    return getRegionStateNodes(tableName, excludeOfflinedSplitParents)
521      .map(rsn -> Pair.newPair(getRegionInfo(rsn), rsn.getRegionLocation()))
522      .collect(Collectors.toList());
523  }
524
525  public RegionStateStore getRegionStateStore() {
526    return regionStateStore;
527  }
528
529  public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
530    return this.shouldAssignRegionsWithFavoredNodes
531      ? getFavoredNodePromoter().getFavoredNodes(regionInfo)
532      : ServerName.EMPTY_SERVER_LIST;
533  }
534
535  // ============================================================================================
536  // Table State Manager helpers
537  // ============================================================================================
538  private TableStateManager getTableStateManager() {
539    return master.getTableStateManager();
540  }
541
542  private boolean isTableEnabled(final TableName tableName) {
543    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
544  }
545
546  private boolean isTableDisabled(final TableName tableName) {
547    return getTableStateManager().isTableState(tableName, TableState.State.DISABLED,
548      TableState.State.DISABLING);
549  }
550
551  // ============================================================================================
552  // META Helpers
553  // ============================================================================================
554  private boolean isMetaRegion(final RegionInfo regionInfo) {
555    return regionInfo.isMetaRegion();
556  }
557
558  public boolean isMetaRegion(final byte[] regionName) {
559    return getMetaRegionFromName(regionName) != null;
560  }
561
562  public RegionInfo getMetaRegionFromName(final byte[] regionName) {
563    for (RegionInfo hri : getMetaRegionSet()) {
564      if (Bytes.equals(hri.getRegionName(), regionName)) {
565        return hri;
566      }
567    }
568    return null;
569  }
570
571  public boolean isCarryingMeta(final ServerName serverName) {
572    // TODO: handle multiple meta
573    return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
574  }
575
576  private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
577    // TODO: check for state?
578    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
579    return (node != null && serverName.equals(node.getRegionLocation()));
580  }
581
582  private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
583    // if (regionInfo.isMetaRegion()) return regionInfo;
584    // TODO: handle multiple meta. if the region provided is not meta lookup
585    // which meta the region belongs to.
586    return RegionInfoBuilder.FIRST_META_REGIONINFO;
587  }
588
589  // TODO: handle multiple meta.
590  private static final Set<RegionInfo> META_REGION_SET =
591    Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);
592
593  public Set<RegionInfo> getMetaRegionSet() {
594    return META_REGION_SET;
595  }
596
597  // ============================================================================================
598  // META Event(s) helpers
599  // ============================================================================================
600  /**
601   * Notice that, this only means the meta region is available on a RS, but the AM may still be
602   * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
603   * before checking this method, unless you can make sure that your piece of code can only be
604   * executed after AM builds the region states.
605   * @see #isMetaLoaded()
606   */
607  public boolean isMetaAssigned() {
608    return metaAssignEvent.isReady();
609  }
610
611  public boolean isMetaRegionInTransition() {
612    return !isMetaAssigned();
613  }
614
615  /**
616   * Notice that this event does not mean the AM has already finished region state rebuilding. See
617   * the comment of {@link #isMetaAssigned()} for more details.
618   * @see #isMetaAssigned()
619   */
620  public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
621    return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
622  }
623
624  private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
625    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
626    ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
627    if (assigned) {
628      metaAssignEvent.wake(getProcedureScheduler());
629    } else {
630      metaAssignEvent.suspend();
631    }
632  }
633
634  private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
635    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
636    // TODO: handle multiple meta.
637    return metaAssignEvent;
638  }
639
640  /**
641   * Wait until AM finishes the meta loading, i.e, the region states rebuilding.
642   * @see #isMetaLoaded()
643   * @see #waitMetaAssigned(Procedure, RegionInfo)
644   */
645  public boolean waitMetaLoaded(Procedure<?> proc) {
646    return metaLoadEvent.suspendIfNotReady(proc);
647  }
648
649  /**
650   * This method will be called in master initialization method after calling
651   * {@link #processOfflineRegions()}, as in processOfflineRegions we will generate assign
652   * procedures for offline regions, which may be conflict with creating table.
653   * <p/>
654   * This is a bit dirty, should be reconsidered after we decide whether to keep the
655   * {@link #processOfflineRegions()} method.
656   */
657  public void wakeMetaLoadedEvent() {
658    metaLoadEvent.wake(getProcedureScheduler());
659    assert isMetaLoaded() : "expected meta to be loaded";
660  }
661
662  /**
663   * Return whether AM finishes the meta loading, i.e, the region states rebuilding.
664   * @see #isMetaAssigned()
665   * @see #waitMetaLoaded(Procedure)
666   */
667  public boolean isMetaLoaded() {
668    return metaLoadEvent.isReady();
669  }
670
671  /**
672   * Start a new thread to check if there are region servers whose versions are higher than others.
673   * If so, move all system table regions to RS with the highest version to keep compatibility. The
674   * reason is, RS in new version may not be able to access RS in old version when there are some
675   * incompatible changes.
676   * <p>
677   * This method is called when a new RegionServer is added to cluster only.
678   * </p>
679   */
680  public void checkIfShouldMoveSystemRegionAsync() {
681    // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
682    // it should 'move' the system tables from the old server to the new server but
683    // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
684    if (this.master.getServerManager().countOfRegionServers() <= 1) {
685      return;
686    }
687    // This thread used to run whenever there was a change in the cluster. The ZooKeeper
688    // childrenChanged notification came in before the nodeDeleted message and so this method
689    // cold run before a ServerCrashProcedure could run. That meant that this thread could see
690    // a Crashed Server before ServerCrashProcedure and it could find system regions on the
691    // crashed server and go move them before ServerCrashProcedure had a chance; could be
692    // dataloss too if WALs were not recovered.
693    new Thread(() -> {
694      try {
695        synchronized (checkIfShouldMoveSystemRegionLock) {
696          List<RegionPlan> plans = new ArrayList<>();
697          // TODO: I don't think this code does a good job if all servers in cluster have same
698          // version. It looks like it will schedule unnecessary moves.
699          for (ServerName server : getExcludedServersForSystemTable()) {
700            if (master.getServerManager().isServerDead(server)) {
701              // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
702              // considers only online servers, the server could be queued for dead server
703              // processing. As region assignments for crashed server is handled by
704              // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
705              // regular flow of LoadBalancer as a favored node and not to have this special
706              // handling.
707              continue;
708            }
709            List<RegionInfo> regionsShouldMove = getSystemTables(server);
710            if (!regionsShouldMove.isEmpty()) {
711              for (RegionInfo regionInfo : regionsShouldMove) {
712                // null value for dest forces destination server to be selected by balancer
713                RegionPlan plan = new RegionPlan(regionInfo, server, null);
714                if (regionInfo.isMetaRegion()) {
715                  // Must move meta region first.
716                  LOG.info("Async MOVE of {} to newer Server={}", regionInfo.getEncodedName(),
717                    server);
718                  moveAsync(plan);
719                } else {
720                  plans.add(plan);
721                }
722              }
723            }
724            for (RegionPlan plan : plans) {
725              LOG.info("Async MOVE of {} to newer Server={}", plan.getRegionInfo().getEncodedName(),
726                server);
727              moveAsync(plan);
728            }
729          }
730        }
731      } catch (Throwable t) {
732        LOG.error(t.toString(), t);
733      }
734    }).start();
735  }
736
737  private List<RegionInfo> getSystemTables(ServerName serverName) {
738    ServerStateNode serverNode = regionStates.getServerNode(serverName);
739    if (serverNode == null) {
740      return Collections.emptyList();
741    }
742    return serverNode.getSystemRegionInfoList();
743  }
744
745  private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
746    throws HBaseIOException {
747    if (regionNode.getProcedure() != null) {
748      throw new HBaseIOException(
749        regionNode + " is currently in transition; pid=" + regionNode.getProcedure().getProcId());
750    }
751    if (!regionNode.isInState(expectedStates)) {
752      throw new DoNotRetryRegionException(UNEXPECTED_STATE_REGION + regionNode);
753    }
754    if (isTableDisabled(regionNode.getTable())) {
755      throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
756    }
757  }
758
759  /**
760   * Create an assign TransitRegionStateProcedure. Makes sure of RegionState. Throws exception if
761   * not appropriate UNLESS override is set. Used by hbck2 but also by straightline
762   * {@link #assign(RegionInfo, ServerName)} and {@link #assignAsync(RegionInfo, ServerName)}.
763   * @see #createAssignProcedure(RegionStateNode, ServerName) for a version that does NO checking
764   *      used when only when no checking needed.
765   * @param override If false, check RegionState is appropriate for assign; if not throw exception.
766   */
767  private TransitRegionStateProcedure createAssignProcedure(RegionInfo regionInfo, ServerName sn,
768    boolean override, boolean force) throws IOException {
769    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
770    regionNode.lock();
771    try {
772      if (override) {
773        if (!force) {
774          preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
775        }
776        if (regionNode.getProcedure() != null) {
777          regionNode.unsetProcedure(regionNode.getProcedure());
778        }
779      } else {
780        preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
781      }
782      assert regionNode.getProcedure() == null;
783      return regionNode.setProcedure(
784        TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn));
785    } finally {
786      regionNode.unlock();
787    }
788  }
789
790  /**
791   * Create an assign TransitRegionStateProcedure. Does NO checking of RegionState. Presumes
792   * appriopriate state ripe for assign.
793   * @see #createAssignProcedure(RegionInfo, ServerName, boolean, boolean)
794   */
795  private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode,
796    ServerName targetServer) {
797    regionNode.lock();
798    try {
799      return regionNode.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
800        regionNode.getRegionInfo(), targetServer));
801    } finally {
802      regionNode.unlock();
803    }
804  }
805
806  public long assign(RegionInfo regionInfo, ServerName sn) throws IOException {
807    TransitRegionStateProcedure proc = createAssignProcedure(regionInfo, sn, false, false);
808    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
809    return proc.getProcId();
810  }
811
  /**
   * Same as {@link #assign(RegionInfo, ServerName)} with {@code null} passed as the target server.
   * @param regionInfo the region to assign
   * @return the value returned by {@link #assign(RegionInfo, ServerName)}
   * @throws IOException propagated from {@link #assign(RegionInfo, ServerName)}
   */
  public long assign(RegionInfo regionInfo) throws IOException {
    return assign(regionInfo, null);
  }
815
816  /**
817   * Submits a procedure that assigns a region to a target server without waiting for it to finish
818   * @param regionInfo the region we would like to assign
819   * @param sn         target server name
820   */
821  public Future<byte[]> assignAsync(RegionInfo regionInfo, ServerName sn) throws IOException {
822    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(),
823      createAssignProcedure(regionInfo, sn, false, false));
824  }
825
  /**
   * Submits a procedure that assigns a region without waiting for it to finish. Delegates to
   * {@link #assignAsync(RegionInfo, ServerName)} with {@code null} as the target server.
   * @param regionInfo the region we would like to assign
   * @return the future returned by {@link #assignAsync(RegionInfo, ServerName)}
   * @throws IOException propagated from {@link #assignAsync(RegionInfo, ServerName)}
   */
  public Future<byte[]> assignAsync(RegionInfo regionInfo) throws IOException {
    return assignAsync(regionInfo, null);
  }
833
834  public long unassign(RegionInfo regionInfo) throws IOException {
835    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
836    if (regionNode == null) {
837      throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
838    }
839    TransitRegionStateProcedure proc;
840    regionNode.lock();
841    try {
842      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
843      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
844      regionNode.setProcedure(proc);
845    } finally {
846      regionNode.unlock();
847    }
848    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
849    return proc.getProcId();
850  }
851
852  public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
853    ServerName targetServer) throws HBaseIOException {
854    RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
855    if (regionNode == null) {
856      throw new UnknownRegionException(
857        "No RegionStateNode found for " + regionInfo.getEncodedName() + "(Closed/Deleted?)");
858    }
859    TransitRegionStateProcedure proc;
860    regionNode.lock();
861    try {
862      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
863      regionNode.checkOnline();
864      proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
865      regionNode.setProcedure(proc);
866    } finally {
867      regionNode.unlock();
868    }
869    return proc;
870  }
871
872  public void move(RegionInfo regionInfo) throws IOException {
873    TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null);
874    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
875  }
876
877  public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException {
878    TransitRegionStateProcedure proc =
879      createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination());
880    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
881  }
882
883  public Future<byte[]> balance(RegionPlan regionPlan) throws HBaseIOException {
884    ServerName current =
885      this.getRegionStates().getRegionAssignments().get(regionPlan.getRegionInfo());
886    if (current == null || !current.equals(regionPlan.getSource())) {
887      LOG.debug("Skip region plan {}, source server not match, current region location is {}",
888        regionPlan, current == null ? "(null)" : current);
889      return null;
890    }
891    return moveAsync(regionPlan);
892  }
893
894  // ============================================================================================
895  // RegionTransition procedures helpers
896  // ============================================================================================
897
898  /**
899   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
900   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
901   *         to populate the assigns with targets chosen using round-robin (default balancer
902   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
903   *         AssignProcedure will ask the balancer for a new target, and so on.
904   */
905  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris,
906    List<ServerName> serversToExclude) {
907    if (hris.isEmpty()) {
908      return new TransitRegionStateProcedure[0];
909    }
910
911    if (
912      serversToExclude != null && this.master.getServerManager().getOnlineServersList().size() == 1
913    ) {
914      LOG.debug("Only one region server found and hence going ahead with the assignment");
915      serversToExclude = null;
916    }
917    try {
918      // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
919      // a better job if it has all the assignments in the one lump.
920      Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
921        this.master.getServerManager().createDestinationServersList(serversToExclude));
922      // Return mid-method!
923      return createAssignProcedures(assignments);
924    } catch (IOException hioe) {
925      LOG.warn("Failed roundRobinAssignment", hioe);
926    }
927    // If an error above, fall-through to this simpler assign. Last resort.
928    return createAssignProcedures(hris);
929  }
930
  /**
   * Create round-robin assigns without excluding any server. Use on table creation to distribute
   * out regions across cluster. Delegates to {@link #createRoundRobinAssignProcedures(List, List)}
   * with a {@code null} exclude list.
   * @param hris the regions to assign
   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
   *         to populate the assigns with targets chosen using round-robin (default balancer
   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
   *         AssignProcedure will ask the balancer for a new target, and so on.
   */
  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
    return createRoundRobinAssignProcedures(hris, null);
  }
941
942  static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
943    if (left.getRegion().isMetaRegion()) {
944      if (right.getRegion().isMetaRegion()) {
945        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
946      }
947      return -1;
948    } else if (right.getRegion().isMetaRegion()) {
949      return +1;
950    }
951    if (left.getRegion().getTable().isSystemTable()) {
952      if (right.getRegion().getTable().isSystemTable()) {
953        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
954      }
955      return -1;
956    } else if (right.getRegion().getTable().isSystemTable()) {
957      return +1;
958    }
959    return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
960  }
961
962  /**
963   * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server. This
964   * method is called from HBCK2.
965   * @return an assign or null
966   */
967  public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo ri, boolean override,
968    boolean force) {
969    TransitRegionStateProcedure trsp = null;
970    try {
971      trsp = createAssignProcedure(ri, null, override, force);
972    } catch (IOException ioe) {
973      LOG.info(
974        "Failed {} assign, override={}"
975          + (override ? "" : "; set override to by-pass state checks."),
976        ri.getEncodedName(), override, ioe);
977    }
978    return trsp;
979  }
980
981  /**
982   * Create one TransitRegionStateProcedure to unassign a region. This method is called from HBCK2.
983   * @return an unassign or null
984   */
985  public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo ri, boolean override,
986    boolean force) {
987    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(ri);
988    TransitRegionStateProcedure trsp = null;
989    regionNode.lock();
990    try {
991      if (override) {
992        if (!force) {
993          preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
994        }
995        if (regionNode.getProcedure() != null) {
996          regionNode.unsetProcedure(regionNode.getProcedure());
997        }
998      } else {
999        // This is where we could throw an exception; i.e. override is false.
1000        preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
1001      }
1002      assert regionNode.getProcedure() == null;
1003      trsp =
1004        TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionNode.getRegionInfo());
1005      regionNode.setProcedure(trsp);
1006    } catch (IOException ioe) {
1007      // 'override' must be false here.
1008      LOG.info("Failed {} unassign, override=false; set override to by-pass state checks.",
1009        ri.getEncodedName(), ioe);
1010    } finally {
1011      regionNode.unlock();
1012    }
1013    return trsp;
1014  }
1015
1016  /**
1017   * Create an array of TransitRegionStateProcedure w/o specifying a target server. Used as fallback
1018   * of caller is unable to do {@link #createAssignProcedures(Map)}.
1019   * <p/>
1020   * If no target server, at assign time, we will try to use the former location of the region if
1021   * one exists. This is how we 'retain' the old location across a server restart.
1022   * <p/>
1023   * Should only be called when you can make sure that no one can touch these regions other than
1024   * you. For example, when you are creating or enabling table. Presumes all Regions are in
1025   * appropriate state ripe for assign; no checking of Region state is done in here.
1026   * @see #createAssignProcedures(Map)
1027   */
1028  public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) {
1029    return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
1030      .map(regionNode -> createAssignProcedure(regionNode, null)).sorted(AssignmentManager::compare)
1031      .toArray(TransitRegionStateProcedure[]::new);
1032  }
1033
1034  /**
1035   * Tied to {@link #createAssignProcedures(List)} in that it is called if caller is unable to run
1036   * this method. Presumes all Regions are in appropriate state ripe for assign; no checking of
1037   * Region state is done in here.
1038   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
1039   * @return Assignments made from the passed in <code>assignments</code>
1040   * @see #createAssignProcedures(List)
1041   */
1042  private TransitRegionStateProcedure[]
1043    createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments) {
1044    return assignments.entrySet().stream()
1045      .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
1046        .map(regionNode -> createAssignProcedure(regionNode, e.getKey())))
1047      .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
1048  }
1049
1050  // for creating unassign TRSP when disabling a table or closing excess region replicas
1051  private TransitRegionStateProcedure forceCreateUnssignProcedure(RegionStateNode regionNode) {
1052    regionNode.lock();
1053    try {
1054      if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
1055        return null;
1056      }
1057      // in general, a split parent should be in CLOSED or SPLIT state, but anyway, let's check it
1058      // here for safety
1059      if (regionNode.getRegionInfo().isSplit()) {
1060        LOG.warn("{} is a split parent but not in CLOSED or SPLIT state", regionNode);
1061        return null;
1062      }
1063      // As in DisableTableProcedure or ModifyTableProcedure, we will hold the xlock for table, so
1064      // we can make sure that this procedure has not been executed yet, as TRSP will hold the
1065      // shared lock for table all the time. So here we will unset it and when it is actually
1066      // executed, it will find that the attach procedure is not itself and quit immediately.
1067      if (regionNode.getProcedure() != null) {
1068        regionNode.unsetProcedure(regionNode.getProcedure());
1069      }
1070      return regionNode.setProcedure(TransitRegionStateProcedure.unassign(getProcedureEnvironment(),
1071        regionNode.getRegionInfo()));
1072    } finally {
1073      regionNode.unlock();
1074    }
1075  }
1076
1077  /**
1078   * Called by DisableTableProcedure to unassign all the regions for a table.
1079   */
1080  public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) {
1081    return regionStates.getTableRegionStateNodes(tableName).stream()
1082      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
1083      .toArray(TransitRegionStateProcedure[]::new);
1084  }
1085
1086  /**
1087   * Called by ModifyTableProcedures to unassign all the excess region replicas for a table.
1088   */
1089  public TransitRegionStateProcedure[] createUnassignProceduresForClosingExcessRegionReplicas(
1090    TableName tableName, int newReplicaCount) {
1091    return regionStates.getTableRegionStateNodes(tableName).stream()
1092      .filter(regionNode -> regionNode.getRegionInfo().getReplicaId() >= newReplicaCount)
1093      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
1094      .toArray(TransitRegionStateProcedure[]::new);
1095  }
1096
  /**
   * Construct a SplitTableRegionProcedure for the given region and split key. The procedure is only
   * created here, not submitted.
   * @param regionToSplit the region to split
   * @param splitKey      the key to split at
   * @return the new procedure
   * @throws IOException if the procedure's constructor throws it
   */
  public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
    final byte[] splitKey) throws IOException {
    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
  }
1101
  /**
   * Construct a TruncateRegionProcedure for the given region. The procedure is only created here,
   * not submitted.
   * @param regionToTruncate the region to truncate
   * @return the new procedure
   * @throws IOException if the procedure's constructor throws it
   */
  public TruncateRegionProcedure createTruncateRegionProcedure(final RegionInfo regionToTruncate)
    throws IOException {
    return new TruncateRegionProcedure(getProcedureEnvironment(), regionToTruncate);
  }
1106
  /**
   * Construct a MergeTableRegionsProcedure for the given regions. The procedure is only created
   * here, not submitted.
   * @param ris the regions to merge
   * @return the new procedure
   * @throws IOException if the procedure's constructor throws it
   */
  public MergeTableRegionsProcedure createMergeProcedure(RegionInfo... ris) throws IOException {
    // NOTE(review): the trailing 'false' is presumably the constructor's force flag; confirm
    // against MergeTableRegionsProcedure.
    return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
  }
1110
1111  /**
1112   * Delete the region states. This is called by "DeleteTable"
1113   */
1114  public void deleteTable(final TableName tableName) throws IOException {
1115    final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
1116    regionStateStore.deleteRegions(regions);
1117    for (int i = 0; i < regions.size(); ++i) {
1118      final RegionInfo regionInfo = regions.get(i);
1119      regionStates.deleteRegion(regionInfo);
1120    }
1121  }
1122
1123  // ============================================================================================
1124  // RS Region Transition Report helpers
1125  // ============================================================================================
1126  private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
1127    ServerStateNode serverNode, List<RegionStateTransition> transitionList) throws IOException {
1128    for (RegionStateTransition transition : transitionList) {
1129      switch (transition.getTransitionCode()) {
1130        case OPENED:
1131        case FAILED_OPEN:
1132        case CLOSED:
1133          assert transition.getRegionInfoCount() == 1 : transition;
1134          final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1135          long procId =
1136            transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
1137          updateRegionTransition(serverNode, transition.getTransitionCode(), hri,
1138            transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId);
1139          break;
1140        case READY_TO_SPLIT:
1141        case SPLIT:
1142        case SPLIT_REVERTED:
1143          assert transition.getRegionInfoCount() == 3 : transition;
1144          final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1145          final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
1146          final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
1147          updateRegionSplitTransition(serverNode, transition.getTransitionCode(), parent, splitA,
1148            splitB);
1149          break;
1150        case READY_TO_MERGE:
1151        case MERGED:
1152        case MERGE_REVERTED:
1153          assert transition.getRegionInfoCount() == 3 : transition;
1154          final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1155          final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
1156          final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
1157          updateRegionMergeTransition(serverNode, transition.getTransitionCode(), merged, mergeA,
1158            mergeB);
1159          break;
1160      }
1161    }
1162  }
1163
1164  public ReportRegionStateTransitionResponse reportRegionStateTransition(
1165    final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
1166    ReportRegionStateTransitionResponse.Builder builder =
1167      ReportRegionStateTransitionResponse.newBuilder();
1168    ServerName serverName = ProtobufUtil.toServerName(req.getServer());
1169    ServerStateNode serverNode = regionStates.getServerNode(serverName);
1170    if (serverNode == null) {
1171      LOG.warn("No server node for {}", serverName);
1172      builder.setErrorMessage("No server node for " + serverName);
1173      return builder.build();
1174    }
1175    // here we have to acquire a read lock instead of a simple exclusive lock. This is because that
1176    // we should not block other reportRegionStateTransition call from the same region server. This
1177    // is not only about performance, but also to prevent dead lock. Think of the meta region is
1178    // also on the same region server and you hold the lock which blocks the
1179    // reportRegionStateTransition for meta, and since meta is not online, you will block inside the
1180    // lock protection to wait for meta online...
1181    serverNode.readLock().lock();
1182    try {
1183      // we only accept reportRegionStateTransition if the region server is online, see the comment
1184      // above in submitServerCrash method and HBASE-21508 for more details.
1185      if (serverNode.isInState(ServerState.ONLINE)) {
1186        try {
1187          reportRegionStateTransition(builder, serverNode, req.getTransitionList());
1188        } catch (PleaseHoldException e) {
1189          LOG.trace("Failed transition ", e);
1190          throw e;
1191        } catch (UnsupportedOperationException | IOException e) {
1192          // TODO: at the moment we have a single error message and the RS will abort
1193          // if the master says that one of the region transitions failed.
1194          LOG.warn("Failed transition", e);
1195          builder.setErrorMessage("Failed transition " + e.getMessage());
1196        }
1197      } else {
1198        LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
1199          serverName);
1200        builder.setErrorMessage("You are dead");
1201      }
1202    } finally {
1203      serverNode.readLock().unlock();
1204    }
1205
1206    return builder.build();
1207  }
1208
1209  private void updateRegionTransition(ServerStateNode serverNode, TransitionCode state,
1210    RegionInfo regionInfo, long seqId, long procId) throws IOException {
1211    checkMetaLoaded(regionInfo);
1212
1213    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1214    if (regionNode == null) {
1215      // the table/region is gone. maybe a delete, split, merge
1216      throw new UnexpectedStateException(String.format(
1217        "Server %s was trying to transition region %s to %s. but Region is not known.",
1218        serverNode.getServerName(), regionInfo, state));
1219    }
1220    LOG.trace("Update region transition serverName={} region={} regionState={}",
1221      serverNode.getServerName(), regionNode, state);
1222
1223    regionNode.lock();
1224    try {
1225      if (!reportTransition(regionNode, serverNode, state, seqId, procId)) {
1226        // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
1227        // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
1228        // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
1229        // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
1230        // to CLOSED
1231        // These happen because on cluster shutdown, we currently let the RegionServers close
1232        // regions. This is the only time that region close is not run by the Master (so cluster
1233        // goes down fast). Consider changing it so Master runs all shutdowns.
1234        if (
1235          this.master.getServerManager().isClusterShutdown() && state.equals(TransitionCode.CLOSED)
1236        ) {
1237          LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
1238        } else {
1239          LOG.warn("No matching procedure found for {} transition on {} to {}",
1240            serverNode.getServerName(), regionNode, state);
1241        }
1242      }
1243    } finally {
1244      regionNode.unlock();
1245    }
1246  }
1247
1248  private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
1249    TransitionCode state, long seqId, long procId) throws IOException {
1250    ServerName serverName = serverNode.getServerName();
1251    TransitRegionStateProcedure proc = regionNode.getProcedure();
1252    if (proc == null) {
1253      return false;
1254    }
1255    proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
1256      serverName, state, seqId, procId);
1257    return true;
1258  }
1259
1260  private void updateRegionSplitTransition(final ServerStateNode serverNode,
1261    final TransitionCode state, final RegionInfo parent, final RegionInfo hriA,
1262    final RegionInfo hriB) throws IOException {
1263    checkMetaLoaded(parent);
1264
1265    if (state != TransitionCode.READY_TO_SPLIT) {
1266      throw new UnexpectedStateException(
1267        "unsupported split regionState=" + state + " for parent region " + parent
1268          + " maybe an old RS (< 2.0) had the operation in progress");
1269    }
1270
1271    // sanity check on the request
1272    if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) {
1273      throw new UnsupportedOperationException("unsupported split request with bad keys: parent="
1274        + parent + " hriA=" + hriA + " hriB=" + hriB);
1275    }
1276
1277    if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
1278      LOG.warn("Split switch is off! skip split of " + parent);
1279      throw new DoNotRetryIOException(
1280        "Split region " + parent.getRegionNameAsString() + " failed due to split switch off");
1281    }
1282
1283    // Submit the Split procedure
1284    final byte[] splitKey = hriB.getStartKey();
1285    if (LOG.isDebugEnabled()) {
1286      LOG.debug("Split request from {}, parent={}, splitKey={}", serverNode.getServerName(), parent,
1287        Bytes.toStringBinary(splitKey));
1288    }
1289    // Processing this report happens asynchronously from other activities which can mutate
1290    // the region state. For example, a split procedure may already be running for this parent.
1291    // A split procedure cannot succeed if the parent region is no longer open, so we can
1292    // ignore it in that case.
1293    // Note that submitting more than one split procedure for a given region is
1294    // harmless -- the split is fenced in the procedure handling -- but it would be noisy in
1295    // the logs. Only one procedure can succeed. The other procedure(s) would abort during
1296    // initialization and report failure with WARN level logging.
1297    RegionState parentState = regionStates.getRegionState(parent);
1298    if (parentState != null && parentState.isOpened()) {
1299      master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
1300    } else {
1301      LOG.info("Ignoring split request from {}, parent={} because parent is unknown or not open",
1302        serverNode.getServerName(), parent);
1303      return;
1304    }
1305
1306    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
1307    if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) {
1308      throw new UnsupportedOperationException(
1309        String.format("Split handled by the master: " + "parent=%s hriA=%s hriB=%s",
1310          parent.getShortNameToLog(), hriA, hriB));
1311    }
1312  }
1313
1314  private void updateRegionMergeTransition(final ServerStateNode serverNode,
1315    final TransitionCode state, final RegionInfo merged, final RegionInfo hriA,
1316    final RegionInfo hriB) throws IOException {
1317    checkMetaLoaded(merged);
1318
1319    if (state != TransitionCode.READY_TO_MERGE) {
1320      throw new UnexpectedStateException(
1321        "Unsupported merge regionState=" + state + " for regionA=" + hriA + " regionB=" + hriB
1322          + " merged=" + merged + " maybe an old RS (< 2.0) had the operation in progress");
1323    }
1324
1325    if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
1326      LOG.warn("Merge switch is off! skip merge of regionA=" + hriA + " regionB=" + hriB);
1327      throw new DoNotRetryIOException(
1328        "Merge of regionA=" + hriA + " regionB=" + hriB + " failed because merge switch is off");
1329    }
1330
1331    // Submit the Merge procedure
1332    if (LOG.isDebugEnabled()) {
1333      LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged);
1334    }
1335    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
1336
1337    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
1338    if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) {
1339      throw new UnsupportedOperationException(
1340        String.format("Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state,
1341          merged, hriA, hriB));
1342    }
1343  }
1344
1345  // ============================================================================================
1346  // RS Status update (report online regions) helpers
1347  // ============================================================================================
1348  /**
1349   * The master will call this method when the RS send the regionServerReport(). The report will
1350   * contains the "online regions". This method will check the the online regions against the
1351   * in-memory state of the AM, and we will log a warn message if there is a mismatch. This is
1352   * because that there is no fencing between the reportRegionStateTransition method and
1353   * regionServerReport method, so there could be race and introduce inconsistency here, but
1354   * actually there is no problem.
1355   * <p/>
1356   * Please see HBASE-21421 and HBASE-21463 for more details.
1357   */
1358  public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) {
1359    if (!isRunning()) {
1360      return;
1361    }
1362    if (LOG.isTraceEnabled()) {
1363      LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName,
1364        regionNames.size(), isMetaLoaded(),
1365        regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList()));
1366    }
1367
1368    ServerStateNode serverNode = regionStates.getServerNode(serverName);
1369    if (serverNode == null) {
1370      LOG.warn("Got a report from server {} where its server node is null", serverName);
1371      return;
1372    }
1373    serverNode.readLock().lock();
1374    try {
1375      if (!serverNode.isInState(ServerState.ONLINE)) {
1376        LOG.warn("Got a report from a server result in state {}", serverNode);
1377        return;
1378      }
1379    } finally {
1380      serverNode.readLock().unlock();
1381    }
1382
1383    // Track the regionserver reported online regions in memory.
1384    synchronized (rsReports) {
1385      rsReports.put(serverName, regionNames);
1386    }
1387
1388    if (regionNames.isEmpty()) {
1389      // nothing to do if we don't have regions
1390      LOG.trace("no online region found on {}", serverName);
1391      return;
1392    }
1393    if (!isMetaLoaded()) {
1394      // we are still on startup, skip checking
1395      return;
1396    }
1397    // The Heartbeat tells us of what regions are on the region serve, check the state.
1398    checkOnlineRegionsReport(serverNode, regionNames);
1399  }
1400
1401  /**
1402   * Close <code>regionName</code> on <code>sn</code> silently and immediately without using a
1403   * Procedure or going via hbase:meta. For case where a RegionServer's hosting of a Region is not
1404   * aligned w/ the Master's accounting of Region state. This is for cleaning up an error in
1405   * accounting.
1406   */
1407  private void closeRegionSilently(ServerName sn, byte[] regionName) {
1408    try {
1409      RegionInfo ri = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName);
1410      // Pass -1 for timeout. Means do not wait.
1411      ServerManager.closeRegionSilentlyAndWait(this.master.getAsyncClusterConnection(), sn, ri, -1);
1412    } catch (Exception e) {
1413      LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e);
1414    }
1415  }
1416
1417  /**
1418   * Check that what the RegionServer reports aligns with the Master's image. If disagreement, we
1419   * will tell the RegionServer to expediently close a Region we do not think it should have.
1420   */
1421  private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) {
1422    ServerName serverName = serverNode.getServerName();
1423    for (byte[] regionName : regionNames) {
1424      if (!isRunning()) {
1425        return;
1426      }
1427      RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
1428      if (regionNode == null) {
1429        String regionNameAsStr = Bytes.toStringBinary(regionName);
1430        LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...", regionNameAsStr,
1431          serverName);
1432        closeRegionSilently(serverNode.getServerName(), regionName);
1433        continue;
1434      }
1435      final long lag = 1000;
1436      // This is just a fallback check designed to identify unexpected data inconsistencies, so we
1437      // use tryLock to attempt to acquire the lock, and if the lock cannot be acquired, we skip the
1438      // check. This will not cause any additional problems and also prevents the regionServerReport
1439      // call from being stuck for too long which may cause deadlock on region assignment.
1440      if (regionNode.tryLock()) {
1441        try {
1442          long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate();
1443          if (regionNode.isInState(State.OPENING, State.OPEN)) {
1444            // This is possible as a region server has just closed a region but the region server
1445            // report is generated before the closing, but arrive after the closing. Make sure
1446            // there
1447            // is some elapsed time so less false alarms.
1448            if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) {
1449              LOG.warn("Reporting {} server does not match {} (time since last "
1450                + "update={}ms); closing...", serverName, regionNode, diff);
1451              closeRegionSilently(serverNode.getServerName(), regionName);
1452            }
1453          } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
1454            // So, we can get report that a region is CLOSED or SPLIT because a heartbeat
1455            // came in at about same time as a region transition. Make sure there is some
1456            // elapsed time so less false alarms.
1457            if (diff > lag) {
1458              LOG.warn("Reporting {} state does not match {} (time since last update={}ms)",
1459                serverName, regionNode, diff);
1460            }
1461          }
1462        } finally {
1463          regionNode.unlock();
1464        }
1465      } else {
1466        LOG.warn(
1467          "Unable to acquire lock for regionNode {}. It is likely that another thread is currently holding the lock. To avoid deadlock, skip execution for now.",
1468          regionNode);
1469      }
1470    }
1471  }
1472
1473  // ============================================================================================
1474  // RIT chore
1475  // ============================================================================================
1476  private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> {
1477    public RegionInTransitionChore(final int timeoutMsec) {
1478      super(timeoutMsec);
1479    }
1480
1481    @Override
1482    protected void periodicExecute(final MasterProcedureEnv env) {
1483      final AssignmentManager am = env.getAssignmentManager();
1484
1485      final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat();
1486      if (ritStat.hasRegionsOverThreshold()) {
1487        for (RegionState hri : ritStat.getRegionOverThreshold()) {
1488          am.handleRegionOverStuckWarningThreshold(hri.getRegion());
1489        }
1490      }
1491
1492      // update metrics
1493      am.updateRegionsInTransitionMetrics(ritStat);
1494    }
1495  }
1496
1497  private static class DeadServerMetricRegionChore
1498    extends ProcedureInMemoryChore<MasterProcedureEnv> {
1499    public DeadServerMetricRegionChore(final int timeoutMsec) {
1500      super(timeoutMsec);
1501    }
1502
1503    @Override
1504    protected void periodicExecute(final MasterProcedureEnv env) {
1505      final ServerManager sm = env.getMasterServices().getServerManager();
1506      final AssignmentManager am = env.getAssignmentManager();
1507      // To minimize inconsistencies we are not going to snapshot live servers in advance in case
1508      // new servers are added; OTOH we don't want to add heavy sync for a consistent view since
1509      // this is for metrics. Instead, we're going to check each regions as we go; to avoid making
1510      // too many checks, we maintain a local lists of server, limiting us to false negatives. If
1511      // we miss some recently-dead server, we'll just see it next time.
1512      Set<ServerName> recentlyLiveServers = new HashSet<>();
1513      int deadRegions = 0, unknownRegions = 0;
1514      for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) {
1515        if (rsn.getState() != State.OPEN) {
1516          continue; // Opportunistic check, should quickly skip RITs, offline tables, etc.
1517        }
1518        // Do not need to acquire region state lock as this is only for showing metrics.
1519        ServerName sn = rsn.getRegionLocation();
1520        State state = rsn.getState();
1521        if (state != State.OPEN) {
1522          continue; // Mostly skipping RITs that are already being take care of.
1523        }
1524        if (sn == null) {
1525          ++unknownRegions; // Opened on null?
1526          continue;
1527        }
1528        if (recentlyLiveServers.contains(sn)) {
1529          continue;
1530        }
1531        ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn);
1532        switch (sls) {
1533          case LIVE:
1534            recentlyLiveServers.add(sn);
1535            break;
1536          case DEAD:
1537            ++deadRegions;
1538            break;
1539          case UNKNOWN:
1540            ++unknownRegions;
1541            break;
1542          default:
1543            throw new AssertionError("Unexpected " + sls);
1544        }
1545      }
1546      if (deadRegions > 0 || unknownRegions > 0) {
1547        LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers",
1548          deadRegions, unknownRegions);
1549      }
1550
1551      am.updateDeadServerRegionMetrics(deadRegions, unknownRegions);
1552    }
1553  }
1554
1555  public RegionInTransitionStat computeRegionInTransitionStat() {
1556    final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration());
1557    rit.update(this);
1558    return rit;
1559  }
1560
1561  public static class RegionInTransitionStat {
1562    private final int ritThreshold;
1563
1564    private HashMap<String, RegionState> ritsOverThreshold = null;
1565    private long statTimestamp;
1566    private long oldestRITTime = 0;
1567    private int totalRITsTwiceThreshold = 0;
1568    private int totalRITs = 0;
1569
1570    public RegionInTransitionStat(final Configuration conf) {
1571      this.ritThreshold =
1572        conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD);
1573    }
1574
1575    public int getRITThreshold() {
1576      return ritThreshold;
1577    }
1578
1579    public long getTimestamp() {
1580      return statTimestamp;
1581    }
1582
1583    public int getTotalRITs() {
1584      return totalRITs;
1585    }
1586
1587    public long getOldestRITTime() {
1588      return oldestRITTime;
1589    }
1590
1591    public int getTotalRITsOverThreshold() {
1592      Map<String, RegionState> m = this.ritsOverThreshold;
1593      return m != null ? m.size() : 0;
1594    }
1595
1596    public boolean hasRegionsTwiceOverThreshold() {
1597      return totalRITsTwiceThreshold > 0;
1598    }
1599
1600    public boolean hasRegionsOverThreshold() {
1601      Map<String, RegionState> m = this.ritsOverThreshold;
1602      return m != null && !m.isEmpty();
1603    }
1604
1605    public Collection<RegionState> getRegionOverThreshold() {
1606      Map<String, RegionState> m = this.ritsOverThreshold;
1607      return m != null ? m.values() : Collections.emptySet();
1608    }
1609
1610    public boolean isRegionOverThreshold(final RegionInfo regionInfo) {
1611      Map<String, RegionState> m = this.ritsOverThreshold;
1612      return m != null && m.containsKey(regionInfo.getEncodedName());
1613    }
1614
1615    public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) {
1616      Map<String, RegionState> m = this.ritsOverThreshold;
1617      if (m == null) {
1618        return false;
1619      }
1620      final RegionState state = m.get(regionInfo.getEncodedName());
1621      if (state == null) {
1622        return false;
1623      }
1624      return (statTimestamp - state.getStamp()) > (ritThreshold * 2);
1625    }
1626
1627    protected void update(final AssignmentManager am) {
1628      final RegionStates regionStates = am.getRegionStates();
1629      this.statTimestamp = EnvironmentEdgeManager.currentTime();
1630      update(regionStates.getRegionsStateInTransition(), statTimestamp);
1631      update(regionStates.getRegionFailedOpen(), statTimestamp);
1632
1633      if (LOG.isDebugEnabled() && ritsOverThreshold != null && !ritsOverThreshold.isEmpty()) {
1634        LOG.debug("RITs over threshold: {}",
1635          ritsOverThreshold.entrySet().stream()
1636            .map(e -> e.getKey() + ":" + e.getValue().getState().name())
1637            .collect(Collectors.joining("\n")));
1638      }
1639    }
1640
1641    private void update(final Collection<RegionState> regions, final long currentTime) {
1642      for (RegionState state : regions) {
1643        totalRITs++;
1644        final long ritStartedMs = state.getStamp();
1645        if (ritStartedMs == 0) {
1646          // Don't output bogus values to metrics if they accidentally make it here.
1647          LOG.warn("The RIT {} has no start time", state.getRegion());
1648          continue;
1649        }
1650        final long ritTime = currentTime - ritStartedMs;
1651        if (ritTime > ritThreshold) {
1652          if (ritsOverThreshold == null) {
1653            ritsOverThreshold = new HashMap<String, RegionState>();
1654          }
1655          ritsOverThreshold.put(state.getRegion().getEncodedName(), state);
1656          totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0;
1657        }
1658        if (oldestRITTime < ritTime) {
1659          oldestRITTime = ritTime;
1660        }
1661      }
1662    }
1663  }
1664
1665  private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) {
1666    metrics.updateRITOldestAge(ritStat.getOldestRITTime());
1667    metrics.updateRITCount(ritStat.getTotalRITs());
1668    metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold());
1669  }
1670
1671  private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) {
1672    metrics.updateDeadServerOpenRegions(deadRegions);
1673    metrics.updateUnknownServerOpenRegions(unknownRegions);
1674  }
1675
1676  private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
1677    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1678    // if (regionNode.isStuck()) {
1679    LOG.warn("STUCK Region-In-Transition {}", regionNode);
1680  }
1681
1682  // ============================================================================================
1683  // TODO: Master load/bootstrap
1684  // ============================================================================================
1685  public void joinCluster() throws IOException {
1686    long startTime = System.nanoTime();
1687    LOG.debug("Joining cluster...");
1688
1689    // Scan hbase:meta to build list of existing regions, servers, and assignment.
1690    // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress
1691    // w/o meta.
1692    loadMeta();
1693
1694    while (master.getServerManager().countOfRegionServers() < 1) {
1695      LOG.info("Waiting for RegionServers to join; current count={}",
1696        master.getServerManager().countOfRegionServers());
1697      Threads.sleep(250);
1698    }
1699    LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers());
1700
1701    // Start the chores
1702    master.getMasterProcedureExecutor().addChore(this.ritChore);
1703    master.getMasterProcedureExecutor().addChore(this.deadMetricChore);
1704
1705    long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
1706    LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs));
1707  }
1708
1709  /**
1710   * Create assign procedure for offline regions. Just follow the old
1711   * processofflineServersWithOnlineRegions method. Since now we do not need to deal with dead
1712   * server any more, we only deal with the regions in OFFLINE state in this method. And this is a
1713   * bit strange, that for new regions, we will add it in CLOSED state instead of OFFLINE state, and
1714   * usually there will be a procedure to track them. The processofflineServersWithOnlineRegions is
1715   * a legacy from long ago, as things are going really different now, maybe we do not need this
1716   * method any more. Need to revisit later.
1717   */
1718  // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online.
1719  // Needs to be done after the table state manager has been started.
1720  public void processOfflineRegions() {
1721    TransitRegionStateProcedure[] procs =
1722      regionStates.getRegionStateNodes().stream().filter(rsn -> rsn.isInState(State.OFFLINE))
1723        .filter(rsn -> isTableEnabled(rsn.getRegionInfo().getTable())).map(rsn -> {
1724          rsn.lock();
1725          try {
1726            if (rsn.getProcedure() != null) {
1727              return null;
1728            } else {
1729              return rsn.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
1730                rsn.getRegionInfo(), null));
1731            }
1732          } finally {
1733            rsn.unlock();
1734          }
1735        }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new);
1736    if (procs.length > 0) {
1737      master.getMasterProcedureExecutor().submitProcedures(procs);
1738    }
1739  }
1740
1741  /*
1742   * AM internal RegionStateStore.RegionStateVisitor implementation. To be used when scanning META
1743   * table for region rows, using RegionStateStore utility methods. RegionStateStore methods will
1744   * convert Result into proper RegionInfo instances, but those would still need to be added into
1745   * AssignmentManager.regionStates in-memory cache. RegionMetaLoadingVisitor.visitRegionState
1746   * method provides the logic for adding RegionInfo instances as loaded from latest META scan into
1747   * AssignmentManager.regionStates.
1748   */
1749  private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor {
1750
1751    @Override
1752    public void visitRegionState(Result result, final RegionInfo regionInfo, final State state,
1753      final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
1754      if (
1755        state == null && regionLocation == null && lastHost == null
1756          && openSeqNum == SequenceId.NO_SEQUENCE_ID
1757      ) {
1758        // This is a row with nothing in it.
1759        LOG.warn("Skipping empty row={}", result);
1760        return;
1761      }
1762      State localState = state;
1763      if (localState == null) {
1764        // No region state column data in hbase:meta table! Are I doing a rolling upgrade from
1765        // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta?
1766        // In any of these cases, state is empty. For now, presume OFFLINE but there are probably
1767        // cases where we need to probe more to be sure this correct; TODO informed by experience.
1768        LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);
1769        localState = State.OFFLINE;
1770      }
1771      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
1772      // Do not need to lock on regionNode, as we can make sure that before we finish loading
1773      // meta, all the related procedures can not be executed. The only exception is for meta
1774      // region related operations, but here we do not load the informations for meta region.
1775      regionNode.setState(localState);
1776      regionNode.setLastHost(lastHost);
1777      regionNode.setRegionLocation(regionLocation);
1778      regionNode.setOpenSeqNum(openSeqNum);
1779
1780      // Note: keep consistent with other methods, see region(Opening|Opened|Closing)
1781      // RIT/ServerCrash handling should take care of the transiting regions.
1782      if (
1783        localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING, State.MERGING)
1784      ) {
1785        assert regionLocation != null : "found null region location for " + regionNode;
1786        // TODO: this could lead to some orphan server state nodes, as it is possible that the
1787        // region server is already dead and its SCP has already finished but we have
1788        // persisted an opening state on this region server. Finally the TRSP will assign the
1789        // region to another region server, so it will not cause critical problems, just waste
1790        // some memory as no one will try to cleanup these orphan server state nodes.
1791        regionStates.createServer(regionLocation);
1792        regionStates.addRegionToServer(regionNode);
1793      } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
1794        regionStates.addToOfflineRegions(regionNode);
1795      }
1796      if (regionNode.getProcedure() != null) {
1797        regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode);
1798      }
1799    }
1800  };
1801
1802  /**
1803   * Attempt to load {@code regionInfo} from META, adding any results to the
1804   * {@link #regionStateStore} Is NOT aware of replica regions.
1805   * @param regionInfo the region to be loaded from META.
1806   * @throws IOException If some error occurs while querying META or parsing results.
1807   */
1808  public void populateRegionStatesFromMeta(@NonNull final RegionInfo regionInfo)
1809    throws IOException {
1810    final String regionEncodedName = RegionInfo.DEFAULT_REPLICA_ID == regionInfo.getReplicaId()
1811      ? regionInfo.getEncodedName()
1812      : RegionInfoBuilder.newBuilder(regionInfo).setReplicaId(RegionInfo.DEFAULT_REPLICA_ID).build()
1813        .getEncodedName();
1814    populateRegionStatesFromMeta(regionEncodedName);
1815  }
1816
1817  /**
1818   * Attempt to load {@code regionEncodedName} from META, adding any results to the
1819   * {@link #regionStateStore} Is NOT aware of replica regions.
1820   * @param regionEncodedName encoded name for the region to be loaded from META.
1821   * @throws IOException If some error occurs while querying META or parsing results.
1822   */
1823  public void populateRegionStatesFromMeta(@NonNull String regionEncodedName) throws IOException {
1824    final RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor();
1825    regionStateStore.visitMetaForRegion(regionEncodedName, visitor);
1826  }
1827
1828  private void loadMeta() throws IOException {
1829    // TODO: use a thread pool
1830    regionStateStore.visitMeta(new RegionMetaLoadingVisitor());
1831  }
1832
1833  /**
1834   * Used to check if the meta loading is done.
1835   * <p/>
1836   * if not we throw PleaseHoldException since we are rebuilding the RegionStates
1837   * @param hri region to check if it is already rebuild
1838   * @throws PleaseHoldException if meta has not been loaded yet
1839   */
1840  private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException {
1841    if (!isRunning()) {
1842      throw new PleaseHoldException("AssignmentManager not running");
1843    }
1844    boolean meta = isMetaRegion(hri);
1845    boolean metaLoaded = isMetaLoaded();
1846    if (!meta && !metaLoaded) {
1847      throw new PleaseHoldException(
1848        "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded);
1849    }
1850  }
1851
1852  // ============================================================================================
1853  // TODO: Metrics
1854  // ============================================================================================
1855  public int getNumRegionsOpened() {
1856    // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value
1857    return 0;
1858  }
1859
1860  /**
1861   * Usually run by the Master in reaction to server crash during normal processing. Can also be
1862   * invoked via external RPC to effect repair; in the latter case, the 'force' flag is set so we
1863   * push through the SCP though context may indicate already-running-SCP (An old SCP may have
1864   * exited abnormally, or damaged cluster may still have references in hbase:meta to 'Unknown
1865   * Servers' -- servers that are not online or in dead servers list, etc.)
1866   * @param force Set if the request came in externally over RPC (via hbck2). Force means run the
1867   *              SCP even if it seems as though there might be an outstanding SCP running.
1868   * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled.
1869   */
1870  public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) {
1871    // May be an 'Unknown Server' so handle case where serverNode is null.
1872    ServerStateNode serverNode = regionStates.getServerNode(serverName);
1873    // Remove the in-memory rsReports result
1874    synchronized (rsReports) {
1875      rsReports.remove(serverName);
1876    }
1877    if (serverNode == null) {
1878      if (force) {
1879        LOG.info("Force adding ServerCrashProcedure for {} when server node is null", serverName);
1880      } else {
1881        // for normal case, do not schedule SCP if ServerStateNode is null
1882        LOG.warn("Skip adding ServerCrashProcedure for {} because server node is null", serverName);
1883        return Procedure.NO_PROC_ID;
1884      }
1885    }
1886
1887    ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
1888    // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the
1889    // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
1890    // this server. This is used to simplify the implementation for TRSP and SCP, where we can make
1891    // sure that, the region list fetched by SCP will not be changed any more.
1892    if (serverNode != null) {
1893      serverNode.writeLock().lock();
1894    }
1895    try {
1896
1897      boolean carryingMeta = isCarryingMeta(serverName);
1898      if (serverNode != null && !serverNode.isInState(ServerState.ONLINE)) {
1899        if (force) {
1900          LOG.info("Force adding ServerCrashProcedure for {} (meta={}) when state is not {}",
1901            serverNode, carryingMeta, ServerState.ONLINE);
1902        } else {
1903          LOG.info("Skip adding ServerCrashProcedure for {} (meta={}) when state is not {}",
1904            serverNode, carryingMeta, ServerState.ONLINE);
1905          return Procedure.NO_PROC_ID;
1906        }
1907      }
1908      MasterProcedureEnv mpe = procExec.getEnvironment();
1909      // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead.
1910      // HBCKSCP scours Master in-memory state AND hbase;meta for references to
1911      // serverName just-in-case. An SCP that is scheduled when the server is
1912      // 'Unknown' probably originated externally with HBCK2 fix-it tool.
1913      ServerState oldState = null;
1914      if (serverNode != null) {
1915        oldState = serverNode.getState();
1916        serverNode.setState(ServerState.CRASHED);
1917      }
1918      ServerCrashProcedure scp = force
1919        ? new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta)
1920        : new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta);
1921      long pid = procExec.submitProcedure(scp);
1922      LOG.info("Scheduled ServerCrashProcedure pid={} for {} (carryingMeta={}){}.", pid, serverName,
1923        carryingMeta,
1924        serverNode == null ? "" : " " + serverNode.toString() + ", oldState=" + oldState);
1925      return pid;
1926    } finally {
1927      if (serverNode != null) {
1928        serverNode.writeLock().unlock();
1929      }
1930    }
1931  }
1932
1933  public void offlineRegion(final RegionInfo regionInfo) {
1934    // TODO used by MasterRpcServices
1935    RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
1936    if (node != null) {
1937      node.offline();
1938    }
1939  }
1940
1941  public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) {
1942    // TODO used by TestSplitTransactionOnCluster.java
1943  }
1944
1945  public Map<ServerName, List<RegionInfo>>
1946    getSnapShotOfAssignment(final Collection<RegionInfo> regions) {
1947    return regionStates.getSnapShotOfAssignment(regions);
1948  }
1949
1950  // ============================================================================================
1951  // TODO: UTILS/HELPERS?
1952  // ============================================================================================
1953  /**
1954   * Used by the client (via master) to identify if all regions have the schema updates
1955   * @return Pair indicating the status of the alter command (pending/total)
1956   */
1957  public Pair<Integer, Integer> getReopenStatus(TableName tableName) {
1958    if (isTableDisabled(tableName)) {
1959      return new Pair<Integer, Integer>(0, 0);
1960    }
1961
1962    final List<RegionState> states = regionStates.getTableRegionStates(tableName);
1963    int ritCount = 0;
1964    for (RegionState regionState : states) {
1965      if (!regionState.isOpened() && !regionState.isSplit()) {
1966        ritCount++;
1967      }
1968    }
1969    return new Pair<Integer, Integer>(ritCount, states.size());
1970  }
1971
1972  // ============================================================================================
1973  // TODO: Region State In Transition
1974  // ============================================================================================
1975  public boolean hasRegionsInTransition() {
1976    return regionStates.hasRegionsInTransition();
1977  }
1978
1979  public List<RegionStateNode> getRegionsInTransition() {
1980    return regionStates.getRegionsInTransition();
1981  }
1982
1983  public List<RegionInfo> getAssignedRegions() {
1984    return regionStates.getAssignedRegions();
1985  }
1986
1987  /**
1988   * Resolve a cached {@link RegionInfo} from the region name as a {@code byte[]}.
1989   */
1990  public RegionInfo getRegionInfo(final byte[] regionName) {
1991    final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName);
1992    return regionState != null ? regionState.getRegionInfo() : null;
1993  }
1994
1995  /**
1996   * Resolve a cached {@link RegionInfo} from the encoded region name as a {@code String}.
1997   */
1998  public RegionInfo getRegionInfo(final String encodedRegionName) {
1999    final RegionStateNode regionState =
2000      regionStates.getRegionStateNodeFromEncodedRegionName(encodedRegionName);
2001    return regionState != null ? regionState.getRegionInfo() : null;
2002  }
2003
2004  // ============================================================================================
2005  // Expected states on region state transition.
  // Notice that there are no expected states for transiting to OPENING state; this is because of
  // SCP. See the comments in regionOpening method for more details.
2008  // ============================================================================================
  // States a region may be in when it is transitioned to OPEN (passed to transitionState in
  // regionOpenedWithoutPersistingToMeta).
  private static final State[] STATES_EXPECTED_ON_OPEN = { State.OPENING, // Normal case
    State.OPEN // Retrying
  };

  // States a region may be in when it is transitioned to CLOSING (passed to
  // transitStateAndUpdate in regionClosing).
  private static final State[] STATES_EXPECTED_ON_CLOSING = { State.OPEN, // Normal case
    State.CLOSING, // Retrying
    State.SPLITTING, // Offline the split parent
    State.MERGING // Offline the merge parents
  };

  // States a region may be in when it is transitioned to CLOSED (passed to transitionState in
  // regionClosedWithoutPersistingToMeta).
  private static final State[] STATES_EXPECTED_ON_CLOSED = { State.CLOSING, // Normal case
    State.CLOSED // Retrying
  };

  // This is for manually scheduled region assign, can add other states later if we find out other
  // usages
  private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE };

  // We only allow unassign or move a region which is in OPEN state.
  private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN };
2029
2030  // ============================================================================================
2031  // Region Status update
2032  // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking
2033  // and pre-assumptions are very tricky.
2034  // ============================================================================================
2035  private CompletableFuture<Void> transitStateAndUpdate(RegionStateNode regionNode,
2036    RegionState.State newState, RegionState.State... expectedStates) {
2037    RegionState.State state = regionNode.getState();
2038    try {
2039      regionNode.transitionState(newState, expectedStates);
2040    } catch (UnexpectedStateException e) {
2041      return FutureUtils.failedFuture(e);
2042    }
2043    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
2044    FutureUtils.addListener(future, (r, e) -> {
2045      if (e != null) {
2046        // revert
2047        regionNode.setState(state);
2048      }
2049    });
2050    return future;
2051  }
2052
2053  // should be called within the synchronized block of RegionStateNode
2054  CompletableFuture<Void> regionOpening(RegionStateNode regionNode) {
2055    // As in SCP, for performance reason, there is no TRSP attached with this region, we will not
2056    // update the region state, which means that the region could be in any state when we want to
2057    // assign it after a RS crash. So here we do not pass the expectedStates parameter.
2058    return transitStateAndUpdate(regionNode, State.OPENING).thenAccept(r -> {
2059      ServerStateNode serverNode = regionStates.getServerNode(regionNode.getRegionLocation());
2060      // Here the server node could be null. For example, we want to assign the region to a given
2061      // region server and it crashes, and it is the region server which holds hbase:meta, then the
2062      // above transitStateAndUpdate call will never succeed until we finishes the SCP for it. But
2063      // after the SCP finishes, the server node will be removed, so when we arrive there, the
2064      // server
2065      // node will be null. This is not a big problem if we skip adding it, as later we will fail to
2066      // execute the remote procedure on the region server and then try to assign to another region
2067      // server
2068      if (serverNode != null) {
2069        serverNode.addRegion(regionNode);
2070      }
2071      // update the operation count metrics
2072      metrics.incrementOperationCounter();
2073    });
2074  }
2075
2076  // should be called under the RegionStateNode lock
2077  // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
2078  // we will persist the FAILED_OPEN state into hbase:meta.
2079  CompletableFuture<Void> regionFailedOpen(RegionStateNode regionNode, boolean giveUp) {
2080    RegionState.State state = regionNode.getState();
2081    ServerName regionLocation = regionNode.getRegionLocation();
2082    if (!giveUp) {
2083      if (regionLocation != null) {
2084        regionStates.removeRegionFromServer(regionLocation, regionNode);
2085      }
2086      return CompletableFuture.completedFuture(null);
2087    }
2088    regionNode.setState(State.FAILED_OPEN);
2089    regionNode.setRegionLocation(null);
2090    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
2091    FutureUtils.addListener(future, (r, e) -> {
2092      if (e == null) {
2093        if (regionLocation != null) {
2094          regionStates.removeRegionFromServer(regionLocation, regionNode);
2095        }
2096      } else {
2097        // revert
2098        regionNode.setState(state);
2099        regionNode.setRegionLocation(regionLocation);
2100      }
2101    });
2102    return future;
2103  }
2104
2105  // should be called under the RegionStateNode lock
2106  CompletableFuture<Void> regionClosing(RegionStateNode regionNode) {
2107    return transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING)
2108      .thenAccept(r -> {
2109        RegionInfo hri = regionNode.getRegionInfo();
2110        // Set meta has not initialized early. so people trying to create/edit tables will wait
2111        if (isMetaRegion(hri)) {
2112          setMetaAssigned(hri, false);
2113        }
2114        // update the operation count metrics
2115        metrics.incrementOperationCounter();
2116      });
2117  }
2118
  // For open and close, they will first be persisted to the procedure store in
  // RegionRemoteProcedureBase. So here we first change the in-memory state, as the operation is
  // considered succeeded once the persistence to the procedure store has succeeded, and then, when
  // the RegionRemoteProcedureBase is woken up, we persist the RegionStateNode to hbase:meta.
2123
2124  // should be called under the RegionStateNode lock
2125  void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode)
2126    throws UnexpectedStateException {
2127    regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
2128    RegionInfo regionInfo = regionNode.getRegionInfo();
2129    regionStates.addRegionToServer(regionNode);
2130    regionStates.removeFromFailedOpen(regionInfo);
2131  }
2132
2133  // should be called under the RegionStateNode lock
2134  void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode)
2135    throws UnexpectedStateException {
2136    ServerName regionLocation = regionNode.getRegionLocation();
2137    regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
2138    regionNode.setRegionLocation(null);
2139    if (regionLocation != null) {
2140      regionNode.setLastHost(regionLocation);
2141      regionStates.removeRegionFromServer(regionLocation, regionNode);
2142    }
2143  }
2144
2145  // should be called under the RegionStateNode lock
2146  CompletableFuture<Void> persistToMeta(RegionStateNode regionNode) {
2147    return regionStateStore.updateRegionLocation(regionNode).thenAccept(r -> {
2148      RegionInfo regionInfo = regionNode.getRegionInfo();
2149      if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
2150        // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
2151        // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
2152        // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
2153        // on table that contains state.
2154        setMetaAssigned(regionInfo, true);
2155      }
2156    });
2157  }
2158
2159  // should be called under the RegionStateNode lock
2160  // for SCP
2161  public CompletableFuture<Void> regionClosedAbnormally(RegionStateNode regionNode) {
2162    RegionState.State state = regionNode.getState();
2163    ServerName regionLocation = regionNode.getRegionLocation();
2164    regionNode.setState(State.ABNORMALLY_CLOSED);
2165    regionNode.setRegionLocation(null);
2166    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
2167    FutureUtils.addListener(future, (r, e) -> {
2168      if (e == null) {
2169        if (regionLocation != null) {
2170          regionNode.setLastHost(regionLocation);
2171          regionStates.removeRegionFromServer(regionLocation, regionNode);
2172        }
2173      } else {
2174        // revert
2175        regionNode.setState(state);
2176        regionNode.setRegionLocation(regionLocation);
2177      }
2178    });
2179    return future;
2180  }
2181
2182  // ============================================================================================
2183  // The above methods can only be called in TransitRegionStateProcedure(and related procedures)
2184  // ============================================================================================
2185
2186  public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
2187    final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
2188    // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
2189    // The parent stays in regionStates until cleared when removed by CatalogJanitor.
2190    // Update its state in regionStates to it shows as offline and split when read
2191    // later figuring what regions are in a table and what are not: see
2192    // regionStates#getRegionsOfTable
2193    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent);
2194    node.setState(State.SPLIT);
2195    final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA);
2196    nodeA.setState(State.SPLITTING_NEW);
2197    final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB);
2198    nodeB.setState(State.SPLITTING_NEW);
2199
2200    TableDescriptor td = master.getTableDescriptors().get(parent.getTable());
2201    // TODO: here we just update the parent region info in meta, to set split and offline to true,
2202    // without changing the one in the region node. This is a bit confusing but the region info
2203    // field in RegionStateNode is not expected to be changed in the current design. Need to find a
2204    // possible way to address this problem, or at least adding more comments about the trick to
2205    // deal with this problem, that when you want to filter out split parent, you need to check both
2206    // the RegionState on whether it is split, and also the region info. If one of them matches then
2207    // it is a split parent. And usually only one of them can match, as after restart, the region
2208    // state will be changed from SPLIT to CLOSED.
2209    regionStateStore.splitRegion(parent, daughterA, daughterB, serverName, td);
2210    if (shouldAssignFavoredNodes(parent)) {
2211      List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList();
2212      getFavoredNodePromoter().generateFavoredNodesForDaughter(onlineServers, parent, daughterA,
2213        daughterB);
2214    }
2215  }
2216
2217  /**
2218   * When called here, the merge has happened. The merged regions have been unassigned and the above
2219   * markRegionClosed has been called on each so they have been disassociated from a hosting Server.
2220   * The merged region will be open after this call. The merged regions are removed from hbase:meta
2221   * below. Later they are deleted from the filesystem by the catalog janitor running against
2222   * hbase:meta. It notices when the merged region no longer holds references to the old regions
2223   * (References are deleted after a compaction rewrites what the Reference points at but not until
2224   * the archiver chore runs, are the References removed).
2225   */
2226  public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
2227    RegionInfo[] mergeParents) throws IOException {
2228    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
2229    node.setState(State.MERGED);
2230    for (RegionInfo ri : mergeParents) {
2231      regionStates.deleteRegion(ri);
2232    }
2233    TableDescriptor td = master.getTableDescriptors().get(child.getTable());
2234    regionStateStore.mergeRegions(child, mergeParents, serverName, td);
2235    if (shouldAssignFavoredNodes(child)) {
2236      getFavoredNodePromoter().generateFavoredNodesForMergedRegion(child, mergeParents);
2237    }
2238  }
2239
2240  /*
2241   * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
2242   * belongs to a non-system table.
2243   */
2244  private boolean shouldAssignFavoredNodes(RegionInfo region) {
2245    return this.shouldAssignRegionsWithFavoredNodes
2246      && FavoredNodesManager.isFavoredNodeApplicable(region);
2247  }
2248
2249  // ============================================================================================
2250  // Assign Queue (Assign/Balance)
2251  // ============================================================================================
  // Region nodes waiting to be handed to the balancer; filled by queueAssign and drained by
  // waitOnAssignQueue.
  private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>();
  // Lock taken around accesses to pendingAssignQueue by the queueing and draining methods below.
  private final ReentrantLock assignQueueLock = new ReentrantLock();
  // Condition of assignQueueLock; signalled by queueAssign and assignQueueSignal, awaited in
  // waitOnAssignQueue.
  private final Condition assignQueueFullCond = assignQueueLock.newCondition();
2255
2256  /**
2257   * Add the assign operation to the assignment queue. The pending assignment operation will be
2258   * processed, and each region will be assigned by a server using the balancer.
2259   */
2260  protected void queueAssign(final RegionStateNode regionNode) {
2261    regionNode.getProcedureEvent().suspend();
2262
2263    // TODO: quick-start for meta and the other sys-tables?
2264    assignQueueLock.lock();
2265    try {
2266      pendingAssignQueue.add(regionNode);
2267      if (
2268        regionNode.isSystemTable() || pendingAssignQueue.size() == 1
2269          || pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize
2270      ) {
2271        assignQueueFullCond.signal();
2272      }
2273    } finally {
2274      assignQueueLock.unlock();
2275    }
2276  }
2277
2278  private void startAssignmentThread() {
2279    assignThread = new Thread(master.getServerName().toShortString()) {
2280      @Override
2281      public void run() {
2282        while (isRunning()) {
2283          processAssignQueue();
2284        }
2285        pendingAssignQueue.clear();
2286      }
2287    };
2288    assignThread.setDaemon(true);
2289    assignThread.start();
2290  }
2291
2292  private void stopAssignmentThread() {
2293    assignQueueSignal();
2294    try {
2295      while (assignThread.isAlive()) {
2296        assignQueueSignal();
2297        assignThread.join(250);
2298      }
2299    } catch (InterruptedException e) {
2300      LOG.warn("join interrupted", e);
2301      Thread.currentThread().interrupt();
2302    }
2303  }
2304
  /**
   * Signals {@code assignQueueFullCond} while holding {@code assignQueueLock}, waking one thread
   * waiting on that condition, if there is one.
   */
  private void assignQueueSignal() {
    assignQueueLock.lock();
    try {
      assignQueueFullCond.signal();
    } finally {
      // always release, even if signal() throws
      assignQueueLock.unlock();
    }
  }
2313
  /**
   * Waits for queued assignments and drains them into a map.
   * <p>
   * While holding {@code assignQueueLock}: if the queue is empty and we are running, blocks on
   * the queue condition; then, if still running, waits on the condition a second time for at most
   * {@code assignDispatchWaitMillis} before copying the queue into a map keyed by
   * {@link RegionInfo} and clearing the queue.
   * @return the drained regions (possibly empty), or {@code null} when no longer running or when
   *         the wait was interrupted
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
  private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() {
    HashMap<RegionInfo, RegionStateNode> regions = null;

    assignQueueLock.lock();
    try {
      // Single await, not in a loop (hence the suppressed findbugs warning): if we wake with an
      // empty queue we simply fall through and may return an empty map.
      if (pendingAssignQueue.isEmpty() && isRunning()) {
        assignQueueFullCond.await();
      }

      if (!isRunning()) {
        return null;
      }
      // Timed wait before draining; queueAssign signals this condition for system table regions
      // and when the queue reaches assignDispatchWaitQueueMaxSize, which ends the wait early.
      // NOTE(review): presumably this delay exists to batch more regions per balancer call --
      // confirm.
      assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS);
      regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size());
      for (RegionStateNode regionNode : pendingAssignQueue) {
        regions.put(regionNode.getRegionInfo(), regionNode);
      }
      pendingAssignQueue.clear();
    } catch (InterruptedException e) {
      // Both awaits come before 'regions' is assigned, so it is still null here and nothing has
      // been removed from the queue.
      LOG.warn("got interrupted ", e);
      Thread.currentThread().interrupt();
    } finally {
      assignQueueLock.unlock();
    }
    return regions;
  }
2341
2342  private void processAssignQueue() {
2343    final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue();
2344    if (regions == null || regions.size() == 0 || !isRunning()) {
2345      return;
2346    }
2347
2348    if (LOG.isTraceEnabled()) {
2349      LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size());
2350    }
2351
2352    // TODO: Optimize balancer. pass a RegionPlan?
2353    final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>();
2354    final List<RegionInfo> userHRIs = new ArrayList<>(regions.size());
2355    // Regions for system tables requiring reassignment
2356    final List<RegionInfo> systemHRIs = new ArrayList<>();
2357    for (RegionStateNode regionStateNode : regions.values()) {
2358      boolean sysTable = regionStateNode.isSystemTable();
2359      final List<RegionInfo> hris = sysTable ? systemHRIs : userHRIs;
2360      if (regionStateNode.getRegionLocation() != null) {
2361        retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
2362      } else {
2363        hris.add(regionStateNode.getRegionInfo());
2364      }
2365    }
2366
2367    // TODO: connect with the listener to invalidate the cache
2368
2369    // TODO use events
2370    List<ServerName> servers = master.getServerManager().createDestinationServersList();
2371    for (int i = 0; servers.size() < 1; ++i) {
2372      // Report every fourth time around this loop; try not to flood log.
2373      if (i % 4 == 0) {
2374        LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions.");
2375      }
2376
2377      if (!isRunning()) {
2378        LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions.");
2379        return;
2380      }
2381      Threads.sleep(250);
2382      servers = master.getServerManager().createDestinationServersList();
2383    }
2384
2385    if (!systemHRIs.isEmpty()) {
2386      // System table regions requiring reassignment are present, get region servers
2387      // not available for system table regions
2388      final List<ServerName> excludeServers = getExcludedServersForSystemTable();
2389      List<ServerName> serversForSysTables =
2390        servers.stream().filter(s -> !excludeServers.contains(s)).collect(Collectors.toList());
2391      if (serversForSysTables.isEmpty()) {
2392        LOG.warn("Filtering old server versions and the excluded produced an empty set; "
2393          + "instead considering all candidate servers!");
2394      }
2395      LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size()
2396        + ", allServersCount=" + servers.size());
2397      processAssignmentPlans(regions, null, systemHRIs,
2398        serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs)
2399          ? servers
2400          : serversForSysTables);
2401    }
2402
2403    processAssignmentPlans(regions, retainMap, userHRIs, servers);
2404  }
2405
2406  private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions,
2407    List<RegionInfo> hirs) {
2408    for (RegionInfo ri : hirs) {
2409      if (
2410        regions.get(ri).getRegionLocation() != null
2411          && regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME)
2412      ) {
2413        return true;
2414      }
2415    }
2416    return false;
2417  }
2418
2419  private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions,
2420    final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris,
2421    final List<ServerName> servers) {
2422    boolean isTraceEnabled = LOG.isTraceEnabled();
2423    if (isTraceEnabled) {
2424      LOG.trace("Available servers count=" + servers.size() + ": " + servers);
2425    }
2426
2427    final LoadBalancer balancer = getBalancer();
2428    // ask the balancer where to place regions
2429    if (retainMap != null && !retainMap.isEmpty()) {
2430      if (isTraceEnabled) {
2431        LOG.trace("retain assign regions=" + retainMap);
2432      }
2433      try {
2434        acceptPlan(regions, balancer.retainAssignment(retainMap, servers));
2435      } catch (IOException e) {
2436        LOG.warn("unable to retain assignment", e);
2437        addToPendingAssignment(regions, retainMap.keySet());
2438      }
2439    }
2440
2441    // TODO: Do we need to split retain and round-robin?
2442    // the retain seems to fallback to round-robin/random if the region is not in the map.
2443    if (!hris.isEmpty()) {
2444      Collections.sort(hris, RegionInfo.COMPARATOR);
2445      if (isTraceEnabled) {
2446        LOG.trace("round robin regions=" + hris);
2447      }
2448      try {
2449        acceptPlan(regions, balancer.roundRobinAssignment(hris, servers));
2450      } catch (IOException e) {
2451        LOG.warn("unable to round-robin assignment", e);
2452        addToPendingAssignment(regions, hris);
2453      }
2454    }
2455  }
2456
  /**
   * Applies an assignment plan: sets the planned server as region location on each region node
   * and wakes the procedure events of the regions that can proceed.
   * @param regions map from region to its state node; each region in the plan is looked up here
   * @param plan    the planned regions per destination server
   * @throws HBaseIOException if {@code plan} is empty
   */
  private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions,
    final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException {
    // Sized for every region in the map; only the first 'evcount' slots get filled below, the
    // rest stay null.
    // NOTE(review): presumably wakeEvents tolerates null entries -- confirm.
    final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()];
    final long st = EnvironmentEdgeManager.currentTime();

    if (plan.isEmpty()) {
      throw new HBaseIOException("unable to compute plans for regions=" + regions.size());
    }

    int evcount = 0;
    for (Map.Entry<ServerName, List<RegionInfo>> entry : plan.entrySet()) {
      final ServerName server = entry.getKey();
      for (RegionInfo hri : entry.getValue()) {
        // NOTE(review): assumes every region in the plan is present in 'regions' -- a missing
        // entry would make regionNode null here; confirm against the balancer contract.
        final RegionStateNode regionNode = regions.get(hri);
        regionNode.setRegionLocation(server);
        if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) {
          // A system table region planned onto the bogus server is not woken; it is put back on
          // the pending queue instead (with the bogus server left as its location).
          assignQueueLock.lock();
          try {
            pendingAssignQueue.add(regionNode);
          } finally {
            assignQueueLock.unlock();
          }
        } else {
          events[evcount++] = regionNode.getProcedureEvent();
        }
      }
    }
    ProcedureEvent.wakeEvents(getProcedureScheduler(), events);

    final long et = EnvironmentEdgeManager.currentTime();
    if (LOG.isTraceEnabled()) {
      // Note: this logs the array length (regions.size()), not the number of events collected.
      LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + StringUtils.humanTimeDiff(et - st));
    }
  }
2491
2492  private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions,
2493    final Collection<RegionInfo> pendingRegions) {
2494    assignQueueLock.lock();
2495    try {
2496      for (RegionInfo hri : pendingRegions) {
2497        pendingAssignQueue.add(regions.get(hri));
2498      }
2499    } finally {
2500      assignQueueLock.unlock();
2501    }
2502  }
2503
2504  /**
2505   * For a given cluster with mixed versions of servers, get a list of servers with lower versions,
2506   * where system table regions should not be assigned to. For system table, we must assign regions
2507   * to a server with highest version. However, we can disable this exclusion using config:
2508   * "hbase.min.version.move.system.tables" if checkForMinVersion is true. Detailed explanation
2509   * available with definition of minVersionToMoveSysTables.
2510   * @return List of Excluded servers for System table regions.
2511   */
2512  public List<ServerName> getExcludedServersForSystemTable() {
2513    // TODO: This should be a cached list kept by the ServerManager rather than calculated on each
2514    // move or system region assign. The RegionServerTracker keeps list of online Servers with
2515    // RegionServerInfo that includes Version.
2516    List<Pair<ServerName, String>> serverList =
2517      master.getServerManager().getOnlineServersList().stream()
2518        .map(s -> new Pair<>(s, master.getRegionServerVersion(s))).collect(Collectors.toList());
2519    if (serverList.isEmpty()) {
2520      return new ArrayList<>();
2521    }
2522    String highestVersion = Collections
2523      .max(serverList, (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond()))
2524      .getSecond();
2525    if (!DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG.equals(minVersionToMoveSysTables)) {
2526      int comparedValue = VersionInfo.compareVersion(minVersionToMoveSysTables, highestVersion);
2527      if (comparedValue > 0) {
2528        return new ArrayList<>();
2529      }
2530    }
2531    return serverList.stream().filter(pair -> !pair.getSecond().equals(highestVersion))
2532      .map(Pair::getFirst).collect(Collectors.toList());
2533  }
2534
2535  MasterServices getMaster() {
2536    return master;
2537  }
2538
2539  /** Returns a snapshot of rsReports */
2540  public Map<ServerName, Set<byte[]>> getRSReports() {
2541    Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>();
2542    synchronized (rsReports) {
2543      rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue()));
2544    }
2545    return rsReportsSnapshot;
2546  }
2547
2548  /**
2549   * Provide regions state count for given table. e.g howmany regions of give table are
2550   * opened/closed/rit etc
2551   * @param tableName TableName
2552   * @return region states count
2553   */
2554  public RegionStatesCount getRegionStatesCount(TableName tableName) {
2555    int openRegionsCount = 0;
2556    int closedRegionCount = 0;
2557    int ritCount = 0;
2558    int splitRegionCount = 0;
2559    int totalRegionCount = 0;
2560    if (!isTableDisabled(tableName)) {
2561      final List<RegionState> states = regionStates.getTableRegionStates(tableName);
2562      for (RegionState regionState : states) {
2563        if (regionState.isOpened()) {
2564          openRegionsCount++;
2565        } else if (regionState.isClosed()) {
2566          closedRegionCount++;
2567        } else if (regionState.isSplit()) {
2568          splitRegionCount++;
2569        }
2570      }
2571      totalRegionCount = states.size();
2572      ritCount = totalRegionCount - openRegionsCount - splitRegionCount;
2573    }
2574    return new RegionStatesCount.RegionStatesCountBuilder().setOpenRegions(openRegionsCount)
2575      .setClosedRegions(closedRegionCount).setSplitRegions(splitRegionCount)
2576      .setRegionsInTransition(ritCount).setTotalRegions(totalRegionCount).build();
2577  }
2578
2579}