001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
021
022import java.io.IOException;
023import java.net.InetAddress;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Map.Entry;
031import java.util.Objects;
032import java.util.Set;
033import java.util.concurrent.ConcurrentNavigableMap;
034import java.util.concurrent.ConcurrentSkipListMap;
035import java.util.concurrent.CopyOnWriteArrayList;
036import java.util.concurrent.atomic.AtomicBoolean;
037import java.util.function.Predicate;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FSDataInputStream;
040import org.apache.hadoop.fs.FSDataOutputStream;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.ClockOutOfSyncException;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.NotServingRegionException;
046import org.apache.hadoop.hbase.RegionMetrics;
047import org.apache.hadoop.hbase.ScheduledChore;
048import org.apache.hadoop.hbase.ServerMetrics;
049import org.apache.hadoop.hbase.ServerMetricsBuilder;
050import org.apache.hadoop.hbase.ServerName;
051import org.apache.hadoop.hbase.YouAreDeadException;
052import org.apache.hadoop.hbase.client.AsyncClusterConnection;
053import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
054import org.apache.hadoop.hbase.client.RegionInfo;
055import org.apache.hadoop.hbase.conf.ConfigurationObserver;
056import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException;
057import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
058import org.apache.hadoop.hbase.master.assignment.RegionStates;
059import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
060import org.apache.hadoop.hbase.monitoring.MonitoredTask;
061import org.apache.hadoop.hbase.procedure2.Procedure;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.hbase.util.CommonFSUtils;
064import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
065import org.apache.hadoop.hbase.util.FutureUtils;
066import org.apache.hadoop.hbase.zookeeper.ZKUtil;
067import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
068import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
069import org.apache.yetus.audience.InterfaceAudience;
070import org.apache.zookeeper.KeeperException;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
075import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
076
077import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
078import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedRegionSequenceId;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedSequenceId;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedStoreSequenceId;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
085
086/**
087 * The ServerManager class manages info about region servers.
088 * <p>
089 * Maintains lists of online and dead servers. Processes the startups, shutdowns, and deaths of
090 * region servers.
091 * <p>
092 * Servers are distinguished in two different ways. A given server has a location, specified by
093 * hostname and port, and of which there can only be one online at any given time. A server instance
094 * is specified by the location (hostname and port) as well as the startcode (timestamp from when
095 * the server was started). This is used to differentiate a restarted instance of a given server
096 * from the original instance.
097 * <p>
 * If a server is known not to be running any more, it is called dead. The dead server needs to be
 * handled by a ServerShutdownHandler. If the handler is not enabled yet, the server can't be
 * handled right away, so it is queued up. After the handler is enabled, the server will be
 * submitted to a handler for processing. However, the handler may be only partially enabled; if
 * so, the server cannot be fully processed and will be queued up for further processing. A server
 * is fully processed only after the handler is fully enabled and has completed the handling.
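 * <p>
 * A minimal, illustrative sketch (assuming a {@code MasterServices} reference named
 * {@code master}) of typical read-only queries against this class:
 *
 * <pre>{@code
 * ServerManager serverManager = master.getServerManager();
 * List<ServerName> online = serverManager.getOnlineServersList();
 * double averageLoad = serverManager.getAverageLoad();
 * for (ServerName sn : online) {
 *   int infoPort = serverManager.getInfoPort(sn); // 0 if the server is not online
 * }
 * }</pre>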
104 */
105@InterfaceAudience.Private
106public class ServerManager implements ConfigurationObserver {
107  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
108    "hbase.master.wait.on.regionservers.maxtostart";
109
110  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
111    "hbase.master.wait.on.regionservers.mintostart";
112
113  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
114    "hbase.master.wait.on.regionservers.timeout";
115
116  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
117    "hbase.master.wait.on.regionservers.interval";
118
119  /**
   * See HBASE-20727: if set to true, flushedSequenceIdByRegion and storeFlushedSequenceIdsByRegion
   * will be persisted to HDFS and loaded when the master restarts, to speed up log splitting.
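   * <p>
   * A minimal sketch (assuming a Hadoop {@code Configuration} named {@code conf}) of how these
   * settings are read; the values shown are the defaults defined below:
   *
   * <pre>{@code
   * boolean persist = conf.getBoolean("hbase.master.persist.flushedsequenceid.enabled", true);
   * int flusherInterval = conf.getInt("hbase.master.flushedsequenceid.flusher.interval",
   *   3 * 60 * 60 * 1000); // flusher period in milliseconds, default 3 hours
   * }</pre>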
122   */
123  public static final String PERSIST_FLUSHEDSEQUENCEID =
124    "hbase.master.persist.flushedsequenceid.enabled";
125
126  public static final boolean PERSIST_FLUSHEDSEQUENCEID_DEFAULT = true;
127
128  public static final String FLUSHEDSEQUENCEID_FLUSHER_INTERVAL =
129    "hbase.master.flushedsequenceid.flusher.interval";
130
131  public static final int FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT = 3 * 60 * 60 * 1000; // 3
132                                                                                           // hours
133
134  public static final String MAX_CLOCK_SKEW_MS = "hbase.master.maxclockskew";
135
136  private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);
137
138  // Set if we are to shutdown the cluster.
139  private AtomicBoolean clusterShutdown = new AtomicBoolean(false);
140
141  /**
142   * The last flushed sequence id for a region.
143   */
144  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
145    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
146
147  private boolean persistFlushedSequenceId = true;
148  private volatile boolean isFlushSeqIdPersistInProgress = false;
149  /** File on hdfs to store last flushed sequence id of regions */
150  private static final String LAST_FLUSHED_SEQ_ID_FILE = ".lastflushedseqids";
151  private FlushedSequenceIdFlusher flushedSeqIdFlusher;
152
153  /**
154   * The last flushed sequence id for a store in a region.
155   */
156  private final ConcurrentNavigableMap<byte[],
157    ConcurrentNavigableMap<byte[], Long>> storeFlushedSequenceIdsByRegion =
158      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
159
160  /** Map of registered servers to their current load */
161  private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
162    new ConcurrentSkipListMap<>();
163
164  /** List of region servers that should not get any more new regions. */
165  private final ArrayList<ServerName> drainingServers = new ArrayList<>();
166
167  private final MasterServices master;
168  private final RegionServerList storage;
169
170  private final DeadServer deadservers = new DeadServer();
171
172  private final long maxSkew;
173  private final long warningSkew;
174
175  /** Listeners that are called on server events. */
176  private List<ServerListener> listeners = new CopyOnWriteArrayList<>();
177
178  /** Configured value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY */
179  private volatile boolean rejectDecommissionedHostsConfig;
180
181  /**
182   * Constructor.
183   */
184  public ServerManager(final MasterServices master, RegionServerList storage) {
185    this.master = master;
186    this.storage = storage;
187    Configuration c = master.getConfiguration();
188    maxSkew = c.getLong(MAX_CLOCK_SKEW_MS, 30000);
189    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
190    persistFlushedSequenceId =
191      c.getBoolean(PERSIST_FLUSHEDSEQUENCEID, PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
192    rejectDecommissionedHostsConfig = getRejectDecommissionedHostsConfig(c);
193  }
194
195  /**
196   * Implementation of the ConfigurationObserver interface. We are interested in live-loading the
197   * configuration value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY
198   * @param conf Server configuration instance
199   */
200  @Override
201  public void onConfigurationChange(Configuration conf) {
202    final boolean newValue = getRejectDecommissionedHostsConfig(conf);
203    if (rejectDecommissionedHostsConfig == newValue) {
204      // no-op
205      return;
206    }
207
208    LOG.info("Config Reload for RejectDecommissionedHosts. previous value: {}, new value: {}",
209      rejectDecommissionedHostsConfig, newValue);
210
211    rejectDecommissionedHostsConfig = newValue;
212  }
213
214  /**
215   * Reads the value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY from the config and returns it
216   * @param conf Configuration instance of the Master
217   */
218  public boolean getRejectDecommissionedHostsConfig(Configuration conf) {
219    return conf.getBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY,
220      HConstants.REJECT_DECOMMISSIONED_HOSTS_DEFAULT);
221  }
222
223  /**
224   * Add the listener to the notification list.
225   * @param listener The ServerListener to register
226   */
227  public void registerListener(final ServerListener listener) {
228    this.listeners.add(listener);
229  }
230
231  /**
232   * Remove the listener from the notification list.
233   * @param listener The ServerListener to unregister
234   */
235  public boolean unregisterListener(final ServerListener listener) {
236    return this.listeners.remove(listener);
237  }
238
239  /**
240   * Let the server manager know a new regionserver has come online
241   * @param request       the startup request
242   * @param versionNumber the version number of the new regionserver
243   * @param version       the version of the new regionserver, could contain strings like "SNAPSHOT"
244   * @param ia            the InetAddress from which request is received
245   * @return The ServerName we know this server as.
246   */
247  ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
248    String version, InetAddress ia) throws IOException {
249    // Test for case where we get a region startup message from a regionserver
250    // that has been quickly restarted but whose znode expiration handler has
251    // not yet run, or from a server whose fail we are currently processing.
252    // Test its host+port combo is present in serverAddressToServerInfo. If it
253    // is, reject the server and trigger its expiration. The next time it comes
254    // in, it should have been removed from serverAddressToServerInfo and queued
255    // for processing by ProcessServerShutdown.
256
257    // if use-ip is enabled, we will use ip to expose Master/RS service for client,
258    // see HBASE-27304 for details.
259    boolean useIp = master.getConfiguration().getBoolean(HConstants.HBASE_SERVER_USEIP_ENABLED_KEY,
260      HConstants.HBASE_SERVER_USEIP_ENABLED_DEFAULT);
261    String isaHostName = useIp ? ia.getHostAddress() : ia.getHostName();
262    final String hostname =
263      request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : isaHostName;
264    ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());
265
    // Check if the host should be rejected based on its decommissioned status
267    checkRejectableDecommissionedStatus(sn);
268
269    checkClockSkew(sn, request.getServerCurrentTime());
270    checkIsDead(sn, "STARTUP");
271    if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
272      LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup could not record the server: {}", sn);
273    }
274    storage.started(sn);
275    return sn;
276  }
277
278  /**
279   * Updates last flushed sequence Ids for the regions on server sn
280   */
281  private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
282    for (Entry<byte[], RegionMetrics> entry : hsl.getRegionMetrics().entrySet()) {
283      byte[] encodedRegionName = Bytes.toBytes(RegionInfo.encodeRegionName(entry.getKey()));
284      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
285      long l = entry.getValue().getCompletedSequenceId();
286      // Don't let smaller sequence ids override greater sequence ids.
287      if (LOG.isTraceEnabled()) {
288        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue
289          + ", completeSequenceId=" + l);
290      }
291      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
292        flushedSequenceIdByRegion.put(encodedRegionName, l);
293      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
294        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id (" + l
295          + ") that is less than the previous last flushed sequence id (" + existingValue
296          + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring.");
297      }
298      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
299        computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
300          () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
301      for (Entry<byte[], Long> storeSeqId : entry.getValue().getStoreSequenceId().entrySet()) {
302        byte[] family = storeSeqId.getKey();
303        existingValue = storeFlushedSequenceId.get(family);
304        l = storeSeqId.getValue();
305        if (LOG.isTraceEnabled()) {
306          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family)
307            + ", existingValue=" + existingValue + ", completeSequenceId=" + l);
308        }
309        // Don't let smaller sequence ids override greater sequence ids.
310        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
311          storeFlushedSequenceId.put(family, l);
312        }
313      }
314    }
315  }
316
317  public void regionServerReport(ServerName sn, ServerMetrics sl) throws YouAreDeadException {
318    checkIsDead(sn, "REPORT");
319    if (null == this.onlineServers.replace(sn, sl)) {
      // Already have this host+port combo and it's just a different start code?
      // Just let the server in. Presume this master is joining a running cluster.
      // checkAndRecordNewServer is what happens at the end of regionServerStartup.
323      // The only thing we are skipping is passing back to the regionserver
324      // the ServerName to use. Here we presume a master has already done
325      // that so we'll press on with whatever it gave us for ServerName.
326      if (!checkAndRecordNewServer(sn, sl)) {
327        LOG.info("RegionServerReport ignored, could not record the server: " + sn);
328        return; // Not recorded, so no need to move on
329      }
330    }
331    updateLastFlushedSequenceIds(sn, sl);
332  }
333
334  /**
335   * Checks if the Master is configured to reject decommissioned hosts or not. When it's configured
   * to do so, any RegionServer trying to join the cluster will have its host checked against the
337   * list of hosts of currently decommissioned servers and potentially get prevented from reporting
338   * for duty; otherwise, we do nothing and we let them pass to the next check. See HBASE-28342 for
339   * details.
340   * @param sn The ServerName to check for
341   * @throws DecommissionedHostRejectedException if the Master is configured to reject
342   *                                             decommissioned hosts and this host exists in the
343   *                                             list of the decommissioned servers
344   */
345  private void checkRejectableDecommissionedStatus(ServerName sn)
346    throws DecommissionedHostRejectedException {
347    LOG.info("Checking decommissioned status of RegionServer {}", sn.getServerName());
348
349    // If the Master is not configured to reject decommissioned hosts, return early.
350    if (!rejectDecommissionedHostsConfig) {
351      return;
352    }
353
354    // Look for a match for the hostname in the list of decommissioned servers
355    for (ServerName server : getDrainingServersList()) {
356      if (Objects.equals(server.getHostname(), sn.getHostname())) {
357        // Found a match and master is configured to reject decommissioned hosts, throw exception!
358        LOG.warn(
359          "Rejecting RegionServer {} from reporting for duty because Master is configured "
360            + "to reject decommissioned hosts and this host was marked as such in the past.",
361          sn.getServerName());
362        throw new DecommissionedHostRejectedException(String.format(
363          "Host %s exists in the list of decommissioned servers and Master is configured to "
364            + "reject decommissioned hosts",
365          sn.getHostname()));
366      }
367    }
368  }
369
370  /**
   * Checks if a server of the same host and port already exists; if not, or if the existing one
   * has a smaller start code, record it.
373   * @param serverName the server to check and record
374   * @param sl         the server load on the server
375   * @return true if the server is recorded, otherwise, false
376   */
377  boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
378    ServerName existingServer = null;
379    synchronized (this.onlineServers) {
380      existingServer = findServerWithSameHostnamePortWithLock(serverName);
381      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
382        LOG.info("Server serverName=" + serverName + " rejected; we already have "
383          + existingServer.toString() + " registered with same hostname and port");
384        return false;
385      }
386      recordNewServerWithLock(serverName, sl);
387    }
388
389    // Tell our listeners that a server was added
390    if (!this.listeners.isEmpty()) {
391      for (ServerListener listener : this.listeners) {
392        listener.serverAdded(serverName);
393      }
394    }
395
396    // Note that we assume that same ts means same server, and don't expire in that case.
397    // TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
398    if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
399      LOG.info("Triggering server recovery; existingServer " + existingServer
400        + " looks stale, new server:" + serverName);
401      expireServer(existingServer);
402    }
403    return true;
404  }
405
406  /**
407   * Find out the region servers crashed between the crash of the previous master instance and the
408   * current master instance and schedule SCP for them.
409   * <p/>
410   * Since the {@code RegionServerTracker} has already helped us to construct the online servers set
411   * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir}
412   * to find out whether there are servers which are already dead.
413   * <p/>
414   * Must be called inside the initialization method of {@code RegionServerTracker} to avoid
   * concurrency issues.
416   * @param deadServersFromPE     the region servers which already have a SCP associated.
417   * @param liveServersFromWALDir the live region servers from wal directory.
418   */
419  void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
420    Set<ServerName> liveServersFromWALDir) {
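    // Any server that left WALs behind but is not registered in zookeeper (i.e. missing from
    // onlineServers) crashed between the previous master instance and this one; expire it now so
    // an SCP gets scheduled for it.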
421    deadServersFromPE.forEach(deadservers::putIfAbsent);
422    liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
423      .forEach(this::expireServer);
424  }
425
426  /**
   * Checks the clock skew between the server and the master. If the skew exceeds the configured
   * max, it will throw an exception; if it exceeds the configured warning threshold, it will log a
   * warning but let the server start normally.
   * @param serverName Incoming server's name
431   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
432   */
433  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
434    throws ClockOutOfSyncException {
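    // maxSkew and warningSkew are read once in the constructor from "hbase.master.maxclockskew"
    // (default 30000 ms) and "hbase.master.warningclockskew" (default 10000 ms) respectively.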
435    long skew = Math.abs(EnvironmentEdgeManager.currentTime() - serverCurrentTime);
436    if (skew > maxSkew) {
437      String message = "Server " + serverName + " has been "
438        + "rejected; Reported time is too far out of sync with master.  " + "Time difference of "
439        + skew + "ms > max allowed of " + maxSkew + "ms";
440      LOG.warn(message);
441      throw new ClockOutOfSyncException(message);
442    } else if (skew > warningSkew) {
443      String message = "Reported time for server " + serverName + " is out of sync with master "
444        + "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " + "error threshold is "
445        + maxSkew + "ms)";
446      LOG.warn(message);
447    }
448  }
449
450  /**
   * Called when a RegionServer first reports in for duty and thereafter each time it heartbeats,
   * to make sure it has not been marked as dead. If this server is on the dead list, reject it
453   * with a YouAreDeadException. If it was dead but came back with a new start code, remove the old
454   * entry from the dead list.
   * @param what STARTUP or REPORT
456   */
457  private void checkIsDead(final ServerName serverName, final String what)
458    throws YouAreDeadException {
459    if (this.deadservers.isDeadServer(serverName)) {
460      // Exact match: host name, port and start code all match with existing one of the
461      // dead servers. So, this server must be dead. Tell it to kill itself.
462      String message =
463        "Server " + what + " rejected; currently processing " + serverName + " as dead server";
464      LOG.debug(message);
465      throw new YouAreDeadException(message);
466    }
    // Remove any dead server with the same hostname and port as the newly checking-in RS after
    // master initialization. See HBASE-5916 for more information.
469    if (
470      (this.master == null || this.master.isInitialized())
471        && this.deadservers.cleanPreviousInstance(serverName)
472    ) {
473      // This server has now become alive after we marked it as dead.
      // We removed its previous entry from the dead list to reflect it.
475      LOG.debug("{} {} came back up, removed it from the dead servers list", what, serverName);
476    }
477  }
478
479  /**
480   * Assumes onlineServers is locked.
   * @return ServerName with matching hostname and port, or null if none is registered.
482   */
483  public ServerName findServerWithSameHostnamePortWithLock(final ServerName serverName) {
484    ServerName end =
485      ServerName.valueOf(serverName.getHostname(), serverName.getPort(), Long.MAX_VALUE);
486
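    // ServerName sorts by hostname, then port, then startcode. Since 'end' uses Long.MAX_VALUE as
    // its startcode, the greatest key strictly below it, if it shares the hostname and port, is
    // the currently registered instance for that address.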
487    ServerName r = onlineServers.lowerKey(end);
488    if (r != null) {
489      if (ServerName.isSameAddress(r, serverName)) {
490        return r;
491      }
492    }
493    return null;
494  }
495
496  /**
   * Adds to the onlineServers list. onlineServers should be locked.
   * @param serverName The remote server's name.
499   */
500  void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
501    LOG.info("Registering regionserver=" + serverName);
502    this.onlineServers.put(serverName, sl);
503    master.getAssignmentManager().getRegionStates().createServer(serverName);
504  }
505
506  public ConcurrentNavigableMap<byte[], Long> getFlushedSequenceIdByRegion() {
507    return flushedSequenceIdByRegion;
508  }
509
510  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
511    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
512    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
513    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
514    Map<byte[], Long> storeFlushedSequenceId =
515      storeFlushedSequenceIdsByRegion.get(encodedRegionName);
516    if (storeFlushedSequenceId != null) {
517      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
518        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
519          .setFamilyName(UnsafeByteOperations.unsafeWrap(entry.getKey()))
520          .setSequenceId(entry.getValue().longValue()).build());
521      }
522    }
523    return builder.build();
524  }
525
526  /** Returns ServerMetrics if serverName is known else null */
527  public ServerMetrics getLoad(final ServerName serverName) {
528    return this.onlineServers.get(serverName);
529  }
530
531  /**
532   * Compute the average load across all region servers. Currently, this uses a very naive
533   * computation - just uses the number of regions being served, ignoring stats about number of
534   * requests.
535   * @return the average load
536   */
537  public double getAverageLoad() {
538    int totalLoad = 0;
539    int numServers = 0;
540    for (ServerMetrics sl : this.onlineServers.values()) {
541      numServers++;
542      totalLoad += sl.getRegionMetrics().size();
543    }
544    return numServers == 0 ? 0 : (double) totalLoad / (double) numServers;
545  }
546
547  /** Returns the count of active regionservers */
548  public int countOfRegionServers() {
549    // Presumes onlineServers is a concurrent map
550    return this.onlineServers.size();
551  }
552
553  /** Returns Read-only map of servers to serverinfo */
554  public Map<ServerName, ServerMetrics> getOnlineServers() {
555    // Presumption is that iterating the returned Map is OK.
556    synchronized (this.onlineServers) {
557      return Collections.unmodifiableMap(this.onlineServers);
558    }
559  }
560
561  public DeadServer getDeadServers() {
562    return this.deadservers;
563  }
564
565  /**
   * Checks if any dead servers are currently being processed.
567   * @return true if any RS are being processed as dead, false if not
568   */
569  public boolean areDeadServersInProgress() throws IOException {
570    return master.getProcedures().stream()
571      .anyMatch(p -> !p.isFinished() && p instanceof ServerCrashProcedure);
572  }
573
574  void letRegionServersShutdown() {
575    long previousLogTime = 0;
576    ServerName sn = master.getServerName();
577    ZKWatcher zkw = master.getZooKeeper();
578    int onlineServersCt;
579    while ((onlineServersCt = onlineServers.size()) > 0) {
580      if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
581        Set<ServerName> remainingServers = onlineServers.keySet();
582        synchronized (onlineServers) {
583          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
584            // Master will delete itself later.
585            return;
586          }
587        }
588        StringBuilder sb = new StringBuilder();
589        // It's ok here to not sync on onlineServers - merely logging
590        for (ServerName key : remainingServers) {
591          if (sb.length() > 0) {
592            sb.append(", ");
593          }
594          sb.append(key);
595        }
596        LOG.info("Waiting on regionserver(s) " + sb.toString());
597        previousLogTime = EnvironmentEdgeManager.currentTime();
598      }
599
600      try {
601        List<String> servers = getRegionServersInZK(zkw);
602        if (
603          servers == null || servers.isEmpty()
604            || (servers.size() == 1 && servers.contains(sn.toString()))
605        ) {
606          LOG.info("ZK shows there is only the master self online, exiting now");
607          // Master could have lost some ZK events, no need to wait more.
608          break;
609        }
610      } catch (KeeperException ke) {
611        LOG.warn("Failed to list regionservers", ke);
612        // ZK is malfunctioning, don't hang here
613        break;
614      }
615      synchronized (onlineServers) {
616        try {
617          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
618        } catch (InterruptedException ignored) {
619          // continue
620        }
621      }
622    }
623  }
624
625  private List<String> getRegionServersInZK(final ZKWatcher zkw) throws KeeperException {
626    return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
627  }
628
629  /**
630   * Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
631   * @return pid if we queued a ServerCrashProcedure else {@link Procedure#NO_PROC_ID} if we did not
   *         (could happen for many reasons including the fact that it's this server that is going
633   *         down or we already have queued an SCP for this server or SCP processing is currently
634   *         disabled because we are in startup phase).
635   */
636  // Redo test so we can make this protected.
637  public synchronized long expireServer(final ServerName serverName) {
    return expireServer(serverName, false);
  }
641
642  synchronized long expireServer(final ServerName serverName, boolean force) {
643    // THIS server is going down... can't handle our own expiration.
644    if (serverName.equals(master.getServerName())) {
645      if (!(master.isAborted() || master.isStopped())) {
646        master.stop("We lost our znode?");
647      }
648      return Procedure.NO_PROC_ID;
649    }
650    if (this.deadservers.isDeadServer(serverName)) {
651      LOG.warn("Expiration called on {} but already in DeadServer", serverName);
652      return Procedure.NO_PROC_ID;
653    }
654    moveFromOnlineToDeadServers(serverName);
655
656    // If server is in draining mode, remove corresponding znode
657    // In some tests, the mocked HM may not have ZK Instance, hence null check
658    if (master.getZooKeeper() != null) {
659      String drainingZnode = ZNodePaths
660        .joinZNode(master.getZooKeeper().getZNodePaths().drainingZNode, serverName.getServerName());
661      try {
662        ZKUtil.deleteNodeFailSilent(master.getZooKeeper(), drainingZnode);
663      } catch (KeeperException e) {
664        LOG.warn(
665          "Error deleting the draining znode for stopping server " + serverName.getServerName(), e);
666      }
667    }
668
669    // If cluster is going down, yes, servers are going to be expiring; don't
670    // process as a dead server
671    if (isClusterShutdown()) {
672      LOG.info("Cluster shutdown set; " + serverName + " expired; onlineServers="
673        + this.onlineServers.size());
674      if (this.onlineServers.isEmpty()) {
675        master.stop("Cluster shutdown set; onlineServer=0");
676      }
677      return Procedure.NO_PROC_ID;
678    }
679    LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
680    long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
681    if (pid == Procedure.NO_PROC_ID) {
682      // skip later processing as we failed to submit SCP
683      return Procedure.NO_PROC_ID;
684    }
685    storage.expired(serverName);
686    // Tell our listeners that a server was removed
687    if (!this.listeners.isEmpty()) {
688      this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
689    }
690    // trigger a persist of flushedSeqId
691    if (flushedSeqIdFlusher != null) {
692      flushedSeqIdFlusher.triggerNow();
693    }
694    return pid;
695  }
696
697  /**
698   * Called when server has expired.
699   */
700  // Locking in this class needs cleanup.
701  public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
702    synchronized (this.onlineServers) {
703      boolean online = this.onlineServers.containsKey(sn);
704      if (online) {
705        // Remove the server from the known servers lists and update load info BUT
706        // add to deadservers first; do this so it'll show in dead servers list if
707        // not in online servers list.
708        this.deadservers.putIfAbsent(sn);
709        this.onlineServers.remove(sn);
710        onlineServers.notifyAll();
711      } else {
        // If not online, that is odd but may happen with 'Unknown Servers' -- where meta
        // has references to servers that are neither online nor in the dead servers list. If
        // 'Unknown Server', don't add to DeadServers else it will be there forever.
715        LOG.trace("Expiration of {} but server not online", sn);
716      }
717    }
718  }
719
  /**
   * Remove the server from the drain list.
   */
723  public synchronized boolean removeServerFromDrainList(final ServerName sn) {
724    LOG.info("Removing server {} from the draining list.", sn);
725
726    // Remove the server from the draining servers lists.
727    return this.drainingServers.remove(sn);
728  }
729
730  /**
731   * Add the server to the drain list.
732   * @return True if the server is added or the server is already on the drain list.
733   */
734  public synchronized boolean addServerToDrainList(final ServerName sn) {
735    // If master is not rejecting decommissioned hosts, warn if the server (sn) is not online.
736    // However, we want to add servers even if they're not online if the master is configured
737    // to reject decommissioned hosts
738    if (!rejectDecommissionedHostsConfig && !this.isServerOnline(sn)) {
739      LOG.warn("Server {} is not currently online. Ignoring request to add it to draining list.",
740        sn);
741      return false;
742    }
743
744    // Add the server to the draining servers lists, if it's not already in it.
745    if (this.drainingServers.contains(sn)) {
746      LOG.warn(
747        "Server {} is already in the draining server list. Ignoring request to add it again.", sn);
748      return true;
749    }
750
751    LOG.info("Server {} added to draining server list.", sn);
752    return this.drainingServers.add(sn);
753  }
754
755  /**
   * Contacts a region server and waits up to timeout ms for the region to close. This bypasses the
   * active HMaster. Pass -1 as timeout if you do not want to wait on the result.
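   * <p>
   * An illustrative call (names are assumed), waiting up to 30 seconds for the region to close:
   *
   * <pre>{@code
   * ServerManager.closeRegionSilentlyAndWait(asyncClusterConnection, serverName, regionInfo,
   *   30000);
   * }</pre>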
758   */
759  public static void closeRegionSilentlyAndWait(AsyncClusterConnection connection,
760    ServerName server, RegionInfo region, long timeout) throws IOException, InterruptedException {
761    AsyncRegionServerAdmin admin = connection.getRegionServerAdmin(server);
762    try {
763      FutureUtils.get(
764        admin.closeRegion(ProtobufUtil.buildCloseRegionRequest(server, region.getRegionName())));
765    } catch (IOException e) {
766      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
767    }
768    if (timeout < 0) {
769      return;
770    }
771    long expiration = timeout + EnvironmentEdgeManager.currentTime();
772    while (EnvironmentEdgeManager.currentTime() < expiration) {
773      try {
774        RegionInfo rsRegion = ProtobufUtil.toRegionInfo(FutureUtils
775          .get(
776            admin.getRegionInfo(RequestConverter.buildGetRegionInfoRequest(region.getRegionName())))
777          .getRegionInfo());
778        if (rsRegion == null) {
779          return;
780        }
781      } catch (IOException ioe) {
782        if (
783          ioe instanceof NotServingRegionException
784            || (ioe instanceof RemoteWithExtrasException && ((RemoteWithExtrasException) ioe)
785              .unwrapRemoteException() instanceof NotServingRegionException)
786        ) {
787          // no need to retry again
788          return;
789        }
790        LOG.warn("Exception when retrieving regioninfo from: " + region.getRegionNameAsString(),
791          ioe);
792      }
793      Thread.sleep(1000);
794    }
795    throw new IOException("Region " + region + " failed to close within" + " timeout " + timeout);
796  }
797
798  /**
   * Calculate the min necessary to start. This is not an absolute. It is just a friction that will
   * cause us to hang around a bit longer waiting on RegionServers to check in.
801   */
802  private int getMinToStart() {
803    if (master.isInMaintenanceMode()) {
      // If in maintenance mode, then the in-process region server hosting meta will be the only
      // server available
806      return 1;
807    }
808
809    int minimumRequired = 1;
810    int minToStart = this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
811    // Ensure we are never less than minimumRequired else stuff won't work.
812    return Math.max(minToStart, minimumRequired);
813  }
814
815  /**
   * Wait for the region servers to report in. We will wait until one of these conditions is met:
   * <ul>
   * <li>the master is stopped</li>
   * <li>the 'hbase.master.wait.on.regionservers.maxtostart' number of region servers is reached</li>
   * <li>the 'hbase.master.wait.on.regionservers.mintostart' is reached AND there have been no new
   * region server check-ins for 'hbase.master.wait.on.regionservers.interval' time AND the
   * 'hbase.master.wait.on.regionservers.timeout' is reached</li>
   * </ul>
821   */
822  public void waitForRegionServers(MonitoredTask status) throws InterruptedException {
823    final long interval =
824      this.master.getConfiguration().getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
825    final long timeout =
826      this.master.getConfiguration().getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
827    // Min is not an absolute; just a friction making us wait longer on server checkin.
828    int minToStart = getMinToStart();
829    int maxToStart =
830      this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
831    if (maxToStart < minToStart) {
832      LOG.warn(String.format("The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
833        WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart, WAIT_ON_REGIONSERVERS_MINTOSTART,
834        minToStart));
835      maxToStart = Integer.MAX_VALUE;
836    }
837
838    long now = EnvironmentEdgeManager.currentTime();
839    final long startTime = now;
840    long slept = 0;
841    long lastLogTime = 0;
842    long lastCountChange = startTime;
843    int count = countOfRegionServers();
844    int oldCount = 0;
    // This while test is a little hard to read. We try to comment it below, but in essence:
846    // Wait if Master is not stopped and the number of regionservers that have checked-in is
847    // less than the maxToStart. Both of these conditions will be true near universally.
848    // Next, we will keep cycling if ANY of the following three conditions are true:
849    // 1. The time since a regionserver registered is < interval (means servers are actively
850    // checking in).
851    // 2. We are under the total timeout.
852    // 3. The count of servers is < minimum.
853    for (ServerListener listener : this.listeners) {
854      listener.waiting();
855    }
856    while (
857      !this.master.isStopped() && !isClusterShutdown() && count < maxToStart
858        && ((lastCountChange + interval) > now || timeout > slept || count < minToStart)
859    ) {
860      // Log some info at every interval time or if there is a change
861      if (oldCount != count || lastLogTime + interval < now) {
862        lastLogTime = now;
863        String msg =
864          "Waiting on regionserver count=" + count + "; waited=" + slept + "ms, expecting min="
865            + minToStart + " server(s), max=" + getStrForMax(maxToStart) + " server(s), "
866            + "timeout=" + timeout + "ms, lastChange=" + (now - lastCountChange) + "ms";
867        LOG.info(msg);
868        status.setStatus(msg);
869      }
870
871      // We sleep for some time
872      final long sleepTime = 50;
873      Thread.sleep(sleepTime);
874      now = EnvironmentEdgeManager.currentTime();
875      slept = now - startTime;
876
877      oldCount = count;
878      count = countOfRegionServers();
879      if (count != oldCount) {
880        lastCountChange = now;
881      }
882    }
883    // Did we exit the loop because cluster is going down?
884    if (isClusterShutdown()) {
885      this.master.stop("Cluster shutdown");
886    }
887    LOG.info("Finished waiting on RegionServer count=" + count + "; waited=" + slept + "ms,"
888      + " expected min=" + minToStart + " server(s), max=" + getStrForMax(maxToStart)
889      + " server(s)," + " master is " + (this.master.isStopped() ? "stopped." : "running"));
890  }
891
892  private String getStrForMax(final int max) {
893    return max == Integer.MAX_VALUE ? "NO_LIMIT" : Integer.toString(max);
894  }
895
896  /** Returns A copy of the internal list of online servers. */
897  public List<ServerName> getOnlineServersList() {
898    // TODO: optimize the load balancer call so we don't need to make a new list
899    // TODO: FIX. THIS IS POPULAR CALL.
900    return new ArrayList<>(this.onlineServers.keySet());
901  }
902
903  /**
   * @param keys                 The target server names
   * @param idleServerPredicator Evaluates the server based on the given load
   * @return A copy of the internal list of online servers matched by the predicate
907   */
908  public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> keys,
909    Predicate<ServerMetrics> idleServerPredicator) {
910    List<ServerName> names = new ArrayList<>();
911    if (keys != null && idleServerPredicator != null) {
912      keys.forEach(name -> {
913        ServerMetrics load = onlineServers.get(name);
914        if (load != null) {
915          if (idleServerPredicator.test(load)) {
916            names.add(name);
917          }
918        }
919      });
920    }
921    return names;
922  }
923
924  /** Returns A copy of the internal list of draining servers. */
925  public List<ServerName> getDrainingServersList() {
926    return new ArrayList<>(this.drainingServers);
927  }
928
929  public boolean isServerOnline(ServerName serverName) {
930    return serverName != null && onlineServers.containsKey(serverName);
931  }
932
933  public enum ServerLiveState {
934    LIVE,
935    DEAD,
936    UNKNOWN
937  }
938
939  /** Returns whether the server is online, dead, or unknown. */
940  public synchronized ServerLiveState isServerKnownAndOnline(ServerName serverName) {
941    return onlineServers.containsKey(serverName)
942      ? ServerLiveState.LIVE
943      : (deadservers.isDeadServer(serverName) ? ServerLiveState.DEAD : ServerLiveState.UNKNOWN);
944  }
945
946  /**
947   * Check if a server is known to be dead. A server can be online, or known to be dead, or unknown
   * to this manager (i.e., not online, not known to be dead either; it is simply not tracked by the
949   * master any more, for example, a very old previous instance).
950   */
951  public synchronized boolean isServerDead(ServerName serverName) {
952    return serverName == null || deadservers.isDeadServer(serverName);
953  }
954
955  /**
956   * Check if a server is unknown. A server can be online, or known to be dead, or unknown to this
   * manager (i.e., not online, not known to be dead either; it is simply not tracked by the master
958   * any more, for example, a very old previous instance).
959   */
960  public boolean isServerUnknown(ServerName serverName) {
961    return serverName == null
962      || (!onlineServers.containsKey(serverName) && !deadservers.isDeadServer(serverName));
963  }
964
965  public void shutdownCluster() {
966    String statusStr = "Cluster shutdown requested of master=" + this.master.getServerName();
967    LOG.info(statusStr);
968    this.clusterShutdown.set(true);
969    if (onlineServers.isEmpty()) {
970      // we do not synchronize here so this may cause a double stop, but not a big deal
971      master.stop("OnlineServer=0 right after cluster shutdown set");
972    }
973  }
974
975  public boolean isClusterShutdown() {
976    return this.clusterShutdown.get();
977  }
978
979  /**
   * Start the chore in ServerManager.
981   */
982  public void startChore() {
983    Configuration c = master.getConfiguration();
984    if (persistFlushedSequenceId) {
985      new Thread(() -> {
        // After AM#loadMeta, RegionStates should be loaded. Regions that were deleted by
        // drop/split/merge are not added back to RegionStates, so
        // removeDeletedRegionFromLoadedFlushedSequenceIds can safely remove their loaded
        // flushed sequence ids.
990        removeDeletedRegionFromLoadedFlushedSequenceIds();
991      }, "RemoveDeletedRegionSyncThread").start();
992      int flushPeriod =
993        c.getInt(FLUSHEDSEQUENCEID_FLUSHER_INTERVAL, FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT);
994      flushedSeqIdFlusher = new FlushedSequenceIdFlusher("FlushedSequenceIdFlusher", flushPeriod);
995      master.getChoreService().scheduleChore(flushedSeqIdFlusher);
996    }
997  }
998
999  /**
1000   * Stop the ServerManager.
1001   */
1002  public void stop() {
1003    if (flushedSeqIdFlusher != null) {
1004      flushedSeqIdFlusher.shutdown();
1005    }
1006    if (persistFlushedSequenceId) {
1007      try {
1008        persistRegionLastFlushedSequenceIds();
1009      } catch (IOException e) {
1010        LOG.warn("Failed to persist last flushed sequence id of regions" + " to file system", e);
1011      }
1012    }
1013  }
1014
1015  /**
1016   * Creates a list of possible destinations for a region. It contains the online servers, but not
1017   * the draining or dying servers.
1018   * @param serversToExclude can be null if there is no server to exclude
1019   */
1020  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude) {
1021    Set<ServerName> destServers = new HashSet<>();
1022    onlineServers.forEach((sn, sm) -> {
1023      if (sm.getLastReportTimestamp() > 0) {
        // This means we have already called regionServerReport at least once, so let's include
1025        // this server for region assignment. This is an optimization to avoid assigning regions to
1026        // an uninitialized server. See HBASE-25032 for more details.
1027        destServers.add(sn);
1028      }
1029    });
1030
1031    if (serversToExclude != null) {
1032      destServers.removeAll(serversToExclude);
1033    }
1034
1035    // Loop through the draining server list and remove them from the server list
1036    final List<ServerName> drainingServersCopy = getDrainingServersList();
1037    destServers.removeAll(drainingServersCopy);
1038
1039    return new ArrayList<>(destServers);
1040  }
1041
1042  /**
1043   * Calls {@link #createDestinationServersList} without server to exclude.
1044   */
1045  public List<ServerName> createDestinationServersList() {
1046    return createDestinationServersList(null);
1047  }
1048
1049  /**
   * Clear any dead server with the same host name and port as any online server.
1051   */
1052  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
1053    for (ServerName serverName : getOnlineServersList()) {
1054      deadservers.cleanAllPreviousInstances(serverName);
1055    }
1056  }
1057
1058  /**
1059   * Called by delete table and similar to notify the ServerManager that a region was removed.
1060   */
1061  public void removeRegion(final RegionInfo regionInfo) {
1062    final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
1063    storeFlushedSequenceIdsByRegion.remove(encodedName);
1064    flushedSequenceIdByRegion.remove(encodedName);
1065  }
1066
1067  public boolean isRegionInServerManagerStates(final RegionInfo hri) {
1068    final byte[] encodedName = hri.getEncodedNameAsBytes();
1069    return (storeFlushedSequenceIdsByRegion.containsKey(encodedName)
1070      || flushedSequenceIdByRegion.containsKey(encodedName));
1071  }
1072
1073  /**
1074   * Called by delete table and similar to notify the ServerManager that a region was removed.
1075   */
1076  public void removeRegions(final List<RegionInfo> regions) {
1077    for (RegionInfo hri : regions) {
1078      removeRegion(hri);
1079    }
1080  }
1081
1082  /**
1083   * May return 0 when server is not online.
1084   */
1085  public int getVersionNumber(ServerName serverName) {
1086    ServerMetrics serverMetrics = onlineServers.get(serverName);
1087    return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
1088  }
1089
1090  /**
1091   * May return "0.0.0" when server is not online
1092   */
1093  public String getVersion(ServerName serverName) {
1094    ServerMetrics serverMetrics = onlineServers.get(serverName);
1095    return serverMetrics != null ? serverMetrics.getVersion() : "0.0.0";
1096  }
1097
1098  public int getInfoPort(ServerName serverName) {
1099    ServerMetrics serverMetrics = onlineServers.get(serverName);
1100    return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
1101  }
1102
1103  /**
1104   * Persist last flushed sequence id of each region to HDFS
   * @throws IOException if persisting to HDFS fails
1106   */
1107  private void persistRegionLastFlushedSequenceIds() throws IOException {
1108    if (isFlushSeqIdPersistInProgress) {
1109      return;
1110    }
1111    isFlushSeqIdPersistInProgress = true;
1112    try {
1113      Configuration conf = master.getConfiguration();
1114      Path rootDir = CommonFSUtils.getRootDir(conf);
1115      Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
1116      FileSystem fs = FileSystem.get(conf);
1117      if (fs.exists(lastFlushedSeqIdPath)) {
1118        LOG.info("Rewriting .lastflushedseqids file at: " + lastFlushedSeqIdPath);
1119        if (!fs.delete(lastFlushedSeqIdPath, false)) {
1120          throw new IOException("Unable to remove existing " + lastFlushedSeqIdPath);
1121        }
1122      } else {
1123        LOG.info("Writing .lastflushedseqids file at: " + lastFlushedSeqIdPath);
1124      }
1125      FSDataOutputStream out = fs.create(lastFlushedSeqIdPath);
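      // The file holds a single length-delimited FlushedSequenceId protobuf message, with one
      // FlushedRegionSequenceId entry per region, each carrying its per-store flushed sequence ids.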
1126      FlushedSequenceId.Builder flushedSequenceIdBuilder = FlushedSequenceId.newBuilder();
1127      try {
1128        for (Entry<byte[], Long> entry : flushedSequenceIdByRegion.entrySet()) {
1129          FlushedRegionSequenceId.Builder flushedRegionSequenceIdBuilder =
1130            FlushedRegionSequenceId.newBuilder();
1131          flushedRegionSequenceIdBuilder.setRegionEncodedName(ByteString.copyFrom(entry.getKey()));
1132          flushedRegionSequenceIdBuilder.setSeqId(entry.getValue());
1133          ConcurrentNavigableMap<byte[], Long> storeSeqIds =
1134            storeFlushedSequenceIdsByRegion.get(entry.getKey());
1135          if (storeSeqIds != null) {
1136            for (Entry<byte[], Long> store : storeSeqIds.entrySet()) {
1137              FlushedStoreSequenceId.Builder flushedStoreSequenceIdBuilder =
1138                FlushedStoreSequenceId.newBuilder();
1139              flushedStoreSequenceIdBuilder.setFamily(ByteString.copyFrom(store.getKey()));
1140              flushedStoreSequenceIdBuilder.setSeqId(store.getValue());
1141              flushedRegionSequenceIdBuilder.addStores(flushedStoreSequenceIdBuilder);
1142            }
1143          }
1144          flushedSequenceIdBuilder.addRegionSequenceId(flushedRegionSequenceIdBuilder);
1145        }
1146        flushedSequenceIdBuilder.build().writeDelimitedTo(out);
1147      } finally {
1148        if (out != null) {
1149          out.close();
1150        }
1151      }
1152    } finally {
1153      isFlushSeqIdPersistInProgress = false;
1154    }
1155  }
1156
1157  /**
1158   * Load last flushed sequence id of each region from HDFS, if persisted
1159   */
1160  public void loadLastFlushedSequenceIds() throws IOException {
1161    if (!persistFlushedSequenceId) {
1162      return;
1163    }
1164    Configuration conf = master.getConfiguration();
1165    Path rootDir = CommonFSUtils.getRootDir(conf);
1166    Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
1167    FileSystem fs = FileSystem.get(conf);
1168    if (!fs.exists(lastFlushedSeqIdPath)) {
1169      LOG.info("No .lastflushedseqids found at " + lastFlushedSeqIdPath
1170        + " will record last flushed sequence id"
1171        + " for regions by regionserver report all over again");
1172      return;
1173    } else {
1174      LOG.info("begin to load .lastflushedseqids at " + lastFlushedSeqIdPath);
1175    }
1176    FSDataInputStream in = fs.open(lastFlushedSeqIdPath);
1177    try {
1178      FlushedSequenceId flushedSequenceId = FlushedSequenceId.parseDelimitedFrom(in);
1179      if (flushedSequenceId == null) {
1180        LOG.info(".lastflushedseqids found at {} is empty", lastFlushedSeqIdPath);
1181        return;
1182      }
1183      for (FlushedRegionSequenceId flushedRegionSequenceId : flushedSequenceId
1184        .getRegionSequenceIdList()) {
1185        byte[] encodedRegionName = flushedRegionSequenceId.getRegionEncodedName().toByteArray();
1186        flushedSequenceIdByRegion.putIfAbsent(encodedRegionName,
1187          flushedRegionSequenceId.getSeqId());
1188        if (
1189          flushedRegionSequenceId.getStoresList() != null
1190            && flushedRegionSequenceId.getStoresList().size() != 0
1191        ) {
1192          ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
1193            computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
1194              () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
1195          for (FlushedStoreSequenceId flushedStoreSequenceId : flushedRegionSequenceId
1196            .getStoresList()) {
1197            storeFlushedSequenceId.put(flushedStoreSequenceId.getFamily().toByteArray(),
1198              flushedStoreSequenceId.getSeqId());
1199          }
1200        }
1201      }
1202    } finally {
1203      in.close();
1204    }
1205  }
1206
1207  /**
   * Regions may have been removed between the latest persist of FlushedSequenceIds and the master
   * abort. So after loading FlushedSequenceIds from file, and after meta is loaded, we need to
   * remove the deleted regions according to RegionStates.
1211   */
1212  public void removeDeletedRegionFromLoadedFlushedSequenceIds() {
1213    RegionStates regionStates = master.getAssignmentManager().getRegionStates();
1214    Iterator<byte[]> it = flushedSequenceIdByRegion.keySet().iterator();
1215    while (it.hasNext()) {
1216      byte[] regionEncodedName = it.next();
1217      if (regionStates.getRegionState(Bytes.toStringBinary(regionEncodedName)) == null) {
1218        it.remove();
1219        storeFlushedSequenceIdsByRegion.remove(regionEncodedName);
1220      }
1221    }
1222  }
1223
1224  private class FlushedSequenceIdFlusher extends ScheduledChore {
1225
1226    public FlushedSequenceIdFlusher(String name, int p) {
1227      super(name, master, p, 60 * 1000); // delay one minute before first execute
1228    }
1229
1230    @Override
1231    protected void chore() {
1232      try {
1233        persistRegionLastFlushedSequenceIds();
1234      } catch (IOException e) {
1235        LOG.debug("Failed to persist last flushed sequence id of regions" + " to file system", e);
1236      }
1237    }
1238  }
1239}