001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.replication;
020
021import java.util.List;
022import java.util.Set;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.Abortable;
026import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
027import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
028import org.apache.hadoop.hbase.zookeeper.ZKUtil;
029import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
030import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.apache.zookeeper.KeeperException;
033import org.apache.zookeeper.data.Stat;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037@InterfaceAudience.Private
038public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
039    ReplicationQueuesClient {
040
041  Logger LOG = LoggerFactory.getLogger(ReplicationQueuesClientZKImpl.class);
042
043  public ReplicationQueuesClientZKImpl(ReplicationQueuesClientArguments args) {
044    this(args.getZk(), args.getConf(), args.getAbortable());
045  }
046
047  public ReplicationQueuesClientZKImpl(final ZKWatcher zk, Configuration conf,
048                                       Abortable abortable) {
049    super(zk, conf, abortable);
050  }
051
052  @Override
053  public void init() throws ReplicationException {
054    try {
055      if (ZKUtil.checkExists(this.zookeeper, this.queuesZNode) < 0) {
056        ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
057      }
058    } catch (KeeperException e) {
059      throw new ReplicationException("Internal error while initializing a queues client", e);
060    }
061  }
062  
063  @Override
064  public List<String> getListOfReplicators() throws KeeperException {
065    return super.getListOfReplicatorsZK();
066  }
067
068  @Override
069  public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {
070    String znode = ZNodePaths.joinZNode(this.queuesZNode, serverName);
071    znode = ZNodePaths.joinZNode(znode, queueId);
072    List<String> result = null;
073    try {
074      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
075    } catch (KeeperException e) {
076      this.abortable.abort("Failed to get list of wals for queueId=" + queueId
077          + " and serverName=" + serverName, e);
078      throw e;
079    }
080    return result;
081  }
082
083  @Override
084  public List<String> getAllQueues(String serverName) throws KeeperException {
085    String znode = ZNodePaths.joinZNode(this.queuesZNode, serverName);
086    List<String> result = null;
087    try {
088      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
089    } catch (KeeperException e) {
090      this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e);
091      throw e;
092    }
093    return result;
094  }
095
096  @Override
097  public Set<String> getAllWALs() throws KeeperException {
098    /**
099     * Load all wals in all replication queues from ZK. This method guarantees to return a
100     * snapshot which contains all WALs in the zookeeper at the start of this call even there
101     * is concurrent queue failover. However, some newly created WALs during the call may
102     * not be included.
103     */
104    for (int retry = 0; ; retry++) {
105      int v0 = getQueuesZNodeCversion();
106      List<String> rss = getListOfReplicators();
107      if (rss == null || rss.isEmpty()) {
108        LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
109        return ImmutableSet.of();
110      }
111      Set<String> wals = Sets.newHashSet();
112      for (String rs : rss) {
113        List<String> listOfPeers = getAllQueues(rs);
114        // if rs just died, this will be null
115        if (listOfPeers == null) {
116          continue;
117        }
118        for (String id : listOfPeers) {
119          List<String> peersWals = getLogsInQueue(rs, id);
120          if (peersWals != null) {
121            wals.addAll(peersWals);
122          }
123        }
124      }
125      int v1 = getQueuesZNodeCversion();
126      if (v0 == v1) {
127        return wals;
128      }
129      LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
130        v0, v1, retry));
131    }
132  }
133
134  public int getQueuesZNodeCversion() throws KeeperException {
135    try {
136      Stat stat = new Stat();
137      ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
138      return stat.getCversion();
139    } catch (KeeperException e) {
140      this.abortable.abort("Failed to get stat of replication rs node", e);
141      throw e;
142    }
143  }
144
145  @Override
146  public int getHFileRefsNodeChangeVersion() throws KeeperException {
147    Stat stat = new Stat();
148    try {
149      ZKUtil.getDataNoWatch(this.zookeeper, this.hfileRefsZNode, stat);
150    } catch (KeeperException e) {
151      this.abortable.abort("Failed to get stat of replication hfile references node.", e);
152      throw e;
153    }
154    return stat.getCversion();
155  }
156
157  @Override
158  public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException {
159    List<String> result = null;
160    try {
161      result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.hfileRefsZNode);
162    } catch (KeeperException e) {
163      this.abortable.abort("Failed to get list of all peers in hfile references node.", e);
164      throw e;
165    }
166    return result;
167  }
168
169  @Override
170  public List<String> getReplicableHFiles(String peerId) throws KeeperException {
171    String znode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
172    List<String> result = null;
173    try {
174      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
175    } catch (KeeperException e) {
176      this.abortable.abort("Failed to get list of hfile references for peerId=" + peerId, e);
177      throw e;
178    }
179    return result;
180  }
181}