/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.master;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

/**
 * Implementation of a log cleaner that checks whether a log is still scheduled for replication
 * before deleting it once its TTL has expired.
 * <p/>
 * The logic became more complicated after the switch to table based replication queue storage;
 * see the design doc in HBASE-27109 and the comments in HBASE-27214 for more details.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class);
  private Set<ServerName> notFullyDeadServers;
  private Set<String> peerIds;
  // ServerName -> PeerId -> WalGroup -> Offset
  // Here the server name is the source server name, so we can make sure that there is only one
  // queue for a given peer, which is why we can use a String peerId as the key instead of a
  // ReplicationQueueId.
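  // A purely hypothetical entry, only to illustrate the nesting (names and values are made up):
  //   ServerName "rs1.example.org,16020,1700000000000"
  //     -> peerId "peer_1"
  //       -> walGroup "rs1.example.org%2C16020%2C1700000000000"
  //         -> the ReplicationGroupOffset recording how far that wal group has been replicated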
  private Map<ServerName, Map<String, Map<String, ReplicationGroupOffset>>> replicationOffsets;
  private ReplicationLogCleanerBarrier barrier;
  private ReplicationPeerManager rpm;
  private Supplier<Set<ServerName>> getNotFullyDeadServers;

  private boolean canFilter;
  private boolean stopped = false;

  @Override
  public void preClean() {
    if (this.getConf() == null) {
      return;
    }
    try {
      if (!rpm.getQueueStorage().hasData()) {
        return;
      }
    } catch (ReplicationException e) {
      LOG.error("Error occurred while executing queueStorage.hasData()", e);
      return;
    }
    canFilter = barrier.start();
    if (canFilter) {
      notFullyDeadServers = getNotFullyDeadServers.get();
      peerIds = rpm.listPeers(null).stream().map(ReplicationPeerDescription::getPeerId)
        .collect(Collectors.toSet());
      // We must get the not fully dead servers first and only then load the replication queue
      // data. This ordering makes sure that the missing replication queues for the dead region
      // servers recorded in the above set have already been added; otherwise the logic in the
      // filterForDeadRegionServer method may lead us to delete WALs that are still in use.
      List<ReplicationQueueData> allQueueData;
      try {
        allQueueData = rpm.getQueueStorage().listAllQueues();
      } catch (ReplicationException e) {
        LOG.error("Cannot list all replication queues, giving up cleaning", e);
        barrier.stop();
        canFilter = false;
        notFullyDeadServers = null;
        peerIds = null;
        return;
      }
      replicationOffsets = new HashMap<>();
      for (ReplicationQueueData queueData : allQueueData) {
        ReplicationQueueId queueId = queueData.getId();
        ServerName serverName = queueId.getServerWALsBelongTo();
        Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
          replicationOffsets.computeIfAbsent(serverName, k -> new HashMap<>());
        Map<String, ReplicationGroupOffset> offsets =
          peerId2Offsets.computeIfAbsent(queueId.getPeerId(), k -> new HashMap<>());
        offsets.putAll(queueData.getOffsets());
      }
    } else {
      LOG.info("Skipping replication log cleaner because an AddPeerProcedure is running");
    }
  }

  @Override
  public void postClean() {
    if (canFilter) {
      barrier.stop();
      canFilter = false;
      // release memory
      notFullyDeadServers = null;
      peerIds = null;
      replicationOffsets = null;
    }
  }

  private boolean shouldDelete(ReplicationGroupOffset offset, FileStatus file) {
    return !ReplicationOffsetUtil.shouldReplicate(offset, file.getPath().getName());
  }

  private boolean filterForLiveRegionServer(ServerName serverName, FileStatus file) {
    Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
      replicationOffsets.get(serverName);
    if (peerId2Offsets == null) {
      // if there are replication queues missing, we can not delete the wal
      return false;
    }
    String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getPath().getName());
    for (String peerId : peerIds) {
      Map<String, ReplicationGroupOffset> offsets = peerId2Offsets.get(peerId);
      // if there is no replication queue for a peer, we can not delete the wal
      if (offsets == null) {
        return false;
      }
      ReplicationGroupOffset offset = offsets.get(walGroupId);
      // if a replication queue still needs to replicate this wal, we can not delete it
      if (!shouldDelete(offset, file)) {
        return false;
      }
    }
    // if all replication queues have already finished replicating this wal, we can delete it
    return true;
  }
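
  // The dead server case below differs from the live server case above in how a missing queue is
  // interpreted: for a dead server a missing queue means replication through it has finished and
  // the queue was already deleted, so the wal is safe to remove; for a live server a missing
  // queue may simply mean it has not been registered yet, so the wal must be kept.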
  private boolean filterForDeadRegionServer(ServerName serverName, FileStatus file) {
    Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
      replicationOffsets.get(serverName);
    if (peerId2Offsets == null) {
      // no replication queue for this dead rs, so we can delete all its wal files
      return true;
    }
    String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getPath().getName());
    for (String peerId : peerIds) {
      Map<String, ReplicationGroupOffset> offsets = peerId2Offsets.get(peerId);
      if (offsets == null) {
        // for a dead server, we only care about existing replication queues, as we will delete a
        // queue once we finish replicating it
        continue;
      }
      ReplicationGroupOffset offset = offsets.get(walGroupId);
      // if a replication queue still needs to replicate this wal, we can not delete it
      if (!shouldDelete(offset, file)) {
        return false;
      }
    }
    // if all replication queues have already finished replicating this wal, we can delete it
    return true;
  }

  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    // all members of this class are null if replication is disabled,
    // so we can not filter the files
    if (this.getConf() == null) {
      return files;
    }
    try {
      if (!rpm.getQueueStorage().hasData()) {
        return files;
      }
    } catch (ReplicationException e) {
      LOG.error("Error occurred while executing queueStorage.hasData()", e);
      return Collections.emptyList();
    }
    if (!canFilter) {
      // We can not delete anything if there is an AddPeerProcedure running at the same time.
      // See HBASE-27214 for more details.
      return Collections.emptyList();
    }

    return Iterables.filter(files, new Predicate<FileStatus>() {
      @Override
      public boolean apply(FileStatus file) {
        // just for suppressing the findbugs NP warning, as the parameter is marked as Nullable
        // in the guava Predicate
        if (file == null) {
          return false;
        }
        if (peerIds.isEmpty()) {
          // no peer, we can always delete
          return true;
        }
        // not a valid wal file name, safe to delete
        if (!AbstractFSWALProvider.validateWALFilename(file.getPath().getName())) {
          return true;
        }
        // meta wal is always deletable as we will never replicate it
        if (AbstractFSWALProvider.isMetaFile(file.getPath())) {
          return true;
        }
        ServerName serverName =
          AbstractFSWALProvider.parseServerNameFromWALName(file.getPath().getName());
        if (notFullyDeadServers.contains(serverName)) {
          return filterForLiveRegionServer(serverName, file);
        } else {
          return filterForDeadRegionServer(serverName, file);
        }
      }
    });
  }
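
  // A server is considered "not fully dead" if it is still online, or if it has an unfinished
  // ServerCrashProcedure, meaning the handling of its replication queues may still be in
  // progress, so its wals have to be checked as conservatively as those of a live server.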
  private Set<ServerName> getNotFullyDeadServers(MasterServices services) {
    List<ServerName> onlineServers = services.getServerManager().getOnlineServersList();
    return Stream.concat(onlineServers.stream(),
      services.getMasterProcedureExecutor().getProcedures().stream()
        .filter(p -> p instanceof ServerCrashProcedure).filter(p -> !p.isFinished())
        .map(p -> ((ServerCrashProcedure) p).getServerName()))
      .collect(Collectors.toSet());
  }

  @Override
  public void init(Map<String, Object> params) {
    super.init(params);
    if (MapUtils.isNotEmpty(params)) {
      Object master = params.get(HMaster.MASTER);
      // instanceof is null-safe, so no separate null check is needed
      if (master instanceof MasterServices) {
        MasterServices m = (MasterServices) master;
        barrier = m.getReplicationLogCleanerBarrier();
        rpm = m.getReplicationPeerManager();
        getNotFullyDeadServers = () -> getNotFullyDeadServers(m);
        return;
      }
    }
    throw new IllegalArgumentException("Missing " + HMaster.MASTER + " parameter");
  }

  @Override
  public void stop(String why) {
    this.stopped = true;
  }

  @Override
  public boolean isStopped() {
    return this.stopped;
  }
}
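
// A minimal sketch of how a delegate like this is enabled, assuming the standard
// hbase.master.logcleaner.plugins mechanism (ReplicationLogCleaner is typically part of the
// default value already); shown only for illustration:
//   Configuration conf = HBaseConfiguration.create();
//   conf.set("hbase.master.logcleaner.plugins",
//     "org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner");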