001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static java.util.stream.Collectors.toList; 021 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.HashSet; 025import java.util.List; 026import java.util.Map; 027import java.util.Map.Entry; 028import java.util.Set; 029import java.util.SortedSet; 030import java.util.TreeSet; 031import java.util.stream.Collectors; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.exceptions.DeserializationException; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.hbase.util.Pair; 040import org.apache.hadoop.hbase.zookeeper.ZKUtil; 041import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; 042import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 043import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.apache.zookeeper.KeeperException; 046import org.apache.zookeeper.KeeperException.BadVersionException; 047import org.apache.zookeeper.KeeperException.NoNodeException; 048import org.apache.zookeeper.KeeperException.NodeExistsException; 049import org.apache.zookeeper.KeeperException.NotEmptyException; 050import org.apache.zookeeper.data.Stat; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 055import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 056 057/** 058 * ZK based replication queue storage. 059 * <p> 060 * The base znode for each regionserver is the regionserver name. For example: 061 * 062 * <pre> 063 * /hbase/replication/rs/hostname.example.org,6020,1234 064 * </pre> 065 * 066 * Within this znode, the region server maintains a set of WAL replication queues. These queues are 067 * represented by child znodes named using there give queue id. For example: 068 * 069 * <pre> 070 * /hbase/replication/rs/hostname.example.org,6020,1234/1 071 * /hbase/replication/rs/hostname.example.org,6020,1234/2 072 * </pre> 073 * 074 * Each queue has one child znode for every WAL that still needs to be replicated. The value of 075 * these WAL child znodes is the latest position that has been replicated. This position is updated 076 * every time a WAL entry is replicated. For example: 077 * 078 * <pre> 079 * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254] 080 * </pre> 081 */ 082@InterfaceAudience.Private 083class ZKReplicationQueueStorage extends ZKReplicationStorageBase 084 implements ReplicationQueueStorage { 085 086 private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class); 087 088 public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY = 089 "zookeeper.znode.replication.hfile.refs"; 090 public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs"; 091 092 public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY = 093 "zookeeper.znode.replication.regions"; 094 public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions"; 095 096 /** 097 * The name of the znode that contains all replication queues 098 */ 099 private final String queuesZNode; 100 101 /** 102 * The name of the znode that contains queues of hfile references to be replicated 103 */ 104 private final String hfileRefsZNode; 105 106 @VisibleForTesting 107 final String regionsZNode; 108 109 public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) { 110 super(zookeeper, conf); 111 112 String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); 113 String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, 114 ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT); 115 this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName); 116 this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName); 117 this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf 118 .get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT)); 119 } 120 121 @Override 122 public String getRsNode(ServerName serverName) { 123 return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName()); 124 } 125 126 private String getQueueNode(ServerName serverName, String queueId) { 127 return ZNodePaths.joinZNode(getRsNode(serverName), queueId); 128 } 129 130 private String getFileNode(String queueNode, String fileName) { 131 return ZNodePaths.joinZNode(queueNode, fileName); 132 } 133 134 private String getFileNode(ServerName serverName, String queueId, String fileName) { 135 return getFileNode(getQueueNode(serverName, queueId), fileName); 136 } 137 138 /** 139 * <p> 140 * Put all regions under /hbase/replication/regions znode will lead to too many children because 141 * of the huge number of regions in real production environment. So here we will distribute the 142 * znodes to multiple directories. 143 * </p> 144 * <p> 145 * So the final znode path will be format like this: 146 * 147 * <pre> 148 * /hbase/replication/regions/dd/04/e76a6966d4ffa908ed0586764767-100 149 * </pre> 150 * 151 * Here the full encoded region name is dd04e76a6966d4ffa908ed0586764767, and we use the first two 152 * characters 'dd' as the first level directory name, and use the next two characters '04' as the 153 * second level directory name, and the rest part as the prefix of the znode, and the suffix '100' 154 * is the peer id. 155 * </p> 156 * @param encodedRegionName the encoded region name. 157 * @param peerId peer id for replication. 158 * @return ZNode path to persist the max sequence id that we've pushed for the given region and 159 * peer. 160 */ 161 @VisibleForTesting 162 String getSerialReplicationRegionPeerNode(String encodedRegionName, String peerId) { 163 if (encodedRegionName == null || encodedRegionName.length() != RegionInfo.MD5_HEX_LENGTH) { 164 throw new IllegalArgumentException( 165 "Invalid encoded region name: " + encodedRegionName + ", length should be 32."); 166 } 167 return new StringBuilder(regionsZNode).append(ZNodePaths.ZNODE_PATH_SEPARATOR) 168 .append(encodedRegionName, 0, 2).append(ZNodePaths.ZNODE_PATH_SEPARATOR) 169 .append(encodedRegionName, 2, 4).append(ZNodePaths.ZNODE_PATH_SEPARATOR) 170 .append(encodedRegionName, 4, encodedRegionName.length()).append("-").append(peerId) 171 .toString(); 172 } 173 174 @Override 175 public void removeQueue(ServerName serverName, String queueId) throws ReplicationException { 176 try { 177 ZKUtil.deleteNodeRecursively(zookeeper, getQueueNode(serverName, queueId)); 178 } catch (KeeperException e) { 179 throw new ReplicationException( 180 "Failed to delete queue (serverName=" + serverName + ", queueId=" + queueId + ")", e); 181 } 182 } 183 184 @Override 185 public void addWAL(ServerName serverName, String queueId, String fileName) 186 throws ReplicationException { 187 try { 188 ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName)); 189 } catch (KeeperException e) { 190 throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName 191 + ", queueId=" + queueId + ", fileName=" + fileName + ")", e); 192 } 193 } 194 195 @Override 196 public void removeWAL(ServerName serverName, String queueId, String fileName) 197 throws ReplicationException { 198 String fileNode = getFileNode(serverName, queueId, fileName); 199 try { 200 ZKUtil.deleteNode(zookeeper, fileNode); 201 } catch (NoNodeException e) { 202 LOG.warn("{} already deleted when removing log", fileNode); 203 } catch (KeeperException e) { 204 throw new ReplicationException("Failed to remove wal from queue (serverName=" + serverName + 205 ", queueId=" + queueId + ", fileName=" + fileName + ")", e); 206 } 207 } 208 209 private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds, 210 List<ZKUtilOp> listOfOps) throws KeeperException, ReplicationException { 211 String peerId = new ReplicationQueueInfo(queueId).getPeerId(); 212 for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) { 213 String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId); 214 Pair<Long, Integer> p = getLastSequenceIdWithVersion(lastSeqEntry.getKey(), peerId); 215 byte[] data = ZKUtil.positionToByteArray(lastSeqEntry.getValue()); 216 if (p.getSecond() < 0) { // ZNode does not exist. 217 ZKUtil.createWithParents(zookeeper, 218 path.substring(0, path.lastIndexOf(ZNodePaths.ZNODE_PATH_SEPARATOR))); 219 listOfOps.add(ZKUtilOp.createAndFailSilent(path, data)); 220 continue; 221 } 222 // Perform CAS in a specific version v0 (HBASE-20138) 223 int v0 = p.getSecond(); 224 long lastPushedSeqId = p.getFirst(); 225 if (lastSeqEntry.getValue() <= lastPushedSeqId) { 226 continue; 227 } 228 listOfOps.add(ZKUtilOp.setData(path, data, v0)); 229 } 230 } 231 232 @Override 233 public void setWALPosition(ServerName serverName, String queueId, String fileName, long position, 234 Map<String, Long> lastSeqIds) throws ReplicationException { 235 try { 236 for (int retry = 0;; retry++) { 237 List<ZKUtilOp> listOfOps = new ArrayList<>(); 238 if (position > 0) { 239 listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName), 240 ZKUtil.positionToByteArray(position))); 241 } 242 // Persist the max sequence id(s) of regions for serial replication atomically. 243 addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps); 244 if (listOfOps.isEmpty()) { 245 return; 246 } 247 try { 248 ZKUtil.multiOrSequential(zookeeper, listOfOps, false); 249 return; 250 } catch (KeeperException.BadVersionException | KeeperException.NodeExistsException e) { 251 LOG.warn( 252 "Bad version(or node exist) when persist the last pushed sequence id to zookeeper " 253 + "storage, Retry = " + retry + ", serverName=" + serverName + ", queueId=" 254 + queueId + ", fileName=" + fileName); 255 } 256 } 257 } catch (KeeperException e) { 258 throw new ReplicationException("Failed to set log position (serverName=" + serverName 259 + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e); 260 } 261 } 262 263 /** 264 * Return the {lastPushedSequenceId, ZNodeDataVersion} pair. if ZNodeDataVersion is -1, it means 265 * that the ZNode does not exist. 266 */ 267 @VisibleForTesting 268 protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName, 269 String peerId) throws KeeperException { 270 Stat stat = new Stat(); 271 String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId); 272 byte[] data = ZKUtil.getDataNoWatch(zookeeper, path, stat); 273 if (data == null) { 274 // ZNode does not exist, so just return version -1 to indicate that no node exist. 275 return Pair.newPair(HConstants.NO_SEQNUM, -1); 276 } 277 try { 278 return Pair.newPair(ZKUtil.parseWALPositionFrom(data), stat.getVersion()); 279 } catch (DeserializationException de) { 280 LOG.warn("Failed to parse log position (region=" + encodedRegionName + ", peerId=" + peerId 281 + "), data=" + Bytes.toStringBinary(data)); 282 } 283 return Pair.newPair(HConstants.NO_SEQNUM, stat.getVersion()); 284 } 285 286 @Override 287 public long getLastSequenceId(String encodedRegionName, String peerId) 288 throws ReplicationException { 289 try { 290 return getLastSequenceIdWithVersion(encodedRegionName, peerId).getFirst(); 291 } catch (KeeperException e) { 292 throw new ReplicationException("Failed to get last pushed sequence id (encodedRegionName=" 293 + encodedRegionName + ", peerId=" + peerId + ")", e); 294 } 295 } 296 297 @Override 298 public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) 299 throws ReplicationException { 300 try { 301 // No need CAS and retry here, because it'll call setLastSequenceIds() for disabled peers 302 // only, so no conflict happen. 303 List<ZKUtilOp> listOfOps = new ArrayList<>(); 304 for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) { 305 String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId); 306 ZKUtil.createWithParents(zookeeper, path); 307 listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue()))); 308 } 309 if (!listOfOps.isEmpty()) { 310 ZKUtil.multiOrSequential(zookeeper, listOfOps, true); 311 } 312 } catch (KeeperException e) { 313 throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId 314 + ", size of lastSeqIds=" + lastSeqIds.size(), e); 315 } 316 } 317 318 @Override 319 public void removeLastSequenceIds(String peerId) throws ReplicationException { 320 String suffix = "-" + peerId; 321 try { 322 StringBuilder sb = new StringBuilder(regionsZNode); 323 int regionsZNodeLength = regionsZNode.length(); 324 int levelOneLength = regionsZNodeLength + 3; 325 int levelTwoLength = levelOneLength + 3; 326 List<String> levelOneDirs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode); 327 // it is possible that levelOneDirs is null if we haven't write any last pushed sequence ids 328 // yet, so we need an extra check here. 329 if (CollectionUtils.isEmpty(levelOneDirs)) { 330 return; 331 } 332 for (String levelOne : levelOneDirs) { 333 sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelOne); 334 for (String levelTwo : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) { 335 sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelTwo); 336 for (String znode : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) { 337 if (znode.endsWith(suffix)) { 338 sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(znode); 339 ZKUtil.deleteNode(zookeeper, sb.toString()); 340 sb.setLength(levelTwoLength); 341 } 342 } 343 sb.setLength(levelOneLength); 344 } 345 sb.setLength(regionsZNodeLength); 346 } 347 } catch (KeeperException e) { 348 throw new ReplicationException("Failed to remove all last sequence ids, peerId=" + peerId, e); 349 } 350 } 351 352 @Override 353 public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames) 354 throws ReplicationException { 355 try { 356 List<ZKUtilOp> listOfOps = 357 encodedRegionNames.stream().map(n -> getSerialReplicationRegionPeerNode(n, peerId)) 358 .map(ZKUtilOp::deleteNodeFailSilent).collect(Collectors.toList()); 359 ZKUtil.multiOrSequential(zookeeper, listOfOps, true); 360 } catch (KeeperException e) { 361 throw new ReplicationException("Failed to remove last sequence ids, peerId=" + peerId + 362 ", encodedRegionNames.size=" + encodedRegionNames.size(), e); 363 } 364 } 365 366 @Override 367 public long getWALPosition(ServerName serverName, String queueId, String fileName) 368 throws ReplicationException { 369 byte[] bytes; 370 try { 371 bytes = ZKUtil.getData(zookeeper, getFileNode(serverName, queueId, fileName)); 372 } catch (KeeperException | InterruptedException e) { 373 throw new ReplicationException("Failed to get log position (serverName=" + serverName + 374 ", queueId=" + queueId + ", fileName=" + fileName + ")", e); 375 } 376 try { 377 return ZKUtil.parseWALPositionFrom(bytes); 378 } catch (DeserializationException de) { 379 LOG.warn("Failed parse log position (serverName={}, queueId={}, fileName={})", 380 serverName, queueId, fileName); 381 } 382 // if we can not parse the position, start at the beginning of the wal file again 383 return 0; 384 } 385 386 @Override 387 public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId, 388 ServerName destServerName) throws ReplicationException { 389 LOG.info("Atomically moving {}/{}'s WALs to {}", sourceServerName, queueId, destServerName); 390 try { 391 ZKUtil.createWithParents(zookeeper, getRsNode(destServerName)); 392 } catch (KeeperException e) { 393 throw new ReplicationException( 394 "Claim queue queueId=" + queueId + " from " + sourceServerName + " to " + destServerName + 395 " failed when creating the node for " + destServerName, 396 e); 397 } 398 String newQueueId = queueId + "-" + sourceServerName; 399 try { 400 String oldQueueNode = getQueueNode(sourceServerName, queueId); 401 List<String> wals = ZKUtil.listChildrenNoWatch(zookeeper, oldQueueNode); 402 if (CollectionUtils.isEmpty(wals)) { 403 ZKUtil.deleteNodeFailSilent(zookeeper, oldQueueNode); 404 LOG.info("Removed empty {}/{}", sourceServerName, queueId); 405 return new Pair<>(newQueueId, Collections.emptySortedSet()); 406 } 407 String newQueueNode = getQueueNode(destServerName, newQueueId); 408 List<ZKUtilOp> listOfOps = new ArrayList<>(); 409 SortedSet<String> logQueue = new TreeSet<>(); 410 // create the new cluster znode 411 listOfOps.add(ZKUtilOp.createAndFailSilent(newQueueNode, HConstants.EMPTY_BYTE_ARRAY)); 412 // get the offset of the logs and set it to new znodes 413 for (String wal : wals) { 414 String oldWalNode = getFileNode(oldQueueNode, wal); 415 byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalNode); 416 LOG.debug("Creating {} with data {}", wal, Bytes.toStringBinary(logOffset)); 417 String newWalNode = getFileNode(newQueueNode, wal); 418 listOfOps.add(ZKUtilOp.createAndFailSilent(newWalNode, logOffset)); 419 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalNode)); 420 logQueue.add(wal); 421 } 422 // add delete op for peer 423 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldQueueNode)); 424 425 LOG.trace("The multi list size is {}", listOfOps.size()); 426 ZKUtil.multiOrSequential(zookeeper, listOfOps, false); 427 428 LOG.info("Atomically moved {}/{}'s WALs to {}", sourceServerName, queueId, destServerName); 429 return new Pair<>(newQueueId, logQueue); 430 } catch (NoNodeException | NodeExistsException | NotEmptyException | BadVersionException e) { 431 // Multi call failed; it looks like some other regionserver took away the logs. 432 // These exceptions mean that zk tells us the request can not be execute. So return an empty 433 // queue to tell the upper layer that claim nothing. For other types of exception should be 434 // thrown out to notify the upper layer. 435 LOG.info("Claim queue queueId={} from {} to {} failed with {}, someone else took the log?", 436 queueId,sourceServerName, destServerName, e.toString()); 437 return new Pair<>(newQueueId, Collections.emptySortedSet()); 438 } catch (KeeperException | InterruptedException e) { 439 throw new ReplicationException("Claim queue queueId=" + queueId + " from " + 440 sourceServerName + " to " + destServerName + " failed", e); 441 } 442 } 443 444 @Override 445 public void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException { 446 try { 447 ZKUtil.deleteNodeFailSilent(zookeeper, getRsNode(serverName)); 448 } catch (NotEmptyException e) { 449 // keep silence to avoid logging too much. 450 } catch (KeeperException e) { 451 throw new ReplicationException("Failed to remove replicator for " + serverName, e); 452 } 453 } 454 455 private List<ServerName> getListOfReplicators0() throws KeeperException { 456 List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode); 457 if (children == null) { 458 children = Collections.emptyList(); 459 } 460 return children.stream().map(ServerName::parseServerName).collect(toList()); 461 } 462 463 @Override 464 public List<ServerName> getListOfReplicators() throws ReplicationException { 465 try { 466 return getListOfReplicators0(); 467 } catch (KeeperException e) { 468 throw new ReplicationException("Failed to get list of replicators", e); 469 } 470 } 471 472 private List<String> getWALsInQueue0(ServerName serverName, String queueId) 473 throws KeeperException { 474 List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, 475 queueId)); 476 return children != null ? children : Collections.emptyList(); 477 } 478 479 @Override 480 public List<String> getWALsInQueue(ServerName serverName, String queueId) 481 throws ReplicationException { 482 try { 483 return getWALsInQueue0(serverName, queueId); 484 } catch (KeeperException e) { 485 throw new ReplicationException( 486 "Failed to get wals in queue (serverName=" + serverName + ", queueId=" + queueId + ")", 487 e); 488 } 489 } 490 491 private List<String> getAllQueues0(ServerName serverName) throws KeeperException { 492 List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName)); 493 return children != null ? children : Collections.emptyList(); 494 } 495 496 @Override 497 public List<String> getAllQueues(ServerName serverName) throws ReplicationException { 498 try { 499 return getAllQueues0(serverName); 500 } catch (KeeperException e) { 501 throw new ReplicationException("Failed to get all queues (serverName=" + serverName + ")", e); 502 } 503 } 504 505 // will be overridden in UTs 506 @VisibleForTesting 507 protected int getQueuesZNodeCversion() throws KeeperException { 508 Stat stat = new Stat(); 509 ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat); 510 return stat.getCversion(); 511 } 512 513 @Override 514 public Set<String> getAllWALs() throws ReplicationException { 515 try { 516 for (int retry = 0;; retry++) { 517 int v0 = getQueuesZNodeCversion(); 518 List<ServerName> rss = getListOfReplicators0(); 519 if (rss.isEmpty()) { 520 LOG.debug("Didn't find a RegionServer that replicates, won't prevent deletions."); 521 return Collections.emptySet(); 522 } 523 Set<String> wals = new HashSet<>(); 524 for (ServerName rs : rss) { 525 for (String queueId : getAllQueues0(rs)) { 526 wals.addAll(getWALsInQueue0(rs, queueId)); 527 } 528 } 529 int v1 = getQueuesZNodeCversion(); 530 if (v0 == v1) { 531 return wals; 532 } 533 LOG.info("Replication queue node cversion changed from %d to %d, retry = %d", 534 v0, v1, retry); 535 } 536 } catch (KeeperException e) { 537 throw new ReplicationException("Failed to get all wals", e); 538 } 539 } 540 541 private String getHFileRefsPeerNode(String peerId) { 542 return ZNodePaths.joinZNode(hfileRefsZNode, peerId); 543 } 544 545 private String getHFileNode(String peerNode, String fileName) { 546 return ZNodePaths.joinZNode(peerNode, fileName); 547 } 548 549 @Override 550 public void addPeerToHFileRefs(String peerId) throws ReplicationException { 551 String peerNode = getHFileRefsPeerNode(peerId); 552 try { 553 if (ZKUtil.checkExists(zookeeper, peerNode) == -1) { 554 LOG.info("Adding peer {} to hfile reference queue.", peerId); 555 ZKUtil.createWithParents(zookeeper, peerNode); 556 } 557 } catch (KeeperException e) { 558 throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.", 559 e); 560 } 561 } 562 563 @Override 564 public void removePeerFromHFileRefs(String peerId) throws ReplicationException { 565 String peerNode = getHFileRefsPeerNode(peerId); 566 try { 567 if (ZKUtil.checkExists(zookeeper, peerNode) == -1) { 568 LOG.debug("Peer {} not found in hfile reference queue.", peerNode); 569 } else { 570 LOG.info("Removing peer {} from hfile reference queue.", peerNode); 571 ZKUtil.deleteNodeRecursively(zookeeper, peerNode); 572 } 573 } catch (KeeperException e) { 574 throw new ReplicationException( 575 "Failed to remove peer " + peerId + " from hfile reference queue.", e); 576 } 577 } 578 579 @Override 580 public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) 581 throws ReplicationException { 582 String peerNode = getHFileRefsPeerNode(peerId); 583 LOG.debug("Adding hfile references {} in queue {}", pairs, peerNode); 584 List<ZKUtilOp> listOfOps = pairs.stream().map(p -> p.getSecond().getName()) 585 .map(n -> getHFileNode(peerNode, n)) 586 .map(f -> ZKUtilOp.createAndFailSilent(f, HConstants.EMPTY_BYTE_ARRAY)).collect(toList()); 587 LOG.debug("The multi list size for adding hfile references in zk for node {} is {}", 588 peerNode, listOfOps.size()); 589 try { 590 ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); 591 } catch (KeeperException e) { 592 throw new ReplicationException("Failed to add hfile reference to peer " + peerId, e); 593 } 594 } 595 596 @Override 597 public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException { 598 String peerNode = getHFileRefsPeerNode(peerId); 599 LOG.debug("Removing hfile references {} from queue {}", files, peerNode); 600 601 List<ZKUtilOp> listOfOps = files.stream().map(n -> getHFileNode(peerNode, n)) 602 .map(ZKUtilOp::deleteNodeFailSilent).collect(toList()); 603 LOG.debug("The multi list size for removing hfile references in zk for node {} is {}", 604 peerNode, listOfOps.size()); 605 try { 606 ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); 607 } catch (KeeperException e) { 608 throw new ReplicationException("Failed to remove hfile reference from peer " + peerId, e); 609 } 610 } 611 612 private List<String> getAllPeersFromHFileRefsQueue0() throws KeeperException { 613 List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode); 614 return children != null ? children : Collections.emptyList(); 615 } 616 617 @Override 618 public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException { 619 try { 620 return getAllPeersFromHFileRefsQueue0(); 621 } catch (KeeperException e) { 622 throw new ReplicationException("Failed to get list of all peers in hfile references node.", 623 e); 624 } 625 } 626 627 private List<String> getReplicableHFiles0(String peerId) throws KeeperException { 628 List<String> children = ZKUtil.listChildrenNoWatch(this.zookeeper, 629 getHFileRefsPeerNode(peerId)); 630 return children != null ? children : Collections.emptyList(); 631 } 632 633 @Override 634 public List<String> getReplicableHFiles(String peerId) throws ReplicationException { 635 try { 636 return getReplicableHFiles0(peerId); 637 } catch (KeeperException e) { 638 throw new ReplicationException("Failed to get list of hfile references for peer " + peerId, 639 e); 640 } 641 } 642 643 // will be overridden in UTs 644 @VisibleForTesting 645 protected int getHFileRefsZNodeCversion() throws ReplicationException { 646 Stat stat = new Stat(); 647 try { 648 ZKUtil.getDataNoWatch(zookeeper, hfileRefsZNode, stat); 649 } catch (KeeperException e) { 650 throw new ReplicationException("Failed to get stat of replication hfile references node.", e); 651 } 652 return stat.getCversion(); 653 } 654 655 @Override 656 public Set<String> getAllHFileRefs() throws ReplicationException { 657 try { 658 for (int retry = 0;; retry++) { 659 int v0 = getHFileRefsZNodeCversion(); 660 List<String> peers = getAllPeersFromHFileRefsQueue(); 661 if (peers.isEmpty()) { 662 LOG.debug("Didn't find any peers with hfile references, won't prevent deletions."); 663 return Collections.emptySet(); 664 } 665 Set<String> hfileRefs = new HashSet<>(); 666 for (String peer : peers) { 667 hfileRefs.addAll(getReplicableHFiles0(peer)); 668 } 669 int v1 = getHFileRefsZNodeCversion(); 670 if (v0 == v1) { 671 return hfileRefs; 672 } 673 LOG.debug("Replication hfile references node cversion changed from %d to %d, retry = %d", 674 v0, v1, retry); 675 } 676 } catch (KeeperException e) { 677 throw new ReplicationException("Failed to get all hfile refs", e); 678 } 679 } 680}