/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

/**
 * Helper for TableCFs Operations.
 */
@InterfaceAudience.Private
@InterfaceStability.Stable
public final class ReplicationPeerConfigUtil {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);
  public static final String HBASE_REPLICATION_PEER_BASE_CONFIG =
    "hbase.replication.peer.base.config";

  private ReplicationPeerConfigUtil() {
  }

  public static String convertToString(Set<String> namespaces) {
    if (namespaces == null) {
      return null;
    }
    return StringUtils.join(namespaces, ';');
  }

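  // Illustrative sketch (namespace names invented): namespaces serialize with ';' as the
  // separator, mirroring the table-CFs string format used further down.
  //
  //   String s = ReplicationPeerConfigUtil.convertToString(
  //     new HashSet<>(Arrays.asList("ns1", "ns2")));
  //   // s: "ns1;ns2" (order depends on set iteration)
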
  /** Convert a map to a TableCFs Object. */
  public static ReplicationProtos.TableCF[]
    convert(Map<TableName, ? extends Collection<String>> tableCfs) {
    if (tableCfs == null) {
      return null;
    }
    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.entrySet().size());
    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      tableCFBuilder.clear();
      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
      Collection<String> v = entry.getValue();
      if (v != null && !v.isEmpty()) {
        for (String value : entry.getValue()) {
          tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value));
        }
      }
      tableCFList.add(tableCFBuilder.build());
    }
    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
  }

  public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
    if (tableCfs == null) {
      return null;
    }
    return convert(convert(tableCfs));
  }

  /**
   * Convert a string to a TableCFs Object. This is only for reading TableCFs information from the
   * TableCF node. Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3.
   */
  public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) {
    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
      return null;
    }

    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
    List<String> tables = Splitter.on(';').splitToList(tableCFsConfig);
    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.size());

    for (String tab : tables) {
      // 1 ignore empty table config
      tab = tab.trim();
      if (tab.length() == 0) {
        continue;
      }
      // 2 split to "table" and "cf1,cf2"
      //   for each table: "table:cf1,cf2" or "table"
      List<String> pair = Splitter.on(':').splitToList(tab);
      if (pair.size() > 2) {
        LOG.info("incorrect format:" + tableCFsConfig);
        continue;
      }
      assert pair.size() > 0;
      Iterator<String> i = pair.iterator();
      String tabName = i.next().trim();
      if (tabName.length() == 0) {
        LOG.info("incorrect format:" + tableCFsConfig);
        continue;
      }

      tableCFBuilder.clear();
      // split namespace from tableName
      String ns = "default";
      String tName = tabName;
      List<String> dbs = Splitter.on('.').splitToList(tabName);
      if (dbs != null && dbs.size() == 2) {
        Iterator<String> ii = dbs.iterator();
        ns = ii.next();
        tName = ii.next();
      }
      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));

      // 3 parse "cf1,cf2" part to List<cf>
      if (i.hasNext()) {
        List<String> cfsList = Splitter.on(',').splitToList(i.next());
        for (String cf : cfsList) {
          String cfName = cf.trim();
          if (cfName.length() > 0) {
            tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
          }
        }
      }
      tableCFList.add(tableCFBuilder.build());
    }
    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
  }

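  // Illustrative sketch of the string format handled by convert(String) above; table and family
  // names are invented:
  //
  //   ReplicationProtos.TableCF[] tableCFs =
  //     ReplicationPeerConfigUtil.convert("ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3");
  //   // tableCFs[0]: ns1:table1 with families [cf1, cf2]
  //   // tableCFs[1]: ns2:table2 with families [cfA, cfB]
  //   // tableCFs[2]: default:table3 with no families listed
  //
  // An entry containing more than one ':' is logged as "incorrect format" and skipped.
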
  /**
   * Convert a TableCFs Object to a String. Output String Format:
   * ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
   */
  public static String convert(ReplicationProtos.TableCF[] tableCFs) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0, n = tableCFs.length; i < n; i++) {
      ReplicationProtos.TableCF tableCF = tableCFs[i];
      String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
      if (StringUtils.isNotEmpty(namespace)) {
        sb.append(namespace).append(".")
          .append(tableCF.getTableName().getQualifier().toStringUtf8()).append(":");
      } else {
        sb.append(tableCF.getTableName().toString()).append(":");
      }
      for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
        sb.append(tableCF.getFamilies(j).toStringUtf8()).append(",");
      }
      sb.deleteCharAt(sb.length() - 1).append(";");
    }
    if (sb.length() > 0) {
      sb.deleteCharAt(sb.length() - 1);
    }
    return sb.toString();
  }

  /**
   * Get the TableCF for the given table from TableCFs; returns null if it does not exist.
   */
  public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs,
    String table) {
    for (int i = 0, n = tableCFs.length; i < n; i++) {
      ReplicationProtos.TableCF tableCF = tableCFs[i];
      if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
        return tableCF;
      }
    }
    return null;
  }

  /**
   * Parse bytes into TableCFs. It is used for backward compatibility: old format bytes have no
   * PB_MAGIC header.
   */
  public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
    if (bytes == null) {
      return null;
    }
    return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes));
  }

  /**
   * Convert a tableCFs string into a Map.
   */
  public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
    ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig);
    return convert2Map(tableCFs);
  }

  /**
   * Convert a tableCFs Object to a Map.
   */
  public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) {
    if (tableCFs == null || tableCFs.length == 0) {
      return null;
    }
    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
    for (int i = 0, n = tableCFs.length; i < n; i++) {
      ReplicationProtos.TableCF tableCF = tableCFs[i];
      List<String> families = new ArrayList<>();
      for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
        families.add(tableCF.getFamilies(j).toStringUtf8());
      }
      if (families.size() > 0) {
        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families);
      } else {
        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null);
      }
    }

    return tableCFsMap;
  }

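  // Illustrative sketch of a map/string round trip using the converters above (names invented):
  //
  //   Map<TableName, List<String>> tableCfs =
  //     ReplicationPeerConfigUtil.parseTableCFsFromConfig("ns1.t1:cf1,cf2;t2");
  //   // tableCfs: {ns1:t1=[cf1, cf2], t2=null}
  //   String back = ReplicationPeerConfigUtil.convertToString(tableCfs);
  //   // back: e.g. "ns1.t1:cf1,cf2;default.t2" -- entry order depends on map iteration, and the
  //   // default namespace becomes explicit on the way back
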
  /**
   * Parse the serialized representation of a peer configuration.
   * @param bytes Content of a peer znode.
   * @return ReplicationPeerConfig parsed from the passed bytes.
   * @throws DeserializationException if the bytes cannot be parsed.
   */
  public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
    throws DeserializationException {
    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
      int pbLen = ProtobufUtil.lengthOfPBMagic();
      ReplicationProtos.ReplicationPeer.Builder builder =
        ReplicationProtos.ReplicationPeer.newBuilder();
      ReplicationProtos.ReplicationPeer peer;
      try {
        ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
        peer = builder.build();
      } catch (IOException e) {
        throw new DeserializationException(e);
      }
      return convert(peer);
    } else {
      if (bytes == null || bytes.length <= 0) {
        throw new DeserializationException("Bytes to deserialize should not be empty.");
      }
      return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
    }
  }

  public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
    if (peer.hasClusterkey()) {
      builder.setClusterKey(peer.getClusterkey());
    }
    if (peer.hasReplicationEndpointImpl()) {
      builder.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
    }

    for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
      builder.putPeerData(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
    }

    for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
      builder.putConfiguration(pair.getName(), pair.getValue());
    }

    Map<TableName, List<String>> tableCFsMap = convert2Map(
      peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
    if (tableCFsMap != null) {
      builder.setTableCFsMap(tableCFsMap);
    }

    List<ByteString> namespacesList = peer.getNamespacesList();
    if (namespacesList != null && namespacesList.size() != 0) {
      builder.setNamespaces(
        namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
    }

    if (peer.hasBandwidth()) {
      builder.setBandwidth(peer.getBandwidth());
    }

    if (peer.hasReplicateAll()) {
      builder.setReplicateAllUserTables(peer.getReplicateAll());
    }

    if (peer.hasSerial()) {
      builder.setSerial(peer.getSerial());
    }

    Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
      .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
    if (excludeTableCFsMap != null) {
      builder.setExcludeTableCFsMap(excludeTableCFsMap);
    }

    List<ByteString> excludeNamespacesList = peer.getExcludeNamespacesList();
    if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) {
      builder.setExcludeNamespaces(
        excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
    }

    return builder.build();
  }

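  // Illustrative sketch of the two formats parsePeerFrom(byte[]) accepts (the cluster key is
  // invented for the example):
  //
  //   // legacy format: the znode holds only the cluster key as plain bytes
  //   ReplicationPeerConfig legacy =
  //     ReplicationPeerConfigUtil.parsePeerFrom(Bytes.toBytes("zk1,zk2,zk3:2181:/hbase"));
  //
  //   // current format: protobuf prefixed with the PB magic, as written by toByteArray(...)
  //   byte[] serialized = ReplicationPeerConfigUtil.toByteArray(legacy);
  //   ReplicationPeerConfig roundTripped = ReplicationPeerConfigUtil.parsePeerFrom(serialized);
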
  public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
    ReplicationProtos.ReplicationPeer.Builder builder =
      ReplicationProtos.ReplicationPeer.newBuilder();
    // we used to set cluster key as required so here we must always set it, until we can make sure
    // that no one uses the old proto file.
    builder.setClusterkey(peerConfig.getClusterKey() != null ? peerConfig.getClusterKey() : "");
    if (peerConfig.getReplicationEndpointImpl() != null) {
      builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
    }

    for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
      builder.addData(HBaseProtos.BytesBytesPair.newBuilder()
        .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey()))
        .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue())).build());
    }

    for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
      builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder().setName(entry.getKey())
        .setValue(entry.getValue()).build());
    }

    ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
    if (tableCFs != null) {
      for (int i = 0; i < tableCFs.length; i++) {
        builder.addTableCfs(tableCFs[i]);
      }
    }
    Set<String> namespaces = peerConfig.getNamespaces();
    if (namespaces != null) {
      for (String namespace : namespaces) {
        builder.addNamespaces(ByteString.copyFromUtf8(namespace));
      }
    }

    builder.setBandwidth(peerConfig.getBandwidth());
    builder.setReplicateAll(peerConfig.replicateAllUserTables());
    builder.setSerial(peerConfig.isSerial());

    ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
    if (excludeTableCFs != null) {
      for (int i = 0; i < excludeTableCFs.length; i++) {
        builder.addExcludeTableCfs(excludeTableCFs[i]);
      }
    }
    Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
    if (excludeNamespaces != null) {
      for (String namespace : excludeNamespaces) {
        builder.addExcludeNamespaces(ByteString.copyFromUtf8(namespace));
      }
    }

    return builder.build();
  }

  /**
   * Returns a serialized protobuf of <code>peerConfig</code> with the pb magic prefix prepended,
   * suitable for use as the content of a peer znode, i.e. the content of the PEER_ID znode under
   * /hbase/replication/peers/PEER_ID.
   */
  public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
    byte[] bytes = convert(peerConfig).toByteArray();
    return ProtobufUtil.prependPBMagic(bytes);
  }

  public static ReplicationPeerDescription
    toReplicationPeerDescription(ReplicationProtos.ReplicationPeerDescription desc) {
    boolean enabled =
      ReplicationProtos.ReplicationState.State.ENABLED == desc.getState().getState();
    ReplicationPeerConfig config = convert(desc.getConfig());
    return new ReplicationPeerDescription(desc.getId(), enabled, config);
  }

  public static ReplicationProtos.ReplicationPeerDescription
    toProtoReplicationPeerDescription(ReplicationPeerDescription desc) {
    ReplicationProtos.ReplicationPeerDescription.Builder builder =
      ReplicationProtos.ReplicationPeerDescription.newBuilder();
    builder.setId(desc.getPeerId());
    ReplicationProtos.ReplicationState.Builder stateBuilder =
      ReplicationProtos.ReplicationState.newBuilder();
    stateBuilder.setState(desc.isEnabled()
      ? ReplicationProtos.ReplicationState.State.ENABLED
      : ReplicationProtos.ReplicationState.State.DISABLED);
    builder.setState(stateBuilder.build());
    builder.setConfig(convert(desc.getPeerConfig()));
    return builder.build();
  }

  public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig(
    Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig) {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
    if (preTableCfs == null) {
      builder.setTableCFsMap(tableCfs);
    } else {
      builder.setTableCFsMap(mergeTableCFs(preTableCfs, tableCfs));
    }
    return builder.build();
  }

  /**
   * Helper method to add/remove base peer configs from Configuration to ReplicationPeerConfig.
   * This merges the user-supplied peer configuration
   * {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs provided
   * as the property hbase.replication.peer.base.config in the hbase configuration. Expected format
   * for this hbase configuration is "k1=v1;k2=v2,v2_1;k3=""". If a value is empty, the existing
   * key-value is removed from the peer config.
   * @param conf Configuration
   * @return ReplicationPeerConfig containing updated configs.
   */
  public static ReplicationPeerConfig updateReplicationBasePeerConfigs(Configuration conf,
    ReplicationPeerConfig receivedPeerConfig) {
    ReplicationPeerConfigBuilder copiedPeerConfigBuilder =
      ReplicationPeerConfig.newBuilder(receivedPeerConfig);

    Map<String, String> receivedPeerConfigMap = receivedPeerConfig.getConfiguration();
    String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, "");
    if (basePeerConfigs.length() != 0) {
      Map<String, String> basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings()
        .withKeyValueSeparator("=").split(basePeerConfigs);
      for (Map.Entry<String, String> entry : basePeerConfigMap.entrySet()) {
        String configName = entry.getKey();
        String configValue = entry.getValue();
        // If the config is provided with an empty value, e.g. k1="",
        // we remove it from the peer config. Requiring an explicit empty value
        // ensures that no other config is removed unknowingly.
        if (Strings.isNullOrEmpty(configValue)) {
          copiedPeerConfigBuilder.removeConfiguration(configName);
        } else if (!receivedPeerConfigMap.getOrDefault(configName, "").equals(configValue)) {
          // update the configuration if the exact config and value don't already exist
          copiedPeerConfigBuilder.putConfiguration(configName, configValue);
        }
      }
    }

    return copiedPeerConfigBuilder.build();
  }

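  // Illustrative sketch of the base-config merge above, with made-up keys. Given
  // hbase.replication.peer.base.config set to "k1=v1;k2=v2,v2_1;k3=" in the cluster
  // configuration:
  //
  //   ReplicationPeerConfig updated =
  //     ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, receivedPeerConfig);
  //
  // k1 and k2 are put into the peer's configuration map unless already present with the same
  // values, while k3, having an empty value, is removed from the peer config if present.
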
  public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
    Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
    throws ReplicationException {
    if (excludeTableCfs == null) {
      throw new ReplicationException("exclude tableCfs is null");
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
    if (preExcludeTableCfs == null) {
      builder.setExcludeTableCFsMap(excludeTableCfs);
    } else {
      builder.setExcludeTableCFsMap(mergeTableCFs(preExcludeTableCfs, excludeTableCfs));
    }
    return builder.build();
  }

  private static Map<TableName, List<String>> mergeTableCFs(
    Map<TableName, List<String>> preTableCfs, Map<TableName, List<String>> tableCfs) {
    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> appendCfs = entry.getValue();
      if (newTableCfs.containsKey(table)) {
        List<String> cfs = newTableCfs.get(table);
        if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
          newTableCfs.put(table, null);
        } else {
          Set<String> cfSet = new HashSet<String>(cfs);
          cfSet.addAll(appendCfs);
          newTableCfs.put(table, Lists.newArrayList(cfSet));
        }
      } else {
        if (appendCfs == null || appendCfs.isEmpty()) {
          newTableCfs.put(table, null);
        } else {
          newTableCfs.put(table, Lists.newArrayList(appendCfs));
        }
      }
    }
    return newTableCfs;
  }

  private static Map<TableName, List<String>> copyTableCFsMap(
    Map<TableName, List<String>> preTableCfs) {
    Map<TableName, List<String>> newTableCfs = new HashMap<>();
    preTableCfs.forEach(
      (table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null));
    return newTableCfs;
  }

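  // Illustrative sketch of the merge semantics above (table and family names invented;
  // "peerConfig" is assumed to be an existing ReplicationPeerConfig whose table-CFs map is
  // {t1=[cf1]}): a null or empty family list widens an entry to "all families", otherwise the
  // family lists are unioned.
  //
  //   Map<TableName, List<String>> toAppend = new HashMap<>();
  //   toAppend.put(TableName.valueOf("t1"), Lists.newArrayList("cf2")); // t1 -> [cf1, cf2]
  //   toAppend.put(TableName.valueOf("t2"), null);                      // t2 -> all families
  //   ReplicationPeerConfig updated =
  //     ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(toAppend, peerConfig);
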
  public static ReplicationPeerConfig removeTableCFsFromReplicationPeerConfig(
    Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig, String id)
    throws ReplicationException {
    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
    if (preTableCfs == null) {
      throw new ReplicationException("Table-Cfs for peer: " + id + " is null");
    }
    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> removeCfs = entry.getValue();
      if (newTableCfs.containsKey(table)) {
        List<String> cfs = newTableCfs.get(table);
        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
          newTableCfs.remove(table);
        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
          Set<String> cfSet = new HashSet<String>(cfs);
          cfSet.removeAll(removeCfs);
          if (cfSet.isEmpty()) {
            newTableCfs.remove(table);
          } else {
            newTableCfs.put(table, Lists.newArrayList(cfSet));
          }
        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove cf of table: " + table
            + " which doesn't specify cfs from table-cfs config in peer: " + id);
        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove table: " + table
            + " which has specified cfs from table-cfs config in peer: " + id);
        }
      } else {
        throw new ReplicationException(
          "No table: " + table + " in table-cfs config of peer: " + id);
      }
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    builder.setTableCFsMap(newTableCfs);
    return builder.build();
  }

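  // Illustrative sketch of the removal rules above (names invented; "peerConfig" is assumed to
  // have the table-CFs map {t1=[cf1, cf2], t2=null} and the peer id is "1"): removing {t1=[cf1]}
  // leaves {t1=[cf2], t2=null}; removing {t2=null} drops t2 entirely; asking to remove a single
  // cf from t2, or naming a table absent from the map, throws ReplicationException.
  //
  //   Map<TableName, List<String>> toRemove = new HashMap<>();
  //   toRemove.put(TableName.valueOf("t1"), Lists.newArrayList("cf1"));
  //   ReplicationPeerConfig updated = ReplicationPeerConfigUtil
  //     .removeTableCFsFromReplicationPeerConfig(toRemove, peerConfig, "1");
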
  public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConfig(
    Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig, String id)
    throws ReplicationException {
    if (excludeTableCfs == null) {
      throw new ReplicationException("exclude tableCfs is null");
    }
    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
    if (preExcludeTableCfs == null) {
      throw new ReplicationException("exclude-Table-Cfs for peer: " + id + " is null");
    }
    Map<TableName, List<String>> newExcludeTableCfs = copyTableCFsMap(preExcludeTableCfs);
    for (Map.Entry<TableName, ? extends Collection<String>> entry : excludeTableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> removeCfs = entry.getValue();
      if (newExcludeTableCfs.containsKey(table)) {
        List<String> cfs = newExcludeTableCfs.get(table);
        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
          newExcludeTableCfs.remove(table);
        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
          Set<String> cfSet = new HashSet<String>(cfs);
          cfSet.removeAll(removeCfs);
          if (cfSet.isEmpty()) {
            newExcludeTableCfs.remove(table);
          } else {
            newExcludeTableCfs.put(table, Lists.newArrayList(cfSet));
          }
        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove cf of table: " + table
            + " which doesn't specify cfs from exclude-table-cfs config in peer: " + id);
        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove table: " + table
            + " which has specified cfs from exclude-table-cfs config in peer: " + id);
        }
      } else {
        throw new ReplicationException(
          "No table: " + table + " in exclude-table-cfs config of peer: " + id);
      }
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    builder.setExcludeTableCFsMap(newExcludeTableCfs);
    return builder.build();
  }

  /**
   * Returns the configuration needed to talk to the remote slave cluster.
   * @param conf the base configuration
   * @param peer the description of the replication peer
   * @return the configuration for the peer cluster
   * @throws IOException if the peer cluster configuration could not be created
   */
  public static Configuration getPeerClusterConfiguration(Configuration conf,
    ReplicationPeerDescription peer) throws IOException {
    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
    Configuration otherConf;
    try {
      otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey());
    } catch (IOException e) {
      throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
    }

    if (!peerConfig.getConfiguration().isEmpty()) {
      CompoundConfiguration compound = new CompoundConfiguration();
      compound.add(otherConf);
      compound.addStringMap(peerConfig.getConfiguration());
      return compound;
    }
    return otherConf;
  }
}