001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client.replication; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029import java.util.stream.Collectors; 030import org.apache.commons.lang3.StringUtils; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.CompoundConfiguration; 033import org.apache.hadoop.hbase.HBaseConfiguration; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.ConnectionRegistryFactory; 036import org.apache.hadoop.hbase.exceptions.DeserializationException; 037import org.apache.hadoop.hbase.replication.ReplicationException; 038import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 039import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 040import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 041import org.apache.hadoop.hbase.replication.SyncReplicationState; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.apache.yetus.audience.InterfaceStability; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 049import org.apache.hbase.thirdparty.com.google.common.base.Strings; 050import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 051import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 052import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 053 054import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; 057 058/** 059 * Helper for TableCFs Operations. 060 */ 061@InterfaceAudience.Private 062@InterfaceStability.Stable 063public final class ReplicationPeerConfigUtil { 064 065 private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class); 066 public static final String HBASE_REPLICATION_PEER_BASE_CONFIG = 067 "hbase.replication.peer.base.config"; 068 069 private ReplicationPeerConfigUtil() { 070 } 071 072 public static String convertToString(Set<String> namespaces) { 073 if (namespaces == null) { 074 return null; 075 } 076 return StringUtils.join(namespaces, ';'); 077 } 078 079 /** convert map to TableCFs Object */ 080 public static ReplicationProtos.TableCF[] 081 convert(Map<TableName, ? extends Collection<String>> tableCfs) { 082 if (tableCfs == null) { 083 return null; 084 } 085 List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.entrySet().size()); 086 ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder(); 087 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { 088 tableCFBuilder.clear(); 089 tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey())); 090 Collection<String> v = entry.getValue(); 091 if (v != null && !v.isEmpty()) { 092 for (String value : entry.getValue()) { 093 tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value)); 094 } 095 } 096 tableCFList.add(tableCFBuilder.build()); 097 } 098 return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]); 099 } 100 101 public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) { 102 if (tableCfs == null) { 103 return null; 104 } 105 return convert(convert(tableCfs)); 106 } 107 108 /** 109 * Convert string to TableCFs Object. This is only for read TableCFs information from TableCF 110 * node. Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3. 111 */ 112 public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) { 113 if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) { 114 return null; 115 } 116 117 ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder(); 118 List<String> tables = Splitter.on(';').splitToList(tableCFsConfig); 119 List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.size()); 120 121 for (String tab : tables) { 122 // 1 ignore empty table config 123 tab = tab.trim(); 124 if (tab.length() == 0) { 125 continue; 126 } 127 // 2 split to "table" and "cf1,cf2" 128 // for each table: "table#cf1,cf2" or "table" 129 List<String> pair = Splitter.on(':').splitToList(tab); 130 if (pair.size() > 2) { 131 LOG.info("incorrect format:" + tableCFsConfig); 132 continue; 133 } 134 assert pair.size() > 0; 135 Iterator<String> i = pair.iterator(); 136 String tabName = i.next().trim(); 137 if (tabName.length() == 0) { 138 LOG.info("incorrect format:" + tableCFsConfig); 139 continue; 140 } 141 142 tableCFBuilder.clear(); 143 // split namespace from tableName 144 String ns = "default"; 145 String tName = tabName; 146 List<String> dbs = Splitter.on('.').splitToList(tabName); 147 if (dbs != null && dbs.size() == 2) { 148 Iterator<String> ii = dbs.iterator(); 149 ns = ii.next(); 150 tName = ii.next(); 151 } 152 tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName))); 153 154 // 3 parse "cf1,cf2" part to List<cf> 155 if (i.hasNext()) { 156 List<String> cfsList = Splitter.on(',').splitToList(i.next()); 157 for (String cf : cfsList) { 158 String cfName = cf.trim(); 159 if (cfName.length() > 0) { 160 tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName)); 161 } 162 } 163 } 164 tableCFList.add(tableCFBuilder.build()); 165 } 166 return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]); 167 } 168 169 /** 170 * Convert TableCFs Object to String. Output String Format: 171 * ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3 172 */ 173 public static String convert(ReplicationProtos.TableCF[] tableCFs) { 174 StringBuilder sb = new StringBuilder(); 175 for (int i = 0, n = tableCFs.length; i < n; i++) { 176 ReplicationProtos.TableCF tableCF = tableCFs[i]; 177 String namespace = tableCF.getTableName().getNamespace().toStringUtf8(); 178 if (StringUtils.isNotEmpty(namespace)) { 179 sb.append(namespace).append(".") 180 .append(tableCF.getTableName().getQualifier().toStringUtf8()).append(":"); 181 } else { 182 sb.append(tableCF.getTableName().toString()).append(":"); 183 } 184 for (int j = 0; j < tableCF.getFamiliesCount(); j++) { 185 sb.append(tableCF.getFamilies(j).toStringUtf8()).append(","); 186 } 187 sb.deleteCharAt(sb.length() - 1).append(";"); 188 } 189 if (sb.length() > 0) { 190 sb.deleteCharAt(sb.length() - 1); 191 } 192 return sb.toString(); 193 } 194 195 /** 196 * Get TableCF in TableCFs, if not exist, return null. 197 */ 198 public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs, 199 String table) { 200 for (int i = 0, n = tableCFs.length; i < n; i++) { 201 ReplicationProtos.TableCF tableCF = tableCFs[i]; 202 if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) { 203 return tableCF; 204 } 205 } 206 return null; 207 } 208 209 /** 210 * Parse bytes into TableCFs. It is used for backward compatibility. Old format bytes have no 211 * PB_MAGIC Header 212 */ 213 public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException { 214 if (bytes == null) { 215 return null; 216 } 217 return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes)); 218 } 219 220 /** 221 * Convert tableCFs string into Map. 222 */ 223 public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) { 224 ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig); 225 return convert2Map(tableCFs); 226 } 227 228 /** 229 * Convert tableCFs Object to Map. 230 */ 231 public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) { 232 if (tableCFs == null || tableCFs.length == 0) { 233 return null; 234 } 235 Map<TableName, List<String>> tableCFsMap = new HashMap<>(); 236 for (int i = 0, n = tableCFs.length; i < n; i++) { 237 ReplicationProtos.TableCF tableCF = tableCFs[i]; 238 List<String> families = new ArrayList<>(); 239 for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) { 240 families.add(tableCF.getFamilies(j).toStringUtf8()); 241 } 242 if (families.size() > 0) { 243 tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families); 244 } else { 245 tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null); 246 } 247 } 248 249 return tableCFsMap; 250 } 251 252 /** 253 * Parse the serialized representation of a peer configuration. 254 * @param bytes Content of a peer znode. 255 * @return ClusterKey parsed from the passed bytes. 256 * @throws DeserializationException deserialization exception 257 */ 258 public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes) 259 throws DeserializationException { 260 if (ProtobufUtil.isPBMagicPrefix(bytes)) { 261 int pbLen = ProtobufUtil.lengthOfPBMagic(); 262 ReplicationProtos.ReplicationPeer.Builder builder = 263 ReplicationProtos.ReplicationPeer.newBuilder(); 264 ReplicationProtos.ReplicationPeer peer; 265 try { 266 ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen); 267 peer = builder.build(); 268 } catch (IOException e) { 269 throw new DeserializationException(e); 270 } 271 return convert(peer); 272 } else { 273 if (bytes == null || bytes.length <= 0) { 274 throw new DeserializationException("Bytes to deserialize should not be empty."); 275 } 276 return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build(); 277 } 278 } 279 280 public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) { 281 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); 282 if (peer.hasClusterkey()) { 283 builder.setClusterKey(peer.getClusterkey()); 284 } 285 if (peer.hasReplicationEndpointImpl()) { 286 builder.setReplicationEndpointImpl(peer.getReplicationEndpointImpl()); 287 } 288 289 for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) { 290 builder.putPeerData(pair.getFirst().toByteArray(), pair.getSecond().toByteArray()); 291 } 292 293 for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) { 294 builder.putConfiguration(pair.getName(), pair.getValue()); 295 } 296 297 Map<TableName, List<String>> tableCFsMap = convert2Map( 298 peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()])); 299 if (tableCFsMap != null) { 300 builder.setTableCFsMap(tableCFsMap); 301 } 302 303 List<ByteString> namespacesList = peer.getNamespacesList(); 304 if (namespacesList != null && namespacesList.size() != 0) { 305 builder.setNamespaces( 306 namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet())); 307 } 308 309 if (peer.hasBandwidth()) { 310 builder.setBandwidth(peer.getBandwidth()); 311 } 312 313 if (peer.hasReplicateAll()) { 314 builder.setReplicateAllUserTables(peer.getReplicateAll()); 315 } 316 317 if (peer.hasSerial()) { 318 builder.setSerial(peer.getSerial()); 319 } 320 321 Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList() 322 .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()])); 323 if (excludeTableCFsMap != null) { 324 builder.setExcludeTableCFsMap(excludeTableCFsMap); 325 } 326 327 List<ByteString> excludeNamespacesList = peer.getExcludeNamespacesList(); 328 if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) { 329 builder.setExcludeNamespaces( 330 excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet())); 331 } 332 333 if (peer.hasRemoteWALDir()) { 334 builder.setRemoteWALDir(peer.getRemoteWALDir()); 335 } 336 return builder.build(); 337 } 338 339 public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) { 340 ReplicationProtos.ReplicationPeer.Builder builder = 341 ReplicationProtos.ReplicationPeer.newBuilder(); 342 // we used to set cluster key as required so here we must always set it, until we can make sure 343 // that no one uses the old proto file. 344 builder.setClusterkey(peerConfig.getClusterKey() != null ? peerConfig.getClusterKey() : ""); 345 if (peerConfig.getReplicationEndpointImpl() != null) { 346 builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl()); 347 } 348 349 for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) { 350 builder.addData(HBaseProtos.BytesBytesPair.newBuilder() 351 .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey())) 352 .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue())).build()); 353 } 354 355 for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) { 356 builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder().setName(entry.getKey()) 357 .setValue(entry.getValue()).build()); 358 } 359 360 ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap()); 361 if (tableCFs != null) { 362 for (int i = 0; i < tableCFs.length; i++) { 363 builder.addTableCfs(tableCFs[i]); 364 } 365 } 366 Set<String> namespaces = peerConfig.getNamespaces(); 367 if (namespaces != null) { 368 for (String namespace : namespaces) { 369 builder.addNamespaces(ByteString.copyFromUtf8(namespace)); 370 } 371 } 372 373 builder.setBandwidth(peerConfig.getBandwidth()); 374 builder.setReplicateAll(peerConfig.replicateAllUserTables()); 375 builder.setSerial(peerConfig.isSerial()); 376 377 ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap()); 378 if (excludeTableCFs != null) { 379 for (int i = 0; i < excludeTableCFs.length; i++) { 380 builder.addExcludeTableCfs(excludeTableCFs[i]); 381 } 382 } 383 Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces(); 384 if (excludeNamespaces != null) { 385 for (String namespace : excludeNamespaces) { 386 builder.addExcludeNamespaces(ByteString.copyFromUtf8(namespace)); 387 } 388 } 389 390 if (peerConfig.getRemoteWALDir() != null) { 391 builder.setRemoteWALDir(peerConfig.getRemoteWALDir()); 392 } 393 return builder.build(); 394 } 395 396 /** 397 * Returns Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable 398 * for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under 399 * /hbase/replication/peers/PEER_ID 400 */ 401 public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) { 402 byte[] bytes = convert(peerConfig).toByteArray(); 403 return ProtobufUtil.prependPBMagic(bytes); 404 } 405 406 public static ReplicationPeerDescription 407 toReplicationPeerDescription(ReplicationProtos.ReplicationPeerDescription desc) { 408 boolean enabled = 409 ReplicationProtos.ReplicationState.State.ENABLED == desc.getState().getState(); 410 ReplicationPeerConfig config = convert(desc.getConfig()); 411 return new ReplicationPeerDescription(desc.getId(), enabled, config, 412 toSyncReplicationState(desc.getSyncReplicationState())); 413 } 414 415 public static ReplicationProtos.ReplicationPeerDescription 416 toProtoReplicationPeerDescription(ReplicationPeerDescription desc) { 417 ReplicationProtos.ReplicationPeerDescription.Builder builder = 418 ReplicationProtos.ReplicationPeerDescription.newBuilder(); 419 builder.setId(desc.getPeerId()); 420 421 ReplicationProtos.ReplicationState.Builder stateBuilder = 422 ReplicationProtos.ReplicationState.newBuilder(); 423 stateBuilder.setState(desc.isEnabled() 424 ? ReplicationProtos.ReplicationState.State.ENABLED 425 : ReplicationProtos.ReplicationState.State.DISABLED); 426 builder.setState(stateBuilder.build()); 427 428 builder.setConfig(convert(desc.getPeerConfig())); 429 builder.setSyncReplicationState(toSyncReplicationState(desc.getSyncReplicationState())); 430 431 return builder.build(); 432 } 433 434 public static ReplicationProtos.SyncReplicationState 435 toSyncReplicationState(SyncReplicationState state) { 436 ReplicationProtos.SyncReplicationState.Builder syncReplicationStateBuilder = 437 ReplicationProtos.SyncReplicationState.newBuilder(); 438 syncReplicationStateBuilder 439 .setState(ReplicationProtos.SyncReplicationState.State.forNumber(state.ordinal())); 440 return syncReplicationStateBuilder.build(); 441 } 442 443 public static SyncReplicationState 444 toSyncReplicationState(ReplicationProtos.SyncReplicationState state) { 445 return SyncReplicationState.valueOf(state.getState().getNumber()); 446 } 447 448 public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig( 449 Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig) { 450 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig); 451 Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap(); 452 if (preTableCfs == null) { 453 builder.setTableCFsMap(tableCfs); 454 } else { 455 builder.setTableCFsMap(mergeTableCFs(preTableCfs, tableCfs)); 456 } 457 return builder.build(); 458 } 459 460 /** 461 * Helper method to add/removev base peer configs from Configuration to ReplicationPeerConfig This 462 * merges the user supplied peer configuration 463 * {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs provided as 464 * property hbase.replication.peer.base.configs in hbase configuration. Expected format for this 465 * hbase configuration is "k1=v1;k2=v2,v2_1;k3=""". If value is empty, it will remove the existing 466 * key-value from peer config. 467 * @param conf Configuration 468 * @return ReplicationPeerConfig containing updated configs. 469 */ 470 public static ReplicationPeerConfig updateReplicationBasePeerConfigs(Configuration conf, 471 ReplicationPeerConfig receivedPeerConfig) { 472 ReplicationPeerConfigBuilder copiedPeerConfigBuilder = 473 ReplicationPeerConfig.newBuilder(receivedPeerConfig); 474 475 Map<String, String> receivedPeerConfigMap = receivedPeerConfig.getConfiguration(); 476 String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, ""); 477 if (basePeerConfigs.length() != 0) { 478 Map<String, String> basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings() 479 .withKeyValueSeparator("=").split(basePeerConfigs); 480 for (Map.Entry<String, String> entry : basePeerConfigMap.entrySet()) { 481 String configName = entry.getKey(); 482 String configValue = entry.getValue(); 483 // If the config is provided with empty value, for eg. k1="", 484 // we remove it from peer config. Providing config with empty value 485 // is required so that it doesn't remove any other config unknowingly. 486 if (Strings.isNullOrEmpty(configValue)) { 487 copiedPeerConfigBuilder.removeConfiguration(configName); 488 } else if (!receivedPeerConfigMap.getOrDefault(configName, "").equals(configValue)) { 489 // update the configuration if exact config and value doesn't exists 490 copiedPeerConfigBuilder.putConfiguration(configName, configValue); 491 } 492 } 493 } 494 495 return copiedPeerConfigBuilder.build(); 496 } 497 498 public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig( 499 Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig) 500 throws ReplicationException { 501 if (excludeTableCfs == null) { 502 throw new ReplicationException("exclude tableCfs is null"); 503 } 504 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig); 505 Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap(); 506 if (preExcludeTableCfs == null) { 507 builder.setExcludeTableCFsMap(excludeTableCfs); 508 } else { 509 builder.setExcludeTableCFsMap(mergeTableCFs(preExcludeTableCfs, excludeTableCfs)); 510 } 511 return builder.build(); 512 } 513 514 private static Map<TableName, List<String>> 515 mergeTableCFs(Map<TableName, List<String>> preTableCfs, Map<TableName, List<String>> tableCfs) { 516 Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs); 517 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { 518 TableName table = entry.getKey(); 519 Collection<String> appendCfs = entry.getValue(); 520 if (newTableCfs.containsKey(table)) { 521 List<String> cfs = newTableCfs.get(table); 522 if (cfs == null || appendCfs == null || appendCfs.isEmpty()) { 523 newTableCfs.put(table, null); 524 } else { 525 Set<String> cfSet = new HashSet<String>(cfs); 526 cfSet.addAll(appendCfs); 527 newTableCfs.put(table, Lists.newArrayList(cfSet)); 528 } 529 } else { 530 if (appendCfs == null || appendCfs.isEmpty()) { 531 newTableCfs.put(table, null); 532 } else { 533 newTableCfs.put(table, Lists.newArrayList(appendCfs)); 534 } 535 } 536 } 537 return newTableCfs; 538 } 539 540 private static Map<TableName, List<String>> 541 copyTableCFsMap(Map<TableName, List<String>> preTableCfs) { 542 Map<TableName, List<String>> newTableCfs = new HashMap<>(); 543 preTableCfs.forEach( 544 (table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null)); 545 return newTableCfs; 546 } 547 548 public static ReplicationPeerConfig removeTableCFsFromReplicationPeerConfig( 549 Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig, String id) 550 throws ReplicationException { 551 Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap(); 552 if (preTableCfs == null) { 553 throw new ReplicationException("Table-Cfs for peer: " + id + " is null"); 554 } 555 Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs); 556 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { 557 TableName table = entry.getKey(); 558 Collection<String> removeCfs = entry.getValue(); 559 if (newTableCfs.containsKey(table)) { 560 List<String> cfs = newTableCfs.get(table); 561 if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) { 562 newTableCfs.remove(table); 563 } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) { 564 Set<String> cfSet = new HashSet<String>(cfs); 565 cfSet.removeAll(removeCfs); 566 if (cfSet.isEmpty()) { 567 newTableCfs.remove(table); 568 } else { 569 newTableCfs.put(table, Lists.newArrayList(cfSet)); 570 } 571 } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) { 572 throw new ReplicationException("Cannot remove cf of table: " + table 573 + " which doesn't specify cfs from table-cfs config in peer: " + id); 574 } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) { 575 throw new ReplicationException("Cannot remove table: " + table 576 + " which has specified cfs from table-cfs config in peer: " + id); 577 } 578 } else { 579 throw new ReplicationException( 580 "No table: " + table + " in table-cfs config of peer: " + id); 581 } 582 } 583 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig); 584 builder.setTableCFsMap(newTableCfs); 585 return builder.build(); 586 } 587 588 public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConfig( 589 Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig, String id) 590 throws ReplicationException { 591 if (excludeTableCfs == null) { 592 throw new ReplicationException("exclude tableCfs is null"); 593 } 594 Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap(); 595 if (preExcludeTableCfs == null) { 596 throw new ReplicationException("exclude-Table-Cfs for peer: " + id + " is null"); 597 } 598 Map<TableName, List<String>> newExcludeTableCfs = copyTableCFsMap(preExcludeTableCfs); 599 for (Map.Entry<TableName, ? extends Collection<String>> entry : excludeTableCfs.entrySet()) { 600 TableName table = entry.getKey(); 601 Collection<String> removeCfs = entry.getValue(); 602 if (newExcludeTableCfs.containsKey(table)) { 603 List<String> cfs = newExcludeTableCfs.get(table); 604 if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) { 605 newExcludeTableCfs.remove(table); 606 } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) { 607 Set<String> cfSet = new HashSet<String>(cfs); 608 cfSet.removeAll(removeCfs); 609 if (cfSet.isEmpty()) { 610 newExcludeTableCfs.remove(table); 611 } else { 612 newExcludeTableCfs.put(table, Lists.newArrayList(cfSet)); 613 } 614 } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) { 615 throw new ReplicationException("Cannot remove cf of table: " + table 616 + " which doesn't specify cfs from exclude-table-cfs config in peer: " + id); 617 } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) { 618 throw new ReplicationException("Cannot remove table: " + table 619 + " which has specified cfs from exclude-table-cfs config in peer: " + id); 620 } 621 } else { 622 throw new ReplicationException( 623 "No table: " + table + " in exclude-table-cfs config of peer: " + id); 624 } 625 } 626 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig); 627 builder.setExcludeTableCFsMap(newExcludeTableCfs); 628 return builder.build(); 629 } 630 631 /** 632 * Returns the configuration needed to talk to the remote slave cluster. 633 * @param conf the base configuration 634 * @param peerConfig the peer config of replication peer 635 * @return the configuration for the peer cluster, null if it was unable to get the configuration 636 * @throws IOException when create peer cluster configuration failed 637 */ 638 public static Configuration getPeerClusterConfiguration(Configuration conf, 639 ReplicationPeerConfig peerConfig) throws IOException { 640 Configuration otherConf; 641 if (ConnectionRegistryFactory.tryParseAsConnectionURI(peerConfig.getClusterKey()) != null) { 642 otherConf = HBaseConfiguration.create(conf); 643 } else { 644 // only need to apply cluster key for old style cluster key 645 otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey()); 646 } 647 648 if (!peerConfig.getConfiguration().isEmpty()) { 649 CompoundConfiguration compound = new CompoundConfiguration(); 650 compound.add(otherConf); 651 compound.addStringMap(peerConfig.getConfiguration()); 652 return compound; 653 } 654 return otherConf; 655 } 656}