/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static java.util.stream.Collectors.toList;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.BadVersionException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.KeeperException.NotEmptyException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * ZK based replication queue storage.
 * <p>
 * The base znode for each regionserver is the regionserver name. For example:
 *
 * <pre>
 * /hbase/replication/rs/hostname.example.org,6020,1234
 * </pre>
 *
 * Within this znode, the region server maintains a set of WAL replication queues. These queues are
 * represented by child znodes named using their given queue id. For example:
 *
 * <pre>
 * /hbase/replication/rs/hostname.example.org,6020,1234/1
 * /hbase/replication/rs/hostname.example.org,6020,1234/2
 * </pre>
 *
 * Each queue has one child znode for every WAL that still needs to be replicated. The value of
 * these WAL child znodes is the latest position that has been replicated. This position is updated
 * every time a WAL entry is replicated. For example:
 *
 * <pre>
 * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
 * </pre>
 */
@InterfaceAudience.Private
class ZKReplicationQueueStorage extends ZKReplicationStorageBase
    implements ReplicationQueueStorage {

  private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class);

  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
      "zookeeper.znode.replication.hfile.refs";
  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";

  public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY =
      "zookeeper.znode.replication.regions";
  public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions";

  /**
   * The name of the znode that contains all replication queues
   */
  private final String queuesZNode;

  /**
   * The name of the znode that contains queues of hfile references to be replicated
   */
  private final String hfileRefsZNode;

  /**
   * The znode under which the last pushed sequence ids are stored, see
   * {@link #getSerialReplicationRegionPeerNode(String, String)} for the layout below it.
   */
  final String regionsZNode;

  /**
   * Resolves the queues, hfile-refs and regions znode paths below the replication znode, taking
   * the child names from the configuration (falling back to "rs", "hfile-refs" and "regions").
   * @param zookeeper the watcher used for all ZooKeeper access
   * @param conf configuration the znode names are read from
   */
  public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) {
    super(zookeeper, conf);

    String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
    String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
      ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
    this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
    this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
    this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf
        .get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
  }

  @Override
  public String getRsNode(ServerName serverName) {
    return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
  }

  /** Returns the znode of the given queue below the given region server's znode. */
  private String getQueueNode(ServerName serverName, String queueId) {
    return ZNodePaths.joinZNode(getRsNode(serverName), queueId);
  }

  /** Returns the znode of the given WAL file below the given queue znode. */
  private String getFileNode(String queueNode, String fileName) {
    return ZNodePaths.joinZNode(queueNode, fileName);
  }

  /** Returns the znode of the given WAL file in the given queue of the given region server. */
  private String getFileNode(ServerName serverName, String queueId, String fileName) {
    return getFileNode(getQueueNode(serverName, queueId), fileName);
  }

  /**
   * <p>
   * Put all regions under /hbase/replication/regions znode will lead to too many children because
   * of the huge number of regions in real production environment. So here we will distribute the
   * znodes to multiple directories.
   * </p>
   * <p>
   * So the final znode path will be format like this:
   *
   * <pre>
   * /hbase/replication/regions/dd/04/e76a6966d4ffa908ed0586764767-100
   * </pre>
   *
   * Here the full encoded region name is dd04e76a6966d4ffa908ed0586764767, and we use the first two
   * characters 'dd' as the first level directory name, and use the next two characters '04' as the
   * second level directory name, and the rest part as the prefix of the znode, and the suffix '100'
   * is the peer id.
   * </p>
   * @param encodedRegionName the encoded region name.
   * @param peerId peer id for replication.
   * @return ZNode path to persist the max sequence id that we've pushed for the given region and
   *         peer.
   * @throws IllegalArgumentException if the encoded region name is null or does not have the
   *           expected length
   */
  String getSerialReplicationRegionPeerNode(String encodedRegionName, String peerId) {
    if (encodedRegionName == null || encodedRegionName.length() != RegionInfo.MD5_HEX_LENGTH) {
      throw new IllegalArgumentException(
          "Invalid encoded region name: " + encodedRegionName + ", length should be 32.");
    }
    return new StringBuilder(regionsZNode).append(ZNodePaths.ZNODE_PATH_SEPARATOR)
        .append(encodedRegionName, 0, 2).append(ZNodePaths.ZNODE_PATH_SEPARATOR)
        .append(encodedRegionName, 2, 4).append(ZNodePaths.ZNODE_PATH_SEPARATOR)
        .append(encodedRegionName, 4, encodedRegionName.length()).append("-").append(peerId)
        .toString();
  }

  @Override
  public void removeQueue(ServerName serverName, String queueId) throws ReplicationException {
    try {
      ZKUtil.deleteNodeRecursively(zookeeper, getQueueNode(serverName, queueId));
    } catch (KeeperException e) {
      throw new ReplicationException(
          "Failed to delete queue (serverName=" + serverName + ", queueId=" + queueId + ")", e);
    }
  }

  @Override
  public void addWAL(ServerName serverName, String queueId, String fileName)
      throws ReplicationException {
    try {
      ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName));
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName
          + ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
    }
  }

  @Override
  public void removeWAL(ServerName serverName, String queueId, String fileName)
      throws ReplicationException {
    String fileNode = getFileNode(serverName, queueId, fileName);
    try {
      ZKUtil.deleteNode(zookeeper, fileNode);
    } catch (NoNodeException e) {
      // The znode is already gone, which is the state we wanted; only warn about it.
      LOG.warn("{} already deleted when removing log", fileNode);
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to remove wal from queue (serverName=" + serverName +
        ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
    }
  }

  /**
   * Appends to {@code listOfOps} the operations needed to persist the given last pushed sequence
   * ids for the peer that owns {@code queueId}.
   * <p>
   * For a region whose znode does not exist yet, the parent directories are created right away
   * and a create op is queued. For an existing znode, a versioned setData op is queued, but only
   * when the new sequence id is greater than the stored one.
   * @param queueId the queue id, from which the peer id is derived
   * @param lastSeqIds map from encoded region name to the sequence id to persist
   * @param listOfOps the list the generated operations are appended to
   */
  private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds,
      List<ZKUtilOp> listOfOps) throws KeeperException, ReplicationException {
    String peerId = new ReplicationQueueInfo(queueId).getPeerId();
    for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
      String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
      Pair<Long, Integer> p = getLastSequenceIdWithVersion(lastSeqEntry.getKey(), peerId);
      byte[] data = ZKUtil.positionToByteArray(lastSeqEntry.getValue());
      if (p.getSecond() < 0) { // ZNode does not exist.
        ZKUtil.createWithParents(zookeeper,
          path.substring(0, path.lastIndexOf(ZNodePaths.ZNODE_PATH_SEPARATOR)));
        listOfOps.add(ZKUtilOp.createAndFailSilent(path, data));
        continue;
      }
      // Perform CAS in a specific version v0 (HBASE-20138)
      int v0 = p.getSecond();
      long lastPushedSeqId = p.getFirst();
      if (lastSeqEntry.getValue() <= lastPushedSeqId) {
        continue;
      }
      listOfOps.add(ZKUtilOp.setData(path, data, v0));
    }
  }

  /**
   * {@inheritDoc}
   * <p>
   * The WAL position (only when it is greater than 0) and the last pushed sequence ids are
   * written in one multi call. When that call fails with a bad version or node exists error, the
   * operations are rebuilt from the current ZooKeeper state and the call is retried.
   */
  @Override
  public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
      Map<String, Long> lastSeqIds) throws ReplicationException {
    try {
      for (int retry = 0;; retry++) {
        List<ZKUtilOp> listOfOps = new ArrayList<>();
        if (position > 0) {
          listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
            ZKUtil.positionToByteArray(position)));
        }
        // Persist the max sequence id(s) of regions for serial replication atomically.
        addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps);
        if (listOfOps.isEmpty()) {
          return;
        }
        try {
          ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
          return;
        } catch (KeeperException.BadVersionException | KeeperException.NodeExistsException e) {
          LOG.warn(
            "Bad version(or node exist) when persist the last pushed sequence id to zookeeper "
                + "storage, Retry = " + retry + ", serverName=" + serverName + ", queueId="
                + queueId + ", fileName=" + fileName);
        }
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to set log position (serverName=" + serverName
          + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
    }
  }

  /**
   * Return the {lastPushedSequenceId, ZNodeDataVersion} pair. if ZNodeDataVersion is -1, it means
   * that the ZNode does not exist.
   * <p>
   * When the znode exists but its data can not be parsed, {@link HConstants#NO_SEQNUM} is returned
   * together with the real data version of the znode.
   */
  protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName,
      String peerId) throws KeeperException {
    Stat stat = new Stat();
    String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
    byte[] data = ZKUtil.getDataNoWatch(zookeeper, path, stat);
    if (data == null) {
      // ZNode does not exist, so just return version -1 to indicate that no node exist.
      return Pair.newPair(HConstants.NO_SEQNUM, -1);
    }
    try {
      return Pair.newPair(ZKUtil.parseWALPositionFrom(data), stat.getVersion());
    } catch (DeserializationException de) {
      LOG.warn("Failed to parse log position (region=" + encodedRegionName + ", peerId=" + peerId
          + "), data=" + Bytes.toStringBinary(data));
    }
    return Pair.newPair(HConstants.NO_SEQNUM, stat.getVersion());
  }

  @Override
  public long getLastSequenceId(String encodedRegionName, String peerId)
      throws ReplicationException {
    try {
      return getLastSequenceIdWithVersion(encodedRegionName, peerId).getFirst();
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to get last pushed sequence id (encodedRegionName="
          + encodedRegionName + ", peerId=" + peerId + ")", e);
    }
  }

  @Override
  public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
      throws ReplicationException {
    try {
      // No need CAS and retry here, because it'll call setLastSequenceIds() for disabled peers
      // only, so no conflict happen.
      List<ZKUtilOp> listOfOps = new ArrayList<>();
      for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
        String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
        ZKUtil.createWithParents(zookeeper, path);
        listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
      }
      if (!listOfOps.isEmpty()) {
        ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId
          + ", size of lastSeqIds=" + lastSeqIds.size(), e);
    }
  }

  /**
   * {@inheritDoc}
   * <p>
   * Walks the two directory levels below the regions znode and deletes every znode whose name
   * ends with "-" followed by the peer id.
   */
  @Override
  public void removeLastSequenceIds(String peerId) throws ReplicationException {
    String suffix = "-" + peerId;
    try {
      List<String> levelOneDirs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode);
      // it is possible that levelOneDirs is null if we haven't write any last pushed sequence ids
      // yet, so we need an extra check here.
      if (CollectionUtils.isEmpty(levelOneDirs)) {
        return;
      }
      // A single builder is reused for all paths. After each level it is truncated back to the
      // length recorded before descending, so no assumption is made on directory name lengths.
      StringBuilder sb = new StringBuilder(regionsZNode);
      int regionsZNodeLength = sb.length();
      for (String levelOne : levelOneDirs) {
        sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelOne);
        int levelOneLength = sb.length();
        List<String> levelTwoDirs = ZKUtil.listChildrenNoWatch(zookeeper, sb.toString());
        // The listing can be null, same as for the first level, so guard before iterating.
        if (levelTwoDirs != null) {
          for (String levelTwo : levelTwoDirs) {
            sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelTwo);
            int levelTwoLength = sb.length();
            List<String> znodes = ZKUtil.listChildrenNoWatch(zookeeper, sb.toString());
            if (znodes != null) {
              for (String znode : znodes) {
                if (znode.endsWith(suffix)) {
                  sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(znode);
                  ZKUtil.deleteNode(zookeeper, sb.toString());
                  sb.setLength(levelTwoLength);
                }
              }
            }
            sb.setLength(levelOneLength);
          }
        }
        sb.setLength(regionsZNodeLength);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to remove all last sequence ids, peerId=" + peerId, e);
    }
  }

  @Override
  public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
      throws ReplicationException {
    try {
      List<ZKUtilOp> listOfOps =
        encodedRegionNames.stream().map(n -> getSerialReplicationRegionPeerNode(n, peerId))
          .map(ZKUtilOp::deleteNodeFailSilent).collect(Collectors.toList());
      ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to remove last sequence ids, peerId=" + peerId +
        ", encodedRegionNames.size=" + encodedRegionNames.size(), e);
    }
  }

  /**
   * {@inheritDoc}
   * <p>
   * Returns 0 when the stored data can not be parsed, so that the WAL file is read from the
   * beginning again.
   */
  @Override
  public long getWALPosition(ServerName serverName, String queueId, String fileName)
      throws ReplicationException {
    byte[] bytes;
    try {
      bytes = ZKUtil.getData(zookeeper, getFileNode(serverName, queueId, fileName));
    } catch (KeeperException | InterruptedException e) {
      if (e instanceof InterruptedException) {
        // Keep the interrupt status visible to callers, the wrapping exception would hide it.
        Thread.currentThread().interrupt();
      }
      throw new ReplicationException("Failed to get log position (serverName=" + serverName +
        ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
    }
    try {
      return ZKUtil.parseWALPositionFrom(bytes);
    } catch (DeserializationException de) {
      LOG.warn("Failed parse log position (serverName={}, queueId={}, fileName={})",
          serverName, queueId, fileName);
    }
    // if we can not parse the position, start at the beginning of the wal file again
    return 0;
  }

  /**
   * {@inheritDoc}
   * <p>
   * The new queue id is the old queue id with "-" and the source server name appended. All WAL
   * znodes, together with their stored offsets, are moved to the destination server in one multi
   * call. When ZooKeeper rejects that call (no node, node exists, not empty or bad version), an
   * empty set is returned to tell the caller that nothing has been claimed.
   */
  @Override
  public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
      ServerName destServerName) throws ReplicationException {
    LOG.info("Atomically moving {}/{}'s WALs to {}", sourceServerName, queueId, destServerName);
    try {
      ZKUtil.createWithParents(zookeeper, getRsNode(destServerName));
    } catch (KeeperException e) {
      throw new ReplicationException(
          "Claim queue queueId=" + queueId + " from " + sourceServerName + " to " + destServerName +
            " failed when creating the node for " + destServerName,
          e);
    }
    String newQueueId = queueId + "-" + sourceServerName;
    try {
      String oldQueueNode = getQueueNode(sourceServerName, queueId);
      List<String> wals = ZKUtil.listChildrenNoWatch(zookeeper, oldQueueNode);
      if (CollectionUtils.isEmpty(wals)) {
        ZKUtil.deleteNodeFailSilent(zookeeper, oldQueueNode);
        LOG.info("Removed empty {}/{}", sourceServerName, queueId);
        return new Pair<>(newQueueId, Collections.emptySortedSet());
      }
      String newQueueNode = getQueueNode(destServerName, newQueueId);
      List<ZKUtilOp> listOfOps = new ArrayList<>();
      SortedSet<String> logQueue = new TreeSet<>();
      // create the new cluster znode
      listOfOps.add(ZKUtilOp.createAndFailSilent(newQueueNode, HConstants.EMPTY_BYTE_ARRAY));
      // get the offset of the logs and set it to new znodes
      for (String wal : wals) {
        String oldWalNode = getFileNode(oldQueueNode, wal);
        byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalNode);
        LOG.debug("Creating {} with data {}", wal, Bytes.toStringBinary(logOffset));
        String newWalNode = getFileNode(newQueueNode, wal);
        listOfOps.add(ZKUtilOp.createAndFailSilent(newWalNode, logOffset));
        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalNode));
        logQueue.add(wal);
      }
      // add delete op for peer
      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldQueueNode));

      LOG.trace("The multi list size is {}", listOfOps.size());
      ZKUtil.multiOrSequential(zookeeper, listOfOps, false);

      LOG.info("Atomically moved {}/{}'s WALs to {}", sourceServerName, queueId, destServerName);
      return new Pair<>(newQueueId, logQueue);
    } catch (NoNodeException | NodeExistsException | NotEmptyException | BadVersionException e) {
      // Multi call failed; it looks like some other regionserver took away the logs.
      // These exceptions mean that zk tells us the request can not be execute. So return an empty
      // queue to tell the upper layer that claim nothing. For other types of exception should be
      // thrown out to notify the upper layer.
      LOG.info("Claim queue queueId={} from {} to {} failed with {}, someone else took the log?",
          queueId, sourceServerName, destServerName, e.toString());
      return new Pair<>(newQueueId, Collections.emptySortedSet());
    } catch (KeeperException e) {
      throw new ReplicationException("Claim queue queueId=" + queueId + " from " +
        sourceServerName + " to " + destServerName + " failed", e);
    } catch (InterruptedException e) {
      // Keep the interrupt status visible to callers, the wrapping exception would hide it.
      Thread.currentThread().interrupt();
      throw new ReplicationException("Claim queue queueId=" + queueId + " from " +
        sourceServerName + " to " + destServerName + " failed", e);
    }
  }

  @Override
  public void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException {
    try {
      ZKUtil.deleteNodeFailSilent(zookeeper, getRsNode(serverName));
    } catch (NotEmptyException ignored) {
      // keep silence to avoid logging too much.
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to remove replicator for " + serverName, e);
    }
  }

  /** Lists the children of the queues znode as server names; empty when there are none. */
  private List<ServerName> getListOfReplicators0() throws KeeperException {
    List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode);
    if (children == null) {
      children = Collections.emptyList();
    }
    return children.stream().map(ServerName::parseServerName).collect(toList());
  }

  @Override
  public List<ServerName> getListOfReplicators() throws ReplicationException {
    try {
      return getListOfReplicators0();
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to get list of replicators", e);
    }
  }

  /** Lists the WAL znodes of the given queue; never null. */
  private List<String> getWALsInQueue0(ServerName serverName, String queueId)
      throws KeeperException {
    List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName,
        queueId));
    return children != null ? children : Collections.emptyList();
  }

  @Override
  public List<String> getWALsInQueue(ServerName serverName, String queueId)
      throws ReplicationException {
    try {
      return getWALsInQueue0(serverName, queueId);
    } catch (KeeperException e) {
      throw new ReplicationException(
          "Failed to get wals in queue (serverName=" + serverName + ", queueId=" + queueId + ")",
          e);
    }
  }

  /** Lists the queue znodes of the given region server; never null. */
  private List<String> getAllQueues0(ServerName serverName) throws KeeperException {
    List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName));
    return children != null ? children : Collections.emptyList();
  }

  @Override
  public List<String> getAllQueues(ServerName serverName) throws ReplicationException {
    try {
      return getAllQueues0(serverName);
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to get all queues (serverName=" + serverName + ")", e);
    }
  }

  // will be overridden in UTs
  protected int getQueuesZNodeCversion() throws KeeperException {
    Stat stat = new Stat();
    ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
    return stat.getCversion();
  }

  /**
   * {@inheritDoc}
   * <p>
   * The cversion of the queues znode is read before and after collecting the WALs of every queue
   * of every replicator. The collected set is only returned when both reads agree, otherwise the
   * whole scan is retried.
   */
  @Override
  public Set<String> getAllWALs() throws ReplicationException {
    try {
      for (int retry = 0;; retry++) {
        int v0 = getQueuesZNodeCversion();
        List<ServerName> rss = getListOfReplicators0();
        if (rss.isEmpty()) {
          LOG.debug("Didn't find a RegionServer that replicates, won't prevent deletions.");
          return Collections.emptySet();
        }
        Set<String> wals = new HashSet<>();
        for (ServerName rs : rss) {
          for (String queueId : getAllQueues0(rs)) {
            wals.addAll(getWALsInQueue0(rs, queueId));
          }
        }
        int v1 = getQueuesZNodeCversion();
        if (v0 == v1) {
          return wals;
        }
        // slf4j only substitutes "{}" placeholders, printf style "%d" would be printed verbatim.
        LOG.info("Replication queue node cversion changed from {} to {}, retry = {}",
            v0, v1, retry);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to get all wals", e);
    }
  }

  /** Returns the znode of the given peer below the hfile references znode. */
  private String getHFileRefsPeerNode(String peerId) {
    return ZNodePaths.joinZNode(hfileRefsZNode, peerId);
  }

  /** Returns the znode of the given hfile below the given peer znode. */
  private String getHFileNode(String peerNode, String fileName) {
    return ZNodePaths.joinZNode(peerNode, fileName);
  }

  @Override
  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
    String peerNode = getHFileRefsPeerNode(peerId);
    try {
      if (ZKUtil.checkExists(zookeeper, peerNode) == -1) {
        LOG.info("Adding peer {} to hfile reference queue.", peerId);
        ZKUtil.createWithParents(zookeeper, peerNode);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
          e);
    }
  }

  @Override
  public void removePeerFromHFileRefs(String peerId) throws ReplicationException {
    String peerNode = getHFileRefsPeerNode(peerId);
    try {
      if (ZKUtil.checkExists(zookeeper, peerNode) == -1) {
        LOG.debug("Peer {} not found in hfile reference queue.", peerNode);
      } else {
        LOG.info("Removing peer {} from hfile reference queue.", peerNode);
        ZKUtil.deleteNodeRecursively(zookeeper, peerNode);
      }
    } catch (KeeperException e) {
      throw new ReplicationException(
          "Failed to remove peer " + peerId + " from hfile reference queue.", e);
    }
  }

  /**
   * {@inheritDoc}
   * <p>
   * One znode is created per pair, named after the file name of the second path of the pair.
   */
  @Override
  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
      throws ReplicationException {
    String peerNode = getHFileRefsPeerNode(peerId);
    LOG.debug("Adding hfile references {} in queue {}", pairs, peerNode);
    List<ZKUtilOp> listOfOps = pairs.stream().map(p -> p.getSecond().getName())
        .map(n -> getHFileNode(peerNode, n))
        .map(f -> ZKUtilOp.createAndFailSilent(f, HConstants.EMPTY_BYTE_ARRAY)).collect(toList());
    LOG.debug("The multi list size for adding hfile references in zk for node {} is {}",
        peerNode, listOfOps.size());
    try {
      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to add hfile reference to peer " + peerId, e);
    }
  }

  @Override
  public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException {
    String peerNode = getHFileRefsPeerNode(peerId);
    LOG.debug("Removing hfile references {} from queue {}", files, peerNode);

    List<ZKUtilOp> listOfOps = files.stream().map(n -> getHFileNode(peerNode, n))
        .map(ZKUtilOp::deleteNodeFailSilent).collect(toList());
    LOG.debug("The multi list size for removing hfile references in zk for node {} is {}",
        peerNode, listOfOps.size());
    try {
      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to remove hfile reference from peer " + peerId, e);
    }
  }

  /** Lists the peer znodes below the hfile references znode; never null. */
  private List<String> getAllPeersFromHFileRefsQueue0() throws KeeperException {
    List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode);
    return children != null ? children : Collections.emptyList();
  }

  @Override
  public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
    try {
      return getAllPeersFromHFileRefsQueue0();
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to get list of all peers in hfile references node.",
          e);
    }
  }

  /** Lists the hfile reference znodes of the given peer; never null. */
  private List<String> getReplicableHFiles0(String peerId) throws KeeperException {
    List<String> children = ZKUtil.listChildrenNoWatch(this.zookeeper,
      getHFileRefsPeerNode(peerId));
    return children != null ? children : Collections.emptyList();
  }

  @Override
  public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
    try {
      return getReplicableHFiles0(peerId);
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to get list of hfile references for peer " + peerId,
          e);
    }
  }

  // will be overridden in UTs
  protected int getHFileRefsZNodeCversion() throws ReplicationException {
    Stat stat = new Stat();
    try {
      ZKUtil.getDataNoWatch(zookeeper, hfileRefsZNode, stat);
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to get stat of replication hfile references node.", e);
    }
    return stat.getCversion();
  }

  /**
   * {@inheritDoc}
   * <p>
   * The cversion of the hfile references znode is read before and after collecting the hfile
   * references of every peer. The collected set is only returned when both reads agree, otherwise
   * the whole scan is retried.
   */
  @Override
  public Set<String> getAllHFileRefs() throws ReplicationException {
    try {
      for (int retry = 0;; retry++) {
        int v0 = getHFileRefsZNodeCversion();
        List<String> peers = getAllPeersFromHFileRefsQueue();
        if (peers.isEmpty()) {
          LOG.debug("Didn't find any peers with hfile references, won't prevent deletions.");
          return Collections.emptySet();
        }
        Set<String> hfileRefs = new HashSet<>();
        for (String peer : peers) {
          hfileRefs.addAll(getReplicableHFiles0(peer));
        }
        int v1 = getHFileRefsZNodeCversion();
        if (v0 == v1) {
          return hfileRefs;
        }
        // slf4j only substitutes "{}" placeholders, printf style "%d" would be printed verbatim.
        LOG.debug("Replication hfile references node cversion changed from {} to {}, retry = {}",
            v0, v1, retry);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to get all hfile refs", e);
    }
  }
}