001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client.replication; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.List; 026import java.util.Map; 027import java.util.Set; 028import java.util.stream.Collectors; 029import org.apache.commons.lang3.StringUtils; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.CompoundConfiguration; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.exceptions.DeserializationException; 035import org.apache.hadoop.hbase.replication.ReplicationException; 036import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 037import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 038import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 039import org.apache.hadoop.hbase.replication.SyncReplicationState; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.apache.yetus.audience.InterfaceStability; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 047import org.apache.hbase.thirdparty.com.google.common.base.Strings; 048import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 049import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 050import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 051 052import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; 055 056/** 057 * Helper for TableCFs Operations. 058 */ 059@InterfaceAudience.Private 060@InterfaceStability.Stable 061public final class ReplicationPeerConfigUtil { 062 063 private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class); 064 public static final String HBASE_REPLICATION_PEER_BASE_CONFIG = 065 "hbase.replication.peer.base.config"; 066 067 private ReplicationPeerConfigUtil() { 068 } 069 070 public static String convertToString(Set<String> namespaces) { 071 if (namespaces == null) { 072 return null; 073 } 074 return StringUtils.join(namespaces, ';'); 075 } 076 077 /** convert map to TableCFs Object */ 078 public static ReplicationProtos.TableCF[] 079 convert(Map<TableName, ? extends Collection<String>> tableCfs) { 080 if (tableCfs == null) { 081 return null; 082 } 083 List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.entrySet().size()); 084 ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder(); 085 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { 086 tableCFBuilder.clear(); 087 tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey())); 088 Collection<String> v = entry.getValue(); 089 if (v != null && !v.isEmpty()) { 090 for (String value : entry.getValue()) { 091 tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value)); 092 } 093 } 094 tableCFList.add(tableCFBuilder.build()); 095 } 096 return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]); 097 } 098 099 public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) { 100 if (tableCfs == null) { 101 return null; 102 } 103 return convert(convert(tableCfs)); 104 } 105 106 /** 107 * Convert string to TableCFs Object. This is only for read TableCFs information from TableCF 108 * node. Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3. 109 */ 110 public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) { 111 if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) { 112 return null; 113 } 114 115 ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder(); 116 String[] tables = tableCFsConfig.split(";"); 117 List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.length); 118 119 for (String tab : tables) { 120 // 1 ignore empty table config 121 tab = tab.trim(); 122 if (tab.length() == 0) { 123 continue; 124 } 125 // 2 split to "table" and "cf1,cf2" 126 // for each table: "table#cf1,cf2" or "table" 127 String[] pair = tab.split(":"); 128 String tabName = pair[0].trim(); 129 if (pair.length > 2 || tabName.length() == 0) { 130 LOG.info("incorrect format:" + tableCFsConfig); 131 continue; 132 } 133 134 tableCFBuilder.clear(); 135 // split namespace from tableName 136 String ns = "default"; 137 String tName = tabName; 138 String[] dbs = tabName.split("\\."); 139 if (dbs != null && dbs.length == 2) { 140 ns = dbs[0]; 141 tName = dbs[1]; 142 } 143 tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName))); 144 145 // 3 parse "cf1,cf2" part to List<cf> 146 if (pair.length == 2) { 147 String[] cfsList = pair[1].split(","); 148 for (String cf : cfsList) { 149 String cfName = cf.trim(); 150 if (cfName.length() > 0) { 151 tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName)); 152 } 153 } 154 } 155 tableCFList.add(tableCFBuilder.build()); 156 } 157 return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]); 158 } 159 160 /** 161 * Convert TableCFs Object to String. Output String Format: 162 * ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3 163 */ 164 public static String convert(ReplicationProtos.TableCF[] tableCFs) { 165 StringBuilder sb = new StringBuilder(); 166 for (int i = 0, n = tableCFs.length; i < n; i++) { 167 ReplicationProtos.TableCF tableCF = tableCFs[i]; 168 String namespace = tableCF.getTableName().getNamespace().toStringUtf8(); 169 if (StringUtils.isNotEmpty(namespace)) { 170 sb.append(namespace).append(".") 171 .append(tableCF.getTableName().getQualifier().toStringUtf8()).append(":"); 172 } else { 173 sb.append(tableCF.getTableName().toString()).append(":"); 174 } 175 for (int j = 0; j < tableCF.getFamiliesCount(); j++) { 176 sb.append(tableCF.getFamilies(j).toStringUtf8()).append(","); 177 } 178 sb.deleteCharAt(sb.length() - 1).append(";"); 179 } 180 if (sb.length() > 0) { 181 sb.deleteCharAt(sb.length() - 1); 182 } 183 return sb.toString(); 184 } 185 186 /** 187 * Get TableCF in TableCFs, if not exist, return null. 188 */ 189 public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs, 190 String table) { 191 for (int i = 0, n = tableCFs.length; i < n; i++) { 192 ReplicationProtos.TableCF tableCF = tableCFs[i]; 193 if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) { 194 return tableCF; 195 } 196 } 197 return null; 198 } 199 200 /** 201 * Parse bytes into TableCFs. It is used for backward compatibility. Old format bytes have no 202 * PB_MAGIC Header 203 */ 204 public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException { 205 if (bytes == null) { 206 return null; 207 } 208 return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes)); 209 } 210 211 /** 212 * Convert tableCFs string into Map. 213 */ 214 public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) { 215 ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig); 216 return convert2Map(tableCFs); 217 } 218 219 /** 220 * Convert tableCFs Object to Map. 221 */ 222 public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) { 223 if (tableCFs == null || tableCFs.length == 0) { 224 return null; 225 } 226 Map<TableName, List<String>> tableCFsMap = new HashMap<>(); 227 for (int i = 0, n = tableCFs.length; i < n; i++) { 228 ReplicationProtos.TableCF tableCF = tableCFs[i]; 229 List<String> families = new ArrayList<>(); 230 for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) { 231 families.add(tableCF.getFamilies(j).toStringUtf8()); 232 } 233 if (families.size() > 0) { 234 tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families); 235 } else { 236 tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null); 237 } 238 } 239 240 return tableCFsMap; 241 } 242 243 /** 244 * @param bytes Content of a peer znode. 245 * @return ClusterKey parsed from the passed bytes. 246 * @throws DeserializationException deserialization exception 247 */ 248 public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes) 249 throws DeserializationException { 250 if (ProtobufUtil.isPBMagicPrefix(bytes)) { 251 int pbLen = ProtobufUtil.lengthOfPBMagic(); 252 ReplicationProtos.ReplicationPeer.Builder builder = 253 ReplicationProtos.ReplicationPeer.newBuilder(); 254 ReplicationProtos.ReplicationPeer peer; 255 try { 256 ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen); 257 peer = builder.build(); 258 } catch (IOException e) { 259 throw new DeserializationException(e); 260 } 261 return convert(peer); 262 } else { 263 if (bytes == null || bytes.length <= 0) { 264 throw new DeserializationException("Bytes to deserialize should not be empty."); 265 } 266 return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build(); 267 } 268 } 269 270 public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) { 271 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); 272 if (peer.hasClusterkey()) { 273 builder.setClusterKey(peer.getClusterkey()); 274 } 275 if (peer.hasReplicationEndpointImpl()) { 276 builder.setReplicationEndpointImpl(peer.getReplicationEndpointImpl()); 277 } 278 279 for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) { 280 builder.putPeerData(pair.getFirst().toByteArray(), pair.getSecond().toByteArray()); 281 } 282 283 for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) { 284 builder.putConfiguration(pair.getName(), pair.getValue()); 285 } 286 287 Map<TableName, List<String>> tableCFsMap = convert2Map( 288 peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()])); 289 if (tableCFsMap != null) { 290 builder.setTableCFsMap(tableCFsMap); 291 } 292 293 List<ByteString> namespacesList = peer.getNamespacesList(); 294 if (namespacesList != null && namespacesList.size() != 0) { 295 builder.setNamespaces( 296 namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet())); 297 } 298 299 if (peer.hasBandwidth()) { 300 builder.setBandwidth(peer.getBandwidth()); 301 } 302 303 if (peer.hasReplicateAll()) { 304 builder.setReplicateAllUserTables(peer.getReplicateAll()); 305 } 306 307 if (peer.hasSerial()) { 308 builder.setSerial(peer.getSerial()); 309 } 310 311 Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList() 312 .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()])); 313 if (excludeTableCFsMap != null) { 314 builder.setExcludeTableCFsMap(excludeTableCFsMap); 315 } 316 317 List<ByteString> excludeNamespacesList = peer.getExcludeNamespacesList(); 318 if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) { 319 builder.setExcludeNamespaces( 320 excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet())); 321 } 322 323 if (peer.hasRemoteWALDir()) { 324 builder.setRemoteWALDir(peer.getRemoteWALDir()); 325 } 326 return builder.build(); 327 } 328 329 public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) { 330 ReplicationProtos.ReplicationPeer.Builder builder = 331 ReplicationProtos.ReplicationPeer.newBuilder(); 332 // we used to set cluster key as required so here we must always set it, until we can make sure 333 // that no one uses the old proto file. 334 builder.setClusterkey(peerConfig.getClusterKey() != null ? peerConfig.getClusterKey() : ""); 335 if (peerConfig.getReplicationEndpointImpl() != null) { 336 builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl()); 337 } 338 339 for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) { 340 builder.addData(HBaseProtos.BytesBytesPair.newBuilder() 341 .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey())) 342 .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue())).build()); 343 } 344 345 for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) { 346 builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder().setName(entry.getKey()) 347 .setValue(entry.getValue()).build()); 348 } 349 350 ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap()); 351 if (tableCFs != null) { 352 for (int i = 0; i < tableCFs.length; i++) { 353 builder.addTableCfs(tableCFs[i]); 354 } 355 } 356 Set<String> namespaces = peerConfig.getNamespaces(); 357 if (namespaces != null) { 358 for (String namespace : namespaces) { 359 builder.addNamespaces(ByteString.copyFromUtf8(namespace)); 360 } 361 } 362 363 builder.setBandwidth(peerConfig.getBandwidth()); 364 builder.setReplicateAll(peerConfig.replicateAllUserTables()); 365 builder.setSerial(peerConfig.isSerial()); 366 367 ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap()); 368 if (excludeTableCFs != null) { 369 for (int i = 0; i < excludeTableCFs.length; i++) { 370 builder.addExcludeTableCfs(excludeTableCFs[i]); 371 } 372 } 373 Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces(); 374 if (excludeNamespaces != null) { 375 for (String namespace : excludeNamespaces) { 376 builder.addExcludeNamespaces(ByteString.copyFromUtf8(namespace)); 377 } 378 } 379 380 if (peerConfig.getRemoteWALDir() != null) { 381 builder.setRemoteWALDir(peerConfig.getRemoteWALDir()); 382 } 383 return builder.build(); 384 } 385 386 /** 387 * @param peerConfig peer config of replication peer 388 * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable 389 * for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under 390 * /hbase/replication/peers/PEER_ID 391 */ 392 public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) { 393 byte[] bytes = convert(peerConfig).toByteArray(); 394 return ProtobufUtil.prependPBMagic(bytes); 395 } 396 397 public static ReplicationPeerDescription 398 toReplicationPeerDescription(ReplicationProtos.ReplicationPeerDescription desc) { 399 boolean enabled = 400 ReplicationProtos.ReplicationState.State.ENABLED == desc.getState().getState(); 401 ReplicationPeerConfig config = convert(desc.getConfig()); 402 return new ReplicationPeerDescription(desc.getId(), enabled, config, 403 toSyncReplicationState(desc.getSyncReplicationState())); 404 } 405 406 public static ReplicationProtos.ReplicationPeerDescription 407 toProtoReplicationPeerDescription(ReplicationPeerDescription desc) { 408 ReplicationProtos.ReplicationPeerDescription.Builder builder = 409 ReplicationProtos.ReplicationPeerDescription.newBuilder(); 410 builder.setId(desc.getPeerId()); 411 412 ReplicationProtos.ReplicationState.Builder stateBuilder = 413 ReplicationProtos.ReplicationState.newBuilder(); 414 stateBuilder.setState(desc.isEnabled() 415 ? ReplicationProtos.ReplicationState.State.ENABLED 416 : ReplicationProtos.ReplicationState.State.DISABLED); 417 builder.setState(stateBuilder.build()); 418 419 builder.setConfig(convert(desc.getPeerConfig())); 420 builder.setSyncReplicationState(toSyncReplicationState(desc.getSyncReplicationState())); 421 422 return builder.build(); 423 } 424 425 public static ReplicationProtos.SyncReplicationState 426 toSyncReplicationState(SyncReplicationState state) { 427 ReplicationProtos.SyncReplicationState.Builder syncReplicationStateBuilder = 428 ReplicationProtos.SyncReplicationState.newBuilder(); 429 syncReplicationStateBuilder 430 .setState(ReplicationProtos.SyncReplicationState.State.forNumber(state.ordinal())); 431 return syncReplicationStateBuilder.build(); 432 } 433 434 public static SyncReplicationState 435 toSyncReplicationState(ReplicationProtos.SyncReplicationState state) { 436 return SyncReplicationState.valueOf(state.getState().getNumber()); 437 } 438 439 public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig( 440 Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig) { 441 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig); 442 Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap(); 443 if (preTableCfs == null) { 444 builder.setTableCFsMap(tableCfs); 445 } else { 446 builder.setTableCFsMap(mergeTableCFs(preTableCfs, tableCfs)); 447 } 448 return builder.build(); 449 } 450 451 /** 452 * Helper method to add/removev base peer configs from Configuration to ReplicationPeerConfig This 453 * merges the user supplied peer configuration 454 * {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs provided as 455 * property hbase.replication.peer.base.configs in hbase configuration. Expected format for this 456 * hbase configuration is "k1=v1;k2=v2,v2_1;k3=""". If value is empty, it will remove the existing 457 * key-value from peer config. 458 * @param conf Configuration 459 * @return ReplicationPeerConfig containing updated configs. 460 */ 461 public static ReplicationPeerConfig updateReplicationBasePeerConfigs(Configuration conf, 462 ReplicationPeerConfig receivedPeerConfig) { 463 ReplicationPeerConfigBuilder copiedPeerConfigBuilder = 464 ReplicationPeerConfig.newBuilder(receivedPeerConfig); 465 466 Map<String, String> receivedPeerConfigMap = receivedPeerConfig.getConfiguration(); 467 String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, ""); 468 if (basePeerConfigs.length() != 0) { 469 Map<String, String> basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings() 470 .withKeyValueSeparator("=").split(basePeerConfigs); 471 for (Map.Entry<String, String> entry : basePeerConfigMap.entrySet()) { 472 String configName = entry.getKey(); 473 String configValue = entry.getValue(); 474 // If the config is provided with empty value, for eg. k1="", 475 // we remove it from peer config. Providing config with empty value 476 // is required so that it doesn't remove any other config unknowingly. 477 if (Strings.isNullOrEmpty(configValue)) { 478 copiedPeerConfigBuilder.removeConfiguration(configName); 479 } else if (!receivedPeerConfigMap.getOrDefault(configName, "").equals(configValue)) { 480 // update the configuration if exact config and value doesn't exists 481 copiedPeerConfigBuilder.putConfiguration(configName, configValue); 482 } 483 } 484 } 485 486 return copiedPeerConfigBuilder.build(); 487 } 488 489 public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig( 490 Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig) 491 throws ReplicationException { 492 if (excludeTableCfs == null) { 493 throw new ReplicationException("exclude tableCfs is null"); 494 } 495 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig); 496 Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap(); 497 if (preExcludeTableCfs == null) { 498 builder.setExcludeTableCFsMap(excludeTableCfs); 499 } else { 500 builder.setExcludeTableCFsMap(mergeTableCFs(preExcludeTableCfs, excludeTableCfs)); 501 } 502 return builder.build(); 503 } 504 505 private static Map<TableName, List<String>> 506 mergeTableCFs(Map<TableName, List<String>> preTableCfs, Map<TableName, List<String>> tableCfs) { 507 Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs); 508 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { 509 TableName table = entry.getKey(); 510 Collection<String> appendCfs = entry.getValue(); 511 if (newTableCfs.containsKey(table)) { 512 List<String> cfs = newTableCfs.get(table); 513 if (cfs == null || appendCfs == null || appendCfs.isEmpty()) { 514 newTableCfs.put(table, null); 515 } else { 516 Set<String> cfSet = new HashSet<String>(cfs); 517 cfSet.addAll(appendCfs); 518 newTableCfs.put(table, Lists.newArrayList(cfSet)); 519 } 520 } else { 521 if (appendCfs == null || appendCfs.isEmpty()) { 522 newTableCfs.put(table, null); 523 } else { 524 newTableCfs.put(table, Lists.newArrayList(appendCfs)); 525 } 526 } 527 } 528 return newTableCfs; 529 } 530 531 private static Map<TableName, List<String>> 532 copyTableCFsMap(Map<TableName, List<String>> preTableCfs) { 533 Map<TableName, List<String>> newTableCfs = new HashMap<>(); 534 preTableCfs.forEach( 535 (table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null)); 536 return newTableCfs; 537 } 538 539 public static ReplicationPeerConfig removeTableCFsFromReplicationPeerConfig( 540 Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig, String id) 541 throws ReplicationException { 542 Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap(); 543 if (preTableCfs == null) { 544 throw new ReplicationException("Table-Cfs for peer: " + id + " is null"); 545 } 546 Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs); 547 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { 548 TableName table = entry.getKey(); 549 Collection<String> removeCfs = entry.getValue(); 550 if (newTableCfs.containsKey(table)) { 551 List<String> cfs = newTableCfs.get(table); 552 if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) { 553 newTableCfs.remove(table); 554 } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) { 555 Set<String> cfSet = new HashSet<String>(cfs); 556 cfSet.removeAll(removeCfs); 557 if (cfSet.isEmpty()) { 558 newTableCfs.remove(table); 559 } else { 560 newTableCfs.put(table, Lists.newArrayList(cfSet)); 561 } 562 } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) { 563 throw new ReplicationException("Cannot remove cf of table: " + table 564 + " which doesn't specify cfs from table-cfs config in peer: " + id); 565 } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) { 566 throw new ReplicationException("Cannot remove table: " + table 567 + " which has specified cfs from table-cfs config in peer: " + id); 568 } 569 } else { 570 throw new ReplicationException( 571 "No table: " + table + " in table-cfs config of peer: " + id); 572 } 573 } 574 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig); 575 builder.setTableCFsMap(newTableCfs); 576 return builder.build(); 577 } 578 579 public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConfig( 580 Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig, String id) 581 throws ReplicationException { 582 if (excludeTableCfs == null) { 583 throw new ReplicationException("exclude tableCfs is null"); 584 } 585 Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap(); 586 if (preExcludeTableCfs == null) { 587 throw new ReplicationException("exclude-Table-Cfs for peer: " + id + " is null"); 588 } 589 Map<TableName, List<String>> newExcludeTableCfs = copyTableCFsMap(preExcludeTableCfs); 590 for (Map.Entry<TableName, ? extends Collection<String>> entry : excludeTableCfs.entrySet()) { 591 TableName table = entry.getKey(); 592 Collection<String> removeCfs = entry.getValue(); 593 if (newExcludeTableCfs.containsKey(table)) { 594 List<String> cfs = newExcludeTableCfs.get(table); 595 if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) { 596 newExcludeTableCfs.remove(table); 597 } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) { 598 Set<String> cfSet = new HashSet<String>(cfs); 599 cfSet.removeAll(removeCfs); 600 if (cfSet.isEmpty()) { 601 newExcludeTableCfs.remove(table); 602 } else { 603 newExcludeTableCfs.put(table, Lists.newArrayList(cfSet)); 604 } 605 } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) { 606 throw new ReplicationException("Cannot remove cf of table: " + table 607 + " which doesn't specify cfs from exclude-table-cfs config in peer: " + id); 608 } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) { 609 throw new ReplicationException("Cannot remove table: " + table 610 + " which has specified cfs from exclude-table-cfs config in peer: " + id); 611 } 612 } else { 613 throw new ReplicationException( 614 "No table: " + table + " in exclude-table-cfs config of peer: " + id); 615 } 616 } 617 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig); 618 builder.setExcludeTableCFsMap(newExcludeTableCfs); 619 return builder.build(); 620 } 621 622 /** 623 * Returns the configuration needed to talk to the remote slave cluster. 624 * @param conf the base configuration 625 * @param peer the description of replication peer 626 * @return the configuration for the peer cluster, null if it was unable to get the configuration 627 * @throws IOException when create peer cluster configuration failed 628 */ 629 public static Configuration getPeerClusterConfiguration(Configuration conf, 630 ReplicationPeerDescription peer) throws IOException { 631 ReplicationPeerConfig peerConfig = peer.getPeerConfig(); 632 Configuration otherConf; 633 try { 634 otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey()); 635 } catch (IOException e) { 636 throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e); 637 } 638 639 if (!peerConfig.getConfiguration().isEmpty()) { 640 CompoundConfiguration compound = new CompoundConfiguration(); 641 compound.add(otherConf); 642 compound.addStringMap(peerConfig.getConfiguration()); 643 return compound; 644 } 645 return otherConf; 646 } 647}