/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

/**
 * Manages and performs all replication admin operations.
 * <p>
 * Used to add/remove a replication peer.
 * <p>
 * Implement {@link ConfigurationObserver} mainly for recreating {@link ReplicationPeerStorage}, for
 * supporting migrating across different replication peer storages without restarting master.
 */
@InterfaceAudience.Private
public class ReplicationPeerManager implements ConfigurationObserver {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerManager.class);

  // volatile because it is replaced in onConfigurationChange.
  private volatile ReplicationPeerStorage peerStorage;

  private final ReplicationQueueStorage queueStorage;

  // in-memory view of the peers, keyed by peer id; updated after each successful storage write.
  private final ConcurrentMap<String, ReplicationPeerDescription> peers;

  // for every current sync replication state, the set of states we are allowed to transit to.
  private final ImmutableMap<SyncReplicationState,
    EnumSet<SyncReplicationState>> allowedTransition =
      Maps.immutableEnumMap(ImmutableMap.of(SyncReplicationState.ACTIVE,
        EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE, SyncReplicationState.STANDBY),
        SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE),
        SyncReplicationState.DOWNGRADE_ACTIVE,
        EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));

  // id of the local cluster, compared against the peer cluster id in checkSameClusterKey.
  private final String clusterId;

  // volatile because it is replaced in onConfigurationChange.
  private volatile Configuration conf;

  // for dynamic recreating ReplicationPeerStorage.
  private final FileSystem fs;

  private final ZKWatcher zk;

  /**
   * Hook for preparing the replication queue storage before it is used, see
   * {@link #createReplicationQueueStorage(MasterServices)} for the implementations.
   */
  @FunctionalInterface
  interface ReplicationQueueStorageInitializer {

    void initialize() throws IOException;
  }

  private final ReplicationQueueStorageInitializer queueStorageInitializer;

  // we will mock this class in UT so leave the constructor as package private and not mark the
  // class as final, since mockito can not mock a final class
  ReplicationPeerManager(FileSystem fs, ZKWatcher zk, ReplicationPeerStorage peerStorage,
    ReplicationQueueStorage queueStorage, ConcurrentMap<String, ReplicationPeerDescription> peers,
    Configuration conf, String clusterId,
    ReplicationQueueStorageInitializer queueStorageInitializer) {
    this.fs = fs;
    this.zk = zk;
    this.peerStorage = peerStorage;
    this.queueStorage = queueStorage;
    this.peers = peers;
    this.conf = conf;
    this.clusterId = clusterId;
    this.queueStorageInitializer = queueStorageInitializer;
  }

  /**
   * Make sure that neither a replication queue nor a hfile-refs entry is left for the given peer.
   * @throws DoNotRetryIOException if any queue or hfile-refs entry still exists for the peer
   */
  private void checkQueuesDeleted(String peerId)
    throws ReplicationException, DoNotRetryIOException {
    List<ReplicationQueueId> queueIds = queueStorage.listAllQueueIds(peerId);
    if (!queueIds.isEmpty()) {
      throw new DoNotRetryIOException("There are still " + queueIds.size()
        + " undeleted queue(s) for peerId: " + peerId + ", first is " + queueIds.get(0));
    }
    if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
      throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs");
    }
  }

  /** Run the configured {@link ReplicationQueueStorageInitializer}. */
  private void initializeQueueStorage() throws IOException {
    queueStorageInitializer.initialize();
  }

  /**
   * Checks performed before adding a peer: the peer id must not contain '-', the peer config must
   * be valid, the peer must not exist yet and no queue may be left over for the same peer id.
   * <p>
   * Also initializes the replication queue storage.
   */
  void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
    throws ReplicationException, IOException {
    if (peerId.contains("-")) {
      throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
    }
    checkPeerConfig(peerConfig);
    if (peerConfig.isSyncReplication()) {
      checkSyncReplicationPeerConfigConflict(peerConfig);
    }
    if (peers.containsKey(peerId)) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
    }

    // lazy create table
    initializeQueueStorage();
    // make sure that there is no queues with the same peer id. This may happen when we create a
    // peer with the same id with a old deleted peer. If the replication queues for the old peer
    // have not been cleaned up yet then we should not create the new peer, otherwise the old wal
    // file may also be replicated.
    checkQueuesDeleted(peerId);
  }

  /**
   * Returns the description of the given peer.
   * @throws ReplicationPeerNotFoundException if the peer is not known
   */
  private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc == null) {
      throw new ReplicationPeerNotFoundException(peerId);
    }
    return desc;
  }

  /**
   * For a known sync replication peer, make sure it is in DOWNGRADE_ACTIVE state. Unknown peers and
   * peers which are not sync replication peers always pass this check.
   */
  private void checkPeerInDAStateIfSyncReplication(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (
      desc != null && desc.getPeerConfig().isSyncReplication()
        && !SyncReplicationState.DOWNGRADE_ACTIVE.equals(desc.getSyncReplicationState())
    ) {
      throw new DoNotRetryIOException(
        "Couldn't remove synchronous replication peer with state=" + desc.getSyncReplicationState()
          + ", Transit the synchronous replication state to be DOWNGRADE_ACTIVE firstly.");
    }
  }

  /**
   * Checks performed before removing a peer.
   * @return the config of the peer which is going to be removed
   */
  ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription pd = checkPeerExists(peerId);
    checkPeerInDAStateIfSyncReplication(peerId);
    return pd.getPeerConfig();
  }

  /** Fails if the peer does not exist or is already enabled. */
  void preEnablePeer(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    if (desc.isEnabled()) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
    }
  }

  /** Fails if the peer does not exist or is already disabled. */
  void preDisablePeer(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    if (!desc.isEnabled()) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
    }
  }

  /**
   * Checks performed before updating a peer config. The cluster key, the replication endpoint
   * implementation and the remote wal dir can not be changed, and for a sync replication peer the
   * replicated namespaces/table-cfs can not be changed either.
   * @return the old peer description. Can never be null.
   */
  ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
    checkPeerConfig(peerConfig);
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
    if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) {
      throw new DoNotRetryIOException(
        "Changing the cluster key on an existing peer is not allowed. Existing key '"
          + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '"
          + peerConfig.getClusterKey() + "'");
    }

    if (
      !isStringEquals(peerConfig.getReplicationEndpointImpl(),
        oldPeerConfig.getReplicationEndpointImpl())
    ) {
      throw new DoNotRetryIOException("Changing the replication endpoint implementation class "
        + "on an existing peer is not allowed. Existing class '"
        + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId
        + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
    }

    if (!isStringEquals(peerConfig.getRemoteWALDir(), oldPeerConfig.getRemoteWALDir())) {
      throw new DoNotRetryIOException(
        "Changing the remote wal dir on an existing peer is not allowed. Existing remote wal "
          + "dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId
          + " does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'");
    }

    if (oldPeerConfig.isSyncReplication()) {
      if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldPeerConfig, peerConfig)) {
        throw new DoNotRetryIOException(
          "Changing the replicated namespace/table config on a synchronous replication "
            + "peer(peerId: " + peerId + ") is not allowed.");
      }
    }
    return desc;
  }

  /**
   * Checks that the peer exists and that transiting from its current sync replication state to
   * the given {@code state} is listed in {@link #allowedTransition}.
   * @return the old description of the peer
   */
  ReplicationPeerDescription preTransitPeerSyncReplicationState(String peerId,
    SyncReplicationState state) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    SyncReplicationState fromState = desc.getSyncReplicationState();
    EnumSet<SyncReplicationState> allowedToStates = allowedTransition.get(fromState);
    if (allowedToStates == null || !allowedToStates.contains(state)) {
      throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState
        + " to " + state + " for peer id=" + peerId);
    }
    return desc;
  }

  /**
   * Persist the new peer to the peer storage and then record it in memory. A sync replication peer
   * starts in DOWNGRADE_ACTIVE state, any other peer in NONE state. Does nothing if the peer is
   * already known.
   */
  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
    throws ReplicationException {
    if (peers.containsKey(peerId)) {
      // this should be a retry, just return
      return;
    }
    peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
    ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
    SyncReplicationState syncReplicationState = copiedPeerConfig.isSyncReplication()
      ? SyncReplicationState.DOWNGRADE_ACTIVE
      : SyncReplicationState.NONE;
    peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState);
    peers.put(peerId,
      new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState));
  }

  /**
   * Remove the peer from the peer storage and then from memory. Does nothing if the peer is not
   * known.
   */
  public void removePeer(String peerId) throws ReplicationException {
    if (!peers.containsKey(peerId)) {
      // this should be a retry, just return
      return;
    }
    peerStorage.removePeer(peerId);
    peers.remove(peerId);
  }

  /**
   * Persist the enabled flag of the peer and then update the in-memory description. The peer is
   * looked up without a null check, so the peer must already be known.
   */
  private void setPeerState(String peerId, boolean enabled) throws ReplicationException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc.isEnabled() == enabled) {
      // this should be a retry, just return
      return;
    }
    peerStorage.setPeerState(peerId, enabled);
    peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig(),
      desc.getSyncReplicationState()));
  }

  /**
   * Returns whether the given peer is enabled.
   * @throws ReplicationException if the peer does not exist
   */
  public boolean getPeerState(String peerId) throws ReplicationException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc != null) {
      return desc.isEnabled();
    } else {
      throw new ReplicationException("Replication Peer of " + peerId + " does not exist.");
    }
  }

  public void enablePeer(String peerId) throws ReplicationException {
    setPeerState(peerId, true);
  }

  public void disablePeer(String peerId) throws ReplicationException {
    setPeerState(peerId, false);
  }

  /**
   * Merge the configuration entries of the old peer config with the given one, persist the result
   * and then update the in-memory description. The peer is looked up without a null check, so the
   * peer must already be known.
   */
  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
    throws ReplicationException {
    // the checking rules are too complicated here so we give up checking whether this is a retry.
    ReplicationPeerDescription desc = peers.get(peerId);
    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
    ReplicationPeerConfigBuilder newPeerConfigBuilder =
      ReplicationPeerConfig.newBuilder(peerConfig);
    // we need to use the new conf to overwrite the old one, so put the old entries first and the
    // new entries afterwards.
    newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
    newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
    ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build();
    peerStorage.updatePeerConfig(peerId, newPeerConfig);
    peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig,
      desc.getSyncReplicationState()));
  }

  /**
   * Returns the known peers whose id fully matches the given pattern, or all known peers if
   * {@code pattern} is null.
   */
  public List<ReplicationPeerDescription> listPeers(Pattern pattern) {
    if (pattern == null) {
      return new ArrayList<>(peers.values());
    }
    return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches())
      .collect(Collectors.toList());
  }

  /** Returns the config of the given peer, or empty if the peer is not known. */
  public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) {
    ReplicationPeerDescription desc = peers.get(peerId);
    return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
  }

  void removeAllLastPushedSeqIds(String peerId) throws ReplicationException {
    queueStorage.removeLastSequenceIds(peerId);
  }

  /** Record the new sync replication state in the peer storage only, not in memory. */
  public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state)
    throws ReplicationException {
    peerStorage.setPeerNewSyncReplicationState(peerId, state);
  }

  /**
   * Transit the sync replication state in the peer storage and then update the in-memory
   * description to {@code newState}. Both steps are skipped when they have already been done. The
   * peer is looked up without a null check, so the peer must already be known.
   */
  public void transitPeerSyncReplicationState(String peerId, SyncReplicationState newState)
    throws ReplicationException {
    if (peerStorage.getPeerNewSyncReplicationState(peerId) != SyncReplicationState.NONE) {
      // Only transit if this is not a retry
      peerStorage.transitPeerSyncReplicationState(peerId);
    }
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc.getSyncReplicationState() != newState) {
      // Only recreate the desc if this is not a retry
      peers.put(peerId,
        new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), newState));
    }
  }

  /** Remove all the replication queues of the given peer, using two passes. */
  public void removeAllQueues(String peerId) throws ReplicationException {
    // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
    // on-going when the refresh peer config procedure is done, if a RS which has already been
    // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
    // the scan here, and if the RS who has claimed the queue crashed before creating recovered
    // source, then the queue will leave there until the another RS detects the crash and helps
    // removing the queue.
    // A two pass scan can solve the problem. Anyway, the queue will not disappear during the
    // claiming, it will either under the old RS or under the new RS, and a queue can only be
    // claimed once after the refresh peer procedure done(as the next claim queue will just delete
    // it), so we can make sure that a two pass scan will finally find the queue and remove it,
    // unless it has already been removed by others.
    queueStorage.removeAllQueues(peerId);
    queueStorage.removeAllQueues(peerId);
  }

  public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
    removeAllQueues(peerId);
    queueStorage.removePeerFromHFileRefs(peerId);
  }

  /**
   * Validate a peer config: the replication endpoint class must be instantiable, the cluster key
   * must be valid where it is required, the namespaces/table-cfs settings must agree with the
   * replicate_all flag, and the sync replication settings and wal entry filters must be valid.
   * @throws DoNotRetryIOException if any of the checks fails
   */
  private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
    String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
    ReplicationEndpoint endpoint = null;
    if (!StringUtils.isBlank(replicationEndpointImpl)) {
      try {
        // try creating a instance
        endpoint = Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class)
          .getDeclaredConstructor().newInstance();
      } catch (Throwable e) {
        // Throwable is caught on purpose here, so errors raised while loading or initializing the
        // configured class are also reported as a DoNotRetryIOException.
        throw new DoNotRetryIOException(
          "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
          e);
      }
    }
    // Endpoints implementing HBaseReplicationEndpoint need to check cluster key
    if (endpoint == null || endpoint instanceof HBaseReplicationEndpoint) {
      checkClusterKey(peerConfig.getClusterKey());
      // Check if endpoint can replicate to the same cluster
      if (endpoint == null || !endpoint.canReplicateToSameCluster()) {
        checkSameClusterKey(peerConfig.getClusterKey());
      }
    }

    if (peerConfig.replicateAllUserTables()) {
      // If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
      // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer
      // cluster.
      if (
        (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
          || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())
      ) {
        throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly "
          + "when you want replicate all cluster");
      }
      checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
        peerConfig.getExcludeTableCFsMap());
    } else {
      // If replicate_all flag is false, it means all user tables can't be replicated to peer
      // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer
      // cluster.
      if (
        (peerConfig.getExcludeNamespaces() != null && !peerConfig.getExcludeNamespaces().isEmpty())
          || (peerConfig.getExcludeTableCFsMap() != null
            && !peerConfig.getExcludeTableCFsMap().isEmpty())
      ) {
        throw new DoNotRetryIOException(
          "Need clean exclude-namespaces or exclude-table-cfs config firstly"
            + " when replicate_all flag is false");
      }
      checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
        peerConfig.getTableCFsMap());
    }

    if (peerConfig.isSyncReplication()) {
      checkPeerConfigForSyncReplication(peerConfig);
    }

    checkConfiguredWALEntryFilters(peerConfig);
  }

  /**
   * Extra checks for a sync replication peer: only whole tables may be configured (no
   * replicate_all, no namespaces, no column families) and the remote WAL directory must be an
   * absolute path with both scheme and authority.
   */
  private void checkPeerConfigForSyncReplication(ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
    // This is used to reduce the difficulty for implementing the sync replication state transition
    // as we need to reopen all the related regions.
    // TODO: Add namespace, replicate_all flag back
    if (peerConfig.replicateAllUserTables()) {
      throw new DoNotRetryIOException(
        "Only support replicated table config for sync replication peer");
    }
    if (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) {
      throw new DoNotRetryIOException(
        "Only support replicated table config for sync replication peer");
    }
    if (peerConfig.getTableCFsMap() == null || peerConfig.getTableCFsMap().isEmpty()) {
      throw new DoNotRetryIOException("Need config replicated tables for sync replication peer");
    }
    for (List<String> cfs : peerConfig.getTableCFsMap().values()) {
      if (cfs != null && !cfs.isEmpty()) {
        throw new DoNotRetryIOException(
          "Only support replicated table config for sync replication peer");
      }
    }

    Path remoteWALDir = new Path(peerConfig.getRemoteWALDir());
    if (!remoteWALDir.isAbsolute()) {
      throw new DoNotRetryIOException(
        "The remote WAL directory " + peerConfig.getRemoteWALDir() + " is not absolute");
    }
    URI remoteWALDirUri = remoteWALDir.toUri();
    if (remoteWALDirUri.getScheme() == null || remoteWALDirUri.getAuthority() == null) {
      throw new DoNotRetryIOException("The remote WAL directory " + peerConfig.getRemoteWALDir()
        + " is not qualified, you must provide scheme and authority");
    }
  }

  /**
   * Make sure that none of the tables of the given peer config is already configured for another
   * sync replication peer.
   */
  private void checkSyncReplicationPeerConfigConflict(ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
    for (TableName tableName : peerConfig.getTableCFsMap().keySet()) {
      for (Map.Entry<String, ReplicationPeerDescription> entry : peers.entrySet()) {
        ReplicationPeerConfig rpc = entry.getValue().getPeerConfig();
        if (rpc.isSyncReplication() && rpc.getTableCFsMap().containsKey(tableName)) {
          throw new DoNotRetryIOException(
            "Table " + tableName + " has been replicated by peer " + entry.getKey());
        }
      }
    }
  }

  /**
   * Set a namespace in the peer config means that all tables in this namespace will be replicated
   * to the peer cluster.
   * <ol>
   * <li>If peer config already has a namespace, then not allow set any table of this namespace to
   * the peer config.</li>
   * <li>If peer config already has a table, then not allow set this table's namespace to the peer
   * config.</li>
   * </ol>
   * <p>
   * Set a exclude namespace in the peer config means that all tables in this namespace can't be
   * replicated to the peer cluster.
   * <ol>
   * <li>If peer config already has a exclude namespace, then not allow set any exclude table of
   * this namespace to the peer config.</li>
   * <li>If peer config already has a exclude table, then not allow set this table's namespace as a
   * exclude namespace.</li>
   * </ol>
   */
  private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
    Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException {
    if (namespaces == null || namespaces.isEmpty()) {
      return;
    }
    if (tableCfs == null || tableCfs.isEmpty()) {
      return;
    }
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      if (namespaces.contains(table.getNamespaceAsString())) {
        throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces "
          + table.getNamespaceAsString() + " in peer config");
      }
    }
  }

  /**
   * Make sure that every class listed (comma separated) under
   * {@link BaseReplicationEndpoint#REPLICATION_WALENTRYFILTER_CONFIG_KEY} can be loaded and
   * instantiated through its no-arg constructor.
   */
  private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
    String filterCSV = peerConfig.getConfiguration()
      .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
    if (filterCSV != null && !filterCSV.isEmpty()) {
      String[] filters = filterCSV.split(",");
      for (String filter : filters) {
        try {
          Class.forName(filter).getDeclaredConstructor().newInstance();
        } catch (Exception e) {
          throw new DoNotRetryIOException("Configured WALEntryFilter " + filter
            + " could not be created. Failing add/update peer operation.", e);
        }
      }
    }
  }

  /** Validate the cluster key, converting a failure into a {@link DoNotRetryIOException}. */
  private void checkClusterKey(String clusterKey) throws DoNotRetryIOException {
    try {
      ZKConfig.validateClusterKey(clusterKey);
    } catch (IOException e) {
      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e);
    }
  }

  /**
   * Read the cluster id of the peer cluster addressed by {@code clusterKey} and fail if it is the
   * same as the id of this cluster.
   */
  private void checkSameClusterKey(String clusterKey) throws DoNotRetryIOException {
    String peerClusterId = "";
    try {
      // Create the peer cluster config for get peer cluster id
      Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey);
      try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) {
        peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
      }
    } catch (IOException | KeeperException e) {
      throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e);
    }
    // In rare case, zookeeper setting may be messed up. That leads to the incorrect
    // peerClusterId value, which is the same as the source clusterId
    if (clusterId.equals(peerClusterId)) {
      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey
        + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint");
    }
  }

  /** Returns the ids of the serial peers which need to replicate the given table. */
  public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
    return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
      .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
      .collect(Collectors.toList());
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public ReplicationPeerStorage getPeerStorage() {
    return peerStorage;
  }

  public ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }

  /**
   * Create the replication queue storage together with the initializer which makes sure that the
   * replication queue table exists. If the table already exists the initializer is a no-op,
   * otherwise the table is created on the first call of the initializer.
   */
  private static Pair<ReplicationQueueStorage, ReplicationQueueStorageInitializer>
    createReplicationQueueStorage(MasterServices services) throws IOException {
    Configuration conf = services.getConfiguration();
    TableName replicationQueueTableName =
      TableName.valueOf(conf.get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
        ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
    ReplicationQueueStorageInitializer initializer;
    if (services.getTableDescriptors().exists(replicationQueueTableName)) {
      // no need to create the table
      initializer = () -> {
      };
    } else {
      // lazy create the replication table.
      initializer = new ReplicationQueueStorageInitializer() {

        private volatile boolean created = false;

        @Override
        public void initialize() throws IOException {
          if (created) {
            return;
          }
          synchronized (this) {
            if (created) {
              return;
            }
            if (services.getTableDescriptors().exists(replicationQueueTableName)) {
              created = true;
              return;
            }
            // The flag is not set on this path, a later call will set it through the exists check
            // above once the table is there.
            long procId = services.createSystemTable(ReplicationStorageFactory
              .createReplicationQueueTableDescriptor(replicationQueueTableName));
            ProcedureExecutor<MasterProcedureEnv> procExec = services.getMasterProcedureExecutor();
            ProcedureSyncWait.waitFor(procExec.getEnvironment(), TimeUnit.MINUTES.toMillis(1),
              "Creating table " + replicationQueueTableName, () -> procExec.isFinished(procId));
          }
        }
      };
    }
    return Pair.newPair(ReplicationStorageFactory.getReplicationQueueStorage(
      services.getConnection(), conf, replicationQueueTableName), initializer);
  }

  /**
   * Create a manager and load all the peers from the peer storage. Legacy region replication peers
   * are not loaded, they are removed in a background thread instead. The config of every loaded
   * peer is refreshed with the base peer configs and written back to the peer storage.
   */
  public static ReplicationPeerManager create(MasterServices services, String clusterId)
    throws ReplicationException, IOException {
    Configuration conf = services.getConfiguration();
    FileSystem fs = services.getMasterFileSystem().getFileSystem();
    ZKWatcher zk = services.getZooKeeper();
    ReplicationPeerStorage peerStorage =
      ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
    Pair<ReplicationQueueStorage, ReplicationQueueStorageInitializer> pair =
      createReplicationQueueStorage(services);
    ReplicationQueueStorage queueStorage = pair.getFirst();
    ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
    for (String peerId : peerStorage.listPeerIds()) {
      ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
      if (
        ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
          .equals(peerConfig.getReplicationEndpointImpl())
      ) {
        // we do not use this endpoint for region replication any more, see HBASE-26233
        LOG.info("Legacy region replication peer found, removing: {}", peerConfig);
        // do it asynchronous to not block the start up of HMaster
        new Thread("Remove legacy replication peer " + peerId) {

          @Override
          public void run() {
            try {
              // need to delete two times to make sure we delete all the queues, see the comments
              // in the removeAllQueues method above for more details.
              queueStorage.removeAllQueues(peerId);
              queueStorage.removeAllQueues(peerId);
              // delete queue first and then peer, because we use peer as a flag.
              peerStorage.removePeer(peerId);
            } catch (Exception e) {
              // pass the exception as the last argument so the cause is logged too.
              LOG.warn("Failed to delete legacy replication peer {}", peerId, e);
            }
          }
        }.start();
        continue;
      }
      peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
      peerStorage.updatePeerConfig(peerId, peerConfig);
      boolean enabled = peerStorage.isPeerEnabled(peerId);
      SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
      peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
    }
    return new ReplicationPeerManager(fs, zk, peerStorage, queueStorage, peers, conf, clusterId,
      pair.getSecond());
  }

  /**
   * For replication peer cluster key or endpoint class, null and empty string is same. So here
   * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly.
   */
  private boolean isStringEquals(String s1, String s2) {
    if (StringUtils.isBlank(s1)) {
      return StringUtils.isBlank(s2);
    }
    return s1.equals(s2);
  }

  /** Switch to the new configuration and recreate the peer storage from it. */
  @Override
  public void onConfigurationChange(Configuration conf) {
    this.conf = conf;
    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
  }

  /**
   * Convert the per wal offsets of a zookeeper based queue to per wal group offsets. For every wal
   * group, only the entry of the wal with the smallest timestamp is kept.
   */
  private ReplicationQueueData convert(ZkReplicationQueueData zkData) {
    Map<String, ReplicationGroupOffset> groupOffsets = new HashMap<>();
    zkData.getWalOffsets().forEach((wal, offset) -> {
      String walGroup = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
      groupOffsets.compute(walGroup, (k, oldOffset) -> {
        if (oldOffset == null) {
          return new ReplicationGroupOffset(wal, offset);
        }
        // we should record the first wal's offset
        long oldWalTs = AbstractFSWALProvider.getTimestamp(oldOffset.getWal());
        long walTs = AbstractFSWALProvider.getTimestamp(wal);
        if (walTs < oldWalTs) {
          return new ReplicationGroupOffset(wal, offset);
        }
        return oldOffset;
      });
    });
    return new ReplicationQueueData(zkData.getQueueId(), ImmutableMap.copyOf(groupOffsets));
  }

  /**
   * Copy the replication queues from the old storage into {@link #queueStorage}, skipping the
   * queues of peers which are not known.
   */
  private void migrateQueues(ZKReplicationQueueStorageForMigration oldQueueStorage)
    throws Exception {
    MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter =
      oldQueueStorage.listAllQueues();
    for (;;) {
      Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next();
      // a null element marks the end of the iteration
      if (pair == null) {
        return;
      }
      queueStorage.batchUpdateQueues(pair.getFirst(),
        pair.getSecond().stream().filter(data -> peers.containsKey(data.getQueueId().getPeerId()))
          .map(this::convert).collect(Collectors.toList()));
    }
  }

  /**
   * Copy the last pushed sequence ids from the old storage into {@link #queueStorage}, skipping
   * the entries of peers which are not known.
   */
  private void migrateLastPushedSeqIds(ZKReplicationQueueStorageForMigration oldQueueStorage)
    throws Exception {
    MigrationIterator<List<ZkLastPushedSeqId>> iter = oldQueueStorage.listAllLastPushedSeqIds();
    for (;;) {
      List<ZkLastPushedSeqId> list = iter.next();
      // a null element marks the end of the iteration
      if (list == null) {
        return;
      }
      queueStorage.batchUpdateLastSequenceIds(list.stream()
        .filter(data -> peers.containsKey(data.getPeerId())).collect(Collectors.toList()));
    }
  }

  /**
   * Copy the hfile refs from the old storage into {@link #queueStorage}, skipping the entries of
   * peers which are not known.
   */
  private void migrateHFileRefs(ZKReplicationQueueStorageForMigration oldQueueStorage)
    throws Exception {
    MigrationIterator<Pair<String, List<String>>> iter = oldQueueStorage.listAllHFileRefs();
    for (;;) {
      Pair<String, List<String>> pair = iter.next();
      // a null element marks the end of the iteration
      if (pair == null) {
        return;
      }
      if (peers.containsKey(pair.getFirst())) {
        queueStorage.batchUpdateHFileRefs(pair.getFirst(), pair.getSecond());
      }
    }
  }

  /** A task which is allowed to throw a checked exception. */
  private interface ExceptionalRunnable {
    void run() throws Exception;
  }

  /**
   * Run the task on the given executor. The returned future is completed with null when the task
   * returns normally, and completed exceptionally when the task throws an {@link Exception}.
   */
  private CompletableFuture<?> runAsync(ExceptionalRunnable task, ExecutorService executor) {
    CompletableFuture<?> future = new CompletableFuture<>();
    executor.execute(() -> {
      try {
        task.run();
        future.complete(null);
      } catch (Exception e) {
        future.completeExceptionally(e);
      }
    });
    return future;
  }

  /**
   * Submit the migration tasks to the given {@code executor}.
   * @return a future which completes when the queues, the last pushed sequence ids and the hfile
   *         refs have all been migrated, or a failed future if the queue storage can not be
   *         initialized
   */
  CompletableFuture<Void> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) {
    // the replication queue table creation is asynchronous and will be triggered by addPeer, so
    // here we need to manually initialize it since we will not call addPeer.
    try {
      initializeQueueStorage();
    } catch (IOException e) {
      return FutureUtils.failedFuture(e);
    }
    ZKReplicationQueueStorageForMigration oldStorage =
      new ZKReplicationQueueStorageForMigration(zookeeper, conf);
    return CompletableFuture.allOf(runAsync(() -> migrateQueues(oldStorage), executor),
      runAsync(() -> migrateLastPushedSeqIds(oldStorage), executor),
      runAsync(() -> migrateHFileRefs(oldStorage), executor));
  }
}