001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.UUID; 024import java.util.concurrent.TimeUnit; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.ScheduledChore; 031import org.apache.hadoop.hbase.Server; 032import org.apache.hadoop.hbase.Stoppable; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.conf.ConfigurationManager; 035import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 036import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; 037import org.apache.hadoop.hbase.replication.ReplicationFactory; 038import org.apache.hadoop.hbase.replication.ReplicationPeers; 039import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 040import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 041import org.apache.hadoop.hbase.replication.ReplicationUtils; 042import org.apache.hadoop.hbase.replication.SyncReplicationState; 043import org.apache.hadoop.hbase.util.Pair; 044import org.apache.hadoop.hbase.wal.WALFactory; 045import org.apache.hadoop.hbase.wal.WALProvider; 046import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 047import org.apache.yetus.audience.InterfaceAudience; 048import org.apache.zookeeper.KeeperException; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052/** 053 * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. 054 * <p> 055 * Implement {@link PropagatingConfigurationObserver} mainly for registering 056 * {@link ReplicationPeers}, so we can recreating the replication peer storage. 057 */ 058@InterfaceAudience.Private 059public class Replication implements ReplicationSourceService, PropagatingConfigurationObserver { 060 private static final Logger LOG = LoggerFactory.getLogger(Replication.class); 061 private boolean isReplicationForBulkLoadDataEnabled; 062 private ReplicationSourceManager replicationManager; 063 private ReplicationQueueStorage queueStorage; 064 private ReplicationPeers replicationPeers; 065 private volatile Configuration conf; 066 private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider; 067 // Hosting server 068 private Server server; 069 private int statsPeriodInSecond; 070 // ReplicationLoad to access replication metrics 071 private ReplicationLoad replicationLoad; 072 private MetricsReplicationGlobalSourceSource globalMetricsSource; 073 074 private PeerProcedureHandler peerProcedureHandler; 075 076 /** 077 * Empty constructor 078 */ 079 public Replication() { 080 } 081 082 @Override 083 public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir, 084 WALFactory walFactory) throws IOException { 085 this.server = server; 086 this.conf = this.server.getConfiguration(); 087 this.isReplicationForBulkLoadDataEnabled = 088 ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf); 089 if (this.isReplicationForBulkLoadDataEnabled) { 090 if ( 091 conf.get(HConstants.REPLICATION_CLUSTER_ID) == null 092 || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty() 093 ) { 094 throw new IllegalArgumentException( 095 HConstants.REPLICATION_CLUSTER_ID + " cannot be null/empty when " 096 + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY + " is set to true."); 097 } 098 } 099 100 try { 101 this.queueStorage = 102 ReplicationStorageFactory.getReplicationQueueStorage(server.getConnection(), conf); 103 this.replicationPeers = ReplicationFactory.getReplicationPeers(server.getFileSystem(), 104 server.getZooKeeper(), this.conf); 105 this.replicationPeers.init(); 106 } catch (Exception e) { 107 throw new IOException("Failed replication handler create", e); 108 } 109 UUID clusterId = null; 110 try { 111 clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper()); 112 } catch (KeeperException ke) { 113 throw new IOException("Could not read cluster id", ke); 114 } 115 SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager(); 116 this.globalMetricsSource = CompatibilitySingletonFactory 117 .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); 118 this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, conf, 119 this.server, fs, logDir, oldLogDir, clusterId, walFactory, mapping, globalMetricsSource); 120 this.statsPeriodInSecond = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); 121 this.replicationLoad = new ReplicationLoad(); 122 123 this.syncReplicationPeerInfoProvider = 124 new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping); 125 // Get the user-space WAL provider 126 WALProvider walProvider = walFactory != null ? walFactory.getWALProvider() : null; 127 if (walProvider != null) { 128 walProvider 129 .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager)); 130 PeerActionListener peerActionListener = walProvider.getPeerActionListener(); 131 walProvider.setSyncReplicationPeerInfoProvider(syncReplicationPeerInfoProvider); 132 // for sync replication state change, we need to reload the state twice, you can see the 133 // code in PeerProcedureHandlerImpl, so here we need to go over the sync replication peers 134 // to see if any of them are in the middle of the two refreshes, if so, we need to manually 135 // repeat the action we have done in the first refresh, otherwise when the second refresh 136 // comes we will be in trouble, such as NPE. 137 replicationPeers.getAllPeerIds().stream().map(replicationPeers::getPeer) 138 .filter(p -> p.getPeerConfig().isSyncReplication()) 139 .filter(p -> p.getNewSyncReplicationState() != SyncReplicationState.NONE) 140 .forEach(p -> peerActionListener.peerSyncReplicationStateChange(p.getId(), 141 p.getSyncReplicationState(), p.getNewSyncReplicationState(), 0)); 142 this.peerProcedureHandler = 143 new PeerProcedureHandlerImpl(replicationManager, peerActionListener); 144 } else { 145 this.peerProcedureHandler = 146 new PeerProcedureHandlerImpl(replicationManager, PeerActionListener.DUMMY); 147 } 148 } 149 150 @Override 151 public PeerProcedureHandler getPeerProcedureHandler() { 152 return peerProcedureHandler; 153 } 154 155 /** 156 * Stops replication service. 157 */ 158 @Override 159 public void stopReplicationService() { 160 this.replicationManager.join(); 161 } 162 163 /** 164 * If replication is enabled and this cluster is a master, it starts 165 */ 166 @Override 167 public void startReplicationService() throws IOException { 168 this.replicationManager.init(); 169 this.server.getChoreService().scheduleChore(new ReplicationStatisticsChore( 170 "ReplicationSourceStatistics", server, (int) TimeUnit.SECONDS.toMillis(statsPeriodInSecond))); 171 LOG.info("{} started", this.server.toString()); 172 } 173 174 /** 175 * Get the replication sources manager 176 * @return the manager if replication is enabled, else returns false 177 */ 178 public ReplicationSourceManager getReplicationManager() { 179 return this.replicationManager; 180 } 181 182 void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) 183 throws IOException { 184 try { 185 this.replicationManager.addHFileRefs(tableName, family, pairs); 186 } catch (IOException e) { 187 LOG.error("Failed to add hfile references in the replication queue.", e); 188 throw e; 189 } 190 } 191 192 /** 193 * Statistics task. Periodically prints the cache statistics to the log. 194 */ 195 private final class ReplicationStatisticsChore extends ScheduledChore { 196 197 ReplicationStatisticsChore(String name, Stoppable stopper, int period) { 198 super(name, stopper, period); 199 } 200 201 @Override 202 protected void chore() { 203 printStats(replicationManager.getStats()); 204 } 205 206 private void printStats(String stats) { 207 if (!stats.isEmpty()) { 208 LOG.info(stats); 209 } 210 } 211 } 212 213 @Override 214 public ReplicationLoad refreshAndGetReplicationLoad() { 215 if (this.replicationLoad == null) { 216 return null; 217 } 218 // always build for latest data 219 List<ReplicationSourceInterface> allSources = new ArrayList<>(); 220 allSources.addAll(this.replicationManager.getSources()); 221 allSources.addAll(this.replicationManager.getOldSources()); 222 this.replicationLoad.buildReplicationLoad(allSources, null); 223 return this.replicationLoad; 224 } 225 226 @Override 227 public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() { 228 return syncReplicationPeerInfoProvider; 229 } 230 231 @Override 232 public ReplicationPeers getReplicationPeers() { 233 return replicationPeers; 234 } 235 236 @Override 237 public void onConfigurationChange(Configuration conf) { 238 this.conf = conf; 239 } 240 241 @Override 242 public void registerChildren(ConfigurationManager manager) { 243 manager.registerObserver(replicationPeers); 244 } 245 246 @Override 247 public void deregisterChildren(ConfigurationManager manager) { 248 manager.deregisterObserver(replicationPeers); 249 } 250}