/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class provides an implementation of the ReplicationPeers interface using ZooKeeper. The
 * peers znode contains a list of all peer replication clusters and the current replication state of
 * those clusters. It has one child peer znode for each peer cluster. The peer znode is named with
 * the cluster id provided by the user in the HBase shell. The value of the peer znode contains the
 * peers cluster key provided by the user in the HBase Shell. The cluster key contains a list of
 * zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode for HBase.
 * For example:
 *
 * <pre>
 *  /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
 *  /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
 * </pre>
 *
 * Each of these peer znodes has a child znode that indicates whether or not replication is enabled
 * on that peer cluster. These peer-state znodes do not have child znodes and simply contain a
 * boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the
 * ReplicationPeer.PeerStateTracker class. For example:
 *
 * <pre>
 * /hbase/replication/peers/1/peer-state [Value: ENABLED]
 * </pre>
 *
 * Each of these peer znodes has a child znode that indicates which data will be replicated
 * to the peer cluster. These peer-tableCFs znodes do not have child znodes and only have a
 * table/cf list config. This value is read/maintained by the ReplicationPeer.TableCFsTracker
 * class. For example:
 *
 * <pre>
 * /hbase/replication/peers/1/tableCFs [Value: "table1; table2:cf1,cf3; table3:cfx,cfy"]
 * </pre>
 */
