001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static java.util.stream.Collectors.toList;
021
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Map;
027import java.util.Map.Entry;
028import java.util.Set;
029import java.util.SortedSet;
030import java.util.TreeSet;
031import java.util.stream.Collectors;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.exceptions.DeserializationException;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.Pair;
040import org.apache.hadoop.hbase.zookeeper.ZKUtil;
041import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
042import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
043import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.apache.zookeeper.KeeperException;
046import org.apache.zookeeper.KeeperException.BadVersionException;
047import org.apache.zookeeper.KeeperException.NoNodeException;
048import org.apache.zookeeper.KeeperException.NodeExistsException;
049import org.apache.zookeeper.KeeperException.NotEmptyException;
050import org.apache.zookeeper.data.Stat;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
055
056/**
057 * ZK based replication queue storage.
058 * <p>
059 * The base znode for each regionserver is the regionserver name. For example:
060 *
061 * <pre>
062 * /hbase/replication/rs/hostname.example.org,6020,1234
063 * </pre>
064 *
 * Within this znode, the region server maintains a set of WAL replication queues. These queues are
 * represented by child znodes named using their given queue id. For example:
067 *
068 * <pre>
069 * /hbase/replication/rs/hostname.example.org,6020,1234/1
070 * /hbase/replication/rs/hostname.example.org,6020,1234/2
071 * </pre>
072 *
073 * Each queue has one child znode for every WAL that still needs to be replicated. The value of
074 * these WAL child znodes is the latest position that has been replicated. This position is updated
075 * every time a WAL entry is replicated. For example:
076 *
077 * <pre>
078 * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
079 * </pre>
080 */
081@InterfaceAudience.Private
082class ZKReplicationQueueStorage extends ZKReplicationStorageBase
083  implements ReplicationQueueStorage {
084
  private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class);

  /** Configuration key for the name of the znode that holds the hfile reference queues. */
  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
    "zookeeper.znode.replication.hfile.refs";
  /** Name used for the hfile references znode when the key above is not configured. */
  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";

  /** Configuration key for the name of the znode that holds the per-region sequence ids. */
  public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY =
    "zookeeper.znode.replication.regions";
  /** Name used for the regions znode when the key above is not configured. */
  public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions";

  /**
   * The znode that contains all replication queues. This is the configured queues znode name joined
   * onto the replication base znode by the constructor.
   */
  private final String queuesZNode;

  /**
   * The znode that contains queues of hfile references to be replicated. This is the configured
   * hfile-refs znode name joined onto the replication base znode by the constructor.
   */
  private final String hfileRefsZNode;

  /**
   * The znode under which the last pushed sequence ids of regions are persisted, see
   * {@link #getSerialReplicationRegionPeerNode(String, String)}. Package-private rather than
   * private; presumably so tests can read it -- TODO confirm.
   */
  final String regionsZNode;
