/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

/**
 * Helper for TableCFs operations: conversions between the string, map and protobuf
 * representations of table/column-family lists and of replication peer configurations.
 */
@InterfaceAudience.Private
@InterfaceStability.Stable
public final class ReplicationPeerConfigUtil {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);

  private ReplicationPeerConfigUtil() {}

  /**
   * Join a set of namespaces into a single ';'-separated string.
   * @param namespaces the namespaces to join, may be null
   * @return the joined string, or null if <code>namespaces</code> is null
   */
  public static String convertToString(Set<String> namespaces) {
    if (namespaces == null) {
      return null;
    }
    return StringUtils.join(namespaces, ';');
  }

  /**
   * Convert map to TableCFs Object.
   * @param tableCfs table to column families; a null or empty family collection produces a
   *          TableCF without families
   * @return one TableCF per map entry, or null if <code>tableCfs</code> is null
   */
  public static ReplicationProtos.TableCF[] convert(
      Map<TableName, ? extends Collection<String>> tableCfs) {
    if (tableCfs == null) {
      return null;
    }
    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.size());
    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      // The builder is reused across entries, so reset it first.
      tableCFBuilder.clear();
      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
      Collection<String> families = entry.getValue();
      if (families != null) {
        for (String family : families) {
          tableCFBuilder.addFamilies(ByteString.copyFromUtf8(family));
        }
      }
      tableCFList.add(tableCFBuilder.build());
    }
    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
  }

  /**
   * Convert a table to column families map into its string form.
   * @param tableCfs table to column families, may be null
   * @return the string form as produced by {@link #convert(ReplicationProtos.TableCF[])}, or
   *         null if <code>tableCfs</code> is null
   */
  public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
    if (tableCfs == null) {
      return null;
    }
    return convert(convert(tableCfs));
  }

  /**
   * Convert string to TableCFs Object.
   * This is only for read TableCFs information from TableCF node.
   * Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3.
   * @param tableCFsConfig the string to parse
   * @return the parsed TableCFs, or null if the input is null or blank. Entries that are
   *         malformed (more than one ':' or an empty table name) are logged and skipped.
   */
  public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) {
    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
      return null;
    }

    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
    String[] tables = tableCFsConfig.split(";");
    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.length);

    for (String tab : tables) {
      // 1 ignore empty table config
      tab = tab.trim();
      if (tab.length() == 0) {
        continue;
      }
      // 2 split to "table" and "cf1,cf2"
      //   for each table: "table:cf1,cf2" or "table"
      String[] pair = tab.split(":");
      String tabName = pair[0].trim();
      if (pair.length > 2 || tabName.length() == 0) {
        LOG.info("incorrect format:{}", tableCFsConfig);
        continue;
      }

      tableCFBuilder.clear();
      // split namespace from tableName; anything that is not exactly "ns.table" is treated as
      // a table in the "default" namespace
      String ns = "default";
      String tName = tabName;
      String[] dbs = tabName.split("\\.");
      if (dbs.length == 2) {
        ns = dbs[0];
        tName = dbs[1];
      }
      tableCFBuilder.setTableName(
        ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));

      // 3 parse "cf1,cf2" part to List<cf>
      if (pair.length == 2) {
        for (String cf : pair[1].split(",")) {
          String cfName = cf.trim();
          if (cfName.length() > 0) {
            tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
          }
        }
      }
      tableCFList.add(tableCFBuilder.build());
    }
    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
  }

  /**
   * Convert TableCFs Object to String.
   * Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
   * @param tableCFs the TableCFs to render, must not be null
   * @return the string form; empty if <code>tableCFs</code> is empty
   */
  public static String convert(ReplicationProtos.TableCF[] tableCFs) {
    StringBuilder sb = new StringBuilder();
    for (ReplicationProtos.TableCF tableCF : tableCFs) {
      String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
      if (StringUtils.isNotEmpty(namespace)) {
        sb.append(namespace).append('.');
      }
      // Always render the qualifier itself. Calling toString() on the protobuf table name
      // would emit the protobuf text format instead of the table name.
      sb.append(tableCF.getTableName().getQualifier().toStringUtf8()).append(':');
      for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
        sb.append(tableCF.getFamilies(j).toStringUtf8()).append(',');
      }
      // Drop the trailing ',' after the last family, or the ':' when there are no families.
      sb.deleteCharAt(sb.length() - 1).append(';');
    }
    if (sb.length() > 0) {
      // Drop the trailing ';'.
      sb.deleteCharAt(sb.length() - 1);
    }
    return sb.toString();
  }

  /**
   * Get TableCF in TableCFs, if not exist, return null.
   * Only the table qualifier is compared; the namespace is not taken into account.
   * @param tableCFs the TableCFs to search, must not be null
   * @param table the table qualifier to look for
   * @return the first TableCF whose qualifier equals <code>table</code>, or null if none
   */
  public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs,
      String table) {
    for (ReplicationProtos.TableCF tableCF : tableCFs) {
      if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
        return tableCF;
      }
    }
    return null;
  }

  /**
   * Parse bytes into TableCFs.
   * It is used for backward compatibility.
   * Old format bytes have no PB_MAGIC Header
   * @param bytes the string form of the TableCFs as bytes, may be null
   * @return the parsed TableCFs, or null if <code>bytes</code> is null or holds a blank string
   */
  public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
    if (bytes == null) {
      return null;
    }
    return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes));
  }

  /**
   * Convert tableCFs string into Map.
   * @param tableCFsConfig the string to parse, see {@link #convert(String)} for the format
   * @return table to column families, or null if nothing could be parsed
   */
  public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
    ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig);
    return convert2Map(tableCFs);
  }

  /**
   * Convert tableCFs Object to Map.
   * @param tableCFs the TableCFs to convert, may be null
   * @return table to column families, where a table without families maps to null; or null
   *         if <code>tableCFs</code> is null or empty
   */
  public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) {
    if (tableCFs == null || tableCFs.length == 0) {
      return null;
    }
    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
    for (ReplicationProtos.TableCF tableCF : tableCFs) {
      List<String> families = new ArrayList<>();
      for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
        families.add(tableCF.getFamilies(j).toStringUtf8());
      }
      // A table with no families is stored with a null value rather than an empty list.
      tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()),
        families.isEmpty() ? null : families);
    }

    return tableCFsMap;
  }

  /**
   * Parse the content of a peer znode into a peer config. Bytes carrying the pb magic prefix
   * are parsed as a protobuf ReplicationPeer; any other non-empty bytes are taken as a plain
   * cluster key string.
   * @param bytes Content of a peer znode.
   * @return the ReplicationPeerConfig parsed from the passed bytes.
   * @throws DeserializationException if the protobuf content can not be parsed, or if the
   *           bytes have no pb magic prefix and are null or empty
   */
  public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
      throws DeserializationException {
    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
      int pbLen = ProtobufUtil.lengthOfPBMagic();
      ReplicationProtos.ReplicationPeer.Builder builder =
        ReplicationProtos.ReplicationPeer.newBuilder();
      ReplicationProtos.ReplicationPeer peer;
      try {
        ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
        peer = builder.build();
      } catch (IOException e) {
        throw new DeserializationException(e);
      }
      return convert(peer);
    } else {
      if (bytes == null || bytes.length <= 0) {
        throw new DeserializationException("Bytes to deserialize should not be empty.");
      }
      return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
    }
  }

  /**
   * Convert a protobuf ReplicationPeer into a ReplicationPeerConfig.
   * Optional scalar fields are copied only when present in the message; table-cfs and
   * namespaces (and their exclude counterparts) are copied only when non-empty.
   * @param peer the protobuf peer
   * @return the equivalent peer config
   */
  public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
    if (peer.hasClusterkey()) {
      builder.setClusterKey(peer.getClusterkey());
    }
    if (peer.hasReplicationEndpointImpl()) {
      builder.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
    }

    for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
      builder.putPeerData(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
    }

    for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
      builder.putConfiguration(pair.getName(), pair.getValue());
    }

    Map<TableName, List<String>> tableCFsMap = convert2Map(
      peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
    if (tableCFsMap != null) {
      builder.setTableCFsMap(tableCFsMap);
    }

    List<ByteString> namespacesList = peer.getNamespacesList();
    if (namespacesList != null && namespacesList.size() != 0) {
      builder.setNamespaces(
        namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
    }

    if (peer.hasBandwidth()) {
      builder.setBandwidth(peer.getBandwidth());
    }

    if (peer.hasReplicateAll()) {
      builder.setReplicateAllUserTables(peer.getReplicateAll());
    }

    if (peer.hasSerial()) {
      builder.setSerial(peer.getSerial());
    }

    Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
      .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
    if (excludeTableCFsMap != null) {
      builder.setExcludeTableCFsMap(excludeTableCFsMap);
    }

    List<ByteString> excludeNamespacesList = peer.getExcludeNamespacesList();
    if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) {
      builder.setExcludeNamespaces(
        excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
    }

    return builder.build();
  }

  /**
   * Convert a ReplicationPeerConfig into its protobuf form.
   * @param peerConfig the peer config to convert
   * @return the equivalent protobuf ReplicationPeer
   */
  public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
    ReplicationProtos.ReplicationPeer.Builder builder =
      ReplicationProtos.ReplicationPeer.newBuilder();
    // we used to set cluster key as required so here we must always set it, until we can make sure
    // that no one uses the old proto file.
    builder.setClusterkey(peerConfig.getClusterKey() != null ? peerConfig.getClusterKey() : "");
    if (peerConfig.getReplicationEndpointImpl() != null) {
      builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
    }

    for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
      builder.addData(HBaseProtos.BytesBytesPair.newBuilder()
        .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey()))
        .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue()))
        .build());
    }

    for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
      builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder()
        .setName(entry.getKey())
        .setValue(entry.getValue())
        .build());
    }

    ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
    if (tableCFs != null) {
      for (ReplicationProtos.TableCF tableCF : tableCFs) {
        builder.addTableCfs(tableCF);
      }
    }
    Set<String> namespaces = peerConfig.getNamespaces();
    if (namespaces != null) {
      for (String namespace : namespaces) {
        builder.addNamespaces(ByteString.copyFromUtf8(namespace));
      }
    }

    builder.setBandwidth(peerConfig.getBandwidth());
    builder.setReplicateAll(peerConfig.replicateAllUserTables());
    builder.setSerial(peerConfig.isSerial());

    ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
    if (excludeTableCFs != null) {
      for (ReplicationProtos.TableCF excludeTableCF : excludeTableCFs) {
        builder.addExcludeTableCfs(excludeTableCF);
      }
    }
    Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
    if (excludeNamespaces != null) {
      for (String namespace : excludeNamespaces) {
        builder.addExcludeNamespaces(ByteString.copyFromUtf8(namespace));
      }
    }

    return builder.build();
  }

  /**
   * @param peerConfig the peer config to serialize
   * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
   *         for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
   *         /hbase/replication/peers/PEER_ID
   */
  public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
    byte[] bytes = convert(peerConfig).toByteArray();
    return ProtobufUtil.prependPBMagic(bytes);
  }

  /**
   * Convert a protobuf peer description into a ReplicationPeerDescription.
   * @param desc the protobuf description
   * @return the equivalent description; it is enabled only if the protobuf state is ENABLED
   */
  public static ReplicationPeerDescription toReplicationPeerDescription(
      ReplicationProtos.ReplicationPeerDescription desc) {
    boolean enabled =
      ReplicationProtos.ReplicationState.State.ENABLED == desc.getState().getState();
    ReplicationPeerConfig config = convert(desc.getConfig());
    return new ReplicationPeerDescription(desc.getId(), enabled, config);
  }

  /**
   * Convert a ReplicationPeerDescription into its protobuf form.
   * @param desc the description to convert
   * @return the equivalent protobuf description
   */
  public static ReplicationProtos.ReplicationPeerDescription toProtoReplicationPeerDescription(
      ReplicationPeerDescription desc) {
    ReplicationProtos.ReplicationPeerDescription.Builder builder =
      ReplicationProtos.ReplicationPeerDescription.newBuilder();
    builder.setId(desc.getPeerId());
    ReplicationProtos.ReplicationState.Builder stateBuilder =
      ReplicationProtos.ReplicationState.newBuilder();
    stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED
      : ReplicationProtos.ReplicationState.State.DISABLED);
    builder.setState(stateBuilder.build());
    builder.setConfig(convert(desc.getPeerConfig()));
    return builder.build();
  }

  /**
   * Build a copy of <code>peerConfig</code> whose table-cfs map additionally contains
   * <code>tableCfs</code>. If the peer has no table-cfs map yet, <code>tableCfs</code> is used
   * as is.
   * @param tableCfs the table-cfs to append; must not be null when the peer already has a
   *          table-cfs map
   * @param peerConfig the peer config to start from
   * @return a new peer config with the merged table-cfs map
   */
  public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig(
      Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig) {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
    if (preTableCfs == null) {
      builder.setTableCFsMap(tableCfs);
    } else {
      builder.setTableCFsMap(mergeTableCFs(preTableCfs, tableCfs));
    }
    return builder.build();
  }

  /**
   * Build a copy of <code>peerConfig</code> whose exclude-table-cfs map additionally contains
   * <code>excludeTableCfs</code>. If the peer has no exclude-table-cfs map yet,
   * <code>excludeTableCfs</code> is used as is.
   * @param excludeTableCfs the exclude-table-cfs to append
   * @param peerConfig the peer config to start from
   * @return a new peer config with the merged exclude-table-cfs map
   * @throws ReplicationException if <code>excludeTableCfs</code> is null
   */
  public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
      Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
      throws ReplicationException {
    if (excludeTableCfs == null) {
      throw new ReplicationException("exclude tableCfs is null");
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
    if (preExcludeTableCfs == null) {
      builder.setExcludeTableCFsMap(excludeTableCfs);
    } else {
      builder.setExcludeTableCFsMap(mergeTableCFs(preExcludeTableCfs, excludeTableCfs));
    }
    return builder.build();
  }

  /**
   * Merge <code>tableCfs</code> into a copy of <code>preTableCfs</code>. A table maps to null
   * as soon as either side gives no column families for it; otherwise the families of both
   * sides are united.
   */
  private static Map<TableName, List<String>> mergeTableCFs(
      Map<TableName, List<String>> preTableCfs, Map<TableName, List<String>> tableCfs) {
    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> appendCfs = entry.getValue();
      if (appendCfs == null || appendCfs.isEmpty()) {
        // No families given on the appended side: the table is stored with a null value.
        newTableCfs.put(table, null);
      } else if (newTableCfs.containsKey(table)) {
        List<String> cfs = newTableCfs.get(table);
        if (cfs == null) {
          newTableCfs.put(table, null);
        } else {
          Set<String> cfSet = new HashSet<>(cfs);
          cfSet.addAll(appendCfs);
          newTableCfs.put(table, Lists.newArrayList(cfSet));
        }
      } else {
        newTableCfs.put(table, Lists.newArrayList(appendCfs));
      }
    }
    return newTableCfs;
  }

  /** Copy the map and each non-null family list in it, so the input is left untouched. */
  private static Map<TableName, List<String>>
      copyTableCFsMap(Map<TableName, List<String>> preTableCfs) {
    Map<TableName, List<String>> newTableCfs = new HashMap<>();
    preTableCfs.forEach(
      (table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null));
    return newTableCfs;
  }

  /**
   * Remove <code>tableCfs</code> from a copy of <code>preTableCfs</code>.
   * A table stored without families can only be removed as a whole, and a table stored with
   * families can only have families removed; the table entry is dropped once no family is
   * left.
   * @param configName name of the config used in exception messages
   * @throws ReplicationException if a table is not present in <code>preTableCfs</code>, or if
   *           the request does not match how the table is stored
   */
  private static Map<TableName, List<String>> removeTableCFs(
      Map<TableName, List<String>> preTableCfs, Map<TableName, List<String>> tableCfs,
      String id, String configName) throws ReplicationException {
    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      if (!newTableCfs.containsKey(table)) {
        throw new ReplicationException(
          "No table: " + table + " in " + configName + " config of peer: " + id);
      }
      Collection<String> removeCfs = entry.getValue();
      boolean removeWholeTable = removeCfs == null || removeCfs.isEmpty();
      List<String> cfs = newTableCfs.get(table);
      if (cfs == null) {
        if (!removeWholeTable) {
          throw new ReplicationException("Cannot remove cf of table: " + table
            + " which doesn't specify cfs from " + configName + " config in peer: " + id);
        }
        newTableCfs.remove(table);
      } else {
        if (removeWholeTable) {
          throw new ReplicationException("Cannot remove table: " + table
            + " which has specified cfs from " + configName + " config in peer: " + id);
        }
        Set<String> cfSet = new HashSet<>(cfs);
        cfSet.removeAll(removeCfs);
        if (cfSet.isEmpty()) {
          newTableCfs.remove(table);
        } else {
          newTableCfs.put(table, Lists.newArrayList(cfSet));
        }
      }
    }
    return newTableCfs;
  }

  /**
   * Build a copy of <code>peerConfig</code> with <code>tableCfs</code> removed from its
   * table-cfs map.
   * @param tableCfs the table-cfs to remove
   * @param peerConfig the peer config to start from
   * @param id the peer id, used in exception messages
   * @return a new peer config with the reduced table-cfs map
   * @throws ReplicationException if the peer has no table-cfs map, if <code>tableCfs</code> is
   *           null, if a table is not in the peer's table-cfs map, or if the request does not
   *           match how the table is stored (whole table vs. specific families)
   */
  public static ReplicationPeerConfig removeTableCFsFromReplicationPeerConfig(
      Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig,
      String id) throws ReplicationException {
    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
    if (preTableCfs == null) {
      throw new ReplicationException("Table-Cfs for peer: " + id + " is null");
    }
    if (tableCfs == null) {
      // Fail with the declared checked exception instead of a NullPointerException, as the
      // exclude variant does.
      throw new ReplicationException("tableCfs is null");
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    builder.setTableCFsMap(removeTableCFs(preTableCfs, tableCfs, id, "table-cfs"));
    return builder.build();
  }

  /**
   * Build a copy of <code>peerConfig</code> with <code>excludeTableCfs</code> removed from its
   * exclude-table-cfs map.
   * @param excludeTableCfs the exclude-table-cfs to remove
   * @param peerConfig the peer config to start from
   * @param id the peer id, used in exception messages
   * @return a new peer config with the reduced exclude-table-cfs map
   * @throws ReplicationException if <code>excludeTableCfs</code> is null, if the peer has no
   *           exclude-table-cfs map, if a table is not in that map, or if the request does not
   *           match how the table is stored (whole table vs. specific families)
   */
  public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConfig(
      Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig, String id)
      throws ReplicationException {
    if (excludeTableCfs == null) {
      throw new ReplicationException("exclude tableCfs is null");
    }
    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
    if (preExcludeTableCfs == null) {
      throw new ReplicationException("exclude-Table-Cfs for peer: " + id + " is null");
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    builder.setExcludeTableCFsMap(
      removeTableCFs(preExcludeTableCfs, excludeTableCfs, id, "exclude-table-cfs"));
    return builder.build();
  }

  /**
   * Returns the configuration needed to talk to the remote slave cluster.
   * @param conf the base configuration
   * @param peer the description of replication peer
   * @return the configuration for the peer cluster; when the peer config carries its own
   *         configuration entries they are layered on top of the cluster configuration
   * @throws IOException when create peer cluster configuration failed
   */
  public static Configuration getPeerClusterConfiguration(Configuration conf,
      ReplicationPeerDescription peer) throws IOException {
    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
    Configuration otherConf;
    try {
      otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey());
    } catch (IOException e) {
      throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
    }

    if (!peerConfig.getConfiguration().isEmpty()) {
      CompoundConfiguration compound = new CompoundConfiguration();
      compound.add(otherConf);
      compound.addStringMap(peerConfig.getConfiguration());
      return compound;
    }

    return otherConf;
  }
}