001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.Map.Entry;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.Abortable;
031import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.apache.hadoop.hbase.replication.ReplicationFactory;
034import org.apache.hadoop.hbase.replication.ReplicationPeers;
035import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
036import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
037import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
038import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
039import org.apache.hadoop.hbase.zookeeper.ZKUtil;
040import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
041import org.apache.zookeeper.KeeperException;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * Used to clean the replication queues belonging to the peer which does not exist.
047 */
048@InterfaceAudience.Private
049public class ReplicationZKNodeCleaner {
050  private static final Logger LOG = LoggerFactory.getLogger(ReplicationZKNodeCleaner.class);
051  private final ZKWatcher zkw;
052  private final ReplicationQueuesClient queuesClient;
053  private final ReplicationPeers replicationPeers;
054  private final ReplicationQueueDeletor queueDeletor;
055
056  public ReplicationZKNodeCleaner(Configuration conf, ZKWatcher zkw, Abortable abortable)
057      throws IOException {
058    try {
059      this.zkw = zkw;
060      this.queuesClient = ReplicationFactory
061          .getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw));
062      this.queuesClient.init();
063      this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient,
064        abortable);
065      this.replicationPeers.init();
066      this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
067    } catch (Exception e) {
068      throw new IOException("failed to construct ReplicationZKNodeCleaner", e);
069    }
070  }
071
072  /**
073   * @return undeletedQueues replicator with its queueIds for removed peers
074   * @throws IOException
075   */
076  public Map<String, List<String>> getUnDeletedQueues() throws IOException {
077    Map<String, List<String>> undeletedQueues = new HashMap<>();
078    Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
079    try {
080      List<String> replicators = this.queuesClient.getListOfReplicators();
081      if (replicators == null || replicators.isEmpty()) {
082        return undeletedQueues;
083      }
084      for (String replicator : replicators) {
085        List<String> queueIds = this.queuesClient.getAllQueues(replicator);
086        for (String queueId : queueIds) {
087          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
088          if (!peerIds.contains(queueInfo.getPeerId())) {
089            undeletedQueues.computeIfAbsent(replicator, (key) -> new ArrayList<>()).add(
090              queueId);
091            if (LOG.isDebugEnabled()) {
092              LOG.debug("Undeleted replication queue for removed peer found: "
093                  + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
094                    queueInfo.getPeerId(), replicator, queueId));
095            }
096          }
097        }
098      }
099    } catch (KeeperException ke) {
100      throw new IOException("Failed to get the replication queues of all replicators", ke);
101    }
102    return undeletedQueues;
103  }
104
105  /**
106   * @return undeletedHFileRefsQueue replicator with its undeleted queueIds for removed peers in
107   *         hfile-refs queue
108   * @throws IOException
109   */
110  public Set<String> getUnDeletedHFileRefsQueues() throws IOException {
111    Set<String> undeletedHFileRefsQueue = new HashSet<>();
112    Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
113    String hfileRefsZNode = queueDeletor.getHfileRefsZNode();
114    try {
115      if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
116        return null;
117      }
118      List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue();
119      Set<String> peers = new HashSet<>(listOfPeers);
120      peers.removeAll(peerIds);
121      if (!peers.isEmpty()) {
122        undeletedHFileRefsQueue.addAll(peers);
123      }
124    } catch (KeeperException e) {
125      throw new IOException("Failed to get list of all peers from hfile-refs znode "
126          + hfileRefsZNode, e);
127    }
128    return undeletedHFileRefsQueue;
129  }
130
131  private class ReplicationQueueDeletor extends ReplicationStateZKBase {
132
133    public ReplicationQueueDeletor(ZKWatcher zk, Configuration conf, Abortable abortable) {
134      super(zk, conf, abortable);
135    }
136
137    /**
138     * @param replicator The regionserver which has undeleted queue
139     * @param queueId The undeleted queue id
140     * @throws IOException
141     */
142    public void removeQueue(final String replicator, final String queueId) throws IOException {
143      String queueZnodePath =
144        ZNodePaths.joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator), queueId);
145      try {
146        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
147        if (!replicationPeers.getAllPeerIds().contains(queueInfo.getPeerId())) {
148          ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
149          LOG.info("Successfully removed replication queue, replicator: " + replicator
150              + ", queueId: " + queueId);
151        }
152      } catch (KeeperException e) {
153        throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: "
154            + queueId);
155      }
156    }
157
158    /**
159     * @param hfileRefsQueueId The undeleted hfile-refs queue id
160     * @throws IOException
161     */
162    public void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException {
163      String node = ZNodePaths.joinZNode(this.hfileRefsZNode, hfileRefsQueueId);
164      try {
165        if (!replicationPeers.getAllPeerIds().contains(hfileRefsQueueId)) {
166          ZKUtil.deleteNodeRecursively(this.zookeeper, node);
167          LOG.info("Successfully removed hfile-refs queue " + hfileRefsQueueId + " from path "
168              + hfileRefsZNode);
169        }
170      } catch (KeeperException e) {
171        throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
172            + " from path " + hfileRefsZNode);
173      }
174    }
175
176    String getHfileRefsZNode() {
177      return this.hfileRefsZNode;
178    }
179  }
180
181  /**
182   * Remove the undeleted replication queue's zk node for removed peers.
183   * @param undeletedQueues replicator with its queueIds for removed peers
184   * @throws IOException
185   */
186  public void removeQueues(final Map<String, List<String>> undeletedQueues) throws IOException {
187    for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueues.entrySet()) {
188      String replicator = replicatorAndQueueIds.getKey();
189      for (String queueId : replicatorAndQueueIds.getValue()) {
190        queueDeletor.removeQueue(replicator, queueId);
191      }
192    }
193  }
194
195  /**
196   * Remove the undeleted hfile-refs queue's zk node for removed peers.
197   * @param undeletedHFileRefsQueues replicator with its undeleted queueIds for removed peers in
198   *          hfile-refs queue
199   * @throws IOException
200   */
201  public void removeHFileRefsQueues(final Set<String> undeletedHFileRefsQueues) throws IOException {
202    for (String hfileRefsQueueId : undeletedHFileRefsQueues) {
203      queueDeletor.removeHFileRefsQueue(hfileRefsQueueId);
204    }
205  }
206}