106
107  public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) {
108    super(zookeeper, conf);
109
110    String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
111    String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
112      ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
113    this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
114    this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
115    this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf
116      .get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
117  }
118
119  @Override
120  public String getRsNode(ServerName serverName) {
121    return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
122  }
123
124  private String getQueueNode(ServerName serverName, String queueId) {
125    return ZNodePaths.joinZNode(getRsNode(serverName), queueId);
126  }
127
128  private String getFileNode(String queueNode, String fileName) {
129    return ZNodePaths.joinZNode(queueNode, fileName);
130  }
131
132  private String getFileNode(ServerName serverName, String queueId, String fileName) {
133    return getFileNode(getQueueNode(serverName, queueId), fileName);
134  }
135
136  /**
137   * <p>
138   * Put all regions under /hbase/replication/regions znode will lead to too many children because
139   * of the huge number of regions in real production environment. So here we will distribute the
140   * znodes to multiple directories.
141   * </p>
142   * <p>
143   * So the final znode path will be format like this:
144   *
145   * <pre>
146   * /hbase/replication/regions/dd/04/e76a6966d4ffa908ed0586764767-100
147   * </pre>
148   *
149   * Here the full encoded region name is dd04e76a6966d4ffa908ed0586764767, and we use the first two
150   * characters 'dd' as the first level directory name, and use the next two characters '04' as the
151   * second level directory name, and the rest part as the prefix of the znode, and the suffix '100'
152   * is the peer id.
153   * </p>
154   * @param encodedRegionName the encoded region name.
155   * @param peerId            peer id for replication.
156   * @return ZNode path to persist the max sequence id that we've pushed for the given region and
157   *         peer.
158   */
159  String getSerialReplicationRegionPeerNode(String encodedRegionName, String peerId) {
160    if (encodedRegionName == null || encodedRegionName.length() != RegionInfo.MD5_HEX_LENGTH) {
161      throw new IllegalArgumentException(
162        "Invalid encoded region name: " + encodedRegionName + ", length should be 32.");
163    }
164    return new StringBuilder(regionsZNode).append(ZNodePaths.ZNODE_PATH_SEPARATOR)
165      .append(encodedRegionName, 0, 2).append(ZNodePaths.ZNODE_PATH_SEPARATOR)
166      .append(encodedRegionName, 2, 4).append(ZNodePaths.ZNODE_PATH_SEPARATOR)
167      .append(encodedRegionName, 4, encodedRegionName.length()).append("-").append(peerId)
168      .toString();
169  }
170
171  @Override
172  public void removeQueue(ServerName serverName, String queueId) throws ReplicationException {
173    try {
174      ZKUtil.deleteNodeRecursively(zookeeper, getQueueNode(serverName, queueId));
175    } catch (KeeperException e) {
176      throw new ReplicationException(
177        "Failed to delete queue (serverName=" + serverName + ", queueId=" + queueId + ")", e);
178    }
179  }
180
181  @Override
182  public void addWAL(ServerName serverName, String queueId, String fileName)
183    throws ReplicationException {
184    try {
185      ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName));
186    } catch (KeeperException e) {
187      throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName
188        + ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
189    }
190  }
191
192  @Override
193  public void removeWAL(ServerName serverName, String queueId, String fileName)
194    throws ReplicationException {
195    String fileNode = getFileNode(serverName, queueId, fileName);
196    try {
197      ZKUtil.deleteNode(zookeeper, fileNode);
198    } catch (NoNodeException e) {
199      LOG.warn("{} already deleted when removing log", fileNode);
200    } catch (KeeperException e) {
201      throw new ReplicationException("Failed to remove wal from queue (serverName=" + serverName
202        + ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
203    }
204  }
205
206  private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds,
207    List<ZKUtilOp> listOfOps) throws KeeperException, ReplicationException {
208    String peerId = new ReplicationQueueInfo(queueId).getPeerId();
209    for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
210      String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
211      Pair<Long, Integer> p = getLastSequenceIdWithVersion(lastSeqEntry.getKey(), peerId);
212      byte[] data = ZKUtil.positionToByteArray(lastSeqEntry.getValue());
213      if (p.getSecond() < 0) { // ZNode does not exist.
214        ZKUtil.createWithParents(zookeeper,
215          path.substring(0, path.lastIndexOf(ZNodePaths.ZNODE_PATH_SEPARATOR)));
216        listOfOps.add(ZKUtilOp.createAndFailSilent(path, data));
217        continue;
218      }
219      // Perform CAS in a specific version v0 (HBASE-20138)
220      int v0 = p.getSecond();
221      long lastPushedSeqId = p.getFirst();
222      if (lastSeqEntry.getValue() <= lastPushedSeqId) {
223        continue;
224      }
225      listOfOps.add(ZKUtilOp.setData(path, data, v0));
226    }
227  }
228
229  @Override
230  public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
231    Map<String, Long> lastSeqIds) throws ReplicationException {
232    try {
233      for (int retry = 0;; retry++) {
234        List<ZKUtilOp> listOfOps = new ArrayList<>();
235        if (position > 0) {
236          listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
237            ZKUtil.positionToByteArray(position)));
238        }
239        // Persist the max sequence id(s) of regions for serial replication atomically.
240        addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps);
241        if (listOfOps.isEmpty()) {
242          return;
243        }
244        try {
245          ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
246          return;
247        } catch (KeeperException.BadVersionException | KeeperException.NodeExistsException e) {
248          LOG.warn(
249            "Bad version(or node exist) when persist the last pushed sequence id to zookeeper "
250              + "storage, Retry = " + retry + ", serverName=" + serverName + ", queueId=" + queueId
251              + ", fileName=" + fileName);
252        }
253      }
254    } catch (KeeperException e) {
255      throw new ReplicationException("Failed to set log position (serverName=" + serverName
256        + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
257    }
258  }
259
260  /**
261   * Return the {lastPushedSequenceId, ZNodeDataVersion} pair. if ZNodeDataVersion is -1, it means
262   * that the ZNode does not exist.
263   */
264  protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName,
265    String peerId) throws KeeperException {
266    Stat stat = new Stat();
267    String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
268    byte[] data = ZKUtil.getDataNoWatch(zookeeper, path, stat);
269    if (data == null) {
270      // ZNode does not exist, so just return version -1 to indicate that no node exist.
271      return Pair.newPair(HConstants.NO_SEQNUM, -1);
272    }
273    try {
274      return Pair.newPair(ZKUtil.parseWALPositionFrom(data), stat.getVersion());
275    } catch (DeserializationException de) {
276      LOG.warn("Failed to parse log position (region=" + encodedRegionName + ", peerId=" + peerId
277        + "), data=" + Bytes.toStringBinary(data));
278    }
279    return Pair.newPair(HConstants.NO_SEQNUM, stat.getVersion());
280  }
281
282  @Override
283  public long getLastSequenceId(String encodedRegionName, String peerId)
284    throws ReplicationException {
285    try {
286      return getLastSequenceIdWithVersion(encodedRegionName, peerId).getFirst();
287    } catch (KeeperException e) {
288      throw new ReplicationException("Failed to get last pushed sequence id (encodedRegionName="
289        + encodedRegionName + ", peerId=" + peerId + ")", e);
290    }
291  }
292
293  @Override
294  public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
295    throws ReplicationException {
296    try {
297      // No need CAS and retry here, because it'll call setLastSequenceIds() for disabled peers
298      // only, so no conflict happen.
299      List<ZKUtilOp> listOfOps = new ArrayList<>();
300      for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
301        String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
302        ZKUtil.createWithParents(zookeeper, path);
303        listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
304      }
305      if (!listOfOps.isEmpty()) {
306        ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
307      }
308    } catch (KeeperException e) {
309      throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId
310        + ", size of lastSeqIds=" + lastSeqIds.size(), e);
311    }
312  }
313
314  @Override
315  public void removeLastSequenceIds(String peerId) throws ReplicationException {
316    String suffix = "-" + peerId;
317    try {
318      StringBuilder sb = new StringBuilder(regionsZNode);
319      int regionsZNodeLength = regionsZNode.length();
320      int levelOneLength = regionsZNodeLength + 3;
321      int levelTwoLength = levelOneLength + 3;
322      List<String> levelOneDirs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode);
323      // it is possible that levelOneDirs is null if we haven't write any last pushed sequence ids
324      // yet, so we need an extra check here.
325      if (CollectionUtils.isEmpty(levelOneDirs)) {
326        return;
327      }
328      for (String levelOne : levelOneDirs) {
329        sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelOne);
330        for (String levelTwo : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) {
331          sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelTwo);
332          for (String znode : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) {
333            if (znode.endsWith(suffix)) {
334              sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(znode);
335              ZKUtil.deleteNode(zookeeper, sb.toString());
336              sb.setLength(levelTwoLength);
337            }
338          }
339          sb.setLength(levelOneLength);
340        }
341        sb.setLength(regionsZNodeLength);
342      }
343    } catch (KeeperException e) {
344      throw new ReplicationException("Failed to remove all last sequence ids, peerId=" + peerId, e);
345    }
346  }
347
348  @Override
349  public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
350    throws ReplicationException {
351    try {
352      List<ZKUtilOp> listOfOps =
353        encodedRegionNames.stream().map(n -> getSerialReplicationRegionPeerNode(n, peerId))
354          .map(ZKUtilOp::deleteNodeFailSilent).collect(Collectors.toList());
355      ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
356    } catch (KeeperException e) {
357      throw new ReplicationException("Failed to remove last sequence ids, peerId=" + peerId
358        + ", encodedRegionNames.size=" + encodedRegionNames.size(), e);
359    }
360  }
361
362  @Override
363  public long getWALPosition(ServerName serverName, String queueId, String fileName)
364    throws ReplicationException {
365    byte[] bytes;
366    try {
367      bytes = ZKUtil.getData(zookeeper, getFileNode(serverName, queueId, fileName));
368    } catch (KeeperException | InterruptedException e) {
369      throw new ReplicationException("Failed to get log position (serverName=" + serverName
370        + ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
371    }
372    try {
373      return ZKUtil.parseWALPositionFrom(bytes);
374    } catch (DeserializationException de) {
375      LOG.warn("Failed parse log position (serverName={}, queueId={}, fileName={})", serverName,
376        queueId, fileName);
377    }
378    // if we can not parse the position, start at the beginning of the wal file again
379    return 0;
380  }
381
382  /**
383   * This implement must update the cversion of root {@link #queuesZNode}. The optimistic lock of
384   * the {@link #getAllWALs()} method is based on the cversion of root {@link #queuesZNode}.
385   * @see #getAllWALs() to show the usage of the cversion of root {@link #queuesZNode} .
386   */
387  @Override
388  public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
389    ServerName destServerName) throws ReplicationException {
390    LOG.info("Atomically moving {}/{}'s WALs to {}", sourceServerName, queueId, destServerName);
391    try {
392      ZKUtil.createWithParents(zookeeper, getRsNode(destServerName));
393    } catch (KeeperException e) {
394      throw new ReplicationException("Claim queue queueId=" + queueId + " from " + sourceServerName
395        + " to " + destServerName + " failed when creating the node for " + destServerName, e);
396    }
397    String newQueueId = queueId + "-" + sourceServerName;
398    try {
399      String oldQueueNode = getQueueNode(sourceServerName, queueId);
400      List<String> wals = ZKUtil.listChildrenNoWatch(zookeeper, oldQueueNode);
401      if (CollectionUtils.isEmpty(wals)) {
402        ZKUtil.deleteNodeFailSilent(zookeeper, oldQueueNode);
403        LOG.info("Removed empty {}/{}", sourceServerName, queueId);
404        return new Pair<>(newQueueId, Collections.emptySortedSet());
405      }
406      String newQueueNode = getQueueNode(destServerName, newQueueId);
407      List<ZKUtilOp> listOfOps = new ArrayList<>();
408      SortedSet<String> logQueue = new TreeSet<>();
409      // create the new cluster znode
410      listOfOps.add(ZKUtilOp.createAndFailSilent(newQueueNode, HConstants.EMPTY_BYTE_ARRAY));
411      // get the offset of the logs and set it to new znodes
412      for (String wal : wals) {
413        String oldWalNode = getFileNode(oldQueueNode, wal);
414        byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalNode);
415        LOG.debug("Creating {} with data {}", wal, Bytes.toStringBinary(logOffset));
416        String newWalNode = getFileNode(newQueueNode, wal);
417        listOfOps.add(ZKUtilOp.createAndFailSilent(newWalNode, logOffset));
418        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalNode));
419        logQueue.add(wal);
420      }
421      // add delete op for peer
422      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldQueueNode));
423      // Append new queue id for prevent lock competition in zookeeper server.
424      String claimLockZNode = ZNodePaths.joinZNode(queuesZNode, "cversion_" + newQueueId);
425      // A trick for update the cversion of root queuesZNode .
426      // The optimistic lock of the getAllWALs() method is based on the cversion of root queuesZNode
427      listOfOps.add(ZKUtilOp.createAndFailSilent(claimLockZNode, HConstants.EMPTY_BYTE_ARRAY));
428      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(claimLockZNode));
429
430      LOG.trace("The multi list size is {}", listOfOps.size());
431      ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
432
433      LOG.info("Atomically moved {}/{}'s WALs to {}", sourceServerName, queueId, destServerName);
434      return new Pair<>(newQueueId, logQueue);
435    } catch (NoNodeException | NodeExistsException | NotEmptyException | BadVersionException e) {
436      // Multi call failed; it looks like some other regionserver took away the logs.
437      // These exceptions mean that zk tells us the request can not be execute. So return an empty
438      // queue to tell the upper layer that claim nothing. For other types of exception should be
439      // thrown out to notify the upper layer.
440      LOG.info("Claim queue queueId={} from {} to {} failed with {}, someone else took the log?",
441        queueId, sourceServerName, destServerName, e.toString());
442      return new Pair<>(newQueueId, Collections.emptySortedSet());
443    } catch (KeeperException | InterruptedException e) {
444      throw new ReplicationException("Claim queue queueId=" + queueId + " from " + sourceServerName
445        + " to " + destServerName + " failed", e);
446    }
447  }
448
449  @Override
450  public void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException {
451    try {
452      ZKUtil.deleteNodeFailSilent(zookeeper, getRsNode(serverName));
453    } catch (NotEmptyException e) {
454      // keep silence to avoid logging too much.
455    } catch (KeeperException e) {
456      throw new ReplicationException("Failed to remove replicator for " + serverName, e);
457    }
458  }
459
460  private List<ServerName> getListOfReplicators0() throws KeeperException {
461    List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode);
462    if (children == null) {
463      children = Collections.emptyList();
464    }
465    return children.stream().map(ServerName::parseServerName).collect(toList());
466  }
467
468  @Override
469  public List<ServerName> getListOfReplicators() throws ReplicationException {
470    try {
471      return getListOfReplicators0();
472    } catch (KeeperException e) {
473      throw new ReplicationException("Failed to get list of replicators", e);
474    }
475  }
476
477  private List<String> getWALsInQueue0(ServerName serverName, String queueId)
478    throws KeeperException {
479    List<String> children =
480      ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, queueId));
481    return children != null ? children : Collections.emptyList();
482  }
483
484  @Override
485  public List<String> getWALsInQueue(ServerName serverName, String queueId)
486    throws ReplicationException {
487    try {
488      return getWALsInQueue0(serverName, queueId);
489    } catch (KeeperException e) {
490      throw new ReplicationException(
491        "Failed to get wals in queue (serverName=" + serverName + ", queueId=" + queueId + ")", e);
492    }
493  }
494
495  private List<String> getAllQueues0(ServerName serverName) throws KeeperException {
496    List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName));
497    return children != null ? children : Collections.emptyList();
498  }
499
500  @Override
501  public List<String> getAllQueues(ServerName serverName) throws ReplicationException {
502    try {
503      return getAllQueues0(serverName);
504    } catch (KeeperException e) {
505      throw new ReplicationException("Failed to get all queues (serverName=" + serverName + ")", e);
506    }
507  }
508
509  // will be overridden in UTs
510  protected int getQueuesZNodeCversion() throws KeeperException {
511    Stat stat = new Stat();
512    ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
513    return stat.getCversion();
514  }
515
516  /**
517   * The optimistic lock of this implement is based on the cversion of root {@link #queuesZNode}.
518   * Therefore, we must update the cversion of root {@link #queuesZNode} when migrate wal nodes to
519   * other queues.
520   * @see #claimQueue(ServerName, String, ServerName) as an example of updating root
521   *      {@link #queuesZNode} cversion.
522   */
523  @Override
524  public Set<String> getAllWALs() throws ReplicationException {
525    try {
526      for (int retry = 0;; retry++) {
527        int v0 = getQueuesZNodeCversion();
528        List<ServerName> rss = getListOfReplicators0();
529        if (rss.isEmpty()) {
530          LOG.debug("Didn't find a RegionServer that replicates, won't prevent deletions.");
531          return Collections.emptySet();
532        }
533        Set<String> wals = new HashSet<>();
534        for (ServerName rs : rss) {
535          for (String queueId : getAllQueues0(rs)) {
536            wals.addAll(getWALsInQueue0(rs, queueId));
537          }
538        }
539        int v1 = getQueuesZNodeCversion();
540        if (v0 == v1) {
541          return wals;
542        }
543        LOG.info("Replication queue node cversion changed from %d to %d, retry = %d", v0, v1,
544          retry);
545      }
546    } catch (KeeperException e) {
547      throw new ReplicationException("Failed to get all wals", e);
548    }
549  }
550
551  private String getHFileRefsPeerNode(String peerId) {
552    return ZNodePaths.joinZNode(hfileRefsZNode, peerId);
553  }
554
555  private String getHFileNode(String peerNode, String fileName) {
556    return ZNodePaths.joinZNode(peerNode, fileName);
557  }
558
559  @Override
560  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
561    String peerNode = getHFileRefsPeerNode(peerId);
562    try {
563      if (ZKUtil.checkExists(zookeeper, peerNode) == -1) {
564        LOG.info("Adding peer {} to hfile reference queue.", peerId);
565        ZKUtil.createWithParents(zookeeper, peerNode);
566      }
567    } catch (KeeperException e) {
568      throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
569        e);
570    }
571  }
572
573  @Override
574  public void removePeerFromHFileRefs(String peerId) throws ReplicationException {
575    String peerNode = getHFileRefsPeerNode(peerId);
576    try {
577      if (ZKUtil.checkExists(zookeeper, peerNode) == -1) {
578        LOG.debug("Peer {} not found in hfile reference queue.", peerNode);
579      } else {
580        LOG.info("Removing peer {} from hfile reference queue.", peerNode);
581        ZKUtil.deleteNodeRecursively(zookeeper, peerNode);
582      }
583    } catch (KeeperException e) {
584      throw new ReplicationException(
585        "Failed to remove peer " + peerId + " from hfile reference queue.", e);
586    }
587  }
588
589  @Override
590  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
591    throws ReplicationException {
592    String peerNode = getHFileRefsPeerNode(peerId);
593    LOG.debug("Adding hfile references {} in queue {}", pairs, peerNode);
594    List<ZKUtilOp> listOfOps =
595      pairs.stream().map(p -> p.getSecond().getName()).map(n -> getHFileNode(peerNode, n))
596        .map(f -> ZKUtilOp.createAndFailSilent(f, HConstants.EMPTY_BYTE_ARRAY)).collect(toList());
597    LOG.debug("The multi list size for adding hfile references in zk for node {} is {}", peerNode,
598      listOfOps.size());
599    try {
600      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
601    } catch (KeeperException e) {
602      throw new ReplicationException("Failed to add hfile reference to peer " + peerId, e);
603    }
604  }
605
606  @Override
607  public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException {
608    String peerNode = getHFileRefsPeerNode(peerId);
609    LOG.debug("Removing hfile references {} from queue {}", files, peerNode);
610
611    List<ZKUtilOp> listOfOps = files.stream().map(n -> getHFileNode(peerNode, n))
612      .map(ZKUtilOp::deleteNodeFailSilent).collect(toList());
613    LOG.debug("The multi list size for removing hfile references in zk for node {} is {}", peerNode,
614      listOfOps.size());
615    try {
616      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
617    } catch (KeeperException e) {
618      throw new ReplicationException("Failed to remove hfile reference from peer " + peerId, e);
619    }
620  }
621
622  private List<String> getAllPeersFromHFileRefsQueue0() throws KeeperException {
623    List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode);
624    return children != null ? children : Collections.emptyList();
625  }
626
627  @Override
628  public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
629    try {
630      return getAllPeersFromHFileRefsQueue0();
631    } catch (KeeperException e) {
632      throw new ReplicationException("Failed to get list of all peers in hfile references node.",
633        e);
634    }
635  }
636
637  private List<String> getReplicableHFiles0(String peerId) throws KeeperException {
638    List<String> children =
639      ZKUtil.listChildrenNoWatch(this.zookeeper, getHFileRefsPeerNode(peerId));
640    return children != null ? children : Collections.emptyList();
641  }
642
643  @Override
644  public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
645    try {
646      return getReplicableHFiles0(peerId);
647    } catch (KeeperException e) {
648      throw new ReplicationException("Failed to get list of hfile references for peer " + peerId,
649        e);
650    }
651  }
652
653  // will be overridden in UTs
654  protected int getHFileRefsZNodeCversion() throws ReplicationException {
655    Stat stat = new Stat();
656    try {
657      ZKUtil.getDataNoWatch(zookeeper, hfileRefsZNode, stat);
658    } catch (KeeperException e) {
659      throw new ReplicationException("Failed to get stat of replication hfile references node.", e);
660    }
661    return stat.getCversion();
662  }
663
664  @Override
665  public Set<String> getAllHFileRefs() throws ReplicationException {
666    try {
667      for (int retry = 0;; retry++) {
668        int v0 = getHFileRefsZNodeCversion();
669        List<String> peers = getAllPeersFromHFileRefsQueue();
670        if (peers.isEmpty()) {
671          LOG.debug("Didn't find any peers with hfile references, won't prevent deletions.");
672          return Collections.emptySet();
673        }
674        Set<String> hfileRefs = new HashSet<>();
675        for (String peer : peers) {
676          hfileRefs.addAll(getReplicableHFiles0(peer));
677        }
678        int v1 = getHFileRefsZNodeCversion();
679        if (v0 == v1) {
680          return hfileRefs;
681        }
682        LOG.debug("Replication hfile references node cversion changed from %d to %d, retry = %d",
683          v0, v1, retry);
684      }
685    } catch (KeeperException e) {
686      throw new ReplicationException("Failed to get all hfile refs", e);
687    }
688  }
689}