/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException;
import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedRegionSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedStoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;

/**
 * The ServerManager class manages info about region servers.
 * <p>
 * Maintains lists of online and dead servers. Processes the startups, shutdowns, and deaths of
 * region servers.
 * <p>
 * Servers are distinguished in two different ways. A given server has a location, specified by
 * hostname and port, and only one server at that location can be online at any given time. A
 * server instance is specified by the location (hostname and port) as well as the startcode
 * (timestamp from when the server was started). This is used to differentiate a restarted instance
 * of a given server from the original instance.
 * <p>
 * If a server is known not to be running any more, it is called dead. The dead server needs to be
 * handled by a ServerShutdownHandler. If the handler is not enabled yet, the server can't be
 * handled right away so it is queued up. After the handler is enabled, the server will be
 * submitted to a handler to handle. However, the handler may be just partially enabled. If so, the
 * server cannot be fully processed, and is queued up for further processing. A server is fully
 * processed only after the handler is fully enabled and has completed the handling.
 */
@InterfaceAudience.Private
public class ServerManager implements ConfigurationObserver {
  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
    "hbase.master.wait.on.regionservers.maxtostart";

  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
    "hbase.master.wait.on.regionservers.mintostart";

  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
    "hbase.master.wait.on.regionservers.timeout";

  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
    "hbase.master.wait.on.regionservers.interval";

  /**
   * See HBASE-20727: if set to true, flushedSequenceIdByRegion and storeFlushedSequenceIdsByRegion
   * will be persisted to HDFS and loaded when the master restarts, to speed up log split.
   */
  public static final String PERSIST_FLUSHEDSEQUENCEID =
    "hbase.master.persist.flushedsequenceid.enabled";

  public static final boolean PERSIST_FLUSHEDSEQUENCEID_DEFAULT = true;

  public static final String FLUSHEDSEQUENCEID_FLUSHER_INTERVAL =
    "hbase.master.flushedsequenceid.flusher.interval";

  public static final int FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT = 3 * 60 * 60 * 1000; // 3 hours

  public static final String MAX_CLOCK_SKEW_MS = "hbase.master.maxclockskew";

  private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);

  // Set if we are to shutdown the cluster.
  private AtomicBoolean clusterShutdown = new AtomicBoolean(false);

  /**
   * The last flushed sequence id for a region.
   */
  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  private boolean persistFlushedSequenceId = true;
  private volatile boolean isFlushSeqIdPersistInProgress = false;
  /** File on HDFS to store last flushed sequence id of regions */
  private static final String LAST_FLUSHED_SEQ_ID_FILE = ".lastflushedseqids";
  private FlushedSequenceIdFlusher flushedSeqIdFlusher;

  /**
   * The last flushed sequence id for a store in a region.
   */
  private final ConcurrentNavigableMap<byte[],
    ConcurrentNavigableMap<byte[], Long>> storeFlushedSequenceIdsByRegion =
      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /** Map of registered servers to their current load */
  private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
    new ConcurrentSkipListMap<>();

  /** List of region servers that should not get any more new regions. */
  private final ArrayList<ServerName> drainingServers = new ArrayList<>();

  private final MasterServices master;
  private final RegionServerList storage;

  private final DeadServer deadservers = new DeadServer();

  private final long maxSkew;
  private final long warningSkew;

  /** Listeners that are called on server events. */
  private List<ServerListener> listeners = new CopyOnWriteArrayList<>();

  /** Configured value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY */
  private volatile boolean rejectDecommissionedHostsConfig;

  /**
   * Constructor.
   */
  public ServerManager(final MasterServices master, RegionServerList storage) {
    this.master = master;
    this.storage = storage;
    Configuration c = master.getConfiguration();
    maxSkew = c.getLong(MAX_CLOCK_SKEW_MS, 30000);
    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
    persistFlushedSequenceId =
      c.getBoolean(PERSIST_FLUSHEDSEQUENCEID, PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
    rejectDecommissionedHostsConfig = getRejectDecommissionedHostsConfig(c);
  }

  /**
   * Implementation of the ConfigurationObserver interface. We are interested in live-loading the
   * configuration value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY
   * @param conf Server configuration instance
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    final boolean newValue = getRejectDecommissionedHostsConfig(conf);
    if (rejectDecommissionedHostsConfig == newValue) {
      // no-op
      return;
    }

    LOG.info("Config Reload for RejectDecommissionedHosts. previous value: {}, new value: {}",
      rejectDecommissionedHostsConfig, newValue);

    rejectDecommissionedHostsConfig = newValue;
  }

  /**
   * Reads the value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY from the config and returns it
   * @param conf Configuration instance of the Master
   */
  public boolean getRejectDecommissionedHostsConfig(Configuration conf) {
    return conf.getBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY,
      HConstants.REJECT_DECOMMISSIONED_HOSTS_DEFAULT);
  }
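
  // Illustrative sketch (not part of this class): the live reload above could be exercised, e.g.
  // from a test, roughly like the following. The serverManager variable is hypothetical; the
  // config key and the Configuration copy constructor are real.
  //
  //   Configuration conf = new Configuration(master.getConfiguration());
  //   conf.setBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY, true);
  //   serverManager.onConfigurationChange(conf);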

  /**
   * Add the listener to the notification list.
   * @param listener The ServerListener to register
   */
  public void registerListener(final ServerListener listener) {
    this.listeners.add(listener);
  }

  /**
   * Remove the listener from the notification list.
   * @param listener The ServerListener to unregister
   */
  public boolean unregisterListener(final ServerListener listener) {
    return this.listeners.remove(listener);
  }

  /**
   * Let the server manager know a new regionserver has come online
   * @param request the startup request
   * @param versionNumber the version number of the new regionserver
   * @param version the version of the new regionserver, could contain strings like "SNAPSHOT"
   * @param ia the InetAddress from which request is received
   * @return The ServerName we know this server as.
   */
  ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
    String version, InetAddress ia) throws IOException {
    // Test for case where we get a region startup message from a regionserver
    // that has been quickly restarted but whose znode expiration handler has
    // not yet run, or from a server whose fail we are currently processing.
    // Test its host+port combo is present in serverAddressToServerInfo. If it
    // is, reject the server and trigger its expiration. The next time it comes
    // in, it should have been removed from serverAddressToServerInfo and queued
    // for processing by ProcessServerShutdown.

    // If use-ip is enabled, we will use ip to expose Master/RS service for client,
    // see HBASE-27304 for details.
    boolean useIp = master.getConfiguration().getBoolean(HConstants.HBASE_SERVER_USEIP_ENABLED_KEY,
      HConstants.HBASE_SERVER_USEIP_ENABLED_DEFAULT);
    String isaHostName = useIp ? ia.getHostAddress() : ia.getHostName();
    final String hostname =
      request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : isaHostName;
    ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());

    // Check if the host should be rejected based on its decommissioned status
    checkRejectableDecommissionedStatus(sn);

    checkClockSkew(sn, request.getServerCurrentTime());
    checkIsDead(sn, "STARTUP");
    if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
      LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup could not record the server: {}", sn);
    }
    storage.started(sn);
    return sn;
  }
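
  // Illustrative sketch (not executed here): how the ServerName built above distinguishes a
  // restarted instance from the original one. Hostname, port and startcodes are made-up values.
  //
  //   ServerName a = ServerName.valueOf("rs1.example.com", 16020, 1600000000000L);
  //   ServerName b = ServerName.valueOf("rs1.example.com", 16020, 1600000999999L);
  //   ServerName.isSameAddress(a, b); // true  - same host and port
  //   a.equals(b);                    // false - different startcode, i.e. a restarted server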

  /**
   * Updates last flushed sequence Ids for the regions on server sn
   */
  private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
    for (Entry<byte[], RegionMetrics> entry : hsl.getRegionMetrics().entrySet()) {
      byte[] encodedRegionName = Bytes.toBytes(RegionInfo.encodeRegionName(entry.getKey()));
      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
      long l = entry.getValue().getCompletedSequenceId();
      // Don't let smaller sequence ids override greater sequence ids.
      if (LOG.isTraceEnabled()) {
        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue
          + ", completeSequenceId=" + l);
      }
      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
        flushedSequenceIdByRegion.put(encodedRegionName, l);
      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id (" + l
          + ") that is less than the previous last flushed sequence id (" + existingValue
          + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring.");
      }
      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
        computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
          () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
      for (Entry<byte[], Long> storeSeqId : entry.getValue().getStoreSequenceId().entrySet()) {
        byte[] family = storeSeqId.getKey();
        existingValue = storeFlushedSequenceId.get(family);
        l = storeSeqId.getValue();
        if (LOG.isTraceEnabled()) {
          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family)
            + ", existingValue=" + existingValue + ", completeSequenceId=" + l);
        }
        // Don't let smaller sequence ids override greater sequence ids.
        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
          storeFlushedSequenceId.put(family, l);
        }
      }
    }
  }

  public void regionServerReport(ServerName sn, ServerMetrics sl) throws YouAreDeadException {
    checkIsDead(sn, "REPORT");
    if (null == this.onlineServers.replace(sn, sl)) {
      // Already have this host+port combo and it's just a different start code?
      // Just let the server in. Presume master joining a running cluster.
      // recordNewServer is what happens at the end of reportServerStartup.
      // The only thing we are skipping is passing back to the regionserver
      // the ServerName to use. Here we presume a master has already done
      // that so we'll press on with whatever it gave us for ServerName.
      if (!checkAndRecordNewServer(sn, sl)) {
        // Master already registered server with same (host + port) and higher startcode.
        // This can happen if regionserver report comes late from old server
        // (possible race condition), by that time master has already processed SCP for that
        // server and started accepting regionserver report from new server i.e. server with
        // same (host + port) and higher startcode.
        // The exception thrown here is not meant to tell the region server it is dead because if
        // there is a new server on the same host port, the old server should have already been
        // dead in an ideal situation.
        // The exception thrown here is to skip the later steps of the whole regionServerReport
        // request processing. Usually, after recording it in ServerManager, we will call the
        // related methods in AssignmentManager to record region states. If the region server
        // is already dead, we should not do these steps anymore, so here we throw an exception
        // to let the upper layer know that they should not continue processing anymore.
        final String errorMsg = "RegionServerReport received from " + sn
          + ", but another server with the same name and higher startcode is already registered,"
          + " ignoring";
        LOG.warn(errorMsg);
        throw new YouAreDeadException(errorMsg);
      }
    }
    updateLastFlushedSequenceIds(sn, sl);
  }

  /**
   * Checks if the Master is configured to reject decommissioned hosts or not. When it's configured
   * to do so, any RegionServer trying to join the cluster will have its host checked against the
   * list of hosts of currently decommissioned servers and potentially get prevented from reporting
   * for duty; otherwise, we do nothing and we let them pass to the next check. See HBASE-28342 for
   * details.
   * @param sn The ServerName to check for
   * @throws DecommissionedHostRejectedException if the Master is configured to reject
   *           decommissioned hosts and this host exists in the list of the decommissioned servers
   */
  private void checkRejectableDecommissionedStatus(ServerName sn)
    throws DecommissionedHostRejectedException {
    LOG.info("Checking decommissioned status of RegionServer {}", sn.getServerName());

    // If the Master is not configured to reject decommissioned hosts, return early.
    if (!rejectDecommissionedHostsConfig) {
      return;
    }

    // Look for a match for the hostname in the list of decommissioned servers
    for (ServerName server : getDrainingServersList()) {
      if (Objects.equals(server.getHostname(), sn.getHostname())) {
        // Found a match and master is configured to reject decommissioned hosts, throw exception!
        LOG.warn(
          "Rejecting RegionServer {} from reporting for duty because Master is configured "
            + "to reject decommissioned hosts and this host was marked as such in the past.",
          sn.getServerName());
        throw new DecommissionedHostRejectedException(String.format(
          "Host %s exists in the list of decommissioned servers and Master is configured to "
            + "reject decommissioned hosts",
          sn.getHostname()));
      }
    }
  }

  /**
   * Check if a server of the same host and port already exists; if not, or if the existing one has
   * a smaller start code, record the new server.
   * @param serverName the server to check and record
   * @param sl the server load on the server
   * @return true if the server is recorded, otherwise, false
   */
  boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
    ServerName existingServer = null;
    synchronized (this.onlineServers) {
      existingServer = findServerWithSameHostnamePortWithLock(serverName);
      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
        LOG.info("Server serverName=" + serverName + " rejected; we already have "
          + existingServer.toString() + " registered with same hostname and port");
        return false;
      }
      recordNewServerWithLock(serverName, sl);
    }

    // Tell our listeners that a server was added
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverAdded(serverName);
      }
    }

    // Note that we assume that same ts means same server, and don't expire in that case.
    // TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
    if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
      LOG.info("Triggering server recovery; existingServer " + existingServer
        + " looks stale, new server:" + serverName);
      expireServer(existingServer);
    }
    return true;
  }

  /**
   * Find out the region servers crashed between the crash of the previous master instance and the
   * current master instance and schedule SCP for them.
   * <p/>
   * Since the {@code RegionServerTracker} has already helped us to construct the online servers set
   * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir}
   * to find out whether there are servers which are already dead.
   * <p/>
   * Must be called inside the initialization method of {@code RegionServerTracker} to avoid
   * concurrency issue.
   * @param deadServersFromPE the region servers which already have a SCP associated.
   * @param liveServersFromWALDir the live region servers from wal directory.
   */
  void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
    Set<ServerName> liveServersFromWALDir) {
    deadServersFromPE.forEach(deadservers::putIfAbsent);
    liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
      .forEach(this::expireServer);
  }

  /**
   * Checks the clock skew between the server and the master. If the clock skew exceeds the
   * configured max, it will throw an exception; if it exceeds the configured warning threshold, it
   * will log a warning but start normally.
   * @param serverName Incoming server's name
   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
   */
  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
    throws ClockOutOfSyncException {
    long skew = Math.abs(EnvironmentEdgeManager.currentTime() - serverCurrentTime);
    if (skew > maxSkew) {
      String message = "Server " + serverName + " has been "
        + "rejected; Reported time is too far out of sync with master. " + "Time difference of "
        + skew + "ms > max allowed of " + maxSkew + "ms";
      LOG.warn(message);
      throw new ClockOutOfSyncException(message);
    } else if (skew > warningSkew) {
      String message = "Reported time for server " + serverName + " is out of sync with master "
        + "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " + "error threshold is "
        + maxSkew + "ms)";
      LOG.warn(message);
    }
  }

  /**
   * Called when RegionServer first reports in for duty and thereafter each time it heartbeats to
   * make sure it has not been declared dead. If this server is on the dead list, reject it with a
   * YouAreDeadException. If it was dead but came back with a new start code, remove the old entry
   * from the dead list.
   * @param what START or REPORT
   */
  private void checkIsDead(final ServerName serverName, final String what)
    throws YouAreDeadException {
    if (this.deadservers.isDeadServer(serverName)) {
      // Exact match: host name, port and start code all match with existing one of the
      // dead servers. So, this server must be dead. Tell it to kill itself.
      String message =
        "Server " + what + " rejected; currently processing " + serverName + " as dead server";
      LOG.debug(message);
      throw new YouAreDeadException(message);
    }
    // Remove dead server with same hostname and port of newly checking in rs after master
    // initialization. See HBASE-5916 for more information.
    if (
      (this.master == null || this.master.isInitialized())
        && this.deadservers.cleanPreviousInstance(serverName)
    ) {
      // This server has now become alive after we marked it as dead.
      // We removed its previous entry from the dead list to reflect it.
      LOG.debug("{} {} came back up, removed it from the dead servers list", what, serverName);
    }
  }

  /**
   * Assumes onlineServers is locked.
   * @return ServerName with matching hostname and port.
   */
  public ServerName findServerWithSameHostnamePortWithLock(final ServerName serverName) {
    ServerName end =
      ServerName.valueOf(serverName.getHostname(), serverName.getPort(), Long.MAX_VALUE);

    ServerName r = onlineServers.lowerKey(end);
    if (r != null) {
      if (ServerName.isSameAddress(r, serverName)) {
        return r;
      }
    }
    return null;
  }

  /**
   * Adds to the onlineServers list. onlineServers should be locked.
   * @param serverName The remote server's name.
   */
  void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
    LOG.info("Registering regionserver=" + serverName);
    this.onlineServers.put(serverName, sl);
    master.getAssignmentManager().getRegionStates().createServer(serverName);
  }

  public ConcurrentNavigableMap<byte[], Long> getFlushedSequenceIdByRegion() {
    return flushedSequenceIdByRegion;
  }

  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
    Map<byte[], Long> storeFlushedSequenceId =
      storeFlushedSequenceIdsByRegion.get(encodedRegionName);
    if (storeFlushedSequenceId != null) {
      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
          .setFamilyName(UnsafeByteOperations.unsafeWrap(entry.getKey()))
          .setSequenceId(entry.getValue().longValue()).build());
      }
    }
    return builder.build();
  }

  /** Returns ServerMetrics if serverName is known else null */
  public ServerMetrics getLoad(final ServerName serverName) {
    return this.onlineServers.get(serverName);
  }

  /**
   * Compute the average load across all region servers. Currently, this uses a very naive
   * computation - just uses the number of regions being served, ignoring stats about number of
   * requests.
   * @return the average load
   */
  public double getAverageLoad() {
    int totalLoad = 0;
    int numServers = 0;
    for (ServerMetrics sl : this.onlineServers.values()) {
      numServers++;
      totalLoad += sl.getRegionMetrics().size();
    }
    return numServers == 0 ? 0 : (double) totalLoad / (double) numServers;
  }
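
  // Worked example for getAverageLoad() above (illustrative numbers only): with three online
  // servers hosting 10, 25 and 40 regions, the average load is (10 + 25 + 40) / 3 = 25.0,
  // regardless of how many requests each server is actually receiving.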

  /** Returns the count of active regionservers */
  public int countOfRegionServers() {
    // Presumes onlineServers is a concurrent map
    return this.onlineServers.size();
  }

  /** Returns Read-only map of servers to serverinfo */
  public Map<ServerName, ServerMetrics> getOnlineServers() {
    // Presumption is that iterating the returned Map is OK.
    synchronized (this.onlineServers) {
      return Collections.unmodifiableMap(this.onlineServers);
    }
  }

  public DeadServer getDeadServers() {
    return this.deadservers;
  }

  /**
   * Checks if any dead servers are currently in progress.
   * @return true if any RS are being processed as dead, false if not
   */
  public boolean areDeadServersInProgress() throws IOException {
    return master.getProcedures().stream()
      .anyMatch(p -> !p.isFinished() && p instanceof ServerCrashProcedure);
  }

  void letRegionServersShutdown() {
    long previousLogTime = 0;
    ServerName sn = master.getServerName();
    ZKWatcher zkw = master.getZooKeeper();
    int onlineServersCt;
    while ((onlineServersCt = onlineServers.size()) > 0) {
      if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
        Set<ServerName> remainingServers = onlineServers.keySet();
        synchronized (onlineServers) {
          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
            // Master will delete itself later.
            return;
          }
        }
        StringBuilder sb = new StringBuilder();
        // It's ok here to not sync on onlineServers - merely logging
        for (ServerName key : remainingServers) {
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(key);
        }
        LOG.info("Waiting on regionserver(s) " + sb.toString());
        previousLogTime = EnvironmentEdgeManager.currentTime();
      }

      try {
        List<String> servers = getRegionServersInZK(zkw);
        if (
          servers == null || servers.isEmpty()
            || (servers.size() == 1 && servers.contains(sn.toString()))
        ) {
          LOG.info("ZK shows there is only the master self online, exiting now");
          // Master could have lost some ZK events, no need to wait more.
          break;
        }
      } catch (KeeperException ke) {
        LOG.warn("Failed to list regionservers", ke);
        // ZK is malfunctioning, don't hang here
        break;
      }
      synchronized (onlineServers) {
        try {
          if (onlineServersCt == onlineServers.size()) {
            onlineServers.wait(100);
          }
        } catch (InterruptedException ignored) {
          // continue
        }
      }
    }
  }

  private List<String> getRegionServersInZK(final ZKWatcher zkw) throws KeeperException {
    return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
  }

  /**
   * Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
   * @return pid if we queued a ServerCrashProcedure else {@link Procedure#NO_PROC_ID} if we did not
   *         (could happen for many reasons including the fact that it's this server that is going
   *         down or we already have queued an SCP for this server or SCP processing is currently
   *         disabled because we are in startup phase).
   */
  // Redo test so we can make this protected.
  public synchronized long expireServer(final ServerName serverName) {
    return expireServer(serverName, false);
  }

  synchronized long expireServer(final ServerName serverName, boolean force) {
    // THIS server is going down... can't handle our own expiration.
    if (serverName.equals(master.getServerName())) {
      if (!(master.isAborted() || master.isStopped())) {
        master.stop("We lost our znode?");
      }
      return Procedure.NO_PROC_ID;
    }
    if (this.deadservers.isDeadServer(serverName)) {
      LOG.warn("Expiration called on {} but already in DeadServer", serverName);
      return Procedure.NO_PROC_ID;
    }
    moveFromOnlineToDeadServers(serverName);

    // If server is in draining mode, remove corresponding znode
    // In some tests, the mocked HM may not have ZK Instance, hence null check
    if (master.getZooKeeper() != null) {
      String drainingZnode = ZNodePaths
        .joinZNode(master.getZooKeeper().getZNodePaths().drainingZNode, serverName.getServerName());
      try {
        ZKUtil.deleteNodeFailSilent(master.getZooKeeper(), drainingZnode);
      } catch (KeeperException e) {
        LOG.warn(
          "Error deleting the draining znode for stopping server " + serverName.getServerName(), e);
      }
    }

    // If cluster is going down, yes, servers are going to be expiring; don't
    // process as a dead server
    if (isClusterShutdown()) {
      LOG.info("Cluster shutdown set; " + serverName + " expired; onlineServers="
        + this.onlineServers.size());
      if (this.onlineServers.isEmpty()) {
        master.stop("Cluster shutdown set; onlineServer=0");
      }
      return Procedure.NO_PROC_ID;
    }
    LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
    long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
    if (pid == Procedure.NO_PROC_ID) {
      // skip later processing as we failed to submit SCP
      return Procedure.NO_PROC_ID;
    }
    storage.expired(serverName);
    // Tell our listeners that a server was removed
    if (!this.listeners.isEmpty()) {
      this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
    }
    // trigger a persist of flushedSeqId
    if (flushedSeqIdFlusher != null) {
      flushedSeqIdFlusher.triggerNow();
    }
    return pid;
  }

  /**
   * Called when server has expired.
   */
  // Locking in this class needs cleanup.
  public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
    synchronized (this.onlineServers) {
      boolean online = this.onlineServers.containsKey(sn);
      if (online) {
        // Remove the server from the known servers lists and update load info BUT
        // add to deadservers first; do this so it'll show in dead servers list if
        // not in online servers list.
        this.deadservers.putIfAbsent(sn);
        this.onlineServers.remove(sn);
        onlineServers.notifyAll();
      } else {
        // If not online, that is odd but may happen if 'Unknown Servers' -- where meta
        // has references to servers not online nor in dead servers list. If
        // 'Unknown Server', don't add to DeadServers else it will be there forever.
        LOG.trace("Expiration of {} but server not online", sn);
      }
    }
  }

  /**
   * Remove the server from the drain list.
   */
  public synchronized boolean removeServerFromDrainList(final ServerName sn) {
    LOG.info("Removing server {} from the draining list.", sn);

    // Remove the server from the draining servers lists.
    return this.drainingServers.remove(sn);
  }
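
  // Illustrative sketch (hypothetical call sequence, e.g. from an admin tool or a test): a server
  // can be drained and later made assignable again with the two drain-list methods around this
  // comment.
  //
  //   serverManager.addServerToDrainList(sn);      // sn stops receiving new regions
  //   // ... move regions off, service the host ...
  //   serverManager.removeServerFromDrainList(sn); // sn becomes an assignment target again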

  /**
   * Add the server to the drain list.
   * @return True if the server is added or the server is already on the drain list.
   */
  public synchronized boolean addServerToDrainList(final ServerName sn) {
    // If master is not rejecting decommissioned hosts, warn if the server (sn) is not online.
    // However, we want to add servers even if they're not online if the master is configured
    // to reject decommissioned hosts
    if (!rejectDecommissionedHostsConfig && !this.isServerOnline(sn)) {
      LOG.warn("Server {} is not currently online. Ignoring request to add it to draining list.",
        sn);
      return false;
    }

    // Add the server to the draining servers lists, if it's not already in it.
    if (this.drainingServers.contains(sn)) {
      LOG.warn(
        "Server {} is already in the draining server list. Ignoring request to add it again.", sn);
      return true;
    }

    LOG.info("Server {} added to draining server list.", sn);
    return this.drainingServers.add(sn);
  }

  /**
   * Contacts a region server and waits up to timeout ms to close the region. This bypasses the
   * active hmaster. Pass -1 as timeout if you do not want to wait on result.
   */
  public static void closeRegionSilentlyAndWait(AsyncClusterConnection connection,
    ServerName server, RegionInfo region, long timeout) throws IOException, InterruptedException {
    AsyncRegionServerAdmin admin = connection.getRegionServerAdmin(server);
    try {
      FutureUtils.get(
        admin.closeRegion(ProtobufUtil.buildCloseRegionRequest(server, region.getRegionName())));
    } catch (IOException e) {
      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
    }
    if (timeout < 0) {
      return;
    }
    long expiration = timeout + EnvironmentEdgeManager.currentTime();
    while (EnvironmentEdgeManager.currentTime() < expiration) {
      try {
        RegionInfo rsRegion = ProtobufUtil.toRegionInfo(FutureUtils
          .get(
            admin.getRegionInfo(RequestConverter.buildGetRegionInfoRequest(region.getRegionName())))
          .getRegionInfo());
        if (rsRegion == null) {
          return;
        }
      } catch (IOException ioe) {
        if (
          ioe instanceof NotServingRegionException
            || (ioe instanceof RemoteWithExtrasException && ((RemoteWithExtrasException) ioe)
              .unwrapRemoteException() instanceof NotServingRegionException)
        ) {
          // no need to retry again
          return;
        }
        LOG.warn("Exception when retrieving regioninfo from: " + region.getRegionNameAsString(),
          ioe);
      }
      Thread.sleep(1000);
    }
    throw new IOException("Region " + region + " failed to close within timeout " + timeout);
  }

  /**
   * Calculate min necessary to start. This is not an absolute. It is just a friction that will
   * cause us to hang around a bit longer waiting on RegionServers to check in.
   */
  private int getMinToStart() {
    if (master.isInMaintenanceMode()) {
      // If in maintenance mode, then the in-process region server hosting meta will be the only
      // server available
      return 1;
    }

    int minimumRequired = 1;
    int minToStart = this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    // Ensure we are never less than minimumRequired else stuff won't work.
    return Math.max(minToStart, minimumRequired);
  }
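
  // Illustrative sketch (values are examples, not recommendations): the settings consulted by
  // waitForRegionServers() below, as they might appear in site configuration. When unset, the
  // method falls back to an interval of 1500 ms and a timeout of 4500 ms.
  //
  //   hbase.master.wait.on.regionservers.mintostart = 3
  //   hbase.master.wait.on.regionservers.maxtostart = 10
  //   hbase.master.wait.on.regionservers.interval   = 1500   (ms)
  //   hbase.master.wait.on.regionservers.timeout    = 4500   (ms)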

  /**
   * Wait for the region servers to report in. We will wait until one of these conditions is met:
   * <ul>
   * <li>the master is stopped</li>
   * <li>the 'hbase.master.wait.on.regionservers.maxtostart' number of region servers is
   * reached</li>
   * <li>the 'hbase.master.wait.on.regionservers.mintostart' is reached AND there have been no new
   * region servers checking in for 'hbase.master.wait.on.regionservers.interval' time AND the
   * 'hbase.master.wait.on.regionservers.timeout' is reached</li>
   * </ul>
   */
  public void waitForRegionServers(MonitoredTask status) throws InterruptedException {
    final long interval =
      this.master.getConfiguration().getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
    final long timeout =
      this.master.getConfiguration().getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
    // Min is not an absolute; just a friction making us wait longer on server checkin.
    int minToStart = getMinToStart();
    int maxToStart =
      this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
    if (maxToStart < minToStart) {
      LOG.warn(String.format("The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
        WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart, WAIT_ON_REGIONSERVERS_MINTOSTART,
        minToStart));
      maxToStart = Integer.MAX_VALUE;
    }

    long now = EnvironmentEdgeManager.currentTime();
    final long startTime = now;
    long slept = 0;
    long lastLogTime = 0;
    long lastCountChange = startTime;
    int count = countOfRegionServers();
    int oldCount = 0;
    // This while test is a little hard to read. We try to comment it in below but in essence:
    // Wait if Master is not stopped and the number of regionservers that have checked-in is
    // less than the maxToStart. Both of these conditions will be true near universally.
    // Next, we will keep cycling if ANY of the following three conditions are true:
    // 1. The time since a regionserver registered is < interval (means servers are actively
    // checking in).
    // 2. We are under the total timeout.
    // 3. The count of servers is < minimum.
    for (ServerListener listener : this.listeners) {
      listener.waiting();
    }
    while (
      !this.master.isStopped() && !isClusterShutdown() && count < maxToStart
        && ((lastCountChange + interval) > now || timeout > slept || count < minToStart)
    ) {
      // Log some info at every interval time or if there is a change
      if (oldCount != count || lastLogTime + interval < now) {
        lastLogTime = now;
        String msg =
          "Waiting on regionserver count=" + count + "; waited=" + slept + "ms, expecting min="
            + minToStart + " server(s), max=" + getStrForMax(maxToStart) + " server(s), "
            + "timeout=" + timeout + "ms, lastChange=" + (now - lastCountChange) + "ms";
        LOG.info(msg);
        status.setStatus(msg);
      }

      // We sleep for some time
      final long sleepTime = 50;
      Thread.sleep(sleepTime);
      now = EnvironmentEdgeManager.currentTime();
      slept = now - startTime;

      oldCount = count;
      count = countOfRegionServers();
      if (count != oldCount) {
        lastCountChange = now;
      }
    }
    // Did we exit the loop because cluster is going down?
    if (isClusterShutdown()) {
      this.master.stop("Cluster shutdown");
    }
    LOG.info("Finished waiting on RegionServer count=" + count + "; waited=" + slept + "ms,"
      + " expected min=" + minToStart + " server(s), max=" + getStrForMax(maxToStart)
      + " server(s)," + " master is " + (this.master.isStopped() ? "stopped." : "running"));
  }

  private String getStrForMax(final int max) {
    return max == Integer.MAX_VALUE ? "NO_LIMIT" : Integer.toString(max);
  }

  /** Returns a copy of the internal list of online servers. */
  public List<ServerName> getOnlineServersList() {
    // TODO: optimize the load balancer call so we don't need to make a new list
    // TODO: FIX. THIS IS POPULAR CALL.
    return new ArrayList<>(this.onlineServers.keySet());
  }

  /**
   * @param keys The target server name
   * @param idleServerPredicator Evaluates the server on the given load
   * @return A copy of the internal list of online servers matched by the predicator
   */
  public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> keys,
    Predicate<ServerMetrics> idleServerPredicator) {
    List<ServerName> names = new ArrayList<>();
    if (keys != null && idleServerPredicator != null) {
      keys.forEach(name -> {
        ServerMetrics load = onlineServers.get(name);
        if (load != null) {
          if (idleServerPredicator.test(load)) {
            names.add(name);
          }
        }
      });
    }
    return names;
  }

  /** Returns a copy of the internal list of draining servers. */
  public List<ServerName> getDrainingServersList() {
    return new ArrayList<>(this.drainingServers);
  }

  public boolean isServerOnline(ServerName serverName) {
    return serverName != null && onlineServers.containsKey(serverName);
  }

  public enum ServerLiveState {
    LIVE,
    DEAD,
    UNKNOWN
  }

  /** Returns whether the server is online, dead, or unknown. */
  public synchronized ServerLiveState isServerKnownAndOnline(ServerName serverName) {
    return onlineServers.containsKey(serverName)
      ? ServerLiveState.LIVE
      : (deadservers.isDeadServer(serverName) ? ServerLiveState.DEAD : ServerLiveState.UNKNOWN);
  }

  /**
   * Check if a server is known to be dead. A server can be online, or known to be dead, or unknown
   * to this manager (i.e., not online, not known to be dead either; it is simply not tracked by the
   * master any more, for example, a very old previous instance).
   */
  public synchronized boolean isServerDead(ServerName serverName) {
    return serverName == null || deadservers.isDeadServer(serverName);
  }

  /**
   * Check if a server is unknown. A server can be online, or known to be dead, or unknown to this
   * manager (i.e., not online, not known to be dead either; it is simply not tracked by the master
   * any more, for example, a very old previous instance).
   */
  public boolean isServerUnknown(ServerName serverName) {
    return serverName == null
      || (!onlineServers.containsKey(serverName) && !deadservers.isDeadServer(serverName));
  }

  public void shutdownCluster() {
    String statusStr = "Cluster shutdown requested of master=" + this.master.getServerName();
    LOG.info(statusStr);
    this.clusterShutdown.set(true);
    if (onlineServers.isEmpty()) {
      // we do not synchronize here so this may cause a double stop, but not a big deal
      master.stop("OnlineServer=0 right after cluster shutdown set");
    }
  }

  public boolean isClusterShutdown() {
    return this.clusterShutdown.get();
  }

  /**
   * Start the chores run by ServerManager.
   */
  public void startChore() {
    Configuration c = master.getConfiguration();
    if (persistFlushedSequenceId) {
      new Thread(() -> {
        // after AM#loadMeta, RegionStates should be loaded, and some regions are
        // deleted by drop/split/merge during removeDeletedRegionFromLoadedFlushedSequenceIds,
        // but these deleted regions are not added back to RegionStates,
        // so we can safely remove deleted regions.
        removeDeletedRegionFromLoadedFlushedSequenceIds();
      }, "RemoveDeletedRegionSyncThread").start();
      int flushPeriod =
        c.getInt(FLUSHEDSEQUENCEID_FLUSHER_INTERVAL, FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT);
      flushedSeqIdFlusher = new FlushedSequenceIdFlusher("FlushedSequenceIdFlusher", flushPeriod);
      master.getChoreService().scheduleChore(flushedSeqIdFlusher);
    }
  }

  /**
   * Stop the ServerManager.
   */
  public void stop() {
    if (flushedSeqIdFlusher != null) {
      flushedSeqIdFlusher.shutdown();
    }
    if (persistFlushedSequenceId) {
      try {
        persistRegionLastFlushedSequenceIds();
      } catch (IOException e) {
        LOG.warn("Failed to persist last flushed sequence id of regions to file system", e);
      }
    }
  }

  /**
   * Creates a list of possible destinations for a region. It contains the online servers, but not
   * the draining or dying servers.
   * @param serversToExclude can be null if there is no server to exclude
   */
  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude) {
    Set<ServerName> destServers = new HashSet<>();
    onlineServers.forEach((sn, sm) -> {
      if (sm.getLastReportTimestamp() > 0) {
        // This means we have already called regionServerReport at least once, so let's include
        // this server for region assignment. This is an optimization to avoid assigning regions to
        // an uninitialized server. See HBASE-25032 for more details.
        destServers.add(sn);
      }
    });

    if (serversToExclude != null) {
      destServers.removeAll(serversToExclude);
    }

    // Loop through the draining server list and remove them from the server list
    final List<ServerName> drainingServersCopy = getDrainingServersList();
    destServers.removeAll(drainingServersCopy);

    return new ArrayList<>(destServers);
  }
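
  // Illustrative sketch (hypothetical caller, e.g. an assignment routine or a test): excluding a
  // server that is about to go away when picking assignment candidates.
  //
  //   List<ServerName> candidates =
  //     serverManager.createDestinationServersList(Collections.singletonList(dyingServer));
  //   // candidates = online, already-reporting servers, minus draining servers and dyingServer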

  /**
   * Calls {@link #createDestinationServersList} without server to exclude.
   */
  public List<ServerName> createDestinationServersList() {
    return createDestinationServersList(null);
  }

  /**
   * Clear any dead server with the same host name and port as an online server.
   */
  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
    for (ServerName serverName : getOnlineServersList()) {
      deadservers.cleanAllPreviousInstances(serverName);
    }
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegion(final RegionInfo regionInfo) {
    final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
    storeFlushedSequenceIdsByRegion.remove(encodedName);
    flushedSequenceIdByRegion.remove(encodedName);
  }

  public boolean isRegionInServerManagerStates(final RegionInfo hri) {
    final byte[] encodedName = hri.getEncodedNameAsBytes();
    return (storeFlushedSequenceIdsByRegion.containsKey(encodedName)
      || flushedSequenceIdByRegion.containsKey(encodedName));
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegions(final List<RegionInfo> regions) {
    for (RegionInfo hri : regions) {
      removeRegion(hri);
    }
  }

  /**
   * May return 0 when server is not online.
   */
  public int getVersionNumber(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
  }

  /**
   * May return "0.0.0" when server is not online.
   */
  public String getVersion(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getVersion() : "0.0.0";
  }

  public int getInfoPort(ServerName serverName) {
    ServerMetrics serverMetrics = onlineServers.get(serverName);
    return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
  }

  /**
   * Persist last flushed sequence id of each region to HDFS
   * @throws IOException if persisting to HDFS fails
   */
  private void persistRegionLastFlushedSequenceIds() throws IOException {
    if (isFlushSeqIdPersistInProgress) {
      return;
    }
    isFlushSeqIdPersistInProgress = true;
    try {
      Configuration conf = master.getConfiguration();
      Path rootDir = CommonFSUtils.getRootDir(conf);
      Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
      FileSystem fs = FileSystem.get(conf);
      if (fs.exists(lastFlushedSeqIdPath)) {
        LOG.info("Rewriting .lastflushedseqids file at: " + lastFlushedSeqIdPath);
        if (!fs.delete(lastFlushedSeqIdPath, false)) {
          throw new IOException("Unable to remove existing " + lastFlushedSeqIdPath);
        }
      } else {
        LOG.info("Writing .lastflushedseqids file at: " + lastFlushedSeqIdPath);
      }
      FSDataOutputStream out = fs.create(lastFlushedSeqIdPath);
      FlushedSequenceId.Builder flushedSequenceIdBuilder = FlushedSequenceId.newBuilder();
      try {
        for (Entry<byte[], Long> entry : flushedSequenceIdByRegion.entrySet()) {
          FlushedRegionSequenceId.Builder flushedRegionSequenceIdBuilder =
            FlushedRegionSequenceId.newBuilder();
          flushedRegionSequenceIdBuilder.setRegionEncodedName(ByteString.copyFrom(entry.getKey()));
          flushedRegionSequenceIdBuilder.setSeqId(entry.getValue());
          ConcurrentNavigableMap<byte[], Long> storeSeqIds =
            storeFlushedSequenceIdsByRegion.get(entry.getKey());
          if (storeSeqIds != null) {
            for (Entry<byte[], Long> store : storeSeqIds.entrySet()) {
              FlushedStoreSequenceId.Builder flushedStoreSequenceIdBuilder =
                FlushedStoreSequenceId.newBuilder();
              flushedStoreSequenceIdBuilder.setFamily(ByteString.copyFrom(store.getKey()));
              flushedStoreSequenceIdBuilder.setSeqId(store.getValue());
              flushedRegionSequenceIdBuilder.addStores(flushedStoreSequenceIdBuilder);
            }
          }
          flushedSequenceIdBuilder.addRegionSequenceId(flushedRegionSequenceIdBuilder);
        }
        flushedSequenceIdBuilder.build().writeDelimitedTo(out);
      } finally {
        if (out != null) {
          out.close();
        }
      }
    } finally {
      isFlushSeqIdPersistInProgress = false;
    }
  }

  /**
   * Load last flushed sequence id of each region from HDFS, if persisted
   */
  public void loadLastFlushedSequenceIds() throws IOException {
    if (!persistFlushedSequenceId) {
      return;
    }
    Configuration conf = master.getConfiguration();
    Path rootDir = CommonFSUtils.getRootDir(conf);
    Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
    FileSystem fs = FileSystem.get(conf);
    if (!fs.exists(lastFlushedSeqIdPath)) {
      LOG.info("No .lastflushedseqids found at " + lastFlushedSeqIdPath
        + ", will record last flushed sequence id"
        + " for regions by regionserver report all over again");
      return;
    } else {
      LOG.info("Begin to load .lastflushedseqids at " + lastFlushedSeqIdPath);
    }
    FSDataInputStream in = fs.open(lastFlushedSeqIdPath);
    try {
      FlushedSequenceId flushedSequenceId = FlushedSequenceId.parseDelimitedFrom(in);
      if (flushedSequenceId == null) {
        LOG.info(".lastflushedseqids found at {} is empty", lastFlushedSeqIdPath);
        return;
      }
      for (FlushedRegionSequenceId flushedRegionSequenceId : flushedSequenceId
        .getRegionSequenceIdList()) {
        byte[] encodedRegionName = flushedRegionSequenceId.getRegionEncodedName().toByteArray();
        flushedSequenceIdByRegion.putIfAbsent(encodedRegionName,
          flushedRegionSequenceId.getSeqId());
        if (
          flushedRegionSequenceId.getStoresList() != null
            && flushedRegionSequenceId.getStoresList().size() != 0
        ) {
          ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
            computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
              () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
          for (FlushedStoreSequenceId flushedStoreSequenceId : flushedRegionSequenceId
            .getStoresList()) {
            storeFlushedSequenceId.put(flushedStoreSequenceId.getFamily().toByteArray(),
              flushedStoreSequenceId.getSeqId());
          }
        }
      }
    } finally {
      in.close();
    }
  }

  /**
   * Regions may have been removed between the latest persist of FlushedSequenceIds and a master
   * abort. So after loading FlushedSequenceIds from the file, and after meta is loaded, we need to
   * remove the deleted regions according to RegionStates.
   */
  public void removeDeletedRegionFromLoadedFlushedSequenceIds() {
    RegionStates regionStates = master.getAssignmentManager().getRegionStates();
    Iterator<byte[]> it = flushedSequenceIdByRegion.keySet().iterator();
    while (it.hasNext()) {
      byte[] regionEncodedName = it.next();
      if (regionStates.getRegionState(Bytes.toStringBinary(regionEncodedName)) == null) {
        it.remove();
        storeFlushedSequenceIdsByRegion.remove(regionEncodedName);
      }
    }
  }

  private class FlushedSequenceIdFlusher extends ScheduledChore {

    public FlushedSequenceIdFlusher(String name, int p) {
      super(name, master, p, 60 * 1000); // delay one minute before first execution
    }

    @Override
    protected void chore() {
      try {
        persistRegionLastFlushedSequenceIds();
      } catch (IOException e) {
        LOG.debug("Failed to persist last flushed sequence id of regions to file system", e);
      }
    }
  }
}