001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.Collection;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.CompoundConfiguration;
027import org.apache.hadoop.hbase.HBaseConfiguration;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.ServerName;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.yetus.audience.InterfaceAudience;
032
033/**
034 * Helper class for replication.
035 */
036@InterfaceAudience.Private
037public final class ReplicationUtils {
038
039  private ReplicationUtils() {
040  }
041
042  public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
043      Configuration baseConf) throws ReplicationException {
044    Configuration otherConf;
045    try {
046      otherConf = HBaseConfiguration.createClusterConf(baseConf, peerConfig.getClusterKey());
047    } catch (IOException e) {
048      throw new ReplicationException("Can't get peer configuration for peer " + peerConfig, e);
049    }
050
051    if (!peerConfig.getConfiguration().isEmpty()) {
052      CompoundConfiguration compound = new CompoundConfiguration();
053      compound.add(otherConf);
054      compound.addStringMap(peerConfig.getConfiguration());
055      return compound;
056    }
057
058    return otherConf;
059  }
060
061  public static void removeAllQueues(ReplicationQueueStorage queueStorage, String peerId)
062      throws ReplicationException {
063    for (ServerName replicator : queueStorage.getListOfReplicators()) {
064      List<String> queueIds = queueStorage.getAllQueues(replicator);
065      for (String queueId : queueIds) {
066        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
067        if (queueInfo.getPeerId().equals(peerId)) {
068          queueStorage.removeQueue(replicator, queueId);
069        }
070      }
071      queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
072    }
073  }
074
075  private static boolean isCollectionEqual(Collection<String> c1, Collection<String> c2) {
076    if (c1 == null) {
077      return c2 == null;
078    }
079    if (c2 == null) {
080      return false;
081    }
082    return c1.size() == c2.size() && c1.containsAll(c2);
083  }
084
085  private static boolean isNamespacesEqual(Set<String> ns1, Set<String> ns2) {
086    return isCollectionEqual(ns1, ns2);
087  }
088
089  private static boolean isTableCFsEqual(Map<TableName, List<String>> tableCFs1,
090      Map<TableName, List<String>> tableCFs2) {
091    if (tableCFs1 == null) {
092      return tableCFs2 == null;
093    }
094    if (tableCFs2 == null) {
095      return false;
096    }
097    if (tableCFs1.size() != tableCFs2.size()) {
098      return false;
099    }
100    for (Map.Entry<TableName, List<String>> entry1 : tableCFs1.entrySet()) {
101      TableName table = entry1.getKey();
102      if (!tableCFs2.containsKey(table)) {
103        return false;
104      }
105      List<String> cfs1 = entry1.getValue();
106      List<String> cfs2 = tableCFs2.get(table);
107      if (!isCollectionEqual(cfs1, cfs2)) {
108        return false;
109      }
110    }
111    return true;
112  }
113
114  public static boolean isNamespacesAndTableCFsEqual(ReplicationPeerConfig rpc1,
115      ReplicationPeerConfig rpc2) {
116    if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) {
117      return false;
118    }
119    if (rpc1.replicateAllUserTables()) {
120      return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) &&
121        isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap());
122    } else {
123      return isNamespacesEqual(rpc1.getNamespaces(), rpc2.getNamespaces()) &&
124        isTableCFsEqual(rpc1.getTableCFsMap(), rpc2.getTableCFsMap());
125    }
126  }
127
128  /**
129   * @param c Configuration to look at
130   * @return True if replication for bulk load data is enabled.
131   */
132  public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
133    return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
134      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
135  }
136
137  /**
138   * @deprecated Will be removed in HBase 3.
139   *             Use {@link ReplicationPeerConfig#needToReplicate(TableName)} instead.
140   * @param peerConfig configuration for the replication peer cluster
141   * @param tableName name of the table
142   * @return true if the table need replicate to the peer cluster
143   */
144  @Deprecated
145  public static boolean contains(ReplicationPeerConfig peerConfig, TableName tableName) {
146    return peerConfig.needToReplicate(tableName);
147  }
148
149  /**
150   * Get the adaptive timeout value when performing a retry
151   */
152  public static int getAdaptiveTimeout(final int initialValue, final int retries) {
153    int ntries = retries;
154    if (ntries >= HConstants.RETRY_BACKOFF.length) {
155      ntries = HConstants.RETRY_BACKOFF.length - 1;
156    }
157    if (ntries < 0) {
158      ntries = 0;
159    }
160    return initialValue * HConstants.RETRY_BACKOFF[ntries];
161  }
162}