001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static java.util.stream.Collectors.toList;
021
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Map;
027import java.util.Map.Entry;
028import java.util.Set;
029import java.util.SortedSet;
030import java.util.TreeSet;
031import java.util.stream.Collectors;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.exceptions.DeserializationException;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.Pair;
040import org.apache.hadoop.hbase.zookeeper.ZKUtil;
041import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
042import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
043import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.apache.zookeeper.KeeperException;
046import org.apache.zookeeper.KeeperException.BadVersionException;
047import org.apache.zookeeper.KeeperException.NoNodeException;
048import org.apache.zookeeper.KeeperException.NodeExistsException;
049import org.apache.zookeeper.KeeperException.NotEmptyException;
050import org.apache.zookeeper.data.Stat;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
055
056/**
057 * ZK based replication queue storage.
058 * <p>
059 * The base znode for each regionserver is the regionserver name. For example:
060 *
061 * <pre>
062 * /hbase/replication/rs/hostname.example.org,6020,1234
063 * </pre>
064 *
065 * Within this znode, the region server maintains a set of WAL replication queues. These queues are
 * represented by child znodes named using their given queue id. For example:
067 *
068 * <pre>
069 * /hbase/replication/rs/hostname.example.org,6020,1234/1
070 * /hbase/replication/rs/hostname.example.org,6020,1234/2
071 * </pre>
072 *
073 * Each queue has one child znode for every WAL that still needs to be replicated. The value of
074 * these WAL child znodes is the latest position that has been replicated. This position is updated
075 * every time a WAL entry is replicated. For example:
076 *
077 * <pre>
078 * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
079 * </pre>
080 */
081@InterfaceAudience.Private
082class ZKReplicationQueueStorage extends ZKReplicationStorageBase
083    implements ReplicationQueueStorage {
084
085  private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class);
086
087  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
088      "zookeeper.znode.replication.hfile.refs";
089  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
090
091  public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY =
092      "zookeeper.znode.replication.regions";
093  public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions";
094
095  /**
096   * The name of the znode that contains all replication queues
097   */
098  private final String queuesZNode;
099
100  /**
101   * The name of the znode that contains queues of hfile references to be replicated
102   */
103  private final String hfileRefsZNode;
104
105  final String regionsZNode;
106
107  public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) {
108    super(zookeeper, conf);
109
110    String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
111    String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
112      ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
113    this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
114    this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
115    this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf
116        .get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
117  }
118
119  @Override
120  public String getRsNode(ServerName serverName) {
121    return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
122  }
123
124  private String getQueueNode(ServerName serverName, String queueId) {
125    return ZNodePaths.joinZNode(getRsNode(serverName), queueId);
126  }
127
128  private String getFileNode(String queueNode, String fileName) {
129    return ZNodePaths.joinZNode(queueNode, fileName);
130  }
131
132  private String getFileNode(ServerName serverName, String queueId, String fileName) {
133    return getFileNode(getQueueNode(serverName, queueId), fileName);
134  }
135
136  /**
137   * <p>
138   * Put all regions under /hbase/replication/regions znode will lead to too many children because
139   * of the huge number of regions in real production environment. So here we will distribute the
140   * znodes to multiple directories.
141   * </p>
142   * <p>
143   * So the final znode path will be format like this:
144   *
145   * <pre>
146   * /hbase/replication/regions/dd/04/e76a6966d4ffa908ed0586764767-100
147   * </pre>
148   *
149   * Here the full encoded region name is dd04e76a6966d4ffa908ed0586764767, and we use the first two
150   * characters 'dd' as the first level directory name, and use the next two characters '04' as the
151   * second level directory name, and the rest part as the prefix of the znode, and the suffix '100'
152   * is the peer id.
153   * </p>
154   * @param encodedRegionName the encoded region name.
155   * @param peerId peer id for replication.
156   * @return ZNode path to persist the max sequence id that we've pushed for the given region and
157   *         peer.
158   */
159  String getSerialReplicationRegionPeerNode(String encodedRegionName, String peerId) {
160    if (encodedRegionName == null || encodedRegionName.length() != RegionInfo.MD5_HEX_LENGTH) {
161      throw new IllegalArgumentException(
162          "Invalid encoded region name: " + encodedRegionName + ", length should be 32.");
163    }
164    return new StringBuilder(regionsZNode).append(ZNodePaths.ZNODE_PATH_SEPARATOR)
165            .append(encodedRegionName, 0, 2).append(ZNodePaths.ZNODE_PATH_SEPARATOR)
166            .append(encodedRegionName, 2, 4).append(ZNodePaths.ZNODE_PATH_SEPARATOR)
167            .append(encodedRegionName, 4, encodedRegionName.length()).append("-").append(peerId)
168            .toString();
169  }
170
171  @Override
172  public void removeQueue(ServerName serverName, String queueId) throws ReplicationException {
173    try {
174      ZKUtil.deleteNodeRecursively(zookeeper, getQueueNode(serverName, queueId));
175    } catch (KeeperException e) {
176      throw new ReplicationException(
177          "Failed to delete queue (serverName=" + serverName + ", queueId=" + queueId + ")", e);
178    }
179  }
180
181  @Override
182  public void addWAL(ServerName serverName, String queueId, String fileName)
183      throws ReplicationException {
184    try {
185      ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName));
186    } catch (KeeperException e) {
187      throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName
188          + ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
189    }
190  }
191
192  @Override
193  public void removeWAL(ServerName serverName, String queueId, String fileName)
194      throws ReplicationException {
195    String fileNode = getFileNode(serverName, queueId, fileName);
196    try {
197      ZKUtil.deleteNode(zookeeper, fileNode);
198    } catch (NoNodeException e) {
199      LOG.warn("{} already deleted when removing log", fileNode);
200    } catch (KeeperException e) {
201      throw new ReplicationException("Failed to remove wal from queue (serverName=" + serverName +
202        ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
203    }
204  }
205
206  private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds,
207      List<ZKUtilOp> listOfOps) throws KeeperException, ReplicationException {
208    String peerId = new ReplicationQueueInfo(queueId).getPeerId();
209    for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
210      String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
211      Pair<Long, Integer> p = getLastSequenceIdWithVersion(lastSeqEntry.getKey(), peerId);
212      byte[] data = ZKUtil.positionToByteArray(lastSeqEntry.getValue());
213      if (p.getSecond() < 0) { // ZNode does not exist.
214        ZKUtil.createWithParents(zookeeper,
215          path.substring(0, path.lastIndexOf(ZNodePaths.ZNODE_PATH_SEPARATOR)));
216        listOfOps.add(ZKUtilOp.createAndFailSilent(path, data));
217        continue;
218      }
219      // Perform CAS in a specific version v0 (HBASE-20138)
220      int v0 = p.getSecond();
221      long lastPushedSeqId = p.getFirst();
222      if (lastSeqEntry.getValue() <= lastPushedSeqId) {
223        continue;
224      }
225      listOfOps.add(ZKUtilOp.setData(path, data, v0));
226    }
227  }
228
229  @Override
230  public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
231      Map<String, Long> lastSeqIds) throws ReplicationException {
232    try {
233      for (int retry = 0;; retry++) {
234        List<ZKUtilOp> listOfOps = new ArrayList<>();
235        if (position > 0) {
236          listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
237            ZKUtil.positionToByteArray(position)));
238        }
239        // Persist the max sequence id(s) of regions for serial replication atomically.
240        addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps);
241        if (listOfOps.isEmpty()) {
242          return;
243        }
244        try {
245          ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
246          return;
247        } catch (KeeperException.BadVersionException | KeeperException.NodeExistsException e) {
248          LOG.warn(
249            "Bad version(or node exist) when persist the last pushed sequence id to zookeeper "
250                + "storage, Retry = " + retry + ", serverName=" + serverName + ", queueId="
251                + queueId + ", fileName=" + fileName);
252        }
253      }
254    } catch (KeeperException e) {
255      throw new ReplicationException("Failed to set log position (serverName=" + serverName
256          + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
257    }
258  }
259
260  /**
261   * Return the {lastPushedSequenceId, ZNodeDataVersion} pair. if ZNodeDataVersion is -1, it means
262   * that the ZNode does not exist.
263   */
264  protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName,
265      String peerId) throws KeeperException {
266    Stat stat = new Stat();
267    String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
268    byte[] data = ZKUtil.getDataNoWatch(zookeeper, path, stat);
269    if (data == null) {
270      // ZNode does not exist, so just return version -1 to indicate that no node exist.
271      return Pair.newPair(HConstants.NO_SEQNUM, -1);
272    }
273    try {
274      return Pair.newPair(ZKUtil.parseWALPositionFrom(data), stat.getVersion());
275    } catch (DeserializationException de) {
276      LOG.warn("Failed to parse log position (region=" + encodedRegionName + ", peerId=" + peerId
277          + "), data=" + Bytes.toStringBinary(data));
278    }
279    return Pair.newPair(HConstants.NO_SEQNUM, stat.getVersion());
280  }
281
282  @Override
283  public long getLastSequenceId(String encodedRegionName, String peerId)
284      throws ReplicationException {
285    try {
286      return getLastSequenceIdWithVersion(encodedRegionName, peerId).getFirst();
287    } catch (KeeperException e) {
288      throw new ReplicationException("Failed to get last pushed sequence id (encodedRegionName="
289          + encodedRegionName + ", peerId=" + peerId + ")", e);
290    }
291  }
292
293  @Override
294  public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
295      throws ReplicationException {
296    try {
297      // No need CAS and retry here, because it'll call setLastSequenceIds() for disabled peers
298      // only, so no conflict happen.
299      List<ZKUtilOp> listOfOps = new ArrayList<>();
300      for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
301        String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
302        ZKUtil.createWithParents(zookeeper, path);
303        listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
304      }
305      if (!listOfOps.isEmpty()) {
306        ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
307      }
308    } catch (KeeperException e) {
309      throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId
310          + ", size of lastSeqIds=" + lastSeqIds.size(), e);
311    }
312  }
313
314  @Override
315  public void removeLastSequenceIds(String peerId) throws ReplicationException {
316    String suffix = "-" + peerId;
317    try {
318      StringBuilder sb = new StringBuilder(regionsZNode);
319      int regionsZNodeLength = regionsZNode.length();
320      int levelOneLength = regionsZNodeLength + 3;
321      int levelTwoLength = levelOneLength + 3;
322      List<String> levelOneDirs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode);
323      // it is possible that levelOneDirs is null if we haven't write any last pushed sequence ids
324      // yet, so we need an extra check here.
325      if (CollectionUtils.isEmpty(levelOneDirs)) {
326        return;
327      }
328      for (String levelOne : levelOneDirs) {
329        sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelOne);
330        for (String levelTwo : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) {
331          sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelTwo);
332          for (String znode : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) {
333            if (znode.endsWith(suffix)) {
334              sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(znode);
335              ZKUtil.deleteNode(zookeeper, sb.toString());
336              sb.setLength(levelTwoLength);
337            }
338          }
339          sb.setLength(levelOneLength);
340        }
341        sb.setLength(regionsZNodeLength);
342      }
343    } catch (KeeperException e) {
344      throw new ReplicationException("Failed to remove all last sequence ids, peerId=" + peerId, e);
345    }
346  }
347
348  @Override
349  public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
350      throws ReplicationException {
351    try {
352      List<ZKUtilOp> listOfOps =
353        encodedRegionNames.stream().map(n -> getSerialReplicationRegionPeerNode(n, peerId))
354          .map(ZKUtilOp::deleteNodeFailSilent).collect(Collectors.toList());
355      ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
356    } catch (KeeperException e) {
357      throw new ReplicationException("Failed to remove last sequence ids, peerId=" + peerId +
358        ", encodedRegionNames.size=" + encodedRegionNames.size(), e);
359    }
360  }
361
362  @Override
363  public long getWALPosition(ServerName serverName, String queueId, String fileName)
364      throws ReplicationException {
365    byte[] bytes;
366    try {
367      bytes = ZKUtil.getData(zookeeper, getFileNode(serverName, queueId, fileName));
368    } catch (KeeperException | InterruptedException e) {
369      throw new ReplicationException("Failed to get log position (serverName=" + serverName +
370        ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
371    }
372    try {
373      return ZKUtil.parseWALPositionFrom(bytes);
374    } catch (DeserializationException de) {
375      LOG.warn("Failed parse log position (serverName={}, queueId={}, fileName={})",
376          serverName, queueId, fileName);
377    }
378    // if we can not parse the position, start at the beginning of the wal file again
379    return 0;
380  }
381
382  @Override
383  public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
384      ServerName destServerName) throws ReplicationException {
385    LOG.info("Atomically moving {}/{}'s WALs to {}", sourceServerName, queueId, destServerName);
386    try {
387      ZKUtil.createWithParents(zookeeper, getRsNode(destServerName));
388    } catch (KeeperException e) {
389      throw new ReplicationException(
390          "Claim queue queueId=" + queueId + " from " + sourceServerName + " to " + destServerName +
391            " failed when creating the node for " + destServerName,
392          e);
393    }
394    String newQueueId = queueId + "-" + sourceServerName;
395    try {
396      String oldQueueNode = getQueueNode(sourceServerName, queueId);
397      List<String> wals = ZKUtil.listChildrenNoWatch(zookeeper, oldQueueNode);
398      if (CollectionUtils.isEmpty(wals)) {
399        ZKUtil.deleteNodeFailSilent(zookeeper, oldQueueNode);
400        LOG.info("Removed empty {}/{}", sourceServerName, queueId);
401        return new Pair<>(newQueueId, Collections.emptySortedSet());
402      }
403      String newQueueNode = getQueueNode(destServerName, newQueueId);
404      List<ZKUtilOp> listOfOps = new ArrayList<>();
405      SortedSet<String> logQueue = new TreeSet<>();
406      // create the new cluster znode
407      listOfOps.add(ZKUtilOp.createAndFailSilent(newQueueNode, HConstants.EMPTY_BYTE_ARRAY));
408      // get the offset of the logs and set it to new znodes
409      for (String wal : wals) {
410        String oldWalNode = getFileNode(oldQueueNode, wal);
411        byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalNode);
412        LOG.debug("Creating {} with data {}", wal, Bytes.toStringBinary(logOffset));
413        String newWalNode = getFileNode(newQueueNode, wal);
414        listOfOps.add(ZKUtilOp.createAndFailSilent(newWalNode, logOffset));
415        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalNode));
416        logQueue.add(wal);
417      }
418      // add delete op for peer
419      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldQueueNode));
420
421      LOG.trace("The multi list size is {}", listOfOps.size());
422      ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
423
424      LOG.info("Atomically moved {}/{}'s WALs to {}", sourceServerName, queueId, destServerName);
425      return new Pair<>(newQueueId, logQueue);
426    } catch (NoNodeException | NodeExistsException | NotEmptyException | BadVersionException e) {
427      // Multi call failed; it looks like some other regionserver took away the logs.
428      // These exceptions mean that zk tells us the request can not be execute. So return an empty
429      // queue to tell the upper layer that claim nothing. For other types of exception should be
430      // thrown out to notify the upper layer.
431      LOG.info("Claim queue queueId={} from {} to {} failed with {}, someone else took the log?",
432          queueId,sourceServerName, destServerName, e.toString());
433      return new Pair<>(newQueueId, Collections.emptySortedSet());
434    } catch (KeeperException | InterruptedException e) {
435      throw new ReplicationException("Claim queue queueId=" + queueId + " from " +
436        sourceServerName + " to " + destServerName + " failed", e);
437    }
438  }
439
440  @Override
441  public void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException {
442    try {
443      ZKUtil.deleteNodeFailSilent(zookeeper, getRsNode(serverName));
444    } catch (NotEmptyException e) {
445      // keep silence to avoid logging too much.
446    } catch (KeeperException e) {
447      throw new ReplicationException("Failed to remove replicator for " + serverName, e);
448    }
449  }
450
451  private List<ServerName> getListOfReplicators0() throws KeeperException {
452    List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode);
453    if (children == null) {
454      children = Collections.emptyList();
455    }
456    return children.stream().map(ServerName::parseServerName).collect(toList());
457  }
458
459  @Override
460  public List<ServerName> getListOfReplicators() throws ReplicationException {
461    try {
462      return getListOfReplicators0();
463    } catch (KeeperException e) {
464      throw new ReplicationException("Failed to get list of replicators", e);
465    }
466  }
467
468  private List<String> getWALsInQueue0(ServerName serverName, String queueId)
469      throws KeeperException {
470    List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName,
471        queueId));
472    return children != null ? children : Collections.emptyList();
473  }
474
475  @Override
476  public List<String> getWALsInQueue(ServerName serverName, String queueId)
477      throws ReplicationException {
478    try {
479      return getWALsInQueue0(serverName, queueId);
480    } catch (KeeperException e) {
481      throw new ReplicationException(
482          "Failed to get wals in queue (serverName=" + serverName + ", queueId=" + queueId + ")",
483          e);
484    }
485  }
486
487  private List<String> getAllQueues0(ServerName serverName) throws KeeperException {
488    List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName));
489    return children != null ? children : Collections.emptyList();
490  }
491
492  @Override
493  public List<String> getAllQueues(ServerName serverName) throws ReplicationException {
494    try {
495      return getAllQueues0(serverName);
496    } catch (KeeperException e) {
497      throw new ReplicationException("Failed to get all queues (serverName=" + serverName + ")", e);
498    }
499  }
500
501  // will be overridden in UTs
502  protected int getQueuesZNodeCversion() throws KeeperException {
503    Stat stat = new Stat();
504    ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
505    return stat.getCversion();
506  }
507
508  @Override
509  public Set<String> getAllWALs() throws ReplicationException {
510    try {
511      for (int retry = 0;; retry++) {
512        int v0 = getQueuesZNodeCversion();
513        List<ServerName> rss = getListOfReplicators0();
514        if (rss.isEmpty()) {
515          LOG.debug("Didn't find a RegionServer that replicates, won't prevent deletions.");
516          return Collections.emptySet();
517        }
518        Set<String> wals = new HashSet<>();
519        for (ServerName rs : rss) {
520          for (String queueId : getAllQueues0(rs)) {
521            wals.addAll(getWALsInQueue0(rs, queueId));
522          }
523        }
524        int v1 = getQueuesZNodeCversion();
525        if (v0 == v1) {
526          return wals;
527        }
528        LOG.info("Replication queue node cversion changed from %d to %d, retry = %d",
529            v0, v1, retry);
530      }
531    } catch (KeeperException e) {
532      throw new ReplicationException("Failed to get all wals", e);
533    }
534  }
535
536  private String getHFileRefsPeerNode(String peerId) {
537    return ZNodePaths.joinZNode(hfileRefsZNode, peerId);
538  }
539
540  private String getHFileNode(String peerNode, String fileName) {
541    return ZNodePaths.joinZNode(peerNode, fileName);
542  }
543
544  @Override
545  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
546    String peerNode = getHFileRefsPeerNode(peerId);
547    try {
548      if (ZKUtil.checkExists(zookeeper, peerNode) == -1) {
549        LOG.info("Adding peer {} to hfile reference queue.", peerId);
550        ZKUtil.createWithParents(zookeeper, peerNode);
551      }
552    } catch (KeeperException e) {
553      throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
554          e);
555    }
556  }
557
558  @Override
559  public void removePeerFromHFileRefs(String peerId) throws ReplicationException {
560    String peerNode = getHFileRefsPeerNode(peerId);
561    try {
562      if (ZKUtil.checkExists(zookeeper, peerNode) == -1) {
563        LOG.debug("Peer {} not found in hfile reference queue.", peerNode);
564      } else {
565        LOG.info("Removing peer {} from hfile reference queue.", peerNode);
566        ZKUtil.deleteNodeRecursively(zookeeper, peerNode);
567      }
568    } catch (KeeperException e) {
569      throw new ReplicationException(
570          "Failed to remove peer " + peerId + " from hfile reference queue.", e);
571    }
572  }
573
574  @Override
575  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
576      throws ReplicationException {
577    String peerNode = getHFileRefsPeerNode(peerId);
578    LOG.debug("Adding hfile references {} in queue {}", pairs, peerNode);
579    List<ZKUtilOp> listOfOps = pairs.stream().map(p -> p.getSecond().getName())
580        .map(n -> getHFileNode(peerNode, n))
581        .map(f -> ZKUtilOp.createAndFailSilent(f, HConstants.EMPTY_BYTE_ARRAY)).collect(toList());
582    LOG.debug("The multi list size for adding hfile references in zk for node {} is {}",
583          peerNode, listOfOps.size());
584    try {
585      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
586    } catch (KeeperException e) {
587      throw new ReplicationException("Failed to add hfile reference to peer " + peerId, e);
588    }
589  }
590
591  @Override
592  public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException {
593    String peerNode = getHFileRefsPeerNode(peerId);
594    LOG.debug("Removing hfile references {} from queue {}", files, peerNode);
595
596    List<ZKUtilOp> listOfOps = files.stream().map(n -> getHFileNode(peerNode, n))
597        .map(ZKUtilOp::deleteNodeFailSilent).collect(toList());
598    LOG.debug("The multi list size for removing hfile references in zk for node {} is {}",
599        peerNode, listOfOps.size());
600    try {
601      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
602    } catch (KeeperException e) {
603      throw new ReplicationException("Failed to remove hfile reference from peer " + peerId, e);
604    }
605  }
606
607  private List<String> getAllPeersFromHFileRefsQueue0() throws KeeperException {
608    List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode);
609    return children != null ? children : Collections.emptyList();
610  }
611
612  @Override
613  public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
614    try {
615      return getAllPeersFromHFileRefsQueue0();
616    } catch (KeeperException e) {
617      throw new ReplicationException("Failed to get list of all peers in hfile references node.",
618          e);
619    }
620  }
621
622  private List<String> getReplicableHFiles0(String peerId) throws KeeperException {
623    List<String> children = ZKUtil.listChildrenNoWatch(this.zookeeper,
624        getHFileRefsPeerNode(peerId));
625    return children != null ? children : Collections.emptyList();
626  }
627
628  @Override
629  public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
630    try {
631      return getReplicableHFiles0(peerId);
632    } catch (KeeperException e) {
633      throw new ReplicationException("Failed to get list of hfile references for peer " + peerId,
634          e);
635    }
636  }
637
638  // will be overridden in UTs
639  protected int getHFileRefsZNodeCversion() throws ReplicationException {
640    Stat stat = new Stat();
641    try {
642      ZKUtil.getDataNoWatch(zookeeper, hfileRefsZNode, stat);
643    } catch (KeeperException e) {
644      throw new ReplicationException("Failed to get stat of replication hfile references node.", e);
645    }
646    return stat.getCversion();
647  }
648
649  @Override
650  public Set<String> getAllHFileRefs() throws ReplicationException {
651    try {
652      for (int retry = 0;; retry++) {
653        int v0 = getHFileRefsZNodeCversion();
654        List<String> peers = getAllPeersFromHFileRefsQueue();
655        if (peers.isEmpty()) {
656          LOG.debug("Didn't find any peers with hfile references, won't prevent deletions.");
657          return Collections.emptySet();
658        }
659        Set<String> hfileRefs = new HashSet<>();
660        for (String peer : peers) {
661          hfileRefs.addAll(getReplicableHFiles0(peer));
662        }
663        int v1 = getHFileRefsZNodeCversion();
664        if (v0 == v1) {
665          return hfileRefs;
666        }
667        LOG.debug("Replication hfile references node cversion changed from %d to %d, retry = %d",
668            v0, v1, retry);
669      }
670    } catch (KeeperException e) {
671      throw new ReplicationException("Failed to get all hfile refs", e);
672    }
673  }
674}