/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerRemoteWALDir;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerReplayWALDir;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerSnapshotWALDir;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The manager for replaying remote wals.
 * <p/>
 * First, it will be used to balance the replay work across all the region servers. We will record
 * the region servers which have already been used for replaying wals, and prevent sending new
 * replay work to them until the previous replay work has been done, at which point we will remove
 * the region server from the used worker set. See the comment for
 * {@code UsedReplayWorkersForPeer} for more details.
 * <p/>
 * Second, the logic for managing the remote wal directory is kept here. Before replaying the wals,
 * we will rename the remote wal directory to a 'replay' directory, see
 * {@link #renameToPeerReplayWALDir(String)}. This is used to prevent further writing of remote
 * wals, which is very important for keeping consistency. Then we will start replaying all the
 * wals; once a wal has been replayed, we will truncate the file, so that if a crash happens we do
 * not need to replay all the wals again, see {@link #finishReplayWAL(String)} and
 * {@link #isReplayWALFinished(String)}. After replaying all the wals, we will rename the 'replay'
 * directory to a 'snapshot' directory. In this directory, we will keep the names of all the wals
 * which have been replayed, since all the files should have been truncated. When we later transit
 * the original ACTIVE cluster to STANDBY, and there are region server crashes, we will check the
 * wals in this directory to determine whether a wal should be split and replayed or not. You can
 * see the code in {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} for more details.
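 * <p/>
 * For illustration only, a rough sketch of the directory lifecycle described above. The exact
 * directory names come from the {@code getPeer*WALDir} helpers in
 * {@link org.apache.hadoop.hbase.replication.ReplicationUtils}, and the calls below are
 * simplified:
 * <pre>
 * createPeerRemoteWALDir(peerId);       // the remote cluster ships wals into this directory
 * renameToPeerReplayWALDir(peerId);     // stop further remote writes, start replaying
 * for (Path wal : getReplayWALsAndCleanUpUnusedFiles(peerId)) {
 *   // replay the wal on some region server, then mark it as done
 *   finishReplayWAL(removeWALRootPath(wal));
 * }
 * renameToPeerSnapshotWALDir(peerId);   // keep the (truncated) file names for later checks
 * </pre>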
 */
@InterfaceAudience.Private
public class SyncReplicationReplayWALManager {

  private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationReplayWALManager.class);

  private final ServerManager serverManager;

  private final FileSystem fs;

  private final Path walRootDir;

  private final Path remoteWALDir;

  /**
   * This class is used to record the used workers (region servers) for a replication peer. For
   * balancing the replaying of remote wals, we will only schedule one remote replay procedure each
   * time, so when acquiring a worker, we will first get all the region servers for this cluster,
   * and then filter out the used ones.
   * <p/>
   * The {@link ProcedureEvent} is used for notifying procedures that there are available workers
   * now. We used to use sleeping and retrying before, but if the interval is too large, for
   * example, with exponential backoff, it is not effective, and if the interval is too small, we
   * will flood the procedure wal.
   * <p/>
   * The states are only kept in memory, so when restarting, we need to reconstruct this
   * information, using the information stored in related procedures. See the {@code afterReplay}
   * method in {@link RecoverStandbyProcedure} and {@link SyncReplicationReplayWALProcedure} for
   * more details.
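   * <p/>
   * For illustration only, a rough sketch of the kind of reconstruction those {@code afterReplay}
   * hooks are expected to perform. The actual bodies live in the procedures themselves; the
   * {@code replayWALManager} and {@code worker} references below are assumptions used only for
   * the example:
   * <pre>
   * // RecoverStandbyProcedure.afterReplay: make sure the peer is registered again
   * replayWALManager.registerPeer(peerId);
   * // SyncReplicationReplayWALProcedure.afterReplay: re-record the worker it had acquired
   * if (worker != null) {
   *   replayWALManager.addUsedPeerWorker(peerId, worker);
   * }
   * </pre>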
   */
  private static final class UsedReplayWorkersForPeer {

    private final Set<ServerName> usedWorkers = new HashSet<ServerName>();

    private final ProcedureEvent<?> event;

    public UsedReplayWorkersForPeer(String peerId) {
      this.event = new ProcedureEvent<>(peerId);
    }

    public void used(ServerName worker) {
      usedWorkers.add(worker);
    }

    public Optional<ServerName> acquire(ServerManager serverManager) {
      // pick any online region server which is not already used for this peer
      Optional<ServerName> worker = serverManager.getOnlineServers().keySet().stream()
          .filter(server -> !usedWorkers.contains(server)).findAny();
      worker.ifPresent(usedWorkers::add);
      return worker;
    }

    public void release(ServerName worker) {
      usedWorkers.remove(worker);
    }

    public void suspend(Procedure<?> proc) {
      event.suspend();
      event.suspendIfNotReady(proc);
    }

    public void wake(MasterProcedureScheduler scheduler) {
      if (!event.isReady()) {
        event.wake(scheduler);
      }
    }
  }

  private final ConcurrentMap<String, UsedReplayWorkersForPeer> usedWorkersByPeer =
      new ConcurrentHashMap<>();

  public SyncReplicationReplayWALManager(MasterServices services)
      throws IOException, ReplicationException {
    this.serverManager = services.getServerManager();
    this.fs = services.getMasterFileSystem().getWALFileSystem();
    this.walRootDir = services.getMasterFileSystem().getWALRootDir();
    this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
    serverManager.registerListener(new ServerListener() {

      @Override
      public void serverAdded(ServerName serverName) {
        // a new region server is a new potential worker, so wake up all procedures which are
        // waiting for a free worker
        MasterProcedureScheduler scheduler =
            services.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler();
        for (UsedReplayWorkersForPeer usedWorkers : usedWorkersByPeer.values()) {
          synchronized (usedWorkers) {
            usedWorkers.wake(scheduler);
          }
        }
      }
    });
  }

  public void registerPeer(String peerId) {
    usedWorkersByPeer.put(peerId, new UsedReplayWorkersForPeer(peerId));
  }

  public void unregisterPeer(String peerId) {
    usedWorkersByPeer.remove(peerId);
  }

  /**
   * Get a worker for replaying remote wals for a given peer. If no worker is available, i.e., all
   * the region servers have been used by others, a {@link ProcedureSuspendedException} will be
   * thrown to suspend the procedure. The procedure will be woken up later when workers become
   * available, either because another procedure releases a worker, or because a new region server
   * joins the cluster.
   */
  public ServerName acquirePeerWorker(String peerId, Procedure<?> proc)
      throws ProcedureSuspendedException {
    UsedReplayWorkersForPeer usedWorkers = usedWorkersByPeer.get(peerId);
    synchronized (usedWorkers) {
      Optional<ServerName> worker = usedWorkers.acquire(serverManager);
      if (worker.isPresent()) {
        return worker.get();
      }
      // no worker available right now, suspend the procedure
      usedWorkers.suspend(proc);
    }
    throw new ProcedureSuspendedException();
  }

  public void releasePeerWorker(String peerId, ServerName worker,
      MasterProcedureScheduler scheduler) {
    UsedReplayWorkersForPeer usedWorkers = usedWorkersByPeer.get(peerId);
    synchronized (usedWorkers) {
      usedWorkers.release(worker);
      usedWorkers.wake(scheduler);
    }
  }
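  /*
   * For illustration only, a simplified sketch of how a replay procedure is expected to drive the
   * acquire/release pair above. The surrounding procedure code is an assumption, not taken from
   * this code base:
   *
   *   // in the procedure's execute(): either returns a free region server, or suspends the
   *   // procedure and throws ProcedureSuspendedException, in which case execute() is retried
   *   // once the peer's event is woken up
   *   ServerName worker = replayWALManager.acquirePeerWorker(peerId, this);
   *
   *   // ... dispatch the replay work for this peer to 'worker' ...
   *
   *   // once the replay on that region server has finished, free the worker and wake up any
   *   // procedures waiting for one
   *   replayWALManager.releasePeerWorker(peerId, worker, scheduler);
   */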
  /**
   * Will only be called when loading procedures, where we need to construct the used worker set
   * for each peer.
   */
  public void addUsedPeerWorker(String peerId, ServerName worker) {
    usedWorkersByPeer.get(peerId).used(worker);
  }

  public void createPeerRemoteWALDir(String peerId) throws IOException {
    Path peerRemoteWALDir = getPeerRemoteWALDir(remoteWALDir, peerId);
    if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) {
      throw new IOException("Unable to mkdir " + peerRemoteWALDir);
    }
  }

  private void rename(Path src, Path dst, String peerId) throws IOException {
    if (fs.exists(src)) {
      deleteDir(dst, peerId);
      if (!fs.rename(src, dst)) {
        throw new IOException(
            "Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId);
      }
      LOG.info("Renamed dir from {} to {} for peer id={}", src, dst, peerId);
    } else if (!fs.exists(dst)) {
      throw new IOException(
          "Want to rename from " + src + " to " + dst + ", but they both do not exist");
    }
  }

  public void renameToPeerReplayWALDir(String peerId) throws IOException {
    rename(getPeerRemoteWALDir(remoteWALDir, peerId), getPeerReplayWALDir(remoteWALDir, peerId),
        peerId);
  }

  public void renameToPeerSnapshotWALDir(String peerId) throws IOException {
    rename(getPeerReplayWALDir(remoteWALDir, peerId), getPeerSnapshotWALDir(remoteWALDir, peerId),
        peerId);
  }

  public List<Path> getReplayWALsAndCleanUpUnusedFiles(String peerId) throws IOException {
    Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
    // strip the rename suffix from any files that still carry it
    for (FileStatus status : fs.listStatus(peerReplayWALDir,
        p -> p.getName().endsWith(ReplicationUtils.RENAME_WAL_SUFFIX))) {
      Path src = status.getPath();
      String srcName = src.getName();
      String dstName =
          srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length());
      FSUtils.renameFile(fs, src, new Path(src.getParent(), dstName));
    }
    // collect the sync replication wals, and delete everything else in the replay directory
    List<Path> wals = new ArrayList<>();
    for (FileStatus status : fs.listStatus(peerReplayWALDir)) {
      Path path = status.getPath();
      if (path.getName().endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) {
        wals.add(path);
      } else {
        if (!fs.delete(path, true)) {
          LOG.warn("Can not delete unused file: " + path);
        }
      }
    }
    return wals;
  }

  private void deleteDir(Path dir, String peerId) throws IOException {
    if (!fs.delete(dir, true) && fs.exists(dir)) {
      throw new IOException("Failed to remove dir " + dir + " for peer id=" + peerId);
    }
  }

  public void removePeerRemoteWALs(String peerId) throws IOException {
    deleteDir(getPeerRemoteWALDir(remoteWALDir, peerId), peerId);
    deleteDir(getPeerReplayWALDir(remoteWALDir, peerId), peerId);
    deleteDir(getPeerSnapshotWALDir(remoteWALDir, peerId), peerId);
  }

  public String removeWALRootPath(Path path) {
    String pathStr = path.toString();
    // remove the "/" too.
    return pathStr.substring(walRootDir.toString().length() + 1);
  }

  public void finishReplayWAL(String wal) throws IOException {
    Path walPath = new Path(walRootDir, wal);
    // truncate to zero length to mark the wal as already replayed
    fs.truncate(walPath, 0);
  }

  public boolean isReplayWALFinished(String wal) throws IOException {
    Path walPath = new Path(walRootDir, wal);
    return fs.getFileStatus(walPath).getLen() == 0;
  }

  public Path getRemoteWALDir() {
    return remoteWALDir;
  }
}