001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.List; 024import java.util.Map; 025import java.util.Optional; 026import java.util.Set; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.regex.Pattern; 030import java.util.stream.Collectors; 031import org.apache.commons.lang3.StringUtils; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.DoNotRetryIOException; 034import org.apache.hadoop.hbase.HBaseConfiguration; 035import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 039import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; 040import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; 041import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 042import org.apache.hadoop.hbase.replication.ReplicationException; 043import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 044import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 045import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 046import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; 047import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; 048import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 049import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 050import org.apache.hadoop.hbase.replication.ReplicationUtils; 051import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 052import org.apache.hadoop.hbase.zookeeper.ZKConfig; 053import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 054import org.apache.yetus.audience.InterfaceAudience; 055import org.apache.zookeeper.KeeperException; 056 057/** 058 * Manages and performs all replication admin operations. 059 * <p> 060 * Used to add/remove a replication peer. 061 */ 062@InterfaceAudience.Private 063public class ReplicationPeerManager { 064 065 private final ReplicationPeerStorage peerStorage; 066 067 private final ReplicationQueueStorage queueStorage; 068 069 private final ConcurrentMap<String, ReplicationPeerDescription> peers; 070 071 private final String clusterId; 072 073 private final Configuration conf; 074 075 ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage, 076 ConcurrentMap<String, ReplicationPeerDescription> peers, Configuration conf, String clusterId) { 077 this.peerStorage = peerStorage; 078 this.queueStorage = queueStorage; 079 this.peers = peers; 080 this.conf = conf; 081 this.clusterId = clusterId; 082 } 083 084 private void checkQueuesDeleted(String peerId) 085 throws ReplicationException, DoNotRetryIOException { 086 for (ServerName replicator : queueStorage.getListOfReplicators()) { 087 List<String> queueIds = queueStorage.getAllQueues(replicator); 088 for (String queueId : queueIds) { 089 ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); 090 if (queueInfo.getPeerId().equals(peerId)) { 091 throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId + ", replicator: " 092 + replicator + ", queueId: " + queueId); 093 } 094 } 095 } 096 if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) { 097 throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs"); 098 } 099 } 100 101 void preAddPeer(String peerId, ReplicationPeerConfig peerConfig) 102 throws DoNotRetryIOException, ReplicationException { 103 if (peerId.contains("-")) { 104 throw new DoNotRetryIOException("Found invalid peer name: " + peerId); 105 } 106 checkPeerConfig(peerConfig); 107 if (peers.containsKey(peerId)) { 108 throw new DoNotRetryIOException("Replication peer " + peerId + " already exists"); 109 } 110 // make sure that there is no queues with the same peer id. This may happen when we create a 111 // peer with the same id with a old deleted peer. If the replication queues for the old peer 112 // have not been cleaned up yet then we should not create the new peer, otherwise the old wal 113 // file may also be replicated. 114 checkQueuesDeleted(peerId); 115 } 116 117 private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException { 118 ReplicationPeerDescription desc = peers.get(peerId); 119 if (desc == null) { 120 throw new ReplicationPeerNotFoundException(peerId); 121 } 122 return desc; 123 } 124 125 ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException { 126 return checkPeerExists(peerId).getPeerConfig(); 127 } 128 129 void preEnablePeer(String peerId) throws DoNotRetryIOException { 130 ReplicationPeerDescription desc = checkPeerExists(peerId); 131 if (desc.isEnabled()) { 132 throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled"); 133 } 134 } 135 136 void preDisablePeer(String peerId) throws DoNotRetryIOException { 137 ReplicationPeerDescription desc = checkPeerExists(peerId); 138 if (!desc.isEnabled()) { 139 throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled"); 140 } 141 } 142 143 /** 144 * Return the old peer description. Can never be null. 145 */ 146 ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) 147 throws DoNotRetryIOException { 148 checkPeerConfig(peerConfig); 149 ReplicationPeerDescription desc = checkPeerExists(peerId); 150 ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); 151 if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) { 152 throw new DoNotRetryIOException( 153 "Changing the cluster key on an existing peer is not allowed. Existing key '" 154 + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" 155 + peerConfig.getClusterKey() + "'"); 156 } 157 158 if ( 159 !isStringEquals(peerConfig.getReplicationEndpointImpl(), 160 oldPeerConfig.getReplicationEndpointImpl()) 161 ) { 162 throw new DoNotRetryIOException("Changing the replication endpoint implementation class " 163 + "on an existing peer is not allowed. Existing class '" 164 + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId 165 + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); 166 } 167 return desc; 168 } 169 170 public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) 171 throws ReplicationException { 172 if (peers.containsKey(peerId)) { 173 // this should be a retry, just return 174 return; 175 } 176 peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig); 177 ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build(); 178 peerStorage.addPeer(peerId, copiedPeerConfig, enabled); 179 peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig)); 180 } 181 182 public void removePeer(String peerId) throws ReplicationException { 183 if (!peers.containsKey(peerId)) { 184 // this should be a retry, just return 185 return; 186 } 187 peerStorage.removePeer(peerId); 188 peers.remove(peerId); 189 } 190 191 private void setPeerState(String peerId, boolean enabled) throws ReplicationException { 192 ReplicationPeerDescription desc = peers.get(peerId); 193 if (desc.isEnabled() == enabled) { 194 // this should be a retry, just return 195 return; 196 } 197 peerStorage.setPeerState(peerId, enabled); 198 peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig())); 199 } 200 201 public void enablePeer(String peerId) throws ReplicationException { 202 setPeerState(peerId, true); 203 } 204 205 public void disablePeer(String peerId) throws ReplicationException { 206 setPeerState(peerId, false); 207 } 208 209 public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) 210 throws ReplicationException { 211 // the checking rules are too complicated here so we give up checking whether this is a retry. 212 ReplicationPeerDescription desc = peers.get(peerId); 213 ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); 214 ReplicationPeerConfigBuilder newPeerConfigBuilder = 215 ReplicationPeerConfig.newBuilder(peerConfig); 216 // we need to use the new conf to overwrite the old one. 217 newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration()); 218 newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration()); 219 newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration()); 220 newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration()); 221 ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build(); 222 peerStorage.updatePeerConfig(peerId, newPeerConfig); 223 peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig)); 224 } 225 226 public List<ReplicationPeerDescription> listPeers(Pattern pattern) { 227 if (pattern == null) { 228 return new ArrayList<>(peers.values()); 229 } 230 return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches()) 231 .collect(Collectors.toList()); 232 } 233 234 public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) { 235 ReplicationPeerDescription desc = peers.get(peerId); 236 return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty(); 237 } 238 239 void removeAllLastPushedSeqIds(String peerId) throws ReplicationException { 240 queueStorage.removeLastSequenceIds(peerId); 241 } 242 243 void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { 244 // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still 245 // on-going when the refresh peer config procedure is done, if a RS which has already been 246 // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in 247 // the scan here, and if the RS who has claimed the queue crashed before creating recovered 248 // source, then the queue will leave there until the another RS detects the crash and helps 249 // removing the queue. 250 // A two pass scan can solve the problem. Anyway, the queue will not disappear during the 251 // claiming, it will either under the old RS or under the new RS, and a queue can only be 252 // claimed once after the refresh peer procedure done(as the next claim queue will just delete 253 // it), so we can make sure that a two pass scan will finally find the queue and remove it, 254 // unless it has already been removed by others. 255 ReplicationUtils.removeAllQueues(queueStorage, peerId); 256 ReplicationUtils.removeAllQueues(queueStorage, peerId); 257 queueStorage.removePeerFromHFileRefs(peerId); 258 } 259 260 private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { 261 String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl(); 262 ReplicationEndpoint endpoint = null; 263 if (!StringUtils.isBlank(replicationEndpointImpl)) { 264 try { 265 // try creating a instance 266 endpoint = Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class) 267 .getDeclaredConstructor().newInstance(); 268 } catch (Throwable e) { 269 throw new DoNotRetryIOException( 270 "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl, 271 e); 272 } 273 } 274 // Endpoints implementing HBaseReplicationEndpoint need to check cluster key 275 if (endpoint == null || endpoint instanceof HBaseReplicationEndpoint) { 276 checkClusterKey(peerConfig.getClusterKey()); 277 // Check if endpoint can replicate to the same cluster 278 if (endpoint == null || !endpoint.canReplicateToSameCluster()) { 279 checkSameClusterKey(peerConfig.getClusterKey()); 280 } 281 } 282 283 if (peerConfig.replicateAllUserTables()) { 284 // If replicate_all flag is true, it means all user tables will be replicated to peer cluster. 285 // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer 286 // cluster. 287 if ( 288 (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) 289 || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty()) 290 ) { 291 throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " 292 + "when you want replicate all cluster"); 293 } 294 checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(), 295 peerConfig.getExcludeTableCFsMap()); 296 } else { 297 // If replicate_all flag is false, it means all user tables can't be replicated to peer 298 // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer 299 // cluster. 300 if ( 301 (peerConfig.getExcludeNamespaces() != null && !peerConfig.getExcludeNamespaces().isEmpty()) 302 || (peerConfig.getExcludeTableCFsMap() != null 303 && !peerConfig.getExcludeTableCFsMap().isEmpty()) 304 ) { 305 throw new DoNotRetryIOException( 306 "Need clean exclude-namespaces or exclude-table-cfs config firstly" 307 + " when replicate_all flag is false"); 308 } 309 checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), 310 peerConfig.getTableCFsMap()); 311 } 312 313 checkConfiguredWALEntryFilters(peerConfig); 314 } 315 316 /** 317 * Set a namespace in the peer config means that all tables in this namespace will be replicated 318 * to the peer cluster. 319 * <ol> 320 * <li>If peer config already has a namespace, then not allow set any table of this namespace to 321 * the peer config.</li> 322 * <li>If peer config already has a table, then not allow set this table's namespace to the peer 323 * config.</li> 324 * </ol> 325 * <p> 326 * Set a exclude namespace in the peer config means that all tables in this namespace can't be 327 * replicated to the peer cluster. 328 * <ol> 329 * <li>If peer config already has a exclude namespace, then not allow set any exclude table of 330 * this namespace to the peer config.</li> 331 * <li>If peer config already has a exclude table, then not allow set this table's namespace as a 332 * exclude namespace.</li> 333 * </ol> 334 */ 335 private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces, 336 Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException { 337 if (namespaces == null || namespaces.isEmpty()) { 338 return; 339 } 340 if (tableCfs == null || tableCfs.isEmpty()) { 341 return; 342 } 343 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { 344 TableName table = entry.getKey(); 345 if (namespaces.contains(table.getNamespaceAsString())) { 346 throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces " 347 + table.getNamespaceAsString() + " in peer config"); 348 } 349 } 350 } 351 352 private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) 353 throws DoNotRetryIOException { 354 String filterCSV = peerConfig.getConfiguration() 355 .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); 356 if (filterCSV != null && !filterCSV.isEmpty()) { 357 String[] filters = filterCSV.split(","); 358 for (String filter : filters) { 359 try { 360 Class.forName(filter).getDeclaredConstructor().newInstance(); 361 } catch (Exception e) { 362 throw new DoNotRetryIOException("Configured WALEntryFilter " + filter 363 + " could not be created. Failing add/update peer operation.", e); 364 } 365 } 366 } 367 } 368 369 private void checkClusterKey(String clusterKey) throws DoNotRetryIOException { 370 try { 371 ZKConfig.validateClusterKey(clusterKey); 372 } catch (IOException e) { 373 throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e); 374 } 375 } 376 377 private void checkSameClusterKey(String clusterKey) throws DoNotRetryIOException { 378 String peerClusterId = ""; 379 try { 380 // Create the peer cluster config for get peer cluster id 381 Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey); 382 try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) { 383 peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher); 384 } 385 } catch (IOException | KeeperException e) { 386 throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e); 387 } 388 // In rare case, zookeeper setting may be messed up. That leads to the incorrect 389 // peerClusterId value, which is the same as the source clusterId 390 if (clusterId.equals(peerClusterId)) { 391 throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey 392 + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint"); 393 } 394 } 395 396 public List<String> getSerialPeerIdsBelongsTo(TableName tableName) { 397 return peers.values().stream().filter(p -> p.getPeerConfig().isSerial()) 398 .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId()) 399 .collect(Collectors.toList()); 400 } 401 402 public ReplicationQueueStorage getQueueStorage() { 403 return queueStorage; 404 } 405 406 public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, String clusterId) 407 throws ReplicationException { 408 ReplicationPeerStorage peerStorage = 409 ReplicationStorageFactory.getReplicationPeerStorage(zk, conf); 410 ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>(); 411 for (String peerId : peerStorage.listPeerIds()) { 412 ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); 413 414 peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig); 415 peerStorage.updatePeerConfig(peerId, peerConfig); 416 boolean enabled = peerStorage.isPeerEnabled(peerId); 417 peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig)); 418 } 419 return new ReplicationPeerManager(peerStorage, 420 ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId); 421 } 422 423 /** 424 * For replication peer cluster key or endpoint class, null and empty string is same. So here 425 * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly. 426 */ 427 private boolean isStringEquals(String s1, String s2) { 428 if (StringUtils.isBlank(s1)) { 429 return StringUtils.isBlank(s2); 430 } 431 return s1.equals(s2); 432 } 433}