/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

/**
 * Helper for replication peer configuration: converts table-CFs between their string, map and
 * protobuf forms, converts {@link ReplicationPeerConfig} to and from its protobuf message, and
 * builds modified copies of a peer config (append/remove table-CFs, add base configs).
 */
@InterfaceAudience.Private
@InterfaceStability.Stable
public final class ReplicationPeerConfigUtil {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);
  public static final String HBASE_REPLICATION_PEER_BASE_CONFIG =
    "hbase.replication.peer.base.config";

  private ReplicationPeerConfigUtil() {}

  /**
   * Join namespaces into a single ';'-separated string.
   * @param namespaces the namespaces to join, may be null
   * @return the joined string, or null if <code>namespaces</code> is null
   */
  public static String convertToString(Set<String> namespaces) {
    if (namespaces == null) {
      return null;
    }
    return StringUtils.join(namespaces, ';');
  }

  /**
   * Convert map to TableCFs Object.
   * @param tableCfs table to column families; a null or empty family collection yields a TableCF
   *          with no families
   * @return one TableCF per map entry, or null if <code>tableCfs</code> is null
   */
  public static ReplicationProtos.TableCF[] convert(
      Map<TableName, ? extends Collection<String>> tableCfs) {
    if (tableCfs == null) {
      return null;
    }
    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.size());
    // A single builder is reused; it is cleared at the start of every iteration.
    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      tableCFBuilder.clear();
      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
      Collection<String> families = entry.getValue();
      if (families != null) {
        for (String family : families) {
          tableCFBuilder.addFamilies(ByteString.copyFromUtf8(family));
        }
      }
      tableCFList.add(tableCFBuilder.build());
    }
    return tableCFList.toArray(new ReplicationProtos.TableCF[0]);
  }

  /**
   * Convert a table-CFs map to its string form.
   * Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
   * @param tableCfs table to column families, may be null
   * @return the string form, or null if <code>tableCfs</code> is null
   */
  public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
    if (tableCfs == null) {
      return null;
    }
    return convert(convert(tableCfs));
  }

  /**
   * Convert string to TableCFs Object.
   * This is only for read TableCFs information from TableCF node.
   * Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3.
   * Entries that are blank, have an empty table name or contain more than one ':' are skipped.
   * @param tableCFsConfig the string to parse, may be null
   * @return the parsed TableCFs, or null if <code>tableCFsConfig</code> is null or blank
   */
  public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) {
    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
      return null;
    }

    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
    String[] tables = tableCFsConfig.split(";");
    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.length);

    for (String tab : tables) {
      // 1 ignore empty table config
      tab = tab.trim();
      if (tab.length() == 0) {
        continue;
      }
      // 2 split to "table" and "cf1,cf2"
      // for each table: "table:cf1,cf2" or "table"
      String[] pair = tab.split(":");
      String tabName = pair[0].trim();
      if (pair.length > 2 || tabName.length() == 0) {
        LOG.info("incorrect format:{}", tableCFsConfig);
        continue;
      }

      tableCFBuilder.clear();
      // split namespace from tableName; anything that is not exactly "ns.table" is treated as a
      // table name in the "default" namespace
      String ns = "default";
      String tName = tabName;
      String[] dbs = tabName.split("\\.");
      if (dbs.length == 2) {
        ns = dbs[0];
        tName = dbs[1];
      }
      tableCFBuilder.setTableName(
        ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));

      // 3 parse "cf1,cf2" part to List<cf>
      if (pair.length == 2) {
        for (String cf : pair[1].split(",")) {
          String cfName = cf.trim();
          if (cfName.length() > 0) {
            tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
          }
        }
      }
      tableCFList.add(tableCFBuilder.build());
    }
    return tableCFList.toArray(new ReplicationProtos.TableCF[0]);
  }

  /**
   * Convert TableCFs Object to String.
   * Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
   * @param tableCFs the TableCFs to render, may be null
   * @return the string form (empty for an empty array), or null if <code>tableCFs</code> is null
   */
  public static String convert(ReplicationProtos.TableCF[] tableCFs) {
    if (tableCFs == null) {
      return null;
    }
    StringBuilder sb = new StringBuilder();
    for (ReplicationProtos.TableCF tableCF : tableCFs) {
      String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
      // The "ns." prefix is omitted when the namespace is empty, so only the qualifier is
      // written, as in the "table3" entry of the documented format.
      if (StringUtils.isNotEmpty(namespace)) {
        sb.append(namespace).append('.');
      }
      sb.append(tableCF.getTableName().getQualifier().toStringUtf8()).append(':');
      for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
        sb.append(tableCF.getFamilies(j).toStringUtf8()).append(',');
      }
      // Drop the trailing ',' after the last family, or the ':' when there are no families.
      sb.deleteCharAt(sb.length() - 1).append(';');
    }
    // Drop the trailing ';' after the last table.
    if (sb.length() > 0) {
      sb.deleteCharAt(sb.length() - 1);
    }
    return sb.toString();
  }

  /**
   * Get TableCF in TableCFs, if not exist, return null.
   * Only the table qualifier is compared; the namespace is not taken into account.
   * @param tableCFs the TableCFs to search, may be null
   * @param table the table qualifier to look for
   * @return the first TableCF whose qualifier equals <code>table</code>, or null if there is none
   */
  public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs,
      String table) {
    if (tableCFs == null) {
      return null;
    }
    for (ReplicationProtos.TableCF tableCF : tableCFs) {
      if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
        return tableCF;
      }
    }
    return null;
  }

  /**
   * Parse bytes into TableCFs.
   * It is used for backward compatibility.
   * Old format bytes have no PB_MAGIC Header
   * @param bytes the string form of the table-CFs as bytes, may be null
   * @return the parsed TableCFs, or null if <code>bytes</code> is null or decodes to a blank string
   */
  public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
    if (bytes == null) {
      return null;
    }
    return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes));
  }

  /**
   * Convert tableCFs string into Map.
   * @param tableCFsConfig the string to parse, see {@link #convert(String)} for the format
   * @return table to column families, or null if nothing could be parsed
   */
  public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
    ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig);
    return convert2Map(tableCFs);
  }

  /**
   * Convert tableCFs Object to Map.
   * @param tableCFs the TableCFs to convert, may be null
   * @return table to column families, where a table without families maps to null; null if
   *         <code>tableCFs</code> is null or empty
   */
  public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) {
    if (tableCFs == null || tableCFs.length == 0) {
      return null;
    }
    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
    for (ReplicationProtos.TableCF tableCF : tableCFs) {
      List<String> families = new ArrayList<>();
      for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
        families.add(tableCF.getFamilies(j).toStringUtf8());
      }
      // A table without families is stored with a null value rather than an empty list.
      tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()),
        families.isEmpty() ? null : families);
    }

    return tableCFsMap;
  }

  /**
   * @param bytes Content of a peer znode.
   * @return ReplicationPeerConfig parsed from the passed bytes. If the bytes carry no pb magic
   *         prefix they are taken as a plain cluster key.
   * @throws DeserializationException if the protobuf content cannot be parsed, or if the bytes
   *           have no pb magic prefix and are null or empty
   */
  public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
      throws DeserializationException {
    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
      int pbLen = ProtobufUtil.lengthOfPBMagic();
      ReplicationProtos.ReplicationPeer.Builder builder =
        ReplicationProtos.ReplicationPeer.newBuilder();
      ReplicationProtos.ReplicationPeer peer;
      try {
        ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
        peer = builder.build();
      } catch (IOException e) {
        throw new DeserializationException(e);
      }
      return convert(peer);
    } else {
      if (bytes == null || bytes.length <= 0) {
        throw new DeserializationException("Bytes to deserialize should not be empty.");
      }
      return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
    }
  }

  /**
   * Convert a protobuf ReplicationPeer into a {@link ReplicationPeerConfig}.
   * Optional fields and empty repeated fields of the message are left unset on the result.
   * @param peer the protobuf message
   * @return the equivalent ReplicationPeerConfig
   */
  public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
    if (peer.hasClusterkey()) {
      builder.setClusterKey(peer.getClusterkey());
    }
    if (peer.hasReplicationEndpointImpl()) {
      builder.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
    }

    for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
      builder.putPeerData(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
    }

    for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
      builder.putConfiguration(pair.getName(), pair.getValue());
    }

    Map<TableName, List<String>> tableCFsMap = convert2Map(
      peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
    if (tableCFsMap != null) {
      builder.setTableCFsMap(tableCFsMap);
    }

    List<ByteString> namespacesList = peer.getNamespacesList();
    if (namespacesList != null && namespacesList.size() != 0) {
      builder.setNamespaces(
        namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
    }

    if (peer.hasBandwidth()) {
      builder.setBandwidth(peer.getBandwidth());
    }

    if (peer.hasReplicateAll()) {
      builder.setReplicateAllUserTables(peer.getReplicateAll());
    }

    if (peer.hasSerial()) {
      builder.setSerial(peer.getSerial());
    }

    Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
      .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
    if (excludeTableCFsMap != null) {
      builder.setExcludeTableCFsMap(excludeTableCFsMap);
    }

    List<ByteString> excludeNamespacesList = peer.getExcludeNamespacesList();
    if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) {
      builder.setExcludeNamespaces(
        excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
    }

    return builder.build();
  }

  /**
   * Convert a {@link ReplicationPeerConfig} into its protobuf ReplicationPeer message.
   * @param peerConfig the peer config to convert
   * @return the equivalent protobuf message
   */
  public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
    ReplicationProtos.ReplicationPeer.Builder builder =
      ReplicationProtos.ReplicationPeer.newBuilder();
    // we used to set cluster key as required so here we must always set it, until we can make sure
    // that no one uses the old proto file.
    builder.setClusterkey(peerConfig.getClusterKey() != null ? peerConfig.getClusterKey() : "");
    if (peerConfig.getReplicationEndpointImpl() != null) {
      builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
    }

    for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
      builder.addData(HBaseProtos.BytesBytesPair.newBuilder()
        .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey()))
        .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue()))
        .build());
    }

    for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
      builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder()
        .setName(entry.getKey())
        .setValue(entry.getValue())
        .build());
    }

    ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
    if (tableCFs != null) {
      for (ReplicationProtos.TableCF tableCF : tableCFs) {
        builder.addTableCfs(tableCF);
      }
    }
    Set<String> namespaces = peerConfig.getNamespaces();
    if (namespaces != null) {
      for (String namespace : namespaces) {
        builder.addNamespaces(ByteString.copyFromUtf8(namespace));
      }
    }

    builder.setBandwidth(peerConfig.getBandwidth());
    builder.setReplicateAll(peerConfig.replicateAllUserTables());
    builder.setSerial(peerConfig.isSerial());

    ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
    if (excludeTableCFs != null) {
      for (ReplicationProtos.TableCF excludeTableCF : excludeTableCFs) {
        builder.addExcludeTableCfs(excludeTableCF);
      }
    }
    Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
    if (excludeNamespaces != null) {
      for (String namespace : excludeNamespaces) {
        builder.addExcludeNamespaces(ByteString.copyFromUtf8(namespace));
      }
    }

    return builder.build();
  }

  /**
   * @param peerConfig the peer config to serialize
   * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
   *         for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
   *         /hbase/replication/peers/PEER_ID
   */
  public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
    byte[] bytes = convert(peerConfig).toByteArray();
    return ProtobufUtil.prependPBMagic(bytes);
  }

  /**
   * Convert a protobuf ReplicationPeerDescription into a {@link ReplicationPeerDescription}.
   * @param desc the protobuf message
   * @return the equivalent description; it is enabled only if the message state is ENABLED
   */
  public static ReplicationPeerDescription toReplicationPeerDescription(
      ReplicationProtos.ReplicationPeerDescription desc) {
    boolean enabled = ReplicationProtos.ReplicationState.State.ENABLED == desc.getState()
      .getState();
    ReplicationPeerConfig config = convert(desc.getConfig());
    return new ReplicationPeerDescription(desc.getId(), enabled, config);
  }

  /**
   * Convert a {@link ReplicationPeerDescription} into its protobuf message.
   * @param desc the description to convert
   * @return the equivalent protobuf message
   */
  public static ReplicationProtos.ReplicationPeerDescription toProtoReplicationPeerDescription(
      ReplicationPeerDescription desc) {
    ReplicationProtos.ReplicationPeerDescription.Builder builder =
      ReplicationProtos.ReplicationPeerDescription.newBuilder();
    builder.setId(desc.getPeerId());
    ReplicationProtos.ReplicationState.Builder stateBuilder = ReplicationProtos.ReplicationState
      .newBuilder();
    stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED
      : ReplicationProtos.ReplicationState.State.DISABLED);
    builder.setState(stateBuilder.build());
    builder.setConfig(convert(desc.getPeerConfig()));
    return builder.build();
  }

  /**
   * Build a copy of <code>peerConfig</code> whose table-CFs map also contains
   * <code>tableCfs</code>.
   * @param tableCfs the table-CFs to append; must not be null when the peer config already has a
   *          table-CFs map, because the two maps are then merged entry by entry
   * @param peerConfig the peer config to start from; it is not modified
   * @return a new ReplicationPeerConfig with the merged table-CFs map
   */
  public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig(
      Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig) {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
    if (preTableCfs == null) {
      builder.setTableCFsMap(tableCfs);
    } else {
      builder.setTableCFsMap(mergeTableCFs(preTableCfs, tableCfs));
    }
    return builder.build();
  }

  /**
   * Helper method to add base peer configs from Configuration to ReplicationPeerConfig
   * if not present in latter.
   *
   * This merges the user supplied peer configuration
   * {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs
   * provided as property {@link #HBASE_REPLICATION_PEER_BASE_CONFIG} in hbase configuration.
   * Expected format for this hbase configuration is "k1=v1;k2=v2,v2_1". Original value
   * of conf is retained if already present in ReplicationPeerConfig.
   *
   * @param conf Configuration
   * @param receivedPeerConfig the user supplied peer configuration; it is not modified
   * @return ReplicationPeerConfig containing updated configs.
   */
  public static ReplicationPeerConfig addBasePeerConfigsIfNotPresent(Configuration conf,
      ReplicationPeerConfig receivedPeerConfig) {
    String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, "");
    ReplicationPeerConfigBuilder copiedPeerConfigBuilder = ReplicationPeerConfig.
      newBuilder(receivedPeerConfig);
    Map<String,String> receivedPeerConfigMap = receivedPeerConfig.getConfiguration();

    if (basePeerConfigs.length() != 0) {
      Map<String, String> basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings()
        .withKeyValueSeparator("=").split(basePeerConfigs);
      for (Map.Entry<String,String> entry : basePeerConfigMap.entrySet()) {
        String configName = entry.getKey();
        String configValue = entry.getValue();
        // Only override if base config does not exist in existing peer configs
        if (!receivedPeerConfigMap.containsKey(configName)) {
          copiedPeerConfigBuilder.putConfiguration(configName, configValue);
        }
      }
    }
    return copiedPeerConfigBuilder.build();
  }

  /**
   * Build a copy of <code>peerConfig</code> whose exclude-table-CFs map also contains
   * <code>excludeTableCfs</code>.
   * @param excludeTableCfs the exclude-table-CFs to append
   * @param peerConfig the peer config to start from; it is not modified
   * @return a new ReplicationPeerConfig with the merged exclude-table-CFs map
   * @throws ReplicationException if <code>excludeTableCfs</code> is null
   */
  public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
      Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
      throws ReplicationException {
    if (excludeTableCfs == null) {
      throw new ReplicationException("exclude tableCfs is null");
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
    if (preExcludeTableCfs == null) {
      builder.setExcludeTableCFsMap(excludeTableCfs);
    } else {
      builder.setExcludeTableCFsMap(mergeTableCFs(preExcludeTableCfs, excludeTableCfs));
    }
    return builder.build();
  }

  /**
   * Merge <code>tableCfs</code> into a copy of <code>preTableCfs</code>. A table maps to null
   * (no specific families) if either side has no specific families for it; otherwise the two
   * family lists are united without duplicates.
   */
  private static Map<TableName, List<String>> mergeTableCFs(
      Map<TableName, List<String>> preTableCfs, Map<TableName, List<String>> tableCfs) {
    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> appendCfs = entry.getValue();
      boolean appendWholeTable = appendCfs == null || appendCfs.isEmpty();
      if (newTableCfs.containsKey(table)) {
        List<String> cfs = newTableCfs.get(table);
        if (cfs == null || appendWholeTable) {
          newTableCfs.put(table, null);
        } else {
          Set<String> cfSet = new HashSet<>(cfs);
          cfSet.addAll(appendCfs);
          newTableCfs.put(table, Lists.newArrayList(cfSet));
        }
      } else {
        newTableCfs.put(table, appendWholeTable ? null : Lists.newArrayList(appendCfs));
      }
    }
    return newTableCfs;
  }

  /** Copy a table-CFs map, copying every non-null family list as well. */
  private static Map<TableName, List<String>>
      copyTableCFsMap(Map<TableName, List<String>> preTableCfs) {
    Map<TableName, List<String>> newTableCfs = new HashMap<>();
    preTableCfs.forEach(
      (table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null));
    return newTableCfs;
  }

  /**
   * Remove <code>tableCfs</code> from a copy of <code>preTableCfs</code>.
   * A table without specific families can only be removed as a whole, and a table with specific
   * families can only have families removed; the table is dropped once no family is left.
   * @param configName config name used in exception messages, e.g. "table-cfs"
   * @param id peer id used in exception messages
   * @throws ReplicationException if a table is not present in <code>preTableCfs</code> or the
   *           removal request does not match how the table is configured
   */
  private static Map<TableName, List<String>> removeTableCFs(
      Map<TableName, List<String>> preTableCfs, Map<TableName, List<String>> tableCfs,
      String configName, String id) throws ReplicationException {
    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      if (!newTableCfs.containsKey(table)) {
        throw new ReplicationException(
          "No table: " + table + " in " + configName + " config of peer: " + id);
      }
      List<String> cfs = newTableCfs.get(table);
      Collection<String> removeCfs = entry.getValue();
      boolean removeWholeTable = removeCfs == null || removeCfs.isEmpty();
      if (cfs == null) {
        if (!removeWholeTable) {
          throw new ReplicationException("Cannot remove cf of table: " + table
            + " which doesn't specify cfs from " + configName + " config in peer: " + id);
        }
        newTableCfs.remove(table);
      } else {
        if (removeWholeTable) {
          throw new ReplicationException("Cannot remove table: " + table
            + " which has specified cfs from " + configName + " config in peer: " + id);
        }
        Set<String> cfSet = new HashSet<>(cfs);
        cfSet.removeAll(removeCfs);
        if (cfSet.isEmpty()) {
          newTableCfs.remove(table);
        } else {
          newTableCfs.put(table, Lists.newArrayList(cfSet));
        }
      }
    }
    return newTableCfs;
  }

  /**
   * Build a copy of <code>peerConfig</code> with <code>tableCfs</code> removed from its
   * table-CFs map.
   * @param tableCfs the table-CFs to remove
   * @param peerConfig the peer config to start from; it is not modified
   * @param id peer id, used in exception messages
   * @return a new ReplicationPeerConfig with the reduced table-CFs map
   * @throws ReplicationException if the peer config has no table-CFs map, if
   *           <code>tableCfs</code> is null, if a table is not in the map, or if the removal
   *           request does not match how the table is configured
   */
  public static ReplicationPeerConfig removeTableCFsFromReplicationPeerConfig(
      Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig,
      String id) throws ReplicationException {
    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
    if (preTableCfs == null) {
      throw new ReplicationException("Table-Cfs for peer: " + id + " is null");
    }
    if (tableCfs == null) {
      throw new ReplicationException("tableCfs is null");
    }
    Map<TableName, List<String>> newTableCfs =
      removeTableCFs(preTableCfs, tableCfs, "table-cfs", id);
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    builder.setTableCFsMap(newTableCfs);
    return builder.build();
  }

  /**
   * Build a copy of <code>peerConfig</code> with <code>excludeTableCfs</code> removed from its
   * exclude-table-CFs map.
   * @param excludeTableCfs the exclude-table-CFs to remove
   * @param peerConfig the peer config to start from; it is not modified
   * @param id peer id, used in exception messages
   * @return a new ReplicationPeerConfig with the reduced exclude-table-CFs map
   * @throws ReplicationException if <code>excludeTableCfs</code> is null, if the peer config has
   *           no exclude-table-CFs map, if a table is not in the map, or if the removal request
   *           does not match how the table is configured
   */
  public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConfig(
      Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig, String id)
      throws ReplicationException {
    if (excludeTableCfs == null) {
      throw new ReplicationException("exclude tableCfs is null");
    }
    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
    if (preExcludeTableCfs == null) {
      throw new ReplicationException("exclude-Table-Cfs for peer: " + id + " is null");
    }
    Map<TableName, List<String>> newExcludeTableCfs =
      removeTableCFs(preExcludeTableCfs, excludeTableCfs, "exclude-table-cfs", id);
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    builder.setExcludeTableCFsMap(newExcludeTableCfs);
    return builder.build();
  }

  /**
   * Returns the configuration needed to talk to the remote peer cluster.
   * If the peer config carries its own configuration entries, they are layered on top of the
   * cluster configuration.
   * @param conf the base configuration
   * @param peer the description of replication peer
   * @return the configuration for the peer cluster
   * @throws IOException when create peer cluster configuration failed
   */
  public static Configuration getPeerClusterConfiguration(Configuration conf,
      ReplicationPeerDescription peer) throws IOException {
    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
    Configuration otherConf;
    try {
      otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey());
    } catch (IOException e) {
      throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
    }

    if (!peerConfig.getConfiguration().isEmpty()) {
      CompoundConfiguration compound = new CompoundConfiguration();
      compound.add(otherConf);
      compound.addStringMap(peerConfig.getConfiguration());
      return compound;
    }
    return otherConf;
  }
}