001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.master;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.Map;
023import java.util.Set;
024import org.apache.commons.io.IOUtils;
025import org.apache.hadoop.fs.FileStatus;
026import org.apache.hadoop.hbase.HBaseInterfaceAudience;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.Server;
029import org.apache.hadoop.hbase.client.Connection;
030import org.apache.hadoop.hbase.client.ConnectionFactory;
031import org.apache.hadoop.hbase.master.HMaster;
032import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
033import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
034import org.apache.hadoop.hbase.replication.ReplicationException;
035import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
036import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
042import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
043import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
044
045/**
046 * Implementation of a file cleaner that checks if a hfile is still scheduled for replication before
047 * deleting it from hfile archive directory.
048 */
049@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
050public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
051  private static final Logger LOG = LoggerFactory.getLogger(ReplicationHFileCleaner.class);
052  private Connection conn;
053  private boolean shareConn;
054  private ReplicationQueueStorage rqs;
055  private boolean stopped = false;
056
057  @Override
058  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
059    if (
060      !(getConf().getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
061        HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT))
062    ) {
063      LOG.warn(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY + " is not enabled. Better to remove "
064        + ReplicationHFileCleaner.class + " from " + HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS
065        + " configuration.");
066      return files;
067    }
068
069    final Set<String> hfileRefs;
070    try {
071      // The concurrently created new hfile entries in ZK may not be included in the return list,
072      // but they won't be deleted because they're not in the checking set.
073      hfileRefs = rqs.getAllHFileRefs();
074    } catch (ReplicationException e) {
075      LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable files");
076      return Collections.emptyList();
077    }
078    return Iterables.filter(files, new Predicate<FileStatus>() {
079      @Override
080      public boolean apply(FileStatus file) {
081        // just for overriding the findbugs NP warnings, as the parameter is marked as Nullable in
082        // the guava Predicate.
083        if (file == null) {
084          return false;
085        }
086        String hfile = file.getPath().getName();
087        boolean foundHFileRefInQueue = hfileRefs.contains(hfile);
088        if (LOG.isDebugEnabled()) {
089          if (foundHFileRefInQueue) {
090            LOG.debug("Found hfile reference in ZK, keeping: " + hfile);
091          } else {
092            LOG.debug("Did not find hfile reference in ZK, deleting: " + hfile);
093          }
094        }
095        return !foundHFileRefInQueue;
096      }
097    });
098  }
099
100  @Override
101  public void init(Map<String, Object> params) {
102    super.init(params);
103    try {
104      if (MapUtils.isNotEmpty(params)) {
105        Object master = params.get(HMaster.MASTER);
106        if (master != null && master instanceof Server) {
107          conn = ((Server) master).getConnection();
108          shareConn = true;
109        }
110      }
111      if (conn == null) {
112        conn = ConnectionFactory.createConnection(getConf());
113      }
114      this.rqs = ReplicationStorageFactory.getReplicationQueueStorage(conn, getConf());
115    } catch (IOException e) {
116      LOG.error("Error while configuring " + this.getClass().getName(), e);
117    }
118  }
119
120  @Override
121  public void stop(String why) {
122    if (this.stopped) {
123      return;
124    }
125    this.stopped = true;
126    if (!shareConn && this.conn != null) {
127      LOG.info("Stopping " + this.conn);
128      IOUtils.closeQuietly(conn);
129    }
130  }
131
132  @Override
133  public boolean isStopped() {
134    return this.stopped;
135  }
136
137  @Override
138  public boolean isFileDeletable(FileStatus fStat) {
139    Set<String> hfileRefsFromQueue;
140    // all members of this class are null if replication is disabled,
141    // so do not stop from deleting the file
142    if (getConf() == null) {
143      return true;
144    }
145
146    try {
147      hfileRefsFromQueue = rqs.getAllHFileRefs();
148    } catch (ReplicationException e) {
149      LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable "
150        + "file for " + fStat.getPath());
151      return false;
152    }
153    return !hfileRefsFromQueue.contains(fStat.getPath().getName());
154  }
155}