/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.master;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

/**
 * Implementation of a log cleaner that checks if a log is still scheduled for replication before
 * deleting it when its TTL is over.
 * <p/>
 * The logic is a bit complicated after we switched to table based replication queue storage; see
 * the design doc in HBASE-27109 and the comments in HBASE-27214 for more details.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class);
  private Set<ServerName> notFullyDeadServers;
  private Set<String> peerIds;
  // ServerName -> PeerId -> WalGroup -> Offset
  // Here the server name is the name of the source server the wals belong to, so there is at most
  // one queue for a given peer on a given server, which is why we can use a String peerId as the
  // key instead of a ReplicationQueueId.
  private Map<ServerName, Map<String, Map<String, ReplicationGroupOffset>>> replicationOffsets;
  private MasterServices masterService;
  private ReplicationLogCleanerBarrier barrier;
  private ReplicationPeerManager rpm;
  private Supplier<Set<ServerName>> getNotFullyDeadServers;

  private boolean canFilter;
  private boolean stopped = false;

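  /**
   * Take a snapshot of the current replication state (peer ids, queue offsets and not fully dead
   * servers) under the replication log cleaner barrier, so a concurrently running
   * AddPeerProcedure can not race with the cleaning; {@link #getDeletableFiles(Iterable)} then
   * filters only against this snapshot.
   */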
  @Override
  public void preClean() {
    if (this.getConf() == null || isAsyncClusterConnectionClosedOrNull()) {
      LOG.warn(
        "Skipping preClean because configuration is null or asyncClusterConnection is unavailable.");
      return;
    }

    try {
      if (!rpm.getQueueStorage().hasData()) {
        return;
      }
    } catch (ReplicationException e) {
      LOG.error("Error occurred while executing queueStorage.hasData()", e);
      return;
    }
    canFilter = barrier.start();
    if (canFilter) {
      notFullyDeadServers = getNotFullyDeadServers.get();
      peerIds = rpm.listPeers(null).stream().map(ReplicationPeerDescription::getPeerId)
        .collect(Collectors.toSet());
      // We must get the not fully dead servers first, and only then load the replication queue
      // data. This ordering makes sure that, for a dead region server which is absent from the
      // above set (i.e. its ServerCrashProcedure has already finished), the missing replication
      // queues have already been added to the queue storage; otherwise the logic in the
      // filterForDeadRegionServer method could lead us to delete a wal that is still in use.
      List<ReplicationQueueData> allQueueData;
      try {
        allQueueData = rpm.getQueueStorage().listAllQueues();
      } catch (ReplicationException e) {
        LOG.error("Cannot list all replication queues, giving up cleaning", e);
        barrier.stop();
        canFilter = false;
        notFullyDeadServers = null;
        peerIds = null;
        return;
      }
      replicationOffsets = new HashMap<>();
      for (ReplicationQueueData queueData : allQueueData) {
        ReplicationQueueId queueId = queueData.getId();
        ServerName serverName = queueId.getServerWALsBelongTo();
        Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
          replicationOffsets.computeIfAbsent(serverName, k -> new HashMap<>());
        Map<String, ReplicationGroupOffset> offsets =
          peerId2Offsets.computeIfAbsent(queueId.getPeerId(), k -> new HashMap<>());
        offsets.putAll(queueData.getOffsets());
      }
    } else {
      LOG.info("Skipping replication log cleaning because an AddPeerProcedure is running");
    }
  }

  @Override
  public void postClean() {
    if (canFilter) {
      barrier.stop();
      canFilter = false;
      // release memory
      notFullyDeadServers = null;
      peerIds = null;
      replicationOffsets = null;
    }
  }

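  // A wal is deletable with respect to a single queue offset iff the queue no longer needs to
  // replicate it. Note that the offset passed in may be null when no offset is recorded for the
  // wal group; shouldReplicate is expected to treat a null offset as "still needs replication",
  // so such wals are kept.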
  private boolean shouldDelete(ReplicationGroupOffset offset, FileStatus file) {
    return !ReplicationOffsetUtil.shouldReplicate(offset, file.getPath().getName());
  }

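  /**
   * For a server which is not fully dead we are conservative: every known peer must have a
   * replication queue for this server, and every such queue must have finished replicating the
   * given wal; a missing queue keeps the file.
   * @return true if the wal file is deletable
   */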
  private boolean filterForLiveRegionServer(ServerName serverName, FileStatus file) {
    Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
      replicationOffsets.get(serverName);
    if (peerId2Offsets == null) {
      // if any replication queue is missing for this server, we cannot delete the wal
      return false;
    }
    for (String peerId : peerIds) {
      Map<String, ReplicationGroupOffset> offsets = peerId2Offsets.get(peerId);
      // if there is no replication queue for a peer, we cannot delete the wal
      if (offsets == null) {
        return false;
      }
      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getPath().getName());
      ReplicationGroupOffset offset = offsets.get(walGroupId);
      // if a replication queue still needs to replicate this wal, we cannot delete it
      if (!shouldDelete(offset, file)) {
        return false;
      }
    }
    // if all replication queues have already finished replicating this wal, we can delete it.
    return true;
  }

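  /**
   * For a fully dead server, i.e. one whose ServerCrashProcedure has finished, the queue storage
   * is the source of truth: a queue which no longer exists has been fully replicated and then
   * removed, so only the remaining queues need to be checked against the given wal.
   * @return true if the wal file is deletable
   */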
  private boolean filterForDeadRegionServer(ServerName serverName, FileStatus file) {
    Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
      replicationOffsets.get(serverName);
    if (peerId2Offsets == null) {
      // no replication queues for this dead region server, so we can delete all its wal files
      return true;
    }
    for (String peerId : peerIds) {
      Map<String, ReplicationGroupOffset> offsets = peerId2Offsets.get(peerId);
      if (offsets == null) {
        // for a dead server, we only care about the replication queues that still exist, as a
        // queue is removed once it has been fully replicated.
        continue;
      }
      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getPath().getName());
      ReplicationGroupOffset offset = offsets.get(walGroupId);
      // if a replication queue still needs to replicate this wal, we cannot delete it
      if (!shouldDelete(offset, file)) {
        return false;
      }
    }
    // if all replication queues have already finished replicating this wal, we can delete it.
    return true;
  }

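  /**
   * Filter the given wal files against the replication state snapshot taken in
   * {@link #preClean()}; when the snapshot could not be taken, i.e. the barrier was not acquired,
   * nothing is deletable.
   */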
  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    // all members of this class are null if replication is disabled,
    // so we cannot filter the files
    if (this.getConf() == null) {
      return files;
    }

    if (isAsyncClusterConnectionClosedOrNull()) {
      LOG.warn("Skip getting deletable files because asyncClusterConnection is unavailable.");
      // asyncClusterConnection is unavailable, we shouldn't delete any files.
      return Collections.emptyList();
    }

    try {
      if (!rpm.getQueueStorage().hasData()) {
        return files;
      }
    } catch (ReplicationException e) {
      LOG.error("Error occurred while executing queueStorage.hasData()", e);
      return Collections.emptyList();
    }
    if (!canFilter) {
      // We cannot delete anything while an AddPeerProcedure is running at the same time.
      // See HBASE-27214 for more details.
      return Collections.emptyList();
    }

    return Iterables.filter(files, new Predicate<FileStatus>() {
      @Override
      public boolean apply(FileStatus file) {
        // the null check is just to suppress the findbugs NP warning, as the parameter is marked
        // as Nullable in the guava Predicate.
        if (file == null) {
          return false;
        }
        if (peerIds.isEmpty()) {
          // no peer, can always delete
          return true;
        }
        // not a valid wal file name, delete
        if (!AbstractFSWALProvider.validateWALFilename(file.getPath().getName())) {
          return true;
        }
        // meta wal is always deletable as we will never replicate it
        if (AbstractFSWALProvider.isMetaFile(file.getPath())) {
          return true;
        }
        ServerName serverName =
          AbstractFSWALProvider.parseServerNameFromWALName(file.getPath().getName());
        if (notFullyDeadServers.contains(serverName)) {
          return filterForLiveRegionServer(serverName, file);
        } else {
          return filterForDeadRegionServer(serverName, file);
        }
      }
    });
  }

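  // A server is "not fully dead" if it is still online, or if its ServerCrashProcedure has not
  // finished yet, which means its replication queues may not be complete in the queue storage
  // yet.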
  private Set<ServerName> getNotFullyDeadServers(MasterServices services) {
    List<ServerName> onlineServers = services.getServerManager().getOnlineServersList();
    return Stream.concat(onlineServers.stream(),
      services.getMasterProcedureExecutor().getProcedures().stream()
        .filter(p -> p instanceof ServerCrashProcedure).filter(p -> !p.isFinished())
        .map(p -> ((ServerCrashProcedure) p).getServerName()))
      .collect(Collectors.toSet());
  }

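  /**
   * Expects the {@link HMaster} instance to be passed in {@code params} under the
   * {@link HMaster#MASTER} key; all the master side services we need are fetched from it.
   */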
  @Override
  public void init(Map<String, Object> params) {
    super.init(params);
    if (MapUtils.isNotEmpty(params)) {
      Object master = params.get(HMaster.MASTER);
      if (master instanceof MasterServices) {
        masterService = (MasterServices) master;
        barrier = masterService.getReplicationLogCleanerBarrier();
        rpm = masterService.getReplicationPeerManager();
        getNotFullyDeadServers = () -> getNotFullyDeadServers(masterService);
        return;
      }
    }
    throw new IllegalArgumentException("Missing " + HMaster.MASTER + " parameter");
  }

  @Override
  public void stop(String why) {
    this.stopped = true;
  }

  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  /**
   * Check if asyncClusterConnection is null or closed.
   * @return true if asyncClusterConnection is null or is closed, false otherwise
   */
  private boolean isAsyncClusterConnectionClosedOrNull() {
    AsyncClusterConnection asyncClusterConnection = masterService.getAsyncClusterConnection();
    return asyncClusterConnection == null || asyncClusterConnection.isClosed();
  }
}