@InterfaceAudience.Private
public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeersZKImpl.class);

  // Map of peer clusters keyed by their id. Declared as ConcurrentMap so the atomic
  // putIfAbsent / remove(key, value) operations can be used without casting.
  private final ConcurrentMap<String, ReplicationPeerZKImpl> peerClusters;
  private final ReplicationQueuesClient queuesClient;
  private final Abortable abortable;

  /**
   * @param zk watcher handed to the base class and used for every ZooKeeper call made here
   * @param conf configuration handed to the base class
   * @param queuesClient client used to look for leftover replication queues before a peer is
   *          registered; may be null, in which case that check is skipped
   * @param abortable invoked when listing the peers fails
   */
  public ReplicationPeersZKImpl(final ZKWatcher zk, final Configuration conf,
      final ReplicationQueuesClient queuesClient, Abortable abortable) {
    super(zk, conf, abortable);
    this.abortable = abortable;
    this.peerClusters = new ConcurrentHashMap<>();
    this.queuesClient = queuesClient;
  }

  /**
   * Creates the peers znode if it is missing, then caches every peer already registered under it.
   * @throws ReplicationException if the peers znode cannot be checked/created or the existing
   *           peers cannot be loaded
   */
  @Override
  public void init() throws ReplicationException {
    try {
      if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
        ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Could not initialize replication peers", e);
    }
    addExistingPeers();
  }

  /**
   * Writes the peer znode and its peer-state znode for a new peer.
   * @param id peer id; must not already exist and must not contain '-'
   * @param peerConfig configuration serialized into the peer znode; its cluster key, when set,
   *          is validated first
   * @param enabled initial content of the peer-state znode
   * @throws IllegalArgumentException if the id already exists, contains '-', or the cluster key
   *           is invalid
   * @throws ReplicationException if undeleted queues exist for the id or ZooKeeper fails
   */
  @Override
  public void registerPeer(String id, ReplicationPeerConfig peerConfig, boolean enabled)
      throws ReplicationException {
    try {
      if (peerExists(id)) {
        throw new IllegalArgumentException("Cannot add a peer with id=" + id
            + " because that id already exists.");
      }

      if (id.contains("-")) {
        throw new IllegalArgumentException("Found invalid peer name:" + id);
      }

      if (peerConfig.getClusterKey() != null) {
        try {
          ZKConfig.validateClusterKey(peerConfig.getClusterKey());
        } catch (IOException ioe) {
          // Keep the original exception as the cause so the stack trace is not lost.
          throw new IllegalArgumentException(ioe.getMessage(), ioe);
        }
      }

      checkQueuesDeleted(id);

      ZKUtil.createWithParents(this.zookeeper, this.peersZNode);

      // Both znodes are submitted together through multiOrSequential.
      List<ZKUtilOp> listOfOps = new ArrayList<>(2);
      listOfOps.add(ZKUtilOp.createAndFailSilent(getPeerNode(id),
        ReplicationPeerConfigUtil.toByteArray(peerConfig)));
      listOfOps.add(ZKUtilOp.createAndFailSilent(getPeerStateNode(id),
        enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES));
      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
    } catch (KeeperException e) {
      throw new ReplicationException("Could not add peer with id=" + id + ", peerConfig=>"
          + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
    }
  }

  /**
   * Recursively deletes the znode of the given peer.
   * @param id peer id; must exist
   * @throws IllegalArgumentException if the peer does not exist
   * @throws ReplicationException if ZooKeeper fails
   */
  @Override
  public void unregisterPeer(String id) throws ReplicationException {
    try {
      if (!peerExists(id)) {
        throw new IllegalArgumentException("Cannot remove peer with id=" + id
            + " because that id does not exist.");
      }
      ZKUtil.deleteNodeRecursively(this.zookeeper, ZNodePaths.joinZNode(this.peersZNode, id));
    } catch (KeeperException e) {
      throw new ReplicationException("Could not remove peer with id=" + id, e);
    }
  }

  /** Sets the peer-state znode of the given peer to ENABLED. */
  @Override
  public void enablePeer(String id) throws ReplicationException {
    changePeerState(id, ReplicationProtos.ReplicationState.State.ENABLED);
    LOG.info("peer {} is enabled", id);
  }

  /** Sets the peer-state znode of the given peer to DISABLED. */
  @Override
  public void disablePeer(String id) throws ReplicationException {
    changePeerState(id, ReplicationProtos.ReplicationState.State.DISABLED);
    LOG.info("peer {} is disabled", id);
  }

  /**
   * Reads the peer configuration from ZooKeeper and returns its table-CFs map.
   * @throws IllegalArgumentException if the peer does not exist
   * @throws ReplicationException if the configuration cannot be read
   */
  @Override
  public Map<TableName, List<String>> getPeerTableCFsConfig(String id) throws ReplicationException {
    try {
      if (!peerExists(id)) {
        throw new IllegalArgumentException("peer " + id + " doesn't exist");
      }
      try {
        ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
        if (rpc == null) {
          throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
        }
        return rpc.getTableCFsMap();
      } catch (Exception e) {
        // Deliberately broad: any failure here, checked or not, surfaces as ReplicationException.
        throw new ReplicationException(e);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
    }
  }

  /**
   * Replaces the table-CFs map inside the stored peer configuration and writes it back.
   * @throws IllegalArgumentException if the peer does not exist
   * @throws ReplicationException if the configuration cannot be read or written
   */
  @Override
  public void setPeerTableCFsConfig(String id,
      Map<TableName, ? extends Collection<String>> tableCFs)
      throws ReplicationException {
    try {
      if (!peerExists(id)) {
        throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
            + " does not exist.");
      }
      ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
      if (rpc == null) {
        throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
      }
      rpc.setTableCFsMap(tableCFs);
      ZKUtil.setData(this.zookeeper, getPeerNode(id),
        ReplicationPeerConfigUtil.toByteArray(rpc));
      LOG.info("Peer tableCFs with id= {} is now {}", id,
        ReplicationPeerConfigUtil.convertToString(tableCFs));
    } catch (KeeperException e) {
      throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
    }
  }

  /**
   * Returns whether the locally cached peer is in the ENABLED state.
   * @throws IllegalArgumentException if the peer is not in the local cache
   */
  @Override
  public boolean getStatusOfPeer(String id) {
    ReplicationPeer replicationPeer = this.peerClusters.get(id);
    if (replicationPeer == null) {
      throw new IllegalArgumentException("Peer with id= " + id + " is not cached");
    }
    return replicationPeer.getPeerState() == PeerState.ENABLED;
  }

  /**
   * Reads the peer-state znode directly from ZooKeeper, bypassing the local cache.
   * @throws IllegalArgumentException if the peer does not exist
   * @throws ReplicationException if the state cannot be read or parsed, or the thread is
   *           interrupted (the interrupt flag is restored before throwing)
   */
  @Override
  public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
    try {
      if (!peerExists(id)) {
        throw new IllegalArgumentException("peer " + id + " doesn't exist");
      }
      return ReplicationPeerZKImpl.isStateEnabled(
        ZKUtil.getData(this.zookeeper, getPeerStateNode(id)));
    } catch (KeeperException | DeserializationException e) {
      throw new ReplicationException("Unable to get status of the peer with id=" + id +
          " from backing store", e);
    } catch (InterruptedException e) {
      // Restore the flag so code further up the stack can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new ReplicationException("Unable to get status of the peer with id=" + id +
          " from backing store", e);
    }
  }

  /**
   * Reads the configuration of every child of the peers znode, without setting watches.
   * Peers whose configuration cannot be read are skipped with a warning. On a ZooKeeper or
   * replication error the abortable is invoked and whatever was collected so far is returned.
   * @return peer configurations keyed by peer id, sorted by id; never null
   */
  @Override
  public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
    Map<String, ReplicationPeerConfig> peers = new TreeMap<>();
    try {
      List<String> ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
      if (ids == null) {
        // Same guard as addExistingPeers(): no child list means there is nothing to report.
        return peers;
      }
      for (String id : ids) {
        ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
        if (peerConfig == null) {
          LOG.warn("Failed to get replication peer configuration of clusterid={}"
              + " znode content, continuing.", id);
          continue;
        }
        peers.put(id, peerConfig);
      }
    } catch (KeeperException | ReplicationException e) {
      this.abortable.abort("Cannot get the list of peers ", e);
    }
    return peers;
  }

  /** Returns the locally cached peer with the given id, or null if it is not cached. */
  @Override
  public ReplicationPeer getConnectedPeer(String peerId) {
    return peerClusters.get(peerId);
  }

  /**
   * Returns the ids of the locally cached peers. This is the key-set view of the backing
   * concurrent map rather than a copy, so it changes as peers are added or removed.
   */
  @Override
  public Set<String> getConnectedPeerIds() {
    return peerClusters.keySet();
  }

  /**
   * Returns a ReplicationPeerConfig from the znode or null for the given peerId.
   * Null is returned when the thread is interrupted (the interrupt flag is restored), when the
   * znode holds no data, or when the data cannot be parsed.
   * @throws ReplicationException if ZooKeeper fails while reading the znode
   */
  @Override
  public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
      throws ReplicationException {
    String znode = getPeerNode(peerId);
    byte[] data;
    try {
      data = ZKUtil.getData(this.zookeeper, znode);
    } catch (InterruptedException e) {
      LOG.warn("Could not get configuration for peer because the thread "
          + "was interrupted. peerId={}", peerId);
      Thread.currentThread().interrupt();
      return null;
    } catch (KeeperException e) {
      throw new ReplicationException("Error getting configuration for peer with id="
          + peerId, e);
    }
    if (data == null) {
      LOG.error("Could not get configuration for peer because it doesn't exist. peerId={}",
        peerId);
      return null;
    }

    try {
      return ReplicationPeerConfigUtil.parsePeerFrom(data);
    } catch (DeserializationException e) {
      // Trailing Throwable argument is logged as the exception, so the parse failure is kept.
      LOG.warn("Failed to parse cluster key from peerId={}"
          + ", specifically the content from the following znode: {}", peerId, znode, e);
      return null;
    }
  }

  /**
   * Builds the Configuration to use for a peer: a cluster configuration derived from this
   * instance's configuration and the peer's cluster key, overlaid with the peer's own
   * configuration entries when there are any.
   * @return the peer config paired with its Configuration, or null if the peer config cannot be
   *         read or the cluster configuration cannot be created
   * @throws ReplicationException if ZooKeeper fails while reading the peer config
   */
  @Override
  public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
      throws ReplicationException {
    ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
    if (peerConfig == null) {
      return null;
    }

    Configuration otherConf;
    try {
      otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
    } catch (IOException e) {
      LOG.error("Can't get peer configuration for peerId={} because:", peerId, e);
      return null;
    }

    if (!peerConfig.getConfiguration().isEmpty()) {
      CompoundConfiguration compound = new CompoundConfiguration();
      compound.add(otherConf);
      compound.addStringMap(peerConfig.getConfiguration());
      return new Pair<>(peerConfig, compound);
    }

    return new Pair<>(peerConfig, otherConf);
  }

  /**
   * Merges {@code newConfig} into the configuration of a connected peer and writes the result to
   * the peer znode. The cluster key and the replication endpoint class may not change.
   * @throws ReplicationException if the peer is not connected, a forbidden field differs, or the
   *           write to ZooKeeper fails
   */
  @Override
  public void updatePeerConfig(String id, ReplicationPeerConfig newConfig)
      throws ReplicationException {
    ReplicationPeer peer = getConnectedPeer(id);
    if (peer == null) {
      throw new ReplicationException("Could not find peer Id " + id + " in connected peers");
    }
    ReplicationPeerConfig existingConfig = peer.getPeerConfig();
    if (!isStringEquals(newConfig.getClusterKey(), existingConfig.getClusterKey())) {
      throw new ReplicationException(
          "Changing the cluster key on an existing peer is not allowed." + " Existing key '" +
          existingConfig.getClusterKey() + "' does not match new key '" +
          newConfig.getClusterKey() + "'");
    }
    if (!isStringEquals(newConfig.getReplicationEndpointImpl(),
      existingConfig.getReplicationEndpointImpl())) {
      throw new ReplicationException("Changing the replication endpoint implementation class " +
          "on an existing peer is not allowed. Existing class '" +
          existingConfig.getReplicationEndpointImpl() + "' does not match new class '" +
          newConfig.getReplicationEndpointImpl() + "'");
    }

    // Update existingConfig's peer config and peer data with the new values, but don't touch config
    // or data that weren't explicitly changed
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(existingConfig);
    builder.putAllConfiguration(newConfig.getConfiguration())
        .putAllPeerData(newConfig.getPeerData())
        .setReplicateAllUserTables(newConfig.replicateAllUserTables())
        .setNamespaces(newConfig.getNamespaces())
        .setTableCFsMap(newConfig.getTableCFsMap())
        .setExcludeNamespaces(newConfig.getExcludeNamespaces())
        .setExcludeTableCFsMap(newConfig.getExcludeTableCFsMap())
        .setBandwidth(newConfig.getBandwidth());

    try {
      ZKUtil.setData(this.zookeeper, getPeerNode(id),
        ReplicationPeerConfigUtil.toByteArray(builder.build()));
    } catch (KeeperException ke) {
      throw new ReplicationException("There was a problem trying to save changes to the " +
          "replication peer " + id, ke);
    }
  }

  /**
   * List all registered peer clusters and set a watch on their znodes.
   * @return the ids found under the peers znode; null if ZooKeeper fails (the abortable is
   *         invoked in that case) or if the underlying listing itself yields null
   */
  @Override
  public List<String> getAllPeerIds() {
    List<String> ids = null;
    try {
      ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
    } catch (KeeperException e) {
      this.abortable.abort("Cannot get the list of peers ", e);
    }
    return ids;
  }

  /**
   * A private method used during initialization. This method attempts to add all registered
   * peer clusters. This method does not set a watch on the peer cluster znodes.
   */
  private void addExistingPeers() throws ReplicationException {
    List<String> znodes;
    try {
      znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
    } catch (KeeperException e) {
      throw new ReplicationException("Error getting the list of peer clusters.", e);
    }
    if (znodes != null) {
      for (String z : znodes) {
        createAndAddPeer(z);
      }
    }
  }

  /** Delegates to {@link #createAndAddPeer(String)}. */
  @Override
  public boolean peerConnected(String peerId) throws ReplicationException {
    return createAndAddPeer(peerId);
  }

  /** Removes the peer from the local cache, if it is still the instance that was looked up. */
  @Override
  public void peerDisconnected(String peerId) {
    ReplicationPeer rp = this.peerClusters.get(peerId);
    if (rp != null) {
      // Two-argument remove: only drops the entry if it still maps to the instance read above.
      peerClusters.remove(peerId, rp);
    }
  }

  /**
   * Attempt to connect to a new remote slave cluster.
   * @param peerId a short that identifies the cluster
   * @return true if a new connection was made, false if no new connection was made.
   * @throws ReplicationException if the peer object cannot be created
   */
  public boolean createAndAddPeer(String peerId) throws ReplicationException {
    if (this.peerClusters.containsKey(peerId)) {
      return false;
    }

    ReplicationPeerZKImpl peer;
    try {
      peer = createPeer(peerId);
    } catch (Exception e) {
      throw new ReplicationException("Error adding peer with id=" + peerId, e);
    }
    if (peer == null) {
      return false;
    }

    ReplicationPeerZKImpl previous = peerClusters.putIfAbsent(peerId, peer);
    if (previous != null) {
      // Another caller cached this peer between the containsKey check and putIfAbsent, so no
      // new connection was made by this call.
      // NOTE(review): the losing 'peer' has already started its trackers in createPeer();
      // confirm whether it needs to be closed/released here.
      LOG.info("Peer already present, {}, new cluster={}",
        previous.getPeerConfig().getClusterKey(), peer.getPeerConfig().getClusterKey());
      return false;
    }
    LOG.info("Added peer cluster={}", peer.getPeerConfig().getClusterKey());
    return true;
  }

  /**
   * Update the state znode of a peer cluster.
   * @param id id of the peer whose state znode is written; must exist
   * @param state ENABLED writes the enabled marker, any other value writes the disabled marker
   * @throws IllegalArgumentException if the peer does not exist
   * @throws ReplicationException if ZooKeeper fails
   */
  private void changePeerState(String id, ReplicationProtos.ReplicationState.State state)
      throws ReplicationException {
    try {
      if (!peerExists(id)) {
        throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
            + " does not exist.");
      }
      String peerStateZNode = getPeerStateNode(id);
      byte[] stateBytes =
          (state == ReplicationProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
              : DISABLED_ZNODE_BYTES;
      // Overwrite the state znode when it is already there, otherwise create it.
      if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
        ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
      } else {
        ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
      }
      LOG.info("Peer with id= {} is now {}", id, state.name());
    } catch (KeeperException e) {
      throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
    }
  }

  /**
   * Helper method to connect to a peer
   * @param peerId peer's identifier
   * @return object representing the peer, or null if its configuration could not be obtained
   * @throws ReplicationException if the peer configuration cannot be read from ZooKeeper or
   *           either of the peer's trackers fails to start
   */
  private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
    Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
    if (pair == null) {
      return null;
    }
    Configuration peerConf = pair.getSecond();

    ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(zookeeper,
        peerConf, peerId, pair.getFirst(), abortable);
    try {
      peer.startStateTracker(this.getPeerStateNode(peerId));
    } catch (KeeperException e) {
      throw new ReplicationException("Error starting the peer state tracker for peerId=" +
          peerId, e);
    }

    try {
      peer.startPeerConfigTracker(this.getPeerNode(peerId));
    } catch (KeeperException e) {
      throw new ReplicationException("Error starting the peer config tracker for peerId=" +
          peerId, e);
    }

    return peer;
  }

  /**
   * Verifies that no replication queue and no hfile-refs entry is left over for the given peer
   * id. Does nothing when no queues client was supplied.
   * @throws ReplicationException if a leftover queue is found or ZooKeeper fails
   */
  private void checkQueuesDeleted(String peerId) throws ReplicationException {
    if (queuesClient == null) {
      return;
    }
    try {
      List<String> replicators = queuesClient.getListOfReplicators();
      if (replicators == null || replicators.isEmpty()) {
        return;
      }
      for (String replicator : replicators) {
        List<String> queueIds = queuesClient.getAllQueues(replicator);
        if (queueIds == null) {
          // NOTE(review): presumably a replicator that vanished between the two calls can
          // yield null here; nothing to check for it. Confirm against ReplicationQueuesClient.
          continue;
        }
        for (String queueId : queueIds) {
          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
          if (queueInfo.getPeerId().equals(peerId)) {
            throw new ReplicationException("undeleted queue for peerId: " + peerId
                + ", replicator: " + replicator + ", queueId: " + queueId);
          }
        }
      }
      // Check for hfile-refs queue
      if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
          && queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
        throw new ReplicationException("Undeleted queue for peerId: " + peerId
            + ", found in hfile-refs node path " + hfileRefsZNode);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
    }
  }

  /**
   * For replication peer cluster key or endpoint class, null and empty string is same. So here
   * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly.
   */
  private boolean isStringEquals(String s1, String s2) {
    if (StringUtils.isBlank(s1)) {
      return StringUtils.isBlank(s2);
    }
    return s1.equals(s2);
  }
}