/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Used to clean the replication queues belonging to the peer which does not exist.
 * <p>
 * Two kinds of leftovers are handled: regular replication queues (grouped per replicator) and
 * hfile-refs queues (one per peer id). For each kind there is a "find" method that only reads
 * state and a "remove" method that deletes the corresponding znodes.
 */
@InterfaceAudience.Private
public class ReplicationZKNodeCleaner {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationZKNodeCleaner.class);
  private final ZKWatcher zkw;
  private final ReplicationQueuesClient queuesClient;
  private final ReplicationPeers replicationPeers;
  private final ReplicationQueueDeletor queueDeletor;

  /**
   * Creates and initializes the queues client, the peers tracker and the znode deletor.
   * @param conf configuration handed to the replication factories and the deletor
   * @param zkw ZooKeeper watcher used for all znode reads and deletes
   * @param abortable handed to the replication components created here
   * @throws IOException if any of the replication components fails to be created or initialized;
   *           the original failure is attached as the cause
   */
  public ReplicationZKNodeCleaner(Configuration conf, ZKWatcher zkw, Abortable abortable)
      throws IOException {
    try {
      this.zkw = zkw;
      this.queuesClient = ReplicationFactory
          .getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw));
      this.queuesClient.init();
      this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient,
        abortable);
      this.replicationPeers.init();
      this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
    } catch (Exception e) {
      throw new IOException("failed to construct ReplicationZKNodeCleaner", e);
    }
  }

  /**
   * Finds the replication queues whose peer id is not among the currently known peer ids.
   * @return undeletedQueues replicator with its queueIds for removed peers; never {@code null},
   *         empty when nothing is left over
   * @throws IOException if the replicators or their queues cannot be read from ZooKeeper
   */
  public Map<String, List<String>> getUnDeletedQueues() throws IOException {
    Map<String, List<String>> undeletedQueues = new HashMap<>();
    // Snapshot the peer ids once so that every queue is judged against the same set.
    Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
    try {
      List<String> replicators = this.queuesClient.getListOfReplicators();
      if (replicators == null || replicators.isEmpty()) {
        return undeletedQueues;
      }
      for (String replicator : replicators) {
        List<String> queueIds = this.queuesClient.getAllQueues(replicator);
        if (queueIds == null) {
          // NOTE(review): presumably null means the replicator's znode vanished between the two
          // reads -- confirm against the client implementation. Nothing to report either way.
          continue;
        }
        for (String queueId : queueIds) {
          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
          if (!peerIds.contains(queueInfo.getPeerId())) {
            undeletedQueues.computeIfAbsent(replicator, key -> new ArrayList<>()).add(queueId);
            LOG.debug("Undeleted replication queue for removed peer found: "
                + "[removedPeerId={}, replicator={}, queueId={}]",
              queueInfo.getPeerId(), replicator, queueId);
          }
        }
      }
    } catch (KeeperException ke) {
      throw new IOException("Failed to get the replication queues of all replicators", ke);
    }
    return undeletedQueues;
  }

  /**
   * Finds the peer ids that still own an hfile-refs queue although the peer is not among the
   * currently known peer ids.
   * @return undeletedHFileRefsQueue the undeleted hfile-refs queue ids for removed peers; never
   *         {@code null}, empty when the hfile-refs znode does not exist or nothing is left over
   * @throws IOException if the hfile-refs znode cannot be read from ZooKeeper
   */
  public Set<String> getUnDeletedHFileRefsQueues() throws IOException {
    Set<String> undeletedHFileRefsQueue = new HashSet<>();
    Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
    String hfileRefsZNode = queueDeletor.getHfileRefsZNode();
    try {
      if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
        // No hfile-refs znode at all: report "nothing to clean" as an empty set rather than
        // null, so the result can be iterated or handed to removeHFileRefsQueues directly.
        return undeletedHFileRefsQueue;
      }
      List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue();
      if (listOfPeers != null) {
        undeletedHFileRefsQueue.addAll(listOfPeers);
        undeletedHFileRefsQueue.removeAll(peerIds);
      }
    } catch (KeeperException e) {
      throw new IOException("Failed to get list of all peers from hfile-refs znode "
          + hfileRefsZNode, e);
    }
    return undeletedHFileRefsQueue;
  }

  /**
   * Deletes queue znodes. It is an inner (non-static) class on purpose: it re-reads the enclosing
   * cleaner's {@code replicationPeers} right before each delete.
   */
  private class ReplicationQueueDeletor extends ReplicationStateZKBase {

    public ReplicationQueueDeletor(ZKWatcher zk, Configuration conf, Abortable abortable) {
      super(zk, conf, abortable);
    }

    /**
     * Deletes the znode of one replication queue, but only if the queue's peer id is still not
     * among the known peer ids at the time of the call.
     * @param replicator The regionserver which has undeleted queue
     * @param queueId The undeleted queue id
     * @throws IOException if the znode cannot be deleted; the ZooKeeper failure is the cause
     */
    public void removeQueue(final String replicator, final String queueId) throws IOException {
      String queueZnodePath =
          ZNodePaths.joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator), queueId);
      try {
        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
        // Re-check against the current peer ids instead of trusting the caller's earlier scan.
        if (!replicationPeers.getAllPeerIds().contains(queueInfo.getPeerId())) {
          ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
          LOG.info("Successfully removed replication queue, replicator: {}, queueId: {}",
            replicator, queueId);
        }
      } catch (KeeperException e) {
        throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: "
            + queueId, e);
      }
    }

    /**
     * Deletes the znode of one hfile-refs queue, but only if its id is still not among the known
     * peer ids at the time of the call.
     * @param hfileRefsQueueId The undeleted hfile-refs queue id
     * @throws IOException if the znode cannot be deleted; the ZooKeeper failure is the cause
     */
    public void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException {
      String node = ZNodePaths.joinZNode(this.hfileRefsZNode, hfileRefsQueueId);
      try {
        if (!replicationPeers.getAllPeerIds().contains(hfileRefsQueueId)) {
          ZKUtil.deleteNodeRecursively(this.zookeeper, node);
          LOG.info("Successfully removed hfile-refs queue {} from path {}", hfileRefsQueueId,
            hfileRefsZNode);
        }
      } catch (KeeperException e) {
        throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
            + " from path " + hfileRefsZNode, e);
      }
    }

    /** @return the hfile-refs znode path held by the base class */
    String getHfileRefsZNode() {
      return this.hfileRefsZNode;
    }
  }

  /**
   * Remove the undeleted replication queue's zk node for removed peers.
   * @param undeletedQueues replicator with its queueIds for removed peers
   * @throws IOException if any queue znode cannot be deleted; processing stops at the first
   *           failure
   */
  public void removeQueues(final Map<String, List<String>> undeletedQueues) throws IOException {
    for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueues.entrySet()) {
      String replicator = replicatorAndQueueIds.getKey();
      for (String queueId : replicatorAndQueueIds.getValue()) {
        queueDeletor.removeQueue(replicator, queueId);
      }
    }
  }

  /**
   * Remove the undeleted hfile-refs queue's zk node for removed peers.
   * @param undeletedHFileRefsQueues the undeleted hfile-refs queue ids for removed peers;
   *          {@code null} is treated like an empty set
   * @throws IOException if any hfile-refs znode cannot be deleted; processing stops at the first
   *           failure
   */
  public void removeHFileRefsQueues(final Set<String> undeletedHFileRefsQueues) throws IOException {
    if (undeletedHFileRefsQueues == null) {
      // Tolerate callers that still pass through a null "no hfile-refs znode" result.
      return;
    }
    for (String hfileRefsQueueId : undeletedHFileRefsQueues) {
      queueDeletor.removeHFileRefsQueue(hfileRefsQueueId);
    }
  }
}