001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.replication.regionserver; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.OptionalLong; 025import java.util.UUID; 026import java.util.concurrent.Executors; 027import java.util.concurrent.ScheduledExecutorService; 028import java.util.concurrent.TimeUnit; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.CellScanner; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.Server; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; 037import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; 038import org.apache.hadoop.hbase.replication.ReplicationException; 039import org.apache.hadoop.hbase.replication.ReplicationFactory; 040import org.apache.hadoop.hbase.replication.ReplicationPeers; 041import org.apache.hadoop.hbase.replication.ReplicationQueues; 042import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; 043import org.apache.hadoop.hbase.replication.ReplicationTracker; 044import org.apache.hadoop.hbase.replication.ReplicationUtils; 045import org.apache.hadoop.hbase.util.Pair; 046import org.apache.hadoop.hbase.wal.WALProvider; 047import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 048import org.apache.yetus.audience.InterfaceAudience; 049import org.apache.zookeeper.KeeperException; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 054 055import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 056 057/** 058 * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. 059 */ 060@InterfaceAudience.Private 061public class Replication implements ReplicationSourceService, ReplicationSinkService { 062 private static final Logger LOG = 063 LoggerFactory.getLogger(Replication.class); 064 private boolean isReplicationForBulkLoadDataEnabled; 065 private ReplicationSourceManager replicationManager; 066 private ReplicationQueues replicationQueues; 067 private ReplicationPeers replicationPeers; 068 private ReplicationTracker replicationTracker; 069 private Configuration conf; 070 private ReplicationSink replicationSink; 071 // Hosting server 072 private Server server; 073 /** Statistics thread schedule pool */ 074 private ScheduledExecutorService scheduleThreadPool; 075 private int statsThreadPeriod; 076 // ReplicationLoad to access replication metrics 077 private ReplicationLoad replicationLoad; 078 079 /** 080 * Empty constructor 081 */ 082 public Replication() { 083 } 084 085 @Override 086 public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir, 087 WALProvider walProvider) throws IOException { 088 this.server = server; 089 this.conf = this.server.getConfiguration(); 090 this.isReplicationForBulkLoadDataEnabled = 091 ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf); 092 this.scheduleThreadPool = Executors.newScheduledThreadPool(1, 093 new ThreadFactoryBuilder() 094 .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d") 095 .setDaemon(true) 096 .build()); 097 if (this.isReplicationForBulkLoadDataEnabled) { 098 if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null 099 || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) { 100 throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID 101 + " cannot be null/empty when " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY 102 + " is set to true."); 103 } 104 } 105 106 try { 107 this.replicationQueues = 108 ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, this.server, 109 server.getZooKeeper())); 110 this.replicationQueues.init(this.server.getServerName().toString()); 111 this.replicationPeers = 112 ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server); 113 this.replicationPeers.init(); 114 this.replicationTracker = 115 ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers, 116 this.conf, this.server, this.server); 117 } catch (Exception e) { 118 throw new IOException("Failed replication handler create", e); 119 } 120 UUID clusterId = null; 121 try { 122 clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper()); 123 } catch (KeeperException ke) { 124 throw new IOException("Could not read cluster id", ke); 125 } 126 this.replicationManager = new ReplicationSourceManager(replicationQueues, replicationPeers, 127 replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, 128 walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty()); 129 if (walProvider != null) { 130 walProvider 131 .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager)); 132 } 133 this.statsThreadPeriod = 134 this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); 135 LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod); 136 this.replicationLoad = new ReplicationLoad(); 137 } 138 139 /** 140 * Stops replication service. 141 */ 142 @Override 143 public void stopReplicationService() { 144 join(); 145 } 146 147 /** 148 * Join with the replication threads 149 */ 150 public void join() { 151 this.replicationManager.join(); 152 if (this.replicationSink != null) { 153 this.replicationSink.stopReplicationSinkServices(); 154 } 155 scheduleThreadPool.shutdown(); 156 } 157 158 /** 159 * Carry on the list of log entries down to the sink 160 * @param entries list of entries to replicate 161 * @param cells The data -- the cells -- that <code>entries</code> describes (the entries do not 162 * contain the Cells we are replicating; they are passed here on the side in this 163 * CellScanner). 164 * @param replicationClusterId Id which will uniquely identify source cluster FS client 165 * configurations in the replication configuration directory 166 * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace 167 * directory required for replicating hfiles 168 * @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory 169 * @throws IOException 170 */ 171 @Override 172 public void replicateLogEntries(List<WALEntry> entries, CellScanner cells, 173 String replicationClusterId, String sourceBaseNamespaceDirPath, 174 String sourceHFileArchiveDirPath) throws IOException { 175 this.replicationSink.replicateEntries(entries, cells, replicationClusterId, 176 sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath); 177 } 178 179 /** 180 * If replication is enabled and this cluster is a master, 181 * it starts 182 * @throws IOException 183 */ 184 @Override 185 public void startReplicationService() throws IOException { 186 try { 187 this.replicationManager.init(); 188 } catch (ReplicationException e) { 189 throw new IOException(e); 190 } 191 this.replicationSink = new ReplicationSink(this.conf, this.server); 192 this.scheduleThreadPool.scheduleAtFixedRate( 193 new ReplicationStatisticsTask(this.replicationSink, this.replicationManager), 194 statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS); 195 } 196 197 /** 198 * Get the replication sources manager 199 * @return the manager if replication is enabled, else returns false 200 */ 201 public ReplicationSourceManager getReplicationManager() { 202 return this.replicationManager; 203 } 204 205 void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) 206 throws IOException { 207 try { 208 this.replicationManager.addHFileRefs(tableName, family, pairs); 209 } catch (ReplicationException e) { 210 LOG.error("Failed to add hfile references in the replication queue.", e); 211 throw new IOException(e); 212 } 213 } 214 215 /** 216 * Statistics task. Periodically prints the cache statistics to the log. 217 */ 218 private final static class ReplicationStatisticsTask implements Runnable { 219 220 private final ReplicationSink replicationSink; 221 private final ReplicationSourceManager replicationManager; 222 223 public ReplicationStatisticsTask(ReplicationSink replicationSink, 224 ReplicationSourceManager replicationManager) { 225 this.replicationManager = replicationManager; 226 this.replicationSink = replicationSink; 227 } 228 229 @Override 230 public void run() { 231 printStats(this.replicationManager.getStats()); 232 printStats(this.replicationSink.getStats()); 233 } 234 235 private void printStats(String stats) { 236 if (!stats.isEmpty()) { 237 LOG.info(stats); 238 } 239 } 240 } 241 242 @Override 243 public ReplicationLoad refreshAndGetReplicationLoad() { 244 if (this.replicationLoad == null) { 245 return null; 246 } 247 // always build for latest data 248 buildReplicationLoad(); 249 return this.replicationLoad; 250 } 251 252 private void buildReplicationLoad() { 253 List<MetricsSource> sourceMetricsList = new ArrayList<>(); 254 255 // get source 256 List<ReplicationSourceInterface> sources = this.replicationManager.getSources(); 257 for (ReplicationSourceInterface source : sources) { 258 sourceMetricsList.add(source.getSourceMetrics()); 259 } 260 261 // get old source 262 List<ReplicationSourceInterface> oldSources = this.replicationManager.getOldSources(); 263 for (ReplicationSourceInterface source : oldSources) { 264 if (source instanceof ReplicationSource) { 265 sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics()); 266 } 267 } 268 269 // get sink 270 MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics(); 271 this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics); 272 } 273}