001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; 021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 022import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER; 023import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER; 024 025import java.io.IOException; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.List; 029import java.util.Map; 030import java.util.Optional; 031import java.util.stream.Collectors; 032 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileStatus; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; 039import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure; 040import org.apache.hadoop.hbase.procedure2.Procedure; 041import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 042import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 043import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 049import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 050 051/** 052 * Create {@link SplitWALProcedure} for each WAL which need to split. Manage the workers for each 053 * {@link SplitWALProcedure}. 054 * Total number of workers is (number of online servers) * (HBASE_SPLIT_WAL_MAX_SPLITTER). 055 * Helps assign and release workers for split tasks. 056 * Provide helper method to delete split WAL file and directory. 057 * 058 * The user can get the SplitWALProcedures via splitWALs(crashedServer, splitMeta) 059 * can get the files that need to split via getWALsToSplit(crashedServer, splitMeta) 060 * can delete the splitting WAL and directory via deleteSplitWAL(wal) 061 * and deleteSplitWAL(crashedServer) 062 * can check if splitting WALs of a crashed server is success via isSplitWALFinished(walPath) 063 * can acquire and release a worker for splitting WAL via acquireSplitWALWorker(procedure) 064 * and releaseSplitWALWorker(worker, scheduler) 065 * 066 * This class is to replace the zk-based WAL splitting related code, {@link MasterWalManager}, 067 * {@link SplitLogManager}, {@link org.apache.hadoop.hbase.zookeeper.ZKSplitLog} and 068 * {@link org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination} can be removed 069 * after we switch to procedure-based WAL splitting. 070 * @see SplitLogManager for the original distributed split WAL manager. 071 */ 072@InterfaceAudience.Private 073public class SplitWALManager { 074 private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class); 075 076 private final MasterServices master; 077 private final SplitWorkerAssigner splitWorkerAssigner; 078 private final Path rootDir; 079 private final FileSystem fs; 080 private final Configuration conf; 081 082 public SplitWALManager(MasterServices master) { 083 this.master = master; 084 this.conf = master.getConfiguration(); 085 this.splitWorkerAssigner = new SplitWorkerAssigner(this.master, 086 conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER)); 087 this.rootDir = master.getMasterFileSystem().getWALRootDir(); 088 this.fs = master.getMasterFileSystem().getFileSystem(); 089 090 } 091 092 public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta) 093 throws IOException { 094 try { 095 // 1. list all splitting files 096 List<FileStatus> splittingFiles = getWALsToSplit(crashedServer, splitMeta); 097 // 2. create corresponding procedures 098 return createSplitWALProcedures(splittingFiles, crashedServer); 099 } catch (IOException e) { 100 LOG.error("Failed to create procedures for splitting WALs of {}", crashedServer, e); 101 throw e; 102 } 103 } 104 105 public List<FileStatus> getWALsToSplit(ServerName serverName, boolean splitMeta) 106 throws IOException { 107 List<Path> logDirs = master.getMasterWalManager().getLogDirs(Collections.singleton(serverName)); 108 FileStatus[] fileStatuses = 109 SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER); 110 LOG.info("{} WAL count={}, meta={}", serverName, fileStatuses.length, splitMeta); 111 return Lists.newArrayList(fileStatuses); 112 } 113 114 private Path getWALSplitDir(ServerName serverName) { 115 Path logDir = 116 new Path(this.rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); 117 return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT); 118 } 119 120 public void deleteSplitWAL(String wal) throws IOException { 121 fs.delete(new Path(wal), false); 122 } 123 124 public void deleteWALDir(ServerName serverName) throws IOException { 125 Path splitDir = getWALSplitDir(serverName); 126 if (!fs.delete(splitDir, false)) { 127 LOG.warn("Failed delete {}", splitDir); 128 } 129 } 130 131 public boolean isSplitWALFinished(String walPath) throws IOException { 132 return !fs.exists(new Path(rootDir, walPath)); 133 } 134 135 @VisibleForTesting 136 List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs, 137 ServerName crashedServer) { 138 return splittingWALs.stream() 139 .map(wal -> new SplitWALProcedure(wal.getPath().toString(), crashedServer)) 140 .collect(Collectors.toList()); 141 } 142 143 /** 144 * Acquire a split WAL worker 145 * @param procedure split WAL task 146 * @return an available region server which could execute this task 147 * @throws ProcedureSuspendedException if there is no available worker, 148 * it will throw this exception to WAIT the procedure. 149 */ 150 public ServerName acquireSplitWALWorker(Procedure<?> procedure) 151 throws ProcedureSuspendedException { 152 Optional<ServerName> worker = splitWorkerAssigner.acquire(); 153 if (worker.isPresent()) { 154 LOG.debug("Acquired split WAL worker={}", worker.get()); 155 return worker.get(); 156 } 157 splitWorkerAssigner.suspend(procedure); 158 throw new ProcedureSuspendedException(); 159 } 160 161 /** 162 * After the worker finished the split WAL task, it will release the worker, and wake up all the 163 * suspend procedures in the ProcedureEvent 164 * @param worker worker which is about to release 165 * @param scheduler scheduler which is to wake up the procedure event 166 */ 167 public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) { 168 LOG.debug("Release split WAL worker={}", worker); 169 splitWorkerAssigner.release(worker); 170 splitWorkerAssigner.wake(scheduler); 171 } 172 173 /** 174 * When master restart, there will be a new splitWorkerAssigner. But if there are splitting WAL 175 * tasks running on the region server side, they will not be count by the new splitWorkerAssigner. 176 * Thus we should add the workers of running tasks to the assigner when we load the procedures 177 * from MasterProcWALs. 178 * @param worker region server which is executing a split WAL task 179 */ 180 public void addUsedSplitWALWorker(ServerName worker){ 181 splitWorkerAssigner.addUsedWorker(worker); 182 } 183 184 /** 185 * help assign and release a worker for each WAL splitting task 186 * For each worker, concurrent running splitting task should be no more than maxSplitTasks 187 * If a task failed to acquire a worker, it will suspend and wait for workers available 188 * 189 */ 190 private static final class SplitWorkerAssigner implements ServerListener { 191 private int maxSplitTasks; 192 private final ProcedureEvent<?> event; 193 private Map<ServerName, Integer> currentWorkers = new HashMap<>(); 194 private MasterServices master; 195 196 public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) { 197 this.maxSplitTasks = maxSplitTasks; 198 this.master = master; 199 this.event = new ProcedureEvent<>("split-WAL-worker-assigning"); 200 this.master.getServerManager().registerListener(this); 201 } 202 203 public synchronized Optional<ServerName> acquire() { 204 List<ServerName> serverList = master.getServerManager().getOnlineServersList(); 205 Collections.shuffle(serverList); 206 Optional<ServerName> worker = serverList.stream().filter( 207 serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0) 208 .findAny(); 209 if (worker.isPresent()) { 210 currentWorkers.compute(worker.get(), (serverName, 211 availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1); 212 } 213 return worker; 214 } 215 216 public synchronized void release(ServerName serverName) { 217 currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1); 218 } 219 220 public void suspend(Procedure<?> proc) { 221 event.suspend(); 222 event.suspendIfNotReady(proc); 223 } 224 225 public void wake(MasterProcedureScheduler scheduler) { 226 if (!event.isReady()) { 227 event.wake(scheduler); 228 } 229 } 230 231 @Override 232 public void serverAdded(ServerName worker) { 233 this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); 234 } 235 236 public synchronized void addUsedWorker(ServerName worker) { 237 // load used worker when master restart 238 currentWorkers.compute(worker, (serverName, 239 availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1); 240 } 241 } 242}