/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
 * for the specific language governing permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.replication.master;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implementation of a file cleaner that checks if an hfile is still scheduled for replication
 * before deleting it from the hfile archive directory.
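 * <p>
 * To take effect, the cleaner is typically listed, together with the other cleaner delegates, in
 * the master's hfile cleaner plugin configuration,
 * {@link HFileCleaner#MASTER_HFILE_CLEANER_PLUGINS} ("hbase.master.hfilecleaner.plugins"), for
 * example in hbase-site.xml:
 * </p>
 * <pre>
 * &lt;property&gt;
 *   &lt;name&gt;hbase.master.hfilecleaner.plugins&lt;/name&gt;
 *   &lt;value&gt;org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner&lt;/value&gt;
 * &lt;/property&gt;
 * </pre>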
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationHFileCleaner.class);
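  // ZK watcher and replication queues client; these stay null (and getConf() returns null) when
  // replication of bulk loaded hfiles is disabled, see setConf(Configuration).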
  private ZKWatcher zkw;
  private ReplicationQueuesClient rqc;
  private boolean stopped = false;

  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    // all members of this class are null if replication is disabled,
    // so we cannot filter the files
    if (this.getConf() == null) {
      return files;
    }

    final Set<String> hfileRefs;
    try {
      // Hfile references created in ZK concurrently with this call may not be included in the
      // returned set, but that is safe: the corresponding hfiles are not part of the files being
      // checked here, so they cannot be deleted by this pass.
      hfileRefs = loadHFileRefsFromPeers();
    } catch (KeeperException e) {
      LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable files",
        e);
      return Collections.emptyList();
    }
    return Iterables.filter(files, new Predicate<FileStatus>() {
      @Override
      public boolean apply(FileStatus file) {
        String hfile = file.getPath().getName();
        boolean foundHFileRefInQueue = hfileRefs.contains(hfile);
        if (LOG.isDebugEnabled()) {
          if (foundHFileRefInQueue) {
            LOG.debug("Found hfile reference in ZK, keeping: " + hfile);
          } else {
            LOG.debug("Did not find hfile reference in ZK, deleting: " + hfile);
          }
        }
        return !foundHFileRefInQueue;
      }
    });
  }

  /**
   * Load all hfile references in all replication queues from ZK. The returned set is guaranteed
   * to contain every hfile reference that existed in zookeeper at the start of this call;
   * however, references created while the call is in progress may not be included.
   */
  private Set<String> loadHFileRefsFromPeers() throws KeeperException {
    Set<String> hfileRefs = Sets.newHashSet();
    List<String> listOfPeers;
    for (int retry = 0;; retry++) {
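      // Remember the cversion of the hfile refs znode before reading; it is compared against the
      // value after reading to detect concurrent changes and retry.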
      int v0 = rqc.getHFileRefsNodeChangeVersion();
      hfileRefs.clear();
      listOfPeers = rqc.getAllPeersFromHFileRefsQueue();
      if (listOfPeers == null) {
        LOG.debug("Didn't find any peers with hfile references, won't prevent any deletions.");
        return ImmutableSet.of();
      }
      for (String id : listOfPeers) {
        List<String> peerHFileRefs = rqc.getReplicableHFiles(id);
        if (peerHFileRefs != null) {
          hfileRefs.addAll(peerHFileRefs);
        }
      }
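      // If the cversion of the hfile refs znode is unchanged, the collected references form a
      // consistent snapshot; otherwise loop and read again.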
      int v1 = rqc.getHFileRefsNodeChangeVersion();
      if (v0 == v1) {
        return hfileRefs;
      }
      LOG.debug(String.format("Replication hfile references node cversion changed from "
          + "%d to %d, retry = %d", v0, v1, retry));
    }
  }

  @Override
  public void setConf(Configuration config) {
    // If replication of bulk loaded hfiles is disabled, keep all members null and do nothing.
    if (!(config.getBoolean(
      HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT))) {
      LOG.warn(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
          + " is not enabled, so consider removing "
          + ReplicationHFileCleaner.class + " from the " + HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS
          + " configuration.");
      return;
    }
    // Make our own Configuration so we get our own ZK connection, which we can close ourselves
    // when the time comes.
    Configuration conf = new Configuration(config);
    try {
      setConf(conf, new ZKWatcher(conf, "replicationHFileCleaner", null));
    } catch (IOException e) {
      LOG.error("Error while configuring " + this.getClass().getName(), e);
    }
  }

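  /**
   * Set the configuration with an externally supplied {@link ZKWatcher}. Visible so tests can
   * pass in their own watcher instead of letting the cleaner create one.
   */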
  @VisibleForTesting
  public void setConf(Configuration conf, ZKWatcher zk) {
    super.setConf(conf);
    try {
      initReplicationQueuesClient(conf, zk);
    } catch (Exception e) {
      LOG.error("Error while configuring " + this.getClass().getName(), e);
    }
  }

  private void initReplicationQueuesClient(Configuration conf, ZKWatcher zk)
      throws Exception {
    this.zkw = zk;
    this.rqc = ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(
        conf, new WarnOnlyAbortable(), zkw));
  }

  @Override
  public void stop(String why) {
    if (this.stopped) {
      return;
    }
    this.stopped = true;
    if (this.zkw != null) {
      LOG.info("Stopping " + this.zkw);
      this.zkw.close();
    }
  }

  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  @Override
  public boolean isFileDeletable(FileStatus fStat) {
    Set<String> hfileRefsFromQueue;
    // all members of this class are null if replication is disabled,
    // so do not prevent the file from being deleted
    if (getConf() == null) {
      return true;
    }

    try {
      hfileRefsFromQueue = loadHFileRefsFromPeers();
    } catch (KeeperException e) {
      LOG.warn("Failed to read hfile references from zookeeper, skipping deletable check for file "
          + fStat.getPath(), e);
      return false;
    }
    return !hfileRefsFromQueue.contains(fStat.getPath().getName());
  }

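  /**
   * Abortable that only logs abort requests instead of acting on them; passed to the replication
   * queues client so its abort calls are not fatal.
   */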
  private static class WarnOnlyAbortable implements Abortable {
    @Override
    public void abort(String why, Throwable e) {
      LOG.warn("ReplicationHFileCleaner received abort, ignoring.  Reason: " + why);
      LOG.debug(e.toString(), e);
    }

    @Override
    public boolean isAborted() {
      return false;
    }
  }
}