001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Map;
025import java.util.Set;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileStatus;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.Abortable;
030import org.apache.hadoop.hbase.HBaseInterfaceAudience;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
033import org.apache.hadoop.hbase.client.Connection;
034import org.apache.hadoop.hbase.client.ConnectionFactory;
035import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
042
043/**
044 * Implementation of a file cleaner that checks if an hfile is still referenced by backup before
045 * deleting it from hfile archive directory.
046 */
047@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
048public class BackupHFileCleaner extends BaseHFileCleanerDelegate implements Abortable {
049  private static final Logger LOG = LoggerFactory.getLogger(BackupHFileCleaner.class);
050  private boolean stopped = false;
051  private boolean aborted;
052  private Configuration conf;
053  private Connection connection;
054  private long prevReadFromBackupTbl = 0, // timestamp of most recent read from backup:system table
055      secondPrevReadFromBackupTbl = 0; // timestamp of 2nd most recent read from backup:system table
056  // used by unit test to skip reading backup:system
057  private boolean checkForFullyBackedUpTables = true;
058  private List<TableName> fullyBackedUpTables = null;
059
060  private Set<String> getFilenameFromBulkLoad(Map<byte[], List<Path>>[] maps) {
061    Set<String> filenames = new HashSet<>();
062    for (Map<byte[], List<Path>> map : maps) {
063      if (map == null) {
064        continue;
065      }
066
067      for (List<Path> paths : map.values()) {
068        for (Path p : paths) {
069          filenames.add(p.getName());
070        }
071      }
072    }
073    return filenames;
074  }
075
076  private Set<String> loadHFileRefs(List<TableName> tableList) throws IOException {
077    if (connection == null) {
078      connection = ConnectionFactory.createConnection(conf);
079    }
080    try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
081      Map<byte[], List<Path>>[] res = tbl.readBulkLoadedFiles(null, tableList);
082      secondPrevReadFromBackupTbl = prevReadFromBackupTbl;
083      prevReadFromBackupTbl = EnvironmentEdgeManager.currentTime();
084      return getFilenameFromBulkLoad(res);
085    }
086  }
087
088  @InterfaceAudience.Private
089  void setCheckForFullyBackedUpTables(boolean b) {
090    checkForFullyBackedUpTables = b;
091  }
092
093  @Override
094  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
095    if (conf == null) {
096      return files;
097    }
098    // obtain the Set of TableName's which have been fully backed up
099    // so that we filter BulkLoad to be returned from server
100    if (checkForFullyBackedUpTables) {
101      if (connection == null) {
102        return files;
103      }
104
105      try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
106        fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
107      } catch (IOException ioe) {
108        LOG.error("Failed to get tables which have been fully backed up, skipping checking", ioe);
109        return Collections.emptyList();
110      }
111      Collections.sort(fullyBackedUpTables);
112    }
113    final Set<String> hfileRefs;
114    try {
115      hfileRefs = loadHFileRefs(fullyBackedUpTables);
116    } catch (IOException ioe) {
117      LOG.error("Failed to read hfile references, skipping checking deletable files", ioe);
118      return Collections.emptyList();
119    }
120    Iterable<FileStatus> deletables = Iterables.filter(files, file -> {
121      // If the file is recent, be conservative and wait for one more scan of backup:system table
122      if (file.getModificationTime() > secondPrevReadFromBackupTbl) {
123        return false;
124      }
125      String hfile = file.getPath().getName();
126      boolean foundHFileRef = hfileRefs.contains(hfile);
127      return !foundHFileRef;
128    });
129    return deletables;
130  }
131
132  @Override
133  public boolean isFileDeletable(FileStatus fStat) {
134    // work is done in getDeletableFiles()
135    return true;
136  }
137
138  @Override
139  public void setConf(Configuration config) {
140    this.conf = config;
141    this.connection = null;
142    try {
143      this.connection = ConnectionFactory.createConnection(conf);
144    } catch (IOException ioe) {
145      LOG.error("Couldn't establish connection", ioe);
146    }
147  }
148
149  @Override
150  public void stop(String why) {
151    if (this.stopped) {
152      return;
153    }
154    if (this.connection != null) {
155      try {
156        this.connection.close();
157      } catch (IOException ioe) {
158        LOG.debug("Got " + ioe + " when closing connection");
159      }
160    }
161    this.stopped = true;
162  }
163
164  @Override
165  public boolean isStopped() {
166    return this.stopped;
167  }
168
169  @Override
170  public void abort(String why, Throwable e) {
171    LOG.warn("Aborting ReplicationHFileCleaner because " + why, e);
172    this.aborted = true;
173    stop(why);
174  }
175
176  @Override
177  public boolean isAborted() {
178    return this.aborted;
179  }
180}