001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.List; 024import java.util.Map; 025import java.util.Set; 026import java.util.regex.Pattern; 027 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.Abortable; 030import org.apache.hadoop.hbase.DoNotRetryIOException; 031import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 034import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; 035import org.apache.hadoop.hbase.replication.ReplicationException; 036import org.apache.hadoop.hbase.replication.ReplicationFactory; 037import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 038import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 039import org.apache.hadoop.hbase.replication.ReplicationPeers; 040import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; 041import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; 042import org.apache.yetus.audience.InterfaceAudience; 043 044/** 045 * Manages and performs all replication admin operations. 046 * Used to add/remove a replication peer. 047 */ 048@InterfaceAudience.Private 049public class ReplicationManager { 050 051 private final Configuration conf; 052 private final ZKWatcher zkw; 053 private final ReplicationQueuesClient replicationQueuesClient; 054 private final ReplicationPeers replicationPeers; 055 056 public ReplicationManager(Configuration conf, ZKWatcher zkw, Abortable abortable) 057 throws IOException { 058 this.conf = conf; 059 this.zkw = zkw; 060 try { 061 this.replicationQueuesClient = ReplicationFactory 062 .getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw)); 063 this.replicationQueuesClient.init(); 064 this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, 065 this.replicationQueuesClient, abortable); 066 this.replicationPeers.init(); 067 } catch (Exception e) { 068 throw new IOException("Failed to construct ReplicationManager", e); 069 } 070 } 071 072 public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) 073 throws ReplicationException, IOException { 074 checkPeerConfig(peerConfig); 075 replicationPeers.registerPeer(peerId, peerConfig, enabled); 076 replicationPeers.peerConnected(peerId); 077 } 078 079 public void removeReplicationPeer(String peerId) throws ReplicationException { 080 replicationPeers.peerDisconnected(peerId); 081 replicationPeers.unregisterPeer(peerId); 082 } 083 084 public void enableReplicationPeer(String peerId) throws ReplicationException { 085 this.replicationPeers.enablePeer(peerId); 086 } 087 088 public void disableReplicationPeer(String peerId) throws ReplicationException { 089 this.replicationPeers.disablePeer(peerId); 090 } 091 092 public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException, 093 ReplicationPeerNotFoundException { 094 ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(peerId); 095 if (peerConfig == null) { 096 throw new ReplicationPeerNotFoundException(peerId); 097 } 098 return peerConfig; 099 } 100 101 public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) 102 throws ReplicationException, IOException { 103 checkPeerConfig(peerConfig); 104 this.replicationPeers.updatePeerConfig(peerId, peerConfig); 105 } 106 107 public List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern) 108 throws ReplicationException { 109 List<ReplicationPeerDescription> peers = new ArrayList<>(); 110 List<String> peerIds = replicationPeers.getAllPeerIds(); 111 for (String peerId : peerIds) { 112 if (pattern == null || (pattern != null && pattern.matcher(peerId).matches())) { 113 peers.add(new ReplicationPeerDescription(peerId, replicationPeers 114 .getStatusOfPeerFromBackingStore(peerId), replicationPeers 115 .getReplicationPeerConfig(peerId))); 116 } 117 } 118 return peers; 119 } 120 121 /** 122 * If replicate_all flag is true, it means all user tables will be replicated to peer cluster. 123 * Then allow config exclude namespaces or exclude table-cfs which can't be replicated to 124 * peer cluster. 125 * 126 * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster. 127 * Then allow to config namespaces or table-cfs which will be replicated to peer cluster. 128 */ 129 private void checkPeerConfig(ReplicationPeerConfig peerConfig) 130 throws ReplicationException, IOException { 131 if (peerConfig.replicateAllUserTables()) { 132 if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) 133 || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { 134 throw new ReplicationException("Need clean namespaces or table-cfs config firstly" 135 + " when replicate_all flag is true"); 136 } 137 checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(), 138 peerConfig.getExcludeTableCFsMap()); 139 } else { 140 if ((peerConfig.getExcludeNamespaces() != null 141 && !peerConfig.getExcludeNamespaces().isEmpty()) 142 || (peerConfig.getExcludeTableCFsMap() != null 143 && !peerConfig.getExcludeTableCFsMap().isEmpty())) { 144 throw new ReplicationException( 145 "Need clean exclude-namespaces or exclude-table-cfs config firstly" 146 + " when replicate_all flag is false"); 147 } 148 checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), 149 peerConfig.getTableCFsMap()); 150 } 151 checkConfiguredWALEntryFilters(peerConfig); 152 } 153 154 /** 155 * Set a namespace in the peer config means that all tables in this namespace will be replicated 156 * to the peer cluster. 157 * 1. If peer config already has a namespace, then not allow set any table of this namespace 158 * to the peer config. 159 * 2. If peer config already has a table, then not allow set this table's namespace to the peer 160 * config. 161 * 162 * Set a exclude namespace in the peer config means that all tables in this namespace can't be 163 * replicated to the peer cluster. 164 * 1. If peer config already has a exclude namespace, then not allow set any exclude table of 165 * this namespace to the peer config. 166 * 2. If peer config already has a exclude table, then not allow set this table's namespace 167 * as a exclude namespace. 168 */ 169 private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces, 170 Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException { 171 if (namespaces == null || namespaces.isEmpty()) { 172 return; 173 } 174 if (tableCfs == null || tableCfs.isEmpty()) { 175 return; 176 } 177 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { 178 TableName table = entry.getKey(); 179 if (namespaces.contains(table.getNamespaceAsString())) { 180 throw new ReplicationException("Table-cfs " + table + " is conflict with namespaces " 181 + table.getNamespaceAsString() + " in peer config"); 182 } 183 } 184 } 185 186 private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) 187 throws IOException { 188 String filterCSV = peerConfig.getConfiguration(). 189 get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); 190 if (filterCSV != null && !filterCSV.isEmpty()){ 191 String [] filters = filterCSV.split(","); 192 for (String filter : filters) { 193 try { 194 Class clazz = Class.forName(filter); 195 Object o = clazz.getDeclaredConstructor().newInstance(); 196 } catch (Exception e) { 197 throw new DoNotRetryIOException("Configured WALEntryFilter " + filter + 198 " could not be created. Failing add/update " + "peer operation.", e); 199 } 200 } 201 } 202 } 203}