/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException;
import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedRegionSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedStoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;

/**
 * The ServerManager class manages info about region servers.
 * <p>
 * Maintains lists of online and dead servers. Processes the startups, shutdowns, and deaths of
 * region servers.
 * <p>
 * Servers are distinguished in two different ways. A given server has a location, specified by
 * hostname and port, of which there can only be one online at any given time. A server instance
 * is specified by the location (hostname and port) as well as the startcode (timestamp from when
 * the server was started). This is used to differentiate a restarted instance of a given server
 * from the original instance.
 * <p>
 * If a server is known not to be running any more, it is called dead. The dead server needs to be
 * handled by a ServerShutdownHandler. If the handler is not enabled yet, the server can't be
 * handled right away, so it is queued up. After the handler is enabled, the server will be
 * submitted to a handler for processing. However, the handler may be only partially enabled; if
 * so, the server cannot be fully processed and is queued up for further processing. A server is
 * fully processed only after the handler is fully enabled and has completed the handling.
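 * <p>
 * As an illustration of the two notions of identity (a minimal sketch; the hostname and
 * startcodes below are made up), two ServerName values can share a location while denoting
 * different server instances:
 *
 * <pre>
 * ServerName original = ServerName.valueOf("rs1.example.com", 16020, 1600000000000L);
 * ServerName restarted = ServerName.valueOf("rs1.example.com", 16020, 1600000123456L);
 * ServerName.isSameAddress(original, restarted); // true: same location (hostname + port)
 * original.equals(restarted); // false: different startcode, i.e. a different instance
 * </pre>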
 */
@InterfaceAudience.Private
public class ServerManager implements ConfigurationObserver {
  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
    "hbase.master.wait.on.regionservers.maxtostart";

  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
    "hbase.master.wait.on.regionservers.mintostart";

  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
    "hbase.master.wait.on.regionservers.timeout";

  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
    "hbase.master.wait.on.regionservers.interval";

  /**
   * See HBASE-20727. If set to true, flushedSequenceIdByRegion and storeFlushedSequenceIdsByRegion
   * will be persisted to HDFS and loaded when the master restarts, to speed up log splitting.
   */
  public static final String PERSIST_FLUSHEDSEQUENCEID =
    "hbase.master.persist.flushedsequenceid.enabled";

  public static final boolean PERSIST_FLUSHEDSEQUENCEID_DEFAULT = true;

  public static final String FLUSHEDSEQUENCEID_FLUSHER_INTERVAL =
    "hbase.master.flushedsequenceid.flusher.interval";

  public static final int FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT = 3 * 60 * 60 * 1000; // 3
                                                                                           // hours
  public static final String MAX_CLOCK_SKEW_MS = "hbase.master.maxclockskew";

  private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);

  // Set if we are to shutdown the cluster.
  private AtomicBoolean clusterShutdown = new AtomicBoolean(false);

  /**
   * The last flushed sequence id for a region.
   */
  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  private boolean persistFlushedSequenceId = true;
  private volatile boolean isFlushSeqIdPersistInProgress = false;
  /** File on hdfs to store last flushed sequence id of regions */
  private static final String LAST_FLUSHED_SEQ_ID_FILE = ".lastflushedseqids";
  private FlushedSequenceIdFlusher flushedSeqIdFlusher;

  /**
   * The last flushed sequence id for a store in a region.
   */
  private final ConcurrentNavigableMap<byte[],
    ConcurrentNavigableMap<byte[], Long>> storeFlushedSequenceIdsByRegion =
      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /** Map of registered servers to their current load */
  private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
    new ConcurrentSkipListMap<>();

  /** List of region servers that should not get any more new regions. */
  private final ArrayList<ServerName> drainingServers = new ArrayList<>();

  private final MasterServices master;
  private final RegionServerList storage;

  private final DeadServer deadservers = new DeadServer();

  private final long maxSkew;
  private final long warningSkew;

  /** Listeners that are called on server events. */
  private List<ServerListener> listeners = new CopyOnWriteArrayList<>();

  /** Configured value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY */
  private volatile boolean rejectDecommissionedHostsConfig;

  /**
   * Constructor.
   */
  public ServerManager(final MasterServices master, RegionServerList storage) {
    this.master = master;
    this.storage = storage;
    Configuration c = master.getConfiguration();
    maxSkew = c.getLong(MAX_CLOCK_SKEW_MS, 30000);
    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
    persistFlushedSequenceId =
      c.getBoolean(PERSIST_FLUSHEDSEQUENCEID, PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
    rejectDecommissionedHostsConfig = getRejectDecommissionedHostsConfig(c);
  }

  /**
   * Implementation of the ConfigurationObserver interface. We are interested in live-loading the
   * configuration value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY
   * @param conf Server configuration instance
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    final boolean newValue = getRejectDecommissionedHostsConfig(conf);
    if (rejectDecommissionedHostsConfig == newValue) {
      // no-op
      return;
    }

    LOG.info("Config Reload for RejectDecommissionedHosts. previous value: {}, new value: {}",
      rejectDecommissionedHostsConfig, newValue);

    rejectDecommissionedHostsConfig = newValue;
  }

  /**
   * Reads the value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY from the config and returns it
   * @param conf Configuration instance of the Master
   */
  public boolean getRejectDecommissionedHostsConfig(Configuration conf) {
    return conf.getBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY,
      HConstants.REJECT_DECOMMISSIONED_HOSTS_DEFAULT);
  }

  /**
   * Add the listener to the notification list.
   * @param listener The ServerListener to register
   */
  public void registerListener(final ServerListener listener) {
    this.listeners.add(listener);
  }

  /**
   * Remove the listener from the notification list.
   * @param listener The ServerListener to unregister
   */
  public boolean unregisterListener(final ServerListener listener) {
    return this.listeners.remove(listener);
  }

  /**
   * Let the server manager know a new regionserver has come online.
   * @param request       the startup request
   * @param versionNumber the version number of the new regionserver
   * @param version       the version of the new regionserver, could contain strings like "SNAPSHOT"
   * @param ia            the InetAddress from which the request was received
   * @return The ServerName we know this server as.
   */
  ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
    String version, InetAddress ia) throws IOException {
    // Test for case where we get a region startup message from a regionserver
    // that has been quickly restarted but whose znode expiration handler has
    // not yet run, or from a server whose fail we are currently processing.
    // Test if its host+port combo is present in serverAddressToServerInfo. If it
    // is, reject the server and trigger its expiration. The next time it comes
    // in, it should have been removed from serverAddressToServerInfo and queued
    // for processing by ProcessServerShutdown.

    // if use-ip is enabled, we will use ip to expose Master/RS service for client,
    // see HBASE-27304 for details.
    boolean useIp = master.getConfiguration().getBoolean(HConstants.HBASE_SERVER_USEIP_ENABLED_KEY,
      HConstants.HBASE_SERVER_USEIP_ENABLED_DEFAULT);
    String isaHostName = useIp ? ia.getHostAddress() : ia.getHostName();
    final String hostname =
      request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : isaHostName;
    ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());

    // Check if the host should be rejected based on its decommissioned status
    checkRejectableDecommissionedStatus(sn);

    checkClockSkew(sn, request.getServerCurrentTime());
    checkIsDead(sn, "STARTUP");
    if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
      LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup could not record the server: {}", sn);
    }
    storage.started(sn);
    return sn;
  }

  /**
   * Updates last flushed sequence ids for the regions on server sn.
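   * <p>
   * Smaller ids never overwrite larger ones. As a worked example (the numbers are made up): if the
   * map currently holds 120 for a region and a report arrives with completed sequence id 100, the
   * stored value stays 120 and a warning is logged; a later report with 150 replaces it.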
   */
  private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
    for (Entry<byte[], RegionMetrics> entry : hsl.getRegionMetrics().entrySet()) {
      byte[] encodedRegionName = Bytes.toBytes(RegionInfo.encodeRegionName(entry.getKey()));
      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
      long l = entry.getValue().getCompletedSequenceId();
      // Don't let smaller sequence ids override greater sequence ids.
      if (LOG.isTraceEnabled()) {
        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue
          + ", completeSequenceId=" + l);
      }
      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
        flushedSequenceIdByRegion.put(encodedRegionName, l);
      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id (" + l
          + ") that is less than the previous last flushed sequence id (" + existingValue
          + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring.");
      }
      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
        computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
          () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
      for (Entry<byte[], Long> storeSeqId : entry.getValue().getStoreSequenceId().entrySet()) {
        byte[] family = storeSeqId.getKey();
        existingValue = storeFlushedSequenceId.get(family);
        l = storeSeqId.getValue();
        if (LOG.isTraceEnabled()) {
          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family)
            + ", existingValue=" + existingValue + ", completeSequenceId=" + l);
        }
        // Don't let smaller sequence ids override greater sequence ids.
        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
          storeFlushedSequenceId.put(family, l);
        }
      }
    }
  }

  public void regionServerReport(ServerName sn, ServerMetrics sl) throws YouAreDeadException {
    checkIsDead(sn, "REPORT");
    if (null == this.onlineServers.replace(sn, sl)) {
      // Already have this host+port combo and it's just a different start code?
      // Just let the server in. Presume master joining a running cluster.
      // recordNewServer is what happens at the end of reportServerStartup.
      // The only thing we are skipping is passing back to the regionserver
      // the ServerName to use. Here we presume a master has already done
      // that so we'll press on with whatever it gave us for ServerName.
      if (!checkAndRecordNewServer(sn, sl)) {
        // Master already registered a server with the same (host + port) and a higher startcode.
        // This can happen if a regionserver report comes late from the old server
        // (possible race condition); by that time the master has already processed the SCP for
        // that server and started accepting regionserver reports from the new server, i.e. the
        // server with the same (host + port) and a higher startcode.
        // The exception thrown here is not meant to tell the region server it is dead because if
        // there is a new server on the same host port, the old server should have already been
        // dead in the ideal situation.
        // The exception thrown here is to skip the later steps of the whole regionServerReport
        // request processing. Usually, after recording it in ServerManager, we will call the
        // related methods in AssignmentManager to record region states. If the region server
        // is already dead, we should not do these steps anymore, so here we throw an exception
        // to let the upper layer know that they should not continue processing anymore.
        final String errorMsg = "RegionServerReport received from " + sn
          + ", but another server with the same name and higher startcode is already registered,"
          + " ignoring";
        LOG.warn(errorMsg);
        throw new YouAreDeadException(errorMsg);
      }
    }
    updateLastFlushedSequenceIds(sn, sl);
  }

  /**
   * Checks if the Master is configured to reject decommissioned hosts or not. When it's configured
   * to do so, any RegionServer trying to join the cluster will have its host checked against the
   * list of hosts of currently decommissioned servers and potentially get prevented from reporting
   * for duty; otherwise, we do nothing and we let them pass to the next check. See HBASE-28342 for
   * details.
   * @param sn The ServerName to check for
   * @throws DecommissionedHostRejectedException if the Master is configured to reject
   *                                             decommissioned hosts and this host exists in the
   *                                             list of the decommissioned servers
   */
  private void checkRejectableDecommissionedStatus(ServerName sn)
    throws DecommissionedHostRejectedException {
    LOG.info("Checking decommissioned status of RegionServer {}", sn.getServerName());

    // If the Master is not configured to reject decommissioned hosts, return early.
    if (!rejectDecommissionedHostsConfig) {
      return;
    }

    // Look for a match for the hostname in the list of decommissioned servers
    for (ServerName server : getDrainingServersList()) {
      if (Objects.equals(server.getHostname(), sn.getHostname())) {
        // Found a match and master is configured to reject decommissioned hosts, throw exception!
        LOG.warn(
          "Rejecting RegionServer {} from reporting for duty because Master is configured "
            + "to reject decommissioned hosts and this host was marked as such in the past.",
          sn.getServerName());
        throw new DecommissionedHostRejectedException(String.format(
          "Host %s exists in the list of decommissioned servers and Master is configured to "
            + "reject decommissioned hosts",
          sn.getHostname()));
      }
    }
  }

  /**
   * Checks if a server with the same host and port already exists; if not, or if the existing one
   * has a smaller start code, record the new server.
   * @param serverName the server to check and record
   * @param sl         the server load on the server
   * @return true if the server is recorded, otherwise, false
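   *         For example (startcodes are made up): if "rs1,16020,100" is already registered and
   *         "rs1,16020,200" checks in, the new instance is recorded (and the stale one later
   *         expired) and true is returned; if "rs1,16020,50" checks in instead, it is rejected and
   *         false is returned.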
   */
  boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
    ServerName existingServer = null;
    synchronized (this.onlineServers) {
      existingServer = findServerWithSameHostnamePortWithLock(serverName);
      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
        LOG.info("Server serverName=" + serverName + " rejected; we already have "
          + existingServer.toString() + " registered with same hostname and port");
        return false;
      }
      recordNewServerWithLock(serverName, sl);
    }

    // Tell our listeners that a server was added
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverAdded(serverName);
      }
    }

    // Note that we assume that same ts means same server, and don't expire in that case.
    // TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
    if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
      LOG.info("Triggering server recovery; existingServer " + existingServer
        + " looks stale, new server:" + serverName);
      expireServer(existingServer);
    }
    return true;
  }

  /**
   * Find out the region servers that crashed between the crash of the previous master instance and
   * the current master instance and schedule SCPs for them.
   * <p/>
   * Since the {@code RegionServerTracker} has already helped us to construct the online servers set
   * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir}
   * to find out whether there are servers which are already dead.
   * <p/>
   * Must be called inside the initialization method of {@code RegionServerTracker} to avoid
   * concurrency issues.
   * @param deadServersFromPE     the region servers which already have a SCP associated.
   * @param liveServersFromWALDir the live region servers from wal directory.
   */
  void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
    Set<ServerName> liveServersFromWALDir) {
    deadServersFromPE.forEach(deadservers::putIfAbsent);
    liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
      .forEach(this::expireServer);
  }

  /**
   * Checks the clock skew between the server and the master. If the clock skew exceeds the
   * configured max, it will throw an exception; if it exceeds the configured warning threshold, it
   * will log a warning but start normally.
   * @param serverName Incoming server's name
   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
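   *                                 (as a worked example with the default
   *                                 hbase.master.maxclockskew of 30000 ms and warning threshold of
   *                                 10000 ms: a 12-second skew only logs a warning, while a
   *                                 40-second skew rejects the server)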
   */
  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
    throws ClockOutOfSyncException {
    long skew = Math.abs(EnvironmentEdgeManager.currentTime() - serverCurrentTime);
    if (skew > maxSkew) {
      String message = "Server " + serverName + " has been "
        + "rejected; Reported time is too far out of sync with master.  " + "Time difference of "
        + skew + "ms > max allowed of " + maxSkew + "ms";
      LOG.warn(message);
      throw new ClockOutOfSyncException(message);
    } else if (skew > warningSkew) {
      String message = "Reported time for server " + serverName + " is out of sync with master "
        + "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " + "error threshold is "
        + maxSkew + "ms)";
      LOG.warn(message);
    }
  }

  /**
   * Called when RegionServer first reports in for duty and thereafter each time it heartbeats to
   * make sure it has not been marked as dead. If this server is on the dead list, reject it with a
   * YouAreDeadException. If it was dead but came back with a new start code, remove the old entry
   * from the dead list.
   * @param what START or REPORT
   */
  private void checkIsDead(final ServerName serverName, final String what)
    throws YouAreDeadException {
    if (this.deadservers.isDeadServer(serverName)) {
      // Exact match: host name, port and start code all match with existing one of the
      // dead servers. So, this server must be dead. Tell it to kill itself.
      String message =
        "Server " + what + " rejected; currently processing " + serverName + " as dead server";
      LOG.debug(message);
      throw new YouAreDeadException(message);
    }
    // Remove any dead server with the same hostname and port as the newly checking-in
    // regionserver after master initialization. See HBASE-5916 for more information.
    if (
      (this.master == null || this.master.isInitialized())
        && this.deadservers.cleanPreviousInstance(serverName)
    ) {
      // This server has now become alive after we marked it as dead.
      // We removed its previous entry from the dead list to reflect it.
      LOG.debug("{} {} came back up, removed it from the dead servers list", what, serverName);
    }
  }

  /**
   * Assumes onlineServers is locked.
   * @return ServerName with matching hostname and port.
   */
  public ServerName findServerWithSameHostnamePortWithLock(final ServerName serverName) {
    ServerName end =
      ServerName.valueOf(serverName.getHostname(), serverName.getPort(), Long.MAX_VALUE);

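    // onlineServers is sorted by ServerName, i.e. by hostname, then port, then startcode, so the
    // greatest key strictly below (hostname, port, Long.MAX_VALUE) is the registered instance for
    // this host:port, if one exists.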
    ServerName r = onlineServers.lowerKey(end);
    if (r != null) {
      if (ServerName.isSameAddress(r, serverName)) {
        return r;
      }
    }
    return null;
  }

  /**
   * Adds to the onlineServers list. onlineServers should be locked.
   * @param serverName The remote server's name.
   */
  void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
    LOG.info("Registering regionserver=" + serverName);
    this.onlineServers.put(serverName, sl);
    master.getAssignmentManager().getRegionStates().createServer(serverName);
  }

  public ConcurrentNavigableMap<byte[], Long> getFlushedSequenceIdByRegion() {
    return flushedSequenceIdByRegion;
  }

  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
    Map<byte[], Long> storeFlushedSequenceId =
      storeFlushedSequenceIdsByRegion.get(encodedRegionName);
    if (storeFlushedSequenceId != null) {
      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
          .setFamilyName(UnsafeByteOperations.unsafeWrap(entry.getKey()))
          .setSequenceId(entry.getValue().longValue()).build());
      }
    }
    return builder.build();
  }

  /** Returns ServerMetrics if serverName is known else null */
  public ServerMetrics getLoad(final ServerName serverName) {
    return this.onlineServers.get(serverName);
  }

  /**
   * Compute the average load across all region servers. Currently, this uses a very naive
   * computation - just uses the number of regions being served, ignoring stats about number of
   * requests.
   * @return the average load
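   *         (for example, three online servers hosting 10, 20, and 30 regions give an average load
   *         of 20.0)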
   */
  public double getAverageLoad() {
    int totalLoad = 0;
    int numServers = 0;
    for (ServerMetrics sl : this.onlineServers.values()) {
      numServers++;
      totalLoad += sl.getRegionMetrics().size();
    }
    return numServers == 0 ? 0 : (double) totalLoad / (double) numServers;
  }

  /** Returns the count of active regionservers */
  public int countOfRegionServers() {
    // Presumes onlineServers is a concurrent map
    return this.onlineServers.size();
  }

  /** Returns Read-only map of servers to serverinfo */
  public Map<ServerName, ServerMetrics> getOnlineServers() {
    // Presumption is that iterating the returned Map is OK.
    synchronized (this.onlineServers) {
      return Collections.unmodifiableMap(this.onlineServers);
    }
  }

  public DeadServer getDeadServers() {
    return this.deadservers;
  }

  /**
   * Checks whether any dead servers are currently being processed.
   * @return true if any RS are being processed as dead, false if not
   */
  public boolean areDeadServersInProgress() throws IOException {
    return master.getProcedures().stream()
      .anyMatch(p -> !p.isFinished() && p instanceof ServerCrashProcedure);
  }

  void letRegionServersShutdown() {
    long previousLogTime = 0;
    ServerName sn = master.getServerName();
    ZKWatcher zkw = master.getZooKeeper();
    int onlineServersCt;
    while ((onlineServersCt = onlineServers.size()) > 0) {
      if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
        Set<ServerName> remainingServers = onlineServers.keySet();
        synchronized (onlineServers) {
          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
            // Master will delete itself later.
            return;
          }
        }
        StringBuilder sb = new StringBuilder();
        // It's ok here to not sync on onlineServers - merely logging
        for (ServerName key : remainingServers) {
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(key);
        }
        LOG.info("Waiting on regionserver(s) " + sb.toString());
        previousLogTime = EnvironmentEdgeManager.currentTime();
      }

      try {
        List<String> servers = getRegionServersInZK(zkw);
        if (
          servers == null || servers.isEmpty()
            || (servers.size() == 1 && servers.contains(sn.toString()))
        ) {
          LOG.info("ZK shows there is only the master self online, exiting now");
          // Master could have lost some ZK events, no need to wait more.
          break;
        }
      } catch (KeeperException ke) {
        LOG.warn("Failed to list regionservers", ke);
        // ZK is malfunctioning, don't hang here
        break;
      }
      synchronized (onlineServers) {
        try {
          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
        } catch (InterruptedException ignored) {
          // continue
        }
      }
    }
  }

  private List<String> getRegionServersInZK(final ZKWatcher zkw) throws KeeperException {
    return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
  }

  /**
   * Expire the passed server. Add it to the list of dead servers and queue shutdown processing.
   * @return pid if we queued a ServerCrashProcedure else {@link Procedure#NO_PROC_ID} if we did not
   *         (could happen for many reasons including the fact that it's this server that is going
   *         down, or we have already queued an SCP for this server, or SCP processing is currently
   *         disabled because we are in the startup phase).
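   *         <p>
   *         A minimal calling sketch (the variable names are illustrative):
   *
   *         <pre>
   *         long pid = serverManager.expireServer(staleServerName);
   *         if (pid == Procedure.NO_PROC_ID) {
   *           // nothing was queued: our own server, already a dead server, or SCP not submitted
   *         }
   *         </pre>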
   */
  // Redo test so we can make this protected.
  public synchronized long expireServer(final ServerName serverName) {
    return expireServer(serverName, false);
  }

  synchronized long expireServer(final ServerName serverName, boolean force) {
    // THIS server is going down... can't handle our own expiration.
    if (serverName.equals(master.getServerName())) {
      if (!(master.isAborted() || master.isStopped())) {
        master.stop("We lost our znode?");
      }
      return Procedure.NO_PROC_ID;
    }
    if (this.deadservers.isDeadServer(serverName)) {
      LOG.warn("Expiration called on {} but already in DeadServer", serverName);
      return Procedure.NO_PROC_ID;
    }
    moveFromOnlineToDeadServers(serverName);

    // If server is in draining mode, remove corresponding znode
    // In some tests, the mocked HM may not have ZK Instance, hence null check
    if (master.getZooKeeper() != null) {
      String drainingZnode = ZNodePaths
        .joinZNode(master.getZooKeeper().getZNodePaths().drainingZNode, serverName.getServerName());
      try {
        ZKUtil.deleteNodeFailSilent(master.getZooKeeper(), drainingZnode);
      } catch (KeeperException e) {
        LOG.warn(
          "Error deleting the draining znode for stopping server " + serverName.getServerName(), e);
      }
    }

    // If cluster is going down, yes, servers are going to be expiring; don't
    // process as a dead server
    if (isClusterShutdown()) {
      LOG.info("Cluster shutdown set; " + serverName + " expired; onlineServers="
        + this.onlineServers.size());
      if (this.onlineServers.isEmpty()) {
        master.stop("Cluster shutdown set; onlineServer=0");
      }
      return Procedure.NO_PROC_ID;
    }
    LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
    long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
    if (pid == Procedure.NO_PROC_ID) {
      // skip later processing as we failed to submit SCP
      return Procedure.NO_PROC_ID;
    }
    storage.expired(serverName);
    // Tell our listeners that a server was removed
    if (!this.listeners.isEmpty()) {
      this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
    }
    // trigger a persist of flushedSeqId
    if (flushedSeqIdFlusher != null) {
      flushedSeqIdFlusher.triggerNow();
    }
    return pid;
  }

  /**
   * Called when server has expired.
   */
  // Locking in this class needs cleanup.
  public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
    synchronized (this.onlineServers) {
      boolean online = this.onlineServers.containsKey(sn);
      if (online) {
        // Remove the server from the known servers lists and update load info BUT
        // add to deadservers first; do this so it'll show in dead servers list if
        // not in online servers list.
        this.deadservers.putIfAbsent(sn);
        this.onlineServers.remove(sn);
        onlineServers.notifyAll();
      } else {
        // If not online, that is odd but may happen if 'Unknown Servers' -- where meta
        // has references to servers not online nor in dead servers list. If
        // 'Unknown Server', don't add to DeadServers else it will be there forever.
        LOG.trace("Expiration of {} but server not online", sn);
      }
    }
  }

  /*
   * Remove the server from the drain list.
   */
  public synchronized boolean removeServerFromDrainList(final ServerName sn) {
    LOG.info("Removing server {} from the draining list.", sn);

    // Remove the server from the draining servers lists.
    return this.drainingServers.remove(sn);
  }

  /**
   * Add the server to the drain list.
   * @return True if the server is added or the server is already on the drain list.
   */
  public synchronized boolean addServerToDrainList(final ServerName sn) {
    // If master is not rejecting decommissioned hosts, warn if the server (sn) is not online.
    // However, we want to add servers even if they're not online if the master is configured
    // to reject decommissioned hosts
    if (!rejectDecommissionedHostsConfig && !this.isServerOnline(sn)) {
      LOG.warn("Server {} is not currently online. Ignoring request to add it to draining list.",
        sn);
      return false;
    }

    // Add the server to the draining servers lists, if it's not already in it.
    if (this.drainingServers.contains(sn)) {
      LOG.warn(
        "Server {} is already in the draining server list. Ignoring request to add it again.", sn);
      return true;
    }

    LOG.info("Server {} added to draining server list.", sn);
    return this.drainingServers.add(sn);
  }

  /**
   * Contacts a region server and waits up to timeout ms to close the region. This bypasses the
   * active HMaster. Pass -1 as timeout if you do not want to wait on the result.
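   * <p>
   * A minimal usage sketch (the connection, server, and region variables are assumed to be
   * obtained elsewhere):
   *
   * <pre>
   * // wait at most 30 seconds for the region to close on that server
   * ServerManager.closeRegionSilentlyAndWait(connection, server, region, 30000);
   * </pre>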
   */
  public static void closeRegionSilentlyAndWait(AsyncClusterConnection connection,
    ServerName server, RegionInfo region, long timeout) throws IOException, InterruptedException {
    AsyncRegionServerAdmin admin = connection.getRegionServerAdmin(server);
    try {
      FutureUtils.get(
        admin.closeRegion(ProtobufUtil.buildCloseRegionRequest(server, region.getRegionName())));
    } catch (IOException e) {
      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
    }
    if (timeout < 0) {
      return;
    }
    long expiration = timeout + EnvironmentEdgeManager.currentTime();
    while (EnvironmentEdgeManager.currentTime() < expiration) {
      try {
        RegionInfo rsRegion = ProtobufUtil.toRegionInfo(FutureUtils
          .get(
            admin.getRegionInfo(RequestConverter.buildGetRegionInfoRequest(region.getRegionName())))
          .getRegionInfo());
        if (rsRegion == null) {
          return;
        }
      } catch (IOException ioe) {
        if (
          ioe instanceof NotServingRegionException
            || (ioe instanceof RemoteWithExtrasException && ((RemoteWithExtrasException) ioe)
              .unwrapRemoteException() instanceof NotServingRegionException)
        ) {
          // no need to retry again
          return;
        }
        LOG.warn("Exception when retrieving regioninfo from: " + region.getRegionNameAsString(),
          ioe);
      }
      Thread.sleep(1000);
    }
    throw new IOException("Region " + region + " failed to close within" + " timeout " + timeout);
  }

  /**
   * Calculate the min necessary to start. This is not an absolute. It is just a friction that will
   * cause us to hang around a bit longer waiting on RegionServers to check in.
   */
  private int getMinToStart() {
    if (master.isInMaintenanceMode()) {
      // If in maintenance mode, then the in-process region server hosting meta will be the only
      // server available
      return 1;
    }

    int minimumRequired = 1;
    int minToStart = this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    // Ensure we are never less than minimumRequired else stuff won't work.
    return Math.max(minToStart, minimumRequired);
  }

  /**
   * Wait for the region servers to report in. We will wait until one of these conditions is met:
   * <ul>
   * <li>the master is stopped</li>
   * <li>the 'hbase.master.wait.on.regionservers.maxtostart' number of region servers is
   * reached</li>
   * <li>the 'hbase.master.wait.on.regionservers.mintostart' is reached AND there have been no new
   * region servers checking in for 'hbase.master.wait.on.regionservers.interval' time AND the
   * 'hbase.master.wait.on.regionservers.timeout' is reached</li>
   * </ul>
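   * <p>
   * For example (values chosen for illustration): with mintostart=3, maxtostart=10, interval=1500
   * ms and timeout=4500 ms, the master keeps waiting while fewer than 3 servers have checked in;
   * once at least 3 are in, it stops waiting as soon as 4.5 seconds have elapsed and no new server
   * has checked in within the last 1.5 seconds, or immediately once the 10th server checks in.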
   */
  public void waitForRegionServers(MonitoredTask status) throws InterruptedException {
    final long interval =
      this.master.getConfiguration().getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
    final long timeout =
      this.master.getConfiguration().getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
    // Min is not an absolute; just a friction making us wait longer on server checkin.
    int minToStart = getMinToStart();
    int maxToStart =
      this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
    if (maxToStart < minToStart) {
      LOG.warn(String.format("The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
        WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart, WAIT_ON_REGIONSERVERS_MINTOSTART,
        minToStart));
      maxToStart = Integer.MAX_VALUE;
    }

    long now = EnvironmentEdgeManager.currentTime();
    final long startTime = now;
    long slept = 0;
    long lastLogTime = 0;
    long lastCountChange = startTime;
    int count = countOfRegionServers();
    int oldCount = 0;
    // This while test is a little hard to read. We try to comment it below but in essence:
    // Wait if Master is not stopped and the number of regionservers that have checked-in is
    // less than the maxToStart. Both of these conditions will be true near universally.
    // Next, we will keep cycling if ANY of the following three conditions are true:
    // 1. The time since a regionserver registered is < interval (means servers are actively
    // checking in).
    // 2. We are under the total timeout.
    // 3. The count of servers is < minimum.
    for (ServerListener listener : this.listeners) {
      listener.waiting();
    }
    while (
      !this.master.isStopped() && !isClusterShutdown() && count < maxToStart
        && ((lastCountChange + interval) > now || timeout > slept || count < minToStart)
    ) {
      // Log some info at every interval time or if there is a change
      if (oldCount != count || lastLogTime + interval < now) {
        lastLogTime = now;
        String msg =
          "Waiting on regionserver count=" + count + "; waited=" + slept + "ms, expecting min="
            + minToStart + " server(s), max=" + getStrForMax(maxToStart) + " server(s), "
            + "timeout=" + timeout + "ms, lastChange=" + (now - lastCountChange) + "ms";
        LOG.info(msg);
        status.setStatus(msg);
      }

      // We sleep for some time
      final long sleepTime = 50;
      Thread.sleep(sleepTime);
      now = EnvironmentEdgeManager.currentTime();
      slept = now - startTime;

      oldCount = count;
      count = countOfRegionServers();
      if (count != oldCount) {
        lastCountChange = now;
      }
    }
    // Did we exit the loop because cluster is going down?
    if (isClusterShutdown()) {
      this.master.stop("Cluster shutdown");
    }
    LOG.info("Finished waiting on RegionServer count=" + count + "; waited=" + slept + "ms,"
      + " expected min=" + minToStart + " server(s), max=" + getStrForMax(maxToStart)
      + " server(s)," + " master is " + (this.master.isStopped() ? "stopped." : "running"));
  }

  private String getStrForMax(final int max) {
    return max == Integer.MAX_VALUE ? "NO_LIMIT" : Integer.toString(max);
  }

  /** Returns A copy of the internal list of online servers. */
  public List<ServerName> getOnlineServersList() {
    // TODO: optimize the load balancer call so we don't need to make a new list
    // TODO: FIX. THIS IS POPULAR CALL.
    return new ArrayList<>(this.onlineServers.keySet());
  }

  /**
   * @param keys                 The target server names
   * @param idleServerPredicator Evaluates the server on the given load
   * @return A copy of the internal list of online servers matched by the predicator
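   *         <p>
   *         For example, a sketch that selects servers currently hosting no regions (the
   *         {@code candidates} list is assumed to come from the caller):
   *
   *         <pre>
   *         List&lt;ServerName&gt; idle = serverManager.getOnlineServersListWithPredicator(
   *           candidates, metrics -&gt; metrics.getRegionMetrics().isEmpty());
   *         </pre>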
   */
  public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> keys,
    Predicate<ServerMetrics> idleServerPredicator) {
    List<ServerName> names = new ArrayList<>();
    if (keys != null && idleServerPredicator != null) {
      keys.forEach(name -> {
        ServerMetrics load = onlineServers.get(name);
        if (load != null) {
          if (idleServerPredicator.test(load)) {
            names.add(name);
          }
        }
      });
    }
    return names;
  }

  /** Returns A copy of the internal list of draining servers. */
  public List<ServerName> getDrainingServersList() {
    return new ArrayList<>(this.drainingServers);
  }

  public boolean isServerOnline(ServerName serverName) {
    return serverName != null && onlineServers.containsKey(serverName);
  }

  public enum ServerLiveState {
    LIVE,
    DEAD,
    UNKNOWN
  }

  /** Returns whether the server is online, dead, or unknown. */
  public synchronized ServerLiveState isServerKnownAndOnline(ServerName serverName) {
    return onlineServers.containsKey(serverName)
      ? ServerLiveState.LIVE
      : (deadservers.isDeadServer(serverName) ? ServerLiveState.DEAD : ServerLiveState.UNKNOWN);
  }

  /**
   * Check if a server is known to be dead. A server can be online, or known to be dead, or unknown
   * to this manager (i.e., not online, not known to be dead either; it is simply not tracked by
   * the master any more, for example, a very old previous instance).
   */
  public synchronized boolean isServerDead(ServerName serverName) {
    return serverName == null || deadservers.isDeadServer(serverName);
  }

  /**
   * Check if a server is unknown. A server can be online, or known to be dead, or unknown to this
   * manager (i.e., not online, not known to be dead either; it is simply not tracked by the master
   * any more, for example, a very old previous instance).
   */
  public boolean isServerUnknown(ServerName serverName) {
    return serverName == null
      || (!onlineServers.containsKey(serverName) && !deadservers.isDeadServer(serverName));
  }

  public void shutdownCluster() {
    String statusStr = "Cluster shutdown requested of master=" + this.master.getServerName();
    LOG.info(statusStr);
    this.clusterShutdown.set(true);
    if (onlineServers.isEmpty()) {
      // we do not synchronize here so this may cause a double stop, but not a big deal
      master.stop("OnlineServer=0 right after cluster shutdown set");
    }
  }

  public boolean isClusterShutdown() {
    return this.clusterShutdown.get();
  }

  /**
   * Start the chore in ServerManager.
   */
  public void startChore() {
    Configuration c = master.getConfiguration();
    if (persistFlushedSequenceId) {
      new Thread(() -> {
        // After AM#loadMeta, RegionStates should be loaded; regions that were deleted by
        // drop/split/merge are not present in RegionStates, so
        // removeDeletedRegionFromLoadedFlushedSequenceIds can safely remove them from the
        // loaded flushed sequence ids.
        removeDeletedRegionFromLoadedFlushedSequenceIds();
      }, "RemoveDeletedRegionSyncThread").start();
      int flushPeriod =
        c.getInt(FLUSHEDSEQUENCEID_FLUSHER_INTERVAL, FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT);
      flushedSeqIdFlusher = new FlushedSequenceIdFlusher("FlushedSequenceIdFlusher", flushPeriod);
      master.getChoreService().scheduleChore(flushedSeqIdFlusher);
    }
  }

  /**
   * Stop the ServerManager.
   */
  public void stop() {
    if (flushedSeqIdFlusher != null) {
      flushedSeqIdFlusher.shutdown();
    }
    if (persistFlushedSequenceId) {
      try {
        persistRegionLastFlushedSequenceIds();
      } catch (IOException e) {
        LOG.warn("Failed to persist last flushed sequence id of regions" + " to file system", e);
      }
    }
  }

  /**
   * Creates a list of possible destinations for a region. It contains the online servers, but not
   * the draining or dying servers.
   * @param serversToExclude can be null if there is no server to exclude
   */
  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude) {
    Set<ServerName> destServers = new HashSet<>();
    onlineServers.forEach((sn, sm) -> {
      if (sm.getLastReportTimestamp() > 0) {
        // This means we have already called regionServerReport at least once, then let's include
        // this server for region assignment. This is an optimization to avoid assigning regions to
        // an uninitialized server. See HBASE-25032 for more details.
        destServers.add(sn);
      }
    });

    if (serversToExclude != null) {
      destServers.removeAll(serversToExclude);
    }

    // Loop through the draining server list and remove them from the server list
    final List<ServerName> drainingServersCopy = getDrainingServersList();
    destServers.removeAll(drainingServersCopy);

    return new ArrayList<>(destServers);
  }

  /**
   * Calls {@link #createDestinationServersList} without a server to exclude.
   */
  public List<ServerName> createDestinationServersList() {
    return createDestinationServersList(null);
  }

  /**
   * Clear any dead server with the same host name and port as any online server.
   */
  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
    for (ServerName serverName : getOnlineServersList()) {
      deadservers.cleanAllPreviousInstances(serverName);
    }
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegion(final RegionInfo regionInfo) {
    final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
    storeFlushedSequenceIdsByRegion.remove(encodedName);
    flushedSequenceIdByRegion.remove(encodedName);
  }

  public boolean isRegionInServerManagerStates(final RegionInfo hri) {
    final byte[] encodedName = hri.getEncodedNameAsBytes();
    return (storeFlushedSequenceIdsByRegion.containsKey(encodedName)
      || flushedSequenceIdByRegion.containsKey(encodedName));
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegions(final List<RegionInfo> regions) {
    for (RegionInfo hri : regions) {
      removeRegion(hri);
    }
  }

  /**
   * May return 0 when server is not online.
   */
  public int getVersionNumber(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
  }

  /**
   * May return "0.0.0" when server is not online
   */
  public String getVersion(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersion() : "0.0.0";
  }

  public int getInfoPort(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
  }

  /**
   * Persist the last flushed sequence id of each region to HDFS.
   * @throws IOException if persisting to HDFS fails
   */
  private void persistRegionLastFlushedSequenceIds() throws IOException {
    if (isFlushSeqIdPersistInProgress) {
      return;
    }
    isFlushSeqIdPersistInProgress = true;
    try {
      Configuration conf = master.getConfiguration();
      Path rootDir = CommonFSUtils.getRootDir(conf);
      Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
      FileSystem fs = FileSystem.get(conf);
      if (fs.exists(lastFlushedSeqIdPath)) {
        LOG.info("Rewriting .lastflushedseqids file at: " + lastFlushedSeqIdPath);
        if (!fs.delete(lastFlushedSeqIdPath, false)) {
          throw new IOException("Unable to remove existing " + lastFlushedSeqIdPath);
        }
      } else {
        LOG.info("Writing .lastflushedseqids file at: " + lastFlushedSeqIdPath);
      }
      FSDataOutputStream out = fs.create(lastFlushedSeqIdPath);
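      // The whole map is serialized as a single delimited FlushedSequenceId protobuf message, with
      // one FlushedRegionSequenceId entry per region and nested FlushedStoreSequenceId entries per
      // column family; loadLastFlushedSequenceIds() reads it back with parseDelimitedFrom.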
      FlushedSequenceId.Builder flushedSequenceIdBuilder = FlushedSequenceId.newBuilder();
      try {
        for (Entry<byte[], Long> entry : flushedSequenceIdByRegion.entrySet()) {
          FlushedRegionSequenceId.Builder flushedRegionSequenceIdBuilder =
            FlushedRegionSequenceId.newBuilder();
          flushedRegionSequenceIdBuilder.setRegionEncodedName(ByteString.copyFrom(entry.getKey()));
          flushedRegionSequenceIdBuilder.setSeqId(entry.getValue());
          ConcurrentNavigableMap<byte[], Long> storeSeqIds =
            storeFlushedSequenceIdsByRegion.get(entry.getKey());
          if (storeSeqIds != null) {
            for (Entry<byte[], Long> store : storeSeqIds.entrySet()) {
              FlushedStoreSequenceId.Builder flushedStoreSequenceIdBuilder =
                FlushedStoreSequenceId.newBuilder();
              flushedStoreSequenceIdBuilder.setFamily(ByteString.copyFrom(store.getKey()));
              flushedStoreSequenceIdBuilder.setSeqId(store.getValue());
              flushedRegionSequenceIdBuilder.addStores(flushedStoreSequenceIdBuilder);
            }
          }
          flushedSequenceIdBuilder.addRegionSequenceId(flushedRegionSequenceIdBuilder);
        }
        flushedSequenceIdBuilder.build().writeDelimitedTo(out);
      } finally {
        if (out != null) {
          out.close();
        }
      }
    } finally {
      isFlushSeqIdPersistInProgress = false;
    }
  }

  /**
   * Load the last flushed sequence id of each region from HDFS, if persisted.
   */
  public void loadLastFlushedSequenceIds() throws IOException {
    if (!persistFlushedSequenceId) {
      return;
    }
    Configuration conf = master.getConfiguration();
    Path rootDir = CommonFSUtils.getRootDir(conf);
    Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
    FileSystem fs = FileSystem.get(conf);
    if (!fs.exists(lastFlushedSeqIdPath)) {
      LOG.info("No .lastflushedseqids found at " + lastFlushedSeqIdPath
        + " will record last flushed sequence id"
        + " for regions by regionserver report all over again");
      return;
    } else {
      LOG.info("begin to load .lastflushedseqids at " + lastFlushedSeqIdPath);
    }
    FSDataInputStream in = fs.open(lastFlushedSeqIdPath);
    try {
      FlushedSequenceId flushedSequenceId = FlushedSequenceId.parseDelimitedFrom(in);
      if (flushedSequenceId == null) {
        LOG.info(".lastflushedseqids found at {} is empty", lastFlushedSeqIdPath);
        return;
      }
      for (FlushedRegionSequenceId flushedRegionSequenceId : flushedSequenceId
        .getRegionSequenceIdList()) {
        byte[] encodedRegionName = flushedRegionSequenceId.getRegionEncodedName().toByteArray();
        flushedSequenceIdByRegion.putIfAbsent(encodedRegionName,
          flushedRegionSequenceId.getSeqId());
        if (
          flushedRegionSequenceId.getStoresList() != null
            && flushedRegionSequenceId.getStoresList().size() != 0
        ) {
          ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
            computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
              () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
          for (FlushedStoreSequenceId flushedStoreSequenceId : flushedRegionSequenceId
            .getStoresList()) {
            storeFlushedSequenceId.put(flushedStoreSequenceId.getFamily().toByteArray(),
              flushedStoreSequenceId.getSeqId());
          }
        }
      }
    } finally {
      in.close();
    }
  }

  /**
   * Regions may have been removed between the latest persist of FlushedSequenceIds and a master
   * abort. So after loading FlushedSequenceIds from the file, and after meta is loaded, we need to
   * remove the deleted regions according to RegionStates.
   */
  public void removeDeletedRegionFromLoadedFlushedSequenceIds() {
    RegionStates regionStates = master.getAssignmentManager().getRegionStates();
    Iterator<byte[]> it = flushedSequenceIdByRegion.keySet().iterator();
    while (it.hasNext()) {
      byte[] regionEncodedName = it.next();
      if (regionStates.getRegionState(Bytes.toStringBinary(regionEncodedName)) == null) {
        it.remove();
        storeFlushedSequenceIdsByRegion.remove(regionEncodedName);
      }
    }
  }

  private class FlushedSequenceIdFlusher extends ScheduledChore {

    public FlushedSequenceIdFlusher(String name, int p) {
      super(name, master, p, 60 * 1000); // delay one minute before the first execution
    }

    @Override
    protected void chore() {
      try {
        persistRegionLastFlushedSequenceIds();
      } catch (IOException e) {
        LOG.debug("Failed to persist last flushed sequence id of regions" + " to file system", e);
      }
    }
  }
}