View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.master;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.hbase.classification.InterfaceAudience;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.fs.FileStatus;
27  import org.apache.hadoop.hbase.Abortable;
28  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
31  import org.apache.hadoop.hbase.replication.ReplicationException;
32  import org.apache.hadoop.hbase.replication.ReplicationFactory;
33  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
34  import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
35  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
36  import java.io.IOException;
37  import java.util.Collections;
38  import java.util.List;
39  import java.util.Set;
40
41  import com.google.common.base.Predicate;
42  import com.google.common.collect.ImmutableSet;
43  import com.google.common.collect.Iterables;
44  import com.google.common.collect.Sets;
45  import org.apache.zookeeper.KeeperException;
46
47  /**
48   * Implementation of a log cleaner that checks if a log is still scheduled for
49   * replication before deleting it when its TTL is over.
50   */
51  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
52  public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
53    private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
54    private ZooKeeperWatcher zkw;
55    private ReplicationQueuesClient replicationQueues;
56    private boolean stopped = false;
57  
58
59    @Override
60    public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
61     // all members of this class are null if replication is disabled,
62     // so we cannot filter the files
63      if (this.getConf() == null) {
64        return files;
65      }
66
67      final Set<String> wals;
68      try {
69        // The concurrently created new WALs may not be included in the return list,
70        // but they won't be deleted because they're not in the checking set.
71        wals = replicationQueues.getAllWALs();
72      } catch (KeeperException e) {
73        LOG.warn("Failed to read zookeeper, skipping checking deletable files");
74        return Collections.emptyList();
75      }
76      return Iterables.filter(files, new Predicate<FileStatus>() {
77        @Override
78        public boolean apply(FileStatus file) {
79          String wal = file.getPath().getName();
80          boolean logInReplicationQueue = wals.contains(wal);
81          if (LOG.isDebugEnabled()) {
82            if (logInReplicationQueue) {
83              LOG.debug("Found log in ZK, keeping: " + wal);
84            } else {
85              LOG.debug("Didn't find this log in ZK, deleting: " + wal);
86            }
87          }
88         return !logInReplicationQueue;
89        }});
90    }
91
92    @Override
93    public void setConf(Configuration config) {
94      // If replication is disabled, keep all members null
95      if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
96          HConstants.REPLICATION_ENABLE_DEFAULT)) {
97        LOG.warn("Not configured - allowing all wals to be deleted");
98        return;
99      }
100     // Make my own Configuration.  Then I'll have my own connection to zk that
101     // I can close myself when comes time.
102     Configuration conf = new Configuration(config);
103     try {
104       setConf(conf, new ZooKeeperWatcher(conf, "replicationLogCleaner", null));
105     } catch (IOException e) {
106       LOG.error("Error while configuring " + this.getClass().getName(), e);
107     }
108   }
109
110   @VisibleForTesting
111   public void setConf(Configuration conf, ZooKeeperWatcher zk) {
112     super.setConf(conf);
113     try {
114       this.zkw = zk;
115       this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(
116           new ReplicationQueuesClientArguments(conf, new WarnOnlyAbortable(), zkw));
117       this.replicationQueues.init();
118     } catch (Exception e) {
119       LOG.error("Error while configuring " + this.getClass().getName(), e);
120     }
121   }
122
123   @Override
124   public void stop(String why) {
125     if (this.stopped) return;
126     this.stopped = true;
127     if (this.zkw != null) {
128       LOG.info("Stopping " + this.zkw);
129       this.zkw.close();
130     }
131   }
132
133   @Override
134   public boolean isStopped() {
135     return this.stopped;
136   }
137
138   private static class WarnOnlyAbortable implements Abortable {
139
140     @Override
141     public void abort(String why, Throwable e) {
142       LOG.warn("ReplicationLogCleaner received abort, ignoring.  Reason: " + why);
143       if (LOG.isDebugEnabled()) {
144         LOG.debug(e);
145       }
146     }
147
148     @Override
149     public boolean isAborted() {
150       return false;
151     }
152   }
153 }