001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
021
022import java.io.IOException;
023import java.net.InetAddress;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Map.Entry;
031import java.util.Set;
032import java.util.concurrent.ConcurrentNavigableMap;
033import java.util.concurrent.ConcurrentSkipListMap;
034import java.util.concurrent.CopyOnWriteArrayList;
035import java.util.concurrent.atomic.AtomicBoolean;
036import java.util.function.Predicate;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FSDataInputStream;
039import org.apache.hadoop.fs.FSDataOutputStream;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.ClockOutOfSyncException;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.NotServingRegionException;
045import org.apache.hadoop.hbase.RegionMetrics;
046import org.apache.hadoop.hbase.ScheduledChore;
047import org.apache.hadoop.hbase.ServerMetrics;
048import org.apache.hadoop.hbase.ServerMetricsBuilder;
049import org.apache.hadoop.hbase.ServerName;
050import org.apache.hadoop.hbase.YouAreDeadException;
051import org.apache.hadoop.hbase.client.AsyncClusterConnection;
052import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
053import org.apache.hadoop.hbase.client.RegionInfo;
054import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
055import org.apache.hadoop.hbase.master.assignment.RegionStates;
056import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
057import org.apache.hadoop.hbase.monitoring.MonitoredTask;
058import org.apache.hadoop.hbase.procedure2.Procedure;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.CommonFSUtils;
061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
062import org.apache.hadoop.hbase.util.FutureUtils;
063import org.apache.hadoop.hbase.zookeeper.ZKUtil;
064import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
065import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
066import org.apache.yetus.audience.InterfaceAudience;
067import org.apache.zookeeper.KeeperException;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
072import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
073
074import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
075import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedRegionSequenceId;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedSequenceId;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedStoreSequenceId;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
082
083/**
084 * The ServerManager class manages info about region servers.
085 * <p>
086 * Maintains lists of online and dead servers. Processes the startups, shutdowns, and deaths of
087 * region servers.
088 * <p>
089 * Servers are distinguished in two different ways. A given server has a location, specified by
090 * hostname and port, of which there can only be one online at any given time. A server instance
091 * is specified by the location (hostname and port) as well as the startcode (timestamp from when
092 * the server was started). This is used to differentiate a restarted instance of a given server
093 * from the original instance.
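 * For example (illustrative values only), {@code rs1.example.org,16020,1600000000000} and
 * {@code rs1.example.org,16020,1599990000000} name two instances of the same server: same hostname
 * and port, different startcode.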
094 * <p>
095 * If a server is known not to be running any more, it is called dead. The dead server needs to be
096 * handled by a ServerShutdownHandler. If the handler is not enabled yet, the server can't be
097 * handled right away, so it is queued up. After the handler is enabled, the server will be
098 * submitted to a handler. However, the handler may be only partially enabled; if so, the server
099 * cannot be fully processed and is queued up for further processing. A server is fully processed
100 * only after the handler is fully enabled and has completed the handling.
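 * <p>
 * A minimal usage sketch (illustrative only; {@code serverManager}, {@code myListener} and
 * {@code someServerName} are hypothetical names):
 *
 * <pre>
 * serverManager.registerListener(myListener);
 * List&lt;ServerName&gt; online = serverManager.getOnlineServersList();
 * boolean dead = serverManager.isServerDead(someServerName);
 * </pre>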
101 */
102@InterfaceAudience.Private
103public class ServerManager {
104  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
105    "hbase.master.wait.on.regionservers.maxtostart";
106
107  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
108    "hbase.master.wait.on.regionservers.mintostart";
109
110  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
111    "hbase.master.wait.on.regionservers.timeout";
112
113  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
114    "hbase.master.wait.on.regionservers.interval";
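
  // The wait.on.regionservers.* keys above are read from the master configuration in
  // getMinToStart/waitForRegionServers below. An illustrative hbase-site.xml override (the value is
  // an example only, not a recommendation):
  //   <property>
  //     <name>hbase.master.wait.on.regionservers.mintostart</name>
  //     <value>3</value>
  //   </property>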
115
116  /**
117   * See HBASE-20727. If set to true, flushedSequenceIdByRegion and storeFlushedSequenceIdsByRegion
118   * will be persisted to HDFS and loaded when the master restarts, to speed up log splitting.
119   */
120  public static final String PERSIST_FLUSHEDSEQUENCEID =
121    "hbase.master.persist.flushedsequenceid.enabled";
122
123  public static final boolean PERSIST_FLUSHEDSEQUENCEID_DEFAULT = true;
124
125  public static final String FLUSHEDSEQUENCEID_FLUSHER_INTERVAL =
126    "hbase.master.flushedsequenceid.flusher.interval";
127
128  public static final int FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT = 3 * 60 * 60 * 1000; // 3
129                                                                                           // hours
130
131  public static final String MAX_CLOCK_SKEW_MS = "hbase.master.maxclockskew";
132
133  private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);
134
135  // Set if we are to shutdown the cluster.
136  private AtomicBoolean clusterShutdown = new AtomicBoolean(false);
137
138  /**
139   * The last flushed sequence id for a region.
140   */
141  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
142    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
143
144  private boolean persistFlushedSequenceId = true;
145  private volatile boolean isFlushSeqIdPersistInProgress = false;
146  /** File on hdfs to store last flushed sequence id of regions */
147  private static final String LAST_FLUSHED_SEQ_ID_FILE = ".lastflushedseqids";
148  private FlushedSequenceIdFlusher flushedSeqIdFlusher;
149
150  /**
151   * The last flushed sequence id for a store in a region.
152   */
153  private final ConcurrentNavigableMap<byte[],
154    ConcurrentNavigableMap<byte[], Long>> storeFlushedSequenceIdsByRegion =
155      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
156
157  /** Map of registered servers to their current load */
158  private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
159    new ConcurrentSkipListMap<>();
160
161  /** List of region servers that should not get any more new regions. */
162  private final ArrayList<ServerName> drainingServers = new ArrayList<>();
163
164  private final MasterServices master;
165  private final RegionServerList storage;
166
167  private final DeadServer deadservers = new DeadServer();
168
169  private final long maxSkew;
170  private final long warningSkew;
171
172  /** Listeners that are called on server events. */
173  private List<ServerListener> listeners = new CopyOnWriteArrayList<>();
174
175  /**
176   * Constructor.
177   */
178  public ServerManager(final MasterServices master, RegionServerList storage) {
179    this.master = master;
180    this.storage = storage;
181    Configuration c = master.getConfiguration();
182    maxSkew = c.getLong(MAX_CLOCK_SKEW_MS, 30000);
183    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
184    persistFlushedSequenceId =
185      c.getBoolean(PERSIST_FLUSHEDSEQUENCEID, PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
186  }
187
188  /**
189   * Add the listener to the notification list.
190   * @param listener The ServerListener to register
191   */
192  public void registerListener(final ServerListener listener) {
193    this.listeners.add(listener);
194  }
195
196  /**
197   * Remove the listener from the notification list.
198   * @param listener The ServerListener to unregister
199   */
200  public boolean unregisterListener(final ServerListener listener) {
201    return this.listeners.remove(listener);
202  }
203
204  /**
205   * Let the server manager know a new regionserver has come online
206   * @param request       the startup request
207   * @param versionNumber the version number of the new regionserver
208   * @param version       the version of the new regionserver, could contain strings like "SNAPSHOT"
209   * @param ia            the InetAddress from which the request is received
210   * @return The ServerName we know this server as.
211   */
212  ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
213    String version, InetAddress ia) throws IOException {
214    // Test for the case where we get a region startup message from a regionserver
215    // that has been quickly restarted but whose znode expiration handler has
216    // not yet run, or from a server whose failure we are currently processing.
217    // Test its host+port combo is present in serverAddressToServerInfo. If it
218    // is, reject the server and trigger its expiration. The next time it comes
219    // in, it should have been removed from serverAddressToServerInfo and queued
220    // for processing by ProcessServerShutdown.
221
222    // if use-ip is enabled, we will use ip to expose Master/RS service for client,
223    // see HBASE-27304 for details.
224    boolean useIp = master.getConfiguration().getBoolean(HConstants.HBASE_SERVER_USEIP_ENABLED_KEY,
225      HConstants.HBASE_SERVER_USEIP_ENABLED_DEFAULT);
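    // Prefer a hostname explicitly supplied by the regionserver in its startup request; otherwise
    // fall back to the request address, exposed as an IP or a resolved hostname per the flag above.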
226    String isaHostName = useIp ? ia.getHostAddress() : ia.getHostName();
227    final String hostname =
228      request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : isaHostName;
229    ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());
230    checkClockSkew(sn, request.getServerCurrentTime());
231    checkIsDead(sn, "STARTUP");
232    if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
233      LOG.warn(
234        "THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn);
235    }
236    storage.started(sn);
237    return sn;
238  }
239
240  /**
241   * Updates the last flushed sequence ids for the regions on server {@code sn}.
242   */
243  private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
244    for (Entry<byte[], RegionMetrics> entry : hsl.getRegionMetrics().entrySet()) {
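      // ServerMetrics keys regions by their full region name, while our maps are keyed by the
      // encoded region name, so convert before looking up the previously reported sequence id.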
245      byte[] encodedRegionName = Bytes.toBytes(RegionInfo.encodeRegionName(entry.getKey()));
246      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
247      long l = entry.getValue().getCompletedSequenceId();
248      // Don't let smaller sequence ids override greater sequence ids.
249      if (LOG.isTraceEnabled()) {
250        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue
251          + ", completeSequenceId=" + l);
252      }
253      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
254        flushedSequenceIdByRegion.put(encodedRegionName, l);
255      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
256        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id (" + l
257          + ") that is less than the previous last flushed sequence id (" + existingValue
258          + ") for region " + Bytes.toString(entry.getKey()) + ". Ignoring.");
259      }
260      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
261        computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
262          () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
263      for (Entry<byte[], Long> storeSeqId : entry.getValue().getStoreSequenceId().entrySet()) {
264        byte[] family = storeSeqId.getKey();
265        existingValue = storeFlushedSequenceId.get(family);
266        l = storeSeqId.getValue();
267        if (LOG.isTraceEnabled()) {
268          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family)
269            + ", existingValue=" + existingValue + ", completeSequenceId=" + l);
270        }
271        // Don't let smaller sequence ids override greater sequence ids.
272        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
273          storeFlushedSequenceId.put(family, l);
274        }
275      }
276    }
277  }
278
279  public void regionServerReport(ServerName sn, ServerMetrics sl) throws YouAreDeadException {
280    checkIsDead(sn, "REPORT");
281    if (null == this.onlineServers.replace(sn, sl)) {
282      // Already have this host+port combo and it's just a different start code?
283      // Just let the server in. Presume master joining a running cluster.
284      // recordNewServer is what happens at the end of reportServerStartup.
285      // The only thing we are skipping is passing back to the regionserver
286      // the ServerName to use. Here we presume a master has already done
287      // that so we'll press on with whatever it gave us for ServerName.
288      if (!checkAndRecordNewServer(sn, sl)) {
289        LOG.info("RegionServerReport ignored, could not record the server: " + sn);
290        return; // Not recorded, so no need to move on
291      }
292    }
293    updateLastFlushedSequenceIds(sn, sl);
294  }
295
296  /**
297   * Check if a server with the same host and port already exists; if not, or if the existing one
298   * has a smaller start code, record it.
299   * @param serverName the server to check and record
300   * @param sl         the server load on the server
301   * @return true if the server is recorded, otherwise, false
302   */
303  boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
304    ServerName existingServer = null;
305    synchronized (this.onlineServers) {
306      existingServer = findServerWithSameHostnamePortWithLock(serverName);
307      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
308        LOG.info("Server serverName=" + serverName + " rejected; we already have "
309          + existingServer.toString() + " registered with same hostname and port");
310        return false;
311      }
312      recordNewServerWithLock(serverName, sl);
313    }
314
315    // Tell our listeners that a server was added
316    if (!this.listeners.isEmpty()) {
317      for (ServerListener listener : this.listeners) {
318        listener.serverAdded(serverName);
319      }
320    }
321
322    // Note that we assume that same ts means same server, and don't expire in that case.
323    // TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
324    if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
325      LOG.info("Triggering server recovery; existingServer " + existingServer
326        + " looks stale, new server:" + serverName);
327      expireServer(existingServer);
328    }
329    return true;
330  }
331
332  /**
333   * Find out the region servers crashed between the crash of the previous master instance and the
334   * current master instance and schedule SCP for them.
335   * <p/>
336   * Since the {@code RegionServerTracker} has already helped us to construct the online servers set
337   * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir}
338   * to find out whether there are servers which are already dead.
339   * <p/>
340   * Must be called inside the initialization method of {@code RegionServerTracker} to avoid
341   * concurrency issue.
342   * @param deadServersFromPE     the region servers which already have a SCP associated.
343   * @param liveServersFromWALDir the live region servers from wal directory.
344   */
345  void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
346    Set<ServerName> liveServersFromWALDir) {
347    deadServersFromPE.forEach(deadservers::putIfAbsent);
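    // Any server that still has a WAL directory but is not in the online set must have died while
    // no active master was watching; expire it so a ServerCrashProcedure gets scheduled for it.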
348    liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
349      .forEach(this::expireServer);
350  }
351
352  /**
353   * Checks the clock skew between the server and the master. If the skew exceeds the configured
354   * max, it will throw an exception; if it exceeds the configured warning threshold, it
355   * will log a warning but start normally.
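   * For example, with the defaults used in this class ({@code hbase.master.maxclockskew} = 30000ms,
   * warning threshold = 10000ms), a reported skew of 45000ms is rejected while a skew of 15000ms
   * only logs a warning.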
356   * @param serverName Incoming server's name
357   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
358   */
359  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
360    throws ClockOutOfSyncException {
361    long skew = Math.abs(EnvironmentEdgeManager.currentTime() - serverCurrentTime);
362    if (skew > maxSkew) {
363      String message = "Server " + serverName + " has been "
364        + "rejected; Reported time is too far out of sync with master.  " + "Time difference of "
365        + skew + "ms > max allowed of " + maxSkew + "ms";
366      LOG.warn(message);
367      throw new ClockOutOfSyncException(message);
368    } else if (skew > warningSkew) {
369      String message = "Reported time for server " + serverName + " is out of sync with master "
370        + "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " + "error threshold is "
371        + maxSkew + "ms)";
372      LOG.warn(message);
373    }
374  }
375
376  /**
377   * Called when a RegionServer first reports in for duty and thereafter each time it heartbeats,
378   * to make sure it has not been declared dead. If this server is on the dead list, reject it
379   * with a YouAreDeadException. If it was dead but came back with a new start code, remove the old
380   * entry from the dead list.
381   * @param what START or REPORT
382   */
383  private void checkIsDead(final ServerName serverName, final String what)
384    throws YouAreDeadException {
385    if (this.deadservers.isDeadServer(serverName)) {
386      // Exact match: host name, port and start code all match with existing one of the
387      // dead servers. So, this server must be dead. Tell it to kill itself.
388      String message =
389        "Server " + what + " rejected; currently processing " + serverName + " as dead server";
390      LOG.debug(message);
391      throw new YouAreDeadException(message);
392    }
393    // Remove a dead server with the same hostname and port as the newly checking-in rs, after
394    // master initialization. See HBASE-5916 for more information.
395    if (
396      (this.master == null || this.master.isInitialized())
397        && this.deadservers.cleanPreviousInstance(serverName)
398    ) {
399      // This server has now become alive after we marked it as dead.
400      // We removed its previous entry from the dead list to reflect it.
401      LOG.debug("{} {} came back up, removed it from the dead servers list", what, serverName);
402    }
403  }
404
405  /**
406   * Assumes onlineServers is locked.
407   * @return ServerName with matching hostname and port.
408   */
409  public ServerName findServerWithSameHostnamePortWithLock(final ServerName serverName) {
410    ServerName end =
411      ServerName.valueOf(serverName.getHostname(), serverName.getPort(), Long.MAX_VALUE);
412
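    // onlineServers is sorted by hostname, then port, then startcode, so the greatest key strictly
    // below (hostname, port, Long.MAX_VALUE) is the live instance for that host:port if one exists;
    // the isSameAddress check below filters out an unrelated neighbouring entry.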
413    ServerName r = onlineServers.lowerKey(end);
414    if (r != null) {
415      if (ServerName.isSameAddress(r, serverName)) {
416        return r;
417      }
418    }
419    return null;
420  }
421
422  /**
423   * Adds to the onlineServers list. onlineServers should be locked.
424   * @param serverName The remote server's name.
425   */
426  void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
427    LOG.info("Registering regionserver=" + serverName);
428    this.onlineServers.put(serverName, sl);
429  }
430
431  public ConcurrentNavigableMap<byte[], Long> getFlushedSequenceIdByRegion() {
432    return flushedSequenceIdByRegion;
433  }
434
435  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
436    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
437    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
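    // Fall back to HConstants.NO_SEQNUM when this region has never reported a flushed sequence id.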
438    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
439    Map<byte[], Long> storeFlushedSequenceId =
440      storeFlushedSequenceIdsByRegion.get(encodedRegionName);
441    if (storeFlushedSequenceId != null) {
442      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
443        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
444          .setFamilyName(UnsafeByteOperations.unsafeWrap(entry.getKey()))
445          .setSequenceId(entry.getValue().longValue()).build());
446      }
447    }
448    return builder.build();
449  }
450
451  /** Returns ServerMetrics if serverName is known else null */
452  public ServerMetrics getLoad(final ServerName serverName) {
453    return this.onlineServers.get(serverName);
454  }
455
456  /**
457   * Compute the average load across all region servers. Currently, this uses a very naive
458   * computation - just uses the number of regions being served, ignoring stats about number of
459   * requests.
460   * @return the average load
461   */
462  public double getAverageLoad() {
463    int totalLoad = 0;
464    int numServers = 0;
465    for (ServerMetrics sl : this.onlineServers.values()) {
466      numServers++;
467      totalLoad += sl.getRegionMetrics().size();
468    }
469    return numServers == 0 ? 0 : (double) totalLoad / (double) numServers;
470  }
471
472  /** Returns the count of active regionservers */
473  public int countOfRegionServers() {
474    // Presumes onlineServers is a concurrent map
475    return this.onlineServers.size();
476  }
477
478  /** Returns Read-only map of servers to serverinfo */
479  public Map<ServerName, ServerMetrics> getOnlineServers() {
480    // Presumption is that iterating the returned Map is OK.
481    synchronized (this.onlineServers) {
482      return Collections.unmodifiableMap(this.onlineServers);
483    }
484  }
485
486  public DeadServer getDeadServers() {
487    return this.deadservers;
488  }
489
490  /**
491   * Checks if any dead servers are currently being processed.
492   * @return true if any RS are being processed as dead, false if not
493   */
494  public boolean areDeadServersInProgress() throws IOException {
495    return master.getProcedures().stream()
496      .anyMatch(p -> !p.isFinished() && p instanceof ServerCrashProcedure);
497  }
498
499  void letRegionServersShutdown() {
500    long previousLogTime = 0;
501    ServerName sn = master.getServerName();
502    ZKWatcher zkw = master.getZooKeeper();
503    int onlineServersCt;
504    while ((onlineServersCt = onlineServers.size()) > 0) {
505      if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
506        Set<ServerName> remainingServers = onlineServers.keySet();
507        synchronized (onlineServers) {
508          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
509            // Master will delete itself later.
510            return;
511          }
512        }
513        StringBuilder sb = new StringBuilder();
514        // It's ok here to not sync on onlineServers - merely logging
515        for (ServerName key : remainingServers) {
516          if (sb.length() > 0) {
517            sb.append(", ");
518          }
519          sb.append(key);
520        }
521        LOG.info("Waiting on regionserver(s) " + sb.toString());
522        previousLogTime = EnvironmentEdgeManager.currentTime();
523      }
524
525      try {
526        List<String> servers = getRegionServersInZK(zkw);
527        if (
528          servers == null || servers.isEmpty()
529            || (servers.size() == 1 && servers.contains(sn.toString()))
530        ) {
531          LOG.info("ZK shows there is only the master self online, exiting now");
532          // Master could have lost some ZK events, no need to wait more.
533          break;
534        }
535      } catch (KeeperException ke) {
536        LOG.warn("Failed to list regionservers", ke);
537        // ZK is malfunctioning, don't hang here
538        break;
539      }
540      synchronized (onlineServers) {
541        try {
542          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
543        } catch (InterruptedException ignored) {
544          // continue
545        }
546      }
547    }
548  }
549
550  private List<String> getRegionServersInZK(final ZKWatcher zkw) throws KeeperException {
551    return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
552  }
553
554  /**
555   * Expire the passed server. Add it to the list of dead servers and queue shutdown processing.
556   * @return pid if we queued a ServerCrashProcedure else {@link Procedure#NO_PROC_ID} if we did not
557   *         (could happen for many reasons including the fact that it's this server that is going
558   *         down, or we have already queued an SCP for this server, or SCP processing is currently
559   *         disabled because we are in startup phase).
560   */
561  // Redo test so we can make this protected.
562  public synchronized long expireServer(final ServerName serverName) {
563    return expireServer(serverName, false);
564
565  }
566
567  synchronized long expireServer(final ServerName serverName, boolean force) {
568    // THIS server is going down... can't handle our own expiration.
569    if (serverName.equals(master.getServerName())) {
570      if (!(master.isAborted() || master.isStopped())) {
571        master.stop("We lost our znode?");
572      }
573      return Procedure.NO_PROC_ID;
574    }
575    if (this.deadservers.isDeadServer(serverName)) {
576      LOG.warn("Expiration called on {} but already in DeadServer", serverName);
577      return Procedure.NO_PROC_ID;
578    }
579    moveFromOnlineToDeadServers(serverName);
580
581    // If server is in draining mode, remove corresponding znode
582    // In some tests, the mocked HM may not have a ZK instance, hence the null check
583    if (master.getZooKeeper() != null) {
584      String drainingZnode = ZNodePaths
585        .joinZNode(master.getZooKeeper().getZNodePaths().drainingZNode, serverName.getServerName());
586      try {
587        ZKUtil.deleteNodeFailSilent(master.getZooKeeper(), drainingZnode);
588      } catch (KeeperException e) {
589        LOG.warn(
590          "Error deleting the draining znode for stopping server " + serverName.getServerName(), e);
591      }
592    }
593
594    // If cluster is going down, yes, servers are going to be expiring; don't
595    // process as a dead server
596    if (isClusterShutdown()) {
597      LOG.info("Cluster shutdown set; " + serverName + " expired; onlineServers="
598        + this.onlineServers.size());
599      if (this.onlineServers.isEmpty()) {
600        master.stop("Cluster shutdown set; onlineServer=0");
601      }
602      return Procedure.NO_PROC_ID;
603    }
604    LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
605    long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
606    storage.expired(serverName);
607    // Tell our listeners that a server was removed
608    if (!this.listeners.isEmpty()) {
609      this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
610    }
611    // trigger a persist of flushedSeqId
612    if (flushedSeqIdFlusher != null) {
613      flushedSeqIdFlusher.triggerNow();
614    }
615    return pid;
616  }
617
618  /**
619   * Called when server has expired.
620   */
621  // Locking in this class needs cleanup.
622  public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
623    synchronized (this.onlineServers) {
624      boolean online = this.onlineServers.containsKey(sn);
625      if (online) {
626        // Remove the server from the known servers lists and update load info BUT
627        // add to deadservers first; do this so it'll show in dead servers list if
628        // not in online servers list.
629        this.deadservers.putIfAbsent(sn);
630        this.onlineServers.remove(sn);
631        onlineServers.notifyAll();
632      } else {
633        // If not online, that is odd but may happen if 'Unknown Servers' -- where meta
634        // has references to servers not online nor in dead servers list. If
635        // 'Unknown Server', don't add to DeadServers else will be there for ever.
636        LOG.trace("Expiration of {} but server not online", sn);
637      }
638    }
639  }
640
641  /**
642   * Remove the server from the drain list.
643   */
644  public synchronized boolean removeServerFromDrainList(final ServerName sn) {
645    // Warn if the server (sn) is not online. ServerName is of the form:
646    // <hostname> , <port> , <startcode>
647
648    if (!this.isServerOnline(sn)) {
649      LOG.warn("Server " + sn + " is not currently online. "
650        + "Removing from draining list anyway, as requested.");
651    }
652    // Remove the server from the draining servers lists.
653    return this.drainingServers.remove(sn);
654  }
655
656  /**
657   * Add the server to the drain list.
658   * @return True if the server is added or the server is already on the drain list.
659   */
660  public synchronized boolean addServerToDrainList(final ServerName sn) {
661    // Warn if the server (sn) is not online. ServerName is of the form:
662    // <hostname> , <port> , <startcode>
663
664    if (!this.isServerOnline(sn)) {
665      LOG.warn("Server " + sn + " is not currently online. "
666        + "Ignoring request to add it to draining list.");
667      return false;
668    }
669    // Add the server to the draining servers lists, if it's not already in
670    // it.
671    if (this.drainingServers.contains(sn)) {
672      LOG.warn("Server " + sn + " is already in the draining server list. "
673        + "Ignoring request to add it again.");
674      return true;
675    }
676    LOG.info("Server " + sn + " added to draining server list.");
677    return this.drainingServers.add(sn);
678  }
679
680  /**
681   * Contacts a region server and waits up to timeout ms to close the region. This bypasses the
682   * active hmaster. Pass -1 as timeout if you do not want to wait on result.
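   * <p>
   * Illustrative call (the connection, names and the 30 second timeout are example values only):
   *
   * <pre>
   * ServerManager.closeRegionSilentlyAndWait(conn, serverName, regionInfo, 30000);
   * </pre>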
683   */
684  public static void closeRegionSilentlyAndWait(AsyncClusterConnection connection,
685    ServerName server, RegionInfo region, long timeout) throws IOException, InterruptedException {
686    AsyncRegionServerAdmin admin = connection.getRegionServerAdmin(server);
687    try {
688      FutureUtils.get(
689        admin.closeRegion(ProtobufUtil.buildCloseRegionRequest(server, region.getRegionName())));
690    } catch (IOException e) {
691      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
692    }
693    if (timeout < 0) {
694      return;
695    }
696    long expiration = timeout + EnvironmentEdgeManager.currentTime();
697    while (EnvironmentEdgeManager.currentTime() < expiration) {
698      try {
699        RegionInfo rsRegion = ProtobufUtil.toRegionInfo(FutureUtils
700          .get(
701            admin.getRegionInfo(RequestConverter.buildGetRegionInfoRequest(region.getRegionName())))
702          .getRegionInfo());
703        if (rsRegion == null) {
704          return;
705        }
706      } catch (IOException ioe) {
707        if (
708          ioe instanceof NotServingRegionException
709            || (ioe instanceof RemoteWithExtrasException && ((RemoteWithExtrasException) ioe)
710              .unwrapRemoteException() instanceof NotServingRegionException)
711        ) {
712          // no need to retry again
713          return;
714        }
715        LOG.warn("Exception when retrieving regioninfo from: " + region.getRegionNameAsString(),
716          ioe);
717      }
718      Thread.sleep(1000);
719    }
720    throw new IOException("Region " + region + " failed to close within" + " timeout " + timeout);
721  }
722
723  /**
724   * Calculate the minimum number of servers needed to start. This is not an absolute; it is just a
725   * friction that will cause us to hang around a bit longer waiting on RegionServers to check in.
726   */
727  private int getMinToStart() {
728    if (master.isInMaintenanceMode()) {
729      // If in maintenance mode, the in-process region server hosting meta will be the only server
730      // available
731      return 1;
732    }
733
734    int minimumRequired = 1;
735    int minToStart = this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
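    // The key defaults to -1 here; the Math.max below bumps anything lower than minimumRequired up
    // to minimumRequired.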
736    // Ensure we are never less than minimumRequired else stuff won't work.
737    return Math.max(minToStart, minimumRequired);
738  }
739
740  /**
741   * Wait for the region servers to report in. We will wait until one of these conditions is met: -
742   * the master is stopped - the 'hbase.master.wait.on.regionservers.maxtostart' number of region
743   * servers is reached - the 'hbase.master.wait.on.regionservers.mintostart' is reached AND there
744   * has been no new region server check-in for 'hbase.master.wait.on.regionservers.interval' time AND
745   * the 'hbase.master.wait.on.regionservers.timeout' is reached
746   */
747  public void waitForRegionServers(MonitoredTask status) throws InterruptedException {
748    final long interval =
749      this.master.getConfiguration().getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
750    final long timeout =
751      this.master.getConfiguration().getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
752    // Min is not an absolute; just a friction making us wait longer on server checkin.
753    int minToStart = getMinToStart();
754    int maxToStart =
755      this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
756    if (maxToStart < minToStart) {
757      LOG.warn(String.format("The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
758        WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart, WAIT_ON_REGIONSERVERS_MINTOSTART,
759        minToStart));
760      maxToStart = Integer.MAX_VALUE;
761    }
762
763    long now = EnvironmentEdgeManager.currentTime();
764    final long startTime = now;
765    long slept = 0;
766    long lastLogTime = 0;
767    long lastCountChange = startTime;
768    int count = countOfRegionServers();
769    int oldCount = 0;
770    // This while test is a little hard to read. We try to comment it in below but in essence:
771    // Wait if Master is not stopped and the number of regionservers that have checked-in is
772    // less than the maxToStart. Both of these conditions will be true near universally.
773    // Next, we will keep cycling if ANY of the following three conditions are true:
774    // 1. The time since a regionserver registered is < interval (means servers are actively
775    // checking in).
776    // 2. We are under the total timeout.
777    // 3. The count of servers is < minimum.
778    for (ServerListener listener : this.listeners) {
779      listener.waiting();
780    }
781    while (
782      !this.master.isStopped() && !isClusterShutdown() && count < maxToStart
783        && ((lastCountChange + interval) > now || timeout > slept || count < minToStart)
784    ) {
785      // Log some info at every interval time or if there is a change
786      if (oldCount != count || lastLogTime + interval < now) {
787        lastLogTime = now;
788        String msg =
789          "Waiting on regionserver count=" + count + "; waited=" + slept + "ms, expecting min="
790            + minToStart + " server(s), max=" + getStrForMax(maxToStart) + " server(s), "
791            + "timeout=" + timeout + "ms, lastChange=" + (now - lastCountChange) + "ms";
792        LOG.info(msg);
793        status.setStatus(msg);
794      }
795
796      // We sleep for some time
797      final long sleepTime = 50;
798      Thread.sleep(sleepTime);
799      now = EnvironmentEdgeManager.currentTime();
800      slept = now - startTime;
801
802      oldCount = count;
803      count = countOfRegionServers();
804      if (count != oldCount) {
805        lastCountChange = now;
806      }
807    }
808    // Did we exit the loop because cluster is going down?
809    if (isClusterShutdown()) {
810      this.master.stop("Cluster shutdown");
811    }
812    LOG.info("Finished waiting on RegionServer count=" + count + "; waited=" + slept + "ms,"
813      + " expected min=" + minToStart + " server(s), max=" + getStrForMax(maxToStart)
814      + " server(s)," + " master is " + (this.master.isStopped() ? "stopped." : "running"));
815  }
816
817  private String getStrForMax(final int max) {
818    return max == Integer.MAX_VALUE ? "NO_LIMIT" : Integer.toString(max);
819  }
820
821  /** Returns A copy of the internal list of online servers. */
822  public List<ServerName> getOnlineServersList() {
823    // TODO: optimize the load balancer call so we don't need to make a new list
824    // TODO: FIX. THIS IS POPULAR CALL.
825    return new ArrayList<>(this.onlineServers.keySet());
826  }
827
828  /**
829   * @param keys                 The target server names
830   * @param idleServerPredicator Evaluates the server on the given load
831   * @return A copy of the internal list of online servers matched by the predicator
832   */
833  public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> keys,
834    Predicate<ServerMetrics> idleServerPredicator) {
835    List<ServerName> names = new ArrayList<>();
836    if (keys != null && idleServerPredicator != null) {
837      keys.forEach(name -> {
838        ServerMetrics load = onlineServers.get(name);
839        if (load != null) {
840          if (idleServerPredicator.test(load)) {
841            names.add(name);
842          }
843        }
844      });
845    }
846    return names;
847  }
848
849  /** Returns A copy of the internal list of draining servers. */
850  public List<ServerName> getDrainingServersList() {
851    return new ArrayList<>(this.drainingServers);
852  }
853
854  public boolean isServerOnline(ServerName serverName) {
855    return serverName != null && onlineServers.containsKey(serverName);
856  }
857
858  public enum ServerLiveState {
859    LIVE,
860    DEAD,
861    UNKNOWN
862  }
863
864  /** Returns whether the server is online, dead, or unknown. */
865  public synchronized ServerLiveState isServerKnownAndOnline(ServerName serverName) {
866    return onlineServers.containsKey(serverName)
867      ? ServerLiveState.LIVE
868      : (deadservers.isDeadServer(serverName) ? ServerLiveState.DEAD : ServerLiveState.UNKNOWN);
869  }
870
871  /**
872   * Check if a server is known to be dead. A server can be online, or known to be dead, or unknown
873   * to this manager (i.e., not online, not known to be dead either; it is simply not tracked by the
874   * master any more, for example, a very old previous instance).
875   */
876  public synchronized boolean isServerDead(ServerName serverName) {
877    return serverName == null || deadservers.isDeadServer(serverName);
878  }
879
880  /**
881   * Check if a server is unknown. A server can be online, or known to be dead, or unknown to this
882   * manager (i.e., not online, not known to be dead either; it is simply not tracked by the master
883   * any more, for example, a very old previous instance).
884   */
885  public boolean isServerUnknown(ServerName serverName) {
886    return serverName == null
887      || (!onlineServers.containsKey(serverName) && !deadservers.isDeadServer(serverName));
888  }
889
890  public void shutdownCluster() {
891    String statusStr = "Cluster shutdown requested of master=" + this.master.getServerName();
892    LOG.info(statusStr);
893    this.clusterShutdown.set(true);
894    if (onlineServers.isEmpty()) {
895      // we do not synchronize here so this may cause a double stop, but not a big deal
896      master.stop("OnlineServer=0 right after cluster shutdown set");
897    }
898  }
899
900  public boolean isClusterShutdown() {
901    return this.clusterShutdown.get();
902  }
903
904  /**
905   * start chore in ServerManager
906   */
907  public void startChore() {
908    Configuration c = master.getConfiguration();
909    if (persistFlushedSequenceId) {
910      new Thread(() -> {
911        // After AM#loadMeta, RegionStates has been populated; regions that were deleted by
912        // drop/split/merge are no longer present there and will not be added back, so
913        // removeDeletedRegionFromLoadedFlushedSequenceIds can safely drop their loaded
914        // flushed sequence ids.
915        removeDeletedRegionFromLoadedFlushedSequenceIds();
916      }, "RemoveDeletedRegionSyncThread").start();
917      int flushPeriod =
918        c.getInt(FLUSHEDSEQUENCEID_FLUSHER_INTERVAL, FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT);
919      flushedSeqIdFlusher = new FlushedSequenceIdFlusher("FlushedSequenceIdFlusher", flushPeriod);
920      master.getChoreService().scheduleChore(flushedSeqIdFlusher);
921    }
922  }
923
924  /**
925   * Stop the ServerManager.
926   */
927  public void stop() {
928    if (flushedSeqIdFlusher != null) {
929      flushedSeqIdFlusher.shutdown();
930    }
931    if (persistFlushedSequenceId) {
932      try {
933        persistRegionLastFlushedSequenceIds();
934      } catch (IOException e) {
935        LOG.warn("Failed to persist last flushed sequence id of regions" + " to file system", e);
936      }
937    }
938  }
939
940  /**
941   * Creates a list of possible destinations for a region. It contains the online servers, but not
942   * the draining or dying servers.
943   * @param serversToExclude can be null if there is no server to exclude
944   */
945  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude) {
946    Set<ServerName> destServers = new HashSet<>();
947    onlineServers.forEach((sn, sm) -> {
948      if (sm.getLastReportTimestamp() > 0) {
949        // This means we have already called regionServerReport at least once, so let's include
950        // this server for region assignment. This is an optimization to avoid assigning regions to
951        // an uninitialized server. See HBASE-25032 for more details.
952        destServers.add(sn);
953      }
954    });
955
956    if (serversToExclude != null) {
957      destServers.removeAll(serversToExclude);
958    }
959
960    // Loop through the draining server list and remove them from the server list
961    final List<ServerName> drainingServersCopy = getDrainingServersList();
962    destServers.removeAll(drainingServersCopy);
963
964    return new ArrayList<>(destServers);
965  }
966
967  /**
968   * Calls {@link #createDestinationServersList} without server to exclude.
969   */
970  public List<ServerName> createDestinationServersList() {
971    return createDestinationServersList(null);
972  }
973
974  /**
975   * Clear any dead server that has the same host name and port as an online server.
976   */
977  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
978    for (ServerName serverName : getOnlineServersList()) {
979      deadservers.cleanAllPreviousInstances(serverName);
980    }
981  }
982
983  /**
984   * Called by delete table and similar to notify the ServerManager that a region was removed.
985   */
986  public void removeRegion(final RegionInfo regionInfo) {
987    final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
988    storeFlushedSequenceIdsByRegion.remove(encodedName);
989    flushedSequenceIdByRegion.remove(encodedName);
990  }
991
992  public boolean isRegionInServerManagerStates(final RegionInfo hri) {
993    final byte[] encodedName = hri.getEncodedNameAsBytes();
994    return (storeFlushedSequenceIdsByRegion.containsKey(encodedName)
995      || flushedSequenceIdByRegion.containsKey(encodedName));
996  }
997
998  /**
999   * Called by delete table and similar to notify the ServerManager that a region was removed.
1000   */
1001  public void removeRegions(final List<RegionInfo> regions) {
1002    for (RegionInfo hri : regions) {
1003      removeRegion(hri);
1004    }
1005  }
1006
1007  /**
1008   * May return 0 when server is not online.
1009   */
1010  public int getVersionNumber(ServerName serverName) {
1011    ServerMetrics serverMetrics = onlineServers.get(serverName);
1012    return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
1013  }
1014
1015  /**
1016   * May return "0.0.0" when server is not online
1017   */
1018  public String getVersion(ServerName serverName) {
1019    ServerMetrics serverMetrics = onlineServers.get(serverName);
1020    return serverMetrics != null ? serverMetrics.getVersion() : "0.0.0";
1021  }
1022
1023  public int getInfoPort(ServerName serverName) {
1024    ServerMetrics serverMetrics = onlineServers.get(serverName);
1025    return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
1026  }
1027
1028  /**
1029   * Persist last flushed sequence id of each region to HDFS
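   * The data is written as a single length-delimited {@code FlushedSequenceId} protobuf message to
   * the {@code .lastflushedseqids} file under the HBase root directory.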
1030   * @throws IOException if persisting to HDFS fails
1031   */
1032  private void persistRegionLastFlushedSequenceIds() throws IOException {
1033    if (isFlushSeqIdPersistInProgress) {
1034      return;
1035    }
1036    isFlushSeqIdPersistInProgress = true;
1037    try {
1038      Configuration conf = master.getConfiguration();
1039      Path rootDir = CommonFSUtils.getRootDir(conf);
1040      Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
1041      FileSystem fs = FileSystem.get(conf);
1042      if (fs.exists(lastFlushedSeqIdPath)) {
1043        LOG.info("Rewriting .lastflushedseqids file at: " + lastFlushedSeqIdPath);
1044        if (!fs.delete(lastFlushedSeqIdPath, false)) {
1045          throw new IOException("Unable to remove existing " + lastFlushedSeqIdPath);
1046        }
1047      } else {
1048        LOG.info("Writing .lastflushedseqids file at: " + lastFlushedSeqIdPath);
1049      }
1050      FSDataOutputStream out = fs.create(lastFlushedSeqIdPath);
1051      FlushedSequenceId.Builder flushedSequenceIdBuilder = FlushedSequenceId.newBuilder();
1052      try {
1053        for (Entry<byte[], Long> entry : flushedSequenceIdByRegion.entrySet()) {
1054          FlushedRegionSequenceId.Builder flushedRegionSequenceIdBuilder =
1055            FlushedRegionSequenceId.newBuilder();
1056          flushedRegionSequenceIdBuilder.setRegionEncodedName(ByteString.copyFrom(entry.getKey()));
1057          flushedRegionSequenceIdBuilder.setSeqId(entry.getValue());
1058          ConcurrentNavigableMap<byte[], Long> storeSeqIds =
1059            storeFlushedSequenceIdsByRegion.get(entry.getKey());
1060          if (storeSeqIds != null) {
1061            for (Entry<byte[], Long> store : storeSeqIds.entrySet()) {
1062              FlushedStoreSequenceId.Builder flushedStoreSequenceIdBuilder =
1063                FlushedStoreSequenceId.newBuilder();
1064              flushedStoreSequenceIdBuilder.setFamily(ByteString.copyFrom(store.getKey()));
1065              flushedStoreSequenceIdBuilder.setSeqId(store.getValue());
1066              flushedRegionSequenceIdBuilder.addStores(flushedStoreSequenceIdBuilder);
1067            }
1068          }
1069          flushedSequenceIdBuilder.addRegionSequenceId(flushedRegionSequenceIdBuilder);
1070        }
1071        flushedSequenceIdBuilder.build().writeDelimitedTo(out);
1072      } finally {
1073        if (out != null) {
1074          out.close();
1075        }
1076      }
1077    } finally {
1078      isFlushSeqIdPersistInProgress = false;
1079    }
1080  }
1081
1082  /**
1083   * Load last flushed sequence id of each region from HDFS, if persisted
1084   */
1085  public void loadLastFlushedSequenceIds() throws IOException {
1086    if (!persistFlushedSequenceId) {
1087      return;
1088    }
1089    Configuration conf = master.getConfiguration();
1090    Path rootDir = CommonFSUtils.getRootDir(conf);
1091    Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
1092    FileSystem fs = FileSystem.get(conf);
1093    if (!fs.exists(lastFlushedSeqIdPath)) {
1094      LOG.info("No .lastflushedseqids found at " + lastFlushedSeqIdPath
1095        + "; will record last flushed sequence ids"
1096        + " for regions from regionserver reports all over again");
1097      return;
1098    } else {
1099      LOG.info("begin to load .lastflushedseqids at " + lastFlushedSeqIdPath);
1100    }
1101    FSDataInputStream in = fs.open(lastFlushedSeqIdPath);
1102    try {
1103      FlushedSequenceId flushedSequenceId = FlushedSequenceId.parseDelimitedFrom(in);
1104      if (flushedSequenceId == null) {
1105        LOG.info(".lastflushedseqids found at {} is empty", lastFlushedSeqIdPath);
1106        return;
1107      }
1108      for (FlushedRegionSequenceId flushedRegionSequenceId : flushedSequenceId
1109        .getRegionSequenceIdList()) {
1110        byte[] encodedRegionName = flushedRegionSequenceId.getRegionEncodedName().toByteArray();
1111        flushedSequenceIdByRegion.putIfAbsent(encodedRegionName,
1112          flushedRegionSequenceId.getSeqId());
1113        if (
1114          flushedRegionSequenceId.getStoresList() != null
1115            && flushedRegionSequenceId.getStoresList().size() != 0
1116        ) {
1117          ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
1118            computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
1119              () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
1120          for (FlushedStoreSequenceId flushedStoreSequenceId : flushedRegionSequenceId
1121            .getStoresList()) {
1122            storeFlushedSequenceId.put(flushedStoreSequenceId.getFamily().toByteArray(),
1123              flushedStoreSequenceId.getSeqId());
1124          }
1125        }
1126      }
1127    } finally {
1128      in.close();
1129    }
1130  }
1131
1132  /**
1133   * Regions may have been removed between the latest persist of FlushedSequenceIds and a master
1134   * abort. So after loading FlushedSequenceIds from file, and after meta is loaded, we need to
1135   * remove the deleted regions according to RegionStates.
1136   */
1137  public void removeDeletedRegionFromLoadedFlushedSequenceIds() {
1138    RegionStates regionStates = master.getAssignmentManager().getRegionStates();
1139    Iterator<byte[]> it = flushedSequenceIdByRegion.keySet().iterator();
1140    while (it.hasNext()) {
1141      byte[] regionEncodedName = it.next();
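      // The region is no longer known to RegionStates (dropped, split or merged away); discard its
      // stale flushed sequence ids so they are not kept in memory or persisted again.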
1142      if (regionStates.getRegionState(Bytes.toStringBinary(regionEncodedName)) == null) {
1143        it.remove();
1144        storeFlushedSequenceIdsByRegion.remove(regionEncodedName);
1145      }
1146    }
1147  }
1148
1149  private class FlushedSequenceIdFlusher extends ScheduledChore {
1150
1151    public FlushedSequenceIdFlusher(String name, int p) {
1152      super(name, master, p, 60 * 1000); // delay one minute before first execute
1153    }
1154
1155    @Override
1156    protected void chore() {
1157      try {
1158        persistRegionLastFlushedSequenceIds();
1159      } catch (IOException e) {
1160        LOG.debug("Failed to persist last flushed sequence id of regions" + " to file system", e);
1161      }
1162    }
1163  }
1164}