001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; 020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 021import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER; 022import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER; 023import java.io.IOException; 024import java.util.Arrays; 025import java.util.Collections; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.Optional; 030import java.util.stream.Collectors; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileStatus; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; 039import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure; 040import org.apache.hadoop.hbase.procedure2.Procedure; 041import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 042import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 043import org.apache.hadoop.hbase.util.CommonFSUtils; 044import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 045import org.apache.hadoop.hbase.wal.WALSplitUtil; 046import org.apache.yetus.audience.InterfaceAudience; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050/** 051 * Create {@link SplitWALProcedure} for each WAL which need to split. Manage the workers for each 052 * {@link SplitWALProcedure}. 053 * Total number of workers is (number of online servers) * (HBASE_SPLIT_WAL_MAX_SPLITTER). 054 * Helps assign and release workers for split tasks. 055 * Provide helper method to delete split WAL file and directory. 056 * 057 * The user can get the SplitWALProcedures via splitWALs(crashedServer, splitMeta) 058 * can get the files that need to split via getWALsToSplit(crashedServer, splitMeta) 059 * can delete the splitting WAL and directory via deleteSplitWAL(wal) 060 * and deleteSplitWAL(crashedServer) 061 * can check if splitting WALs of a crashed server is success via isSplitWALFinished(walPath) 062 * can acquire and release a worker for splitting WAL via acquireSplitWALWorker(procedure) 063 * and releaseSplitWALWorker(worker, scheduler) 064 * 065 * This class is to replace the zk-based WAL splitting related code, {@link MasterWalManager}, 066 * {@link SplitLogManager}, {@link org.apache.hadoop.hbase.zookeeper.ZKSplitLog} and 067 * {@link org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination} can be removed 068 * after we switch to procedure-based WAL splitting. 069 * @see SplitLogManager for the original distributed split WAL manager. 070 */ 071@InterfaceAudience.Private 072public class SplitWALManager { 073 private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class); 074 075 private final MasterServices master; 076 private final SplitWorkerAssigner splitWorkerAssigner; 077 private final Path rootDir; 078 private final FileSystem fs; 079 private final Configuration conf; 080 private final Path walArchiveDir; 081 082 public SplitWALManager(MasterServices master) throws IOException { 083 this.master = master; 084 this.conf = master.getConfiguration(); 085 this.splitWorkerAssigner = new SplitWorkerAssigner(this.master, 086 conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER)); 087 this.rootDir = master.getMasterFileSystem().getWALRootDir(); 088 // TODO: This should be the WAL FS, not the Master FS? 089 this.fs = master.getMasterFileSystem().getFileSystem(); 090 this.walArchiveDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME); 091 } 092 093 public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta) 094 throws IOException { 095 try { 096 // 1. list all splitting files 097 List<FileStatus> splittingFiles = getWALsToSplit(crashedServer, splitMeta); 098 // 2. create corresponding procedures 099 return createSplitWALProcedures(splittingFiles, crashedServer); 100 } catch (IOException e) { 101 LOG.error("Failed to create procedures for splitting WALs of {}", crashedServer, e); 102 throw e; 103 } 104 } 105 106 public List<FileStatus> getWALsToSplit(ServerName serverName, boolean splitMeta) 107 throws IOException { 108 List<Path> logDirs = master.getMasterWalManager().getLogDirs(Collections.singleton(serverName)); 109 List<FileStatus> fileStatuses = 110 SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER); 111 LOG.info("{} WAL count={}, meta={}", serverName, fileStatuses.size(), splitMeta); 112 return fileStatuses; 113 } 114 115 private Path getWALSplitDir(ServerName serverName) { 116 Path logDir = 117 new Path(this.rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); 118 return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT); 119 } 120 121 /** 122 * Archive processed WAL 123 */ 124 public void archive(String wal) throws IOException { 125 WALSplitUtil.moveWAL(this.fs, new Path(wal), this.walArchiveDir); 126 } 127 128 public void deleteWALDir(ServerName serverName) throws IOException { 129 Path splitDir = getWALSplitDir(serverName); 130 try { 131 if (!fs.delete(splitDir, false)) { 132 LOG.warn("Failed delete {}, contains {}", splitDir, fs.listFiles(splitDir, true)); 133 } 134 } catch (PathIsNotEmptyDirectoryException e) { 135 FileStatus [] files = CommonFSUtils.listStatus(fs, splitDir); 136 LOG.warn("PathIsNotEmptyDirectoryException {}", 137 Arrays.stream(files).map(f -> f.getPath()).collect(Collectors.toList())); 138 throw e; 139 } 140 } 141 142 public boolean isSplitWALFinished(String walPath) throws IOException { 143 return !fs.exists(new Path(rootDir, walPath)); 144 } 145 146 List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs, 147 ServerName crashedServer) { 148 return splittingWALs.stream() 149 .map(wal -> new SplitWALProcedure(wal.getPath().toString(), crashedServer)) 150 .collect(Collectors.toList()); 151 } 152 153 /** 154 * Acquire a split WAL worker 155 * @param procedure split WAL task 156 * @return an available region server which could execute this task 157 * @throws ProcedureSuspendedException if there is no available worker, 158 * it will throw this exception to WAIT the procedure. 159 */ 160 public ServerName acquireSplitWALWorker(Procedure<?> procedure) 161 throws ProcedureSuspendedException { 162 Optional<ServerName> worker = splitWorkerAssigner.acquire(); 163 if (worker.isPresent()) { 164 LOG.debug("Acquired split WAL worker={}", worker.get()); 165 return worker.get(); 166 } 167 splitWorkerAssigner.suspend(procedure); 168 throw new ProcedureSuspendedException(); 169 } 170 171 /** 172 * After the worker finished the split WAL task, it will release the worker, and wake up all the 173 * suspend procedures in the ProcedureEvent 174 * @param worker worker which is about to release 175 * @param scheduler scheduler which is to wake up the procedure event 176 */ 177 public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) { 178 LOG.debug("Release split WAL worker={}", worker); 179 splitWorkerAssigner.release(worker); 180 splitWorkerAssigner.wake(scheduler); 181 } 182 183 /** 184 * When master restart, there will be a new splitWorkerAssigner. But if there are splitting WAL 185 * tasks running on the region server side, they will not be count by the new splitWorkerAssigner. 186 * Thus we should add the workers of running tasks to the assigner when we load the procedures 187 * from MasterProcWALs. 188 * @param worker region server which is executing a split WAL task 189 */ 190 public void addUsedSplitWALWorker(ServerName worker){ 191 splitWorkerAssigner.addUsedWorker(worker); 192 } 193 194 /** 195 * help assign and release a worker for each WAL splitting task 196 * For each worker, concurrent running splitting task should be no more than maxSplitTasks 197 * If a task failed to acquire a worker, it will suspend and wait for workers available 198 * 199 */ 200 private static final class SplitWorkerAssigner implements ServerListener { 201 private int maxSplitTasks; 202 private final ProcedureEvent<?> event; 203 private Map<ServerName, Integer> currentWorkers = new HashMap<>(); 204 private MasterServices master; 205 206 public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) { 207 this.maxSplitTasks = maxSplitTasks; 208 this.master = master; 209 this.event = new ProcedureEvent<>("split-WAL-worker-assigning"); 210 // ServerManager might be null in a test context where we are mocking; allow for this 211 ServerManager sm = this.master.getServerManager(); 212 if (sm != null) { 213 sm.registerListener(this); 214 } 215 } 216 217 public synchronized Optional<ServerName> acquire() { 218 List<ServerName> serverList = master.getServerManager().getOnlineServersList(); 219 Collections.shuffle(serverList); 220 Optional<ServerName> worker = serverList.stream().filter( 221 serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0) 222 .findAny(); 223 if (worker.isPresent()) { 224 currentWorkers.compute(worker.get(), (serverName, 225 availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1); 226 } 227 return worker; 228 } 229 230 public synchronized void release(ServerName serverName) { 231 currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1); 232 } 233 234 public void suspend(Procedure<?> proc) { 235 event.suspend(); 236 event.suspendIfNotReady(proc); 237 } 238 239 public void wake(MasterProcedureScheduler scheduler) { 240 if (!event.isReady()) { 241 event.wake(scheduler); 242 } 243 } 244 245 @Override 246 public void serverAdded(ServerName worker) { 247 this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); 248 } 249 250 public synchronized void addUsedWorker(ServerName worker) { 251 // load used worker when master restart 252 currentWorkers.compute(worker, (serverName, 253 availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1); 254 } 255 } 256}