001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.Collection;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.CompoundConfiguration;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * Helper class for replication.
038 */
039@InterfaceAudience.Private
040public final class ReplicationUtils {
041
042  private static final Logger LOG = LoggerFactory.getLogger(ReplicationUtils.class);
043
044  public static final String REPLICATION_ATTR_NAME = "__rep__";
045
046  public static final String REMOTE_WAL_DIR_NAME = "remoteWALs";
047
048  public static final String SYNC_WAL_SUFFIX = ".syncrep";
049
050  public static final String REMOTE_WAL_REPLAY_SUFFIX = "-replay";
051
052  public static final String REMOTE_WAL_SNAPSHOT_SUFFIX = "-snapshot";
053
054  // This is used for copying sync replication log from local to remote and overwrite the old one
055  // since some FileSystem implementation may not support atomic rename.
056  public static final String RENAME_WAL_SUFFIX = ".ren";
057
058  public static final String LEGACY_REGION_REPLICATION_ENDPOINT_NAME =
059    "org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint";
060
061  private ReplicationUtils() {
062  }
063
064  public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
065    Configuration baseConf) throws ReplicationException {
066    Configuration otherConf;
067    try {
068      otherConf = HBaseConfiguration.createClusterConf(baseConf, peerConfig.getClusterKey());
069    } catch (IOException e) {
070      throw new ReplicationException("Can't get peer configuration for peer " + peerConfig, e);
071    }
072
073    if (!peerConfig.getConfiguration().isEmpty()) {
074      CompoundConfiguration compound = new CompoundConfiguration();
075      compound.add(otherConf);
076      compound.addStringMap(peerConfig.getConfiguration());
077      return compound;
078    }
079
080    return otherConf;
081  }
082
083  private static boolean isCollectionEqual(Collection<String> c1, Collection<String> c2) {
084    if (c1 == null) {
085      return c2 == null;
086    }
087    if (c2 == null) {
088      return false;
089    }
090    return c1.size() == c2.size() && c1.containsAll(c2);
091  }
092
093  private static boolean isNamespacesEqual(Set<String> ns1, Set<String> ns2) {
094    return isCollectionEqual(ns1, ns2);
095  }
096
097  private static boolean isTableCFsEqual(Map<TableName, List<String>> tableCFs1,
098    Map<TableName, List<String>> tableCFs2) {
099    if (tableCFs1 == null) {
100      return tableCFs2 == null;
101    }
102    if (tableCFs2 == null) {
103      return false;
104    }
105    if (tableCFs1.size() != tableCFs2.size()) {
106      return false;
107    }
108    for (Map.Entry<TableName, List<String>> entry1 : tableCFs1.entrySet()) {
109      TableName table = entry1.getKey();
110      if (!tableCFs2.containsKey(table)) {
111        return false;
112      }
113      List<String> cfs1 = entry1.getValue();
114      List<String> cfs2 = tableCFs2.get(table);
115      if (!isCollectionEqual(cfs1, cfs2)) {
116        return false;
117      }
118    }
119    return true;
120  }
121
122  public static boolean isNamespacesAndTableCFsEqual(ReplicationPeerConfig rpc1,
123    ReplicationPeerConfig rpc2) {
124    if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) {
125      return false;
126    }
127    if (rpc1.replicateAllUserTables()) {
128      return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces())
129        && isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap());
130    } else {
131      return isNamespacesEqual(rpc1.getNamespaces(), rpc2.getNamespaces())
132        && isTableCFsEqual(rpc1.getTableCFsMap(), rpc2.getTableCFsMap());
133    }
134  }
135
136  /**
137   * @param c Configuration to look at
138   * @return True if replication for bulk load data is enabled.
139   */
140  public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
141    return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
142      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
143  }
144
145  public static FileSystem getRemoteWALFileSystem(Configuration conf, String remoteWALDir)
146    throws IOException {
147    return new Path(remoteWALDir).getFileSystem(conf);
148  }
149
150  public static Path getPeerRemoteWALDir(String remoteWALDir, String peerId) {
151    return new Path(remoteWALDir, peerId);
152  }
153
154  public static Path getPeerRemoteWALDir(Path remoteWALDir, String peerId) {
155    return new Path(remoteWALDir, peerId);
156  }
157
158  public static Path getPeerReplayWALDir(Path remoteWALDir, String peerId) {
159    return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_REPLAY_SUFFIX);
160  }
161
162  public static Path getPeerSnapshotWALDir(String remoteWALDir, String peerId) {
163    return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX);
164  }
165
166  public static Path getPeerSnapshotWALDir(Path remoteWALDir, String peerId) {
167    return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX);
168  }
169
170  /**
171   * Do the sleeping logic
172   * @param msg                  Why we sleep
173   * @param sleepForRetries      the base sleep time.
174   * @param sleepMultiplier      by how many times the default sleeping time is augmented
175   * @param maxRetriesMultiplier the max retry multiplier
176   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
177   */
178  public static boolean sleepForRetries(String msg, long sleepForRetries, int sleepMultiplier,
179    int maxRetriesMultiplier) {
180    try {
181      LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
182      Thread.sleep(sleepForRetries * sleepMultiplier);
183    } catch (InterruptedException e) {
184      LOG.debug("Interrupted while sleeping between retries");
185      Thread.currentThread().interrupt();
186    }
187    return sleepMultiplier < maxRetriesMultiplier;
188  }
189
190  /**
191   * Get the adaptive timeout value when performing a retry
192   */
193  public static int getAdaptiveTimeout(final int initialValue, final int retries) {
194    int ntries = retries;
195    if (ntries >= HConstants.RETRY_BACKOFF.length) {
196      ntries = HConstants.RETRY_BACKOFF.length - 1;
197    }
198    if (ntries < 0) {
199      ntries = 0;
200    }
201    return initialValue * HConstants.RETRY_BACKOFF[ntries];
202  }
203}