001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.List; 024import java.util.Map; 025import java.util.Optional; 026import java.util.Set; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.regex.Pattern; 030import java.util.stream.Collectors; 031import org.apache.commons.lang3.StringUtils; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.DoNotRetryIOException; 034import org.apache.hadoop.hbase.HBaseConfiguration; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 038import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; 039import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 040import org.apache.hadoop.hbase.replication.ReplicationException; 041import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 042import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 043import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 044import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; 045import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; 046import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 047import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 048import org.apache.hadoop.hbase.replication.ReplicationUtils; 049import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; 050import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 051import org.apache.hadoop.hbase.zookeeper.ZKConfig; 052import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 053import org.apache.yetus.audience.InterfaceAudience; 054import org.apache.zookeeper.KeeperException; 055 056/** 057 * Manages and performs all replication admin operations. 058 * <p> 059 * Used to add/remove a replication peer. 060 */ 061@InterfaceAudience.Private 062public class ReplicationPeerManager { 063 064 private final ReplicationPeerStorage peerStorage; 065 066 private final ReplicationQueueStorage queueStorage; 067 068 private final ConcurrentMap<String, ReplicationPeerDescription> peers; 069 070 private final String clusterId; 071 072 private final Configuration conf; 073 074 ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage, 075 ConcurrentMap<String, ReplicationPeerDescription> peers, Configuration conf, String clusterId) { 076 this.peerStorage = peerStorage; 077 this.queueStorage = queueStorage; 078 this.peers = peers; 079 this.conf = conf; 080 this.clusterId = clusterId; 081 } 082 083 private void checkQueuesDeleted(String peerId) 084 throws ReplicationException, DoNotRetryIOException { 085 for (ServerName replicator : queueStorage.getListOfReplicators()) { 086 List<String> queueIds = queueStorage.getAllQueues(replicator); 087 for (String queueId : queueIds) { 088 ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); 089 if (queueInfo.getPeerId().equals(peerId)) { 090 throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId + 091 ", replicator: " + replicator + ", queueId: " + queueId); 092 } 093 } 094 } 095 if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) { 096 throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs"); 097 } 098 } 099 100 void preAddPeer(String peerId, ReplicationPeerConfig peerConfig) 101 throws DoNotRetryIOException, ReplicationException { 102 if (peerId.contains("-")) { 103 throw new DoNotRetryIOException("Found invalid peer name: " + peerId); 104 } 105 checkPeerConfig(peerConfig); 106 if (peers.containsKey(peerId)) { 107 throw new DoNotRetryIOException("Replication peer " + peerId + " already exists"); 108 } 109 // make sure that there is no queues with the same peer id. This may happen when we create a 110 // peer with the same id with a old deleted peer. If the replication queues for the old peer 111 // have not been cleaned up yet then we should not create the new peer, otherwise the old wal 112 // file may also be replicated. 113 checkQueuesDeleted(peerId); 114 } 115 116 private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException { 117 ReplicationPeerDescription desc = peers.get(peerId); 118 if (desc == null) { 119 throw new DoNotRetryIOException("Replication peer " + peerId + " does not exist"); 120 } 121 return desc; 122 } 123 124 ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException { 125 return checkPeerExists(peerId).getPeerConfig(); 126 } 127 128 void preEnablePeer(String peerId) throws DoNotRetryIOException { 129 ReplicationPeerDescription desc = checkPeerExists(peerId); 130 if (desc.isEnabled()) { 131 throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled"); 132 } 133 } 134 135 void preDisablePeer(String peerId) throws DoNotRetryIOException { 136 ReplicationPeerDescription desc = checkPeerExists(peerId); 137 if (!desc.isEnabled()) { 138 throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled"); 139 } 140 } 141 142 /** 143 * Return the old peer description. Can never be null. 144 */ 145 ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) 146 throws DoNotRetryIOException { 147 checkPeerConfig(peerConfig); 148 ReplicationPeerDescription desc = checkPeerExists(peerId); 149 ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); 150 if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) { 151 throw new DoNotRetryIOException( 152 "Changing the cluster key on an existing peer is not allowed. Existing key '" + 153 oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" + 154 peerConfig.getClusterKey() + "'"); 155 } 156 157 if (!isStringEquals(peerConfig.getReplicationEndpointImpl(), 158 oldPeerConfig.getReplicationEndpointImpl())) { 159 throw new DoNotRetryIOException("Changing the replication endpoint implementation class " + 160 "on an existing peer is not allowed. Existing class '" + 161 oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId + 162 " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); 163 } 164 return desc; 165 } 166 167 public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) 168 throws ReplicationException { 169 if (peers.containsKey(peerId)) { 170 // this should be a retry, just return 171 return; 172 } 173 peerConfig = ReplicationPeerConfigUtil.addBasePeerConfigsIfNotPresent(conf, peerConfig); 174 ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build(); 175 peerStorage.addPeer(peerId, copiedPeerConfig, enabled); 176 peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig)); 177 } 178 179 public void removePeer(String peerId) throws ReplicationException { 180 if (!peers.containsKey(peerId)) { 181 // this should be a retry, just return 182 return; 183 } 184 peerStorage.removePeer(peerId); 185 peers.remove(peerId); 186 } 187 188 private void setPeerState(String peerId, boolean enabled) throws ReplicationException { 189 ReplicationPeerDescription desc = peers.get(peerId); 190 if (desc.isEnabled() == enabled) { 191 // this should be a retry, just return 192 return; 193 } 194 peerStorage.setPeerState(peerId, enabled); 195 peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig())); 196 } 197 198 public void enablePeer(String peerId) throws ReplicationException { 199 setPeerState(peerId, true); 200 } 201 202 public void disablePeer(String peerId) throws ReplicationException { 203 setPeerState(peerId, false); 204 } 205 206 public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) 207 throws ReplicationException { 208 // the checking rules are too complicated here so we give up checking whether this is a retry. 209 ReplicationPeerDescription desc = peers.get(peerId); 210 ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); 211 ReplicationPeerConfigBuilder newPeerConfigBuilder = 212 ReplicationPeerConfig.newBuilder(peerConfig); 213 // we need to use the new conf to overwrite the old one. 214 newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration()); 215 newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration()); 216 newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration()); 217 newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration()); 218 ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build(); 219 peerStorage.updatePeerConfig(peerId, newPeerConfig); 220 peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig)); 221 } 222 223 public List<ReplicationPeerDescription> listPeers(Pattern pattern) { 224 if (pattern == null) { 225 return new ArrayList<>(peers.values()); 226 } 227 return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches()) 228 .collect(Collectors.toList()); 229 } 230 231 public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) { 232 ReplicationPeerDescription desc = peers.get(peerId); 233 return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty(); 234 } 235 236 void removeAllLastPushedSeqIds(String peerId) throws ReplicationException { 237 queueStorage.removeLastSequenceIds(peerId); 238 } 239 240 void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { 241 // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still 242 // on-going when the refresh peer config procedure is done, if a RS which has already been 243 // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in 244 // the scan here, and if the RS who has claimed the queue crashed before creating recovered 245 // source, then the queue will leave there until the another RS detects the crash and helps 246 // removing the queue. 247 // A two pass scan can solve the problem. Anyway, the queue will not disappear during the 248 // claiming, it will either under the old RS or under the new RS, and a queue can only be 249 // claimed once after the refresh peer procedure done(as the next claim queue will just delete 250 // it), so we can make sure that a two pass scan will finally find the queue and remove it, 251 // unless it has already been removed by others. 252 ReplicationUtils.removeAllQueues(queueStorage, peerId); 253 ReplicationUtils.removeAllQueues(queueStorage, peerId); 254 queueStorage.removePeerFromHFileRefs(peerId); 255 } 256 257 private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { 258 String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl(); 259 ReplicationEndpoint endpoint = null; 260 if (!StringUtils.isBlank(replicationEndpointImpl)) { 261 try { 262 // try creating a instance 263 endpoint = Class.forName(replicationEndpointImpl) 264 .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance(); 265 } catch (Throwable e) { 266 throw new DoNotRetryIOException( 267 "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl, 268 e); 269 } 270 } 271 // Default is HBaseInterClusterReplicationEndpoint and only it need to check cluster key 272 if (endpoint == null || endpoint instanceof HBaseInterClusterReplicationEndpoint) { 273 checkClusterKey(peerConfig.getClusterKey()); 274 } 275 // Default is HBaseInterClusterReplicationEndpoint which cannot replicate to same cluster 276 if (endpoint == null || !endpoint.canReplicateToSameCluster()) { 277 checkClusterId(peerConfig.getClusterKey()); 278 } 279 280 if (peerConfig.replicateAllUserTables()) { 281 // If replicate_all flag is true, it means all user tables will be replicated to peer cluster. 282 // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer 283 // cluster. 284 if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) 285 || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { 286 throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " 287 + "when you want replicate all cluster"); 288 } 289 checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(), 290 peerConfig.getExcludeTableCFsMap()); 291 } else { 292 // If replicate_all flag is false, it means all user tables can't be replicated to peer 293 // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer 294 // cluster. 295 if ((peerConfig.getExcludeNamespaces() != null 296 && !peerConfig.getExcludeNamespaces().isEmpty()) 297 || (peerConfig.getExcludeTableCFsMap() != null 298 && !peerConfig.getExcludeTableCFsMap().isEmpty())) { 299 throw new DoNotRetryIOException( 300 "Need clean exclude-namespaces or exclude-table-cfs config firstly" 301 + " when replicate_all flag is false"); 302 } 303 checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), 304 peerConfig.getTableCFsMap()); 305 } 306 307 checkConfiguredWALEntryFilters(peerConfig); 308 } 309 310 /** 311 * Set a namespace in the peer config means that all tables in this namespace will be replicated 312 * to the peer cluster. 313 * <ol> 314 * <li>If peer config already has a namespace, then not allow set any table of this namespace to 315 * the peer config.</li> 316 * <li>If peer config already has a table, then not allow set this table's namespace to the peer 317 * config.</li> 318 * </ol> 319 * <p> 320 * Set a exclude namespace in the peer config means that all tables in this namespace can't be 321 * replicated to the peer cluster. 322 * <ol> 323 * <li>If peer config already has a exclude namespace, then not allow set any exclude table of 324 * this namespace to the peer config.</li> 325 * <li>If peer config already has a exclude table, then not allow set this table's namespace as a 326 * exclude namespace.</li> 327 * </ol> 328 */ 329 private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces, 330 Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException { 331 if (namespaces == null || namespaces.isEmpty()) { 332 return; 333 } 334 if (tableCfs == null || tableCfs.isEmpty()) { 335 return; 336 } 337 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { 338 TableName table = entry.getKey(); 339 if (namespaces.contains(table.getNamespaceAsString())) { 340 throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces " + 341 table.getNamespaceAsString() + " in peer config"); 342 } 343 } 344 } 345 346 private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) 347 throws DoNotRetryIOException { 348 String filterCSV = peerConfig.getConfiguration() 349 .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); 350 if (filterCSV != null && !filterCSV.isEmpty()) { 351 String[] filters = filterCSV.split(","); 352 for (String filter : filters) { 353 try { 354 Class.forName(filter).getDeclaredConstructor().newInstance(); 355 } catch (Exception e) { 356 throw new DoNotRetryIOException("Configured WALEntryFilter " + filter + 357 " could not be created. Failing add/update peer operation.", e); 358 } 359 } 360 } 361 } 362 363 private void checkClusterKey(String clusterKey) throws DoNotRetryIOException { 364 try { 365 ZKConfig.validateClusterKey(clusterKey); 366 } catch (IOException e) { 367 throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e); 368 } 369 } 370 371 private void checkClusterId(String clusterKey) throws DoNotRetryIOException { 372 String peerClusterId = ""; 373 try { 374 // Create the peer cluster config for get peer cluster id 375 Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey); 376 try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) { 377 peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher); 378 } 379 } catch (IOException | KeeperException e) { 380 throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e); 381 } 382 // In rare case, zookeeper setting may be messed up. That leads to the incorrect 383 // peerClusterId value, which is the same as the source clusterId 384 if (clusterId.equals(peerClusterId)) { 385 throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey 386 + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint"); 387 } 388 } 389 390 public List<String> getSerialPeerIdsBelongsTo(TableName tableName) { 391 return peers.values().stream().filter(p -> p.getPeerConfig().isSerial()) 392 .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId()) 393 .collect(Collectors.toList()); 394 } 395 396 public ReplicationQueueStorage getQueueStorage() { 397 return queueStorage; 398 } 399 400 public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, String clusterId) 401 throws ReplicationException { 402 ReplicationPeerStorage peerStorage = 403 ReplicationStorageFactory.getReplicationPeerStorage(zk, conf); 404 ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>(); 405 for (String peerId : peerStorage.listPeerIds()) { 406 ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); 407 408 peerConfig = ReplicationPeerConfigUtil.addBasePeerConfigsIfNotPresent(conf, peerConfig); 409 peerStorage.updatePeerConfig(peerId, peerConfig); 410 boolean enabled = peerStorage.isPeerEnabled(peerId); 411 peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig)); 412 } 413 return new ReplicationPeerManager(peerStorage, 414 ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId); 415 } 416 417 /** 418 * For replication peer cluster key or endpoint class, null and empty string is same. So here 419 * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly. 420 */ 421 private boolean isStringEquals(String s1, String s2) { 422 if (StringUtils.isBlank(s1)) { 423 return StringUtils.isBlank(s2); 424 } 425 return s1.equals(s2); 426 } 427}