001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.util.Collection; 021import java.util.Collections; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.Set; 026import java.util.TreeMap; 027import org.apache.commons.lang3.StringUtils; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.util.Bytes; 030import org.apache.yetus.audience.InterfaceAudience; 031 032import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 033 034/** 035 * A configuration for the replication peer cluster. 036 */ 037@InterfaceAudience.Public 038public class ReplicationPeerConfig { 039 040 private String clusterKey; 041 private String replicationEndpointImpl; 042 private final Map<byte[], byte[]> peerData; 043 private final Map<String, String> configuration; 044 private Map<TableName, ? extends Collection<String>> tableCFsMap = null; 045 private Set<String> namespaces = null; 046 // Default value is true, means replicate all user tables to peer cluster. 047 private boolean replicateAllUserTables = true; 048 private Map<TableName, ? extends Collection<String>> excludeTableCFsMap = null; 049 private Set<String> excludeNamespaces = null; 050 private long bandwidth = 0; 051 private final boolean serial; 052 // Used by synchronous replication 053 private String remoteWALDir; 054 055 private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) { 056 this.clusterKey = builder.clusterKey; 057 this.replicationEndpointImpl = builder.replicationEndpointImpl; 058 this.peerData = Collections.unmodifiableMap(builder.peerData); 059 this.configuration = Collections.unmodifiableMap(builder.configuration); 060 this.tableCFsMap = 061 builder.tableCFsMap != null ? unmodifiableTableCFsMap(builder.tableCFsMap) : null; 062 this.namespaces = 063 builder.namespaces != null ? Collections.unmodifiableSet(builder.namespaces) : null; 064 this.replicateAllUserTables = builder.replicateAllUserTables; 065 this.excludeTableCFsMap = builder.excludeTableCFsMap != null 066 ? unmodifiableTableCFsMap(builder.excludeTableCFsMap) 067 : null; 068 this.excludeNamespaces = builder.excludeNamespaces != null 069 ? Collections.unmodifiableSet(builder.excludeNamespaces) 070 : null; 071 this.bandwidth = builder.bandwidth; 072 this.serial = builder.serial; 073 this.remoteWALDir = builder.remoteWALDir; 074 } 075 076 private Map<TableName, List<String>> 077 unmodifiableTableCFsMap(Map<TableName, List<String>> tableCFsMap) { 078 Map<TableName, List<String>> newTableCFsMap = new HashMap<>(); 079 tableCFsMap.forEach((table, cfs) -> newTableCFsMap.put(table, 080 cfs != null ? Collections.unmodifiableList(cfs) : null)); 081 return Collections.unmodifiableMap(newTableCFsMap); 082 } 083 084 public String getClusterKey() { 085 return clusterKey; 086 } 087 088 public String getReplicationEndpointImpl() { 089 return replicationEndpointImpl; 090 } 091 092 public Map<byte[], byte[]> getPeerData() { 093 return peerData; 094 } 095 096 public Map<String, String> getConfiguration() { 097 return configuration; 098 } 099 100 public Map<TableName, List<String>> getTableCFsMap() { 101 return (Map<TableName, List<String>>) tableCFsMap; 102 } 103 104 public Set<String> getNamespaces() { 105 return this.namespaces; 106 } 107 108 public long getBandwidth() { 109 return this.bandwidth; 110 } 111 112 public boolean replicateAllUserTables() { 113 return this.replicateAllUserTables; 114 } 115 116 public Map<TableName, List<String>> getExcludeTableCFsMap() { 117 return (Map<TableName, List<String>>) excludeTableCFsMap; 118 } 119 120 public Set<String> getExcludeNamespaces() { 121 return this.excludeNamespaces; 122 } 123 124 public String getRemoteWALDir() { 125 return this.remoteWALDir; 126 } 127 128 /** 129 * Use remote wal dir to decide whether a peer is sync replication peer 130 */ 131 public boolean isSyncReplication() { 132 return !StringUtils.isBlank(this.remoteWALDir); 133 } 134 135 public static ReplicationPeerConfigBuilder newBuilder() { 136 return new ReplicationPeerConfigBuilderImpl(); 137 } 138 139 public boolean isSerial() { 140 return serial; 141 } 142 143 public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peerConfig) { 144 ReplicationPeerConfigBuilderImpl builder = new ReplicationPeerConfigBuilderImpl(); 145 builder.setClusterKey(peerConfig.getClusterKey()) 146 .setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl()) 147 .putAllPeerData(peerConfig.getPeerData()).putAllConfiguration(peerConfig.getConfiguration()) 148 .setTableCFsMap(peerConfig.getTableCFsMap()).setNamespaces(peerConfig.getNamespaces()) 149 .setReplicateAllUserTables(peerConfig.replicateAllUserTables()) 150 .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap()) 151 .setExcludeNamespaces(peerConfig.getExcludeNamespaces()) 152 .setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial()) 153 .setRemoteWALDir(peerConfig.getRemoteWALDir()); 154 return builder; 155 } 156 157 static class ReplicationPeerConfigBuilderImpl implements ReplicationPeerConfigBuilder { 158 159 private String clusterKey; 160 161 private String replicationEndpointImpl; 162 163 private Map<byte[], byte[]> peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR); 164 165 private Map<String, String> configuration = new HashMap<>(); 166 167 private Map<TableName, List<String>> tableCFsMap = null; 168 169 private Set<String> namespaces = null; 170 171 // Default value is true, means replicate all user tables to peer cluster. 172 private boolean replicateAllUserTables = true; 173 174 private Map<TableName, List<String>> excludeTableCFsMap = null; 175 176 private Set<String> excludeNamespaces = null; 177 178 private long bandwidth = 0; 179 180 private boolean serial = false; 181 182 private String remoteWALDir = null; 183 184 @Override 185 public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) { 186 this.clusterKey = clusterKey != null ? clusterKey.trim() : null; 187 return this; 188 } 189 190 @Override 191 public ReplicationPeerConfigBuilder setReplicationEndpointImpl(String replicationEndpointImpl) { 192 this.replicationEndpointImpl = replicationEndpointImpl; 193 return this; 194 } 195 196 @Override 197 public ReplicationPeerConfigBuilder putConfiguration(String key, String value) { 198 this.configuration.put(key, value); 199 return this; 200 } 201 202 @Override 203 public ReplicationPeerConfigBuilder removeConfiguration(String key) { 204 this.configuration.remove(key); 205 return this; 206 } 207 208 @Override 209 public ReplicationPeerConfigBuilder putPeerData(byte[] key, byte[] value) { 210 this.peerData.put(key, value); 211 return this; 212 } 213 214 @Override 215 public ReplicationPeerConfigBuilder setTableCFsMap(Map<TableName, List<String>> tableCFsMap) { 216 this.tableCFsMap = tableCFsMap; 217 return this; 218 } 219 220 @Override 221 public ReplicationPeerConfigBuilder setNamespaces(Set<String> namespaces) { 222 this.namespaces = namespaces; 223 return this; 224 } 225 226 @Override 227 public ReplicationPeerConfigBuilder setReplicateAllUserTables(boolean replicateAllUserTables) { 228 this.replicateAllUserTables = replicateAllUserTables; 229 return this; 230 } 231 232 @Override 233 public ReplicationPeerConfigBuilder 234 setExcludeTableCFsMap(Map<TableName, List<String>> excludeTableCFsMap) { 235 this.excludeTableCFsMap = excludeTableCFsMap; 236 return this; 237 } 238 239 @Override 240 public ReplicationPeerConfigBuilder setExcludeNamespaces(Set<String> excludeNamespaces) { 241 this.excludeNamespaces = excludeNamespaces; 242 return this; 243 } 244 245 @Override 246 public ReplicationPeerConfigBuilder setBandwidth(long bandwidth) { 247 this.bandwidth = bandwidth; 248 return this; 249 } 250 251 @Override 252 public ReplicationPeerConfigBuilder setSerial(boolean serial) { 253 this.serial = serial; 254 return this; 255 } 256 257 @Override 258 public ReplicationPeerConfigBuilder setRemoteWALDir(String dir) { 259 this.remoteWALDir = dir; 260 return this; 261 } 262 263 @Override 264 public ReplicationPeerConfig build() { 265 // It would be nice to validate the configuration, but we have to work with "old" data 266 // from ZK which makes it much more difficult. 267 return new ReplicationPeerConfig(this); 268 } 269 } 270 271 @Override 272 public String toString() { 273 StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(","); 274 builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(","); 275 builder.append("replicateAllUserTables=").append(replicateAllUserTables).append(","); 276 if (replicateAllUserTables) { 277 if (excludeNamespaces != null) { 278 builder.append("excludeNamespaces=").append(excludeNamespaces.toString()).append(","); 279 } 280 if (excludeTableCFsMap != null) { 281 builder.append("excludeTableCFsMap=").append(excludeTableCFsMap.toString()).append(","); 282 } 283 } else { 284 if (namespaces != null) { 285 builder.append("namespaces=").append(namespaces.toString()).append(","); 286 } 287 if (tableCFsMap != null) { 288 builder.append("tableCFs=").append(tableCFsMap.toString()).append(","); 289 } 290 } 291 builder.append("bandwidth=").append(bandwidth).append(","); 292 builder.append("serial=").append(serial); 293 if (this.remoteWALDir != null) { 294 builder.append(",remoteWALDir=").append(remoteWALDir); 295 } 296 return builder.toString(); 297 } 298 299 /** 300 * Decide whether the table need replicate to the peer cluster 301 * @param table name of the table 302 * @return true if the table need replicate to the peer cluster 303 */ 304 public boolean needToReplicate(TableName table) { 305 return needToReplicate(table, null); 306 } 307 308 /** 309 * Decide whether the passed family of the table need replicate to the peer cluster according to 310 * this peer config. 311 * @param table name of the table 312 * @param family family name 313 * @return true if (the family of) the table need replicate to the peer cluster. If passed family 314 * is null, return true if any CFs of the table need replicate; If passed family is not 315 * null, return true if the passed family need replicate. 316 */ 317 public boolean needToReplicate(TableName table, byte[] family) { 318 String namespace = table.getNamespaceAsString(); 319 if (replicateAllUserTables) { 320 // replicate all user tables, but filter by exclude namespaces and table-cfs config 321 if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) { 322 return false; 323 } 324 // trap here, must check existence first since HashMap allows null value. 325 if (excludeTableCFsMap == null || !excludeTableCFsMap.containsKey(table)) { 326 return true; 327 } 328 Collection<String> cfs = excludeTableCFsMap.get(table); 329 // If cfs is null or empty then we can make sure that we do not need to replicate this table, 330 // otherwise, we may still need to replicate the table but filter out some families. 331 return cfs != null && !cfs.isEmpty() 332 // If exclude-table-cfs contains passed family then we make sure that we do not need to 333 // replicate this family. 334 && (family == null || !cfs.contains(Bytes.toString(family))); 335 } else { 336 // Not replicate all user tables, so filter by namespaces and table-cfs config 337 if (namespaces == null && tableCFsMap == null) { 338 return false; 339 } 340 // First filter by namespaces config 341 // If table's namespace in peer config, all the tables data are applicable for replication 342 if (namespaces != null && namespaces.contains(namespace)) { 343 return true; 344 } 345 // If table-cfs contains this table then we can make sure that we need replicate some CFs of 346 // this table. Further we need all CFs if tableCFsMap.get(table) is null or empty. 347 return tableCFsMap != null && tableCFsMap.containsKey(table) 348 && (family == null || CollectionUtils.isEmpty(tableCFsMap.get(table)) 349 // If table-cfs must contain passed family then we need to replicate this family. 350 || tableCFsMap.get(table).contains(Bytes.toString(family))); 351 } 352 } 353}