001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static java.util.stream.Collectors.toList;
021
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Map;
027import java.util.Map.Entry;
028import java.util.Set;
029import java.util.SortedSet;
030import java.util.TreeSet;
031import java.util.stream.Collectors;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.exceptions.DeserializationException;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.Pair;
040import org.apache.hadoop.hbase.zookeeper.ZKUtil;
041import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
042import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
043import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.apache.zookeeper.KeeperException;
046import org.apache.zookeeper.KeeperException.BadVersionException;
047import org.apache.zookeeper.KeeperException.NoNodeException;
048import org.apache.zookeeper.KeeperException.NodeExistsException;
049import org.apache.zookeeper.KeeperException.NotEmptyException;
050import org.apache.zookeeper.data.Stat;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
055import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
056
057/**
058 * ZK based replication queue storage.
059 * <p>
060 * The base znode for each regionserver is the regionserver name. For example:
061 *
062 * <pre>
063 * /hbase/replication/rs/hostname.example.org,6020,1234
064 * </pre>
065 *
066 * Within this znode, the region server maintains a set of WAL replication queues. These queues are
 * represented by child znodes named using their given queue id. For example:
068 *
069 * <pre>
070 * /hbase/replication/rs/hostname.example.org,6020,1234/1
071 * /hbase/replication/rs/hostname.example.org,6020,1234/2
072 * </pre>
073 *
074 * Each queue has one child znode for every WAL that still needs to be replicated. The value of
075 * these WAL child znodes is the latest position that has been replicated. This position is updated
076 * every time a WAL entry is replicated. For example:
077 *
078 * <pre>
079 * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
080 * </pre>
081 */
082@InterfaceAudience.Private
083class ZKReplicationQueueStorage extends ZKReplicationStorageBase
084    implements ReplicationQueueStorage {
085
  /** Logger shared by all instances of this storage. */
  private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class);

  /** Configuration key holding the name of the hfile references znode. */
  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
      "zookeeper.znode.replication.hfile.refs";
  /** Name of the hfile references znode used when the key above is not configured. */
  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";

  /** Configuration key holding the name of the regions znode. */
  public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY =
      "zookeeper.znode.replication.regions";
  /** Name of the regions znode used when the key above is not configured. */
  public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions";

  /**
   * The name of the znode that contains all replication queues
   */
  private final String queuesZNode;

  /**
   * The name of the znode that contains queues of hfile references to be replicated
   */
  private final String hfileRefsZNode;

  /**
   * The name of the znode under which the last pushed sequence ids of regions are stored, one
   * znode per (region, peer) pair.
   */
  @VisibleForTesting
  final String regionsZNode;
