/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

/**
 * Static helpers for replication peer configuration: conversion between the table-CFs string
 * form, the {@link ReplicationProtos.TableCF} protobuf form and the
 * {@code Map<TableName, List<String>>} form, conversion between {@link ReplicationPeerConfig} and
 * its protobuf message, and append/remove operations on the (exclude-)table-CFs maps of a peer
 * config.
 */
@InterfaceAudience.Private
@InterfaceStability.Stable
public final class ReplicationPeerConfigUtil {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);

  /** Configuration key holding the base peer configs, see {@link #updateReplicationBasePeerConfigs}. */
  public static final String HBASE_REPLICATION_PEER_BASE_CONFIG =
    "hbase.replication.peer.base.config";

  private ReplicationPeerConfigUtil() {
  }

  /**
   * Joins the given namespaces with {@code ';'}.
   * @param namespaces the namespaces to join, may be null
   * @return the joined string, or null if {@code namespaces} is null
   */
  public static String convertToString(Set<String> namespaces) {
    if (namespaces == null) {
      return null;
    }
    return StringUtils.join(namespaces, ';');
  }

  /**
   * Convert a table to column-families map into an array of TableCF protobuf objects.
   * @param tableCfs map from table to its column families; a null or empty collection produces a
   *                 TableCF without families
   * @return one TableCF per map entry, or null if {@code tableCfs} is null
   */
  public static ReplicationProtos.TableCF[]
    convert(Map<TableName, ? extends Collection<String>> tableCfs) {
    if (tableCfs == null) {
      return null;
    }
    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.size());
    // A single builder is reused; it is cleared at the start of every iteration.
    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      tableCFBuilder.clear();
      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
      Collection<String> families = entry.getValue();
      if (families != null) {
        for (String family : families) {
          tableCFBuilder.addFamilies(ByteString.copyFromUtf8(family));
        }
      }
      tableCFList.add(tableCFBuilder.build());
    }
    return tableCFList.toArray(new ReplicationProtos.TableCF[0]);
  }

  /**
   * Convert a table to column-families map into its string form by going through the TableCF
   * protobuf representation.
   * @param tableCfs map from table to its column families, may be null
   * @return the string form, or null if {@code tableCfs} is null
   */
  public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
    if (tableCfs == null) {
      return null;
    }
    return convert(convert(tableCfs));
  }

  /**
   * Convert string to TableCFs Object. This is only for read TableCFs information from TableCF
   * node. Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3.
   * <p>
   * Entries that are empty after trimming are skipped silently. Entries containing more than one
   * {@code ':'} or having an empty table part are logged and skipped. A table part without exactly
   * one {@code '.'} is placed in the "default" namespace.
   * @param tableCFsConfig the string to parse, may be null
   * @return the parsed TableCFs, or null if the input is null or blank
   */
  public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) {
    if (tableCFsConfig == null || tableCFsConfig.trim().isEmpty()) {
      return null;
    }

    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
    List<String> tables = Splitter.on(';').splitToList(tableCFsConfig);
    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.size());

    for (String tab : tables) {
      // 1 ignore empty table config
      tab = tab.trim();
      if (tab.isEmpty()) {
        continue;
      }
      // 2 split to "table" and "cf1,cf2"
      // for each table: "table:cf1,cf2" or "table"
      List<String> pair = Splitter.on(':').splitToList(tab);
      if (pair.size() > 2) {
        LOG.info("incorrect format:{}", tableCFsConfig);
        continue;
      }
      assert pair.size() > 0;
      Iterator<String> parts = pair.iterator();
      String tabName = parts.next().trim();
      if (tabName.isEmpty()) {
        LOG.info("incorrect format:{}", tableCFsConfig);
        continue;
      }

      tableCFBuilder.clear();
      // split namespace from tableName; anything that is not exactly "ns.table" is treated as a
      // table name in the default namespace
      String ns = "default";
      String tName = tabName;
      List<String> dbs = Splitter.on('.').splitToList(tabName);
      if (dbs.size() == 2) {
        ns = dbs.get(0);
        tName = dbs.get(1);
      }
      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));

      // 3 parse "cf1,cf2" part to List<cf>
      if (parts.hasNext()) {
        for (String cf : Splitter.on(',').splitToList(parts.next())) {
          String cfName = cf.trim();
          if (!cfName.isEmpty()) {
            tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
          }
        }
      }
      tableCFList.add(tableCFBuilder.build());
    }
    return tableCFList.toArray(new ReplicationProtos.TableCF[0]);
  }

  /**
   * Convert TableCFs Object to String. Tables are separated by {@code ';'}; a table with families
   * is rendered as {@code ns.table:cf1,cf2}, a table without families as {@code ns.table}.
   * Example: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3
   * @param tableCFs the TableCFs to render, must not be null
   * @return the string form, empty if {@code tableCFs} is empty
   */
  public static String convert(ReplicationProtos.TableCF[] tableCFs) {
    StringBuilder sb = new StringBuilder();
    for (ReplicationProtos.TableCF tableCF : tableCFs) {
      String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
      if (StringUtils.isNotEmpty(namespace)) {
        sb.append(namespace).append(".")
          .append(tableCF.getTableName().getQualifier().toStringUtf8()).append(":");
      } else {
        sb.append(tableCF.getTableName().toString()).append(":");
      }
      for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
        sb.append(tableCF.getFamilies(j).toStringUtf8()).append(",");
      }
      // Drops the trailing ',' after the last family, or the ':' when there are no families.
      sb.deleteCharAt(sb.length() - 1).append(";");
    }
    if (sb.length() > 0) {
      // Drops the trailing ';' after the last table.
      sb.deleteCharAt(sb.length() - 1);
    }
    return sb.toString();
  }

  /**
   * Get TableCF in TableCFs, if not exist, return null. Only the table qualifier is compared; the
   * namespace is not taken into account.
   * @param tableCFs the TableCFs to search, must not be null
   * @param table    the table qualifier to look for
   * @return the first TableCF whose qualifier equals {@code table}, or null
   */
  public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs,
    String table) {
    for (ReplicationProtos.TableCF tableCF : tableCFs) {
      if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
        return tableCF;
      }
    }
    return null;
  }

  /**
   * Parse bytes into TableCFs. It is used for backward compatibility. Old format bytes have no
   * PB_MAGIC Header
   * @param bytes the string form of the TableCFs as bytes, may be null
   * @return the parsed TableCFs, or null if {@code bytes} is null or decodes to a blank string
   */
  public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
    if (bytes == null) {
      return null;
    }
    return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes));
  }

  /**
   * Convert tableCFs string into Map.
   * @param tableCFsConfig the string form, see {@link #convert(String)}
   * @return the map form, or null if nothing could be parsed
   */
  public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
    ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig);
    return convert2Map(tableCFs);
  }

  /**
   * Convert tableCFs Object to Map. A table without families is mapped to null.
   * @param tableCFs the TableCFs to convert, may be null
   * @return the map form, or null if {@code tableCFs} is null or empty
   */
  public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) {
    if (tableCFs == null || tableCFs.length == 0) {
      return null;
    }
    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
    for (ReplicationProtos.TableCF tableCF : tableCFs) {
      List<String> families = new ArrayList<>();
      for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
        families.add(tableCF.getFamilies(j).toStringUtf8());
      }
      tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()),
        families.isEmpty() ? null : families);
    }

    return tableCFsMap;
  }

  /**
   * Parse the serialized representation of a peer configuration. Bytes starting with the PB magic
   * prefix are parsed as a protobuf ReplicationPeer; any other non-empty bytes are interpreted as
   * a plain cluster key string.
   * @param bytes Content of a peer znode.
   * @return the peer configuration parsed from the passed bytes.
   * @throws DeserializationException if the bytes are null or empty, or the protobuf content
   *                                  cannot be parsed
   */
  public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
    throws DeserializationException {
    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
      int pbLen = ProtobufUtil.lengthOfPBMagic();
      ReplicationProtos.ReplicationPeer.Builder builder =
        ReplicationProtos.ReplicationPeer.newBuilder();
      ReplicationProtos.ReplicationPeer peer;
      try {
        ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
        peer = builder.build();
      } catch (IOException e) {
        throw new DeserializationException(e);
      }
      return convert(peer);
    } else {
      if (bytes == null || bytes.length <= 0) {
        throw new DeserializationException("Bytes to deserialize should not be empty.");
      }
      return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
    }
  }

  /**
   * Convert a protobuf ReplicationPeer into a {@link ReplicationPeerConfig}. Optional fields are
   * only copied when present in the message; empty table-CFs and namespace lists are left unset.
   * @param peer the protobuf message
   * @return the equivalent peer config
   */
  public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
    if (peer.hasClusterkey()) {
      builder.setClusterKey(peer.getClusterkey());
    }
    if (peer.hasReplicationEndpointImpl()) {
      builder.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
    }

    for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
      builder.putPeerData(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
    }

    for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
      builder.putConfiguration(pair.getName(), pair.getValue());
    }

    Map<TableName, List<String>> tableCFsMap = convert2Map(
      peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
    if (tableCFsMap != null) {
      builder.setTableCFsMap(tableCFsMap);
    }

    List<ByteString> namespacesList = peer.getNamespacesList();
    if (namespacesList != null && !namespacesList.isEmpty()) {
      builder.setNamespaces(
        namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
    }

    if (peer.hasBandwidth()) {
      builder.setBandwidth(peer.getBandwidth());
    }

    if (peer.hasReplicateAll()) {
      builder.setReplicateAllUserTables(peer.getReplicateAll());
    }

    if (peer.hasSerial()) {
      builder.setSerial(peer.getSerial());
    }

    Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
      .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
    if (excludeTableCFsMap != null) {
      builder.setExcludeTableCFsMap(excludeTableCFsMap);
    }

    List<ByteString> excludeNamespacesList = peer.getExcludeNamespacesList();
    if (excludeNamespacesList != null && !excludeNamespacesList.isEmpty()) {
      builder.setExcludeNamespaces(
        excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
    }

    if (peer.hasRemoteWALDir()) {
      builder.setRemoteWALDir(peer.getRemoteWALDir());
    }
    return builder.build();
  }

  /**
   * Convert a {@link ReplicationPeerConfig} into its protobuf ReplicationPeer message.
   * @param peerConfig the peer config to convert
   * @return the equivalent protobuf message
   */
  public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
    ReplicationProtos.ReplicationPeer.Builder builder =
      ReplicationProtos.ReplicationPeer.newBuilder();
    // we used to set cluster key as required so here we must always set it, until we can make sure
    // that no one uses the old proto file.
    builder.setClusterkey(peerConfig.getClusterKey() != null ? peerConfig.getClusterKey() : "");
    if (peerConfig.getReplicationEndpointImpl() != null) {
      builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
    }

    for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
      builder.addData(HBaseProtos.BytesBytesPair.newBuilder()
        .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey()))
        .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue())).build());
    }

    for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
      builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder().setName(entry.getKey())
        .setValue(entry.getValue()).build());
    }

    ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
    if (tableCFs != null) {
      for (ReplicationProtos.TableCF tableCF : tableCFs) {
        builder.addTableCfs(tableCF);
      }
    }
    Set<String> namespaces = peerConfig.getNamespaces();
    if (namespaces != null) {
      for (String namespace : namespaces) {
        builder.addNamespaces(ByteString.copyFromUtf8(namespace));
      }
    }

    builder.setBandwidth(peerConfig.getBandwidth());
    builder.setReplicateAll(peerConfig.replicateAllUserTables());
    builder.setSerial(peerConfig.isSerial());

    ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
    if (excludeTableCFs != null) {
      for (ReplicationProtos.TableCF excludeTableCF : excludeTableCFs) {
        builder.addExcludeTableCfs(excludeTableCF);
      }
    }
    Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
    if (excludeNamespaces != null) {
      for (String namespace : excludeNamespaces) {
        builder.addExcludeNamespaces(ByteString.copyFromUtf8(namespace));
      }
    }

    if (peerConfig.getRemoteWALDir() != null) {
      builder.setRemoteWALDir(peerConfig.getRemoteWALDir());
    }
    return builder.build();
  }

  /**
   * Returns Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
   * for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
   * /hbase/replication/peers/PEER_ID
   */
  public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
    byte[] bytes = convert(peerConfig).toByteArray();
    return ProtobufUtil.prependPBMagic(bytes);
  }

  /**
   * Convert a protobuf ReplicationPeerDescription into a {@link ReplicationPeerDescription}. The
   * peer is reported as enabled only when the protobuf state is ENABLED.
   * @param desc the protobuf message
   * @return the equivalent peer description
   */
  public static ReplicationPeerDescription
    toReplicationPeerDescription(ReplicationProtos.ReplicationPeerDescription desc) {
    boolean enabled =
      ReplicationProtos.ReplicationState.State.ENABLED == desc.getState().getState();
    ReplicationPeerConfig config = convert(desc.getConfig());
    return new ReplicationPeerDescription(desc.getId(), enabled, config,
      toSyncReplicationState(desc.getSyncReplicationState()));
  }

  /**
   * Convert a {@link ReplicationPeerDescription} into its protobuf message.
   * @param desc the peer description
   * @return the equivalent protobuf message
   */
  public static ReplicationProtos.ReplicationPeerDescription
    toProtoReplicationPeerDescription(ReplicationPeerDescription desc) {
    ReplicationProtos.ReplicationPeerDescription.Builder builder =
      ReplicationProtos.ReplicationPeerDescription.newBuilder();
    builder.setId(desc.getPeerId());

    ReplicationProtos.ReplicationState.Builder stateBuilder =
      ReplicationProtos.ReplicationState.newBuilder();
    stateBuilder.setState(desc.isEnabled()
      ? ReplicationProtos.ReplicationState.State.ENABLED
      : ReplicationProtos.ReplicationState.State.DISABLED);
    builder.setState(stateBuilder.build());

    builder.setConfig(convert(desc.getPeerConfig()));
    builder.setSyncReplicationState(toSyncReplicationState(desc.getSyncReplicationState()));

    return builder.build();
  }

  /**
   * Convert a {@link SyncReplicationState} into its protobuf message.
   * @param state the state to convert
   * @return the equivalent protobuf message
   */
  public static ReplicationProtos.SyncReplicationState
    toSyncReplicationState(SyncReplicationState state) {
    ReplicationProtos.SyncReplicationState.Builder syncReplicationStateBuilder =
      ReplicationProtos.SyncReplicationState.newBuilder();
    // NOTE(review): maps by ordinal, so this presumably relies on the enum declaration order
    // matching the protobuf state numbers -- confirm against SyncReplicationState.
    syncReplicationStateBuilder
      .setState(ReplicationProtos.SyncReplicationState.State.forNumber(state.ordinal()));
    return syncReplicationStateBuilder.build();
  }

  /**
   * Convert a protobuf SyncReplicationState into a {@link SyncReplicationState} using the protobuf
   * state number.
   * @param state the protobuf message
   * @return the equivalent state
   */
  public static SyncReplicationState
    toSyncReplicationState(ReplicationProtos.SyncReplicationState state) {
    return SyncReplicationState.valueOf(state.getState().getNumber());
  }

  /**
   * Returns a copy of {@code peerConfig} whose table-CFs map additionally contains
   * {@code tableCfs}. If the peer has no table-CFs map yet, {@code tableCfs} is used as is;
   * otherwise both maps are merged, where a table without families on either side wins over a
   * table with families.
   * @param tableCfs   the table-CFs to append; must not be null when the peer already has a
   *                   table-CFs map
   * @param peerConfig the peer config to start from, it is not modified
   * @return the new peer config
   */
  public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig(
    Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig) {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
    if (preTableCfs == null) {
      builder.setTableCFsMap(tableCfs);
    } else {
      builder.setTableCFsMap(mergeTableCFs(preTableCfs, tableCfs));
    }
    return builder.build();
  }

  /**
   * Helper method to add/remove base peer configs from Configuration to ReplicationPeerConfig.
   * This merges the user supplied peer configuration
   * {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs provided as
   * property {@link #HBASE_REPLICATION_PEER_BASE_CONFIG} in hbase configuration. Expected format
   * for this hbase configuration is "k1=v1;k2=v2,v2_1;k3=""". If value is empty, it will remove
   * the existing key-value from peer config.
   * @param conf               Configuration
   * @param receivedPeerConfig the peer config to start from, it is not modified
   * @return ReplicationPeerConfig containing updated configs.
   */
  public static ReplicationPeerConfig updateReplicationBasePeerConfigs(Configuration conf,
    ReplicationPeerConfig receivedPeerConfig) {
    ReplicationPeerConfigBuilder copiedPeerConfigBuilder =
      ReplicationPeerConfig.newBuilder(receivedPeerConfig);

    Map<String, String> receivedPeerConfigMap = receivedPeerConfig.getConfiguration();
    String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, "");
    if (basePeerConfigs.length() != 0) {
      Map<String, String> basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings()
        .withKeyValueSeparator("=").split(basePeerConfigs);
      for (Map.Entry<String, String> entry : basePeerConfigMap.entrySet()) {
        String configName = entry.getKey();
        String configValue = entry.getValue();
        // If the config is provided with empty value, for eg. k1="",
        // we remove it from peer config. Providing config with empty value
        // is required so that it doesn't remove any other config unknowingly.
        if (Strings.isNullOrEmpty(configValue)) {
          copiedPeerConfigBuilder.removeConfiguration(configName);
        } else if (!receivedPeerConfigMap.getOrDefault(configName, "").equals(configValue)) {
          // update the configuration if exact config and value doesn't exists
          copiedPeerConfigBuilder.putConfiguration(configName, configValue);
        }
      }
    }

    return copiedPeerConfigBuilder.build();
  }

  /**
   * Returns a copy of {@code peerConfig} whose exclude-table-CFs map additionally contains
   * {@code excludeTableCfs}. Merging follows the same rules as
   * {@link #appendTableCFsToReplicationPeerConfig(Map, ReplicationPeerConfig)}.
   * @param excludeTableCfs the exclude-table-CFs to append
   * @param peerConfig      the peer config to start from, it is not modified
   * @return the new peer config
   * @throws ReplicationException if {@code excludeTableCfs} is null
   */
  public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
    Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
    throws ReplicationException {
    if (excludeTableCfs == null) {
      throw new ReplicationException("exclude tableCfs is null");
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
    if (preExcludeTableCfs == null) {
      builder.setExcludeTableCFsMap(excludeTableCfs);
    } else {
      builder.setExcludeTableCFsMap(mergeTableCFs(preExcludeTableCfs, excludeTableCfs));
    }
    return builder.build();
  }

  /**
   * Merges {@code tableCfs} into a copy of {@code preTableCfs}. A null or empty family collection
   * means "whole table", so if either side has no families for a table the merged value is null;
   * otherwise the families of both sides are united.
   */
  private static Map<TableName, List<String>>
    mergeTableCFs(Map<TableName, List<String>> preTableCfs, Map<TableName, List<String>> tableCfs) {
    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
    for (Map.Entry<TableName, List<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> appendCfs = entry.getValue();
      boolean appendWholeTable = appendCfs == null || appendCfs.isEmpty();
      if (newTableCfs.containsKey(table)) {
        List<String> cfs = newTableCfs.get(table);
        if (cfs == null || appendWholeTable) {
          newTableCfs.put(table, null);
        } else {
          Set<String> cfSet = new HashSet<>(cfs);
          cfSet.addAll(appendCfs);
          newTableCfs.put(table, Lists.newArrayList(cfSet));
        }
      } else {
        newTableCfs.put(table, appendWholeTable ? null : Lists.newArrayList(appendCfs));
      }
    }
    return newTableCfs;
  }

  /** Copies the map and each non-null family list, so the result can be modified freely. */
  private static Map<TableName, List<String>>
    copyTableCFsMap(Map<TableName, List<String>> preTableCfs) {
    Map<TableName, List<String>> newTableCfs = new HashMap<>();
    preTableCfs.forEach(
      (table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null));
    return newTableCfs;
  }

  /**
   * Returns a copy of {@code peerConfig} with {@code tableCfs} removed from its table-CFs map.
   * <p>
   * A table configured without families can only be removed as a whole (null or empty families in
   * {@code tableCfs}); a table configured with families can only have families removed, and is
   * dropped from the map once no family is left.
   * @param tableCfs   the table-CFs to remove
   * @param peerConfig the peer config to start from, it is not modified
   * @param id         the peer id, only used in exception messages
   * @return the new peer config
   * @throws ReplicationException if {@code tableCfs} is null, the peer has no table-CFs map, a
   *                              table is not present in it, or the removal does not match how
   *                              the table is configured
   */
  public static ReplicationPeerConfig removeTableCFsFromReplicationPeerConfig(
    Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig, String id)
    throws ReplicationException {
    // Fail with a ReplicationException instead of a NullPointerException, consistent with
    // removeExcludeTableCFsFromReplicationPeerConfig.
    if (tableCfs == null) {
      throw new ReplicationException("tableCfs is null");
    }
    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
    if (preTableCfs == null) {
      throw new ReplicationException("Table-Cfs for peer: " + id + " is null");
    }
    Map<TableName, List<String>> newTableCfs =
      removeTableCFs(preTableCfs, tableCfs, "table-cfs", id);
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    builder.setTableCFsMap(newTableCfs);
    return builder.build();
  }

  /**
   * Returns a copy of {@code peerConfig} with {@code excludeTableCfs} removed from its
   * exclude-table-CFs map. The removal rules are the same as for
   * {@link #removeTableCFsFromReplicationPeerConfig(Map, ReplicationPeerConfig, String)}.
   * @param excludeTableCfs the exclude-table-CFs to remove
   * @param peerConfig      the peer config to start from, it is not modified
   * @param id              the peer id, only used in exception messages
   * @return the new peer config
   * @throws ReplicationException if {@code excludeTableCfs} is null, the peer has no
   *                              exclude-table-CFs map, a table is not present in it, or the
   *                              removal does not match how the table is configured
   */
  public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConfig(
    Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig, String id)
    throws ReplicationException {
    if (excludeTableCfs == null) {
      throw new ReplicationException("exclude tableCfs is null");
    }
    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
    if (preExcludeTableCfs == null) {
      throw new ReplicationException("exclude-Table-Cfs for peer: " + id + " is null");
    }
    Map<TableName, List<String>> newExcludeTableCfs =
      removeTableCFs(preExcludeTableCfs, excludeTableCfs, "exclude-table-cfs", id);
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    builder.setExcludeTableCFsMap(newExcludeTableCfs);
    return builder.build();
  }

  /**
   * Removes {@code removeTableCfs} from a copy of {@code preTableCfs}; shared by the table-cfs and
   * exclude-table-cfs remove operations, {@code configName} only selects the wording of the
   * exception messages.
   */
  private static Map<TableName, List<String>> removeTableCFs(
    Map<TableName, List<String>> preTableCfs, Map<TableName, List<String>> removeTableCfs,
    String configName, String id) throws ReplicationException {
    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
    for (Map.Entry<TableName, List<String>> entry : removeTableCfs.entrySet()) {
      TableName table = entry.getKey();
      if (!newTableCfs.containsKey(table)) {
        throw new ReplicationException(
          "No table: " + table + " in " + configName + " config of peer: " + id);
      }
      List<String> cfs = newTableCfs.get(table);
      Collection<String> removeCfs = entry.getValue();
      boolean removeWholeTable = removeCfs == null || removeCfs.isEmpty();
      if (cfs == null) {
        // The table is configured as a whole, so individual families cannot be removed.
        if (!removeWholeTable) {
          throw new ReplicationException("Cannot remove cf of table: " + table
            + " which doesn't specify cfs from " + configName + " config in peer: " + id);
        }
        newTableCfs.remove(table);
      } else {
        // The table is configured with families, so it cannot be removed as a whole.
        if (removeWholeTable) {
          throw new ReplicationException("Cannot remove table: " + table
            + " which has specified cfs from " + configName + " config in peer: " + id);
        }
        Set<String> cfSet = new HashSet<>(cfs);
        cfSet.removeAll(removeCfs);
        if (cfSet.isEmpty()) {
          newTableCfs.remove(table);
        } else {
          newTableCfs.put(table, Lists.newArrayList(cfSet));
        }
      }
    }
    return newTableCfs;
  }

  /**
   * Returns the configuration needed to talk to the remote slave cluster. If the peer config
   * carries additional configuration entries, they are layered on top of the cluster
   * configuration.
   * @param conf the base configuration
   * @param peer the description of replication peer
   * @return the configuration for the peer cluster
   * @throws IOException when create peer cluster configuration failed
   */
  public static Configuration getPeerClusterConfiguration(Configuration conf,
    ReplicationPeerDescription peer) throws IOException {
    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
    Configuration otherConf;
    try {
      otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey());
    } catch (IOException e) {
      throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
    }

    if (!peerConfig.getConfiguration().isEmpty()) {
      CompoundConfiguration compound = new CompoundConfiguration();
      compound.add(otherConf);
      compound.addStringMap(peerConfig.getConfiguration());
      return compound;
    }
    return otherConf;
  }
}