001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.Collection;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * Helper class for replication.
036 */
037@InterfaceAudience.Private
038public final class ReplicationUtils {
039
040  private static final Logger LOG = LoggerFactory.getLogger(ReplicationUtils.class);
041
042  public static final String REPLICATION_ATTR_NAME = "__rep__";
043
044  public static final String REMOTE_WAL_DIR_NAME = "remoteWALs";
045
046  public static final String SYNC_WAL_SUFFIX = ".syncrep";
047
048  public static final String REMOTE_WAL_REPLAY_SUFFIX = "-replay";
049
050  public static final String REMOTE_WAL_SNAPSHOT_SUFFIX = "-snapshot";
051
052  // This is used for copying sync replication log from local to remote and overwrite the old one
053  // since some FileSystem implementation may not support atomic rename.
054  public static final String RENAME_WAL_SUFFIX = ".ren";
055
056  public static final String LEGACY_REGION_REPLICATION_ENDPOINT_NAME =
057    "org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint";
058
059  private ReplicationUtils() {
060  }
061
062  private static boolean isCollectionEqual(Collection<String> c1, Collection<String> c2) {
063    if (c1 == null) {
064      return c2 == null;
065    }
066    if (c2 == null) {
067      return false;
068    }
069    return c1.size() == c2.size() && c1.containsAll(c2);
070  }
071
072  private static boolean isNamespacesEqual(Set<String> ns1, Set<String> ns2) {
073    return isCollectionEqual(ns1, ns2);
074  }
075
076  private static boolean isTableCFsEqual(Map<TableName, List<String>> tableCFs1,
077    Map<TableName, List<String>> tableCFs2) {
078    if (tableCFs1 == null) {
079      return tableCFs2 == null;
080    }
081    if (tableCFs2 == null) {
082      return false;
083    }
084    if (tableCFs1.size() != tableCFs2.size()) {
085      return false;
086    }
087    for (Map.Entry<TableName, List<String>> entry1 : tableCFs1.entrySet()) {
088      TableName table = entry1.getKey();
089      if (!tableCFs2.containsKey(table)) {
090        return false;
091      }
092      List<String> cfs1 = entry1.getValue();
093      List<String> cfs2 = tableCFs2.get(table);
094      if (!isCollectionEqual(cfs1, cfs2)) {
095        return false;
096      }
097    }
098    return true;
099  }
100
101  public static boolean isNamespacesAndTableCFsEqual(ReplicationPeerConfig rpc1,
102    ReplicationPeerConfig rpc2) {
103    if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) {
104      return false;
105    }
106    if (rpc1.replicateAllUserTables()) {
107      return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces())
108        && isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap());
109    } else {
110      return isNamespacesEqual(rpc1.getNamespaces(), rpc2.getNamespaces())
111        && isTableCFsEqual(rpc1.getTableCFsMap(), rpc2.getTableCFsMap());
112    }
113  }
114
115  /**
116   * @param c Configuration to look at
117   * @return True if replication for bulk load data is enabled.
118   */
119  public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
120    return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
121      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
122  }
123
124  public static FileSystem getRemoteWALFileSystem(Configuration conf, String remoteWALDir)
125    throws IOException {
126    return new Path(remoteWALDir).getFileSystem(conf);
127  }
128
129  public static Path getPeerRemoteWALDir(String remoteWALDir, String peerId) {
130    return new Path(remoteWALDir, peerId);
131  }
132
133  public static Path getPeerRemoteWALDir(Path remoteWALDir, String peerId) {
134    return new Path(remoteWALDir, peerId);
135  }
136
137  public static Path getPeerReplayWALDir(Path remoteWALDir, String peerId) {
138    return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_REPLAY_SUFFIX);
139  }
140
141  public static Path getPeerSnapshotWALDir(String remoteWALDir, String peerId) {
142    return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX);
143  }
144
145  public static Path getPeerSnapshotWALDir(Path remoteWALDir, String peerId) {
146    return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX);
147  }
148
149  /**
150   * Do the sleeping logic
151   * @param msg                  Why we sleep
152   * @param sleepForRetries      the base sleep time.
153   * @param sleepMultiplier      by how many times the default sleeping time is augmented
154   * @param maxRetriesMultiplier the max retry multiplier
155   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
156   */
157  public static boolean sleepForRetries(String msg, long sleepForRetries, int sleepMultiplier,
158    int maxRetriesMultiplier) {
159    try {
160      LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
161      Thread.sleep(sleepForRetries * sleepMultiplier);
162    } catch (InterruptedException e) {
163      LOG.debug("Interrupted while sleeping between retries");
164      Thread.currentThread().interrupt();
165    }
166    return sleepMultiplier < maxRetriesMultiplier;
167  }
168
169  /**
170   * Get the adaptive timeout value when performing a retry
171   */
172  public static int getAdaptiveTimeout(final int initialValue, final int retries) {
173    int ntries = retries;
174    if (ntries >= HConstants.RETRY_BACKOFF.length) {
175      ntries = HConstants.RETRY_BACKOFF.length - 1;
176    }
177    if (ntries < 0) {
178      ntries = 0;
179    }
180    return initialValue * HConstants.RETRY_BACKOFF[ntries];
181  }
182}