108
109  public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) {
110    super(zookeeper, conf);
111
112    String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
113    String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
114      ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
115    this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
116    this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
117    this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf
118        .get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
119  }
120
121  @Override
122  public String getRsNode(ServerName serverName) {
123    return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
124  }
125
126  private String getQueueNode(ServerName serverName, String queueId) {
127    return ZNodePaths.joinZNode(getRsNode(serverName), queueId);
128  }
129
130  private String getFileNode(String queueNode, String fileName) {
131    return ZNodePaths.joinZNode(queueNode, fileName);
132  }
133
134  private String getFileNode(ServerName serverName, String queueId, String fileName) {
135    return getFileNode(getQueueNode(serverName, queueId), fileName);
136  }
137
138  /**
139   * <p>
140   * Put all regions under /hbase/replication/regions znode will lead to too many children because
141   * of the huge number of regions in real production environment. So here we will distribute the
142   * znodes to multiple directories.
143   * </p>
144   * <p>
145   * So the final znode path will be format like this:
146   *
147   * <pre>
148   * /hbase/replication/regions/dd/04/e76a6966d4ffa908ed0586764767-100
149   * </pre>
150   *
151   * Here the full encoded region name is dd04e76a6966d4ffa908ed0586764767, and we use the first two
152   * characters 'dd' as the first level directory name, and use the next two characters '04' as the
153   * second level directory name, and the rest part as the prefix of the znode, and the suffix '100'
154   * is the peer id.
155   * </p>
156   * @param encodedRegionName the encoded region name.
157   * @param peerId peer id for replication.
158   * @return ZNode path to persist the max sequence id that we've pushed for the given region and
159   *         peer.
160   */
161  @VisibleForTesting
162  String getSerialReplicationRegionPeerNode(String encodedRegionName, String peerId) {
163    if (encodedRegionName == null || encodedRegionName.length() != RegionInfo.MD5_HEX_LENGTH) {
164      throw new IllegalArgumentException(
165          "Invalid encoded region name: " + encodedRegionName + ", length should be 32.");
166    }
167    return new StringBuilder(regionsZNode).append(ZNodePaths.ZNODE_PATH_SEPARATOR)
168            .append(encodedRegionName, 0, 2).append(ZNodePaths.ZNODE_PATH_SEPARATOR)
169            .append(encodedRegionName, 2, 4).append(ZNodePaths.ZNODE_PATH_SEPARATOR)
170            .append(encodedRegionName, 4, encodedRegionName.length()).append("-").append(peerId)
171            .toString();
172  }
173
174  @Override
175  public void removeQueue(ServerName serverName, String queueId) throws ReplicationException {
176    try {
177      ZKUtil.deleteNodeRecursively(zookeeper, getQueueNode(serverName, queueId));
178    } catch (KeeperException e) {
179      throw new ReplicationException(
180          "Failed to delete queue (serverName=" + serverName + ", queueId=" + queueId + ")", e);
181    }
182  }
183
184  @Override
185  public void addWAL(ServerName serverName, String queueId, String fileName)
186      throws ReplicationException {
187    try {
188      ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName));
189    } catch (KeeperException e) {
190      throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName
191          + ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
192    }
193  }
194
195  @Override
196  public void removeWAL(ServerName serverName, String queueId, String fileName)
197      throws ReplicationException {
198    String fileNode = getFileNode(serverName, queueId, fileName);
199    try {
200      ZKUtil.deleteNode(zookeeper, fileNode);
201    } catch (NoNodeException e) {
202      LOG.warn("{} already deleted when removing log", fileNode);
203    } catch (KeeperException e) {
204      throw new ReplicationException("Failed to remove wal from queue (serverName=" + serverName +
205        ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
206    }
207  }
208
209  private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds,
210      List<ZKUtilOp> listOfOps) throws KeeperException, ReplicationException {
211    String peerId = new ReplicationQueueInfo(queueId).getPeerId();
212    for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
213      String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
214      Pair<Long, Integer> p = getLastSequenceIdWithVersion(lastSeqEntry.getKey(), peerId);
215      byte[] data = ZKUtil.positionToByteArray(lastSeqEntry.getValue());
216      if (p.getSecond() < 0) { // ZNode does not exist.
217        ZKUtil.createWithParents(zookeeper,
218          path.substring(0, path.lastIndexOf(ZNodePaths.ZNODE_PATH_SEPARATOR)));
219        listOfOps.add(ZKUtilOp.createAndFailSilent(path, data));
220        continue;
221      }
222      // Perform CAS in a specific version v0 (HBASE-20138)
223      int v0 = p.getSecond();
224      long lastPushedSeqId = p.getFirst();
225      if (lastSeqEntry.getValue() <= lastPushedSeqId) {
226        continue;
227      }
228      listOfOps.add(ZKUtilOp.setData(path, data, v0));
229    }
230  }
231
232  @Override
233  public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
234      Map<String, Long> lastSeqIds) throws ReplicationException {
235    try {
236      for (int retry = 0;; retry++) {
237        List<ZKUtilOp> listOfOps = new ArrayList<>();
238        if (position > 0) {
239          listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
240            ZKUtil.positionToByteArray(position)));
241        }
242        // Persist the max sequence id(s) of regions for serial replication atomically.
243        addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps);
244        if (listOfOps.isEmpty()) {
245          return;
246        }
247        try {
248          ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
249          return;
250        } catch (KeeperException.BadVersionException | KeeperException.NodeExistsException e) {
251          LOG.warn(
252            "Bad version(or node exist) when persist the last pushed sequence id to zookeeper "
253                + "storage, Retry = " + retry + ", serverName=" + serverName + ", queueId="
254                + queueId + ", fileName=" + fileName);
255        }
256      }
257    } catch (KeeperException e) {
258      throw new ReplicationException("Failed to set log position (serverName=" + serverName
259          + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
260    }
261  }
262
263  /**
264   * Return the {lastPushedSequenceId, ZNodeDataVersion} pair. if ZNodeDataVersion is -1, it means
265   * that the ZNode does not exist.
266   */
267  @VisibleForTesting
268  protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName,
269      String peerId) throws KeeperException {
270    Stat stat = new Stat();
271    String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
272    byte[] data = ZKUtil.getDataNoWatch(zookeeper, path, stat);
273    if (data == null) {
274      // ZNode does not exist, so just return version -1 to indicate that no node exist.
275      return Pair.newPair(HConstants.NO_SEQNUM, -1);
276    }
277    try {
278      return Pair.newPair(ZKUtil.parseWALPositionFrom(data), stat.getVersion());
279    } catch (DeserializationException de) {
280      LOG.warn("Failed to parse log position (region=" + encodedRegionName + ", peerId=" + peerId
281          + "), data=" + Bytes.toStringBinary(data));
282    }
283    return Pair.newPair(HConstants.NO_SEQNUM, stat.getVersion());
284  }
285
286  @Override
287  public long getLastSequenceId(String encodedRegionName, String peerId)
288      throws ReplicationException {
289    try {
290      return getLastSequenceIdWithVersion(encodedRegionName, peerId).getFirst();
291    } catch (KeeperException e) {
292      throw new ReplicationException("Failed to get last pushed sequence id (encodedRegionName="
293          + encodedRegionName + ", peerId=" + peerId + ")", e);
294    }
295  }
296
297  @Override
298  public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
299      throws ReplicationException {
300    try {
301      // No need CAS and retry here, because it'll call setLastSequenceIds() for disabled peers
302      // only, so no conflict happen.
303      List<ZKUtilOp> listOfOps = new ArrayList<>();
304      for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
305        String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
306        ZKUtil.createWithParents(zookeeper, path);
307        listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
308      }
309      if (!listOfOps.isEmpty()) {
310        ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
311      }
312    } catch (KeeperException e) {
313      throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId
314          + ", size of lastSeqIds=" + lastSeqIds.size(), e);
315    }
316  }
317
  /**
   * Removes every last pushed sequence id znode that belongs to the given peer, by walking the
   * two directory levels below the regions znode and deleting each leaf whose name ends with
   * {@code "-" + peerId}.
   * @param peerId the replication peer id
   * @throws ReplicationException if listing or deleting znodes fails
   */
  @Override
  public void removeLastSequenceIds(String peerId) throws ReplicationException {
    String suffix = "-" + peerId;
    try {
      // A single builder is reused as the "current path"; it is truncated back to a known length
      // whenever the walk leaves a level.
      StringBuilder sb = new StringBuilder(regionsZNode);
      int regionsZNodeLength = regionsZNode.length();
      // Each directory level adds 3 characters: the directory names are two characters long (see
      // getSerialReplicationRegionPeerNode), plus the path separator -- presumably a single
      // character; confirm against ZNodePaths.ZNODE_PATH_SEPARATOR.
      int levelOneLength = regionsZNodeLength + 3;
      int levelTwoLength = levelOneLength + 3;
      List<String> levelOneDirs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode);
      // it is possible that levelOneDirs is null if we haven't write any last pushed sequence ids
      // yet, so we need an extra check here.
      if (CollectionUtils.isEmpty(levelOneDirs)) {
        return;
      }
      for (String levelOne : levelOneDirs) {
        sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelOne);
        for (String levelTwo : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) {
          sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelTwo);
          for (String znode : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) {
            if (znode.endsWith(suffix)) {
              sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(znode);
              ZKUtil.deleteNode(zookeeper, sb.toString());
              // Drop the leaf name again so the builder points at the level-two directory.
              sb.setLength(levelTwoLength);
            }
          }
          // Back to the level-one directory before visiting its next child.
          sb.setLength(levelOneLength);
        }
        // Back to the regions znode before visiting the next level-one directory.
        sb.setLength(regionsZNodeLength);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to remove all last sequence ids, peerId=" + peerId, e);
    }
  }
