001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
021
022import java.io.IOException;
023import java.net.InetAddress;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Map.Entry;
031import java.util.Objects;
032import java.util.Set;
033import java.util.concurrent.ConcurrentNavigableMap;
034import java.util.concurrent.ConcurrentSkipListMap;
035import java.util.concurrent.CopyOnWriteArrayList;
036import java.util.concurrent.atomic.AtomicBoolean;
037import java.util.function.Predicate;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FSDataInputStream;
040import org.apache.hadoop.fs.FSDataOutputStream;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.ClockOutOfSyncException;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.NotServingRegionException;
046import org.apache.hadoop.hbase.RegionMetrics;
047import org.apache.hadoop.hbase.ScheduledChore;
048import org.apache.hadoop.hbase.ServerMetrics;
049import org.apache.hadoop.hbase.ServerMetricsBuilder;
050import org.apache.hadoop.hbase.ServerName;
051import org.apache.hadoop.hbase.YouAreDeadException;
052import org.apache.hadoop.hbase.client.AsyncClusterConnection;
053import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
054import org.apache.hadoop.hbase.client.RegionInfo;
055import org.apache.hadoop.hbase.conf.ConfigurationObserver;
056import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException;
057import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
058import org.apache.hadoop.hbase.master.assignment.RegionStates;
059import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
060import org.apache.hadoop.hbase.monitoring.MonitoredTask;
061import org.apache.hadoop.hbase.procedure2.Procedure;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.hbase.util.CommonFSUtils;
064import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
065import org.apache.hadoop.hbase.util.FutureUtils;
066import org.apache.hadoop.hbase.zookeeper.ZKUtil;
067import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
068import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
069import org.apache.yetus.audience.InterfaceAudience;
070import org.apache.zookeeper.KeeperException;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
075import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
076
077import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
078import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedRegionSequenceId;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedSequenceId;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedStoreSequenceId;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
085
086/**
087 * The ServerManager class manages info about region servers.
088 * <p>
089 * Maintains lists of online and dead servers. Processes the startups, shutdowns, and deaths of
090 * region servers.
091 * <p>
092 * Servers are distinguished in two different ways. A given server has a location, specified by
093 * hostname and port, of which there can be only one online at any given time. A server instance
094 * is specified by the location (hostname and port) as well as the startcode (timestamp from when
095 * the server was started). This is used to differentiate a restarted instance of a given server
096 * from the original instance.
097 * <p>
098 * If a server is known not to be running any more, it is called dead. The dead server needs to be
099 * handled by a ServerShutdownHandler. If the handler is not enabled yet, the server can't be
100 * handled right away, so it is queued up. After the handler is enabled, the server will be
101 * submitted to a handler for processing. However, the handler may be only partially enabled. If
102 * so, the server cannot be fully processed and will be queued up for further processing. A server
103 * is fully processed only after the handler is fully enabled and has completed the handling.
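 * <p>
 * A minimal usage sketch (hypothetical caller; in practice the master creates this class itself
 * and hands it out, e.g. via {@code MasterServices#getServerManager()}):
 *
 * <pre>
 * ServerManager serverManager = master.getServerManager();
 * for (ServerName sn : serverManager.getOnlineServersList()) {
 *   ServerMetrics metrics = serverManager.getLoad(sn);
 *   // e.g. inspect metrics.getRegionMetrics().size() for a rough per-server region count
 * }
 * </pre>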
104 */
105@InterfaceAudience.Private
106public class ServerManager implements ConfigurationObserver {
107  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
108    "hbase.master.wait.on.regionservers.maxtostart";
109
110  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
111    "hbase.master.wait.on.regionservers.mintostart";
112
113  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
114    "hbase.master.wait.on.regionservers.timeout";
115
116  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
117    "hbase.master.wait.on.regionservers.interval";
118
119  /**
120   * If set to true, flushedSequenceIdByRegion and storeFlushedSequenceIdsByRegion will be
121   * persisted to HDFS and loaded when the master restarts, to speed up log splitting. See HBASE-20727.
122   */
123  public static final String PERSIST_FLUSHEDSEQUENCEID =
124    "hbase.master.persist.flushedsequenceid.enabled";
125
126  public static final boolean PERSIST_FLUSHEDSEQUENCEID_DEFAULT = true;
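
  // When persistence is enabled, the flushed sequence ids are periodically written to the
  // ".lastflushedseqids" file under the HBase root directory (see LAST_FLUSHED_SEQ_ID_FILE and
  // persistRegionLastFlushedSequenceIds() below) and read back on master startup.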
127
128  public static final String FLUSHEDSEQUENCEID_FLUSHER_INTERVAL =
129    "hbase.master.flushedsequenceid.flusher.interval";
130
131  public static final int FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT = 3 * 60 * 60 * 1000; // 3
132                                                                                           // hours
133
134  public static final String MAX_CLOCK_SKEW_MS = "hbase.master.maxclockskew";
135
136  private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);
137
138  // Set if we are to shutdown the cluster.
139  private AtomicBoolean clusterShutdown = new AtomicBoolean(false);
140
141  /**
142   * The last flushed sequence id for a region.
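   * Keyed by the bytes of the region's encoded name (see
   * {@link RegionInfo#encodeRegionName(byte[])}).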
143   */
144  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
145    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
146
147  private boolean persistFlushedSequenceId = true;
148  private volatile boolean isFlushSeqIdPersistInProgress = false;
149  /** File on hdfs to store last flushed sequence id of regions */
150  private static final String LAST_FLUSHED_SEQ_ID_FILE = ".lastflushedseqids";
151  private FlushedSequenceIdFlusher flushedSeqIdFlusher;
152
153  /**
154   * The last flushed sequence id for a store in a region.
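   * Keyed first by the bytes of the region's encoded name, then by the column family name.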
155   */
156  private final ConcurrentNavigableMap<byte[],
157    ConcurrentNavigableMap<byte[], Long>> storeFlushedSequenceIdsByRegion =
158      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
159
160  /** Map of registered servers to their current load */
161  private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
162    new ConcurrentSkipListMap<>();
163
164  /** List of region servers that should not get any more new regions. */
165  private final ArrayList<ServerName> drainingServers = new ArrayList<>();
166
167  private final MasterServices master;
168  private final RegionServerList storage;
169
170  private final DeadServer deadservers = new DeadServer();
171
172  private final long maxSkew;
173  private final long warningSkew;
174
175  /** Listeners that are called on server events. */
176  private List<ServerListener> listeners = new CopyOnWriteArrayList<>();
177
178  /** Configured value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY */
179  private volatile boolean rejectDecommissionedHostsConfig;
180
181  /**
182   * Constructor.
183   */
184  public ServerManager(final MasterServices master, RegionServerList storage) {
185    this.master = master;
186    this.storage = storage;
187    Configuration c = master.getConfiguration();
188    maxSkew = c.getLong(MAX_CLOCK_SKEW_MS, 30000);
189    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
190    persistFlushedSequenceId =
191      c.getBoolean(PERSIST_FLUSHEDSEQUENCEID, PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
192    rejectDecommissionedHostsConfig = getRejectDecommissionedHostsConfig(c);
193  }
194
195  /**
196   * Implementation of the ConfigurationObserver interface. We are interested in live-loading the
197   * configuration value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY
198   * @param conf Server configuration instance
199   */
200  @Override
201  public void onConfigurationChange(Configuration conf) {
202    final boolean newValue = getRejectDecommissionedHostsConfig(conf);
203    if (rejectDecommissionedHostsConfig == newValue) {
204      // no-op
205      return;
206    }
207
208    LOG.info("Config Reload for RejectDecommissionedHosts. previous value: {}, new value: {}",
209      rejectDecommissionedHostsConfig, newValue);
210
211    rejectDecommissionedHostsConfig = newValue;
212  }
213
214  /**
215   * Reads the value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY from the config and returns it
216   * @param conf Configuration instance of the Master
217   */
218  public boolean getRejectDecommissionedHostsConfig(Configuration conf) {
219    return conf.getBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY,
220      HConstants.REJECT_DECOMMISSIONED_HOSTS_DEFAULT);
221  }
222
223  /**
224   * Add the listener to the notification list.
225   * @param listener The ServerListener to register
226   */
227  public void registerListener(final ServerListener listener) {
228    this.listeners.add(listener);
229  }
230
231  /**
232   * Remove the listener from the notification list.
233   * @param listener The ServerListener to unregister
234   */
235  public boolean unregisterListener(final ServerListener listener) {
236    return this.listeners.remove(listener);
237  }
238
239  /**
240   * Removes all of the ServerListeners of this collection that satisfy the given predicate.
241   * @param filter a predicate which returns true for ServerListener to be removed
242   */
243  public boolean unregisterListenerIf(final Predicate<ServerListener> filter) {
244    return this.listeners.removeIf(filter);
245  }
246
247  /**
248   * Let the server manager know a new regionserver has come online
249   * @param request       the startup request
250   * @param versionNumber the version number of the new regionserver
251   * @param version       the version of the new regionserver, could contain strings like "SNAPSHOT"
252   * @param ia            the InetAddress from which request is received
253   * @return The ServerName we know this server as.
254   */
255  ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
256    String version, InetAddress ia) throws IOException {
257    // Test for case where we get a region startup message from a regionserver
258    // that has been quickly restarted but whose znode expiration handler has
259    // not yet run, or from a server whose fail we are currently processing.
260    // Test its host+port combo is present in serverAddressToServerInfo. If it
261    // is, reject the server and trigger its expiration. The next time it comes
262    // in, it should have been removed from serverAddressToServerInfo and queued
263    // for processing by ProcessServerShutdown.
264
265    // if use-ip is enabled, we will use ip to expose Master/RS service for client,
266    // see HBASE-27304 for details.
267    boolean useIp = master.getConfiguration().getBoolean(HConstants.HBASE_SERVER_USEIP_ENABLED_KEY,
268      HConstants.HBASE_SERVER_USEIP_ENABLED_DEFAULT);
269    String isaHostName = useIp ? ia.getHostAddress() : ia.getHostName();
270    final String hostname =
271      request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : isaHostName;
272    ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());
273
274    // Check if the host should be rejected based on its decommissioned status
275    checkRejectableDecommissionedStatus(sn);
276
277    checkClockSkew(sn, request.getServerCurrentTime());
278    checkIsDead(sn, "STARTUP");
279    if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
280      LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup could not record the server: {}", sn);
281    }
282    storage.started(sn);
283    return sn;
284  }
285
286  /**
287   * Updates the last flushed sequence ids for the regions on server {@code sn}.
288   */
289  private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
290    for (Entry<byte[], RegionMetrics> entry : hsl.getRegionMetrics().entrySet()) {
291      byte[] encodedRegionName = Bytes.toBytes(RegionInfo.encodeRegionName(entry.getKey()));
292      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
293      long l = entry.getValue().getCompletedSequenceId();
294      // Don't let smaller sequence ids override greater sequence ids.
295      if (LOG.isTraceEnabled()) {
296        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue
297          + ", completeSequenceId=" + l);
298      }
299      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
300        flushedSequenceIdByRegion.put(encodedRegionName, l);
301      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
302        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id (" + l
303          + ") that is less than the previous last flushed sequence id (" + existingValue
304          + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring.");
305      }
306      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
307        computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
308          () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
309      for (Entry<byte[], Long> storeSeqId : entry.getValue().getStoreSequenceId().entrySet()) {
310        byte[] family = storeSeqId.getKey();
311        existingValue = storeFlushedSequenceId.get(family);
312        l = storeSeqId.getValue();
313        if (LOG.isTraceEnabled()) {
314          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family)
315            + ", existingValue=" + existingValue + ", completeSequenceId=" + l);
316        }
317        // Don't let smaller sequence ids override greater sequence ids.
318        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
319          storeFlushedSequenceId.put(family, l);
320        }
321      }
322    }
323  }
324
325  public void regionServerReport(ServerName sn, ServerMetrics sl) throws YouAreDeadException {
326    checkIsDead(sn, "REPORT");
327    if (null == this.onlineServers.replace(sn, sl)) {
328      // Already have this host+port combo and it's just a different start code?
329      // Just let the server in. Presume master joining a running cluster.
330      // recordNewServer is what happens at the end of reportServerStartup.
331      // The only thing we are skipping is passing back to the regionserver
332      // the ServerName to use. Here we presume a master has already done
333      // that so we'll press on with whatever it gave us for ServerName.
334      if (!checkAndRecordNewServer(sn, sl)) {
335        // Master already registered server with same (host + port) and higher startcode.
336        // This can happen if regionserver report comes late from old server
337        // (possible race condition), by that time master has already processed SCP for that
338        // server and started accepting regionserver report from new server i.e. server with
339        // same (host + port) and higher startcode.
340        // The exception thrown here is not meant to tell the region server it is dead because if
341        // there is a new server on the same host port, the old server should have already been
342        // dead in ideal situation.
343        // The exception thrown here is to skip the later steps of the whole regionServerReport
344        // request processing. Usually, after recording it in ServerManager, we will call the
345        // related methods in AssignmentManager to record region states. If the region server
346        // is already dead, we should not do these steps anymore, so here we throw an exception
347        // to let the upper layer know that they should not continue processing anymore.
348        final String errorMsg = "RegionServerReport received from " + sn
349          + ", but another server with the same name and higher startcode is already registered,"
350          + " ignoring";
351        LOG.warn(errorMsg);
352        throw new YouAreDeadException(errorMsg);
353      }
354    }
355    updateLastFlushedSequenceIds(sn, sl);
356  }
357
358  /**
359   * Checks if the Master is configured to reject decommissioned hosts or not. When it's configured
360   * to do so, any RegionServer trying to join the cluster will have its host checked against the
361   * list of hosts of currently decommissioned servers and potentially get prevented from reporting
362   * for duty; otherwise, we do nothing and we let them pass to the next check. See HBASE-28342 for
363   * details.
364   * @param sn The ServerName to check for
365   * @throws DecommissionedHostRejectedException if the Master is configured to reject
366   *                                             decommissioned hosts and this host exists in the
367   *                                             list of the decommissioned servers
368   */
369  private void checkRejectableDecommissionedStatus(ServerName sn)
370    throws DecommissionedHostRejectedException {
371    LOG.info("Checking decommissioned status of RegionServer {}", sn.getServerName());
372
373    // If the Master is not configured to reject decommissioned hosts, return early.
374    if (!rejectDecommissionedHostsConfig) {
375      return;
376    }
377
378    // Look for a match for the hostname in the list of decommissioned servers
379    for (ServerName server : getDrainingServersList()) {
380      if (Objects.equals(server.getHostname(), sn.getHostname())) {
381        // Found a match and master is configured to reject decommissioned hosts, throw exception!
382        LOG.warn(
383          "Rejecting RegionServer {} from reporting for duty because Master is configured "
384            + "to reject decommissioned hosts and this host was marked as such in the past.",
385          sn.getServerName());
386        throw new DecommissionedHostRejectedException(String.format(
387          "Host %s exists in the list of decommissioned servers and Master is configured to "
388            + "reject decommissioned hosts",
389          sn.getHostname()));
390      }
391    }
392  }
393
394  /**
395   * Checks if a server with the same host and port already exists. If it does not, or if the
396   * existing one has a smaller start code, record the new server.
397   * @param serverName the server to check and record
398   * @param sl         the server load on the server
399   * @return true if the server is recorded, otherwise, false
400   */
401  boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
402    ServerName existingServer = null;
403    synchronized (this.onlineServers) {
404      existingServer = findServerWithSameHostnamePortWithLock(serverName);
405      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
406        LOG.info("Server serverName=" + serverName + " rejected; we already have "
407          + existingServer.toString() + " registered with same hostname and port");
408        return false;
409      }
410      recordNewServerWithLock(serverName, sl);
411    }
412
413    // Tell our listeners that a server was added
414    if (!this.listeners.isEmpty()) {
415      for (ServerListener listener : this.listeners) {
416        listener.serverAdded(serverName);
417      }
418    }
419
420    // Note that we assume that same ts means same server, and don't expire in that case.
421    // TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
422    if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
423      LOG.info("Triggering server recovery; existingServer " + existingServer
424        + " looks stale, new server:" + serverName);
425      expireServer(existingServer);
426    }
427    return true;
428  }
429
430  /**
431   * Find out which region servers crashed between the crash of the previous master instance and
432   * the current master instance, and schedule SCPs for them.
433   * <p/>
434   * Since the {@code RegionServerTracker} has already helped us to construct the online servers set
435   * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir}
436   * to find out whether there are servers which are already dead.
437   * <p/>
438   * Must be called inside the initialization method of {@code RegionServerTracker} to avoid
439   * concurrency issues.
440   * @param deadServersFromPE     the region servers which already have a SCP associated.
441   * @param liveServersFromWALDir the live region servers from wal directory.
442   */
443  void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
444    Set<ServerName> liveServersFromWALDir) {
445    deadServersFromPE.forEach(deadservers::putIfAbsent);
446    liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
447      .forEach(this::expireServer);
448  }
449
450  /**
451   * Checks the clock skew between the server and the master. If the clock skew exceeds the
452   * configured max, it will throw an exception; if it exceeds the configured warning threshold, it
453   * will log a warning but start normally.
454   * @param serverName Incoming server's name
455   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
456   */
457  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
458    throws ClockOutOfSyncException {
459    long skew = Math.abs(EnvironmentEdgeManager.currentTime() - serverCurrentTime);
460    if (skew > maxSkew) {
461      String message = "Server " + serverName + " has been "
462        + "rejected; Reported time is too far out of sync with master.  " + "Time difference of "
463        + skew + "ms > max allowed of " + maxSkew + "ms";
464      LOG.warn(message);
465      throw new ClockOutOfSyncException(message);
466    } else if (skew > warningSkew) {
467      String message = "Reported time for server " + serverName + " is out of sync with master "
468        + "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " + "error threshold is "
469        + maxSkew + "ms)";
470      LOG.warn(message);
471    }
472  }
473
474  /**
475   * Called when RegionServer first reports in for duty and thereafter each time it heartbeats to
476   * make sure it has not been flagged as dead. If this server is on the dead list, reject it
477   * with a YouAreDeadException. If it was dead but came back with a new start code, remove the old
478   * entry from the dead list.
479   * @param what STARTUP or REPORT
480   */
481  private void checkIsDead(final ServerName serverName, final String what)
482    throws YouAreDeadException {
483    if (this.deadservers.isDeadServer(serverName)) {
484      // Exact match: host name, port and start code all match with existing one of the
485      // dead servers. So, this server must be dead. Tell it to kill itself.
486      String message =
487        "Server " + what + " rejected; currently processing " + serverName + " as dead server";
488      LOG.debug(message);
489      throw new YouAreDeadException(message);
490    }
491    // Remove any dead server with the same hostname and port as the newly checking-in rs, once
492    // the master has been initialized. See HBASE-5916 for more information.
493    if (
494      (this.master == null || this.master.isInitialized())
495        && this.deadservers.cleanPreviousInstance(serverName)
496    ) {
497      // This server has now become alive after we marked it as dead.
498      // We removed its previous entry from the dead list to reflect it.
499      LOG.debug("{} {} came back up, removed it from the dead servers list", what, serverName);
500    }
501  }
502
503  /**
504   * Assumes onlineServers is locked.
505   * @return ServerName with matching hostname and port.
506   */
507  public ServerName findServerWithSameHostnamePortWithLock(final ServerName serverName) {
508    ServerName end =
509      ServerName.valueOf(serverName.getHostname(), serverName.getPort(), Long.MAX_VALUE);
510
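    // ServerName sorts by hostname, then port, then startcode. Since 'end' carries Long.MAX_VALUE
    // as its startcode, lowerKey(end) returns the greatest registered ServerName below it; if an
    // instance with the same hostname and port is online, it will be that entry, which is why the
    // address check below suffices.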
511    ServerName r = onlineServers.lowerKey(end);
512    if (r != null) {
513      if (ServerName.isSameAddress(r, serverName)) {
514        return r;
515      }
516    }
517    return null;
518  }
519
520  /**
521   * Adds to the onlineServers list. onlineServers should be locked.
522   * @param serverName The remote server's name.
523   */
524  void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
525    LOG.info("Registering regionserver=" + serverName);
526    this.onlineServers.put(serverName, sl);
527    master.getAssignmentManager().getRegionStates().createServer(serverName);
528  }
529
530  public ConcurrentNavigableMap<byte[], Long> getFlushedSequenceIdByRegion() {
531    return flushedSequenceIdByRegion;
532  }
533
534  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
535    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
536    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
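    // HConstants.NO_SEQNUM signals that no flush has been reported for this region yet.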
537    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
538    Map<byte[], Long> storeFlushedSequenceId =
539      storeFlushedSequenceIdsByRegion.get(encodedRegionName);
540    if (storeFlushedSequenceId != null) {
541      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
542        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
543          .setFamilyName(UnsafeByteOperations.unsafeWrap(entry.getKey()))
544          .setSequenceId(entry.getValue().longValue()).build());
545      }
546    }
547    return builder.build();
548  }
549
550  /** Returns ServerMetrics if serverName is known else null */
551  public ServerMetrics getLoad(final ServerName serverName) {
552    return this.onlineServers.get(serverName);
553  }
554
555  /**
556   * Compute the average load across all region servers. Currently, this uses a very naive
557   * computation - just uses the number of regions being served, ignoring stats about number of
558   * requests.
559   * @return the average load
560   */
561  public double getAverageLoad() {
562    int totalLoad = 0;
563    int numServers = 0;
564    for (ServerMetrics sl : this.onlineServers.values()) {
565      numServers++;
566      totalLoad += sl.getRegionMetrics().size();
567    }
568    return numServers == 0 ? 0 : (double) totalLoad / (double) numServers;
569  }
570
571  /** Returns the count of active regionservers */
572  public int countOfRegionServers() {
573    // Presumes onlineServers is a concurrent map
574    return this.onlineServers.size();
575  }
576
577  /** Returns Read-only map of servers to serverinfo */
578  public Map<ServerName, ServerMetrics> getOnlineServers() {
579    // Presumption is that iterating the returned Map is OK.
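    // Note: Collections.unmodifiableMap wraps the live concurrent map, so the returned view is
    // read-only but still reflects later server additions and removals.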
580    synchronized (this.onlineServers) {
581      return Collections.unmodifiableMap(this.onlineServers);
582    }
583  }
584
585  public DeadServer getDeadServers() {
586    return this.deadservers;
587  }
588
589  /**
590   * Checks if any dead servers are currently in progress.
591   * @return true if any RS are being processed as dead, false if not
592   */
593  public boolean areDeadServersInProgress() throws IOException {
594    return master.getProcedures().stream()
595      .anyMatch(p -> !p.isFinished() && p instanceof ServerCrashProcedure);
596  }
597
598  void letRegionServersShutdown() {
599    long previousLogTime = 0;
600    ServerName sn = master.getServerName();
601    ZKWatcher zkw = master.getZooKeeper();
602    int onlineServersCt;
603    while ((onlineServersCt = onlineServers.size()) > 0) {
604      if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
605        Set<ServerName> remainingServers = onlineServers.keySet();
606        synchronized (onlineServers) {
607          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
608            // Master will delete itself later.
609            return;
610          }
611        }
612        StringBuilder sb = new StringBuilder();
613        // It's ok here to not sync on onlineServers - merely logging
614        for (ServerName key : remainingServers) {
615          if (sb.length() > 0) {
616            sb.append(", ");
617          }
618          sb.append(key);
619        }
620        LOG.info("Waiting on regionserver(s) " + sb.toString());
621        previousLogTime = EnvironmentEdgeManager.currentTime();
622      }
623
624      try {
625        List<String> servers = getRegionServersInZK(zkw);
626        if (
627          servers == null || servers.isEmpty()
628            || (servers.size() == 1 && servers.contains(sn.toString()))
629        ) {
630          LOG.info("ZK shows there is only the master self online, exiting now");
631          // Master could have lost some ZK events, no need to wait more.
632          break;
633        }
634      } catch (KeeperException ke) {
635        LOG.warn("Failed to list regionservers", ke);
636        // ZK is malfunctioning, don't hang here
637        break;
638      }
639      synchronized (onlineServers) {
640        try {
641          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
642        } catch (InterruptedException ignored) {
643          // continue
644        }
645      }
646    }
647  }
648
649  private List<String> getRegionServersInZK(final ZKWatcher zkw) throws KeeperException {
650    return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
651  }
652
653  /**
654   * Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
655   * @return pid if we queued a ServerCrashProcedure else {@link Procedure#NO_PROC_ID} if we did not
656   *         (could happen for many reasons including the fact that it's this server that is going
657   *         down or we already have queued an SCP for this server or SCP processing is currently
658   *         disabled because we are in startup phase).
659   */
660  // Redo test so we can make this protected.
661  public synchronized long expireServer(final ServerName serverName) {
662    return expireServer(serverName, false);
663
664  }
665
666  synchronized long expireServer(final ServerName serverName, boolean force) {
667    // THIS server is going down... can't handle our own expiration.
668    if (serverName.equals(master.getServerName())) {
669      if (!(master.isAborted() || master.isStopped())) {
670        master.stop("We lost our znode?");
671      }
672      return Procedure.NO_PROC_ID;
673    }
674    if (this.deadservers.isDeadServer(serverName)) {
675      LOG.warn("Expiration called on {} but already in DeadServer", serverName);
676      return Procedure.NO_PROC_ID;
677    }
678    moveFromOnlineToDeadServers(serverName);
679
680    // If server is in draining mode, remove corresponding znode
681    // In some tests, the mocked HM may not have ZK Instance, hence null check
682    if (master.getZooKeeper() != null) {
683      String drainingZnode = ZNodePaths
684        .joinZNode(master.getZooKeeper().getZNodePaths().drainingZNode, serverName.getServerName());
685      try {
686        ZKUtil.deleteNodeFailSilent(master.getZooKeeper(), drainingZnode);
687      } catch (KeeperException e) {
688        LOG.warn(
689          "Error deleting the draining znode for stopping server " + serverName.getServerName(), e);
690      }
691    }
692
693    // If cluster is going down, yes, servers are going to be expiring; don't
694    // process as a dead server
695    if (isClusterShutdown()) {
696      LOG.info("Cluster shutdown set; " + serverName + " expired; onlineServers="
697        + this.onlineServers.size());
698      if (this.onlineServers.isEmpty()) {
699        master.stop("Cluster shutdown set; onlineServer=0");
700      }
701      return Procedure.NO_PROC_ID;
702    }
703    LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
704    long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
705    if (pid == Procedure.NO_PROC_ID) {
706      // skip later processing as we failed to submit SCP
707      return Procedure.NO_PROC_ID;
708    }
709    storage.expired(serverName);
710    // Tell our listeners that a server was removed
711    if (!this.listeners.isEmpty()) {
712      this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
713    }
714    // trigger a persist of flushedSeqId
715    if (flushedSeqIdFlusher != null) {
716      flushedSeqIdFlusher.triggerNow();
717    }
718    return pid;
719  }
720
721  /**
722   * Called when server has expired.
723   */
724  // Locking in this class needs cleanup.
725  public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
726    synchronized (this.onlineServers) {
727      boolean online = this.onlineServers.containsKey(sn);
728      if (online) {
729        // Remove the server from the known servers lists and update load info BUT
730        // add to deadservers first; do this so it'll show in dead servers list if
731        // not in online servers list.
732        this.deadservers.putIfAbsent(sn);
733        this.onlineServers.remove(sn);
734        onlineServers.notifyAll();
735      } else {
736        // If not online, that is odd but may happen with 'Unknown Servers' -- where meta
737        // has references to servers that are neither online nor in the dead servers list. If
738        // 'Unknown Server', don't add to DeadServers else it will be there forever.
739        LOG.trace("Expiration of {} but server not online", sn);
740      }
741    }
742  }
743
744  /**
745   * Remove the server from the drain list.
746   */
747  public synchronized boolean removeServerFromDrainList(final ServerName sn) {
748    LOG.info("Removing server {} from the draining list.", sn);
749
750    // Remove the server from the draining servers lists.
751    return this.drainingServers.remove(sn);
752  }
753
754  /**
755   * Add the server to the drain list.
756   * @return True if the server is added or the server is already on the drain list.
757   */
758  public synchronized boolean addServerToDrainList(final ServerName sn) {
759    // If master is not rejecting decommissioned hosts, warn if the server (sn) is not online.
760    // However, we want to add servers even if they're not online if the master is configured
761    // to reject decommissioned hosts
762    if (!rejectDecommissionedHostsConfig && !this.isServerOnline(sn)) {
763      LOG.warn("Server {} is not currently online. Ignoring request to add it to draining list.",
764        sn);
765      return false;
766    }
767
768    // Add the server to the draining servers lists, if it's not already in it.
769    if (this.drainingServers.contains(sn)) {
770      LOG.warn(
771        "Server {} is already in the draining server list. Ignoring request to add it again.", sn);
772      return true;
773    }
774
775    LOG.info("Server {} added to draining server list.", sn);
776    return this.drainingServers.add(sn);
777  }
778
779  /**
780   * Contacts a region server and waits up to timeout ms to close the region. This bypasses the
781   * active hmaster. Pass -1 as timeout if you do not want to wait on result.
782   */
783  public static void closeRegionSilentlyAndWait(AsyncClusterConnection connection,
784    ServerName server, RegionInfo region, long timeout) throws IOException, InterruptedException {
785    AsyncRegionServerAdmin admin = connection.getRegionServerAdmin(server);
786    try {
787      FutureUtils.get(
788        admin.closeRegion(ProtobufUtil.buildCloseRegionRequest(server, region.getRegionName())));
789    } catch (IOException e) {
790      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
791    }
792    if (timeout < 0) {
793      return;
794    }
795    long expiration = timeout + EnvironmentEdgeManager.currentTime();
796    while (EnvironmentEdgeManager.currentTime() < expiration) {
797      try {
798        RegionInfo rsRegion = ProtobufUtil.toRegionInfo(FutureUtils
799          .get(
800            admin.getRegionInfo(RequestConverter.buildGetRegionInfoRequest(region.getRegionName())))
801          .getRegionInfo());
802        if (rsRegion == null) {
803          return;
804        }
805      } catch (IOException ioe) {
806        if (
807          ioe instanceof NotServingRegionException
808            || (ioe instanceof RemoteWithExtrasException && ((RemoteWithExtrasException) ioe)
809              .unwrapRemoteException() instanceof NotServingRegionException)
810        ) {
811          // no need to retry again
812          return;
813        }
814        LOG.warn("Exception when retrieving regioninfo from: " + region.getRegionNameAsString(),
815          ioe);
816      }
817      Thread.sleep(1000);
818    }
819    throw new IOException("Region " + region + " failed to close within" + " timeout " + timeout);
820  }
821
822  /**
823   * Calculate min necessary to start. This is not an absolute. It is just a friction that will
824   * cause us to hang around a bit longer waiting on RegionServers to check in.
825   */
826  private int getMinToStart() {
827    if (master.isInMaintenanceMode()) {
828      // If in maintenance mode, then in process region server hosting meta will be the only server
829      // available
830      return 1;
831    }
832
833    int minimumRequired = 1;
834    int minToStart = this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
835    // Ensure we are never less than minimumRequired else stuff won't work.
836    return Math.max(minToStart, minimumRequired);
837  }
838
839  /**
840   * Wait for the region servers to report in. We will wait until one of these conditions is met:
841   * (1) the master is stopped; (2) the 'hbase.master.wait.on.regionservers.maxtostart' number of
842   * region servers has checked in; or (3) the 'hbase.master.wait.on.regionservers.mintostart' count
843   * is reached AND there has been no new region server checking in for the
844   * 'hbase.master.wait.on.regionservers.interval' time AND the 'hbase.master.wait.on.regionservers.timeout' is reached.
845   */
846  public void waitForRegionServers(MonitoredTask status) throws InterruptedException {
847    final long interval =
848      this.master.getConfiguration().getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
849    final long timeout =
850      this.master.getConfiguration().getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
851    // Min is not an absolute; just a friction making us wait longer on server checkin.
852    int minToStart = getMinToStart();
853    int maxToStart =
854      this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
855    if (maxToStart < minToStart) {
856      LOG.warn(String.format("The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
857        WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart, WAIT_ON_REGIONSERVERS_MINTOSTART,
858        minToStart));
859      maxToStart = Integer.MAX_VALUE;
860    }
861
862    long now = EnvironmentEdgeManager.currentTime();
863    final long startTime = now;
864    long slept = 0;
865    long lastLogTime = 0;
866    long lastCountChange = startTime;
867    int count = countOfRegionServers();
868    int oldCount = 0;
869    // This while test is a little hard to read. We try to comment it in below but in essence:
870    // Wait if Master is not stopped and the number of regionservers that have checked-in is
871    // less than the maxToStart. Both of these conditions will be true near universally.
872    // Next, we will keep cycling if ANY of the following three conditions are true:
873    // 1. The time since a regionserver registered is < interval (means servers are actively
874    // checking in).
875    // 2. We are under the total timeout.
876    // 3. The count of servers is < minimum.
877    for (ServerListener listener : this.listeners) {
878      listener.waiting();
879    }
880    while (
881      !this.master.isStopped() && !isClusterShutdown() && count < maxToStart
882        && ((lastCountChange + interval) > now || timeout > slept || count < minToStart)
883    ) {
884      // Log some info at every interval time or if there is a change
885      if (oldCount != count || lastLogTime + interval < now) {
886        lastLogTime = now;
887        String msg =
888          "Waiting on regionserver count=" + count + "; waited=" + slept + "ms, expecting min="
889            + minToStart + " server(s), max=" + getStrForMax(maxToStart) + " server(s), "
890            + "timeout=" + timeout + "ms, lastChange=" + (now - lastCountChange) + "ms";
891        LOG.info(msg);
892        status.setStatus(msg);
893      }
894
895      // We sleep for some time
896      final long sleepTime = 50;
897      Thread.sleep(sleepTime);
898      now = EnvironmentEdgeManager.currentTime();
899      slept = now - startTime;
900
901      oldCount = count;
902      count = countOfRegionServers();
903      if (count != oldCount) {
904        lastCountChange = now;
905      }
906    }
907    // Did we exit the loop because cluster is going down?
908    if (isClusterShutdown()) {
909      this.master.stop("Cluster shutdown");
910    }
911    LOG.info("Finished waiting on RegionServer count=" + count + "; waited=" + slept + "ms,"
912      + " expected min=" + minToStart + " server(s), max=" + getStrForMax(maxToStart)
913      + " server(s)," + " master is " + (this.master.isStopped() ? "stopped." : "running"));
914  }
915
916  private String getStrForMax(final int max) {
917    return max == Integer.MAX_VALUE ? "NO_LIMIT" : Integer.toString(max);
918  }
919
920  /** Returns A copy of the internal list of online servers. */
921  public List<ServerName> getOnlineServersList() {
922    // TODO: optimize the load balancer call so we don't need to make a new list
923    // TODO: FIX. THIS IS POPULAR CALL.
924    return new ArrayList<>(this.onlineServers.keySet());
925  }
926
927  /**
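   * Gets the subset of the given servers that are currently online and whose load satisfies the
   * given predicate.
   * <p>
   * Illustrative example (hypothetical caller): pick the candidates that currently host no
   * regions:
   *
   * <pre>
   * List&lt;ServerName&gt; idle = serverManager.getOnlineServersListWithPredicator(candidates,
   *   metrics -&gt; metrics.getRegionMetrics().isEmpty());
   * </pre>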
928   * @param keys                 The target server names
929   * @param idleServerPredicator Evaluates the server on the given load
930   * @return A copy of the internal list of online servers matched by the predicator
931   */
932  public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> keys,
933    Predicate<ServerMetrics> idleServerPredicator) {
934    List<ServerName> names = new ArrayList<>();
935    if (keys != null && idleServerPredicator != null) {
936      keys.forEach(name -> {
937        ServerMetrics load = onlineServers.get(name);
938        if (load != null) {
939          if (idleServerPredicator.test(load)) {
940            names.add(name);
941          }
942        }
943      });
944    }
945    return names;
946  }
947
948  /** Returns A copy of the internal list of draining servers. */
949  public List<ServerName> getDrainingServersList() {
950    return new ArrayList<>(this.drainingServers);
951  }
952
953  public boolean isServerOnline(ServerName serverName) {
954    return serverName != null && onlineServers.containsKey(serverName);
955  }
956
957  public enum ServerLiveState {
958    LIVE,
959    DEAD,
960    UNKNOWN
961  }
962
963  /** Returns whether the server is online, dead, or unknown. */
964  public synchronized ServerLiveState isServerKnownAndOnline(ServerName serverName) {
965    return onlineServers.containsKey(serverName)
966      ? ServerLiveState.LIVE
967      : (deadservers.isDeadServer(serverName) ? ServerLiveState.DEAD : ServerLiveState.UNKNOWN);
968  }
969
970  /**
971   * Check if a server is known to be dead. A server can be online, or known to be dead, or unknown
972   * to this manager (i.e., not online, not known to be dead either; it is simply not tracked by the
973   * master any more, for example, a very old previous instance).
974   */
975  public synchronized boolean isServerDead(ServerName serverName) {
976    return serverName == null || deadservers.isDeadServer(serverName);
977  }
978
979  /**
980   * Check if a server is unknown. A server can be online, or known to be dead, or unknown to this
981   * manager (i.e., not online, not known to be dead either; it is simply not tracked by the master
982   * any more, for example, a very old previous instance).
983   */
984  public boolean isServerUnknown(ServerName serverName) {
985    return serverName == null
986      || (!onlineServers.containsKey(serverName) && !deadservers.isDeadServer(serverName));
987  }
988
989  public void shutdownCluster() {
990    String statusStr = "Cluster shutdown requested of master=" + this.master.getServerName();
991    LOG.info(statusStr);
992    this.clusterShutdown.set(true);
993    if (onlineServers.isEmpty()) {
994      // we do not synchronize here so this may cause a double stop, but not a big deal
995      master.stop("OnlineServer=0 right after cluster shutdown set");
996    }
997  }
998
999  public boolean isClusterShutdown() {
1000    return this.clusterShutdown.get();
1001  }
1002
1003  /**
1004   * Start the chores in ServerManager.
1005   */
1006  public void startChore() {
1007    Configuration c = master.getConfiguration();
1008    if (persistFlushedSequenceId) {
1009      new Thread(() -> {
1010        // After AM#loadMeta, RegionStates should be loaded; regions that were deleted by
1011        // drop/split/merge before the master restart are no longer present in RegionStates,
1012        // so removeDeletedRegionFromLoadedFlushedSequenceIds can safely drop their flushed
1013        // sequence id entries.
1014        removeDeletedRegionFromLoadedFlushedSequenceIds();
1015      }, "RemoveDeletedRegionSyncThread").start();
1016      int flushPeriod =
1017        c.getInt(FLUSHEDSEQUENCEID_FLUSHER_INTERVAL, FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT);
1018      flushedSeqIdFlusher = new FlushedSequenceIdFlusher("FlushedSequenceIdFlusher", flushPeriod);
1019      master.getChoreService().scheduleChore(flushedSeqIdFlusher);
1020    }
1021  }
1022
1023  /**
1024   * Stop the ServerManager.
1025   */
1026  public void stop() {
1027    if (flushedSeqIdFlusher != null) {
1028      flushedSeqIdFlusher.shutdown();
1029    }
1030    if (persistFlushedSequenceId) {
1031      try {
1032        persistRegionLastFlushedSequenceIds();
1033      } catch (IOException e) {
1034        LOG.warn("Failed to persist last flushed sequence id of regions" + " to file system", e);
1035      }
1036    }
1037  }
1038
1039  /**
1040   * Creates a list of possible destinations for a region. It contains the online servers, but not
1041   * the draining or dying servers.
1042   * @param serversToExclude can be null if there is no server to exclude
1043   */
1044  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude) {
1045    Set<ServerName> destServers = new HashSet<>();
1046    onlineServers.forEach((sn, sm) -> {
1047      if (sm.getLastReportTimestamp() > 0) {
1048        // This means we have already called regionServerReport at least once, so let's include
1049        // this server for region assignment. This is an optimization to avoid assigning regions to
1050        // an uninitialized server. See HBASE-25032 for more details.
1051        destServers.add(sn);
1052      }
1053    });
1054
1055    if (serversToExclude != null) {
1056      destServers.removeAll(serversToExclude);
1057    }
1058
1059    // Loop through the draining server list and remove them from the server list
1060    final List<ServerName> drainingServersCopy = getDrainingServersList();
1061    destServers.removeAll(drainingServersCopy);
1062
1063    return new ArrayList<>(destServers);
1064  }
1065
1066  /**
1067   * Calls {@link #createDestinationServersList} without server to exclude.
1068   */
1069  public List<ServerName> createDestinationServersList() {
1070    return createDestinationServersList(null);
1071  }
1072
1073  /**
1074   * Clear any dead server that has the same host name and port as an online server.
1075   */
1076  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
1077    for (ServerName serverName : getOnlineServersList()) {
1078      deadservers.cleanAllPreviousInstances(serverName);
1079    }
1080  }
1081
1082  /**
1083   * Called by delete table and similar to notify the ServerManager that a region was removed.
1084   */
1085  public void removeRegion(final RegionInfo regionInfo) {
1086    final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
1087    storeFlushedSequenceIdsByRegion.remove(encodedName);
1088    flushedSequenceIdByRegion.remove(encodedName);
1089  }
1090
1091  public boolean isRegionInServerManagerStates(final RegionInfo hri) {
1092    final byte[] encodedName = hri.getEncodedNameAsBytes();
1093    return (storeFlushedSequenceIdsByRegion.containsKey(encodedName)
1094      || flushedSequenceIdByRegion.containsKey(encodedName));
1095  }
1096
1097  /**
1098   * Called by delete table and similar to notify the ServerManager that a region was removed.
1099   */
1100  public void removeRegions(final List<RegionInfo> regions) {
1101    for (RegionInfo hri : regions) {
1102      removeRegion(hri);
1103    }
1104  }
1105
1106  /**
1107   * May return 0 when server is not online.
1108   */
1109  public int getVersionNumber(ServerName serverName) {
1110    ServerMetrics serverMetrics = onlineServers.get(serverName);
1111    return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
1112  }
1113
1114  /**
1115   * May return "0.0.0" when server is not online
1116   */
1117  public String getVersion(ServerName serverName) {
1118    ServerMetrics serverMetrics = onlineServers.get(serverName);
1119    return serverMetrics != null ? serverMetrics.getVersion() : "0.0.0";
1120  }
1121
1122  public int getInfoPort(ServerName serverName) {
1123    ServerMetrics serverMetrics = onlineServers.get(serverName);
1124    return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
1125  }
1126
1127  /**
1128   * Persist last flushed sequence id of each region to HDFS
1129   * @throws IOException if persisting to HDFS fails
1130   */
1131  private void persistRegionLastFlushedSequenceIds() throws IOException {
1132    if (isFlushSeqIdPersistInProgress) {
1133      return;
1134    }
1135    isFlushSeqIdPersistInProgress = true;
1136    try {
1137      Configuration conf = master.getConfiguration();
1138      Path rootDir = CommonFSUtils.getRootDir(conf);
1139      Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
1140      FileSystem fs = FileSystem.get(conf);
1141      if (fs.exists(lastFlushedSeqIdPath)) {
1142        LOG.info("Rewriting .lastflushedseqids file at: " + lastFlushedSeqIdPath);
1143        if (!fs.delete(lastFlushedSeqIdPath, false)) {
1144          throw new IOException("Unable to remove existing " + lastFlushedSeqIdPath);
1145        }
1146      } else {
1147        LOG.info("Writing .lastflushedseqids file at: " + lastFlushedSeqIdPath);
1148      }
1149      FSDataOutputStream out = fs.create(lastFlushedSeqIdPath);
1150      FlushedSequenceId.Builder flushedSequenceIdBuilder = FlushedSequenceId.newBuilder();
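      // The whole snapshot is written as a single length-delimited FlushedSequenceId protobuf
      // message; loadLastFlushedSequenceIds() reads it back with parseDelimitedFrom().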
1151      try {
1152        for (Entry<byte[], Long> entry : flushedSequenceIdByRegion.entrySet()) {
1153          FlushedRegionSequenceId.Builder flushedRegionSequenceIdBuilder =
1154            FlushedRegionSequenceId.newBuilder();
1155          flushedRegionSequenceIdBuilder.setRegionEncodedName(ByteString.copyFrom(entry.getKey()));
1156          flushedRegionSequenceIdBuilder.setSeqId(entry.getValue());
1157          ConcurrentNavigableMap<byte[], Long> storeSeqIds =
1158            storeFlushedSequenceIdsByRegion.get(entry.getKey());
1159          if (storeSeqIds != null) {
1160            for (Entry<byte[], Long> store : storeSeqIds.entrySet()) {
1161              FlushedStoreSequenceId.Builder flushedStoreSequenceIdBuilder =
1162                FlushedStoreSequenceId.newBuilder();
1163              flushedStoreSequenceIdBuilder.setFamily(ByteString.copyFrom(store.getKey()));
1164              flushedStoreSequenceIdBuilder.setSeqId(store.getValue());
1165              flushedRegionSequenceIdBuilder.addStores(flushedStoreSequenceIdBuilder);
1166            }
1167          }
1168          flushedSequenceIdBuilder.addRegionSequenceId(flushedRegionSequenceIdBuilder);
1169        }
1170        flushedSequenceIdBuilder.build().writeDelimitedTo(out);
1171      } finally {
1172        if (out != null) {
1173          out.close();
1174        }
1175      }
1176    } finally {
1177      isFlushSeqIdPersistInProgress = false;
1178    }
1179  }
1180
1181  /**
1182   * Load last flushed sequence id of each region from HDFS, if persisted
1183   */
1184  public void loadLastFlushedSequenceIds() throws IOException {
1185    if (!persistFlushedSequenceId) {
1186      return;
1187    }
1188    Configuration conf = master.getConfiguration();
1189    Path rootDir = CommonFSUtils.getRootDir(conf);
1190    Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
1191    FileSystem fs = FileSystem.get(conf);
1192    if (!fs.exists(lastFlushedSeqIdPath)) {
1193      LOG.info("No .lastflushedseqids found at " + lastFlushedSeqIdPath
1194        + " will record last flushed sequence id"
1195        + " for regions by regionserver report all over again");
1196      return;
1197    } else {
1198      LOG.info("begin to load .lastflushedseqids at " + lastFlushedSeqIdPath);
1199    }
1200    FSDataInputStream in = fs.open(lastFlushedSeqIdPath);
1201    try {
1202      FlushedSequenceId flushedSequenceId = FlushedSequenceId.parseDelimitedFrom(in);
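      // parseDelimitedFrom returns null when the stream contains no delimited message (for
      // example, the file exists but is empty).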
1203      if (flushedSequenceId == null) {
1204        LOG.info(".lastflushedseqids found at {} is empty", lastFlushedSeqIdPath);
1205        return;
1206      }
1207      for (FlushedRegionSequenceId flushedRegionSequenceId : flushedSequenceId
1208        .getRegionSequenceIdList()) {
1209        byte[] encodedRegionName = flushedRegionSequenceId.getRegionEncodedName().toByteArray();
1210        flushedSequenceIdByRegion.putIfAbsent(encodedRegionName,
1211          flushedRegionSequenceId.getSeqId());
1212        if (
1213          flushedRegionSequenceId.getStoresList() != null
1214            && flushedRegionSequenceId.getStoresList().size() != 0
1215        ) {
1216          ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
1217            computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
1218              () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
1219          for (FlushedStoreSequenceId flushedStoreSequenceId : flushedRegionSequenceId
1220            .getStoresList()) {
1221            storeFlushedSequenceId.put(flushedStoreSequenceId.getFamily().toByteArray(),
1222              flushedStoreSequenceId.getSeqId());
1223          }
1224        }
1225      }
1226    } finally {
1227      in.close();
1228    }
1229  }
1230
1231  /**
1232   * Regions may have been removed between the latest persist of FlushedSequenceIds and the master
1233   * abort. So after loading FlushedSequenceIds from file, and after meta is loaded, we need to
1234   * remove the deleted regions according to RegionStates.
1235   */
1236  public void removeDeletedRegionFromLoadedFlushedSequenceIds() {
1237    RegionStates regionStates = master.getAssignmentManager().getRegionStates();
1238    Iterator<byte[]> it = flushedSequenceIdByRegion.keySet().iterator();
1239    while (it.hasNext()) {
1240      byte[] regionEncodedName = it.next();
1241      if (regionStates.getRegionState(Bytes.toStringBinary(regionEncodedName)) == null) {
1242        it.remove();
1243        storeFlushedSequenceIdsByRegion.remove(regionEncodedName);
1244      }
1245    }
1246  }
1247
1248  private class FlushedSequenceIdFlusher extends ScheduledChore {
1249
1250    public FlushedSequenceIdFlusher(String name, int p) {
1251      super(name, master, p, 60 * 1000); // delay one minute before first execute
1252    }
1253
1254    @Override
1255    protected void chore() {
1256      try {
1257        persistRegionLastFlushedSequenceIds();
1258      } catch (IOException e) {
1259        LOG.debug("Failed to persist last flushed sequence id of regions" + " to file system", e);
1260      }
1261    }
1262  }
1263}