001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.List; 024import java.util.Map; 025import java.util.Optional; 026import java.util.Set; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.regex.Pattern; 030import java.util.stream.Collectors; 031import org.apache.commons.lang3.StringUtils; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.DoNotRetryIOException; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; 037import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 038import org.apache.hadoop.hbase.replication.ReplicationException; 039import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 040import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 041import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 042import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; 043import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; 044import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 045import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 046import org.apache.hadoop.hbase.replication.ReplicationUtils; 047import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; 048import org.apache.hadoop.hbase.zookeeper.ZKConfig; 049import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 050import org.apache.yetus.audience.InterfaceAudience; 051 052/** 053 * Manages and performs all replication admin operations. 054 * <p> 055 * Used to add/remove a replication peer. 056 */ 057@InterfaceAudience.Private 058public class ReplicationPeerManager { 059 060 private final ReplicationPeerStorage peerStorage; 061 062 private final ReplicationQueueStorage queueStorage; 063 064 private final ConcurrentMap<String, ReplicationPeerDescription> peers; 065 066 ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage, 067 ConcurrentMap<String, ReplicationPeerDescription> peers) { 068 this.peerStorage = peerStorage; 069 this.queueStorage = queueStorage; 070 this.peers = peers; 071 } 072 073 private void checkQueuesDeleted(String peerId) 074 throws ReplicationException, DoNotRetryIOException { 075 for (ServerName replicator : queueStorage.getListOfReplicators()) { 076 List<String> queueIds = queueStorage.getAllQueues(replicator); 077 for (String queueId : queueIds) { 078 ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); 079 if (queueInfo.getPeerId().equals(peerId)) { 080 throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId + 081 ", replicator: " + replicator + ", queueId: " + queueId); 082 } 083 } 084 } 085 if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) { 086 throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs"); 087 } 088 } 089 090 void preAddPeer(String peerId, ReplicationPeerConfig peerConfig) 091 throws DoNotRetryIOException, ReplicationException { 092 if (peerId.contains("-")) { 093 throw new DoNotRetryIOException("Found invalid peer name: " + peerId); 094 } 095 checkPeerConfig(peerConfig); 096 if (peers.containsKey(peerId)) { 097 throw new DoNotRetryIOException("Replication peer " + peerId + " already exists"); 098 } 099 // make sure that there is no queues with the same peer id. This may happen when we create a 100 // peer with the same id with a old deleted peer. If the replication queues for the old peer 101 // have not been cleaned up yet then we should not create the new peer, otherwise the old wal 102 // file may also be replicated. 103 checkQueuesDeleted(peerId); 104 } 105 106 private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException { 107 ReplicationPeerDescription desc = peers.get(peerId); 108 if (desc == null) { 109 throw new DoNotRetryIOException("Replication peer " + peerId + " does not exist"); 110 } 111 return desc; 112 } 113 114 ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException { 115 return checkPeerExists(peerId).getPeerConfig(); 116 } 117 118 void preEnablePeer(String peerId) throws DoNotRetryIOException { 119 ReplicationPeerDescription desc = checkPeerExists(peerId); 120 if (desc.isEnabled()) { 121 throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled"); 122 } 123 } 124 125 void preDisablePeer(String peerId) throws DoNotRetryIOException { 126 ReplicationPeerDescription desc = checkPeerExists(peerId); 127 if (!desc.isEnabled()) { 128 throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled"); 129 } 130 } 131 132 /** 133 * Return the old peer description. Can never be null. 134 */ 135 ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) 136 throws DoNotRetryIOException { 137 checkPeerConfig(peerConfig); 138 ReplicationPeerDescription desc = checkPeerExists(peerId); 139 ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); 140 if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) { 141 throw new DoNotRetryIOException( 142 "Changing the cluster key on an existing peer is not allowed. Existing key '" + 143 oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" + 144 peerConfig.getClusterKey() + "'"); 145 } 146 147 if (!isStringEquals(peerConfig.getReplicationEndpointImpl(), 148 oldPeerConfig.getReplicationEndpointImpl())) { 149 throw new DoNotRetryIOException("Changing the replication endpoint implementation class " + 150 "on an existing peer is not allowed. Existing class '" + 151 oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId + 152 " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); 153 } 154 return desc; 155 } 156 157 public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) 158 throws ReplicationException { 159 if (peers.containsKey(peerId)) { 160 // this should be a retry, just return 161 return; 162 } 163 ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build(); 164 peerStorage.addPeer(peerId, copiedPeerConfig, enabled); 165 peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig)); 166 } 167 168 public void removePeer(String peerId) throws ReplicationException { 169 if (!peers.containsKey(peerId)) { 170 // this should be a retry, just return 171 return; 172 } 173 peerStorage.removePeer(peerId); 174 peers.remove(peerId); 175 } 176 177 private void setPeerState(String peerId, boolean enabled) throws ReplicationException { 178 ReplicationPeerDescription desc = peers.get(peerId); 179 if (desc.isEnabled() == enabled) { 180 // this should be a retry, just return 181 return; 182 } 183 peerStorage.setPeerState(peerId, enabled); 184 peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig())); 185 } 186 187 public void enablePeer(String peerId) throws ReplicationException { 188 setPeerState(peerId, true); 189 } 190 191 public void disablePeer(String peerId) throws ReplicationException { 192 setPeerState(peerId, false); 193 } 194 195 public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) 196 throws ReplicationException { 197 // the checking rules are too complicated here so we give up checking whether this is a retry. 198 ReplicationPeerDescription desc = peers.get(peerId); 199 ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); 200 ReplicationPeerConfigBuilder newPeerConfigBuilder = 201 ReplicationPeerConfig.newBuilder(peerConfig); 202 // we need to use the new conf to overwrite the old one. 203 newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration()); 204 newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration()); 205 newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration()); 206 newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration()); 207 ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build(); 208 peerStorage.updatePeerConfig(peerId, newPeerConfig); 209 peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig)); 210 } 211 212 public List<ReplicationPeerDescription> listPeers(Pattern pattern) { 213 if (pattern == null) { 214 return new ArrayList<>(peers.values()); 215 } 216 return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches()) 217 .collect(Collectors.toList()); 218 } 219 220 public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) { 221 ReplicationPeerDescription desc = peers.get(peerId); 222 return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty(); 223 } 224 225 void removeAllLastPushedSeqIds(String peerId) throws ReplicationException { 226 queueStorage.removeLastSequenceIds(peerId); 227 } 228 229 void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { 230 // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still 231 // on-going when the refresh peer config procedure is done, if a RS which has already been 232 // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in 233 // the scan here, and if the RS who has claimed the queue crashed before creating recovered 234 // source, then the queue will leave there until the another RS detects the crash and helps 235 // removing the queue. 236 // A two pass scan can solve the problem. Anyway, the queue will not disappear during the 237 // claiming, it will either under the old RS or under the new RS, and a queue can only be 238 // claimed once after the refresh peer procedure done(as the next claim queue will just delete 239 // it), so we can make sure that a two pass scan will finally find the queue and remove it, 240 // unless it has already been removed by others. 241 ReplicationUtils.removeAllQueues(queueStorage, peerId); 242 ReplicationUtils.removeAllQueues(queueStorage, peerId); 243 queueStorage.removePeerFromHFileRefs(peerId); 244 } 245 246 private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { 247 String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl(); 248 boolean checkClusterKey = true; 249 if (!StringUtils.isBlank(replicationEndpointImpl)) { 250 // try creating a instance 251 ReplicationEndpoint endpoint; 252 try { 253 endpoint = Class.forName(replicationEndpointImpl) 254 .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance(); 255 } catch (Throwable e) { 256 throw new DoNotRetryIOException( 257 "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl, 258 e); 259 } 260 // do not check cluster key if we are not HBaseInterClusterReplicationEndpoint 261 if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) { 262 checkClusterKey = false; 263 } 264 } 265 if (checkClusterKey) { 266 checkClusterKey(peerConfig.getClusterKey()); 267 } 268 269 if (peerConfig.replicateAllUserTables()) { 270 // If replicate_all flag is true, it means all user tables will be replicated to peer cluster. 271 // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer 272 // cluster. 273 if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) 274 || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { 275 throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " 276 + "when you want replicate all cluster"); 277 } 278 checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(), 279 peerConfig.getExcludeTableCFsMap()); 280 } else { 281 // If replicate_all flag is false, it means all user tables can't be replicated to peer 282 // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer 283 // cluster. 284 if ((peerConfig.getExcludeNamespaces() != null 285 && !peerConfig.getExcludeNamespaces().isEmpty()) 286 || (peerConfig.getExcludeTableCFsMap() != null 287 && !peerConfig.getExcludeTableCFsMap().isEmpty())) { 288 throw new DoNotRetryIOException( 289 "Need clean exclude-namespaces or exclude-table-cfs config firstly" 290 + " when replicate_all flag is false"); 291 } 292 checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), 293 peerConfig.getTableCFsMap()); 294 } 295 296 checkConfiguredWALEntryFilters(peerConfig); 297 } 298 299 /** 300 * Set a namespace in the peer config means that all tables in this namespace will be replicated 301 * to the peer cluster. 302 * <ol> 303 * <li>If peer config already has a namespace, then not allow set any table of this namespace to 304 * the peer config.</li> 305 * <li>If peer config already has a table, then not allow set this table's namespace to the peer 306 * config.</li> 307 * </ol> 308 * <p> 309 * Set a exclude namespace in the peer config means that all tables in this namespace can't be 310 * replicated to the peer cluster. 311 * <ol> 312 * <li>If peer config already has a exclude namespace, then not allow set any exclude table of 313 * this namespace to the peer config.</li> 314 * <li>If peer config already has a exclude table, then not allow set this table's namespace as a 315 * exclude namespace.</li> 316 * </ol> 317 */ 318 private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces, 319 Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException { 320 if (namespaces == null || namespaces.isEmpty()) { 321 return; 322 } 323 if (tableCfs == null || tableCfs.isEmpty()) { 324 return; 325 } 326 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { 327 TableName table = entry.getKey(); 328 if (namespaces.contains(table.getNamespaceAsString())) { 329 throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces " + 330 table.getNamespaceAsString() + " in peer config"); 331 } 332 } 333 } 334 335 private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) 336 throws DoNotRetryIOException { 337 String filterCSV = peerConfig.getConfiguration() 338 .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); 339 if (filterCSV != null && !filterCSV.isEmpty()) { 340 String[] filters = filterCSV.split(","); 341 for (String filter : filters) { 342 try { 343 Class.forName(filter).getDeclaredConstructor().newInstance(); 344 } catch (Exception e) { 345 throw new DoNotRetryIOException("Configured WALEntryFilter " + filter + 346 " could not be created. Failing add/update peer operation.", e); 347 } 348 } 349 } 350 } 351 352 private void checkClusterKey(String clusterKey) throws DoNotRetryIOException { 353 try { 354 ZKConfig.validateClusterKey(clusterKey); 355 } catch (IOException e) { 356 throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e); 357 } 358 } 359 360 public List<String> getSerialPeerIdsBelongsTo(TableName tableName) { 361 return peers.values().stream().filter(p -> p.getPeerConfig().isSerial()) 362 .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId()) 363 .collect(Collectors.toList()); 364 } 365 366 public ReplicationQueueStorage getQueueStorage() { 367 return queueStorage; 368 } 369 370 public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf) 371 throws ReplicationException { 372 ReplicationPeerStorage peerStorage = 373 ReplicationStorageFactory.getReplicationPeerStorage(zk, conf); 374 ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>(); 375 for (String peerId : peerStorage.listPeerIds()) { 376 ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); 377 boolean enabled = peerStorage.isPeerEnabled(peerId); 378 peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig)); 379 } 380 return new ReplicationPeerManager(peerStorage, 381 ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers); 382 } 383 384 /** 385 * For replication peer cluster key or endpoint class, null and empty string is same. So here 386 * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly. 387 */ 388 private boolean isStringEquals(String s1, String s2) { 389 if (StringUtils.isBlank(s1)) { 390 return StringUtils.isBlank(s2); 391 } 392 return s1.equals(s2); 393 } 394}