351
352  @Override
353  public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
354      throws ReplicationException {
355    try {
356      List<ZKUtilOp> listOfOps =
357        encodedRegionNames.stream().map(n -> getSerialReplicationRegionPeerNode(n, peerId))
358          .map(ZKUtilOp::deleteNodeFailSilent).collect(Collectors.toList());
359      ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
360    } catch (KeeperException e) {
361      throw new ReplicationException("Failed to remove last sequence ids, peerId=" + peerId +
362        ", encodedRegionNames.size=" + encodedRegionNames.size(), e);
363    }
364  }
365
366  @Override
367  public long getWALPosition(ServerName serverName, String queueId, String fileName)
368      throws ReplicationException {
369    byte[] bytes;
370    try {
371      bytes = ZKUtil.getData(zookeeper, getFileNode(serverName, queueId, fileName));
372    } catch (KeeperException | InterruptedException e) {
373      throw new ReplicationException("Failed to get log position (serverName=" + serverName +
374        ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
375    }
376    try {
377      return ZKUtil.parseWALPositionFrom(bytes);
378    } catch (DeserializationException de) {
379      LOG.warn("Failed parse log position (serverName={}, queueId={}, fileName={})",
380          serverName, queueId, fileName);
381    }
382    // if we can not parse the position, start at the beginning of the wal file again
383    return 0;
384  }
385
  /**
   * Moves a replication queue, with all its WAL znodes and their stored offsets, from one region
   * server to another.
   * <p>
   * The claimed queue is named {@code queueId + "-" + sourceServerName} under the destination
   * server. All creations and deletions are collected into one operation list and submitted
   * through a single {@code ZKUtil.multiOrSequential} call.
   * @param sourceServerName the region server currently owning the queue
   * @param queueId the id of the queue to claim
   * @param destServerName the region server that takes over the queue
   * @return a pair of the new queue id and the sorted WAL names that were moved; the set is
   *         empty when the source queue had no WALs, or when the move failed with one of
   *         NoNode, NodeExists, NotEmpty or BadVersion
   * @throws ReplicationException if the destination znode cannot be created, or on any other
   *           ZooKeeper failure or interruption
   */
  @Override
  public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
      ServerName destServerName) throws ReplicationException {
    LOG.info("Atomically moving {}/{}'s WALs to {}", sourceServerName, queueId, destServerName);
    // Make sure the destination region server znode exists before queueing operations below it.
    try {
      ZKUtil.createWithParents(zookeeper, getRsNode(destServerName));
    } catch (KeeperException e) {
      throw new ReplicationException(
          "Claim queue queueId=" + queueId + " from " + sourceServerName + " to " + destServerName +
            " failed when creating the node for " + destServerName,
          e);
    }
    String newQueueId = queueId + "-" + sourceServerName;
    try {
      String oldQueueNode = getQueueNode(sourceServerName, queueId);
      List<String> wals = ZKUtil.listChildrenNoWatch(zookeeper, oldQueueNode);
      // Nothing to move: just clean up the (possibly missing) source queue znode.
      if (CollectionUtils.isEmpty(wals)) {
        ZKUtil.deleteNodeFailSilent(zookeeper, oldQueueNode);
        LOG.info("Removed empty {}/{}", sourceServerName, queueId);
        return new Pair<>(newQueueId, Collections.emptySortedSet());
      }
      String newQueueNode = getQueueNode(destServerName, newQueueId);
      List<ZKUtilOp> listOfOps = new ArrayList<>();
      SortedSet<String> logQueue = new TreeSet<>();
      // create the new cluster znode
      listOfOps.add(ZKUtilOp.createAndFailSilent(newQueueNode, HConstants.EMPTY_BYTE_ARRAY));
      // get the offset of the logs and set it to new znodes
      for (String wal : wals) {
        String oldWalNode = getFileNode(oldQueueNode, wal);
        byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalNode);
        LOG.debug("Creating {} with data {}", wal, Bytes.toStringBinary(logOffset));
        String newWalNode = getFileNode(newQueueNode, wal);
        listOfOps.add(ZKUtilOp.createAndFailSilent(newWalNode, logOffset));
        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalNode));
        logQueue.add(wal);
      }
      // add delete op for peer
      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldQueueNode));

      LOG.trace("The multi list size is {}", listOfOps.size());
      ZKUtil.multiOrSequential(zookeeper, listOfOps, false);

      LOG.info("Atomically moved {}/{}'s WALs to {}", sourceServerName, queueId, destServerName);
      return new Pair<>(newQueueId, logQueue);
    } catch (NoNodeException | NodeExistsException | NotEmptyException | BadVersionException e) {
      // Multi call failed; it looks like some other regionserver took away the logs.
      // These exceptions mean that zk tells us the request can not be execute. So return an empty
      // queue to tell the upper layer that claim nothing. For other types of exception should be
      // thrown out to notify the upper layer.
      LOG.info("Claim queue queueId={} from {} to {} failed with {}, someone else took the log?",
          queueId,sourceServerName, destServerName, e.toString());
      return new Pair<>(newQueueId, Collections.emptySortedSet());
    } catch (KeeperException | InterruptedException e) {
      throw new ReplicationException("Claim queue queueId=" + queueId + " from " +
        sourceServerName + " to " + destServerName + " failed", e);
    }
  }
