001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; 021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 022import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER; 023import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER; 024 025import java.io.IOException; 026import java.util.Arrays; 027import java.util.Collections; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.Optional; 032import java.util.stream.Collectors; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileStatus; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.ServerName; 040import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; 041import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure; 042import org.apache.hadoop.hbase.procedure2.Procedure; 043import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 044import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 045import org.apache.hadoop.hbase.util.CommonFSUtils; 046import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 047import org.apache.hadoop.hbase.wal.WALSplitUtil; 048import org.apache.yetus.audience.InterfaceAudience; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052/** 053 * Create {@link SplitWALProcedure} for each WAL which need to split. Manage the workers for each 054 * {@link SplitWALProcedure}. Total number of workers is (number of online servers) * 055 * (HBASE_SPLIT_WAL_MAX_SPLITTER). Helps assign and release workers for split tasks. Provide helper 056 * method to delete split WAL file and directory. The user can get the SplitWALProcedures via 057 * splitWALs(crashedServer, splitMeta) can get the files that need to split via 058 * getWALsToSplit(crashedServer, splitMeta) can delete the splitting WAL and directory via 059 * deleteSplitWAL(wal) and deleteSplitWAL(crashedServer) can check if splitting WALs of a crashed 060 * server is success via isSplitWALFinished(walPath) can acquire and release a worker for splitting 061 * WAL via acquireSplitWALWorker(procedure) and releaseSplitWALWorker(worker, scheduler) This class 062 * is to replace the zk-based WAL splitting related code, {@link MasterWalManager}, 063 * {@link SplitLogManager}, {@link org.apache.hadoop.hbase.zookeeper.ZKSplitLog} and 064 * {@link org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination} can be removed after 065 * we switch to procedure-based WAL splitting. 066 * @see SplitLogManager for the original distributed split WAL manager. 067 */ 068@InterfaceAudience.Private 069public class SplitWALManager { 070 private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class); 071 072 private final MasterServices master; 073 private final SplitWorkerAssigner splitWorkerAssigner; 074 private final Path rootDir; 075 private final FileSystem fs; 076 private final Configuration conf; 077 private final Path walArchiveDir; 078 079 public SplitWALManager(MasterServices master) throws IOException { 080 this.master = master; 081 this.conf = master.getConfiguration(); 082 this.splitWorkerAssigner = new SplitWorkerAssigner(this.master, 083 conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER)); 084 this.rootDir = master.getMasterFileSystem().getWALRootDir(); 085 this.fs = master.getMasterFileSystem().getWALFileSystem(); 086 this.walArchiveDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME); 087 } 088 089 public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta) throws IOException { 090 try { 091 // 1. list all splitting files 092 List<FileStatus> splittingFiles = getWALsToSplit(crashedServer, splitMeta); 093 // 2. create corresponding procedures 094 return createSplitWALProcedures(splittingFiles, crashedServer); 095 } catch (IOException e) { 096 LOG.error("Failed to create procedures for splitting WALs of {}", crashedServer, e); 097 throw e; 098 } 099 } 100 101 public List<FileStatus> getWALsToSplit(ServerName serverName, boolean splitMeta) 102 throws IOException { 103 List<Path> logDirs = master.getMasterWalManager().getLogDirs(Collections.singleton(serverName)); 104 List<FileStatus> fileStatuses = 105 SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER); 106 LOG.info("{} WAL count={}, meta={}", serverName, fileStatuses.size(), splitMeta); 107 return fileStatuses; 108 } 109 110 private Path getWALSplitDir(ServerName serverName) { 111 Path logDir = 112 new Path(this.rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); 113 return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT); 114 } 115 116 /** 117 * Archive processed WAL 118 */ 119 public void archive(String wal) throws IOException { 120 WALSplitUtil.moveWAL(this.fs, new Path(wal), this.walArchiveDir); 121 } 122 123 public void deleteWALDir(ServerName serverName) throws IOException { 124 Path splitDir = getWALSplitDir(serverName); 125 try { 126 if (!fs.delete(splitDir, false)) { 127 LOG.warn("Failed delete {}, contains {}", splitDir, fs.listFiles(splitDir, true)); 128 } 129 } catch (PathIsNotEmptyDirectoryException e) { 130 FileStatus[] files = CommonFSUtils.listStatus(fs, splitDir); 131 LOG.warn("PathIsNotEmptyDirectoryException {}", 132 Arrays.stream(files).map(f -> f.getPath()).collect(Collectors.toList())); 133 throw e; 134 } 135 } 136 137 public boolean isSplitWALFinished(String walPath) throws IOException { 138 return !fs.exists(new Path(rootDir, walPath)); 139 } 140 141 List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs, 142 ServerName crashedServer) { 143 return splittingWALs.stream() 144 .map(wal -> new SplitWALProcedure(wal.getPath().toString(), crashedServer)) 145 .collect(Collectors.toList()); 146 } 147 148 /** 149 * Acquire a split WAL worker 150 * @param procedure split WAL task 151 * @return an available region server which could execute this task 152 * @throws ProcedureSuspendedException if there is no available worker, it will throw this 153 * exception to WAIT the procedure. 154 */ 155 public ServerName acquireSplitWALWorker(Procedure<?> procedure) 156 throws ProcedureSuspendedException { 157 Optional<ServerName> worker = splitWorkerAssigner.acquire(); 158 if (worker.isPresent()) { 159 LOG.debug("Acquired split WAL worker={}", worker.get()); 160 return worker.get(); 161 } 162 splitWorkerAssigner.suspend(procedure); 163 throw new ProcedureSuspendedException(); 164 } 165 166 /** 167 * After the worker finished the split WAL task, it will release the worker, and wake up all the 168 * suspend procedures in the ProcedureEvent 169 * @param worker worker which is about to release 170 * @param scheduler scheduler which is to wake up the procedure event 171 */ 172 public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) { 173 LOG.debug("Release split WAL worker={}", worker); 174 splitWorkerAssigner.release(worker); 175 splitWorkerAssigner.wake(scheduler); 176 } 177 178 /** 179 * When master restart, there will be a new splitWorkerAssigner. But if there are splitting WAL 180 * tasks running on the region server side, they will not be count by the new splitWorkerAssigner. 181 * Thus we should add the workers of running tasks to the assigner when we load the procedures 182 * from MasterProcWALs. 183 * @param worker region server which is executing a split WAL task 184 */ 185 public void addUsedSplitWALWorker(ServerName worker) { 186 splitWorkerAssigner.addUsedWorker(worker); 187 } 188 189 /** 190 * help assign and release a worker for each WAL splitting task For each worker, concurrent 191 * running splitting task should be no more than maxSplitTasks If a task failed to acquire a 192 * worker, it will suspend and wait for workers available 193 */ 194 private static final class SplitWorkerAssigner implements ServerListener { 195 private int maxSplitTasks; 196 private final ProcedureEvent<?> event; 197 private Map<ServerName, Integer> currentWorkers = new HashMap<>(); 198 private MasterServices master; 199 200 public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) { 201 this.maxSplitTasks = maxSplitTasks; 202 this.master = master; 203 this.event = new ProcedureEvent<>("split-WAL-worker-assigning"); 204 // ServerManager might be null in a test context where we are mocking; allow for this 205 ServerManager sm = this.master.getServerManager(); 206 if (sm != null) { 207 sm.registerListener(this); 208 } 209 } 210 211 public synchronized Optional<ServerName> acquire() { 212 List<ServerName> serverList = master.getServerManager().getOnlineServersList(); 213 Collections.shuffle(serverList); 214 Optional<ServerName> worker = serverList.stream().filter( 215 serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0) 216 .findAny(); 217 if (worker.isPresent()) { 218 currentWorkers.compute(worker.get(), (serverName, 219 availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1); 220 } 221 return worker; 222 } 223 224 public synchronized void release(ServerName serverName) { 225 currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1); 226 } 227 228 public void suspend(Procedure<?> proc) { 229 event.suspend(); 230 event.suspendIfNotReady(proc); 231 } 232 233 public void wake(MasterProcedureScheduler scheduler) { 234 if (!event.isReady()) { 235 event.wake(scheduler); 236 } 237 } 238 239 @Override 240 public void serverAdded(ServerName worker) { 241 this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); 242 } 243 244 public synchronized void addUsedWorker(ServerName worker) { 245 // load used worker when master restart 246 currentWorkers.compute(worker, (serverName, 247 availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1); 248 } 249 } 250}