/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ReplicationQueuesClient} implementation that reads replication queue and hfile
 * reference state from ZooKeeper through {@link ZKUtil}.
 * <p>
 * Every read in this class is issued without setting a watch. Except for {@link #init()},
 * which wraps failures in a {@link ReplicationException}, a {@link KeeperException} raised by
 * a read performed directly in this class is first reported to the configured
 * {@link Abortable} and then rethrown to the caller.
 */
@InterfaceAudience.Private
public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
    ReplicationQueuesClient {

  // One shared, immutable logger for the class instead of a mutable per-instance field.
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueuesClientZKImpl.class);

  /**
   * Creates a client from a bundled argument object.
   *
   * @param args supplies the ZooKeeper watcher, configuration and abortable
   */
  public ReplicationQueuesClientZKImpl(ReplicationQueuesClientArguments args) {
    this(args.getZk(), args.getConf(), args.getAbortable());
  }

  /**
   * Creates a client from its individual collaborators; all three are handed to the
   * superclass constructor unchanged.
   *
   * @param zk ZooKeeper watcher used for all reads
   * @param conf configuration passed to the base class
   * @param abortable notified when a ZooKeeper read fails
   */
  public ReplicationQueuesClientZKImpl(final ZKWatcher zk, Configuration conf,
      Abortable abortable) {
    super(zk, conf, abortable);
  }

  /**
   * Ensures the queues znode is present, creating it together with any missing parents when
   * the existence check returns a negative value.
   *
   * @throws ReplicationException wrapping any {@link KeeperException} raised by ZooKeeper
   */
  @Override
  public void init() throws ReplicationException {
    try {
      // NOTE(review): a negative result is presumably ZKUtil's "node does not exist" marker.
      if (ZKUtil.checkExists(this.zookeeper, this.queuesZNode) < 0) {
        ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Internal error while initializing a queues client", e);
    }
  }

  /**
   * Returns the list of replicators by delegating to the base class ZooKeeper lookup.
   *
   * @return whatever {@code getListOfReplicatorsZK()} returns; callers in this class treat
   *         {@code null} and empty alike
   * @throws KeeperException if the underlying lookup fails
   */
  @Override
  public List<String> getListOfReplicators() throws KeeperException {
    return super.getListOfReplicatorsZK();
  }

  /**
   * Lists the children of the znode {@code <queuesZNode>/<serverName>/<queueId>}.
   *
   * @param serverName name of the server owning the queue
   * @param queueId identifier of the queue under that server
   * @return the child names; callers in this class guard against a {@code null} result
   * @throws KeeperException if the listing fails; the abortable is notified before rethrowing
   */
  @Override
  public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {
    String znode = ZNodePaths.joinZNode(this.queuesZNode, serverName);
    znode = ZNodePaths.joinZNode(znode, queueId);
    try {
      return ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get list of wals for queueId=" + queueId
          + " and serverName=" + serverName, e);
      throw e;
    }
  }

  /**
   * Lists the children of the znode {@code <queuesZNode>/<serverName>}.
   *
   * @param serverName name of the server whose queues are listed
   * @return the child names; {@link #getAllWALs()} treats {@code null} as "server just died"
   * @throws KeeperException if the listing fails; the abortable is notified before rethrowing
   */
  @Override
  public List<String> getAllQueues(String serverName) throws KeeperException {
    String znode = ZNodePaths.joinZNode(this.queuesZNode, serverName);
    try {
      return ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e);
      throw e;
    }
  }

  /**
   * Load all wals in all replication queues from ZK. This method guarantees to return a
   * snapshot which contains all WALs in the zookeeper at the start of this call even if there
   * is a concurrent queue failover. However, some newly created WALs during the call may
   * not be included.
   * <p>
   * The scan is bracketed by two reads of the queues znode cversion. If the two values differ
   * the collected set is discarded and the scan is repeated; there is no upper bound on the
   * number of retries.
   *
   * @return the collected WAL names, or an empty immutable set when no replicator is found
   * @throws KeeperException if any ZooKeeper read fails
   */
  @Override
  public Set<String> getAllWALs() throws KeeperException {
    for (int retry = 0; ; retry++) {
      int v0 = getQueuesZNodeCversion();
      List<String> rss = getListOfReplicators();
      if (rss == null || rss.isEmpty()) {
        LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
        return ImmutableSet.of();
      }
      Set<String> wals = Sets.newHashSet();
      for (String rs : rss) {
        List<String> listOfPeers = getAllQueues(rs);
        // if rs just died, this will be null
        if (listOfPeers == null) {
          continue;
        }
        for (String id : listOfPeers) {
          List<String> peersWals = getLogsInQueue(rs, id);
          if (peersWals != null) {
            wals.addAll(peersWals);
          }
        }
      }
      int v1 = getQueuesZNodeCversion();
      if (v0 == v1) {
        // cversion unchanged across the scan, so the collected set is returned as the snapshot.
        return wals;
      }
      // Parameterized logging: no formatting work is done when INFO is disabled.
      LOG.info("Replication queue node cversion changed from {} to {}, retry = {}",
          v0, v1, retry);
    }
  }

  /**
   * Reads the cversion recorded in the {@link Stat} of the queues znode.
   *
   * @return the cversion of the queues znode
   * @throws KeeperException if the read fails; the abortable is notified before rethrowing
   */
  public int getQueuesZNodeCversion() throws KeeperException {
    try {
      Stat stat = new Stat();
      // The returned data is ignored; the call is made only to populate the Stat.
      ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
      return stat.getCversion();
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get stat of replication rs node", e);
      throw e;
    }
  }

  /**
   * Reads the cversion recorded in the {@link Stat} of the hfile references znode.
   *
   * @return the cversion of the hfile references znode
   * @throws KeeperException if the read fails; the abortable is notified before rethrowing
   */
  @Override
  public int getHFileRefsNodeChangeVersion() throws KeeperException {
    try {
      Stat stat = new Stat();
      // The returned data is ignored; the call is made only to populate the Stat.
      ZKUtil.getDataNoWatch(this.zookeeper, this.hfileRefsZNode, stat);
      return stat.getCversion();
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get stat of replication hfile references node.", e);
      throw e;
    }
  }

  /**
   * Lists the children of the hfile references znode.
   *
   * @return the child names as returned by ZooKeeper
   * @throws KeeperException if the listing fails; the abortable is notified before rethrowing
   */
  @Override
  public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException {
    try {
      return ZKUtil.listChildrenNoWatch(this.zookeeper, this.hfileRefsZNode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get list of all peers in hfile references node.", e);
      throw e;
    }
  }

  /**
   * Lists the children of the znode {@code <hfileRefsZNode>/<peerId>}.
   *
   * @param peerId identifier of the peer whose hfile references are listed
   * @return the child names as returned by ZooKeeper
   * @throws KeeperException if the listing fails; the abortable is notified before rethrowing
   */
  @Override
  public List<String> getReplicableHFiles(String peerId) throws KeeperException {
    String znode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
    try {
      return ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get list of hfile references for peerId=" + peerId, e);
      throw e;
    }
  }
}