001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static java.util.stream.Collectors.toList; 021 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.HashSet; 025import java.util.List; 026import java.util.Map; 027import java.util.Map.Entry; 028import java.util.Set; 029import java.util.SortedSet; 030import java.util.TreeSet; 031import java.util.stream.Collectors; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.exceptions.DeserializationException; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.hbase.util.Pair; 040import org.apache.hadoop.hbase.zookeeper.ZKUtil; 041import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; 042import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 043import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.apache.zookeeper.KeeperException; 046import org.apache.zookeeper.KeeperException.BadVersionException; 047import org.apache.zookeeper.KeeperException.NoNodeException; 048import org.apache.zookeeper.KeeperException.NodeExistsException; 049import org.apache.zookeeper.KeeperException.NotEmptyException; 050import org.apache.zookeeper.data.Stat; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 055 056/** 057 * ZK based replication queue storage. 058 * <p> 059 * The base znode for each regionserver is the regionserver name. For example: 060 * 061 * <pre> 062 * /hbase/replication/rs/hostname.example.org,6020,1234 063 * </pre> 064 * 065 * Within this znode, the region server maintains a set of WAL replication queues. These queues are 066 * represented by child znodes named using there give queue id. For example: 067 * 068 * <pre> 069 * /hbase/replication/rs/hostname.example.org,6020,1234/1 070 * /hbase/replication/rs/hostname.example.org,6020,1234/2 071 * </pre> 072 * 073 * Each queue has one child znode for every WAL that still needs to be replicated. The value of 074 * these WAL child znodes is the latest position that has been replicated. This position is updated 075 * every time a WAL entry is replicated. For example: 076 * 077 * <pre> 078 * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254] 079 * </pre> 080 */ 081@InterfaceAudience.Private 082class ZKReplicationQueueStorage extends ZKReplicationStorageBase 083 implements ReplicationQueueStorage { 084 085 private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class); 086 087 public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY = 088 "zookeeper.znode.replication.hfile.refs"; 089 public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs"; 090 091 public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY = 092 "zookeeper.znode.replication.regions"; 093 public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions"; 094 095 /** 096 * The name of the znode that contains all replication queues 097 */ 098 private final String queuesZNode; 099 100 /** 101 * The name of the znode that contains queues of hfile references to be replicated 102 */ 103 private final String hfileRefsZNode; 104 105 final String regionsZNode; 106 107 public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) { 108 super(zookeeper, conf); 109 110 String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); 111 String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, 112 ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT); 113 this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName); 114 this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName); 115 this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf 116 .get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT)); 117 } 118 119 @Override 120 public String getRsNode(ServerName serverName) { 121 return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName()); 122 } 123 124 private String getQueueNode(ServerName serverName, String queueId) { 125 return ZNodePaths.joinZNode(getRsNode(serverName), queueId); 126 } 127 128 private String getFileNode(String queueNode, String fileName) { 129 return ZNodePaths.joinZNode(queueNode, fileName); 130 } 131 132 private String getFileNode(ServerName serverName, String queueId, String fileName) { 133 return getFileNode(getQueueNode(serverName, queueId), fileName); 134 } 135 136 /** 137 * <p> 138 * Put all regions under /hbase/replication/regions znode will lead to too many children because 139 * of the huge number of regions in real production environment. So here we will distribute the 140 * znodes to multiple directories. 141 * </p> 142 * <p> 143 * So the final znode path will be format like this: 144 * 145 * <pre> 146 * /hbase/replication/regions/dd/04/e76a6966d4ffa908ed0586764767-100 147 * </pre> 148 * 149 * Here the full encoded region name is dd04e76a6966d4ffa908ed0586764767, and we use the first two 150 * characters 'dd' as the first level directory name, and use the next two characters '04' as the 151 * second level directory name, and the rest part as the prefix of the znode, and the suffix '100' 152 * is the peer id. 153 * </p> 154 * @param encodedRegionName the encoded region name. 155 * @param peerId peer id for replication. 156 * @return ZNode path to persist the max sequence id that we've pushed for the given region and 157 * peer. 158 */ 159 String getSerialReplicationRegionPeerNode(String encodedRegionName, String peerId) { 160 if (encodedRegionName == null || encodedRegionName.length() != RegionInfo.MD5_HEX_LENGTH) { 161 throw new IllegalArgumentException( 162 "Invalid encoded region name: " + encodedRegionName + ", length should be 32."); 163 } 164 return new StringBuilder(regionsZNode).append(ZNodePaths.ZNODE_PATH_SEPARATOR) 165 .append(encodedRegionName, 0, 2).append(ZNodePaths.ZNODE_PATH_SEPARATOR) 166 .append(encodedRegionName, 2, 4).append(ZNodePaths.ZNODE_PATH_SEPARATOR) 167 .append(encodedRegionName, 4, encodedRegionName.length()).append("-").append(peerId) 168 .toString(); 169 } 170 171 @Override 172 public void removeQueue(ServerName serverName, String queueId) throws ReplicationException { 173 try { 174 ZKUtil.deleteNodeRecursively(zookeeper, getQueueNode(serverName, queueId)); 175 } catch (KeeperException e) { 176 throw new ReplicationException( 177 "Failed to delete queue (serverName=" + serverName + ", queueId=" + queueId + ")", e); 178 } 179 } 180 181 @Override 182 public void addWAL(ServerName serverName, String queueId, String fileName) 183 throws ReplicationException { 184 try { 185 ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName)); 186 } catch (KeeperException e) { 187 throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName 188 + ", queueId=" + queueId + ", fileName=" + fileName + ")", e); 189 } 190 } 191 192 @Override 193 public void removeWAL(ServerName serverName, String queueId, String fileName) 194 throws ReplicationException { 195 String fileNode = getFileNode(serverName, queueId, fileName); 196 try { 197 ZKUtil.deleteNode(zookeeper, fileNode); 198 } catch (NoNodeException e) { 199 LOG.warn("{} already deleted when removing log", fileNode); 200 } catch (KeeperException e) { 201 throw new ReplicationException("Failed to remove wal from queue (serverName=" + serverName 202 + ", queueId=" + queueId + ", fileName=" + fileName + ")", e); 203 } 204 } 205 206 private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds, 207 List<ZKUtilOp> listOfOps) throws KeeperException, ReplicationException { 208 String peerId = new ReplicationQueueInfo(queueId).getPeerId(); 209 for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) { 210 String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId); 211 Pair<Long, Integer> p = getLastSequenceIdWithVersion(lastSeqEntry.getKey(), peerId); 212 byte[] data = ZKUtil.positionToByteArray(lastSeqEntry.getValue()); 213 if (p.getSecond() < 0) { // ZNode does not exist. 214 ZKUtil.createWithParents(zookeeper, 215 path.substring(0, path.lastIndexOf(ZNodePaths.ZNODE_PATH_SEPARATOR))); 216 listOfOps.add(ZKUtilOp.createAndFailSilent(path, data)); 217 continue; 218 } 219 // Perform CAS in a specific version v0 (HBASE-20138) 220 int v0 = p.getSecond(); 221 long lastPushedSeqId = p.getFirst(); 222 if (lastSeqEntry.getValue() <= lastPushedSeqId) { 223 continue; 224 } 225 listOfOps.add(ZKUtilOp.setData(path, data, v0)); 226 } 227 } 228 229 @Override 230 public void setWALPosition(ServerName serverName, String queueId, String fileName, long position, 231 Map<String, Long> lastSeqIds) throws ReplicationException { 232 try { 233 for (int retry = 0;; retry++) { 234 List<ZKUtilOp> listOfOps = new ArrayList<>(); 235 if (position > 0) { 236 listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName), 237 ZKUtil.positionToByteArray(position))); 238 } 239 // Persist the max sequence id(s) of regions for serial replication atomically. 240 addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps); 241 if (listOfOps.isEmpty()) { 242 return; 243 } 244 try { 245 ZKUtil.multiOrSequential(zookeeper, listOfOps, false); 246 return; 247 } catch (KeeperException.BadVersionException | KeeperException.NodeExistsException e) { 248 LOG.warn( 249 "Bad version(or node exist) when persist the last pushed sequence id to zookeeper " 250 + "storage, Retry = " + retry + ", serverName=" + serverName + ", queueId=" + queueId 251 + ", fileName=" + fileName); 252 } 253 } 254 } catch (KeeperException e) { 255 throw new ReplicationException("Failed to set log position (serverName=" + serverName 256 + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e); 257 } 258 } 259 260 /** 261 * Return the {lastPushedSequenceId, ZNodeDataVersion} pair. if ZNodeDataVersion is -1, it means 262 * that the ZNode does not exist. 263 */ 264 protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName, 265 String peerId) throws KeeperException { 266 Stat stat = new Stat(); 267 String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId); 268 byte[] data = ZKUtil.getDataNoWatch(zookeeper, path, stat); 269 if (data == null) { 270 // ZNode does not exist, so just return version -1 to indicate that no node exist. 271 return Pair.newPair(HConstants.NO_SEQNUM, -1); 272 } 273 try { 274 return Pair.newPair(ZKUtil.parseWALPositionFrom(data), stat.getVersion()); 275 } catch (DeserializationException de) { 276 LOG.warn("Failed to parse log position (region=" + encodedRegionName + ", peerId=" + peerId 277 + "), data=" + Bytes.toStringBinary(data)); 278 } 279 return Pair.newPair(HConstants.NO_SEQNUM, stat.getVersion()); 280 } 281 282 @Override 283 public long getLastSequenceId(String encodedRegionName, String peerId) 284 throws ReplicationException { 285 try { 286 return getLastSequenceIdWithVersion(encodedRegionName, peerId).getFirst(); 287 } catch (KeeperException e) { 288 throw new ReplicationException("Failed to get last pushed sequence id (encodedRegionName=" 289 + encodedRegionName + ", peerId=" + peerId + ")", e); 290 } 291 } 292 293 @Override 294 public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) 295 throws ReplicationException { 296 try { 297 // No need CAS and retry here, because it'll call setLastSequenceIds() for disabled peers 298 // only, so no conflict happen. 299 List<ZKUtilOp> listOfOps = new ArrayList<>(); 300 for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) { 301 String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId); 302 ZKUtil.createWithParents(zookeeper, path); 303 listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue()))); 304 } 305 if (!listOfOps.isEmpty()) { 306 ZKUtil.multiOrSequential(zookeeper, listOfOps, true); 307 } 308 } catch (KeeperException e) { 309 throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId 310 + ", size of lastSeqIds=" + lastSeqIds.size(), e); 311 } 312 } 313 314 @Override 315 public void removeLastSequenceIds(String peerId) throws ReplicationException { 316 String suffix = "-" + peerId; 317 try { 318 StringBuilder sb = new StringBuilder(regionsZNode); 319 int regionsZNodeLength = regionsZNode.length(); 320 int levelOneLength = regionsZNodeLength + 3; 321 int levelTwoLength = levelOneLength + 3; 322 List<String> levelOneDirs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode); 323 // it is possible that levelOneDirs is null if we haven't write any last pushed sequence ids 324 // yet, so we need an extra check here. 325 if (CollectionUtils.isEmpty(levelOneDirs)) { 326 return; 327 } 328 for (String levelOne : levelOneDirs) { 329 sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelOne); 330 for (String levelTwo : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) { 331 sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelTwo); 332 for (String znode : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) { 333 if (znode.endsWith(suffix)) { 334 sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(znode); 335 ZKUtil.deleteNode(zookeeper, sb.toString()); 336 sb.setLength(levelTwoLength); 337 } 338 } 339 sb.setLength(levelOneLength); 340 } 341 sb.setLength(regionsZNodeLength); 342 } 343 } catch (KeeperException e) { 344 throw new ReplicationException("Failed to remove all last sequence ids, peerId=" + peerId, e); 345 } 346 } 347 348 @Override 349 public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames) 350 throws ReplicationException { 351 try { 352 List<ZKUtilOp> listOfOps = 353 encodedRegionNames.stream().map(n -> getSerialReplicationRegionPeerNode(n, peerId)) 354 .map(ZKUtilOp::deleteNodeFailSilent).collect(Collectors.toList()); 355 ZKUtil.multiOrSequential(zookeeper, listOfOps, true); 356 } catch (KeeperException e) { 357 throw new ReplicationException("Failed to remove last sequence ids, peerId=" + peerId 358 + ", encodedRegionNames.size=" + encodedRegionNames.size(), e); 359 } 360 } 361 362 @Override 363 public long getWALPosition(ServerName serverName, String queueId, String fileName) 364 throws ReplicationException { 365 byte[] bytes; 366 try { 367 bytes = ZKUtil.getData(zookeeper, getFileNode(serverName, queueId, fileName)); 368 } catch (KeeperException | InterruptedException e) { 369 throw new ReplicationException("Failed to get log position (serverName=" + serverName 370 + ", queueId=" + queueId + ", fileName=" + fileName + ")", e); 371 } 372 try { 373 return ZKUtil.parseWALPositionFrom(bytes); 374 } catch (DeserializationException de) { 375 LOG.warn("Failed parse log position (serverName={}, queueId={}, fileName={})", serverName, 376 queueId, fileName); 377 } 378 // if we can not parse the position, start at the beginning of the wal file again 379 return 0; 380 } 381 382 /** 383 * This implement must update the cversion of root {@link #queuesZNode}. The optimistic lock of 384 * the {@link #getAllWALs()} method is based on the cversion of root {@link #queuesZNode}. 385 * @see #getAllWALs() to show the usage of the cversion of root {@link #queuesZNode} . 386 */ 387 @Override 388 public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId, 389 ServerName destServerName) throws ReplicationException { 390 LOG.info("Atomically moving {}/{}'s WALs to {}", sourceServerName, queueId, destServerName); 391 try { 392 ZKUtil.createWithParents(zookeeper, getRsNode(destServerName)); 393 } catch (KeeperException e) { 394 throw new ReplicationException("Claim queue queueId=" + queueId + " from " + sourceServerName 395 + " to " + destServerName + " failed when creating the node for " + destServerName, e); 396 } 397 String newQueueId = queueId + "-" + sourceServerName; 398 try { 399 String oldQueueNode = getQueueNode(sourceServerName, queueId); 400 List<String> wals = ZKUtil.listChildrenNoWatch(zookeeper, oldQueueNode); 401 if (CollectionUtils.isEmpty(wals)) { 402 ZKUtil.deleteNodeFailSilent(zookeeper, oldQueueNode); 403 LOG.info("Removed empty {}/{}", sourceServerName, queueId); 404 return new Pair<>(newQueueId, Collections.emptySortedSet()); 405 } 406 String newQueueNode = getQueueNode(destServerName, newQueueId); 407 List<ZKUtilOp> listOfOps = new ArrayList<>(); 408 SortedSet<String> logQueue = new TreeSet<>(); 409 // create the new cluster znode 410 listOfOps.add(ZKUtilOp.createAndFailSilent(newQueueNode, HConstants.EMPTY_BYTE_ARRAY)); 411 // get the offset of the logs and set it to new znodes 412 for (String wal : wals) { 413 String oldWalNode = getFileNode(oldQueueNode, wal); 414 byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalNode); 415 LOG.debug("Creating {} with data {}", wal, Bytes.toStringBinary(logOffset)); 416 String newWalNode = getFileNode(newQueueNode, wal); 417 listOfOps.add(ZKUtilOp.createAndFailSilent(newWalNode, logOffset)); 418 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalNode)); 419 logQueue.add(wal); 420 } 421 // add delete op for peer 422 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldQueueNode)); 423 // Append new queue id for prevent lock competition in zookeeper server. 424 String claimLockZNode = ZNodePaths.joinZNode(queuesZNode, "cversion_" + newQueueId); 425 // A trick for update the cversion of root queuesZNode . 426 // The optimistic lock of the getAllWALs() method is based on the cversion of root queuesZNode 427 listOfOps.add(ZKUtilOp.createAndFailSilent(claimLockZNode, HConstants.EMPTY_BYTE_ARRAY)); 428 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(claimLockZNode)); 429 430 LOG.trace("The multi list size is {}", listOfOps.size()); 431 ZKUtil.multiOrSequential(zookeeper, listOfOps, false); 432 433 LOG.info("Atomically moved {}/{}'s WALs to {}", sourceServerName, queueId, destServerName); 434 return new Pair<>(newQueueId, logQueue); 435 } catch (NoNodeException | NodeExistsException | NotEmptyException | BadVersionException e) { 436 // Multi call failed; it looks like some other regionserver took away the logs. 437 // These exceptions mean that zk tells us the request can not be execute. So return an empty 438 // queue to tell the upper layer that claim nothing. For other types of exception should be 439 // thrown out to notify the upper layer. 440 LOG.info("Claim queue queueId={} from {} to {} failed with {}, someone else took the log?", 441 queueId, sourceServerName, destServerName, e.toString()); 442 return new Pair<>(newQueueId, Collections.emptySortedSet()); 443 } catch (KeeperException | InterruptedException e) { 444 throw new ReplicationException("Claim queue queueId=" + queueId + " from " + sourceServerName 445 + " to " + destServerName + " failed", e); 446 } 447 } 448 449 @Override 450 public void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException { 451 try { 452 ZKUtil.deleteNodeFailSilent(zookeeper, getRsNode(serverName)); 453 } catch (NotEmptyException e) { 454 // keep silence to avoid logging too much. 455 } catch (KeeperException e) { 456 throw new ReplicationException("Failed to remove replicator for " + serverName, e); 457 } 458 } 459 460 private List<ServerName> getListOfReplicators0() throws KeeperException { 461 List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode); 462 if (children == null) { 463 children = Collections.emptyList(); 464 } 465 return children.stream().map(ServerName::parseServerName).collect(toList()); 466 } 467 468 @Override 469 public List<ServerName> getListOfReplicators() throws ReplicationException { 470 try { 471 return getListOfReplicators0(); 472 } catch (KeeperException e) { 473 throw new ReplicationException("Failed to get list of replicators", e); 474 } 475 } 476 477 private List<String> getWALsInQueue0(ServerName serverName, String queueId) 478 throws KeeperException { 479 List<String> children = 480 ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, queueId)); 481 return children != null ? children : Collections.emptyList(); 482 } 483 484 @Override 485 public List<String> getWALsInQueue(ServerName serverName, String queueId) 486 throws ReplicationException { 487 try { 488 return getWALsInQueue0(serverName, queueId); 489 } catch (KeeperException e) { 490 throw new ReplicationException( 491 "Failed to get wals in queue (serverName=" + serverName + ", queueId=" + queueId + ")", e); 492 } 493 } 494 495 private List<String> getAllQueues0(ServerName serverName) throws KeeperException { 496 List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName)); 497 return children != null ? children : Collections.emptyList(); 498 } 499 500 @Override 501 public List<String> getAllQueues(ServerName serverName) throws ReplicationException { 502 try { 503 return getAllQueues0(serverName); 504 } catch (KeeperException e) { 505 throw new ReplicationException("Failed to get all queues (serverName=" + serverName + ")", e); 506 } 507 } 508 509 // will be overridden in UTs 510 protected int getQueuesZNodeCversion() throws KeeperException { 511 Stat stat = new Stat(); 512 ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat); 513 return stat.getCversion(); 514 } 515 516 /** 517 * The optimistic lock of this implement is based on the cversion of root {@link #queuesZNode}. 518 * Therefore, we must update the cversion of root {@link #queuesZNode} when migrate wal nodes to 519 * other queues. 520 * @see #claimQueue(ServerName, String, ServerName) as an example of updating root 521 * {@link #queuesZNode} cversion. 522 */ 523 @Override 524 public Set<String> getAllWALs() throws ReplicationException { 525 try { 526 for (int retry = 0;; retry++) { 527 int v0 = getQueuesZNodeCversion(); 528 List<ServerName> rss = getListOfReplicators0(); 529 if (rss.isEmpty()) { 530 LOG.debug("Didn't find a RegionServer that replicates, won't prevent deletions."); 531 return Collections.emptySet(); 532 } 533 Set<String> wals = new HashSet<>(); 534 for (ServerName rs : rss) { 535 for (String queueId : getAllQueues0(rs)) { 536 wals.addAll(getWALsInQueue0(rs, queueId)); 537 } 538 } 539 int v1 = getQueuesZNodeCversion(); 540 if (v0 == v1) { 541 return wals; 542 } 543 LOG.info("Replication queue node cversion changed from %d to %d, retry = %d", v0, v1, 544 retry); 545 } 546 } catch (KeeperException e) { 547 throw new ReplicationException("Failed to get all wals", e); 548 } 549 } 550 551 private String getHFileRefsPeerNode(String peerId) { 552 return ZNodePaths.joinZNode(hfileRefsZNode, peerId); 553 } 554 555 private String getHFileNode(String peerNode, String fileName) { 556 return ZNodePaths.joinZNode(peerNode, fileName); 557 } 558 559 @Override 560 public void addPeerToHFileRefs(String peerId) throws ReplicationException { 561 String peerNode = getHFileRefsPeerNode(peerId); 562 try { 563 if (ZKUtil.checkExists(zookeeper, peerNode) == -1) { 564 LOG.info("Adding peer {} to hfile reference queue.", peerId); 565 ZKUtil.createWithParents(zookeeper, peerNode); 566 } 567 } catch (KeeperException e) { 568 throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.", 569 e); 570 } 571 } 572 573 @Override 574 public void removePeerFromHFileRefs(String peerId) throws ReplicationException { 575 String peerNode = getHFileRefsPeerNode(peerId); 576 try { 577 if (ZKUtil.checkExists(zookeeper, peerNode) == -1) { 578 LOG.debug("Peer {} not found in hfile reference queue.", peerNode); 579 } else { 580 LOG.info("Removing peer {} from hfile reference queue.", peerNode); 581 ZKUtil.deleteNodeRecursively(zookeeper, peerNode); 582 } 583 } catch (KeeperException e) { 584 throw new ReplicationException( 585 "Failed to remove peer " + peerId + " from hfile reference queue.", e); 586 } 587 } 588 589 @Override 590 public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) 591 throws ReplicationException { 592 String peerNode = getHFileRefsPeerNode(peerId); 593 LOG.debug("Adding hfile references {} in queue {}", pairs, peerNode); 594 List<ZKUtilOp> listOfOps = 595 pairs.stream().map(p -> p.getSecond().getName()).map(n -> getHFileNode(peerNode, n)) 596 .map(f -> ZKUtilOp.createAndFailSilent(f, HConstants.EMPTY_BYTE_ARRAY)).collect(toList()); 597 LOG.debug("The multi list size for adding hfile references in zk for node {} is {}", peerNode, 598 listOfOps.size()); 599 try { 600 ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); 601 } catch (KeeperException e) { 602 throw new ReplicationException("Failed to add hfile reference to peer " + peerId, e); 603 } 604 } 605 606 @Override 607 public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException { 608 String peerNode = getHFileRefsPeerNode(peerId); 609 LOG.debug("Removing hfile references {} from queue {}", files, peerNode); 610 611 List<ZKUtilOp> listOfOps = files.stream().map(n -> getHFileNode(peerNode, n)) 612 .map(ZKUtilOp::deleteNodeFailSilent).collect(toList()); 613 LOG.debug("The multi list size for removing hfile references in zk for node {} is {}", peerNode, 614 listOfOps.size()); 615 try { 616 ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); 617 } catch (KeeperException e) { 618 throw new ReplicationException("Failed to remove hfile reference from peer " + peerId, e); 619 } 620 } 621 622 private List<String> getAllPeersFromHFileRefsQueue0() throws KeeperException { 623 List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode); 624 return children != null ? children : Collections.emptyList(); 625 } 626 627 @Override 628 public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException { 629 try { 630 return getAllPeersFromHFileRefsQueue0(); 631 } catch (KeeperException e) { 632 throw new ReplicationException("Failed to get list of all peers in hfile references node.", 633 e); 634 } 635 } 636 637 private List<String> getReplicableHFiles0(String peerId) throws KeeperException { 638 List<String> children = 639 ZKUtil.listChildrenNoWatch(this.zookeeper, getHFileRefsPeerNode(peerId)); 640 return children != null ? children : Collections.emptyList(); 641 } 642 643 @Override 644 public List<String> getReplicableHFiles(String peerId) throws ReplicationException { 645 try { 646 return getReplicableHFiles0(peerId); 647 } catch (KeeperException e) { 648 throw new ReplicationException("Failed to get list of hfile references for peer " + peerId, 649 e); 650 } 651 } 652 653 // will be overridden in UTs 654 protected int getHFileRefsZNodeCversion() throws ReplicationException { 655 Stat stat = new Stat(); 656 try { 657 ZKUtil.getDataNoWatch(zookeeper, hfileRefsZNode, stat); 658 } catch (KeeperException e) { 659 throw new ReplicationException("Failed to get stat of replication hfile references node.", e); 660 } 661 return stat.getCversion(); 662 } 663 664 @Override 665 public Set<String> getAllHFileRefs() throws ReplicationException { 666 try { 667 for (int retry = 0;; retry++) { 668 int v0 = getHFileRefsZNodeCversion(); 669 List<String> peers = getAllPeersFromHFileRefsQueue(); 670 if (peers.isEmpty()) { 671 LOG.debug("Didn't find any peers with hfile references, won't prevent deletions."); 672 return Collections.emptySet(); 673 } 674 Set<String> hfileRefs = new HashSet<>(); 675 for (String peer : peers) { 676 hfileRefs.addAll(getReplicableHFiles0(peer)); 677 } 678 int v1 = getHFileRefsZNodeCversion(); 679 if (v0 == v1) { 680 return hfileRefs; 681 } 682 LOG.debug("Replication hfile references node cversion changed from %d to %d, retry = %d", 683 v0, v1, retry); 684 } 685 } catch (KeeperException e) { 686 throw new ReplicationException("Failed to get all hfile refs", e); 687 } 688 } 689}