443
444  @Override
445  public void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException {
446    try {
447      ZKUtil.deleteNodeFailSilent(zookeeper, getRsNode(serverName));
448    } catch (NotEmptyException e) {
449      // keep silence to avoid logging too much.
450    } catch (KeeperException e) {
451      throw new ReplicationException("Failed to remove replicator for " + serverName, e);
452    }
453  }
454
455  private List<ServerName> getListOfReplicators0() throws KeeperException {
456    List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode);
457    if (children == null) {
458      children = Collections.emptyList();
459    }
460    return children.stream().map(ServerName::parseServerName).collect(toList());
461  }
462
463  @Override
464  public List<ServerName> getListOfReplicators() throws ReplicationException {
465    try {
466      return getListOfReplicators0();
467    } catch (KeeperException e) {
468      throw new ReplicationException("Failed to get list of replicators", e);
469    }
470  }
471
472  private List<String> getWALsInQueue0(ServerName serverName, String queueId)
473      throws KeeperException {
474    List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName,
475        queueId));
476    return children != null ? children : Collections.emptyList();
477  }
478
479  @Override
480  public List<String> getWALsInQueue(ServerName serverName, String queueId)
481      throws ReplicationException {
482    try {
483      return getWALsInQueue0(serverName, queueId);
484    } catch (KeeperException e) {
485      throw new ReplicationException(
486          "Failed to get wals in queue (serverName=" + serverName + ", queueId=" + queueId + ")",
487          e);
488    }
489  }
490
491  private List<String> getAllQueues0(ServerName serverName) throws KeeperException {
492    List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName));
493    return children != null ? children : Collections.emptyList();
494  }
495
496  @Override
497  public List<String> getAllQueues(ServerName serverName) throws ReplicationException {
498    try {
499      return getAllQueues0(serverName);
500    } catch (KeeperException e) {
501      throw new ReplicationException("Failed to get all queues (serverName=" + serverName + ")", e);
502    }
503  }
504
505  // will be overridden in UTs
506  @VisibleForTesting
507  protected int getQueuesZNodeCversion() throws KeeperException {
508    Stat stat = new Stat();
509    ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
510    return stat.getCversion();
511  }
512
513  @Override
514  public Set<String> getAllWALs() throws ReplicationException {
515    try {
516      for (int retry = 0;; retry++) {
517        int v0 = getQueuesZNodeCversion();
518        List<ServerName> rss = getListOfReplicators0();
519        if (rss.isEmpty()) {
520          LOG.debug("Didn't find a RegionServer that replicates, won't prevent deletions.");
521          return Collections.emptySet();
522        }
523        Set<String> wals = new HashSet<>();
524        for (ServerName rs : rss) {
525          for (String queueId : getAllQueues0(rs)) {
526            wals.addAll(getWALsInQueue0(rs, queueId));
527          }
528        }
529        int v1 = getQueuesZNodeCversion();
530        if (v0 == v1) {
531          return wals;
532        }
533        LOG.info("Replication queue node cversion changed from %d to %d, retry = %d",
534            v0, v1, retry);
535      }
536    } catch (KeeperException e) {
537      throw new ReplicationException("Failed to get all wals", e);
538    }
539  }
540
541  private String getHFileRefsPeerNode(String peerId) {
542    return ZNodePaths.joinZNode(hfileRefsZNode, peerId);
543  }
544
545  private String getHFileNode(String peerNode, String fileName) {
546    return ZNodePaths.joinZNode(peerNode, fileName);
547  }
548
549  @Override
550  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
551    String peerNode = getHFileRefsPeerNode(peerId);
552    try {
553      if (ZKUtil.checkExists(zookeeper, peerNode) == -1) {
554        LOG.info("Adding peer {} to hfile reference queue.", peerId);
555        ZKUtil.createWithParents(zookeeper, peerNode);
556      }
557    } catch (KeeperException e) {
558      throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
559          e);
560    }
561  }
562
563  @Override
564  public void removePeerFromHFileRefs(String peerId) throws ReplicationException {
565    String peerNode = getHFileRefsPeerNode(peerId);
566    try {
567      if (ZKUtil.checkExists(zookeeper, peerNode) == -1) {
568        LOG.debug("Peer {} not found in hfile reference queue.", peerNode);
569      } else {
570        LOG.info("Removing peer {} from hfile reference queue.", peerNode);
571        ZKUtil.deleteNodeRecursively(zookeeper, peerNode);
572      }
573    } catch (KeeperException e) {
574      throw new ReplicationException(
575          "Failed to remove peer " + peerId + " from hfile reference queue.", e);
576    }
577  }
578
579  @Override
580  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
581      throws ReplicationException {
582    String peerNode = getHFileRefsPeerNode(peerId);
583    LOG.debug("Adding hfile references {} in queue {}", pairs, peerNode);
584    List<ZKUtilOp> listOfOps = pairs.stream().map(p -> p.getSecond().getName())
585        .map(n -> getHFileNode(peerNode, n))
586        .map(f -> ZKUtilOp.createAndFailSilent(f, HConstants.EMPTY_BYTE_ARRAY)).collect(toList());
587    LOG.debug("The multi list size for adding hfile references in zk for node {} is {}",
588          peerNode, listOfOps.size());
589    try {
590      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
591    } catch (KeeperException e) {
592      throw new ReplicationException("Failed to add hfile reference to peer " + peerId, e);
593    }
594  }
595
596  @Override
597  public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException {
598    String peerNode = getHFileRefsPeerNode(peerId);
599    LOG.debug("Removing hfile references {} from queue {}", files, peerNode);
600
601    List<ZKUtilOp> listOfOps = files.stream().map(n -> getHFileNode(peerNode, n))
602        .map(ZKUtilOp::deleteNodeFailSilent).collect(toList());
603    LOG.debug("The multi list size for removing hfile references in zk for node {} is {}",
604        peerNode, listOfOps.size());
605    try {
606      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
607    } catch (KeeperException e) {
608      throw new ReplicationException("Failed to remove hfile reference from peer " + peerId, e);
609    }
610  }
611
612  private List<String> getAllPeersFromHFileRefsQueue0() throws KeeperException {
613    List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode);
614    return children != null ? children : Collections.emptyList();
615  }
616
617  @Override
618  public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
619    try {
620      return getAllPeersFromHFileRefsQueue0();
621    } catch (KeeperException e) {
622      throw new ReplicationException("Failed to get list of all peers in hfile references node.",
623          e);
624    }
625  }
626
627  private List<String> getReplicableHFiles0(String peerId) throws KeeperException {
628    List<String> children = ZKUtil.listChildrenNoWatch(this.zookeeper,
629        getHFileRefsPeerNode(peerId));
630    return children != null ? children : Collections.emptyList();
631  }
632
633  @Override
634  public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
635    try {
636      return getReplicableHFiles0(peerId);
637    } catch (KeeperException e) {
638      throw new ReplicationException("Failed to get list of hfile references for peer " + peerId,
639          e);
640    }
641  }
642
643  // will be overridden in UTs
644  @VisibleForTesting
645  protected int getHFileRefsZNodeCversion() throws ReplicationException {
646    Stat stat = new Stat();
647    try {
648      ZKUtil.getDataNoWatch(zookeeper, hfileRefsZNode, stat);
649    } catch (KeeperException e) {
650      throw new ReplicationException("Failed to get stat of replication hfile references node.", e);
651    }
652    return stat.getCversion();
653  }
654
655  @Override
656  public Set<String> getAllHFileRefs() throws ReplicationException {
657    try {
658      for (int retry = 0;; retry++) {
659        int v0 = getHFileRefsZNodeCversion();
660        List<String> peers = getAllPeersFromHFileRefsQueue();
661        if (peers.isEmpty()) {
662          LOG.debug("Didn't find any peers with hfile references, won't prevent deletions.");
663          return Collections.emptySet();
664        }
665        Set<String> hfileRefs = new HashSet<>();
666        for (String peer : peers) {
667          hfileRefs.addAll(getReplicableHFiles0(peer));
668        }
669        int v1 = getHFileRefsZNodeCversion();
670        if (v0 == v1) {
671          return hfileRefs;
672        }
673        LOG.debug("Replication hfile references node cversion changed from %d to %d, retry = %d",
674            v0, v1, retry);
675      }
676    } catch (KeeperException e) {
677      throw new ReplicationException("Failed to get all hfile refs", e);
678    }
679  }
680}