View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.master;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.classification.InterfaceAudience;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.fs.FileStatus;
26  import org.apache.hadoop.hbase.Abortable;
27  import org.apache.hadoop.hbase.HConstants;
28  import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
29  import org.apache.hadoop.hbase.replication.ReplicationException;
30  import org.apache.hadoop.hbase.replication.ReplicationFactory;
31  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
32  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
33  import java.io.IOException;
34  import java.util.List;
35  import java.util.Set;
36  
37  import com.google.common.base.Predicate;
38  import com.google.common.collect.ImmutableSet;
39  import com.google.common.collect.Iterables;
40  import com.google.common.collect.Sets;
41  
42  /**
43   * Implementation of a log cleaner that checks if a log is still scheduled for
44   * replication before deleting it when its TTL is over.
45   */
46  @InterfaceAudience.Private
47  public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abortable {
48    private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
49    private ZooKeeperWatcher zkw;
50    private ReplicationQueuesClient replicationQueues;
51    private boolean stopped = false;
52    private boolean aborted;
53  
54  
55    @Override
56    public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
57     // all members of this class are null if replication is disabled,
58     // so we cannot filter the files
59      if (this.getConf() == null) {
60        return files;
61      }
62  
63      final Set<String> hlogs = loadHLogsFromQueues();
64      return Iterables.filter(files, new Predicate<FileStatus>() {
65        @Override
66        public boolean apply(FileStatus file) {
67          String hlog = file.getPath().getName();
68          boolean logInReplicationQueue = hlogs.contains(hlog);
69          if (LOG.isDebugEnabled()) {
70            if (logInReplicationQueue) {
71              LOG.debug("Found log in ZK, keeping: " + hlog);
72            } else {
73              LOG.debug("Didn't find this log in ZK, deleting: " + hlog);
74            }
75          }
76         return !logInReplicationQueue;
77        }});
78    }
79  
80    /**
81     * Load all hlogs in all replication queues from ZK
82     */
83    private Set<String> loadHLogsFromQueues() {
84      List<String> rss = replicationQueues.getListOfReplicators();
85      if (rss == null) {
86        LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
87        return ImmutableSet.of();
88      }
89      Set<String> hlogs = Sets.newHashSet();
90      for (String rs: rss) {
91        List<String> listOfPeers = replicationQueues.getAllQueues(rs);
92        // if rs just died, this will be null
93        if (listOfPeers == null) {
94          continue;
95        }
96        for (String id : listOfPeers) {
97          List<String> peersHlogs = replicationQueues.getLogsInQueue(rs, id);
98          if (peersHlogs != null) {
99            hlogs.addAll(peersHlogs);
100         }
101       }
102     }
103     return hlogs;
104   }
105 
106   @Override
107   public void setConf(Configuration config) {
108     // If replication is disabled, keep all members null
109     if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
110         HConstants.REPLICATION_ENABLE_DEFAULT)) {
111       LOG.warn("Not configured - allowing all hlogs to be deleted");
112       return;
113     }
114     // Make my own Configuration.  Then I'll have my own connection to zk that
115     // I can close myself when comes time.
116     Configuration conf = new Configuration(config);
117     super.setConf(conf);
118     try {
119       this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
120       this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this);
121       this.replicationQueues.init();
122     } catch (ReplicationException e) {
123       LOG.error("Error while configuring " + this.getClass().getName(), e);
124     } catch (IOException e) {
125       LOG.error("Error while configuring " + this.getClass().getName(), e);
126     }
127   }
128 
129   @Override
130   public void stop(String why) {
131     if (this.stopped) return;
132     this.stopped = true;
133     if (this.zkw != null) {
134       LOG.info("Stopping " + this.zkw);
135       this.zkw.close();
136     }
137   }
138 
139   @Override
140   public boolean isStopped() {
141     return this.stopped;
142   }
143 
144   @Override
145   public void abort(String why, Throwable e) {
146     LOG.warn("Aborting ReplicationLogCleaner because " + why, e);
147     this.aborted = true;
148     stop(why);
149   }
150 
151   @Override
152   public boolean isAborted() {
153     return this.aborted;
154   }
155 }