001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master;
020
021import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
022
023import java.io.IOException;
024import java.net.InetAddress;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Map.Entry;
031import java.util.Set;
032import java.util.concurrent.ConcurrentNavigableMap;
033import java.util.concurrent.ConcurrentSkipListMap;
034import java.util.concurrent.CopyOnWriteArrayList;
035import java.util.concurrent.atomic.AtomicBoolean;
036import java.util.function.Predicate;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FSDataInputStream;
039import org.apache.hadoop.fs.FSDataOutputStream;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.ClockOutOfSyncException;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.NotServingRegionException;
045import org.apache.hadoop.hbase.RegionMetrics;
046import org.apache.hadoop.hbase.ScheduledChore;
047import org.apache.hadoop.hbase.ServerMetrics;
048import org.apache.hadoop.hbase.ServerMetricsBuilder;
049import org.apache.hadoop.hbase.ServerName;
050import org.apache.hadoop.hbase.YouAreDeadException;
051import org.apache.hadoop.hbase.client.AsyncClusterConnection;
052import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
053import org.apache.hadoop.hbase.client.RegionInfo;
054import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
055import org.apache.hadoop.hbase.master.assignment.RegionStates;
056import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
057import org.apache.hadoop.hbase.monitoring.MonitoredTask;
058import org.apache.hadoop.hbase.procedure2.Procedure;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.CommonFSUtils;
061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
062import org.apache.hadoop.hbase.util.FutureUtils;
063import org.apache.hadoop.hbase.zookeeper.ZKUtil;
064import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
065import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
066import org.apache.yetus.audience.InterfaceAudience;
067import org.apache.zookeeper.KeeperException;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
072import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
073
074import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
075import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedRegionSequenceId;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedSequenceId;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedStoreSequenceId;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
082
083/**
084 * The ServerManager class manages info about region servers.
085 * <p>
086 * Maintains lists of online and dead servers.  Processes the startups,
087 * shutdowns, and deaths of region servers.
088 * <p>
089 * Servers are distinguished in two different ways.  A given server has a
090 * location, specified by hostname and port, and of which there can only be one
091 * online at any given time.  A server instance is specified by the location
092 * (hostname and port) as well as the startcode (timestamp from when the server
093 * was started).  This is used to differentiate a restarted instance of a given
094 * server from the original instance.
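 * <p>
 * As an illustrative sketch (host, port and start codes below are hypothetical), a restarted
 * instance shares the address of the original but carries a newer start code:
 * <pre>
 * ServerName original  = ServerName.valueOf("rs1.example.org", 16020, 1600000000000L);
 * ServerName restarted = ServerName.valueOf("rs1.example.org", 16020, 1600000001000L);
 * ServerName.isSameAddress(original, restarted); // true: same location
 * original.equals(restarted);                    // false: different instance
 * </pre>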
095 * <p>
096 * If a server is known not to be running any more, it is called dead. A dead
097 * server needs to be handled by a ServerShutdownHandler.  If the handler is not
098 * enabled yet, the server can't be handled right away, so it is queued up.
099 * After the handler is enabled, the server is submitted to it for handling.
100 * However, the handler may be only partially enabled.  If so,
101 * the server cannot be fully processed yet, and is queued up for further processing.
102 * A server is fully processed only after the handler is fully enabled
103 * and has completed the handling.
104 */
105@InterfaceAudience.Private
106public class ServerManager {
107  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
108      "hbase.master.wait.on.regionservers.maxtostart";
109
110  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
111      "hbase.master.wait.on.regionservers.mintostart";
112
113  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
114      "hbase.master.wait.on.regionservers.timeout";
115
116  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
117      "hbase.master.wait.on.regionservers.interval";
118
119  /**
120   * See HBASE-20727.
121   * If set to true, flushedSequenceIdByRegion and storeFlushedSequenceIdsByRegion
122   * will be persisted to HDFS and loaded when the master restarts, to speed up log splitting.
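   * <p>
   * An illustrative configuration sketch ({@code conf} stands for the master's Configuration;
   * persistence is on by default):
   * <pre>
   * conf.setBoolean(ServerManager.PERSIST_FLUSHEDSEQUENCEID, false);        // disable persistence
   * conf.setInt(ServerManager.FLUSHEDSEQUENCEID_FLUSHER_INTERVAL, 3600000); // or flush hourly
   * </pre>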
123   */
124  public static final String PERSIST_FLUSHEDSEQUENCEID =
125      "hbase.master.persist.flushedsequenceid.enabled";
126
127  public static final boolean PERSIST_FLUSHEDSEQUENCEID_DEFAULT = true;
128
129  public static final String FLUSHEDSEQUENCEID_FLUSHER_INTERVAL =
130      "hbase.master.flushedsequenceid.flusher.interval";
131
132  public static final int FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT =
133      3 * 60 * 60 * 1000; // 3 hours
134
135  public static final String MAX_CLOCK_SKEW_MS = "hbase.master.maxclockskew";
136
137  private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);
138
139  // Set if we are to shutdown the cluster.
140  private AtomicBoolean clusterShutdown = new AtomicBoolean(false);
141
142  /**
143   * The last flushed sequence id for a region.
144   */
145  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
146    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
147
148  private boolean persistFlushedSequenceId = true;
149  private volatile boolean isFlushSeqIdPersistInProgress = false;
150  /** File on hdfs to store last flushed sequence id of regions */
151  private static final String LAST_FLUSHED_SEQ_ID_FILE = ".lastflushedseqids";
152  private FlushedSequenceIdFlusher flushedSeqIdFlusher;
153
154
155  /**
156   * The last flushed sequence id for a store in a region.
157   */
158  private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>>
159    storeFlushedSequenceIdsByRegion = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
160
161  /** Map of registered servers to their current load */
162  private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
163    new ConcurrentSkipListMap<>();
164
165  /** List of region servers that should not get any more new regions. */
166  private final ArrayList<ServerName> drainingServers = new ArrayList<>();
167
168  private final MasterServices master;
169
170  private final DeadServer deadservers = new DeadServer();
171
172  private final long maxSkew;
173  private final long warningSkew;
174
175  /** Listeners that are called on server events. */
176  private List<ServerListener> listeners = new CopyOnWriteArrayList<>();
177
178  /**
179   * Constructor.
180   */
181  public ServerManager(final MasterServices master) {
182    this.master = master;
183    Configuration c = master.getConfiguration();
184    maxSkew = c.getLong(MAX_CLOCK_SKEW_MS, 30000);
185    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
186    persistFlushedSequenceId = c.getBoolean(PERSIST_FLUSHEDSEQUENCEID,
187        PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
188  }
189
190  /**
191   * Add the listener to the notification list.
192   * @param listener The ServerListener to register
193   */
194  public void registerListener(final ServerListener listener) {
195    this.listeners.add(listener);
196  }
197
198  /**
199   * Remove the listener from the notification list.
200   * @param listener The ServerListener to unregister
201   */
202  public boolean unregisterListener(final ServerListener listener) {
203    return this.listeners.remove(listener);
204  }
205
206  /**
207   * Let the server manager know a new regionserver has come online
208   * @param request the startup request
209   * @param versionNumber the version number of the new regionserver
210   * @param version the version of the new regionserver, could contain strings like "SNAPSHOT"
211   * @param ia the InetAddress from which the request was received
212   * @return The ServerName we know this server as.
213   * @throws IOException if the startup is rejected, e.g. for clock skew or a dead server match
214   */
215  ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
216      String version, InetAddress ia) throws IOException {
217    // Test for case where we get a region startup message from a regionserver
218    // that has been quickly restarted but whose znode expiration handler has
219    // not yet run, or from a server whose fail we are currently processing.
220    // Test its host+port combo is present in serverAddressToServerInfo. If it
221    // is, reject the server and trigger its expiration. The next time it comes
222    // in, it should have been removed from serverAddressToServerInfo and queued
223    // for processing by ProcessServerShutdown.
224
225    final String hostname =
226      request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : ia.getHostName();
227    ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());
228    checkClockSkew(sn, request.getServerCurrentTime());
229    checkIsDead(sn, "STARTUP");
230    if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
231      LOG.warn(
232        "THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn);
233    }
234    return sn;
235  }
236
237  /**
238   * Updates last flushed sequence ids for the regions on server {@code sn}.
239   * @param sn the server that sent the report
240   * @param hsl the load (ServerMetrics) reported by that server
241   */
242  private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
243    for (Entry<byte[], RegionMetrics> entry : hsl.getRegionMetrics().entrySet()) {
244      byte[] encodedRegionName = Bytes.toBytes(RegionInfo.encodeRegionName(entry.getKey()));
245      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
246      long l = entry.getValue().getCompletedSequenceId();
247      // Don't let smaller sequence ids override greater sequence ids.
248      if (LOG.isTraceEnabled()) {
249        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue +
250          ", completeSequenceId=" + l);
251      }
252      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
253        flushedSequenceIdByRegion.put(encodedRegionName, l);
254      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
255        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id ("
256            + l + ") that is less than the previous last flushed sequence id ("
257            + existingValue + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring.");
258      }
259      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
260          computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
261            () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
262      for (Entry<byte[], Long> storeSeqId : entry.getValue().getStoreSequenceId().entrySet()) {
263        byte[] family = storeSeqId.getKey();
264        existingValue = storeFlushedSequenceId.get(family);
265        l = storeSeqId.getValue();
266        if (LOG.isTraceEnabled()) {
267          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family) +
268            ", existingValue=" + existingValue + ", completeSequenceId=" + l);
269        }
270        // Don't let smaller sequence ids override greater sequence ids.
271        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
272          storeFlushedSequenceId.put(family, l);
273        }
274      }
275    }
276  }
277
278  public void regionServerReport(ServerName sn,
279    ServerMetrics sl) throws YouAreDeadException {
280    checkIsDead(sn, "REPORT");
281    if (null == this.onlineServers.replace(sn, sl)) {
282      // Already have this host+port combo and it's just a different start code?
283      // Just let the server in. Presume master joining a running cluster.
284      // recordNewServer is what happens at the end of reportServerStartup.
285      // The only thing we are skipping is passing back to the regionserver
286      // the ServerName to use. Here we presume a master has already done
287      // that so we'll press on with whatever it gave us for ServerName.
288      if (!checkAndRecordNewServer(sn, sl)) {
289        LOG.info("RegionServerReport ignored, could not record the server: " + sn);
290        return; // Not recorded, so no need to move on
291      }
292    }
293    updateLastFlushedSequenceIds(sn, sl);
294  }
295
296  /**
297   * Checks if a server with the same host and port already exists;
298   * if not, or if the existing one has a smaller start code, record the new server.
299   *
300   * @param serverName the server to check and record
301   * @param sl the server load on the server
302   * @return true if the server is recorded, otherwise, false
303   */
304  boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
305    ServerName existingServer = null;
306    synchronized (this.onlineServers) {
307      existingServer = findServerWithSameHostnamePortWithLock(serverName);
308      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
309        LOG.info("Server serverName=" + serverName + " rejected; we already have "
310            + existingServer.toString() + " registered with same hostname and port");
311        return false;
312      }
313      recordNewServerWithLock(serverName, sl);
314    }
315
316    // Tell our listeners that a server was added
317    if (!this.listeners.isEmpty()) {
318      for (ServerListener listener : this.listeners) {
319        listener.serverAdded(serverName);
320      }
321    }
322
323    // Note that we assume that same ts means same server, and don't expire in that case.
324    //  TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
325    if (existingServer != null &&
326        (existingServer.getStartcode() < serverName.getStartcode())) {
327      LOG.info("Triggering server recovery; existingServer " +
328          existingServer + " looks stale, new server:" + serverName);
329      expireServer(existingServer);
330    }
331    return true;
332  }
333
334  /**
335   * Find the region servers that crashed between the crash of the previous master instance and
336   * the start of the current master instance, and schedule an SCP for each of them.
337   * <p/>
338   * Since the {@code RegionServerTracker} has already helped us to construct the online servers set
339   * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir}
340   * to find out whether there are servers which are already dead.
341   * <p/>
342   * Must be called inside the initialization method of {@code RegionServerTracker} to avoid
343   * concurrency issue.
344   * @param deadServersFromPE the region servers which already have a SCP associated.
345   * @param liveServersFromWALDir the live region servers from wal directory.
346   */
347  void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
348      Set<ServerName> liveServersFromWALDir) {
349    deadServersFromPE.forEach(deadservers::putIfAbsent);
350    liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
351      .forEach(this::expireServer);
352  }
353
354  /**
355   * Checks the clock skew between the server and the master. If the skew exceeds the
356   * configured max, it will throw an exception; if it exceeds the configured warning threshold,
357   * it will log a warning but start normally.
358   * @param serverName Incoming server's name
359   * @param serverCurrentTime the current time, in milliseconds, reported by the server
360   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
361   */
362  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
363      throws ClockOutOfSyncException {
364    long skew = Math.abs(EnvironmentEdgeManager.currentTime() - serverCurrentTime);
365    if (skew > maxSkew) {
366      String message = "Server " + serverName + " has been " +
367        "rejected; Reported time is too far out of sync with master.  " +
368        "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
369      LOG.warn(message);
370      throw new ClockOutOfSyncException(message);
371    } else if (skew > warningSkew){
372      String message = "Reported time for server " + serverName + " is out of sync with master " +
373        "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
374        "error threshold is " + maxSkew + "ms)";
375      LOG.warn(message);
376    }
377  }
378
379  /**
380   * Called when RegionServer first reports in for duty and thereafter each
381   * time it heartbeats to make sure it has not been figured for dead.
382   * If this server is on the dead list, reject it with a YouAreDeadException.
383   * If it was dead but came back with a new start code, remove the old entry
384   * from the dead list.
385   * @param what STARTUP or REPORT
386   */
387  private void checkIsDead(final ServerName serverName, final String what)
388      throws YouAreDeadException {
389    if (this.deadservers.isDeadServer(serverName)) {
390      // Exact match: host name, port and start code all match with existing one of the
391      // dead servers. So, this server must be dead. Tell it to kill itself.
392      String message = "Server " + what + " rejected; currently processing " +
393          serverName + " as dead server";
394      LOG.debug(message);
395      throw new YouAreDeadException(message);
396    }
397    // Remove any dead server with the same hostname and port as the newly checking-in rs, once
398    // master initialization is done. See HBASE-5916 for more information.
399    if ((this.master == null || this.master.isInitialized()) &&
400        this.deadservers.cleanPreviousInstance(serverName)) {
401      // This server has now become alive after we marked it as dead.
402      // We removed its previous entry from the dead list to reflect it.
403      LOG.debug("{} {} came back up, removed it from the dead servers list", what, serverName);
404    }
405  }
406
407  /**
408   * Assumes onlineServers is locked.
409   * @return ServerName with matching hostname and port, or null if none is found.
410   */
411  private ServerName findServerWithSameHostnamePortWithLock(
412      final ServerName serverName) {
413    ServerName end = ServerName.valueOf(serverName.getHostname(), serverName.getPort(),
414        Long.MAX_VALUE);
415
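    // ServerName sorts by hostname, then port, then start code, so lowerKey() on the synthetic
    // upper bound above yields the candidate with the highest start code for this host and port,
    // if such an entry exists; the isSameAddress() check below filters out unrelated addresses.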
416    ServerName r = onlineServers.lowerKey(end);
417    if (r != null) {
418      if (ServerName.isSameAddress(r, serverName)) {
419        return r;
420      }
421    }
422    return null;
423  }
424
425  /**
426   * Adds to the onlineServers list. onlineServers should be locked.
427   * @param serverName The remote server's name.
428   */
429  void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
430    LOG.info("Registering regionserver=" + serverName);
431    this.onlineServers.put(serverName, sl);
432  }
433
434  public ConcurrentNavigableMap<byte[], Long> getFlushedSequenceIdByRegion() {
435    return flushedSequenceIdByRegion;
436  }
437
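  /**
   * @param encodedRegionName the region's encoded name, as UTF-8 bytes
   * @return the last flushed sequence id of the region plus the per-store flushed sequence ids;
   *         {@link HConstants#NO_SEQNUM} is used when nothing has been reported for the region yet
   */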
438  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
439    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
440    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
441    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
442    Map<byte[], Long> storeFlushedSequenceId =
443        storeFlushedSequenceIdsByRegion.get(encodedRegionName);
444    if (storeFlushedSequenceId != null) {
445      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
446        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
447            .setFamilyName(UnsafeByteOperations.unsafeWrap(entry.getKey()))
448            .setSequenceId(entry.getValue().longValue()).build());
449      }
450    }
451    return builder.build();
452  }
453
454  /**
455   * @param serverName the server to look up
456   * @return ServerMetrics if serverName is known else null
457   */
458  public ServerMetrics getLoad(final ServerName serverName) {
459    return this.onlineServers.get(serverName);
460  }
461
462  /**
463   * Compute the average load across all region servers.
464   * Currently, this uses a very naive computation - just uses the number of
465   * regions being served, ignoring stats about number of requests.
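   * For example, three servers carrying 10, 20 and 30 regions average out to a load of 20.0.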
466   * @return the average load
467   */
468  public double getAverageLoad() {
469    int totalLoad = 0;
470    int numServers = 0;
471    for (ServerMetrics sl : this.onlineServers.values()) {
472      numServers++;
473      totalLoad += sl.getRegionMetrics().size();
474    }
475    return numServers == 0 ? 0 :
476      (double)totalLoad / (double)numServers;
477  }
478
479  /** @return the count of active regionservers */
480  public int countOfRegionServers() {
481    // Presumes onlineServers is a concurrent map
482    return this.onlineServers.size();
483  }
484
485  /**
486   * @return Read-only map of servers to serverinfo
487   */
488  public Map<ServerName, ServerMetrics> getOnlineServers() {
489    // Presumption is that iterating the returned Map is OK.
490    synchronized (this.onlineServers) {
491      return Collections.unmodifiableMap(this.onlineServers);
492    }
493  }
494
495  public DeadServer getDeadServers() {
496    return this.deadservers;
497  }
498
499  /**
500   * Checks if any dead servers are currently being processed.
501   * @return true if any RS are being processed as dead, false if not
502   */
503  public boolean areDeadServersInProgress() throws IOException {
504    return master.getProcedures().stream()
505      .anyMatch(p -> !p.isFinished() && p instanceof ServerCrashProcedure);
506  }
507
508  void letRegionServersShutdown() {
509    long previousLogTime = 0;
510    ServerName sn = master.getServerName();
511    ZKWatcher zkw = master.getZooKeeper();
512    int onlineServersCt;
513    while ((onlineServersCt = onlineServers.size()) > 0){
514
515      if (System.currentTimeMillis() > (previousLogTime + 1000)) {
516        Set<ServerName> remainingServers = onlineServers.keySet();
517        synchronized (onlineServers) {
518          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
519            // Master will delete itself later.
520            return;
521          }
522        }
523        StringBuilder sb = new StringBuilder();
524        // It's ok here to not sync on onlineServers - merely logging
525        for (ServerName key : remainingServers) {
526          if (sb.length() > 0) {
527            sb.append(", ");
528          }
529          sb.append(key);
530        }
531        LOG.info("Waiting on regionserver(s) " + sb.toString());
532        previousLogTime = System.currentTimeMillis();
533      }
534
535      try {
536        List<String> servers = getRegionServersInZK(zkw);
537        if (servers == null || servers.isEmpty() || (servers.size() == 1
538            && servers.contains(sn.toString()))) {
539          LOG.info("ZK shows there is only the master self online, exiting now");
540          // Master could have lost some ZK events, no need to wait more.
541          break;
542        }
543      } catch (KeeperException ke) {
544        LOG.warn("Failed to list regionservers", ke);
545        // ZK is malfunctioning, don't hang here
546        break;
547      }
548      synchronized (onlineServers) {
549        try {
550          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
551        } catch (InterruptedException ignored) {
552          // continue
553        }
554      }
555    }
556  }
557
558  private List<String> getRegionServersInZK(final ZKWatcher zkw)
559  throws KeeperException {
560    return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
561  }
562
563  /**
564   * Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
565   * @return pid if we queued a ServerCrashProcedure else {@link Procedure#NO_PROC_ID} if we did
566   *         not (could happen for many reasons including the fact that it's this server that is
567   *         going down or we already have queued an SCP for this server or SCP processing is
568   *         currently disabled because we are in startup phase).
569   */
570  // Redo test so we can make this protected.
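  // An illustrative sketch (names are hypothetical) of how the return value can be interpreted
  // by a caller:
  //   long pid = serverManager.expireServer(staleServerName);
  //   if (pid != Procedure.NO_PROC_ID) {
  //     // a ServerCrashProcedure was queued for staleServerName
  //   }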
571  public synchronized long expireServer(final ServerName serverName) {
572    return expireServer(serverName, false);
573
574  }
575
576  synchronized long expireServer(final ServerName serverName, boolean force) {
577    // THIS server is going down... can't handle our own expiration.
578    if (serverName.equals(master.getServerName())) {
579      if (!(master.isAborted() || master.isStopped())) {
580        master.stop("We lost our znode?");
581      }
582      return Procedure.NO_PROC_ID;
583    }
584    if (this.deadservers.isDeadServer(serverName)) {
585      LOG.warn("Expiration called on {} but already in DeadServer", serverName);
586      return Procedure.NO_PROC_ID;
587    }
588    moveFromOnlineToDeadServers(serverName);
589
590    // If server is in draining mode, remove corresponding znode
591    // In some tests, the mocked HM may not have ZK Instance, hence null check
592    if (master.getZooKeeper() != null) {
593      String drainingZnode = ZNodePaths
594        .joinZNode(master.getZooKeeper().getZNodePaths().drainingZNode, serverName.getServerName());
595      try {
596        ZKUtil.deleteNodeFailSilent(master.getZooKeeper(), drainingZnode);
597      } catch (KeeperException e) {
598        LOG.warn("Error deleting the draining znode for stopping server " + serverName.getServerName(), e);
599      }
600    }
601    
602    // If cluster is going down, yes, servers are going to be expiring; don't
603    // process as a dead server
604    if (isClusterShutdown()) {
605      LOG.info("Cluster shutdown set; " + serverName +
606        " expired; onlineServers=" + this.onlineServers.size());
607      if (this.onlineServers.isEmpty()) {
608        master.stop("Cluster shutdown set; onlineServer=0");
609      }
610      return Procedure.NO_PROC_ID;
611    }
612    LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
613    long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
614    // Tell our listeners that a server was removed
615    if (!this.listeners.isEmpty()) {
616      this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
617    }
618    // trigger a persist of flushedSeqId
619    if (flushedSeqIdFlusher != null) {
620      flushedSeqIdFlusher.triggerNow();
621    }
622    return pid;
623  }
624
625  /**
626   * Called when server has expired.
627   */
628  // Locking in this class needs cleanup.
629  public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
630    synchronized (this.onlineServers) {
631      boolean online = this.onlineServers.containsKey(sn);
632      if (online) {
633        // Remove the server from the known servers lists and update load info BUT
634        // add to deadservers first; do this so it'll show in dead servers list if
635        // not in online servers list.
636        this.deadservers.putIfAbsent(sn);
637        this.onlineServers.remove(sn);
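        // Wake up letRegionServersShutdown(), which waits on onlineServers for the count to drop.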
638        onlineServers.notifyAll();
639      } else {
640        // If not online, that is odd but may happen with 'Unknown Servers' -- where meta
641        // has references to servers neither online nor in the dead servers list. If an
642        // 'Unknown Server', don't add it to DeadServers or it will be there forever.
643        LOG.trace("Expiration of {} but server not online", sn);
644      }
645    }
646  }
647
648  /**
649   * Remove the server from the drain list.
650   */
651  public synchronized boolean removeServerFromDrainList(final ServerName sn) {
652    // Warn if the server (sn) is not online.  ServerName is of the form:
653    // <hostname> , <port> , <startcode>
654
655    if (!this.isServerOnline(sn)) {
656      LOG.warn("Server " + sn + " is not currently online. " +
657               "Removing from draining list anyway, as requested.");
658    }
659    // Remove the server from the draining servers lists.
660    return this.drainingServers.remove(sn);
661  }
662
663  /**
664   * Add the server to the drain list.
665   * @param sn the server to add to the draining list
666   * @return True if the server is added or the server is already on the drain list.
667   */
668  public synchronized boolean addServerToDrainList(final ServerName sn) {
669    // Warn if the server (sn) is not online.  ServerName is of the form:
670    // <hostname> , <port> , <startcode>
671
672    if (!this.isServerOnline(sn)) {
673      LOG.warn("Server " + sn + " is not currently online. " +
674               "Ignoring request to add it to draining list.");
675      return false;
676    }
677    // Add the server to the draining servers lists, if it's not already in
678    // it.
679    if (this.drainingServers.contains(sn)) {
680      LOG.warn("Server " + sn + " is already in the draining server list." +
681               "Ignoring request to add it again.");
682      return true;
683    }
684    LOG.info("Server " + sn + " added to draining server list.");
685    return this.drainingServers.add(sn);
686  }
687
688  /**
689   * Contacts a region server and waits up to timeout ms
690   * to close the region.  This bypasses the active HMaster.
691   * Pass -1 as the timeout if you do not want to wait on the result.
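   * <p>
   * An illustrative sketch (variable names are hypothetical):
   * <pre>
   * ServerManager.closeRegionSilentlyAndWait(asyncClusterConnection, regionServerName,
   *   regionInfo, 30000); // wait up to 30 seconds for the region to close
   * </pre>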
692   */
693  public static void closeRegionSilentlyAndWait(AsyncClusterConnection connection,
694      ServerName server, RegionInfo region, long timeout) throws IOException, InterruptedException {
695    AsyncRegionServerAdmin admin = connection.getRegionServerAdmin(server);
696    try {
697      FutureUtils.get(
698        admin.closeRegion(ProtobufUtil.buildCloseRegionRequest(server, region.getRegionName())));
699    } catch (IOException e) {
700      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
701    }
702    if (timeout < 0) {
703      return;
704    }
705    long expiration = timeout + System.currentTimeMillis();
706    while (System.currentTimeMillis() < expiration) {
707      try {
708        RegionInfo rsRegion = ProtobufUtil.toRegionInfo(FutureUtils
709          .get(
710            admin.getRegionInfo(RequestConverter.buildGetRegionInfoRequest(region.getRegionName())))
711          .getRegionInfo());
712        if (rsRegion == null) {
713          return;
714        }
715      } catch (IOException ioe) {
716        if (ioe instanceof NotServingRegionException ||
717          (ioe instanceof RemoteWithExtrasException &&
718            ((RemoteWithExtrasException)ioe).unwrapRemoteException()
719              instanceof NotServingRegionException)) {
720          // no need to retry again
721          return;
722        }
723        LOG.warn("Exception when retrieving regioninfo from: " + region.getRegionNameAsString(),
724          ioe);
725      }
726      Thread.sleep(1000);
727    }
728    throw new IOException("Region " + region + " failed to close within" + " timeout " + timeout);
729  }
730
731  /**
732   * Calculate min necessary to start. This is not an absolute. It is just
733   * a friction that will cause us to hang around a bit longer waiting on
734   * RegionServers to check-in.
735   */
736  private int getMinToStart() {
737    if (master.isInMaintenanceMode()) {
738      // If in maintenance mode, then master hosting meta will be the only server available
739      return 1;
740    }
741
742    int minimumRequired = 1;
743    if (LoadBalancer.isTablesOnMaster(master.getConfiguration()) &&
744        LoadBalancer.isSystemTablesOnlyOnMaster(master.getConfiguration())) {
745      // If Master is carrying regions it will show up as a 'server', but is not handling user-
746      // space regions, so we need a second server.
747      minimumRequired = 2;
748    }
749
750    int minToStart = this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
751    // Ensure we are never less than minimumRequired else stuff won't work.
752    return Math.max(minToStart, minimumRequired);
753  }
754
755  /**
756   * Wait for the region servers to report in.
757   * We will wait until one of these conditions is met:
758   *  - the master is stopped
759   *  - the 'hbase.master.wait.on.regionservers.maxtostart' number of
760   *    region servers has checked in
761   *  - the 'hbase.master.wait.on.regionservers.mintostart' count is reached AND
762   *    no new region server has checked in for
763   *    'hbase.master.wait.on.regionservers.interval' time AND
764   *    the 'hbase.master.wait.on.regionservers.timeout' has elapsed
765   *
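   * An illustrative tuning sketch ({@code conf} stands for the master's Configuration):
   * <pre>
   * conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);   // wait for at least 3
   * conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 10);  // stop waiting once 10 check in
   * conf.setLong(ServerManager.WAIT_ON_REGIONSERVERS_TIMEOUT, 30000); // give up the min wait after 30s
   * conf.setLong(ServerManager.WAIT_ON_REGIONSERVERS_INTERVAL, 1500); // quiet period between check-ins
   * </pre>
   *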
766   * @throws InterruptedException if the waiting thread is interrupted
767   */
768  public void waitForRegionServers(MonitoredTask status) throws InterruptedException {
769    final long interval = this.master.getConfiguration().
770        getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
771    final long timeout = this.master.getConfiguration().
772        getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
773    // Min is not an absolute; just a friction making us wait longer on server checkin.
774    int minToStart = getMinToStart();
775    int maxToStart = this.master.getConfiguration().
776        getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
777    if (maxToStart < minToStart) {
778      LOG.warn(String.format("The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
779          WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
780          WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
781      maxToStart = Integer.MAX_VALUE;
782    }
783
784    long now =  System.currentTimeMillis();
785    final long startTime = now;
786    long slept = 0;
787    long lastLogTime = 0;
788    long lastCountChange = startTime;
789    int count = countOfRegionServers();
790    int oldCount = 0;
791    // This while test is a little hard to read. We try to comment it in below but in essence:
792    // Wait if Master is not stopped and the number of regionservers that have checked-in is
793    // less than the maxToStart. Both of these conditions will be true near universally.
794    // Next, we will keep cycling if ANY of the following three conditions are true:
795    // 1. The time since a regionserver registered is < interval (means servers are actively checking in).
796    // 2. We are under the total timeout.
797    // 3. The count of servers is < minimum.
798    for (ServerListener listener: this.listeners) {
799      listener.waiting();
800    }
801    while (!this.master.isStopped() && !isClusterShutdown() && count < maxToStart &&
802        ((lastCountChange + interval) > now || timeout > slept || count < minToStart)) {
803      // Log some info at every interval time or if there is a change
804      if (oldCount != count || lastLogTime + interval < now) {
805        lastLogTime = now;
806        String msg =
807            "Waiting on regionserver count=" + count + "; waited="+
808                slept + "ms, expecting min=" + minToStart + " server(s), max="+ getStrForMax(maxToStart) +
809                " server(s), " + "timeout=" + timeout + "ms, lastChange=" + (lastCountChange - now) + "ms";
810        LOG.info(msg);
811        status.setStatus(msg);
812      }
813
814      // We sleep for some time
815      final long sleepTime = 50;
816      Thread.sleep(sleepTime);
817      now =  System.currentTimeMillis();
818      slept = now - startTime;
819
820      oldCount = count;
821      count = countOfRegionServers();
822      if (count != oldCount) {
823        lastCountChange = now;
824      }
825    }
826    // Did we exit the loop because cluster is going down?
827    if (isClusterShutdown()) {
828      this.master.stop("Cluster shutdown");
829    }
830    LOG.info("Finished waiting on RegionServer count=" + count + "; waited=" + slept + "ms," +
831        " expected min=" + minToStart + " server(s), max=" +  getStrForMax(maxToStart) + " server(s),"+
832        " master is "+ (this.master.isStopped() ? "stopped.": "running"));
833  }
834
835  private String getStrForMax(final int max) {
836    return max == Integer.MAX_VALUE? "NO_LIMIT": Integer.toString(max);
837  }
838
839  /**
840   * @return A copy of the internal list of online servers.
841   */
842  public List<ServerName> getOnlineServersList() {
843    // TODO: optimize the load balancer call so we don't need to make a new list
844    // TODO: FIX. THIS IS POPULAR CALL.
845    return new ArrayList<>(this.onlineServers.keySet());
846  }
847
848  /**
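   * A usage sketch (names are hypothetical): select online servers that currently carry no
   * regions.
   * <pre>
   * List&lt;ServerName&gt; idle = serverManager.getOnlineServersListWithPredicator(
   *   serverManager.getOnlineServersList(),
   *   metrics -&gt; metrics.getRegionMetrics().isEmpty());
   * </pre>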
849   * @param keys The target server names
850   * @param idleServerPredicator Evaluates the server on the given load
851   * @return A copy of the internal list of online servers matched by the predicator
852   */
853  public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> keys,
854    Predicate<ServerMetrics> idleServerPredicator) {
855    List<ServerName> names = new ArrayList<>();
856    if (keys != null && idleServerPredicator != null) {
857      keys.forEach(name -> {
858        ServerMetrics load = onlineServers.get(name);
859        if (load != null) {
860          if (idleServerPredicator.test(load)) {
861            names.add(name);
862          }
863        }
864      });
865    }
866    return names;
867  }
868
869  /**
870   * @return A copy of the internal list of draining servers.
871   */
872  public List<ServerName> getDrainingServersList() {
873    return new ArrayList<>(this.drainingServers);
874  }
875
876  public boolean isServerOnline(ServerName serverName) {
877    return serverName != null && onlineServers.containsKey(serverName);
878  }
879
880  public enum ServerLiveState {
881    LIVE,
882    DEAD,
883    UNKNOWN
884  }
885
886  /**
887   * @return whether the server is online, dead, or unknown.
888   */
889  public synchronized ServerLiveState isServerKnownAndOnline(ServerName serverName) {
890    return onlineServers.containsKey(serverName) ? ServerLiveState.LIVE
891      : (deadservers.isDeadServer(serverName) ? ServerLiveState.DEAD : ServerLiveState.UNKNOWN);
892  }
893
894  /**
895   * Check if a server is known to be dead.  A server can be online,
896   * or known to be dead, or unknown to this manager (i.e., not online,
897   * not known to be dead either; it is simply not tracked by the
898   * master any more, for example, a very old previous instance).
899   */
900  public synchronized boolean isServerDead(ServerName serverName) {
901    return serverName == null || deadservers.isDeadServer(serverName);
902  }
903
904  public void shutdownCluster() {
905    String statusStr = "Cluster shutdown requested of master=" + this.master.getServerName();
906    LOG.info(statusStr);
907    this.clusterShutdown.set(true);
908    if (onlineServers.isEmpty()) {
909      // we do not synchronize here so this may cause a double stop, but not a big deal
910      master.stop("OnlineServer=0 right after cluster shutdown set");
911    }
912  }
913
914  public boolean isClusterShutdown() {
915    return this.clusterShutdown.get();
916  }
917
918  /**
919   * start chore in ServerManager
920   */
921  public void startChore() {
922    Configuration c = master.getConfiguration();
923    if (persistFlushedSequenceId) {
924      // When we reach here, RegionStates should already be loaded, so first remove deleted regions
925      removeDeletedRegionFromLoadedFlushedSequenceIds();
926      int flushPeriod = c.getInt(FLUSHEDSEQUENCEID_FLUSHER_INTERVAL,
927          FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT);
928      flushedSeqIdFlusher = new FlushedSequenceIdFlusher(
929          "FlushedSequenceIdFlusher", flushPeriod);
930      master.getChoreService().scheduleChore(flushedSeqIdFlusher);
931    }
932  }
933
934  /**
935   * Stop the ServerManager.
936   */
937  public void stop() {
938    if (flushedSeqIdFlusher != null) {
939      flushedSeqIdFlusher.cancel();
940    }
941    if (persistFlushedSequenceId) {
942      try {
943        persistRegionLastFlushedSequenceIds();
944      } catch (IOException e) {
945        LOG.warn("Failed to persist last flushed sequence id of regions"
946            + " to file system", e);
947      }
948    }
949  }
950
951  /**
952   * Creates a list of possible destinations for a region. It contains the online servers, but not
953   *  the draining or dying servers.
954   *  @param serversToExclude can be null if there is no server to exclude
955   */
956  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude){
957    final List<ServerName> destServers = getOnlineServersList();
958
959    if (serversToExclude != null) {
960      destServers.removeAll(serversToExclude);
961    }
962
963    // Loop through the draining server list and remove them from the server list
964    final List<ServerName> drainingServersCopy = getDrainingServersList();
965    destServers.removeAll(drainingServersCopy);
966
967    return destServers;
968  }
969
970  /**
971   * Calls {@link #createDestinationServersList} without server to exclude.
972   */
973  public List<ServerName> createDestinationServersList(){
974    return createDestinationServersList(null);
975  }
976
977  /**
978   * Clear any dead server that has the same host name and port as an online server.
979   */
980  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
981    for (ServerName serverName : getOnlineServersList()) {
982      deadservers.cleanAllPreviousInstances(serverName);
983    }
984  }
985
986  /**
987   * Called by delete table and similar to notify the ServerManager that a region was removed.
988   */
989  public void removeRegion(final RegionInfo regionInfo) {
990    final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
991    storeFlushedSequenceIdsByRegion.remove(encodedName);
992    flushedSequenceIdByRegion.remove(encodedName);
993  }
994
995  public boolean isRegionInServerManagerStates(final RegionInfo hri) {
996    final byte[] encodedName = hri.getEncodedNameAsBytes();
997    return (storeFlushedSequenceIdsByRegion.containsKey(encodedName)
998        || flushedSequenceIdByRegion.containsKey(encodedName));
999  }
1000
1001  /**
1002   * Called by delete table and similar to notify the ServerManager that a region was removed.
1003   */
1004  public void removeRegions(final List<RegionInfo> regions) {
1005    for (RegionInfo hri: regions) {
1006      removeRegion(hri);
1007    }
1008  }
1009
1010  /**
1011   * May return 0 when server is not online.
1012   */
1013  public int getVersionNumber(ServerName serverName) {
1014    ServerMetrics serverMetrics = onlineServers.get(serverName);
1015    return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
1016  }
1017
1018  /**
1019   * May return "0.0.0" when server is not online
1020   */
1021  public String getVersion(ServerName serverName) {
1022    ServerMetrics serverMetrics = onlineServers.get(serverName);
1023    return serverMetrics != null ? serverMetrics.getVersion() : "0.0.0";
1024  }
1025
1026  public int getInfoPort(ServerName serverName) {
1027    ServerMetrics serverMetrics = onlineServers.get(serverName);
1028    return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
1029  }
1030
1031  /**
1032   * Persist last flushed sequence id of each region to HDFS
1033   * @throws IOException if persisting to HDFS fails
1034   */
1035  private void persistRegionLastFlushedSequenceIds() throws IOException {
1036    if (isFlushSeqIdPersistInProgress) {
1037      return;
1038    }
1039    isFlushSeqIdPersistInProgress = true;
1040    try {
1041      Configuration conf = master.getConfiguration();
1042      Path rootDir = CommonFSUtils.getRootDir(conf);
1043      Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
1044      FileSystem fs = FileSystem.get(conf);
1045      if (fs.exists(lastFlushedSeqIdPath)) {
1046        LOG.info("Rewriting .lastflushedseqids file at: "
1047            + lastFlushedSeqIdPath);
1048        if (!fs.delete(lastFlushedSeqIdPath, false)) {
1049          throw new IOException("Unable to remove existing "
1050              + lastFlushedSeqIdPath);
1051        }
1052      } else {
1053        LOG.info("Writing .lastflushedseqids file at: " + lastFlushedSeqIdPath);
1054      }
1055      FSDataOutputStream out = fs.create(lastFlushedSeqIdPath);
1056      FlushedSequenceId.Builder flushedSequenceIdBuilder =
1057          FlushedSequenceId.newBuilder();
1058      try {
1059        for (Entry<byte[], Long> entry : flushedSequenceIdByRegion.entrySet()) {
1060          FlushedRegionSequenceId.Builder flushedRegionSequenceIdBuilder =
1061              FlushedRegionSequenceId.newBuilder();
1062          flushedRegionSequenceIdBuilder.setRegionEncodedName(
1063              ByteString.copyFrom(entry.getKey()));
1064          flushedRegionSequenceIdBuilder.setSeqId(entry.getValue());
1065          ConcurrentNavigableMap<byte[], Long> storeSeqIds =
1066              storeFlushedSequenceIdsByRegion.get(entry.getKey());
1067          if (storeSeqIds != null) {
1068            for (Entry<byte[], Long> store : storeSeqIds.entrySet()) {
1069              FlushedStoreSequenceId.Builder flushedStoreSequenceIdBuilder =
1070                  FlushedStoreSequenceId.newBuilder();
1071              flushedStoreSequenceIdBuilder.setFamily(ByteString.copyFrom(store.getKey()));
1072              flushedStoreSequenceIdBuilder.setSeqId(store.getValue());
1073              flushedRegionSequenceIdBuilder.addStores(flushedStoreSequenceIdBuilder);
1074            }
1075          }
1076          flushedSequenceIdBuilder.addRegionSequenceId(flushedRegionSequenceIdBuilder);
1077        }
1078        flushedSequenceIdBuilder.build().writeDelimitedTo(out);
1079      } finally {
1080        if (out != null) {
1081          out.close();
1082        }
1083      }
1084    } finally {
1085      isFlushSeqIdPersistInProgress = false;
1086    }
1087  }
1088
1089  /**
1090   * Load last flushed sequence id of each region from HDFS, if persisted
1091   */
1092  public void loadLastFlushedSequenceIds() throws IOException {
1093    if (!persistFlushedSequenceId) {
1094      return;
1095    }
1096    Configuration conf = master.getConfiguration();
1097    Path rootDir = CommonFSUtils.getRootDir(conf);
1098    Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
1099    FileSystem fs = FileSystem.get(conf);
1100    if (!fs.exists(lastFlushedSeqIdPath)) {
1101      LOG.info("No .lastflushedseqids found at" + lastFlushedSeqIdPath
1102          + " will record last flushed sequence id"
1103          + " for regions by regionserver report all over again");
1104      return;
1105    } else {
1106      LOG.info("begin to load .lastflushedseqids at " + lastFlushedSeqIdPath);
1107    }
1108    FSDataInputStream in = fs.open(lastFlushedSeqIdPath);
1109    try {
1110      FlushedSequenceId flushedSequenceId =
1111          FlushedSequenceId.parseDelimitedFrom(in);
1112      if (flushedSequenceId == null) {
1113        LOG.info(".lastflushedseqids found at {} is empty", lastFlushedSeqIdPath);
1114        return;
1115      }
1116      for (FlushedRegionSequenceId flushedRegionSequenceId : flushedSequenceId
1117          .getRegionSequenceIdList()) {
1118        byte[] encodedRegionName = flushedRegionSequenceId
1119            .getRegionEncodedName().toByteArray();
1120        flushedSequenceIdByRegion
1121            .putIfAbsent(encodedRegionName, flushedRegionSequenceId.getSeqId());
1122        if (flushedRegionSequenceId.getStoresList() != null
1123            && flushedRegionSequenceId.getStoresList().size() != 0) {
1124          ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
1125              computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
1126                () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
1127          for (FlushedStoreSequenceId flushedStoreSequenceId : flushedRegionSequenceId
1128              .getStoresList()) {
1129            storeFlushedSequenceId
1130                .put(flushedStoreSequenceId.getFamily().toByteArray(),
1131                    flushedStoreSequenceId.getSeqId());
1132          }
1133        }
1134      }
1135    } finally {
1136      in.close();
1137    }
1138  }
1139
1140  /**
1141   * Regions may have been removed between latest persist of FlushedSequenceIds
1142   * and master abort. So after loading FlushedSequenceIds from the file, and after
1143   * meta is loaded, we need to remove the deleted regions according to RegionStates.
1144   */
1145  public void removeDeletedRegionFromLoadedFlushedSequenceIds() {
1146    RegionStates regionStates = master.getAssignmentManager().getRegionStates();
1147    Iterator<byte[]> it = flushedSequenceIdByRegion.keySet().iterator();
1148    while(it.hasNext()) {
1149      byte[] regionEncodedName = it.next();
1150      if (regionStates.getRegionState(Bytes.toStringBinary(regionEncodedName)) == null) {
1151        it.remove();
1152        storeFlushedSequenceIdsByRegion.remove(regionEncodedName);
1153      }
1154    }
1155  }
1156
1157  private class FlushedSequenceIdFlusher extends ScheduledChore {
1158
1159    public FlushedSequenceIdFlusher(String name, int p) {
1160      super(name, master, p, 60 * 1000); // delay one minute before the first execution
1161    }
1162
1163    @Override
1164    protected void chore() {
1165      try {
1166        persistRegionLastFlushedSequenceIds();
1167      } catch (IOException e) {
1168        LOG.debug("Failed to persist last flushed sequence id of regions"
1169            + " to file system", e);
1170      }
1171    }
1172  }
1173}