View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.master;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.hbase.classification.InterfaceAudience;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.fs.FileStatus;
27  import org.apache.hadoop.hbase.Abortable;
28  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
31  import org.apache.hadoop.hbase.replication.ReplicationException;
32  import org.apache.hadoop.hbase.replication.ReplicationFactory;
33  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
34  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
35  import java.io.IOException;
36  import java.util.Collections;
37  import java.util.List;
38  import java.util.Set;
39  
40  import com.google.common.base.Predicate;
41  import com.google.common.collect.ImmutableSet;
42  import com.google.common.collect.Iterables;
43  import com.google.common.collect.Sets;
44  import org.apache.zookeeper.KeeperException;
45  
46  /**
47   * Implementation of a log cleaner that checks if a log is still scheduled for
48   * replication before deleting it when its TTL is over.
49   */
50  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
51  public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
52    private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
53    private ZooKeeperWatcher zkw;
54    private ReplicationQueuesClient replicationQueues;
55    private boolean stopped = false;
56  
57  
58    @Override
59    public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
60     // all members of this class are null if replication is disabled,
61     // so we cannot filter the files
62      if (this.getConf() == null) {
63        return files;
64      }
65  
66      final Set<String> wals;
67      try {
68        // The concurrently created new WALs may not be included in the return list,
69        // but they won't be deleted because they're not in the checking set.
70        wals = loadWALsFromQueues();
71      } catch (KeeperException e) {
72        LOG.warn("Failed to read zookeeper, skipping checking deletable files");
73        return Collections.emptyList();
74      }
75      return Iterables.filter(files, new Predicate<FileStatus>() {
76        @Override
77        public boolean apply(FileStatus file) {
78          String wal = file.getPath().getName();
79          boolean logInReplicationQueue = wals.contains(wal);
80          if (LOG.isDebugEnabled()) {
81            if (logInReplicationQueue) {
82              LOG.debug("Found log in ZK, keeping: " + wal);
83            } else {
84              LOG.debug("Didn't find this log in ZK, deleting: " + wal);
85            }
86          }
87         return !logInReplicationQueue;
88        }});
89    }
90  
91    /**
92     * Load all wals in all replication queues from ZK. This method guarantees to return a
93     * snapshot which contains all WALs in the zookeeper at the start of this call even there
94     * is concurrent queue failover. However, some newly created WALs during the call may
95     * not be included.
96     */
97    private Set<String> loadWALsFromQueues() throws KeeperException {
98      for (int retry = 0; ; retry++) {
99        int v0 = replicationQueues.getQueuesZNodeCversion();
100       List<String> rss = replicationQueues.getListOfReplicators();
101       if (rss == null || rss.isEmpty()) {
102         LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
103         return ImmutableSet.of();
104       }
105       Set<String> wals = Sets.newHashSet();
106       for (String rs : rss) {
107         List<String> listOfPeers = replicationQueues.getAllQueues(rs);
108         // if rs just died, this will be null
109         if (listOfPeers == null) {
110           continue;
111         }
112         for (String id : listOfPeers) {
113           List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
114           if (peersWals != null) {
115             wals.addAll(peersWals);
116           }
117         }
118       }
119       int v1 = replicationQueues.getQueuesZNodeCversion();
120       if (v0 == v1) {
121         return wals;
122       }
123       LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
124           v0, v1, retry));
125     }
126   }
127 
128   @Override
129   public void setConf(Configuration config) {
130     // If replication is disabled, keep all members null
131     if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
132         HConstants.REPLICATION_ENABLE_DEFAULT)) {
133       LOG.warn("Not configured - allowing all wals to be deleted");
134       return;
135     }
136     // Make my own Configuration.  Then I'll have my own connection to zk that
137     // I can close myself when comes time.
138     Configuration conf = new Configuration(config);
139     try {
140       setConf(conf, new ZooKeeperWatcher(conf, "replicationLogCleaner", null));
141     } catch (IOException e) {
142       LOG.error("Error while configuring " + this.getClass().getName(), e);
143     }
144   }
145 
146   @VisibleForTesting
147   public void setConf(Configuration conf, ZooKeeperWatcher zk) {
148     super.setConf(conf);
149     try {
150       this.zkw = zk;
151       this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf,
152           new WarnOnlyAbortable());
153       this.replicationQueues.init();
154     } catch (ReplicationException e) {
155       LOG.error("Error while configuring " + this.getClass().getName(), e);
156     }
157   }
158 
159   @VisibleForTesting
160   public void setConf(Configuration conf, ZooKeeperWatcher zk, 
161       ReplicationQueuesClient replicationQueuesClient) {
162     super.setConf(conf);
163     this.zkw = zk;
164     this.replicationQueues = replicationQueuesClient;
165   }
166 
167   @Override
168   public void stop(String why) {
169     if (this.stopped) return;
170     this.stopped = true;
171     if (this.zkw != null) {
172       LOG.info("Stopping " + this.zkw);
173       this.zkw.close();
174     }
175   }
176 
177   @Override
178   public boolean isStopped() {
179     return this.stopped;
180   }
181 
182   public static class WarnOnlyAbortable implements Abortable {
183 
184     @Override
185     public void abort(String why, Throwable e) {
186       LOG.warn("ReplicationLogCleaner received abort, ignoring.  Reason: " + why);
187       if (LOG.isDebugEnabled()) {
188         LOG.debug(e);
189       }
190     }
191 
192     @Override
193     public boolean isAborted() {
194       return false;
195     }
196   }
197 }