001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.io.IOException; 021import java.util.Collection; 022import java.util.List; 023import java.util.Map; 024import java.util.Set; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * Helper class for replication. 036 */ 037@InterfaceAudience.Private 038public final class ReplicationUtils { 039 040 private static final Logger LOG = LoggerFactory.getLogger(ReplicationUtils.class); 041 042 public static final String REPLICATION_ATTR_NAME = "__rep__"; 043 044 public static final String REMOTE_WAL_DIR_NAME = "remoteWALs"; 045 046 public static final String SYNC_WAL_SUFFIX = ".syncrep"; 047 048 public static final String REMOTE_WAL_REPLAY_SUFFIX = "-replay"; 049 050 public static final String REMOTE_WAL_SNAPSHOT_SUFFIX = "-snapshot"; 051 052 // This is used for copying sync replication log from local to remote and overwrite the old one 053 // since some FileSystem implementation may not support atomic rename. 054 public static final String RENAME_WAL_SUFFIX = ".ren"; 055 056 public static final String LEGACY_REGION_REPLICATION_ENDPOINT_NAME = 057 "org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint"; 058 059 private ReplicationUtils() { 060 } 061 062 private static boolean isCollectionEqual(Collection<String> c1, Collection<String> c2) { 063 if (c1 == null) { 064 return c2 == null; 065 } 066 if (c2 == null) { 067 return false; 068 } 069 return c1.size() == c2.size() && c1.containsAll(c2); 070 } 071 072 private static boolean isNamespacesEqual(Set<String> ns1, Set<String> ns2) { 073 return isCollectionEqual(ns1, ns2); 074 } 075 076 private static boolean isTableCFsEqual(Map<TableName, List<String>> tableCFs1, 077 Map<TableName, List<String>> tableCFs2) { 078 if (tableCFs1 == null) { 079 return tableCFs2 == null; 080 } 081 if (tableCFs2 == null) { 082 return false; 083 } 084 if (tableCFs1.size() != tableCFs2.size()) { 085 return false; 086 } 087 for (Map.Entry<TableName, List<String>> entry1 : tableCFs1.entrySet()) { 088 TableName table = entry1.getKey(); 089 if (!tableCFs2.containsKey(table)) { 090 return false; 091 } 092 List<String> cfs1 = entry1.getValue(); 093 List<String> cfs2 = tableCFs2.get(table); 094 if (!isCollectionEqual(cfs1, cfs2)) { 095 return false; 096 } 097 } 098 return true; 099 } 100 101 public static boolean isNamespacesAndTableCFsEqual(ReplicationPeerConfig rpc1, 102 ReplicationPeerConfig rpc2) { 103 if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) { 104 return false; 105 } 106 if (rpc1.replicateAllUserTables()) { 107 return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) 108 && isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap()); 109 } else { 110 return isNamespacesEqual(rpc1.getNamespaces(), rpc2.getNamespaces()) 111 && isTableCFsEqual(rpc1.getTableCFsMap(), rpc2.getTableCFsMap()); 112 } 113 } 114 115 /** 116 * @param c Configuration to look at 117 * @return True if replication for bulk load data is enabled. 118 */ 119 public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) { 120 return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, 121 HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); 122 } 123 124 public static FileSystem getRemoteWALFileSystem(Configuration conf, String remoteWALDir) 125 throws IOException { 126 return new Path(remoteWALDir).getFileSystem(conf); 127 } 128 129 public static Path getPeerRemoteWALDir(String remoteWALDir, String peerId) { 130 return new Path(remoteWALDir, peerId); 131 } 132 133 public static Path getPeerRemoteWALDir(Path remoteWALDir, String peerId) { 134 return new Path(remoteWALDir, peerId); 135 } 136 137 public static Path getPeerReplayWALDir(Path remoteWALDir, String peerId) { 138 return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_REPLAY_SUFFIX); 139 } 140 141 public static Path getPeerSnapshotWALDir(String remoteWALDir, String peerId) { 142 return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX); 143 } 144 145 public static Path getPeerSnapshotWALDir(Path remoteWALDir, String peerId) { 146 return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX); 147 } 148 149 /** 150 * Do the sleeping logic 151 * @param msg Why we sleep 152 * @param sleepForRetries the base sleep time. 153 * @param sleepMultiplier by how many times the default sleeping time is augmented 154 * @param maxRetriesMultiplier the max retry multiplier 155 * @return True if <code>sleepMultiplier</code> is < <code>maxRetriesMultiplier</code> 156 */ 157 public static boolean sleepForRetries(String msg, long sleepForRetries, int sleepMultiplier, 158 int maxRetriesMultiplier) { 159 try { 160 LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier); 161 Thread.sleep(sleepForRetries * sleepMultiplier); 162 } catch (InterruptedException e) { 163 LOG.debug("Interrupted while sleeping between retries"); 164 Thread.currentThread().interrupt(); 165 } 166 return sleepMultiplier < maxRetriesMultiplier; 167 } 168 169 /** 170 * Get the adaptive timeout value when performing a retry 171 */ 172 public static int getAdaptiveTimeout(final int initialValue, final int retries) { 173 int ntries = retries; 174 if (ntries >= HConstants.RETRY_BACKOFF.length) { 175 ntries = HConstants.RETRY_BACKOFF.length - 1; 176 } 177 if (ntries < 0) { 178 ntries = 0; 179 } 180 return initialValue * HConstants.RETRY_BACKOFF[ntries]; 181 } 182}