/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class provides an implementation of the {@link ReplicationQueues} interface using
 * ZooKeeper. The base znode that this class works at is the myQueuesZnode. The myQueuesZnode
 * contains a list of all outstanding WAL files on this region server that need to be replicated.
 * The myQueuesZnode is the regionserver name (a concatenation of the region server's hostname,
 * client port and start code). For example:
 *
 * /hbase/replication/rs/hostname.example.org,6020,1234
 *
 * Within this znode, the region server maintains a set of WAL replication queues. These queues are
 * represented by child znodes named using their given queue id. For example:
 *
 * /hbase/replication/rs/hostname.example.org,6020,1234/1
 * /hbase/replication/rs/hostname.example.org,6020,1234/2
 *
 * Each queue has one child znode for every WAL that still needs to be replicated. The value of
 * these WAL child znodes is the latest position that has been replicated. This position is updated
 * every time a WAL entry is replicated. For example:
 *
 * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
 */
@InterfaceAudience.Private
public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {

  /** Znode containing all replication queues for this region server. */
  private String myQueuesZnode;

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueuesZKImpl.class);

  /**
   * Builds an instance from the ZooKeeper watcher, configuration and abortable carried by
   * {@code args}.
   * @param args bundle of constructor arguments
   */
  public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) {
    this(args.getZk(), args.getConf(), args.getAbortable());
  }

  /**
   * @param zk ZooKeeper watcher handed to the base class
   * @param conf configuration handed to the base class
   * @param abortable used to abort when a ZooKeeper operation fails unrecoverably
   */
  public ReplicationQueuesZKImpl(final ZKWatcher zk, Configuration conf,
      Abortable abortable) {
    super(zk, conf, abortable);
  }

  /**
   * Computes this server's queues znode and creates it if it is missing. When bulk load
   * replication is enabled in the configuration, the hfile references znode is created as well.
   * @param serverName name of this region server, used as the child name under the queues znode
   * @throws ReplicationException if either znode cannot be checked or created
   */
  @Override
  public void init(String serverName) throws ReplicationException {
    this.myQueuesZnode = ZNodePaths.joinZNode(this.queuesZNode, serverName);
    try {
      if (ZKUtil.checkExists(this.zookeeper, this.myQueuesZnode) < 0) {
        ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Could not initialize replication queues.", e);
    }
    if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
      try {
        if (ZKUtil.checkExists(this.zookeeper, this.hfileRefsZNode) < 0) {
          ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
        }
      } catch (KeeperException e) {
        throw new ReplicationException("Could not initialize hfile references replication queue.",
            e);
      }
    }
  }

  /**
   * Returns the list of replicators as reported by the base class.
   * @throws ReplicationException if the ZooKeeper lookup fails
   */
  @Override
  public List<String> getListOfReplicators() throws ReplicationException {
    try {
      return super.getListOfReplicatorsZK();
    } catch (KeeperException e) {
      LOG.warn("getListOfReplicators() from ZK failed", e);
      throw new ReplicationException("getListOfReplicators() from ZK failed", e);
    }
  }

  /**
   * Recursively deletes the znode of the given queue under this server's queues znode. Aborts
   * through the abortable on ZooKeeper failure.
   * @param queueId id of the queue to remove
   */
  @Override
  public void removeQueue(String queueId) {
    try {
      ZKUtil.deleteNodeRecursively(this.zookeeper,
        ZNodePaths.joinZNode(this.myQueuesZnode, queueId));
    } catch (KeeperException e) {
      this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e);
    }
  }

  /**
   * Creates the znode for a WAL inside the given queue, creating parent znodes as needed.
   * @param queueId id of the queue the WAL belongs to
   * @param filename name of the WAL
   * @throws ReplicationException if the znode cannot be created
   */
  @Override
  public void addLog(String queueId, String filename) throws ReplicationException {
    String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
    znode = ZNodePaths.joinZNode(znode, filename);
    try {
      ZKUtil.createWithParents(this.zookeeper, znode);
    } catch (KeeperException e) {
      // Keep the ZooKeeper failure as the cause so callers can see why creation failed.
      throw new ReplicationException(
          "Could not add log because znode could not be created. queueId=" + queueId
              + ", filename=" + filename, e);
    }
  }

  /**
   * Deletes the znode for a WAL inside the given queue. Aborts through the abortable on
   * ZooKeeper failure.
   * @param queueId id of the queue the WAL belongs to
   * @param filename name of the WAL
   */
  @Override
  public void removeLog(String queueId, String filename) {
    try {
      String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
      znode = ZNodePaths.joinZNode(znode, filename);
      ZKUtil.deleteNode(this.zookeeper, znode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
          + filename + ")", e);
    }
  }

  /**
   * Stores the given position as the data of the WAL's znode. Aborts through the abortable on
   * ZooKeeper failure.
   * @param queueId id of the queue the WAL belongs to
   * @param filename name of the WAL
   * @param position position to record
   */
  @Override
  public void setLogPosition(String queueId, String filename, long position) {
    try {
      String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
      znode = ZNodePaths.joinZNode(znode, filename);
      // Why serialize String of Long and not Long as bytes?
      ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
    } catch (KeeperException e) {
      this.abortable.abort("Failed to write replication wal position (filename=" + filename
          + ", position=" + position + ")", e);
    }
  }

  /**
   * Reads the position stored in the WAL's znode.
   * @param queueId id of the queue the WAL belongs to
   * @param filename name of the WAL
   * @return the parsed position, or 0 if the thread was interrupted while reading or the znode
   *         content could not be parsed
   * @throws ReplicationException if the znode data cannot be read from ZooKeeper
   */
  @Override
  public long getLogPosition(String queueId, String filename) throws ReplicationException {
    String clusterZnode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
    String znode = ZNodePaths.joinZNode(clusterZnode, filename);
    final byte[] bytes;
    try {
      bytes = ZKUtil.getData(this.zookeeper, znode);
    } catch (KeeperException e) {
      throw new ReplicationException("Internal Error: could not get position in log for queueId="
          + queueId + ", filename=" + filename, e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return 0;
    }
    try {
      return ZKUtil.parseWALPositionFrom(bytes);
    } catch (DeserializationException de) {
      LOG.warn("Failed to parse WALPosition for queueId={} and wal={} znode content, continuing.",
        queueId, filename, de);
    }
    // if we can not parse the position, start at the beginning of the wal file
    // again
    return 0;
  }

  /**
   * @param regionserver name of a region server
   * @return true if the queues znode of {@code regionserver} equals this server's queues znode
   */
  @Override
  public boolean isThisOurRegionServer(String regionserver) {
    return ZNodePaths.joinZNode(this.queuesZNode, regionserver).equals(this.myQueuesZnode);
  }

  /**
   * Lists the queue ids found under another region server's znode.
   * @param regionserver name of the region server whose queues should be listed
   * @return the child znode names; {@code null} if {@code regionserver} is this server, or if
   *         the listing returned nothing or failed (in the failure case the abortable is
   *         invoked first)
   */
  @Override
  public List<String> getUnClaimedQueueIds(String regionserver) {
    if (isThisOurRegionServer(regionserver)) {
      return null;
    }
    String rsZnodePath = ZNodePaths.joinZNode(this.queuesZNode, regionserver);
    List<String> queues = null;
    try {
      queues = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZnodePath);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to getUnClaimedQueueIds for RS" + regionserver, e);
    }
    return queues;
  }

  /**
   * Moves the given queue of another region server under this server's queues znode.
   * @param regionserver name of the region server that currently owns the queue
   * @param queueId id of the queue to claim
   * @return the new queue id paired with the sorted WAL names, or {@code null} if nothing was
   *         moved
   */
  @Override
  public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
    LOG.info("Atomically moving {}/{}'s WALs to my queue", regionserver, queueId);
    return moveQueueUsingMulti(regionserver, queueId);
  }

  /**
   * Deletes the znode of the given region server when it has no child znodes. ZooKeeper
   * failures are logged and otherwise ignored.
   * @param regionserver name of the region server whose znode may be removed
   */
  @Override
  public void removeReplicatorIfQueueIsEmpty(String regionserver) {
    String rsPath = ZNodePaths.joinZNode(this.queuesZNode, regionserver);
    try {
      List<String> list = ZKUtil.listChildrenNoWatch(this.zookeeper, rsPath);
      if (list != null && list.isEmpty()) {
        ZKUtil.deleteNode(this.zookeeper, rsPath);
      }
    } catch (KeeperException e) {
      LOG.warn("Got error while removing replicator", e);
    }
  }

  /**
   * Recursively deletes this server's queues znode. A session-expired failure is ignored; any
   * other ZooKeeper failure aborts through the abortable.
   */
  @Override
  public void removeAllQueues() {
    try {
      ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
    } catch (KeeperException e) {
      // if the znode is already expired, don't bother going further
      if (e instanceof KeeperException.SessionExpiredException) {
        return;
      }
      this.abortable.abort("Failed to delete replication queues for region server: "
          + this.myQueuesZnode, e);
    }
  }

  /**
   * Lists the WAL znode names in the given queue.
   * @param queueId id of the queue to list
   * @return the child znode names, or {@code null} if the listing returned nothing or failed
   *         (in the failure case the abortable is invoked first)
   */
  @Override
  public List<String> getLogsInQueue(String queueId) {
    String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
    List<String> result = null;
    try {
      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get list of wals for queueId=" + queueId, e);
    }
    return result;
  }

  /**
   * Lists the queue ids under this server's queues znode.
   * @return the child znode names; an empty list (never {@code null}) if the listing returned
   *         nothing or failed (in the failure case the abortable is invoked first)
   */
  @Override
  public List<String> getAllQueues() {
    List<String> listOfQueues = null;
    try {
      listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get a list of queues for region server: "
          + this.myQueuesZnode, e);
    }
    return listOfQueues == null ? new ArrayList<>() : listOfQueues;
  }

  /**
   * It "atomically" copies one peer's wals queue from another dead region server and returns them
   * all sorted. The new peer id is equal to the old peer id appended with the dead server's znode.
   * @param znode pertaining to the region server to copy the queues from
   * @param peerId peerId pertaining to the queue need to be copied
   * @return the new peer id paired with the sorted WAL names that were moved; {@code null} if the
   *         peer no longer exists (the old queue is deleted instead), if the ZooKeeper
   *         operation failed, or if the thread was interrupted
   */
  private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) {
    try {
      // hbase/replication/rs/deadrs
      String deadRSZnodePath = ZNodePaths.joinZNode(this.queuesZNode, znode);
      List<ZKUtilOp> listOfOps = new ArrayList<>();
      ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);

      String newPeerId = peerId + "-" + znode;
      String newPeerZnode = ZNodePaths.joinZNode(this.myQueuesZnode, newPeerId);
      // check the logs queue for the old peer cluster
      String oldClusterZnode = ZNodePaths.joinZNode(deadRSZnodePath, peerId);
      List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);

      if (!peerExists(replicationQueueInfo.getPeerId())) {
        LOG.warn("Peer {} didn't exist, will move its queue to avoid the failure of multi op",
          replicationQueueInfo.getPeerId());
        // The listing may be null (it is null-checked further down as well), so guard the
        // iteration instead of failing with a NullPointerException.
        if (wals != null) {
          for (String wal : wals) {
            String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal);
            listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
          }
        }
        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
        ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
        return null;
      }

      SortedSet<String> logQueue = new TreeSet<>();
      if (wals == null || wals.isEmpty()) {
        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
      } else {
        // create the new cluster znode
        ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
        listOfOps.add(op);
        // get the offset of the logs and set it to new znodes
        for (String wal : wals) {
          String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal);
          byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode);
          LOG.debug("Creating {} with data {}", wal, Bytes.toString(logOffset));
          String newLogZnode = ZNodePaths.joinZNode(newPeerZnode, wal);
          listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
          logQueue.add(wal);
        }
        // add delete op for peer
        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));

        if (LOG.isTraceEnabled()) {
          LOG.trace(" The multi list size is: " + listOfOps.size());
        }
      }
      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);

      LOG.info("Atomically moved {}/{}'s WALs to my queue", znode, peerId);
      return new Pair<>(newPeerId, logQueue);
    } catch (KeeperException e) {
      // Multi call failed; it looks like some other regionserver took away the logs.
      LOG.warn("Got exception in moveQueueUsingMulti: ", e);
    } catch (InterruptedException e) {
      LOG.warn("Got exception in moveQueueUsingMulti: ", e);
      Thread.currentThread().interrupt();
    }
    return null;
  }

  /**
   * Creates one znode per hfile under the peer's hfile references znode. The znode name is the
   * file name of the second path of each pair.
   * @param peerId id of the peer the references are added for
   * @param pairs path pairs; only the second path of each pair is used here
   * @throws ReplicationException if the ZooKeeper operation fails
   */
  @Override
  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
      throws ReplicationException {
    String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
    LOG.debug("Adding hfile references {} in queue {}", pairs, peerZnode);

    List<ZKUtilOp> listOfOps = new ArrayList<>(pairs.size());
    for (Pair<Path, Path> pair : pairs) {
      listOfOps.add(ZKUtilOp.createAndFailSilent(
        ZNodePaths.joinZNode(peerZnode, pair.getSecond().getName()),
        HConstants.EMPTY_BYTE_ARRAY));
    }
    LOG.debug(" The multi list size for adding hfile references in zk for node {} is {}",
      peerZnode, listOfOps.size());
    try {
      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to create hfile reference znode=" + e.getPath(), e);
    }
  }

  /**
   * Deletes the znodes of the given files under the peer's hfile references znode. ZooKeeper
   * failures are logged and otherwise ignored.
   * @param peerId id of the peer the references are removed for
   * @param files names of the hfile reference znodes to delete
   */
  @Override
  public void removeHFileRefs(String peerId, List<String> files) {
    String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
    LOG.debug("Removing hfile references {} from queue {}", files, peerZnode);

    List<ZKUtilOp> listOfOps = new ArrayList<>(files.size());
    for (String file : files) {
      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZNodePaths.joinZNode(peerZnode, file)));
    }
    LOG.debug(" The multi list size for removing hfile references in zk for node {} is {}",
      peerZnode, listOfOps.size());
    try {
      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
    } catch (KeeperException e) {
      LOG.error("Failed to remove hfile reference znode=" + e.getPath(), e);
    }
  }

  /**
   * Creates the peer's znode under the hfile references znode if it does not exist yet.
   * @param peerId id of the peer to add
   * @throws ReplicationException if the znode cannot be checked or created
   */
  @Override
  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
    String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
    try {
      if (ZKUtil.checkExists(this.zookeeper, peerZnode) < 0) {
        LOG.info("Adding peer {} to hfile reference queue.", peerId);
        ZKUtil.createWithParents(this.zookeeper, peerZnode);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
          e);
    }
  }

  /**
   * Recursively deletes the peer's znode under the hfile references znode, if it exists.
   * ZooKeeper failures are logged and otherwise ignored.
   * @param peerId id of the peer to remove
   */
  @Override
  public void removePeerFromHFileRefs(String peerId) {
    final String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
    try {
      if (ZKUtil.checkExists(this.zookeeper, peerZnode) < 0) {
        LOG.debug("Peer {} not found in hfile reference queue.", peerZnode);
        return;
      }
      LOG.info("Removing peer {} from hfile reference queue.", peerZnode);
      ZKUtil.deleteNodeRecursively(this.zookeeper, peerZnode);
    } catch (KeeperException e) {
      LOG.error("Ignoring the exception to remove peer " + peerId + " from hfile reference queue.",
        e);
    }
  }
}