/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;

/**
 * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
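 * <p>
 * Rough lifecycle sketch (simplified; the region server performs this wiring internally, and the
 * argument names below are placeholders):
 *
 * <pre>
 * Replication replication = new Replication();
 * replication.initialize(server, fs, logDir, oldLogDir, walFactory);
 * replication.startReplicationService();
 * // ... serve edits; the sources manager ships WAL entries to peer clusters ...
 * replication.stopReplicationService();
 * </pre>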
 */
@InterfaceAudience.Private
public class Replication implements ReplicationSourceService, ReplicationSinkService {
  private static final Logger LOG = LoggerFactory.getLogger(Replication.class);
  private boolean isReplicationForBulkLoadDataEnabled;
  private ReplicationSourceManager replicationManager;
  private ReplicationQueueStorage queueStorage;
  private ReplicationPeers replicationPeers;
  private Configuration conf;
  private ReplicationSink replicationSink;
  // Hosting server
  private Server server;
  /** Statistics thread schedule pool */
  private ScheduledExecutorService scheduleThreadPool;
  private int statsThreadPeriod;
  // ReplicationLoad to access replication metrics
  private ReplicationLoad replicationLoad;
  private MetricsReplicationGlobalSourceSource globalMetricsSource;

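  // Handles replication peer procedure operations (e.g. add/remove/enable/disable peer)
  // dispatched to this region server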
  private PeerProcedureHandler peerProcedureHandler;

  /**
   * Empty constructor
   */
  public Replication() {
  }

  @Override
  public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
    WALFactory walFactory) throws IOException {
    this.server = server;
    this.conf = this.server.getConfiguration();
    this.isReplicationForBulkLoadDataEnabled =
      ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf);
    this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder()
        .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
        .setDaemon(true).build());
    if (this.isReplicationForBulkLoadDataEnabled) {
      if (
        conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
          || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()
      ) {
        throw new IllegalArgumentException(
          HConstants.REPLICATION_CLUSTER_ID + " cannot be null/empty when "
            + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY + " is set to true.");
      }
    }

    try {
      this.queueStorage =
        ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
      this.replicationPeers =
        ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf);
      this.replicationPeers.init();
    } catch (Exception e) {
      throw new IOException("Failed replication handler create", e);
    }
    UUID clusterId = null;
    try {
      clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
    } catch (KeeperException ke) {
      throw new IOException("Could not read cluster id", ke);
    }
    this.globalMetricsSource = CompatibilitySingletonFactory
      .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
    this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, conf,
      this.server, fs, logDir, oldLogDir, clusterId, walFactory, globalMetricsSource);
    // Get the user-space WAL provider
    WALProvider walProvider = walFactory != null ? walFactory.getWALProvider() : null;
    if (walProvider != null) {
      walProvider
        .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
    }
    this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
    LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod);
    this.replicationLoad = new ReplicationLoad();

    this.peerProcedureHandler = new PeerProcedureHandlerImpl(replicationManager);
  }

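  /**
   * Returns the handler used to apply replication peer procedure operations on this region server.
   */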
  @Override
  public PeerProcedureHandler getPeerProcedureHandler() {
    return peerProcedureHandler;
  }

  /**
   * Stops replication service.
   */
  @Override
  public void stopReplicationService() {
    join();
  }

  /**
   * Join with the replication threads
   */
  public void join() {
    this.replicationManager.join();
    if (this.replicationSink != null) {
      this.replicationSink.stopReplicationSinkServices();
    }
    scheduleThreadPool.shutdown();
  }

  /**
   * Carries the given list of log entries down to the sink.
   * @param entries                    list of entries to replicate
   * @param cells                      The data -- the cells -- that <code>entries</code> describes
   *                                   (the entries do not contain the Cells we are replicating;
   *                                   they are passed here on the side in this CellScanner).
   * @param replicationClusterId       Id which will uniquely identify source cluster FS client
   *                                   configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *                                   directory required for replicating hfiles
   * @param sourceHFileArchiveDirPath  Path that points to the source cluster hfile archive
   *                                   directory
   */
  @Override
  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
    String replicationClusterId, String sourceBaseNamespaceDirPath,
    String sourceHFileArchiveDirPath) throws IOException {
    this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
      sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
  }

  /**
   * Starts the replication service: initializes the replication sources manager, creates the sink
   * and schedules the periodic statistics task.
   */
  @Override
  public void startReplicationService() throws IOException {
    this.replicationManager.init();
    this.replicationSink = new ReplicationSink(this.conf);
    this.scheduleThreadPool.scheduleAtFixedRate(
      new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
      statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
    LOG.info("{} started", this.server.toString());
  }

  /**
   * Get the replication sources manager
   * @return the replication sources manager
   */
  public ReplicationSourceManager getReplicationManager() {
    return this.replicationManager;
  }

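  /**
   * Adds the given hfile references to the replication queue so the bulk-loaded files can be
   * replicated to peer clusters; rethrows any failure after logging it.
   */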
  void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
    throws IOException {
    try {
      this.replicationManager.addHFileRefs(tableName, family, pairs);
    } catch (IOException e) {
      LOG.error("Failed to add hfile references in the replication queue.", e);
      throw e;
    }
  }

  /**
   * Statistics task. Periodically prints the replication statistics to the log.
   */
  private final static class ReplicationStatisticsTask implements Runnable {

    private final ReplicationSink replicationSink;
    private final ReplicationSourceManager replicationManager;

    public ReplicationStatisticsTask(ReplicationSink replicationSink,
      ReplicationSourceManager replicationManager) {
      this.replicationManager = replicationManager;
      this.replicationSink = replicationSink;
    }

    @Override
    public void run() {
      printStats(this.replicationManager.getStats());
      printStats(this.replicationSink.getStats());
    }

    private void printStats(String stats) {
      if (!stats.isEmpty()) {
        LOG.info(stats);
      }
    }
  }

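  /**
   * Refreshes and returns the current {@link ReplicationLoad}, or null if replication has not been
   * initialized yet.
   */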
  @Override
  public ReplicationLoad refreshAndGetReplicationLoad() {
    if (this.replicationLoad == null) {
      return null;
    }
    // always build for latest data
    buildReplicationLoad();
    return this.replicationLoad;
  }

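  /**
   * Rebuilds {@link #replicationLoad} from the currently running and recovered sources plus the
   * sink metrics.
   */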
  private void buildReplicationLoad() {
    List<ReplicationSourceInterface> allSources = new ArrayList<>();
    allSources.addAll(this.replicationManager.getSources());
    allSources.addAll(this.replicationManager.getOldSources());

    // get sink
    MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
    this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics);
  }
}