001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
021
022import java.io.IOException;
023import java.net.InetAddress;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Map.Entry;
031import java.util.Objects;
032import java.util.Set;
033import java.util.concurrent.ConcurrentNavigableMap;
034import java.util.concurrent.ConcurrentSkipListMap;
035import java.util.concurrent.CopyOnWriteArrayList;
036import java.util.concurrent.atomic.AtomicBoolean;
037import java.util.function.Predicate;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FSDataInputStream;
040import org.apache.hadoop.fs.FSDataOutputStream;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.ClockOutOfSyncException;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.NotServingRegionException;
046import org.apache.hadoop.hbase.RegionMetrics;
047import org.apache.hadoop.hbase.ScheduledChore;
048import org.apache.hadoop.hbase.ServerMetrics;
049import org.apache.hadoop.hbase.ServerMetricsBuilder;
050import org.apache.hadoop.hbase.ServerName;
051import org.apache.hadoop.hbase.YouAreDeadException;
052import org.apache.hadoop.hbase.client.AsyncClusterConnection;
053import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
054import org.apache.hadoop.hbase.client.RegionInfo;
055import org.apache.hadoop.hbase.conf.ConfigurationObserver;
056import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException;
057import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
058import org.apache.hadoop.hbase.master.assignment.RegionStates;
059import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
060import org.apache.hadoop.hbase.monitoring.MonitoredTask;
061import org.apache.hadoop.hbase.procedure2.Procedure;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.hbase.util.CommonFSUtils;
064import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
065import org.apache.hadoop.hbase.util.FutureUtils;
066import org.apache.hadoop.hbase.zookeeper.ZKUtil;
067import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
068import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
069import org.apache.yetus.audience.InterfaceAudience;
070import org.apache.zookeeper.KeeperException;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
075import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
076
077import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
078import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedRegionSequenceId;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedSequenceId;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedStoreSequenceId;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
085
086/**
087 * The ServerManager class manages info about region servers.
088 * <p>
089 * Maintains lists of online and dead servers. Processes the startups, shutdowns, and deaths of
090 * region servers.
091 * <p>
 * Servers are distinguished in two different ways. A given server has a location, specified by
 * hostname and port, of which there can only be one online at any given time. A server instance
 * is specified by the location (hostname and port) as well as the startcode (timestamp from when
 * the server was started). This is used to differentiate a restarted instance of a given server
 * from the original instance.
 * <p>
 * If a server is known not to be running any more, it is called dead. The dead server needs to be
 * handled by a ServerShutdownHandler. If the handler is not enabled yet, the server can't be
 * handled right away, so it is queued up. After the handler is enabled, the server will be
 * submitted to a handler to handle. However, the handler may be only partially enabled. If so, the
 * server cannot be fully processed, and is queued up for further processing. A server is fully
 * processed only after the handler is fully enabled and has completed the handling.
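 * <p>
 * A minimal, illustrative sketch (not a normative walkthrough) of how master-side code typically
 * consults this class, assuming a {@code ServerManager} instance is already at hand; all methods
 * shown are declared in this file:
 * <pre>{@code
 * // Which servers are online, which are draining, and which are viable
 * // destinations for a new region assignment.
 * List<ServerName> online = serverManager.getOnlineServersList();
 * List<ServerName> draining = serverManager.getDrainingServersList();
 * List<ServerName> candidates = serverManager.createDestinationServersList();
 * }</pre>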
104 */
105@InterfaceAudience.Private
106public class ServerManager implements ConfigurationObserver {
107  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
108    "hbase.master.wait.on.regionservers.maxtostart";
109
110  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
111    "hbase.master.wait.on.regionservers.mintostart";
112
113  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
114    "hbase.master.wait.on.regionservers.timeout";
115
116  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
117    "hbase.master.wait.on.regionservers.interval";
118
  /**
   * If set to true, flushedSequenceIdByRegion and storeFlushedSequenceIdsByRegion will be
   * persisted to HDFS and loaded when the master restarts, to speed up log splitting. See
   * HBASE-20727 for details.
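   * <p>
   * As a hedged illustration only, the feature could be disabled in hbase-site.xml via the key
   * defined just below (the value shown is not a recommendation):
   * <pre>{@code
   * <property>
   *   <name>hbase.master.persist.flushedsequenceid.enabled</name>
   *   <value>false</value>
   * </property>
   * }</pre>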
122   */
123  public static final String PERSIST_FLUSHEDSEQUENCEID =
124    "hbase.master.persist.flushedsequenceid.enabled";
125
126  public static final boolean PERSIST_FLUSHEDSEQUENCEID_DEFAULT = true;
127
128  public static final String FLUSHEDSEQUENCEID_FLUSHER_INTERVAL =
129    "hbase.master.flushedsequenceid.flusher.interval";
130
131  public static final int FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT = 3 * 60 * 60 * 1000; // 3
132                                                                                           // hours
133
134  public static final String MAX_CLOCK_SKEW_MS = "hbase.master.maxclockskew";
135
136  private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);
137
138  // Set if we are to shutdown the cluster.
139  private AtomicBoolean clusterShutdown = new AtomicBoolean(false);
140
141  /**
142   * The last flushed sequence id for a region.
143   */
144  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
145    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
146
147  private boolean persistFlushedSequenceId = true;
148  private volatile boolean isFlushSeqIdPersistInProgress = false;
149  /** File on hdfs to store last flushed sequence id of regions */
150  private static final String LAST_FLUSHED_SEQ_ID_FILE = ".lastflushedseqids";
151  private FlushedSequenceIdFlusher flushedSeqIdFlusher;
152
153  /**
154   * The last flushed sequence id for a store in a region.
155   */
156  private final ConcurrentNavigableMap<byte[],
157    ConcurrentNavigableMap<byte[], Long>> storeFlushedSequenceIdsByRegion =
158      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
159
160  /** Map of registered servers to their current load */
161  private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
162    new ConcurrentSkipListMap<>();
163
164  /** List of region servers that should not get any more new regions. */
165  private final ArrayList<ServerName> drainingServers = new ArrayList<>();
166
167  private final MasterServices master;
168  private final RegionServerList storage;
169
170  private final DeadServer deadservers = new DeadServer();
171
172  private final long maxSkew;
173  private final long warningSkew;
174
175  /** Listeners that are called on server events. */
176  private List<ServerListener> listeners = new CopyOnWriteArrayList<>();
177
178  /** Configured value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY */
179  private volatile boolean rejectDecommissionedHostsConfig;
180
181  /**
182   * Constructor.
183   */
184  public ServerManager(final MasterServices master, RegionServerList storage) {
185    this.master = master;
186    this.storage = storage;
187    Configuration c = master.getConfiguration();
188    maxSkew = c.getLong(MAX_CLOCK_SKEW_MS, 30000);
189    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
190    persistFlushedSequenceId =
191      c.getBoolean(PERSIST_FLUSHEDSEQUENCEID, PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
192    rejectDecommissionedHostsConfig = getRejectDecommissionedHostsConfig(c);
193  }
194
195  /**
196   * Implementation of the ConfigurationObserver interface. We are interested in live-loading the
197   * configuration value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY
198   * @param conf Server configuration instance
199   */
200  @Override
201  public void onConfigurationChange(Configuration conf) {
202    final boolean newValue = getRejectDecommissionedHostsConfig(conf);
203    if (rejectDecommissionedHostsConfig == newValue) {
204      // no-op
205      return;
206    }
207
208    LOG.info("Config Reload for RejectDecommissionedHosts. previous value: {}, new value: {}",
209      rejectDecommissionedHostsConfig, newValue);
210
211    rejectDecommissionedHostsConfig = newValue;
212  }
213
214  /**
215   * Reads the value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY from the config and returns it
216   * @param conf Configuration instance of the Master
217   */
218  public boolean getRejectDecommissionedHostsConfig(Configuration conf) {
219    return conf.getBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY,
220      HConstants.REJECT_DECOMMISSIONED_HOSTS_DEFAULT);
221  }
222
223  /**
224   * Add the listener to the notification list.
225   * @param listener The ServerListener to register
226   */
227  public void registerListener(final ServerListener listener) {
228    this.listeners.add(listener);
229  }
230
231  /**
232   * Remove the listener from the notification list.
233   * @param listener The ServerListener to unregister
234   */
235  public boolean unregisterListener(final ServerListener listener) {
236    return this.listeners.remove(listener);
237  }
238
239  /**
240   * Removes all of the ServerListeners of this collection that satisfy the given predicate.
241   * @param filter a predicate which returns true for ServerListener to be removed
242   */
243  public boolean unregisterListenerIf(final Predicate<ServerListener> filter) {
244    return this.listeners.removeIf(filter);
245  }
246
247  /**
248   * Let the server manager know a new regionserver has come online
249   * @param request       the startup request
250   * @param versionNumber the version number of the new regionserver
251   * @param version       the version of the new regionserver, could contain strings like "SNAPSHOT"
   * @param ia            the InetAddress from which the request is received
253   * @return The ServerName we know this server as.
254   */
255  ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
256    String version, InetAddress ia) throws IOException {
257    // Test for case where we get a region startup message from a regionserver
258    // that has been quickly restarted but whose znode expiration handler has
259    // not yet run, or from a server whose fail we are currently processing.
260    // Test its host+port combo is present in serverAddressToServerInfo. If it
261    // is, reject the server and trigger its expiration. The next time it comes
262    // in, it should have been removed from serverAddressToServerInfo and queued
263    // for processing by ProcessServerShutdown.
264
265    // if use-ip is enabled, we will use ip to expose Master/RS service for client,
266    // see HBASE-27304 for details.
267    boolean useIp = master.getConfiguration().getBoolean(HConstants.HBASE_SERVER_USEIP_ENABLED_KEY,
268      HConstants.HBASE_SERVER_USEIP_ENABLED_DEFAULT);
269    String isaHostName = useIp ? ia.getHostAddress() : ia.getHostName();
270    final String hostname =
271      request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : isaHostName;
272    ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());
273
    // Check if the host should be rejected based on its decommissioned status
275    checkRejectableDecommissionedStatus(sn);
276
277    checkClockSkew(sn, request.getServerCurrentTime());
278    checkIsDead(sn, "STARTUP");
279    if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
280      LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup could not record the server: {}", sn);
281    }
282    storage.started(sn);
283    return sn;
284  }
285
286  /**
287   * Updates last flushed sequence Ids for the regions on server sn
288   */
289  private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
290    for (Entry<byte[], RegionMetrics> entry : hsl.getRegionMetrics().entrySet()) {
291      byte[] encodedRegionName = Bytes.toBytes(RegionInfo.encodeRegionName(entry.getKey()));
292      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
293      long l = entry.getValue().getCompletedSequenceId();
294      // Don't let smaller sequence ids override greater sequence ids.
295      if (LOG.isTraceEnabled()) {
296        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue
297          + ", completeSequenceId=" + l);
298      }
299      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
300        flushedSequenceIdByRegion.put(encodedRegionName, l);
301      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
302        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id (" + l
303          + ") that is less than the previous last flushed sequence id (" + existingValue
304          + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring.");
305      }
306      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
307        computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
308          () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
309      for (Entry<byte[], Long> storeSeqId : entry.getValue().getStoreSequenceId().entrySet()) {
310        byte[] family = storeSeqId.getKey();
311        existingValue = storeFlushedSequenceId.get(family);
312        l = storeSeqId.getValue();
313        if (LOG.isTraceEnabled()) {
314          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family)
315            + ", existingValue=" + existingValue + ", completeSequenceId=" + l);
316        }
317        // Don't let smaller sequence ids override greater sequence ids.
318        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
319          storeFlushedSequenceId.put(family, l);
320        }
321      }
322    }
323  }
324
325  public void regionServerReport(ServerName sn, ServerMetrics sl) throws YouAreDeadException {
326    checkIsDead(sn, "REPORT");
327    if (null == this.onlineServers.replace(sn, sl)) {
      // Already have this host+port combo and it's just a different start code?
329      // Just let the server in. Presume master joining a running cluster.
330      // recordNewServer is what happens at the end of reportServerStartup.
331      // The only thing we are skipping is passing back to the regionserver
332      // the ServerName to use. Here we presume a master has already done
333      // that so we'll press on with whatever it gave us for ServerName.
334      if (!checkAndRecordNewServer(sn, sl)) {
335        // Master already registered server with same (host + port) and higher startcode.
336        // This can happen if regionserver report comes late from old server
337        // (possible race condition), by that time master has already processed SCP for that
338        // server and started accepting regionserver report from new server i.e. server with
339        // same (host + port) and higher startcode.
340        // The exception thrown here is not meant to tell the region server it is dead because if
341        // there is a new server on the same host port, the old server should have already been
342        // dead in ideal situation.
343        // The exception thrown here is to skip the later steps of the whole regionServerReport
344        // request processing. Usually, after recording it in ServerManager, we will call the
345        // related methods in AssignmentManager to record region states. If the region server
346        // is already dead, we should not do these steps anymore, so here we throw an exception
347        // to let the upper layer know that they should not continue processing anymore.
348        final String errorMsg = "RegionServerReport received from " + sn
349          + ", but another server with the same name and higher startcode is already registered,"
350          + " ignoring";
351        LOG.warn(errorMsg);
352        throw new YouAreDeadException(errorMsg);
353      }
354    }
355    updateLastFlushedSequenceIds(sn, sl);
356  }
357
358  /**
359   * Checks if the Master is configured to reject decommissioned hosts or not. When it's configured
   * to do so, any RegionServer trying to join the cluster will have its host checked against the
361   * list of hosts of currently decommissioned servers and potentially get prevented from reporting
362   * for duty; otherwise, we do nothing and we let them pass to the next check. See HBASE-28342 for
363   * details.
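   * <p>
   * Purely as an illustrative sketch (not the only way to toggle this), the check is driven by the
   * boolean read in {@link #getRejectDecommissionedHostsConfig(Configuration)}:
   * <pre>{@code
   * // Reject decommissioned hosts on their next attempt to report for duty.
   * conf.setBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY, true);
   * }</pre>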
364   * @param sn The ServerName to check for
365   * @throws DecommissionedHostRejectedException if the Master is configured to reject
366   *                                             decommissioned hosts and this host exists in the
367   *                                             list of the decommissioned servers
368   */
369  private void checkRejectableDecommissionedStatus(ServerName sn)
370    throws DecommissionedHostRejectedException {
371    LOG.info("Checking decommissioned status of RegionServer {}", sn.getServerName());
372
373    // If the Master is not configured to reject decommissioned hosts, return early.
374    if (!rejectDecommissionedHostsConfig) {
375      return;
376    }
377
378    // Look for a match for the hostname in the list of decommissioned servers
379    for (ServerName server : getDrainingServersList()) {
380      if (Objects.equals(server.getHostname(), sn.getHostname())) {
381        // Found a match and master is configured to reject decommissioned hosts, throw exception!
382        LOG.warn(
383          "Rejecting RegionServer {} from reporting for duty because Master is configured "
384            + "to reject decommissioned hosts and this host was marked as such in the past.",
385          sn.getServerName());
386        throw new DecommissionedHostRejectedException(String.format(
387          "Host %s exists in the list of decommissioned servers and Master is configured to "
388            + "reject decommissioned hosts",
389          sn.getHostname()));
390      }
391    }
392  }
393
  /**
   * Checks if a server with the same host and port already exists; if not, or if the existing one
   * has a smaller start code, record the new server.
   * @param serverName the server to check and record
   * @param sl         the server load on the server
   * @return true if the server is recorded, otherwise false
   */
401  boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
402    ServerName existingServer = null;
403    synchronized (this.onlineServers) {
404      existingServer = findServerWithSameHostnamePortWithLock(serverName);
405      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
406        LOG.info("Server serverName=" + serverName + " rejected; we already have "
407          + existingServer.toString() + " registered with same hostname and port");
408        return false;
409      }
410      recordNewServerWithLock(serverName, sl);
411    }
412
413    // Tell our listeners that a server was added
414    if (!this.listeners.isEmpty()) {
415      for (ServerListener listener : this.listeners) {
416        listener.serverAdded(serverName);
417      }
418    }
419
420    // Note that we assume that same ts means same server, and don't expire in that case.
421    // TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
422    if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
423      LOG.info("Triggering server recovery; existingServer " + existingServer
424        + " looks stale, new server:" + serverName);
425      expireServer(existingServer);
426    }
427    return true;
428  }
429
430  /**
   * Find out the region servers which crashed between the crash of the previous master instance
   * and the current master instance, and schedule SCPs for them.
433   * <p/>
434   * Since the {@code RegionServerTracker} has already helped us to construct the online servers set
435   * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir}
436   * to find out whether there are servers which are already dead.
437   * <p/>
438   * Must be called inside the initialization method of {@code RegionServerTracker} to avoid
439   * concurrency issue.
   * @param deadServersWithDeathTimeFromPE the region servers which already have an SCP associated,
   *                                       with the time of death as the value.
442   * @param liveServersFromWALDir          the live region servers from wal directory.
443   */
444  void findDeadServersAndProcess(Map<ServerName, Long> deadServersWithDeathTimeFromPE,
445    Set<ServerName> liveServersFromWALDir) {
446    deadServersWithDeathTimeFromPE.forEach(deadservers::putIfAbsent);
447    liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
448      .forEach(this::expireServer);
449  }
450
451  /**
   * Checks the clock skew between the server and the master. If the skew exceeds the configured
   * maximum, this throws an exception; if it exceeds the configured warning threshold, it logs a
   * warning but lets the server start normally.
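   * <p>
   * As a hypothetical illustration (60s is not a recommended value), the tolerated skew could be
   * widened via the key defined above as {@code MAX_CLOCK_SKEW_MS}:
   * <pre>{@code
   * conf.setLong("hbase.master.maxclockskew", 60000);
   * }</pre>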
   * @param serverName Incoming server's name
456   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
457   */
458  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
459    throws ClockOutOfSyncException {
460    long skew = Math.abs(EnvironmentEdgeManager.currentTime() - serverCurrentTime);
461    if (skew > maxSkew) {
462      String message = "Server " + serverName + " has been "
463        + "rejected; Reported time is too far out of sync with master.  " + "Time difference of "
464        + skew + "ms > max allowed of " + maxSkew + "ms";
465      LOG.warn(message);
466      throw new ClockOutOfSyncException(message);
467    } else if (skew > warningSkew) {
468      String message = "Reported time for server " + serverName + " is out of sync with master "
469        + "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " + "error threshold is "
470        + maxSkew + "ms)";
471      LOG.warn(message);
472    }
473  }
474
475  /**
   * Called when a RegionServer first reports in for duty and thereafter each time it heartbeats,
   * to make sure it has not been figured for dead. If this server is on the dead list, reject it
   * with a YouAreDeadException. If it was dead but came back with a new start code, remove the old
   * entry from the dead list.
   * @param what STARTUP or REPORT
481   */
482  private void checkIsDead(final ServerName serverName, final String what)
483    throws YouAreDeadException {
484    if (this.deadservers.isDeadServer(serverName)) {
485      // Exact match: host name, port and start code all match with existing one of the
486      // dead servers. So, this server must be dead. Tell it to kill itself.
487      String message =
488        "Server " + what + " rejected; currently processing " + serverName + " as dead server";
489      LOG.debug(message);
490      throw new YouAreDeadException(message);
491    }
492    // Remove dead server with same hostname and port of newly checking in rs after master
493    // initialization. See HBASE-5916 for more information.
494    if (
495      (this.master == null || this.master.isInitialized())
496        && this.deadservers.cleanPreviousInstance(serverName)
497    ) {
498      // This server has now become alive after we marked it as dead.
      // We removed its previous entry from the dead list to reflect it.
500      LOG.debug("{} {} came back up, removed it from the dead servers list", what, serverName);
501    }
502  }
503
504  /**
505   * Assumes onlineServers is locked.
506   * @return ServerName with matching hostname and port.
507   */
508  public ServerName findServerWithSameHostnamePortWithLock(final ServerName serverName) {
509    ServerName end =
510      ServerName.valueOf(serverName.getHostname(), serverName.getPort(), Long.MAX_VALUE);
511
512    ServerName r = onlineServers.lowerKey(end);
513    if (r != null) {
514      if (ServerName.isSameAddress(r, serverName)) {
515        return r;
516      }
517    }
518    return null;
519  }
520
521  /**
   * Adds to the onlineServers list. onlineServers should be locked.
   * @param serverName The remote server's name.
524   */
525  void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
526    LOG.info("Registering regionserver=" + serverName);
527    this.onlineServers.put(serverName, sl);
528    master.getAssignmentManager().getRegionStates().createServer(serverName);
529  }
530
531  public ConcurrentNavigableMap<byte[], Long> getFlushedSequenceIdByRegion() {
532    return flushedSequenceIdByRegion;
533  }
534
535  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
536    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
537    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
538    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
539    Map<byte[], Long> storeFlushedSequenceId =
540      storeFlushedSequenceIdsByRegion.get(encodedRegionName);
541    if (storeFlushedSequenceId != null) {
542      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
543        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
544          .setFamilyName(UnsafeByteOperations.unsafeWrap(entry.getKey()))
545          .setSequenceId(entry.getValue().longValue()).build());
546      }
547    }
548    return builder.build();
549  }
550
551  /** Returns ServerMetrics if serverName is known else null */
552  public ServerMetrics getLoad(final ServerName serverName) {
553    return this.onlineServers.get(serverName);
554  }
555
556  /**
557   * Compute the average load across all region servers. Currently, this uses a very naive
558   * computation - just uses the number of regions being served, ignoring stats about number of
559   * requests.
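   * For example (purely illustrative), three servers hosting 10, 20, and 30 regions would yield an
   * average load of 20.0.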
560   * @return the average load
561   */
562  public double getAverageLoad() {
563    int totalLoad = 0;
564    int numServers = 0;
565    for (ServerMetrics sl : this.onlineServers.values()) {
566      numServers++;
567      totalLoad += sl.getRegionMetrics().size();
568    }
569    return numServers == 0 ? 0 : (double) totalLoad / (double) numServers;
570  }
571
572  /** Returns the count of active regionservers */
573  public int countOfRegionServers() {
574    // Presumes onlineServers is a concurrent map
575    return this.onlineServers.size();
576  }
577
578  /** Returns Read-only map of servers to serverinfo */
579  public Map<ServerName, ServerMetrics> getOnlineServers() {
580    // Presumption is that iterating the returned Map is OK.
581    synchronized (this.onlineServers) {
582      return Collections.unmodifiableMap(this.onlineServers);
583    }
584  }
585
586  public DeadServer getDeadServers() {
587    return this.deadservers;
588  }
589
590  /**
   * Checks if any dead servers are currently being processed.
592   * @return true if any RS are being processed as dead, false if not
593   */
594  public boolean areDeadServersInProgress() throws IOException {
595    return master.getProcedures().stream()
596      .anyMatch(p -> !p.isFinished() && p instanceof ServerCrashProcedure);
597  }
598
599  void letRegionServersShutdown() {
600    long previousLogTime = 0;
601    ServerName sn = master.getServerName();
602    ZKWatcher zkw = master.getZooKeeper();
603    int onlineServersCt;
604    while ((onlineServersCt = onlineServers.size()) > 0) {
605      if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
606        Set<ServerName> remainingServers = onlineServers.keySet();
607        synchronized (onlineServers) {
608          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
609            // Master will delete itself later.
610            return;
611          }
612        }
613        StringBuilder sb = new StringBuilder();
614        // It's ok here to not sync on onlineServers - merely logging
615        for (ServerName key : remainingServers) {
616          if (sb.length() > 0) {
617            sb.append(", ");
618          }
619          sb.append(key);
620        }
621        LOG.info("Waiting on regionserver(s) " + sb.toString());
622        previousLogTime = EnvironmentEdgeManager.currentTime();
623      }
624
625      try {
626        List<String> servers = getRegionServersInZK(zkw);
627        if (
628          servers == null || servers.isEmpty()
629            || (servers.size() == 1 && servers.contains(sn.toString()))
630        ) {
631          LOG.info("ZK shows there is only the master self online, exiting now");
632          // Master could have lost some ZK events, no need to wait more.
633          break;
634        }
635      } catch (KeeperException ke) {
636        LOG.warn("Failed to list regionservers", ke);
637        // ZK is malfunctioning, don't hang here
638        break;
639      }
640      synchronized (onlineServers) {
641        try {
642          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
643        } catch (InterruptedException ignored) {
644          // continue
645        }
646      }
647    }
648  }
649
650  private List<String> getRegionServersInZK(final ZKWatcher zkw) throws KeeperException {
651    return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
652  }
653
654  /**
655   * Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
656   * @return pid if we queued a ServerCrashProcedure else {@link Procedure#NO_PROC_ID} if we did not
   *         (could happen for many reasons including the fact that it's this server that is going
658   *         down or we already have queued an SCP for this server or SCP processing is currently
659   *         disabled because we are in startup phase).
660   */
661  // Redo test so we can make this protected.
662  public synchronized long expireServer(final ServerName serverName) {
663    return expireServer(serverName, false);
664
665  }
666
667  synchronized long expireServer(final ServerName serverName, boolean force) {
668    // THIS server is going down... can't handle our own expiration.
669    if (serverName.equals(master.getServerName())) {
670      if (!(master.isAborted() || master.isStopped())) {
671        master.stop("We lost our znode?");
672      }
673      return Procedure.NO_PROC_ID;
674    }
675    if (this.deadservers.isDeadServer(serverName)) {
676      LOG.warn("Expiration called on {} but already in DeadServer", serverName);
677      return Procedure.NO_PROC_ID;
678    }
679    moveFromOnlineToDeadServers(serverName);
680
681    // If server is in draining mode, remove corresponding znode
682    // In some tests, the mocked HM may not have ZK Instance, hence null check
683    if (master.getZooKeeper() != null) {
684      String drainingZnode = ZNodePaths
685        .joinZNode(master.getZooKeeper().getZNodePaths().drainingZNode, serverName.getServerName());
686      try {
687        ZKUtil.deleteNodeFailSilent(master.getZooKeeper(), drainingZnode);
688      } catch (KeeperException e) {
689        LOG.warn(
690          "Error deleting the draining znode for stopping server " + serverName.getServerName(), e);
691      }
692    }
693
694    // If cluster is going down, yes, servers are going to be expiring; don't
695    // process as a dead server
696    if (isClusterShutdown()) {
697      LOG.info("Cluster shutdown set; " + serverName + " expired; onlineServers="
698        + this.onlineServers.size());
699      if (this.onlineServers.isEmpty()) {
700        master.stop("Cluster shutdown set; onlineServer=0");
701      }
702      return Procedure.NO_PROC_ID;
703    }
704    LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
705    long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
706    if (pid == Procedure.NO_PROC_ID) {
707      // skip later processing as we failed to submit SCP
708      return Procedure.NO_PROC_ID;
709    }
710    storage.expired(serverName);
711    // Tell our listeners that a server was removed
712    if (!this.listeners.isEmpty()) {
713      this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
714    }
715    // trigger a persist of flushedSeqId
716    if (flushedSeqIdFlusher != null) {
717      flushedSeqIdFlusher.triggerNow();
718    }
719    return pid;
720  }
721
722  /**
723   * Called when server has expired.
724   */
725  // Locking in this class needs cleanup.
726  public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
727    synchronized (this.onlineServers) {
728      boolean online = this.onlineServers.containsKey(sn);
729      if (online) {
730        // Remove the server from the known servers lists and update load info BUT
731        // add to deadservers first; do this so it'll show in dead servers list if
732        // not in online servers list.
733        this.deadservers.putIfAbsent(sn);
734        this.onlineServers.remove(sn);
735        onlineServers.notifyAll();
736      } else {
737        // If not online, that is odd but may happen if 'Unknown Servers' -- where meta
738        // has references to servers not online nor in dead servers list. If
739        // 'Unknown Server', don't add to DeadServers else will be there for ever.
740        LOG.trace("Expiration of {} but server not online", sn);
741      }
742    }
743  }
744
745  /*
746   * Remove the server from the drain list.
747   */
748  public synchronized boolean removeServerFromDrainList(final ServerName sn) {
749    LOG.info("Removing server {} from the draining list.", sn);
750
751    // Remove the server from the draining servers lists.
752    return this.drainingServers.remove(sn);
753  }
754
755  /**
756   * Add the server to the drain list.
757   * @return True if the server is added or the server is already on the drain list.
758   */
759  public synchronized boolean addServerToDrainList(final ServerName sn) {
760    // If master is not rejecting decommissioned hosts, warn if the server (sn) is not online.
761    // However, we want to add servers even if they're not online if the master is configured
762    // to reject decommissioned hosts
763    if (!rejectDecommissionedHostsConfig && !this.isServerOnline(sn)) {
764      LOG.warn("Server {} is not currently online. Ignoring request to add it to draining list.",
765        sn);
766      return false;
767    }
768
769    // Add the server to the draining servers lists, if it's not already in it.
770    if (this.drainingServers.contains(sn)) {
771      LOG.warn(
772        "Server {} is already in the draining server list. Ignoring request to add it again.", sn);
773      return true;
774    }
775
776    LOG.info("Server {} added to draining server list.", sn);
777    return this.drainingServers.add(sn);
778  }
779
780  /**
781   * Contacts a region server and waits up to timeout ms to close the region. This bypasses the
782   * active hmaster. Pass -1 as timeout if you do not want to wait on result.
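   * <p>
   * Illustrative use only; the connection, server, and region handles are assumed to come from the
   * caller's context:
   * <pre>{@code
   * // Ask the hosting RegionServer directly to close the region and wait up to 30 seconds.
   * ServerManager.closeRegionSilentlyAndWait(conn, serverName, regionInfo, 30000);
   * }</pre>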
783   */
784  public static void closeRegionSilentlyAndWait(AsyncClusterConnection connection,
785    ServerName server, RegionInfo region, long timeout) throws IOException, InterruptedException {
786    AsyncRegionServerAdmin admin = connection.getRegionServerAdmin(server);
787    try {
788      FutureUtils.get(
789        admin.closeRegion(ProtobufUtil.buildCloseRegionRequest(server, region.getRegionName())));
790    } catch (IOException e) {
791      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
792    }
793    if (timeout < 0) {
794      return;
795    }
796    long expiration = timeout + EnvironmentEdgeManager.currentTime();
797    while (EnvironmentEdgeManager.currentTime() < expiration) {
798      try {
799        RegionInfo rsRegion = ProtobufUtil.toRegionInfo(FutureUtils
800          .get(
801            admin.getRegionInfo(RequestConverter.buildGetRegionInfoRequest(region.getRegionName())))
802          .getRegionInfo());
803        if (rsRegion == null) {
804          return;
805        }
806      } catch (IOException ioe) {
807        if (
808          ioe instanceof NotServingRegionException
809            || (ioe instanceof RemoteWithExtrasException && ((RemoteWithExtrasException) ioe)
810              .unwrapRemoteException() instanceof NotServingRegionException)
811        ) {
812          // no need to retry again
813          return;
814        }
815        LOG.warn("Exception when retrieving regioninfo from: " + region.getRegionNameAsString(),
816          ioe);
817      }
818      Thread.sleep(1000);
819    }
820    throw new IOException("Region " + region + " failed to close within" + " timeout " + timeout);
821  }
822
823  /**
   * Calculate the minimum number of servers necessary to start. This is not an absolute; it is
   * just a friction that makes us hang around a bit longer waiting on RegionServers to check in.
826   */
827  private int getMinToStart() {
828    if (master.isInMaintenanceMode()) {
829      // If in maintenance mode, then in process region server hosting meta will be the only server
830      // available
831      return 1;
832    }
833
834    int minimumRequired = 1;
835    int minToStart = this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
836    // Ensure we are never less than minimumRequired else stuff won't work.
837    return Math.max(minToStart, minimumRequired);
838  }
839
840  /**
   * Wait for the region servers to report in. We will wait until one of these conditions is met:
   * <ul>
   * <li>the master is stopped</li>
   * <li>the 'hbase.master.wait.on.regionservers.maxtostart' number of region servers is
   * reached</li>
   * <li>the 'hbase.master.wait.on.regionservers.mintostart' is reached AND there has been no new
   * region server check-in for 'hbase.master.wait.on.regionservers.interval' time AND the
   * 'hbase.master.wait.on.regionservers.timeout' is reached</li>
   * </ul>
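   * <p>
   * As an illustration only (these values are hypothetical, not defaults), a deployment that wants
   * the master to wait for at least 3 RegionServers but never longer than 60 seconds could set:
   * <pre>{@code
   * conf.setInt("hbase.master.wait.on.regionservers.mintostart", 3);
   * conf.setLong("hbase.master.wait.on.regionservers.timeout", 60000);
   * }</pre>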
846   */
847  public void waitForRegionServers(MonitoredTask status) throws InterruptedException {
848    final long interval =
849      this.master.getConfiguration().getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
850    final long timeout =
851      this.master.getConfiguration().getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
852    // Min is not an absolute; just a friction making us wait longer on server checkin.
853    int minToStart = getMinToStart();
854    int maxToStart =
855      this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
856    if (maxToStart < minToStart) {
857      LOG.warn(String.format("The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
858        WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart, WAIT_ON_REGIONSERVERS_MINTOSTART,
859        minToStart));
860      maxToStart = Integer.MAX_VALUE;
861    }
862
863    long now = EnvironmentEdgeManager.currentTime();
864    final long startTime = now;
865    long slept = 0;
866    long lastLogTime = 0;
867    long lastCountChange = startTime;
868    int count = countOfRegionServers();
869    int oldCount = 0;
870    // This while test is a little hard to read. We try to comment it in below but in essence:
871    // Wait if Master is not stopped and the number of regionservers that have checked-in is
872    // less than the maxToStart. Both of these conditions will be true near universally.
873    // Next, we will keep cycling if ANY of the following three conditions are true:
874    // 1. The time since a regionserver registered is < interval (means servers are actively
875    // checking in).
876    // 2. We are under the total timeout.
877    // 3. The count of servers is < minimum.
878    for (ServerListener listener : this.listeners) {
879      listener.waiting();
880    }
881    while (
882      !this.master.isStopped() && !isClusterShutdown() && count < maxToStart
883        && ((lastCountChange + interval) > now || timeout > slept || count < minToStart)
884    ) {
885      // Log some info at every interval time or if there is a change
886      if (oldCount != count || lastLogTime + interval < now) {
887        lastLogTime = now;
888        String msg =
889          "Waiting on regionserver count=" + count + "; waited=" + slept + "ms, expecting min="
890            + minToStart + " server(s), max=" + getStrForMax(maxToStart) + " server(s), "
891            + "timeout=" + timeout + "ms, lastChange=" + (now - lastCountChange) + "ms";
892        LOG.info(msg);
893        status.setStatus(msg);
894      }
895
896      // We sleep for some time
897      final long sleepTime = 50;
898      Thread.sleep(sleepTime);
899      now = EnvironmentEdgeManager.currentTime();
900      slept = now - startTime;
901
902      oldCount = count;
903      count = countOfRegionServers();
904      if (count != oldCount) {
905        lastCountChange = now;
906      }
907    }
908    // Did we exit the loop because cluster is going down?
909    if (isClusterShutdown()) {
910      this.master.stop("Cluster shutdown");
911    }
912    LOG.info("Finished waiting on RegionServer count=" + count + "; waited=" + slept + "ms,"
913      + " expected min=" + minToStart + " server(s), max=" + getStrForMax(maxToStart)
914      + " server(s)," + " master is " + (this.master.isStopped() ? "stopped." : "running"));
915  }
916
917  private String getStrForMax(final int max) {
918    return max == Integer.MAX_VALUE ? "NO_LIMIT" : Integer.toString(max);
919  }
920
921  /** Returns A copy of the internal list of online servers. */
922  public List<ServerName> getOnlineServersList() {
923    // TODO: optimize the load balancer call so we don't need to make a new list
924    // TODO: FIX. THIS IS POPULAR CALL.
925    return new ArrayList<>(this.onlineServers.keySet());
926  }
927
928  /**
   * @param keys                 The target server names
   * @param idleServerPredicator Evaluates the server on the given load
   * @return A copy of the internal list of online servers matched by the predicate
932   */
933  public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> keys,
934    Predicate<ServerMetrics> idleServerPredicator) {
935    List<ServerName> names = new ArrayList<>();
936    if (keys != null && idleServerPredicator != null) {
937      keys.forEach(name -> {
938        ServerMetrics load = onlineServers.get(name);
939        if (load != null) {
940          if (idleServerPredicator.test(load)) {
941            names.add(name);
942          }
943        }
944      });
945    }
946    return names;
947  }
948
949  /** Returns A copy of the internal list of draining servers. */
950  public List<ServerName> getDrainingServersList() {
951    return new ArrayList<>(this.drainingServers);
952  }
953
954  public boolean isServerOnline(ServerName serverName) {
955    return serverName != null && onlineServers.containsKey(serverName);
956  }
957
958  public enum ServerLiveState {
959    LIVE,
960    DEAD,
961    UNKNOWN
962  }
963
964  /** Returns whether the server is online, dead, or unknown. */
965  public synchronized ServerLiveState isServerKnownAndOnline(ServerName serverName) {
966    return onlineServers.containsKey(serverName)
967      ? ServerLiveState.LIVE
968      : (deadservers.isDeadServer(serverName) ? ServerLiveState.DEAD : ServerLiveState.UNKNOWN);
969  }
970
971  /**
972   * Check if a server is known to be dead. A server can be online, or known to be dead, or unknown
   * to this manager (i.e., not online, not known to be dead either; it is simply not tracked by the
974   * master any more, for example, a very old previous instance).
975   */
976  public synchronized boolean isServerDead(ServerName serverName) {
977    return serverName == null || deadservers.isDeadServer(serverName);
978  }
979
980  /**
981   * Check if a server is unknown. A server can be online, or known to be dead, or unknown to this
   * manager (i.e., not online, not known to be dead either; it is simply not tracked by the master
983   * any more, for example, a very old previous instance).
984   */
985  public boolean isServerUnknown(ServerName serverName) {
986    return serverName == null
987      || (!onlineServers.containsKey(serverName) && !deadservers.isDeadServer(serverName));
988  }
989
990  public void shutdownCluster() {
991    String statusStr = "Cluster shutdown requested of master=" + this.master.getServerName();
992    LOG.info(statusStr);
993    this.clusterShutdown.set(true);
994    if (onlineServers.isEmpty()) {
995      // we do not synchronize here so this may cause a double stop, but not a big deal
996      master.stop("OnlineServer=0 right after cluster shutdown set");
997    }
998  }
999
1000  public boolean isClusterShutdown() {
1001    return this.clusterShutdown.get();
1002  }
1003
1004  /**
   * Start the chore(s) in ServerManager.
1006   */
1007  public void startChore() {
1008    Configuration c = master.getConfiguration();
1009    if (persistFlushedSequenceId) {
1010      new Thread(() -> {
        // After AM#loadMeta, RegionStates should be loaded; regions that were deleted by
        // drop/split/merge are not present in RegionStates, so in
        // removeDeletedRegionFromLoadedFlushedSequenceIds we can safely remove the flushed
        // sequence ids that were loaded for those deleted regions.
1015        removeDeletedRegionFromLoadedFlushedSequenceIds();
1016      }, "RemoveDeletedRegionSyncThread").start();
1017      int flushPeriod =
1018        c.getInt(FLUSHEDSEQUENCEID_FLUSHER_INTERVAL, FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT);
1019      flushedSeqIdFlusher = new FlushedSequenceIdFlusher("FlushedSequenceIdFlusher", flushPeriod);
1020      master.getChoreService().scheduleChore(flushedSeqIdFlusher);
1021    }
1022  }
1023
1024  /**
1025   * Stop the ServerManager.
1026   */
1027  public void stop() {
1028    if (flushedSeqIdFlusher != null) {
1029      flushedSeqIdFlusher.shutdown();
1030    }
1031    if (persistFlushedSequenceId) {
1032      try {
1033        persistRegionLastFlushedSequenceIds();
1034      } catch (IOException e) {
1035        LOG.warn("Failed to persist last flushed sequence id of regions" + " to file system", e);
1036      }
1037    }
1038  }
1039
1040  /**
1041   * Creates a list of possible destinations for a region. It contains the online servers, but not
1042   * the draining or dying servers.
1043   * @param serversToExclude can be null if there is no server to exclude
1044   */
1045  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude) {
1046    Set<ServerName> destServers = new HashSet<>();
1047    onlineServers.forEach((sn, sm) -> {
1048      if (sm.getLastReportTimestamp() > 0) {
        // This means we have already called regionServerReport at least once, so let's include
1050        // this server for region assignment. This is an optimization to avoid assigning regions to
1051        // an uninitialized server. See HBASE-25032 for more details.
1052        destServers.add(sn);
1053      }
1054    });
1055
1056    if (serversToExclude != null) {
1057      destServers.removeAll(serversToExclude);
1058    }
1059
1060    // Loop through the draining server list and remove them from the server list
1061    final List<ServerName> drainingServersCopy = getDrainingServersList();
1062    destServers.removeAll(drainingServersCopy);
1063
1064    return new ArrayList<>(destServers);
1065  }
1066
1067  /**
1068   * Calls {@link #createDestinationServersList} without server to exclude.
1069   */
1070  public List<ServerName> createDestinationServersList() {
1071    return createDestinationServersList(null);
1072  }
1073
1074  /**
   * Clear any dead server that has the same host name and port as an online server.
1076   */
1077  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
1078    for (ServerName serverName : getOnlineServersList()) {
1079      deadservers.cleanAllPreviousInstances(serverName);
1080    }
1081  }
1082
1083  /**
1084   * Called by delete table and similar to notify the ServerManager that a region was removed.
1085   */
1086  public void removeRegion(final RegionInfo regionInfo) {
1087    final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
1088    storeFlushedSequenceIdsByRegion.remove(encodedName);
1089    flushedSequenceIdByRegion.remove(encodedName);
1090  }
1091
1092  public boolean isRegionInServerManagerStates(final RegionInfo hri) {
1093    final byte[] encodedName = hri.getEncodedNameAsBytes();
1094    return (storeFlushedSequenceIdsByRegion.containsKey(encodedName)
1095      || flushedSequenceIdByRegion.containsKey(encodedName));
1096  }
1097
1098  /**
1099   * Called by delete table and similar to notify the ServerManager that a region was removed.
1100   */
1101  public void removeRegions(final List<RegionInfo> regions) {
1102    for (RegionInfo hri : regions) {
1103      removeRegion(hri);
1104    }
1105  }
1106
1107  /**
1108   * May return 0 when server is not online.
1109   */
1110  public int getVersionNumber(ServerName serverName) {
1111    ServerMetrics serverMetrics = onlineServers.get(serverName);
1112    return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
1113  }
1114
1115  /**
1116   * May return "0.0.0" when server is not online
1117   */
1118  public String getVersion(ServerName serverName) {
1119    ServerMetrics serverMetrics = onlineServers.get(serverName);
1120    return serverMetrics != null ? serverMetrics.getVersion() : "0.0.0";
1121  }
1122
1123  public int getInfoPort(ServerName serverName) {
1124    ServerMetrics serverMetrics = onlineServers.get(serverName);
1125    return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
1126  }
1127
1128  /**
1129   * Persist last flushed sequence id of each region to HDFS
   * @throws IOException if persisting to HDFS fails
1131   */
1132  private void persistRegionLastFlushedSequenceIds() throws IOException {
1133    if (isFlushSeqIdPersistInProgress) {
1134      return;
1135    }
1136    isFlushSeqIdPersistInProgress = true;
1137    try {
1138      Configuration conf = master.getConfiguration();
1139      Path rootDir = CommonFSUtils.getRootDir(conf);
1140      Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
1141      FileSystem fs = FileSystem.get(conf);
1142      if (fs.exists(lastFlushedSeqIdPath)) {
1143        LOG.info("Rewriting .lastflushedseqids file at: " + lastFlushedSeqIdPath);
1144        if (!fs.delete(lastFlushedSeqIdPath, false)) {
1145          throw new IOException("Unable to remove existing " + lastFlushedSeqIdPath);
1146        }
1147      } else {
1148        LOG.info("Writing .lastflushedseqids file at: " + lastFlushedSeqIdPath);
1149      }
1150      FSDataOutputStream out = fs.create(lastFlushedSeqIdPath);
1151      FlushedSequenceId.Builder flushedSequenceIdBuilder = FlushedSequenceId.newBuilder();
1152      try {
1153        for (Entry<byte[], Long> entry : flushedSequenceIdByRegion.entrySet()) {
1154          FlushedRegionSequenceId.Builder flushedRegionSequenceIdBuilder =
1155            FlushedRegionSequenceId.newBuilder();
1156          flushedRegionSequenceIdBuilder.setRegionEncodedName(ByteString.copyFrom(entry.getKey()));
1157          flushedRegionSequenceIdBuilder.setSeqId(entry.getValue());
1158          ConcurrentNavigableMap<byte[], Long> storeSeqIds =
1159            storeFlushedSequenceIdsByRegion.get(entry.getKey());
1160          if (storeSeqIds != null) {
1161            for (Entry<byte[], Long> store : storeSeqIds.entrySet()) {
1162              FlushedStoreSequenceId.Builder flushedStoreSequenceIdBuilder =
1163                FlushedStoreSequenceId.newBuilder();
1164              flushedStoreSequenceIdBuilder.setFamily(ByteString.copyFrom(store.getKey()));
1165              flushedStoreSequenceIdBuilder.setSeqId(store.getValue());
1166              flushedRegionSequenceIdBuilder.addStores(flushedStoreSequenceIdBuilder);
1167            }
1168          }
1169          flushedSequenceIdBuilder.addRegionSequenceId(flushedRegionSequenceIdBuilder);
1170        }
1171        flushedSequenceIdBuilder.build().writeDelimitedTo(out);
1172      } finally {
1173        if (out != null) {
1174          out.close();
1175        }
1176      }
1177    } finally {
1178      isFlushSeqIdPersistInProgress = false;
1179    }
1180  }
1181
1182  /**
1183   * Load last flushed sequence id of each region from HDFS, if persisted
1184   */
1185  public void loadLastFlushedSequenceIds() throws IOException {
1186    if (!persistFlushedSequenceId) {
1187      return;
1188    }
1189    Configuration conf = master.getConfiguration();
1190    Path rootDir = CommonFSUtils.getRootDir(conf);
1191    Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
1192    FileSystem fs = FileSystem.get(conf);
1193    if (!fs.exists(lastFlushedSeqIdPath)) {
1194      LOG.info("No .lastflushedseqids found at " + lastFlushedSeqIdPath
1195        + " will record last flushed sequence id"
1196        + " for regions by regionserver report all over again");
1197      return;
1198    } else {
1199      LOG.info("begin to load .lastflushedseqids at " + lastFlushedSeqIdPath);
1200    }
1201    FSDataInputStream in = fs.open(lastFlushedSeqIdPath);
1202    try {
1203      FlushedSequenceId flushedSequenceId = FlushedSequenceId.parseDelimitedFrom(in);
1204      if (flushedSequenceId == null) {
1205        LOG.info(".lastflushedseqids found at {} is empty", lastFlushedSeqIdPath);
1206        return;
1207      }
1208      for (FlushedRegionSequenceId flushedRegionSequenceId : flushedSequenceId
1209        .getRegionSequenceIdList()) {
1210        byte[] encodedRegionName = flushedRegionSequenceId.getRegionEncodedName().toByteArray();
1211        flushedSequenceIdByRegion.putIfAbsent(encodedRegionName,
1212          flushedRegionSequenceId.getSeqId());
1213        if (
1214          flushedRegionSequenceId.getStoresList() != null
1215            && flushedRegionSequenceId.getStoresList().size() != 0
1216        ) {
1217          ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
1218            computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
1219              () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
1220          for (FlushedStoreSequenceId flushedStoreSequenceId : flushedRegionSequenceId
1221            .getStoresList()) {
1222            storeFlushedSequenceId.put(flushedStoreSequenceId.getFamily().toByteArray(),
1223              flushedStoreSequenceId.getSeqId());
1224          }
1225        }
1226      }
1227    } finally {
1228      in.close();
1229    }
1230  }
1231
1232  /**
   * Regions may have been removed between the latest persist of FlushedSequenceIds and a master
   * abort. So after loading FlushedSequenceIds from file, and after meta has been loaded, we need
   * to remove the deleted regions according to RegionStates.
1236   */
1237  public void removeDeletedRegionFromLoadedFlushedSequenceIds() {
1238    RegionStates regionStates = master.getAssignmentManager().getRegionStates();
1239    Iterator<byte[]> it = flushedSequenceIdByRegion.keySet().iterator();
1240    while (it.hasNext()) {
1241      byte[] regionEncodedName = it.next();
1242      if (regionStates.getRegionState(Bytes.toStringBinary(regionEncodedName)) == null) {
1243        it.remove();
1244        storeFlushedSequenceIdsByRegion.remove(regionEncodedName);
1245      }
1246    }
1247  }
1248
1249  private class FlushedSequenceIdFlusher extends ScheduledChore {
1250
1251    public FlushedSequenceIdFlusher(String name, int p) {
1252      super(name, master, p, 60 * 1000); // delay one minute before first execute
1253    }
1254
1255    @Override
1256    protected void chore() {
1257      try {
1258        persistRegionLastFlushedSequenceIds();
1259      } catch (IOException e) {
1260        LOG.debug("Failed to persist last flushed sequence id of regions" + " to file system", e);
1261      }
1262    }
1263  }
1264}