/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedRegionSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedStoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;

/**
 * The ServerManager class manages info about region servers.
 * <p>
 * Maintains lists of online and dead servers.  Processes the startups,
 * shutdowns, and deaths of region servers.
 * <p>
 * Servers are distinguished in two different ways.  A given server has a
 * location, specified by hostname and port, of which there can only be one
 * online at any given time.  A server instance is specified by the location
 * (hostname and port) as well as the startcode (timestamp from when the server
 * was started).  This is used to differentiate a restarted instance of a given
 * server from the original instance.
 * <p>
 * If a server is known not to be running any more, it is called dead. The dead
 * server needs to be handled by a ServerShutdownHandler.  If the handler is not
 * enabled yet, the server cannot be handled right away, so it is queued up.
 * Once the handler is enabled, the server is submitted to it for handling.
 * The handler may also be only partially enabled; if so, the server cannot be
 * fully processed and is queued up for further processing.  A server is fully
 * processed only after the handler is fully enabled and has completed the
 * handling.
 */
@InterfaceAudience.Private
public class ServerManager {
  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
      "hbase.master.wait.on.regionservers.maxtostart";

  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
      "hbase.master.wait.on.regionservers.mintostart";

  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
      "hbase.master.wait.on.regionservers.timeout";

  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
      "hbase.master.wait.on.regionservers.interval";

  /**
   * See HBASE-20727.
   * If set to true, flushedSequenceIdByRegion and storeFlushedSequenceIdsByRegion
   * will be persisted to HDFS and loaded when the master restarts, to speed up log splitting.
   */
  public static final String PERSIST_FLUSHEDSEQUENCEID =
      "hbase.master.persist.flushedsequenceid.enabled";

  public static final boolean PERSIST_FLUSHEDSEQUENCEID_DEFAULT = true;

  public static final String FLUSHEDSEQUENCEID_FLUSHER_INTERVAL =
      "hbase.master.flushedsequenceid.flusher.interval";

  public static final int FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT =
      3 * 60 * 60 * 1000; // 3 hours

  public static final String MAX_CLOCK_SKEW_MS = "hbase.master.maxclockskew";

  private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);

  // Set if we are to shutdown the cluster.
  private AtomicBoolean clusterShutdown = new AtomicBoolean(false);

  /**
   * The last flushed sequence id for a region.
   */
  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  private boolean persistFlushedSequenceId = true;
  private volatile boolean isFlushSeqIdPersistInProgress = false;
  /** File on hdfs to store last flushed sequence id of regions */
  private static final String LAST_FLUSHED_SEQ_ID_FILE = ".lastflushedseqids";
  private FlushedSequenceIdFlusher flushedSeqIdFlusher;


  /**
   * The last flushed sequence id for a store in a region.
   */
  private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>>
    storeFlushedSequenceIdsByRegion = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /** Map of registered servers to their current load */
  private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
    new ConcurrentSkipListMap<>();

  /** List of region servers that should not get any more new regions. */
  private final ArrayList<ServerName> drainingServers = new ArrayList<>();

  private final MasterServices master;

  private final DeadServer deadservers = new DeadServer();

  private final long maxSkew;
  private final long warningSkew;

  /** Listeners that are called on server events. */
  private List<ServerListener> listeners = new CopyOnWriteArrayList<>();

  /**
   * Constructor.
   */
  public ServerManager(final MasterServices master) {
    this.master = master;
    Configuration c = master.getConfiguration();
    maxSkew = c.getLong(MAX_CLOCK_SKEW_MS, 30000);
    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
    persistFlushedSequenceId = c.getBoolean(PERSIST_FLUSHEDSEQUENCEID,
        PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
  }

  /**
   * Add the listener to the notification list.
   * @param listener The ServerListener to register
   */
  public void registerListener(final ServerListener listener) {
    this.listeners.add(listener);
  }
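
  // Illustrative sketch only (an assumed caller, not part of this class): master components
  // react to servers joining or leaving by registering a ServerListener, e.g.
  //
  //   serverManager.registerListener(new ServerListener() {
  //     @Override
  //     public void serverAdded(ServerName sn) {
  //       // e.g. wake up anything waiting for more capacity
  //     }
  //     @Override
  //     public void serverRemoved(ServerName sn) {
  //       // e.g. trigger bookkeeping for the departed server
  //     }
  //   });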

  /**
   * Remove the listener from the notification list.
   * @param listener The ServerListener to unregister
   */
  public boolean unregisterListener(final ServerListener listener) {
    return this.listeners.remove(listener);
  }

  /**
   * Let the server manager know a new regionserver has come online.
   * @param request the startup request
   * @param versionNumber the version number of the new regionserver
   * @param version the version of the new regionserver, could contain strings like "SNAPSHOT"
   * @param ia the InetAddress from which the request was received
   * @return The ServerName we know this server as.
   * @throws IOException if the clock skew check or the dead-server check rejects the server
   */
  ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
      String version, InetAddress ia) throws IOException {
    // Test for case where we get a region startup message from a regionserver
    // that has been quickly restarted but whose znode expiration handler has
    // not yet run, or from a server whose fail we are currently processing.
    // Test its host+port combo is present in serverAddressToServerInfo. If it
    // is, reject the server and trigger its expiration. The next time it comes
    // in, it should have been removed from serverAddressToServerInfo and queued
    // for processing by ProcessServerShutdown.

    final String hostname =
      request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : ia.getHostName();
    ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());
    checkClockSkew(sn, request.getServerCurrentTime());
    checkIsDead(sn, "STARTUP");
    if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
      LOG.warn(
        "THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn);
    }
    return sn;
  }

  /**
   * Updates last flushed sequence ids for the regions on server {@code sn}.
   * @param sn the server reporting in
   * @param hsl the server metrics reported, carrying per-region completed sequence ids
   */
  private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
    for (Entry<byte[], RegionMetrics> entry : hsl.getRegionMetrics().entrySet()) {
      byte[] encodedRegionName = Bytes.toBytes(RegionInfo.encodeRegionName(entry.getKey()));
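      // Note: both maps in this class are keyed by the region's *encoded* name (as bytes), while
      // the incoming RegionMetrics map is keyed by the full region name; hence the conversion
      // above.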
      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
      long l = entry.getValue().getCompletedSequenceId();
      // Don't let smaller sequence ids override greater sequence ids.
      if (LOG.isTraceEnabled()) {
        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue +
          ", completeSequenceId=" + l);
      }
      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
        flushedSequenceIdByRegion.put(encodedRegionName, l);
      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id ("
            + l + ") that is less than the previous last flushed sequence id ("
            + existingValue + ") for region " + Bytes.toString(entry.getKey()) + ". Ignoring.");
      }
      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
          computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
            () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
      for (Entry<byte[], Long> storeSeqId : entry.getValue().getStoreSequenceId().entrySet()) {
        byte[] family = storeSeqId.getKey();
        existingValue = storeFlushedSequenceId.get(family);
        l = storeSeqId.getValue();
        if (LOG.isTraceEnabled()) {
          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family) +
            ", existingValue=" + existingValue + ", completeSequenceId=" + l);
        }
        // Don't let smaller sequence ids override greater sequence ids.
        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
          storeFlushedSequenceId.put(family, l);
        }
      }
    }
  }

  @VisibleForTesting
  public void regionServerReport(ServerName sn,
    ServerMetrics sl) throws YouAreDeadException {
    checkIsDead(sn, "REPORT");
    if (null == this.onlineServers.replace(sn, sl)) {
      // Already have this host+port combo and it's just a different start code?
      // Just let the server in. Presume master joining a running cluster.
      // recordNewServer is what happens at the end of reportServerStartup.
      // The only thing we are skipping is passing back to the regionserver
      // the ServerName to use. Here we presume a master has already done
      // that so we'll press on with whatever it gave us for ServerName.
      if (!checkAndRecordNewServer(sn, sl)) {
        LOG.info("RegionServerReport ignored, could not record the server: " + sn);
        return; // Not recorded, so no need to move on
      }
    }
    updateLastFlushedSequenceIds(sn, sl);
  }

  /**
   * Checks if a server with the same host and port already exists; if not, or if the existing
   * one has a smaller start code, records the new server.
   *
   * @param serverName the server to check and record
   * @param sl the server load on the server
   * @return true if the server is recorded, otherwise false
   */
  boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
    ServerName existingServer = null;
    synchronized (this.onlineServers) {
      existingServer = findServerWithSameHostnamePortWithLock(serverName);
      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
        LOG.info("Server serverName=" + serverName + " rejected; we already have "
            + existingServer.toString() + " registered with same hostname and port");
        return false;
      }
      recordNewServerWithLock(serverName, sl);
    }

    // Tell our listeners that a server was added
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverAdded(serverName);
      }
    }

    // Note that we assume that same ts means same server, and don't expire in that case.
    //  TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
    if (existingServer != null &&
        (existingServer.getStartcode() < serverName.getStartcode())) {
      LOG.info("Triggering server recovery; existingServer " +
          existingServer + " looks stale, new server:" + serverName);
      expireServer(existingServer);
    }
    return true;
  }

  /**
   * Find out the region servers that crashed between the crash of the previous master instance
   * and the current master instance, and schedule SCP for them.
   * <p/>
   * Since the {@code RegionServerTracker} has already helped us to construct the online servers
   * set by scanning zookeeper, now we can compare the online servers with
   * {@code liveServersFromWALDir} to find out whether there are servers which are already dead.
   * <p/>
   * Must be called inside the initialization method of {@code RegionServerTracker} to avoid
   * concurrency issues.
   * @param deadServersFromPE the region servers which already have a SCP associated.
   * @param liveServersFromWALDir the live region servers found in the WAL directory.
   */
  void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
      Set<ServerName> liveServersFromWALDir) {
    deadServersFromPE.forEach(deadservers::putIfAbsent);
    liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
      .forEach(this::expireServer);
  }

  /**
   * Checks the clock skew between the server and the master. If the clock skew exceeds the
   * configured max, it will throw an exception; if it exceeds the configured warning threshold,
   * it will log a warning but start normally.
   * @param serverName Incoming server's name
   * @param serverCurrentTime the current time, in ms, reported by the server
   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
   */
  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
      throws ClockOutOfSyncException {
    long skew = Math.abs(EnvironmentEdgeManager.currentTime() - serverCurrentTime);
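    // For example, with the defaults read in the constructor (maxSkew=30000ms,
    // warningSkew=10000ms): a skew of 12s only logs a warning, while a skew of 45s throws
    // ClockOutOfSyncException and the regionserver's startup is rejected.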
    if (skew > maxSkew) {
      String message = "Server " + serverName + " has been " +
        "rejected; Reported time is too far out of sync with master.  " +
        "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
      LOG.warn(message);
      throw new ClockOutOfSyncException(message);
    } else if (skew > warningSkew) {
      String message = "Reported time for server " + serverName + " is out of sync with master " +
        "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
        "error threshold is " + maxSkew + "ms)";
      LOG.warn(message);
    }
  }

  /**
   * Called when RegionServer first reports in for duty and thereafter each
   * time it heartbeats to make sure it has not been figured for dead.
   * If this server is on the dead list, reject it with a YouAreDeadException.
   * If it was dead but came back with a new start code, remove the old entry
   * from the dead list.
   * @param what START or REPORT
   */
  private void checkIsDead(final ServerName serverName, final String what)
      throws YouAreDeadException {
    if (this.deadservers.isDeadServer(serverName)) {
      // Exact match: host name, port and start code all match with existing one of the
      // dead servers. So, this server must be dead. Tell it to kill itself.
      String message = "Server " + what + " rejected; currently processing " +
          serverName + " as dead server";
      LOG.debug(message);
      throw new YouAreDeadException(message);
    }
    // Remove a dead server with the same hostname and port as the newly checking-in rs after
    // master initialization. See HBASE-5916 for more information.
    if ((this.master == null || this.master.isInitialized()) &&
        this.deadservers.cleanPreviousInstance(serverName)) {
      // This server has now become alive after we marked it as dead.
      // We removed its previous entry from the dead list to reflect it.
      LOG.debug("{} {} came back up, removed it from the dead servers list", what, serverName);
    }
  }

  /**
   * Assumes onlineServers is locked.
   * @return ServerName with matching hostname and port.
   */
  private ServerName findServerWithSameHostnamePortWithLock(
      final ServerName serverName) {
    ServerName end = ServerName.valueOf(serverName.getHostname(), serverName.getPort(),
        Long.MAX_VALUE);

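    // ServerName sorts by hostname, then port, then startcode, so the entry strictly below
    // (hostname, port, Long.MAX_VALUE) is the highest-startcode instance registered for this
    // hostname:port, if any such instance exists.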
    ServerName r = onlineServers.lowerKey(end);
    if (r != null) {
      if (ServerName.isSameAddress(r, serverName)) {
        return r;
      }
    }
    return null;
  }

  /**
   * Adds to the onlineServers list. onlineServers should be locked.
   * @param serverName The remote server's name.
   */
  @VisibleForTesting
  void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
    LOG.info("Registering regionserver=" + serverName);
    this.onlineServers.put(serverName, sl);
  }

  @VisibleForTesting
  public ConcurrentNavigableMap<byte[], Long> getFlushedSequenceIdByRegion() {
    return flushedSequenceIdByRegion;
  }

  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
    Map<byte[], Long> storeFlushedSequenceId =
        storeFlushedSequenceIdsByRegion.get(encodedRegionName);
    if (storeFlushedSequenceId != null) {
      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
            .setFamilyName(UnsafeByteOperations.unsafeWrap(entry.getKey()))
            .setSequenceId(entry.getValue().longValue()).build());
      }
    }
    return builder.build();
  }
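
  // Illustrative sketch only (the real consumer lives outside this class): WAL splitting and
  // replay code can use the returned ids to skip edits that were already flushed, e.g.
  //
  //   RegionStoreSequenceIds ids = serverManager.getLastFlushedSequenceId(encodedRegionName);
  //   boolean alreadyFlushed = walEntrySeqId <= ids.getLastFlushedSequenceId();
  //
  // A name like walEntrySeqId above is hypothetical and only shows the intended comparison.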

  /**
   * @param serverName the server to look up
   * @return ServerMetrics if serverName is known else null
   */
  public ServerMetrics getLoad(final ServerName serverName) {
    return this.onlineServers.get(serverName);
  }

  /**
   * Compute the average load across all region servers.
   * Currently, this uses a very naive computation - just uses the number of
   * regions being served, ignoring stats about number of requests.
   * @return the average load
   */
  public double getAverageLoad() {
    int totalLoad = 0;
    int numServers = 0;
    for (ServerMetrics sl : this.onlineServers.values()) {
      numServers++;
      totalLoad += sl.getRegionMetrics().size();
    }
    return numServers == 0 ? 0 :
      (double)totalLoad / (double)numServers;
  }

  /** @return the count of active regionservers */
  public int countOfRegionServers() {
    // Presumes onlineServers is a concurrent map
    return this.onlineServers.size();
  }

  /**
   * @return Read-only map of servers to serverinfo
   */
  public Map<ServerName, ServerMetrics> getOnlineServers() {
    // Presumption is that iterating the returned Map is OK.
    synchronized (this.onlineServers) {
      return Collections.unmodifiableMap(this.onlineServers);
    }
  }

  public DeadServer getDeadServers() {
    return this.deadservers;
  }

  /**
   * Checks if any dead servers are currently being processed.
   * @return true if any RS are being processed as dead, false if not
   */
  public boolean areDeadServersInProgress() {
    return this.deadservers.areDeadServersInProgress();
  }

  void letRegionServersShutdown() {
    long previousLogTime = 0;
    ServerName sn = master.getServerName();
    ZKWatcher zkw = master.getZooKeeper();
    int onlineServersCt;
    while ((onlineServersCt = onlineServers.size()) > 0) {

      if (System.currentTimeMillis() > (previousLogTime + 1000)) {
        Set<ServerName> remainingServers = onlineServers.keySet();
        synchronized (onlineServers) {
          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
            // Master will delete itself later.
            return;
          }
        }
        StringBuilder sb = new StringBuilder();
        // It's ok here to not sync on onlineServers - merely logging
        for (ServerName key : remainingServers) {
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(key);
        }
        LOG.info("Waiting on regionserver(s) " + sb.toString());
        previousLogTime = System.currentTimeMillis();
      }

      try {
        List<String> servers = getRegionServersInZK(zkw);
        if (servers == null || servers.isEmpty() || (servers.size() == 1
            && servers.contains(sn.toString()))) {
          LOG.info("ZK shows there is only the master self online, exiting now");
          // Master could have lost some ZK events, no need to wait more.
          break;
        }
      } catch (KeeperException ke) {
        LOG.warn("Failed to list regionservers", ke);
        // ZK is malfunctioning, don't hang here
        break;
      }
      synchronized (onlineServers) {
        try {
          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
        } catch (InterruptedException ignored) {
          // continue
        }
      }
    }
  }

  private List<String> getRegionServersInZK(final ZKWatcher zkw)
      throws KeeperException {
    return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
  }

  /**
   * Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
   * @return pid if we queued a ServerCrashProcedure else {@link Procedure#NO_PROC_ID} if we did
   *         not (could happen for many reasons including the fact that it is this server that is
   *         going down or we already have queued an SCP for this server or SCP processing is
   *         currently disabled because we are in startup phase).
   */
  @VisibleForTesting // Redo test so we can make this protected.
  public synchronized long expireServer(final ServerName serverName) {
    return expireServer(serverName, false);
  }

  synchronized long expireServer(final ServerName serverName, boolean force) {
    // THIS server is going down... can't handle our own expiration.
    if (serverName.equals(master.getServerName())) {
      if (!(master.isAborted() || master.isStopped())) {
        master.stop("We lost our znode?");
      }
      return Procedure.NO_PROC_ID;
    }
    if (this.deadservers.isDeadServer(serverName)) {
      LOG.warn("Expiration called on {} but already in DeadServer", serverName);
      return Procedure.NO_PROC_ID;
    }
    moveFromOnlineToDeadServers(serverName);

    // If server is in draining mode, remove corresponding znode
    // In some tests, the mocked HM may not have ZK Instance, hence null check
    if (master.getZooKeeper() != null) {
      String drainingZnode = ZNodePaths
        .joinZNode(master.getZooKeeper().getZNodePaths().drainingZNode, serverName.getServerName());
      try {
        ZKUtil.deleteNodeFailSilent(master.getZooKeeper(), drainingZnode);
      } catch (KeeperException e) {
        LOG.warn("Error deleting the draining znode for stopping server "
            + serverName.getServerName(), e);
      }
    }

    // If cluster is going down, yes, servers are going to be expiring; don't
    // process as a dead server
    if (isClusterShutdown()) {
      LOG.info("Cluster shutdown set; " + serverName +
        " expired; onlineServers=" + this.onlineServers.size());
      if (this.onlineServers.isEmpty()) {
        master.stop("Cluster shutdown set; onlineServer=0");
      }
      return Procedure.NO_PROC_ID;
    }
    LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
    long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
    // Tell our listeners that a server was removed
    if (!this.listeners.isEmpty()) {
      this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
    }
    // trigger a persist of flushedSeqId
    if (flushedSeqIdFlusher != null) {
      flushedSeqIdFlusher.triggerNow();
    }
    return pid;
  }

  /**
   * Called when server has expired.
   */
  // Locking in this class needs cleanup.
  @VisibleForTesting
  public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
    synchronized (this.onlineServers) {
      boolean online = this.onlineServers.containsKey(sn);
      if (online) {
        // Remove the server from the known servers lists and update load info BUT
        // add to deadservers first; do this so it'll show in dead servers list if
        // not in online servers list.
        this.deadservers.putIfAbsent(sn);
        this.onlineServers.remove(sn);
        onlineServers.notifyAll();
      } else {
        // If not online, that is odd but may happen if 'Unknown Servers' -- where meta
        // has references to servers not online nor in dead servers list. If
        // 'Unknown Server', don't add to DeadServers else will be there for ever.
        LOG.trace("Expiration of {} but server not online", sn);
      }
    }
  }

  /**
   * Remove the server from the drain list.
   */
  public synchronized boolean removeServerFromDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.  ServerName is of the form:
    // <hostname> , <port> , <startcode>

    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
               "Removing from draining list anyway, as requested.");
    }
    // Remove the server from the draining servers lists.
    return this.drainingServers.remove(sn);
  }

  /**
   * Add the server to the drain list.
   * @param sn the server to add
   * @return True if the server is added or the server is already on the drain list.
   */
  public synchronized boolean addServerToDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.  ServerName is of the form:
    // <hostname> , <port> , <startcode>

    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
               "Ignoring request to add it to draining list.");
      return false;
    }
    // Add the server to the draining servers lists, if it's not already in
    // it.
    if (this.drainingServers.contains(sn)) {
      LOG.warn("Server " + sn + " is already in the draining server list. " +
               "Ignoring request to add it again.");
      return true;
    }
    LOG.info("Server " + sn + " added to draining server list.");
    return this.drainingServers.add(sn);
  }

  /**
   * Contacts a region server and waits up to timeout ms
   * to close the region.  This bypasses the active hmaster.
   * Pass -1 as the timeout if you do not want to wait on the result.
   */
  public static void closeRegionSilentlyAndWait(AsyncClusterConnection connection,
      ServerName server, RegionInfo region, long timeout) throws IOException, InterruptedException {
    AsyncRegionServerAdmin admin = connection.getRegionServerAdmin(server);
    try {
      FutureUtils.get(
        admin.closeRegion(ProtobufUtil.buildCloseRegionRequest(server, region.getRegionName())));
    } catch (IOException e) {
      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
    }
    if (timeout < 0) {
      return;
    }
    long expiration = timeout + System.currentTimeMillis();
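    // Poll the regionserver about once a second until it no longer reports the region
    // (or reports NotServingRegionException), or until the timeout elapses.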
    while (System.currentTimeMillis() < expiration) {
      try {
        RegionInfo rsRegion = ProtobufUtil.toRegionInfo(FutureUtils
          .get(
            admin.getRegionInfo(RequestConverter.buildGetRegionInfoRequest(region.getRegionName())))
          .getRegionInfo());
        if (rsRegion == null) {
          return;
        }
      } catch (IOException ioe) {
        if (ioe instanceof NotServingRegionException ||
          (ioe instanceof RemoteWithExtrasException &&
            ((RemoteWithExtrasException)ioe).unwrapRemoteException()
              instanceof NotServingRegionException)) {
          // no need to retry again
          return;
        }
        LOG.warn("Exception when retrieving regioninfo from: " + region.getRegionNameAsString(),
          ioe);
      }
      Thread.sleep(1000);
    }
    throw new IOException("Region " + region + " failed to close within" + " timeout " + timeout);
  }

  /**
   * Calculate min necessary to start. This is not an absolute. It is just
   * a friction that will cause us to hang around a bit longer waiting on
   * RegionServers to check-in.
   */
  private int getMinToStart() {
    if (master.isInMaintenanceMode()) {
      // If in maintenance mode, then master hosting meta will be the only server available
      return 1;
    }

    int minimumRequired = 1;
    if (LoadBalancer.isTablesOnMaster(master.getConfiguration()) &&
        LoadBalancer.isSystemTablesOnlyOnMaster(master.getConfiguration())) {
      // If Master is carrying regions it will show up as a 'server', but is not handling user-
      // space regions, so we need a second server.
      minimumRequired = 2;
    }

    int minToStart = this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    // Ensure we are never less than minimumRequired else stuff won't work.
    return Math.max(minToStart, minimumRequired);
  }

  /**
   * Wait for the region servers to report in.
   * We will wait until one of these conditions is met:
   *  - the master is stopped
   *  - the 'hbase.master.wait.on.regionservers.maxtostart' number of
   *    region servers is reached
   *  - the 'hbase.master.wait.on.regionservers.mintostart' is reached AND
   *    there has been no new region server checking in for
   *    'hbase.master.wait.on.regionservers.interval' time AND
   *    the 'hbase.master.wait.on.regionservers.timeout' is reached
   *
   * @throws InterruptedException if the wait is interrupted
   */
  public void waitForRegionServers(MonitoredTask status) throws InterruptedException {
    final long interval = this.master.getConfiguration().
        getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
    final long timeout = this.master.getConfiguration().
        getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
    // Min is not an absolute; just a friction making us wait longer on server checkin.
    int minToStart = getMinToStart();
    int maxToStart = this.master.getConfiguration().
        getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
    if (maxToStart < minToStart) {
      LOG.warn(String.format("The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
          WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
          WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      maxToStart = Integer.MAX_VALUE;
    }

    long now = System.currentTimeMillis();
    final long startTime = now;
    long slept = 0;
    long lastLogTime = 0;
    long lastCountChange = startTime;
    int count = countOfRegionServers();
    int oldCount = 0;
    // This while test is a little hard to read. We try to comment it in below but in essence:
    // Wait if Master is not stopped and the number of regionservers that have checked-in is
    // less than the maxToStart. Both of these conditions will be true near universally.
    // Next, we will keep cycling if ANY of the following three conditions are true:
    // 1. The time since a regionserver registered is < interval (means servers are actively
    //    checking in).
    // 2. We are under the total timeout.
    // 3. The count of servers is < minimum.
    for (ServerListener listener: this.listeners) {
      listener.waiting();
    }
    while (!this.master.isStopped() && !isClusterShutdown() && count < maxToStart &&
        ((lastCountChange + interval) > now || timeout > slept || count < minToStart)) {
      // Log some info at every interval time or if there is a change
      if (oldCount != count || lastLogTime + interval < now) {
        lastLogTime = now;
        String msg =
            "Waiting on regionserver count=" + count + "; waited=" +
                slept + "ms, expecting min=" + minToStart + " server(s), max=" +
                getStrForMax(maxToStart) + " server(s), " + "timeout=" + timeout +
                "ms, lastChange=" + (lastCountChange - now) + "ms";
        LOG.info(msg);
        status.setStatus(msg);
      }

      // We sleep for some time
      final long sleepTime = 50;
      Thread.sleep(sleepTime);
      now = System.currentTimeMillis();
      slept = now - startTime;

      oldCount = count;
      count = countOfRegionServers();
      if (count != oldCount) {
        lastCountChange = now;
      }
    }
    // Did we exit the loop because cluster is going down?
    if (isClusterShutdown()) {
      this.master.stop("Cluster shutdown");
    }
    LOG.info("Finished waiting on RegionServer count=" + count + "; waited=" + slept + "ms," +
        " expected min=" + minToStart + " server(s), max=" + getStrForMax(maxToStart) +
        " server(s)," + " master is " + (this.master.isStopped() ? "stopped." : "running"));
  }

  private String getStrForMax(final int max) {
    return max == Integer.MAX_VALUE ? "NO_LIMIT" : Integer.toString(max);
  }

  /**
   * @return A copy of the internal list of online servers.
   */
  public List<ServerName> getOnlineServersList() {
    // TODO: optimize the load balancer call so we don't need to make a new list
    // TODO: FIX. THIS IS POPULAR CALL.
    return new ArrayList<>(this.onlineServers.keySet());
  }

  /**
   * @param keys The target server names
   * @param idleServerPredicator Evaluates the server on the given load
   * @return A copy of the internal list of online servers matched by the predicate
   */
  public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> keys,
    Predicate<ServerMetrics> idleServerPredicator) {
    List<ServerName> names = new ArrayList<>();
    if (keys != null && idleServerPredicator != null) {
      keys.forEach(name -> {
        ServerMetrics load = onlineServers.get(name);
        if (load != null) {
          if (idleServerPredicator.test(load)) {
            names.add(name);
          }
        }
      });
    }
    return names;
  }

  /**
   * @return A copy of the internal list of draining servers.
   */
  public List<ServerName> getDrainingServersList() {
    return new ArrayList<>(this.drainingServers);
  }

  public boolean isServerOnline(ServerName serverName) {
    return serverName != null && onlineServers.containsKey(serverName);
  }

  public enum ServerLiveState {
    LIVE,
    DEAD,
    UNKNOWN
  }

  /**
   * @return whether the server is online, dead, or unknown.
   */
  public synchronized ServerLiveState isServerKnownAndOnline(ServerName serverName) {
    return onlineServers.containsKey(serverName) ? ServerLiveState.LIVE
      : (deadservers.isDeadServer(serverName) ? ServerLiveState.DEAD : ServerLiveState.UNKNOWN);
  }

  /**
   * Check if a server is known to be dead.  A server can be online,
   * or known to be dead, or unknown to this manager (i.e., not online,
   * not known to be dead either; it is simply not tracked by the
   * master any more, for example, a very old previous instance).
   */
  public synchronized boolean isServerDead(ServerName serverName) {
    return serverName == null || deadservers.isDeadServer(serverName);
  }

  public void shutdownCluster() {
    String statusStr = "Cluster shutdown requested of master=" + this.master.getServerName();
    LOG.info(statusStr);
    this.clusterShutdown.set(true);
    if (onlineServers.isEmpty()) {
      // we do not synchronize here so this may cause a double stop, but not a big deal
      master.stop("OnlineServer=0 right after cluster shutdown set");
    }
  }

  public boolean isClusterShutdown() {
    return this.clusterShutdown.get();
  }

  /**
   * Start the chore run by ServerManager.
   */
  public void startChore() {
    Configuration c = master.getConfiguration();
    if (persistFlushedSequenceId) {
      // When we reach here, RegionStates should already be loaded; first remove regions that
      // have since been deleted.
      removeDeletedRegionFromLoadedFlushedSequenceIds();
      int flushPeriod = c.getInt(FLUSHEDSEQUENCEID_FLUSHER_INTERVAL,
          FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT);
      flushedSeqIdFlusher = new FlushedSequenceIdFlusher(
          "FlushedSequenceIdFlusher", flushPeriod);
      master.getChoreService().scheduleChore(flushedSeqIdFlusher);
    }
  }

  /**
   * Stop the ServerManager.
   */
  public void stop() {
    if (flushedSeqIdFlusher != null) {
      flushedSeqIdFlusher.cancel();
    }
    if (persistFlushedSequenceId) {
      try {
        persistRegionLastFlushedSequenceIds();
      } catch (IOException e) {
        LOG.warn("Failed to persist last flushed sequence id of regions"
            + " to file system", e);
      }
    }
  }

  /**
   * Creates a list of possible destinations for a region. It contains the online servers, but not
   * the draining or dying servers.
   * @param serversToExclude can be null if there is no server to exclude
   */
  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude) {
    final List<ServerName> destServers = getOnlineServersList();

    if (serversToExclude != null) {
      destServers.removeAll(serversToExclude);
    }

    // Loop through the draining server list and remove them from the server list
    final List<ServerName> drainingServersCopy = getDrainingServersList();
    destServers.removeAll(drainingServersCopy);

    return destServers;
  }

  /**
   * Calls {@link #createDestinationServersList} without servers to exclude.
   */
  public List<ServerName> createDestinationServersList() {
    return createDestinationServersList(null);
  }

  /**
   * Clear any dead server that has the same host name and port as an online server.
   */
  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
    for (ServerName serverName : getOnlineServersList()) {
      deadservers.cleanAllPreviousInstances(serverName);
    }
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegion(final RegionInfo regionInfo) {
    final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
    storeFlushedSequenceIdsByRegion.remove(encodedName);
    flushedSequenceIdByRegion.remove(encodedName);
  }

  @VisibleForTesting
  public boolean isRegionInServerManagerStates(final RegionInfo hri) {
    final byte[] encodedName = hri.getEncodedNameAsBytes();
    return (storeFlushedSequenceIdsByRegion.containsKey(encodedName)
        || flushedSequenceIdByRegion.containsKey(encodedName));
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegions(final List<RegionInfo> regions) {
    for (RegionInfo hri: regions) {
      removeRegion(hri);
    }
  }

  /**
   * May return 0 when server is not online.
   */
  public int getVersionNumber(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
  }

  /**
   * May return "0.0.0" when server is not online
   */
  public String getVersion(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersion() : "0.0.0";
  }

  public int getInfoPort(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
  }

  /**
   * Persist last flushed sequence id of each region to HDFS
   * @throws IOException if the persist to HDFS fails
   */
  private void persistRegionLastFlushedSequenceIds() throws IOException {
    if (isFlushSeqIdPersistInProgress) {
      return;
    }
    isFlushSeqIdPersistInProgress = true;
    try {
      Configuration conf = master.getConfiguration();
      Path rootDir = CommonFSUtils.getRootDir(conf);
      Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
      FileSystem fs = FileSystem.get(conf);
      if (fs.exists(lastFlushedSeqIdPath)) {
        LOG.info("Rewriting .lastflushedseqids file at: "
            + lastFlushedSeqIdPath);
        if (!fs.delete(lastFlushedSeqIdPath, false)) {
          throw new IOException("Unable to remove existing "
              + lastFlushedSeqIdPath);
        }
      } else {
        LOG.info("Writing .lastflushedseqids file at: " + lastFlushedSeqIdPath);
      }
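      // On-disk layout (per the protobuf messages imported above): a single delimited
      // FlushedSequenceId message holding one FlushedRegionSequenceId per region (encoded region
      // name plus its last flushed seqid), each with its per-store FlushedStoreSequenceId
      // entries.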
      FSDataOutputStream out = fs.create(lastFlushedSeqIdPath);
      FlushedSequenceId.Builder flushedSequenceIdBuilder =
          FlushedSequenceId.newBuilder();
      try {
        for (Entry<byte[], Long> entry : flushedSequenceIdByRegion.entrySet()) {
          FlushedRegionSequenceId.Builder flushedRegionSequenceIdBuilder =
              FlushedRegionSequenceId.newBuilder();
          flushedRegionSequenceIdBuilder.setRegionEncodedName(
              ByteString.copyFrom(entry.getKey()));
          flushedRegionSequenceIdBuilder.setSeqId(entry.getValue());
          ConcurrentNavigableMap<byte[], Long> storeSeqIds =
              storeFlushedSequenceIdsByRegion.get(entry.getKey());
          if (storeSeqIds != null) {
            for (Entry<byte[], Long> store : storeSeqIds.entrySet()) {
              FlushedStoreSequenceId.Builder flushedStoreSequenceIdBuilder =
                  FlushedStoreSequenceId.newBuilder();
              flushedStoreSequenceIdBuilder.setFamily(ByteString.copyFrom(store.getKey()));
              flushedStoreSequenceIdBuilder.setSeqId(store.getValue());
              flushedRegionSequenceIdBuilder.addStores(flushedStoreSequenceIdBuilder);
            }
          }
          flushedSequenceIdBuilder.addRegionSequenceId(flushedRegionSequenceIdBuilder);
        }
        flushedSequenceIdBuilder.build().writeDelimitedTo(out);
      } finally {
        if (out != null) {
          out.close();
        }
      }
    } finally {
      isFlushSeqIdPersistInProgress = false;
    }
  }

  /**
   * Load last flushed sequence id of each region from HDFS, if persisted
   */
  public void loadLastFlushedSequenceIds() throws IOException {
    if (!persistFlushedSequenceId) {
      return;
    }
    Configuration conf = master.getConfiguration();
    Path rootDir = CommonFSUtils.getRootDir(conf);
    Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
    FileSystem fs = FileSystem.get(conf);
    if (!fs.exists(lastFlushedSeqIdPath)) {
      LOG.info("No .lastflushedseqids found at " + lastFlushedSeqIdPath
          + ", will record last flushed sequence ids"
          + " for regions from regionserver reports all over again");
      return;
    } else {
      LOG.info("Begin to load .lastflushedseqids at " + lastFlushedSeqIdPath);
    }
    FSDataInputStream in = fs.open(lastFlushedSeqIdPath);
    try {
      FlushedSequenceId flushedSequenceId =
          FlushedSequenceId.parseDelimitedFrom(in);
      if (flushedSequenceId == null) {
        LOG.info(".lastflushedseqids found at {} is empty", lastFlushedSeqIdPath);
        return;
      }
      for (FlushedRegionSequenceId flushedRegionSequenceId : flushedSequenceId
          .getRegionSequenceIdList()) {
        byte[] encodedRegionName = flushedRegionSequenceId
            .getRegionEncodedName().toByteArray();
        flushedSequenceIdByRegion
            .putIfAbsent(encodedRegionName, flushedRegionSequenceId.getSeqId());
        if (flushedRegionSequenceId.getStoresList() != null
            && flushedRegionSequenceId.getStoresList().size() != 0) {
          ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
              computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
                () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
          for (FlushedStoreSequenceId flushedStoreSequenceId : flushedRegionSequenceId
              .getStoresList()) {
            storeFlushedSequenceId
                .put(flushedStoreSequenceId.getFamily().toByteArray(),
                    flushedStoreSequenceId.getSeqId());
          }
        }
      }
    } finally {
      in.close();
    }
  }

  /**
   * Regions may have been removed between the latest persist of FlushedSequenceIds
   * and the master abort. So after loading FlushedSequenceIds from file, and after
   * meta is loaded, we need to remove the deleted regions according to RegionStates.
   */
  public void removeDeletedRegionFromLoadedFlushedSequenceIds() {
    RegionStates regionStates = master.getAssignmentManager().getRegionStates();
    Iterator<byte[]> it = flushedSequenceIdByRegion.keySet().iterator();
    while (it.hasNext()) {
      byte[] regionEncodedName = it.next();
      if (regionStates.getRegionState(Bytes.toStringBinary(regionEncodedName)) == null) {
        it.remove();
        storeFlushedSequenceIdsByRegion.remove(regionEncodedName);
      }
    }
  }

  private class FlushedSequenceIdFlusher extends ScheduledChore {

    public FlushedSequenceIdFlusher(String name, int p) {
      super(name, master, p, 60 * 1000); // delay one minute before the first execution
    }

    @Override
    protected void chore() {
      try {
        persistRegionLastFlushedSequenceIds();
      } catch (IOException e) {
        LOG.debug("Failed to persist last flushed sequence id of regions"
            + " to file system", e);
      }
    }
  }
}