001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; 021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 022import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER; 023import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER; 024 025import java.io.IOException; 026import java.util.Arrays; 027import java.util.Collections; 028import java.util.List; 029import java.util.stream.Collectors; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileStatus; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure; 038import org.apache.hadoop.hbase.procedure2.Procedure; 039import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 040import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 041import org.apache.hadoop.hbase.util.CommonFSUtils; 042import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 043import org.apache.hadoop.hbase.wal.WALSplitUtil; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** 049 * Create {@link SplitWALProcedure} for each WAL which need to split. Manage the workers for each 050 * {@link SplitWALProcedure}. Total number of workers is (number of online servers) * 051 * (HBASE_SPLIT_WAL_MAX_SPLITTER). Helps assign and release workers for split tasks. Provide helper 052 * method to delete split WAL file and directory. The user can get the SplitWALProcedures via 053 * splitWALs(crashedServer, splitMeta) can get the files that need to split via 054 * getWALsToSplit(crashedServer, splitMeta) can delete the splitting WAL and directory via 055 * deleteSplitWAL(wal) and deleteSplitWAL(crashedServer) can check if splitting WALs of a crashed 056 * server is success via isSplitWALFinished(walPath) can acquire and release a worker for splitting 057 * WAL via acquireSplitWALWorker(procedure) and releaseSplitWALWorker(worker, scheduler) This class 058 * is to replace the zk-based WAL splitting related code, {@link MasterWalManager}, 059 * {@link SplitLogManager}, {@link org.apache.hadoop.hbase.zookeeper.ZKSplitLog} and 060 * {@link org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination} can be removed after 061 * we switch to procedure-based WAL splitting. 062 * @see SplitLogManager for the original distributed split WAL manager. 063 */ 064@InterfaceAudience.Private 065public class SplitWALManager { 066 private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class); 067 068 private final MasterServices master; 069 private final WorkerAssigner splitWorkerAssigner; 070 private final Path rootDir; 071 private final FileSystem fs; 072 private final Configuration conf; 073 private final Path walArchiveDir; 074 075 public SplitWALManager(MasterServices master) throws IOException { 076 this.master = master; 077 this.conf = master.getConfiguration(); 078 this.splitWorkerAssigner = new WorkerAssigner(this.master, 079 conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER), 080 new ProcedureEvent<>("split-WAL-worker-assigning")); 081 this.rootDir = master.getMasterFileSystem().getWALRootDir(); 082 this.fs = master.getMasterFileSystem().getWALFileSystem(); 083 this.walArchiveDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME); 084 } 085 086 public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta) throws IOException { 087 try { 088 // 1. list all splitting files 089 List<FileStatus> splittingFiles = getWALsToSplit(crashedServer, splitMeta); 090 // 2. create corresponding procedures 091 return createSplitWALProcedures(splittingFiles, crashedServer); 092 } catch (IOException e) { 093 LOG.error("Failed to create procedures for splitting WALs of {}", crashedServer, e); 094 throw e; 095 } 096 } 097 098 public List<FileStatus> getWALsToSplit(ServerName serverName, boolean splitMeta) 099 throws IOException { 100 List<Path> logDirs = master.getMasterWalManager().getLogDirs(Collections.singleton(serverName)); 101 List<FileStatus> fileStatuses = 102 SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER); 103 LOG.info("{} WAL count={}, meta={}", serverName, fileStatuses.size(), splitMeta); 104 return fileStatuses; 105 } 106 107 private Path getWALSplitDir(ServerName serverName) { 108 Path logDir = 109 new Path(this.rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); 110 return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT); 111 } 112 113 /** 114 * Archive processed WAL 115 */ 116 public void archive(String wal) throws IOException { 117 WALSplitUtil.moveWAL(this.fs, new Path(wal), this.walArchiveDir); 118 } 119 120 public void deleteWALDir(ServerName serverName) throws IOException { 121 Path splitDir = getWALSplitDir(serverName); 122 try { 123 if (!fs.delete(splitDir, false)) { 124 LOG.warn("Failed delete {}, contains {}", splitDir, fs.listFiles(splitDir, true)); 125 } 126 } catch (PathIsNotEmptyDirectoryException e) { 127 FileStatus[] files = CommonFSUtils.listStatus(fs, splitDir); 128 LOG.warn("PathIsNotEmptyDirectoryException {}", 129 Arrays.stream(files).map(f -> f.getPath()).collect(Collectors.toList())); 130 throw e; 131 } 132 } 133 134 public boolean isSplitWALFinished(String walPath) throws IOException { 135 return !fs.exists(new Path(rootDir, walPath)); 136 } 137 138 List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs, 139 ServerName crashedServer) { 140 return splittingWALs.stream() 141 .map(wal -> new SplitWALProcedure(wal.getPath().toString(), crashedServer)) 142 .collect(Collectors.toList()); 143 } 144 145 /** 146 * Acquire a split WAL worker 147 * @param procedure split WAL task 148 * @return an available region server which could execute this task 149 * @throws ProcedureSuspendedException if there is no available worker, it will throw this 150 * exception to WAIT the procedure. 151 */ 152 public ServerName acquireSplitWALWorker(Procedure<?> procedure) 153 throws ProcedureSuspendedException { 154 ServerName worker = splitWorkerAssigner.acquire(procedure); 155 LOG.debug("Acquired split WAL worker={}", worker); 156 return worker; 157 } 158 159 /** 160 * After the worker finished the split WAL task, it will release the worker, and wake up all the 161 * suspend procedures in the ProcedureEvent 162 * @param worker worker which is about to release 163 */ 164 public void releaseSplitWALWorker(ServerName worker) { 165 LOG.debug("Release split WAL worker={}", worker); 166 splitWorkerAssigner.release(worker); 167 } 168 169 /** 170 * When master restart, there will be a new splitWorkerAssigner. But if there are splitting WAL 171 * tasks running on the region server side, they will not be count by the new splitWorkerAssigner. 172 * Thus we should add the workers of running tasks to the assigner when we load the procedures 173 * from MasterProcWALs. 174 * @param worker region server which is executing a split WAL task 175 */ 176 public void addUsedSplitWALWorker(ServerName worker) { 177 splitWorkerAssigner.addUsedWorker(worker); 178 } 179}