001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.IOException; 022import java.net.URI; 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.EnumSet; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.Optional; 030import java.util.Set; 031import java.util.concurrent.CompletableFuture; 032import java.util.concurrent.ConcurrentHashMap; 033import java.util.concurrent.ConcurrentMap; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.TimeUnit; 036import java.util.regex.Pattern; 037import java.util.stream.Collectors; 038import org.apache.commons.lang3.StringUtils; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.fs.FileSystem; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.hbase.ClusterMetrics; 043import org.apache.hadoop.hbase.DoNotRetryIOException; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; 046import org.apache.hadoop.hbase.ServerName; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.client.Admin; 049import org.apache.hadoop.hbase.client.Connection; 050import org.apache.hadoop.hbase.client.ConnectionFactory; 051import org.apache.hadoop.hbase.client.ConnectionRegistryFactory; 052import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 053import org.apache.hadoop.hbase.conf.ConfigurationObserver; 054import org.apache.hadoop.hbase.master.MasterServices; 055import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 056import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; 057import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 058import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; 059import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; 060import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 061import org.apache.hadoop.hbase.replication.ReplicationException; 062import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; 063import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 064import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 065import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 066import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; 067import org.apache.hadoop.hbase.replication.ReplicationQueueData; 068import org.apache.hadoop.hbase.replication.ReplicationQueueId; 069import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 070import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 071import org.apache.hadoop.hbase.replication.ReplicationUtils; 072import org.apache.hadoop.hbase.replication.SyncReplicationState; 073import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; 074import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator; 075import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId; 076import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData; 077import org.apache.hadoop.hbase.util.FutureUtils; 078import org.apache.hadoop.hbase.util.Pair; 079import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 080import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 081import org.apache.hadoop.hbase.zookeeper.ZKConfig; 082import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 083import org.apache.yetus.audience.InterfaceAudience; 084import org.apache.zookeeper.KeeperException; 085import org.slf4j.Logger; 086import org.slf4j.LoggerFactory; 087 088import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 089import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 090 091/** 092 * Manages and performs all replication admin operations. 093 * <p> 094 * Used to add/remove a replication peer. 095 * <p> 096 * Implement {@link ConfigurationObserver} mainly for recreating {@link ReplicationPeerStorage}, for 097 * supporting migrating across different replication peer storages without restarting master. 098 */ 099@InterfaceAudience.Private 100public class ReplicationPeerManager implements ConfigurationObserver { 101 102 private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerManager.class); 103 104 private volatile ReplicationPeerStorage peerStorage; 105 106 private final ReplicationQueueStorage queueStorage; 107 108 private final ConcurrentMap<String, ReplicationPeerDescription> peers; 109 110 private final ImmutableMap<SyncReplicationState, 111 EnumSet<SyncReplicationState>> allowedTransition = 112 Maps.immutableEnumMap(ImmutableMap.of(SyncReplicationState.ACTIVE, 113 EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE, SyncReplicationState.STANDBY), 114 SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE), 115 SyncReplicationState.DOWNGRADE_ACTIVE, 116 EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE))); 117 118 private final String clusterId; 119 120 private volatile Configuration conf; 121 122 // for dynamic recreating ReplicationPeerStorage. 123 private final FileSystem fs; 124 125 private final ZKWatcher zk; 126 127 @FunctionalInterface 128 interface ReplicationQueueStorageInitializer { 129 130 void initialize() throws IOException; 131 } 132 133 private final ReplicationQueueStorageInitializer queueStorageInitializer; 134 135 // we will mock this class in UT so leave the constructor as package private and not mark the 136 // class as final, since mockito can not mock a final class 137 ReplicationPeerManager(FileSystem fs, ZKWatcher zk, ReplicationPeerStorage peerStorage, 138 ReplicationQueueStorage queueStorage, ConcurrentMap<String, ReplicationPeerDescription> peers, 139 Configuration conf, String clusterId, 140 ReplicationQueueStorageInitializer queueStorageInitializer) { 141 this.fs = fs; 142 this.zk = zk; 143 this.peerStorage = peerStorage; 144 this.queueStorage = queueStorage; 145 this.peers = peers; 146 this.conf = conf; 147 this.clusterId = clusterId; 148 this.queueStorageInitializer = queueStorageInitializer; 149 } 150 151 private void checkQueuesDeleted(String peerId) 152 throws ReplicationException, DoNotRetryIOException { 153 List<ReplicationQueueId> queueIds = queueStorage.listAllQueueIds(peerId); 154 if (!queueIds.isEmpty()) { 155 throw new DoNotRetryIOException("There are still " + queueIds.size() 156 + " undeleted queue(s) for peerId: " + peerId + ", first is " + queueIds.get(0)); 157 } 158 if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) { 159 throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs"); 160 } 161 } 162 163 private void initializeQueueStorage() throws IOException { 164 queueStorageInitializer.initialize(); 165 } 166 167 void preAddPeer(String peerId, ReplicationPeerConfig peerConfig) 168 throws ReplicationException, IOException { 169 if (peerId.contains("-")) { 170 throw new DoNotRetryIOException("Found invalid peer name: " + peerId); 171 } 172 checkPeerConfig(peerConfig); 173 if (peerConfig.isSyncReplication()) { 174 checkSyncReplicationPeerConfigConflict(peerConfig); 175 } 176 if (peers.containsKey(peerId)) { 177 throw new DoNotRetryIOException("Replication peer " + peerId + " already exists"); 178 } 179 180 // lazy create table 181 initializeQueueStorage(); 182 // make sure that there is no queues with the same peer id. This may happen when we create a 183 // peer with the same id with a old deleted peer. If the replication queues for the old peer 184 // have not been cleaned up yet then we should not create the new peer, otherwise the old wal 185 // file may also be replicated. 186 checkQueuesDeleted(peerId); 187 } 188 189 private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException { 190 ReplicationPeerDescription desc = peers.get(peerId); 191 if (desc == null) { 192 throw new ReplicationPeerNotFoundException(peerId); 193 } 194 return desc; 195 } 196 197 private void checkPeerInDAStateIfSyncReplication(String peerId) throws DoNotRetryIOException { 198 ReplicationPeerDescription desc = peers.get(peerId); 199 if ( 200 desc != null && desc.getPeerConfig().isSyncReplication() 201 && !SyncReplicationState.DOWNGRADE_ACTIVE.equals(desc.getSyncReplicationState()) 202 ) { 203 throw new DoNotRetryIOException( 204 "Couldn't remove synchronous replication peer with state=" + desc.getSyncReplicationState() 205 + ", Transit the synchronous replication state to be DOWNGRADE_ACTIVE firstly."); 206 } 207 } 208 209 ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException { 210 ReplicationPeerDescription pd = checkPeerExists(peerId); 211 checkPeerInDAStateIfSyncReplication(peerId); 212 return pd.getPeerConfig(); 213 } 214 215 void preEnablePeer(String peerId) throws DoNotRetryIOException { 216 ReplicationPeerDescription desc = checkPeerExists(peerId); 217 if (desc.isEnabled()) { 218 throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled"); 219 } 220 } 221 222 void preDisablePeer(String peerId) throws DoNotRetryIOException { 223 ReplicationPeerDescription desc = checkPeerExists(peerId); 224 if (!desc.isEnabled()) { 225 throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled"); 226 } 227 } 228 229 /** 230 * Return the old peer description. Can never be null. 231 */ 232 ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) 233 throws DoNotRetryIOException { 234 checkPeerConfig(peerConfig); 235 ReplicationPeerDescription desc = checkPeerExists(peerId); 236 ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); 237 if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) { 238 throw new DoNotRetryIOException( 239 "Changing the cluster key on an existing peer is not allowed. Existing key '" 240 + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" 241 + peerConfig.getClusterKey() + "'"); 242 } 243 244 if ( 245 !isStringEquals(peerConfig.getReplicationEndpointImpl(), 246 oldPeerConfig.getReplicationEndpointImpl()) 247 ) { 248 throw new DoNotRetryIOException("Changing the replication endpoint implementation class " 249 + "on an existing peer is not allowed. Existing class '" 250 + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId 251 + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); 252 } 253 254 if (!isStringEquals(peerConfig.getRemoteWALDir(), oldPeerConfig.getRemoteWALDir())) { 255 throw new DoNotRetryIOException( 256 "Changing the remote wal dir on an existing peer is not allowed. Existing remote wal " 257 + "dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId 258 + " does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'"); 259 } 260 261 if (oldPeerConfig.isSyncReplication()) { 262 if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldPeerConfig, peerConfig)) { 263 throw new DoNotRetryIOException( 264 "Changing the replicated namespace/table config on a synchronous replication " 265 + "peer(peerId: " + peerId + ") is not allowed."); 266 } 267 } 268 return desc; 269 } 270 271 /** Returns the old desciption of the peer */ 272 ReplicationPeerDescription preTransitPeerSyncReplicationState(String peerId, 273 SyncReplicationState state) throws DoNotRetryIOException { 274 ReplicationPeerDescription desc = checkPeerExists(peerId); 275 SyncReplicationState fromState = desc.getSyncReplicationState(); 276 EnumSet<SyncReplicationState> allowedToStates = allowedTransition.get(fromState); 277 if (allowedToStates == null || !allowedToStates.contains(state)) { 278 throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState 279 + " to " + state + " for peer id=" + peerId); 280 } 281 return desc; 282 } 283 284 public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) 285 throws ReplicationException { 286 if (peers.containsKey(peerId)) { 287 // this should be a retry, just return 288 return; 289 } 290 peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig); 291 ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build(); 292 SyncReplicationState syncReplicationState = copiedPeerConfig.isSyncReplication() 293 ? SyncReplicationState.DOWNGRADE_ACTIVE 294 : SyncReplicationState.NONE; 295 peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState); 296 peers.put(peerId, 297 new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState)); 298 } 299 300 public void removePeer(String peerId) throws ReplicationException { 301 if (!peers.containsKey(peerId)) { 302 // this should be a retry, just return 303 return; 304 } 305 peerStorage.removePeer(peerId); 306 peers.remove(peerId); 307 } 308 309 private void setPeerState(String peerId, boolean enabled) throws ReplicationException { 310 ReplicationPeerDescription desc = peers.get(peerId); 311 if (desc.isEnabled() == enabled) { 312 // this should be a retry, just return 313 return; 314 } 315 peerStorage.setPeerState(peerId, enabled); 316 peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig(), 317 desc.getSyncReplicationState())); 318 } 319 320 public boolean getPeerState(String peerId) throws ReplicationException { 321 ReplicationPeerDescription desc = peers.get(peerId); 322 if (desc != null) { 323 return desc.isEnabled(); 324 } else { 325 throw new ReplicationException("Replication Peer of " + peerId + " does not exist."); 326 } 327 } 328 329 public void enablePeer(String peerId) throws ReplicationException { 330 setPeerState(peerId, true); 331 } 332 333 public void disablePeer(String peerId) throws ReplicationException { 334 setPeerState(peerId, false); 335 } 336 337 public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) 338 throws ReplicationException { 339 // the checking rules are too complicated here so we give up checking whether this is a retry. 340 ReplicationPeerDescription desc = peers.get(peerId); 341 ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); 342 ReplicationPeerConfigBuilder newPeerConfigBuilder = 343 ReplicationPeerConfig.newBuilder(peerConfig); 344 // we need to use the new conf to overwrite the old one. 345 newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration()); 346 newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration()); 347 ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build(); 348 peerStorage.updatePeerConfig(peerId, newPeerConfig); 349 peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig, 350 desc.getSyncReplicationState())); 351 } 352 353 public List<ReplicationPeerDescription> listPeers(Pattern pattern) { 354 if (pattern == null) { 355 return new ArrayList<>(peers.values()); 356 } 357 return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches()) 358 .collect(Collectors.toList()); 359 } 360 361 public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) { 362 ReplicationPeerDescription desc = peers.get(peerId); 363 return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty(); 364 } 365 366 void removeAllLastPushedSeqIds(String peerId) throws ReplicationException { 367 queueStorage.removeLastSequenceIds(peerId); 368 } 369 370 public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state) 371 throws ReplicationException { 372 peerStorage.setPeerNewSyncReplicationState(peerId, state); 373 } 374 375 public void transitPeerSyncReplicationState(String peerId, SyncReplicationState newState) 376 throws ReplicationException { 377 if (peerStorage.getPeerNewSyncReplicationState(peerId) != SyncReplicationState.NONE) { 378 // Only transit if this is not a retry 379 peerStorage.transitPeerSyncReplicationState(peerId); 380 } 381 ReplicationPeerDescription desc = peers.get(peerId); 382 if (desc.getSyncReplicationState() != newState) { 383 // Only recreate the desc if this is not a retry 384 peers.put(peerId, 385 new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), newState)); 386 } 387 } 388 389 public void removeAllQueues(String peerId) throws ReplicationException { 390 // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still 391 // on-going when the refresh peer config procedure is done, if a RS which has already been 392 // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in 393 // the scan here, and if the RS who has claimed the queue crashed before creating recovered 394 // source, then the queue will leave there until the another RS detects the crash and helps 395 // removing the queue. 396 // A two pass scan can solve the problem. Anyway, the queue will not disappear during the 397 // claiming, it will either under the old RS or under the new RS, and a queue can only be 398 // claimed once after the refresh peer procedure done(as the next claim queue will just delete 399 // it), so we can make sure that a two pass scan will finally find the queue and remove it, 400 // unless it has already been removed by others. 401 queueStorage.removeAllQueues(peerId); 402 queueStorage.removeAllQueues(peerId); 403 } 404 405 public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { 406 removeAllQueues(peerId); 407 queueStorage.removePeerFromHFileRefs(peerId); 408 } 409 410 private void checkClusterKey(String clusterKey, ReplicationEndpoint endpoint) 411 throws DoNotRetryIOException { 412 if (endpoint != null && !(endpoint instanceof HBaseReplicationEndpoint)) { 413 return; 414 } 415 // Endpoints implementing HBaseReplicationEndpoint need to check cluster key 416 URI connectionUri = ConnectionRegistryFactory.tryParseAsConnectionURI(clusterKey); 417 try { 418 if (connectionUri != null) { 419 ConnectionRegistryFactory.validate(connectionUri); 420 } else { 421 ZKConfig.validateClusterKey(clusterKey); 422 } 423 } catch (IOException e) { 424 throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e); 425 } 426 if (endpoint != null && endpoint.canReplicateToSameCluster()) { 427 return; 428 } 429 // make sure we do not replicate to same cluster 430 String peerClusterId; 431 try { 432 if (connectionUri != null) { 433 // fetch cluster id through standard admin API 434 try (Connection conn = ConnectionFactory.createConnection(connectionUri, conf); 435 Admin admin = conn.getAdmin()) { 436 peerClusterId = 437 admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID)).getClusterId(); 438 } 439 } else { 440 // Create the peer cluster config for get peer cluster id 441 Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey); 442 try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) { 443 peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher); 444 } 445 } 446 } catch (IOException | KeeperException e) { 447 // we just want to check whether we will replicate to the same cluster, so if we get an error 448 // while getting the cluster id of the peer cluster, it means we are not connecting to 449 // ourselves, as we are still alive. So here we just log the error and continue 450 LOG.warn("Can't get peerClusterId for clusterKey=" + clusterKey, e); 451 return; 452 } 453 // In rare case, zookeeper setting may be messed up. That leads to the incorrect 454 // peerClusterId value, which is the same as the source clusterId 455 if (clusterId.equals(peerClusterId)) { 456 throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey 457 + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint"); 458 } 459 } 460 461 private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { 462 String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl(); 463 ReplicationEndpoint endpoint = null; 464 if (!StringUtils.isBlank(replicationEndpointImpl)) { 465 try { 466 // try creating a instance 467 endpoint = Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class) 468 .getDeclaredConstructor().newInstance(); 469 } catch (Throwable e) { 470 throw new DoNotRetryIOException( 471 "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl, 472 e); 473 } 474 } 475 checkClusterKey(peerConfig.getClusterKey(), endpoint); 476 477 if (peerConfig.replicateAllUserTables()) { 478 // If replicate_all flag is true, it means all user tables will be replicated to peer cluster. 479 // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer 480 // cluster. 481 if ( 482 (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) 483 || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty()) 484 ) { 485 throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " 486 + "when you want replicate all cluster"); 487 } 488 checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(), 489 peerConfig.getExcludeTableCFsMap()); 490 } else { 491 // If replicate_all flag is false, it means all user tables can't be replicated to peer 492 // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer 493 // cluster. 494 if ( 495 (peerConfig.getExcludeNamespaces() != null && !peerConfig.getExcludeNamespaces().isEmpty()) 496 || (peerConfig.getExcludeTableCFsMap() != null 497 && !peerConfig.getExcludeTableCFsMap().isEmpty()) 498 ) { 499 throw new DoNotRetryIOException( 500 "Need clean exclude-namespaces or exclude-table-cfs config firstly" 501 + " when replicate_all flag is false"); 502 } 503 checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), 504 peerConfig.getTableCFsMap()); 505 } 506 507 if (peerConfig.isSyncReplication()) { 508 checkPeerConfigForSyncReplication(peerConfig); 509 } 510 511 checkConfiguredWALEntryFilters(peerConfig); 512 } 513 514 private void checkPeerConfigForSyncReplication(ReplicationPeerConfig peerConfig) 515 throws DoNotRetryIOException { 516 // This is used to reduce the difficulty for implementing the sync replication state transition 517 // as we need to reopen all the related regions. 518 // TODO: Add namespace, replicat_all flag back 519 if (peerConfig.replicateAllUserTables()) { 520 throw new DoNotRetryIOException( 521 "Only support replicated table config for sync replication peer"); 522 } 523 if (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) { 524 throw new DoNotRetryIOException( 525 "Only support replicated table config for sync replication peer"); 526 } 527 if (peerConfig.getTableCFsMap() == null || peerConfig.getTableCFsMap().isEmpty()) { 528 throw new DoNotRetryIOException("Need config replicated tables for sync replication peer"); 529 } 530 for (List<String> cfs : peerConfig.getTableCFsMap().values()) { 531 if (cfs != null && !cfs.isEmpty()) { 532 throw new DoNotRetryIOException( 533 "Only support replicated table config for sync replication peer"); 534 } 535 } 536 537 Path remoteWALDir = new Path(peerConfig.getRemoteWALDir()); 538 if (!remoteWALDir.isAbsolute()) { 539 throw new DoNotRetryIOException( 540 "The remote WAL directory " + peerConfig.getRemoteWALDir() + " is not absolute"); 541 } 542 URI remoteWALDirUri = remoteWALDir.toUri(); 543 if (remoteWALDirUri.getScheme() == null || remoteWALDirUri.getAuthority() == null) { 544 throw new DoNotRetryIOException("The remote WAL directory " + peerConfig.getRemoteWALDir() 545 + " is not qualified, you must provide scheme and authority"); 546 } 547 } 548 549 private void checkSyncReplicationPeerConfigConflict(ReplicationPeerConfig peerConfig) 550 throws DoNotRetryIOException { 551 for (TableName tableName : peerConfig.getTableCFsMap().keySet()) { 552 for (Map.Entry<String, ReplicationPeerDescription> entry : peers.entrySet()) { 553 ReplicationPeerConfig rpc = entry.getValue().getPeerConfig(); 554 if (rpc.isSyncReplication() && rpc.getTableCFsMap().containsKey(tableName)) { 555 throw new DoNotRetryIOException( 556 "Table " + tableName + " has been replicated by peer " + entry.getKey()); 557 } 558 } 559 } 560 } 561 562 /** 563 * Set a namespace in the peer config means that all tables in this namespace will be replicated 564 * to the peer cluster. 565 * <ol> 566 * <li>If peer config already has a namespace, then not allow set any table of this namespace to 567 * the peer config.</li> 568 * <li>If peer config already has a table, then not allow set this table's namespace to the peer 569 * config.</li> 570 * </ol> 571 * <p> 572 * Set a exclude namespace in the peer config means that all tables in this namespace can't be 573 * replicated to the peer cluster. 574 * <ol> 575 * <li>If peer config already has a exclude namespace, then not allow set any exclude table of 576 * this namespace to the peer config.</li> 577 * <li>If peer config already has a exclude table, then not allow set this table's namespace as a 578 * exclude namespace.</li> 579 * </ol> 580 */ 581 private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces, 582 Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException { 583 if (namespaces == null || namespaces.isEmpty()) { 584 return; 585 } 586 if (tableCfs == null || tableCfs.isEmpty()) { 587 return; 588 } 589 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { 590 TableName table = entry.getKey(); 591 if (namespaces.contains(table.getNamespaceAsString())) { 592 throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces " 593 + table.getNamespaceAsString() + " in peer config"); 594 } 595 } 596 } 597 598 private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) 599 throws DoNotRetryIOException { 600 String filterCSV = peerConfig.getConfiguration() 601 .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); 602 if (filterCSV != null && !filterCSV.isEmpty()) { 603 String[] filters = filterCSV.split(","); 604 for (String filter : filters) { 605 try { 606 Class.forName(filter).getDeclaredConstructor().newInstance(); 607 } catch (Exception e) { 608 throw new DoNotRetryIOException("Configured WALEntryFilter " + filter 609 + " could not be created. Failing add/update peer operation.", e); 610 } 611 } 612 } 613 } 614 615 public List<String> getSerialPeerIdsBelongsTo(TableName tableName) { 616 return peers.values().stream().filter(p -> p.getPeerConfig().isSerial()) 617 .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId()) 618 .collect(Collectors.toList()); 619 } 620 621 @RestrictedApi(explanation = "Should only be called in tests", link = "", 622 allowedOnPath = ".*/src/test/.*") 623 public ReplicationPeerStorage getPeerStorage() { 624 return peerStorage; 625 } 626 627 public ReplicationQueueStorage getQueueStorage() { 628 return queueStorage; 629 } 630 631 private static Pair<ReplicationQueueStorage, ReplicationQueueStorageInitializer> 632 createReplicationQueueStorage(MasterServices services) throws IOException { 633 Configuration conf = services.getConfiguration(); 634 TableName replicationQueueTableName = 635 TableName.valueOf(conf.get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, 636 ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())); 637 ReplicationQueueStorageInitializer initializer; 638 if (services.getTableDescriptors().exists(replicationQueueTableName)) { 639 // no need to create the table 640 initializer = () -> { 641 }; 642 } else { 643 // lazy create the replication table. 644 initializer = new ReplicationQueueStorageInitializer() { 645 646 private volatile boolean created = false; 647 648 @Override 649 public void initialize() throws IOException { 650 if (created) { 651 return; 652 } 653 synchronized (this) { 654 if (created) { 655 return; 656 } 657 if (services.getTableDescriptors().exists(replicationQueueTableName)) { 658 created = true; 659 return; 660 } 661 long procId = services.createSystemTable(ReplicationStorageFactory 662 .createReplicationQueueTableDescriptor(replicationQueueTableName)); 663 ProcedureExecutor<MasterProcedureEnv> procExec = services.getMasterProcedureExecutor(); 664 ProcedureSyncWait.waitFor(procExec.getEnvironment(), TimeUnit.MINUTES.toMillis(1), 665 "Creating table " + replicationQueueTableName, () -> procExec.isFinished(procId)); 666 } 667 } 668 }; 669 } 670 return Pair.newPair(ReplicationStorageFactory.getReplicationQueueStorage( 671 services.getConnection(), conf, replicationQueueTableName), initializer); 672 } 673 674 public static ReplicationPeerManager create(MasterServices services, String clusterId) 675 throws ReplicationException, IOException { 676 Configuration conf = services.getConfiguration(); 677 FileSystem fs = services.getMasterFileSystem().getFileSystem(); 678 ZKWatcher zk = services.getZooKeeper(); 679 ReplicationPeerStorage peerStorage = 680 ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf); 681 Pair<ReplicationQueueStorage, ReplicationQueueStorageInitializer> pair = 682 createReplicationQueueStorage(services); 683 ReplicationQueueStorage queueStorage = pair.getFirst(); 684 ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>(); 685 for (String peerId : peerStorage.listPeerIds()) { 686 ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); 687 if ( 688 ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME 689 .equals(peerConfig.getReplicationEndpointImpl()) 690 ) { 691 // we do not use this endpoint for region replication any more, see HBASE-26233 692 LOG.info("Legacy region replication peer found, removing: {}", peerConfig); 693 // do it asynchronous to not block the start up of HMaster 694 new Thread("Remove legacy replication peer " + peerId) { 695 696 @Override 697 public void run() { 698 try { 699 // need to delete two times to make sure we delete all the queues, see the comments in 700 // above 701 // removeAllQueues method for more details. 702 queueStorage.removeAllQueues(peerId); 703 queueStorage.removeAllQueues(peerId); 704 // delete queue first and then peer, because we use peer as a flag. 705 peerStorage.removePeer(peerId); 706 } catch (Exception e) { 707 LOG.warn("Failed to delete legacy replication peer {}", peerId); 708 } 709 } 710 }.start(); 711 continue; 712 } 713 peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig); 714 peerStorage.updatePeerConfig(peerId, peerConfig); 715 boolean enabled = peerStorage.isPeerEnabled(peerId); 716 SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId); 717 peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state)); 718 } 719 return new ReplicationPeerManager(fs, zk, peerStorage, queueStorage, peers, conf, clusterId, 720 pair.getSecond()); 721 } 722 723 /** 724 * For replication peer cluster key or endpoint class, null and empty string is same. So here 725 * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly. 726 */ 727 private boolean isStringEquals(String s1, String s2) { 728 if (StringUtils.isBlank(s1)) { 729 return StringUtils.isBlank(s2); 730 } 731 return s1.equals(s2); 732 } 733 734 @Override 735 public void onConfigurationChange(Configuration conf) { 736 this.conf = conf; 737 this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf); 738 } 739 740 private ReplicationQueueData convert(ZkReplicationQueueData zkData) { 741 Map<String, ReplicationGroupOffset> groupOffsets = new HashMap<>(); 742 zkData.getWalOffsets().forEach((wal, offset) -> { 743 String walGroup = AbstractFSWALProvider.getWALPrefixFromWALName(wal); 744 groupOffsets.compute(walGroup, (k, oldOffset) -> { 745 if (oldOffset == null) { 746 return new ReplicationGroupOffset(wal, offset); 747 } 748 // we should record the first wal's offset 749 long oldWalTs = AbstractFSWALProvider.getTimestamp(oldOffset.getWal()); 750 long walTs = AbstractFSWALProvider.getTimestamp(wal); 751 if (walTs < oldWalTs) { 752 return new ReplicationGroupOffset(wal, offset); 753 } 754 return oldOffset; 755 }); 756 }); 757 return new ReplicationQueueData(zkData.getQueueId(), ImmutableMap.copyOf(groupOffsets)); 758 } 759 760 private void migrateQueues(ZKReplicationQueueStorageForMigration oldQueueStorage) 761 throws Exception { 762 MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter = 763 oldQueueStorage.listAllQueues(); 764 for (;;) { 765 Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next(); 766 if (pair == null) { 767 return; 768 } 769 queueStorage.batchUpdateQueues(pair.getFirst(), 770 pair.getSecond().stream().filter(data -> peers.containsKey(data.getQueueId().getPeerId())) 771 .map(this::convert).collect(Collectors.toList())); 772 } 773 } 774 775 private void migrateLastPushedSeqIds(ZKReplicationQueueStorageForMigration oldQueueStorage) 776 throws Exception { 777 MigrationIterator<List<ZkLastPushedSeqId>> iter = oldQueueStorage.listAllLastPushedSeqIds(); 778 for (;;) { 779 List<ZkLastPushedSeqId> list = iter.next(); 780 if (list == null) { 781 return; 782 } 783 queueStorage.batchUpdateLastSequenceIds(list.stream() 784 .filter(data -> peers.containsKey(data.getPeerId())).collect(Collectors.toList())); 785 } 786 } 787 788 private void migrateHFileRefs(ZKReplicationQueueStorageForMigration oldQueueStorage) 789 throws Exception { 790 MigrationIterator<Pair<String, List<String>>> iter = oldQueueStorage.listAllHFileRefs(); 791 for (;;) { 792 Pair<String, List<String>> pair = iter.next(); 793 if (pair == null) { 794 return; 795 } 796 if (peers.containsKey(pair.getFirst())) { 797 queueStorage.batchUpdateHFileRefs(pair.getFirst(), pair.getSecond()); 798 } 799 } 800 } 801 802 private interface ExceptionalRunnable { 803 void run() throws Exception; 804 } 805 806 private CompletableFuture<?> runAsync(ExceptionalRunnable task, ExecutorService executor) { 807 CompletableFuture<?> future = new CompletableFuture<>(); 808 executor.execute(() -> { 809 try { 810 task.run(); 811 future.complete(null); 812 } catch (Exception e) { 813 future.completeExceptionally(e); 814 } 815 }); 816 return future; 817 } 818 819 /** 820 * Submit the migration tasks to the given {@code executor}. 821 */ 822 CompletableFuture<Void> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) { 823 // the replication queue table creation is asynchronous and will be triggered by addPeer, so 824 // here we need to manually initialize it since we will not call addPeer. 825 try { 826 initializeQueueStorage(); 827 } catch (IOException e) { 828 return FutureUtils.failedFuture(e); 829 } 830 ZKReplicationQueueStorageForMigration oldStorage = 831 new ZKReplicationQueueStorageForMigration(zookeeper, conf); 832 return CompletableFuture.allOf(runAsync(() -> migrateQueues(oldStorage), executor), 833 runAsync(() -> migrateLastPushedSeqIds(oldStorage), executor), 834 runAsync(() -> migrateHFileRefs(oldStorage), executor)); 835 } 836}