/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.master;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

/**
 * Implementation of a log cleaner that checks if a log is still scheduled for replication before
 * deleting it when its TTL is over.
 * <p/>
 * The logic is a bit complicated after we switched to the table based replication queue storage;
 * see the design doc in HBASE-27109 and the comments in HBASE-27214 for more details.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class);
  private Set<ServerName> notFullyDeadServers;
  private Set<String> peerIds;
  // ServerName -> PeerId -> WalGroup -> Offset
  // Here the server name is the source server name, so we can make sure that there is only one
  // queue for a given peer; that is why we can use a String peerId as the key instead of a
  // ReplicationQueueId.
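  // For example (purely illustrative values), after preClean this map may look like:
  //   {rs1.example.com,16020,1700000000000 ->
  //     {"peer_1" -> {"rs1.example.com%2C16020%2C1700000000000" -> <offset>}}}
  // where the innermost key is the wal group id, i.e. the wal file name prefix returned by
  // AbstractFSWALProvider.getWALPrefixFromWALName.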
  private Map<ServerName, Map<String, Map<String, ReplicationGroupOffset>>> replicationOffsets;
  private MasterServices masterService;
  private ReplicationLogCleanerBarrier barrier;
  private ReplicationPeerManager rpm;
  private Supplier<Set<ServerName>> getNotFullyDeadServers;

  private boolean canFilter;
  private boolean stopped = false;

  @Override
  public void preClean() {
    if (this.getConf() == null || isAsyncClusterConnectionClosedOrNull()) {
      LOG.warn(
        "Skipping preClean because configuration is null or asyncClusterConnection is unavailable.");
      return;
    }

    try {
      if (!rpm.getQueueStorage().hasData()) {
        return;
      }
    } catch (ReplicationException e) {
      LOG.error("Error occurred while executing queueStorage.hasData()", e);
      return;
    }
    canFilter = barrier.start();
    if (canFilter) {
      notFullyDeadServers = getNotFullyDeadServers.get();
      peerIds = rpm.listPeers(null).stream().map(ReplicationPeerDescription::getPeerId)
        .collect(Collectors.toSet());
      // must get the not fully dead servers first and then get the replication queue data, in
      // this way we can make sure that we have already added the missing replication queues for
      // the dead region servers recorded in the above set, otherwise the logic in the
      // filterForDeadRegionServer method may lead us to delete wals that are still in use.
      List<ReplicationQueueData> allQueueData;
      try {
        allQueueData = rpm.getQueueStorage().listAllQueues();
      } catch (ReplicationException e) {
        LOG.error("Can not list all replication queues, give up cleaning", e);
        barrier.stop();
        canFilter = false;
        notFullyDeadServers = null;
        peerIds = null;
        return;
      }
      replicationOffsets = new HashMap<>();
      for (ReplicationQueueData queueData : allQueueData) {
        ReplicationQueueId queueId = queueData.getId();
        ServerName serverName = queueId.getServerWALsBelongTo();
        Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
          replicationOffsets.computeIfAbsent(serverName, k -> new HashMap<>());
        Map<String, ReplicationGroupOffset> offsets =
          peerId2Offsets.computeIfAbsent(queueId.getPeerId(), k -> new HashMap<>());
        offsets.putAll(queueData.getOffsets());
      }
    } else {
      LOG.info("Skip replication log cleaner because an AddPeerProcedure is running");
    }
  }

  @Override
  public void postClean() {
    if (canFilter) {
      barrier.stop();
      canFilter = false;
      // release memory
      notFullyDeadServers = null;
      peerIds = null;
      replicationOffsets = null;
    }
  }

  // Returns true if, according to the given offset, this wal no longer needs to be replicated and
  // is therefore deletable from this queue's point of view.
  private boolean shouldDelete(ReplicationGroupOffset offset, FileStatus file) {
    return !ReplicationOffsetUtil.shouldReplicate(offset, file.getPath().getName());
  }

  // Whether a wal that belongs to a region server which is not fully dead yet can be deleted.
  private boolean filterForLiveRegionServer(ServerName serverName, FileStatus file) {
    Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
      replicationOffsets.get(serverName);
    if (peerId2Offsets == null) {
      // if there are replication queues missing, we can not delete the wal
      return false;
    }
    for (String peerId : peerIds) {
      Map<String, ReplicationGroupOffset> offsets = peerId2Offsets.get(peerId);
      // if there is no replication queue for a peer, we can not delete the wal
      if (offsets == null) {
        return false;
      }
      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getPath().getName());
      ReplicationGroupOffset offset = offsets.get(walGroupId);
      // if a replication queue still needs to replicate this wal, we can not delete it
      if (!shouldDelete(offset, file)) {
        return false;
      }
    }
    // if all replication queues have already finished replicating this wal, we can delete it
    return true;
  }

  // Whether a wal that belongs to a fully dead region server can be deleted.
  private boolean filterForDeadRegionServer(ServerName serverName, FileStatus file) {
    Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
      replicationOffsets.get(serverName);
    if (peerId2Offsets == null) {
      // no replication queue for this dead rs, we can delete all wal files for it
      return true;
    }
    for (String peerId : peerIds) {
      Map<String, ReplicationGroupOffset> offsets = peerId2Offsets.get(peerId);
      if (offsets == null) {
        // for a dead server, we only care about existing replication queues, as we will delete a
        // queue after we finish replicating it
        continue;
      }
      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getPath().getName());
      ReplicationGroupOffset offset = offsets.get(walGroupId);
      // if a replication queue still needs to replicate this wal, we can not delete it
      if (!shouldDelete(offset, file)) {
        return false;
      }
    }
    // if all replication queues have already finished replicating this wal, we can delete it
    return true;
  }

  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    // all members of this class are null if replication is disabled,
    // so we cannot filter the files
    if (this.getConf() == null) {
      return files;
    }

    if (isAsyncClusterConnectionClosedOrNull()) {
      LOG.warn("Skip getting deletable files because asyncClusterConnection is unavailable.");
      // asyncClusterConnection is unavailable, we shouldn't delete any files
      return Collections.emptyList();
    }

    try {
      if (!rpm.getQueueStorage().hasData()) {
        return files;
      }
    } catch (ReplicationException e) {
      LOG.error("Error occurred while executing queueStorage.hasData()", e);
      return Collections.emptyList();
    }
    if (!canFilter) {
      // We can not delete anything if there is an AddPeerProcedure running at the same time.
      // See HBASE-27214 for more details.
      return Collections.emptyList();
    }

    return Iterables.filter(files, new Predicate<FileStatus>() {
      @Override
      public boolean apply(FileStatus file) {
        // just for suppressing the findbugs NP warnings, as the parameter is marked as Nullable
        // in the guava Predicate
        if (file == null) {
          return false;
        }
        if (peerIds.isEmpty()) {
          // no peer, can always delete
          return true;
        }
        // not a valid wal file name, delete
        if (!AbstractFSWALProvider.validateWALFilename(file.getPath().getName())) {
          return true;
        }
        // meta wal is always deletable as we will never replicate it
        if (AbstractFSWALProvider.isMetaFile(file.getPath())) {
          return true;
        }
        ServerName serverName =
          AbstractFSWALProvider.parseServerNameFromWALName(file.getPath().getName());
        if (notFullyDeadServers.contains(serverName)) {
          return filterForLiveRegionServer(serverName, file);
        } else {
          return filterForDeadRegionServer(serverName, file);
        }
      }
    });
  }

  // A server is "not fully dead" if it is still online or there is an unfinished
  // ServerCrashProcedure for it, which means its replication queues may not have been fully
  // claimed yet.
  private Set<ServerName> getNotFullyDeadServers(MasterServices services) {
    List<ServerName> onlineServers = services.getServerManager().getOnlineServersList();
    return Stream.concat(onlineServers.stream(),
      services.getMasterProcedureExecutor().getProcedures().stream()
        .filter(p -> p instanceof ServerCrashProcedure).filter(p -> !p.isFinished())
        .map(p -> ((ServerCrashProcedure) p).getServerName()))
      .collect(Collectors.toSet());
  }

  @Override
  public void init(Map<String, Object> params) {
    super.init(params);
    if (MapUtils.isNotEmpty(params)) {
      Object master = params.get(HMaster.MASTER);
      if (master instanceof MasterServices) {
        masterService = (MasterServices) master;
        barrier = masterService.getReplicationLogCleanerBarrier();
        rpm = masterService.getReplicationPeerManager();
        getNotFullyDeadServers = () -> getNotFullyDeadServers(masterService);
        return;
      }
    }
    throw new IllegalArgumentException("Missing " + HMaster.MASTER + " parameter");
  }

  @Override
  public void stop(String why) {
    this.stopped = true;
  }

  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  /**
   * Check if asyncClusterConnection is null or closed.
   * @return true if asyncClusterConnection is null or closed, false otherwise
   */
  private boolean isAsyncClusterConnectionClosedOrNull() {
    AsyncClusterConnection asyncClusterConnection = masterService.getAsyncClusterConnection();
    return asyncClusterConnection == null || asyncClusterConnection.isClosed();
  }
}