/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedRegionSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedStoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;

/**
 * The ServerManager class manages info about region servers.
 * <p>
 * Maintains lists of online and dead servers. Processes the startups, shutdowns, and deaths of
 * region servers.
 * <p>
 * Servers are distinguished in two different ways. A given server has a location, specified by
 * hostname and port, and there can only be one server online at a given location at any time. A
 * server instance is specified by the location (hostname and port) as well as the startcode
 * (timestamp from when the server was started). This is used to differentiate a restarted instance
 * of a given server from the original instance.
 * <p>
 * If a server is known not to be running any more, it is called dead. The dead server needs to be
 * handled by a ServerShutdownHandler. If the handler is not enabled yet, the server can't be
 * handled right away, so it is queued up. After the handler is enabled, the server will be
 * submitted to it for handling. However, the handler may be only partially enabled; if so, the
 * server cannot be fully processed and is queued up for further processing. A server is fully
 * processed only after the handler is fully enabled and has completed the handling.
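 * <p>
 * A rough usage sketch (the active master wires this up internally; the names below are
 * illustrative only, not a prescribed API):
 *
 * <pre>
 * ServerManager serverManager = new ServerManager(masterServices, regionServerList);
 * serverManager.registerListener(listener); // callbacks on serverAdded / serverRemoved
 * ServerName sn = serverManager.regionServerStartup(request, versionNumber, version, ia);
 * serverManager.regionServerReport(sn, serverMetrics);
 * </pre>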
 */
@InterfaceAudience.Private
public class ServerManager {
  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
    "hbase.master.wait.on.regionservers.maxtostart";

  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
    "hbase.master.wait.on.regionservers.mintostart";

  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
    "hbase.master.wait.on.regionservers.timeout";

  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
    "hbase.master.wait.on.regionservers.interval";

  /**
   * See HBASE-20727: if set to true, flushedSequenceIdByRegion and storeFlushedSequenceIdsByRegion
   * will be persisted to HDFS and loaded on master restart to speed up log splitting.
   */
  public static final String PERSIST_FLUSHEDSEQUENCEID =
    "hbase.master.persist.flushedsequenceid.enabled";

  public static final boolean PERSIST_FLUSHEDSEQUENCEID_DEFAULT = true;

  public static final String FLUSHEDSEQUENCEID_FLUSHER_INTERVAL =
    "hbase.master.flushedsequenceid.flusher.interval";

  public static final int FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT = 3 * 60 * 60 * 1000; // 3 hours

  public static final String MAX_CLOCK_SKEW_MS = "hbase.master.maxclockskew";

  private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);

  // Set if we are to shut down the cluster.
  private AtomicBoolean clusterShutdown = new AtomicBoolean(false);

  /**
   * The last flushed sequence id for a region.
   */
  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  private boolean persistFlushedSequenceId = true;
  private volatile boolean isFlushSeqIdPersistInProgress = false;
  /** File on HDFS to store the last flushed sequence id of regions */
  private static final String LAST_FLUSHED_SEQ_ID_FILE = ".lastflushedseqids";
  private FlushedSequenceIdFlusher flushedSeqIdFlusher;

  /**
   * The last flushed sequence id for a store in a region.
   */
  private final ConcurrentNavigableMap<byte[],
    ConcurrentNavigableMap<byte[], Long>> storeFlushedSequenceIdsByRegion =
      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /** Map of registered servers to their current load */
  private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
    new ConcurrentSkipListMap<>();

  /** List of region servers that should not get any more new regions. */
  private final ArrayList<ServerName> drainingServers = new ArrayList<>();

  private final MasterServices master;
  private final RegionServerList storage;

  private final DeadServer deadservers = new DeadServer();

  private final long maxSkew;
  private final long warningSkew;

  /** Listeners that are called on server events. */
  private List<ServerListener> listeners = new CopyOnWriteArrayList<>();

  /**
   * Constructor.
   */
  public ServerManager(final MasterServices master, RegionServerList storage) {
    this.master = master;
    this.storage = storage;
    Configuration c = master.getConfiguration();
    maxSkew = c.getLong(MAX_CLOCK_SKEW_MS, 30000);
    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
    persistFlushedSequenceId =
      c.getBoolean(PERSIST_FLUSHEDSEQUENCEID, PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
  }

  /**
   * Add the listener to the notification list.
   * @param listener The ServerListener to register
   */
  public void registerListener(final ServerListener listener) {
    this.listeners.add(listener);
  }

  /**
   * Remove the listener from the notification list.
   * @param listener The ServerListener to unregister
   */
  public boolean unregisterListener(final ServerListener listener) {
    return this.listeners.remove(listener);
  }

  /**
   * Let the server manager know a new regionserver has come online
   * @param request       the startup request
   * @param versionNumber the version number of the new regionserver
   * @param version       the version of the new regionserver, could contain strings like "SNAPSHOT"
   * @param ia            the InetAddress from which request is received
   * @return The ServerName we know this server as.
   */
  ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
    String version, InetAddress ia) throws IOException {
    // Test for case where we get a region startup message from a regionserver
    // that has been quickly restarted but whose znode expiration handler has
    // not yet run, or from a server whose failure we are currently processing.
    // Test its host+port combo is present in serverAddressToServerInfo. If it
    // is, reject the server and trigger its expiration. The next time it comes
    // in, it should have been removed from serverAddressToServerInfo and queued
    // for processing by ProcessServerShutdown.

    final String hostname =
      request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : ia.getHostName();
    ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());
    checkClockSkew(sn, request.getServerCurrentTime());
    checkIsDead(sn, "STARTUP");
    if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
      LOG.warn(
        "THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn);
    }
    storage.started(sn);
    return sn;
  }

  /**
   * Updates last flushed sequence ids for the regions on server {@code sn}.
   */
  private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
    for (Entry<byte[], RegionMetrics> entry : hsl.getRegionMetrics().entrySet()) {
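      // Region metrics are keyed by the full region name; our maps are keyed by the encoded
      // region name, so convert before looking anything up.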
      byte[] encodedRegionName = Bytes.toBytes(RegionInfo.encodeRegionName(entry.getKey()));
      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
      long l = entry.getValue().getCompletedSequenceId();
      // Don't let smaller sequence ids override greater sequence ids.
      if (LOG.isTraceEnabled()) {
        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue
          + ", completeSequenceId=" + l);
      }
      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
        flushedSequenceIdByRegion.put(encodedRegionName, l);
      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id (" + l
          + ") that is less than the previous last flushed sequence id (" + existingValue
          + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring.");
      }
      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
        computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
          () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
      for (Entry<byte[], Long> storeSeqId : entry.getValue().getStoreSequenceId().entrySet()) {
        byte[] family = storeSeqId.getKey();
        existingValue = storeFlushedSequenceId.get(family);
        l = storeSeqId.getValue();
        if (LOG.isTraceEnabled()) {
          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family)
            + ", existingValue=" + existingValue + ", completeSequenceId=" + l);
        }
        // Don't let smaller sequence ids override greater sequence ids.
        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
          storeFlushedSequenceId.put(family, l);
        }
      }
    }
  }

  public void regionServerReport(ServerName sn, ServerMetrics sl) throws YouAreDeadException {
    checkIsDead(sn, "REPORT");
    if (null == this.onlineServers.replace(sn, sl)) {
      // Already have this host+port combo and it's just a different start code?
      // Just let the server in. Presume master joining a running cluster.
      // recordNewServer is what happens at the end of reportServerStartup.
      // The only thing we are skipping is passing back to the regionserver
      // the ServerName to use. Here we presume a master has already done
      // that so we'll press on with whatever it gave us for ServerName.
      if (!checkAndRecordNewServer(sn, sl)) {
        LOG.info("RegionServerReport ignored, could not record the server: " + sn);
        return; // Not recorded, so no need to move on
      }
    }
    updateLastFlushedSequenceIds(sn, sl);
  }

  /**
   * Check if a server with the same host and port already exists; if not, or if the existing one
   * has a smaller start code, record the new server.
   * @param serverName the server to check and record
   * @param sl         the server load on the server
   * @return true if the server is recorded, otherwise, false
   */
  boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
    ServerName existingServer = null;
    synchronized (this.onlineServers) {
      existingServer = findServerWithSameHostnamePortWithLock(serverName);
      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
        LOG.info("Server serverName=" + serverName + " rejected; we already have "
          + existingServer.toString() + " registered with same hostname and port");
        return false;
      }
      recordNewServerWithLock(serverName, sl);
    }

    // Tell our listeners that a server was added
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverAdded(serverName);
      }
    }

    // Note that we assume that same ts means same server, and don't expire in that case.
    // TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
    if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
      LOG.info("Triggering server recovery; existingServer " + existingServer
        + " looks stale, new server:" + serverName);
      expireServer(existingServer);
    }
    return true;
  }

  /**
   * Find out the region servers that crashed between the crash of the previous master instance and
   * the current master instance, and schedule SCPs for them.
   * <p/>
   * Since the {@code RegionServerTracker} has already helped us to construct the online servers set
   * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir}
   * to find out whether there are servers which are already dead.
   * <p/>
   * Must be called inside the initialization method of {@code RegionServerTracker} to avoid
   * concurrency issues.
   * @param deadServersFromPE     the region servers which already have a SCP associated.
   * @param liveServersFromWALDir the live region servers from wal directory.
   */
  void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
    Set<ServerName> liveServersFromWALDir) {
    deadServersFromPE.forEach(deadservers::putIfAbsent);
    liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
      .forEach(this::expireServer);
  }

  /**
   * Checks the clock skew between the server and the master. If the clock skew exceeds the
   * configured max, it will throw an exception; if it exceeds the configured warning threshold, it
   * will log a warning but start normally.
   * @param serverName        the incoming server's name
   * @param serverCurrentTime the current time reported by the incoming server
   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
   */
  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
    throws ClockOutOfSyncException {
    long skew = Math.abs(EnvironmentEdgeManager.currentTime() - serverCurrentTime);
    if (skew > maxSkew) {
      String message = "Server " + serverName + " has been "
        + "rejected; Reported time is too far out of sync with master.  " + "Time difference of "
        + skew + "ms > max allowed of " + maxSkew + "ms";
      LOG.warn(message);
      throw new ClockOutOfSyncException(message);
    } else if (skew > warningSkew) {
      String message = "Reported time for server " + serverName + " is out of sync with master "
        + "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " + "error threshold is "
        + maxSkew + "ms)";
      LOG.warn(message);
    }
  }

  /**
   * Called when RegionServer first reports in for duty and thereafter each time it heartbeats to
   * make sure it has not been figured for dead. If this server is on the dead list, reject it with
   * a YouAreDeadException. If it was dead but came back with a new start code, remove the old
   * entry from the dead list.
   * @param what STARTUP or REPORT
   */
  private void checkIsDead(final ServerName serverName, final String what)
    throws YouAreDeadException {
    if (this.deadservers.isDeadServer(serverName)) {
      // Exact match: host name, port and start code all match with existing one of the
      // dead servers. So, this server must be dead. Tell it to kill itself.
      String message =
        "Server " + what + " rejected; currently processing " + serverName + " as dead server";
      LOG.debug(message);
      throw new YouAreDeadException(message);
    }
    // Remove any dead server with the same hostname and port as the newly checking-in rs, after
    // master initialization. See HBASE-5916 for more information.
    if (
      (this.master == null || this.master.isInitialized())
        && this.deadservers.cleanPreviousInstance(serverName)
    ) {
      // This server has now become alive after we marked it as dead.
      // We removed its previous entry from the dead list to reflect it.
      LOG.debug("{} {} came back up, removed it from the dead servers list", what, serverName);
    }
  }

  /**
   * Assumes onlineServers is locked.
   * @return ServerName with matching hostname and port.
   */
  private ServerName findServerWithSameHostnamePortWithLock(final ServerName serverName) {
    ServerName end =
      ServerName.valueOf(serverName.getHostname(), serverName.getPort(), Long.MAX_VALUE);

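    // ServerName sorts by hostname, then port, then startcode, so an entry with Long.MAX_VALUE as
    // the startcode sorts after every real instance at this host:port; lowerKey therefore returns
    // the registered instance with the largest startcode for this address, if one exists.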
    ServerName r = onlineServers.lowerKey(end);
    if (r != null) {
      if (ServerName.isSameAddress(r, serverName)) {
        return r;
      }
    }
    return null;
  }

  /**
   * Adds to the onlineServers list. onlineServers should be locked.
   * @param serverName The remote server's name.
   */
  void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
    LOG.info("Registering regionserver=" + serverName);
    this.onlineServers.put(serverName, sl);
  }

  public ConcurrentNavigableMap<byte[], Long> getFlushedSequenceIdByRegion() {
    return flushedSequenceIdByRegion;
  }

  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
    Map<byte[], Long> storeFlushedSequenceId =
      storeFlushedSequenceIdsByRegion.get(encodedRegionName);
    if (storeFlushedSequenceId != null) {
      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
          .setFamilyName(UnsafeByteOperations.unsafeWrap(entry.getKey()))
          .setSequenceId(entry.getValue().longValue()).build());
      }
    }
    return builder.build();
  }

  /**
   * @param serverName the server to look up
   * @return ServerMetrics if serverName is known else null
   */
  public ServerMetrics getLoad(final ServerName serverName) {
    return this.onlineServers.get(serverName);
  }

  /**
   * Compute the average load across all region servers. Currently, this uses a very naive
   * computation - just uses the number of regions being served, ignoring stats about number of
   * requests.
   * @return the average load
   */
  public double getAverageLoad() {
    int totalLoad = 0;
    int numServers = 0;
    for (ServerMetrics sl : this.onlineServers.values()) {
      numServers++;
      totalLoad += sl.getRegionMetrics().size();
    }
    return numServers == 0 ? 0 : (double) totalLoad / (double) numServers;
  }

  /** Returns the count of active regionservers */
  public int countOfRegionServers() {
    // Presumes onlineServers is a concurrent map
    return this.onlineServers.size();
  }

  /** Returns a read-only map of servers to serverinfo */
  public Map<ServerName, ServerMetrics> getOnlineServers() {
    // Presumption is that iterating the returned Map is OK.
    synchronized (this.onlineServers) {
      return Collections.unmodifiableMap(this.onlineServers);
    }
  }

  public DeadServer getDeadServers() {
    return this.deadservers;
  }

  /**
   * Checks if any dead servers are currently being processed.
   * @return true if any RS are being processed as dead, false if not
   */
  public boolean areDeadServersInProgress() throws IOException {
    return master.getProcedures().stream()
      .anyMatch(p -> !p.isFinished() && p instanceof ServerCrashProcedure);
  }

  void letRegionServersShutdown() {
    long previousLogTime = 0;
    ServerName sn = master.getServerName();
    ZKWatcher zkw = master.getZooKeeper();
    int onlineServersCt;
    while ((onlineServersCt = onlineServers.size()) > 0) {
      if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
        Set<ServerName> remainingServers = onlineServers.keySet();
        synchronized (onlineServers) {
          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
            // Master will delete itself later.
            return;
          }
        }
        StringBuilder sb = new StringBuilder();
        // It's ok here to not sync on onlineServers - merely logging
        for (ServerName key : remainingServers) {
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(key);
        }
        LOG.info("Waiting on regionserver(s) " + sb.toString());
        previousLogTime = EnvironmentEdgeManager.currentTime();
      }

      try {
        List<String> servers = getRegionServersInZK(zkw);
        if (
          servers == null || servers.isEmpty()
            || (servers.size() == 1 && servers.contains(sn.toString()))
        ) {
          LOG.info("ZK shows there is only the master self online, exiting now");
          // Master could have lost some ZK events, no need to wait more.
          break;
        }
      } catch (KeeperException ke) {
        LOG.warn("Failed to list regionservers", ke);
        // ZK is malfunctioning, don't hang here
        break;
      }
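      // Wait briefly for the online count to change; moveFromOnlineToDeadServers notifies on
      // onlineServers whenever a server is removed, which wakes this wait early.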
      synchronized (onlineServers) {
        try {
          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
        } catch (InterruptedException ignored) {
          // continue
        }
      }
    }
  }

  private List<String> getRegionServersInZK(final ZKWatcher zkw) throws KeeperException {
    return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
  }

  /**
   * Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
   * @return pid if we queued a ServerCrashProcedure else {@link Procedure#NO_PROC_ID} if we did not
   *         (could happen for many reasons including the fact that it's this server that is going
   *         down or we already have queued an SCP for this server or SCP processing is currently
   *         disabled because we are in startup phase).
   */
  // Redo test so we can make this protected.
  public synchronized long expireServer(final ServerName serverName) {
    return expireServer(serverName, false);
  }

  synchronized long expireServer(final ServerName serverName, boolean force) {
    // THIS server is going down... can't handle our own expiration.
    if (serverName.equals(master.getServerName())) {
      if (!(master.isAborted() || master.isStopped())) {
        master.stop("We lost our znode?");
      }
      return Procedure.NO_PROC_ID;
    }
    if (this.deadservers.isDeadServer(serverName)) {
      LOG.warn("Expiration called on {} but already in DeadServer", serverName);
      return Procedure.NO_PROC_ID;
    }
    moveFromOnlineToDeadServers(serverName);

    // If server is in draining mode, remove corresponding znode
    // In some tests, the mocked HM may not have ZK Instance, hence null check
    if (master.getZooKeeper() != null) {
      String drainingZnode = ZNodePaths
        .joinZNode(master.getZooKeeper().getZNodePaths().drainingZNode, serverName.getServerName());
      try {
        ZKUtil.deleteNodeFailSilent(master.getZooKeeper(), drainingZnode);
      } catch (KeeperException e) {
        LOG.warn(
          "Error deleting the draining znode for stopping server " + serverName.getServerName(), e);
      }
    }

    // If cluster is going down, yes, servers are going to be expiring; don't
    // process as a dead server
    if (isClusterShutdown()) {
      LOG.info("Cluster shutdown set; " + serverName + " expired; onlineServers="
        + this.onlineServers.size());
      if (this.onlineServers.isEmpty()) {
        master.stop("Cluster shutdown set; onlineServer=0");
      }
      return Procedure.NO_PROC_ID;
    }
    LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
    long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
    storage.expired(serverName);
    // Tell our listeners that a server was removed
    if (!this.listeners.isEmpty()) {
      this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
    }
    // trigger a persist of flushedSeqId
    if (flushedSeqIdFlusher != null) {
      flushedSeqIdFlusher.triggerNow();
    }
    return pid;
  }

  /**
   * Called when server has expired.
   */
  // Locking in this class needs cleanup.
  public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
    synchronized (this.onlineServers) {
      boolean online = this.onlineServers.containsKey(sn);
      if (online) {
        // Remove the server from the known servers lists and update load info BUT
        // add to deadservers first; do this so it'll show in dead servers list if
        // not in online servers list.
        this.deadservers.putIfAbsent(sn);
        this.onlineServers.remove(sn);
        onlineServers.notifyAll();
      } else {
        // If not online, that is odd but may happen with 'Unknown Servers' -- where meta
        // has references to servers not online nor in dead servers list. If
        // 'Unknown Server', don't add to DeadServers else will be there forever.
        LOG.trace("Expiration of {} but server not online", sn);
      }
    }
  }

  /*
   * Remove the server from the drain list.
   */
  public synchronized boolean removeServerFromDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online. ServerName is of the form:
    // <hostname> , <port> , <startcode>

    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. "
        + "Removing from draining list anyway, as requested.");
    }
    // Remove the server from the draining servers lists.
    return this.drainingServers.remove(sn);
  }

  /**
   * Add the server to the drain list.
   * @return True if the server is added or the server is already on the drain list.
   */
  public synchronized boolean addServerToDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online. ServerName is of the form:
    // <hostname> , <port> , <startcode>

    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. "
        + "Ignoring request to add it to draining list.");
      return false;
    }
    // Add the server to the draining servers lists, if it's not already in
    // it.
    if (this.drainingServers.contains(sn)) {
      LOG.warn("Server " + sn + " is already in the draining server list. "
        + "Ignoring request to add it again.");
      return true;
    }
    LOG.info("Server " + sn + " added to draining server list.");
    return this.drainingServers.add(sn);
  }

  /**
   * Contacts a region server and waits up to timeout ms to close the region. This bypasses the
   * active hmaster. Pass -1 as timeout if you do not want to wait on result.
   */
  public static void closeRegionSilentlyAndWait(AsyncClusterConnection connection,
    ServerName server, RegionInfo region, long timeout) throws IOException, InterruptedException {
    AsyncRegionServerAdmin admin = connection.getRegionServerAdmin(server);
    try {
      FutureUtils.get(
        admin.closeRegion(ProtobufUtil.buildCloseRegionRequest(server, region.getRegionName())));
    } catch (IOException e) {
      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
    }
    if (timeout < 0) {
      return;
    }
    long expiration = timeout + EnvironmentEdgeManager.currentTime();
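    // Poll the region server about once a second until it no longer serves the region (or reports
    // NotServingRegionException), or until the timeout expires.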
    while (EnvironmentEdgeManager.currentTime() < expiration) {
      try {
        RegionInfo rsRegion = ProtobufUtil.toRegionInfo(FutureUtils
          .get(
            admin.getRegionInfo(RequestConverter.buildGetRegionInfoRequest(region.getRegionName())))
          .getRegionInfo());
        if (rsRegion == null) {
          return;
        }
      } catch (IOException ioe) {
        if (
          ioe instanceof NotServingRegionException
            || (ioe instanceof RemoteWithExtrasException && ((RemoteWithExtrasException) ioe)
              .unwrapRemoteException() instanceof NotServingRegionException)
        ) {
          // no need to retry again
          return;
        }
        LOG.warn("Exception when retrieving regioninfo from: " + region.getRegionNameAsString(),
          ioe);
      }
      Thread.sleep(1000);
    }
    throw new IOException("Region " + region + " failed to close within" + " timeout " + timeout);
  }

  /**
   * Calculate min necessary to start. This is not an absolute. It is just a friction that will
   * cause us to hang around a bit longer waiting on RegionServers to check in.
   */
  private int getMinToStart() {
    if (master.isInMaintenanceMode()) {
      // If in maintenance mode, then the in-process region server hosting meta will be the only
      // server available
      return 1;
    }

    int minimumRequired = 1;
    int minToStart = this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    // Ensure we are never less than minimumRequired else stuff won't work.
    return Math.max(minToStart, minimumRequired);
  }

  /**
   * Wait for the region servers to report in. We will wait until one of these conditions is met:
   * <ul>
   * <li>the master is stopped</li>
   * <li>the 'hbase.master.wait.on.regionservers.maxtostart' number of region servers is
   * reached</li>
   * <li>the 'hbase.master.wait.on.regionservers.mintostart' is reached AND there have been no new
   * region servers in for 'hbase.master.wait.on.regionservers.interval' time AND the
   * 'hbase.master.wait.on.regionservers.timeout' is reached</li>
   * </ul>
   */
  public void waitForRegionServers(MonitoredTask status) throws InterruptedException {
    final long interval =
      this.master.getConfiguration().getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
    final long timeout =
      this.master.getConfiguration().getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
    // Min is not an absolute; just a friction making us wait longer on server checkin.
    int minToStart = getMinToStart();
    int maxToStart =
      this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
    if (maxToStart < minToStart) {
      LOG.warn(String.format("The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
        WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart, WAIT_ON_REGIONSERVERS_MINTOSTART,
        minToStart));
      maxToStart = Integer.MAX_VALUE;
    }

    long now = EnvironmentEdgeManager.currentTime();
    final long startTime = now;
    long slept = 0;
    long lastLogTime = 0;
    long lastCountChange = startTime;
    int count = countOfRegionServers();
    int oldCount = 0;
    // This while test is a little hard to read. We try to comment it in below but in essence:
    // Wait if Master is not stopped and the number of regionservers that have checked-in is
    // less than the maxToStart. Both of these conditions will be true near universally.
    // Next, we will keep cycling if ANY of the following three conditions are true:
    // 1. The time since a regionserver registered is < interval (means servers are actively
    // checking in).
    // 2. We are under the total timeout.
    // 3. The count of servers is < minimum.
    for (ServerListener listener : this.listeners) {
      listener.waiting();
    }
    while (
      !this.master.isStopped() && !isClusterShutdown() && count < maxToStart
        && ((lastCountChange + interval) > now || timeout > slept || count < minToStart)
    ) {
      // Log some info at every interval time or if there is a change
      if (oldCount != count || lastLogTime + interval < now) {
        lastLogTime = now;
        String msg =
          "Waiting on regionserver count=" + count + "; waited=" + slept + "ms, expecting min="
            + minToStart + " server(s), max=" + getStrForMax(maxToStart) + " server(s), "
            + "timeout=" + timeout + "ms, lastChange=" + (now - lastCountChange) + "ms";
        LOG.info(msg);
        status.setStatus(msg);
      }

      // We sleep for some time
      final long sleepTime = 50;
      Thread.sleep(sleepTime);
      now = EnvironmentEdgeManager.currentTime();
      slept = now - startTime;

      oldCount = count;
      count = countOfRegionServers();
      if (count != oldCount) {
        lastCountChange = now;
      }
    }
    // Did we exit the loop because cluster is going down?
    if (isClusterShutdown()) {
      this.master.stop("Cluster shutdown");
    }
    LOG.info("Finished waiting on RegionServer count=" + count + "; waited=" + slept + "ms,"
      + " expected min=" + minToStart + " server(s), max=" + getStrForMax(maxToStart)
      + " server(s)," + " master is " + (this.master.isStopped() ? "stopped." : "running"));
  }

  private String getStrForMax(final int max) {
    return max == Integer.MAX_VALUE ? "NO_LIMIT" : Integer.toString(max);
  }

  /** Returns a copy of the internal list of online servers. */
  public List<ServerName> getOnlineServersList() {
    // TODO: optimize the load balancer call so we don't need to make a new list
    // TODO: FIX. THIS IS POPULAR CALL.
    return new ArrayList<>(this.onlineServers.keySet());
  }

  /**
   * @param keys                 The target server names
   * @param idleServerPredicator Evaluates the server on the given load
   * @return A copy of the internal list of online servers matched by the predicator
   */
  public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> keys,
    Predicate<ServerMetrics> idleServerPredicator) {
    List<ServerName> names = new ArrayList<>();
    if (keys != null && idleServerPredicator != null) {
      keys.forEach(name -> {
        ServerMetrics load = onlineServers.get(name);
        if (load != null) {
          if (idleServerPredicator.test(load)) {
            names.add(name);
          }
        }
      });
    }
    return names;
  }

  /** Returns a copy of the internal list of draining servers. */
  public List<ServerName> getDrainingServersList() {
    return new ArrayList<>(this.drainingServers);
  }

  public boolean isServerOnline(ServerName serverName) {
    return serverName != null && onlineServers.containsKey(serverName);
  }

  public enum ServerLiveState {
    LIVE,
    DEAD,
    UNKNOWN
  }

  /** Returns whether the server is online, dead, or unknown. */
  public synchronized ServerLiveState isServerKnownAndOnline(ServerName serverName) {
    return onlineServers.containsKey(serverName)
      ? ServerLiveState.LIVE
      : (deadservers.isDeadServer(serverName) ? ServerLiveState.DEAD : ServerLiveState.UNKNOWN);
  }

  /**
   * Check if a server is known to be dead. A server can be online, or known to be dead, or unknown
   * to this manager (i.e., not online, not known to be dead either; it is simply not tracked by the
   * master any more, for example, a very old previous instance).
   */
  public synchronized boolean isServerDead(ServerName serverName) {
    return serverName == null || deadservers.isDeadServer(serverName);
  }

  /**
   * Check if a server is unknown. A server can be online, or known to be dead, or unknown to this
   * manager (i.e., not online, not known to be dead either; it is simply not tracked by the master
   * any more, for example, a very old previous instance).
   */
  public boolean isServerUnknown(ServerName serverName) {
    return serverName == null
      || (!onlineServers.containsKey(serverName) && !deadservers.isDeadServer(serverName));
  }

  public void shutdownCluster() {
    String statusStr = "Cluster shutdown requested of master=" + this.master.getServerName();
    LOG.info(statusStr);
    this.clusterShutdown.set(true);
    if (onlineServers.isEmpty()) {
      // we do not synchronize here so this may cause a double stop, but not a big deal
      master.stop("OnlineServer=0 right after cluster shutdown set");
    }
  }

  public boolean isClusterShutdown() {
    return this.clusterShutdown.get();
  }

  /**
   * Start the chore in ServerManager.
   */
  public void startChore() {
    Configuration c = master.getConfiguration();
    if (persistFlushedSequenceId) {
      new Thread(() -> {
        // After AM#loadMeta, RegionStates should be loaded; regions that were deleted by
        // drop/split/merge are not added back to RegionStates, so
        // removeDeletedRegionFromLoadedFlushedSequenceIds can safely drop the flushed sequence
        // ids loaded for those deleted regions.
        removeDeletedRegionFromLoadedFlushedSequenceIds();
      }, "RemoveDeletedRegionSyncThread").start();
      int flushPeriod =
        c.getInt(FLUSHEDSEQUENCEID_FLUSHER_INTERVAL, FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT);
      flushedSeqIdFlusher = new FlushedSequenceIdFlusher("FlushedSequenceIdFlusher", flushPeriod);
      master.getChoreService().scheduleChore(flushedSeqIdFlusher);
    }
  }

  /**
   * Stop the ServerManager.
   */
  public void stop() {
    if (flushedSeqIdFlusher != null) {
      flushedSeqIdFlusher.shutdown();
    }
    if (persistFlushedSequenceId) {
      try {
        persistRegionLastFlushedSequenceIds();
      } catch (IOException e) {
        LOG.warn("Failed to persist last flushed sequence id of regions" + " to file system", e);
      }
    }
  }

  /**
   * Creates a list of possible destinations for a region. It contains the online servers, but not
   * the draining or dying servers.
   * @param serversToExclude can be null if there is no server to exclude
   */
  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude) {
    Set<ServerName> destServers = new HashSet<>();
    onlineServers.forEach((sn, sm) -> {
      if (sm.getLastReportTimestamp() > 0) {
        // This means we have already called regionServerReport at least once, then let's include
        // this server for region assignment. This is an optimization to avoid assigning regions to
        // an uninitialized server. See HBASE-25032 for more details.
        destServers.add(sn);
      }
    });

    if (serversToExclude != null) {
      destServers.removeAll(serversToExclude);
    }

    // Loop through the draining server list and remove them from the server list
    final List<ServerName> drainingServersCopy = getDrainingServersList();
    destServers.removeAll(drainingServersCopy);

    return new ArrayList<>(destServers);
  }

  /**
   * Calls {@link #createDestinationServersList} without servers to exclude.
   */
  public List<ServerName> createDestinationServersList() {
    return createDestinationServersList(null);
  }

  /**
   * Clear any dead server with the same host name and port as any online server.
   */
  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
    for (ServerName serverName : getOnlineServersList()) {
      deadservers.cleanAllPreviousInstances(serverName);
    }
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegion(final RegionInfo regionInfo) {
    final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
    storeFlushedSequenceIdsByRegion.remove(encodedName);
    flushedSequenceIdByRegion.remove(encodedName);
  }

  public boolean isRegionInServerManagerStates(final RegionInfo hri) {
    final byte[] encodedName = hri.getEncodedNameAsBytes();
    return (storeFlushedSequenceIdsByRegion.containsKey(encodedName)
      || flushedSequenceIdByRegion.containsKey(encodedName));
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegions(final List<RegionInfo> regions) {
    for (RegionInfo hri : regions) {
      removeRegion(hri);
    }
  }

  /**
   * May return 0 when server is not online.
   */
  public int getVersionNumber(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
  }

  /**
   * May return "0.0.0" when server is not online.
   */
  public String getVersion(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersion() : "0.0.0";
  }

  public int getInfoPort(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
  }

  /**
   * Persist the last flushed sequence id of each region to HDFS.
   * @throws IOException if persisting to HDFS fails
   */
  private void persistRegionLastFlushedSequenceIds() throws IOException {
    if (isFlushSeqIdPersistInProgress) {
      return;
    }
    isFlushSeqIdPersistInProgress = true;
    try {
      Configuration conf = master.getConfiguration();
      Path rootDir = CommonFSUtils.getRootDir(conf);
      Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
      FileSystem fs = FileSystem.get(conf);
      if (fs.exists(lastFlushedSeqIdPath)) {
        LOG.info("Rewriting .lastflushedseqids file at: " + lastFlushedSeqIdPath);
        if (!fs.delete(lastFlushedSeqIdPath, false)) {
          throw new IOException("Unable to remove existing " + lastFlushedSeqIdPath);
        }
      } else {
        LOG.info("Writing .lastflushedseqids file at: " + lastFlushedSeqIdPath);
      }
      FSDataOutputStream out = fs.create(lastFlushedSeqIdPath);
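      // The file holds a single length-delimited FlushedSequenceId protobuf message containing,
      // for each region, its last flushed sequence id plus the per-store (column family) ids.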
      FlushedSequenceId.Builder flushedSequenceIdBuilder = FlushedSequenceId.newBuilder();
      try {
        for (Entry<byte[], Long> entry : flushedSequenceIdByRegion.entrySet()) {
          FlushedRegionSequenceId.Builder flushedRegionSequenceIdBuilder =
            FlushedRegionSequenceId.newBuilder();
          flushedRegionSequenceIdBuilder.setRegionEncodedName(ByteString.copyFrom(entry.getKey()));
          flushedRegionSequenceIdBuilder.setSeqId(entry.getValue());
          ConcurrentNavigableMap<byte[], Long> storeSeqIds =
            storeFlushedSequenceIdsByRegion.get(entry.getKey());
          if (storeSeqIds != null) {
            for (Entry<byte[], Long> store : storeSeqIds.entrySet()) {
              FlushedStoreSequenceId.Builder flushedStoreSequenceIdBuilder =
                FlushedStoreSequenceId.newBuilder();
              flushedStoreSequenceIdBuilder.setFamily(ByteString.copyFrom(store.getKey()));
              flushedStoreSequenceIdBuilder.setSeqId(store.getValue());
              flushedRegionSequenceIdBuilder.addStores(flushedStoreSequenceIdBuilder);
            }
          }
          flushedSequenceIdBuilder.addRegionSequenceId(flushedRegionSequenceIdBuilder);
        }
        flushedSequenceIdBuilder.build().writeDelimitedTo(out);
      } finally {
        if (out != null) {
          out.close();
        }
      }
    } finally {
      isFlushSeqIdPersistInProgress = false;
    }
  }

  /**
   * Load the last flushed sequence id of each region from HDFS, if persisted.
   */
  public void loadLastFlushedSequenceIds() throws IOException {
    if (!persistFlushedSequenceId) {
      return;
    }
    Configuration conf = master.getConfiguration();
    Path rootDir = CommonFSUtils.getRootDir(conf);
    Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
    FileSystem fs = FileSystem.get(conf);
    if (!fs.exists(lastFlushedSeqIdPath)) {
      LOG.info("No .lastflushedseqids found at " + lastFlushedSeqIdPath
        + "; will record last flushed sequence ids"
        + " for regions from regionserver reports all over again");
      return;
    } else {
      LOG.info("begin to load .lastflushedseqids at " + lastFlushedSeqIdPath);
    }
    FSDataInputStream in = fs.open(lastFlushedSeqIdPath);
    try {
      FlushedSequenceId flushedSequenceId = FlushedSequenceId.parseDelimitedFrom(in);
      if (flushedSequenceId == null) {
        LOG.info(".lastflushedseqids found at {} is empty", lastFlushedSeqIdPath);
        return;
      }
      for (FlushedRegionSequenceId flushedRegionSequenceId : flushedSequenceId
        .getRegionSequenceIdList()) {
        byte[] encodedRegionName = flushedRegionSequenceId.getRegionEncodedName().toByteArray();
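        // putIfAbsent: if a live region server has already reported a flushed sequence id for
        // this region since startup, keep the reported value and use the persisted one only as a
        // fallback for regions we have not heard about yet.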
        flushedSequenceIdByRegion.putIfAbsent(encodedRegionName,
          flushedRegionSequenceId.getSeqId());
        if (
          flushedRegionSequenceId.getStoresList() != null
            && flushedRegionSequenceId.getStoresList().size() != 0
        ) {
          ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
            computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
              () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
          for (FlushedStoreSequenceId flushedStoreSequenceId : flushedRegionSequenceId
            .getStoresList()) {
            storeFlushedSequenceId.put(flushedStoreSequenceId.getFamily().toByteArray(),
              flushedStoreSequenceId.getSeqId());
          }
        }
      }
    } finally {
      in.close();
    }
  }

  /**
   * Regions may have been removed between the latest persist of FlushedSequenceIds and a master
   * abort. So after loading FlushedSequenceIds from file, and after meta is loaded, we need to
   * remove the deleted regions according to RegionStates.
   */
  public void removeDeletedRegionFromLoadedFlushedSequenceIds() {
    RegionStates regionStates = master.getAssignmentManager().getRegionStates();
    Iterator<byte[]> it = flushedSequenceIdByRegion.keySet().iterator();
    while (it.hasNext()) {
      byte[] regionEncodedName = it.next();
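      // No RegionState for this encoded name means the region no longer exists (dropped, or
      // replaced by a split/merge), so its persisted flushed sequence ids can be discarded.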
      if (regionStates.getRegionState(Bytes.toStringBinary(regionEncodedName)) == null) {
        it.remove();
        storeFlushedSequenceIdsByRegion.remove(regionEncodedName);
      }
    }
  }

  private class FlushedSequenceIdFlusher extends ScheduledChore {

    public FlushedSequenceIdFlusher(String name, int p) {
      super(name, master, p, 60 * 1000); // delay one minute before the first run
    }

    @Override
    protected void chore() {
      try {
        persistRegionLastFlushedSequenceIds();
      } catch (IOException e) {
        LOG.debug("Failed to persist last flushed sequence id of regions" + " to file system", e);
      }
    }
  }
}