/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.master;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

/**
 * Implementation of a log cleaner that checks if a log is still scheduled for replication before
 * deleting it when its TTL is over.
 * <p/>
 * The logic became more involved after we switched to table based replication queue storage; see
 * the design doc in HBASE-27109 and the comments in HBASE-27214 for more details.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class);
  private Set<ServerName> notFullyDeadServers;
  private Set<String> peerIds;
  // ServerName -> PeerId -> WalGroup -> Offset
  // Here the server name is the source server name, so we can be sure that there is only one
  // queue for a given peer, which is why we can use a String peerId as the key instead of a
  // ReplicationQueueId.
  private Map<ServerName, Map<String, Map<String, ReplicationGroupOffset>>> replicationOffsets;
  private ReplicationLogCleanerBarrier barrier;
  private ReplicationPeerManager rpm;
  private Supplier<Set<ServerName>> getNotFullyDeadServers;

  private boolean canFilter;
  private boolean stopped = false;

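  /**
   * Take a snapshot of the replication state before the cleaner chore runs.
   * <p/>
   * If the barrier can be acquired (i.e. no AddPeerProcedure is running), record the servers that
   * are not yet fully dead, the current peer ids, and the offsets of all replication queues, so
   * that {@link #getDeletableFiles(Iterable)} can later decide which WALs are safe to delete.
   */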
  @Override
  public void preClean() {
    if (this.getConf() == null) {
      return;
    }
    try {
      if (!rpm.getQueueStorage().hasData()) {
        return;
      }
    } catch (ReplicationException e) {
      LOG.error("Error occurred while executing queueStorage.hasData()", e);
      return;
    }
    canFilter = barrier.start();
    if (canFilter) {
      notFullyDeadServers = getNotFullyDeadServers.get();
      peerIds = rpm.listPeers(null).stream().map(ReplicationPeerDescription::getPeerId)
        .collect(Collectors.toSet());
      // We must get the not fully dead servers first and only then the replication queue data.
      // This ordering ensures that the missing replication queues for the dead region servers
      // recorded in the above set have already been added; otherwise the logic in the
      // filterForDeadRegionServer method may lead us to delete WALs that are still in use.
      List<ReplicationQueueData> allQueueData;
      try {
        allQueueData = rpm.getQueueStorage().listAllQueues();
      } catch (ReplicationException e) {
        LOG.error("Can not list all replication queues, give up cleaning", e);
        barrier.stop();
        canFilter = false;
        notFullyDeadServers = null;
        peerIds = null;
        return;
      }
      replicationOffsets = new HashMap<>();
      for (ReplicationQueueData queueData : allQueueData) {
        ReplicationQueueId queueId = queueData.getId();
        ServerName serverName = queueId.getServerWALsBelongTo();
        Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
          replicationOffsets.computeIfAbsent(serverName, k -> new HashMap<>());
        Map<String, ReplicationGroupOffset> offsets =
          peerId2Offsets.computeIfAbsent(queueId.getPeerId(), k -> new HashMap<>());
        offsets.putAll(queueData.getOffsets());
      }
    } else {
      LOG.info("Skip replication log cleaner because an AddPeerProcedure is running");
    }
  }

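  /**
   * Release the barrier acquired in {@link #preClean()} and drop the cached replication state.
   */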
  @Override
  public void postClean() {
    if (canFilter) {
      barrier.stop();
      canFilter = false;
      // release memory
      notFullyDeadServers = null;
      peerIds = null;
      replicationOffsets = null;
    }
  }

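  /**
   * A WAL is deletable with respect to a queue offset if that queue no longer needs to replicate
   * it.
   */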
  private boolean shouldDelete(ReplicationGroupOffset offset, FileStatus file) {
    return !ReplicationOffsetUtil.shouldReplicate(offset, file.getPath().getName());
  }

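  /**
   * For a region server that is not fully dead, every peer must have a replication queue recorded
   * for the server, and every such queue must have finished replicating the given WAL before the
   * file can be deleted.
   */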
  private boolean filterForLiveRegionServer(ServerName serverName, FileStatus file) {
    Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
      replicationOffsets.get(serverName);
    if (peerId2Offsets == null) {
      // if there are replication queues missing, we cannot delete the wal
      return false;
    }
    for (String peerId : peerIds) {
      Map<String, ReplicationGroupOffset> offsets = peerId2Offsets.get(peerId);
      // if there is no replication queue for a peer, we cannot delete the wal
      if (offsets == null) {
        return false;
      }
      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getPath().getName());
      ReplicationGroupOffset offset = offsets.get(walGroupId);
      // if a replication queue still needs to replicate this wal, we cannot delete it
      if (!shouldDelete(offset, file)) {
        return false;
      }
    }
    // if all replication queues have already finished replicating this wal, we can delete it
    return true;
  }

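  /**
   * For a fully dead region server, a missing queue means replication for that peer has already
   * completed and the queue has been removed, so only the queues that still exist need to be
   * checked against the given WAL.
   */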
  private boolean filterForDeadRegionServer(ServerName serverName, FileStatus file) {
    Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
      replicationOffsets.get(serverName);
    if (peerId2Offsets == null) {
      // no replication queue for this dead rs, we can delete all wal files for it
      return true;
    }
    for (String peerId : peerIds) {
      Map<String, ReplicationGroupOffset> offsets = peerId2Offsets.get(peerId);
      if (offsets == null) {
        // for a dead server, we only care about existing replication queues, as we will delete a
        // queue after we finish replicating it
        continue;
      }
      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getPath().getName());
      ReplicationGroupOffset offset = offsets.get(walGroupId);
      // if a replication queue still needs to replicate this wal, we cannot delete it
      if (!shouldDelete(offset, file)) {
        return false;
      }
    }
    // if all replication queues have already finished replicating this wal, we can delete it
    return true;
  }

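  /**
   * Return the subset of the given WAL files that are no longer needed by any replication peer,
   * based on the state snapshot taken in {@link #preClean()}.
   */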
  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    // all members of this class are null if replication is disabled,
    // so we cannot filter the files
    if (this.getConf() == null) {
      return files;
    }
    try {
      if (!rpm.getQueueStorage().hasData()) {
        return files;
      }
    } catch (ReplicationException e) {
      LOG.error("Error occurred while executing queueStorage.hasData()", e);
      return Collections.emptyList();
    }
    if (!canFilter) {
      // We cannot delete anything while an AddPeerProcedure is running at the same time.
      // See HBASE-27214 for more details.
      return Collections.emptyList();
    }

    return Iterables.filter(files, new Predicate<FileStatus>() {
      @Override
      public boolean apply(FileStatus file) {
        // just for suppressing the findbugs NP warnings, as the parameter is marked as Nullable
        // in the guava Predicate
        if (file == null) {
          return false;
        }
        if (peerIds.isEmpty()) {
          // no peer, can always delete
          return true;
        }
        // not a valid wal file name, delete
        if (!AbstractFSWALProvider.validateWALFilename(file.getPath().getName())) {
          return true;
        }
        // meta wal is always deletable as we will never replicate it
        if (AbstractFSWALProvider.isMetaFile(file.getPath())) {
          return true;
        }
        ServerName serverName =
          AbstractFSWALProvider.parseServerNameFromWALName(file.getPath().getName());
        if (notFullyDeadServers.contains(serverName)) {
          return filterForLiveRegionServer(serverName, file);
        } else {
          return filterForDeadRegionServer(serverName, file);
        }
      }
    });
  }

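  /**
   * A server is considered not fully dead if it is still online, or if an unfinished
   * ServerCrashProcedure is still processing it.
   */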
  private Set<ServerName> getNotFullyDeadServers(MasterServices services) {
    List<ServerName> onlineServers = services.getServerManager().getOnlineServersList();
    return Stream.concat(onlineServers.stream(),
      services.getMasterProcedureExecutor().getProcedures().stream()
        .filter(p -> p instanceof ServerCrashProcedure).filter(p -> !p.isFinished())
        .map(p -> ((ServerCrashProcedure) p).getServerName()))
      .collect(Collectors.toSet());
  }

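  /**
   * The cleaner needs a reference to the active master, passed in via the {@code HMaster.MASTER}
   * parameter, to access the replication log cleaner barrier, the replication peer manager and
   * the server manager.
   */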
  @Override
  public void init(Map<String, Object> params) {
    super.init(params);
    if (MapUtils.isNotEmpty(params)) {
      Object master = params.get(HMaster.MASTER);
      if (master != null && master instanceof MasterServices) {
        MasterServices m = (MasterServices) master;
        barrier = m.getReplicationLogCleanerBarrier();
        rpm = m.getReplicationPeerManager();
        getNotFullyDeadServers = () -> getNotFullyDeadServers(m);
        return;
      }
    }
    throw new IllegalArgumentException("Missing " + HMaster.MASTER + " parameter");
  }

  @Override
  public void stop(String why) {
    this.stopped = true;
  }

  @Override
  public boolean isStopped() {
    return this.stopped;
  }
}